1、locust 可以调用 request 的封装接口(比如登录)
2、对 locust 请求数进行部分控制优化
3、可以作为快速造数据的一种手段
4、locust 适配版本为 v1.1(0 版本需要自己修改一下相关代码参数与部分逻辑)
1、对 request 进行二次封装,同时实现 locust 代码记录转换
2、依据 locust 底层采用了 request 的 session,因此替换 session 即可(RequestsBasic)
import time
import urllib3
import requests
import threading
import sys
from collections import namedtuple
import traceback
from utils import logger
from utils.logger import log #此处的log 可以直接用系统的 logging
class RequestsBasic(object):
urllib3.disable_warnings() # 去掉验证证书告警问题
#相关类变量:
locust_start_time=None #统计 locust 开始发送请求的时间
#默认headers,可以设置更改
default_headers = {
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Accept": "application/json, text/plain, */*",
"Connection": "keep-alive",
"User-Agent":
"Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.157 Safari/537.36"}
def __init__(self,http_client_session=None):
self.http_client_session = http_client_session or requests.Session()
self.verify = False #默认设置是否需要进证书校验,默认不需要(主要https用)
self.requests = requests #默认模块为一个对象,留作扩展
self.headers = self.get_default_headers()
self.proxies = {'http': 'http://127.0.0.1:8888',
'https': 'https://127.0.0.1:8888'}#这个与使用抓包工具的代理有关如:我的fildder 用的是8888 端口
self.proxies_open=0 #默认关闭不使用代理
self.locust_client_dict:dict={} #根据线程id存放不同的客户端user.client
self.locust_count =0 #统计该进程运行时 发送 locust http请求个数
self.runtimes=1 # 运行次数,非locust请求时 增加失败重发机制
# locust_response_json_assert 针对locust的响应进行处理其是否成功或者失败,可另外进行扩展,比如加入大于小于等于等进行判断
self.locust_response_json_assert={}
self.locust_name_flag = 1 # 对locust 请求时 设置 name参数设置,否则不设置url的名字
def get_locust_client_dict(self,locust_client):
"""
替换 request中 的session 为单独用户的 locust_client,在locust 中的 on_start 方法中调用
:param locust_client: 在locust 中一般为 self.client
eg:my_request.get_locust_client_dict(self.client)
:return: 返回 self 该类的实例化本身,因为此处在每个线程,每个user.client是有细微差异方便对该类实例化对象进行设置
"""
"""
:return:
"""
thread_id = threading.current_thread().ident # 记录当前线程id
self.locust_client_dict[thread_id] = locust_client # 注意设置此项,且保证 my_request 是同一个不可变的对象
return self #注意此处,返回的不同 RequestsBasic 对象
#默认设置请求的 Content-Type
def get_default_headers(self,content_type=1,default_headers=None):
"""
获取默认的请求头
:param content_type:
:param default_headers:
:return:
"""
headers ={
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Accept": "application/json, text/plain, */*",
"Connection": "keep-alive",
"User-Agent":
"Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.157 Safari/537.36"}
if default_headers !=None:
headers=default_headers
if content_type ==-1:
pass #无 Content-Type 属性
elif content_type ==0:
headers['Content-Type']="application/json; charset=UTF-8"
elif content_type==1:
headers['Content-Type'] = "application/x-www-form-urlencoded; charset=UTF-8"
elif content_type ==2:
headers['Content-Type'] = "multipart/form-data"
return headers
def request(self, method, url, locust_client=None, locust_response_json_assert_dict:dict=None,**kwargs):
"""
支持request请求,和locust请求(以locust* 开头的参数都是locust特有)
:param method: #methods= ('post','put','get','delete') #可以参考 request的request方法
:param locust_client: locust的客户端,或者设置 self.locust_client_dict 对象
:param locust_response_json_assert_dict: locust时对返回的response 参数进行特殊断言
:param url:
:param
:param kwargs:
:return:
"""
#、设置返回相关参数,设置一个namedtuple对象,进行一些参数封装
request_meta = namedtuple('RequestsMeta',
['start_time', 'elapsed_time', 'response', 'content_size', 'status_code'])
request_meta.start_time = time.time() #统计时间
param_info = {"method": method, "url": url, "kwargs": kwargs} # 异常处理时调用参数
#print("locust_response_json_assert_dict:", locust_response_json_assert_dict,kwargs)
# 开始发起http/https请求
try:
#1、处理请求头
if 'verify' not in kwargs.keys():
kwargs['verify'] = self.verify
if 'headers' not in kwargs.keys():
kwargs['headers'] = self.headers
# urllib3.disable_warnings() # 去掉验证证书告警问题
#2、处理locust和普通请求
if locust_client == None and len(self.locust_client_dict)== 0: #普通请求
#处理代理
if self.proxies_open == 1:
# "proxies" in kwargs.keys() and kwargs['proxies'] != None
kwargs['proxies']=self.proxies
log.warning("此处使用了代理协议请检查(若未开代理可能会失败!)"
"proxies:{}".format(kwargs['proxies']))
#支持多次发送
if 'name' in kwargs.keys():
log.error("【注意】name 参数一般不在 kwargs参数中,locust时特用!目前已废弃,请注意检查!")
try:
runtimes = 1 #普通请求,多次尝试计数
while runtimes <= self.runtimes:
response = self.http_client_session.request(method, url, **kwargs)
# log.info(runtimes,self.runtimes,response.status_code)
# print("runtimes:", runtimes,response.status_code)
if response.status_code != 200 and self.runtimes > 1:
elapsed_time = int((time.time() - request_meta.start_time) * 1000) # 毫秒
log.warning("""runtimes:{}
,耗时:{} ms,返回状态码:{}""(!=200),请检查请求:\n{}\n【返回值】:{}
""".format(runtimes, elapsed_time,response.status_code, param_info, response.text))
time.sleep(runtimes * 2)
request_meta.start_time = time.time() # 重新记录开始时间
else:
break
runtimes += 1 # 自增运行的次数
except Exception as e:
log.warning("runtimes:{} 运行失败,请检查:{}".format(runtimes,traceback.format_exc()))
#其他处理
#log.debug("session:{}".format(response.cookies))
# 异常处理
if response.status_code != 200:
log.warning("""返回状态码:{}(!=200),
请检查请求:\n{}\n【返回值】:{}
""".format(response.status_code, param_info,response.text))
else: # 走locust
self.locust_count +=1 #统计locust发送的请求次数
if self.locust_count <=1:
self.locust_start_time= time.time() #统计从locust发送的第一个请求开始计时
thread_id = threading.current_thread().ident # 获取当前线程id
#拿取 locust_client
if locust_client != None:
#使用自己本身
self.locust_client_dict[thread_id]=locust_client # 获取当前线程的 user.client
else:
locust_client = self.locust_client_dict[thread_id]
if self.locust_name_flag ==1 and "?" in url:
if 'name' not in kwargs:
url_pre = url.split("?")[0]
kwargs['name'] = url_pre
log.warning("对locust的 name(url) 参数进行特殊为:{}".format(url))
# 发送请求: 注意 catch_response
elapsed_time_total = int((time.time() - self.locust_start_time) * 1000)
locust_count=self.locust_count #locust 当前统计的次数
print('{}次请求中!总耗时(ms):'.format(locust_count),elapsed_time_total,"线程id:{}".format(thread_id),url, "调试打印locust_response_json_assert_dict:",
locust_response_json_assert_dict)
with locust_client.request(method, url,catch_response=True, **kwargs) as response:
if hasattr(response, 'elapsed'):
elapsed=response.elapsed
elapsed_time_total = int((time.time() - self.locust_start_time) * 1000)
print("{}次请求完!总耗时(ms):".format(locust_count),elapsed_time_total,'本次耗时:',elapsed,"线程id:{}".format(thread_id),url,)
else:
elapsed=''
if response.status_code < 400:
response.success()
# 处理json统计接口是否失败
#进行 locust_response_json_assert_dict 特殊处理
if "locust_response_json_assert_dict" in kwargs.keys():
locust_response_json_assert_dict = kwargs['locust_response_json_assert_dict']
# print("request:locust_response_json_assert_dict:",locust_response_json_assert_dict,kwargs)
if locust_response_json_assert_dict != None:
try:
data_dict = response.json()
for k, v in locust_response_json_assert_dict.items():
if k in data_dict and data_dict[k] == locust_response_json_assert_dict[k]:
response.success()
else:
response.failure('json检查k,v:{} 失败!{}'.format((k, v), response.text))
except Exception as e:
print("处理locust_response_json_assert_dict异常:\n", traceback.format_exc())
finally:
pass
else:
#注意此处
response.failure('elapsed:{} 状态码:{} 失败!{}'.format(0,response.status_code,response.text))
# response.r
response.encoding = 'utf-8'
# response.content.decode("unicode_escape")
request_meta.elapsed_time = int((time.time() - request_meta.start_time) * 1000) # 毫秒
request_meta.status_code = response.status_code
request_meta.response = response
request_meta.content_size = int(response.headers.get("content-length") or 0)
except Exception as e:
my_request.request_except_deal(e, traceback.format_exc(),**kwargs)
raise e
# log.error("【错误详细信息】:\n" + traceback.format_exc())
return request_meta
def request_except_deal(self,e,traceback,result='',**kwargs):
"""
#处理接口异常信息
:param e: 捕获到的异常类
:param traceback: 异常相关信息
:return: 异常类型
"""
try:
except_type = type(e)
if type(result)!=str:
text = result.response.text
else:
text=""
log.warning("text:{}\n抛出异常类型为:{},异常信息为:{}\n参数为:{}".format(text,except_type,traceback,kwargs))
except Exception as e:
print("处理request报错异常(print):",traceback.format_exc())
return except_type
#实例化一个默认对象
my_request=RequestsBasic()
if __name__ == "__main__":
print("开始测试",my_request.headers)
#!/usr/bin/env python
# coding:utf-8
"""
@author:YuanPengCheng
@PROJECT_NAME:autoTest
@file:UT_basic.py
@createtime:2020/8/6 9:22
@software: PyCharm
@description:
"系统性能框架测试模块 unittest(UT)"
"""
import traceback
import random
# sys.path.append(os.getcwd())
from common.http import requests_basic #核心代码 requests_basic.py
from utils.logger import log
from utils.testTools import tools #一些常用小工具封装,可以自己去替换
my_request = requests_basic.RequestsBasic() # 初始化一个 http请求,防止不同的对象的请求混乱
my_request.headers = my_request.get_default_headers(content_type=0) # 设置请求头的类型
# proxies_my = my_request.proxies # debug时 是否开启代理抓包模式
proxies_my=None
class UTBasic():
def __init__(self):
self.my_request = my_request
self.url_basic='' #:9900
self.ut_path= "/UT" # self.ut_path='' ,默认走网关
pass
#获取服务器时间(即空接口测试)
def getGlobalTime(self,**kwargs):
"""
获取服务器时间(即空接口测试)
:param kwargs:
:return:
"""
try:
# /dp_server/api/connect/list
url = self.url_basic + "{}/mock/testRecordData/getGlobalTime".format(self.ut_path)
# searchContent
payload = {}
for k, v in kwargs.items(): # 对函数关键字进行处理,
payload[k] = v
result = my_request.request('get', url=url, data=payload,**kwargs)
log.info(url,result.elapsed_time,'ms ' ,result.response.text)
# data_dict = result.response.json()
# return data_dict
except Exception as e:
my_request.request_except_deal(e, traceback.format_exc(), result)
raise e
#
#Mysql数据批量保存接口
def mysql_saveList(self,dataSize=20,**kwargs):
"""
Mysql数据批量保存接口
:param kwargs:
:return:
"""
try:
# /dp_server/api/connect/list
url = self.url_basic + "{}/mock/testRecordData/saveList?dataSize={}".format(self.ut_path,dataSize)
# searchContent
payload ={}
for k, v in kwargs.items(): # 对函数关键字进行处理,
payload[k] = v
result = my_request.request('get', url=url, params=payload,**kwargs)
print(result.elapsed_time,'ms ' ,url,result.response.elapsed,url,result.response.text)
# data_dict = result.response.json()
# return data_dict
except Exception as e:
my_request.request_except_deal(e, traceback.format_exc(), result)
raise e
#
#跨服务调用空接口,深度和层次
def crossServiceLoopByDeep(self,callCount=1,deep=5,**kwargs):
"""
Mysql数据批量保存接口
callCount:调用链路号(Int),用于区分调用的次数,每次调用可递增
deep:跨服务调用深度(Int)
:param kwargs:
:return:
"""
try:
print("(callCount,deep):",(callCount,deep))
# /dp_server/api/connect/list
url = self.url_basic + "{}/mock/testRecordData/crossServiceLoopByDeep?callCount={}&deep={}".format(self.ut_path,callCount,deep)
# searchContent
payload = {}
for k, v in kwargs.items(): # 对函数关键字进行处理,
payload[k] = v
result = my_request.request('get', url=url, params=payload,**kwargs)
print(result.elapsed_time,'ms ' ,(callCount,deep),url,result.response.text)
# data_dict = result.response.json()
# return data_dict
except Exception as e:
my_request.request_except_deal(e, traceback.format_exc(), result)
raise e
#
#mysql 高级查询
def search(self,filter: str, pageIndex=0,pageSize=10,sortField='', sortOrder='', **kwargs):
"""
:param filter: 是一个 字符串的 list[dict]
[{"operation":"like","alias":"","dataType":"","fieldname":"majorName","name":"majorName","value":"06191455%"}]
:param kwargs:
:return:
"""
try:
# /dp_server/api/connect/list
url = self.url_basic + "{}/mock/testRecordData/search".format(self.ut_path)
# searchContent
payload = {'filter': filter,
# [{"operation":"like","alias":"","dataType":"","fieldname":"indexInGroup","name":"indexInGroup","value":"19%"}]
'pageIndex': pageIndex,
'pageSize': pageSize,
'sortField': sortField,
'sortOrder': sortOrder}
result = my_request.request('post', url=url, data=payload,headers=my_request.get_default_headers(1),proxies=proxies_my,**kwargs)
print(result.elapsed_time,'ms ' ,url,"状态码:",result.status_code,result.response.elapsed,url,"response.text 数据:\n",result.response.text,"data参数为:\n{}".format(payload))
# print(result.response.text)
data_dict = result.response.json()
# print("1data_dict:",data_dict)
print("search data_dict:\n", "total:", data_dict['total'], "pageSize:", data_dict['pageSize'],
"totalPages:", data_dict['totalPages'])
return data_dict
except Exception as e:
my_request.request_except_deal(e, traceback.format_exc(), result)
raise e
#
#图数据库保存
def saveGraphData(self,dataGroupId:str,linkedSize:int,linkedNum:int, **kwargs):
"""
:param filter: 是一个 字符串的 list[dict]
:param kwargs:
:return:
"""
try:
# /dp_server/api/connect/list
#url = self.url_basic + "{}/mock/testGraphData/saveGraphData?dataGroupId={}&linkedSize={}&linkedNum={}".format(self.ut_path,dataGroupId,linkedSize,linkedNum)
url_basic = self.url_basic + "{}/mock/testGraphData/saveGraphData".format(self.ut_path)
url=url_basic+"?dataGroupId={}&linkedSize={}&linkedNum={}".format(dataGroupId,linkedSize,linkedNum)
#?dataGroupId=0001&linkedSize=20&linkedNum=30
payload = {"dataGroupId":dataGroupId,
"linkedSize":linkedSize,
"linkedNum":linkedNum,
}
result = my_request.request('get', url=url, data={},headers=my_request.get_default_headers(1),**kwargs)
print(result.elapsed_time,'ms ' ,url,"\nsaveGraphData text:",[result.response.text])
return result
except Exception as e:
my_request.request_except_deal(e, traceback.format_exc(), result)
raise e
#
#图数据库查询
def findListByNeo4j(self,startLable:str,maxLevel:int,targetLable:str, **kwargs):
"""
:param startLable: ‘Tag’_dataGroupId_编号 是一个组合参数
:param maxLevel: #与保存时的 linkedSize tag生成的列数有关,几乎相等,可以小于它,但注意数据关联
:param targetLable: ‘Tag’_dataGroupId_编号 是一个组合参数,注意与 startLable maxLevel 进行数据关联
:param kwargs:
:return:
"""
try:
# /dp_server/api/connect/list
#url = self.url_basic + f"{self.ut_path}/mock/testGraphData/findListByNeo4j?startLable={startLable}&maxLevel={maxLevel}&targetLable={targetLable}"
url_basic = self.url_basic + f"{self.ut_path}/mock/testGraphData/findListByNeo4j"
url=url_basic+f"?startLable={startLable}&maxLevel={maxLevel}&targetLable={targetLable}"
#/mock/testGraphData/findListByNeo4j?startLable=Tag_0001_0&maxLevel=20&targetLable=Tag_0001_19
# searchContent
payload = {"startLable":startLable,
"maxLevel":maxLevel,
"targetLable":targetLable}
result = my_request.request('get', url=url, data={},headers=my_request.get_default_headers(1))
print(result.elapsed_time,'ms ' ,url,"\n",[result.response.text])
except Exception as e:
my_request.request_except_deal(e, traceback.format_exc(), result)
raise e
#
#分布式事务验证接口
def testDistriTransaction(self,dataGroupId:str='555555555', **kwargs):
"""
:param dataGroupId 数据集ID。验证如果表中不存在该dataGroupId对应的数据即生效
:param kwargs:
:return:
"""
try:
# /dp_server/api/connect/list
url_basic = self.url_basic + f"{self.ut_path}/mock/testRecordData/testDistriTransaction"
url = url_basic + f"?dataGroupId={dataGroupId}"
#:9900/UT/mock/testRecordData/testDistriTransaction?dataGroupId=555555555
payload = {}
result = my_request.request('get', url=url, data=payload,headers=my_request.get_default_headers(1))
print(result.elapsed_time,'ms ' ,url,"\n",[result.response.text])
except Exception as e:
my_request.request_except_deal(e, traceback.format_exc(), result)
raise e
#
#默认生成一个公共对象
utb=UTBasic()
if __name__ == "__main__":
log.set_logpath("/UT/basic/")
utb.my_request.proxies_open=1 #调试走协议
utb.url_basic='http://10.2.1.150:9900'
#utb.url_basic = 'http://10.2.1.95:9900'
# utb.getGlobalTime()
# utb.crossServiceLoopByDeep()
#搜索
def test_search():
search_value = '196'
print("开始搜索:{}".format(search_value))
filter = [
{"operation": "like", "alias": "", "dataType": "", "fieldname": "indexInGroup", "name": "indexInGroup",
"value": "{}%".format(search_value)}]
filter = str(filter).replace("'", '"')
data_dict = utb.search(filter=filter)
print(data_dict)
def testDistriTransaction():
print("开始分布式事务调用")
dataGroupId='555555555'
utb.testDistriTransaction(dataGroupId=dataGroupId)
locust_response_json_assert_dict = {"success": True}
filter = [
{"operation": "like", "alias": "", "dataType": "", "fieldname": "dataGroupId", "name": "dataGroupId",
"value": "{}%".format(dataGroupId)}]
filter = str(filter).replace("'", '"')
data_dict = utb.search(filter=filter,locust_response_json_assert_dict=locust_response_json_assert_dict)
print('data_dict:',data_dict)
def test_saveGraphData():
print("开始保存图数据库:")
timestamp=tools.TimeDate.get_timestamp('%d%H%M%S%f')
dataGroupId = 'roc{}'.format(timestamp)
linkedSize=5
result=utb.saveGraphData(dataGroupId=dataGroupId,linkedSize=linkedSize,linkedNum=10)
startLable='Tag_'+dataGroupId+"_0"
maxLevel=random.randint(2,linkedSize)
targetLable='Tag_'+dataGroupId+"_{}".format(maxLevel-1)
#time.sleep(1)
utb.findListByNeo4j(startLable,maxLevel,targetLable)
def test_crossServiceLoopByDeep():
flag=random.randint(1,10)
data_dict = utb.crossServiceLoopByDeep(callCount=1,deep=random.randint(1,10))
print(data_dict)
#search()
test_search()
from locust import SequentialTaskSet, HttpUser, between, task, tag
from locust import events
from locust.contrib.fasthttp import FastHttpUser
import os, sys
import random
from locust import stats
##修改环境变量路径
sys_path_root = os.getcwd().split('scripts')[0]
sys_path_scripts = sys_path_root + "scripts" #因整个工程有个框架结构 scripts,把此处加入到环境变量,保证locust 命令cmd中可用
sys.path.extend((sys_path_root, sys_path_scripts))
from utils.logger import log
from scripts.WisdomTraining.cases.locustFile.ut.UT_basic import utb,my_request
#from .UT_basic import utb,my_request #同级下,
from locust import events
from gevent._semaphore import Semaphore
all_locusts_spawned = Semaphore() #对协程进行管理
all_locusts_spawned.acquire()
def on_hatch_complete(environment,user_count=100,**kwargs):
"""
对启动用户进行集合点设置
适用于场景如:总用户1000,每秒启动用户数1000(注意user_count参数)
:param environment: 当前进程的环境 可以用用 self.user.environment 拿取
:param user_count: 当前启动的线程(用户数)大于user_count 时才会发起http请求(启动时设置集合点)
:param kwargs:
:return:
"""
userCount=environment.runner.user_count
log.info("当前启动用户数:{}".format(userCount))
if userCount>=user_count:
all_locusts_spawned.release() #创建钩子方法
print("[环境已准备好]当前启动用户数:", userCount)
else:
all_locusts_spawned.wait()
class LocustCount():
"""
统计locust 相关使用,一种数据共享方式
"""
times=0
class TaskSetUT(SequentialTaskSet):
# 类继承SequentialTaskSet 或 TaskSet类
# 当类里面的任务请求有先后顺序时,继承SequentialTaskSet类,
# 没有先后顺序,可以使用继承TaskSet类
pageIndex = 0
pageSize = 20
def on_start(self):
[print(('on_start:',s)) for s in [self.user.environment.stats.total]]
my_request_self=my_request.get_locust_client_dict(self.client) #替换reqeust的 session 为locust_client的session
my_request_self.locust_name_flag=0 #设置locust 请求的 name 属性参数,url中带问号
#log.info("共享数据的环境:",self.user.environment)
log.info("共享数据的环境 stats:", self.user.environment.stats.total)
#log.info("开始统计数据 stats_history_csv:", stats.stats_history_csv(self.user.environment))
#utb.wt_b.login_WT()
on_hatch_complete(self.user.environment, user_count=1) #启动时设置1000个用户已经准备好
log.info("开始测试了,当前用户为:", self.user.environment.runner.user_count)
def on_stop(self):
[print('on_stop:',s) for s in [self.user.environment.stats.total]]
pass
@task(1)
def test_RecordData(self):
dataSize = random.randint(1, 100)
result=utb.mysql_saveList(dataSize)
@task(1)
def test_getGlobalTime(self):
result = utb.getGlobalTime()
@task(1)
@tag("leave_1")
def test_search(self):
search_value = random.randint(1,10)
filter = [
{"operation": "like", "alias": "", "dataType": "", "fieldname": "indexInGroup", "name": "indexInGroup",
"value": "{}%".format(search_value)}]
filter = str(filter).replace("'", '"')
pageIndex = self.pageIndex
pageSize = self.pageSize
data_dict = utb.search(filter=filter, pageIndex=pageIndex, pageSize=pageSize)
self.pageIndex += 1
self.pageSize *= self.pageIndex
try:
if data_dict['totalPages'] <= pageIndex + 1:
self.pageIndex = 0
if pageSize >= data_dict['total']:
self.pageSize = random.randint(pageIndex, data_dict['total'])
except Exception as e:
print(str(e))
self.pageIndex = 0
self.pageSize = 20
print('{} test_search :/n'.format(pageIndex), self.pageIndex, self.pageSize)
@task(2)
def test_search_new(self):
filter =''
data_dict = utb.search(filter=filter, pageIndex=0, pageSize=200)
@task(1)
def test_crossServiceLoopByDeep(self):
LocustCount.times+=1
callCount=LocustCount.times
flag=random.randint(1,10)
data_dict = utb.crossServiceLoopByDeep(callCount=callCount,deep=random.randint(1,10))
# This is the HttpUser class.
class UserBehavior(HttpUser): # HttpUser FastHttpUser 注意此处,若需走fasthttp 该类继承 FastHttpUser 就ok了,但是有可能会出问题(请自己检测)
tasks = [TaskSetUT]
times = 1
wait_time = between(0, 0)
network_timeout = 600
connection_timeout = 600
print("UserBehavior:测试")
share_data = ['url1', 'url2', 'url3', 'url4', 'url5']
if __name__ == '__main__':
import subprocess,os
#log.delete_log_report_file()
#utb.url_basic = 'http://10.2.1.95:9900'
utb.url_basic = 'http://10.2.1.150:9900'
#utb.url_basic = 'http://10.2.1.150:8533'
path = os.path.dirname(os.path.abspath(__file__))
file_name = os.path.split(__file__)[-1].split(".")[0]
default_url=utb.url_basic
print(path,file_name)
print("开始运行wt") # --no-web -c 2 -r 1 -t 3s
#locust -f --headless -u 1000 -r 100 --run-time 1h30m --step-load --step-users 300 --step-time 20m
# subprocess.call(
# 'locust -f {}/{}.py -u 1 -r 1 -t 10 -l --csv-full-history'.format(path,file_name, default_url),
# shell=True)
cmd ='locust -f {}/{}.py --host={} --web-host="127.0.0.1" --web-port 8090 --csv CSV_{}'.format(path,file_name, default_url,file_name)
print("cmd:",cmd)
dd=subprocess.call(cmd,shell=True) ##--web-port 8090
#d=subprocess.check_call('locust -f {}/{}.py --worker'.format(path,file_name, default_url),shell=True)
# d1=subprocess.check_call('locust -f {}/{}.py --worker'.format(path,file_name, default_url),shell=True)
d2 = subprocess.call('locust -f {}/{}.py --master --host={} --web-host="127.0.0.1" --web-port 8090 --csv CSV_{}'.format(path,file_name,default_url,file_name),shell=True)
1、分布式数据不太好统计
2、要多台电脑运行,是否可以用一个脚本或者工具进行远程控制(一键命令式)
3、对 fashHttp 支持不够友好,有时候会报错(继承 HttpUser 的类改为 FastHttpUser 就行)
4、请大家在使用中进一步提出更好的建议
####6.1 说明
欢迎大家提建议转载分享,请大家带上原文链接,改文目前仅发表于 TesterHome
0、官方文档
https://docs.locust.io/en/stable/installation.html
1、添加集合点:
https://testerhome.com/topics/12126
2、数据的几种方式
https://blog.csdn.net/gogoboi_jin/article/details/79229570
https://www.cnblogs.com/changqing8023/p/9563364.html
3、数据的使用
https://debugtalk.com/post/head-first-locust-advanced-script/
4、locust 原理介绍 与源码
https://www.cnblogs.com/belle-ls/p/10487597.html
https://cloud.tencent.com/developer/article/1506669
https://testerhome.com/topics/11829
5、统计性能数据 与优化
http://www.testqa.cn/article/detail/241
http://www.testqa.cn/article/detail/236
6、停止 locust
https://stackoverflow.com/questions/60163341/stop-locust-after-specific-number-of-requests