1、首先安装 redis 相关库版本:
redis-py-cluster==1.3.5
通过这里https://github.com/Grokzen/redis-py-cluster/releases1.3.5 版本的,这里下载的是 redis-py-cluster下载较低版本的安装包,自行安装,隐约记得之前是 1.3.5
tar xf redis-py-cluster-1.3.5.tar.gz
cd redis-py-cluster-1.3.5
python3 setup.py install

redis==2.10.6
通过这里https://github.com/andymccurdy/redis-py 下载指定的 redis-py 包(redis-py-2.10.6)

2、创建 python 连接 redis cluster 集群脚本并通过 flask: web 框架创建简单服务接口

# -*- coding: utf-8 -*-
#!/usr/bin/python3
from rediscluster import StrictRedisCluster
from flask import Flask

#创建一个服务,把当前这个python文件当做一个服务
app = Flask(__name__)

def redis_cluster():
    redis_nodes = [
        {"host":"host1", "port":port},
        {"host":"host2", "port":port},
        {"host":"host3", "port":port},
        {"host":"host4", "port":port},
        {"host":"host5", "port":port},
        {"host":"host6", "port":port},
    ]
    redis_con = None
    # 创建连接
    try:
        redis_con = StrictRedisCluster(startup_nodes=redis_nodes, max_connections=60000, decode_responses=True)
        #print(redis_con.cluster_info())
        #print(redis_con.cluster_slots())
    except Exception as e:
        print("conn error:{}".format(e))
    return redis_con

def process():
    r = redis_cluster()
    r.exists('key')# redis判断key是否存在。
    #print(exists)
    keyLists = r.hgetall('key')
    #print(keyLists)  #
    return repr(keyLists)


# @server.route()可以将普通函数转变为服务连接redis接口的路径、请求方式
@app.route('/', methods=["GET"])
def index():
    return process()
    pass

if __name__ == "__main__":
    app.run(host='0.0.0.0', port=20001, debug=True)

3、创建 locust 压测 python 脚本

# -*- coding: utf-8 -*-
#!/usr/bin/python3
from locust import HttpUser,TaskSet,task
import json
from json import JSONDecodeError
#创建任务类
class UserBehavior(TaskSet):

    # @task装饰器加到定义的方法前面表示这个方法就是一个可执行任务,装饰方法中可以添加数字(@task(2)),表示执行任务的执行次数的比例
    @task
    def test_redisInterface(self):
        "只查询redis接口"
        #请求url
        request_url = '/'
        #print(response)
        with self.client.get(url=request_url,catch_response=True) as response:
            try:
            #assert response.json()["code"] == 200
                if response.status_code == 200:
                    response.success()
            except JSONDecodeError:
                response.failure("Response could not be decoded as JSON")

# 创建用户类
class websiteuser(HttpUser):
    """自定义Locust类,可以设置Locust的参数。"""
    tasks = [UserBehavior]
    #host = "http://host:port/"  # 被测服务器地址
    min_wait = 100
    # 最小等待时间,即至少等待多少秒后Locust选择执行一个任务。
    max_wait = 200
    # 最大等待时间,即至多等待多少秒后Locust选择执行一个任务。
    #time_limit = 300

3、使用 locust 压测工具:
启动一个 Locust 工作节点:
locust -f locust_test.py --master
本机启动利用多个核心 启动多个进程方式
locust -f locust_test.py --worker

启动和执行
1)启动
可以在命令行启动,cmd 命令框,进入此文件的目录输入:
locust -f locust_test.py --host="http://*****.com"
-f: 指定性能测试脚本文件的绝对路径。
–host: 指定被测试应用的 URL 的地址,就是测试项目的 host 地址。

参考文档:https://www.cnblogs.com/zhaoyingjie/p/16926078.html
参考文档:http://www.manongjc.com/detail/61-jcvheidzpitxjgl.html
参考文档:https://blog.csdn.net/liuhuayang/article/details/119745827


↙↙↙阅读原文可查看相关链接,并与作者交流