零碎知识 模拟线上业务场景,进行下游应用的性能压测

大海 · 2025年05月20日 · 684 次阅读

背景

Gov 系统对工单数据的下发结果,由原来的发送到 IT-Kafka 中,更改为发送到 IOT-Kafka 中,需要针对上游服务异常可能导致的数据量堆积场景做性能压测,评估 Gov 系统是否能够及时将上游产生的数据消费处理完毕。

生产者脚本:模拟线上业务场景,向 IOT KAFKA 里面灌数据

# !/usr/bin/python
# -*- coding: utf-8 -*-

"""
@Author  :XXX
@Contact : XXX
@File    : PushMockKafka_mbm_message.py
@Create Time:  2025/5/15 16:44
@Description:  向CKFM QA环境的IOT KAFKA中灌入数据
"""

import sys
sys.stdout.reconfigure(encoding='utf-8')  # 添加这一行

from confluent_kafka import Producer
import json
import argparse
import time
import random
import datetime
from copy import deepcopy

# Kafka producer configuration (credentials redacted in the published article).
kafka_config = {
    'bootstrap.servers': 'XXXXXXXX',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'SCRAM-SHA-512',
    'sasl.username': 'XXXXXXXX',
    'sasl.password': 'XXXXXXXX',
    'batch.num.messages': 5000,  # max messages per produced batch
    'queue.buffering.max.messages': 100000,  # local producer queue capacity
    'linger.ms': 50,  # how long to wait for a batch to fill (ms)
    'delivery.report.only.error': True  # invoke delivery reports only on errors
}

# Module-level Kafka Producer shared by all send calls in this script.
producer = Producer(kafka_config)

# Base message body template (redacted placeholder in the published article).
base_message_body = {XXXXXXXX}

# Build a device code that matches the production numbering rule.
def generate_equipment_code():
    """Return an 8-character device code: fixed prefix '4' + 7 random digits."""
    suffix = random.randint(0, 9999999)
    return "4" + str(suffix).zfill(7)


# Derive the work-order number (OrderNum) from a device code.
def generate_order_num(equipment_code):
    """Return 'NMT' + device code + today's date (YYYYMMDD) + 'N'."""
    date_part = datetime.datetime.now().strftime("%Y%m%d")
    return "".join(("NMT", equipment_code, date_part, "N"))

# Current wall-clock time as a millisecond-precision epoch timestamp.
def generate_current_timestamp():
    """Return the current time in milliseconds since the Unix epoch."""
    now = datetime.datetime.now()
    return int(now.timestamp() * 1000)

# Send one message to Kafka (asynchronous, callback-based).
def send_to_kafka(topic, data):
    """Serialize *data* as JSON and produce it to *topic* asynchronously.

    Delivery results are reported via ``delivery_report``.  When the local
    producer queue is full (``BufferError``), drain events for up to 1s and
    retry the produce call once.
    """
    data_str = json.dumps(data)
    try:
        producer.produce(
            topic=topic,
            value=data_str,
            callback=delivery_report  # per-message delivery result callback
        )
        # Serve delivery callbacks periodically (every 50 queued messages)
        # so the internal queue does not silently fill up.
        # BUG FIX: the original comment claimed "every 100", code polls at 50.
        if len(producer) % 50 == 0:
            producer.poll(0)
        print("The content of the real-time data sent is:", data_str)
    except BufferError:
        # Local queue full: let librdkafka drain events, then retry once.
        producer.poll(1)
        # BUG FIX: the retry previously dropped the delivery callback, so
        # failures of retried messages went unreported.
        producer.produce(topic, value=data_str, callback=delivery_report)


def delivery_report(err, msg):
    """Per-message delivery callback: print success or failure details."""
    if err is None:
        print(f'消息成功发送到 {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')
    else:
        print(f'消息发送失败: {err}')


# Main loop: send `--rate` messages per second for `--duration` minutes.
def main():
    """Parse CLI options and push mock work-order messages at a fixed rate.

    ``--rate``     messages produced per second (default 7).
    ``--duration`` total run time in minutes (default 5).
    """
    parser = argparse.ArgumentParser(description='Kafka数据发送脚本')
    # BUG FIX: the original help texts claimed defaults of 1 and 30, which
    # contradicted the actual defaults (7 and 5); %(default)s stays in sync.
    parser.add_argument('--rate', type=int, default=7,
                        help='每秒发送消息数(默认:%(default)s)')
    parser.add_argument('--duration', type=int, default=5,
                        help='运行时长(分钟,默认:%(default)s)')
    args = parser.parse_args()

    duration_seconds = args.duration * 60  # minutes -> seconds
    start_time = time.time()
    total = 0

    print('[%s] 开始发送, 计划发送 %s 分钟, 每秒发送 %s 条消息' % (
        time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), args.duration, args.rate))

    while time.time() - start_time < duration_seconds:
        batch_start = time.time()
        try:
            # Send one batch of args.rate messages.
            for _ in range(args.rate):
                # Deep-copy the template so each message is mutated independently.
                message_body = deepcopy(base_message_body)

                new_equipment_code = generate_equipment_code()

                # Stamp the message with the current millisecond timestamp.
                message_body["GovOrderVo"]["CreationTime"] = generate_current_timestamp()

                # Propagate the fresh device code into every field that carries it.
                message_body["GovOrderEquipmentInfoVo"]["EquipmentID"] = new_equipment_code
                message_body["GovOrderVo"]["OrderNum"] = generate_order_num(new_equipment_code)
                for item in message_body["SapEquipmentList"]:
                    item["equipmentCode"] = new_equipment_code

                send_to_kafka("ckfm_mbm_orders", message_body)
                total += 1
            print('[%s] 发送消息流速 %s个/秒, 发送总数 %s个' % (
                time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), args.rate, total,))

            # Pace the loop to ~1 batch/second: serve delivery callbacks,
            # then sleep whatever is left of the current second.
            producer.poll(0.1)  # non-blocking poll
            remaining_time = 1 - (time.time() - batch_start)
            if remaining_time > 0:
                time.sleep(remaining_time)
        except KeyboardInterrupt:
            break

    end_time = time.time()
    # Ensure everything still queued locally is delivered before exiting.
    print("\n正在清理消息队列...")
    producer.flush(30)  # wait at most 30 seconds

    elapsed_time = end_time - start_time
    average_rate = total / elapsed_time if elapsed_time > 0 else 0

    print('[%s] 发送完毕, 总共发送了 %.2f 秒, 发送总数 %s 个, 平均每秒 %.2f 个' %
          (time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), elapsed_time, total, average_rate))


# Script entry point.
if __name__ == '__main__':
    main()

消费者脚本:模拟线上业务场景,从 IOT-Kafka 里面消费数据

# -*- coding: utf-8 -*-
# !/usr/bin/python

"""
@Author  :XXX
@Contact : XXX
@File    : ReceiveMockKafka_mbm_message.py
@Create Time:  2025/5/15 16:44
@Description:  从CKFM QA环境的IOT KAFKA中消费处理数据
"""

import sys
sys.stdout.reconfigure(encoding='utf-8')  # 添加这一行


from kafka import KafkaConsumer
from kafka.errors import KafkaError
import logging
import json
import asyncio
from datetime import datetime

# Enable INFO-level logging for the kafka client and this script.
logging.basicConfig(level=logging.INFO)

# Consumer connection settings (credentials redacted in the published article).
consumer_config = {
    'bootstrap_servers': 'XXXXXXXX',
    'auto_offset_reset': 'latest',  # start consuming from the newest messages
    'enable_auto_commit': True,    # auto-commit consumed offsets
    'security_protocol': 'SASL_SSL',
    'sasl_mechanism': 'SCRAM-SHA-512',
    'sasl_plain_username': 'XXXXXXXX',
    'sasl_plain_password': 'XXXXXXXX'
}

# Topics this consumer subscribes to.
filtered_topics = ['ckfm_mbm_orders']
# output_file = 'kafka_messages.txt'

async def consume_messages():
    """Poll the subscribed topics and report per-message end-to-end delay.

    For every record whose ``cityCode`` is in the allow-list and that carries
    a ``GovOrderVo.CreationTime``, the delay is computed as
    (now - CreationTime) and printed with a running counter.
    """
    consumer = KafkaConsumer(
        *filtered_topics,
        **consumer_config
    )
    count = 0
    # Allow-list of city codes (redacted placeholder).
    # Hoisted out of the record loop and made a set: the original rebuilt a
    # list for every single record.
    allowed_city_codes = {'XXXXXXXX'}

    try:
        while True:
            msg = consumer.poll(timeout_ms=100)
            if not msg:
                # Nothing fetched: yield to the event loop instead of busy-looping.
                await asyncio.sleep(0.1)
                continue

            # Partition keys are not needed, only the record batches.
            for records in msg.values():
                for record in records:
                    value = record.value.decode('utf-8')
                    payload = json.loads(value)

                    city_code = payload.get("cityCode")
                    # Producer writes a millisecond epoch timestamp here —
                    # TODO confirm against the producer script's schema.
                    creation_ts = payload.get("GovOrderVo", {}).get("CreationTime")

                    if str(city_code) in allowed_city_codes and creation_ts:
                        # End-to-end latency: consume time minus produce time.
                        message_time = datetime.fromtimestamp(creation_ts / 1000)
                        delay = (datetime.now() - message_time).total_seconds()

                        # NOTE: the original also built an unused
                        # `formatted_message` string here; it was dead code.
                        print(f'***********************************************{str(count + 1)} Received new message: {record.topic}, Delay: {delay}s')

                        count += 1

    except Exception as e:
        logging.error(f"Error occurred: {e}")
    except KeyboardInterrupt:
        # KeyboardInterrupt derives from BaseException, so the clause above
        # does not swallow it; reaching here just ends the loop quietly.
        print("Consumer stopped.")
    finally:
        consumer.close()

# Entry coroutine: delegate to the long-running consumer loop.
async def main():
    return await consume_messages()

if __name__ == "__main__":
    asyncio.run(main())

优化生产者脚本,采用异步操作,提高写入速率

主要改进点
  • 异步发送数据:定义了 async_send_to_kafka 协程函数,利用 asyncio 的事件循环和线程池执行器,将 send_to_kafka 函数以异步方式调用,从而实现并发发送消息到 Kafka,提高了发送效率。
  • 异步主循环:main 函数是异步的,通过 asyncio.create_task 创建多个发送消息的任务,并将它们添加到任务列表中。利用 await asyncio.gather(*tasks) 等待所有任务完成,然后清空任务列表,实现了每秒发送指定数量的消息。
  • 动态控制发送频率:在每秒的发送循环中,计算剩余时间并根据需要进行睡眠,以确保发送频率符合指定的每秒消息数。
#!/usr/bin/python
# -*- coding: utf-8 -*-

"""
@Author  : XXX
@Contact : XXX
@File    : PushMockKafka_mbm_message.py
@Create Time:  2025/5/15 16:44
@Description:  向CKFM QA环境的IOT KAFKA造数据进去(不看运行时间,只看是否达到最大目标值,程序就自动停止)
"""

import sys
sys.stdout.reconfigure(encoding='utf-8')  # 添加这一行

from confluent_kafka import Producer
import json
import argparse
import time
import random
import datetime
from copy import deepcopy
import asyncio

# Kafka producer configuration (credentials redacted in the published article).
kafka_config = {
    'bootstrap.servers': 'XXXXXX',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'SCRAM-SHA-512',
    'sasl.username': 'XXXXXX',
    # BUG FIX: the published listing dropped the opening quote of the
    # password placeholder, which made the whole script a syntax error.
    'sasl.password': 'XXXXXX',
    'batch.num.messages': 5000,  # max messages per produced batch
    'queue.buffering.max.messages': 100000,  # local producer queue capacity
    'linger.ms': 50,  # how long to wait for a batch to fill (ms)
    'delivery.report.only.error': True  # invoke delivery reports only on errors
}

# Module-level Kafka Producer shared by all send calls in this script.
producer = Producer(kafka_config)

# Base message body template (redacted placeholder in the published article).
base_message_body = {XXXXXX}

# Build a device code that matches the production numbering rule.
def generate_equipment_code():
    """Return an 8-character device code: fixed prefix '4' + 7 random digits."""
    suffix = random.randint(0, 9999999)
    return "4" + str(suffix).zfill(7)

# Derive the work-order number (OrderNum) from a device code.
def generate_order_num(equipment_code):
    """Return 'NMT' + device code + today's date (YYYYMMDD) + 'N'."""
    date_part = datetime.datetime.now().strftime("%Y%m%d")
    return "".join(("NMT", equipment_code, date_part, "N"))

# Current wall-clock time as a millisecond-precision epoch timestamp.
def generate_current_timestamp():
    """Return the current time in milliseconds since the Unix epoch."""
    now = datetime.datetime.now()
    return int(now.timestamp() * 1000)

# Send one message to Kafka (asynchronous, callback-based).
def send_to_kafka(topic, data):
    """Serialize *data* as JSON and produce it to *topic* asynchronously.

    Delivery results are reported via ``delivery_report``.  When the local
    producer queue is full (``BufferError``), drain events for up to 1s and
    retry the produce call once.
    """
    data_str = json.dumps(data)
    try:
        producer.produce(
            topic=topic,
            value=data_str,
            callback=delivery_report  # per-message delivery result callback
        )
        # Serve delivery callbacks periodically (every 50 queued messages)
        # so the internal queue does not silently fill up.
        # BUG FIX: the original comment claimed "every 100", code polls at 50.
        if len(producer) % 50 == 0:
            producer.poll(0)
        print("The content of the real-time data sent is:", data_str)
    except BufferError:
        # Local queue full: let librdkafka drain events, then retry once.
        producer.poll(1)
        # BUG FIX: the retry previously dropped the delivery callback, so
        # failures of retried messages went unreported.
        producer.produce(topic, value=data_str, callback=delivery_report)

def delivery_report(err, msg):
    """Per-message delivery callback: print success or failure details."""
    if err is None:
        print(f'消息成功发送到 {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')
    else:
        print(f'消息发送失败: {err}')

async def async_send_to_kafka(topic, data):
    """Run the blocking ``send_to_kafka`` call in the default thread pool.

    Offloading to an executor keeps the event loop responsive while the
    confluent-kafka produce call runs.
    """
    # BUG FIX: asyncio.get_event_loop() inside a coroutine is deprecated
    # (since Python 3.10); get_running_loop() is the correct call here.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, send_to_kafka, topic, data)

# Main loop: schedule batches of messages until the target count is reached.
async def main():
    """Push mock messages concurrently until ``--target`` messages are sent.

    ``--rate``     messages scheduled per one-second batch (default 30).
    ``--target``   total number of messages to send before stopping
                   (default 9000; generalized from a hard-coded constant).
    ``--duration`` accepted for CLI compatibility but unused: this variant
                   stops on the message count, not on elapsed time.
    """
    parser = argparse.ArgumentParser(description='Kafka数据发送脚本')
    # BUG FIX: the original help texts claimed defaults of 1 and 30, which
    # contradicted the actual defaults; %(default)s stays in sync.
    parser.add_argument('--rate', type=int, default=30,
                        help='每秒发送消息数(默认:%(default)s)')
    parser.add_argument('--duration', type=int, default=5,
                        help='运行时长(分钟,默认:%(default)s;本脚本按条数停止,此参数未使用)')
    parser.add_argument('--target', type=int, default=9000,
                        help='发送总条数上限(默认:%(default)s)')
    args = parser.parse_args()

    start_time = time.time()
    total = 0

    print('[%s] 开始发送, 必须发送达到%s条消息后停止' % (
        time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), args.target))

    tasks = []
    while total < args.target:
        batch_start = time.time()
        for _ in range(args.rate):
            # Deep-copy the template so each message is mutated independently.
            message_body = deepcopy(base_message_body)

            new_equipment_code = generate_equipment_code()
            # Stamp the message with the current millisecond timestamp.
            message_body["GovOrderVo"]["CreationTime"] = generate_current_timestamp()

            # Propagate the fresh device code into every field that carries it.
            message_body["GovOrderEquipmentInfoVo"]["EquipmentID"] = new_equipment_code
            message_body["GovOrderVo"]["OrderNum"] = generate_order_num(new_equipment_code)
            for item in message_body["SapEquipmentList"]:
                item["equipmentCode"] = new_equipment_code

            # Schedule the blocking produce call on the thread pool.
            tasks.append(asyncio.create_task(async_send_to_kafka("ckfm_mbm_orders", message_body)))
            total += 1

        # Wait until the whole batch has been handed to the producer.
        await asyncio.gather(*tasks)
        tasks.clear()

        producer.poll(0.1)  # serve delivery callbacks, non-blocking
        # Pace the loop to roughly one batch per second.
        remaining_time = 1 - (time.time() - batch_start)
        if remaining_time > 0:
            await asyncio.sleep(remaining_time)

    end_time = time.time()
    # Ensure everything still queued locally is delivered before exiting.
    print("\n正在清理消息队列...")
    producer.flush(30)

    elapsed_time = end_time - start_time
    average_rate = total / elapsed_time if elapsed_time > 0 else 0

    print('[%s] 发送完毕, 总共发送了 %.2f , 发送总数 %s , 平均每秒 %.2f ' %
          (time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), elapsed_time, total, average_rate))



# Script entry point: run the async main loop to completion.
if __name__ == '__main__':
    asyncio.run(main())

脚本执行结果

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
暫無回覆。
需要 登录 後方可回應,如果你還沒有帳號按這裡 注册