其他测试框架 pytest+request+allure 接口自动化框架搭建分享

xpcs · 2024年05月24日 · 最后由 皮卡丘 回复于 2024年06月13日 · 8418 次阅读

去年 11 月被裁,到现在还没上岸,gap 半年了。上岸无望,专业技能不能落下,花了两三天时间,把之前工作中搭建使用的接口自动化框架,重写了一套。
楼主代码菜鸡,代码可能比较 low - -,
希望通过本次分享,给社区里想写接口自动化的同学一些借鉴,也希望社区里的大神多给一些优化建议,大家互帮互助,共同进步~

框架基于 python 语言,框架使用 pytest,报告使用 allure
支持多环境运行,通过命令行传参区分
支持多进程跑测,用例需独立无依赖,conftest.py 中包含多进程下只运行一次的 fileLock fixture
支持数据库连接单例,一个库在一个进程下只会建立一次连接
支持 mysql、redis 操作
支持 get、post、put、delete 请求方法,请求是通过用例的请求头 Content-Type 来区分,是使用 params、data 还是 json 传参
支持参数化数据驱动,用参数化参数字典,去更新通用参数字典,更新后发起请求

以下使用 windows 环境

conda 配置和新建工程:

安装 conda
https://www.anaconda.com/download/success
新建工程,在 pycharm 中新建 conda 虚拟环境

安装 allure 报告

https://repo.maven.apache.org/maven2/io/qameta/allure/allure-commandline/2.29.0/

解压并配置环境变量
D:\allure\allure-2.29.0


cmd 命令行,验证 allure 安装

设置 pycharm 文件编码 UTF-8

依赖安装

pycharm 命令行运行
激活虚拟环境
conda activate pytest_api_auto
安装 python3.8
conda install python=3.8
安装依赖包
pip install requests
pip install jsonpath
pip install pytest
pip install allure-pytest
pip install pytest-sugar
pip install pytest-xdist
pip install pytest-assume
pip install pymysql
pip install redis
pip install faker
pip install filelock

目录划分

以下是源码部分

封装 log 日志工具类

# common/log_util.py
import logging
import os

# create logger
log = logging.getLogger("pytest_api_auto")
log.setLevel(logging.INFO)

# create file handler
# mode 默认为a追加模式,如果修改为w为覆盖模式,多进程运行会出现日志缺失和错乱
# 获取项目根目录拼接,日志会存在工程根目录pytest.log 每次运行追加写入
fh = logging.FileHandler(os.path.join(os.path.dirname(os.path.dirname(__file__)), "pytest.log"),
                         mode='a', encoding='UTF-8')
fh.setLevel(logging.INFO)

# create stream handler
sh = logging.StreamHandler(stream=None)

# create formatter
fmt = "%(asctime)s - %(filename)s - %(funcName)s - %(lineno)d - %(levelname)s - %(message)s"
formatter = logging.Formatter(fmt)

# add handler and formatter to logger
fh.setFormatter(formatter)
sh.setFormatter(formatter)

log.addHandler(fh)
log.addHandler(sh)

封装环境配置

config/env.py
from common.log_util import log


class ENV:
    # 环境信息:test 测试  prod 准生产 # 从pytest命令行获取
    info = None


# 测试环境服务域名配置
class UrlTestConfig:
    api_backend = "http://api_backend.cn:8899"


# 准生产环境服务域名配置
class UrlProdConfig:
    api_backend = "http://api_backend.cn:8899"


def get_url(server_name):
    if ENV.info == "test":
        url = getattr(UrlTestConfig, server_name)
        log.info(f"测试环境获取服务域名 - {server_name} : {url}")
        return url
    elif ENV.info == "prod":
        url = getattr(UrlProdConfig, server_name)
        log.info(f"准生产环境获取服务域名 - {server_name} : {url}")
        return url
    else:
        raise Exception("--env 环境信息有误")

封装 mysql 操作工具类

# common/mysql_util.py
# 装饰器,同一个mysql数据库只建立一次连接
import pymysql
from time import sleep
from common.log_util import log


# 装饰器,同一个mysql数据库只建立一次连接
def decorate_single(cls):
    connect_list = {}

    def wrapper(*args, **kwargs):
        nonlocal connect_list
        db_name = args[0]["db"]
        if db_name not in connect_list:
            connect_list[db_name] = cls(*args, **kwargs)
            log.info(f"建立mysql连接并返回 - {db_name}")
        else:
            log.info(f"mysql连接已建立,直接返回 - {db_name}")
        return connect_list[db_name]
    return wrapper


@decorate_single
class MySql:

    def __init__(self, db_config: dict):
        """
        :params: db_config 数据库配置 类型为字典
        """
        # 数据库配置
        # autocommit: True 选项很关键,如果不设置,新增数据无法查出
        # mysql默认数据引擎是innodb 默认数据隔离级别重复读,如果事务不提交,那么每次查询,查询都是同一块数据快照
        self.conn = None
        while True:
            try:
                self.conn = pymysql.connect(**db_config)
                break
                # 数据库连接,偶尔会连接不上
                # 报错 pymysql.err.OperationalError: (2013, 'Lost connection to MySQL server during query')
                # 解决办法,就是重新连接
            except pymysql.err.OperationalError:
                log.warning("连接失败,可能环境不稳定,重新连接!")
                sleep(1)
            except Exception as e:
                log.warning("获取mysql连接失败!请检查数据库配置或网络连接")
                raise e

    def fetchone(self, sql_str: str):
        """
        :params: sql_str 数据库sql
        :return: 返回查询结果的一条记录,类型是字典; 若未查询到,则返回None
        """
        try:
            with self.conn.cursor(pymysql.cursors.DictCursor) as cursor:
                log.info(f"执行sql: {sql_str}")
                cursor.execute(sql_str)
                data = cursor.fetchone()
                log.info(f"sql执行结果: {data}")
                return data
        except Exception as e:
            log.warning("执行sql失败!")
            raise e

    def fetchall(self, sql_str: str):
        """
        :params: sql_str 数据库sql
        :return: 返回查询结果的全部记录,类型是列表,列表元素为字典
        """
        try:
            with self.conn.cursor(pymysql.cursors.DictCursor) as cursor:
                log.info(f"执行sql: {sql_str}")
                cursor.execute(sql_str)
                data = cursor.fetchall()
                log.info(f"sql执行结果: {data}")
            return data
        except Exception as e:
            log.warning("执行sql失败!")
            raise e

    def execute_dml(self, sql_str):
        """
        function: 执行insert、update、delete
        :param sql_str 数据库sql
        :return: 无返回
        """
        try:
            with self.conn.cursor(pymysql.cursors.DictCursor) as cursor:
                log.info(f"执行sql: {sql_str}")
                data = cursor.execute(sql_str)
                # 提交操作,我们配置连接是自动提交,所以下面提交步骤也可省略
                self.conn.commit()
                log.info(f"sql执行结果: {data}")
        except Exception as e:
            log.warning("执行sql失败!")
            raise e

    def close(self):
        """
        function:关闭数据库连接
        params: conn 数据库连接
        """
        self.conn.close()

封装 mysql 配置和连接获取

# config/mysql.py
from common.mysql_util import MySql
from config.env import ENV
from common.log_util import log


# 数据库连接配置
class MysqlTestConfig:
    """Mysql测试环境配置"""
    api_auto = {'host': 'localhost', 'port': 3306,
                'db': 'api_auto', 'user': 'root',
                'password': 'root', 'autocommit': True
                }


class MysqlProdConfig:
    """Mysql准生产环境配置"""
    api_auto = {'host': 'localhost', 'port': 3306,
                'db': 'api_auto', 'user': 'root',
                'password': 'root', 'autocommit': True
                }


def get_mysql_conn(db_name):
    if ENV.info == "test":
        log.info("测试环境建立mysql连接 - " + db_name)
        return MySql(getattr(MysqlTestConfig, db_name))
    elif ENV.info == "prod":
        log.info("准生产环境建立mysql连接 - " + db_name)
        return MySql(getattr(MysqlProdConfig, db_name))
    else:
        raise Exception("--env 环境信息有误")

封装 redis 工具类

# common/redis_util.py
import redis
from common.log_util import log


# 装饰器,同一个redis只建立一次连接
def decorate_single(cls):
    connect_list = {}

    def wrapper(*args, **kwargs):
        nonlocal connect_list
        host = args[0]["host"]
        if host not in connect_list:
            connect_list[host] = cls(*args, **kwargs)
            log.info(f"建立redis连接并返回 - {host}")
        else:
            log.info(f"redis连接已建立,直接返回 - {host}")
        return connect_list[host]
    return wrapper


@decorate_single
class Redis:

    def __init__(self, db_config):
        """
        :params: db_config 数据库配置 类型为字典
        """
        self.pool = redis.ConnectionPool(**db_config)
        self.rs = redis.Redis(connection_pool=self.pool)

    def del_key(self, key):
        """
        :param key: redis key str字符类型
        :return: 删除成功返回 True 否则 False
        """
        log.info(f"redis 删除key {key}")
        if self.rs.delete(key) == 1:
            log.info(f"key {key} 删除成功")
            return True
        else:
            log.warning(f"key: {key} 不存在!")
            return False

    def del_keys(self, keys_pattern):
        """
        :param keys_pattern: key通配符 str字符类型 ex: *name*
        :return:删除成功返回 True 否则 False
        """
        log.info(f"redis 删除keys 通配符 {keys_pattern}")
        keys = self.rs.keys(keys_pattern)
        if keys:
            log.info(f"redis 删除keys {keys}")
            for k in keys:
                self.rs.delete(k)
            log.info(f"keys {keys} 删除成功")
            return True
        else:
            log.warning("通配符未匹配到key!")
            return False

    def set(self, key, value, ex=8 * 60 * 60):
        """
        操作str类型
        :param key: redis key str字符类型
        :param value: str字符类型
        :param ex: 数据超时时间,默认8小时
        return: 写入成功返回 True
        """
        log.info(f"redis str类型 数据写入 key: {key}  value: {value}")
        return self.rs.set(key, value, ex=ex)

    def get(self, key):
        """
        操作str类型
        :param key: redis key str字符类型
        :return: 获取到返回str字符类型 # 未获取到返回 None
        """
        data = self.rs.get(key)
        log.info(f"redis str类型 数据获取 key: {key}  value: {data}")
        return data

    def lrange(self, key):
        """
        操作list类型
        :param key: redis key str字符类型
        return: 获取到返回list列表类型 # 未获取到返回空列表 []
        """
        data = self.rs.lrange(key, 0, -1)
        log.info(f"redis list类型 数据获取 key: {key}  values: {data}")
        return data

    def smembers(self, key):
        """
        操作 set 集合
        :param key: redis key str字符类型
        return: 获取到返回set集合类型 # 未获取到返回空集合 set()
        """
        data = self.rs.smembers(key)
        log.info(f"redis set类型 数据获取 key: {key}  values: {data}")
        return data

    def zrange(self, key):
        """
        操作 zset 有序集合
        :param key: redis key str字符类型
        return: 获取到返回list列表类型 # 未获取到返回空列表 []
        """
        data = self.rs.zrange(key, 0, -1)
        log.info(f"redis zset类型 数据获取 key: {key}  values: {data}")
        return data

    # hash 操作 hset hget 后续可扩展

    def close(self):
        """
        function:关闭数据库连接
        params: rs Redis对象
        """
        self.rs.close()

封装 redis 配置和连接获取

# config/redis.py
from common.redis_util import Redis
from config.env import ENV
from common.log_util import log


class RedisTestConfig:
    api_backend = {'host': 'api_backend.cn', 'password': 'redis123',
                   'port': 6379, 'db': 0, 'decode_responses': True}


class RedisProdConfig:
    api_backend = {'host': 'api_backend.cn', 'password': 'redis123',
                   'port': 6379, 'db': 0, 'decode_responses': True}


def get_redis_conn(name):
    if ENV.info == "test":
        log.info("测试环境建立redis连接 - " + name)
        return Redis(getattr(RedisTestConfig, name))
    elif ENV.info == "prod":
        log.info("准生产环境建立redis连接 - " + name)
        return Redis(getattr(RedisProdConfig, name))
    else:
        raise Exception("--env 环境信息有误")

封装 requests 工具类

# common/requests_util.py
import requests
from common.log_util import log


def send_request(url, method, data, headers, **kwargs):
    """
    :param url: 请求域名  类型 str ex: http://xxx.com/path
    :param method: 请求方法 类型 str 暂时支持 get、post、put、delete
    :param data: 请求数据,类型 dict、list、str
    :param headers: 请求头,类型 dict
    :param kwargs: 扩展支持 files 上传文件、proxy 代理等
    :return:
    """
    if not url.startswith("http://") and not url.startswith("https://"):
        raise Exception("请求url缺少协议名")
    if method.lower() not in ("get", "post", "put", "delete"):
        raise Exception(f"暂不支持请求方法 - {method} - 可后续扩展")

    log.info("请求参数:")
    log.info(f"url: {url}")
    log.info(f"method: {method}")
    log.info(f"data: {data}")
    log.info(f"headers: {headers}")
    log.info(f"kwargs: {kwargs}")

    try:
        if "Content-Type" in headers.keys():
            # headers 包含传参类型
            if headers["Content-Type"] in ("application/x-www-form-urlencoded", "multipart/form-data"):
                res = requests.request(url=url, method=method, data=data, headers=headers, timeout=30, **kwargs)
            elif headers["Content-Type"] == "application/json":
                res = requests.request(url=url, method=method, json=data, headers=headers, timeout=30, **kwargs)
            else:  # 若非上面三种类型,默认使用json传参 text/html, text/plain等,可后续扩展验证
                res = requests.request(url=url, method=method, json=data, headers=headers, timeout=30, **kwargs)
        else:
            # 请求头没指定传参类型Content-Type,则使用params传参,即在url中传参,如get请求
            res = requests.request(url=url, method=method, params=data, headers=headers, timeout=30, **kwargs)
    except Exception as e:
        log.warning("请求发生异常!!!")
        raise e

    if res.status_code == 200:
        log.info("请求成功")
        log.info("响应参数:")
        log.info(f"{res.text}")
    else:
        log.warning(f"请求失败!!! 返回码不为200, 状态码为: {res.status_code}")
        log.warning(f"响应参数:")
        log.warning(f"text: {res.text}")
        log.warning(f"raw: {res.raw}")
        raise Exception("返回码不为200")

    try:
        # 返回为字典类型
        return res.json()
    except requests.exceptions.JSONDecodeError:
        log.warning("响应参数不为json,返回响应 response对象")
        return res

封装用例数据解析

# common/data_parser.py
from config.env import get_url
from common.request_util import send_request
import jsonpath
from common.log_util import log


def parser(server_name, data_dict, param=None, **kwargs):
    """
    :param server_name: env.py 中的服务域名 类型str  ex: api_backend
    :param data_dict: test_xxx.py测试用例对应data.py中的接口请求数据字典  ex: api_backend["get_student"]
    :param param: data.py 参数化列表中一项中的data ex: api_backend["get_student"]["param_list"][0]["data"]
    :**kwargs: 扩展参数
    :return: 请求结果,如果响应是json类型返回dict,否则返回response对象
    """
    # 获取配置中的服务器域名,拼接path
    url = get_url(server_name) + data_dict["path"]
    method = data_dict["method"]
    headers = data_dict["headers"]
    data = data_dict["data"]

    #  参数化后发起请求,用参数化参数更新或替代通用参数
    if param:
        if isinstance(data, dict) and isinstance(param, dict):
            #  如果通用参数为字典,参数化参数也为字典,使用参数化参数更新通用参数  ex: {"xx": "xx"}
            data.update(param)
        else:
            # 如果通用参数是字符串、列表(元素为字符、数字、字典),直接使用参数化参数据替换通用参数 ex: ["xx", "xx"]
            data = param

    res = send_request(url, method, data, headers, **kwargs)
    return res


def assert_res(res_dict, expect_dict):
    """
    :param res_dict: request请求返回的结果字典,类型 dict
    :param expect_dict: 预期结果字典, 类型 dict
    """
    if isinstance(res_dict, dict):
        log.info("开始断言")
        log.info(f"预期结果: {expect_dict}")
        # 遍历预期结果的key,使用jsonpath获取请求结果的value,与预期结果value比对
        for k in expect_dict.keys():
            res_list = jsonpath.jsonpath(res_dict, '$..' + str(k))  # 返回列表
            assert expect_dict[k] in res_list
        log.info("断言通过")
    else:
        log.warning("请求结果不为dict字典类型,跳过断言!")

封装 faker 模拟数据

# common/faker.py
from faker import Faker
from common.log_util import log


fake = Faker("zh_CN")


def get_name():
    name = fake.name()
    log.info(f"faker 生成姓名: {name}")
    return name


def get_phone_number():
    phone_number = fake.phone_number()
    log.info(f"faker 生成手机号: {phone_number}")
    return phone_number


def get_id_card():
    id_card = fake.ssn()
    log.info(f"faker 生成身份证号: {id_card}")
    return id_card

pytest.ini

[pytest]
addopts = -p no:warnings -vs
markers =
    multiprocess: suppurt mutl-process execute cases

全局 conftest.py

# conftest.py
import pytest
from common.log_util import log
from filelock import FileLock
import json
from config.env import ENV
import os
import allure


# 自定义环境信息pytest命令行
def pytest_addoption(parser):
    parser.addoption(
        "--env",
        action="store",
        default="test",
        help="set pytest running environment  ex: --env=test  --env=prod"
    )


# 从pytest命令行获取环境信息
@pytest.fixture(scope="session")
def get_env(request):
    ENV.info = request.config.getoption("--env")
    log.info("运行环境: " + ENV.info)
    return ENV.info


# 终结函数,最后执行
@pytest.fixture(scope="session", autouse=True)
def fixture_case_end(request):
    def case_end():
        log.info("测试结束")
    request.addfinalizer(case_end)


@pytest.fixture(scope="session", autouse=True)
# fixture 嵌套先执行获取环境信息get_env
# 加入 tmp_path_factory worker_id 用于多进程执行 # 多进程运行,token只获取一次
def fixture_get_token(get_env, tmp_path_factory, worker_id):
    # 单进程执行
    if worker_id == "master":
        # 获取token
        token = {"token": "xpcs"}
        log.info("fixture_get_token master获取token %s" % token['token'])
    else:
        # 多进程执行
        root_tmp_dir = tmp_path_factory.getbasetemp().parent
        fn = root_tmp_dir / "data.json"
        # 这里with里面的语句,理解为是被加锁的,同一时间只能有一个进程访问
        with FileLock(str(fn) + ".lock"):
            if fn.is_file():
                #  session_fixture 获取token已执行,直接从文件中读取token
                token = json.loads(fn.read_text())
                log.info("fixture_get_token slave使用token %s" % token['token'])
            else:
                token = {"token": "xpcs"}
                fn.write_text(json.dumps(token))
                log.info("fixture_get_token slave获取token %s" % token['token'])

    yield token['token']
    # session 结束后自动执行如下
    log.info("session结束")


# 用例失败自动执行钩子函数
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item):
    # 获取钩子方法的调用结果
    outcome = yield
    rep = outcome.get_result()
    # 仅仅获取用例call 执行结果是失败的情况, 不包含 setup/teardown
    if rep.when == "call" and rep.failed:
        mode = "a" if os.path.exists("failures") else "w"
        with open("failures", mode) as f:
            # let's also access a fixture for the fun of it
            if "tmpdir" in item.fixturenames:
                extra = " (%s)" % item.funcargs["tmpdir"]
            else:
                extra = ""
            f.write(rep.nodeid + extra + "\n")

        with allure.step("用例运行失败,可加入信息"):
            allure.attach("失败内容: ----xpcs----", "失败标题", allure.attachment_type.TEXT)

测试用例

# line_of_business/service_name_api_backend/test_api_backend.py
import pytest
from time import sleep
import allure
from common.data_parser import parser, assert_res
from config.mysql import get_mysql_conn
from common.log_util import log
from line_of_business_name.service_name_api_backend.data import api_backend
from common.faker_util import get_name, get_id_card, get_phone_number
from config.redis import get_redis_conn


@allure.feature("flask后端接口测试")
class TestApiBackend:

    @classmethod
    def setup_class(cls):
        # 获取数据库连接,执行sql测试
        log.info("setup_class")
        # 数据库连接根据db库名单例,相同库返回同一个连接
        conn = get_mysql_conn("api_auto")
        conn1 = get_mysql_conn("api_auto")
        conn.execute_dml("insert into test_xdist(msg) values ('%s')" % "class_setup-数据库写入测试")
        conn1.fetchone("select * from test_xdist limit 1")

    @classmethod
    def teardown_class(cls):
        log.info("steup_teardowm")
        # 获取redis连接,执行命令测试
        # redis连接根据host单例,相同host返回同一个连接
        rs = get_redis_conn("api_backend")
        rs1 = get_redis_conn("api_backend")
        rs.set("name", "xp")
        rs1.get("name")

    @allure.story("测试故事1")
    @pytest.mark.xfail(reason='预期失败用例')
    @user12ize("param", [{"title": "标题1", "param": 2, "assert": 3}])
    def test_case_one(self, param):
        sleep(1)
        allure.dynamic.description("测试故事1-描述信息")
        allure.dynamic.severity(allure.severity_level.CRITICAL)  # 用例级别严重
        # allure动态标题
        allure.dynamic.title(param["title"])
        log.info("测试faker数据")
        log.info(f"{get_name()}  {get_phone_number()}  {get_id_card()}")
        # pytest.assume(False) # 多重断言插件,断言失败继续执行下面
        assert param["param"] + 2 == param["assert"]

    @allure.story("查询学生接口")
    @user14cess  # 此用例分组到可多进程跑测
    @user15ize("param", api_backend["get_student"]["param_list"])
    def test_get_student(self, param, fixture_get_token):
        sleep(1)
        allure.dynamic.title(param["title"])
        data_dict = api_backend["get_student"]
        data_dict["headers"]["Cookie"] = fixture_get_token
        res = parser("api_backend", data_dict, param["data"])
        assert_res(res, param["assert"])

    @allure.story("新增学生接口")
    @user17cess  # 此用例分组到可多进程跑测
    @user18ize("param", api_backend["post_student"]["param_list"])
    def test_post_student(self, param):
        sleep(1)
        allure.dynamic.title(param["title"])
        data_dict = api_backend["post_student"]
        res = parser("api_backend", data_dict, param["data"])
        assert_res(res, param["assert"])

    @allure.story("更新学生接口")
    @user20cess  # 此用例分组到可多进程跑测
    @user21ize("param", api_backend["put_student"]["param_list"])
    def test_put_student(self, param):
        sleep(1)
        allure.dynamic.title(param["title"])
        data_dict = api_backend["put_student"]
        res = parser("api_backend", data_dict, param["data"])
        assert_res(res, param["assert"])

用例数据驱动

# line_of_business/service_name_api_backend/data.py
# 服务名外层大字典,参数key是接口名,value是接口的请求信息字典,用例模块可通过接口名引用接口信息字典
# param_list 参数化列表,用于pytest参数化,每次选取其中一项的data,去更新外部data通用参数,发起请求
api_backend = {
    "get_student": dict(path="/student",
                        method="get",
                        # headers 不包含Content-Type 则request使用params传参
                        headers={},
                        # 通用参数,每次请求使用
                        data={"test": "test"},
                        #  参数化参数,每次使用其中一项,更新通用参数
                        param_list=[
                            {"title": "获取学生信息-张三", "data": {"name": "张三"}, "assert": {"code": 0, "msg": "ok"}},
                            {"title": "获取学生信息-李四", "data": {"name": "李四"}, "assert": {"code": 0, "msg": "ok"}},
                            {"title": "获取学生信息-王五", "data": {"name": "王五"}, "assert": {"code": 0, "msg": "ok"}}
                        ]),
    'post_student': dict(path="/student",
                         method="post",
                         # headers  Content-Type = application/x-www-form-urlencoded 则使用 request使用data传参
                         headers={"Cookie": "", "Content-Type": "application/x-www-form-urlencoded"},
                         data={"test": "test"},
                         param_list=[
                             {"title": "新增学生信息-张三", "data": {"name": "张三"}, "assert": {"code": 1, "msg": "ok"}},
                             {"title": "新增学生信息-李四", "data": {"name": "李四"}, "assert": {"code": 0, "msg": "ok"}},
                             {"title": "新增学生信息-王五", "data": {"name": "王五"}, "assert": {"code": 0, "msg": "ok"}}
                         ]),
    'put_student': dict(path="/student",
                        method="put",
                        # headers  Content-Type = application/json 则使用 request使用json传参
                        headers={"Cookie": "", "Content-Type": "application/json"},
                        data={"test": "test"},
                        param_list=[
                            {"title": "更新学生信息-张三", "data": {"name": "张三"}, "assert": {"code": 0, "msg": "ok"}},
                            {"title": "更新学生信息-李四", "data": {"name": "李四"}, "assert": {"code": 0, "msg": "okk"}},
                            {"title": "更新学生信息-王五", "data": {"name": "王五"}, "assert": {"code": 0, "msg": "ok"}}
                        ])
}

调试运行入口

# run.py
import pytest
import os


# 用例调试入口
if __name__ == '__main__':
    pytest.main([r"line_of_business_name", "--clean-alluredir", "--alluredir=allure_result", "--cache-clear", "--env=prod"])
    # pytest.main([r"-m multiprocess", "--clean-alluredir", "--alluredir=allure_result", "-n 3", "--cache-clear", "--env=prod"])
    os.system(r"allure generate allure_result -c -o allure_report")
    os.system(r"allure open -h 127.0.0.1 -p 8899 allure_report")

失败用例重跑

# failed_run.py
import pytest
import os


# 失败用例重跑
if __name__ == '__main__':
    pytest.main([r"line_of_business_name", "--lf", "--clean-alluredir", "--alluredir=allure_result",  "--env=prod"])
    os.system(r"allure generate allure_result -c -o allure_report")
    os.system(r"allure open -h 127.0.0.1 -p 8899 allure_report")

报告展示



共收到 37 条回复 时间 点赞

这种写着玩还可以,复杂的业务场景根本维护不了,还是不要造轮子,用现成的轮子不香吗。
改造下 httprunner 就可以用的很爽

大佬有项目源码吗? 想拉来看看

楼主代码菜鸡,代码可能比较 low - -, 能跑就行,什么 low 不 low 的,都是草台班子。

打工人 回复

哦哦,httprunner 我去看看去😀 ,我这套框架也可兼容写业务流程验证的。

disable 回复

😂

打工人 回复

源码已经全贴出来了😁 我看看往 git 上传一份

Vanessa 回复

直接根据 httprunner 的源码修改的吗? 我看 httprunner 官网写的还需要部署服务啥的 ?

Vanessa 回复

哈哈哈,是搞贷款的吗?

我最近也是搞贷款授信相关的自动化,参考 httprunner,不过通过录制来生成 api,然后再组装 case

花菜 回复

我之前也是通过录制来生成 api case,然后根据需要改改就成,之前分享过,跟这位楼主的标题都极度相似哈

蹲一个 git 链接

我想问下 gap 期间怎么过的,有没有其他收入还是已经自由了

13楼 已删除
xpcs #15 · 2024年05月27日 Author
王中 回复

👍 👍 👍

xpcs #16 · 2024年05月27日 Author
yeyu 回复

没有收入,全靠礼包死撑😥

打工人 回复

拿到源码了吗 我也想要

花菜 回复

抓到一个花菜留言

建议接口数据放到 yaml 文件中,这样看起来维护起来没那么费劲。

xpcs #20 · 2024年05月28日 Author
chen 回复

好的,我加入下 yaml 支持,用例同时支持字典和 yaml 😁

xpcs 回复

感谢

@user15ize("param", api_backend["get_student"]["param_list"])

Axian711 回复

请问这句是什么意思呢,好像是要参数化,但是没看懂@user15ize是什么意思

看到你 request 封装写了一推判断,其实没必要,无非就是区别到底 key 是 data 还是 json, request 入参直接可以用包装好的字典入参。所以为了减少一推多余的 ifelse,在原有的代码风格上帮忙简单优化了下

if "Content-Type" in headers.keys():
            # headers 包含传参类型
            if headers["Content-Type"] in ("application/x-www-form-urlencoded", "multipart/form-data"):
                res = requests.request(url=url, method=method, data=data, headers=headers, timeout=30, **kwargs)
            elif headers["Content-Type"] == "application/json":
                res = requests.request(url=url, method=method, json=data, headers=headers, timeout=30, **kwargs)
            else:  # 若非上面三种类型,默认使用json传参 text/html, text/plain等,可后续扩展验证
                res = requests.request(url=url, method=method, json=data, headers=headers, timeout=30, **kwargs)
        else:
            # 请求头没指定传参类型,使用params传参,即在url中传参,如正常get请求
            res = requests.request(url=url, method=method, params=data, headers=headers, timeout=30, **kwargs)

requests 工具类 用 ["Content-Type"] 判断 data 参数 还是 json 挺好。。。
我是根据 method=大小写 post 来判断😂

Axian711 回复

哦哦,应该是代码放上来之后,这里被压缩显示了。我看你 gitee 里面的代码是 pytest.mark.parametrize

xpcs #27 · 2024年05月28日 Author
Axian711 回复

好的

xpcs #28 · 2024年05月28日 Author
disable 回复

👍 👍 👍 学习了

王中 回复

包装成字典直接入参就行,压根不需要判断是不是 data 还是 json,

所以没必要写 data=data,headers=headers, url=url,method=method,这样入参只会麻烦自己。。。。

xpcs 回复

礼包挺大,不行开小店或者摆摊吧,干好了辛苦点还能有个收入

xpcs #31 · 2024年05月29日 Author
yeyu 回复

我都快考虑去外包了,先找自研,虽说自研也不一定稳,可外包后下一家只能还是外包,我没年龄优势了。现在是考虑北京不行换杭州试试,再不行就全国可飞了😥

32楼 已删除
xpcs #33 · 2024年05月29日 Author
disable 回复

这个是说,是直接用例里通过 key 判断么?用例中同时包含 params,data,json 三种数据,默认为空,直接全部解析后传到 request,到时候取到哪个就直接解析进去,取不到为空,那自然也传的空

这样也支持了比如 post 接口,同时需要 params 和 json 传参,这种用例里写个 params 再写个 json 两个就行
是这意思吗,大佬😁

xpcs 回复

差不多吧,反正设计的步骤字段跟 request 库入参保持一致就省事多了,不然请求前就需要重新包装下数据结构

仅楼主可见
36楼 已删除
xpcs #37 · 2024年06月11日 Author

V:252909768

38楼 已删除
仅楼主可见
xpcs #40 · 2024年06月13日 Author

微信:252909768

仅楼主可见
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册