FunTester Go 语言协程池实现

FunTester · 2023年06月29日 · 2689 次阅读

对于性能测试来讲,使用编程语言实现性能测试用例的核心就是并发编程,也就是同时执行多个测试用例,以模拟真实的负载情况。并发编程可以有效地提高测试效率,可以更快地发现系统中的瓶颈和性能问题。在实现并发编程时,需要考虑线程的同步和互斥,以确保测试结果的正确性和可靠性。此外,还需要考虑如何分配和管理资源,以避免资源竞争和浪费。

之前已经使用了 Java 实现,最近在计划使用 Go 语言实现一些新的压测功能的开发,这其中肯定也少不了使用到线程池(Go 中协程池)。虽然 Go 语言协程已经非常强大了,很多情况下,我们可以直接使用 go 关键字直接创建协程去执行任务。但是在任务调度和负载保护的场景中,还是有所欠缺。所以在参考了 Java 线程池实现类java.util.concurrent.ThreadPoolExecutor自己实现了一个包含等待队列、调度以及等待队列任务完成的协程池。

PS:文中若在 Go 语言语境中出现线程,均指协程。

ThreadPoolExecutor 分析

首先我们看看java.util.concurrent.ThreadPoolExecutor的实现中几个比较重要的功能点,然后简单介绍实现逻辑。下面是构造方法:

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
    }

这里我省略了具体实现,我们看到参数:

  1. 核心线程数、最大线程数,这两个用来管理线程池的数量。
  2. 最大空闲时间,时间单位,这俩组合起来回收空闲线程。
  3. workQueue,用例暂存任务
  4. 线程工厂和拒绝策略,这俩用处少,忽略。(Go 协程池也没有设计这俩) 下面就要祭出个人原创画作: Java线程池执行流程图

这里我借鉴了 动态修改 coreThread 线程池拓展的思路,不再依靠任务队列是否已满来作为增加线程池线程数的依据。除了依赖等待队列的数量以外,还提供单独的 API(这一点跟java.util.concurrent.ThreadPoolExecutor是一样的)。

协程池属性设计

我从 Java 抄来两个属性:核心数,最大数。其中核心数在协程池自己管理中收到最大值的限制,在使用 API 时不受限制。

同样的,我抄来一个等待队列的概念,使用chan func() taskType实现,taskType用来区分是普通任务还是具有管理效果的任务(目前只有减少协程数管理事件,自增事件通过单独的协程实现)

超时时间,这个必不可少,庆幸的是 Go 在这方面比较灵活,我抄了一个简单 Demo 实现。

我增加了活跃协程数(这个在java.util.concurrent.ThreadPoolExecutor也有,但未显式展示),协程池状态(防止 main 结束导致进程直接结束)。

计数类,收到任务数,执行任务数,用来统计任务执行数量。这个同java.util.concurrent.ThreadPoolExecutor

协程池实现

struct 展示

type GorotinesPool struct {
    Max          int
    Min          int
    tasks        chan func() taskType
    status       bool
    active       int32
    ReceiveTotal int32
    ExecuteTotal int32
    addTimeout   time.Duration
}

事件类型枚举

type taskType int

const (
    normal taskType = 0
    reduce taskType = 1
)

构造方法

这里我选择了直接创建所有核心线程数。

  1. 如果复用java.util.concurrent.ThreadPoolExecutor后创建,会功能变得复杂
  2. Go 语言创建协程资源消耗较低
  3. 测试下来,耗时非常低,简单粗暴但是可靠 ```go // GetPool // @Description: 创建线程池 // @param max 最大协程数 // @param min 最小协程数 // @param maxWaitTask 最大任务等待长度 // @param timeout 添加任务超时时间,单位 s // @return *GorotinesPool // func GetPool(max, min, maxWaitTask, timeout int) *GorotinesPool { p := &GorotinesPool{ Max: max, Min: min, tasks: make(chan func() taskType, maxWaitTask), status: true, active: 0, ReceiveTotal: 0, ExecuteTotal: 0, addTimeout: time.Duration(timeout) * time.Second, } for i := 0; i < min; i++ { atomic.AddInt32(&p.active, 1) go p.worker() } go func() { for { if ! p.status { break } ftool.Sleep(1000) p.balance() } }() return p }

## 管理协程数
主要分成2个:增加和减少,增加比较简单,减少的话,我通过管理事件(`taskType`)实现,如果需要减少线程数,我就往队列里面添加一个`reduce`的事件,然后任意一个协程收到之后就终止。后面会分享`worker`实现。

```go
// AddWorker
//  @Description: 添加worker,协程数加1
//  @receiver pool
//
func (pool *GorotinesPool) AddWorker() {
    atomic.AddInt32(&pool.active, 1)
    go pool.worker()
}

// ReduceWorker
//  @Description: 减少worker,协程数减1
//  @receiver pool
//
func (pool *GorotinesPool) ReduceWorker() {
    atomic.AddInt32(&pool.active, -1)
    pool.tasks <- func() taskType {
        return reduce
    }
}

// balance
//  @Description: 平衡活跃协程数
//  @receiver pool
//
func (pool *GorotinesPool) balance() {
    if pool.status {
        if len(pool.tasks) > 0 && pool.active < int32(pool.Max) {
            pool.AddWorker()
        }
        if len(pool.tasks) == 0 && pool.active > int32(pool.Min) {
            pool.ReduceWorker()
        }
    }
}

worker

// worker
//  @Description: 开始执行协程
//  @receiver pool
//
func (pool *GorotinesPool) worker() {
    defer func() {
        if p := recover(); p != nil {
            log.Printf("execute task fail: %v", p)
        }
    }()
Fun:
    for t := range pool.tasks {
        atomic.AddInt32(&pool.ExecuteTotal, 1)
        switch t() {
        case normal:
            atomic.AddInt32(&pool.active, -1)
        case reduce:
            if pool.active > int32(pool.Min) {
                break Fun
            }
        }
    }
}

保障任务完成

为了防止进程终止而任务没有完成,我增加了线程池的状态state和等待方法(此方法需要显式调用)。

// Wait
//  @Description: 结束等待任务完成
//  @receiver pool
//
func (pool *GorotinesPool) Wait() {
    pool.status = false
Fun:
    for {
        if len(pool.tasks) == 0 || pool.active == 0 {
            break Fun
        }
        ftool.Sleep(1000)
    }
    defer close(pool.tasks)
    log.Printf("recieve: %d,execute: %d", pool.ReceiveTotal, pool.ExecuteTotal)
}

执行任务

有了以上的基础,执行就比较简单了。

// Execute
//  @Description: 执行任务
//  @receiver pool
//  @param t
//  @return error
//
func (pool *GorotinesPool) Execute(t func()) error {
    if pool.status {
        select {
        case pool.tasks <- func() taskType {
            t()
            return normal
        }:
            atomic.AddInt32(&pool.ReceiveTotal, 1)
            return nil
        case <-time.After(pool.addTimeout):
            return errors.New("add tasks timeout")
        }
    } else {
        return errors.New("pools is down")
    }
}

自测

自测用例

func TestPool(t *testing.T) {
    pool := execute.GetPool(1000, 1, 200, 1)
    for i := 0; i < 3; i++ {
        pool.Execute(func() {
            log.Println(i)
            ftool.Sleep(1000)
        })
    }
    ftool.Sleep(3000)
    pool.Wait()
    log.Printf("T : %d", pool.ExecuteTotal)
    log.Printf("R : %d", pool.ReceiveTotal)
    log.Printf("max : %d", pool.Max)
    log.Printf("min : %d", pool.Min)
}

下面是自测结果,从 39s 两个输出可以看出当时实际运行的协程数已经超过1了,协程池自增策略生效了。

2023/06/23 17:21:38 3
2023/06/23 17:21:39 3
2023/06/23 17:21:39 3
2023/06/23 17:21:41 recieve: 3,execute: 3
2023/06/23 17:21:41 T : 3
2023/06/23 17:21:41 R : 3
2023/06/23 17:21:41 max : 1000
2023/06/23 17:21:41 min : 1
--- PASS: TestPool (3.00s)

本次分享结束,协程池自测之后效果很不错,后面会依据这个协程池的设计进行其他性能测试功能开发。

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册