零碎知识 模拟线上业务场景,进行下游应用的性能压测

大海 · May 20, 2025 · 1974 hits

背景

Gov 系统对工单数据的下发结果,由原来的发送到 IT-Kafka 中,更改为发送到 IOT-Kafka 中,需要针对上游服务异常可能导致的数据量堆积场景做性能压测,评估 Gov 系统是否能够及时将上游产生的数据消费处理完毕。

生产者脚本:模拟线上业务场景,向 IOT KAFKA 里面灌数据

# !/usr/bin/python
# -*- coding: utf-8 -*-

"""
@Author  : XXX
@Contact : XXX
@File    : PushMockKafka_mbm_message.py
@Create Time:  2025/5/15 16:44
@Description:  向CKFM QA环境的IOT KAFKA中灌入数据
"""

import sys
sys.stdout.reconfigure(encoding='utf-8')  # 确保输出编码为utf-8,避免中文乱码

from confluent_kafka import Producer  # 导入Kafka生产者客户端
import json  # 用于处理JSON格式数据
import argparse  # 用于解析命令行参数
import time  # 用于时间相关操作
import random  # 用于生成随机数
import datetime  # 用于处理日期时间
from copy import deepcopy  # 用于深拷贝对象

# Kafka 配置参数
# 这些配置用于连接到Kafka集群并设置消息发送行为
kafka_config = {
    'bootstrap.servers': 'XXXXXXXX',  # Kafka集群地址
    'security.protocol': 'SASL_SSL',  # 安全协议,使用加密连接
    'sasl.mechanisms': 'SCRAM-SHA-512',  # SASL认证机制
    'sasl.username': 'XXXXXXXX',  # Kafka用户
    'sasl.password': 'XXXXXXXX',  # Kafka密码
    'batch.num.messages': 5000,  # 每批最大消息数,控制批量发送大小
    'queue.buffering.max.messages': 100000,  # 发送队列最大容量
    'linger.ms': 50,  # 等待批量形成时间,单位毫秒
    'delivery.report.only.error': True  # 仅报告发送错误,减少回调负担
}

# 创建 Kafka Producer实例
# 这个对象将用于向Kafka发送消息
producer = Producer(kafka_config)

# 基础消息体内容(示例结构,实际内容应根据业务需求定义)
# 这里使用占位符表示实际消息结构
base_message_body = {
    "GovOrderVo": {
        "CreationTime": 0,  # 将被动态替换为当前时间戳
        "OrderNum": ""  # 将被动态替换为生成的工单号
    },
    "GovOrderEquipmentInfoVo": {
        "EquipmentID": ""  # 将被动态替换为生成的设备号
    },
    "SapEquipmentList": [
        {"equipmentCode": ""}  # 将被动态替换为生成的设备号
    ]
}

# 动态生成符合规则的设备号
def generate_equipment_code():
    """
    生成格式为"4"开头,共11位的设备号
    前缀固定为4,后面7位为随机数字,保证唯一性
    """
    # 保持前缀为4,随机生成剩余7位数字
    return f"4{random.randint(0, 9999999):07d}"

# 根据设备号生成工单号OrderNum
def generate_order_num(equipment_code):
    """
    生成工单号,格式为"NMT{设备号}{当前日期}N"
    例如:NMT4000000120250515N
    """
    today = datetime.datetime.now().strftime("%Y%m%d")  # 获取当前日期
    return f"NMT{equipment_code}{today}N"

# 动态生成当前时间戳(毫秒级)
def generate_current_timestamp():
    """
    生成当前时间的毫秒级时间戳
    用于消息中的时间字段
    """
    return int(datetime.datetime.now().timestamp() * 1000)

# 发送数据到 Kafka
def send_to_kafka(topic, data):
    """
    将数据发送到指定Kafka主题
    参数:
        topic (str): Kafka主题名称
        data (dict): 要发送的消息数据
    """
    data_str = json.dumps(data)  # 将字典转换为JSON字符串
    try:
        producer.produce(
            topic=topic,
            value=data_str,
            callback=delivery_report  # 设置发送结果回调函数
        )
        # 每发送50条消息主动poll一次,触发回调
        if len(producer) % 50 == 0:
            producer.poll(0)
        print("The content of the real-time data sent is:", data_str)
    except BufferError:
        # 如果发送队列满,先poll清理队列再重试
        producer.poll(1)
        producer.produce(topic, value=data_str)

# 发送结果回调函数
def delivery_report(err, msg):
    """
    处理消息发送结果的回调函数
    参数:
        err (Exception): 发送错误(如果发生)
        msg (Message): 发送的消息对象
    """
    if err is not None:
        print(f'消息发送失败: {err}')  # 打印发送错误
    else:
        # 打印发送成功的消息信息
        print(f'消息成功发送到 {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')

# 主函数,负责控制消息发送逻辑
def main():
    """
    主函数,控制整个消息发送流程
    解析命令行参数,循环发送消息直到达到时间限制
    """
    # 解析命令行参数
    parser = argparse.ArgumentParser(description='Kafka数据发送脚本')
    parser.add_argument('--rate', type=int, default=7,
                        help='每秒发送消息数(默认:7)')
    parser.add_argument('--duration', type=int, default=5,
                        help='运行时长(分钟,默认:5)')
    args = parser.parse_args()

    duration_seconds = args.duration * 60  # 将分钟转换为秒
    start_time = time.time()  # 记录开始时间
    total = 0  # 初始化已发送消息计数器

    # 打印开始信息
    print('[%s] 开始发送, 计划发送 %s 分钟, 每秒发送 %s 条消息' % 
          (time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())),
           args.duration, args.rate))

    # 主循环,持续发送消息直到达到时间限制
    while time.time() - start_time < duration_seconds:
       _start batch = time.time()  # 记录批次开始时间
        try:
            # 批量发送消息
            for _ in range(args.rate):
                # 深拷贝基础消息体,避免修改原始数据
                message_body = deepcopy(base_message_body)

                # 生成新的设备号
                new_equipment_code = generate_equipment_code()
                # 生成当前时间戳
                current_timestamp = generate_current_timestamp()
                # 更新消息中的时间字段
                message_body["GovOrderVo"]["CreationTime"] = current_timestamp

                # 更新设备ID
                message_body["GovOrderEquipmentInfoVo"]["EquipmentID"] = new_equipment_code
                # 更新工单号
                message_body["GovOrderVo"]["OrderNum"] = generate_order_num(new_equipment_code)

                # 更新设备列表中的设备代码
                for item in message_body["SapEquipmentList"]:
                    item["equipmentCode"] = new_equipment_code

                send_to_kafka("ckfm_mbm_orders", message_body)  # 发送消息
                total += 1  # 增加已发送计数

            # 打印发送统计信息
            print('[%s] 发送消息流速 %s个/秒, 发送总数 %s个' % 
                  (time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())),
                   args.rate, total))

            # 控制发送速率,确保每秒发送指定数量的消息
            producer.poll(0.1)  # 非阻塞poll,触发回调
            remaining_time = 1 - (time.time() - batch_start)
            if remaining_time > 0:
                time.sleep(remaining_time)  # 等待剩余时间

        except KeyboardInterrupt:  # 捕获键盘中断信号
            break

    # 所有消息发送完成后清理发送队列
    end_time = time.time()
    print("\n正在清理消息队列...")
    producer.flush(30)  # 等待最多30秒,确保所有消息发送完成

    # 计算并打印统计信息
    elapsed_time = end_time - start_time
    average_rate = total / elapsed_time if elapsed_time > 0 else 0

    print('[%s] 发送完毕, 总共发送了 %.2f 秒, 发送总数 %s 个, 平均每秒 %.2f 个' % 
          (time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())),
           elapsed_time, total, average_rate))

# 程序入口
if __name__ == '__main__':
    main()  # 启动主函数

消费者脚本:模拟线上业务场景,从 IOT-Kafka 里面消费数据

# -*- coding: utf-8 -*-
#!/usr/bin/python

"""
@Author  : XXX
@Contact : XXX
@File    : ReceiveMockKafka_mbm_message.py
@Create Time:  2025/5/15 16:44
@Description:  从CKFM QA环境的IOT KAFKA中消费处理数据
"""

import sys
sys.stdout.reconfigure(encoding='utf-8')  # 确保输出编码为utf-8,避免中文乱码

from kafka import KafkaConsumer  # 导入Kafka消费者客户端
from kafka.errors import KafkaError  # 导入Kafka错误处理模块
import logging  # 导入日志模块
import json  # 用于处理JSON格式数据
import asyncio  # 用于异步编程
from datetime import datetime  # 用于处理日期时间

# 启用调试日志,设置日志级别为INFO
logging.basicConfig(level=logging.INFO)

# Kafka 消费者配置
consumer_config = {
    'bootstrap_servers': 'XXXXXXXX',  # Kafka集群地址
    'auto_offset_reset': 'latest',  # 从最新的消息开始消费
    'enable_auto_commit': True,     # 启用自动提交偏移量,确保消费进度被记录
    'security_protocol': 'SASL_SSL',  # 安全协议,使用加密连接
    'sasl_mechanism': 'SCRAM-SHA-512',  # SASL认证机制
    'sasl_plain_username': 'XXXXXXXX',  # Kafka用户名
    'sasl_plain_password': 'XXXXXXXX'  # Kafka密码
}

# 定义要消费的主题列表
filtered_topics = ['ckfm_mbm_orders']

async def consume_messages():
    """
    异步消费Kafka消息
    负责从指定主题中拉取消息并进行处理
    """
    # 创建Kafka消费者实例
    consumer = KafkaConsumer(
        *filtered_topics,  # 消费指定主题的消息
        **consumer_config  # 使用上面定义的消费者配置
    )
    count = 0  # 初始化消息计数器

    try:
        while True:  # 无限循环,持续消费消息
            msg = consumer.poll(timeout_ms=100)  # 拉取消息,超时时间为100毫秒
            if not msg:
                await asyncio.sleep(0.1)  # 如果没有消息,短暂休眠
                continue

            # 遍历拉取到的消息
            for tp, records in msg.items():
                for record in records:
                    value = record.value.decode('utf-8')  # 将消息体解码为字符串
                    payload = json.loads(value)  # 将JSON字符串转换为字典

                    # 提取 cityCode 字段,用于消息过滤
                    cityCode = payload.get("cityCode")
                    allowed_cityCode = ['XXXXXXXX']  # 允许的城市代码列表

                    # 提取 GovOrderVo 中的 creationTime 字段,用于计算延迟
                    creation_time_str = payload.get("GovOrderVo", {}).get("CreationTime")

                    # 消息过滤和延迟计算
                    if str(cityCode) in allowed_cityCode and creation_time_str:
                        # 将时间戳转换为datetime对象
                        message_time = datetime.fromtimestamp(creation_time_str / 1000)
                        current_time = datetime.now()  # 获取当前时间
                        delay = (current_time - message_time).total_seconds()  # 计算延迟,单位秒

                        # 格式化消息输出
                        formatted_message = f"{str(count + 1)} -- {record.topic} -- Delay: {delay}s -- {value}\n"
                        print(f'***********************************************{str(count + 1)} Received new message: {record.topic}, Delay: {delay}s')

                        count += 1  # 增加消息计数

    except Exception as e:  # 捕获通用异常
        logging.error(f"Error occurred: {e}")  # 记录错误日志
    except KeyboardInterrupt:  # 捕获键盘中断信号
        print("Consumer stopped.")  # 打印停止信息
    finally:
        consumer.close()  # 确保消费者资源被正确释放

async def main():
    """
    主函数,启动消息消费流程
    """
    await consume_messages()  # 调用消息消费函数

if __name__ == "__main__":
    asyncio.run(main())  # 启动异步主函数

优化生产者脚本,采用异步操作,提高写入速率

主要改进点
  • 异步发送数据:定义了 async_send_to_kafka 协程函数,利用 asyncio 的事件循环和线程池执行器,将 send_to_kafka 函数以异步方式调用,从而实现并发发送消息到 Kafka,提高了发送效率。
  • 异步主循环:main 函数是异步的,通过 asyncio.create_task 创建多个发送消息的任务,并将它们添加到任务列表中。利用 await asyncio.gather(*tasks) 等待所有任务完成,然后清空任务列表,实现了每秒发送指定数量的消息。
  • 动态控制发送频率:在每秒的发送循环中,计算剩余时间并根据需要进行睡眠,以确保发送频率符合指定的每秒消息数。
#!/usr/bin/python
# -*- coding: utf-8 -*-

"""
@Author  : XXX
@Contact : XXX
@File    : PushMockKafka_mbm_message.py
@Create Time:  2025/5/15 16:44
@Description:  向CKFM QA环境的IOT KAFKA造数据进去(不看运行时间,只看是否达到最大目标值,程序就自动停止)
"""

import sys
sys.stdout.reconfigure(encoding='utf-8')  # 确保输出编码为utf-8,避免中文乱码

from confluent_kafka import Producer  # 导入Kafka生产者客户端
import json  # 用于处理JSON格式数据
import argparse  # 用于解析命令行参数
import time  # 用于时间相关操作
import random  # 用于生成随机数
import datetime  # 用于处理日期时间
from copy import deepcopy  # 用于深拷贝对象
import asyncio  # 用于异步编程

# Kafka 配置参数
# 这些配置用于连接到Kafka集群并设置消息发送行为
kafka_config = {
    'bootstrap.servers': 'XXXXXX',  # Kafka集群地址
    'security.protocol': 'SASL_SSL',  # 安全协议,使用加密连接
    'sasl.mechanisms': 'SCRAM-SHA-512',  # SASL认证机制
    'sasl.username': 'XXXXXX',  # Kafka用户
    'sasl.password': 'XXXXXX',  # Kafka密码
    'batch.num.messages': 5000,  # 每批最大消息数,控制批量发送大小
    'queue.buffering.max.messages': 100000,  # 发送队列最大容量
    'linger.ms': 50,  # 等待批量形成时间,单位毫秒
    'delivery.report.only.error': True  # 仅报告发送错误,减少回调负担
}

# 创建 Kafka Producer实例
# 这个对象将用于向Kafka发送消息
producer = Producer(kafka_config)

# 基础消息体内容(示例结构,实际内容应根据业务需求定义)
# 这里使用占位符表示实际消息结构
base_message_body = {
    "GovOrderVo": {
        "CreationTime": 0,  # 将被动态替换为当前时间戳
        "OrderNum": ""  # 将被动态替换为生成的工单号
    },
    "GovOrderEquipmentInfoVo": {
        "EquipmentID": ""  # 将被动态替换为生成的设备号
    },
    "SapEquipmentList": [
        {"equipmentCode": ""}  # 将被动态替换为生成的设备号
    ]
}

# 动态生成符合规则的设备号
def generate_equipment_code():
    """
    生成格式为"4"开头,共11位的设备号
    前缀固定为4,后面7位为随机数字,保证唯一性
    """
    # 保持前缀为4,随机生成剩余7位数字
    return f"4{random.randint(0, 9999999):07d}"

# 根据设备号生成工单号OrderNum
def generate_order_num(equipment_code):
    """
    生成工单号,格式为"NMT{设备号}{当前日期}N"
    例如:NMT4000000120250515N
    """
    today = datetime.datetime.now().strftime("%Y%m%d")  # 获取当前日期
    return f"NMT{equipment_code}{today}N"

# 动态生成当前时间戳(毫秒级)
def generate_current_timestamp():
    """
    生成当前时间的毫秒级时间戳
    用于消息中的时间字段
    """
    return int(datetime.datetime.now().timestamp() * 1000)

# 发送数据到 Kafka
def send_to_kafka(topic, data):
    """
    将数据发送到指定Kafka主题
    参数:
        topic (str): Kafka主题名称
        data (dict): 要发送的消息数据
    """
    data_str = json.dumps(data)  # 将字典转换为JSON字符串
    try:
        producer.produce(
            topic=topic,
            value=data_str,
            callback=delivery_report  # 设置发送结果回调函数
        )
        # 每发送50条消息主动poll一次,触发回调
        if len(producer) % 50 == 0:
            producer.poll(0)
        print("The content of the real-time data sent is:", data_str)
    except BufferError:
        # 如果发送队列满,先poll清理队列再重试
        producer.poll(1)
        producer.produce(topic, value=data_str)

# 发送结果回调函数
def delivery_report(err, msg):
    """
    处理消息发送结果的回调函数
    参数:
        err (Exception): 发送错误(如果发生)
        msg (Message): 发送的消息对象
    """
    if err is not None:
        print(f'消息发送失败: {err}')  # 打印发送错误
    else:
        # 打印发送成功的消息信息
        print(f'消息成功发送到 {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')

# 异步发送数据到 Kafka
async def async_send_to_kafka(topic, data):
    """
    异步包装器,允许在异步环境中发送消息
    参数:
        topic (str): Kafka主题名称
        data (dict): 要发送的消息数据
    """
    loop = asyncio.get_event_loop()  # 获取当前事件循环
    await loop.run_in_executor(None, send_to_kafka, topic, data)  # 在线程池中执行同步发送函数

# 主循环,负责控制消息发送逻辑
async def main():
    """
    主函数,控制整个消息发送流程
    解析命令行参数,循环发送消息直到达到目标数量
    """
    # 解析命令行参数
    parser = argparse.ArgumentParser(description='Kafka数据发送脚本')
    parser.add_argument('--rate', type=int, default=30,
                        help='每秒发送消息数(默认:30)')
    parser.add_argument('--duration', type=int, default=5,
                        help='运行时长(分钟,默认:5)')
    args = parser.parse_args()

    start_time = time.time()  # 记录开始时间
    total = 0  # 初始化已发送消息计数器

    # 打印开始信息
    print('[%s] 开始发送, 必须发送达到9000条消息后停止' % 
          time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))

    tasks = []  # 用于存储异步任务的列表

    # 主循环,持续发送消息直到达到9000条
    while total < 9000:
        batch_start = time.time()  # 记录批次开始时间
        for _ in range(args.rate):
            # 深拷贝基础消息体,避免修改原始数据
            message_body = deepcopy(base_message_body)

            # 生成新的设备号
            new_equipment_code = generate_equipment_code()
            # 生成当前时间戳
            current_timestamp = generate_current_timestamp()
            # 更新消息中的时间字段
            message_body["GovOrderVo"]["CreationTime"] = current_timestamp

            # 更新设备ID
            message_body["GovOrderEquipmentInfoVo"]["EquipmentID"] = new_equipment_code
            # 更新工单号
            message_body["GovOrderVo"]["OrderNum"] = generate_order_num(new_equipment_code)

            # 更新设备列表中的设备代码
            for item in message_body["SapEquipmentList"]:
                item["equipmentCode"] = new_equipment_code

            # 创建异步发送任务
            task = asyncio.create_task(async_send_to_kafka("ckfm_mbm_orders", message_body))
            tasks.append(task)
            total += 1  # 增加已发送计数

        # 等待当前批次所有任务完成
        await asyncio.gather(*tasks)
        tasks.clear()  # 清空任务列表

        # 检查是否需要清理发送队列
        producer.poll(0.1)  # 非阻塞poll,触发回调

        # 控制发送速率,确保每秒发送指定数量的消息
        remaining_time = 1 - (time.time() - batch_start)
        if remaining_time > 0:
            await asyncio.sleep(remaining_time)  # 等待剩余时间

    # 所有消息发送完成后清理发送队列
    end_time = time.time()
    print("\n正在清理消息队列...")
    producer.flush(30)  # 等待最多30秒,确保所有消息发送完成

    # 计算并打印统计信息
    elapsed_time = end_time - start_time
    average_rate = total / elapsed_time if elapsed_time > 0 else 0

    print('[%s] 发送完毕, 总共耗时 %.2f 秒, 发送总数 %s 条, 平均每秒 %.2f 条' % 
          (time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())),
           elapsed_time, total, average_rate))

# 程序入口
if __name__ == '__main__':
    # 启动异步主函数
    asyncio.run(main())

脚本执行结果

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
No Reply at the moment.
需要 Sign In 后方可回复, 如果你还没有账号请点击这里 Sign Up