开源测试工具 atomic-bomb-engine-py:使用 rust 开发的 python 压测工具

qyzhg · 2024年04月01日 · 最后由 qyzhg 回复于 2024年04月02日 · 7616 次阅读

logo

项目背景

公司的原有压测平台是由 go 开发,使用 locust 作为压力引擎使用,在互联网的大环境下,开始各种降本增效,性能测试的 pod 现在缩减到只有 1/8 个物理核心,这种情况下,locust 大概 300 并发都已经出现了 cpu 瓶颈,所以需要一款性能更好的压测引擎作为替代品,刚开始的时候是想直接使用 wrk 集成进项目中,但是遇见了几个比较麻烦的问题:

  • post 请求需要使用 lua
  • 无法对压测过程进行监听
  • 项目过于庞大,二开十分困难

所以经过调研现有的开源压测引擎,没有符合现在的平台化需求的,所以诞生了这个项目,之所以选用 rust,是因为 rust 的性能会非常的好,而且高并发的压测下,没有 gc 对结果产生影响,引擎写完后,可以直接导出一个 c 的入口,go 开启 cgo 会比较简单的将项目集成。
由于引擎是一个独立的项目,无公司的业务部分,可以直接开源。介于现在大部分测试人员的技术栈都是以 python 为主,所以又在引擎外面,使用 pyo3 开发了一个 python 的包装器,可以让更多 python 技术栈的同学直接调用,为了方便大家使用,又开发了一个比较简单的前端页面,可以满足简单的压测需求。

项目地址

项目已发布到 pip,可以直接使用 pip 安装使用,ci 部分使用 github actions 进行矩阵编译,支持 python3.8-3.12,linux-x86, mac arm、x86, win-x86,应该可以覆盖大部分环境,如果有特殊需求,可以联系作者添加 action

项目内部架构设计图

项目界面

使用说明

准备开始

安装:

pip install atomic-bomb-engine

在 python 中导入

import atomic_bomb_engine

异步使用的时候,还需要引用 asyncio

import asyncio

主要方法说明

多接口压测可以使用 batch_async 方法进行操作,函数签名和解释如下

async def batch_async(
             test_duration_secs: int,
             concurrent_requests: int,
             api_endpoints:List[Dict],
             step_option:Dict[str, int]=None,
             verbose:bool=False,
             should_prevent:bool=False) ->Dict:
    """
        批量压测
        :param test_duration_secs: 测试持续时间
        :param concurrent_requests: 并发数
        :param api_endpoints: 接口信息
        :param step_option: 阶梯加压选项
        :param verbose: 打印详细信息
        :param should_prevent: 是否禁用睡眠
    """

使用 assert_option 方法可以返回断言选项字典

assert_options=[
atomic_bomb_engine.assert_option("$.code", 429),
atomic_bomb_engine.assert_option("$.code", 200)
])
print(result)

jsonpath 如果不会用的话,建议去jsonpath学习
使用 step_option 方法可以返回阶梯加压选项字典

def step_option(increase_step: int, increase_interval: int) -> Dict[str, int]:
    """
    生成step option
    :param increase_step: 阶梯步长
    :param increase_interval: 阶梯间隔
    """

同样的本包中也包含了一个对 api_endpoint 的包装:endpoint 方法,方便调用,endpoint 中的 assert_options 中也可以套用 assert_option 方法

async def run_batch():
    result = await atomic_bomb_engine.batch_async(
        test_duration_secs=10,
        concurrent_requests=10,
        api_endpoints=[
            atomic_bomb_engine.endpoint(
                name="test1",
                url="https:xxxxx1.xx",
                method="get",
                weight=1,
                timeout_secs=10,
                assert_options=[atomic_bomb_engine.assert_option(jsonpath="$.code", reference_object=200)]
            ),
            atomic_bomb_engine.endpoint(
                name="test2",
                url="https://xxxxx2.xx",
                method="get",
                weight=1,
                timeout_secs=10)
        ])
    print(result)

监听时可以使用 BatchListenIter 生成器

async def listen_batch():
    iterator = atomic_bomb_engine.BatchListenIter()
    for message in iterator:
        if message:
            print(message)
        else:
            await asyncio.sleep(0.3)

同时调用时同单接口

async def main():
    await asyncio.gather(
        run_batch(),
        listen_batch(),
    )


if __name__ == "__main__":
    asyncio.run(main())

使用内置 ui 界面

导入

from atomic_bomb_engine import server

导入内置的 server 后,可以使用内置的 http 服务器,开启一个 ui 界面,并且开始监听压测过程中的数据,无需手动迭代数据

import asyncio

import atomic_bomb_engine
from atomic_bomb_engine import server


@server.ui(port=8000)
async def run_batch():
    result = await atomic_bomb_engine.batch_async(
        test_duration_secs=120,
        concurrent_requests=100,
        step_option=atomic_bomb_engine.step_option(increase_step=6, increase_interval=5),
        verbose=False,
        api_endpoints=[
            atomic_bomb_engine.endpoint(name="test-baidu",url="https://baidu.com",method="GET",weight=1,timeout_secs=10),
            atomic_bomb_engine.endpoint(name="test-google", url="https://google.com", method="GET", weight=1, timeout_secs=10),
        ])
    print(result)
    return result


if __name__ == '__main__':
    asyncio.run(run_batch())

BUG 和需求

由于项目开启时间较短,只有短短一个月左右,可能会有一些 bug 和没有做的需求,如果发现了 bug 和需求,都可以联系作者,由于是工作之余开发,可能不会太保证时效性

欢迎加群交流

共收到 13 条回复 时间 点赞

期待更新

赞👍🏻等一个接口关联功能

post 请求需要使用 lua?没看懂,不过 lua 是可以用 Python 实现 lua 的语言层面的解释器。
locust 本身可以容器化,虚拟机启动好多个,做分布式,300 个应该不会瓶颈,可能是写法问题。
并发大还是需要使用旁路收集和到时序数据库在提取绘制吧,要不你 web 内存直接崩了。

qyzhg #10 · 2024年04月01日 Author
陈子昂 回复

说的是 wrk 要使用 lua,分布式加机器是要走预算的,本身我们已经有了分布式,还做了一套精准到 cpu 核心数量的管理,还是希望在根本上解决问题,三年前我们已经用了 influxdb 存数据,前端做的 wasm,保证最少 30 万条数据瞬间渲染不会崩溃

陈子昂 回复


管理 locust 这些东西其实都做了,但是单机性能上不去,大量压测的时候预算还是跑的太快,最终还是决定了这个方案,提升单机性能又不增加脚本的复杂度

qyzhg 回复

不一定要用 wrk,wrk 理论跑那么高,机器除非加网卡,程序的测量方案和测试的不一样所以才需要测试来做压测。用容器来启动不同群组的 lua 脚本 - 来测试不同的场景,容器级别上面的调度。
时序数据库不是新玩意,加削峰不丢就行,其实有很多方案可以存储不一定用时序。
监控侧在数据库出口的地方吗?

陈子昂 回复

所以我们做自研的工具有什么问题吗?

大佬好厉害🎀

big 佬,6666

不会拘泥于前端花里胡哨的实现,选用 Rust 原因里说了对性能、内存管理、互操作性和项目集成的四个方面的综合考虑。感觉你是个老手吧,也不像是做测试的,专门整性能的? 我测功能的,还没接触过会 rust 的

测试新人 回复

我也不是专门整性能的😂 就是遇到什么问题解决什么问题

我开发的压测工具底层使用 c++,业务层可以是 lua,python ,golang。python 现在是我准备抛弃的方案了,性能弱。准备再接入 js,引擎准备用 jerryscript

回复

这个引擎是 rust,接入是 go,python,现在开源出来的是引擎和 python 的 client,python 实际其实就是做一个入参,对性能没有什么影响,go 的业务代码太多了,暂时不打放开了

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册