相关链接

关于这个框架

设计初衷:

解决我们项目的接口测试痛点。从之前的 1-2 小时测试时间压缩到现在的 1 分钟以内,效率提升,效果显著

对于读者:

关于框架代码结构:

本次更新

重试机制

主要用于重试连接超时接口、response 响应码非 200 的接口、其他异常情况

代码片段
def retry11(app_type, retry=3):
    """
    重试机制,默认3次
    :param app_type: 0 >> A; 1 >> B; 2 >> C; 3 >> D
    :param retry: 重试次数
    :return:
    """
    r1 = Retry(retry)
    if len(r1.get_diff()) > 0:
        print('发现diff接口,重试机制启动...')
        r1.retry1(app_type)


class Retry(object):
    def __init__(self, retry):
        self.retry = retry
        self.after_normal_sessions_path = '%s%s%s' % (
            utils.GlobalList.SESSIONS_PATH, "\\Sessions\\", utils.GlobalList.HOST)

    def __get_normal_after_sessions(self):
        """
        获取遍历后正常(接口通过)的接口列表
        :return:
        """
        return utils.FileUtil.get_file_list(self.after_normal_sessions_path)

    def __get_not_normal_after_sessions(self):
        """
        获取需要人工验证的接口VerifyRequest
        :return:
        """
        return self.__get_check_after_sessions('VerifyRequest')

    def __get_crash_after_sessions(self):
        """
        获取程序异常接口ProgramCrash
        :return:
        """
        return self.__get_check_after_sessions('ProgramCrash')

    def __get_unexpected_after_sessions(self):
        """
        获取非预期接口Unexpected
        :return:
        """
        return self.__get_check_after_sessions('Unexpected')

    def __get_field_change_after_sessions(self):
        """
        获取字段改变接口FieldChange
        :return:
        """
        return self.__get_check_after_sessions('FieldChange')

    def __get_check_after_sessions(self, sessions_type):
        """
        获取需要检查的接口列表,已去重
        :param sessions_type:
        :return:
        """
        path = '%s%s%s%s' % (self.after_normal_sessions_path, '\\Check\\', sessions_type, '.txt')
        try:
            l = open(path, encoding='utf-8').readlines()
            sessions1 = ('%s%s' % (i.replace('\n', '')[::-1].split('/', 1)[0][::-1], '.txt') for i in l if
                         i.startswith('Request url: '))
            return list(set(sessions1))
        except FileNotFoundError:
            return ()

    def get_diff(self):
        """
        获取diff接口
        diff 接口包含类型
        1.response 响应码 非200的接口
        2.请求超时的接口
        3.其他未知情况的接口(如 代码异常导致)
        :return:
        """
        before_sessions = utils.GlobalList.BEFORE_SESSIONS
        after_sessions = []
        normal_after_sessions = self.__get_normal_after_sessions()
        not_normal_after_sessions = self.__get_not_normal_after_sessions()
        unexpected_after_sessions = self.__get_unexpected_after_sessions()
        field_change_after_sessions = self.__get_field_change_after_sessions()
        crash_after_sessions = self.__get_crash_after_sessions()
        if str(type(normal_after_sessions)) != "<class 'NoneType'>":
            after_sessions.extend(normal_after_sessions)
        if str(type(not_normal_after_sessions)) != "<class 'NoneType'>":
            after_sessions.extend(not_normal_after_sessions)
        if str(type(crash_after_sessions)) != "<class 'NoneType'>":
            after_sessions.extend(crash_after_sessions)
        if str(type(unexpected_after_sessions)) != "<class 'NoneType'>":
            after_sessions.extend(unexpected_after_sessions)
        if str(type(field_change_after_sessions)) != "<class 'NoneType'>":
            after_sessions.extend(field_change_after_sessions)
        return list(set(before_sessions).difference(set(after_sessions)))

    def __get_diff_sessions(self):
        """
        从本地磁盘读取diff接口sessions(request url; request header; ...)
        pass: 一个接口多条session
        :return:
        """
        diff = self.get_diff()
        for d in diff:
            print('diff sessions: %s' % (d, ))
            total_session = sessions.ReadSessions.ReadSessions().get_single_session(d)
            if len(total_session) == 0:
                print('发现录制异常接口:' + d)
                print('执行移除操作,移除重试队列')
                # 移除录制异常的接口
                os.remove('%s%s%s%s%s' % (utils.GlobalList.SESSIONS_PATH, "\\Api\\", utils.GlobalList.HOST, "\\", d))
                # 全局变量遍历前的全部接口也需要移除
                utils.GlobalList.BEFORE_SESSIONS.remove(d)
            else:
                yield sessions.ReadSessions.ReadSessions().get_single_session(d)

    def __will_request_sessions(self):
        """
        将要重跑的sessions,把多个接口的多个session合并为一个列表
        :return:
        """
        s = self.__get_diff_sessions()
        for i in s:
            for j in i:
                yield j

    def __request_sessions(self, app_type):
        """
        请求接口
        :return:
        """
        s = self.__will_request_sessions()
        base.Request.thread_pool(app_type, s)

    def retry1(self, app_type):
        """
        重试的接口类型
        1.response 响应码 非200的接口
        2.请求超时的接口
        3.其他未知情况的接口(如 代码异常导致)
        注意:已经知道失败的接口不会再重试,目前是这样考虑的
        :return:
        """
        temp = self.retry
        while self.retry > 0:
            self.retry -= 1
            # 请求接口
            self.__request_sessions(app_type)
            print('第%d次尝试请求diff...' % (temp - self.retry, ))
            # 再次求差异化文件,还有diff继续,否则停止
            if len(self.get_diff()) > 0 and self.retry > 0:
                print('发现diff存在,继续尝试请求...')
                continue
            else:
                break
        print('diff请求完成...')
清理测试数据

目的:在于避免接口回放创建的数据(如发朋友圈)对线上数据的影响

思路:通过一个接口对(如:发朋友圈与删除朋友圈),即创建数据与删除数据完成操作

代码片段
def clear_up(app_type):
    """
    执行清理创建的数据
    :param app_type:
    :return:
    """
    print("清理创建的接口数据...")
    DelaySessions().request_sessions(app_type)


class DelaySessions(object):
    def __init__(self):
        self.create_session_path = '%s%s%s%s' % (utils.GlobalList.SESSIONS_PATH, "\\Sessions\\", utils.GlobalList.HOST, "\\")
        self.delete_session_path = '%s%s%s%s' % (utils.GlobalList.SESSIONS_PATH, "\\Api\\", utils.GlobalList.HOST, "\\")
        self.create_sessions_parameter_value = self.__get_all_session_create_parameter()

    def __get_single_session_create_parameter(self, session_name):
        """
        获取单个创建数据接口response body json中创建数据成功的字段
        :return:
        """
        total_session = []
        parameter = []
        file_path = '%s%s%s' % (self.create_session_path, session_name, '.txt')
        if os.path.exists(file_path):
            total_session = sessions.ReadSessions.ReadSessions().get_single_session_full_path(
                '%s%s%s' % (self.create_session_path, session_name, '.txt'))
        req = re.compile(r'"%s":[0-9]+' % (utils.GlobalList.CREATE_DICT[session_name], ))
        for i in total_session:
            parameter.append(re.findall(req, i[-1])[0].split(":")[-1])
        return parameter

    def __get_all_session_create_parameter(self):
        """
        获取所有创建数据接口response body json中创建数据成功的字段
        :return:
        """
        create_parameter = {}
        for i in utils.GlobalList.CREATE_DICT.keys():
            create_parameter[i] = self.__get_single_session_create_parameter(i)
        return create_parameter

    def __get_single_session_delete_parameter(self, session_name):
        """
        替换单个删除数据接口request body中传入删除数据的字段值i
        :return:
        """
        total_session = []
        delete_session_name = utils.GlobalList.DELETE_DICT[session_name]
        file_path = '%s%s%s' % (self.delete_session_path, session_name, '.txt')
        if os.path.exists(file_path):
            total_session = sessions.ReadSessions.ReadSessions().get_single_session_full_path(file_path)
        req = re.compile(r'%s=[0-9]+' % (delete_session_name, ))
        for i in total_session:
            if len(i) == 4:
                temp = re.findall(req, i[1])[0].split('=')[-1]
                for j in utils.GlobalList.MAPPING_DICT.keys():
                    if j == session_name:
                        # 匹配对应的创建数据接口
                        create_session_name = utils.GlobalList.MAPPING_DICT[session_name]
                        # 取第一个值,用完删除
                        value = self.create_sessions_parameter_value[create_session_name][0]
                        i = str(i).replace(str(temp), value)  # 替换value
                        l = list(self.create_sessions_parameter_value[create_session_name])
                        l.remove(value)
                        self.create_sessions_parameter_value[create_session_name] = l
                        return eval(i)

    def __get_all_session_delete_parameter(self):
        """
        替换全部删除数据接口request body中传入删除数据的字段值,并返回一个即将请求接口的list
        :return:
        """
        # 根据创建数据接口找到对应的删除接口,并拿出创建数据接口值的长度,多长就调用多少次对应删除接口
        for i in self.create_sessions_parameter_value.keys():
            for j in utils.GlobalList.MAPPING_DICT.keys():
                if utils.GlobalList.MAPPING_DICT[j] == i:
                    for k in range(1, len(self.create_sessions_parameter_value[i]) + 1):
                        # 此处可优化 目前会读取多次文件
                        yield self.__get_single_session_delete_parameter(j)

    def request_sessions(self, app_type):
        """
        请求删除接口
        :return:
        """
        s = self.__get_all_session_delete_parameter()
        base.Request.thread_pool(app_type, s)
        print("接口数据清理完成!")
response 字段校验

目的:校验是否缺失字段、校验字段类型是否改变、校验结果是否预期结果

思路:遍历 response body json 生成 <字段 | 字段类型的 item> 存入 list

pass:

收获:

字段类型改变(一般会导致客户端崩溃,当然客户端容错机制也没做到位)

Diff: ['CircleUserName|int', 'CircleUserName|str']
"CircleId":6420,"CircleUserName":0,"PostDate":""
"CircleId":6152,"CircleUserName":"虾米","PostDate":"2016年07月22日"

再细化的 response 字段校验

代码片段

遍历 json 取出字段及其类型

def decode_json(self, json_data):
    """
    解析json并返回对应的key|value
    :param json_data: json数据源
    :return: 返回json各字段以及字段值
    """
    try:
        data = json.loads(json_data)
    except Exception as e:
        print(e)
        print("JSON format error")
        return []
    self.__iterate_json(data)
    return self.json_list


def __iterate_json(self, json_data, i=0):
    """
    遍历json
    :param i: 遍历深度
    :param json_data: json数据源
    :return: 返回json各字段以及字段值
    """
    if isinstance(json_data, dict):
        for k in json_data.keys():
            self.json_list.append('%s|%s' % (k, str(type(json_data[k])).split("'")[1]))
            if str(type(json_data[k])).startswith("<class 'list'>"):
                if len((json_data[k])) and isinstance(json_data[k][0], dict):
                    self.__iterate_json(json_data[k][0], i=i + 1)
            if isinstance(json_data[k], dict):
                self.__iterate_json(json_data[k], i=i + 1)
    else:
        print("JSON format error")

接口流程走向

接口回归测试启动...
清理测试数据...
读取配置文件中...
读取接口数据中...
接口请求中,请等待...
http://a-b.test.c.com/api/Circle/AddCancelCollectCircle
....................................................
http://a-b.test.c.com/api/GroupActivity/UploadActivityImage
http://a-b.test.c.com/api/photo/UploadImage
RequestException url: http://a-b.test.c.com/api/Demand/GetDemandKnockSourceListV4
HTTPConnectionPool(host='http://a-b.test.c.com', port=80): Read timed out. (read timeout=30)
IndexError url:
http://a-b.test.c.com/api/Demand/GetDemandKnockSourceListV4
接口请求完成!
发现diff接口,重试机制启动...
第1次尝试请求diff...
diff sessions: GetDemandKnockSourceListV4.txt
diff sessions: GetSecondHouseTopic.txt
http://a-b.test.c.com/api/Demand/GetDemandKnockSourceListV4
http://a-b.test.c.com/api/SecondHouseSource/GetSecondHouseTopic
RequestException url: http://a-b.test.c.com/api/Demand/GetDemandKnockSourceListV4
HTTPConnectionPool(host='http://a-b.test.c.com', port=80): Read timed out. (read timeout=30)
IndexError url:
http://a-b.test.c.com/api/Demand/GetDemandKnockSourceListV4
发现diff存在,继续尝试请求...
第2次尝试请求diff...
diff sessions: GetDemandKnockSourceListV4.txt
http://a-b.test.c.com/api/Demand/GetDemandKnockSourceListV4
RequestException url: http://a-b.test.c.com/api/Demand/GetDemandKnockSourceListV4
HTTPConnectionPool(host='http://a-b.test.c.com', port=80): Read timed out. (read timeout=30)
IndexError url:
http://a-b.test.c.com/api/Demand/GetDemandKnockSourceListV4
发现diff存在,继续尝试请求...
第3次尝试请求diff...
diff sessions: GetDemandKnockSourceListV4.txt
http://a-b.test.c.com/api/Demand/GetDemandKnockSourceListV4
RequestException url: http://a-b.test.c.com/api/Demand/GetDemandKnockSourceListV4
HTTPConnectionPool(host='http://a-b.test.c.com', port=80): Read timed out. (read timeout=30)
IndexError url:
http://a-b.test.c.com/api/Demand/GetDemandKnockSourceListV4
diff请求完成...
正在整理创建的数据...
清理创建的接口数据...
http://a-b.test.c.com/api/Circle/DeleteContent
http://a-b.test.c.com/api/Group/DeleteAnnouncement
http://a-b.test.c.com/api/RentDemand/DeleteRentDemandById
http://a-b.test.c.com/api/GroupFile/DeleteGroupFile
http://a-b.test.c.com/api/GroupDynamic/DeleteGroupDynamic
http://a-b.test.c.com/api/Demand/DeleteDemandById
http://a-b.test.c.com/api/GroupActivity/DeleteGroupActivity
接口数据清理完成!
测试报告准备中...
接口回归测试完成!
耗时: 125s

请求接口后写入本地的数据说明

FieldChange >> 字段改变的接口写入该文件
ProgramCrash >> 程序异常接口写入该文件
Unexpected >> 未达到预期字段校验的接口写入该文件
VerifyRequest >> 需要再次确认的接口写入该文件
GetUserInfoV2 >> 正常接口(一个接口一个文件)

关于接口回放的数据

框架的下一步

框架的更下一步

GitHub

框架地址


↙↙↙阅读原文可查看相关链接,并与作者交流