性能常识 Go pprof 性能调优实战,性能提升 3 倍!

花菜 · 2020年12月15日 · 最后由 花菜 回复于 2020年12月23日 · 7458 次阅读
本帖已被设为精华帖!

1.需求背景

服务升级,需要对 kafka 消息持久化服务进行压测,预计每分钟要产生消息 400w 条。
目前使用 Golang 实现了批量发送 kafka 消息的接口,但 100w 条消息就要还是 50s 多,无法满足需求,因此需要对发送 kafka 接口进行性能调优

2.问题分析

2.1 发送的消息量是否已经达到了网络 io 的瓶颈

经过测试,本地调试时,确实存在这个问题。
在服务器上调试,则可以避免这个问题

2.2 发送 kafka 接口的实现存在性能问题

  • 组装 kafka 消息的逻辑(BenchMark 测试也没啥问题)
// 使用jsonpath替换某个字段
// source {"name":{"first":"Janet","last":"Prichard"}}
// jsonKeyValue {"name.first": {"name.first", "_", "name.last"}}
// want {"name":{"first":"Janet_Prichard","last":"Prichard"}}

func replaceJsonMsg(source string, jsonKeyValue map[string][]string) string {
    for keyPath, valueList := range jsonKeyValue {
        var builder strings.Builder
        for _, valuePath := range valueList {
            value := gjson.Get(source, valuePath)
            if value.Exists() {
                builder.WriteString(value.Str)
            } else {
                builder.WriteString(valuePath)
            }
        }
        source, _ = sjson.Set(source, keyPath, builder.String())
    }
    return source
}
rikasai@huacainoMBP handlers % go test -bench=BenchmarkReplaceJsonMsg -benchtime=5s -cpuprofile jsonCpu.out
goos: darwin
goarch: amd64
pkg: BigDataTestTool/handlers
BenchmarkReplaceJsonMsg-12       7363956               814 ns/op
PASS

启动 web ui 查看性能分析图
rikasai@huacainoMBP handlers % go tool pprof -http=:8081 jsonCpu.out
replaceJsonMsg cpu profile

  • 发送 kafka 的第三方库有问题(经过测试,直接发消息,不经过任何处理时,第三方库能满足需求)

2.3kafka 服务端配置存在问题

  • 经研发大佬确认过,配置正常

3.调试过程

上述的几种猜测都没发现什么大问题
我想replaceJsonMsg这个函数用的是string替换,改成map会不会更快
于是就有了replaceMapMsg

func replaceMapMsg(msg *map[string]interface{}, jsonKeyValue *map[string][]string) *[]byte {
    for keyPath, valueList := range *jsonKeyValue {
        var builder strings.Builder
        for _, valuePath := range valueList {
            property, err := GetProperty(*msg, valuePath)
            if err == nil {
                s := fmt.Sprintf("%v", property)
                builder.WriteString(s)
            } else {
                s := fmt.Sprintf("%v", valuePath)
                builder.WriteString(s)
            }
        }
        UpdateProperty(*msg, keyPath, builder.String())
    }
    b, _ := json.Marshal(msg)
    return &b
}
func BenchmarkReplaceMapMsg(b *testing.B) {
    msg := map[string]interface{}{"name": map[string]interface{}{"first": "Janet", "last": "Prichard"}}
    j := map[string][]string{"name.first": {"name.first", "_", "name.last"}}
    for i := 0; i < b.N; i++ {
        ReplaceMapMsg(&msg, &j)
    }
}

结果大吃一惊!
反向优化了!!!

rikasai@huacainoMBP handlers % go test -bench=BenchmarkReplaceMapMsg -benchtime=5s -cpuprofile mapCpu.out 
goos: darwin
goarch: amd64
pkg: BigDataTestTool/handlers
BenchmarkReplaceMapMsg-12          41468            419347 ns/op
PASS
ok      BigDataTestTool/handlers        19.071s

rikasai@huacainoMBP handlers % go tool pprof -http=:8080 mapCpu.out
Serving web UI on http://localhost:8080

赶紧看一波pprof结果
replaceMapMsg cpu profile
一语惊醒梦中人~
json.Marshal原来是这个家伙占了大头
通过 map 替换是很快,但终究还是要序列化成[]byte类型才能发送 kafka 消息
那用replaceJsonMsg是事先序列化好了,怎么还会那么慢呢?

for begin <= end {
    deviceNo := fmt.Sprintf("%v%v", b.DevicePrefix, begin)
    for _, m := range b.MsgPayload {
        var msg string
        if b.MsgType == "json" {
            m["deviceNo"] = deviceNo
            marshal, _ := json.Marshal(m)
            s := string(marshal)
            msg = replaceJsonMsg(s, b.JsonKeyValue)
        } 
        p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &t, Partition: kafka.PartitionAny},
            Value:          []byte(msg),
        }, nil)
    }
    if (end-begin)*int64(len(b.MsgPayload))%100000 == 0 {
        p.Flush(5 * 1000)
    }
    begin++
}

好家伙,每个循环都执行一次json.Marshal(m), 发 100w 条消息就会执行 100w 次,简直要命啊!

4.优化结果

// 先序列化,下面用到的时候,只需要迭代strMsg这个切片取出即可
    strMsg := make([]string, 0, len(b.MsgPayload))  // 定义切片要小心,要先指定好容量,否则append会触发自动扩容
    if b.MsgType == "json" {
        for _, m := range b.MsgPayload {
            marshal, _ := json.Marshal(m)
            s := string(marshal)
            strMsg = append(strMsg, s)
        }
    }

    for begin <= end {
                deviceNo := fmt.Sprintf("%v%v", b.DevicePrefix, begin)
        var msg string
        if b.MsgType == "json" {
            for _, s := range strMsg {
                msg = replaceJsonMsg(s, deviceNo, b.JsonKeyValue)
                p.Produce(&kafka.Message{
                    TopicPartition: kafka.TopicPartition{Topic: &t, Partition: kafka.PartitionAny},
                    Value:          []byte(msg),
                }, nil)
            }
        }

优化前发送 100w 条消息,耗时将近 56s

{
    "success_count": 1000001,
    "fail_count": 0,
    "parse_fail": [],
    "elapsed": "55.874803s",
    "msg": "success"
}

优化后发送 100w 条消息,耗时将近 18s,提升了三倍的性能!
距离每分钟 400w 还差一点点,还得继续加油~

{
    "success_count": 1000001,
    "fail_count": 0,
    "parse_fail": [],
    "elapsed": "17.823006273s",
    "msg": "success"
}
共收到 3 条回复 时间 点赞

多开几个线程,速度还是很猛的

恒温 将本帖设为了精华贴 12月22日 07:56

可以多个机器同时发。我们压测 Kafka 的只关注消息体大小,不用关注消息内容,所以用 Kafka 自带的压测工具,单机压不上去就多个机器机器同时压,能到十几万的 QPS。

rihkddd 回复

我们这个需要关注消息的内容,所以才会有上面那些处理消息的函数,自带的压测工具可能就满足不了。
昨天也试了多线程,确实能够显著提高并发。现在这个能满足需求,并且继承到了测试平台,研发和测试随时都能调用。

花菜 坎坷的 2020 年 中提及了此贴 01月01日 17:12
需要 登录 後方可回應,如果你還沒有帳號按這裡 注册