零碎知识 模拟上游服务,推送消息到 Kafka 并进行 kafka 消费的脚本解析(丢包率)

大海 · 2025年02月13日 · 2505 次阅读

上游数据

  • MSK Kafka

测试场景

  • 基准测试:在当前云端台量 X 万的条件下,Kafka 以 X 个/S 发送数据,其中 X 个命中发送给政府,探测系统各性能指标表现,验证延迟和丢包率是否满足要求。
  • 压力测试:预估未来 X 年云端台量会达到 X 万台,Kafka 以 X 个/S 发送数据,其中 X 个命中发送给政府,探测系统各性能指标表现,验证延迟和丢包率是否满足要求。

数据丢包的主函数脚本

import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
from DataMock import KafkaMsgMock
from Percentages import Percentage
import random
from confluent_kafka import Producer
import json
import datetime

# Shared module state for the load generator. Everything below is read and/or
# written by the coroutines defined later in this script.
mocInstance = KafkaMsgMock()  # mock-message generator (loads its templates on construction)
percIns = Percentage()  # per-controller / per-message-type share calculator
task_list = []  # filled by task_paras(): dicts with "controller", "msgType", "count"
total_lock = asyncio.Lock()  # guards updates of `total` in send()
total = 0  # number of messages counted as sent so far
msglist = []  # queue of (topic, data) tuples: request_msg() produces, send_data() consumes
wait = 0  # throttle delay in milliseconds; send() sleeps wait * 0.001 seconds
resource = 0  # extra batch size added to the base 1000 in send_data() (capped at 15000 there)
previous_time = int(time.time())  # last whole second seen by the statistics loop
task_completed = False  # set once `total` reaches `totalLimit`; stops every loop
log_completed = False   # ensures the "finished" summary line is printed only once

# Target send-rate window (messages per second) and total number of messages to send
maxlimted = 90
minlimted = 70
totalLimit = 10000

# Kafka connection settings (placeholders)
conf = {
    'bootstrap.servers': 'XXXXXX',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'SCRAM-SHA-512',
    'sasl.username': 'XXXXXX',
    'sasl.password': 'XXXXXX'
}

# NOTE(review): main() builds its own Producer and passes that one to the
# senders, so this module-level instance appears to be unused - confirm.
producer = Producer(conf)

# Build the default task factors
def task_paras():
    """Populate the global ``task_list`` with one entry per message/controller pair.

    For every combination of message type and controller type the share of a
    5000-message budget is obtained from ``percIns.calculate``; combinations
    whose share is not positive are left out.
    """
    msg_types = ("alarm", "fault", "serviceModeChange")
    controllers = ('LCE', 'GCE', 'DTU', 'ESC')
    for kind in msg_types:
        for ctrl in controllers:
            amount = percIns.calculate(controller=ctrl, msgType=kind, total=5000)
            if amount <= 0:
                continue
            task_list.append({"controller": ctrl, "msgType": kind, "count": amount})

async def utcTime():
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SS.mmmZ``.

    The milliseconds are formatted explicitly: slicing ``isoformat()`` yields
    a string without any fractional part whenever the microsecond field
    happens to be zero, which made the timestamp format inconsistent.
    """
    now = datetime.datetime.now(datetime.timezone.utc)
    return now.strftime('%Y-%m-%dT%H:%M:%S') + '.%03dZ' % (now.microsecond // 1000)

# Decorator that turns a blocking function into an awaitable one
def async_call(func):
    """Wrap ``func`` so that calling it returns an awaitable.

    The wrapped function runs in the event loop's default executor, so the
    loop is not blocked while it executes.

    Args:
        func: A regular (blocking) callable.

    Returns:
        A coroutine function taking the same positional and keyword arguments
        as ``func`` and resolving to its return value.
    """
    import functools  # local import: not part of this script's import block

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        # run_in_executor() only forwards positional arguments, so keyword
        # arguments have to be bound with partial() beforehand.
        bound = functools.partial(func, *args, **kwargs)
        return await loop.run_in_executor(None, bound)
    return wrapper

@async_call
def comon_msg(controller, msgType, count):
    """Build ``count`` mock messages of ``msgType`` for ``controller``.

    Delegates to ``mocInstance.comon_msg``; the decorator makes the call awaitable.
    """
    generated = mocInstance.comon_msg(controller=controller, msgType=msgType, count=count)
    return generated

@async_call
def kce_msg(msgType, count):
    """Build ``count`` mock KCE messages of ``msgType``.

    Delegates to ``mocInstance.kce_msg``; the decorator makes the call awaitable.
    """
    generated = mocInstance.kce_msg(msgType=msgType, count=count)
    return generated

@async_call
def kcecpuc_msg(msgType, count):
    """Build ``count`` mock KCECPUC messages of ``msgType``.

    Delegates to ``mocInstance.kcecpuc_msg``; the decorator makes the call awaitable.
    """
    generated = mocInstance.kcecpuc_msg(msgType=msgType, count=count)
    return generated

# Send one message asynchronously
async def send(msg, producer):
    """Stamp and publish one ``(topic, data)`` message, honouring the send limit.

    The message timestamp is refreshed, the current throttle delay ``wait``
    (milliseconds) is applied, and the payload is produced to the topic
    ``"gov-" + topic`` as UTF-8 encoded JSON. Once ``total`` reaches
    ``totalLimit`` the summary line is printed once and ``task_completed`` is
    set so that every loop in the script stops.

    Args:
        msg: Tuple ``(topic, data)`` where ``data`` is a dict with a ``Param`` dict.
        producer: Kafka producer exposing ``produce(topic, value)``.

    Any exception is reported on stdout and swallowed so that one bad message
    does not abort the whole batch.
    """
    global total, task_completed, log_completed
    try:
        topic, data = msg
        data['Param']['Timestamp'] = await utcTime()

        print(f"Sending message to topic {topic}: {data}")

        if wait > 0:
            await asyncio.sleep(wait * 0.001)

        # The limit may have been reached while this coroutine was sleeping.
        if task_completed:
            return

        async with total_lock:
            if total < totalLimit:
                producer.produce("gov-" + topic, json.dumps(data).encode('utf-8'))
                # Count only after produce() accepted the message: if it
                # raises, the message must not be reported as sent.
                total += 1

        if total >= totalLimit:
            if not log_completed:
                print(f"总共发送了{total}条消息,发送完毕。")
                log_completed = True
            task_completed = True

        # `previous_time` is deliberately not touched here: statistics() uses
        # it to detect the start of a new second, and updating it from the
        # senders made that detection miss most seconds.

    except Exception as e:
        print(f"Error sending message '{msg}': {e}")

# Send a batch of messages
async def send_batched(msgs, producer):
    """Send every message of ``msgs`` concurrently, then flush the producer.

    Args:
        msgs: Iterable of ``(topic, data)`` tuples accepted by ``send``.
        producer: Kafka producer handed to ``send`` and flushed afterwards.
    """
    pending = [send(item, producer) for item in msgs]
    await asyncio.gather(*pending)
    producer.flush()

async def request_msg():
    """Keep the global message queue ``msglist`` topped up until the run ends.

    While fewer than 300000 messages are queued, one batch per ``task_list``
    entry is generated through the executor-backed mock wrappers, appended to
    the queue, and the queue is shuffled. The loop polls every 10 ms and stops
    once ``task_completed`` is set.
    """
    while not task_completed:
        if len(msglist) < 300000:
            for unit in task_list:
                controller = unit['controller']
                if controller == 'KCE':
                    batch = await kce_msg(unit['msgType'], unit['count'])
                elif controller == 'KCECPUC':
                    batch = await kcecpuc_msg(unit['msgType'], unit['count'])
                else:
                    batch = await comon_msg(controller, unit['msgType'], unit['count'])
                # Append only after the await has finished. `msglist += await ...`
                # captures the list object before awaiting; send_data() may rebind
                # `msglist` to a new list in the meantime, and the old object
                # (containing messages that were already sent) would then be
                # written back and sent again.
                msglist.extend(batch)
            random.shuffle(msglist)
        await asyncio.sleep(0.01)

async def send_data(producer):
    """Drain the global queue ``msglist`` in batches until the run ends.

    Each round takes up to ``1000 + resource`` messages (``resource`` is capped
    at 15000) from the front of the queue and passes them to ``send_batched``.
    The loop polls every 10 ms and stops once ``task_completed`` is set.

    Args:
        producer: Kafka producer forwarded to ``send_batched``.
    """
    global msglist, resource
    while not task_completed:
        if msglist:
            resource = min(resource, 15000)
            batch_size = 1000 + int(resource)
            if len(msglist) <= batch_size:
                # Whole queue fits into one batch: hand it over and start afresh.
                msgs, msglist = msglist, []
            else:
                msgs, msglist = msglist[:batch_size], msglist[batch_size:]

            await send_batched(msgs, producer)
        await asyncio.sleep(0.01)

# Rate statistics and throttle control

# Steps applied when the measured rate is above `maxlimted`, checked in order:
# (speed / maxlimted must exceed, added to `wait`, subtracted from `resource`,
#  `resource` must be greater than this for the subtraction to happen)
_SLOW_DOWN_STEPS = (
    (1.5, 500, 100, 100),
    (1.15, 100, 20, 20),
    (1.10, 50, 15, 15),
    (1.05, 20, 10, 10),
    (1.03, 10, 5, 5),
    (1.015, 1, 1, 0),
)

# Steps applied when the measured rate is below `minlimted`, checked in order:
# (speed / minlimted must be below, subtracted from `wait` (floored at 0),
#  added to `resource`)
_SPEED_UP_STEPS = (
    (0.6, 500, 100),
    (0.85, 200, 50),
    (0.90, 100, 20),
    (0.95, 50, 10),
    (0.97, 10, 5),
    (0.998, 1, 1),
)


def _adjust_throttle(speed, wait, resource):
    """Return the new ``(wait, resource)`` pair for the measured ``speed``."""
    if speed > maxlimted:
        if wait <= 100000:
            ratio = speed / maxlimted
            for threshold, wait_inc, res_dec, res_floor in _SLOW_DOWN_STEPS:
                if ratio > threshold:
                    wait += wait_inc
                    if resource > res_floor:
                        resource -= res_dec
                    break
    elif speed < minlimted:
        if wait >= 0:
            ratio = speed / minlimted
            for threshold, wait_dec, res_inc in _SPEED_UP_STEPS:
                if ratio < threshold:
                    wait = max(wait - wait_dec, 0)
                    resource += res_inc
                    break
        else:
            wait = 0
    return wait, resource


async def statistics():
    """Report the send rate once per second and steer it into the target window.

    The rate is the average since start (``total`` divided by the elapsed whole
    seconds). When it is above ``maxlimted`` the per-message delay ``wait`` is
    raised and the extra batch size ``resource`` lowered; when it is below
    ``minlimted`` the opposite happens. A status line is printed after every
    adjustment round. The loop stops once ``task_completed`` is set.
    """
    global previous_time, wait, resource
    start_time = int(time.time())
    previous_time = start_time
    # Yield to the event loop while waiting; a blocking time.sleep() here
    # would freeze the producer coroutines for the first second.
    await asyncio.sleep(1)
    while not task_completed:
        curr_time = int(time.time())
        elapsed = curr_time - start_time

        # Act only once per whole second, and only when a rate can be computed.
        if total != 0 and elapsed != 0 and curr_time != previous_time:
            speed = int(total / elapsed)
            wait, resource = _adjust_throttle(speed, wait, resource)

            print('[%s] 发送消息流速 %s个/秒, 发送总数 %s个, 速率减速补偿-%.2f, 增速补偿%s, 队列余量%s' % (
                time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())),
                speed, total, wait * 0.01, resource, len(msglist),
            ))
            previous_time = curr_time

        # Every path reaches this await so the loop can never spin without yielding.
        await asyncio.sleep(0.001)

async def main():
    """Run the load generator: build the task list, then produce until the limit.

    The generator, sender and statistics coroutines run concurrently; each of
    them returns once ``task_completed`` is set. The producer is flushed on the
    way out, whether the run finished normally or with an exception.
    """
    task_paras()
    print("执行限速区间, 每秒 %s - %s" % (minlimted, maxlimted))

    # Create the producer before entering try/finally: if construction fails
    # there is nothing to flush, and `finally` must not reference an unbound name.
    producer = Producer(conf)
    try:
        # gather() returns only after all three loops have ended, i.e. after
        # `task_completed` was set, so no additional waiting is required.
        await asyncio.gather(
            request_msg(),
            send_data(producer),
            statistics()
        )
    finally:
        # Deliver whatever is still buffered in the producer.
        producer.flush()
        print("Kafka生产者已关闭。")

if __name__ == "__main__":
    # Script entry point: run the whole load test on a fresh event loop.
    asyncio.run(main())

各类消息的发送速率百分比

#!/usr/bin/env python3
# -*- coding:utf-8 -*- 


import os
import re
import json

#-----------------------------------------------------------------------------------------------------
# Message type          topic                            Message number peaks (count/sec)   Percentage
#               
#               
# Alarm message         kcecpuc-kceap-alarmbuttonevent   0.366                              0.03678392
# Kafka                 dtu-lce-alarmbutton              4.39                               0.44120603
#                       dtu-step-alarmbutton             0.269                              0.027035176
#                       dtu-gce-alarmbutton              0.42                               0.042211055
#                       dtu-kce-alarmbutton              0.586                              0.058894472
# Fault &Fault          dtu-esc-fault                    1.81                               0.181909548
# recovery  Kafka       kcecpuc-gcd-fault                14.4                               1.447236181
#                       kcecpuc-kcecpu_main-fault        36.3                               3.648241206
#                       dtu-lce-fault                    23                                 2.311557789
#                       dtu-step-fault                   0.949                              0.095376884
#                       dtu-sys-fault                    7.49                               0.752763819
#                       dtu-gce-fault                    2.52                               0.253266332
#                       dtu-kce-fault                    20.6                               2.070351759
#                       dtu-kse-fault                    0.579                              0.058190955
# Service mode          dtu-esc-modechange               13.3                               1.336683417
# Change    Kafka       dtu-step-modechange              0.735                              0.073869347
#                       dtu-lce-modechange               42                                 4.221105528
#                       dtu-gce-modechange               7.43                               0.746733668
# Movement  Kafka       kcecpuc-kcecpu_main-carevent     64.8                               6.512562814
#                       dtu-lce-car                      695                                69.84924623
#                       dtu-step-car                     1.82                               0.182914573
#                       dtu-gce-car                      25.1                               2.522613065
#                       dtu-kce-car                      31.5                               3.165829146
#------------------------------------------------------------------------------------------------------

# 1. Percentage calculator
class Percentage:
    """Share of the total message volume per controller and message type.

    The shares are read once from ``MsgTamplate/Kafka/Percentage`` (resolved
    against the current working directory) and kept in memory.
    """

    def __init__(self):
        """Load the percentage table into ``self.percentage``."""
        self.percentage = self.loadPercentage()

    def loadPercentage(self):
        """Read the percentage file and return ``{"Percentage": <parsed JSON>}``.

        Before parsing, literal ``\\n`` / ``\\t`` escape sequences and all
        spaces are stripped from the file content.
        """
        path = os.path.join(os.getcwd(), 'MsgTamplate', 'Kafka', 'Percentage')
        with open(path, 'r', encoding='utf-8') as f:
            raw = f.read().replace("\\n", "").replace("\\t", "").replace(" ", "")
        return {"Percentage": json.loads(raw)}

    def calculate(self, controller='LCE', msgType='alarm', total=10000):
        """Return how many of ``total`` messages belong to the given combination.

        Args:
            controller: Controller key of the percentage table, e.g. ``'LCE'``.
            msgType: Message-type key below the controller, e.g. ``'alarm'``.
            total: Overall number of messages to distribute.

        Returns:
            ``int(total * share)``; ``0`` when the table has no entry for the
            controller / message-type combination.
        """
        table = self.percentage.get('Percentage') or {}
        share = (table.get(controller) or {}).get(msgType)
        if share is None:
            # Unknown combination: it simply gets no messages.
            return 0
        return int(total * share)



if __name__ == '__main__':
    # Manual smoke check: print the number of KCE alarm messages for the default total.
    print(Percentage().calculate(controller='KCE', msgType='alarm'))

各类模板消息的数据读取操作

#!/usr/bin/env python3
# -*- coding:utf-8 -*- 

'消息生成类'
__author__ = 'data mock'
__date__   = '2022-10-09'

import os
import json
import uuid
import datetime
import random
import copy
import time, random, queue
from threading import Thread
import ast


# 1.创建Kafka消息mock服务类
class KafkaMsgMock:
    def __init__(self):
        '''
        类初始化
        '''
        self.template        = self.loadTemplate()
        self.equipmentNumber = self.loadEquipmentNumber()
        self.faultCode       = self.loadFaultCode()
        print('Load resources ...')

    def loadTemplate(self):
        '''
        读取模板文件,为快速处理,所有模板均加载到内存
        '''
        dict_template = dict()
        for i in os.listdir('./MsgTamplate/Kafka'):
            if len(i) > 7: continue;  
            print(i)  
            with open(os.path.abspath(os.path.dirname(i)) + '/MsgTamplate/Kafka/' + i, 'r', encoding='utf-8') as f:
                dict_template.update({ i : json.loads(f.read().replace("\\n","").replace("\\t","").replace(" ","")) })
        return dict_template


    def loadEquipmentNumber(self):
        '''
        读取ken文件,为快速处理,所有模板均加载到内存
        '''
        dict_template = dict() 
        with open(os.path.abspath(os.path.dirname('./')) + '/MsgTamplate/EQUIPMETN', 'r', encoding='utf-8') as f:
            dict_template.update({ 'EQUIPMETN' : json.loads(f.read().replace("\\n","").replace("\\t","").replace(" ","")) })
        return dict_template

    def loadFaultCode(self):
        '''
        读取fault code文件,为快速处理,所有模板均加载到内存
        '''
        dict_template = dict() 
        with open(os.path.abspath(os.path.dirname('./')) + '/MsgTamplate/FAULTCODE', 'r', encoding='utf-8') as f:
            dict_template.update({ 'FAULTCODE' : json.loads(f.read().replace("\\n","").replace("\\t","").replace(" ","")) })

        return dict_template

    def utcTime(self):
        '''
        make utc time for timestemp
        '''
        a = datetime.datetime.utcnow()
        # 2020-11-06T09:10:36.000Z
        #return (str(a.year) + '-' + str(a.month).zfill(2) + '-' + str(a.day).zfill(2) + 'T' + str(a.hour).zfill(
        #    2) + ':' + str(a.minute).zfill(2) + ':' + str(a.second).zfill(2) + '.' + '%03dZ' % (int(a.microsecond / 1000)))
        return datetime.datetime.utcnow().isoformat()[0:23] + 'Z'

    def uuid(self):
        '''
        make uuid for timestemp
        '''
        return str(uuid.uuid1())


    def comon_msg(self, controller = 'LCE', msgType ='random', count = 1):
        '''
        generate lce/gce/step/esc message
        LCE is default
        '''
        #读取LCE模板
        data             = self.template[controller]
        #读取设备编号列表
        equipmentNumbers = self.equipmentNumber['EQUIPMETN']
        #读取fault code列表
        faultCodes       = self.faultCode['FAULTCODE']

        # list容器
        msages = list()

        idx = 0
        while idx < count:
            idx += 1

            #随机设备号码
            equipmentNumber  = str(random.choice(equipmentNumbers[controller]))
            #随机fault code
            faultCode        = str(random.choice(faultCodes[controller]))


            # 随机消息类型
            if msgType == 'random':
                #随机消息模板
                content = data[msgtype].copy()
                msgtype =  random.choice(sorted(data.keys()))

            # alarm消息
            elif msgType == 'alarm':
                content=data['alarmTamplate'].copy()
                content['Param']['UUID'] = 'testKafka0618007-alarm-' + self.uuid()

            # fault 消息            
            elif msgType == 'fault':
                content=data['faultTamplate'].copy()
                content['Param']['UUID'] = 'testKafka0618007-fault-' + self.uuid()

            # fault recover消息            
            elif msgType == 'faultRecovered':
                content=data['faultRecoveredTamplate'].copy()
                content['Param']['UUID'] = 'testKafka0618007-faultRecovered-' + self.uuid()

            # service mode change消息            
            elif msgType == 'serviceModeChange':
                content=data['serviceModeChangeTamplate'].copy()
                content['Param']['UUID'] = 'testKafka0618007-serviceModeChange-' + self.uuid()

            # movment data消息            
            elif msgType == 'movementData':
                content=data['movementDataTamplate'].copy()
                content['Param']['UUID'] = 'testKafka0618007-movementData-' + self.uuid()

            # SN消息            
            elif msgType == 'callInUrgentSN':
                content=data['serviceNeedTamplate'].copy()
                content['Param']['konectEntryId'] = 'testKafka0618007-serviceNeed-' + self.uuid()

            #合成消息
            content['EquipmentNumber'] = equipmentNumber
            # content['Param']['UUID']      = self.uuid()
            if content['Param'].get('FaultCode'):
                content['Param']['FaultCode'] = faultCode
            content['Param']['Timestamp'] = self.utcTime()

            topic = content.get('topic')
            msages.append((topic, copy.deepcopy(content)))

        return msages 


    def kce_msg(self, msgType ='random', count = 1):
        '''
        generate kce message
        '''
        #读取LCE模板
        data             = self.template['KCE']
        #读取设备编号列表
        equipmentNumbers = self.equipmentNumber['EQUIPMETN']
        #读取fault code列表
        faultCodes       = self.faultCode['FAULTCODE']

        # list容器
        msages = list()

        idx = 0
        while idx < count:
            idx += 1

            #随机设备号码
            equipmentNumber  = str(random.choice(equipmentNumbers['KCE']))
            #随机fault code
            faultCode        = str(random.choice(faultCodes['KCE']))

            # 随机消息类型
            if msgType == 'random':
                #随机消息模板
                msgtype =  random.choice(sorted(data.keys()))
                content = data[msgtype].copy()
            # alarm消息
            elif msgType == 'alarm':
                content=data['alarmTamplate'].copy()
            # fault 消息            
            elif msgType == 'fault':
                content=data['faultTamplate'].copy()

            # fault recover消息            
            elif msgType == 'faultRecovered':
                content=data['faultRecoveredTamplate'].copy()

            #合成消息
            content['equipment'] = equipmentNumber
            content['uid']       = self.uuid()
            content['code']      = faultCode
            if content['startTime']:
                content['startTime'] = self.utcTime()
            if content['endTime']:
                content['endTime']   = self.utcTime()

            topic = content.pop('topic')
            msages.append((topic, copy.deepcopy(content) ))
        return msages 


    def kcecpuc_msg(self, msgType ='random', count = 1):
        '''
        generate kcecpuc message
        '''
        #读取LCE模板
        data             = self.template['KCECPUC']
        #读取设备编号列表
        equipmentNumbers = self.equipmentNumber['EQUIPMETN']
        #读取fault code列表
        faultCodes       = self.faultCode['FAULTCODE']

        # list容器
        msages = list()

        idx = 0
        while idx < count:
            idx += 1

            #随机设备号码
            equipmentNumber  = str(random.choice(equipmentNumbers['KCECPUC']))
            #随机fault code
            faultCode        = str(random.choice(faultCodes['KCECPUC']))

            # 随机消息类型
            if msgType == 'random':
                #随机消息模板
                msgtype =  random.choice(sorted(data.keys()))
                content = data[msgtype].copy()
            # alarm消息
            elif msgType == 'alarm':
                content=data['alarmTamplate'].copy()
            # fault 消息            
            elif msgType == 'faultKcecpuMain':
                content=data['faultKcecpuMainTamplate'].copy()
            # fault recover消息            
            elif msgType == 'faultRecoveredKcecpuMain':
                content=data['faultRecoveredKcecpuMainTamplate'].copy()
            # fault 消息            
            elif msgType == 'faultGcd':
                content=data['faultGcdTamplate'].copy()
            # fault recover消息            
            elif msgType == 'faultRecoveredGcd':
                content=data['faultRecoveredGcdTamplate'].copy()

            #合成消息
            content['equipment'] = equipmentNumber
            content['uid']       = self.uuid().upper().replace('-','')
            content['code']      = faultCode
            if content['startTime']:
                content['startTime'] = self.utcTime()
            if content['endTime']:
                content['endTime']   = self.utcTime()

            topic = content.pop('topic')
            msages.append((topic, copy.deepcopy(content) ))
        return msages





# 1.创建websocket消息mock服务类
class WSSMsgMock:
    def __init__(self):
        '''
        类初始化
        '''
        self.template        = self.loadTemplate()
        self.equipmentNumber = self.loadEquipmentNumber()
        self.faultCode       = self.loadFaultCode()


    def loadTemplate(self):
        '''
        读取模板文件,为快速处理,所有模板均加载到内存
        '''
        dict_template = dict()
        for i in os.listdir('./MsgTamplate/Websocket'):
            if len(i) > 7: continue;   
            print('Load Template file %s' % i)
            with open(os.path.abspath(os.path.dirname(i)) + '/MsgTamplate/Websocket/' + i, 'r',encoding='utf-8') as f:
                dict_template.update({ i :json.loads(f.read().replace("\\n","").replace("\\t","").replace(" ","")) })

        return dict_template


    def loadEquipmentNumber(self):
        '''
        读取ken文件,为快速处理,所有模板均加载到内存
        '''
        dict_template = dict() 
        with open(os.path.abspath(os.path.dirname('./')) + '/MsgTamplate/EQUIPMETN', 'r', encoding='utf-8') as f:
            dict_template.update({ 'EQUIPMETN' : json.loads(f.read().replace("\\n","").replace("\\t","").replace(" ","")) })
        return dict_template

    def loadFaultCode(self):
        '''
        读取fault code文件,为快速处理,所有模板均加载到内存
        '''
        dict_template = dict() 
        with open(os.path.abspath(os.path.dirname('./')) + '/MsgTamplate/FAULTCODE', 'r', encoding='utf-8') as f:
            dict_template.update({ 'FAULTCODE' : json.loads(f.read().replace("\\n","").replace("\\t","").replace(" ","")) })

        return dict_template

    def utcTime(self):
        '''
        make utc time for timestemp
        '''
        return datetime.datetime.utcnow().isoformat()[0:23] + 'Z'


    def uuid(self):
        '''
        make uuid for timestemp
        '''
        return str(uuid.uuid1())


    def comon_msg(self, controller = 'LCE', msgType ='random' , count = 1):
        '''
        generate lce/gce/step/esc message
        '''
        #读取LCE模板
        data             = self.template[controller]
        #读取设备编号列表
        equipmentNumbers = self.equipmentNumber['EQUIPMETN']
        #读取fault code列表
        faultCodes       = self.faultCode['FAULTCODE']

        # list容器
        msages = list()

        idx = 0
        while idx < count:
            idx += 1

            #随机设备号码
            equipmentNumber  = str(random.choice(equipmentNumbers[controller]))
            #随机fault code
            faultCode        = str(random.choice(faultCodes[controller]))

            # 随机消息类型
            content = {}
            if msgType == 'random':
                #随机消息模板
                msgtype =  random.choice(sorted(data.keys()))
                content = data[msgtype].copy()
            # alarm消息
            elif msgType == 'alarm':
                content=data['alarmTamplate'].copy()
            # fault 消息            
            elif msgType == 'fault':
                content=data['faultTamplate'].copy()
            # fault recover消息            
            elif msgType == 'faultRecovered':
                content=data['faultRecoveredTamplate'].copy()

            #合成消息
            if type(content) is not dict:
               content = ast.literal_eval(content)

            if content:
                content['equipmentNumber']    = equipmentNumber
                content['param']['uuid']      = self.uuid()
                if content['param'].get('faultCode'):
                    content['param']['faultCode'] = faultCode
                content['param']['timestamp'] = self.utcTime()

                msages.append(copy.deepcopy(content))

        return msages 


    def kce_msg(self, msgType ='random', count = 1):
        '''
        generate kce message
        '''
        #读取LCE模板
        data             = self.template['KCE']
        #读取设备编号列表
        equipmentNumbers = self.equipmentNumber['EQUIPMETN']
        #读取fault code列表
        faultCodes       = self.faultCode['FAULTCODE']

        # list容器
        msages = list()

        idx = 0
        while idx < count:
            idx += 1

            #随机设备号码
            equipmentNumber  = str(random.choice(equipmentNumbers['KCE']))
            #随机fault code
            faultCode        = str(random.choice(faultCodes['KCE']))

            # 随机消息类型
            content = {}
            if msgType == 'random':
                #随机消息模板
                msgtype =  random.choice(sorted(data.keys()))
                content = data[msgtype].copy()
            # alarm消息
            elif msgType == 'alarm':
                content=data['alarmTamplate'].copy()
            # fault 消息            
            elif msgType == 'fault':
                content=data['faultTamplate'].copy()

            # fault recover消息            
            elif msgType == 'faultRecovered':
                content=data['faultRecoveredTamplate'].copy()

            #合成消息
            if type(content) is not dict:
               content = ast.literal_eval(content)

            if content:
                if content.get('param').get('equipment'):
                   content['param']['equipment'] = equipmentNumber
                elif content.get('equipmentNumber'):
                   content['equipmentNumber'] = equipmentNumber
                elif content.get('param').get('uid'):
                   content['param']['uid']       = self.uuid()
                elif content.get('param').get('code'):
                   content['param']['code']      = faultCode

                elif content.get('param').get('startTime'):
                    content['param']['startTime'] = self.utcTime()
                elif content.get('param').get('endTime'):
                    content['param']['endTime']   = self.utcTime()

                elif content.get('param').get('timestamp'):
                    content['param']['timestamp']   = self.utcTime()
                elif content.get('param').get('time'):
                    content['param']['time']   = int(time.time()*1000)

                msages.append(copy.deepcopy(content))
        return msages 


    def kcecpuc_msg(self, msgType ='random', count = 1):
        '''
        generate kcecpuc message
        '''
        #读取LCE模板
        data             = self.template['KCECPUC']
        #读取设备编号列表
        equipmentNumbers = self.equipmentNumber['EQUIPMETN']
        #读取fault code列表
        faultCodes       = self.faultCode['FAULTCODE']

        # list容器
        msages = list()

        idx = 0
        while idx < count:
            idx += 1

            #随机设备号码
            equipmentNumber  = str(random.choice(equipmentNumbers['KCECPUC']))
            #随机fault code
            faultCode        = str(random.choice(faultCodes['KCECPUC']))

            # 随机消息类型
            content = {}
            if msgType == 'random':
                #随机消息模板
                msgtype =  random.choice(sorted(data.keys()))
                content = data[msgtype].copy()
            # alarm消息
            elif msgType == 'alarm':
                content=data['alarmTamplate'].copy()
            # fault 消息            
            elif msgType == 'fault':#'faultKcecpuMain':
                content=data['faultTamplate'].copy()
            # fault recover消息            
            elif msgType == 'faultRecovered':#'faultRecoveredKcecpuMain':
                content=data['faultRecoveredTamplate'].copy()
            # fault 消息            
            elif msgType == 'faultGcd':
                content=data['faultGcdTamplate'].copy()
            # fault recover消息            
            elif msgType == 'faultRecoveredGcd':
                content=data['faultRecoveredGcdTamplate'].copy()

            #合成消息
            if type(content) is not dict:
               content = ast.literal_eval(content)

            if content:
                if content.get('param').get('equipment'):
                   content['param']['equipment'] = equipmentNumber
                elif content.get('equipmentNumber'):
                   content['equipmentNumber'] = equipmentNumber
                elif content.get('param').get('uid'):
                   content['param']['uid']       = self.uuid().upper().replace('-','')
                elif content.get('param').get('code'):
                   content['param']['code']      = faultCode


                elif content.get('param').get('startTime'):
                    content['param']['startTime'] = self.utcTime()
                elif content.get('param').get('endTime'):
                    content['param']['endTime']   = self.utcTime()

                elif content.get('param').get('timestamp'):
                    content['param']['timestamp']   = self.utcTime()
                elif content.get('param').get('time'):
                    content['param']['time']   = int(time.time()*1000)

                # topic = content.pop('topic')
                msages.append(copy.deepcopy(content))
        return msages


    def sn_msg(self, msgType ='random', controller = 'LCE', count = 1):
        '''
        generate sn message
        LCE is default
        '''
        #读取LCE模板
        data             = self.template['SN']
        #读取设备编号列表
        equipmentNumbers = self.equipmentNumber['EQUIPMETN']
        #读取fault code列表
        faultCodes       = self.faultCode['FAULTCODE']

        # list容器
        msages = list()

        idx = 0
        while idx < count:
            idx += 1

            #随机设备号码
            equipmentNumber  = str(random.choice(equipmentNumbers[controller]))

            # 随机消息类型
            if msgType == '--random--':
                #did not supported other methods.
                pass
            else:
            #随机消息模板
                msgtype =  random.choice(sorted(data.keys()))
                content = data[msgtype].copy()


            #合成消息
            content['equipmentNumber']    = equipmentNumber
            content['param']['konectEntryId']      = 'K24_KM55_00000000%7d' % random.randint(1000000,9999999)
            content['param']['callInUrgentSNTime'] = self.utcTime()
            #print(type(content))
            msages.append(copy.deepcopy(content))
        #print(msages)
        return msages 


if __name__ == '__main__':
    # Manual check: time the generation of a single random LCE websocket message.
    wss = WSSMsgMock()
    start = time.time()
    msg = wss.comon_msg(controller='LCE', msgType='random', count=1)
    elapsed = time.time() - start
    print(elapsed)
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册