零碎知识 模拟上游服务,推送消息到 Kafka 并进行 kafka 消费的脚本解析(丢包率)

大海 · 2025年02月13日 · 1739 次阅读

上游数据

  • MSK Kafka

测试场景

  • 基准测试:在当前云端台量 X 万的条件下,kafka 以 X 个/S 发送数据,其中 X 个命中发送给政府,探测系统各性能指标表现,验证延迟和丢包率是否满足。
  • 压力测试:预估未来 X 年云端台量会达到 X 万台,kafka 以 X 个/S 发送数据,其中 X 个命中发送给政府,探测系统各性能指标表现,验证延迟和丢包率是否满足。

验证数据丢包的脚本代码

import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
from DataMock import KafkaMsgMock
from Percentages import Percentage
import random
from confluent_kafka import Producer
import json
import datetime

mocInstance = KafkaMsgMock()
percIns = Percentage()
task_list = []
total = 0
msglist = []
wait = 0
resource = 0
previous_time = int(time.time())
task_completed = False  # 添加任务完成标志
log_completed = False   # 添加日志打印标志

# 此处定义期望的发送速率
maxlimted = 1000
minlimted = 990

# kafka连接信息
conf = {
    'bootstrap.servers': xxxxxxx',
    'security.protocol': xxxxxxx',
    'sasl.mechanisms': 'xxxxxxx',
    'sasl.username': xxxxxxx',
    'sasl.password': 'xxxxxxx'
}

producer = Producer(conf)

# 创建默认任务因子
def task_paras():
    for m_Type in ['alarm', 'fault', 'serviceModeChange', 'movementData']:
        for contr_type in ['LCE', 'GCE', 'STEP', 'ESC', 'KSE', 'DTU']:
            count = percIns.calculate(controller=contr_type, msgType=m_Type, total=10000)
            if count > 0:
                task_list.append({"controller": contr_type, "msgType": m_Type, "count": count})

async def utcTime():
    return datetime.datetime.utcnow().isoformat()[0:23] + 'Z'

# 定义异步调用装饰器
def async_call(func):
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func, *args, **kwargs)
    return wrapper

@async_call
def comon_msg(controller, msgType, count):
    return mocInstance.comon_msg(controller=controller, msgType=msgType, count=count)

@async_call
def kce_msg(msgType, count):
    return mocInstance.kce_msg(msgType=msgType, count=count)

@async_call
def kcecpuc_msg(msgType, count):
    return mocInstance.kcecpuc_msg(msgType=msgType, count=count)

# 发送数据异步函数修正
async def send(msg, producer):
    global total, wait, task_completed, previous_time, log_completed
    try:
        topic, data = msg
        data['Param']['Timestamp'] = await utcTime()

        if wait > 0:
            await asyncio.sleep(wait * 0.001)
        producer.produce("gov-" + topic, json.dumps(data).encode('utf-8'))
        total += 1

        # 检查是否达到发送上限
        if not task_completed:
            if total >= 20000:  # 设置发送上限为20,000
                if not log_completed:
                    print(f"总共发送了{total}条消息,发送完毕。")
                    log_completed = True
                task_completed = True  # 设置任务完成标志

        # 更新 previous_time
        curr_time = int(time.time())
        if curr_time != previous_time and not task_completed:
            previous_time = curr_time

    except Exception as e:
        print(f"Error sending message '{msg}': {e}")

# 发送数据批处理修正
async def send_batched(msgs, producer):
    await asyncio.gather(*[send(msg, producer) for msg in msgs])
    producer.flush()

async def request_msg():
    global msglist, task_completed
    while not task_completed:
        if len(msglist) < 300000:
            for unit in task_list:
                if unit['controller'] == 'KCE':
                    msglist += await kce_msg(unit['msgType'], unit['count'])
                elif unit['controller'] == 'KCECPUC':
                    msglist += await kcecpuc_msg(unit['msgType'], unit['count'])
                else:
                    msglist += await comon_msg(unit['controller'], unit['msgType'], unit['count'])
            random.shuffle(msglist)
        await asyncio.sleep(0.01)

async def send_data(producer):
    global msglist, resource, task_completed
    while not task_completed:
        if len(msglist) > 0:
            if resource > 15000:
                resource = 15000
            if len(msglist) > (1000 + int(resource)):
                msgs = msglist[:(1000 + int(resource))]
                msglist = msglist[(1000 + int(resource)):]
            else:
                msgs = msglist
                msglist = msglist[len(msglist):-1]

            await send_batched(msgs, producer)
        await asyncio.sleep(0.01)

# 统计函数修正
async def statistics():
    global previous_time, task_completed
    start_time = int(time.time())
    previous_time = start_time
    time.sleep(1)
    while not task_completed:
        global total, wait, msglist, resource

        if total != 0:
            curr_time = int(time.time())
            if (curr_time - start_time) == 0:
                continue

            if curr_time != previous_time:
                speed = int(total / (curr_time - start_time))

                if speed > maxlimted:
                    if wait <= 100000:
                        if (speed / maxlimted) > 1.5:
                            wait += 500
                            if resource > 100:
                                resource -= 100
                        elif (speed / maxlimted) > 1.15:
                            wait += 100
                            if resource > 20:
                                resource -= 20
                        elif (speed / maxlimted) > 1.10:
                            wait += 50
                            if resource > 15:
                                resource -= 15
                        elif (speed / maxlimted) > 1.05:
                            wait += 20
                            if resource > 10:
                                resource -= 10
                        elif (speed / maxlimted) > 1.03:
                            wait += 10
                            if resource > 5:
                                resource -= 5
                        elif (speed / maxlimted) > 1.015:
                            wait += 1
                            if resource >= 1:
                                resource -= 1

                elif speed < minlimted:
                    if wait >= 0:
                        if (speed / minlimted) < 0.6:
                            if wait > 500:
                                wait -= 500
                            else:
                                wait = 0
                            resource += 100
                        elif (speed / minlimted) < 0.85:
                            if wait > 200:
                                wait -= 200
                            else:
                                wait = 0
                            resource += 50
                        elif (speed / minlimted) < 0.90:
                            if wait > 100:
                                wait -= 100
                            else:
                                wait = 0
                            resource += 20
                        elif (speed / minlimted) < 0.95:
                            if wait > 50:
                                wait -= 50
                            else:
                                wait = 0
                            resource += 10
                        elif (speed / minlimted) < 0.97:
                            if wait > 10:
                                wait -= 10
                            else:
                                wait = 0
                            resource += 5
                        elif (speed / minlimted) < 0.998:
                            if wait >= 1:
                                wait -= 1
                            resource += 1
                    else:
                        wait = 0

                print('[%s] 发送消息流速 %s个/, 发送总数 %s个, 速率减速补偿-%.2f, 增速补偿%s, 队列余量%s' % (
                    time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())),
                    speed, total, wait * 0.01, resource, len(msglist),
                ))
                previous_time = curr_time

        await asyncio.sleep(0.001)

async def main():
    task_paras()
    print("执行限速区间, 每秒 %s - %s" % (minlimted, maxlimted))

    try:
        # 启动 Kafka 生产者
        producer = Producer(conf)

        # 运行异步任务
        await asyncio.gather(
            request_msg(),
            send_data(producer),
            statistics()
        )

        # 检查任务是否完成
        while not task_completed:
            await asyncio.sleep(0.1)

    finally:
        # 关闭 Kafka 生产者
        producer.flush()
        print("Kafka生产者已关闭。")

if __name__ == "__main__":
     asyncio.run(main())

针对代码的功能解析

1. 导入模块

import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
from DataMock import KafkaMsgMock
from Percentages import Percentage
import random
from confluent_kafka import Producer
import json
import datetime
  • time:用于处理时间相关操作,如获取当前时间戳。
  • asyncio:Python 的异步编程库,允许程序在等待 I/O 操作时执行其他任务。
  • ThreadPoolExecutor:用于在异步任务中运行阻塞型函数。
  • KafkaMsgMock:自定义模块,用于生成模拟的 Kafka 消息。
  • Percentage:自定义模块,用于计算消息的比例。
  • random:用于随机打乱消息列表。
  • confluent_kafka.Producer:Kafka 的 Python 客户端库,用于发送消息到 Kafka。
  • json:用于将消息序列化为 JSON 格式。
  • datetime:用于处理时间相关的操作。

2. 初始化和配置

mocInstance = KafkaMsgMock()
percIns = Percentage()
task_list = []
total = 0
msglist = []
wait = 0
resource = 0
previous_time = int(time.time())
task_completed = False  # 添加任务完成标志
log_completed = False   # 添加日志打印标志
  • mocInstance:KafkaMsgMock 的实例,用于生成模拟消息。
  • percIns:Percentage 的实例,用于计算消息比例。
  • task_list:存储生成的任务参数。
  • total:已发送的消息总数。
  • msglist:存储待发送的消息队列。
  • wait:发送消息时的延时(单位:毫秒)。
  • resource:控制每次发送的消息数量。
  • previous_time:记录上一次的时间戳,用于统计发送速度。
  • task_completed 和 log_completed:标志任务是否完成和日志是否打印。

3. Kafka 配置

# 此处定义期望的发送速率
maxlimted = 1100
minlimted = 1000

# kafka连接信息
conf = {
    'bootstrap.servers': xxxxxxx',
    'security.protocol': 'xxxxxxx',
    'sasl.mechanisms': xxxxxxx',
    'sasl.username': 'xxxxxxx',
    'sasl.password': 'xxxxxxx'
}

producer = Producer(conf)
  • maxlimted 和 minlimted:定义消息发送的速率范围(每秒发送的消息数)。
  • conf:Kafka 的连接配置,包括服务器地址、安全协议、认证信息等。
  • producer:Kafka 生产者实例,用于发送消息到 Kafka。

4. 任务参数生成

def task_paras():
    for m_Type in ['alarm', 'fault', 'serviceModeChange', 'movementData']:
        for contr_type in ['LCE', 'GCE', 'STEP', 'ESC', 'KSE', 'DTU']:
            count = percIns.calculate(controller=contr_type, msgType=m_Type, total=10000)
            if count > 0:
                task_list.append({"controller": contr_type, "msgType": m_Type, "count": count})
  • task_paras:生成任务参数。
  • 遍历消息类型和控制器类型,根据 Percentage 模块计算每种类型的消息数量。
  • 如果消息数量大于 0,将任务参数添加到 task_list。

5. UTC 时间获取

async def utcTime():
    return datetime.datetime.utcnow().isoformat()[0:23] + 'Z'

utcTime:异步函数,返回当前的 UTC 时间戳。

6. 异步调用装饰器

def async_call(func):
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func, *args, **kwargs)
    return wrapper
  • async_call:装饰器,用于将同步函数包装为异步函数。
  • 在异步事件循环中运行同步函数。

7. 消息生成函数

@async_call
def comon_msg(controller, msgType, count):
    return mocInstance.comon_msg(controller=controller, msgType=msgType, count=count)

@async_call
def kce_msg(msgType, count):
    return mocInstance.kce_msg(msgType=msgType, count=count)

@async_call
def kcecpuc_msg(msgType, count):
    return mocInstance.kcecpuc_msg(msgType=msgType, count=count)
  • comon_msg:根据控制器类型和消息类型生成通用消息。
  • kce_msg 和 kcecpuc_msg:根据消息类型生成特定控制器的消息。

8. 发送单条消息

async def send(msg, producer):
    global total, wait, task_completed, previous_time, log_completed
    try:
        topic, data = msg
        data['Param']['Timestamp'] = await utcTime()

        if wait > 0:
            await asyncio.sleep(wait * 0.001)
        producer.produce("gov-" + topic, json.dumps(data).encode('utf-8'))
        total += 1

        # 检查是否达到发送上限
        if not task_completed:
            if total >= 20000:  # 设置发送上限为20,000
                if not log_completed:
                    print(f"总共发送了{total}条消息,发送完毕。")
                    log_completed = True
                task_completed = True  # 设置任务完成标志

        # 更新 previous_time
        curr_time = int(time.time())
        if curr_time != previous_time and not task_completed:
            previous_time = curr_time

    except Exception as e:
        print(f"Error sending message '{msg}': {e}")
  • send:异步函数,发送单条消息到 Kafka。
  • 在消息中添加当前的 UTC 时间戳。
  • 根据 wait 变量引入发送延时。
  • 更新消息总数 total,并检查是否达到发送上限。

9. 批量发送消息

async def send_batched(msgs, producer):
    await asyncio.gather(*[send(msg, producer) for msg in msgs])
    producer.flush()
  • send_batched:异步函数,批量发送消息。
  • 使用 asyncio.gather 并行发送多条消息。
  • 调用 producer.flush() 确保消息发送完成。

10. 请求消息

async def request_msg():
    global msglist, task_completed
    while not task_completed:
        if len(msglist) < 300000:
            for unit in task_list:
                if unit['controller'] == 'KCE':
                    msglist += await kce_msg(unit['msgType'], unit['count'])
                elif unit['controller'] == 'KCECPUC':
                    msglist += await kcecpuc_msg(unit['msgType'], unit['count'])
                else:
                    msglist += await comon_msg(unit['controller'], unit['msgType'], unit['count'])
            random.shuffle(msglist)
        await asyncio.sleep(0.01)
  • request_msg:异步函数,生成模拟消息并添加到消息队列。
  • 当消息队列中的消息数少于 300,000 条时,继续生成消息。
  • 随机打乱消息列表。

11. 统计和动态调整

async def statistics():
    global previous_time, task_completed
    start_time = int(time.time())
    previous_time = start_time
    time.sleep(1)
    while not task_completed:
        global total, wait, msglist, resource

        if total != 0:
            curr_time = int(time.time())
            if (curr_time - start_time) == 0:
                continue

            if curr_time != previous_time:
                speed = int(total / (curr_time - start_time))

                if speed > maxlimted:
                    # 如果发送速度超过上限,增加延时
                    if wait <= 100000:
                        if (speed / maxlimted) > 1.5:
                            wait += 500
                            if resource > 100:
                                resource -= 100
                        elif (speed / maxlimted) > 1.15:
                            wait += 100
                            if resource > 20:
                                resource -= 20
                        # 省略部分逻辑
                elif speed < minlimted:
                    # 如果发送速度低于下限,减少延时
                    if wait >= 0:
                        if (speed / minlimted) < 0.6:
                            if wait > 500:
                                wait -= 500
                            else:
                                wait = 0
                            resource += 100
                        elif (speed / minlimted) < 0.85:
                            if wait > 200:
                                wait -= 200
                            else:
                                wait = 0
                            resource += 50
                        # 省略部分逻辑

                print('[%s] 发送消息流速 %s个/秒, 发送总数 %s个, 速率减速补偿-%.2f, 增速补偿%s, 队列余量%s' % (
                    time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())),
                    speed, total, wait * 0.01, resource, len(msglist),
                ))
                previous_time = curr_time

        await asyncio.sleep(0.001)
  • statistics:异步函数,统计消息发送速度。
  • 根据发送速度动态调整消息发送的延时 wait 和资源限制 resource。
  • 打印统计信息。

12. 主函数

async def main():
    task_paras()
    print("执行限速区间, 每秒 %s - %s" % (minlimted, maxlimted))

    try:
        # 启动 Kafka 生产者
        producer = Producer(conf)

        # 运行异步任务
        await asyncio.gather(
            request_msg(),
            send_data(producer),
            statistics()
        )

        # 检查任务是否完成
        while not task_completed:
            await asyncio.sleep(0.1)

    finally:
        # 关闭 Kafka 生产者
        producer.flush()
        print("Kafka生产者已关闭。")
  • main:主函数,初始化任务列表。
  • 启动 Kafka 生产者。
  • 并行运行 request_msg, send_data, 和 statistics。
  • 确保所有任务完成后关闭 Kafka 生产者。

13. 运行主函数

if __name__ == "__main__":
    asyncio.run(main())
  • 使用 asyncio.run 启动主异步函数 main。

总结

  • 这段代码通过异步编程和 Kafka 生产者实现了模拟消息的生成和发送,并动态调整发送速率。
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册