前言:上周临近下班时,同事叫我帮忙压测一个接口,要求压测时间 2 小时,每秒请求数为 50。 当时百度了一下,知道设置压测时间,但是每秒请求数未设置好,今天得空,查了一下,做下总结。有不足的地方,欢迎各位大佬指正。
#!usr/bin/env python
# -*- coding:utf-8 _*-
# noinspection SpellCheckingInspection
""
@author:zx
@file: locust_lian_deng.py
@time: 2024/1/4 10:39
# @describe: Locust - API进件压测
"""
import os
import subprocess
from locust import HttpUser, FastHttpUser, task, between, SequentialTaskSet
from common.Sigin_Encryption import SignEncryption
from utils.RandomString import RandomStr
class QuickstartUser(SequentialTaskSet):
# wait_time = between(1, 3)
def on_start(self):
# self.client.verify = False # 禁用SSL证书验证
print("开始测试")
@task
def credential_stuffing(self):
""" 撞库 """
phone_number = '131' + RandomStr().random_number(8)
print("credential_stuffing-撞库手机号", phone_number)
crm_appkey = 'xxxxxxxxxxx'
crm_secretKey = 'xxxxxxxxxx'
name = '测试'
idcard = 'xxxxxxxxxxx'
header_data = {
'Content-Type': 'application/json'
}
sign = SignEncryption()
apics = sign.credential_stuffing_sign(name=name, sex=1, mobile=phone_number,
secretKey=crm_secretKey, idcard=idcard)
data = {
"appkey": crm_appkey,
"apics": apics,
"version": "2.0"
}
# print("撞库参数", data)
response = self.client.post("/api/hitLibrary", headers=header_data, json=data, name="撞库")
print("撞库返回值:", response.text)
return phone_number
# @task
def crm_order_incoming(self):
""" API进件 """
phone_number = self.credential_stuffing()
crm_appkey = 'xxxxx'
crm_secretKey = 'xxxx'
print("crm_order_incoming-进件手机号", phone_number)
sex = 1
age = 27
realName = '测试'
header_data = {
'Content-Type': 'application/json'
}
sign = SignEncryption()
apics = sign.crm_order_incoming_sign(secretKey=crm_secretKey, phone=phone_number,
sex=sex, age=age,realName=realName)
data = {
"appkey": crm_appkey,
"apics": apics,
"version": "2.0"
}
# print("进件参数", data)
response = self.client.post("/api/intoOrder", headers=header_data, json=data, name="进件")
print("进件返回值:", response.text)
def on_stop(self):
print("结束测试")
class WebsiteUser(FastHttpUser):
tasks = [QuickstartUser]
host = 'https://xxxxx.xxxxxx.com'
min_wait = 1000
max_wait = 3000
if __name__ == '__main__':
"""测试服地址, locust默认端口:8089 """
# 设置多进程、运行时间、运行每秒启动的虚拟用户数
command = [
"locust",
"-f",
"locust_lian_deng.py",
"--processes",
"4",
"--run-time",
"2h",
"--hatch-rate",
"10"
]
subprocess.run(command, check=True)
1. 脚本说明:本次测试接口为:credential_stuffing-撞库接口
2. 参数说明:
- -f locust_lian_deng.py: 指定了要运行的 Locust 文件
- --processes 4: 指定要使用的进程数
- --run-time 2h: 指定测试运行的时间
- --hatch-rate 10: 指定每秒钟要生成的用户数量
- subprocess.run(command, check=True):执行名为 command 的子进程,并在执行完成后检查返回码。如果返回码不为零,将引发异常。
3. 运行的效果 (注意点:Number of users 需要设置成:50):
4. 问题: 每秒请求数:50-70 左右。如何让请求数维持在 50 左右。