压测场景

验证上游数据量不断加大的情况下,GOV API 的数据发送与接收,功能保持正常,接口不存在明显的错误率。

脚本代码

import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
from DataMock import  KafkaMsgMock
from kafka import KafkaProducer
from Percentages import Percentage
import time, random, queue
from threading import Thread
import threading
import json
import ast
import datetime
from confluent_kafka import Producer

mocInstance = KafkaMsgMock()
percIns = Percentage()
task_list = []
global total
total = 0
global msglist
msglist = []
global wait
wait = 0
global resource
resource = 0
global previous_time
previous_time = int(time.time())


# 此处定义期望的发送速率
maxlimted = 2790
minlimted = 2760

# kafka连接信息
conf = {
    'bootstrap.servers': 'XXXXX',
    'security.protocol': 'XXXXX',
    'sasl.mechanisms': 'XXXXX',
    'sasl.username': 'XXXXX',
    'sasl.password': 'XXXXX'
} 
producer = Producer(conf)    

# 创建默认任务因子
def task_paras():
    #task_list.append({"controller": "LCE", "msgType": 'fault', "count": 3500})
    #task_list.append({"controller": 'LCE', "msgType": 'alarm', "count": 1500})
    #task_list.append({"controller": 'LCE', "msgType": 'serviceModeChange', "count": 5000})
    for m_Type in ['alarm','fault','serviceModeChange','movementData']:
        for contr_type in ['LCE','GCE','STEP','ESC','KSE','DTU']:
            count = percIns.calculate(controller = contr_type, msgType = m_Type, total = 10000)
            if count > 0:
                task_list.append({"controller" : contr_type, "msgType" : m_Type, "count" : count})

# 定义异步调用装饰器
def async_call(func):
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func, *args, **kwargs)
    return wrapper

async def utcTime():
    '''
    make utc time for timestemp
    '''
    return datetime.datetime.utcnow().isoformat()[0:23] + 'Z'

# 生成数据函数修正
@async_call
def comon_msg(controller, msgType, count):
    return mocInstance.comon_msg(controller=controller, msgType=msgType, count=count)

@async_call
def kce_msg(msgType, count):
    return mocInstance.kce_msg(msgType=msgType, count=count)

@async_call
def kcecpuc_msg(msgType, count):
    return mocInstance.kcecpuc_msg(msgType=msgType, count=count)

# 发送数据异步函数修正
async def send(msg, producer):
    global previous_time, total, wait
    try:
        topic, data = msg
        data['Param']['Timestamp'] = await utcTime()

        if wait > 0:
            await asyncio.sleep(wait*0.001)           
        producer.produce("gov-"+ topic, json.dumps(data).encode('utf-8'))
        total += 1

        curr_time = int(time.time())
        if curr_time != previous_time:
            previous_time = curr_time

    except Exception as e:
        print(f"Error sending message '{msg}': {e}")  # 添加异常日志


# 发送数据批处理修正
async def send_batched(msgs, producer):
    await asyncio.gather(*[send(msg, producer) for msg in msgs])
    producer.flush() 

# 请求消息函数修正
async def request_msg():

    global msglist
    while True:
        if len(msglist) < 300000:
            for unit in task_list:
                if unit['controller'] == 'KCE':
                    msglist += await kce_msg(unit['msgType'], unit['count'])
                elif unit['controller'] == 'KCECPUC':
                    msglist += await kcecpuc_msg(unit['msgType'], unit['count'])
                else:
                    msglist += await comon_msg(unit['controller'], unit['msgType'], unit['count'])
            random.shuffle(msglist)
        await asyncio.sleep(0.01)

# 发送数据逻辑修正
async def send_data(producer):
    while True:
        global msglist,resource
        if len(msglist) > 0:

            if resource > 15000: resource = 15000
            if len(msglist) > (1000 + int(resource)):
                msgs    = msglist[:(1000 + int(resource))]
                msglist = msglist[(1000 + int(resource)):]
            else:
                msgs    = msglist
                msglist = msglist[len(msglist):-1]

            await send_batched(msgs, producer)
        await asyncio.sleep(0.01)

# 统计函数修正
async def statistics():
    start_time = ''
    global previous_time
    previous_time = int(time.time())
    time.sleep(1)
    while 1:
        global total
        global wait
        global msglist
        global resource

        if total != 0:
            curr_time =int(time.time())
            if start_time == '':
                start_time = curr_time
            if (curr_time - start_time) == 0:
                continue
            if curr_time != previous_time:
                speed = int( total / (curr_time - start_time))
                if speed > maxlimted:
                    if wait <= 100000: 
                        if (speed/maxlimted) > 1.5:
                             wait += 500
                             if resource > 100 : resource -= 100                     
                        elif (speed/maxlimted) > 1.15:
                             wait += 100
                             if resource > 20 : resource -= 20 
                        elif (speed/maxlimted) > 1.10:
                             wait += 50 
                             if resource > 15 : resource -= 15
                        elif (speed/maxlimted) > 1.05:
                             wait += 20 
                             if resource > 10 : resource -= 10
                        elif (speed/maxlimted) > 1.03:
                             wait += 10
                             if resource > 5 : resource -= 5
                        elif (speed/maxlimted) > 1.015:
                             wait += 1
                             if resource >= 1 : resource -= 1
                elif speed < minlimted:
                    if wait >= 0 :
                        if (speed/minlimted) < 0.6:
                            if wait> 500 :
                                wait -= 500
                            else:
                                wait = 0
                            resource += 100
                        elif (speed/minlimted) < 0.85:
                            if wait> 200 :
                                wait -= 200
                            else:
                                wait = 0
                            resource += 50
                        elif (speed/minlimted) < 0.90:
                            if wait> 100 : 
                                wait -= 100 
                            else:
                                wait = 0
                            resource += 20
                        elif (speed/minlimted) < 0.95:
                            if wait> 50 : 
                                wait -= 50 
                            else:
                                wait = 0
                            resource += 10
                        elif (speed/minlimted) < 0.97:
                            if wait> 10 : 
                                wait -= 10 
                            else:
                                wait = 0
                            resource += 5
                        elif (speed/minlimted) < 0.998:
                            if wait>= 1 :wait -= 1
                            resource += 1
                    else:
                        wait = 0
                print('[%s] 发送消息流速 %s个/秒, 发送总数 %s个, 速率减速补偿-%.2f, 增速补偿%s, 队列余量%s' % (time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())),speed,total,wait*0.01, resource, len(msglist),))
                previous_time = int(time.time())
        await asyncio.sleep(0.001)

# 主函数修正
async def main():
    producer = Producer(conf)  # 假设 KafkaProducer 是一个异步版本的 Kafka 生产者
    task_paras()  # 初始化任务参数
    print("执行限速区间, 每秒 %s - %s" % (minlimted, maxlimted))
    await asyncio.gather(request_msg(), send_data( producer), statistics())

# 运行主函数
asyncio.run(main())

脚本解析

这段 Python 脚本是一个生产者程序,用于向 Kafka 消息队列发送消息。它使用了asyncio库来实现异步操作,以及confluent_kafka库来与 Kafka 进行交互。下面是对脚本的详细解释:

  1. 导入模块

  2. 全局变量

  3. Kafka 配置

  4. 生产者实例

  5. 任务参数创建函数

  6. 异步调用装饰器

  7. 生成数据函数

  8. 发送数据异步函数

  9. 请求消息函数

  10. 发送数据逻辑

  11. 统计函数

  12. 主函数

  13. 运行主函数

整体来看,这个脚本的目的是模拟向 Kafka 发送不同类型和数量的消息,同时控制发送速率以满足特定的性能测试需求。脚本使用了异步编程来提高效率,并通过全局变量来共享状态。

好的,让我们更细致地一步步解析这个脚本。我会尽量简化和解释每个部分的作用,帮助你更好地理解。

1. 导入必要的库

脚本开始部分导入了多个 Python 模块,这些模块提供了时间处理、异步编程、线程管理、JSON 处理等功能。

2. 初始化全局变量

脚本定义了一些全局变量,比如total(用来记录发送的消息总数)、msglist(用来存储待发送的消息)、wait(用来控制发送速率的延迟)等。

3. Kafka 连接配置

conf字典包含了连接到 Kafka 集群所需的配置信息,比如服务器地址、安全协议、认证信息等。

4. 创建 Kafka 生产者实例

使用conf配置信息,创建了一个 Kafka 生产者实例producer,用于发送消息到 Kafka。

5. 定义任务参数

task_paras函数用于初始化发送任务,它会根据Percentage类计算出每种消息类型和控制器的发送数量,并将这些任务添加到task_list

6. 异步调用装饰器

async_call装饰器是一个高级功能,它允许我们将普通的函数转换为异步函数,这样就可以在等待某些操作(比如 I/O 操作)完成时释放执行权,提高程序效率。

7. 生成消息的函数

comon_msgkce_msgkcecpuc_msg函数用于生成不同类型和数量的消息。这些函数被async_call装饰器装饰,使其成为异步函数。

8. 发送消息的异步函数

send函数是一个异步函数,它负责将单个消息发送到 Kafka。send_batched函数则是批量发送消息的异步函数。

9. 请求消息和发送数据的逻辑

request_msg函数不断生成消息并添加到msglistsend_data函数则从msglist中取出消息并发送。

10. 统计和调整发送速率

statistics函数负责统计当前的发送速率,并根据预设的上限和下限调整发送速率。如果发送速率过快,它会增加wait的值,从而减慢发送速率;如果发送速率过慢,它会减少wait的值,加快发送速率。

11. 主函数

main函数是程序的入口点,它初始化任务参数,然后启动三个协程:request_msgsend_datastatistics。这三个协程分别负责生成消息、发送消息和统计发送速率。

12. 启动程序

最后,使用asyncio.run(main())启动整个程序。

总结

这个脚本的核心是模拟向 Kafka 发送消息的过程,同时通过异步编程和速率控制来满足特定的性能测试需求。它使用了asyncio库来实现异步操作,confluent_kafka库来与 Kafka 进行交互,并通过自定义的装饰器和函数来生成和发送消息。

下面,使用更简单的语言和例子来解释这个脚本。我们可以把脚本比作一个餐厅的厨房,它需要准备和发送食物订单。

1. 导入工具(就像厨房里的工具和食材)

2. 设置厨房(初始化全局变量)

3. 连接点餐机(Kafka 连接信息)

4. 创建点餐机实例(Producer)

5. 准备菜单(任务参数创建函数)

6. 快速处理技能(异步调用装饰器)

7. 准备食物(生成数据函数)

8. 发送食物订单(发送数据异步函数)

9. 持续准备食物(请求消息函数)

10. 发送食物(发送数据逻辑)

11. 监控和调整(统计函数)

12. 开始营业(主函数)

13. 启动厨房(运行主函数)

这个脚本就像一个自动化的厨房,它可以自动准备食物、发送订单,并且根据工作量自动调整速度。


↙↙↙阅读原文可查看相关链接,并与作者交流