Java&Go 高性能队列之 channel 性能测试
之前写了两篇 Java 的高性能队列性能测试实践文章,发现了一些比较通用的规律,总体上Disruptor
性能是要领先LinkedBlockingQueue
的。先回顾一下Java&Go 高性能队列之 LinkedBlockingQueue 性能测试,Java&Go 高性能队列之 Disruptor 性能测试。
那么理论上性能更高的 Go 语言中的channel
(下文中的也称为队列)性能如何呢,下面我将对它进行同样的性能测试。
测试场景设计的思路与前两篇文章相同,通过三个场景对变量的修改进行对比压测,包括不限于数量、大小、goroutine 的数量。
结论
总体来说channel
性能还是在性能足够高,完全满足现在压测需求。总结起来几点比较通用的参考:
- Go 语言 channel 性能足够好,首先与生产者生产能力,工作中需要提升生产能力
- 消息体越小越好
- channel 的 size 长度并不重要
- 创建请求对象上
fasthttp.Request
居然还不如net/http.Request
,可能跟没有 release 掉有关。
简介
Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明 channel 的时候需要为其指定元素类型。如果说 goroutine 是 Go 程序并发的执行体,channel 就是它们之间的连接。channel 是可以让一个 goroutine 发送特定值到另一个 goroutine 的通信机制。
在我查资料的过程中,发现 Go 语言在锁解决(多协程/多 goroutine 安全)的层面有很多很优秀的功能,显示在不同场景下会比channel
性能更高。但是我在阅读 goreplay 源码的过程中,看到的更多还是channel
的实践。等我逐步提高自己 Go 语言多协程编程能力之后再来测试其他实现。
测试结果
这里性能只记录每毫秒处理消息(对象)个数作为评价性能的唯一标准。在我测试Disruptor
框架的过程中,发现这个单一指标有点有失偏颇,后续如果还有下一轮的测试的话,我再优化这个地方。
数据说明
这里我用了三种net/http
中的Request
,创建方法均使用原生 API,为了区分大小的区别,我会响应增加一些 header 和 URL 长度。
小对象:
get, _ := http.NewRequest("GET", base.Empty, nil)
中对象:
get,_ := http.NewRequest("GET",base.Empty, nil)
get.Header.Add("token", token)
get.Header.Add("Connection", base.Connection_Alive)
get.Header.Add("User-Agent", base.UserAgent)
大对象:
get,_ := http.NewRequest("GET",base.Empty, nil)
get.Header.Add("token", token)
get.Header.Add("token1", token)
get.Header.Add("token2", token)
get.Header.Add("token3", token)
get.Header.Add("token4", token)
get.Header.Add("token5", token)
get.Header.Add("Connection", base.Connection_Alive)
get.Header.Add("User-Agent", base.UserAgent)
生产者
对象大小 | 队列长度(百万) | 线程数 | 速率(/ms) |
---|---|---|---|
小 | 1 | 1 | 2173 |
小 | 1 | 5 | 4385 |
小 | 1 | 10 | 4273 |
小 | 5 | 1 | 2048 |
小 | 10 | 1 | 1964 |
中 | 1 | 1 | 831 |
中 | 1 | 5 | 1792 |
中 | 1 | 10 | 2450 |
中 | 1 | 20 | 2481 |
中 | 5 | 1 | 898 |
中 | 10 | 1 | 848 |
中 | 0.5 | 1 | 865 |
中 | 0.5 | 5 | 1760 |
大 | 1 | 1 | 560 |
大 | 1 | 5 | 1633 |
大 | 1 | 10 | 2092 |
大 | 0.5 | 1 | 571 |
大 | 0.5 | 5 | 1677 |
大 | 0.5 | 10 | 1984 |
针对net/http
中的Request
消息体结论如下:
- 长度在 50 万 ~ 1000 万没有明显差异
- 生产者越多越好(20 以内,再多增益效果不明显)
- 消息体尽可能小
消费者
对象大小 | 队列长度(百万) | 线程数 | 速率(/ms) |
---|---|---|---|
小 | 1 | 1 | 2092 |
小 | 1 | 5 | 3322 |
小 | 1 | 10 | 3472 |
小 | 1 | 20 | 3246 |
小 | 2 | 1 | 2030 |
小 | 2 | 5 | 4081 |
小 | 5 | 1 | 2150 |
小 | 5 | 5 | 3980 |
中 | 1 | 1 | 1851 |
中 | 1 | 5 | 3460 |
中 | 1 | 10 | 3289 |
中 | 1 | 20 | 2832 |
中 | 2 | 1 | 1733 |
中 | 2 | 5 | 2652 |
大 | 1 | 1 | 1697 |
大 | 1 | 5 | 2564 |
大 | 1 | 10 | 3436 |
大 | 0.5 | 1 | 2032 |
大 | 0.5 | 5 | 3311 |
大 | 0.5 | 10 | 3597 |
针对net/http
中的Request
消息体结论如下:
- 长度在 50 万 ~ 500 万没有明显差异
- 消费者 10 ~ 20 以内到达峰值
- 消息体尽可能小
消费者并发越多越好,这个在实际工作中消费者消费消息会有耗时,消费者 goroutine 会很多,要根据实际情况设置消费者数量,或者在压测过程中灵活增减消费者数量,这点跟Disruptor
不同。
生产者 & 消费者
这里的线程数指的是生产者或者消费者的数量,总体线程数是此数值的 2 倍。
对象大小 | 次数(百万) | 线程数 | 速率(/ms) |
---|---|---|---|
小 | 1 | 1 | 0.1 |
小 | 1 | 1 | 0.2 |
小 | 1 | 1 | 0.5 |
小 | 1 | 5 | 0.1 |
小 | 1 | 10 | 0.1 |
小 | 2 | 1 | 0.1 |
小 | 2 | 1 | 0.2 |
小 | 2 | 5 | 0.2 |
小 | 5 | 5 | 0.1 |
小 | 5 | 10 | 0.1 |
中 | 1 | 1 | 0.1 |
中 | 1 | 1 | 0.2 |
中 | 1 | 5 | 0.2 |
中 | 1 | 10 | 0.2 |
中 | 2 | 1 | 0.2 |
中 | 2 | 5 | 0.2 |
中 | 2 | 10 | 0.2 |
中 | 2 | 15 | 0.2 |
大 | 1 | 1 | 0.1 |
大 | 1 | 1 | 0.2 |
大 | 1 | 5 | 0.2 |
大 | 1 | 10 | 0.2 |
大 | 2 | 1 | 0.2 |
大 | 2 | 5 | 0.2 |
大 | 2 | 10 | 0.2 |
针对net/http
中的Request
消息体结论如下:
- 消息队列积累消息对性能影响不大
- 消费次数越多,性能反而有点下降,应该是生产者速率不足导致
- 消息体尽可能小,不过性能下降不多
测试用例
总体代码逻辑与 Java 和 Groovy 用例一样,有几处差别如下:
这里我用了sync.WaitGroup
代替了java.util.concurrent.CountDownLatch
,暂时没有找到合适的功能替换java.util.concurrent.CyclicBarrier
,经过测试并不影响测试结果,所以略过此项。
Go
语言的channel
有个先天的优势,就是必需得设置size
,相当于提前分配内存了。这点是我之前没想到的,当我回去复测LinkedBlockingQueue
的时候发现并没有明显的性能差异,对于测试结果影响可忽略。
我还用了atomic.AddInt32
解决计数安全的问题,这里不多分享了,有兴趣可以搜一下官方文档学习使用。
生产者场景
func TestQueue(t *testing.T) {
var index int32 = 0
rs := make(chan *http.Request, total+10000)
var group sync.WaitGroup
group.Add(threadNum)
milli := futil.Milli()
funtester := func() {
go func() {
for {
l := atomic.AddInt32(&index, 1)
if l%piece == 0 {
m := futil.Milli()
log.Println(m - milli)
milli = m
}
if l > total {
break
}
get := getRequest()
rs <- get
}
group.Done()
}()
}
start := futil.Milli()
for i := 0; i < threadNum; i++ {
funtester()
}
group.Wait()
end := futil.Milli()
log.Println(atomic.LoadInt32(&index))
log.Printf("平均每毫秒速率%d", total/(end-start))
}
消费者场景
func TestConsumer(t *testing.T) {
rs := make(chan *http.Request, total+10000)
var group sync.WaitGroup
group.Add(10)
funtester := func() {
go func() {
for {
if len(rs) > total {
break
}
get := getRequest()
rs <- get
}
group.Done()
}()
}
for i := 0; i < 10; i++ {
funtester()
}
group.Wait()
log.Printf("造数据完成! 总数%d", len(rs))
totalActual := int64(len(rs))
var conwait sync.WaitGroup
conwait.Add(threadNum)
consumer := func() {
go func() {
FUN:
for {
select {
case <-rs:
case <-time.After(10 * time.Millisecond):
break FUN
}
}
conwait.Done()
}()
}
start := futil.Milli()
for i := 0; i < threadNum; i++ {
consumer()
}
conwait.Wait()
end := futil.Milli()
log.Printf("平均每毫秒速率%d", totalActual/(end-start))
}
生产者 & 消费者 场景
这里我引入了另外一个变量:初始队列长度 length,用例运行之前将队列按照这个长度进行单线程填充。
func TestConsumer(t *testing.T) {
rs := make(chan *http.Request, total+10000)
var group sync.WaitGroup
group.Add(10)
funtester := func() {
go func() {
for {
if len(rs) > total {
break
}
get := getRequest()
rs <- get
}
group.Done()
}()
}
for i := 0; i < 10; i++ {
funtester()
}
group.Wait()
log.Printf("造数据完成! 总数%d", len(rs))
totalActual := int64(len(rs))
var conwait sync.WaitGroup
conwait.Add(threadNum)
consumer := func() {
go func() {
FUN:
for {
select {
case <-rs:
case <-time.After(10 * time.Millisecond):
break FUN
}
}
conwait.Done()
}()
}
start := futil.Milli()
for i := 0; i < threadNum; i++ {
consumer()
}
conwait.Wait()
end := futil.Milli()
log.Printf("平均每毫秒速率%d", totalActual/(end-start))
}
生产对象
func getRequest() *http.Request {
//get, _ := http.NewRequest("GET", base.Empty, nil)
//get,_ := http.NewRequest("GET",url, nil)
//get.Header.Add("token", token)
//get.Header.Add("Connection", base.Connection_Alive)
//get.Header.Add("User-Agent", base.UserAgent)
get,_ := http.NewRequest("GET",url, nil)
get.Header.Add("token", token)
get.Header.Add("token1", token)
get.Header.Add("token2", token)
get.Header.Add("token3", token)
get.Header.Add("token4", token)
get.Header.Add("token5", token)
get.Header.Add("Connection", base.Connection_Alive)
get.Header.Add("User-Agent", base.UserAgent)
return get
}
基准测试
下面是我使用 FunTester(Go 语言版本)性能测试框架对三种消息对象的生产代码进行的测试结果。没想到net/http
的性能还不如 Java 的,有点奇怪。
测试对象 | 线程数 | 个数(百万) | 速率(/ms) |
---|---|---|---|
小 | 1 | 1 | 3311 |
小 | 5 | 1 | 3725 |
小 | 5 | 5 | 7382 |
中 | 1 | 1 | 1132 |
中 | 5 | 1 | 1205 |
中 | 5 | 5 | 3064 |
大 | 1 | 1 | 732 |
大 | 5 | 1 | 738 |
大 | 5 | 5 | 2061 |
下面是fasthttp.Request
的基准测试结果:
测试对象 | 线程数 | 个数(百万) | 速率(/ms) |
---|---|---|---|
小 | 1 | 1 | 2673 |
小 | 5 | 1 | 2881 |
小 | 5 | 5 | 4983 |
中 | 1 | 1 | 1197 |
中 | 5 | 1 | 1137 |
中 | 5 | 5 | 2784 |
大 | 1 | 1 | 621 |
大 | 5 | 1 | 631 |
大 | 5 | 5 | 1438 |
fasthttp.Request
居然还不如net/http.Request
,有点奇怪。
测试用例如下:
// TestBase
// @Description: 基准测试
// @param t
func TestBase(t *testing.T) {
execute.ExecuteRoutineTimes(func() {
getRequest()
},total,threadNum)
}