1.需求背景

服务升级,需要对 kafka 消息持久化服务进行压测,预计每分钟要产生消息 400w 条。
目前使用 Golang 实现了批量发送 kafka 消息的接口,但 100w 条消息就要还是 50s 多,无法满足需求,因此需要对发送 kafka 接口进行性能调优

2.问题分析

2.1 发送的消息量是否已经达到了网络 io 的瓶颈

经过测试,本地调试时,确实存在这个问题。
在服务器上调试,则可以避免这个问题

2.2 发送 kafka 接口的实现存在性能问题

// 使用jsonpath替换某个字段
// source {"name":{"first":"Janet","last":"Prichard"}}
// jsonKeyValue {"name.first": {"name.first", "_", "name.last"}}
// want {"name":{"first":"Janet_Prichard","last":"Prichard"}}

func replaceJsonMsg(source string, jsonKeyValue map[string][]string) string {
    for keyPath, valueList := range jsonKeyValue {
        var builder strings.Builder
        for _, valuePath := range valueList {
            value := gjson.Get(source, valuePath)
            if value.Exists() {
                builder.WriteString(value.Str)
            } else {
                builder.WriteString(valuePath)
            }
        }
        source, _ = sjson.Set(source, keyPath, builder.String())
    }
    return source
}
rikasai@huacainoMBP handlers % go test -bench=BenchmarkReplaceJsonMsg -benchtime=5s -cpuprofile jsonCpu.out
goos: darwin
goarch: amd64
pkg: BigDataTestTool/handlers
BenchmarkReplaceJsonMsg-12       7363956               814 ns/op
PASS

启动 web ui 查看性能分析图
rikasai@huacainoMBP handlers % go tool pprof -http=:8081 jsonCpu.out
replaceJsonMsg cpu profile

2.3kafka 服务端配置存在问题

3.调试过程

上述的几种猜测都没发现什么大问题
我想replaceJsonMsg这个函数用的是string替换,改成map会不会更快
于是就有了replaceMapMsg

func replaceMapMsg(msg *map[string]interface{}, jsonKeyValue *map[string][]string) *[]byte {
    for keyPath, valueList := range *jsonKeyValue {
        var builder strings.Builder
        for _, valuePath := range valueList {
            property, err := GetProperty(*msg, valuePath)
            if err == nil {
                s := fmt.Sprintf("%v", property)
                builder.WriteString(s)
            } else {
                s := fmt.Sprintf("%v", valuePath)
                builder.WriteString(s)
            }
        }
        UpdateProperty(*msg, keyPath, builder.String())
    }
    b, _ := json.Marshal(msg)
    return &b
}
func BenchmarkReplaceMapMsg(b *testing.B) {
    msg := map[string]interface{}{"name": map[string]interface{}{"first": "Janet", "last": "Prichard"}}
    j := map[string][]string{"name.first": {"name.first", "_", "name.last"}}
    for i := 0; i < b.N; i++ {
        ReplaceMapMsg(&msg, &j)
    }
}

结果大吃一惊!
反向优化了!!!

rikasai@huacainoMBP handlers % go test -bench=BenchmarkReplaceMapMsg -benchtime=5s -cpuprofile mapCpu.out 
goos: darwin
goarch: amd64
pkg: BigDataTestTool/handlers
BenchmarkReplaceMapMsg-12          41468            419347 ns/op
PASS
ok      BigDataTestTool/handlers        19.071s

rikasai@huacainoMBP handlers % go tool pprof -http=:8080 mapCpu.out
Serving web UI on http://localhost:8080

赶紧看一波pprof结果
replaceMapMsg cpu profile
一语惊醒梦中人~
json.Marshal原来是这个家伙占了大头
通过 map 替换是很快,但终究还是要序列化成[]byte类型才能发送 kafka 消息
那用replaceJsonMsg是事先序列化好了,怎么还会那么慢呢?

for begin <= end {
    deviceNo := fmt.Sprintf("%v%v", b.DevicePrefix, begin)
    for _, m := range b.MsgPayload {
        var msg string
        if b.MsgType == "json" {
            m["deviceNo"] = deviceNo
            marshal, _ := json.Marshal(m)
            s := string(marshal)
            msg = replaceJsonMsg(s, b.JsonKeyValue)
        } 
        p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &t, Partition: kafka.PartitionAny},
            Value:          []byte(msg),
        }, nil)
    }
    if (end-begin)*int64(len(b.MsgPayload))%100000 == 0 {
        p.Flush(5 * 1000)
    }
    begin++
}

好家伙,每个循环都执行一次json.Marshal(m), 发 100w 条消息就会执行 100w 次,简直要命啊!

4.优化结果

// 先序列化,下面用到的时候,只需要迭代strMsg这个切片取出即可
    strMsg := make([]string, 0, len(b.MsgPayload))  // 定义切片要小心,要先指定好容量,否则append会触发自动扩容
    if b.MsgType == "json" {
        for _, m := range b.MsgPayload {
            marshal, _ := json.Marshal(m)
            s := string(marshal)
            strMsg = append(strMsg, s)
        }
    }

    for begin <= end {
                deviceNo := fmt.Sprintf("%v%v", b.DevicePrefix, begin)
        var msg string
        if b.MsgType == "json" {
            for _, s := range strMsg {
                msg = replaceJsonMsg(s, deviceNo, b.JsonKeyValue)
                p.Produce(&kafka.Message{
                    TopicPartition: kafka.TopicPartition{Topic: &t, Partition: kafka.PartitionAny},
                    Value:          []byte(msg),
                }, nil)
            }
        }

优化前发送 100w 条消息,耗时将近 56s

{
    "success_count": 1000001,
    "fail_count": 0,
    "parse_fail": [],
    "elapsed": "55.874803s",
    "msg": "success"
}

优化后发送 100w 条消息,耗时将近 18s,提升了三倍的性能!
距离每分钟 400w 还差一点点,还得继续加油~

{
    "success_count": 1000001,
    "fail_count": 0,
    "parse_fail": [],
    "elapsed": "17.823006273s",
    "msg": "success"
}


↙↙↙阅读原文可查看相关链接,并与作者交流