自动化工具 python+react 完成前后端分离的接口自动化平台【http://www.julio2th.top:8070】

知识分子 · 2021年07月19日 · 最后由 知识分子 回复于 2021年07月26日 · 5877 次阅读

后端使用了 python+mysql 的方式进行数据处理,较其他框架使用 allure 等生成报告,本平台采用自主收集大部分数据,使用 mysql 长文本方式存储,使用这种方式的目的大致是为了方便自定义前端界面报告显示,缺点也比较明显,针对于测试套件,mysql 的读写来得可能还是不够高效,打算后期报告数据存放在 mongo 里面;
前端是采用 react+antd 进行开发,目前安排的模块如下:
1.接口管理,方便后续添加测试用例的时候直接获取 url+method 等信息,更为快速

2.全局环境变量管理
目前后端代码,数据结构设计都已完成,前端暂未完成(所以就不放截图了),可以存放多套环境变量,用于测试套件执行和单用例调试时使用

3.测试用例 + 测试步骤管理

步骤目前安排了两类(有其他的需要添加的麻烦有经验的大佬推荐下):1.接口请求 2.数据对比


调试的时候可以选择一套环境变量进行执行,默认为上次执行所选择的一套环境,执行的过程中为全量数据对比,忽略设置的忽略值

4.测试套件管理,用于批量测试用例,可配置一套环境变量

5.测试计划管理,用于定时执行套件或用例

6.日志管理,用于存放和查看执行报告


前端的目录结构大致是这样的

目前已部署在自己的服务器上,欢迎大家体验并指出问题:http://www.julio2th.top:8070

最佳回复

目前已部署在自己的服务器上,欢迎大家体验并指出问题:http://julio.zone:7000

共收到 14 条回复 时间 点赞
知识分子 关闭了讨论 07月19日 20:32
知识分子 重新开启了讨论 07月19日 20:33
仅楼主可见

能把前端代码分享下吗。最近也在看 react,准备做这一块的开发,也准备用 antd+react

目前前端还有一些没有完成,完成之后再分享

请问楼主 allure 报告怎么集成在平台上的,怎么存储每次执行的 allure 报告,然后在平台查看呢

楼主请问用例执行是怎么实现的呢

回复

用例执行的话我设计了 3 种类型,一种是单用例选择环境变量执行,每次可以独立选择,默认带出上一次的,第二种是测试套件配置环境变量执行,第三种定时执行!执行过程就是收集库里面的测试用例,按顺序执行,目前没考虑多线程,因为有些用例是会有先后顺序的

轻舟 回复

我这个没有集成 alluer 报告哈,是自己收集的报告,如果是集成 alluer 报告的话,可以采用 iframe 的方式嵌入!因为他们编码方式确实不同

知识分子 回复

e, 之前个人的做法是直接采用 for 循环,循环里面执行请求接口的方法 感觉方式欠佳,方便透露下您的做法嘛

回复

我的做法和你这边差不多,主要是在异常处理上面做了一些东西,将 for 循环拆分到各个方法里面进行区分,代码如下,不知道有没有解决到你的问题,或者可以指出我方法里面可以优化的地方,一起讨论

class RunCase:

    def __init__(self, env_variable_id):
        self._result_data = {"pass": 0, "fail": 0, "result": []}
        self._db = api_platform_test_db
        self._env_param = EnvVariableParam().get_env_variable_param(env_variable_id)
        self._env_variable_id = env_variable_id
        self._case_env_param = {}
        self._running_case_name = None
        self._running_case_id = None
        self.logs_tool = CaseRunningLogs()

    def collect_case(self, test_case_id):
        case_steps = self._db.getAll(
            "SELECT * FROM `api_platform`.`test_case_step` WHERE `test_case_id` = {} AND `status` = 0".format(
                test_case_id))
        return case_steps

    def run_test_suite(self, suite_id, create_by, implement_type=2):
        suite_data = self._db.getOne(
            "SELECT `name`,`test_case_list` FROM `api_platform`.`test_suite` WHERE `id` = {} AND `status` = 0".format(
                suite_id))
        if suite_data:
            insert_id = self._db.insertOne(
                "INSERT INTO `api_platform`.`implement_logs` (`relation_id`, `env_variable_id`, `implement_type`, `create_by`, `create_at`) VALUES ({}, {}, {},'{}', {});".format(
                    suite_id, self._env_variable_id,implement_type, create_by, round(time.time())))
            self._db.end()
            try:
                test_case_list = eval(suite_data[1])
                for case in test_case_list:
                    self._run_single_case(case)
                self._db.update(
                    "UPDATE `api_platform`.`implement_logs` SET `logs`= %s, `status` = 1, `update_at`= %s WHERE `id`= %s;",
                    [str(self._result_data), round(time.time()), insert_id])
                self._db.end()
            except Exception as e:
                self._db.update(
                    "UPDATE `api_platform`.`implement_logs` SET `logs`= %s, `status` = 2, `update_at`= %s WHERE `id`= %s;",
                    [str(e), round(time.time()), insert_id])
                self._db.end()
        return self._result_data

    def run_single_case(self, test_case_id, create_by):
        insert_id = self._db.insertOne(
            "INSERT INTO `api_platform`.`implement_logs` (`relation_id`, `env_variable_id`, `implement_type`, `create_by`, `create_at`) VALUES ({}, {}, 0, '{}', {});".format(
                test_case_id, self._env_variable_id, create_by, round(time.time())))
        self._db.end()
        try:
            self._run_single_case(test_case_id)
            self._db.update(
                "UPDATE `api_platform`.`implement_logs` SET `logs`= %s, `status` = 1, `update_at`= %s WHERE `id`= %s;",
                [str(self._result_data), round(time.time()), insert_id])
            self._db.end()
        except Exception as e:
            self._db.update(
                "UPDATE `api_platform`.`implement_logs` SET `logs`= %s, `status` = 2, `update_at`= %s WHERE `id`= %s;",
                [str(e), round(time.time()), insert_id])
            self._db.end()
        return self._result_data

    def _run_single_case(self, test_case_id):
        case_steps = self.collect_case(test_case_id)
        self._running_case_name = \
            self._db.getOne("SELECT `name` FROM `api_platform`.`test_case` WHERE id = {};".format(test_case_id))[0]
        self._running_case_id = test_case_id
        self.logs_tool = CaseRunningLogs()
        self.logs_tool.info(
            "*" * 60 + "【开始执行测试用例-{}-{}】".format(self._running_case_id, self._running_case_name) + "*" * 60)
        if case_steps:
            for step in case_steps:
                try:
                    self.logs_tool.info(" "*20+"-" * 40 + "【开始执行步骤-{}-{}】".format(step[0], step[3]) + "-" * 40)
                    if step[2] == 0:
                        path = self.replace_variable(step[4])
                        header = self.replace_variable(step[6]) if step[6] else '{}'
                        param = self.replace_variable(step[7]) if step[7] else '{}'
                        if step[5] == 0:
                            res = requests.get(url=path, headers=eval(header), params=eval(param))
                            self.logs_tool.info(
                                "获取响应数据为: {}".format(json.dumps(res.json(), indent=4, ensure_ascii=False)))
                            if step[11]:
                                self.save_case_env_param(eval(step[11]), res.json())
                        elif step[5] == 1:
                            res = requests.post(url=path, headers=eval(header), json=eval(param))
                            self.logs_tool.info(
                                "获取响应数据为: {}".format(json.dumps(res.json(), indent=4, ensure_ascii=False)))
                            if step[11]:
                                self.save_case_env_param(eval(step[11]), res.json())
                        elif step[5] == 2:
                            res = requests.head(url=path, headers=eval(header))
                            self.logs_tool.info(
                                "获取响应数据为: {}".format(json.dumps(res.json(), indent=4, ensure_ascii=False)))
                            if step[11]:
                                self.save_case_env_param(eval(step[11]), res.json())
                        elif step[5] == 3:
                            res = requests.put(url=path, headers=eval(header), data=eval(param))
                            self.logs_tool.info(
                                "获取响应数据为: {}".format(json.dumps(res.json(), indent=4, ensure_ascii=False)))
                            if step[11]:
                                self.save_case_env_param(eval(step[11]), res.json())
                        elif step[5] == 4:
                            res = requests.delete(url=path, headers=eval(header))
                            self.logs_tool.info(
                                "获取响应数据为: {}".format(json.dumps(res.json(), indent=4, ensure_ascii=False)))
                            if step[11]:
                                self.save_case_env_param(eval(step[11]), res.json())
                        elif step[5] == 5:
                            res = requests.get(url=path, headers=eval(header), params=eval(param))
                            self.logs_tool.info(
                                "获取响应数据为: {}".format(json.dumps(res.json(), indent=4, ensure_ascii=False)))
                            if step[11]:
                                self.save_case_env_param(eval(step[11]), res.json())
                        elif step[5] == 6:
                            res = requests.options(url=path, headers=eval(header))
                            self.logs_tool.info(
                                "获取响应数据为: {}".format(json.dumps(res.json(), indent=4, ensure_ascii=False)))
                            if step[11]:
                                self.save_case_env_param(eval(step[11]), res.json())
                        elif step[5] == 7:
                            res = requests.get(url=path, headers=eval(header), params=eval(param))
                            self.logs_tool.info(
                                "获取响应数据为: {}".format(json.dumps(res.json(), indent=4, ensure_ascii=False)))
                            if step[11]:
                                self.save_case_env_param(eval(step[11]), res.json())
                        elif step[5] == 8:
                            res = requests.patch(url=path, headers=eval(header), data=eval(param))
                            self.logs_tool.info(
                                "获取响应数据为: {}".format(json.dumps(res.json(), indent=4, ensure_ascii=False)))
                            if step[11]:
                                self.save_case_env_param(eval(step[11]), res.json())
                    elif step[2] == 1:
                        except_body = eval(self.replace_variable(step[8]))
                        actual_body = eval(self.replace_variable(step[9]))
                        ComparData(self.logs_tool).do_check(expect=except_body, actual=actual_body,
                                                            ignore=eval(step[10]))
                except Exception as e:
                    self.logs_tool.info("执行异常,错误信息为: {}".format(e))
                    self._result_data['result'].append(
                        {"case_id": self._running_case_id, "case_name": self._running_case_name, "status": 1,
                         "running_logs": self.logs_tool.get_logs()})
                    self._result_data["fail"] += 1
                    return self._result_data
            self._result_data['result'].append(
                {"case_id": self._running_case_id, "case_name": self._running_case_name, "status": 0,
                 "running_logs": self.logs_tool.get_logs()})
            self._result_data["pass"] += 1
            return self._result_data

    def replace_variable(self, data):
        try:
            rep = re.findall('{{(.*?)}}', data)
            # rep = re.findall('{{.*?}}', data)
            for variable in rep:
                if variable in self._env_param.keys():
                    data = data.replace("{{" + variable + "}}", str(self._env_param[variable]))
                else:
                    data = data.replace("{{" + variable + "}}", str(self._case_env_param[variable]))
            return data
        except Exception as e:
            self.logs_tool.info("不存在的环境变量 {}".format(e))
            raise Exception("不存在的环境变量 {}".format(e))

    def save_case_env_param(self, case_env_param, response):
        try:
            for key_value in case_env_param.items():
                result = response
                if key_value[1]:
                    expect_split = key_value[1].split('.')
                    for i in range(0, len(expect_split)):
                        if isinstance(result, dict):
                            result = result[expect_split[i]]
                        elif isinstance(result, list):
                            result = result[int(expect_split[i])]
                self._case_env_param[key_value[0]] = result
        except Exception as e:
            self.logs_tool.info("响应中未找到设置参数 {} 中的 {}, 响应内容为 {}".format(case_env_param, e, response))
            raise Exception("响应中未找到设置参数 {} 中的 {}, 响应内容为 {}".format(case_env_param, e, response))
知识分子 回复

之前有考虑多线程的方式运行,提升执行效率;但是目前没有对强关联的 case 里面做一些逻辑,所以只能按顺序执行;后面把强关联做了之后,就会用多线程的方式实现用例执行

知识分子 回复

期待后续😀

知识分子 · #14 · 2021年07月20日 Author
仅楼主可见

目前已部署在自己的服务器上,欢迎大家体验并指出问题:http://julio.zone:7000

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册