「原创声明:保留所有权利,禁止转载」
上游数据
- MSK Kafka
测试场景
- 基准测试:在当前云端台量 X 万的条件下,kafka 以 X 个/S 发送数据,其中 X 个命中发送给政府,探测系统各性能指标表现,验证延迟和丢包率是否满足。
- 压力测试:预估未来 X 年云端台量会达到 X 万台,kafka 以 X 个/S 发送数据,其中 X 个命中发送给政府,探测系统各性能指标表现,验证延迟和丢包率是否满足。
数据丢包的主函数脚本
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
from DataMock import KafkaMsgMock
from Percentages import Percentage
import random
from confluent_kafka import Producer
import json
import datetime
mocInstance = KafkaMsgMock()
percIns = Percentage()
task_list = []
total_lock = asyncio.Lock()
total = 0
msglist = []
wait = 0
resource = 0
previous_time = int(time.time())
task_completed = False # 添加任务完成标志
log_completed = False # 添加日志打印标志
# 此处定义期望的发送速率
maxlimted = 90
minlimted = 70
totalLimit = 10000
# kafka连接信息
conf = {
'bootstrap.servers': 'XXXXXX',
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'SCRAM-SHA-512',
'sasl.username': 'XXXXXX',
'sasl.password': 'XXXXXX'
}
producer = Producer(conf)
# 创建默认任务因子
def task_paras():
for m_Type in ["alarm", "fault", "serviceModeChange"]:
for contr_type in ['LCE','GCE','DTU','ESC']:
count = percIns.calculate(controller=contr_type, msgType=m_Type, total=5000)
if count > 0:
task_list.append({"controller": contr_type, "msgType": m_Type, "count": count})
async def utcTime():
return datetime.datetime.utcnow().isoformat()[0:23] + 'Z'
# 定义异步调用装饰器
def async_call(func):
async def wrapper(*args, **kwargs):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, func, *args, **kwargs)
return wrapper
@async_call
def comon_msg(controller, msgType, count):
return mocInstance.comon_msg(controller=controller, msgType=msgType, count=count)
@async_call
def kce_msg(msgType, count):
return mocInstance.kce_msg(msgType=msgType, count=count)
@async_call
def kcecpuc_msg(msgType, count):
return mocInstance.kcecpuc_msg(msgType=msgType, count=count)
# 发送数据异步函数修正
async def send(msg, producer):
global total, wait, task_completed, previous_time, log_completed
try:
topic, data = msg
data['Param']['Timestamp'] = await utcTime()
print(f"Sending message to topic {topic}: {data}")
if wait > 0:
await asyncio.sleep(wait * 0.001)
# 检查是否达到发送上限
if not task_completed:
async with total_lock:
total += 1
if total <= totalLimit: # 设置发送上限为XXXXXX
producer.produce("gov-" + topic, json.dumps(data).encode('utf-8'))
# 检查是否达到发送上限
if not task_completed:
if total >= totalLimit: # 设置发送上限为XXXXXX
if not log_completed:
print(f"总共发送了{total}条消息,发送完毕。")
log_completed = True
task_completed = True # 设置任务完成标志
# 更新 previous_time
curr_time = int(time.time())
if curr_time != previous_time and not task_completed:
previous_time = curr_time
except Exception as e:
print(f"Error sending message '{msg}': {e}")
# 发送数据批处理修正
async def send_batched(msgs, producer):
await asyncio.gather(*[send(msg, producer) for msg in msgs])
producer.flush()
async def request_msg():
global msglist, task_completed
while not task_completed:
if len(msglist) < 300000:
for unit in task_list:
if unit['controller'] == 'KCE':
msglist += await kce_msg(unit['msgType'], unit['count'])
elif unit['controller'] == 'KCECPUC':
msglist += await kcecpuc_msg(unit['msgType'], unit['count'])
else:
msglist += await comon_msg(unit['controller'], unit['msgType'], unit['count'])
random.shuffle(msglist)
await asyncio.sleep(0.01)
async def send_data(producer):
global msglist, resource, task_completed
while not task_completed:
if len(msglist) > 0:
if resource > 15000:
resource = 15000
if len(msglist) > (1000 + int(resource)):
msgs = msglist[:(1000 + int(resource))]
msglist = msglist[(1000 + int(resource)):]
else:
msgs = msglist
msglist = msglist[len(msglist):-1]
await send_batched(msgs, producer)
await asyncio.sleep(0.01)
# 统计函数修正
async def statistics():
global previous_time, task_completed
start_time = int(time.time())
previous_time = start_time
time.sleep(1)
while not task_completed:
global total, wait, msglist, resource
if total != 0:
curr_time = int(time.time())
if (curr_time - start_time) == 0:
continue
if curr_time != previous_time:
speed = int(total / (curr_time - start_time))
if speed > maxlimted:
if wait <= 100000:
if (speed / maxlimted) > 1.5:
wait += 500
if resource > 100:
resource -= 100
elif (speed / maxlimted) > 1.15:
wait += 100
if resource > 20:
resource -= 20
elif (speed / maxlimted) > 1.10:
wait += 50
if resource > 15:
resource -= 15
elif (speed / maxlimted) > 1.05:
wait += 20
if resource > 10:
resource -= 10
elif (speed / maxlimted) > 1.03:
wait += 10
if resource > 5:
resource -= 5
elif (speed / maxlimted) > 1.015:
wait += 1
if resource >= 1:
resource -= 1
elif speed < minlimted:
if wait >= 0:
if (speed / minlimted) < 0.6:
if wait > 500:
wait -= 500
else:
wait = 0
resource += 100
elif (speed / minlimted) < 0.85:
if wait > 200:
wait -= 200
else:
wait = 0
resource += 50
elif (speed / minlimted) < 0.90:
if wait > 100:
wait -= 100
else:
wait = 0
resource += 20
elif (speed / minlimted) < 0.95:
if wait > 50:
wait -= 50
else:
wait = 0
resource += 10
elif (speed / minlimted) < 0.97:
if wait > 10:
wait -= 10
else:
wait = 0
resource += 5
elif (speed / minlimted) < 0.998:
if wait >= 1:
wait -= 1
resource += 1
else:
wait = 0
print('[%s] 发送消息流速 %s个/秒, 发送总数 %s个, 速率减速补偿-%.2f, 增速补偿%s, 队列余量%s' % (
time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())),
speed, total, wait * 0.01, resource, len(msglist),
))
previous_time = curr_time
await asyncio.sleep(0.001)
async def main():
task_paras()
print("执行限速区间, 每秒 %s - %s" % (minlimted, maxlimted))
try:
# 启动 Kafka 生产者
producer = Producer(conf)
# 运行异步任务
await asyncio.gather(
request_msg(),
send_data(producer),
statistics()
)
# 检查任务是否完成
while not task_completed:
await asyncio.sleep(0.1)
finally:
# 关闭 Kafka 生产者
producer.flush()
print("Kafka生产者已关闭。")
if __name__ == "__main__":
asyncio.run(main())
各类消息的发送速率百分比
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import os
import re
import json
#-----------------------------------------------------------------------------------------------------
# Message type topic Message number peaks (count/sec) Percentage
#
#
# Alarm message kcecpuc-kceap-alarmbuttonevent 0.366 0.03678392
# Kafka dtu-lce-alarmbutton 4.39 0.44120603
# dtu-step-alarmbutton 0.269 0.027035176
# dtu-gce-alarmbutton 0.42 0.042211055
# dtu-kce-alarmbutton 0.586 0.058894472
# Fault &Fault dtu-esc-fault 1.81 0.181909548
# recovery Kafka kcecpuc-gcd-fault 14.4 1.447236181
# kcecpuc-kcecpu_main-fault 36.3 3.648241206
# dtu-lce-fault 23 2.311557789
# dtu-step-fault 0.949 0.095376884
# dtu-sys-fault 7.49 0.752763819
# dtu-gce-fault 2.52 0.253266332
# dtu-kce-fault 20.6 2.070351759
# dtu-kse-fault 0.579 0.058190955
# Service mode tu-esc-modechange 13.3 1.336683417
# Change Kafka dtu-step-modechange 0.735 0.073869347
# dtu-lce-modechange 42 4.221105528
# dtu-gce-modechange 7.43 0.746733668
# Movement Kafka kcecpuc-kcecpu_main-carevent 64.8 6.512562814
# dtu-lce-car 695 69.84924623
# dtu-step-car 1.82 0.182914573
# dtu-gce-car 25.1 2.522613065
# dtu-kce-car 31.5 3.165829146
#------------------------------------------------------------------------------------------------------
# 1.百分比计算类
class Percentage:
def __init__(self):
'''
类初始化
'''
self.percentage = self.loadPercentage()
def loadPercentage(self):
'''
读取模板文件,为快速处理,所有模板均加载到内存
'''
dict_template = dict()
with open(os.path.abspath(os.path.dirname('Websocket')) + '/MsgTamplate/Kafka/Percentage' , 'r', encoding='utf-8') as f:
dict_template.update({ "Percentage" : json.loads(f.read().replace("\\n","").replace("\\t","").replace(" ","")) })
return json.loads(json.dumps(dict_template))
def calculate(self,controller = 'LCE', msgType = 'alarm', total = 10000 ):
'''
读取模板文件,为快速处理,所有模板均加载到内存
'''
return int(total * self.percentage.get('Percentage').get(controller).get(msgType))
if __name__ == '__main__':
p = Percentage()
count = p.calculate(controller = 'KCE',msgType = 'alarm')
print(count)
各类模板消息的数据读取操作
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
'消息生成类'
__author__ = 'data mock'
__date__ = '2022-10-09'
import os
import json
import uuid
import datetime
import random
import copy
import time, random, queue
from threading import Thread
import ast
# 1.创建Kafka消息mock服务类
class KafkaMsgMock:
def __init__(self):
'''
类初始化
'''
self.template = self.loadTemplate()
self.equipmentNumber = self.loadEquipmentNumber()
self.faultCode = self.loadFaultCode()
print('Load resources ...')
def loadTemplate(self):
'''
读取模板文件,为快速处理,所有模板均加载到内存
'''
dict_template = dict()
for i in os.listdir('./MsgTamplate/Kafka'):
if len(i) > 7: continue;
print(i)
with open(os.path.abspath(os.path.dirname(i)) + '/MsgTamplate/Kafka/' + i, 'r', encoding='utf-8') as f:
dict_template.update({ i : json.loads(f.read().replace("\\n","").replace("\\t","").replace(" ","")) })
return dict_template
def loadEquipmentNumber(self):
'''
读取ken文件,为快速处理,所有模板均加载到内存
'''
dict_template = dict()
with open(os.path.abspath(os.path.dirname('./')) + '/MsgTamplate/EQUIPMETN', 'r', encoding='utf-8') as f:
dict_template.update({ 'EQUIPMETN' : json.loads(f.read().replace("\\n","").replace("\\t","").replace(" ","")) })
return dict_template
def loadFaultCode(self):
'''
读取fault code文件,为快速处理,所有模板均加载到内存
'''
dict_template = dict()
with open(os.path.abspath(os.path.dirname('./')) + '/MsgTamplate/FAULTCODE', 'r', encoding='utf-8') as f:
dict_template.update({ 'FAULTCODE' : json.loads(f.read().replace("\\n","").replace("\\t","").replace(" ","")) })
return dict_template
def utcTime(self):
'''
make utc time for timestemp
'''
a = datetime.datetime.utcnow()
# 2020-11-06T09:10:36.000Z
#return (str(a.year) + '-' + str(a.month).zfill(2) + '-' + str(a.day).zfill(2) + 'T' + str(a.hour).zfill(
# 2) + ':' + str(a.minute).zfill(2) + ':' + str(a.second).zfill(2) + '.' + '%03dZ' % (int(a.microsecond / 1000)))
return datetime.datetime.utcnow().isoformat()[0:23] + 'Z'
def uuid(self):
'''
make uuid for timestemp
'''
return str(uuid.uuid1())
def comon_msg(self, controller = 'LCE', msgType ='random', count = 1):
'''
generate lce/gce/step/esc message
LCE is default
'''
#读取LCE模板
data = self.template[controller]
#读取设备编号列表
equipmentNumbers = self.equipmentNumber['EQUIPMETN']
#读取fault code列表
faultCodes = self.faultCode['FAULTCODE']
# list容器
msages = list()
idx = 0
while idx < count:
idx += 1
#随机设备号码
equipmentNumber = str(random.choice(equipmentNumbers[controller]))
#随机fault code
faultCode = str(random.choice(faultCodes[controller]))
# 随机消息类型
if msgType == 'random':
#随机消息模板
content = data[msgtype].copy()
msgtype = random.choice(sorted(data.keys()))
# alarm消息
elif msgType == 'alarm':
content=data['alarmTamplate'].copy()
content['Param']['UUID'] = 'testKafka0618007-alarm-' + self.uuid()
# fault 消息
elif msgType == 'fault':
content=data['faultTamplate'].copy()
content['Param']['UUID'] = 'testKafka0618007-fault-' + self.uuid()
# fault recover消息
elif msgType == 'faultRecovered':
content=data['faultRecoveredTamplate'].copy()
content['Param']['UUID'] = 'testKafka0618007-faultRecovered-' + self.uuid()
# service mode change消息
elif msgType == 'serviceModeChange':
content=data['serviceModeChangeTamplate'].copy()
content['Param']['UUID'] = 'testKafka0618007-serviceModeChange-' + self.uuid()
# movment data消息
elif msgType == 'movementData':
content=data['movementDataTamplate'].copy()
content['Param']['UUID'] = 'testKafka0618007-movementData-' + self.uuid()
# SN消息
elif msgType == 'callInUrgentSN':
content=data['serviceNeedTamplate'].copy()
content['Param']['konectEntryId'] = 'testKafka0618007-serviceNeed-' + self.uuid()
#合成消息
content['EquipmentNumber'] = equipmentNumber
# content['Param']['UUID'] = self.uuid()
if content['Param'].get('FaultCode'):
content['Param']['FaultCode'] = faultCode
content['Param']['Timestamp'] = self.utcTime()
topic = content.get('topic')
msages.append((topic, copy.deepcopy(content)))
return msages
def kce_msg(self, msgType ='random', count = 1):
'''
generate kce message
'''
#读取LCE模板
data = self.template['KCE']
#读取设备编号列表
equipmentNumbers = self.equipmentNumber['EQUIPMETN']
#读取fault code列表
faultCodes = self.faultCode['FAULTCODE']
# list容器
msages = list()
idx = 0
while idx < count:
idx += 1
#随机设备号码
equipmentNumber = str(random.choice(equipmentNumbers['KCE']))
#随机fault code
faultCode = str(random.choice(faultCodes['KCE']))
# 随机消息类型
if msgType == 'random':
#随机消息模板
msgtype = random.choice(sorted(data.keys()))
content = data[msgtype].copy()
# alarm消息
elif msgType == 'alarm':
content=data['alarmTamplate'].copy()
# fault 消息
elif msgType == 'fault':
content=data['faultTamplate'].copy()
# fault recover消息
elif msgType == 'faultRecovered':
content=data['faultRecoveredTamplate'].copy()
#合成消息
content['equipment'] = equipmentNumber
content['uid'] = self.uuid()
content['code'] = faultCode
if content['startTime']:
content['startTime'] = self.utcTime()
if content['endTime']:
content['endTime'] = self.utcTime()
topic = content.pop('topic')
msages.append((topic, copy.deepcopy(content) ))
return msages
def kcecpuc_msg(self, msgType ='random', count = 1):
'''
generate kcecpuc message
'''
#读取LCE模板
data = self.template['KCECPUC']
#读取设备编号列表
equipmentNumbers = self.equipmentNumber['EQUIPMETN']
#读取fault code列表
faultCodes = self.faultCode['FAULTCODE']
# list容器
msages = list()
idx = 0
while idx < count:
idx += 1
#随机设备号码
equipmentNumber = str(random.choice(equipmentNumbers['KCECPUC']))
#随机fault code
faultCode = str(random.choice(faultCodes['KCECPUC']))
# 随机消息类型
if msgType == 'random':
#随机消息模板
msgtype = random.choice(sorted(data.keys()))
content = data[msgtype].copy()
# alarm消息
elif msgType == 'alarm':
content=data['alarmTamplate'].copy()
# fault 消息
elif msgType == 'faultKcecpuMain':
content=data['faultKcecpuMainTamplate'].copy()
# fault recover消息
elif msgType == 'faultRecoveredKcecpuMain':
content=data['faultRecoveredKcecpuMainTamplate'].copy()
# fault 消息
elif msgType == 'faultGcd':
content=data['faultGcdTamplate'].copy()
# fault recover消息
elif msgType == 'faultRecoveredGcd':
content=data['faultRecoveredGcdTamplate'].copy()
#合成消息
content['equipment'] = equipmentNumber
content['uid'] = self.uuid().upper().replace('-','')
content['code'] = faultCode
if content['startTime']:
content['startTime'] = self.utcTime()
if content['endTime']:
content['endTime'] = self.utcTime()
topic = content.pop('topic')
msages.append((topic, copy.deepcopy(content) ))
return msages
# 1.创建websocket消息mock服务类
class WSSMsgMock:
def __init__(self):
'''
类初始化
'''
self.template = self.loadTemplate()
self.equipmentNumber = self.loadEquipmentNumber()
self.faultCode = self.loadFaultCode()
def loadTemplate(self):
'''
读取模板文件,为快速处理,所有模板均加载到内存
'''
dict_template = dict()
for i in os.listdir('./MsgTamplate/Websocket'):
if len(i) > 7: continue;
print('Load Template file %s' % i)
with open(os.path.abspath(os.path.dirname(i)) + '/MsgTamplate/Websocket/' + i, 'r',encoding='utf-8') as f:
dict_template.update({ i :json.loads(f.read().replace("\\n","").replace("\\t","").replace(" ","")) })
return dict_template
def loadEquipmentNumber(self):
'''
读取ken文件,为快速处理,所有模板均加载到内存
'''
dict_template = dict()
with open(os.path.abspath(os.path.dirname('./')) + '/MsgTamplate/EQUIPMETN', 'r', encoding='utf-8') as f:
dict_template.update({ 'EQUIPMETN' : json.loads(f.read().replace("\\n","").replace("\\t","").replace(" ","")) })
return dict_template
def loadFaultCode(self):
'''
读取fault code文件,为快速处理,所有模板均加载到内存
'''
dict_template = dict()
with open(os.path.abspath(os.path.dirname('./')) + '/MsgTamplate/FAULTCODE', 'r', encoding='utf-8') as f:
dict_template.update({ 'FAULTCODE' : json.loads(f.read().replace("\\n","").replace("\\t","").replace(" ","")) })
return dict_template
def utcTime(self):
'''
make utc time for timestemp
'''
return datetime.datetime.utcnow().isoformat()[0:23] + 'Z'
def uuid(self):
'''
make uuid for timestemp
'''
return str(uuid.uuid1())
def comon_msg(self, controller = 'LCE', msgType ='random' , count = 1):
'''
generate lce/gce/step/esc message
'''
#读取LCE模板
data = self.template[controller]
#读取设备编号列表
equipmentNumbers = self.equipmentNumber['EQUIPMETN']
#读取fault code列表
faultCodes = self.faultCode['FAULTCODE']
# list容器
msages = list()
idx = 0
while idx < count:
idx += 1
#随机设备号码
equipmentNumber = str(random.choice(equipmentNumbers[controller]))
#随机fault code
faultCode = str(random.choice(faultCodes[controller]))
# 随机消息类型
content = {}
if msgType == 'random':
#随机消息模板
msgtype = random.choice(sorted(data.keys()))
content = data[msgtype].copy()
# alarm消息
elif msgType == 'alarm':
content=data['alarmTamplate'].copy()
# fault 消息
elif msgType == 'fault':
content=data['faultTamplate'].copy()
# fault recover消息
elif msgType == 'faultRecovered':
content=data['faultRecoveredTamplate'].copy()
#合成消息
if type(content) is not dict:
content = ast.literal_eval(content)
if content:
content['equipmentNumber'] = equipmentNumber
content['param']['uuid'] = self.uuid()
if content['param'].get('faultCode'):
content['param']['faultCode'] = faultCode
content['param']['timestamp'] = self.utcTime()
msages.append(copy.deepcopy(content))
return msages
def kce_msg(self, msgType ='random', count = 1):
'''
generate kce message
'''
#读取LCE模板
data = self.template['KCE']
#读取设备编号列表
equipmentNumbers = self.equipmentNumber['EQUIPMETN']
#读取fault code列表
faultCodes = self.faultCode['FAULTCODE']
# list容器
msages = list()
idx = 0
while idx < count:
idx += 1
#随机设备号码
equipmentNumber = str(random.choice(equipmentNumbers['KCE']))
#随机fault code
faultCode = str(random.choice(faultCodes['KCE']))
# 随机消息类型
content = {}
if msgType == 'random':
#随机消息模板
msgtype = random.choice(sorted(data.keys()))
content = data[msgtype].copy()
# alarm消息
elif msgType == 'alarm':
content=data['alarmTamplate'].copy()
# fault 消息
elif msgType == 'fault':
content=data['faultTamplate'].copy()
# fault recover消息
elif msgType == 'faultRecovered':
content=data['faultRecoveredTamplate'].copy()
#合成消息
if type(content) is not dict:
content = ast.literal_eval(content)
if content:
if content.get('param').get('equipment'):
content['param']['equipment'] = equipmentNumber
elif content.get('equipmentNumber'):
content['equipmentNumber'] = equipmentNumber
elif content.get('param').get('uid'):
content['param']['uid'] = self.uuid()
elif content.get('param').get('code'):
content['param']['code'] = faultCode
elif content.get('param').get('startTime'):
content['param']['startTime'] = self.utcTime()
elif content.get('param').get('endTime'):
content['param']['endTime'] = self.utcTime()
elif content.get('param').get('timestamp'):
content['param']['timestamp'] = self.utcTime()
elif content.get('param').get('time'):
content['param']['time'] = int(time.time()*1000)
msages.append(copy.deepcopy(content))
return msages
def kcecpuc_msg(self, msgType ='random', count = 1):
'''
generate kcecpuc message
'''
#读取LCE模板
data = self.template['KCECPUC']
#读取设备编号列表
equipmentNumbers = self.equipmentNumber['EQUIPMETN']
#读取fault code列表
faultCodes = self.faultCode['FAULTCODE']
# list容器
msages = list()
idx = 0
while idx < count:
idx += 1
#随机设备号码
equipmentNumber = str(random.choice(equipmentNumbers['KCECPUC']))
#随机fault code
faultCode = str(random.choice(faultCodes['KCECPUC']))
# 随机消息类型
content = {}
if msgType == 'random':
#随机消息模板
msgtype = random.choice(sorted(data.keys()))
content = data[msgtype].copy()
# alarm消息
elif msgType == 'alarm':
content=data['alarmTamplate'].copy()
# fault 消息
elif msgType == 'fault':#'faultKcecpuMain':
content=data['faultTamplate'].copy()
# fault recover消息
elif msgType == 'faultRecovered':#'faultRecoveredKcecpuMain':
content=data['faultRecoveredTamplate'].copy()
# fault 消息
elif msgType == 'faultGcd':
content=data['faultGcdTamplate'].copy()
# fault recover消息
elif msgType == 'faultRecoveredGcd':
content=data['faultRecoveredGcdTamplate'].copy()
#合成消息
if type(content) is not dict:
content = ast.literal_eval(content)
if content:
if content.get('param').get('equipment'):
content['param']['equipment'] = equipmentNumber
elif content.get('equipmentNumber'):
content['equipmentNumber'] = equipmentNumber
elif content.get('param').get('uid'):
content['param']['uid'] = self.uuid().upper().replace('-','')
elif content.get('param').get('code'):
content['param']['code'] = faultCode
elif content.get('param').get('startTime'):
content['param']['startTime'] = self.utcTime()
elif content.get('param').get('endTime'):
content['param']['endTime'] = self.utcTime()
elif content.get('param').get('timestamp'):
content['param']['timestamp'] = self.utcTime()
elif content.get('param').get('time'):
content['param']['time'] = int(time.time()*1000)
# topic = content.pop('topic')
msages.append(copy.deepcopy(content))
return msages
def sn_msg(self, msgType ='random', controller = 'LCE', count = 1):
'''
generate sn message
LCE is default
'''
#读取LCE模板
data = self.template['SN']
#读取设备编号列表
equipmentNumbers = self.equipmentNumber['EQUIPMETN']
#读取fault code列表
faultCodes = self.faultCode['FAULTCODE']
# list容器
msages = list()
idx = 0
while idx < count:
idx += 1
#随机设备号码
equipmentNumber = str(random.choice(equipmentNumbers[controller]))
# 随机消息类型
if msgType == '--random--':
#did not supported other methods.
pass
else:
#随机消息模板
msgtype = random.choice(sorted(data.keys()))
content = data[msgtype].copy()
#合成消息
content['equipmentNumber'] = equipmentNumber
content['param']['konectEntryId'] = 'K24_KM55_00000000%7d' % random.randint(1000000,9999999)
content['param']['callInUrgentSNTime'] = self.utcTime()
#print(type(content))
msages.append(copy.deepcopy(content))
#print(msages)
return msages
if __name__ == '__main__':
wss = WSSMsgMock()
start = time.time()
msg = wss.comon_msg( controller = 'LCE', msgType = 'random',count = 1 )
print(time.time() -start)
TesterHome 为用户提供「保留所有权利,禁止转载」的选项。
除非获得原作者的单独授权,任何第三方不得转载标注了「原创声明:保留所有权利,禁止转载」的内容,否则均视为侵权。
具体请参见TesterHome 知识产权保护协议。
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
暂无回复。