Background
The Gov system's work-order dispatch results, which used to be sent to IT-Kafka, are now sent to IOT-Kafka. We therefore need a performance test for the data-backlog scenario that an upstream service outage could cause, to evaluate whether the Gov system can consume and process the data produced upstream in a timely manner.
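Besides timing each message on the consumer side (as the consumer script below does), consumer-group lag is a direct way to judge whether the backlog is being drained in time. The following is only a minimal sketch of such a check, assuming a hypothetical consumer group id gov-order-consumer and the same SASL settings as the test scripts; it uses confluent_kafka's committed() and get_watermark_offsets() to print the lag per partition of the test topic.

# Minimal consumer-lag check (sketch; the group id and credentials are placeholders/assumptions)
from confluent_kafka import Consumer, TopicPartition

check = Consumer({
    'bootstrap.servers': 'XXXXXXXX',          # same brokers as the test scripts
    'group.id': 'gov-order-consumer',         # hypothetical group id of the Gov consumer
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'SCRAM-SHA-512',
    'sasl.username': 'XXXXXXXX',
    'sasl.password': 'XXXXXXXX',
})

topic = 'ckfm_mbm_orders'
meta = check.list_topics(topic, timeout=10)
partitions = [TopicPartition(topic, p) for p in meta.topics[topic].partitions]

for tp in check.committed(partitions, timeout=10):
    low, high = check.get_watermark_offsets(tp, timeout=10)
    committed = tp.offset if tp.offset >= 0 else low   # no committed offset yet -> assume low watermark
    print(f'partition {tp.partition}: lag = {high - committed}')

check.close()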
Producer script: simulates the production business scenario by pumping data into IOT-Kafka
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
@Author     : XXX
@Contact    : XXX
@File       : PushMockKafka_mbm_message.py
@Create Time: 2025/5/15 16:44
@Description: Pump mock data into the IOT Kafka of the CKFM QA environment
"""
import sys
sys.stdout.reconfigure(encoding='utf-8')  # force UTF-8 stdout so non-ASCII logs print correctly
from confluent_kafka import Producer
import json
import argparse
import time
import random
import datetime
from copy import deepcopy
# Kafka configuration
kafka_config = {
    'bootstrap.servers': 'XXXXXXXX',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'SCRAM-SHA-512',
    'sasl.username': 'XXXXXXXX',
    'sasl.password': 'XXXXXXXX',
    'batch.num.messages': 5000,              # max messages per batch
    'queue.buffering.max.messages': 100000,  # producer queue capacity
    'linger.ms': 50,                         # time to wait for a batch to form
    'delivery.report.only.error': True       # only report delivery errors
}
# Create the Kafka producer
producer = Producer(kafka_config)
# Base message body (redacted)
base_message_body = {XXXXXXXX}
# Generate an equipment code that matches the expected format
def generate_equipment_code():
    # keep the leading "4", randomise the remaining 7 digits
    return f"4{random.randint(0, 9999999):07d}"

# Build the work-order number (OrderNum) from the equipment code
def generate_order_num(equipment_code):
    today = datetime.datetime.now().strftime("%Y%m%d")
    return f"NMT{equipment_code}{today}N"

# Current timestamp in milliseconds
def generate_current_timestamp():
    return int(datetime.datetime.now().timestamp() * 1000)
# Send one record to Kafka
def send_to_kafka(topic, data):
    """Asynchronous (fire-and-forget) send"""
    data_str = json.dumps(data)
    try:
        producer.produce(
            topic=topic,
            value=data_str,
            callback=delivery_report  # delivery callback
        )
        # poll once for every 50 messages sitting in the local queue
        if len(producer) % 50 == 0:
            producer.poll(0)
        print("The content of the real-time data sent is:", data_str)
    except BufferError:
        # local queue is full: give librdkafka a moment to drain, then retry once
        producer.poll(1)
        producer.produce(topic, value=data_str, callback=delivery_report)
def delivery_report(err, msg):
    """Delivery result callback"""
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')
# Main loop: send the configured number of messages per second
def main():
    # parse command-line arguments
    parser = argparse.ArgumentParser(description='Kafka data-sending script')
    parser.add_argument('--rate', type=int, default=7,
                        help='messages to send per second (default: 7)')
    parser.add_argument('--duration', type=int, default=5,
                        help='run time in minutes (default: 5)')
    args = parser.parse_args()
    duration_seconds = args.duration * 60  # convert to seconds
    start_time = time.time()
    total = 0
    print('[%s] Start sending, planned run time %s minutes, %s messages per second' % (
        time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), args.duration, args.rate))
    while time.time() - start_time < duration_seconds:
        batch_start = time.time()
        try:
            # send this second's batch of messages
            for _ in range(args.rate):
                # deep-copy the base message so each record can be mutated independently
                message_body = deepcopy(base_message_body)
                # generate a fresh equipment code
                new_equipment_code = generate_equipment_code()
                # generate the current timestamp
                current_timestamp = generate_current_timestamp()
                message_body["GovOrderVo"]["CreationTime"] = current_timestamp
                # update the equipment code and OrderNum
                message_body["GovOrderEquipmentInfoVo"]["EquipmentID"] = new_equipment_code
                message_body["GovOrderVo"]["OrderNum"] = generate_order_num(new_equipment_code)
                # update equipmentCode in SapEquipmentList
                for item in message_body["SapEquipmentList"]:
                    item["equipmentCode"] = new_equipment_code
                send_to_kafka("ckfm_mbm_orders", message_body)
                total += 1
            print('[%s] Sending at %s msg/s, total sent: %s' % (
                time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), args.rate, total))
            # pace the send loop
            producer.poll(0.1)  # non-blocking poll
            remaining_time = 1 - (time.time() - batch_start)
            if remaining_time > 0:
                time.sleep(remaining_time)  # sleep off the rest of the second
        except KeyboardInterrupt:
            break
    end_time = time.time()
    # make sure everything still queued is actually delivered
    print("\nFlushing the message queue...")
    producer.flush(30)  # wait up to 30 seconds
    elapsed_time = end_time - start_time
    average_rate = total / elapsed_time if elapsed_time > 0 else 0
    print('[%s] Done. Ran for %.2f s, sent %s messages, average %.2f msg/s' %
          (time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), elapsed_time, total, average_rate))

if __name__ == '__main__':
    main()
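For reference, a run at 50 messages per second for 10 minutes (illustrative values; the flag names come from the argparse definition above) would be started like this:

python PushMockKafka_mbm_message.py --rate 50 --duration 10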
Consumer script: simulates the production business scenario by consuming data from IOT-Kafka
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
@Author     : XXX
@Contact    : XXX
@File       : ReceiveMockKafka_mbm_message.py
@Create Time: 2025/5/15 16:44
@Description: Consume and process data from the IOT Kafka of the CKFM QA environment
"""
import sys
sys.stdout.reconfigure(encoding='utf-8')  # force UTF-8 stdout so non-ASCII logs print correctly
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import logging
import json
import asyncio
from datetime import datetime
# enable logging
logging.basicConfig(level=logging.INFO)

consumer_config = {
    'bootstrap_servers': 'XXXXXXXX',
    'auto_offset_reset': 'latest',   # start from the newest messages
    'enable_auto_commit': True,      # auto-commit offsets
    'security_protocol': 'SASL_SSL',
    'sasl_mechanism': 'SCRAM-SHA-512',
    'sasl_plain_username': 'XXXXXXXX',
    'sasl_plain_password': 'XXXXXXXX'
}
filtered_topics = ['ckfm_mbm_orders']
# output_file = 'kafka_messages.txt'
async def consume_messages():
    consumer = KafkaConsumer(
        *filtered_topics,
        **consumer_config
    )
    count = 0
    try:
        while True:
            msg = consumer.poll(timeout_ms=100)
            if not msg:
                await asyncio.sleep(0.1)
                continue
            for tp, records in msg.items():
                for record in records:
                    value = record.value.decode('utf-8')
                    payload = json.loads(value)
                    # extract the cityCode field
                    cityCode = payload.get("cityCode")
                    allowed_cityCode = ['XXXXXXXX']
                    # extract CreationTime (milliseconds) from GovOrderVo
                    creation_time_ms = payload.get("GovOrderVo", {}).get("CreationTime")
                    if str(cityCode) in allowed_cityCode and creation_time_ms:
                        # compute the end-to-end message delay
                        message_time = datetime.fromtimestamp(creation_time_ms / 1000)
                        current_time = datetime.now()
                        delay = (current_time - message_time).total_seconds()
                        # kept for optional file output (see the commented-out output_file above)
                        formatted_message = f"{str(count + 1)} -- {record.topic} -- Delay: {delay}s -- {value}\n"
                        print(f'***********************************************{str(count + 1)} Received new message: {record.topic}, Delay: {delay}s')
                        count += 1
    except Exception as e:
        logging.error(f"Error occurred: {e}")
    except KeyboardInterrupt:
        print("Consumer stopped.")
    finally:
        consumer.close()

async def main():
    await consume_messages()

if __name__ == "__main__":
    asyncio.run(main())
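Before running, the broker address, SASL credentials and the allowed_cityCode whitelist (all redacted as XXXXXXXX above) have to be filled in; the consumer itself takes no command-line arguments:

python ReceiveMockKafka_mbm_message.py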
Optimised producer script: use asynchronous operations to raise the write rate
Main improvements
- Asynchronous sending: the async_send_to_kafka coroutine uses the asyncio event loop together with a thread-pool executor to call send_to_kafka asynchronously, so messages are sent to Kafka concurrently and throughput improves.
- Asynchronous main loop: main is an async function; asyncio.create_task creates the send tasks and appends them to a task list, await asyncio.gather(*tasks) waits for all of them to complete, and the list is then cleared, so the configured number of messages is sent each second.
- Dynamic rate control: within each one-second send loop, the remaining time is computed and the coroutine sleeps as needed so the actual send rate matches the configured messages per second.
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
@Author     : XXX
@Contact    : XXX
@File       : PushMockKafka_mbm_message.py
@Create Time: 2025/5/15 16:44
@Description: Pump mock data into the IOT Kafka of the CKFM QA environment
              (ignores the run time; the script stops automatically once the target message count is reached)
"""
import sys
sys.stdout.reconfigure(encoding='utf-8')  # force UTF-8 stdout so non-ASCII logs print correctly
from confluent_kafka import Producer
import json
import argparse
import time
import random
import datetime
from copy import deepcopy
import asyncio
# Kafka configuration
kafka_config = {
    'bootstrap.servers': 'XXXXXX',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'SCRAM-SHA-512',
    'sasl.username': 'XXXXXX',
    'sasl.password': 'XXXXXX',
    'batch.num.messages': 5000,              # max messages per batch
    'queue.buffering.max.messages': 100000,  # producer queue capacity
    'linger.ms': 50,                         # time to wait for a batch to form
    'delivery.report.only.error': True       # only report delivery errors
}
# Create the Kafka producer
producer = Producer(kafka_config)
# Base message body (redacted)
base_message_body = {XXXXXX}
# Generate an equipment code that matches the expected format
def generate_equipment_code():
    # keep the leading "4", randomise the remaining 7 digits
    return f"4{random.randint(0, 9999999):07d}"

# Build the work-order number (OrderNum) from the equipment code
def generate_order_num(equipment_code):
    today = datetime.datetime.now().strftime("%Y%m%d")
    return f"NMT{equipment_code}{today}N"

# Current timestamp in milliseconds
def generate_current_timestamp():
    return int(datetime.datetime.now().timestamp() * 1000)
# Send one record to Kafka
def send_to_kafka(topic, data):
    data_str = json.dumps(data)
    try:
        producer.produce(
            topic=topic,
            value=data_str,
            callback=delivery_report  # delivery callback
        )
        # poll once for every 50 messages sitting in the local queue
        if len(producer) % 50 == 0:
            producer.poll(0)
        print("The content of the real-time data sent is:", data_str)
    except BufferError:
        # local queue is full: give librdkafka a moment to drain, then retry once
        producer.poll(1)
        producer.produce(topic, value=data_str, callback=delivery_report)
def delivery_report(err, msg):
    """Delivery result callback"""
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')
async def async_send_to_kafka(topic, data):
    # run the blocking send in the default thread-pool executor so sends can overlap
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, send_to_kafka, topic, data)
# Main loop: send the configured number of messages per second until the target count is reached
async def main():
    # parse command-line arguments
    parser = argparse.ArgumentParser(description='Kafka data-sending script')
    parser.add_argument('--rate', type=int, default=30,
                        help='messages to send per second (default: 30)')
    parser.add_argument('--duration', type=int, default=5,
                        help='run time in minutes (default: 5); not used in this version, '
                             'the loop stops at the 9000-message target instead')
    args = parser.parse_args()
    start_time = time.time()
    total = 0
    print('[%s] Start sending; the script stops once 9000 messages have been sent'
          % time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    tasks = []
    while total < 9000:
        batch_start = time.time()
        for _ in range(args.rate):
            message_body = deepcopy(base_message_body)
            new_equipment_code = generate_equipment_code()
            current_timestamp = generate_current_timestamp()
            message_body["GovOrderVo"]["CreationTime"] = current_timestamp
            message_body["GovOrderEquipmentInfoVo"]["EquipmentID"] = new_equipment_code
            message_body["GovOrderVo"]["OrderNum"] = generate_order_num(new_equipment_code)
            for item in message_body["SapEquipmentList"]:
                item["equipmentCode"] = new_equipment_code
            task = asyncio.create_task(async_send_to_kafka("ckfm_mbm_orders", message_body))
            tasks.append(task)
            total += 1
        # wait for this second's batch of send tasks to finish, then reuse the list
        await asyncio.gather(*tasks)
        tasks.clear()
        producer.poll(0.1)  # non-blocking poll
        remaining_time = 1 - (time.time() - batch_start)
        if remaining_time > 0:
            await asyncio.sleep(remaining_time)
    end_time = time.time()
    print("\nFlushing the message queue...")
    producer.flush(30)
    elapsed_time = end_time - start_time
    average_rate = total / elapsed_time if elapsed_time > 0 else 0
    print('[%s] Done. Ran for %.2f s, sent %s messages, average %.2f msg/s' %
          (time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), elapsed_time, total, average_rate))

if __name__ == '__main__':
    asyncio.run(main())
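In this version --duration is parsed but not used; the loop stops at the hard-coded 9000-message target, so only the rate needs to be chosen, e.g. (illustrative value):

python PushMockKafka_mbm_message.py --rate 100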
Script execution results