• 遍历出来,一个 list 包 json 不算难吧

  • 目前已部署在自己的服务器上,欢迎大家体验并指出问题:http://julio.zone:7000

  • 请教一个 sql 查询问题 at 2021年07月21日

    select * from (select * from categorys order by value desc) c group by category;
    或者
    select distinct(category), value from (select * from categorys order by value desc) c;

  • 学习前端,做个似的平台,上手很快

  • 仅楼主可见
  • 之前有考虑多线程的方式运行,提升执行效率;但是目前没有对强关联的 case 里面做一些逻辑,所以只能按顺序执行;后面把强关联做了之后,就会用多线程的方式实现用例执行

  • 我的做法和你这边差不多,主要是在异常处理上面做了一些东西,将 for 循环拆分到各个方法里面进行区分,代码如下,不知道有没有解决到你的问题,或者可以指出我方法里面可以优化的地方,一起讨论

    class RunCase:
    
        def __init__(self, env_variable_id):
            self._result_data = {"pass": 0, "fail": 0, "result": []}
            self._db = api_platform_test_db
            self._env_param = EnvVariableParam().get_env_variable_param(env_variable_id)
            self._env_variable_id = env_variable_id
            self._case_env_param = {}
            self._running_case_name = None
            self._running_case_id = None
            self.logs_tool = CaseRunningLogs()
    
        def collect_case(self, test_case_id):
            case_steps = self._db.getAll(
                "SELECT * FROM `api_platform`.`test_case_step` WHERE `test_case_id` = {} AND `status` = 0".format(
                    test_case_id))
            return case_steps
    
        def run_test_suite(self, suite_id, create_by, implement_type=2):
            suite_data = self._db.getOne(
                "SELECT `name`,`test_case_list` FROM `api_platform`.`test_suite` WHERE `id` = {} AND `status` = 0".format(
                    suite_id))
            if suite_data:
                insert_id = self._db.insertOne(
                    "INSERT INTO `api_platform`.`implement_logs` (`relation_id`, `env_variable_id`, `implement_type`, `create_by`, `create_at`) VALUES ({}, {}, {},'{}', {});".format(
                        suite_id, self._env_variable_id,implement_type, create_by, round(time.time())))
                self._db.end()
                try:
                    test_case_list = eval(suite_data[1])
                    for case in test_case_list:
                        self._run_single_case(case)
                    self._db.update(
                        "UPDATE `api_platform`.`implement_logs` SET `logs`= %s, `status` = 1, `update_at`= %s WHERE `id`= %s;",
                        [str(self._result_data), round(time.time()), insert_id])
                    self._db.end()
                except Exception as e:
                    self._db.update(
                        "UPDATE `api_platform`.`implement_logs` SET `logs`= %s, `status` = 2, `update_at`= %s WHERE `id`= %s;",
                        [str(e), round(time.time()), insert_id])
                    self._db.end()
            return self._result_data
    
        def run_single_case(self, test_case_id, create_by):
            insert_id = self._db.insertOne(
                "INSERT INTO `api_platform`.`implement_logs` (`relation_id`, `env_variable_id`, `implement_type`, `create_by`, `create_at`) VALUES ({}, {}, 0, '{}', {});".format(
                    test_case_id, self._env_variable_id, create_by, round(time.time())))
            self._db.end()
            try:
                self._run_single_case(test_case_id)
                self._db.update(
                    "UPDATE `api_platform`.`implement_logs` SET `logs`= %s, `status` = 1, `update_at`= %s WHERE `id`= %s;",
                    [str(self._result_data), round(time.time()), insert_id])
                self._db.end()
            except Exception as e:
                self._db.update(
                    "UPDATE `api_platform`.`implement_logs` SET `logs`= %s, `status` = 2, `update_at`= %s WHERE `id`= %s;",
                    [str(e), round(time.time()), insert_id])
                self._db.end()
            return self._result_data
    
        def _run_single_case(self, test_case_id):
            case_steps = self.collect_case(test_case_id)
            self._running_case_name = \
                self._db.getOne("SELECT `name` FROM `api_platform`.`test_case` WHERE id = {};".format(test_case_id))[0]
            self._running_case_id = test_case_id
            self.logs_tool = CaseRunningLogs()
            self.logs_tool.info(
                "*" * 60 + "【开始执行测试用例-{}-{}】".format(self._running_case_id, self._running_case_name) + "*" * 60)
            if case_steps:
                for step in case_steps:
                    try:
                        self.logs_tool.info(" "*20+"-" * 40 + "【开始执行步骤-{}-{}】".format(step[0], step[3]) + "-" * 40)
                        if step[2] == 0:
                            path = self.replace_variable(step[4])
                            header = self.replace_variable(step[6]) if step[6] else '{}'
                            param = self.replace_variable(step[7]) if step[7] else '{}'
                            if step[5] == 0:
                                res = requests.get(url=path, headers=eval(header), params=eval(param))
                                self.logs_tool.info(
                                    "获取响应数据为: {}".format(json.dumps(res.json(), indent=4, ensure_ascii=False)))
                                if step[11]:
                                    self.save_case_env_param(eval(step[11]), res.json())
                            elif step[5] == 1:
                                res = requests.post(url=path, headers=eval(header), json=eval(param))
                                self.logs_tool.info(
                                    "获取响应数据为: {}".format(json.dumps(res.json(), indent=4, ensure_ascii=False)))
                                if step[11]:
                                    self.save_case_env_param(eval(step[11]), res.json())
                            elif step[5] == 2:
                                res = requests.head(url=path, headers=eval(header))
                                self.logs_tool.info(
                                    "获取响应数据为: {}".format(json.dumps(res.json(), indent=4, ensure_ascii=False)))
                                if step[11]:
                                    self.save_case_env_param(eval(step[11]), res.json())
                            elif step[5] == 3:
                                res = requests.put(url=path, headers=eval(header), data=eval(param))
                                self.logs_tool.info(
                                    "获取响应数据为: {}".format(json.dumps(res.json(), indent=4, ensure_ascii=False)))
                                if step[11]:
                                    self.save_case_env_param(eval(step[11]), res.json())
                            elif step[5] == 4:
                                res = requests.delete(url=path, headers=eval(header))
                                self.logs_tool.info(
                                    "获取响应数据为: {}".format(json.dumps(res.json(), indent=4, ensure_ascii=False)))
                                if step[11]:
                                    self.save_case_env_param(eval(step[11]), res.json())
                            elif step[5] == 5:
                                res = requests.get(url=path, headers=eval(header), params=eval(param))
                                self.logs_tool.info(
                                    "获取响应数据为: {}".format(json.dumps(res.json(), indent=4, ensure_ascii=False)))
                                if step[11]:
                                    self.save_case_env_param(eval(step[11]), res.json())
                            elif step[5] == 6:
                                res = requests.options(url=path, headers=eval(header))
                                self.logs_tool.info(
                                    "获取响应数据为: {}".format(json.dumps(res.json(), indent=4, ensure_ascii=False)))
                                if step[11]:
                                    self.save_case_env_param(eval(step[11]), res.json())
                            elif step[5] == 7:
                                res = requests.get(url=path, headers=eval(header), params=eval(param))
                                self.logs_tool.info(
                                    "获取响应数据为: {}".format(json.dumps(res.json(), indent=4, ensure_ascii=False)))
                                if step[11]:
                                    self.save_case_env_param(eval(step[11]), res.json())
                            elif step[5] == 8:
                                res = requests.patch(url=path, headers=eval(header), data=eval(param))
                                self.logs_tool.info(
                                    "获取响应数据为: {}".format(json.dumps(res.json(), indent=4, ensure_ascii=False)))
                                if step[11]:
                                    self.save_case_env_param(eval(step[11]), res.json())
                        elif step[2] == 1:
                            except_body = eval(self.replace_variable(step[8]))
                            actual_body = eval(self.replace_variable(step[9]))
                            ComparData(self.logs_tool).do_check(expect=except_body, actual=actual_body,
                                                                ignore=eval(step[10]))
                    except Exception as e:
                        self.logs_tool.info("执行异常,错误信息为: {}".format(e))
                        self._result_data['result'].append(
                            {"case_id": self._running_case_id, "case_name": self._running_case_name, "status": 1,
                             "running_logs": self.logs_tool.get_logs()})
                        self._result_data["fail"] += 1
                        return self._result_data
                self._result_data['result'].append(
                    {"case_id": self._running_case_id, "case_name": self._running_case_name, "status": 0,
                     "running_logs": self.logs_tool.get_logs()})
                self._result_data["pass"] += 1
                return self._result_data
    
        def replace_variable(self, data):
            try:
                rep = re.findall('{{(.*?)}}', data)
                # rep = re.findall('{{.*?}}', data)
                for variable in rep:
                    if variable in self._env_param.keys():
                        data = data.replace("{{" + variable + "}}", str(self._env_param[variable]))
                    else:
                        data = data.replace("{{" + variable + "}}", str(self._case_env_param[variable]))
                return data
            except Exception as e:
                self.logs_tool.info("不存在的环境变量 {}".format(e))
                raise Exception("不存在的环境变量 {}".format(e))
    
        def save_case_env_param(self, case_env_param, response):
            try:
                for key_value in case_env_param.items():
                    result = response
                    if key_value[1]:
                        expect_split = key_value[1].split('.')
                        for i in range(0, len(expect_split)):
                            if isinstance(result, dict):
                                result = result[expect_split[i]]
                            elif isinstance(result, list):
                                result = result[int(expect_split[i])]
                    self._case_env_param[key_value[0]] = result
            except Exception as e:
                self.logs_tool.info("响应中未找到设置参数 {} 中的 {}, 响应内容为 {}".format(case_env_param, e, response))
                raise Exception("响应中未找到设置参数 {} 中的 {}, 响应内容为 {}".format(case_env_param, e, response))
    
  • 我这个没有集成 alluer 报告哈,是自己收集的报告,如果是集成 alluer 报告的话,可以采用 iframe 的方式嵌入!因为他们编码方式确实不同

  • 用例执行的话我设计了 3 种类型,一种是单用例选择环境变量执行,每次可以独立选择,默认带出上一次的,第二种是测试套件配置环境变量执行,第三种定时执行!执行过程就是收集库里面的测试用例,按顺序执行,目前没考虑多线程,因为有些用例是会有先后顺序的

  • 目前前端还有一些没有完成,完成之后再分享