性能测试工具 如何让 locust 压测工具实现压测 redis cluster 集群

萧帅 · 2023年02月22日 · 最后由 回复于 2023年02月25日 · 7594 次阅读

1、首先安装 redis 相关库版本:
redis-py-cluster==1.3.5
通过这里https://github.com/Grokzen/redis-py-cluster/releases1.3.5 版本的,这里下载的是 redis-py-cluster下载较低版本的安装包,自行安装,隐约记得之前是 1.3.5
tar xf redis-py-cluster-1.3.5.tar.gz
cd redis-py-cluster-1.3.5
python3 setup.py install

redis==2.10.6
通过这里https://github.com/andymccurdy/redis-py 下载指定的 redis-py 包(redis-py-2.10.6)

2、创建 python 连接 redis cluster 集群脚本并通过 flask: web 框架创建简单服务接口

# -*- coding: utf-8 -*-
#!/usr/bin/python3
from rediscluster import StrictRedisCluster
from flask import Flask

#创建一个服务,把当前这个python文件当做一个服务
app = Flask(__name__)

def redis_cluster():
    redis_nodes = [
        {"host":"host1", "port":port},
        {"host":"host2", "port":port},
        {"host":"host3", "port":port},
        {"host":"host4", "port":port},
        {"host":"host5", "port":port},
        {"host":"host6", "port":port},
    ]
    redis_con = None
    # 创建连接
    try:
        redis_con = StrictRedisCluster(startup_nodes=redis_nodes, max_connections=60000, decode_responses=True)
        #print(redis_con.cluster_info())
        #print(redis_con.cluster_slots())
    except Exception as e:
        print("conn error:{}".format(e))
    return redis_con

def process():
    r = redis_cluster()
    r.exists('key')# redis判断key是否存在。
    #print(exists)
    keyLists = r.hgetall('key')
    #print(keyLists)  #
    return repr(keyLists)


# @server.route()可以将普通函数转变为服务连接redis接口的路径、请求方式
@app.route('/', methods=["GET"])
def index():
    return process()
    pass

if __name__ == "__main__":
    app.run(host='0.0.0.0', port=20001, debug=True)

3、创建 locust 压测 python 脚本

# -*- coding: utf-8 -*-
#!/usr/bin/python3
from locust import HttpUser,TaskSet,task
import json
from json import JSONDecodeError
#创建任务类
class UserBehavior(TaskSet):

    # @task装饰器加到定义的方法前面表示这个方法就是一个可执行任务,装饰方法中可以添加数字(@task(2)),表示执行任务的执行次数的比例
    @task
    def test_redisInterface(self):
        "只查询redis接口"
        #请求url
        request_url = '/'
        #print(response)
        with self.client.get(url=request_url,catch_response=True) as response:
            try:
            #assert response.json()["code"] == 200
                if response.status_code == 200:
                    response.success()
            except JSONDecodeError:
                response.failure("Response could not be decoded as JSON")

# 创建用户类
class websiteuser(HttpUser):
    """自定义Locust类,可以设置Locust的参数。"""
    tasks = [UserBehavior]
    #host = "http://host:port/"  # 被测服务器地址
    min_wait = 100
    # 最小等待时间,即至少等待多少秒后Locust选择执行一个任务。
    max_wait = 200
    # 最大等待时间,即至多等待多少秒后Locust选择执行一个任务。
    #time_limit = 300

3、使用 locust 压测工具:
启动一个 Locust 工作节点:
locust -f locust_test.py --master
本机启动利用多个核心 启动多个进程方式
locust -f locust_test.py --worker

启动和执行
1)启动
可以在命令行启动,cmd 命令框,进入此文件的目录输入:
locust -f locust_test.py --host="http://*****.com"
-f: 指定性能测试脚本文件的绝对路径。
–host: 指定被测试应用的 URL 的地址,就是测试项目的 host 地址。

参考文档:https://www.cnblogs.com/zhaoyingjie/p/16926078.html
参考文档:http://www.manongjc.com/detail/61-jcvheidzpitxjgl.html
参考文档:https://blog.csdn.net/liuhuayang/article/details/119745827

共收到 8 条回复 时间 点赞

目前代码还不够完善,locust 压出来的性能不理想

有没有大佬能指导下,如何提高并发性能

你这个跟压测 redis 有什么关系,你这是压测 flask

😂 到哪都能看到万能的 chatgpt

少年 chatgpt 最近体验的一些心得 中提及了此贴 02月23日 11:45
少年 回复

万能的 chatgpt

回复

大佬指点下

少年 回复

ChatGPT 给出的方案不行

萧帅 回复

应该压测工具直连 redis,执行 redis 命令

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册