去年 11 月被裁,到现在还没上岸,gap 半年了。上岸无望,专业技能不能落下,花了两三天时间,把之前工作中搭建使用的接口自动化框架,重写了一套。
楼主代码菜鸡,代码可能比较 low - -,
希望通过本次分享,给社区里想写接口自动化的同学一些借鉴,也希望社区里的大神多给一些优化建议,大家互帮互助,共同进步~
框架基于 python 语言,框架使用 pytest,报告使用 allure
支持多环境运行,通过命令行传参区分
支持多进程跑测,用例需独立无依赖,conftest.py 中包含多进程下只运行一次的 fileLock fixture
支持数据库连接单例,一个库在一个进程下只会建立一次连接
支持 mysql、redis 操作
支持 get、post、put、delete 请求方法,请求是通过用例的请求头 Content-Type 来区分,是使用 params、data 还是 json 传参
支持参数化数据驱动,用参数化参数字典,去更新通用参数字典,更新后发起请求
以下使用 windows 环境
安装 conda
https://www.anaconda.com/download/success
新建工程,在 pycharm 中新建 conda 虚拟环境
https://repo.maven.apache.org/maven2/io/qameta/allure/allure-commandline/2.29.0/
解压并配置环境变量
D:\allure\allure-2.29.0
cmd 命令行,验证 allure 安装
pycharm 命令行运行
激活虚拟环境
conda activate pytest_api_auto
安装 python3.8
conda install python=3.8
安装依赖包
pip install requests
pip install jsonpath
pip install pytest
pip install allure-pytest
pip install pytest-sugar
pip install pytest-xdist
pip install pytest-assume
pip install pymysql
pip install redis
pip install faker
pip install filelock
封装 log 日志工具类
# common/log_util.py
import logging
import os
# create logger
log = logging.getLogger("pytest_api_auto")
log.setLevel(logging.INFO)
# create file handler
# mode 默认为a追加模式,如果修改为w为覆盖模式,多进程运行会出现日志缺失和错乱
# 获取项目根目录拼接,日志会存在工程根目录pytest.log 每次运行追加写入
fh = logging.FileHandler(os.path.join(os.path.dirname(os.path.dirname(__file__)), "pytest.log"),
mode='a', encoding='UTF-8')
fh.setLevel(logging.INFO)
# create stream handler
sh = logging.StreamHandler(stream=None)
# create formatter
fmt = "%(asctime)s - %(filename)s - %(funcName)s - %(lineno)d - %(levelname)s - %(message)s"
formatter = logging.Formatter(fmt)
# add handler and formatter to logger
fh.setFormatter(formatter)
sh.setFormatter(formatter)
log.addHandler(fh)
log.addHandler(sh)
封装环境配置
config/env.py
from common.log_util import log
class ENV:
# 环境信息:test 测试 prod 准生产 # 从pytest命令行获取
info = None
# 测试环境服务域名配置
class UrlTestConfig:
api_backend = "http://api_backend.cn:8899"
# 准生产环境服务域名配置
class UrlProdConfig:
api_backend = "http://api_backend.cn:8899"
def get_url(server_name):
if ENV.info == "test":
url = getattr(UrlTestConfig, server_name)
log.info(f"测试环境获取服务域名 - {server_name} : {url}")
return url
elif ENV.info == "prod":
url = getattr(UrlProdConfig, server_name)
log.info(f"准生产环境获取服务域名 - {server_name} : {url}")
return url
else:
raise Exception("--env 环境信息有误")
封装 mysql 操作工具类
# common/mysql_util.py
# 装饰器,同一个mysql数据库只建立一次连接
import pymysql
from time import sleep
from common.log_util import log
# 装饰器,同一个mysql数据库只建立一次连接
def decorate_single(cls):
connect_list = {}
def wrapper(*args, **kwargs):
nonlocal connect_list
db_name = args[0]["db"]
if db_name not in connect_list:
connect_list[db_name] = cls(*args, **kwargs)
log.info(f"建立mysql连接并返回 - {db_name}")
else:
log.info(f"mysql连接已建立,直接返回 - {db_name}")
return connect_list[db_name]
return wrapper
@decorate_single
class MySql:
def __init__(self, db_config: dict):
"""
:params: db_config 数据库配置 类型为字典
"""
# 数据库配置
# autocommit: True 选项很关键,如果不设置,新增数据无法查出
# mysql默认数据引擎是innodb 默认数据隔离级别重复读,如果事务不提交,那么每次查询,查询都是同一块数据快照
self.conn = None
while True:
try:
self.conn = pymysql.connect(**db_config)
break
# 数据库连接,偶尔会连接不上
# 报错 pymysql.err.OperationalError: (2013, 'Lost connection to MySQL server during query')
# 解决办法,就是重新连接
except pymysql.err.OperationalError:
log.warning("连接失败,可能环境不稳定,重新连接!")
sleep(1)
except Exception as e:
log.warning("获取mysql连接失败!请检查数据库配置或网络连接")
raise e
def fetchone(self, sql_str: str):
"""
:params: sql_str 数据库sql
:return: 返回查询结果的一条记录,类型是字典; 若未查询到,则返回None
"""
try:
with self.conn.cursor(pymysql.cursors.DictCursor) as cursor:
log.info(f"执行sql: {sql_str}")
cursor.execute(sql_str)
data = cursor.fetchone()
log.info(f"sql执行结果: {data}")
return data
except Exception as e:
log.warning("执行sql失败!")
raise e
def fetchall(self, sql_str: str):
"""
:params: sql_str 数据库sql
:return: 返回查询结果的全部记录,类型是列表,列表元素为字典
"""
try:
with self.conn.cursor(pymysql.cursors.DictCursor) as cursor:
log.info(f"执行sql: {sql_str}")
cursor.execute(sql_str)
data = cursor.fetchall()
log.info(f"sql执行结果: {data}")
return data
except Exception as e:
log.warning("执行sql失败!")
raise e
def execute_dml(self, sql_str):
"""
function: 执行insert、update、delete
:param sql_str 数据库sql
:return: 无返回
"""
try:
with self.conn.cursor(pymysql.cursors.DictCursor) as cursor:
log.info(f"执行sql: {sql_str}")
data = cursor.execute(sql_str)
# 提交操作,我们配置连接是自动提交,所以下面提交步骤也可省略
self.conn.commit()
log.info(f"sql执行结果: {data}")
except Exception as e:
log.warning("执行sql失败!")
raise e
def close(self):
"""
function:关闭数据库连接
params: conn 数据库连接
"""
self.conn.close()
封装 mysql 配置和连接获取
# config/mysql.py
from common.mysql_util import MySql
from config.env import ENV
from common.log_util import log
# 数据库连接配置
class MysqlTestConfig:
"""Mysql测试环境配置"""
api_auto = {'host': 'localhost', 'port': 3306,
'db': 'api_auto', 'user': 'root',
'password': 'root', 'autocommit': True
}
class MysqlProdConfig:
"""Mysql准生产环境配置"""
api_auto = {'host': 'localhost', 'port': 3306,
'db': 'api_auto', 'user': 'root',
'password': 'root', 'autocommit': True
}
def get_mysql_conn(db_name):
if ENV.info == "test":
log.info("测试环境建立mysql连接 - " + db_name)
return MySql(getattr(MysqlTestConfig, db_name))
elif ENV.info == "prod":
log.info("准生产环境建立mysql连接 - " + db_name)
return MySql(getattr(MysqlProdConfig, db_name))
else:
raise Exception("--env 环境信息有误")
封装 redis 工具类
# common/redis_util.py
import redis
from common.log_util import log
# 装饰器,同一个redis只建立一次连接
def decorate_single(cls):
connect_list = {}
def wrapper(*args, **kwargs):
nonlocal connect_list
host = args[0]["host"]
if host not in connect_list:
connect_list[host] = cls(*args, **kwargs)
log.info(f"建立redis连接并返回 - {host}")
else:
log.info(f"redis连接已建立,直接返回 - {host}")
return connect_list[host]
return wrapper
@decorate_single
class Redis:
def __init__(self, db_config):
"""
:params: db_config 数据库配置 类型为字典
"""
self.pool = redis.ConnectionPool(**db_config)
self.rs = redis.Redis(connection_pool=self.pool)
def del_key(self, key):
"""
:param key: redis key str字符类型
:return: 删除成功返回 True 否则 False
"""
log.info(f"redis 删除key {key}")
if self.rs.delete(key) == 1:
log.info(f"key {key} 删除成功")
return True
else:
log.warning(f"key: {key} 不存在!")
return False
def del_keys(self, keys_pattern):
"""
:param keys_pattern: key通配符 str字符类型 ex: *name*
:return:删除成功返回 True 否则 False
"""
log.info(f"redis 删除keys 通配符 {keys_pattern}")
keys = self.rs.keys(keys_pattern)
if keys:
log.info(f"redis 删除keys {keys}")
for k in keys:
self.rs.delete(k)
log.info(f"keys {keys} 删除成功")
return True
else:
log.warning("通配符未匹配到key!")
return False
def set(self, key, value, ex=8 * 60 * 60):
"""
操作str类型
:param key: redis key str字符类型
:param value: str字符类型
:param ex: 数据超时时间,默认8小时
return: 写入成功返回 True
"""
log.info(f"redis str类型 数据写入 key: {key} value: {value}")
return self.rs.set(key, value, ex=ex)
def get(self, key):
"""
操作str类型
:param key: redis key str字符类型
:return: 获取到返回str字符类型 # 未获取到返回 None
"""
data = self.rs.get(key)
log.info(f"redis str类型 数据获取 key: {key} value: {data}")
return data
def lrange(self, key):
"""
操作list类型
:param key: redis key str字符类型
return: 获取到返回list列表类型 # 未获取到返回空列表 []
"""
data = self.rs.lrange(key, 0, -1)
log.info(f"redis list类型 数据获取 key: {key} values: {data}")
return data
def smembers(self, key):
"""
操作 set 集合
:param key: redis key str字符类型
return: 获取到返回set集合类型 # 未获取到返回空集合 set()
"""
data = self.rs.smembers(key)
log.info(f"redis set类型 数据获取 key: {key} values: {data}")
return data
def zrange(self, key):
"""
操作 zset 有序集合
:param key: redis key str字符类型
return: 获取到返回list列表类型 # 未获取到返回空列表 []
"""
data = self.rs.zrange(key, 0, -1)
log.info(f"redis zset类型 数据获取 key: {key} values: {data}")
return data
# hash 操作 hset hget 后续可扩展
def close(self):
"""
function:关闭数据库连接
params: rs Redis对象
"""
self.rs.close()
封装 redis 配置和连接获取
# config/redis.py
from common.redis_util import Redis
from config.env import ENV
from common.log_util import log
class RedisTestConfig:
api_backend = {'host': 'api_backend.cn', 'password': 'redis123',
'port': 6379, 'db': 0, 'decode_responses': True}
class RedisProdConfig:
api_backend = {'host': 'api_backend.cn', 'password': 'redis123',
'port': 6379, 'db': 0, 'decode_responses': True}
def get_redis_conn(name):
if ENV.info == "test":
log.info("测试环境建立redis连接 - " + name)
return Redis(getattr(RedisTestConfig, name))
elif ENV.info == "prod":
log.info("准生产环境建立redis连接 - " + name)
return Redis(getattr(RedisProdConfig, name))
else:
raise Exception("--env 环境信息有误")
封装 requests 工具类
# common/requests_util.py
import requests
from common.log_util import log
def send_request(url, method, data, headers, **kwargs):
"""
:param url: 请求域名 类型 str ex: http://xxx.com/path
:param method: 请求方法 类型 str 暂时支持 get、post、put、delete
:param data: 请求数据,类型 dict、list、str
:param headers: 请求头,类型 dict
:param kwargs: 扩展支持 files 上传文件、proxy 代理等
:return:
"""
if not url.startswith("http://") and not url.startswith("https://"):
raise Exception("请求url缺少协议名")
if method.lower() not in ("get", "post", "put", "delete"):
raise Exception(f"暂不支持请求方法 - {method} - 可后续扩展")
log.info("请求参数:")
log.info(f"url: {url}")
log.info(f"method: {method}")
log.info(f"data: {data}")
log.info(f"headers: {headers}")
log.info(f"kwargs: {kwargs}")
try:
if "Content-Type" in headers.keys():
# headers 包含传参类型
if headers["Content-Type"] in ("application/x-www-form-urlencoded", "multipart/form-data"):
res = requests.request(url=url, method=method, data=data, headers=headers, timeout=30, **kwargs)
elif headers["Content-Type"] == "application/json":
res = requests.request(url=url, method=method, json=data, headers=headers, timeout=30, **kwargs)
else: # 若非上面三种类型,默认使用json传参 text/html, text/plain等,可后续扩展验证
res = requests.request(url=url, method=method, json=data, headers=headers, timeout=30, **kwargs)
else:
# 请求头没指定传参类型Content-Type,则使用params传参,即在url中传参,如get请求
res = requests.request(url=url, method=method, params=data, headers=headers, timeout=30, **kwargs)
except Exception as e:
log.warning("请求发生异常!!!")
raise e
if res.status_code == 200:
log.info("请求成功")
log.info("响应参数:")
log.info(f"{res.text}")
else:
log.warning(f"请求失败!!! 返回码不为200, 状态码为: {res.status_code}")
log.warning(f"响应参数:")
log.warning(f"text: {res.text}")
log.warning(f"raw: {res.raw}")
raise Exception("返回码不为200")
try:
# 返回为字典类型
return res.json()
except requests.exceptions.JSONDecodeError:
log.warning("响应参数不为json,返回响应 response对象")
return res
封装用例数据解析
# common/data_parser.py
from config.env import get_url
from common.request_util import send_request
import jsonpath
from common.log_util import log
def parser(server_name, data_dict, param=None, **kwargs):
"""
:param server_name: env.py 中的服务域名 类型str ex: api_backend
:param data_dict: test_xxx.py测试用例对应data.py中的接口请求数据字典 ex: api_backend["get_student"]
:param param: data.py 参数化列表中一项中的data ex: api_backend["get_student"]["param_list"][0]["data"]
:**kwargs: 扩展参数
:return: 请求结果,如果响应是json类型返回dict,否则返回response对象
"""
# 获取配置中的服务器域名,拼接path
url = get_url(server_name) + data_dict["path"]
method = data_dict["method"]
headers = data_dict["headers"]
data = data_dict["data"]
# 参数化后发起请求,用参数化参数更新或替代通用参数
if param:
if isinstance(data, dict) and isinstance(param, dict):
# 如果通用参数为字典,参数化参数也为字典,使用参数化参数更新通用参数 ex: {"xx": "xx"}
data.update(param)
else:
# 如果通用参数是字符串、列表(元素为字符、数字、字典),直接使用参数化参数据替换通用参数 ex: ["xx", "xx"]
data = param
res = send_request(url, method, data, headers, **kwargs)
return res
def assert_res(res_dict, expect_dict):
"""
:param res_dict: request请求返回的结果字典,类型 dict
:param expect_dict: 预期结果字典, 类型 dict
"""
if isinstance(res_dict, dict):
log.info("开始断言")
log.info(f"预期结果: {expect_dict}")
# 遍历预期结果的key,使用jsonpath获取请求结果的value,与预期结果value比对
for k in expect_dict.keys():
res_list = jsonpath.jsonpath(res_dict, '$..' + str(k)) # 返回列表
assert expect_dict[k] in res_list
log.info("断言通过")
else:
log.warning("请求结果不为dict字典类型,跳过断言!")
封装 faker 模拟数据
# common/faker.py
from faker import Faker
from common.log_util import log
fake = Faker("zh_CN")
def get_name():
name = fake.name()
log.info(f"faker 生成姓名: {name}")
return name
def get_phone_number():
phone_number = fake.phone_number()
log.info(f"faker 生成手机号: {phone_number}")
return phone_number
def get_id_card():
id_card = fake.ssn()
log.info(f"faker 生成身份证号: {id_card}")
return id_card
pytest.ini
[pytest]
addopts = -p no:warnings -vs
markers =
multiprocess: suppurt mutl-process execute cases
全局 conftest.py
# conftest.py
import pytest
from common.log_util import log
from filelock import FileLock
import json
from config.env import ENV
import os
import allure
# 自定义环境信息pytest命令行
def pytest_addoption(parser):
parser.addoption(
"--env",
action="store",
default="test",
help="set pytest running environment ex: --env=test --env=prod"
)
# 从pytest命令行获取环境信息
@pytest.fixture(scope="session")
def get_env(request):
ENV.info = request.config.getoption("--env")
log.info("运行环境: " + ENV.info)
return ENV.info
# 终结函数,最后执行
@pytest.fixture(scope="session", autouse=True)
def fixture_case_end(request):
def case_end():
log.info("测试结束")
request.addfinalizer(case_end)
@pytest.fixture(scope="session", autouse=True)
# fixture 嵌套先执行获取环境信息get_env
# 加入 tmp_path_factory worker_id 用于多进程执行 # 多进程运行,token只获取一次
def fixture_get_token(get_env, tmp_path_factory, worker_id):
# 单进程执行
if worker_id == "master":
# 获取token
token = {"token": "xpcs"}
log.info("fixture_get_token master获取token %s" % token['token'])
else:
# 多进程执行
root_tmp_dir = tmp_path_factory.getbasetemp().parent
fn = root_tmp_dir / "data.json"
# 这里with里面的语句,理解为是被加锁的,同一时间只能有一个进程访问
with FileLock(str(fn) + ".lock"):
if fn.is_file():
# session_fixture 获取token已执行,直接从文件中读取token
token = json.loads(fn.read_text())
log.info("fixture_get_token slave使用token %s" % token['token'])
else:
token = {"token": "xpcs"}
fn.write_text(json.dumps(token))
log.info("fixture_get_token slave获取token %s" % token['token'])
yield token['token']
# session 结束后自动执行如下
log.info("session结束")
# 用例失败自动执行钩子函数
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item):
# 获取钩子方法的调用结果
outcome = yield
rep = outcome.get_result()
# 仅仅获取用例call 执行结果是失败的情况, 不包含 setup/teardown
if rep.when == "call" and rep.failed:
mode = "a" if os.path.exists("failures") else "w"
with open("failures", mode) as f:
# let's also access a fixture for the fun of it
if "tmpdir" in item.fixturenames:
extra = " (%s)" % item.funcargs["tmpdir"]
else:
extra = ""
f.write(rep.nodeid + extra + "\n")
with allure.step("用例运行失败,可加入信息"):
allure.attach("失败内容: ----xpcs----", "失败标题", allure.attachment_type.TEXT)
测试用例
# line_of_business/service_name_api_backend/test_api_backend.py
import pytest
from time import sleep
import allure
from common.data_parser import parser, assert_res
from config.mysql import get_mysql_conn
from common.log_util import log
from line_of_business_name.service_name_api_backend.data import api_backend
from common.faker_util import get_name, get_id_card, get_phone_number
from config.redis import get_redis_conn
@allure.feature("flask后端接口测试")
class TestApiBackend:
@classmethod
def setup_class(cls):
# 获取数据库连接,执行sql测试
log.info("setup_class")
# 数据库连接根据db库名单例,相同库返回同一个连接
conn = get_mysql_conn("api_auto")
conn1 = get_mysql_conn("api_auto")
conn.execute_dml("insert into test_xdist(msg) values ('%s')" % "class_setup-数据库写入测试")
conn1.fetchone("select * from test_xdist limit 1")
@classmethod
def teardown_class(cls):
log.info("steup_teardowm")
# 获取redis连接,执行命令测试
# redis连接根据host单例,相同host返回同一个连接
rs = get_redis_conn("api_backend")
rs1 = get_redis_conn("api_backend")
rs.set("name", "xp")
rs1.get("name")
@allure.story("测试故事1")
@pytest.mark.xfail(reason='预期失败用例')
@user12ize("param", [{"title": "标题1", "param": 2, "assert": 3}])
def test_case_one(self, param):
sleep(1)
allure.dynamic.description("测试故事1-描述信息")
allure.dynamic.severity(allure.severity_level.CRITICAL) # 用例级别严重
# allure动态标题
allure.dynamic.title(param["title"])
log.info("测试faker数据")
log.info(f"{get_name()} {get_phone_number()} {get_id_card()}")
# pytest.assume(False) # 多重断言插件,断言失败继续执行下面
assert param["param"] + 2 == param["assert"]
@allure.story("查询学生接口")
@user14cess # 此用例分组到可多进程跑测
@user15ize("param", api_backend["get_student"]["param_list"])
def test_get_student(self, param, fixture_get_token):
sleep(1)
allure.dynamic.title(param["title"])
data_dict = api_backend["get_student"]
data_dict["headers"]["Cookie"] = fixture_get_token
res = parser("api_backend", data_dict, param["data"])
assert_res(res, param["assert"])
@allure.story("新增学生接口")
@user17cess # 此用例分组到可多进程跑测
@user18ize("param", api_backend["post_student"]["param_list"])
def test_post_student(self, param):
sleep(1)
allure.dynamic.title(param["title"])
data_dict = api_backend["post_student"]
res = parser("api_backend", data_dict, param["data"])
assert_res(res, param["assert"])
@allure.story("更新学生接口")
@user20cess # 此用例分组到可多进程跑测
@user21ize("param", api_backend["put_student"]["param_list"])
def test_put_student(self, param):
sleep(1)
allure.dynamic.title(param["title"])
data_dict = api_backend["put_student"]
res = parser("api_backend", data_dict, param["data"])
assert_res(res, param["assert"])
用例数据驱动
# line_of_business/service_name_api_backend/data.py
# 服务名外层大字典,参数key是接口名,value是接口的请求信息字典,用例模块可通过接口名引用接口信息字典
# param_list 参数化列表,用于pytest参数化,每次选取其中一项的data,去更新外部data通用参数,发起请求
api_backend = {
"get_student": dict(path="/student",
method="get",
# headers 不包含Content-Type 则request使用params传参
headers={},
# 通用参数,每次请求使用
data={"test": "test"},
# 参数化参数,每次使用其中一项,更新通用参数
param_list=[
{"title": "获取学生信息-张三", "data": {"name": "张三"}, "assert": {"code": 0, "msg": "ok"}},
{"title": "获取学生信息-李四", "data": {"name": "李四"}, "assert": {"code": 0, "msg": "ok"}},
{"title": "获取学生信息-王五", "data": {"name": "王五"}, "assert": {"code": 0, "msg": "ok"}}
]),
'post_student': dict(path="/student",
method="post",
# headers Content-Type = application/x-www-form-urlencoded 则使用 request使用data传参
headers={"Cookie": "", "Content-Type": "application/x-www-form-urlencoded"},
data={"test": "test"},
param_list=[
{"title": "新增学生信息-张三", "data": {"name": "张三"}, "assert": {"code": 1, "msg": "ok"}},
{"title": "新增学生信息-李四", "data": {"name": "李四"}, "assert": {"code": 0, "msg": "ok"}},
{"title": "新增学生信息-王五", "data": {"name": "王五"}, "assert": {"code": 0, "msg": "ok"}}
]),
'put_student': dict(path="/student",
method="put",
# headers Content-Type = application/json 则使用 request使用json传参
headers={"Cookie": "", "Content-Type": "application/json"},
data={"test": "test"},
param_list=[
{"title": "更新学生信息-张三", "data": {"name": "张三"}, "assert": {"code": 0, "msg": "ok"}},
{"title": "更新学生信息-李四", "data": {"name": "李四"}, "assert": {"code": 0, "msg": "okk"}},
{"title": "更新学生信息-王五", "data": {"name": "王五"}, "assert": {"code": 0, "msg": "ok"}}
])
}
调试运行入口
# run.py
import pytest
import os
# 用例调试入口
if __name__ == '__main__':
pytest.main([r"line_of_business_name", "--clean-alluredir", "--alluredir=allure_result", "--cache-clear", "--env=prod"])
# pytest.main([r"-m multiprocess", "--clean-alluredir", "--alluredir=allure_result", "-n 3", "--cache-clear", "--env=prod"])
os.system(r"allure generate allure_result -c -o allure_report")
os.system(r"allure open -h 127.0.0.1 -p 8899 allure_report")
失败用例重跑
# failed_run.py
import pytest
import os
# 失败用例重跑
if __name__ == '__main__':
pytest.main([r"line_of_business_name", "--lf", "--clean-alluredir", "--alluredir=allure_result", "--env=prod"])
os.system(r"allure generate allure_result -c -o allure_report")
os.system(r"allure open -h 127.0.0.1 -p 8899 allure_report")