As part of a service upgrade, we need to load-test our Kafka message persistence service, which is expected to produce about 4 million messages per minute. The batch Kafka send interface is currently implemented in Golang, but pushing 1 million messages still takes over 50 seconds. That falls well short of the target: 4M/min works out to roughly 66.7k msg/s, while 1M in 50s is only about 20k msg/s. So the Kafka send path needs performance tuning.
Testing confirmed that the problem does show up when debugging locally; running on the server avoids it.
import (
	"strings"

	"github.com/tidwall/gjson"
	"github.com/tidwall/sjson"
)

// replaceJsonMsg rewrites a field via JSON paths.
// source:       {"name":{"first":"Janet","last":"Prichard"}}
// jsonKeyValue: {"name.first": {"name.first", "_", "name.last"}}
// want:         {"name":{"first":"Janet_Prichard","last":"Prichard"}}
func replaceJsonMsg(source string, jsonKeyValue map[string][]string) string {
	for keyPath, valueList := range jsonKeyValue {
		var builder strings.Builder
		// Each entry in valueList is either a JSON path to look up in source,
		// or a literal string to splice in verbatim.
		for _, valuePath := range valueList {
			value := gjson.Get(source, valuePath)
			if value.Exists() {
				builder.WriteString(value.Str)
			} else {
				builder.WriteString(valuePath)
			}
		}
		source, _ = sjson.Set(source, keyPath, builder.String())
	}
	return source
}
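First, benchmark it. The benchmark function itself isn't shown in the original post, only its output below; a minimal sketch along these lines (my assumption, reusing the sample payload from the map-based benchmark further down) would produce that output:

// Assumed benchmark shape; the post only shows the run output below.
func BenchmarkReplaceJsonMsg(b *testing.B) {
	source := `{"name":{"first":"Janet","last":"Prichard"}}`
	j := map[string][]string{"name.first": {"name.first", "_", "name.last"}}
	for i := 0; i < b.N; i++ {
		replaceJsonMsg(source, j)
	}
}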
rikasai@huacainoMBP handlers % go test -bench=BenchmarkReplaceJsonMsg -benchtime=5s -cpuprofile jsonCpu.out
goos: darwin
goarch: amd64
pkg: BigDataTestTool/handlers
BenchmarkReplaceJsonMsg-12 7363956 814 ns/op
PASS
Start the pprof web UI to inspect the profile graph:
rikasai@huacainoMBP handlers % go tool pprof -http=:8081 jsonCpu.out
None of the guesses above turned up any real problem. Then it occurred to me: replaceJsonMsg does its replacement on a string. Would switching to a map be faster? That idea became replaceMapMsg:
// replaceMapMsg does the same replacement on a parsed map instead of a JSON
// string, then marshals the result back to bytes. GetProperty and
// UpdateProperty are in-house helpers that read/write a value by its dotted
// path in nested maps.
func replaceMapMsg(msg *map[string]interface{}, jsonKeyValue *map[string][]string) *[]byte {
	for keyPath, valueList := range *jsonKeyValue {
		var builder strings.Builder
		for _, valuePath := range valueList {
			property, err := GetProperty(*msg, valuePath)
			if err == nil {
				builder.WriteString(fmt.Sprintf("%v", property))
			} else {
				// Path not found: treat the entry as a literal.
				builder.WriteString(valuePath)
			}
		}
		UpdateProperty(*msg, keyPath, builder.String())
	}
	b, _ := json.Marshal(msg)
	return &b
}
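The post doesn't show the real GetProperty/UpdateProperty, so here are hypothetical sketches of what such dotted-path helpers might look like; these are assumptions about their behavior, not the author's code:

// Hypothetical: walk a dotted path ("name.first") down nested maps.
func GetProperty(m map[string]interface{}, path string) (interface{}, error) {
	var cur interface{} = m
	for _, k := range strings.Split(path, ".") {
		node, ok := cur.(map[string]interface{})
		if !ok {
			return nil, fmt.Errorf("%s: not an object at %q", path, k)
		}
		if cur, ok = node[k]; !ok {
			return nil, fmt.Errorf("%s: key %q not found", path, k)
		}
	}
	return cur, nil
}

// Hypothetical: set a value at a dotted path, creating intermediate maps.
func UpdateProperty(m map[string]interface{}, path string, value interface{}) {
	keys := strings.Split(path, ".")
	cur := m
	for _, k := range keys[:len(keys)-1] {
		next, ok := cur[k].(map[string]interface{})
		if !ok {
			next = map[string]interface{}{}
			cur[k] = next
		}
		cur = next
	}
	cur[keys[len(keys)-1]] = value
}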
func BenchmarkReplaceMapMsg(b *testing.B) {
	msg := map[string]interface{}{"name": map[string]interface{}{"first": "Janet", "last": "Prichard"}}
	j := map[string][]string{"name.first": {"name.first", "_", "name.last"}}
	for i := 0; i < b.N; i++ {
		replaceMapMsg(&msg, &j)
	}
}
The result was a shock: a de-optimization!!!
rikasai@huacainoMBP handlers % go test -bench=BenchmarkReplaceMapMsg -benchtime=5s -cpuprofile mapCpu.out
goos: darwin
goarch: amd64
pkg: BigDataTestTool/handlers
BenchmarkReplaceMapMsg-12 41468 419347 ns/op
PASS
ok BigDataTestTool/handlers 19.071s
rikasai@huacainoMBP handlers % go tool pprof -http=:8080 mapCpu.out
Serving web UI on http://localhost:8080
Time for a quick look at pprof. The result was a rude awakening: json.Marshal is the one eating most of the time. Doing the replacement through a map is fast, but the message still has to be serialized to []byte before it can be sent to Kafka. But then, replaceJsonMsg works on a string that is already serialized, so why was that path still so slow? The sending loop held the answer:
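To sanity-check that pprof finding, a stand-alone micro-benchmark (my own sketch, not from the post) that isolates json.Marshal on the sample payload:

// Measures only the serialization cost that dominated replaceMapMsg.
func BenchmarkJsonMarshalOnly(b *testing.B) {
	msg := map[string]interface{}{"name": map[string]interface{}{"first": "Janet", "last": "Prichard"}}
	for i := 0; i < b.N; i++ {
		json.Marshal(msg)
	}
}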
for begin <= end {
	deviceNo := fmt.Sprintf("%v%v", b.DevicePrefix, begin)
	for _, m := range b.MsgPayload {
		var msg string
		if b.MsgType == "json" {
			m["deviceNo"] = deviceNo
			// Serialize the payload on every iteration -- this is the hot spot.
			marshal, _ := json.Marshal(m)
			msg = replaceJsonMsg(string(marshal), b.JsonKeyValue)
		}
		p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &t, Partition: kafka.PartitionAny},
			Value:          []byte(msg),
		}, nil)
	}
	if (end-begin)*int64(len(b.MsgPayload))%100000 == 0 {
		p.Flush(5 * 1000)
	}
	begin++
}
There it is: json.Marshal(m) runs on every single iteration, so sending 1 million messages means 1 million marshal calls. Brutal!
// Serialize up front; the send loop below only needs to iterate over the
// strMsg slice.
// Be careful when declaring the slice: set the capacity first, otherwise
// append will keep triggering reallocation as it grows (see the
// micro-benchmark after the loop below).
strMsg := make([]string, 0, len(b.MsgPayload))
if b.MsgType == "json" {
	for _, m := range b.MsgPayload {
		marshal, _ := json.Marshal(m)
		strMsg = append(strMsg, string(marshal))
	}
}
for begin <= end {
	deviceNo := fmt.Sprintf("%v%v", b.DevicePrefix, begin)
	var msg string
	if b.MsgType == "json" {
		for _, s := range strMsg {
			// replaceJsonMsg now takes deviceNo as an extra argument, so the
			// device number is spliced in without re-marshaling the map.
			msg = replaceJsonMsg(s, deviceNo, b.JsonKeyValue)
			p.Produce(&kafka.Message{
				TopicPartition: kafka.TopicPartition{Topic: &t, Partition: kafka.PartitionAny},
				Value:          []byte(msg),
			}, nil)
		}
	}
	begin++
}
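As an aside, the capacity hint in the make call above matters more than it looks. An illustrative micro-benchmark (my sketch, not from the post) comparing append with and without pre-allocated capacity:

// Without a capacity hint, append reallocates and copies as the slice grows.
func BenchmarkAppendNoCap(b *testing.B) {
	for i := 0; i < b.N; i++ {
		s := make([]string, 0)
		for j := 0; j < 1000; j++ {
			s = append(s, "msg")
		}
	}
}

// With the capacity known up front, append never reallocates.
func BenchmarkAppendWithCap(b *testing.B) {
	for i := 0; i < b.N; i++ {
		s := make([]string, 0, 1000)
		for j := 0; j < 1000; j++ {
			s = append(s, "msg")
		}
	}
}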
Before the optimization, sending 1 million messages took nearly 56s:
{
"success_count": 1000001,
"fail_count": 0,
"parse_fail": [],
"elapsed": "55.874803s",
"msg": "success"
}
After the optimization, the same 1 million messages take about 18s, a roughly 3x speedup!
That's still just short of 4 million per minute (18s per million is about 3.3M/min), so there's more tuning to do~
{
"success_count": 1000001,
"fail_count": 0,
"parse_fail": [],
"elapsed": "17.823006273s",
"msg": "success"
}