验证上游数据量不断加大的情况下,GOV API 的数据发送与接收,功能保持正常,接口不存在明显的错误率。
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
from DataMock import KafkaMsgMock
from kafka import KafkaProducer
from Percentages import Percentage
import time, random, queue
from threading import Thread
import threading
import json
import ast
import datetime
from confluent_kafka import Producer
mocInstance = KafkaMsgMock()
percIns = Percentage()
task_list = []
global total
total = 0
global msglist
msglist = []
global wait
wait = 0
global resource
resource = 0
global previous_time
previous_time = int(time.time())
# 此处定义期望的发送速率
maxlimted = 2790
minlimted = 2760
# kafka连接信息
conf = {
'bootstrap.servers': 'XXXXX',
'security.protocol': 'XXXXX',
'sasl.mechanisms': 'XXXXX',
'sasl.username': 'XXXXX',
'sasl.password': 'XXXXX'
}
producer = Producer(conf)
# 创建默认任务因子
def task_paras():
#task_list.append({"controller": "LCE", "msgType": 'fault', "count": 3500})
#task_list.append({"controller": 'LCE', "msgType": 'alarm', "count": 1500})
#task_list.append({"controller": 'LCE', "msgType": 'serviceModeChange', "count": 5000})
for m_Type in ['alarm','fault','serviceModeChange','movementData']:
for contr_type in ['LCE','GCE','STEP','ESC','KSE','DTU']:
count = percIns.calculate(controller = contr_type, msgType = m_Type, total = 10000)
if count > 0:
task_list.append({"controller" : contr_type, "msgType" : m_Type, "count" : count})
# 定义异步调用装饰器
def async_call(func):
async def wrapper(*args, **kwargs):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, func, *args, **kwargs)
return wrapper
async def utcTime():
'''
make utc time for timestemp
'''
return datetime.datetime.utcnow().isoformat()[0:23] + 'Z'
# 生成数据函数修正
@async_call
def comon_msg(controller, msgType, count):
return mocInstance.comon_msg(controller=controller, msgType=msgType, count=count)
@async_call
def kce_msg(msgType, count):
return mocInstance.kce_msg(msgType=msgType, count=count)
@async_call
def kcecpuc_msg(msgType, count):
return mocInstance.kcecpuc_msg(msgType=msgType, count=count)
# 发送数据异步函数修正
async def send(msg, producer):
global previous_time, total, wait
try:
topic, data = msg
data['Param']['Timestamp'] = await utcTime()
if wait > 0:
await asyncio.sleep(wait*0.001)
producer.produce("gov-"+ topic, json.dumps(data).encode('utf-8'))
total += 1
curr_time = int(time.time())
if curr_time != previous_time:
previous_time = curr_time
except Exception as e:
print(f"Error sending message '{msg}': {e}") # 添加异常日志
# 发送数据批处理修正
async def send_batched(msgs, producer):
await asyncio.gather(*[send(msg, producer) for msg in msgs])
producer.flush()
# 请求消息函数修正
async def request_msg():
global msglist
while True:
if len(msglist) < 300000:
for unit in task_list:
if unit['controller'] == 'KCE':
msglist += await kce_msg(unit['msgType'], unit['count'])
elif unit['controller'] == 'KCECPUC':
msglist += await kcecpuc_msg(unit['msgType'], unit['count'])
else:
msglist += await comon_msg(unit['controller'], unit['msgType'], unit['count'])
random.shuffle(msglist)
await asyncio.sleep(0.01)
# 发送数据逻辑修正
async def send_data(producer):
while True:
global msglist,resource
if len(msglist) > 0:
if resource > 15000: resource = 15000
if len(msglist) > (1000 + int(resource)):
msgs = msglist[:(1000 + int(resource))]
msglist = msglist[(1000 + int(resource)):]
else:
msgs = msglist
msglist = msglist[len(msglist):-1]
await send_batched(msgs, producer)
await asyncio.sleep(0.01)
# 统计函数修正
async def statistics():
start_time = ''
global previous_time
previous_time = int(time.time())
time.sleep(1)
while 1:
global total
global wait
global msglist
global resource
if total != 0:
curr_time =int(time.time())
if start_time == '':
start_time = curr_time
if (curr_time - start_time) == 0:
continue
if curr_time != previous_time:
speed = int( total / (curr_time - start_time))
if speed > maxlimted:
if wait <= 100000:
if (speed/maxlimted) > 1.5:
wait += 500
if resource > 100 : resource -= 100
elif (speed/maxlimted) > 1.15:
wait += 100
if resource > 20 : resource -= 20
elif (speed/maxlimted) > 1.10:
wait += 50
if resource > 15 : resource -= 15
elif (speed/maxlimted) > 1.05:
wait += 20
if resource > 10 : resource -= 10
elif (speed/maxlimted) > 1.03:
wait += 10
if resource > 5 : resource -= 5
elif (speed/maxlimted) > 1.015:
wait += 1
if resource >= 1 : resource -= 1
elif speed < minlimted:
if wait >= 0 :
if (speed/minlimted) < 0.6:
if wait> 500 :
wait -= 500
else:
wait = 0
resource += 100
elif (speed/minlimted) < 0.85:
if wait> 200 :
wait -= 200
else:
wait = 0
resource += 50
elif (speed/minlimted) < 0.90:
if wait> 100 :
wait -= 100
else:
wait = 0
resource += 20
elif (speed/minlimted) < 0.95:
if wait> 50 :
wait -= 50
else:
wait = 0
resource += 10
elif (speed/minlimted) < 0.97:
if wait> 10 :
wait -= 10
else:
wait = 0
resource += 5
elif (speed/minlimted) < 0.998:
if wait>= 1 :wait -= 1
resource += 1
else:
wait = 0
print('[%s] 发送消息流速 %s个/秒, 发送总数 %s个, 速率减速补偿-%.2f, 增速补偿%s, 队列余量%s' % (time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())),speed,total,wait*0.01, resource, len(msglist),))
previous_time = int(time.time())
await asyncio.sleep(0.001)
# 主函数修正
async def main():
producer = Producer(conf) # 假设 KafkaProducer 是一个异步版本的 Kafka 生产者
task_paras() # 初始化任务参数
print("执行限速区间, 每秒 %s - %s" % (minlimted, maxlimted))
await asyncio.gather(request_msg(), send_data( producer), statistics())
# 运行主函数
asyncio.run(main())
这段 Python 脚本是一个生产者程序,用于向 Kafka 消息队列发送消息。它使用了asyncio
库来实现异步操作,以及confluent_kafka
库来与 Kafka 进行交互。下面是对脚本的详细解释:
导入模块:
time
:用于时间相关的操作。asyncio
:用于编写单线程并发代码,使用协程。ThreadPoolExecutor
:用于创建线程池。KafkaMsgMock
:自定义模块,用于生成模拟的 Kafka 消息。KafkaProducer
:Kafka 生产者类(未在脚本中直接使用,可能是导入错误)。Percentage
:自定义模块,可能用于计算百分比或相关比例。queue
:用于创建线程安全的队列。threading
:用于线程相关的操作。json
:用于处理 JSON 数据。ast
:用于处理 Python 抽象语法树。datetime
:用于处理日期和时间。全局变量:
mocInstance
:KafkaMsgMock
的实例,用于生成消息。percIns
:Percentage
的实例,用于计算比例。task_list
:任务列表,存储任务参数。total
:记录发送消息的总数。msglist
:存储生成的消息。wait
:控制发送速率的等待时间。resource
:资源计数,可能用于控制批量发送的大小。previous_time
:记录上一次发送消息的时间。Kafka 配置:
conf
:包含 Kafka 集群的连接信息,包括服务器地址、安全协议、SASL 认证信息。生产者实例:
producer
:使用配置conf
创建的 Kafka 生产者实例。任务参数创建函数:
task_paras
:创建任务参数,根据Percentage
实例计算每种消息类型和控制器的发送数量。异步调用装饰器:
async_call
:装饰器,用于将同步函数转换为异步函数。生成数据函数:
comon_msg
、kce_msg
、kcecpuc_msg
:这些函数使用mocInstance
生成不同类型和数量的消息。发送数据异步函数:
send
:异步发送单条消息到 Kafka。send_batched
:异步批量发送消息。请求消息函数:
request_msg
:不断生成消息并存入msglist
。发送数据逻辑:
send_data
:从msglist
中取出消息并发送。统计函数:
statistics
:统计发送速率,并根据速率调整发送策略。主函数:
main
:初始化任务参数,启动请求消息、发送数据和统计的协程。运行主函数:
asyncio.run(main())
启动程序。脚本开始部分导入了多个 Python 模块,这些模块提供了时间处理、异步编程、线程管理、JSON 处理等功能。
脚本定义了一些全局变量,比如total
(用来记录发送的消息总数)、msglist
(用来存储待发送的消息)、wait
(用来控制发送速率的延迟)等。
conf
字典包含了连接到 Kafka 集群所需的配置信息,比如服务器地址、安全协议、认证信息等。
使用conf
配置信息,创建了一个 Kafka 生产者实例producer
,用于发送消息到 Kafka。
task_paras
函数用于初始化发送任务,它会根据Percentage
类计算出每种消息类型和控制器的发送数量,并将这些任务添加到task_list
。
async_call
装饰器是一个高级功能,它允许我们将普通的函数转换为异步函数,这样就可以在等待某些操作(比如 I/O 操作)完成时释放执行权,提高程序效率。
comon_msg
、kce_msg
和kcecpuc_msg
函数用于生成不同类型和数量的消息。这些函数被async_call
装饰器装饰,使其成为异步函数。
send
函数是一个异步函数,它负责将单个消息发送到 Kafka。send_batched
函数则是批量发送消息的异步函数。
request_msg
函数不断生成消息并添加到msglist
。send_data
函数则从msglist
中取出消息并发送。
statistics
函数负责统计当前的发送速率,并根据预设的上限和下限调整发送速率。如果发送速率过快,它会增加wait
的值,从而减慢发送速率;如果发送速率过慢,它会减少wait
的值,加快发送速率。
main
函数是程序的入口点,它初始化任务参数,然后启动三个协程:request_msg
、send_data
和statistics
。这三个协程分别负责生成消息、发送消息和统计发送速率。
最后,使用asyncio.run(main())
启动整个程序。
asyncio
库来实现异步操作,confluent_kafka
库来与 Kafka 进行交互,并通过自定义的装饰器和函数来生成和发送消息。time
:计时器,用来知道现在几点。asyncio
:特殊技能,让厨师可以同时做很多事情。ThreadPoolExecutor
:多任务处理,就像多个厨师同时工作。KafkaMsgMock
:食材供应商,提供模拟的食物。Producer
:点餐机,用来发送食物订单。mocInstance
:食材供应商的一个实例。percIns
:计算器,用来计算每种食物的比例。task_list
:菜单列表,记录要准备哪些食物。total
:订单总数,记录一共要准备多少份食物。msglist
:待处理的订单,等待送出的食物。wait
:等待时间,如果太忙了,可能需要让顾客等一下。resource
:资源,比如厨师的数量或者食材的存量。previous_time
:上次送出食物的时间。conf
:点餐机的设置,包括点餐机的地址、安全方式、用户名和密码。producer
:根据设置conf
创建的点餐机实例。task_paras
:决定今天要准备哪些食物,每种食物要准备多少份。async_call
:让厨师可以快速处理任务,比如快速准备食物。comon_msg
、kce_msg
、kcecpuc_msg
:这些是准备不同食物的方法。send
:将准备好的食物通过点餐机发送出去。send_batched
:如果有多个订单,可以一次性发送多个食物。request_msg
:厨师不断地准备食物,直到厨房关闭。send_data
:厨师根据当前的情况,决定发送多少食物。statistics
:监控厨房的工作情况,如果太忙了就减慢速度,如果不忙就加快速度。main
:启动厨房,开始准备食物和发送订单。asyncio.run(main())
:告诉厨房开始工作。