Python 使用 locust 对 mysql 语句进行压测

roc · 2020年08月24日 · 最后由 会说话的汤姆猫 回复于 2020年08月25日 · 4365 次阅读

使用 locust 对 mysql 语句进行压测

一、目标说明

1、自定义封装协议对 mysql 语句进行压测
2、此处使用第三方库 pymysql

二、实现原理

1、先简单对 pymysql 进行简单封装(若使用原生方法太多,不方便使用)
2、参照 官方原文 https://docs.locust.io/en/stable/testing-other-systems.html
3、三步走:1、构建协议客户端=》2、依赖客户端对协议方法进行装饰和数据统计 =》3、整改协议继承 user 类
4、给类的所有方法加装饰器不理解参照:https://blog.csdn.net/weixin_36179862/article/details/102829018

三、代码逻辑及演示

3.1 对 pymysql 进行个性化简单封装(mysql.py):

import pymysql
import traceback
from utils.logger import log  #可使用系统的logging
import sys
import time
#import stopit


class MySql():
    def __init__(self):

        self.conn = None  # 一个链接对象
        self.cur = None  # 链接中的游标
    def __getattr__(self, name):
        pass
        # magic method dispatcher
        #log.info("开始调用 mysql __getattr__:{}".format(name))


    def close_conn(self):
        # 链接未关闭,主动关闭
        try:
            if self.conn != None:
                self.conn.close()
                self.conn = None
        except:
            # 防止之前的 链接未关闭
            pass

    def creat_conn(self, host, user, pwd, dbname, port=3306, close=1, isnew=0):
        """
        新建一个mysql链接conn
        c= self.conn
        c.ping()  # 采用连接对象的ping()函数检测连接状态
        print('connect-%d ok' % 1)
        :param host:
        :param user:
        :param pwd:
        :param dbname:
        :param port:
        :return:
        :close=1;默认强制关闭此对象之前已经开启的链接对象和游标
        :isnew=1:强制创建一个新的链接 ,0不新建连接

        """
        #print(host, user, pwd, dbname, port,close)
        # 已存在链接,且不需要新建,就返回已有的conn
        if self.conn != None and isnew == 0:
            return self.conn

        self.close_conn()

        try:

            self.conn = pymysql.connect(host=host, user=user, passwd=pwd, db=dbname, port=port, charset='utf8')
            self.cur = self.conn.cursor()  # 并创建取一个游标
        except  Exception as e:
            #print(host,user,pwd,dbname,port)
            log.error("mysql链接数据库失败:\n" + str(traceback.print_exc()))
            raise Exception("mysql链接数据库失败1:\n" ,traceback.print_exc())
            # logger.error(str(*sys.exc_info()))
        return self.conn


    #@stopit.threading_timeoutable()
    def query(self, sql,close_cursor=1,is_except=0,iscommit=0):
        """
        :param sql: sql语句
        :param close_cursor: 失败1关闭游标,0失败不关闭关闭游标
        :param  is_except :是否抛出异常,1为抛出异常(测试用例中会报失败),0为返回异常值
        :return:
        """
        try:
            #log.info("utils.mysql准备执行的sql:\n{}".format(sql))
            self.cur.execute(sql)
            if iscommit == 1:
                #log.info("准备提交:")
                self.conn.commit()
            data = self.cur.fetchall()
            #log.info(sql,"mysql data:{}".format(data))
            return data
        except  Exception as e:
           # log.warning("utils.mysql准备执行的sql:\n{}".format(sql))
            message=str(e)
            if close_cursor ==1:
                self.cur.close()  # 关闭游标
            log.warning("utils_sql失败:{}\n{}!!!{}".format(sql,e, traceback.print_exc()))
            if is_except ==1:
                raise Exception(message[-300:])

            return ('sql失败{}\n{}'.format(sql,message[-300:]),)


            # self.conn.close()  # 释放数据库资源


# 示列化一个对象
mysql = MySql()

3.2 依赖 mysql.py 进行封装 locust 脚本(mysql_locust.py):


import time
from locust import User, env, task, between
from utils.mysql import MySql  #上面的 mysql.py文件的 mysql对象
from utils.logger import log #此处可使用 系统logging

#log.set_logpath("/mysql/locust/")

# 第一步 构建新协议的客户端 和 第二步 对类的方法进行装饰
class MysqlClient(MySql):
    """
    Simple, sample XML RPC client implementation that wraps xmlrpclib.ServerProxy and
    fires locust events on request_success and request_failure, so that all requests
    gets tracked in locust's statistics.
    """

    _locust_environment = None  #设置默认私有的环境变量

    def __getattribute__(self, item):  # __getattribute__
        """通过该方法对所有的方法进行装饰,能被locust 统计数据
        可参考:官方文档或下面链接
        https://blog.csdn.net/weixin_36179862/article/details/102829018

        """
        func = super().__getattribute__(item)  # __getattr__   __getattribute__
        if str(type(func)) == "<class 'function'>" or str(type(func)) == "<class 'method'>":
            def wrapper(*args, **kwargs):
                if 'locust_name' in kwargs.keys():  # 处理数据统计名字
                    name = kwargs['locust_name']
                    kwargs.pop('locust_name')
                else:
                    name = item

                start_time = time.time()
                try:
                    result = func(*args, **kwargs)
                except Exception as e:
                    total_time = int((time.time() - start_time) * 1000)
                    # 添加请求失败事件
                    self._locust_environment.events.request_failure.fire(request_type="mysql", name=name,
                                                                         response_time=total_time, exception=e)
                else:
                    total_time = int((time.time() - start_time) * 1000)
                    #添加请求成功事件
                    self._locust_environment.events.request_success.fire(request_type="mysql", name=name,
                                                                         response_time=total_time,
                                                                         response_length=0)
                    # In this example, I've hardcoded response_length=0. If we would want the response length to be
                    # reported correctly in the statistics, we would probably need to hook in at a lower level
                log.info("{} 耗时total_time:{} reslut:\n{}".format(name, total_time, result))
                return result

            return wrapper
        else:
            return func


# 第三步 继承user类,并初始化客户端和设置环境变量
class MysqlUser(User):
    """
    This is the abstract User class which should be subclassed. It provides an XML-RPC client
    that can be used to make XML-RPC requests that will be tracked in Locust's statistics.
    """
    abstract = True

    def __init__(self, *args, **kwargs):
        # super(MysqlUser, self).__init__(*args, **kwargs)
        super().__init__(*args, **kwargs)
        self.client = MysqlClient()
        self.client._locust_environment = self.environment


# 第四步 使用继承了user的类,产生新协议的用户
class ApiUser(MysqlUser):
    host = "http://10.2.1.95:3318/"
    wait_time = between(0.1, 1)

    def on_start(self):
        log.info("开始登陆:")
        user = 'root'
        pwd = '123456'
        host = '10.2.1.95'
        dbname = 'coreframe'
        self.client.creat_conn(host, user, pwd, dbname, port=3318)

    @task(1)
    def test_mysql(self):
        # log.info("self.client2:", self.client,self.environment)
        sql = """
              select count(*) from om_employee
                """
        name = "select count(*) from LD16090835"
        data = self.client.query(sql, locust_name=name)
        log.info("data:{}".format(data))
        # [log.info(('on_start:', s)) for s in [self.environment.stats.total]]


if __name__ == '__main__':
    import subprocess, os


    path = os.path.dirname(os.path.abspath(__file__))
    file_name = os.path.split(__file__)[-1].split(".")[0]
    default_url = 'http://10.2.1.95:3318/'
    log.info(path, file_name)

    log.info("开始运行wt")  # --no-web -c 2 -r 1 -t 3s
    # locust -f --headless -u 1000 -r 100 --run-time 1h30m --step-load --step-users 300 --step-time 20m
    # subprocess.call(
    #     'locust -f {}/{}.py  -u 1 -r 1 -t 10 -l --csv-full-history'.format(path,file_name, default_url),
    #     shell=True)
    cmd = 'locust -f {}/{}.py  --host={}  --web-host="127.0.0.1" --web-port 8090 --csv CSV_{}'.format(path, file_name,
                                                                                                      default_url,
                                                                                                      file_name)
    log.info("cmd:", cmd)
    dd = subprocess.call(cmd, shell=True)  ##--web-port 8090
    # d=subprocess.check_call('locust -f {}/{}.py  --worker'.format(path,file_name, default_url),shell=True)
    # d1=subprocess.check_call('locust -f {}/{}.py  --worker'.format(path,file_name, default_url),shell=True)
    d2 = subprocess.call(
        'locust -f {}/{}.py  --master --host={}  --web-host="127.0.0.1" --web-port 8090 --csv CSV_{}'.format(path,
                                                                                                             file_name,
                                                                                                             default_url,
                                                                                                             file_name),
        shell=True)

四、展示结果及其他说明

共收到 1 条回复 时间 点赞
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册