背景
Gov 系统对工单数据的下发结果,由原来的发送到 IT-Kafka 中,更改为发送到 IOT-Kafka 中,需要针对上游服务异常可能导致的数据量堆积场景做性能压测,评估 Gov 系统是否能够及时将上游产生的数据消费处理完毕。
生产者脚本:模拟线上业务场景,向 IOT KAFKA 里面灌数据
# !/usr/bin/python
# -*- coding: utf-8 -*-
"""
@Author : XXX
@Contact : XXX
@File : PushMockKafka_mbm_message.py
@Create Time: 2025/5/15 16:44
@Description: 向CKFM QA环境的IOT KAFKA中灌入数据
"""
import sys
sys.stdout.reconfigure(encoding='utf-8') # 确保输出编码为utf-8,避免中文乱码
from confluent_kafka import Producer # 导入Kafka生产者客户端
import json # 用于处理JSON格式数据
import argparse # 用于解析命令行参数
import time # 用于时间相关操作
import random # 用于生成随机数
import datetime # 用于处理日期时间
from copy import deepcopy # 用于深拷贝对象
# Kafka 配置参数
# 这些配置用于连接到Kafka集群并设置消息发送行为
kafka_config = {
'bootstrap.servers': 'XXXXXXXX', # Kafka集群地址
'security.protocol': 'SASL_SSL', # 安全协议,使用加密连接
'sasl.mechanisms': 'SCRAM-SHA-512', # SASL认证机制
'sasl.username': 'XXXXXXXX', # Kafka用户
'sasl.password': 'XXXXXXXX', # Kafka密码
'batch.num.messages': 5000, # 每批最大消息数,控制批量发送大小
'queue.buffering.max.messages': 100000, # 发送队列最大容量
'linger.ms': 50, # 等待批量形成时间,单位毫秒
'delivery.report.only.error': True # 仅报告发送错误,减少回调负担
}
# 创建 Kafka Producer实例
# 这个对象将用于向Kafka发送消息
producer = Producer(kafka_config)
# 基础消息体内容(示例结构,实际内容应根据业务需求定义)
# 这里使用占位符表示实际消息结构
base_message_body = {
"GovOrderVo": {
"CreationTime": 0, # 将被动态替换为当前时间戳
"OrderNum": "" # 将被动态替换为生成的工单号
},
"GovOrderEquipmentInfoVo": {
"EquipmentID": "" # 将被动态替换为生成的设备号
},
"SapEquipmentList": [
{"equipmentCode": ""} # 将被动态替换为生成的设备号
]
}
# 动态生成符合规则的设备号
def generate_equipment_code():
"""
生成格式为"4"开头,共11位的设备号
前缀固定为4,后面7位为随机数字,保证唯一性
"""
# 保持前缀为4,随机生成剩余7位数字
return f"4{random.randint(0, 9999999):07d}"
# 根据设备号生成工单号OrderNum
def generate_order_num(equipment_code):
"""
生成工单号,格式为"NMT{设备号}{当前日期}N"
例如:NMT4000000120250515N
"""
today = datetime.datetime.now().strftime("%Y%m%d") # 获取当前日期
return f"NMT{equipment_code}{today}N"
# 动态生成当前时间戳(毫秒级)
def generate_current_timestamp():
"""
生成当前时间的毫秒级时间戳
用于消息中的时间字段
"""
return int(datetime.datetime.now().timestamp() * 1000)
# 发送数据到 Kafka
def send_to_kafka(topic, data):
"""
将数据发送到指定Kafka主题
参数:
topic (str): Kafka主题名称
data (dict): 要发送的消息数据
"""
data_str = json.dumps(data) # 将字典转换为JSON字符串
try:
producer.produce(
topic=topic,
value=data_str,
callback=delivery_report # 设置发送结果回调函数
)
# 每发送50条消息主动poll一次,触发回调
if len(producer) % 50 == 0:
producer.poll(0)
print("The content of the real-time data sent is:", data_str)
except BufferError:
# 如果发送队列满,先poll清理队列再重试
producer.poll(1)
producer.produce(topic, value=data_str)
# 发送结果回调函数
def delivery_report(err, msg):
"""
处理消息发送结果的回调函数
参数:
err (Exception): 发送错误(如果发生)
msg (Message): 发送的消息对象
"""
if err is not None:
print(f'消息发送失败: {err}') # 打印发送错误
else:
# 打印发送成功的消息信息
print(f'消息成功发送到 {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')
# 主函数,负责控制消息发送逻辑
def main():
"""
主函数,控制整个消息发送流程
解析命令行参数,循环发送消息直到达到时间限制
"""
# 解析命令行参数
parser = argparse.ArgumentParser(description='Kafka数据发送脚本')
parser.add_argument('--rate', type=int, default=7,
help='每秒发送消息数(默认:7)')
parser.add_argument('--duration', type=int, default=5,
help='运行时长(分钟,默认:5)')
args = parser.parse_args()
duration_seconds = args.duration * 60 # 将分钟转换为秒
start_time = time.time() # 记录开始时间
total = 0 # 初始化已发送消息计数器
# 打印开始信息
print('[%s] 开始发送, 计划发送 %s 分钟, 每秒发送 %s 条消息' %
(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())),
args.duration, args.rate))
# 主循环,持续发送消息直到达到时间限制
while time.time() - start_time < duration_seconds:
_start batch = time.time() # 记录批次开始时间
try:
# 批量发送消息
for _ in range(args.rate):
# 深拷贝基础消息体,避免修改原始数据
message_body = deepcopy(base_message_body)
# 生成新的设备号
new_equipment_code = generate_equipment_code()
# 生成当前时间戳
current_timestamp = generate_current_timestamp()
# 更新消息中的时间字段
message_body["GovOrderVo"]["CreationTime"] = current_timestamp
# 更新设备ID
message_body["GovOrderEquipmentInfoVo"]["EquipmentID"] = new_equipment_code
# 更新工单号
message_body["GovOrderVo"]["OrderNum"] = generate_order_num(new_equipment_code)
# 更新设备列表中的设备代码
for item in message_body["SapEquipmentList"]:
item["equipmentCode"] = new_equipment_code
send_to_kafka("ckfm_mbm_orders", message_body) # 发送消息
total += 1 # 增加已发送计数
# 打印发送统计信息
print('[%s] 发送消息流速 %s个/秒, 发送总数 %s个' %
(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())),
args.rate, total))
# 控制发送速率,确保每秒发送指定数量的消息
producer.poll(0.1) # 非阻塞poll,触发回调
remaining_time = 1 - (time.time() - batch_start)
if remaining_time > 0:
time.sleep(remaining_time) # 等待剩余时间
except KeyboardInterrupt: # 捕获键盘中断信号
break
# 所有消息发送完成后清理发送队列
end_time = time.time()
print("\n正在清理消息队列...")
producer.flush(30) # 等待最多30秒,确保所有消息发送完成
# 计算并打印统计信息
elapsed_time = end_time - start_time
average_rate = total / elapsed_time if elapsed_time > 0 else 0
print('[%s] 发送完毕, 总共发送了 %.2f 秒, 发送总数 %s 个, 平均每秒 %.2f 个' %
(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())),
elapsed_time, total, average_rate))
# 程序入口
if __name__ == '__main__':
main() # 启动主函数
消费者脚本:模拟线上业务场景,从 IOT-Kafka 里面消费数据
# -*- coding: utf-8 -*-
#!/usr/bin/python
"""
@Author : XXX
@Contact : XXX
@File : ReceiveMockKafka_mbm_message.py
@Create Time: 2025/5/15 16:44
@Description: 从CKFM QA环境的IOT KAFKA中消费处理数据
"""
import sys
sys.stdout.reconfigure(encoding='utf-8') # 确保输出编码为utf-8,避免中文乱码
from kafka import KafkaConsumer # 导入Kafka消费者客户端
from kafka.errors import KafkaError # 导入Kafka错误处理模块
import logging # 导入日志模块
import json # 用于处理JSON格式数据
import asyncio # 用于异步编程
from datetime import datetime # 用于处理日期时间
# 启用调试日志,设置日志级别为INFO
logging.basicConfig(level=logging.INFO)
# Kafka 消费者配置
consumer_config = {
'bootstrap_servers': 'XXXXXXXX', # Kafka集群地址
'auto_offset_reset': 'latest', # 从最新的消息开始消费
'enable_auto_commit': True, # 启用自动提交偏移量,确保消费进度被记录
'security_protocol': 'SASL_SSL', # 安全协议,使用加密连接
'sasl_mechanism': 'SCRAM-SHA-512', # SASL认证机制
'sasl_plain_username': 'XXXXXXXX', # Kafka用户名
'sasl_plain_password': 'XXXXXXXX' # Kafka密码
}
# 定义要消费的主题列表
filtered_topics = ['ckfm_mbm_orders']
async def consume_messages():
"""
异步消费Kafka消息
负责从指定主题中拉取消息并进行处理
"""
# 创建Kafka消费者实例
consumer = KafkaConsumer(
*filtered_topics, # 消费指定主题的消息
**consumer_config # 使用上面定义的消费者配置
)
count = 0 # 初始化消息计数器
try:
while True: # 无限循环,持续消费消息
msg = consumer.poll(timeout_ms=100) # 拉取消息,超时时间为100毫秒
if not msg:
await asyncio.sleep(0.1) # 如果没有消息,短暂休眠
continue
# 遍历拉取到的消息
for tp, records in msg.items():
for record in records:
value = record.value.decode('utf-8') # 将消息体解码为字符串
payload = json.loads(value) # 将JSON字符串转换为字典
# 提取 cityCode 字段,用于消息过滤
cityCode = payload.get("cityCode")
allowed_cityCode = ['XXXXXXXX'] # 允许的城市代码列表
# 提取 GovOrderVo 中的 creationTime 字段,用于计算延迟
creation_time_str = payload.get("GovOrderVo", {}).get("CreationTime")
# 消息过滤和延迟计算
if str(cityCode) in allowed_cityCode and creation_time_str:
# 将时间戳转换为datetime对象
message_time = datetime.fromtimestamp(creation_time_str / 1000)
current_time = datetime.now() # 获取当前时间
delay = (current_time - message_time).total_seconds() # 计算延迟,单位秒
# 格式化消息输出
formatted_message = f"{str(count + 1)} -- {record.topic} -- Delay: {delay}s -- {value}\n"
print(f'***********************************************{str(count + 1)} Received new message: {record.topic}, Delay: {delay}s')
count += 1 # 增加消息计数
except Exception as e: # 捕获通用异常
logging.error(f"Error occurred: {e}") # 记录错误日志
except KeyboardInterrupt: # 捕获键盘中断信号
print("Consumer stopped.") # 打印停止信息
finally:
consumer.close() # 确保消费者资源被正确释放
async def main():
"""
主函数,启动消息消费流程
"""
await consume_messages() # 调用消息消费函数
if __name__ == "__main__":
asyncio.run(main()) # 启动异步主函数
优化生产者脚本,采用异步操作,提高写入速率
主要改进点
- 异步发送数据:定义了 async_send_to_kafka 协程函数,利用 asyncio 的事件循环和线程池执行器,将 send_to_kafka 函数以异步方式调用,从而实现并发发送消息到 Kafka,提高了发送效率。
- 异步主循环:main 函数是异步的,通过 asyncio.create_task 创建多个发送消息的任务,并将它们添加到任务列表中。利用 await asyncio.gather(*tasks) 等待所有任务完成,然后清空任务列表,实现了每秒发送指定数量的消息。
- 动态控制发送频率:在每秒的发送循环中,计算剩余时间并根据需要进行睡眠,以确保发送频率符合指定的每秒消息数。
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
@Author : XXX
@Contact : XXX
@File : PushMockKafka_mbm_message.py
@Create Time: 2025/5/15 16:44
@Description: 向CKFM QA环境的IOT KAFKA造数据进去(不看运行时间,只看是否达到最大目标值,程序就自动停止)
"""
import sys
sys.stdout.reconfigure(encoding='utf-8') # 确保输出编码为utf-8,避免中文乱码
from confluent_kafka import Producer # 导入Kafka生产者客户端
import json # 用于处理JSON格式数据
import argparse # 用于解析命令行参数
import time # 用于时间相关操作
import random # 用于生成随机数
import datetime # 用于处理日期时间
from copy import deepcopy # 用于深拷贝对象
import asyncio # 用于异步编程
# Kafka 配置参数
# 这些配置用于连接到Kafka集群并设置消息发送行为
kafka_config = {
'bootstrap.servers': 'XXXXXX', # Kafka集群地址
'security.protocol': 'SASL_SSL', # 安全协议,使用加密连接
'sasl.mechanisms': 'SCRAM-SHA-512', # SASL认证机制
'sasl.username': 'XXXXXX', # Kafka用户
'sasl.password': 'XXXXXX', # Kafka密码
'batch.num.messages': 5000, # 每批最大消息数,控制批量发送大小
'queue.buffering.max.messages': 100000, # 发送队列最大容量
'linger.ms': 50, # 等待批量形成时间,单位毫秒
'delivery.report.only.error': True # 仅报告发送错误,减少回调负担
}
# 创建 Kafka Producer实例
# 这个对象将用于向Kafka发送消息
producer = Producer(kafka_config)
# 基础消息体内容(示例结构,实际内容应根据业务需求定义)
# 这里使用占位符表示实际消息结构
base_message_body = {
"GovOrderVo": {
"CreationTime": 0, # 将被动态替换为当前时间戳
"OrderNum": "" # 将被动态替换为生成的工单号
},
"GovOrderEquipmentInfoVo": {
"EquipmentID": "" # 将被动态替换为生成的设备号
},
"SapEquipmentList": [
{"equipmentCode": ""} # 将被动态替换为生成的设备号
]
}
# 动态生成符合规则的设备号
def generate_equipment_code():
"""
生成格式为"4"开头,共11位的设备号
前缀固定为4,后面7位为随机数字,保证唯一性
"""
# 保持前缀为4,随机生成剩余7位数字
return f"4{random.randint(0, 9999999):07d}"
# 根据设备号生成工单号OrderNum
def generate_order_num(equipment_code):
"""
生成工单号,格式为"NMT{设备号}{当前日期}N"
例如:NMT4000000120250515N
"""
today = datetime.datetime.now().strftime("%Y%m%d") # 获取当前日期
return f"NMT{equipment_code}{today}N"
# 动态生成当前时间戳(毫秒级)
def generate_current_timestamp():
"""
生成当前时间的毫秒级时间戳
用于消息中的时间字段
"""
return int(datetime.datetime.now().timestamp() * 1000)
# 发送数据到 Kafka
def send_to_kafka(topic, data):
"""
将数据发送到指定Kafka主题
参数:
topic (str): Kafka主题名称
data (dict): 要发送的消息数据
"""
data_str = json.dumps(data) # 将字典转换为JSON字符串
try:
producer.produce(
topic=topic,
value=data_str,
callback=delivery_report # 设置发送结果回调函数
)
# 每发送50条消息主动poll一次,触发回调
if len(producer) % 50 == 0:
producer.poll(0)
print("The content of the real-time data sent is:", data_str)
except BufferError:
# 如果发送队列满,先poll清理队列再重试
producer.poll(1)
producer.produce(topic, value=data_str)
# 发送结果回调函数
def delivery_report(err, msg):
"""
处理消息发送结果的回调函数
参数:
err (Exception): 发送错误(如果发生)
msg (Message): 发送的消息对象
"""
if err is not None:
print(f'消息发送失败: {err}') # 打印发送错误
else:
# 打印发送成功的消息信息
print(f'消息成功发送到 {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')
# 异步发送数据到 Kafka
async def async_send_to_kafka(topic, data):
"""
异步包装器,允许在异步环境中发送消息
参数:
topic (str): Kafka主题名称
data (dict): 要发送的消息数据
"""
loop = asyncio.get_event_loop() # 获取当前事件循环
await loop.run_in_executor(None, send_to_kafka, topic, data) # 在线程池中执行同步发送函数
# 主循环,负责控制消息发送逻辑
async def main():
"""
主函数,控制整个消息发送流程
解析命令行参数,循环发送消息直到达到目标数量
"""
# 解析命令行参数
parser = argparse.ArgumentParser(description='Kafka数据发送脚本')
parser.add_argument('--rate', type=int, default=30,
help='每秒发送消息数(默认:30)')
parser.add_argument('--duration', type=int, default=5,
help='运行时长(分钟,默认:5)')
args = parser.parse_args()
start_time = time.time() # 记录开始时间
total = 0 # 初始化已发送消息计数器
# 打印开始信息
print('[%s] 开始发送, 必须发送达到9000条消息后停止' %
time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
tasks = [] # 用于存储异步任务的列表
# 主循环,持续发送消息直到达到9000条
while total < 9000:
batch_start = time.time() # 记录批次开始时间
for _ in range(args.rate):
# 深拷贝基础消息体,避免修改原始数据
message_body = deepcopy(base_message_body)
# 生成新的设备号
new_equipment_code = generate_equipment_code()
# 生成当前时间戳
current_timestamp = generate_current_timestamp()
# 更新消息中的时间字段
message_body["GovOrderVo"]["CreationTime"] = current_timestamp
# 更新设备ID
message_body["GovOrderEquipmentInfoVo"]["EquipmentID"] = new_equipment_code
# 更新工单号
message_body["GovOrderVo"]["OrderNum"] = generate_order_num(new_equipment_code)
# 更新设备列表中的设备代码
for item in message_body["SapEquipmentList"]:
item["equipmentCode"] = new_equipment_code
# 创建异步发送任务
task = asyncio.create_task(async_send_to_kafka("ckfm_mbm_orders", message_body))
tasks.append(task)
total += 1 # 增加已发送计数
# 等待当前批次所有任务完成
await asyncio.gather(*tasks)
tasks.clear() # 清空任务列表
# 检查是否需要清理发送队列
producer.poll(0.1) # 非阻塞poll,触发回调
# 控制发送速率,确保每秒发送指定数量的消息
remaining_time = 1 - (time.time() - batch_start)
if remaining_time > 0:
await asyncio.sleep(remaining_time) # 等待剩余时间
# 所有消息发送完成后清理发送队列
end_time = time.time()
print("\n正在清理消息队列...")
producer.flush(30) # 等待最多30秒,确保所有消息发送完成
# 计算并打印统计信息
elapsed_time = end_time - start_time
average_rate = total / elapsed_time if elapsed_time > 0 else 0
print('[%s] 发送完毕, 总共耗时 %.2f 秒, 发送总数 %s 条, 平均每秒 %.2f 条' %
(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())),
elapsed_time, total, average_rate))
# 程序入口
if __name__ == '__main__':
# 启动异步主函数
asyncio.run(main())
脚本执行结果
转载文章时务必注明原作者及原始链接,并注明「发表于 TesterHome 」,并不得对作品进行修改。
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
No Reply at the moment.