上游数据

测试场景

验证数据丢包的脚本代码

import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
from DataMock import KafkaMsgMock
from Percentages import Percentage
import random
from confluent_kafka import Producer
import json
import datetime

# ---- Shared runtime state, mutated by the coroutines below ----
# Project-side helpers: mock message factory and per-pair quota calculator.
mocInstance = KafkaMsgMock()
percIns = Percentage()

task_list: list = []  # generation units filled in by task_paras()
total: int = 0        # messages handed to the producer so far
msglist: list = []    # pending (topic, payload) messages waiting to be sent
wait: int = 0         # per-message delay in milliseconds (send sleeps wait * 0.001 s)
resource: int = 0     # extra messages added to each batch on top of 1000 (send_data)

previous_time: int = int(time.time())  # last whole-second timestamp observed

task_completed: bool = False  # set once the send limit is reached; ends every loop
log_completed: bool = False   # guards the one-time "send finished" log line

# Target send-rate window in messages per second: statistics() adds delay when
# the measured rate exceeds maxlimted and removes delay when it drops below
# minlimted.
maxlimted = 1000
minlimted = 990

# Kafka connection settings. Replace the 'xxxxxxx' placeholders with real
# values; every value must be a complete, quoted string literal or the module
# will not even parse.
conf = {
    'bootstrap.servers': 'xxxxxxx',
    'security.protocol': 'xxxxxxx',
    'sasl.mechanisms': 'xxxxxxx',
    'sasl.username': 'xxxxxxx',
    'sasl.password': 'xxxxxxx',
}

# Module-level producer kept for compatibility. main() builds and flushes its
# own Producer and passes it down explicitly, so the pipeline never uses this
# instance.
producer = Producer(conf)

# Build the default message-generation units.
def task_paras():
    """Append one generation unit per (message type, controller) pair to ``task_list``.

    ``percIns.calculate`` decides how many of a 10000-message total belong to
    each pair; a pair is recorded only when that quota is positive. Pairs are
    visited message type first, then controller.
    """
    message_types = ('alarm', 'fault', 'serviceModeChange', 'movementData')
    controllers = ('LCE', 'GCE', 'STEP', 'ESC', 'KSE', 'DTU')
    for msg_kind in message_types:
        for ctrl in controllers:
            quota = percIns.calculate(controller=ctrl, msgType=msg_kind, total=10000)
            if not quota > 0:
                continue
            task_list.append({"controller": ctrl, "msgType": msg_kind, "count": quota})

async def utcTime():
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SS.mmmZ``.

    Sub-millisecond digits are truncated. ``timespec='milliseconds'``
    guarantees the fractional part is always present; slicing the default
    ``isoformat()`` output lost it whenever the microsecond field was 0.
    An aware ``now(timezone.utc)`` replaces ``utcnow()``, which is deprecated
    since Python 3.12.
    """
    now = datetime.datetime.now(datetime.timezone.utc)
    return now.replace(tzinfo=None).isoformat(timespec='milliseconds') + 'Z'

# Decorator: turn a blocking function into an awaitable run in a thread pool.
def async_call(func):
    """Wrap synchronous ``func`` so that calling it returns an awaitable.

    The call runs in the running loop's default executor, so blocking work
    does not stall other coroutines. ``loop.run_in_executor`` only forwards
    positional arguments; keyword arguments are bound with
    ``functools.partial`` first, since passing them straight through raises
    ``TypeError``.
    """
    import functools

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        call = functools.partial(func, *args, **kwargs)
        return await loop.run_in_executor(None, call)

    return wrapper

@async_call
def comon_msg(controller, msgType, count):
    """Return ``mocInstance.comon_msg`` output for one controller/message type.

    ``@async_call`` makes this awaitable. request_msg() concatenates the
    result onto ``msglist``, so it is presumably a list of (topic, payload)
    pairs -- verify against DataMock.
    """
    generated = mocInstance.comon_msg(
        controller=controller,
        msgType=msgType,
        count=count,
    )
    return generated

@async_call
def kce_msg(msgType, count):
    """Return ``mocInstance.kce_msg`` output for ``msgType``/``count``.

    Awaitable via ``@async_call``; request_msg() uses it for units whose
    controller is 'KCE'.
    """
    generated = mocInstance.kce_msg(
        msgType=msgType,
        count=count,
    )
    return generated

@async_call
def kcecpuc_msg(msgType, count):
    """Return ``mocInstance.kcecpuc_msg`` output for ``msgType``/``count``.

    Awaitable via ``@async_call``; request_msg() uses it for units whose
    controller is 'KCECPUC'.
    """
    generated = mocInstance.kcecpuc_msg(
        msgType=msgType,
        count=count,
    )
    return generated

# Send one message to Kafka.
async def send(msg, producer, limit=20000):
    """Timestamp one ``(topic, data)`` message and hand it to ``producer``.

    ``data['Param']['Timestamp']`` is overwritten with the current UTC time.
    The payload is then JSON-encoded as UTF-8 and produced to topic
    ``"gov-" + topic``. When the global ``wait`` is positive, the coroutine
    first sleeps ``wait`` milliseconds (throttle chosen by statistics()).

    Once ``limit`` messages (default 20,000) have been produced, the global
    ``task_completed`` is set and every later call produces nothing. Without
    that check, the rest of an in-flight gathered batch was still sent, so
    more than ``limit`` messages reached Kafka while the log reported exactly
    ``limit``.

    Any exception is printed rather than raised, so one bad message does not
    abort the surrounding ``asyncio.gather`` batch. The coroutine
    deliberately leaves the global ``previous_time`` to the rate reporter.
    """
    global total, task_completed, log_completed
    try:
        topic, data = msg
        data['Param']['Timestamp'] = await utcTime()

        if wait > 0:
            await asyncio.sleep(wait * 0.001)

        # Checked after the last await: other coroutines in this batch may
        # have hit the limit while this one slept.
        if task_completed:
            return

        producer.produce("gov-" + topic, json.dumps(data).encode('utf-8'))
        total += 1

        if total >= limit:
            if not log_completed:
                print(f"总共发送了{total}条消息,发送完毕。")
                log_completed = True
            task_completed = True

    except Exception as e:
        print(f"Error sending message '{msg}': {e}")

# Send a batch of messages concurrently.
async def send_batched(msgs, producer):
    """Run one ``send`` coroutine per message concurrently, then flush.

    ``producer.flush()`` is called synchronously (not awaited), so it runs on
    the event-loop thread after every ``send`` in the batch has finished.
    """
    pending = (send(item, producer) for item in msgs)
    await asyncio.gather(*pending)
    producer.flush()

async def request_msg():
    """Keep the shared ``msglist`` topped up with generated messages.

    While fewer than 300,000 messages are queued, it runs one generation
    round over ``task_list``. Each unit is dispatched by controller: 'KCE',
    'KCECPUC', or the common generator. After the round the queue is
    shuffled. It polls every 10 ms until ``task_completed``.

    Each generator call is awaited *before* ``msglist`` is touched. With
    ``msglist += await ...`` the list object was loaded first and stored back
    after the await. If send_data() sliced off a batch and rebound
    ``msglist`` in the meantime, the stale list -- still holding the messages
    just sent -- was restored, and those messages were sent again.
    """
    global msglist, task_completed
    while not task_completed:
        if len(msglist) < 300000:
            for unit in task_list:
                controller = unit['controller']
                if controller == 'KCE':
                    fresh = await kce_msg(unit['msgType'], unit['count'])
                elif controller == 'KCECPUC':
                    fresh = await kcecpuc_msg(unit['msgType'], unit['count'])
                else:
                    fresh = await comon_msg(controller, unit['msgType'], unit['count'])
                # No await between reading and rebinding msglist.
                msglist += fresh
            random.shuffle(msglist)
        await asyncio.sleep(0.01)

async def send_data(producer):
    """Drain ``msglist`` in batches and send each batch via send_batched().

    Before each batch the global ``resource`` is clamped to at most 15000.
    Batches are ``1000 + resource`` messages, so at most 16000. A shorter
    queue is sent whole. It polls every 10 ms until ``task_completed``.
    """
    global msglist, resource
    while not task_completed:
        if msglist:
            resource = min(resource, 15000)
            batch_size = 1000 + int(resource)
            # Slicing handles a short queue too: the remainder is just empty.
            msgs, msglist = msglist[:batch_size], msglist[batch_size:]
            await send_batched(msgs, producer)
        await asyncio.sleep(0.01)

# Rate-control tables used by _adjust_rates().
# Above the ceiling: (ratio threshold, wait increment, resource decrement,
# resource floor). The first row whose threshold is exceeded by
# speed / max_rate applies; resource is decremented only while above the floor.
_SLOW_DOWN_STEPS = (
    (1.5, 500, 100, 100),
    (1.15, 100, 20, 20),
    (1.10, 50, 15, 15),
    (1.05, 20, 10, 10),
    (1.03, 10, 5, 5),
    (1.015, 1, 1, 0),
)

# Below the floor: (ratio threshold, wait decrement, resource increment).
# The first row whose threshold is above speed / min_rate applies; wait is
# never lowered below 0.
_SPEED_UP_STEPS = (
    (0.6, 500, 100),
    (0.85, 200, 50),
    (0.90, 100, 20),
    (0.95, 50, 10),
    (0.97, 10, 5),
    (0.998, 1, 1),
)


def _adjust_rates(speed, wait, resource, max_rate, min_rate):
    """Return the updated ``(wait, resource)`` pair for a measured ``speed``.

    When ``speed`` is above ``max_rate``, the per-message delay ``wait``
    grows (only while it is at most 100000) and the batch bonus ``resource``
    shrinks. When it is below ``min_rate``, the delay shrinks and the bonus
    grows. Inside the window, or when no table row matches, both values come
    back unchanged.
    """
    if speed > max_rate:
        if wait <= 100000:
            ratio = speed / max_rate
            for threshold, wait_step, res_step, res_floor in _SLOW_DOWN_STEPS:
                if ratio > threshold:
                    wait += wait_step
                    if resource > res_floor:
                        resource -= res_step
                    break
    elif speed < min_rate:
        if wait < 0:
            # Defensive reset; wait is never expected to go negative.
            return 0, resource
        ratio = speed / min_rate
        for threshold, wait_step, res_step in _SPEED_UP_STEPS:
            if ratio < threshold:
                wait = max(wait - wait_step, 0)
                resource += res_step
                break
    return wait, resource


# Periodic rate reporting and throttling.
async def statistics():
    """Report the average send rate about once per second and retune it.

    Average speed is ``total / elapsed_seconds`` since start. On each report
    the speed is fed to _adjust_rates(), which updates the global ``wait``
    (delay used by send) and ``resource`` (batch bonus used by send_data).
    A status line is then printed. It runs until ``task_completed``.

    Implementation notes:
    * The 1 s warm-up uses ``await asyncio.sleep``; ``time.sleep`` froze
      every other task on the event loop.
    * Every iteration yields to the loop; there is no ``continue`` that
      skips the sleep and spins.
    * The once-per-second check uses the local ``last_report``, because
      other coroutines may also write the shared ``previous_time``. The
      global is still updated for compatibility.
    """
    global previous_time, task_completed, total, wait, msglist, resource
    start_time = int(time.time())
    previous_time = start_time
    last_report = start_time
    await asyncio.sleep(1)
    while not task_completed:
        curr_time = int(time.time())
        elapsed = curr_time - start_time
        if total != 0 and elapsed > 0 and curr_time != last_report:
            speed = int(total / elapsed)
            wait, resource = _adjust_rates(speed, wait, resource, maxlimted, minlimted)

            print('[%s] 发送消息流速 %s个/秒, 发送总数 %s个, 速率减速补偿-%.2f, 增速补偿%s, 队列余量%s' % (
                time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())),
                speed, total, wait * 0.01, resource, len(msglist),
            ))
            last_report = curr_time
            previous_time = curr_time

        await asyncio.sleep(0.001)

async def main():
    """Build the task list, then generate, send and monitor concurrently.

    A dedicated Kafka ``Producer`` is created and passed to send_data().
    request_msg(), send_data() and statistics() run together until all three
    return; each loops until ``task_completed``. The producer is flushed on
    the way out, including when one of the tasks raises.
    """
    task_paras()
    print("执行限速区间, 每秒 %s - %s" % (minlimted, maxlimted))

    # Created before `try`: if the constructor fails, its own error
    # propagates instead of an UnboundLocalError raised from `finally`.
    producer = Producer(conf)
    try:
        # gather() returns only after all three loops have exited, i.e. after
        # task_completed is set, so no extra wait loop is needed afterwards.
        await asyncio.gather(
            request_msg(),
            send_data(producer),
            statistics(),
        )
    finally:
        # Flush outstanding messages before the process exits.
        producer.flush()
        print("Kafka生产者已关闭。")

if __name__ == "__main__":
    # Script entry point: run the whole pipeline on a fresh event loop.
    entry = main()
    asyncio.run(entry)

针对代码的功能解析

1. 导入模块

import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
from DataMock import KafkaMsgMock
from Percentages import Percentage
import random
from confluent_kafka import Producer
import json
import datetime

2. 初始化和配置

# ---- Shared runtime state, mutated by the coroutines below ----
# Project-side helpers: mock message factory and per-pair quota calculator.
mocInstance = KafkaMsgMock()
percIns = Percentage()

task_list: list = []  # generation units filled in by task_paras()
total: int = 0        # messages handed to the producer so far
msglist: list = []    # pending (topic, payload) messages waiting to be sent
wait: int = 0         # per-message delay in milliseconds (send sleeps wait * 0.001 s)
resource: int = 0     # extra messages added to each batch on top of 1000 (send_data)

previous_time: int = int(time.time())  # last whole-second timestamp observed

task_completed: bool = False  # set once the send limit is reached; ends every loop
log_completed: bool = False   # guards the one-time "send finished" log line

3. 发送速率与 Kafka 配置

# Target send-rate window in messages per second: statistics() adds delay when
# the measured rate exceeds maxlimted and removes delay when it drops below
# minlimted.
# NOTE(review): the complete script at the top of the article uses 1000/990;
# confirm which window is intended.
maxlimted = 1100
minlimted = 1000

# Kafka connection settings. Replace the 'xxxxxxx' placeholders with real
# values; every value must be a complete, quoted string literal or the module
# will not even parse.
conf = {
    'bootstrap.servers': 'xxxxxxx',
    'security.protocol': 'xxxxxxx',
    'sasl.mechanisms': 'xxxxxxx',
    'sasl.username': 'xxxxxxx',
    'sasl.password': 'xxxxxxx',
}

# Module-level producer kept for compatibility. main() builds and flushes its
# own Producer and passes it down explicitly, so the pipeline never uses this
# instance.
producer = Producer(conf)

4. 任务参数生成

def task_paras():
    """Append one generation unit per (message type, controller) pair to ``task_list``.

    ``percIns.calculate`` decides how many of a 10000-message total belong to
    each pair; a pair is recorded only when that quota is positive. Pairs are
    visited message type first, then controller.
    """
    message_types = ('alarm', 'fault', 'serviceModeChange', 'movementData')
    controllers = ('LCE', 'GCE', 'STEP', 'ESC', 'KSE', 'DTU')
    for msg_kind in message_types:
        for ctrl in controllers:
            quota = percIns.calculate(controller=ctrl, msgType=msg_kind, total=10000)
            if not quota > 0:
                continue
            task_list.append({"controller": ctrl, "msgType": msg_kind, "count": quota})

5. UTC 时间获取

async def utcTime():
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SS.mmmZ``.

    Sub-millisecond digits are truncated. ``timespec='milliseconds'``
    guarantees the fractional part is always present; slicing the default
    ``isoformat()`` output lost it whenever the microsecond field was 0.
    An aware ``now(timezone.utc)`` replaces ``utcnow()``, which is deprecated
    since Python 3.12.
    """
    now = datetime.datetime.now(datetime.timezone.utc)
    return now.replace(tzinfo=None).isoformat(timespec='milliseconds') + 'Z'

utcTime:异步函数,返回当前 UTC 时间的 ISO 8601 字符串(精确到毫秒,以 'Z' 结尾,例如 2024-01-01T08:00:00.123Z),send 用它填写消息的 Timestamp 字段。

6. 异步调用装饰器

def async_call(func):
    """Wrap synchronous ``func`` so that calling it returns an awaitable.

    The call runs in the running loop's default executor, so blocking work
    does not stall other coroutines. ``loop.run_in_executor`` only forwards
    positional arguments; keyword arguments are bound with
    ``functools.partial`` first, since passing them straight through raises
    ``TypeError``.
    """
    import functools

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        call = functools.partial(func, *args, **kwargs)
        return await loop.run_in_executor(None, call)

    return wrapper

7. 消息生成函数

@async_call
def comon_msg(controller, msgType, count):
    """Return ``mocInstance.comon_msg`` output for one controller/message type.

    ``@async_call`` makes this awaitable. request_msg() concatenates the
    result onto ``msglist``, so it is presumably a list of (topic, payload)
    pairs -- verify against DataMock.
    """
    generated = mocInstance.comon_msg(
        controller=controller,
        msgType=msgType,
        count=count,
    )
    return generated

@async_call
def kce_msg(msgType, count):
    """Return ``mocInstance.kce_msg`` output for ``msgType``/``count``.

    Awaitable via ``@async_call``; request_msg() uses it for units whose
    controller is 'KCE'.
    """
    generated = mocInstance.kce_msg(
        msgType=msgType,
        count=count,
    )
    return generated

@async_call
def kcecpuc_msg(msgType, count):
    """Return ``mocInstance.kcecpuc_msg`` output for ``msgType``/``count``.

    Awaitable via ``@async_call``; request_msg() uses it for units whose
    controller is 'KCECPUC'.
    """
    generated = mocInstance.kcecpuc_msg(
        msgType=msgType,
        count=count,
    )
    return generated

8. 发送单条消息

async def send(msg, producer, limit=20000):
    """Timestamp one ``(topic, data)`` message and hand it to ``producer``.

    ``data['Param']['Timestamp']`` is overwritten with the current UTC time.
    The payload is then JSON-encoded as UTF-8 and produced to topic
    ``"gov-" + topic``. When the global ``wait`` is positive, the coroutine
    first sleeps ``wait`` milliseconds (throttle chosen by statistics()).

    Once ``limit`` messages (default 20,000) have been produced, the global
    ``task_completed`` is set and every later call produces nothing. Without
    that check, the rest of an in-flight gathered batch was still sent, so
    more than ``limit`` messages reached Kafka while the log reported exactly
    ``limit``.

    Any exception is printed rather than raised, so one bad message does not
    abort the surrounding ``asyncio.gather`` batch. The coroutine
    deliberately leaves the global ``previous_time`` to the rate reporter.
    """
    global total, task_completed, log_completed
    try:
        topic, data = msg
        data['Param']['Timestamp'] = await utcTime()

        if wait > 0:
            await asyncio.sleep(wait * 0.001)

        # Checked after the last await: other coroutines in this batch may
        # have hit the limit while this one slept.
        if task_completed:
            return

        producer.produce("gov-" + topic, json.dumps(data).encode('utf-8'))
        total += 1

        if total >= limit:
            if not log_completed:
                print(f"总共发送了{total}条消息,发送完毕。")
                log_completed = True
            task_completed = True

    except Exception as e:
        print(f"Error sending message '{msg}': {e}")

9. 批量发送消息

async def send_batched(msgs, producer):
    """Run one ``send`` coroutine per message concurrently, then flush.

    ``producer.flush()`` is called synchronously (not awaited), so it runs on
    the event-loop thread after every ``send`` in the batch has finished.
    """
    pending = (send(item, producer) for item in msgs)
    await asyncio.gather(*pending)
    producer.flush()

10. 请求消息

async def request_msg():
    """Keep the shared ``msglist`` topped up with generated messages.

    While fewer than 300,000 messages are queued, it runs one generation
    round over ``task_list``. Each unit is dispatched by controller: 'KCE',
    'KCECPUC', or the common generator. After the round the queue is
    shuffled. It polls every 10 ms until ``task_completed``.

    Each generator call is awaited *before* ``msglist`` is touched. With
    ``msglist += await ...`` the list object was loaded first and stored back
    after the await. If send_data() sliced off a batch and rebound
    ``msglist`` in the meantime, the stale list -- still holding the messages
    just sent -- was restored, and those messages were sent again.
    """
    global msglist, task_completed
    while not task_completed:
        if len(msglist) < 300000:
            for unit in task_list:
                controller = unit['controller']
                if controller == 'KCE':
                    fresh = await kce_msg(unit['msgType'], unit['count'])
                elif controller == 'KCECPUC':
                    fresh = await kcecpuc_msg(unit['msgType'], unit['count'])
                else:
                    fresh = await comon_msg(controller, unit['msgType'], unit['count'])
                # No await between reading and rebinding msglist.
                msglist += fresh
            random.shuffle(msglist)
        await asyncio.sleep(0.01)

11. 统计和动态调整

# Rate-control tables used by _adjust_rates().
# Above the ceiling: (ratio threshold, wait increment, resource decrement,
# resource floor). The first row whose threshold is exceeded by
# speed / max_rate applies; resource is decremented only while above the floor.
_SLOW_DOWN_STEPS = (
    (1.5, 500, 100, 100),
    (1.15, 100, 20, 20),
    (1.10, 50, 15, 15),
    (1.05, 20, 10, 10),
    (1.03, 10, 5, 5),
    (1.015, 1, 1, 0),
)

# Below the floor: (ratio threshold, wait decrement, resource increment).
# The first row whose threshold is above speed / min_rate applies; wait is
# never lowered below 0.
_SPEED_UP_STEPS = (
    (0.6, 500, 100),
    (0.85, 200, 50),
    (0.90, 100, 20),
    (0.95, 50, 10),
    (0.97, 10, 5),
    (0.998, 1, 1),
)


def _adjust_rates(speed, wait, resource, max_rate, min_rate):
    """Return the updated ``(wait, resource)`` pair for a measured ``speed``.

    When ``speed`` is above ``max_rate``, the per-message delay ``wait``
    grows (only while it is at most 100000) and the batch bonus ``resource``
    shrinks. When it is below ``min_rate``, the delay shrinks and the bonus
    grows. Inside the window, or when no table row matches, both values come
    back unchanged.
    """
    if speed > max_rate:
        if wait <= 100000:
            ratio = speed / max_rate
            for threshold, wait_step, res_step, res_floor in _SLOW_DOWN_STEPS:
                if ratio > threshold:
                    wait += wait_step
                    if resource > res_floor:
                        resource -= res_step
                    break
    elif speed < min_rate:
        if wait < 0:
            # Defensive reset; wait is never expected to go negative.
            return 0, resource
        ratio = speed / min_rate
        for threshold, wait_step, res_step in _SPEED_UP_STEPS:
            if ratio < threshold:
                wait = max(wait - wait_step, 0)
                resource += res_step
                break
    return wait, resource


async def statistics():
    """Report the average send rate about once per second and retune it.

    Average speed is ``total / elapsed_seconds`` since start. On each report
    the speed is fed to _adjust_rates(), which updates the global ``wait``
    (delay used by send) and ``resource`` (batch bonus used by send_data).
    A status line is then printed. It runs until ``task_completed``.

    Implementation notes:
    * The 1 s warm-up uses ``await asyncio.sleep``; ``time.sleep`` froze
      every other task on the event loop.
    * Every iteration yields to the loop; there is no ``continue`` that
      skips the sleep and spins.
    * The once-per-second check uses the local ``last_report``, because
      other coroutines may also write the shared ``previous_time``. The
      global is still updated for compatibility.
    """
    global previous_time, task_completed, total, wait, msglist, resource
    start_time = int(time.time())
    previous_time = start_time
    last_report = start_time
    await asyncio.sleep(1)
    while not task_completed:
        curr_time = int(time.time())
        elapsed = curr_time - start_time
        if total != 0 and elapsed > 0 and curr_time != last_report:
            speed = int(total / elapsed)
            wait, resource = _adjust_rates(speed, wait, resource, maxlimted, minlimted)

            print('[%s] 发送消息流速 %s个/秒, 发送总数 %s个, 速率减速补偿-%.2f, 增速补偿%s, 队列余量%s' % (
                time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())),
                speed, total, wait * 0.01, resource, len(msglist),
            ))
            last_report = curr_time
            previous_time = curr_time

        await asyncio.sleep(0.001)

12. 主函数

async def main():
    """Build the task list, then generate, send and monitor concurrently.

    A dedicated Kafka ``Producer`` is created and passed to send_data().
    request_msg(), send_data() and statistics() run together until all three
    return; each loops until ``task_completed``. The producer is flushed on
    the way out, including when one of the tasks raises.
    """
    task_paras()
    print("执行限速区间, 每秒 %s - %s" % (minlimted, maxlimted))

    # Created before `try`: if the constructor fails, its own error
    # propagates instead of an UnboundLocalError raised from `finally`.
    producer = Producer(conf)
    try:
        # gather() returns only after all three loops have exited, i.e. after
        # task_completed is set, so no extra wait loop is needed afterwards.
        await asyncio.gather(
            request_msg(),
            send_data(producer),
            statistics(),
        )
    finally:
        # Flush outstanding messages before the process exits.
        producer.flush()
        print("Kafka生产者已关闭。")

13. 运行主函数

if __name__ == "__main__":
    # Script entry point: run the whole pipeline on a fresh event loop.
    entry = main()
    asyncio.run(entry)

总结


↙↙↙阅读原文可查看相关链接,并与作者交流