Upstream data
Test scenario
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
from DataMock import KafkaMsgMock
from Percentages import Percentage
import random
from confluent_kafka import Producer
import json
import datetime
mocInstance = KafkaMsgMock()
percIns = Percentage()
task_list = []
total = 0
msglist = []
wait = 0
resource = 0
previous_time = int(time.time())
task_completed = False  # flag: all messages have been sent
log_completed = False  # flag: the completion log line has been printed
# Target send-rate window (messages per second)
maxlimted = 1000
minlimted = 990
# Kafka connection settings (placeholders)
conf = {
    'bootstrap.servers': 'xxxxxxx',
    'security.protocol': 'xxxxxxx',
    'sasl.mechanisms': 'xxxxxxx',
    'sasl.username': 'xxxxxxx',
    'sasl.password': 'xxxxxxx'
}
producer = Producer(conf)
# Build the default task factors (one entry per controller / message-type pair with a non-zero count)
def task_paras():
    for m_Type in ['alarm', 'fault', 'serviceModeChange', 'movementData']:
        for contr_type in ['LCE', 'GCE', 'STEP', 'ESC', 'KSE', 'DTU']:
            count = percIns.calculate(controller=contr_type, msgType=m_Type, total=10000)
            if count > 0:
                task_list.append({"controller": contr_type, "msgType": m_Type, "count": count})
# Current UTC timestamp, ISO-8601 truncated to milliseconds and suffixed with 'Z'
async def utcTime():
    return datetime.datetime.utcnow().isoformat()[0:23] + 'Z'
# Decorator: run a blocking (synchronous) function in the default executor
def async_call(func):
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        # run_in_executor does not accept keyword arguments directly,
        # so wrap the call in a closure
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))
    return wrapper
@async_call
def comon_msg(controller, msgType, count):
    return mocInstance.comon_msg(controller=controller, msgType=msgType, count=count)
@async_call
def kce_msg(msgType, count):
    return mocInstance.kce_msg(msgType=msgType, count=count)
@async_call
def kcecpuc_msg(msgType, count):
    return mocInstance.kcecpuc_msg(msgType=msgType, count=count)
# Async send function (revised)
async def send(msg, producer):
    global total, wait, task_completed, previous_time, log_completed
    try:
        topic, data = msg
        data['Param']['Timestamp'] = await utcTime()
        if wait > 0:
            await asyncio.sleep(wait * 0.001)
        producer.produce("gov-" + topic, json.dumps(data).encode('utf-8'))
        total += 1
        # Check whether the send cap has been reached
        if not task_completed:
            if total >= 20000:  # send cap: 20,000 messages
                if not log_completed:
                    print(f"Sent {total} messages in total; sending finished.")
                    log_completed = True
                task_completed = True  # mark the task as completed
        # Update previous_time
        curr_time = int(time.time())
        if curr_time != previous_time and not task_completed:
            previous_time = curr_time
    except Exception as e:
        print(f"Error sending message '{msg}': {e}")
# Batched send (revised)
async def send_batched(msgs, producer):
    await asyncio.gather(*[send(msg, producer) for msg in msgs])
    producer.flush()
async def request_msg():
    global msglist, task_completed
    while not task_completed:
        if len(msglist) < 300000:
            for unit in task_list:
                if unit['controller'] == 'KCE':
                    msglist += await kce_msg(unit['msgType'], unit['count'])
                elif unit['controller'] == 'KCECPUC':
                    msglist += await kcecpuc_msg(unit['msgType'], unit['count'])
                else:
                    msglist += await comon_msg(unit['controller'], unit['msgType'], unit['count'])
            random.shuffle(msglist)
        await asyncio.sleep(0.01)
async def send_data(producer):
    global msglist, resource, task_completed
    while not task_completed:
        if len(msglist) > 0:
            if resource > 15000:
                resource = 15000
            if len(msglist) > (1000 + int(resource)):
                msgs = msglist[:(1000 + int(resource))]
                msglist = msglist[(1000 + int(resource)):]
            else:
                msgs = msglist
                msglist = []
            await send_batched(msgs, producer)
        await asyncio.sleep(0.01)
# Statistics / rate-control loop (revised)
async def statistics():
    global previous_time, task_completed, total, wait, msglist, resource
    start_time = int(time.time())
    previous_time = start_time
    await asyncio.sleep(1)
    while not task_completed:
        if total != 0:
            curr_time = int(time.time())
            if (curr_time - start_time) == 0:
                continue
            if curr_time != previous_time:
                speed = int(total / (curr_time - start_time))
                if speed > maxlimted:
                    # Rate above the ceiling: increase the per-message delay, shrink the batch bonus
                    if wait <= 100000:
                        if (speed / maxlimted) > 1.5:
                            wait += 500
                            if resource > 100:
                                resource -= 100
                        elif (speed / maxlimted) > 1.15:
                            wait += 100
                            if resource > 20:
                                resource -= 20
                        elif (speed / maxlimted) > 1.10:
                            wait += 50
                            if resource > 15:
                                resource -= 15
                        elif (speed / maxlimted) > 1.05:
                            wait += 20
                            if resource > 10:
                                resource -= 10
                        elif (speed / maxlimted) > 1.03:
                            wait += 10
                            if resource > 5:
                                resource -= 5
                        elif (speed / maxlimted) > 1.015:
                            wait += 1
                            if resource >= 1:
                                resource -= 1
                elif speed < minlimted:
                    # Rate below the floor: reduce the per-message delay, grow the batch bonus
                    if wait >= 0:
                        if (speed / minlimted) < 0.6:
                            if wait > 500:
                                wait -= 500
                            else:
                                wait = 0
                            resource += 100
                        elif (speed / minlimted) < 0.85:
                            if wait > 200:
                                wait -= 200
                            else:
                                wait = 0
                            resource += 50
                        elif (speed / minlimted) < 0.90:
                            if wait > 100:
                                wait -= 100
                            else:
                                wait = 0
                            resource += 20
                        elif (speed / minlimted) < 0.95:
                            if wait > 50:
                                wait -= 50
                            else:
                                wait = 0
                            resource += 10
                        elif (speed / minlimted) < 0.97:
                            if wait > 10:
                                wait -= 10
                            else:
                                wait = 0
                            resource += 5
                        elif (speed / minlimted) < 0.998:
                            if wait >= 1:
                                wait -= 1
                                resource += 1
                            else:
                                wait = 0
                print('[%s] send rate %s msg/s, total sent %s, slow-down compensation -%.2f, speed-up compensation %s, queue backlog %s' % (
                    time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())),
                    speed, total, wait * 0.01, resource, len(msglist),
                ))
                previous_time = curr_time
        await asyncio.sleep(0.001)
async def main():
    task_paras()
    print("Enforcing rate-limit window: %s - %s messages per second" % (minlimted, maxlimted))
    try:
        # Create the Kafka producer
        producer = Producer(conf)
        # Run the async tasks; gather() returns once all of them finish
        await asyncio.gather(
            request_msg(),
            send_data(producer),
            statistics()
        )
        # Check whether the task has completed
        while not task_completed:
            await asyncio.sleep(0.1)
    finally:
        # Flush the Kafka producer before exiting
        producer.flush()
        print("Kafka producer closed.")
if __name__ == "__main__":
    asyncio.run(main())
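The heart of the script is the feedback loop shared by send() and statistics(): every message sleeps for wait milliseconds before being produced, and roughly once per second the measured throughput is compared against the [minlimted, maxlimted] window to nudge wait (and the extra batch size resource) up or down. The sketch below isolates that idea in a minimal, self-contained form; the function name, step sizes, and thresholds are illustrative and not taken from the script.

import asyncio
import time

async def rate_limited_send(send_one, target_rate=1000, limit=20000):
    # Keep the observed send rate close to target_rate messages/second by
    # adjusting a per-message delay, the same idea as wait in statistics() above.
    wait_s = 0.0
    sent = 0
    start = time.monotonic()
    while sent < limit:
        if wait_s > 0:
            await asyncio.sleep(wait_s)
        send_one()
        sent += 1
        elapsed = time.monotonic() - start
        if elapsed > 0:
            speed = sent / elapsed
            if speed > target_rate * 1.01:          # too fast: back off a little
                wait_s += 0.0001
            elif speed < target_rate * 0.99 and wait_s > 0:
                wait_s = max(0.0, wait_s - 0.0001)  # too slow: speed up again
    return sent

# Example: a no-op "send" settles at roughly target_rate calls per second.
# asyncio.run(rate_limited_send(lambda: None, target_rate=1000, limit=5000))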
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
from DataMock import KafkaMsgMock
from Percentages import Percentage
import random
from confluent_kafka import Producer
import json
import datetime
mocInstance = KafkaMsgMock()
percIns = Percentage()
task_list = []
total = 0
msglist = []
wait = 0
resource = 0
previous_time = int(time.time())
task_completed = False  # flag: all messages have been sent
log_completed = False  # flag: the completion log line has been printed
# Target send-rate window (messages per second)
maxlimted = 1100
minlimted = 1000
# Kafka connection settings (placeholders)
conf = {
    'bootstrap.servers': 'xxxxxxx',
    'security.protocol': 'xxxxxxx',
    'sasl.mechanisms': 'xxxxxxx',
    'sasl.username': 'xxxxxxx',
    'sasl.password': 'xxxxxxx'
}
producer = Producer(conf)
def task_paras():
    for m_Type in ['alarm', 'fault', 'serviceModeChange', 'movementData']:
        for contr_type in ['LCE', 'GCE', 'STEP', 'ESC', 'KSE', 'DTU']:
            count = percIns.calculate(controller=contr_type, msgType=m_Type, total=10000)
            if count > 0:
                task_list.append({"controller": contr_type, "msgType": m_Type, "count": count})
async def utcTime():
    return datetime.datetime.utcnow().isoformat()[0:23] + 'Z'
utcTime: an async helper that returns the current UTC time as an ISO-8601 string truncated to milliseconds and suffixed with 'Z' (for example, '2025-06-01T09:15:42.123Z').
def async_call(func):
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        # run_in_executor does not accept keyword arguments directly,
        # so wrap the call in a closure
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))
    return wrapper
@async_call
def comon_msg(controller, msgType, count):
    return mocInstance.comon_msg(controller=controller, msgType=msgType, count=count)
@async_call
def kce_msg(msgType, count):
    return mocInstance.kce_msg(msgType=msgType, count=count)
@async_call
def kcecpuc_msg(msgType, count):
    return mocInstance.kcecpuc_msg(msgType=msgType, count=count)
async def send(msg, producer):
    global total, wait, task_completed, previous_time, log_completed
    try:
        topic, data = msg
        data['Param']['Timestamp'] = await utcTime()
        if wait > 0:
            await asyncio.sleep(wait * 0.001)
        producer.produce("gov-" + topic, json.dumps(data).encode('utf-8'))
        total += 1
        # Check whether the send cap has been reached
        if not task_completed:
            if total >= 20000:  # send cap: 20,000 messages
                if not log_completed:
                    print(f"Sent {total} messages in total; sending finished.")
                    log_completed = True
                task_completed = True  # mark the task as completed
        # Update previous_time
        curr_time = int(time.time())
        if curr_time != previous_time and not task_completed:
            previous_time = curr_time
    except Exception as e:
        print(f"Error sending message '{msg}': {e}")
async def send_batched(msgs, producer):
    await asyncio.gather(*[send(msg, producer) for msg in msgs])
    producer.flush()
async def request_msg():
    global msglist, task_completed
    while not task_completed:
        if len(msglist) < 300000:
            for unit in task_list:
                if unit['controller'] == 'KCE':
                    msglist += await kce_msg(unit['msgType'], unit['count'])
                elif unit['controller'] == 'KCECPUC':
                    msglist += await kcecpuc_msg(unit['msgType'], unit['count'])
                else:
                    msglist += await comon_msg(unit['controller'], unit['msgType'], unit['count'])
            random.shuffle(msglist)
        await asyncio.sleep(0.01)
async def statistics():
    global previous_time, task_completed, total, wait, msglist, resource
    start_time = int(time.time())
    previous_time = start_time
    await asyncio.sleep(1)
    while not task_completed:
        if total != 0:
            curr_time = int(time.time())
            if (curr_time - start_time) == 0:
                continue
            if curr_time != previous_time:
                speed = int(total / (curr_time - start_time))
                if speed > maxlimted:
                    # Rate above the ceiling: increase the delay
                    if wait <= 100000:
                        if (speed / maxlimted) > 1.5:
                            wait += 500
                            if resource > 100:
                                resource -= 100
                        elif (speed / maxlimted) > 1.15:
                            wait += 100
                            if resource > 20:
                                resource -= 20
                        # ... remaining thresholds omitted ...
                elif speed < minlimted:
                    # Rate below the floor: reduce the delay
                    if wait >= 0:
                        if (speed / minlimted) < 0.6:
                            if wait > 500:
                                wait -= 500
                            else:
                                wait = 0
                            resource += 100
                        elif (speed / minlimted) < 0.85:
                            if wait > 200:
                                wait -= 200
                            else:
                                wait = 0
                            resource += 50
                        # ... remaining thresholds omitted ...
                print('[%s] send rate %s msg/s, total sent %s, slow-down compensation -%.2f, speed-up compensation %s, queue backlog %s' % (
                    time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())),
                    speed, total, wait * 0.01, resource, len(msglist),
                ))
                previous_time = curr_time
        await asyncio.sleep(0.001)
async def main():
    task_paras()
    print("Enforcing rate-limit window: %s - %s messages per second" % (minlimted, maxlimted))
    try:
        # Create the Kafka producer
        producer = Producer(conf)
        # Run the async tasks; gather() returns once all of them finish
        await asyncio.gather(
            request_msg(),
            send_data(producer),
            statistics()
        )
        # Check whether the task has completed
        while not task_completed:
            await asyncio.sleep(0.1)
    finally:
        # Flush the Kafka producer before exiting
        producer.flush()
        print("Kafka producer closed.")
if __name__ == "__main__":
    asyncio.run(main())
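One thing the script leaves implicit is delivery confirmation: produce() only enqueues a message in the client's local buffer, and send_batched() relies on flush() to push everything out. If per-message delivery reports are wanted, or if produce() starts raising BufferError because the local queue is full, confluent_kafka's delivery-callback and poll() mechanism can be used. A minimal sketch follows; the broker address and topic name are placeholders, not values from the test setup.

from confluent_kafka import Producer
import json

def delivery_report(err, msg):
    # Called once per message, from poll() or flush(), with the final result.
    if err is not None:
        print(f"Delivery failed for {msg.topic()}: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

producer = Producer({'bootstrap.servers': 'localhost:9092'})  # placeholder broker

for i in range(10):
    payload = json.dumps({"seq": i}).encode('utf-8')
    try:
        producer.produce("gov-demo-topic", payload, callback=delivery_report)
    except BufferError:
        # Local queue full: give the client time to drain it, then retry once.
        producer.poll(1)
        producer.produce("gov-demo-topic", payload, callback=delivery_report)
    producer.poll(0)  # serve pending delivery callbacks without blocking

producer.flush()  # block until all outstanding messages are delivered or have failed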