性能测试工具 Boomer 实战压测 mqtt,2w 并发轻松实现

花菜 · August 04, 2020 · Last by 我去炒饭 replied at August 04, 2023 · 9037 hits

压测结果

啥也不说,先看看效果.
施压两台机:8 核 CPU,32G 内存.限制每台机最高并发量为 1w
就放一组最大值的,2w 并发,10 分钟.
实际上内存消耗很小,CPU 也只是使用了 200% 左右.理论上 8 核 CPU 可以使用到 800%.


压测背景

接入第三方mqtt服务,目前公司设备超过 10w 台,并发预计4000rps

工具选择

Jmeter

  • 优点: 有现成 Mqtt 插件,开箱即用,支持分布式
  • 缺点: 施压需要消耗很大性能,插件不够灵活 (可能是我不熟悉)

Locust + Python

  • 优点: 很灵活,有现成Web界面
  • 缺点: 原生Locust不支持mqtt协议,需要重写HTTPLocust这个类.Python受限于GLI,并发不给力.需要起多个slave

Locust + Boomer

  • 优点: boomergolang编写的,性能强劲,可搭配locust实现Web界面
  • 缺点: 缺少mqtt现成案例参考 (我本身对于 go 也不算熟悉)

一开始测试选了 Jmeter,因为简单方便.但发现调试不是很方便,还是上面的,可能不熟悉.另外,50 个并发左右,我的 MBP(19 款 16 寸,6 核),就开始咆哮了! 时间关系,我没深究原因.
后来选择了 Locust + Boomer.踩了不少坑,但最后总算完成了任务.

压测分析

  • mqtt 账号和主题是一一绑定,因此需要批量生成大概 20w 个账号

压测场景

账号建立连接

  • 300rps,5 分钟
  • 500rps,5 分钟
  • 1000rps,5 分钟

发送消息

  • 1000rps,5 分钟
  • 2000rps,5 分钟
  • 4000rps,5 分钟

脚本设计

流程图

实现代码

//  main.go
// 代码仅供参考,无法直接运行.
package main

import (
    "bytes"
    "encoding/csv"
    "fmt"
    MQTT "github.com/eclipse/paho.mqtt.golang"
    "github.com/myzhan/boomer"
    "io"
    "io/ioutil"
    "log"
    "os"
    "strconv"
    "strings"
    "sync"
    "time"
)

var rows [][]string // 读取csv文件保存到这里
var clientTopic []map[string]MQTT.Client
var conn = 0 // 调试用
var failCount = 0 // 初始化失败数量
var i = 0 // 控制并发
var j = 1 // 记录消息发送成功
var f = 1 // 记录消息发送失败
var nowStr = strconv.Itoa(int(time.Now().Unix())) // 当前时间戳,用来做后续查询的消息的标识符

func newConn(c MQTT.Client, clientId string, group *sync.WaitGroup) {
    defer func() {
        group.Add(-1)
        err := recover()
        if err != nil {
            failCount++
            fmt.Println("login fail clientId:  ", clientId)
        }
    }()
    token := c.Connect()
    if token.Wait() && token.Error() != nil {
        panic(token.Error())
    }
    // 组装topic
    topic := fmt.Sprintf("msg/%s/supply", clientId)
    temp := make(map[string]MQTT.Client)
    temp[topic] = c
    clientTopic = append(clientTopic, temp)
    conn++ // 调试用
}

func initClients() {
    var wg sync.WaitGroup 
    server := "server_ip:1883"
    for i := 0; i < len(rows); i++ {
        wg.Add(1)
        clientId, userName, passWord := rows[i][0], rows[i][1], rows[i][2]
        opts := MQTT.NewClientOptions().AddBroker(server)
        opts.SetUsername(userName)
        opts.SetPassword(passWord)
        opts.SetClientID(clientId)
        opts.SetKeepAlive(300 * time.Second)
        c := MQTT.NewClient(opts)
        go newConn(c, clientId, &wg)

    }
    wg.Wait() // 等到所有协程执行完成 
    fmt.Printf("init finish, clients len is %d \n", len(clientTopic))
    fmt.Printf("conn: %d \n", conn)
    fmt.Printf("failCount: %d \n", failCount)
}

func initCsvData() {
    pwd, _ := os.Getwd()
    b, err := ioutil.ReadFile(pwd + "/clients.csv")
    fs := bytes.NewBuffer(b)
    if err != nil {
        log.Fatalf("can not open the file, err is %+v", err)
    }

    r := csv.NewReader(fs)
    //针对大文件,一行一行的读取文件
    for {
        row, err := r.Read()
        if err != nil && err != io.EOF {
            log.Fatalf("can not read, err is %+v", err)
        }
        if err == io.EOF {
            break
        }
        rows = append(rows, row)
    }
}



func login() {
    server := "server_ip:port"
    clientId, userName, passWord := rows[i][0], rows[i][1], rows[i][2]
    start := time.Now()
    opts := MQTT.NewClientOptions().AddBroker(server)
    opts.SetUsername(userName)
    opts.SetPassword(passWord)
    opts.SetClientID(clientId)
    c := MQTT.NewClient(opts)
    token := c.Connect()
    elapsed := time.Since(start)
    if token.Error() == nil {
        log.Println("success" + strconv.Itoa(j))
        boomer.RecordSuccess("tcp", "login", elapsed.Nanoseconds()/int64(time.Millisecond), int64(10))
    } else {
        log.Println(token.Error())
        boomer.RecordFailure("tcp", "login", elapsed.Nanoseconds()/int64(time.Millisecond), clientId)
    }
    c.Disconnect(5)
    // avoid out of array
    if i < len(clientTopic)-1 {
        i++
    } else {
        i = 0
    }
    j++
}

func sendMsg() {
    start := time.Now()
    msgId := "msg" + strconv.Itoa(i)
    var clientId string
    var topic string
    var c MQTT.Client
    for k, v := range clientTopic[i] {
        clientId = k[6:19]
        topic = k
        c = v // v就是一个connected的client
    }
    deviceTime := nowStr
    str := []string{msgId, clientId, deviceTime}
    msgPayload := strings.Join(str, "|")

    if c.IsConnected() == true {
        token := c.Publish(topic, 1, false, msgPayload)
        token.Wait() 等待消息发送完成,虽然会拉低并发,但必须要这么做,确保消息发送成功
        elapsed := time.Since(start)
        if token.Error() == nil {
            fmt.Printf("this topic name is: %s \n", topic)
            fmt.Printf("this topic payload is: %s \n", msgPayload)
            fmt.Printf("success msg index: %v elapsed: %v  \n", j, elapsed)
            j++ // 消息发送成功, 记录一条,并且也给locust记录一条,方便后续校对数据量
            boomer.RecordSuccess("tcp", "task", elapsed.Nanoseconds()/int64(time.Millisecond), int64(j))
            // 避免数组越界
            if i < len(clientTopic)-1 {
                i++
            } else {
                i = 0
            }
        } else {
            boomer.RecordFailure("tcp", "task", elapsed.Nanoseconds()/int64(time.Millisecond), msgPayload)
            fmt.Printf("发送失败, fail msg index: %v \n", f)
        }

    } else {
        if token := c.Connect(); token.Wait() && token.Error() != nil {
            elapsed := time.Since(start)
            fmt.Printf("fail msg index: %v \n", f)
            f++
            boomer.RecordFailure("tcp", "task", elapsed.Nanoseconds()/int64(time.Millisecond), msgPayload)
        }
    }
}

func main() {
    initCsvData()
    initClients()

    task1 := &boomer.Task{
        Name:   "myTask",
        Weight: 1,
        Fn:     sendMsg,
    }

    //task2 := &boomer.Task{
    //  Name:   "login",
    //  Weight: 1,
    //  Fn:     login,
    //}
    boomer.Run(task1)
}

后续思考

  • 使用全局变量要非常小心,特别是高并发时.修改一定要加锁
  • 实现新功能时,务必先建立分支,不要直接在 master 上修改
  • 修改完代码,及时提交,并写清楚 commit 内容,最好分批次提交
  • Golang 并发非常强悍,并且使用起来异常简单

存在问题

  • locust master 点击停止时,slave 实际上还在工作,导致 locust 统计的请求数比实际请求偏少.并发量大时,尤为明显. 在这次测试中需要对 mqtt 发送的消息数量做统计,还好自己实现起来也不难.
  • 场景 2 需要先建立 TCP 连接,在并发创建大量连接时,容易出现网络 i/o 超时.创建 3000 个连接,到 2950 个左右,就开始出现网络超时.前面连接成功的 2950 个账号,耗时只需要 10s 多点 (300 并发).后续的 50 个重连大概花了 1 分钟. -- 换了测试服务器测试,出现同样的情况,所以猜测是触碰到了第三方服务的防火墙机制 -- 对接的第三方公司拿了我的脚本去测试,把用户量升到 7000 也只是耗时 20 多秒,这又否定防火墙问题的说法. 最后猜测是本机网络的带宽限制

接入普罗米修斯

共收到 9 条回复 时间 点赞

每每看到 bommer 相关的都好像没人关注,可能大家都不熟悉吧,甚至 locust 用的也少,接触过一些,是个不错的工具,给个赞!

我们这边同学分享过,基于 Jmeter 定制 mqtt 协议,可以压到百万 QPS

花菜 #3 · August 05, 2020 Author
simple 回复

NB!!!

花菜 坎坷的 2020 年 中提及了此贴 01 Jan 17:12

咨询下, 在设计 Boomer 的时候, 关于 TPS 是如何统计的呢? 每个请求响应完成之后计数? 那么动态 TPS 图画的时候, 是当前秒的完成数量, 还是当前秒之前所有的完成数量/当前秒-StartTime 呢?
问题来源是当一些业务的 API 响应时间横跨 1-100s 都正常的情况下, TPS 动态画图, 如果画每个时间段的实时 TPS 图, 该如何画呢?

tps 的时间统计,在代码中有体现,是单个请求维度的。
locust 没有 tps 的概念,是 rps。
另外本次压测最关注的点是消息是否有丢失,和最大并发量。至于响应时间,并不是最重要的。

@ 花菜 楼主,我看你另一个帖子说'封装一个 http 接口来转发 mqtt'
那这个性能测试也是这样转发的吗?会影响性能吗?😂

yyy 回复

不同的实现方式。

花菜 怎么产生 100w 并发 中提及了此贴 01 Nov 10:24
simple 回复

老铁,jmeter 支持百万压测的文章在哪,求资源🙏

伍绍展 回复

同求

需要 Sign In 后方可回复, 如果你还没有账号请点击这里 Sign Up