上游数据
- MSK Kafka
测试场景
- 基准测试:在当前云端台量 X 万的条件下,kafka 以 X 个/S 发送数据,其中 X 个命中发送给政府,探测系统各性能指标表现,验证延迟和丢包率是否满足。
- 压力测试:预估未来 X 年云端台量会达到 X 万台,kafka 以 X 个/S 发送数据,其中 X 个命中发送给政府,探测系统各性能指标表现,验证延迟和丢包率是否满足。
验证数据丢包的脚本代码
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
from DataMock import KafkaMsgMock
from Percentages import Percentage
import random
from confluent_kafka import Producer
import json
import datetime
# Shared mutable state used by the coroutines below.
mocInstance = KafkaMsgMock()  # mock message source (see comon_msg / kce_msg / kcecpuc_msg)
percIns = Percentage()  # per (controller, msgType) count calculator used by task_paras
task_list, msglist = [], []  # generation tasks; pending (topic, data) messages
total = wait = resource = 0  # messages produced; per-message delay in ms; extra batch size
previous_time = int(time.time())  # last whole second seen by the statistics loop
task_completed = log_completed = False  # send cap reached; "done" message already printed
# Target send-rate band, in messages per second.
maxlimted = 1000
minlimted = 990
# Kafka connection settings (placeholders -- fill in real values before running).
# Every value must be a quoted string; the published listing was missing
# several opening quotes, which made the whole script a SyntaxError.
conf = {
    'bootstrap.servers': 'xxxxxxx',
    'security.protocol': 'xxxxxxx',
    'sasl.mechanisms': 'xxxxxxx',
    'sasl.username': 'xxxxxxx',
    'sasl.password': 'xxxxxxx',
}
# NOTE: main() constructs its own Producer from ``conf`` and passes that one to
# the coroutines; this module-level instance is not the one they use.
producer = Producer(conf)
# Build the default generation tasks.
def task_paras():
    """Append one generation task per (message type, controller) pair to ``task_list``.

    ``percIns.calculate`` supplies the count for each pair (presumably its
    share of a 10000-message round -- confirm against Percentage).  Pairs whose
    count is not greater than 0 are skipped.  Entries are appended in
    message-type-major order.
    """
    message_types = ('alarm', 'fault', 'serviceModeChange', 'movementData')
    controller_types = ('LCE', 'GCE', 'STEP', 'ESC', 'KSE', 'DTU')
    for kind in message_types:
        for ctrl in controller_types:
            n = percIns.calculate(controller=ctrl, msgType=kind, total=10000)
            if n > 0:
                task_list.append({"controller": ctrl, "msgType": kind, "count": n})
async def utcTime(now=None):
    """Return a UTC timestamp string such as ``2024-01-02T03:04:05.123Z``.

    The result always has exactly millisecond precision (sub-millisecond digits
    are truncated) and ends in ``Z``.

    Args:
        now: optional datetime to format instead of the current time.  A naive
            value is taken to already be UTC; an aware one is converted to UTC.
    """
    if now is None:
        # datetime.utcnow() is deprecated since Python 3.12.
        now = datetime.datetime.now(datetime.timezone.utc)
    if now.tzinfo is not None:
        now = now.astimezone(datetime.timezone.utc).replace(tzinfo=None)
    # Plain isoformat() omits the fractional part entirely when
    # microsecond == 0, so slicing it to 23 chars gave "...:05Z" instead of
    # "...:05.000Z".  timespec='milliseconds' always emits three digits.
    return now.isoformat(timespec='milliseconds') + 'Z'
# Decorator: run a blocking function in the event loop's default executor.
def async_call(func):
    """Wrap the synchronous ``func`` so that calling it returns an awaitable.

    The call is executed via ``loop.run_in_executor(None, ...)``, i.e. in the
    running loop's default thread-pool executor, so blocking work does not
    stall the event loop.  ``run_in_executor`` forwards positional arguments
    only, so keyword arguments are bound with ``functools.partial`` first.
    """
    import functools

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))

    return wrapper
@async_call
def comon_msg(controller, msgType, count):
    """Delegate to ``mocInstance.comon_msg`` (used by request_msg for generic controllers).

    request_msg concatenates the result onto ``msglist`` and send() unpacks
    each item as ``(topic, data)``, so presumably this returns a list of such
    pairs -- confirm against KafkaMsgMock.
    """
    kwargs = {"controller": controller, "msgType": msgType, "count": count}
    return mocInstance.comon_msg(**kwargs)


@async_call
def kce_msg(msgType, count):
    """Delegate to ``mocInstance.kce_msg`` (used by request_msg for the 'KCE' controller)."""
    kwargs = {"msgType": msgType, "count": count}
    return mocInstance.kce_msg(**kwargs)


@async_call
def kcecpuc_msg(msgType, count):
    """Delegate to ``mocInstance.kcecpuc_msg`` (used by request_msg for the 'KCECPUC' controller)."""
    kwargs = {"msgType": msgType, "count": count}
    return mocInstance.kcecpuc_msg(**kwargs)
# Send a single message.
async def send(msg, producer):
    """Produce one message to Kafka and count it toward the 20000-message cap.

    Args:
        msg: a ``(topic, data)`` pair.  The message is written to topic
            ``"gov-" + topic`` as UTF-8 JSON, and ``data['Param']['Timestamp']``
            is overwritten with the send time.
        producer: the Kafka producer to hand the message to.

    Before producing, the coroutine sleeps ``wait`` milliseconds (the throttle
    maintained by ``statistics``).  Once 20000 messages have been produced,
    ``task_completed`` is set and any later call produces nothing, so the
    number of messages sent is exactly the cap.  Errors are printed, not
    raised, so one bad message does not abort the rest of its batch.
    """
    global total, wait, task_completed, log_completed
    try:
        topic, data = msg
        if wait > 0:
            await asyncio.sleep(wait * 0.001)
        # Other coroutines of the same gathered batch may still be pending when
        # the cap is reached; without this check they would overshoot it and
        # the sent count would no longer match the loss-check baseline.
        if task_completed:
            return
        # Stamp after the throttle delay so the timestamp reflects when the
        # message is actually handed to Kafka (presumably downstream latency
        # is measured from this field).
        data['Param']['Timestamp'] = await utcTime()
        payload = json.dumps(data).encode('utf-8')
        try:
            producer.produce("gov-" + topic, payload)
        except BufferError:
            # The client's local queue is full: serve delivery reports to free
            # space and retry once, instead of silently dropping the message.
            producer.poll(1)
            producer.produce("gov-" + topic, payload)
        total += 1
        if total >= 20000:  # send cap: 20,000 messages
            if not log_completed:
                print(f"总共发送了{total}条消息,发送完毕。")
                log_completed = True
            task_completed = True
    except Exception as e:
        print(f"Error sending message '{msg}': {e}")
# Send a batch of messages.
async def send_batched(msgs, producer):
    """Run ``send`` for every message in ``msgs`` concurrently, then flush.

    ``producer.flush()`` is a synchronous call that waits for outstanding
    deliveries, so the event loop (and every other coroutine) pauses until it
    returns.
    """
    coros = [send(item, producer) for item in msgs]
    await asyncio.gather(*coros)
    producer.flush()
async def request_msg():
    """Keep the pending queue ``msglist`` topped up with mock messages.

    While fewer than 300000 messages are pending, generate one round of
    messages for every entry in ``task_list`` (the generators run in a worker
    thread via ``async_call``), then shuffle the queue so message types are
    interleaved.  Polls every 10 ms until ``task_completed`` is set.
    """
    global msglist, task_completed
    while not task_completed:
        if len(msglist) < 300000:
            for unit in task_list:
                if unit['controller'] == 'KCE':
                    batch = await kce_msg(unit['msgType'], unit['count'])
                elif unit['controller'] == 'KCECPUC':
                    batch = await kcecpuc_msg(unit['msgType'], unit['count'])
                else:
                    batch = await comon_msg(unit['controller'], unit['msgType'], unit['count'])
                # Await first, then extend, with no suspension point in between.
                # ``msglist += await f()`` loads ``msglist`` *before* suspending;
                # if send_data re-binds ``msglist`` meanwhile, the stale list
                # (still holding already-sent messages) gets written back and
                # those messages are sent twice.
                msglist += batch
            random.shuffle(msglist)
        await asyncio.sleep(0.01)
async def send_data(producer):
    """Drain ``msglist`` in batches and hand each batch to ``send_batched``.

    The batch size is ``1000 + int(resource)``.  Before that, ``resource`` is
    clamped to at most 15000, and the clamp is written back to the global.
    Polls every 10 ms until ``task_completed`` is set.
    """
    global msglist, resource, task_completed
    while not task_completed:
        if msglist:
            resource = min(resource, 15000)
            batch_size = 1000 + int(resource)
            if len(msglist) > batch_size:
                msgs, msglist = msglist[:batch_size], msglist[batch_size:]
            else:
                # Take everything that is left and start a fresh empty queue.
                msgs, msglist = msglist, []
            await send_batched(msgs, producer)
        await asyncio.sleep(0.01)
# Statistics / rate-control coroutine.
async def statistics():
    """Report throughput once per second and adjust the send throttle.

    Reads the module globals ``total`` (messages produced so far) and
    ``msglist`` (pending queue), and updates two control knobs:

    * ``wait`` -- per-message delay in milliseconds, applied in ``send``;
    * ``resource`` -- extra messages per batch on top of 1000, applied in
      ``send_data``.

    ``speed`` is the *average* rate since start-up (``total`` divided by the
    elapsed whole seconds), not the rate of the last second.

    * Above ``maxlimted``, the delay grows and the batch shrinks.
    * Below ``minlimted``, the delay shrinks and the batch grows.
    * The further ``speed`` is from the band, the larger the step.

    Runs until ``task_completed`` is set.
    """
    global previous_time, task_completed, total, wait, msglist, resource
    start_time = int(time.time())
    previous_time = start_time
    # Second of the last report, kept local.  Another writer of the global
    # ``previous_time`` could claim a new second first; with a local tick that
    # can no longer make this loop skip that second's report and adjustment.
    last_tick = start_time
    # Warm-up.  This must be asyncio.sleep: time.sleep() would freeze the
    # event loop and stall message generation and sending for a full second.
    await asyncio.sleep(1)
    while not task_completed:
        curr_time = int(time.time())
        elapsed = curr_time - start_time
        # ``elapsed > 0`` replaces the old bare ``continue``, which skipped the
        # await below and spun, blocking every other coroutine.
        if total != 0 and elapsed > 0 and curr_time != last_tick:
            speed = int(total / elapsed)
            if speed > maxlimted:
                # Too fast: lengthen the per-message delay, shrink the batch.
                if wait <= 100000:
                    ratio = speed / maxlimted
                    if ratio > 1.5:
                        wait += 500
                        if resource > 100:
                            resource -= 100
                    elif ratio > 1.15:
                        wait += 100
                        if resource > 20:
                            resource -= 20
                    elif ratio > 1.10:
                        wait += 50
                        if resource > 15:
                            resource -= 15
                    elif ratio > 1.05:
                        wait += 20
                        if resource > 10:
                            resource -= 10
                    elif ratio > 1.03:
                        wait += 10
                        if resource > 5:
                            resource -= 5
                    elif ratio > 1.015:
                        wait += 1
                        if resource >= 1:
                            resource -= 1
            elif speed < minlimted:
                # Too slow: shorten the delay (never below 0), grow the batch.
                if wait >= 0:
                    ratio = speed / minlimted
                    if ratio < 0.6:
                        wait = max(wait - 500, 0)
                        resource += 100
                    elif ratio < 0.85:
                        wait = max(wait - 200, 0)
                        resource += 50
                    elif ratio < 0.90:
                        wait = max(wait - 100, 0)
                        resource += 20
                    elif ratio < 0.95:
                        wait = max(wait - 50, 0)
                        resource += 10
                    elif ratio < 0.97:
                        wait = max(wait - 10, 0)
                        resource += 5
                    elif ratio < 0.998:
                        # NOTE(review): the published listing lost its
                        # indentation; its trailing ``else: wait = 0`` is read
                        # here as belonging to ``if wait >= 1`` -- confirm
                        # against the original script.
                        if wait >= 1:
                            wait -= 1
                            resource += 1
                        else:
                            wait = 0
            print('[%s] 发送消息流速 %s个/秒, 发送总数 %s个, 速率减速补偿-%.2f, 增速补偿%s, 队列余量%s' % (
                time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())),
                speed, total, wait * 0.01, resource, len(msglist),
            ))
            previous_time = curr_time
            last_tick = curr_time
        await asyncio.sleep(0.001)
async def main():
    """Build the task list, run generator / sender / statistics, then flush.

    The three coroutines all loop until ``task_completed`` is set, so
    ``asyncio.gather`` returns only once the send cap has been reached (or
    one of them raised).
    """
    task_paras()
    print("执行限速区间, 每秒 %s - %s" % (minlimted, maxlimted))
    # Create the producer *before* try/finally.  If construction fails there is
    # nothing to flush, and referencing the unbound local in ``finally`` would
    # otherwise mask the real error with UnboundLocalError.
    producer = Producer(conf)
    try:
        await asyncio.gather(
            request_msg(),
            send_data(producer),
            statistics(),
        )
    finally:
        # Deliver whatever is still buffered.  flush() returns the number of
        # messages still queued; report it, since undelivered messages would
        # otherwise show up as unexplained loss downstream.
        remaining = producer.flush()
        if remaining:
            print(f"警告: 仍有{remaining}条消息未投递。")
        print("Kafka生产者已关闭。")


if __name__ == "__main__":
    asyncio.run(main())
针对代码的功能解析
1. 导入模块
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
from DataMock import KafkaMsgMock
from Percentages import Percentage
import random
from confluent_kafka import Producer
import json
import datetime
- time:用于处理时间相关操作,如获取当前时间戳。
- asyncio:Python 的异步编程库,允许程序在等待 I/O 操作时执行其他任务。
- ThreadPoolExecutor:线程池执行器。脚本中导入了它但并未直接使用——async_call 调用 loop.run_in_executor(None, ...) 时,使用的是事件循环自带的默认线程池来运行阻塞型函数。
- KafkaMsgMock:自定义模块,用于生成模拟的 Kafka 消息。
- Percentage:自定义模块,用于计算消息的比例。
- random:用于随机打乱消息列表。
- confluent_kafka.Producer:Kafka 的 Python 客户端库,用于发送消息到 Kafka。
- json:用于将消息序列化为 JSON 格式。
- datetime:用于处理时间相关的操作。
2. 初始化和配置
# Shared mutable state used by the coroutines below.
mocInstance = KafkaMsgMock()  # mock message source (see comon_msg / kce_msg / kcecpuc_msg)
percIns = Percentage()  # per (controller, msgType) count calculator used by task_paras
task_list, msglist = [], []  # generation tasks; pending (topic, data) messages
total = wait = resource = 0  # messages produced; per-message delay in ms; extra batch size
previous_time = int(time.time())  # last whole second seen by the statistics loop
task_completed = log_completed = False  # send cap reached; "done" message already printed
- mocInstance:KafkaMsgMock 的实例,用于生成模拟消息。
- percIns:Percentage 的实例,用于计算消息比例。
- task_list:存储生成的任务参数。
- total:已发送的消息总数。
- msglist:存储待发送的消息队列。
- wait:发送消息时的延时(单位:毫秒)。
- resource:批量大小的增量补偿。每批发送 1000 + resource 条消息(send_data 会把 resource 限制在 15000 以内),其数值由 statistics 根据实际发送速率动态调整。
- previous_time:记录上一次的时间戳,用于统计发送速度。
- task_completed 和 log_completed:标志任务是否完成和日志是否打印。
3. Kafka 配置
# Target send-rate band, in messages per second.
maxlimted = 1100
minlimted = 1000
# Kafka connection settings (placeholders -- fill in real values before running).
# Every value must be a quoted string; the published listing was missing
# several opening quotes, which made it a SyntaxError.
conf = {
    'bootstrap.servers': 'xxxxxxx',
    'security.protocol': 'xxxxxxx',
    'sasl.mechanisms': 'xxxxxxx',
    'sasl.username': 'xxxxxxx',
    'sasl.password': 'xxxxxxx',
}
# NOTE: main() constructs its own Producer from ``conf`` and passes that one to
# the coroutines; this module-level instance is not the one they use.
producer = Producer(conf)
- maxlimted 和 minlimted:定义消息发送的速率范围(每秒发送的消息数)。
- conf:Kafka 的连接配置,包括服务器地址、安全协议、认证信息等。
- producer:Kafka 生产者实例,用于发送消息到 Kafka。
4. 任务参数生成
def task_paras():
    """Append one generation task per (message type, controller) pair to ``task_list``.

    ``percIns.calculate`` supplies the count for each pair (presumably its
    share of a 10000-message round -- confirm against Percentage).  Pairs whose
    count is not greater than 0 are skipped.  Entries are appended in
    message-type-major order.
    """
    message_types = ('alarm', 'fault', 'serviceModeChange', 'movementData')
    controller_types = ('LCE', 'GCE', 'STEP', 'ESC', 'KSE', 'DTU')
    for kind in message_types:
        for ctrl in controller_types:
            n = percIns.calculate(controller=ctrl, msgType=kind, total=10000)
            if n > 0:
                task_list.append({"controller": ctrl, "msgType": kind, "count": n})
- task_paras:生成任务参数。
- 遍历消息类型和控制器类型,根据 Percentage 模块计算每种类型的消息数量。
- 如果消息数量大于 0,将任务参数添加到 task_list。
5. UTC 时间获取
async def utcTime(now=None):
    """Return a UTC timestamp string such as ``2024-01-02T03:04:05.123Z``.

    The result always has exactly millisecond precision (sub-millisecond digits
    are truncated) and ends in ``Z``.

    Args:
        now: optional datetime to format instead of the current time.  A naive
            value is taken to already be UTC; an aware one is converted to UTC.
    """
    if now is None:
        # datetime.utcnow() is deprecated since Python 3.12.
        now = datetime.datetime.now(datetime.timezone.utc)
    if now.tzinfo is not None:
        now = now.astimezone(datetime.timezone.utc).replace(tzinfo=None)
    # Plain isoformat() omits the fractional part entirely when
    # microsecond == 0, so slicing it to 23 chars gave "...:05Z" instead of
    # "...:05.000Z".  timespec='milliseconds' always emits three digits.
    return now.isoformat(timespec='milliseconds') + 'Z'
- utcTime:异步函数,返回当前 UTC 时间的 ISO 8601 格式字符串(以 Z 结尾,例如 2024-01-02T03:04:05.123Z),用于写入消息的 Timestamp 字段。
6. 异步调用装饰器
def async_call(func):
    """Wrap the synchronous ``func`` so that calling it returns an awaitable.

    The call is executed via ``loop.run_in_executor(None, ...)``, i.e. in the
    running loop's default thread-pool executor, so blocking work does not
    stall the event loop.  ``run_in_executor`` forwards positional arguments
    only, so keyword arguments are bound with ``functools.partial`` first.
    """
    import functools

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))

    return wrapper
- async_call:装饰器,用于将同步函数包装为异步函数。
- 通过 loop.run_in_executor 把同步函数放到事件循环的默认线程池中执行。调用方可以 await 其结果,而不会阻塞事件循环本身。
7. 消息生成函数
@async_call
def comon_msg(controller, msgType, count):
    """Delegate to ``mocInstance.comon_msg`` (used by request_msg for generic controllers).

    request_msg concatenates the result onto ``msglist`` and send() unpacks
    each item as ``(topic, data)``, so presumably this returns a list of such
    pairs -- confirm against KafkaMsgMock.
    """
    kwargs = {"controller": controller, "msgType": msgType, "count": count}
    return mocInstance.comon_msg(**kwargs)


@async_call
def kce_msg(msgType, count):
    """Delegate to ``mocInstance.kce_msg`` (used by request_msg for the 'KCE' controller)."""
    kwargs = {"msgType": msgType, "count": count}
    return mocInstance.kce_msg(**kwargs)


@async_call
def kcecpuc_msg(msgType, count):
    """Delegate to ``mocInstance.kcecpuc_msg`` (used by request_msg for the 'KCECPUC' controller)."""
    kwargs = {"msgType": msgType, "count": count}
    return mocInstance.kcecpuc_msg(**kwargs)
- comon_msg:根据控制器类型和消息类型生成通用消息。
- kce_msg 和 kcecpuc_msg:根据消息类型生成特定控制器的消息。
8. 发送单条消息
async def send(msg, producer):
    """Produce one message to Kafka and count it toward the 20000-message cap.

    Args:
        msg: a ``(topic, data)`` pair.  The message is written to topic
            ``"gov-" + topic`` as UTF-8 JSON, and ``data['Param']['Timestamp']``
            is overwritten with the send time.
        producer: the Kafka producer to hand the message to.

    Before producing, the coroutine sleeps ``wait`` milliseconds (the throttle
    maintained by ``statistics``).  Once 20000 messages have been produced,
    ``task_completed`` is set and any later call produces nothing, so the
    number of messages sent is exactly the cap.  Errors are printed, not
    raised, so one bad message does not abort the rest of its batch.
    """
    global total, wait, task_completed, log_completed
    try:
        topic, data = msg
        if wait > 0:
            await asyncio.sleep(wait * 0.001)
        # Other coroutines of the same gathered batch may still be pending when
        # the cap is reached; without this check they would overshoot it and
        # the sent count would no longer match the loss-check baseline.
        if task_completed:
            return
        # Stamp after the throttle delay so the timestamp reflects when the
        # message is actually handed to Kafka (presumably downstream latency
        # is measured from this field).
        data['Param']['Timestamp'] = await utcTime()
        payload = json.dumps(data).encode('utf-8')
        try:
            producer.produce("gov-" + topic, payload)
        except BufferError:
            # The client's local queue is full: serve delivery reports to free
            # space and retry once, instead of silently dropping the message.
            producer.poll(1)
            producer.produce("gov-" + topic, payload)
        total += 1
        if total >= 20000:  # send cap: 20,000 messages
            if not log_completed:
                print(f"总共发送了{total}条消息,发送完毕。")
                log_completed = True
            task_completed = True
    except Exception as e:
        print(f"Error sending message '{msg}': {e}")
- send:异步函数,发送单条消息到 Kafka。
- 在消息中添加当前的 UTC 时间戳。
- 根据 wait 变量引入发送延时。
- 更新消息总数 total,并检查是否达到发送上限。
9. 批量发送消息
async def send_batched(msgs, producer):
    """Run ``send`` for every message in ``msgs`` concurrently, then flush.

    ``producer.flush()`` is a synchronous call that waits for outstanding
    deliveries, so the event loop (and every other coroutine) pauses until it
    returns.
    """
    coros = [send(item, producer) for item in msgs]
    await asyncio.gather(*coros)
    producer.flush()
- send_batched:异步函数,批量发送消息。
- 使用 asyncio.gather 并发执行多条消息的发送协程(单线程协作式调度,而不是多线程并行)。
- 调用 producer.flush() 等待缓冲区中的消息全部投递完成。flush() 是同步阻塞调用,执行期间事件循环中的其他协程会暂停。
10. 请求消息
async def request_msg():
    """Keep the pending queue ``msglist`` topped up with mock messages.

    While fewer than 300000 messages are pending, generate one round of
    messages for every entry in ``task_list`` (the generators run in a worker
    thread via ``async_call``), then shuffle the queue so message types are
    interleaved.  Polls every 10 ms until ``task_completed`` is set.
    """
    global msglist, task_completed
    while not task_completed:
        if len(msglist) < 300000:
            for unit in task_list:
                if unit['controller'] == 'KCE':
                    batch = await kce_msg(unit['msgType'], unit['count'])
                elif unit['controller'] == 'KCECPUC':
                    batch = await kcecpuc_msg(unit['msgType'], unit['count'])
                else:
                    batch = await comon_msg(unit['controller'], unit['msgType'], unit['count'])
                # Await first, then extend, with no suspension point in between.
                # ``msglist += await f()`` loads ``msglist`` *before* suspending;
                # if send_data re-binds ``msglist`` meanwhile, the stale list
                # (still holding already-sent messages) gets written back and
                # those messages are sent twice.
                msglist += batch
            random.shuffle(msglist)
        await asyncio.sleep(0.01)
- request_msg:异步函数,生成模拟消息并添加到消息队列。
- 当消息队列中的消息数少于 300,000 条时,继续生成消息。
- 随机打乱消息列表。
11. 统计和动态调整
async def statistics():
    """Report throughput once per second and adjust the send throttle (excerpt).

    Reads ``total`` and ``msglist`` and updates two control knobs:

    * ``wait`` -- per-message delay in milliseconds, applied in ``send``;
    * ``resource`` -- extra messages per batch on top of 1000, applied in
      ``send_data``.

    ``speed`` is the average rate since start-up (``total`` divided by the
    elapsed whole seconds).  Runs until ``task_completed`` is set.
    """
    global previous_time, task_completed, total, wait, msglist, resource
    start_time = int(time.time())
    previous_time = start_time
    # Second of the last report, kept local.  Another writer of the global
    # ``previous_time`` could claim a new second first; with a local tick that
    # can no longer make this loop skip that second's report and adjustment.
    last_tick = start_time
    # Warm-up.  This must be asyncio.sleep: time.sleep() would freeze the
    # event loop and stall message generation and sending for a full second.
    await asyncio.sleep(1)
    while not task_completed:
        curr_time = int(time.time())
        elapsed = curr_time - start_time
        # ``elapsed > 0`` replaces the old bare ``continue``, which skipped the
        # await below and spun, blocking every other coroutine.
        if total != 0 and elapsed > 0 and curr_time != last_tick:
            speed = int(total / elapsed)
            if speed > maxlimted:
                # Too fast: lengthen the per-message delay, shrink the batch.
                if wait <= 100000:
                    ratio = speed / maxlimted
                    if ratio > 1.5:
                        wait += 500
                        if resource > 100:
                            resource -= 100
                    elif ratio > 1.15:
                        wait += 100
                        if resource > 20:
                            resource -= 20
                    # (finer-grained steps omitted in this excerpt)
            elif speed < minlimted:
                # Too slow: shorten the delay (never below 0), grow the batch.
                if wait >= 0:
                    ratio = speed / minlimted
                    if ratio < 0.6:
                        wait = max(wait - 500, 0)
                        resource += 100
                    elif ratio < 0.85:
                        wait = max(wait - 200, 0)
                        resource += 50
                    # (finer-grained steps omitted in this excerpt)
            print('[%s] 发送消息流速 %s个/秒, 发送总数 %s个, 速率减速补偿-%.2f, 增速补偿%s, 队列余量%s' % (
                time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())),
                speed, total, wait * 0.01, resource, len(msglist),
            ))
            previous_time = curr_time
            last_tick = curr_time
        await asyncio.sleep(0.001)
- statistics:异步函数,统计消息发送速度。
- 根据发送速度动态调整消息发送的延时 wait 和资源限制 resource。
- 打印统计信息。
12. 主函数
async def main():
    """Build the task list, run generator / sender / statistics, then flush.

    The three coroutines all loop until ``task_completed`` is set, so
    ``asyncio.gather`` returns only once the send cap has been reached (or
    one of them raised).
    """
    task_paras()
    print("执行限速区间, 每秒 %s - %s" % (minlimted, maxlimted))
    # Create the producer *before* try/finally.  If construction fails there is
    # nothing to flush, and referencing the unbound local in ``finally`` would
    # otherwise mask the real error with UnboundLocalError.
    producer = Producer(conf)
    try:
        await asyncio.gather(
            request_msg(),
            send_data(producer),
            statistics(),
        )
    finally:
        # Deliver whatever is still buffered.  flush() returns the number of
        # messages still queued; report it, since undelivered messages would
        # otherwise show up as unexplained loss downstream.
        remaining = producer.flush()
        if remaining:
            print(f"警告: 仍有{remaining}条消息未投递。")
        print("Kafka生产者已关闭。")
- main:主函数,初始化任务列表。
- 启动 Kafka 生产者。
- 通过 asyncio.gather 并发运行 request_msg、send_data 和 statistics。
- 所有任务结束后,在 finally 中调用 producer.flush(),把缓冲区内尚未投递的消息发送出去。
13. 运行主函数
if __name__ == "__main__":
    # Script entry point: asyncio.run() creates a fresh event loop, runs main()
    # to completion, then closes the loop.
    asyncio.run(main())
- 使用 asyncio.run 启动主异步函数 main。
总结
- 这段代码通过异步编程和 Kafka 生产者实现了模拟消息的生成和发送,并动态调整发送速率。
转载文章时务必注明原作者及原始链接,并注明「发表于 TesterHome 」,并不得对作品进行修改。
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
暂无回复。