目标

构建一个分布式的 Http 接口的压力测试平台,核心基于开源的 locust,因为 python 的压测能力较低,slaver 端采用开源的用 go 改写的 boomer。
原版需要提供完整的 go 脚本/python 脚本,一次一用,对他人使用难度大。
设想可以提供 postman 类似的界面来构造接口描述信息,由 master 推送给各个 worker。
各个 worker 解析接口描述信息生成满足 boomer 要求的请求并运行。
通过 locust 的 api 获取指标信息,可以绘制实时折线图(可以改造为 推送 prometheus,配合 grafana 查看历史记录)
想要达到分担压力测试的 worker 可随意增加的目的,所以 master(flask)需要自动发现 woker(boomer)的功能
因为为了日后官方更新后好维护,不想改动 locust 源码和 boomer 源码。
只想在其上层包装,那 master 端依然 python 改写,slaver 依然使用 go 改写。要解决两者信息传递问题。
无责任草图

完成的结果

主要模块的设计说明

改造第一步:了解 boomer 默认服务的启动方式并构建自定义的 Boomer。

主要重点是是修改 boomer 默认方式,改为为可以自定义管理启动、关闭,这样要新建一个使用暴露的公共 Boomer 的 struct 构建一个自定义的 Boomer 实例。

重点是关闭处理:

关闭信号可以来自系统关闭信号(ctrl+c 这种)、可以来自 master 发送的消息、可以是因为其他原因。

// 全局boomer
var globalBoomer *boomer.Boomer
// 接受来自EndBoomer的请求处理
var quitSignal chan int=make(chan int)
// boomer运行状态
var boomerStatus = false
// boomer等待退出
func waitForQuit(gboomer *boomer.Boomer) {
    quitByClient :=false
    boomer.Events.SubscribeOnce("boomer:quit", func() { //防止重复订阅
        defer func(){
            r:=recover();if r!=nil{
            fmt.Println("处理Boomer关闭遇到异常:",r)
        }}()
        boomerStatus=false // 结束运行
        if !quitByClient{
            quitSignal<-1 // 释放下面EndBommer处理协程
            fmt.Println("事件订阅中获取了非Client关闭Boomer的消息")
        }
        fmt.Println("boomer服务已经关闭")
    })
    go func(){ // 此处添加通过EndBommer获取关闭boomer信号处理的代码
        <-quitSignal
        fmt.Println("从管理机client获取了关闭Boomer的消息")
        if boomerStatus{
            quitByClient=true
            gboomer.Quit()
        }
    }()
}

改造第二步:woker 端的 gRPC 服务/etcd 和 protobuf

gRPC/etcd 服务端部分主要五步(代码就不贴了)
1-建立一个 http 监听服务
2-建立 gRPC 服务句柄,将服务 struct 注册到 gRPC 中
3-建立 etcd 客户端,将 gRPC 的 “服务名 + 服务地址” 添加到 etcd 上,并增加心跳检查保持长连接
4-开启一个协程监听关闭/中断信号,注销 etcd 上的信息
5-gRPC 服务使用 http 监听服务启动

protobuf 文件:master 端(go)和 woker 端(python)通信结构描述文件
使用 proto 文件配合对应语言工具,可以自动对应语言的 gRPC 接口代码,省事省心.

syntax = "proto3";

option go_package = ".;boomerCall";

service BoomerCallService {
    rpc InitBommer(InitBommerRequest)returns(BoomerCallResponse){}
    rpc EndBommer(EndBommerRequest)returns(BoomerCallResponse){}

}

message InitBommerRequest {
    message SaveParamAction {
        int32 SaveType = 1; // 0-HTML-XPATH解析;1-JSON-解析; 2-文本正则匹配; 3-保存固定字符串
        string ParamName = 2; // 全局变量名称
        string RuleValue = 3;
    }
    message AssertAction { 
        int32 AssertType = 1; // 0-状态码等于; 1-响应内容字节长度小于;2-响应内容直接长度等于;3-响应内容直接长度大于
        int32 RuleValue = 2;
    }
    message HttpRequest {   
      string UrlPath = 1;
      string Method = 2;
      map<string, string> Headers = 3;
      map<string, string> DictData = 4;
      map<string, string> Params = 5;
      string RawData = 6;
      string JsonData = 7;
      repeated SaveParamAction SaveParamChain = 8; 
      repeated AssertAction AssertChain = 9;
    }
    message TestTask {
        string TaskName = 1;
        int32 TaskWeight = 2;
        repeated  HttpRequest PreWork = 3; // 准备工作请求,比如获取token
        HttpRequest TestWork = 4; // 主要性能测试任务
    }
    bool isSession = 1;
    repeated  HttpRequest PreTask = 2; // 前置任务
    repeated TestTask MainTask = 3; // 测试任务
    string LocustMaster = 4;
    string HttpProxy = 5;
}

message EndBommerRequest {

}

message BoomerCallResponse {
    bool status = 1;
    string message = 2;
}

// protoc --go_out=plugins=grpc:. *.proto
// protoc --go_out=plugins=grpc:{输出目录}  {proto文件}

// python3 -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. *.proto
//   --python_out=. : 编译生成处理 protobuf 相关的代码的路径, 这里生成到当前目录
//   --grpc_python_out=. : 编译生成处理 grpc 相关的代码的路径, 这里生成到当前目录
//   -I. *.proto : proto 文件的路径, 这里的 proto 文件在当前目录

改造第三步:master 端 python 端 etcd 客户端的心跳检查

心跳检查处理

python 版的 etcd3 客户端找了一圈,都比较简陋,只好使用 etcd3 这个包。
一开始尝试该包的 watch 方式,用新的线程去更新服务方变化,但是发现包中 watch 方法会堵塞客户端(无法进行其他比如发送请求操作,大坑)
只好放弃,改用采用每隔 5 秒这种心跳检测的方法来处理。

def __checkHeartBeat(self):
        print("......连接etcd服务:%s,并心跳检查......" % (self.etcdAddr))
        def __resetEtcdClient():
            self.etcdClient.close()
            ectdAddrReslv = self.etcdAddr.split(":")
            self.etcdClient=etcd3.client(ectdAddrReslv[0],ectdAddrReslv[1])
        errCount=1
        while not self.stop_flag:
            self.servAddressList.clear()
            try:
                for kv in self.etcdClient.get_prefix_response(self.servAddrPrefix).kvs:
                    self.servAddressList.append(kv.value.decode('utf-8'))
                errCount=1 # 重置错误次数
                gevent.sleep(2)  # 2s 检查一次
            except Exception as e:
                if errCount>60: # 超过5分钟
                    print(".....etcd尝试次数超过上限,退出....." )
                    exit(2)
                gevent.sleep(5)
                print(".....获取etcd的key:异常:%s。正在尝试第%d次....."%(e,errCount))
                __resetEtcdClient()
                errCount+=1

改造第四步:master 端 python 端 web 页面增加压测机管理及事务管理页面

在原 woker 情况表格下增加可用压测机表格,增加了选择压测机进行事务初始化功能,比较简单。
事务管理页面结构嵌套多,页面复杂,原版的 locust 提供的组件根本不够用

页面结构说明:

为了应付 flask 模板的兼容问题,使用 layui+iframe+flask 模板方式解决。
后端的事务管理页面特别麻烦,因为结构复杂,相同字段名多;
另外还有导入功能 -- 要初始化好导入后的页面,采用从最小模块开始设计,灵活组合,一步步满足上层要求

https://github.com/mao303mao/locust-hazard
(go grpc 更新换代太快了,后端自己编译的话问题特别多,目前 woker 端已经改成 go mod 管理包了,可以减少编译错误...)


↙↙↙阅读原文可查看相关链接,并与作者交流