接口测试 接口组合参数压力测试进阶篇

测试小书童 · 2016年12月16日 · 最后由 卡斯 回复于 2016年12月20日 · 784 次阅读

基于组合测试生成参数在接口测试中的探索篇接口参数组合多线程和协程批量接口测试

本次新增

  • 对每组参数采用协程压测的方式进行压测
  • yaml 管理用例
  • 支持登陆成功后返回 token 或者 user_id 给其他接口使用,如果接参数需要多个加密参数,留了扩展,自己去封装
  • 检查点采用检查接口和访问数据库的方式进行检查
    • 如果正常参数直接访问数据库,如果是异常参数直接读取接口返回值
  • 注意此框架暂时还是探索阶段,有什么好想法欢迎提供

常用配置

  • 全局变量
PICT_PARAMS = "d:/params.txt" # 请求参数存放地址txt
PICT_PARAMS_RESULT = "d:/t2.txt" # 参数配对后的路径excel
# 数据库的常用字段
FIND_BY_SQL = "findBySql" # 根据sql查找
COUNT_BY_SQL = "countBySql" # 自定义sql 统计影响行数
INSERT = "insert" # 插入
UPDATE_BY_ATTR = "updateByAttr" # 更新数据
DELETE_BY_ATTR = "deleteByAttr" # 删除数据
FIND_BY_ATTR = "findByAttr" # 根据条件查询一条记录
FIND_ALL_BY_ATTR = "findAllByAttr"  #根据条件查询多条记录
COUNT = "count" # 统计行
EXIST = "exist" # 是否存在该记录
#接口简单点中的erro定义
NORMAL = "0" # 正常参数,可以到表里面找到
DEFAULT = "1" # 无此参数
EMPTY = "2" # 参数为空值,如name=''
DROP = "3" # 数据库中找不到此参数
# 接口统计
LOGIN_KEY = ""  # 登陆后返回的key
LOGIN_VALUE = ""  # 登陆后返回的value
RESULT = {"info": []} # 存最后结果
  • api.yaml
---
title: XXXX接口测试
host: rap.taobao.org
port: 80
protocol: http://
header: {"Accept":"text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8","User-Agent":"Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36"}
database: {"databaseName":userinfo,"host":"127.0.0.1", "user": "root", "password": "", "port": 3306, "charset": "utf8"} #配置数据库
api_list:
- id: 1001
  name: 登陆
  method: post
  url: /mockjs/11463/login
  stress: 2
  hope_sql: {"findKey": "findBySql", "sql": "select * from info", "params": { }} #注意findKey的值,要对应全局变量里面的值
  params:
  - "user_name:user_name:error:0:send_keys:333:type:str,user_name:error:1,user_name:error:2,user_name:error:3:send_keys:22222:type:str"
  - "pwd:pwd:error:0:send_keys:111:type:str,pwd:error:1,pwd:error:2,pwd:error:3:send_keys:32321:type:str"
  # 注意这里的error,对应全局变量里面的error
  is_first: 1 # 预览的登陆接口
  login_key: user_id # 返回给其他接口使用的key
- id: 1002
  ...
  get_login_param: 1 # 需要登陆接口返回过来的参数

核心代码分析

  • 入口代码
from DAL import httpConfig as hc
from DAL import gevents, dbConnection, httpParams
from common import operateYaml
from DAL.pairs import *
PATH = lambda p: os.path.abspath(
    os.path.join(os.path.dirname(__file__), p)
)
def myRequest(**kwargs):
  # {"appStatus": {"errorCode": 0,"message": "操作成功"},"content": {"nickname":"1212121","user_id": 30}} 接口定义的规则
  # 现在只是考虑到了登陆后返回token,user_id这方面的需求

    method = kwargs["param_req"]["method"]
    get_login_params = 0 # 标识接受了返回多少个参数(user_id,token),用作后面拓展
    param = httpParams.params_filter(kwargs["param_result"])  # 请求参数的处理,如果这里有各种加密可从此处扩展
    # get_login_param表示此接口需要登陆返回来的id(token),一般登陆成功后返回的字段
    if kwargs["param_req"].get("is_first", "false") == "false" and kwargs["param_req"].get("get_login_param", "false") != "false":
        param[Const.LOGIN_KEY]= Const.LOGIN_VALUE
        get_login_params += 1
    if kwargs["param_req"]["method"] == Const.HTTP_POST:
        really_result = kwargs["http_config"].post(dict_post=kwargs["param_req"], param=param) # 发送post请求
    elif kwargs["param_req"]["method"] == Const.HTTP_GET:
        really_result = kwargs["http_config"].get(dict_get=kwargs["param_req"], param=param)  # 发送get请求
    if really_result.get("status_code") == 200:
        print("请求%s成功鸟" %method)
        if kwargs["param_req"].get("is_first", "false") != "false" :
            # 需要接口返回过来的login_key,如token,user_id)等,此时就不用查数据库作为检查点,检查点为直接读取响应结果
            if really_result["appStatus"]["errorCode"] == 0:
                Const.LOGIN_KEY = kwargs["param_req"]["login_key"]
                Const.LOGIN_VALUE = really_result["content"][Const.LOGIN_KEY]
                print("%s接口验证通过,不查数据库" %method)
                kwargs["result"]["success"] += 1
            else:
                print("%s接口测试失败,不查数据库~" %method)
                kwargs["result"]["failed"] += 1

        #如果实际的参数是异常,is_first表示是非登陆接口,就不查数据库.
        elif len(kwargs["param_result"].keys()) != len(param) - get_login_params:
            #根据接口返回的errorCode判断,假如errorCode=2表示参数异常
            if really_result["appStatus"]["errorCode"] == 2:
                print("%s接口异常参数检测通过" % method)
                kwargs["result"]["success"] += 1
            else:
                print("%s接口异常参数检测失败" % method)
                kwargs["result"]["failed"] += 1
            return
        else: #直接查询数据库作为检查点
            check_sql_key = kwargs["param_req"]["hope_sql"]["findKey"]  # 根据这里的key,来跳转到不同的数据库查询语句
            kwargs["param_req"]["hope_sql"]["params"] = param  # 已经处理好的请求参数传给数据库sql语句参数,结果为:params{"a":"b"}
            for item in kwargs["param_result"]:
                #  error: 0正常,1无此参数,2参数的值为空,3在数据库中不存.0和3查数据库,1,2直接读取接口返回信息
                error = kwargs["param_result"][item]["error"]
                if error == Const.NORMAL or error == Const.DROP:
                    if kwargs["check_sql"].findKeySql(check_sql_key, **kwargs["param_req"]["hope_sql"]):
                        print("%s数据库接口验证成功" %method)
                        kwargs["result"]["success"] += 1
                    else:
                        print("%s数据库接口验证失败" %method)
                        kwargs["result"]["failed"] += 1
                    return
                elif error == Const.DEFAULT or error == Const.EMPTY:
                    if really_result["appStatus"]["errorCode"] == 2: # 接口返回的2为参数异常
                        print("%s接口异常参数检测成功" %method)
                        kwargs["result"]["success"] += 1
                    else:
                        print("%s接口异常参数检测失败" % method)
                        kwargs["result"]["failed"] += 1
                    return
    else:
        print("请求发送失败,状态码为:%s" % really_result.get("status_code"))
def gevent_request(**kwargs):
    for i in kwargs["api_config"]:  # 读取各个接口的配置,api.ymal
        # 生成参数
        pict_param(params=i["params"], pict_params=Const.PICT_PARAMS,
                   pict_params_result=Const.PICT_PARAMS_RESULT)
        # 读取参数
        get_param = read_pict_param(Const.PICT_PARAMS_RESULT)
        count = len(get_param) # 根据不同分组参数,循环请求
        green_let = []
        req = {}
        for key in i:
            if key != "params":  # 过滤请求参数,参数上面已经处理好了
                req[key] = i[key]
        result = {}  # 统计数据
        result["name"] = req["name"]  # 接口名字
        result["method"] = req["method"]
        result["url"] = req["url"]
        result["sum"] = count
        result["stress"] = req["stress"]
        result["success"] = 0
        result["failed"] = 0
        kwargs["result"] = result
        for k in range(0, count):
            kwargs["param_result"] = get_param[k]  # 接口中不同的参数组合,是dict类型
            kwargs["param_req"] = req  #每次请求除组合参数之外的参数,如逾期只,请求的url,method,结束等
            for item in range(kwargs["param_req"]["stress"]):  # 压力测试,启动协程去压测
                green_let.append(gevents.requestGevent(myRequest(**kwargs)))
            for k in range(0, kwargs["param_req"]["stress"]):
                green_let[k].start()
            for k in range(0, kwargs["param_req"]["stress"]):
                green_let[k].join()
        Const.RESULT["info"].append(kwargs["result"])
def get_config(api_ymal):
    '''
    得到api.ymal中的设置的接口信息
    :param api_ymal:
    :return:
    '''
    http_config = {} # http信息的记录
    api_config = [] # api的记录记录
    get_api_list = operateYaml.getYam(api_ymal)
    for key in get_api_list:
        if type(get_api_list[key]) != list:
            http_config[key] = get_api_list[key]
        else:
            api_config = get_api_list[key]
    return http_config, api_config

if __name__ == "__main__":
    start_time = time.time()
    get_api_config = get_config(PATH("api.ymal"))
    http_conf = hc.ConfigHttp(dict_http=get_api_config[0]) # http请求的设置
    apiConfigs = get_api_config[1]
    check_sql = dbConnection. MySQLet(host=get_api_config[0]["database"]["host"], user=get_api_config[0]["database"]["user"],
                                     password=get_api_config[0]["database"]["password"], charset=get_api_config[0]["database"]["charset"],
                                     database=get_api_config[0]["database"]["databaseName"], port=get_api_config[0]["database"]["port"])
    gevent_request(http_config=http_conf, api_config=get_api_config[1], check_sql=check_sql)
    check_sql.close()
    end_time = time.time()
    print("共花费:""%.2f" % (end_time - start_time))
    print(Const.RESULT)


  • 封装好的访问数据库,查看主要代码来自这里,我修改了一些东西和 bug
import mysql.connector
import mysql.connector.errors
from common.customConst import Const
class MySQLet:
    """Connection to a MySQL"""
    # def __init__(self,user='',password='',database='',charset=None,port=3306):
    def __init__(self,**kwargs):
        try:
            self._conn = mysql.connector.connect(host=kwargs["host"], user=kwargs["user"], password=kwargs["password"],
                                                 charset=kwargs["charset"], database=kwargs["database"], port=kwargs["port"])
            self.__cursor = None
            print("连接数据库")
            #set charset charset = ('latin1','latin1_general_ci')
        except mysql.connector.errors.ProgrammingError as err:
            print('mysql连接错误:' + err.msg)

    # def findBySql(self, sql, params={}, limit=0, join='AND'):
    def findBySql(self, **kwargs):
        """
        自定义sql语句查找
        limit = 是否需要返回多少行
        params = dict(field=value)
        join = 'AND | OR'
        """
        print("-----------findbysql-----")
        print(kwargs)
        cursor = self.__getCursor()
        # sql = self.__joinWhere(kwargs["sql"], kwargs["params"], kwargs["join"])
        if kwargs.get("join", 0) == 0: kwargs["join"] = "AND"
        if kwargs.get("limit", "0") == "0": kwargs["limit"] = 1
        sql = self.__joinWhere(**kwargs)
        cursor.execute(sql, tuple(kwargs["params"].values()))
        rows = cursor.fetchmany(size=kwargs["limit"]) if kwargs["limit"] > 0 else cursor.fetchall()
        result = [dict(zip(cursor.column_names,row)) for row in rows] if rows else None
        return result

    # def countBySql(self,sql,params = {},join = 'AND'):
    def countBySql(self, **kwargs):
        """自定义sql 统计影响行数"""
        if kwargs.get("join", 0) == 0: kwargs["join"] = "AND"
        cursor = self.__getCursor()
        # sql = self.__joinWhere(kwargs["sql"], kwargs["params"], kwargs["join"])
        sql = self.__joinWhere(**kwargs)
        cursor.execute(sql, tuple(kwargs["params"].values()))
        result = cursor.fetchall() # fetchone是一条记录, fetchall 所有记录
        return len(result) if result else 0

    # def insert(self,table,data):
    def insert(self, **kwargs):
        """新增一条记录
          table: 表名
          data: dict 插入的数据
        """
        fields = ','.join('`'+k+'`' for k in kwargs["data"].keys())
        values = ','.join(("%s", ) * len(kwargs["data"]))
        sql = 'INSERT INTO `%s` (%s) VALUES (%s)' % (kwargs["table"], fields, values)
        cursor = self.__getCursor()
        cursor.execute(sql, tuple(kwargs["data"].values()))
        insert_id = cursor.lastrowid
        self._conn.commit()
        return insert_id

    # def updateByAttr(self,table,data,params={},join='AND'):
    def updateByAttr(self, **kwargs):
    #     """更新数据"""
        if kwargs.get("params", 0) == 0:
            kwargs["params"] = {}
        if kwargs.get("join", 0) == 0:
            kwargs["join"] = "AND"
        fields = ','.join('`' + k + '`=%s' for k in kwargs["data"].keys())
        values = list(kwargs["data"].values())


        values.extend(list(kwargs["params"].values()))
        sql = "UPDATE `%s` SET %s " % (kwargs["table"], fields)
        kwargs["sql"] = sql
        sql = self.__joinWhere(**kwargs)
        cursor = self.__getCursor()
        cursor.execute(sql, tuple(values))
        self._conn.commit()
        return cursor.rowcount


    # def updateByPk(self,table,data,id,pk='id'):
    def updateByPk(self, **kwargs):
        """根据主键更新,默认是id为主键"""
        return self.updateByAttr(**kwargs)

    # def deleteByAttr(self,table,params={},join='AND'):
    def deleteByAttr(self, **kwargs):
        """删除数据"""
        if kwargs.get("params", 0) == 0:
            kwargs["params"] = {}
        if kwargs.get("join", 0) == 0:
            kwargs["join"] = "AND"
        # fields = ','.join('`'+k+'`=%s' for k in kwargs["params"].keys())
        sql = "DELETE FROM `%s` " % kwargs["table"]
        kwargs["sql"] = sql
        # sql = self.__joinWhere(sql, kwargs["params"], kwargs["join"])
        sql = self.__joinWhere(**kwargs)
        cursor = self.__getCursor()
        cursor.execute(sql, tuple(kwargs["params"].values()))
        self._conn.commit()
        return cursor.rowcount

    # def deleteByPk(self,table,id,pk='id'):
    def deleteByPk(self, **kwargs):
        """根据主键删除,默认是id为主键"""
        return self.deleteByAttr(**kwargs)

    # def findByAttr(self,table,criteria = {}):
    def findByAttr(self, **kwargs):
        """根據條件查找一條記錄"""
        return self.__query(**kwargs)

    # def findByPk(self,table,id,pk='id'):
    def findByPk(self, **kwargs):
        return self.findByAttr(**kwargs)

    # def findAllByAttr(self,table,criteria={}, whole=true):
    def findAllByAttr(self, **kwargs):
        """根據條件查找記錄"""
        return self.__query(**kwargs)

    # def count(self,table,params={},join='AND'):
    def count(self, **kwargs):
        """根据条件统计行数"""
        if kwargs.get("join", 0) == 0: kwargs["join"] = "AND"
        sql = 'SELECT COUNT(*) FROM `%s`' % kwargs["table"]
        # sql = self.__joinWhere(sql, kwargs["params"], kwargs["join"])
        kwargs["sql"] = sql
        sql = self.__joinWhere(**kwargs)
        cursor = self.__getCursor()
        cursor.execute(sql, tuple(kwargs["params"].values()))
        result = cursor.fetchone()
        return result[0] if result else 0

    # def exist(self,table,params={},join='AND'):
    def exist(self, **kwargs):
        """判断是否存在"""
        return self.count(**kwargs) > 0

    def close(self):
        """关闭游标和数据库连接"""
        if self.__cursor is not None:
            self.__cursor.close()
        self._conn.close()

    def __getCursor(self):
        """获取游标"""
        if self.__cursor is None:
            self.__cursor = self._conn.cursor()
        return self.__cursor

    # def __joinWhere(self,sql,params,join):
    def __joinWhere(self, **kwargs):
        """转换params为where连接语句"""
        if kwargs["params"]:
            keys,_keys = self.__tParams(**kwargs)
            where = ' AND '.join(k+'='+_k for k,_k in zip(keys,_keys)) if kwargs["join"] == 'AND' else ' OR '.join(k+'='+_k for k,_k in zip(keys,_keys))
            kwargs["sql"]+=' WHERE ' + where
        return kwargs["sql"]

    # def __tParams(self,params):
    def __tParams(self, **kwargs):
        keys = ['`'+k+'`' for k in kwargs["params"].keys()]
        _keys = ['%s' for k in kwargs["params"].keys()]
        return keys,_keys

    # def __query(self,table,criteria,whole=False):
    def __query(self, **kwargs):
        if kwargs.get("whole", False) == False or kwargs["whole"] is not True:
            kwargs["whole"] = False
            kwargs["criteria"]['limit'] = 1
        # sql = self.__contact_sql(kwargs["table"], kwargs["criteria"])
        sql = self.__contact_sql(**kwargs)
        cursor = self.__getCursor()
        cursor.execute(sql)
        rows = cursor.fetchall() if kwargs["whole"] else cursor.fetchone()
        result = [dict(zip(cursor.column_names, row)) for row in rows] if kwargs["whole"] else dict(zip(cursor.column_names, rows)) if rows else None
        return result

    # def __contact_sql(self,table,criteria):
    def __contact_sql(self, **kwargs):
        sql = 'SELECT '
        if kwargs["criteria"] and type(kwargs["criteria"]) is dict:
            #select fields
            if 'select' in kwargs["criteria"]:
                fields = kwargs["criteria"]['select'].split(',')
                sql+= ','.join('`'+field+'`' for field in fields)
            else:
                sql+=' * '
            #table
            sql+=' FROM `%s`'% kwargs["table"]
            #where
            if 'where' in kwargs["criteria"]:
                sql+=' WHERE '+ kwargs["criteria"]['where']
            #group by
            if 'group' in kwargs["criteria"]:
                sql+=' GROUP BY '+ kwargs["criteria"]['group']
            #having
            if 'having' in kwargs["criteria"]:
                sql+=' HAVING '+ kwargs["criteria"]['having']
            #order by
            if 'order' in kwargs["criteria"]:
                sql+=' ORDER BY '+ kwargs["criteria"]['order']
            #limit
            if 'limit' in kwargs["criteria"]:
                sql+=' LIMIT '+ str(kwargs["criteria"]['limit'])
            #offset
            if 'offset' in kwargs["criteria"]:
                sql+=' OFFSET '+ str(kwargs["criteria"]['offset'])
        else:
            sql+=' * FROM `%s`'% kwargs["table"]
        return sql
    def findKeySql(self, key ,**kwargs):
        print("-----------")
        print(key)
        sqlOperate = {
        Const.COUNT: lambda: self.count(**kwargs),
        Const.COUNT_BY_SQL: lambda: self.countBySql(**kwargs),
        Const.DELETE_BY_ATTR: lambda: self.deleteByAttr(**kwargs),
        Const.EXIST: lambda: self.exist(**kwargs),
        Const.FIND_ALL_BY_ATTR: lambda: self.findAllByAttr(**kwargs),
        Const.INSERT: lambda: self.insert(**kwargs),
        Const.FIND_BY_ATTR: lambda: self.findByAttr(**kwargs),
        Const.UPDATE_BY_ATTR: lambda: self.updateByAttr(**kwargs),
        Const.FIND_BY_SQL: lambda: self.findBySql(**kwargs)

        }
        return sqlOperate[key]()

if __name__ == "__main__":
    mysqlet = MySQLet(host="127.0.0.1", user="root", password="", charset="utf8", database="userinfo", port=3306)
    # 根据字段统计count, join>>AND,OR,可以不传,默认为AND
    # print(mysqlet.findKeySql(Const.COUNT, table="info", params={"id": "11", "name": "666"}, join="OR"))
    # # 自定义sql语句统计count
    # print(mysqlet.findKeySql(Const.COUNT_BY_SQL, sql="select * from info", params={"name": "666"}, join="AND"))
    # #插入数据
    # print(mysqlet.findKeySql(Const.INSERT, table="info", data={"name":"333", "pwd": "111"}))
  • 测试结果分析

  • 现在只是简单的记录下结果,后续优化

#{'method': 'post', 'success': 32, 'stress': 2, 'failed': 0, 'url': '/mockjs/11463/login', 'name': '登陆', 'sum': 16}
  '''
  sum 表示此接口有16组参数
  stress: 表示每组参数压测两次
  method: 请求方法
  success: 成功请求次数
  failed:失败请求次数
  url:请求的网址
  name:接口名字
  '''

其他

  • 后面会简单把结果记录到 excel 中,发邮件
  • 最终的目的是想做成平台化,这里估计短时间内无法完成
  • 可以点击查看源代码
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
共收到 21 条回复 时间 点赞

加油,如果一直做下去,建议切换语言为 java

#1 楼 @kasi 为啥要切换成 java,效率问题么?

#1 楼 @kasi 同问,我感觉用 python 挺好,也没有出现撒瓶颈啊,现在打算入手 django

建议用 java 吧,后期要考虑数据还是初始化,无论 redis,memcached 缓存数据准备,或者 mysql 数据准备,切合开发的技术选型框架很重要,我自己这边就是直接抛掉 python 搞 java 去了

—— 来自 TesterHome 官方 安卓客户端

期待你改成 java 版

—— 来自 TesterHome 官方 安卓客户端

大部分工程都是 java、数据类型会搞死

恩,之前就想过学学下 java,既然如此那好好学习下 java 去

—— 来自 TesterHome 官方 安卓客户端

@hu_qingen @kasi 目前 redis 缓存数据,mysql 数据准备仍然是用 python 写的,目前还没有遇见什么大的瓶颈,可能是我用得还不深?

@shixue33 你的数据类型碰到的太少了,例如 java 给的参数中有 double,integer 之类的你就会痛苦了

用 python 挺好呀

#9 楼 @kasi orz,我们接口所有所有所有数据,全部是转成了 string

@shixue33 只能说你们太幸福了~ but 你要做内部服务的时候 还是得切回 java 的 O(∩_∩) O

#12 楼 @kasi 恩,确实是我现在测试目标全部是外部的接口,不知道后台开发基于什么考虑,所有的接口调用,数据返回全部是 string 类型,以 json 组织。我在准备数据时,从数据库和 redis 抓回的数据都是直接转成了 dict 类型,只有一些时间类型需要特殊转换一下。感觉直接在 python 里边操作 json 数据很方便的样子?

我的理解,用 java 是不是就可以直接复用开发使用的 bean class ,从而减少了建模?

#8 楼 @shixue33 redis,zk 这些 python 可以搞定,但我个人放弃 python 的一个真正原因,局限性,具体有点忘了,当时处理 rsa 加解密的时候,参考 java 实现方法重写 python 方法,结果怎么加密都不对,后来查了国外资料,python 只支持 128 个 bit 位,不支持 256,其次这些核心方法 java 开发都已经写好了,轮子造好了,直接拿来用好了;顺便说一点,在一些大型企业内,很多都是私有协议,程序内部处理逻辑不是简单的 http 协议,这个时候用 python,我个人真不知道如何处理,反过来用 java,我根本不必关心怎么处理,我调它的方法就好了

—— 来自 TesterHome 官方 安卓客户端

#14 楼 @hu_qingen 我之前玩过 rsa,倒没有遇见你的问题……不过 JAVA 的优势的确有很多

—— 来自 TesterHome 官方 安卓客户端

其实最关键的就是服务端开发都是用 java,所以你只能用 java

#17 楼 @kasi 打算入手 groovy,很喜欢这个的语法,连 sql 也封装好

一样的坑 你到后面会发现的 除非你写中间件

#19 楼 @kasi 指点下,难道要全部手写原生的?

我之前还用 python 调 beanshell 来处理 java 的各种坑 实在没办法只能用 java

测试小书童 接口测试覆盖率设计讨论 中提及了此贴 02月28日 11:06
测试小书童 关闭了讨论 06月18日 15:49
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册