零碎知识 模拟上游服务,使用脚本推送消息给 Kafka 的解析

大海 · 2024年08月09日 · 2459 次阅读

压测场景

验证上游数据量不断加大的情况下,GOV API 的数据发送与接收,功能保持正常,接口不存在明显的错误率。

脚本代码

import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
from DataMock import  KafkaMsgMock
from kafka import KafkaProducer
from Percentages import Percentage
import time, random, queue
from threading import Thread
import threading
import json
import ast
import datetime
from confluent_kafka import Producer

mocInstance = KafkaMsgMock()
percIns = Percentage()
task_list = []
global total
total = 0
global msglist
msglist = []
global wait
wait = 0
global resource
resource = 0
global previous_time
previous_time = int(time.time())


# 此处定义期望的发送速率
maxlimted = 2790
minlimted = 2760

# kafka连接信息
conf = {
    'bootstrap.servers': 'XXXXX',
    'security.protocol': 'XXXXX',
    'sasl.mechanisms': 'XXXXX',
    'sasl.username': 'XXXXX',
    'sasl.password': 'XXXXX'
} 
producer = Producer(conf)    

# 创建默认任务因子
def task_paras():
    #task_list.append({"controller": "LCE", "msgType": 'fault', "count": 3500})
    #task_list.append({"controller": 'LCE', "msgType": 'alarm', "count": 1500})
    #task_list.append({"controller": 'LCE', "msgType": 'serviceModeChange', "count": 5000})
    for m_Type in ['alarm','fault','serviceModeChange','movementData']:
        for contr_type in ['LCE','GCE','STEP','ESC','KSE','DTU']:
            count = percIns.calculate(controller = contr_type, msgType = m_Type, total = 10000)
            if count > 0:
                task_list.append({"controller" : contr_type, "msgType" : m_Type, "count" : count})

# 定义异步调用装饰器
def async_call(func):
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func, *args, **kwargs)
    return wrapper

async def utcTime():
    '''
    make utc time for timestemp
    '''
    return datetime.datetime.utcnow().isoformat()[0:23] + 'Z'

# 生成数据函数修正
@async_call
def comon_msg(controller, msgType, count):
    return mocInstance.comon_msg(controller=controller, msgType=msgType, count=count)

@async_call
def kce_msg(msgType, count):
    return mocInstance.kce_msg(msgType=msgType, count=count)

@async_call
def kcecpuc_msg(msgType, count):
    return mocInstance.kcecpuc_msg(msgType=msgType, count=count)

# 发送数据异步函数修正
async def send(msg, producer):
    global previous_time, total, wait
    try:
        topic, data = msg
        data['Param']['Timestamp'] = await utcTime()

        if wait > 0:
            await asyncio.sleep(wait*0.001)           
        producer.produce("gov-"+ topic, json.dumps(data).encode('utf-8'))
        total += 1

        curr_time = int(time.time())
        if curr_time != previous_time:
            previous_time = curr_time

    except Exception as e:
        print(f"Error sending message '{msg}': {e}")  # 添加异常日志


# 发送数据批处理修正
async def send_batched(msgs, producer):
    await asyncio.gather(*[send(msg, producer) for msg in msgs])
    producer.flush() 

# 请求消息函数修正
async def request_msg():

    global msglist
    while True:
        if len(msglist) < 300000:
            for unit in task_list:
                if unit['controller'] == 'KCE':
                    msglist += await kce_msg(unit['msgType'], unit['count'])
                elif unit['controller'] == 'KCECPUC':
                    msglist += await kcecpuc_msg(unit['msgType'], unit['count'])
                else:
                    msglist += await comon_msg(unit['controller'], unit['msgType'], unit['count'])
            random.shuffle(msglist)
        await asyncio.sleep(0.01)

# 发送数据逻辑修正
async def send_data(producer):
    while True:
        global msglist,resource
        if len(msglist) > 0:

            if resource > 15000: resource = 15000
            if len(msglist) > (1000 + int(resource)):
                msgs    = msglist[:(1000 + int(resource))]
                msglist = msglist[(1000 + int(resource)):]
            else:
                msgs    = msglist
                msglist = msglist[len(msglist):-1]

            await send_batched(msgs, producer)
        await asyncio.sleep(0.01)

# 统计函数修正
async def statistics():
    start_time = ''
    global previous_time
    previous_time = int(time.time())
    time.sleep(1)
    while 1:
        global total
        global wait
        global msglist
        global resource

        if total != 0:
            curr_time =int(time.time())
            if start_time == '':
                start_time = curr_time
            if (curr_time - start_time) == 0:
                continue
            if curr_time != previous_time:
                speed = int( total / (curr_time - start_time))
                if speed > maxlimted:
                    if wait <= 100000: 
                        if (speed/maxlimted) > 1.5:
                             wait += 500
                             if resource > 100 : resource -= 100                     
                        elif (speed/maxlimted) > 1.15:
                             wait += 100
                             if resource > 20 : resource -= 20 
                        elif (speed/maxlimted) > 1.10:
                             wait += 50 
                             if resource > 15 : resource -= 15
                        elif (speed/maxlimted) > 1.05:
                             wait += 20 
                             if resource > 10 : resource -= 10
                        elif (speed/maxlimted) > 1.03:
                             wait += 10
                             if resource > 5 : resource -= 5
                        elif (speed/maxlimted) > 1.015:
                             wait += 1
                             if resource >= 1 : resource -= 1
                elif speed < minlimted:
                    if wait >= 0 :
                        if (speed/minlimted) < 0.6:
                            if wait> 500 :
                                wait -= 500
                            else:
                                wait = 0
                            resource += 100
                        elif (speed/minlimted) < 0.85:
                            if wait> 200 :
                                wait -= 200
                            else:
                                wait = 0
                            resource += 50
                        elif (speed/minlimted) < 0.90:
                            if wait> 100 : 
                                wait -= 100 
                            else:
                                wait = 0
                            resource += 20
                        elif (speed/minlimted) < 0.95:
                            if wait> 50 : 
                                wait -= 50 
                            else:
                                wait = 0
                            resource += 10
                        elif (speed/minlimted) < 0.97:
                            if wait> 10 : 
                                wait -= 10 
                            else:
                                wait = 0
                            resource += 5
                        elif (speed/minlimted) < 0.998:
                            if wait>= 1 :wait -= 1
                            resource += 1
                    else:
                        wait = 0
                print('[%s] 发送消息流速 %s个/秒, 发送总数 %s个, 速率减速补偿-%.2f, 增速补偿%s, 队列余量%s' % (time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())),speed,total,wait*0.01, resource, len(msglist),))
                previous_time = int(time.time())
        await asyncio.sleep(0.001)

# 主函数修正
async def main():
    producer = Producer(conf)  # 假设 KafkaProducer 是一个异步版本的 Kafka 生产者
    task_paras()  # 初始化任务参数
    print("执行限速区间, 每秒 %s - %s" % (minlimted, maxlimted))
    await asyncio.gather(request_msg(), send_data( producer), statistics())

# 运行主函数
asyncio.run(main())

脚本解析

这段 Python 脚本是一个生产者程序,用于向 Kafka 消息队列发送消息。它使用了asyncio库来实现异步操作,以及confluent_kafka库来与 Kafka 进行交互。下面是对脚本的详细解释:

  1. 导入模块

    • time:用于时间相关的操作。
    • asyncio:用于编写单线程并发代码,使用协程。
    • ThreadPoolExecutor:用于创建线程池。
    • KafkaMsgMock:自定义模块,用于生成模拟的 Kafka 消息。
    • KafkaProducer:Kafka 生产者类(未在脚本中直接使用,可能是导入错误)。
    • Percentage:自定义模块,可能用于计算百分比或相关比例。
    • queue:用于创建线程安全的队列。
    • threading:用于线程相关的操作。
    • json:用于处理 JSON 数据。
    • ast:用于处理 Python 抽象语法树。
    • datetime:用于处理日期和时间。
  2. 全局变量

    • mocInstanceKafkaMsgMock的实例,用于生成消息。
    • percInsPercentage的实例,用于计算比例。
    • task_list:任务列表,存储任务参数。
    • total:记录发送消息的总数。
    • msglist:存储生成的消息。
    • wait:控制发送速率的等待时间。
    • resource:资源计数,可能用于控制批量发送的大小。
    • previous_time:记录上一次发送消息的时间。
  3. Kafka 配置

    • conf:包含 Kafka 集群的连接信息,包括服务器地址、安全协议、SASL 认证信息。
  4. 生产者实例

    • producer:使用配置conf创建的 Kafka 生产者实例。
  5. 任务参数创建函数

    • task_paras:创建任务参数,根据Percentage实例计算每种消息类型和控制器的发送数量。
  6. 异步调用装饰器

    • async_call:装饰器,用于将同步函数转换为异步函数。
  7. 生成数据函数

    • comon_msgkce_msgkcecpuc_msg:这些函数使用mocInstance生成不同类型和数量的消息。
  8. 发送数据异步函数

    • send:异步发送单条消息到 Kafka。
    • send_batched:异步批量发送消息。
  9. 请求消息函数

    • request_msg:不断生成消息并存入msglist
  10. 发送数据逻辑

    • send_data:从msglist中取出消息并发送。
  11. 统计函数

    • statistics:统计发送速率,并根据速率调整发送策略。
  12. 主函数

    • main:初始化任务参数,启动请求消息、发送数据和统计的协程。
  13. 运行主函数

    • 使用asyncio.run(main())启动程序。

整体来看,这个脚本的目的是模拟向 Kafka 发送不同类型和数量的消息,同时控制发送速率以满足特定的性能测试需求。脚本使用了异步编程来提高效率,并通过全局变量来共享状态。

好的,让我们更细致地一步步解析这个脚本。我会尽量简化和解释每个部分的作用,帮助你更好地理解。

1. 导入必要的库

脚本开始部分导入了多个 Python 模块,这些模块提供了时间处理、异步编程、线程管理、JSON 处理等功能。

2. 初始化全局变量

脚本定义了一些全局变量,比如total(用来记录发送的消息总数)、msglist(用来存储待发送的消息)、wait(用来控制发送速率的延迟)等。

3. Kafka 连接配置

conf字典包含了连接到 Kafka 集群所需的配置信息,比如服务器地址、安全协议、认证信息等。

4. 创建 Kafka 生产者实例

使用conf配置信息,创建了一个 Kafka 生产者实例producer,用于发送消息到 Kafka。

5. 定义任务参数

task_paras函数用于初始化发送任务,它会根据Percentage类计算出每种消息类型和控制器的发送数量,并将这些任务添加到task_list

6. 异步调用装饰器

async_call装饰器是一个高级功能,它允许我们将普通的函数转换为异步函数,这样就可以在等待某些操作(比如 I/O 操作)完成时释放执行权,提高程序效率。

7. 生成消息的函数

comon_msgkce_msgkcecpuc_msg函数用于生成不同类型和数量的消息。这些函数被async_call装饰器装饰,使其成为异步函数。

8. 发送消息的异步函数

send函数是一个异步函数,它负责将单个消息发送到 Kafka。send_batched函数则是批量发送消息的异步函数。

9. 请求消息和发送数据的逻辑

request_msg函数不断生成消息并添加到msglistsend_data函数则从msglist中取出消息并发送。

10. 统计和调整发送速率

statistics函数负责统计当前的发送速率,并根据预设的上限和下限调整发送速率。如果发送速率过快,它会增加wait的值,从而减慢发送速率;如果发送速率过慢,它会减少wait的值,加快发送速率。

11. 主函数

main函数是程序的入口点,它初始化任务参数,然后启动三个协程:request_msgsend_datastatistics。这三个协程分别负责生成消息、发送消息和统计发送速率。

12. 启动程序

最后,使用asyncio.run(main())启动整个程序。

总结

这个脚本的核心是模拟向 Kafka 发送消息的过程,同时通过异步编程和速率控制来满足特定的性能测试需求。它使用了asyncio库来实现异步操作,confluent_kafka库来与 Kafka 进行交互,并通过自定义的装饰器和函数来生成和发送消息。

下面,使用更简单的语言和例子来解释这个脚本。我们可以把脚本比作一个餐厅的厨房,它需要准备和发送食物订单。

1. 导入工具(就像厨房里的工具和食材)

  • time:计时器,用来知道现在几点。
  • asyncio:特殊技能,让厨师可以同时做很多事情。
  • ThreadPoolExecutor:多任务处理,就像多个厨师同时工作。
  • KafkaMsgMock:食材供应商,提供模拟的食物。
  • Producer:点餐机,用来发送食物订单。
  • 其他导入:各种调料和工具,帮助厨房运作。

2. 设置厨房(初始化全局变量)

  • mocInstance:食材供应商的一个实例。
  • percIns:计算器,用来计算每种食物的比例。
  • task_list:菜单列表,记录要准备哪些食物。
  • total:订单总数,记录一共要准备多少份食物。
  • msglist:待处理的订单,等待送出的食物。
  • wait:等待时间,如果太忙了,可能需要让顾客等一下。
  • resource:资源,比如厨师的数量或者食材的存量。
  • previous_time:上次送出食物的时间。

3. 连接点餐机(Kafka 连接信息)

  • conf:点餐机的设置,包括点餐机的地址、安全方式、用户名和密码。

4. 创建点餐机实例(Producer)

  • producer:根据设置conf创建的点餐机实例。

5. 准备菜单(任务参数创建函数)

  • task_paras:决定今天要准备哪些食物,每种食物要准备多少份。

6. 快速处理技能(异步调用装饰器)

  • async_call:让厨师可以快速处理任务,比如快速准备食物。

7. 准备食物(生成数据函数)

  • comon_msgkce_msgkcecpuc_msg:这些是准备不同食物的方法。

8. 发送食物订单(发送数据异步函数)

  • send:将准备好的食物通过点餐机发送出去。
  • send_batched:如果有多个订单,可以一次性发送多个食物。

9. 持续准备食物(请求消息函数)

  • request_msg:厨师不断地准备食物,直到厨房关闭。

10. 发送食物(发送数据逻辑)

  • send_data:厨师根据当前的情况,决定发送多少食物。

11. 监控和调整(统计函数)

  • statistics:监控厨房的工作情况,如果太忙了就减慢速度,如果不忙就加快速度。

12. 开始营业(主函数)

  • main:启动厨房,开始准备食物和发送订单。

13. 启动厨房(运行主函数)

  • asyncio.run(main()):告诉厨房开始工作。

这个脚本就像一个自动化的厨房,它可以自动准备食物、发送订单,并且根据工作量自动调整速度。

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册