gap 7 个月了,快撑不住去外包了 ,沉下心来,继续练习专业技能,上次分享 pytest 接口自动化框架后,在平台同学们的讨论建议下,改良不少,受益匪浅。
本次分享之前工作中使用的造数工具项目(Vue + Flask)的后台部分,我在原有框架的思路上重新搭建,加入了自动注册蓝图和数据库连接池。
造数工具后台,使用了 Flask Web 应用框架,用于编写辅助测试的接口,方便生成测试数据,适用于业务线流程比较长的,经常依赖于前置数据的业务
希望通过本次分享,给想写造数据平台的同学一点参考思路,欢迎大家参与讨论,提出不足和改良建议。
创建 flask 应用 app 对象,加载配置到 app,添加跨域到 app,添加 404 错误处理,初始化项目日志,自动查找并注册蓝图,初始化数据库连接池,返回应用 app
# apps/__init.py
from flask import Flask, Blueprint
from flask_cors import CORS
from apps.settings import Config
import pkgutil
import sys
from logging.handlers import RotatingFileHandler
import logging
import os
from common.sql_helper import init_pool
from common.response_handler import response_error
def create_app(config_name):
app = Flask(__name__)
# 读取配置信息
app.config.from_object(Config.get(config_name))
# r'/*' 是通配符,让本服务器所有的 URL 都允许跨域请求
CORS(app, resources=r'/*')
# 错误处理
app.register_error_handler(404, lambda x: response_error("sorry, page does not exist"))
app.register_error_handler(500, lambda x: response_error("internal server error"))
# 初始化日志
init_log(app)
# 注册蓝图
register_bp(app)
# 初始化连接池
init_pool(app)
# 打印路由关系表
app.logger.info(f"路由关系 URL_MAP:\n{app.url_map}")
app.logger.info("初始化完成, 返回APP")
return app
def register_bp(app: Flask):
app.logger.info("初始化蓝图")
# 遍历apps包下的所有模块
for filefinder, name, ispkg in pkgutil.walk_packages(__path__, __name__ + "."):
if str(name).endswith("_view"): # 找出_view结尾的模块
__import__(name) # 引入模块
module = sys.modules[name] # 获取模块
for i in dir(module): # dir()列出模块中的类、方法、变量,进行遍历
var = getattr(module, i) # 获取变量
if isinstance(var, Blueprint): # 找到蓝图, 拼接项目前缀 /api/ 进行注册
url_prefix = var.url_prefix if var.url_prefix else ""
app.register_blueprint(var, url_prefix="/api/" + url_prefix)
def init_log(app: Flask):
app.logger.setLevel(logging.DEBUG)
log_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "log", "flask.log")
fmt = "%(asctime)s - %(filename)s - %(funcName)s - %(lineno)d - %(levelname)s - %(message)s"
formatter = logging.Formatter(fmt)
fh = RotatingFileHandler(
log_path,
maxBytes=1024 * 1024 * 1,
backupCount=10,
encoding="utf-8"
)
fh.setLevel(logging.INFO)
fh.setFormatter(formatter)
app.logger.addHandler(fh)
存放数据库配置,请求域名配置等信息
# apps/settings.py
class BaseConfig:
# 解决 jsonify 返回中文unicode编码
JSON_AS_ASCII = False
# 配置秘钥才可使用session
SECRET_KEY = "xpcs"
class DevConfig(BaseConfig):
DEBUG = True
# 开发环境配置
ENV = "development"
# mysql数据库配置
DB_API_AUTO = {'host': 'localhost', 'port': 3306,
'db': 'api_auto', 'user': 'root',
'password': 'root', 'autocommit': True
}
# 服务域名配置
URL_API_BACKEND = "http://api.cn"
class ProdConfig(BaseConfig):
# 生产环境配置 # 本机WSL
ENV = "production"
# mysql数据库配置
DB_API_AUTO = {'host': '172.20.95.252', 'port': 3306,
'db': 'api_auto', 'user': 'xpcs',
'password': 'xpcs', 'autocommit': True
}
# 服务域名配置
URL_API_BACKEND = "http://api.cn"
Config = {
"development": DevConfig,
"production": ProdConfig
}
减少每次连接关闭数据库的开销,支持多线程并发操作数据库
# common/sql_helper.py
# pip install DButils # 安装数据库连接池模块
import pymysql
from dbutils.pooled_db import PooledDB
from flask import Flask
class POOL:
def __init__(self, app: Flask, db_config: dict):
"""
:params: app flask app
:params: db_config 数据库配置 类型为字典
"""
self.app = app
try:
# 创建数据库连接池
self.pool = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=10, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=1, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=2, # 链接池中最多闲置的链接,0和None不限制
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never
**db_config)
except Exception as e:
app.logger.warning(f"{db_config['db']}-数据库连接池创建失败, 失败原因:{e}")
raise e
def fetchone(self, sql_str, *args):
"""
:param sql_str 数据库sql
:param args 可变参数,替换sql_str中的占位符,可不传
:return: 返回查询结果的一条记录,类型是字典; 若未查询到,则返回None
"""
conn = self.pool.connection()
with conn.cursor(pymysql.cursors.DictCursor) as cursor:
self.app.logger.info(f"执行sql: {sql_str}")
if args:
self.app.logger.info(f"占位符参数: {args}")
try:
cursor.execute(sql_str, args)
data = cursor.fetchone()
self.app.logger.info(f"sql执行结果: {data}")
except Exception as e:
cursor.close()
conn.close()
self.app.logger.warning(f"执行sql失败!- 失败信息: {e}")
raise e
conn.close()
return data
def fetchall(self, sql_str, *args):
"""
:param sql_str 数据库sql
:param args 可变参数,替换sql_str中的占位符,可不传
:return: 返回查询结果的全部记录,类型是列表,列表元素为字典
"""
conn = self.pool.connection()
with conn.cursor(pymysql.cursors.DictCursor) as cursor:
self.app.logger.info(f"执行sql: {sql_str}")
if args:
self.app.logger.info(f"占位符参数: {args}")
try:
cursor.execute(sql_str, args)
data = cursor.fetchall()
self.app.logger.info(f"sql执行结果: {data}")
except Exception as e:
cursor.close()
conn.close()
self.app.logger.warning(f"执行sql失败!- 失败信息: {e}")
raise e
conn.close()
return data
def execute_dml(self, sql_str, *args):
"""
function: 执行insert、update、delete
:param sql_str 数据库sql
:param args 可变参数,替换sql_str中的占位符,可不传
:return: 无返回
"""
conn = self.pool.connection()
with conn.cursor(pymysql.cursors.DictCursor) as cursor:
self.app.logger.info(f"执行sql: {sql_str}")
if args:
self.app.logger.info(f"占位符参数: {args}")
try:
data = cursor.execute(sql_str, args)
# 提交操作,我们配置连接是自动提交,所以下面提交步骤也可省略
conn.commit()
self.app.logger.info(f"sql执行结果: {data}")
except Exception as e:
cursor.close()
conn.close()
self.app.logger.warning(f"执行sql失败!- 失败信息: {e}")
raise e
conn.close()
return data
# 单例模式,全局连接均从这里取连接池
POOLS = {}
def init_pool(app: Flask):
app.logger.info("初始化数据库连接池")
POOLS["DB_API_AUTO"] = POOL(app, app.config["DB_API_AUTO"])
def get_pool(name) -> POOL:
return POOLS.get(name, None)
请求模块用于发送 http 请求,调用开发接口
# common/request_handler.py
import requests
from flask import current_app
import jsonpath
def send_request(request_body, **kwargs):
"""
:param request_body 请求数据
:param kwargs: 扩展支持 files 上传文件、proxy 代理等
:return:
"""
url = request_body["url"]
method = request_body["method"]
headers = request_body["headers"]
params = request_body["params"]
data = request_body["data"]
json = request_body["json"]
if not url.startswith("http://") and not url.startswith("https://"):
raise ValueError("请求url缺少协议名")
if method.lower() not in ("get", "post", "put", "delete"):
raise ValueError(f"暂不支持请求方法 - {method} - 可后续扩展")
data_log = ""
if params:
data_log = f"params: {params}"
if data:
data_log = f"data: {data}"
if json:
data_log = f"json: {json}"
if kwargs:
data_log += f"\nkwargs: {kwargs}"
current_app.logger.info("\n---------- request info ----------\n"
f"url: {url}\n"
f"method: {method}\n"
f"headers: {headers}\n"
f"{data_log}"
)
try:
response = requests.request(**request_body, timeout=30, **kwargs)
except Exception as e:
current_app.logger.warning(f"请求发生异常!!!")
raise Exception(f"request exception {str(e)}")
if response.status_code == 200:
current_app.logger.info("\n---------- response info ----------\n"
f"status: {response.status_code}\n"
f"headers: {response.headers}\n"
f"body: {response.text}")
else:
current_app.logger.warning(f"请求失败!!! 返回码不为200, 状态码为: {response.status_code}")
current_app.logger.warning("\n---------- response info ----------\n"
f"text: {response.text}\n"
f"raw: {response.raw}")
raise ValueError("返回码不为200")
try:
# 返回为字典类型
return response.json()
except requests.exceptions.JSONDecodeError:
current_app.logger.warning("响应参数不为json,返回响应 response对象")
return response
def make_request_body(server_name, data_dict: dict):
"""
:param server_name: 服务名, str类型, 在settings中配置的服务域名
:param data_dict: 接口的请求参数字典,dict类型
:return: 返回 request body
"""
# 获取配置中的服务器域名,拼接path
url = current_app.config[server_name] + data_dict.get("path", "")
method = data_dict.get("method", "get")
headers = data_dict.get("headers", {})
params = data_dict.get("params", {})
data = data_dict.get("data", {})
json = data_dict.get("json", {})
request_body = {
"url": url,
"method": method,
"headers": headers,
"params": params,
"data": data,
"json": json
}
return request_body
响应模块,用于返回统一格式的 json 数据
# common/response_handler.py
from flask import jsonify, current_app
def response_ok(data):
response = {"code": 0, "msg": "ok", "data": data}
current_app.logger.info(response)
return jsonify(response)
def response_error(msg):
response = {"code": -1, "msg": msg}
current_app.logger.info(response)
return jsonify(response)
我要构造用户相关的数据,那么我们就在 apps 下创建 user 包,里面创建 user_view、user_service、user_data、user_model 四个模块
user_view 用于创建蓝图,创建视图函数,绑定路由,接受请求参数,返回响应出参数
from flask import Blueprint, request
from common.response_handler import response_error, response_ok
from .user_model import add_student, query_student, clear_student, get_city, get_area
from .user_service import get_uuid
from .user_service import req_get_student
user_bp = Blueprint("user", __name__, url_prefix="/user")
@user_bp.route("/index", methods=["GET"])
def index():
data = {str(get_uuid()): req_get_student()["data"]}
return response_ok(data)
@user_bp.route("/add", methods=["GET"])
def add():
name = add_student()
return response_ok(f"add student {name} success")
@user_bp.route("/info", methods=["GET"])
def info():
data = query_student()
return response_ok(data)
@user_bp.route("/del", methods=["GET"])
def clear():
clear_student()
return response_ok("clear student success")
@user_bp.route("/city", methods=["GET"])
def city():
data = get_city()
return response_ok(data)
@user_bp.route("/area", methods=["GET"])
def area():
city_id = request.args.get("city_id")
if city_id:
data = get_area(int(city_id))
return response_ok(data)
else:
return response_error("缺少参数:city_id")
user_data 用于存放请求开发接口的参数字典
# apps/user/user_data.py
user_data = {
"get_student":
dict(path="/api/student",
method="get",
headers={},
params={"test": "test"}),
"post_student":
dict(path="/api/student",
method="post",
headers={},
data={"test1": "test1"}),
"delete_student":
dict(path="/api/student",
method="get",
headers={},
json={"test2": "test2"})
}
user_service 用于处理请求参数,以及发送请求调用开发接口
apps/user/user_service.py
from uuid import uuid4
from common.request_handler import make_request_body, send_request
from .user_data import user_data
def get_uuid():
return uuid4()
# 引入请求字典,修改请求参数,发送请求,请求项目自己的接口,模拟调用开发接口
def req_get_student():
data_dict = user_data["get_student"]
data_dict["params"]["test"] = "xpcs"
response = send_request(make_request_body("URL_API_BACKEND", data_dict))
return response
user_model 用于与数据库交互,查询或更新数据
from common.faker_helper import get_name
from random import randint
from common.sql_helper import get_pool
def add_student():
name = get_name()
age = randint(18, 65)
sql_str = f"""insert into test_flask(name, age) values (%s, %s)"""
get_pool("DB_API_AUTO").execute_dml(sql_str, name, age)
return name
def query_student():
sql_str = f"""select * from test_flask order by id desc limit 10"""
student_list = get_pool("DB_API_AUTO").fetchall(sql_str)
return student_list
def clear_student():
sql_str = f"""delete from test_flask"""
get_pool("DB_API_AUTO").execute_dml(sql_str)
def get_city():
sql_str = f"""select * from flask_city"""
city_list = get_pool("DB_API_AUTO").fetchall(sql_str)
return city_list
def get_area(city_id):
sql_str = f"""select * from flask_area where city_id = %s"""
area_list = get_pool("DB_API_AUTO").fetchall(sql_str, city_id)
return area_list
from apps import create_app
app = create_app("production")
if __name__ == '__main__':
app.run(host="0.0.0.0", port=8090)
我们需要将项目部署到 uwsgi 服务器,但是 windows 不支持 uwsgi 服务器 = =, 那么我们就部署到 linux
win10、win11 用户,可直接安装 wsl (Windows Subsystem Linux)号称最牛逼的 linux 版本
我们是 windows11 系统,管理员运行 cmd
wsl --install 安装,默认安装 ubuntu
提示无法解析服务器的名称或地址 # 我们修改无线网卡的 DNS 为 114.114.114.114 解决
安装成功,重启电脑
重启后,会打开终端,输入用户名和密码,完成初始化
以后就可通过 cmd 中,输入 wsl,进入 linux 子系统
在 wsl 中执行如下
sudo apt update # 更新系统包
sudo apt install python3-pip # 安装pip python 工具包
sudo apt install net-tools # 安装ifconfig 可查看linux 地址
python3 --version 系统自带python3.10
pip install uwsgi # 安装uwsgi,安装完成
配置 uwsgi.ini
[uwsgi]
# 使用http协议,对外暴露端口8090 # 默认IP地址为服务器地址
# http = :8090
# 对外暴露socket与nginx交互
socket = :8090
# 项目地址
chdir = /mnt/d/2024/pythonCode/gitee/api_backend
# 启动模块名和APP
module = app:app
# 主进程
master = true
# 单进程单线程情况下,一个请求未处理完,另一个请求进来会阻塞,所以开启多进程多线程
# 进程数
processes = 2
# 线程数
threads = 2
运行服务,后台运行,退出 wsl 终端后,服务不会停止
uwsgi --ini uwsgi.ini &
但是当前还不能访问,因为我们对外暴露的是 socket
如果暴露 http,则可通过浏览器直接访问了
如我们 wsl 地址 172.20.95.252
http://172.20.95.252:8090/api/index 即可访问服务
我们不想对外暴露服务端口,且访问不想带端口号,那么就安装 nginx,监听 80 端口,进行转发
# nginx 常用操作
# 安装 nginx
sudo apt install nginx
# 查看版本
nginx -v
# 启动nginx
sudo systemctl start nginx
# 设置开机启动
sudo systemctl enable nginx
# 查看nginx状态
sudo systemctl status nginx
# 停止nginx
sudo systemctl stop nginx
# 修改了配置文件,重新加载nginx
sudo systemctl reload nginx
cd /etc/nginx
sudo vi nginx.conf
# 在http中添加如下server
http {
access_log /var/log/nginx/access.log;
error_log /var/log/nginx/error.log;
include /etc/nginx/conf.d/*.conf;
include /etc/nginx/sites-enabled/*;
server {
listen 80;
server_name "api.cn";
location /api/ {
include uwsgi_params;
uwsgi_pass localhost:8090;
}
}
}
启动 nginx
sudo systemctl start nginx
配置 windows 客户端 HOST: 172.20.95.252 api.cn
浏览器输入 http://api.cn/api/index 访问服务
app 初始化
创建应用对象,加载配置,绑定路由到 endpoint(视图函数名),app.viewfunctions,存放 endpoint 与 视图函数对应关系,即路由可以映射到视图函数
蓝图作用
目录划分,即业务划分,公共前缀 prefix 划分,小范围应用 before_request、after_request 等装饰
上下文
外面一个请求进来,会分别生成两个上下文 ctx,请求上下文和应用上下文,类型是 localStack(可以简单理解为 thread.local,为每个线程开辟一个空间存储变量,后面线程读取到的还是自己线程所定义的变量)这里可以把每个请求理解为是一个线程
请求上下文管理 request_ctx_stack :包含 request 和 session,视图函数会从请求上下文中取请求数据 request.args 、reuqest.form、request.json
app(应用)上下文管理 _app_ctx_stack : 包含 app 和 g,获取 app 的配置信息,current_app.config[XX] 获取
g 对象 # 这个目前我还没使用到
在一次请求的周期,可以在 g 中设置值,在本次的请求周期中都可以读取或复制, 相当于是一次请求周期的"全局变量"