#!usr/bin/env python
# -*- coding:utf-8 _*-
# noinspection SpellCheckingInspection
""
@author:zx
@file: locust_lian_deng.py
@time: 2024/1/4 10:39
# @describe: Locust - API进件压测
"""
import os
import subprocess
from locust import HttpUser, FastHttpUser, task, between, SequentialTaskSet
from common.Sigin_Encryption import SignEncryption
from utils.RandomString import RandomStr
class QuickstartUser(SequentialTaskSet):
# wait_time = between(1, 3)
def on_start(self):
# self.client.verify = False # 禁用SSL证书验证
print("开始测试")
@task
def credential_stuffing(self):
""" 撞库 """
phone_number = '131' + RandomStr().random_number(8)
print("credential_stuffing-撞库手机号", phone_number)
crm_appkey = 'xxxxxxxxxxx'
crm_secretKey = 'xxxxxxxxxx'
name = '测试'
idcard = 'xxxxxxxxxxx'
header_data = {
'Content-Type': 'application/json'
}
sign = SignEncryption()
apics = sign.credential_stuffing_sign(name=name, sex=1, mobile=phone_number,
secretKey=crm_secretKey, idcard=idcard)
data = {
"appkey": crm_appkey,
"apics": apics,
"version": "2.0"
}
# print("撞库参数", data)
response = self.client.post("/api/hitLibrary", headers=header_data, json=data, name="撞库")
print("撞库返回值:", response.text)
return phone_number
# @task
def crm_order_incoming(self):
""" API进件 """
phone_number = self.credential_stuffing()
crm_appkey = 'xxxxx'
crm_secretKey = 'xxxx'
print("crm_order_incoming-进件手机号", phone_number)
sex = 1
age = 27
realName = '测试'
header_data = {
'Content-Type': 'application/json'
}
sign = SignEncryption()
apics = sign.crm_order_incoming_sign(secretKey=crm_secretKey, phone=phone_number,
sex=sex, age=age,realName=realName)
data = {
"appkey": crm_appkey,
"apics": apics,
"version": "2.0"
}
# print("进件参数", data)
response = self.client.post("/api/intoOrder", headers=header_data, json=data, name="进件")
print("进件返回值:", response.text)
def on_stop(self):
print("结束测试")
class WebsiteUser(FastHttpUser):
tasks = [QuickstartUser]
host = 'https://xxxxx.xxxxxx.com'
min_wait = 1000
max_wait = 3000
if __name__ == '__main__':
"""测试服地址, locust默认端口:8089 """
# 设置多进程、运行时间、运行每秒启动的虚拟用户数
command = [
"locust",
"-f",
"locust_lian_deng.py",
"--processes",
"4",
"--run-time",
"2h",
"--hatch-rate",
"10"
]
subprocess.run(command, check=True)