接口测试 request 接口自动化测试脚本转化为 locust 性能测试脚本

roc · 2020年08月18日 · 最后由 danning 回复于 2023年01月11日 · 4272 次阅读

request 接口自动化测试脚本转化为 locust 性能测试脚本

一、目标说明

1、locust 可以调用 request 的封装接口(比如登录)
2、对 locust 请求数进行部分控制优化
3、可以作为快速造数据的一种手段
4、locust 适配版本为 v1.1(0 版本需要自己修改一下相关代码参数与部分逻辑)

二、实现原理

1、对 request 进行二次封装,同时实现 locust 代码记录转换
2、依据 locust 底层采用了 request 的 session,因此替换 session 即可(RequestsBasic)

三、核心代码逻辑

3.1 requests_basic.py request 二次封装,核心转化模块

import time
import urllib3
import requests
import threading
import sys
from collections import namedtuple
import traceback
from utils import logger
from utils.logger import log  #此处的log  可以直接用系统的 logging


class RequestsBasic(object):
    urllib3.disable_warnings()  # 去掉验证证书告警问题
    #相关类变量:
    locust_start_time=None #统计 locust 开始发送请求的时间
    #默认headers,可以设置更改
    default_headers = {
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept": "application/json, text/plain, */*",
        "Connection": "keep-alive",
        "User-Agent":
        "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.157 Safari/537.36"}

    def __init__(self,http_client_session=None):
        self.http_client_session = http_client_session or requests.Session()
        self.verify = False #默认设置是否需要进证书校验,默认不需要(主要https用)
        self.requests = requests #默认模块为一个对象,留作扩展
        self.headers = self.get_default_headers()
        self.proxies = {'http': 'http://127.0.0.1:8888',
                        'https': 'https://127.0.0.1:8888'}#这个与使用抓包工具的代理有关如:我的fildder 用的是8888 端口
        self.proxies_open=0 #默认关闭不使用代理
        self.locust_client_dict:dict={}  #根据线程id存放不同的客户端user.client
        self.locust_count =0 #统计该进程运行时 发送 locust http请求个数
        self.runtimes=1 # 运行次数,非locust请求时 增加失败重发机制
        # locust_response_json_assert 针对locust的响应进行处理其是否成功或者失败,可另外进行扩展,比如加入大于小于等于等进行判断
        self.locust_response_json_assert={}
        self.locust_name_flag = 1 # 对locust 请求时 设置 name参数设置,否则不设置url的名字





    def get_locust_client_dict(self,locust_client):
        """
        替换 request中 的session 为单独用户的 locust_client,在locust 中的 on_start 方法中调用
        :param locust_client: 在locust 中一般为  self.client
        eg:my_request.get_locust_client_dict(self.client)
        :return: 返回 self 该类的实例化本身,因为此处在每个线程,每个user.client是有细微差异方便对该类实例化对象进行设置
        """
        """

        :return:
        """
        thread_id = threading.current_thread().ident  # 记录当前线程id
        self.locust_client_dict[thread_id] = locust_client  # 注意设置此项,且保证 my_request 是同一个不可变的对象

        return self #注意此处,返回的不同 RequestsBasic 对象

    #默认设置请求的 Content-Type
    def get_default_headers(self,content_type=1,default_headers=None):
        """
        获取默认的请求头
        :param content_type:
        :param default_headers:
        :return:
        """

        headers ={
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept": "application/json, text/plain, */*",
        "Connection": "keep-alive",
        "User-Agent":
        "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.157 Safari/537.36"}
        if default_headers !=None:
            headers=default_headers

        if content_type ==-1:
            pass #无   Content-Type 属性
        elif content_type ==0:
            headers['Content-Type']="application/json; charset=UTF-8"
        elif content_type==1:
            headers['Content-Type'] = "application/x-www-form-urlencoded; charset=UTF-8"
        elif content_type ==2:
            headers['Content-Type'] = "multipart/form-data"

        return headers

    def request(self, method, url, locust_client=None, locust_response_json_assert_dict:dict=None,**kwargs):
        """
        支持request请求,和locust请求(以locust* 开头的参数都是locust特有)

        :param method: #methods= ('post','put','get','delete')  #可以参考 request的request方法
        :param locust_client: locust的客户端,或者设置 self.locust_client_dict 对象
        :param locust_response_json_assert_dict: locust时对返回的response 参数进行特殊断言
        :param url:
        :param
        :param kwargs:
        :return:

        """
        #、设置返回相关参数,设置一个namedtuple对象,进行一些参数封装
        request_meta = namedtuple('RequestsMeta',
                                  ['start_time', 'elapsed_time', 'response', 'content_size', 'status_code'])
        request_meta.start_time = time.time() #统计时间
        param_info = {"method": method, "url": url, "kwargs": kwargs}  # 异常处理时调用参数
        #print("locust_response_json_assert_dict:", locust_response_json_assert_dict,kwargs)

        # 开始发起http/https请求
        try:
            #1、处理请求头
            if 'verify' not in kwargs.keys():
                kwargs['verify'] = self.verify
            if 'headers' not in kwargs.keys():
                kwargs['headers'] = self.headers
            # urllib3.disable_warnings()  # 去掉验证证书告警问题

            #2、处理locust和普通请求
            if locust_client == None and len(self.locust_client_dict)== 0:  #普通请求
                #处理代理
                if self.proxies_open == 1:
                    # "proxies" in kwargs.keys() and kwargs['proxies'] != None
                    kwargs['proxies']=self.proxies
                    log.warning("此处使用了代理协议请检查(若未开代理可能会失败!)"
                                "proxies:{}".format(kwargs['proxies']))
                #支持多次发送
                if 'name' in kwargs.keys():
                    log.error("【注意】name 参数一般不在 kwargs参数中,locust时特用!目前已废弃,请注意检查!")


                try:
                    runtimes = 1  #普通请求,多次尝试计数
                    while runtimes <= self.runtimes:
                        response = self.http_client_session.request(method, url, **kwargs)
                        # log.info(runtimes,self.runtimes,response.status_code)
                        # print("runtimes:", runtimes,response.status_code)
                        if response.status_code != 200 and self.runtimes > 1:
                            elapsed_time = int((time.time() - request_meta.start_time) * 1000)  # 毫秒
                            log.warning("""runtimes:{} 
                            ,耗时:{} ms,返回状态码:{}""(!=200),请检查请求:\n{}\n【返回值】:{}
                            """.format(runtimes, elapsed_time,response.status_code, param_info, response.text))
                            time.sleep(runtimes * 2)
                            request_meta.start_time = time.time()  # 重新记录开始时间
                        else:
                            break

                        runtimes += 1  # 自增运行的次数


                except Exception as  e:
                    log.warning("runtimes:{} 运行失败,请检查:{}".format(runtimes,traceback.format_exc()))
                #其他处理
                #log.debug("session:{}".format(response.cookies))
                # 异常处理
                if response.status_code != 200:
                    log.warning("""返回状态码:{}(!=200),
                    请检查请求:\n{}\n【返回值】:{}
                    """.format(response.status_code, param_info,response.text))


            else:  # 走locust
                self.locust_count +=1  #统计locust发送的请求次数
                if self.locust_count <=1:
                    self.locust_start_time= time.time() #统计从locust发送的第一个请求开始计时

                thread_id = threading.current_thread().ident  # 获取当前线程id
                #拿取  locust_client
                if  locust_client != None:
                    #使用自己本身
                    self.locust_client_dict[thread_id]=locust_client  # 获取当前线程的 user.client
                else:
                    locust_client = self.locust_client_dict[thread_id]

                if self.locust_name_flag ==1 and "?" in url:
                    if 'name' not in kwargs:
                        url_pre = url.split("?")[0]
                        kwargs['name'] = url_pre
                    log.warning("对locust的 name(url) 参数进行特殊为:{}".format(url))


                # 发送请求: 注意 catch_response
                elapsed_time_total = int((time.time() - self.locust_start_time) * 1000)
                locust_count=self.locust_count #locust 当前统计的次数
                print('{}次请求中!总耗时(ms):'.format(locust_count),elapsed_time_total,"线程id:{}".format(thread_id),url, "调试打印locust_response_json_assert_dict:",
                      locust_response_json_assert_dict)
                with locust_client.request(method, url,catch_response=True, **kwargs) as response:

                    if hasattr(response, 'elapsed'):
                        elapsed=response.elapsed
                        elapsed_time_total = int((time.time() - self.locust_start_time) * 1000)
                        print("{}次请求完!总耗时(ms):".format(locust_count),elapsed_time_total,'本次耗时:',elapsed,"线程id:{}".format(thread_id),url,)
                    else:
                        elapsed=''

                    if response.status_code < 400:
                        response.success()
                        # 处理json统计接口是否失败
                        #进行 locust_response_json_assert_dict 特殊处理
                        if "locust_response_json_assert_dict" in kwargs.keys():
                            locust_response_json_assert_dict = kwargs['locust_response_json_assert_dict']
                        # print("request:locust_response_json_assert_dict:",locust_response_json_assert_dict,kwargs)

                        if locust_response_json_assert_dict != None:
                            try:
                                data_dict = response.json()
                                for k, v in locust_response_json_assert_dict.items():
                                    if k in data_dict and data_dict[k] == locust_response_json_assert_dict[k]:
                                        response.success()
                                    else:
                                        response.failure('json检查k,v:{} 失败!{}'.format((k, v), response.text))
                            except Exception as e:
                                print("处理locust_response_json_assert_dict异常:\n", traceback.format_exc())
                            finally:
                                pass

                    else:
                        #注意此处
                        response.failure('elapsed:{} 状态码:{} 失败!{}'.format(0,response.status_code,response.text))




            # response.r
            response.encoding = 'utf-8'
            # response.content.decode("unicode_escape")
            request_meta.elapsed_time = int((time.time() - request_meta.start_time) * 1000)  # 毫秒
            request_meta.status_code = response.status_code
            request_meta.response = response
            request_meta.content_size = int(response.headers.get("content-length") or 0)



        except Exception as e:
            my_request.request_except_deal(e, traceback.format_exc(),**kwargs)
            raise e
            # log.error("【错误详细信息】:\n" + traceback.format_exc())

        return request_meta

    def request_except_deal(self,e,traceback,result='',**kwargs):
        """
        #处理接口异常信息
        :param e:   捕获到的异常类
        :param traceback:  异常相关信息
        :return: 异常类型
        """
        try:
            except_type = type(e)
            if type(result)!=str:
                text = result.response.text
            else:
                text=""

            log.warning("text:{}\n抛出异常类型为:{},异常信息为:{}\n参数为:{}".format(text,except_type,traceback,kwargs))
        except Exception as e:
            print("处理request报错异常(print):",traceback.format_exc())

        return except_type




#实例化一个默认对象
my_request=RequestsBasic()

if __name__ == "__main__":

    print("开始测试",my_request.headers)

四、使用示例

4.1 ut_basic.py 对 http 请求的一个封装,可以跑接口测试,也可以跑 locust 性能测试


#!/usr/bin/env python
# coding:utf-8
"""
@author:YuanPengCheng
@PROJECT_NAME:autoTest
@file:UT_basic.py
@createtime:2020/8/6 9:22
@software:  PyCharm
@description:
"系统性能框架测试模块  unittest(UT)"

"""
import traceback
import random

# sys.path.append(os.getcwd())
from common.http import requests_basic  #核心代码 requests_basic.py

from utils.logger import log
from utils.testTools import tools #一些常用小工具封装,可以自己去替换


my_request = requests_basic.RequestsBasic()  # 初始化一个 http请求,防止不同的对象的请求混乱
my_request.headers = my_request.get_default_headers(content_type=0)  # 设置请求头的类型
# proxies_my = my_request.proxies  # debug时 是否开启代理抓包模式
proxies_my=None

class UTBasic():
    def __init__(self):
        self.my_request = my_request
        self.url_basic=''  #:9900
        self.ut_path= "/UT"  #  self.ut_path=''  ,默认走网关
        pass

    #获取服务器时间(即空接口测试)
    def getGlobalTime(self,**kwargs):
        """
        获取服务器时间(即空接口测试)
        :param kwargs:
        :return:
        """
        try:
            # /dp_server/api/connect/list
            url = self.url_basic + "{}/mock/testRecordData/getGlobalTime".format(self.ut_path)
            # searchContent
            payload = {}

            for k, v in kwargs.items():  # 对函数关键字进行处理,
                payload[k] = v

            result = my_request.request('get', url=url, data=payload,**kwargs)
            log.info(url,result.elapsed_time,'ms ' ,result.response.text)
            # data_dict = result.response.json()
            # return data_dict
        except Exception as e:
            my_request.request_except_deal(e, traceback.format_exc(), result)
            raise e
            #

    #Mysql数据批量保存接口
    def mysql_saveList(self,dataSize=20,**kwargs):
        """
        Mysql数据批量保存接口
        :param kwargs:
        :return:
        """
        try:
            # /dp_server/api/connect/list
            url = self.url_basic + "{}/mock/testRecordData/saveList?dataSize={}".format(self.ut_path,dataSize)
            # searchContent
            payload ={}

            for k, v in kwargs.items():  # 对函数关键字进行处理,
                payload[k] = v

            result = my_request.request('get', url=url, params=payload,**kwargs)
            print(result.elapsed_time,'ms ' ,url,result.response.elapsed,url,result.response.text)
            # data_dict = result.response.json()
            # return data_dict
        except Exception as e:
            my_request.request_except_deal(e, traceback.format_exc(), result)
            raise e
            #


    #跨服务调用空接口,深度和层次
    def crossServiceLoopByDeep(self,callCount=1,deep=5,**kwargs):
        """
        Mysql数据批量保存接口
        callCount:调用链路号(Int),用于区分调用的次数,每次调用可递增
        deep:跨服务调用深度(Int)
        :param kwargs:
        :return:
        """
        try:
            print("(callCount,deep):",(callCount,deep))
            # /dp_server/api/connect/list
            url = self.url_basic + "{}/mock/testRecordData/crossServiceLoopByDeep?callCount={}&deep={}".format(self.ut_path,callCount,deep)
            # searchContent
            payload = {}

            for k, v in kwargs.items():  # 对函数关键字进行处理,
                payload[k] = v

            result = my_request.request('get', url=url, params=payload,**kwargs)
            print(result.elapsed_time,'ms ' ,(callCount,deep),url,result.response.text)
            # data_dict = result.response.json()
            # return data_dict
        except Exception as e:
            my_request.request_except_deal(e, traceback.format_exc(), result)
            raise e
            #

    #mysql 高级查询
    def search(self,filter: str, pageIndex=0,pageSize=10,sortField='', sortOrder='', **kwargs):
        """
        :param filter:  是一个 字符串的  list[dict]
        [{"operation":"like","alias":"","dataType":"","fieldname":"majorName","name":"majorName","value":"06191455%"}]
        :param kwargs:
        :return:
        """
        try:
            # /dp_server/api/connect/list
            url = self.url_basic + "{}/mock/testRecordData/search".format(self.ut_path)
            # searchContent
            payload = {'filter': filter,
                       # [{"operation":"like","alias":"","dataType":"","fieldname":"indexInGroup","name":"indexInGroup","value":"19%"}]
                       'pageIndex': pageIndex,
                       'pageSize': pageSize,
                       'sortField': sortField,
                       'sortOrder': sortOrder}
            result = my_request.request('post', url=url, data=payload,headers=my_request.get_default_headers(1),proxies=proxies_my,**kwargs)
            print(result.elapsed_time,'ms ' ,url,"状态码:",result.status_code,result.response.elapsed,url,"response.text 数据:\n",result.response.text,"data参数为:\n{}".format(payload))

            # print(result.response.text)
            data_dict = result.response.json()
            # print("1data_dict:",data_dict)
            print("search data_dict:\n", "total:", data_dict['total'], "pageSize:", data_dict['pageSize'],
                  "totalPages:", data_dict['totalPages'])

            return data_dict
        except Exception as e:
            my_request.request_except_deal(e, traceback.format_exc(), result)
            raise e
            #


    #图数据库保存
    def saveGraphData(self,dataGroupId:str,linkedSize:int,linkedNum:int, **kwargs):
        """
        :param filter:  是一个 字符串的  list[dict]
        :param kwargs:
        :return:
        """
        try:
            # /dp_server/api/connect/list
            #url = self.url_basic + "{}/mock/testGraphData/saveGraphData?dataGroupId={}&linkedSize={}&linkedNum={}".format(self.ut_path,dataGroupId,linkedSize,linkedNum)
            url_basic = self.url_basic + "{}/mock/testGraphData/saveGraphData".format(self.ut_path)
            url=url_basic+"?dataGroupId={}&linkedSize={}&linkedNum={}".format(dataGroupId,linkedSize,linkedNum)
            #?dataGroupId=0001&linkedSize=20&linkedNum=30
            payload = {"dataGroupId":dataGroupId,
                       "linkedSize":linkedSize,
                       "linkedNum":linkedNum,
                       }
            result = my_request.request('get', url=url, data={},headers=my_request.get_default_headers(1),**kwargs)
            print(result.elapsed_time,'ms ' ,url,"\nsaveGraphData text:",[result.response.text])

            return result



        except Exception as e:
            my_request.request_except_deal(e, traceback.format_exc(), result)
            raise e
            #


    #图数据库查询
    def findListByNeo4j(self,startLable:str,maxLevel:int,targetLable:str, **kwargs):
        """
        :param startLable: ‘Tag’_dataGroupId_编号  是一个组合参数
        :param maxLevel:   #与保存时的 linkedSize tag生成的列数有关,几乎相等,可以小于它,但注意数据关联
        :param targetLable: ‘Tag’_dataGroupId_编号  是一个组合参数,注意与 startLable  maxLevel 进行数据关联
        :param kwargs:
        :return:
        """
        try:
            # /dp_server/api/connect/list
            #url = self.url_basic + f"{self.ut_path}/mock/testGraphData/findListByNeo4j?startLable={startLable}&maxLevel={maxLevel}&targetLable={targetLable}"
            url_basic = self.url_basic + f"{self.ut_path}/mock/testGraphData/findListByNeo4j"
            url=url_basic+f"?startLable={startLable}&maxLevel={maxLevel}&targetLable={targetLable}"
            #/mock/testGraphData/findListByNeo4j?startLable=Tag_0001_0&maxLevel=20&targetLable=Tag_0001_19
            # searchContent
            payload = {"startLable":startLable,
                       "maxLevel":maxLevel,
                       "targetLable":targetLable}
            result = my_request.request('get', url=url, data={},headers=my_request.get_default_headers(1))
            print(result.elapsed_time,'ms ' ,url,"\n",[result.response.text])



        except Exception as e:
            my_request.request_except_deal(e, traceback.format_exc(), result)
            raise e
            #

    #分布式事务验证接口
    def testDistriTransaction(self,dataGroupId:str='555555555', **kwargs):
        """
        :param dataGroupId 数据集ID。验证如果表中不存在该dataGroupId对应的数据即生效
        :param kwargs:
        :return:
        """
        try:
            # /dp_server/api/connect/list
            url_basic = self.url_basic + f"{self.ut_path}/mock/testRecordData/testDistriTransaction"
            url = url_basic + f"?dataGroupId={dataGroupId}"
            #:9900/UT/mock/testRecordData/testDistriTransaction?dataGroupId=555555555
            payload = {}
            result = my_request.request('get', url=url, data=payload,headers=my_request.get_default_headers(1))
            print(result.elapsed_time,'ms ' ,url,"\n",[result.response.text])



        except Exception as e:
            my_request.request_except_deal(e, traceback.format_exc(), result)
            raise e
            #






#默认生成一个公共对象
utb=UTBasic()

if __name__ == "__main__":
    log.set_logpath("/UT/basic/")
    utb.my_request.proxies_open=1 #调试走协议
    utb.url_basic='http://10.2.1.150:9900'
    #utb.url_basic = 'http://10.2.1.95:9900'
    # utb.getGlobalTime()
    # utb.crossServiceLoopByDeep()
    #搜索
    def test_search():
        search_value = '196'
        print("开始搜索:{}".format(search_value))

        filter = [
            {"operation": "like", "alias": "", "dataType": "", "fieldname": "indexInGroup", "name": "indexInGroup",
             "value": "{}%".format(search_value)}]
        filter = str(filter).replace("'", '"')
        data_dict = utb.search(filter=filter)
        print(data_dict)

    def testDistriTransaction():
        print("开始分布式事务调用")
        dataGroupId='555555555'
        utb.testDistriTransaction(dataGroupId=dataGroupId)
        locust_response_json_assert_dict = {"success": True}
        filter = [
            {"operation": "like", "alias": "", "dataType": "", "fieldname": "dataGroupId", "name": "dataGroupId",
             "value": "{}%".format(dataGroupId)}]
        filter = str(filter).replace("'", '"')
        data_dict = utb.search(filter=filter,locust_response_json_assert_dict=locust_response_json_assert_dict)
        print('data_dict:',data_dict)


    def test_saveGraphData():
        print("开始保存图数据库:")
        timestamp=tools.TimeDate.get_timestamp('%d%H%M%S%f')
        dataGroupId = 'roc{}'.format(timestamp)
        linkedSize=5
        result=utb.saveGraphData(dataGroupId=dataGroupId,linkedSize=linkedSize,linkedNum=10)
        startLable='Tag_'+dataGroupId+"_0"
        maxLevel=random.randint(2,linkedSize)

        targetLable='Tag_'+dataGroupId+"_{}".format(maxLevel-1)
        #time.sleep(1)
        utb.findListByNeo4j(startLable,maxLevel,targetLable)

    def test_crossServiceLoopByDeep():
        flag=random.randint(1,10)
        data_dict = utb.crossServiceLoopByDeep(callCount=1,deep=random.randint(1,10))
        print(data_dict)



    #search()
    test_search()

4.2 requests_basic.py request 二次封装,核心转化模块

from locust import SequentialTaskSet, HttpUser, between, task, tag
from locust import events
from locust.contrib.fasthttp import FastHttpUser
import os, sys
import random
from locust import stats

##修改环境变量路径
sys_path_root = os.getcwd().split('scripts')[0]
sys_path_scripts = sys_path_root + "scripts" #因整个工程有个框架结构 scripts,把此处加入到环境变量,保证locust 命令cmd中可用
sys.path.extend((sys_path_root, sys_path_scripts))

from utils.logger import log
from scripts.WisdomTraining.cases.locustFile.ut.UT_basic import utb,my_request
#from .UT_basic import utb,my_request #同级下,



from locust import events
from gevent._semaphore import Semaphore
all_locusts_spawned = Semaphore()  #对协程进行管理
all_locusts_spawned.acquire()
def on_hatch_complete(environment,user_count=100,**kwargs):
    """
    对启动用户进行集合点设置
    适用于场景如:总用户1000,每秒启动用户数1000(注意user_count参数)
    :param environment: 当前进程的环境 可以用用 self.user.environment 拿取
    :param user_count: 当前启动的线程(用户数)大于user_count 时才会发起http请求(启动时设置集合点)
    :param kwargs:
    :return:
    """
    userCount=environment.runner.user_count
    log.info("当前启动用户数:{}".format(userCount))
    if  userCount>=user_count:
        all_locusts_spawned.release()  #创建钩子方法
        print("[环境已准备好]当前启动用户数:", userCount)
    else:
        all_locusts_spawned.wait()





class LocustCount():
    """
    统计locust 相关使用,一种数据共享方式
    """
    times=0

class TaskSetUT(SequentialTaskSet):
    # 类继承SequentialTaskSet 或 TaskSet类
    # 当类里面的任务请求有先后顺序时,继承SequentialTaskSet类,
    # 没有先后顺序,可以使用继承TaskSet类
    pageIndex = 0
    pageSize = 20


    def on_start(self):
        [print(('on_start:',s)) for s in [self.user.environment.stats.total]]
        my_request_self=my_request.get_locust_client_dict(self.client) #替换reqeust的 session 为locust_client的session
        my_request_self.locust_name_flag=0 #设置locust 请求的 name 属性参数,url中带问号
        #log.info("共享数据的环境:",self.user.environment)
        log.info("共享数据的环境 stats:", self.user.environment.stats.total)
        #log.info("开始统计数据 stats_history_csv:", stats.stats_history_csv(self.user.environment))
        #utb.wt_b.login_WT()
        on_hatch_complete(self.user.environment, user_count=1) #启动时设置1000个用户已经准备好
        log.info("开始测试了,当前用户为:", self.user.environment.runner.user_count)







    def on_stop(self):
        [print('on_stop:',s) for s in [self.user.environment.stats.total]]
        pass

    @task(1)
    def test_RecordData(self):
        dataSize = random.randint(1, 100)
        result=utb.mysql_saveList(dataSize)



    @task(1)
    def test_getGlobalTime(self):
        result = utb.getGlobalTime()


    @task(1)
    @tag("leave_1")
    def test_search(self):
        search_value = random.randint(1,10)
        filter = [
            {"operation": "like", "alias": "", "dataType": "", "fieldname": "indexInGroup", "name": "indexInGroup",
             "value": "{}%".format(search_value)}]
        filter = str(filter).replace("'", '"')
        pageIndex = self.pageIndex
        pageSize = self.pageSize

        data_dict = utb.search(filter=filter, pageIndex=pageIndex, pageSize=pageSize)
        self.pageIndex += 1
        self.pageSize *= self.pageIndex
        try:
            if data_dict['totalPages'] <= pageIndex + 1:
                self.pageIndex = 0

            if pageSize >= data_dict['total']:
                self.pageSize = random.randint(pageIndex, data_dict['total'])
        except Exception as e:
            print(str(e))
            self.pageIndex = 0
            self.pageSize = 20

        print('{} test_search :/n'.format(pageIndex), self.pageIndex, self.pageSize)

    @task(2)
    def test_search_new(self):

        filter =''
        data_dict = utb.search(filter=filter, pageIndex=0, pageSize=200)



    @task(1)
    def test_crossServiceLoopByDeep(self):
        LocustCount.times+=1
        callCount=LocustCount.times
        flag=random.randint(1,10)
        data_dict = utb.crossServiceLoopByDeep(callCount=callCount,deep=random.randint(1,10))





# This is the HttpUser class.
class UserBehavior(HttpUser):  # HttpUser  FastHttpUser  注意此处,若需走fasthttp 该类继承 FastHttpUser   就ok了,但是有可能会出问题(请自己检测)
    tasks = [TaskSetUT]
    times = 1
    wait_time = between(0, 0)
    network_timeout = 600
    connection_timeout = 600
    print("UserBehavior:测试")
    share_data = ['url1', 'url2', 'url3', 'url4', 'url5']


if __name__ == '__main__':
    import subprocess,os
    #log.delete_log_report_file()
    #utb.url_basic = 'http://10.2.1.95:9900'
    utb.url_basic = 'http://10.2.1.150:9900'
    #utb.url_basic = 'http://10.2.1.150:8533'

    path = os.path.dirname(os.path.abspath(__file__))
    file_name = os.path.split(__file__)[-1].split(".")[0]
    default_url=utb.url_basic
    print(path,file_name)


    print("开始运行wt")  # --no-web -c 2 -r 1 -t 3s
    #locust -f --headless -u 1000 -r 100 --run-time 1h30m --step-load --step-users 300 --step-time 20m
    # subprocess.call(
    #     'locust -f {}/{}.py  -u 1 -r 1 -t 10 -l --csv-full-history'.format(path,file_name, default_url),
    #     shell=True)
    cmd ='locust -f {}/{}.py  --host={}  --web-host="127.0.0.1" --web-port 8090 --csv CSV_{}'.format(path,file_name, default_url,file_name)
    print("cmd:",cmd)
    dd=subprocess.call(cmd,shell=True) ##--web-port 8090
    #d=subprocess.check_call('locust -f {}/{}.py  --worker'.format(path,file_name, default_url),shell=True)
    # d1=subprocess.check_call('locust -f {}/{}.py  --worker'.format(path,file_name, default_url),shell=True)
    d2 = subprocess.call('locust -f {}/{}.py  --master --host={}  --web-host="127.0.0.1" --web-port 8090 --csv CSV_{}'.format(path,file_name,default_url,file_name),shell=True)

4.3 贴图效果如下:

五、缺陷不足

1、分布式数据不太好统计
2、要多台电脑运行,是否可以用一个脚本或者工具进行远程控制(一键命令式)
3、对 fashHttp 支持不够友好,有时候会报错(继承 HttpUser 的类改为 FastHttpUser 就行)
4、请大家在使用中进一步提出更好的建议

六、其他说明

####6.1 说明
欢迎大家提建议转载分享,请大家带上原文链接,改文目前仅发表于 TesterHome

6.2 收集的 locust 其他相关资料

0、官方文档
https://docs.locust.io/en/stable/installation.html
1、添加集合点:
https://testerhome.com/topics/12126
2、数据的几种方式
https://blog.csdn.net/gogoboi_jin/article/details/79229570
https://www.cnblogs.com/changqing8023/p/9563364.html
3、数据的使用
https://debugtalk.com/post/head-first-locust-advanced-script/
4、locust 原理介绍 与源码
https://www.cnblogs.com/belle-ls/p/10487597.html
https://cloud.tencent.com/developer/article/1506669
https://testerhome.com/topics/11829
5、统计性能数据 与优化
http://www.testqa.cn/article/detail/241
http://www.testqa.cn/article/detail/236
6、停止 locust
https://stackoverflow.com/questions/60163341/stop-locust-after-specific-number-of-requests

共收到 5 条回复 时间 点赞

感谢楼主,最近也在找 locust 的资料看

locust 本身对协议扩展支持是非常灵活和便捷的
看下来,只是为了怎么更优雅的整合 api 接口压测场景封装和调用。最好是复用接口平台里面的 case。
核心思路,不在于 request
用 locust 也有一段时间,从之前一天 5~6 个接口支持,现在经过改装一天可以撸个 50+ 接口也很快,也可以很灵活的对接从自动化平台过来的场景 case

roc #3 · 2020年08月20日 Author

【问】:
1、可以分享一下你的 case 转化为性能脚本吗,提供一下核心思路和做法
2、你目前的做法在能 web 端能统计到相关的请求数据吗

【答】:
一、主要从底层转换 http 协议(request)有下面几个考量:
1、想复用整个工具本身的数据统计,能看到 web 端的 http 请求的相关数据
2、若进行扩展新的协议,可以扩展如上类似的解决方式来解决
3、http 协议在我们的实际测试中占了很大的比例,所有优先从协议转化上进行处理

二、关于直接从 case 用例转性能,我的理解是:
1、是可以直接把用例导入进行运行(本人理论上认为是可以,但没有具体实践过)
2、但我不建议直接从用例拿过来用,可以 copy 其中的一个方法进行封装过来(主要性能测试要不断调试,自动化脚本拿来直接用有冗余)

roc 回复

前段时间有点忙,今天才看到回复
1、可以分享一下你的 case 转化为性能脚本吗,提供一下核心思路和做法
-- 后续可以撸个文章,个人觉的这个问题最本质的是快速实现压测接口,然后接口组合压测场景,想要从 1 人/天 10 接口提升到 100 个,确实需要改造,还有第二点,是接口模版的改动,可以最小代价落地

2、你目前的做法在能 web 端能统计到相关的请求数据吗
-- locust 虽然提供了图表,但是只是一个辅助手段,压测数据都是要以服务环境数据指标做第一优先;而且分布式 slave 多,数据会有几秒的滞后,但是不影响整体的准确性,现在监控体系都非常成熟了,从普罗米修斯到 elk+grf 日志分析 各个组件的监控,各种云上面都有自建都指标,如果是自建的,本身就有监控数据视图,locust 本身最重要的功能还是提供一个压测能力,相比 jmt 分布式部署会很便捷,而且对扩展协议压测支持很便捷,最是最大的优点

第 1 个问题,只有相对解最优解;而且在实际压测过程中,需要控制分布式参数唯一性的问题,自动化 case 过来的数据,需要做一轮抽象,然后组合初始化压测数据满足动态大并发要求。

感谢楼主,求指教,locust 使用过程中每次修改任务权重都要停止脚本重新启动,想问下您知不知道有什么好的方法可以在不重启脚本的情况下修改 task 权重呢?

需要 登录 後方可回應,如果你還沒有帳號按這裡 注册