压测代码:
import websocket
from locust import events,User,TaskSet,task,between
import hashlib,ssl
import json,time,random,string
class WebSocketClient(object):
def __init__(self):
pass
def generateClientOid(self):
origin = ''.join(random.sample(string.ascii_letters + string.digits, 32))
m = hashlib.md5()
m.update(origin.encode('utf-8'))
md = m.hexdigest()
return md
def connect(self,uri):
self.uri = uri
start_time = time.time()
try:
self.ws=websocket.WebSocketApp(self.uri,on_open=self.on_open,on_message=self.on_message,on_error=self.on_error,on_close=self.on_close)
except websocket.WebSocketTimeoutException as e:
total_time = int((time.time() - start_time) * 1000)
events.request_failure.fire(request_type="websocket", name='wsUser', response_time=total_time, exception=e)
else:
# self.ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
return self.ws
def on_open(self,ws):
print(f'on_open')
topic = 'xxx'
# self.subscribeTopic(ws=self.ws,topic=topic)
def getFormateTime(self):
ft = time.strftime('【%Y-%m-%d %H:%M:%S】', time.localtime(time.time()))
return ft
def on_message(self,ws,msg):
# pass
# print(str(threading.enumerate()))
print(f'{self.getFormateTime()}on_message:{msg}')
# 校验结果
if json.loads(msg)['type']=='welcome':
events.request_success.fire(request_type="websocket", name='wsUser', response_time=0,response_length=0)
def on_error(self,ws,error):
print(f'on_error:{error}')
def on_close(self,ws,code,msg):
print(f'on_close:{code,msg}')
class WebsocketUser(User):
abstract = True
# stub_class = None
def __init__(self, *args, **kwargs):
super(WebsocketUser, self).__init__(*args, **kwargs)
self.client = WebSocketClient()
class TestWS(TaskSet):
def generateClientOid(self):
origin = ''.join(random.sample(string.ascii_letters + string.digits, 32))
m = hashlib.md5()
m.update(origin.encode('utf-8'))
md = m.hexdigest()
return md
@task(1)
def test(self):
uri='wss://xxxxx'
ws=self.client.connect(uri)
data = {
"id": self.generateClientOid(),
"type": "ping"
}
ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE},ping_interval=20,ping_payload=json.dumps(data))
class Mytask(WebsocketUser):
wait_time = between(0.2, 0.2)
tasks = {
TestWS:1
}
压力在跑的时候,CPU 使用正常:
暂停压力后,CPU 使用率迅速打满:
求解!
环境配置:
locust 1.6.0
Python 3.8.1 (v3.8.1:1b293b6006, Dec 18 2019, 14:08:53)
macOS Mojave