啥也不说,先看看效果.
施压两台机:8 核 CPU,32G 内存.限制每台机最高并发量为 1w
就放一组最大值的,2w 并发,10 分钟.
实际上内存消耗很小,CPU 也只是使用了 200% 左右.理论上 8 核 CPU 可以使用到 800%.
接入第三方mqtt
服务,目前公司设备超过 10w 台,并发预计4000rps
Web
界面Locust
不支持mqtt
协议,需要重写HTTPLocust
这个类.Python
受限于GLI
,并发不给力.需要起多个slave
boomer
是golang
编写的,性能强劲,可搭配locust
实现Web
界面mqtt
现成案例参考 (我本身对于 go 也不算熟悉) 一开始测试选了 Jmeter,因为简单方便.但发现调试不是很方便,还是上面的,可能不熟悉.另外,50 个并发左右,我的 MBP(19 款 16 寸,6 核),就开始咆哮了! 时间关系,我没深究原因.
后来选择了 Locust + Boomer.踩了不少坑,但最后总算完成了任务.
// main.go
// 代码仅供参考,无法直接运行.
package main
import (
"bytes"
"encoding/csv"
"fmt"
MQTT "github.com/eclipse/paho.mqtt.golang"
"github.com/myzhan/boomer"
"io"
"io/ioutil"
"log"
"os"
"strconv"
"strings"
"sync"
"time"
)
var rows [][]string // 读取csv文件保存到这里
var clientTopic []map[string]MQTT.Client
var conn = 0 // 调试用
var failCount = 0 // 初始化失败数量
var i = 0 // 控制并发
var j = 1 // 记录消息发送成功
var f = 1 // 记录消息发送失败
var nowStr = strconv.Itoa(int(time.Now().Unix())) // 当前时间戳,用来做后续查询的消息的标识符
func newConn(c MQTT.Client, clientId string, group *sync.WaitGroup) {
defer func() {
group.Add(-1)
err := recover()
if err != nil {
failCount++
fmt.Println("login fail clientId: ", clientId)
}
}()
token := c.Connect()
if token.Wait() && token.Error() != nil {
panic(token.Error())
}
// 组装topic
topic := fmt.Sprintf("msg/%s/supply", clientId)
temp := make(map[string]MQTT.Client)
temp[topic] = c
clientTopic = append(clientTopic, temp)
conn++ // 调试用
}
func initClients() {
var wg sync.WaitGroup
server := "server_ip:1883"
for i := 0; i < len(rows); i++ {
wg.Add(1)
clientId, userName, passWord := rows[i][0], rows[i][1], rows[i][2]
opts := MQTT.NewClientOptions().AddBroker(server)
opts.SetUsername(userName)
opts.SetPassword(passWord)
opts.SetClientID(clientId)
opts.SetKeepAlive(300 * time.Second)
c := MQTT.NewClient(opts)
go newConn(c, clientId, &wg)
}
wg.Wait() // 等到所有协程执行完成
fmt.Printf("init finish, clients len is %d \n", len(clientTopic))
fmt.Printf("conn: %d \n", conn)
fmt.Printf("failCount: %d \n", failCount)
}
func initCsvData() {
pwd, _ := os.Getwd()
b, err := ioutil.ReadFile(pwd + "/clients.csv")
fs := bytes.NewBuffer(b)
if err != nil {
log.Fatalf("can not open the file, err is %+v", err)
}
r := csv.NewReader(fs)
//针对大文件,一行一行的读取文件
for {
row, err := r.Read()
if err != nil && err != io.EOF {
log.Fatalf("can not read, err is %+v", err)
}
if err == io.EOF {
break
}
rows = append(rows, row)
}
}
func login() {
server := "server_ip:port"
clientId, userName, passWord := rows[i][0], rows[i][1], rows[i][2]
start := time.Now()
opts := MQTT.NewClientOptions().AddBroker(server)
opts.SetUsername(userName)
opts.SetPassword(passWord)
opts.SetClientID(clientId)
c := MQTT.NewClient(opts)
token := c.Connect()
elapsed := time.Since(start)
if token.Error() == nil {
log.Println("success" + strconv.Itoa(j))
boomer.RecordSuccess("tcp", "login", elapsed.Nanoseconds()/int64(time.Millisecond), int64(10))
} else {
log.Println(token.Error())
boomer.RecordFailure("tcp", "login", elapsed.Nanoseconds()/int64(time.Millisecond), clientId)
}
c.Disconnect(5)
// avoid out of array
if i < len(clientTopic)-1 {
i++
} else {
i = 0
}
j++
}
func sendMsg() {
start := time.Now()
msgId := "msg" + strconv.Itoa(i)
var clientId string
var topic string
var c MQTT.Client
for k, v := range clientTopic[i] {
clientId = k[6:19]
topic = k
c = v // v就是一个connected的client
}
deviceTime := nowStr
str := []string{msgId, clientId, deviceTime}
msgPayload := strings.Join(str, "|")
if c.IsConnected() == true {
token := c.Publish(topic, 1, false, msgPayload)
token.Wait() 等待消息发送完成,虽然会拉低并发,但必须要这么做,确保消息发送成功
elapsed := time.Since(start)
if token.Error() == nil {
fmt.Printf("this topic name is: %s \n", topic)
fmt.Printf("this topic payload is: %s \n", msgPayload)
fmt.Printf("success msg index: %v elapsed: %v \n", j, elapsed)
j++ // 消息发送成功, 记录一条,并且也给locust记录一条,方便后续校对数据量
boomer.RecordSuccess("tcp", "task", elapsed.Nanoseconds()/int64(time.Millisecond), int64(j))
// 避免数组越界
if i < len(clientTopic)-1 {
i++
} else {
i = 0
}
} else {
boomer.RecordFailure("tcp", "task", elapsed.Nanoseconds()/int64(time.Millisecond), msgPayload)
fmt.Printf("发送失败, fail msg index: %v \n", f)
}
} else {
if token := c.Connect(); token.Wait() && token.Error() != nil {
elapsed := time.Since(start)
fmt.Printf("fail msg index: %v \n", f)
f++
boomer.RecordFailure("tcp", "task", elapsed.Nanoseconds()/int64(time.Millisecond), msgPayload)
}
}
}
func main() {
initCsvData()
initClients()
task1 := &boomer.Task{
Name: "myTask",
Weight: 1,
Fn: sendMsg,
}
//task2 := &boomer.Task{
// Name: "login",
// Weight: 1,
// Fn: login,
//}
boomer.Run(task1)
}
接入普罗米修斯