FunTester ForkJoinPool 实践

FunTester · 2023年02月21日 · 2030 次阅读

最近在看一本 15 年出版的《Java 并发编程的艺术》一书,其中看到并发编程时间部分的 ForkJoinPool 功能时,突然发现这个功能实际使用上就是把一个大任务分成多个小的子任务,然后使用多个线程完成。

这个场景跟我之前写过的自定义Java 自定义异步功能实践有点异曲同工之妙,只不过这里有有个子任务的概念,多个任务执行结果是具有相关性的。资料指出 ForkJoinPool 比较适合计算密集型的任务。

性能测试中 QPS 取样器和 RT 取样器中,有这样一个使用场景,在用例执行过程中,我想了解一下当前用例执行的 QPS 和 RT 信息,就需要有个触发开关,开始收集这些数据,等某一个终止条件被触发,结束收集,然后计算结果。在用例 QPS 超过 10 万的情况下,单次收集的数据可能会超过 100 万,计算 QPS 和 RT 就非常适合 ForkJoinPool 来完成。

如果一直实时展示或者上报这些信息的话,也可以使用 ForkJoinPool 来完成计算功能。这里还有另外的方案来实现,如果只是得到 QPS 和 RT 数据的话,比 ForkJoinPool 更加合适,这里先不分享了。

ForkJoinPool API 相比较 ExecutorService 还是比较简单的。主要的功能 3 个:创建任务的 ForkJoinPool、创建任务分配规则和收集任务结果。

下面我以一个数组求和的 Demo 演示一下 ForkJoinPool 的功能。

首先我们需要定义一个 ForkJoinPool,通常使用java.util.concurrent.ForkJoinPool#ForkJoinPool(int)或者java.util.concurrent.ForkJoinPool#commonPool这两个方法其中之一,如果你使用 JDK 7 及以前的版本,第二个 API 是不存在的。

翻看源码之后,看起来 ForkJoinPool 构造方法参数还是挺多的,如果都要自定义比较麻烦也是没多大必要的,所以我就选上面提到的第一种 API 来创建 ForkJoinPool。

然后我们要创建一个任务类实现任务分配规则,首先继承java.util.concurrent.RecursiveTask实现java.util.concurrent.RecursiveTask#compute方法。

拆分任务的思路如下:使用两个 int 属性,标记 List 中需要求和片段索引。这样每次分配任务的时候,只需要改变索引值即可。将一个很长的 List 求和分成 N 个小片段求和。

类代码设计如下:

import com.funtester.frame.SourceCode
import groovy.util.logging.Log4j2

import java.util.concurrent.ForkJoinPool
import java.util.concurrent.RecursiveTask

@Log4j2
class ForkJoinT extends RecursiveTask<Integer> {

    static def data = 1..100 as List

    int start
    int end

    ForkJoinT(int start, int end) {
        this.start = start
        this.end = end
    }

    @Override
    protected Integer compute() {
        if (end - start < 5) {
            sum(start, end)
        } else {
            def middle = ((start + end) / 2) as int
            def left = new ForkJoinT(start, middle)
            def right = new ForkJoinT(middle + 1, end)
            left.fork()
            right.fork()
            left.join() + right.join()
        }
    }

    /**
     * 片段求和
     * @param i
     * @param k
     * @return
     */
    static def sum(int i, int k) {
        SourceCode.range(i, k + 1).map(data::get).sum()
    }

}

总体感觉java.util.concurrent.RecursiveTask#compute方法写起来有点像递归,思路明确了以后还是很流畅的。

先来个高斯求和,下面是测试代码:

static void main(String[] args) {
    def pool = new ForkJoinPool(5)
    def t = new ForkJoinT(0, data.size() - 1)
    pool.submit(t)
    log.info("sum: {}", t.get())
}

控制台输出:

22:30:42.725 main sum: 5050

性能方面等我先研究一波 JMH 之后再来。

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册