接口测试 request 接口自动化测试脚本转化为 locust 性能测试脚本

roc · August 18, 2020 · Last by 乐观的星辰 replied at September 29, 2020 · 12770 hits

request接口自动化测试脚本转化为locust性能测试脚本

一、目标说明

1、locust 可以调用request的封装接口(比如登录)
2、对locust请求数进行部分控制优化
3、可以作为快速造数据的一种手段
4、locust适配版本为 v1.1(0版本需要自己修改一下相关代码参数与部分逻辑)

二、实现原理

1、对request进行二次封装,同时实现locust代码记录转换
2、依据locust底层采用了request的session,因此替换session即可(RequestsBasic)

三、核心代码逻辑

3.1 requests_basic.py request二次封装,核心转化模块

import time
import urllib3
import requests
import threading
import sys
from collections import namedtuple
import traceback
from utils import logger
from utils.logger import log #此处的log 可以直接用系统的 logging


class RequestsBasic(object):
urllib3.disable_warnings() # 去掉验证证书告警问题
#相关类变量:
locust_start_time=None #统计 locust 开始发送请求的时间
#默认headers,可以设置更改
default_headers = {
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Accept": "application/json, text/plain, */*",
"Connection": "keep-alive",
"User-Agent":
"Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.157 Safari/537.36"}

def __init__(self,http_client_session=None):
self.http_client_session = http_client_session or requests.Session()
self.verify = False #默认设置是否需要进证书校验,默认不需要(主要https用)
self.requests = requests #默认模块为一个对象,留作扩展
self.headers = self.get_default_headers()
self.proxies = {'http': 'http://127.0.0.1:8888',
'https': 'https://127.0.0.1:8888'}#这个与使用抓包工具的代理有关如:我的fildder 用的是8888 端口
self.proxies_open=0 #默认关闭不使用代理
self.locust_client_dict:dict={} #根据线程id存放不同的客户端user.client
self.locust_count =0 #统计该进程运行时 发送 locust http请求个数
self.runtimes=1 # 运行次数,非locust请求时 增加失败重发机制
# locust_response_json_assert 针对locust的响应进行处理其是否成功或者失败,可另外进行扩展,比如加入大于小于等于等进行判断
self.locust_response_json_assert={}
self.locust_name_flag = 1 # locust 请求时 设置 name参数设置,否则不设置url的名字





def get_locust_client_dict(self,locust_client):
"""
替换 request中 的session 为单独用户的 locust_client,在locust 中的 on_start 方法中调用
:param locust_client: 在locust 中一般为 self.client
eg:my_request.get_locust_client_dict(self.client)
:return: 返回 self 该类的实例化本身,因为此处在每个线程,每个user.client是有细微差异方便对该类实例化对象进行设置
"""

"""

:return:
"""

thread_id = threading.current_thread().ident # 记录当前线程id
self.locust_client_dict[thread_id] = locust_client # 注意设置此项,且保证 my_request 是同一个不可变的对象

return self #注意此处,返回的不同 RequestsBasic 对象

#默认设置请求的 Content-Type
def get_default_headers(self,content_type=1,default_headers=None):
"""
获取默认的请求头
:param content_type:
:param default_headers:
:return:
"""


headers ={
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Accept": "application/json, text/plain, */*",
"Connection": "keep-alive",
"User-Agent":
"Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.157 Safari/537.36"}
if default_headers !=None:
headers=default_headers

if content_type ==-1:
pass #无 Content-Type 属性
elif content_type ==0:
headers['Content-Type']="application/json; charset=UTF-8"
elif content_type==1:
headers['Content-Type'] = "application/x-www-form-urlencoded; charset=UTF-8"
elif content_type ==2:
headers['Content-Type'] = "multipart/form-data"

return headers

def request(self, method, url, locust_client=None, locust_response_json_assert_dict:dict=None,**kwargs):
"""
支持request请求,和locust请求(以locust* 开头的参数都是locust特有)

:param method: #methods= ('post','put','get','delete') #可以参考 request的request方法
:param locust_client: locust的客户端,或者设置 self.locust_client_dict 对象
:param locust_response_json_assert_dict: locust时对返回的response 参数进行特殊断言
:param url:
:param
:param kwargs:
:return:

"""

#、设置返回相关参数,设置一个namedtuple对象,进行一些参数封装
request_meta = namedtuple('RequestsMeta',
['start_time', 'elapsed_time', 'response', 'content_size', 'status_code'])
request_meta.start_time = time.time() #统计时间
param_info = {"method": method, "url": url, "kwargs": kwargs} # 异常处理时调用参数
#print("locust_response_json_assert_dict:", locust_response_json_assert_dict,kwargs)

# 开始发起http/https请求
try:
#1、处理请求头
if 'verify' not in kwargs.keys():
kwargs['verify'] = self.verify
if 'headers' not in kwargs.keys():
kwargs['headers'] = self.headers
# urllib3.disable_warnings() # 去掉验证证书告警问题

#2、处理locust和普通请求
if locust_client == None and len(self.locust_client_dict)== 0: #普通请求
#处理代理
if self.proxies_open == 1:
# "proxies" in kwargs.keys() and kwargs['proxies'] != None
kwargs['proxies']=self.proxies
log.warning("此处使用了代理协议请检查(若未开代理可能会失败!)"
"proxies:{}".format(kwargs['proxies']))
#支持多次发送
if 'name' in kwargs.keys():
log.error("【注意】name 参数一般不在 kwargs参数中,locust时特用!目前已废弃,请注意检查!")


try:
runtimes = 1 #普通请求,多次尝试计数
while runtimes <= self.runtimes:
response = self.http_client_session.request(method, url, **kwargs)
# log.info(runtimes,self.runtimes,response.status_code)
# print("runtimes:", runtimes,response.status_code)
if response.status_code != 200 and self.runtimes > 1:
elapsed_time = int((time.time() - request_meta.start_time) * 1000) # 毫秒
log.warning("""runtimes:{}
,耗时:{} ms,返回状态码:{}""(!=200),请检查请求:\n{}\n【返回值】:{}
"""
.format(runtimes, elapsed_time,response.status_code, param_info, response.text))
time.sleep(runtimes * 2)
request_meta.start_time = time.time() # 重新记录开始时间
else:
break

runtimes += 1 # 自增运行的次数


except Exception as e:
log.warning("runtimes:{} 运行失败,请检查:{}".format(runtimes,traceback.format_exc()))
#其他处理
#log.debug("session:{}".format(response.cookies))
# 异常处理
if response.status_code != 200:
log.warning("""返回状态码:{}(!=200),
请检查请求:\n{}\n【返回值】:{}
"""
.format(response.status_code, param_info,response.text))


else: # locust
self.locust_count +=1 #统计locust发送的请求次数
if self.locust_count <=1:
self.locust_start_time= time.time() #统计从locust发送的第一个请求开始计时

thread_id = threading.current_thread().ident # 获取当前线程id
#拿取 locust_client
if locust_client != None:
#使用自己本身
self.locust_client_dict[thread_id]=locust_client # 获取当前线程的 user.client
else:
locust_client = self.locust_client_dict[thread_id]

if self.locust_name_flag ==1 and "?" in url:
if 'name' not in kwargs:
url_pre = url.split("?")[0]
kwargs['name'] = url_pre
log.warning("对locust的 name(url) 参数进行特殊为:{}".format(url))


# 发送请求: 注意 catch_response
elapsed_time_total = int((time.time() - self.locust_start_time) * 1000)
locust_count=self.locust_count #locust 当前统计的次数
print('{}次请求中!总耗时(ms):'.format(locust_count),elapsed_time_total,"线程id:{}".format(thread_id),url, "调试打印locust_response_json_assert_dict:",
locust_response_json_assert_dict)
with locust_client.request(method, url,catch_response=True, **kwargs) as response:

if hasattr(response, 'elapsed'):
elapsed=response.elapsed
elapsed_time_total = int((time.time() - self.locust_start_time) * 1000)
print("{}次请求完!总耗时(ms):".format(locust_count),elapsed_time_total,'本次耗时:',elapsed,"线程id:{}".format(thread_id),url,)
else:
elapsed=''

if response.status_code < 400:
response.success()
# 处理json统计接口是否失败
#进行 locust_response_json_assert_dict 特殊处理
if "locust_response_json_assert_dict" in kwargs.keys():
locust_response_json_assert_dict = kwargs['locust_response_json_assert_dict']
# print("request:locust_response_json_assert_dict:",locust_response_json_assert_dict,kwargs)

if locust_response_json_assert_dict != None:
try:
data_dict = response.json()
for k, v in locust_response_json_assert_dict.items():
if k in data_dict and data_dict[k] == locust_response_json_assert_dict[k]:
response.success()
else:
response.failure('json检查k,v{} 失败!{}'.format((k, v), response.text))
except Exception as e:
print("处理locust_response_json_assert_dict异常:\n", traceback.format_exc())
finally:
pass

else:
#注意此处
response.failure('elapsed:{} 状态码:{} 失败!{}'.format(0,response.status_code,response.text))




# response.r
response.encoding = 'utf-8'
# response.content.decode("unicode_escape")
request_meta.elapsed_time = int((time.time() - request_meta.start_time) * 1000) # 毫秒
request_meta.status_code = response.status_code
request_meta.response = response
request_meta.content_size = int(response.headers.get("content-length") or 0)



except Exception as e:
my_request.request_except_deal(e, traceback.format_exc(),**kwargs)
raise e
# log.error("【错误详细信息】:\n" + traceback.format_exc())

return request_meta

def request_except_deal(self,e,traceback,result='',**kwargs):
"""
#处理接口异常信息
:param e: 捕获到的异常类
:param traceback: 异常相关信息
:return: 异常类型
"""

try:
except_type = type(e)
if type(result)!=str:
text = result.response.text
else:
text=""

log.warning("text:{}\n抛出异常类型为:{},异常信息为:{}\n参数为:{}".format(text,except_type,traceback,kwargs))
except Exception as e:
print("处理request报错异常(print):",traceback.format_exc())

return except_type




#实例化一个默认对象
my_request=RequestsBasic()

if __name__ == "__main__":

print("开始测试",my_request.headers)

四、使用示例

4.1 ut_basic.py 对http请求的一个封装,可以跑接口测试,也可以跑locust 性能测试


#!/usr/bin/env python
# coding:utf-8
"""
@author:YuanPengCheng
@PROJECT_NAME:autoTest
@file:UT_basic.py
@createtime:2020/8/6 9:22
@software: PyCharm
@description:
"系统性能框架测试模块 unittest(UT)"

"""
import traceback
import random

# sys.path.append(os.getcwd())
from common.http import requests_basic #核心代码 requests_basic.py

from utils.logger import log
from utils.testTools import tools #一些常用小工具封装,可以自己去替换


my_request = requests_basic.RequestsBasic() # 初始化一个 http请求,防止不同的对象的请求混乱
my_request.headers = my_request.get_default_headers(content_type=0) # 设置请求头的类型
# proxies_my = my_request.proxies # debug时 是否开启代理抓包模式
proxies_my=None

class UTBasic():
def __init__(self):
self.my_request = my_request
self.url_basic='' #:9900
self.ut_path= "/UT" # self.ut_path='' ,默认走网关
pass

#获取服务器时间(即空接口测试)
def getGlobalTime(self,**kwargs):
"""
获取服务器时间(即空接口测试)
:param kwargs:
:return:
"""
try:
# /dp_server/api/connect/list
url = self.url_basic + "{}/mock/testRecordData/getGlobalTime".format(self.ut_path)
# searchContent
payload = {}

for k, v in kwargs.items(): # 对函数关键字进行处理,
payload[k] = v

result = my_request.request('get', url=url, data=payload,**kwargs)
log.info(url,result.elapsed_time,'ms ' ,result.response.text)
# data_dict = result.response.json()
# return data_dict
except Exception as e:
my_request.request_except_deal(e, traceback.format_exc(), result)
raise e
#

#Mysql数据批量保存接口
def mysql_saveList(self,dataSize=20,**kwargs):
"""
Mysql数据批量保存接口
:param kwargs:
:return:
"""
try:
# /dp_server/api/connect/list
url = self.url_basic + "{}/mock/testRecordData/saveList?dataSize={}".format(self.ut_path,dataSize)
# searchContent
payload ={}

for k, v in kwargs.items(): # 对函数关键字进行处理,
payload[k] = v

result = my_request.request('get', url=url, params=payload,**kwargs)
print(result.elapsed_time,'ms ' ,url,result.response.elapsed,url,result.response.text)
# data_dict = result.response.json()
# return data_dict
except Exception as e:
my_request.request_except_deal(e, traceback.format_exc(), result)
raise e
#


#跨服务调用空接口,深度和层次
def crossServiceLoopByDeep(self,callCount=1,deep=5,**kwargs):
"""
Mysql数据批量保存接口
callCount:调用链路号(Int),用于区分调用的次数,每次调用可递增
deep:跨服务调用深度(Int)
:param kwargs:
:return:
"""
try:
print("(callCount,deep):",(callCount,deep))
# /dp_server/api/connect/list
url = self.url_basic + "{}/mock/testRecordData/crossServiceLoopByDeep?callCount={}&deep={}".format(self.ut_path,callCount,deep)
# searchContent
payload = {}

for k, v in kwargs.items(): # 对函数关键字进行处理,
payload[k] = v

result = my_request.request('get', url=url, params=payload,**kwargs)
print(result.elapsed_time,'ms ' ,(callCount,deep),url,result.response.text)
# data_dict = result.response.json()
# return data_dict
except Exception as e:
my_request.request_except_deal(e, traceback.format_exc(), result)
raise e
#

#mysql 高级查询
def search(self,filter: str, pageIndex=0,pageSize=10,sortField='', sortOrder='', **kwargs):
"""
:param filter: 是一个 字符串的 list[dict]
[{"operation":"like","alias":"","dataType":"","fieldname":"majorName","name":"majorName","value":"06191455%"}]
:param kwargs:
:return:
"""
try:
# /dp_server/api/connect/list
url = self.url_basic + "{}/mock/testRecordData/search".format(self.ut_path)
# searchContent
payload = {'filter': filter,
# [{"operation":"like","alias":"","dataType":"","fieldname":"indexInGroup","name":"indexInGroup","value":"19%"}]
'pageIndex': pageIndex,
'pageSize': pageSize,
'sortField': sortField,
'sortOrder': sortOrder}
result = my_request.request('post', url=url, data=payload,headers=my_request.get_default_headers(1),proxies=proxies_my,**kwargs)
print(result.elapsed_time,'ms ' ,url,"状态码:",result.status_code,result.response.elapsed,url,"response.text 数据:\n",result.response.text,"data参数为:\n{}".format(payload))

# print(result.response.text)
data_dict = result.response.json()
# print("1data_dict:",data_dict)
print("search data_dict:\n", "total:", data_dict['total'], "pageSize:", data_dict['pageSize'],
"totalPages:", data_dict['totalPages'])

return data_dict
except Exception as e:
my_request.request_except_deal(e, traceback.format_exc(), result)
raise e
#


#图数据库保存
def saveGraphData(self,dataGroupId:str,linkedSize:int,linkedNum:int, **kwargs):
"""
:param filter: 是一个 字符串的 list[dict]
:param kwargs:
:return:
"""
try:
# /dp_server/api/connect/list
#url = self.url_basic + "{}/mock/testGraphData/saveGraphData?dataGroupId={}&linkedSize={}&linkedNum={}".format(self.ut_path,dataGroupId,linkedSize,linkedNum)
url_basic = self.url_basic + "{}/mock/testGraphData/saveGraphData".format(self.ut_path)
url=url_basic+"?dataGroupId={}&linkedSize={}&linkedNum={}".format(dataGroupId,linkedSize,linkedNum)
#?dataGroupId=0001&linkedSize=20&linkedNum=30
payload = {"dataGroupId":dataGroupId,
"linkedSize":linkedSize,
"linkedNum":linkedNum,
}
result = my_request.request('get', url=url, data={},headers=my_request.get_default_headers(1),**kwargs)
print(result.elapsed_time,'ms ' ,url,"\nsaveGraphData text:",[result.response.text])

return result



except Exception as e:
my_request.request_except_deal(e, traceback.format_exc(), result)
raise e
#


#图数据库查询
def findListByNeo4j(self,startLable:str,maxLevel:int,targetLable:str, **kwargs):
"""
:param startLable: ‘Tag’_dataGroupId_编号 是一个组合参数
:param maxLevel: #与保存时的 linkedSize tag生成的列数有关,几乎相等,可以小于它,但注意数据关联
:param targetLable: ‘Tag’_dataGroupId_编号 是一个组合参数,注意与 startLable maxLevel 进行数据关联
:param kwargs:
:return:
"""
try:
# /dp_server/api/connect/list
#url = self.url_basic + f"{self.ut_path}/mock/testGraphData/findListByNeo4j?startLable={startLable}&maxLevel={maxLevel}&targetLable={targetLable}"
url_basic = self.url_basic + f"{self.ut_path}/mock/testGraphData/findListByNeo4j"
url=url_basic+f"?startLable={startLable}&maxLevel={maxLevel}&targetLable={targetLable}"
#/mock/testGraphData/findListByNeo4j?startLable=Tag_0001_0&maxLevel=20&targetLable=Tag_0001_19
# searchContent
payload = {"startLable":startLable,
"maxLevel":maxLevel,
"targetLable":targetLable}
result = my_request.request('get', url=url, data={},headers=my_request.get_default_headers(1))
print(result.elapsed_time,'ms ' ,url,"\n",[result.response.text])



except Exception as e:
my_request.request_except_deal(e, traceback.format_exc(), result)
raise e
#

#分布式事务验证接口
def testDistriTransaction(self,dataGroupId:str='555555555', **kwargs):
"""
:param dataGroupId 数据集ID。验证如果表中不存在该dataGroupId对应的数据即生效
:param kwargs:
:return:
"""
try:
# /dp_server/api/connect/list
url_basic = self.url_basic + f"{self.ut_path}/mock/testRecordData/testDistriTransaction"
url = url_basic + f"?dataGroupId={dataGroupId}"
#:9900/UT/mock/testRecordData/testDistriTransaction?dataGroupId=555555555
payload = {}
result = my_request.request('get', url=url, data=payload,headers=my_request.get_default_headers(1))
print(result.elapsed_time,'ms ' ,url,"\n",[result.response.text])



except Exception as e:
my_request.request_except_deal(e, traceback.format_exc(), result)
raise e
#






#默认生成一个公共对象
utb=UTBasic()

if __name__ == "__main__":
log.set_logpath("/UT/basic/")
utb.my_request.proxies_open=1 #调试走协议
utb.url_basic='http://10.2.1.150:9900'
#utb.url_basic = 'http://10.2.1.95:9900'
# utb.getGlobalTime()
# utb.crossServiceLoopByDeep()
#搜索
def test_search():
search_value = '196'
print("开始搜索:{}".format(search_value))

filter = [
{"operation": "like", "alias": "", "dataType": "", "fieldname": "indexInGroup", "name": "indexInGroup",
"value": "{}%".format(search_value)}]
filter = str(filter).replace("'", '"')
data_dict = utb.search(filter=filter)
print(data_dict)

def testDistriTransaction():
print("开始分布式事务调用")
dataGroupId='555555555'
utb.testDistriTransaction(dataGroupId=dataGroupId)
locust_response_json_assert_dict = {"success": True}
filter = [
{"operation": "like", "alias": "", "dataType": "", "fieldname": "dataGroupId", "name": "dataGroupId",
"value": "{}%".format(dataGroupId)}]
filter = str(filter).replace("'", '"')
data_dict = utb.search(filter=filter,locust_response_json_assert_dict=locust_response_json_assert_dict)
print('data_dict:',data_dict)


def test_saveGraphData():
print("开始保存图数据库:")
timestamp=tools.TimeDate.get_timestamp('%d%H%M%S%f')
dataGroupId = 'roc{}'.format(timestamp)
linkedSize=5
result=utb.saveGraphData(dataGroupId=dataGroupId,linkedSize=linkedSize,linkedNum=10)
startLable='Tag_'+dataGroupId+"_0"
maxLevel=random.randint(2,linkedSize)

targetLable='Tag_'+dataGroupId+"_{}".format(maxLevel-1)
#time.sleep(1)
utb.findListByNeo4j(startLable,maxLevel,targetLable)

def test_crossServiceLoopByDeep():
flag=random.randint(1,10)
data_dict = utb.crossServiceLoopByDeep(callCount=1,deep=random.randint(1,10))
print(data_dict)



#search()
test_search()

4.2 requests_basic.py request二次封装,核心转化模块

from locust import SequentialTaskSet, HttpUser, between, task, tag
from locust import events
from locust.contrib.fasthttp import FastHttpUser
import os, sys
import random
from locust import stats

##修改环境变量路径
sys_path_root = os.getcwd().split('scripts')[0]
sys_path_scripts = sys_path_root + "scripts" #因整个工程有个框架结构 scripts,把此处加入到环境变量,保证locust 命令cmd中可用
sys.path.extend((sys_path_root, sys_path_scripts))

from utils.logger import log
from scripts.WisdomTraining.cases.locustFile.ut.UT_basic import utb,my_request
#from .UT_basic import utb,my_request #同级下,



from locust import events
from gevent._semaphore import Semaphore
all_locusts_spawned = Semaphore() #对协程进行管理
all_locusts_spawned.acquire()
def on_hatch_complete(environment,user_count=100,**kwargs):
"""
对启动用户进行集合点设置
适用于场景如:总用户1000,每秒启动用户数1000(注意user_count参数)
:param environment: 当前进程的环境 可以用用 self.user.environment 拿取
:param user_count: 当前启动的线程(用户数)大于user_count 时才会发起http请求(启动时设置集合点)
:param kwargs:
:return:
"""

userCount=environment.runner.user_count
log.info("当前启动用户数:{}".format(userCount))
if userCount>=user_count:
all_locusts_spawned.release() #创建钩子方法
print("[环境已准备好]当前启动用户数:", userCount)
else:
all_locusts_spawned.wait()





class LocustCount():
"""
统计locust 相关使用,一种数据共享方式
"""

times=0

class TaskSetUT(SequentialTaskSet):
# 类继承SequentialTaskSet TaskSet
# 当类里面的任务请求有先后顺序时,继承SequentialTaskSet类,
# 没有先后顺序,可以使用继承TaskSet
pageIndex = 0
pageSize = 20


def on_start(self):
[print(('on_start:',s)) for s in [self.user.environment.stats.total]]
my_request_self=my_request.get_locust_client_dict(self.client) #替换reqeust session locust_clientsession
my_request_self.locust_name_flag=0 #设置locust 请求的 name 属性参数,url中带问号
#log.info("共享数据的环境:",self.user.environment)
log.info("共享数据的环境 stats:", self.user.environment.stats.total)
#log.info("开始统计数据 stats_history_csv:", stats.stats_history_csv(self.user.environment))
#utb.wt_b.login_WT()
on_hatch_complete(self.user.environment, user_count=1) #启动时设置1000个用户已经准备好
log.info("开始测试了,当前用户为:", self.user.environment.runner.user_count)







def on_stop(self):
[print('on_stop:',s) for s in [self.user.environment.stats.total]]
pass

@task(1)
def test_RecordData(self):
dataSize = random.randint(1, 100)
result=utb.mysql_saveList(dataSize)



@task(1)
def test_getGlobalTime(self):
result = utb.getGlobalTime()


@task(1)
@tag("leave_1")
def test_search(self):
search_value = random.randint(1,10)
filter = [
{"operation": "like", "alias": "", "dataType": "", "fieldname": "indexInGroup", "name": "indexInGroup",
"value": "{}%".format(search_value)}]
filter = str(filter).replace("'", '"')
pageIndex = self.pageIndex
pageSize = self.pageSize

data_dict = utb.search(filter=filter, pageIndex=pageIndex, pageSize=pageSize)
self.pageIndex += 1
self.pageSize *= self.pageIndex
try:
if data_dict['totalPages'] <= pageIndex + 1:
self.pageIndex = 0

if pageSize >= data_dict['total']:
self.pageSize = random.randint(pageIndex, data_dict['total'])
except Exception as e:
print(str(e))
self.pageIndex = 0
self.pageSize = 20

print('{} test_search :/n'.format(pageIndex), self.pageIndex, self.pageSize)

@task(2)
def test_search_new(self):

filter =''
data_dict = utb.search(filter=filter, pageIndex=0, pageSize=200)



@task(1)
def test_crossServiceLoopByDeep(self):
LocustCount.times+=1
callCount=LocustCount.times
flag=random.randint(1,10)
data_dict = utb.crossServiceLoopByDeep(callCount=callCount,deep=random.randint(1,10))





# This is the HttpUser class.
class UserBehavior(HttpUser): # HttpUser FastHttpUser
tasks = [TaskSetUT]
times = 1
wait_time = between(0, 0)
network_timeout = 600
connection_timeout = 600
print("UserBehavior:测试")
share_data = ['url1', 'url2', 'url3', 'url4', 'url5']


if __name__ == '__main__':
import subprocess,os
#log.delete_log_report_file()
#utb.url_basic = 'http://10.2.1.95:9900'
utb.url_basic = 'http://10.2.1.150:9900'
#utb.url_basic = 'http://10.2.1.150:8533'

path = os.path.dirname(os.path.abspath(__file__))
file_name = os.path.split(__file__)[-1].split(".")[0]
default_url=utb.url_basic
print(path,file_name)


print("开始运行wt") # --no-web -c 2 -r 1 -t 3s
#locust -f --headless -u 1000 -r 100 --run-time 1h30m --step-load --step-users 300 --step-time 20m
# subprocess.call(
# 'locust -f {}/{}.py -u 1 -r 1 -t 10 -l --csv-full-history'.format(path,file_name, default_url),
# shell=True)
cmd ='locust -f {}/{}.py --host={} --web-host="127.0.0.1" --web-port 8090 --csv CSV_{}'.format(path,file_name, default_url,file_name)
print("cmd:",cmd)
dd=subprocess.call(cmd,shell=True) ##--web-port 8090
#d=subprocess.check_call('locust -f {}/{}.py --worker'.format(path,file_name, default_url),shell=True)
# d1=subprocess.check_call('locust -f {}/{}.py --worker'.format(path,file_name, default_url),shell=True)
d2 = subprocess.call('locust -f {}/{}.py --master --host={} --web-host="127.0.0.1" --web-port 8090 --csv CSV_{}'.format(path,file_name,default_url,file_name),shell=True)

4.3 贴图效果如下:

五、缺陷不足

1、分布式数据不太好统计
2、要多台电脑运行,是否可以用一个脚本或者工具进行远程控制(一键命令式)
3、对fashHttp支持不够友好,有时候会报错
4、请大家在使用中进一步提出更好的建议

六、其他说明

####6.1 说明
欢迎大家提建议转载分享,请大家带上原文链接,改文目前仅发表于TesterHome

6.2 收集的locust其他相关资料

0、官方文档
https://docs.locust.io/en/stable/installation.html
1、添加集合点:
https://testerhome.com/topics/12126
2、数据的几种方式
https://blog.csdn.net/gogoboi_jin/article/details/79229570
https://www.cnblogs.com/changqing8023/p/9563364.html
3、数据的使用
https://debugtalk.com/post/head-first-locust-advanced-script/
4、locust 原理介绍 与源码
https://www.cnblogs.com/belle-ls/p/10487597.html
https://cloud.tencent.com/developer/article/1506669
https://testerhome.com/topics/11829
5、统计性能数据 与优化
http://www.testqa.cn/article/detail/241
http://www.testqa.cn/article/detail/236
6、停止 locust
https://stackoverflow.com/questions/60163341/stop-locust-after-specific-number-of-requests

共收到 4 条回复 时间 点赞

感谢楼主,最近也在找locust的资料看

locust 本身对协议扩展支持是非常灵活和便捷的
看下来,只是为了怎么更优雅的整合api接口压测场景封装和调用。最好是复用接口平台里面的case。
核心思路,不在于request
用locust也有一段时间,从之前一天5~6个接口支持,现在经过改装一天可以撸个50+接口也很快,也可以很灵活的对接从自动化平台过来的场景case

roc #3 · August 20, 2020 作者

【问】:
1、可以分享一下你的case转化为性能脚本吗,提供一下核心思路和做法
2、你目前的做法在能web端能统计到相关的请求数据吗

【答】:
一、主要从底层转换http协议(request)有下面几个考量:
1、想复用整个工具本身的数据统计,能看到web端的http请求的相关数据
2、若进行扩展新的协议,可以扩展如上类似的解决方式来解决
3、http协议在我们的实际测试中占了很大的比例,所有优先从协议转化上进行处理

二、关于直接从case用例转性能,我的理解是:
1、是可以直接把用例导入进行运行(本人理论上认为是可以,但没有具体实践过)
2、但我不建议直接从用例拿过来用,可以copy其中的一个方法进行封装过来(主要性能测试要不断调试,自动化脚本拿来直接用有冗余)

roc 回复

前段时间有点忙,今天才看到回复
1、可以分享一下你的case转化为性能脚本吗,提供一下核心思路和做法
-- 后续可以撸个文章,个人觉的这个问题最本质的是快速实现压测接口,然后接口组合压测场景,想要从1人/天 10接口提升到100个,确实需要改造,还有第二点,是接口模版的改动,可以最小代价落地

2、你目前的做法在能web端能统计到相关的请求数据吗
-- locust虽然提供了图表,但是只是一个辅助手段,压测数据都是要以服务环境数据指标做第一优先;而且分布式slave多,数据会有几秒的滞后,但是不影响整体的准确性,现在监控体系都非常成熟了,从普罗米修斯到elk+grf日志分析 各个组件的监控,各种云上面都有自建都指标,如果是自建的,本身就有监控数据视图,locust本身最重要的功能还是提供一个压测能力,相比jmt 分布式部署会很便捷,而且对扩展协议压测支持很便捷,最是最大的优点

第1个问题,只有相对解最优解;而且在实际压测过程中,需要控制分布式参数唯一性的问题,自动化 case过来的数据,需要做一轮抽象,然后组合初始化压测数据满足动态大并发要求。

需要 Sign In 后方可回复, 如果你还没有账号请点击这里 Sign Up