1、首先安装 redis 相关库版本:
redis-py-cluster==1.3.5
通过这里https://github.com/Grokzen/redis-py-cluster/releases1.3.5 版本的,这里下载的是 redis-py-cluster下载较低版本的安装包,自行安装,隐约记得之前是 1.3.5
tar xf redis-py-cluster-1.3.5.tar.gz
cd redis-py-cluster-1.3.5
python3 setup.py install
redis==2.10.6
通过这里https://github.com/andymccurdy/redis-py 下载指定的 redis-py 包(redis-py-2.10.6)
2、创建 python 连接 redis cluster 集群脚本并通过 flask: web 框架创建简单服务接口
# -*- coding: utf-8 -*-
#!/usr/bin/python3
from rediscluster import StrictRedisCluster
from flask import Flask
#创建一个服务,把当前这个python文件当做一个服务
app = Flask(__name__)
def redis_cluster():
redis_nodes = [
{"host":"host1", "port":port},
{"host":"host2", "port":port},
{"host":"host3", "port":port},
{"host":"host4", "port":port},
{"host":"host5", "port":port},
{"host":"host6", "port":port},
]
redis_con = None
# 创建连接
try:
redis_con = StrictRedisCluster(startup_nodes=redis_nodes, max_connections=60000, decode_responses=True)
#print(redis_con.cluster_info())
#print(redis_con.cluster_slots())
except Exception as e:
print("conn error:{}".format(e))
return redis_con
def process():
r = redis_cluster()
r.exists('key')# redis判断key是否存在。
#print(exists)
keyLists = r.hgetall('key')
#print(keyLists) #
return repr(keyLists)
# @server.route()可以将普通函数转变为服务连接redis接口的路径、请求方式
@app.route('/', methods=["GET"])
def index():
return process()
pass
if __name__ == "__main__":
app.run(host='0.0.0.0', port=20001, debug=True)
3、创建 locust 压测 python 脚本
# -*- coding: utf-8 -*-
#!/usr/bin/python3
from locust import HttpUser,TaskSet,task
import json
from json import JSONDecodeError
#创建任务类
class UserBehavior(TaskSet):
# @task装饰器加到定义的方法前面表示这个方法就是一个可执行任务,装饰方法中可以添加数字(@task(2)),表示执行任务的执行次数的比例
@task
def test_redisInterface(self):
"只查询redis接口"
#请求url
request_url = '/'
#print(response)
with self.client.get(url=request_url,catch_response=True) as response:
try:
#assert response.json()["code"] == 200
if response.status_code == 200:
response.success()
except JSONDecodeError:
response.failure("Response could not be decoded as JSON")
# 创建用户类
class websiteuser(HttpUser):
"""自定义Locust类,可以设置Locust的参数。"""
tasks = [UserBehavior]
#host = "http://host:port/" # 被测服务器地址
min_wait = 100
# 最小等待时间,即至少等待多少秒后Locust选择执行一个任务。
max_wait = 200
# 最大等待时间,即至多等待多少秒后Locust选择执行一个任务。
#time_limit = 300
3、使用 locust 压测工具:
启动一个 Locust 工作节点:
locust -f locust_test.py --master
本机启动利用多个核心 启动多个进程方式
locust -f locust_test.py --worker
启动和执行
1)启动
可以在命令行启动,cmd 命令框,进入此文件的目录输入:
locust -f locust_test.py --host="http://*****.com"
-f: 指定性能测试脚本文件的绝对路径。
–host: 指定被测试应用的 URL 的地址,就是测试项目的 host 地址。
参考文档:https://www.cnblogs.com/zhaoyingjie/p/16926078.html
参考文档:http://www.manongjc.com/detail/61-jcvheidzpitxjgl.html
参考文档:https://blog.csdn.net/liuhuayang/article/details/119745827