专栏文章 Python 进行 pymysql 封装

大海 · 2022年04月27日 · 3436 次阅读

前言

在搭建测试框架过程中,会遇到需要频繁操作数据库的情况,会用到 pymysql 进行数据库的操作,当操作的连接数过多时,会出现断连的情况。以下代码是借鉴他人的代码总结而成。

代码部分

1、封装链接池部分
from timeit import default_timer

import pymysql
from dbutils.pooled_db import PooledDB
from pymysql.cursors import DictCursor


class DB_MySQL_Pool:
    """db连接池"""
    __pool = None
    __MAX_CONNECTIONS = 100  # 创建连接池的最大数量
    __MIN_CACHED = 10  # 连接池中空闲连接的初始数量
    __MAX_CACHED = 20  # 连接池中空闲连接的最大数量
    __MAX_SHARED = 10  # 共享连接的最大数量
    __BLOCK = True  # 超过最大连接数量时候的表现,为True等待连接数量下降,为false直接报错处理
    __MAX_USAGE = 100  # 单个连接的最大重复使用次数
    __CHARSET = 'UTF8'
    '''
        set_session:可选的SQL命令列表,可用于准备
                    会话,例如[“将日期样式设置为...”,“设置时区...”]
                    重置:当连接返回池中时,应该如何重置连接
                    (False或None表示回滚以begin()开始的事务,
                    为安全起见,始终发出回滚命令)
    '''
    __RESET = True
    __SET_SESSION = ['SET AUTOCOMMIT = 1']  # 设置自动提交

    def __init__(self, host, port, user, password, database):
        if not self.__pool:
            self.__class__.__pool = PooledDB(creator=pymysql, host=host, port=port, user=user, password=password,
                                             database=database,
                                             maxconnections=self.__MAX_CONNECTIONS,
                                             mincached=self.__MIN_CACHED,
                                             maxcached=self.__MAX_CACHED,
                                             maxshared=self.__MAX_SHARED,
                                             blocking=self.__BLOCK,
                                             maxusage=self.__MAX_USAGE,
                                             setsession=self.__SET_SESSION,
                                             reset=self.__RESET,
                                             charset=self.__CHARSET)

    def get_connect(self):
        return self.__pool.connection()

2、封装 pymysql 操作
class DB_MySQL:
    def __init__(self) -> None:
        self.__host = "自己的host"
        self.__port = 3306
        self.__user = "自己的user"
        self.__password = "自己的密码"
        self.__database = "自己的数据库"
        self._log_time = True
        self._log_label = "总用时"
        self.connects_pool = DB_MySQL_Pool(
            host=self.__host, port=self.__port, user=self.__user, password=self.__password, database=self.__database)

    def __enter__(self):
        # 如果需要记录时间
        if self._log_time is True:
            self._start = default_timer()

        connect = self.connects_pool.get_connect()
        cursor = connect.cursor(pymysql.cursors.DictCursor)
        # https://blog.51cto.com/abyss/1736844
        # connect.autocommit = False # 如果使用连接池 则不能在取出后设置 而应该在创建线程池时设置
        self._connect = connect
        self._cursor = cursor
        return self

    def __exit__(self, *exc_info):
        self._connect.commit()
        self._cursor.close()
        self._connect.close()

        if self._log_time is True:
            diff = default_timer() - self._start
            print('-- %s: %.6f 秒' % (self._log_label, diff))

    def select_all(self, sql):
        """查询返回全部结果"""
        self._cursor.execute(sql)
        return self._cursor.fetchall()

    def select_one(self, sql):
        """查询返回单个结果"""
        self._cursor.execute(sql)
        return self._cursor.fetchone()

    def insert(self, sql):
        """插入数据"""
        res = self._cursor.execute(sql)
        return res

3、完整代码
from timeit import default_timer

import pymysql
from dbutils.pooled_db import PooledDB
from pymysql.cursors import DictCursor


class DB_MySQL_Pool:
    """db连接池"""
    __pool = None
    __MAX_CONNECTIONS = 100  # 创建连接池的最大数量
    __MIN_CACHED = 10  # 连接池中空闲连接的初始数量
    __MAX_CACHED = 20  # 连接池中空闲连接的最大数量
    __MAX_SHARED = 10  # 共享连接的最大数量
    __BLOCK = True  # 超过最大连接数量时候的表现,为True等待连接数量下降,为false直接报错处理
    __MAX_USAGE = 100  # 单个连接的最大重复使用次数
    __CHARSET = 'UTF8'
    '''
        set_session:可选的SQL命令列表,可用于准备
                    会话,例如[“将日期样式设置为...”,“设置时区...”]
                    重置:当连接返回池中时,应该如何重置连接
                    (False或None表示回滚以begin()开始的事务,
                    为安全起见,始终发出回滚命令)
    '''
    __RESET = True
    __SET_SESSION = ['SET AUTOCOMMIT = 1']  # 设置自动提交

    def __init__(self, host, port, user, password, database):
        if not self.__pool:
            self.__class__.__pool = PooledDB(creator=pymysql, host=host, port=port, user=user, password=password,
                                             database=database,
                                             maxconnections=self.__MAX_CONNECTIONS,
                                             mincached=self.__MIN_CACHED,
                                             maxcached=self.__MAX_CACHED,
                                             maxshared=self.__MAX_SHARED,
                                             blocking=self.__BLOCK,
                                             maxusage=self.__MAX_USAGE,
                                             setsession=self.__SET_SESSION,
                                             reset=self.__RESET,
                                             charset=self.__CHARSET)

    def get_connect(self):
        return self.__pool.connection()


class DB_MySQL:
    def __init__(self) -> None:
        self.__host = "自己的host"
        self.__port = 3306
        self.__user = "自己的user"
        self.__password = "自己的密码"
        self.__database = "自己的数据库"
        self._log_time = True
        self._log_label = "总用时"
        self.connects_pool = DB_MySQL_Pool(
            host=self.__host, port=self.__port, user=self.__user, password=self.__password, database=self.__database)

    def __enter__(self):
        # 如果需要记录时间
        if self._log_time is True:
            self._start = default_timer()

        connect = self.connects_pool.get_connect()
        cursor = connect.cursor(pymysql.cursors.DictCursor)
        # https://blog.51cto.com/abyss/1736844
        # connect.autocommit = False # 如果使用连接池 则不能在取出后设置 而应该在创建线程池时设置
        self._connect = connect
        self._cursor = cursor
        return self

    def __exit__(self, *exc_info):
        self._connect.commit()
        self._cursor.close()
        self._connect.close()

        if self._log_time is True:
            diff = default_timer() - self._start
            print('-- %s: %.6f 秒' % (self._log_label, diff))

    def select_all(self, sql):
        """查询返回全部结果"""
        self._cursor.execute(sql)
        return self._cursor.fetchall()

    def select_one(self, sql):
        """查询返回单个结果"""
        self._cursor.execute(sql)
        return self._cursor.fetchone()

    def insert(self, sql):
        """插入数据"""
        res = self._cursor.execute(sql)
        return res


if __name__ == '__main__':
    with DB_MySQL() as db:
        print(db.select_one("""select * from banner"""))


如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册