写在前面

gap 7 个月了,快撑不住去外包了 ,沉下心来,继续练习专业技能,上次分享 pytest 接口自动化框架后,在平台同学们的讨论建议下,改良不少,受益匪浅。
本次分享之前工作中使用的造数工具项目(Vue + Flask)的后台部分,我在原有框架的思路上重新搭建,加入了自动注册蓝图和数据库连接池。
造数工具后台,使用了 Flask Web 应用框架,用于编写辅助测试的接口,方便生成测试数据,适用于业务线流程比较长的,经常依赖于前置数据的业务
希望通过本次分享,给想写造数据平台的同学一点参考思路,欢迎大家参与讨论,提出不足和改良建议。

目录划分

应用初始化过程

创建 flask 应用 app 对象,加载配置到 app,添加跨域到 app,添加 404 错误处理,初始化项目日志,自动查找并注册蓝图,初始化数据库连接池,返回应用 app

# apps/__init.py
from flask import Flask, Blueprint
from flask_cors import CORS
from apps.settings import Config
import pkgutil
import sys
from logging.handlers import RotatingFileHandler
import logging
import os
from common.sql_helper import init_pool
from common.response_handler import response_error


def create_app(config_name):
    app = Flask(__name__)

    # 读取配置信息
    app.config.from_object(Config.get(config_name))

    #  r'/*' 是通配符,让本服务器所有的 URL 都允许跨域请求
    CORS(app, resources=r'/*')

    # 错误处理
    app.register_error_handler(404, lambda x: response_error("sorry, page does not exist"))
    app.register_error_handler(500, lambda x: response_error("internal server error"))

    # 初始化日志
    init_log(app)

    # 注册蓝图
    register_bp(app)

    # 初始化连接池
    init_pool(app)

    # 打印路由关系表
    app.logger.info(f"路由关系 URL_MAP:\n{app.url_map}")
    app.logger.info("初始化完成, 返回APP")

    return app


def register_bp(app: Flask):
    app.logger.info("初始化蓝图")
    # 遍历apps包下的所有模块
    for filefinder, name, ispkg in pkgutil.walk_packages(__path__, __name__ + "."):
        if str(name).endswith("_view"):  # 找出_view结尾的模块
            __import__(name)  # 引入模块
            module = sys.modules[name]  # 获取模块
            for i in dir(module):  # dir()列出模块中的类、方法、变量,进行遍历
                var = getattr(module, i)  # 获取变量
                if isinstance(var, Blueprint):  # 找到蓝图, 拼接项目前缀 /api/ 进行注册
                    url_prefix = var.url_prefix if var.url_prefix else ""
                    app.register_blueprint(var, url_prefix="/api/" + url_prefix)


def init_log(app: Flask):
    app.logger.setLevel(logging.DEBUG)

    log_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "log", "flask.log")
    fmt = "%(asctime)s - %(filename)s - %(funcName)s - %(lineno)d - %(levelname)s - %(message)s"
    formatter = logging.Formatter(fmt)
    fh = RotatingFileHandler(
        log_path,
        maxBytes=1024 * 1024 * 1,
        backupCount=10,
        encoding="utf-8"
    )
    fh.setLevel(logging.INFO)
    fh.setFormatter(formatter)
    app.logger.addHandler(fh)

settings 配置

存放数据库配置,请求域名配置等信息

# apps/settings.py
class BaseConfig:
    # 解决 jsonify 返回中文unicode编码
    JSON_AS_ASCII = False
    # 配置秘钥才可使用session
    SECRET_KEY = "xpcs"


class DevConfig(BaseConfig):
    DEBUG = True
    # 开发环境配置
    ENV = "development"

    # mysql数据库配置
    DB_API_AUTO = {'host': 'localhost', 'port': 3306,
                   'db': 'api_auto', 'user': 'root',
                   'password': 'root', 'autocommit': True
                   }

    # 服务域名配置
    URL_API_BACKEND = "http://api.cn"


class ProdConfig(BaseConfig):
    # 生产环境配置  # 本机WSL
    ENV = "production"

    # mysql数据库配置
    DB_API_AUTO = {'host': '172.20.95.252', 'port': 3306,
                   'db': 'api_auto', 'user': 'xpcs',
                   'password': 'xpcs', 'autocommit': True
                   }

    # 服务域名配置
    URL_API_BACKEND = "http://api.cn"


Config = {
    "development": DevConfig,
    "production": ProdConfig
}

数据连接池

减少每次连接关闭数据库的开销,支持多线程并发操作数据库

# common/sql_helper.py
# pip install DButils # 安装数据库连接池模块
import pymysql
from dbutils.pooled_db import PooledDB
from flask import Flask


class POOL:

    def __init__(self, app: Flask, db_config: dict):
        """
        :params: app flask app
        :params: db_config 数据库配置 类型为字典
        """
        self.app = app
        try:
            # 创建数据库连接池
            self.pool = PooledDB(
                creator=pymysql,  # 使用链接数据库的模块
                maxconnections=10,  # 连接池允许的最大连接数,0和None表示不限制连接数
                mincached=1,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
                maxcached=2,  # 链接池中最多闲置的链接,0和None不限制
                blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
                ping=0,  # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never
                **db_config)
        except Exception as e:
            app.logger.warning(f"{db_config['db']}-数据库连接池创建失败, 失败原因:{e}")
            raise e

    def fetchone(self, sql_str, *args):
        """
        :param sql_str 数据库sql
        :param args 可变参数,替换sql_str中的占位符,可不传
        :return: 返回查询结果的一条记录,类型是字典; 若未查询到,则返回None
        """
        conn = self.pool.connection()
        with conn.cursor(pymysql.cursors.DictCursor) as cursor:
            self.app.logger.info(f"执行sql: {sql_str}")
            if args:
                self.app.logger.info(f"占位符参数: {args}")
            try:
                cursor.execute(sql_str, args)
                data = cursor.fetchone()
                self.app.logger.info(f"sql执行结果: {data}")
            except Exception as e:
                cursor.close()
                conn.close()
                self.app.logger.warning(f"执行sql失败!- 失败信息: {e}")
                raise e
        conn.close()
        return data

    def fetchall(self, sql_str, *args):
        """
        :param sql_str 数据库sql
        :param args 可变参数,替换sql_str中的占位符,可不传
        :return: 返回查询结果的全部记录,类型是列表,列表元素为字典
        """
        conn = self.pool.connection()
        with conn.cursor(pymysql.cursors.DictCursor) as cursor:
            self.app.logger.info(f"执行sql: {sql_str}")
            if args:
                self.app.logger.info(f"占位符参数: {args}")
            try:
                cursor.execute(sql_str, args)
                data = cursor.fetchall()
                self.app.logger.info(f"sql执行结果: {data}")
            except Exception as e:
                cursor.close()
                conn.close()
                self.app.logger.warning(f"执行sql失败!- 失败信息: {e}")
                raise e
        conn.close()
        return data

    def execute_dml(self, sql_str, *args):
        """
        function: 执行insert、update、delete
        :param sql_str 数据库sql
        :param args 可变参数,替换sql_str中的占位符,可不传
        :return: 无返回
        """
        conn = self.pool.connection()
        with conn.cursor(pymysql.cursors.DictCursor) as cursor:
            self.app.logger.info(f"执行sql: {sql_str}")
            if args:
                self.app.logger.info(f"占位符参数: {args}")
            try:
                data = cursor.execute(sql_str, args)
                # 提交操作,我们配置连接是自动提交,所以下面提交步骤也可省略
                conn.commit()
                self.app.logger.info(f"sql执行结果: {data}")
            except Exception as e:
                cursor.close()
                conn.close()
                self.app.logger.warning(f"执行sql失败!- 失败信息: {e}")
                raise e
        conn.close()
        return data


# 单例模式,全局连接均从这里取连接池
POOLS = {}


def init_pool(app: Flask):
    app.logger.info("初始化数据库连接池")
    POOLS["DB_API_AUTO"] = POOL(app, app.config["DB_API_AUTO"])


def get_pool(name) -> POOL:
    return POOLS.get(name, None)

公共请求模块、响应模块

请求模块用于发送 http 请求,调用开发接口

# common/request_handler.py
import requests
from flask import current_app
import jsonpath


def send_request(request_body, **kwargs):
    """
    :param request_body 请求数据
    :param kwargs: 扩展支持 files 上传文件、proxy 代理等
    :return:
    """
    url = request_body["url"]
    method = request_body["method"]
    headers = request_body["headers"]
    params = request_body["params"]
    data = request_body["data"]
    json = request_body["json"]

    if not url.startswith("http://") and not url.startswith("https://"):
        raise ValueError("请求url缺少协议名")
    if method.lower() not in ("get", "post", "put", "delete"):
        raise ValueError(f"暂不支持请求方法 - {method} - 可后续扩展")

    data_log = ""
    if params:
        data_log = f"params: {params}"
    if data:
        data_log = f"data: {data}"
    if json:
        data_log = f"json: {json}"
    if kwargs:
        data_log += f"\nkwargs: {kwargs}"

    current_app.logger.info("\n----------   request  info  ----------\n"
                            f"url: {url}\n"
                            f"method: {method}\n"
                            f"headers: {headers}\n"
                            f"{data_log}"
                            )

    try:
        response = requests.request(**request_body, timeout=30, **kwargs)
    except Exception as e:
        current_app.logger.warning(f"请求发生异常!!!")
        raise Exception(f"request exception {str(e)}")

    if response.status_code == 200:
        current_app.logger.info("\n----------   response  info  ----------\n"
                                f"status: {response.status_code}\n"
                                f"headers: {response.headers}\n"
                                f"body: {response.text}")
    else:
        current_app.logger.warning(f"请求失败!!! 返回码不为200, 状态码为: {response.status_code}")
        current_app.logger.warning("\n----------   response  info  ----------\n"
                                   f"text: {response.text}\n"
                                   f"raw: {response.raw}")
        raise ValueError("返回码不为200")
    try:
        # 返回为字典类型
        return response.json()
    except requests.exceptions.JSONDecodeError:
        current_app.logger.warning("响应参数不为json,返回响应 response对象")
        return response


def make_request_body(server_name, data_dict: dict):
    """
    :param server_name: 服务名, str类型, 在settings中配置的服务域名
    :param data_dict: 接口的请求参数字典,dict类型
    :return: 返回 request body
    """
    # 获取配置中的服务器域名,拼接path
    url = current_app.config[server_name] + data_dict.get("path", "")
    method = data_dict.get("method", "get")
    headers = data_dict.get("headers", {})
    params = data_dict.get("params", {})
    data = data_dict.get("data", {})
    json = data_dict.get("json", {})

    request_body = {
        "url": url,
        "method": method,
        "headers": headers,
        "params": params,
        "data": data,
        "json": json
    }

    return request_body

响应模块,用于返回统一格式的 json 数据

# common/response_handler.py
from flask import jsonify, current_app


def response_ok(data):
    response = {"code": 0, "msg": "ok", "data": data}
    current_app.logger.info(response)
    return jsonify(response)


def response_error(msg):
    response = {"code": -1, "msg": msg}
    current_app.logger.info(response)
    return jsonify(response)

用户业务,view、service、data、model

我要构造用户相关的数据,那么我们就在 apps 下创建 user 包,里面创建 user_view、user_service、user_data、user_model 四个模块
user_view 用于创建蓝图,创建视图函数,绑定路由,接受请求参数,返回响应出参数

from flask import Blueprint, request
from common.response_handler import response_error, response_ok
from .user_model import add_student, query_student, clear_student, get_city, get_area
from .user_service import get_uuid
from .user_service import req_get_student


user_bp = Blueprint("user", __name__, url_prefix="/user")


@user_bp.route("/index", methods=["GET"])
def index():
    data = {str(get_uuid()): req_get_student()["data"]}
    return response_ok(data)


@user_bp.route("/add", methods=["GET"])
def add():
    name = add_student()
    return response_ok(f"add student {name} success")


@user_bp.route("/info", methods=["GET"])
def info():
    data = query_student()
    return response_ok(data)


@user_bp.route("/del", methods=["GET"])
def clear():
    clear_student()
    return response_ok("clear student success")


@user_bp.route("/city", methods=["GET"])
def city():
    data = get_city()
    return response_ok(data)


@user_bp.route("/area", methods=["GET"])
def area():
    city_id = request.args.get("city_id")
    if city_id:
        data = get_area(int(city_id))
        return response_ok(data)
    else:
        return response_error("缺少参数:city_id")


user_data 用于存放请求开发接口的参数字典

# apps/user/user_data.py
user_data = {
    "get_student":
        dict(path="/api/student",
             method="get",
             headers={},
             params={"test": "test"}),
    "post_student":
        dict(path="/api/student",
             method="post",
             headers={},
             data={"test1": "test1"}),
    "delete_student":
        dict(path="/api/student",
             method="get",
             headers={},
             json={"test2": "test2"})
}

user_service 用于处理请求参数,以及发送请求调用开发接口

apps/user/user_service.py
from uuid import uuid4
from common.request_handler import make_request_body, send_request
from .user_data import user_data


def get_uuid():
    return uuid4()

# 引入请求字典,修改请求参数,发送请求,请求项目自己的接口,模拟调用开发接口
def req_get_student():
    data_dict = user_data["get_student"]
    data_dict["params"]["test"] = "xpcs"
    response = send_request(make_request_body("URL_API_BACKEND", data_dict))
    return response

user_model 用于与数据库交互,查询或更新数据

from common.faker_helper import get_name
from random import randint
from common.sql_helper import get_pool


def add_student():
    name = get_name()
    age = randint(18, 65)
    sql_str = f"""insert into test_flask(name, age) values (%s, %s)"""
    get_pool("DB_API_AUTO").execute_dml(sql_str, name, age)
    return name


def query_student():
    sql_str = f"""select * from test_flask order by id desc limit 10"""
    student_list = get_pool("DB_API_AUTO").fetchall(sql_str)
    return student_list


def clear_student():
    sql_str = f"""delete from test_flask"""
    get_pool("DB_API_AUTO").execute_dml(sql_str)


def get_city():
    sql_str = f"""select * from flask_city"""
    city_list = get_pool("DB_API_AUTO").fetchall(sql_str)
    return city_list


def get_area(city_id):
    sql_str = f"""select * from flask_area where city_id = %s"""
    area_list = get_pool("DB_API_AUTO").fetchall(sql_str, city_id)
    return area_list

程序入口

from apps import create_app

app = create_app("production")

if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8090)

wsl

我们需要将项目部署到 uwsgi 服务器,但是 windows 不支持 uwsgi 服务器 = =, 那么我们就部署到 linux
win10、win11 用户,可直接安装 wsl (Windows Subsystem Linux)号称最牛逼的 linux 版本

我们是 windows11 系统,管理员运行 cmd
wsl --install 安装,默认安装 ubuntu
提示无法解析服务器的名称或地址 # 我们修改无线网卡的 DNS 为 114.114.114.114 解决

安装成功,重启电脑

重启后,会打开终端,输入用户名和密码,完成初始化


以后就可通过 cmd 中,输入 wsl,进入 linux 子系统

uwsgi 配置与运行

在 wsl 中执行如下

sudo apt update # 更新系统包
sudo apt install python3-pip  # 安装pip python 工具包
sudo apt install net-tools # 安装ifconfig 可查看linux 地址
python3 --version 系统自带python3.10
pip install uwsgi  # 安装uwsgi,安装完成

配置 uwsgi.ini

[uwsgi]
# 使用http协议,对外暴露端口8090 # 默认IP地址为服务器地址
# http = :8090
# 对外暴露socket与nginx交互
socket = :8090
# 项目地址
chdir = /mnt/d/2024/pythonCode/gitee/api_backend
# 启动模块名和APP
module = app:app
# 主进程
master = true
# 单进程单线程情况下,一个请求未处理完,另一个请求进来会阻塞,所以开启多进程多线程
# 进程数
processes = 2
# 线程数
threads = 2

运行服务,后台运行,退出 wsl 终端后,服务不会停止
uwsgi --ini uwsgi.ini &
但是当前还不能访问,因为我们对外暴露的是 socket

如果暴露 http,则可通过浏览器直接访问了
如我们 wsl 地址 172.20.95.252

http://172.20.95.252:8090/api/index 即可访问服务

安装 nginx

我们不想对外暴露服务端口,且访问不想带端口号,那么就安装 nginx,监听 80 端口,进行转发

# nginx 常用操作
# 安装 nginx
sudo apt install nginx
# 查看版本
nginx -v
# 启动nginx
sudo systemctl start nginx
# 设置开机启动
sudo systemctl enable nginx
# 查看nginx状态
sudo systemctl status nginx
# 停止nginx
sudo systemctl stop nginx
# 修改了配置文件,重新加载nginx
sudo systemctl reload nginx

修改 nginx 配置文件

cd /etc/nginx
sudo vi nginx.conf

# 在http中添加如下server
http {
        access_log /var/log/nginx/access.log;
        error_log /var/log/nginx/error.log;

        include /etc/nginx/conf.d/*.conf;
        include /etc/nginx/sites-enabled/*;

        server {
                listen 80;
                server_name "api.cn";
                location /api/ {
                        include uwsgi_params;
                        uwsgi_pass localhost:8090;
                        }
                }

}

启动 nginx
sudo systemctl start nginx
配置 windows 客户端 HOST: 172.20.95.252 api.cn
浏览器输入 http://api.cn/api/index 访问服务

PS: flask 概念(个人)理解

app 初始化
创建应用对象,加载配置,绑定路由到 endpoint(视图函数名),app.viewfunctions,存放 endpoint 与 视图函数对应关系,即路由可以映射到视图函数

蓝图作用
目录划分,即业务划分,公共前缀 prefix 划分,小范围应用 before_request、after_request 等装饰

上下文
外面一个请求进来,会分别生成两个上下文 ctx,请求上下文和应用上下文,类型是 localStack(可以简单理解为 thread.local,为每个线程开辟一个空间存储变量,后面线程读取到的还是自己线程所定义的变量)这里可以把每个请求理解为是一个线程
请求上下文管理 request_ctx_stack :包含 request 和 session,视图函数会从请求上下文中取请求数据 request.args 、reuqest.form、request.json
app(应用)上下文管理 _app_ctx_stack : 包含 app 和 g,获取 app 的配置信息,current_app.config[XX] 获取
g 对象 # 这个目前我还没使用到
在一次请求的周期,可以在 g 中设置值,在本次的请求周期中都可以读取或复制, 相当于是一次请求周期的"全局变量"


↙↙↙阅读原文可查看相关链接,并与作者交流