在性能测试的实践当中,异步任务是离不开的。Java 异步编程提高了应用程序的性能和响应性,通过避免线程阻塞提高了资源利用率,并简化了并发编程的复杂性。改善用户体验,避免死锁和线程阻塞等问题。异步编程利用 CompletableFuture、Future 等工具和 API 简化了开发流程,提高了系统的稳定性和可靠性。

缘起

我也参照了 Go 语言的 go 关键字,自定义了 fun 关键字Java 自定义异步功能实践 。但是在使用过程中,遇到了一个略显尴尬的问题,就是如果当一个异步任务中,又增加一个异步任务,且使用集合点设置。那么就会阻塞线程池,导致大量任务阻塞的情况。

比如一个学校,200 个班级,每个班级有一个班主任,要给 30 个学生发作业,之后再报告作业分发已完成。按照之前的思路,我会包装两个异步且设置集合点的任务,伪代码如下:

static void main(String[] args) {
    200.times {
        fun {
            sleep(1.0)// 模拟业务处理
            pushHomework()// 布置作业
        }
    }

}

/**
 * 布置作业
 */
static void pushHomework() {
    FunPhaser phaser = new FunPhaser()// 创建同步屏障
    30.times {
        fun {
            sleep(1.0)// 模拟业务处理
            output("布置作业")
        } , phaser
    }
    phaser.await()// 等待所有作业布置完成
}

最终的结果就是,等于最大线程数的任务会阻塞在 pushHomework() 方法中,而 pushHomework() 方法需要完成的异步任务又全都等待在线程池的等待队列中。

初解

一开始我的思路采取优先级策略。如果区分任务的优先级,让有可能阻塞在等待队列的高优任务优先执行即可。所以我先想使用 java.util.concurrent.PriorityBlockingQueue 当做 java.util.concurrent.BlockingQueue 的实现当做异步线程池的等待队列。

但也无法解决问题,因为依然存在阻塞的问题,只不过概率变小了而已。看来不得不使用单独的异步线程池来实现了。

关于线程池的选择有两种选择:

  1. 选择最大线程数较小的线程池,只是作为辅助功能,防止阻塞。在普通异步任务执行时,优先执行高优任务,利用普通线程池优先执行高优任务。
  2. 选择最小线程数较大的线程池,大概率是缓存线程池。单独用来执行高优任务。同时也可以利用普通的线程池执行高优任务。

关于我的选择,也没有选择。根据实际的情况使用吧。高优任务的多少、需要限制的频率等等因素。我自己的项目用的是第二种,原因是我用到高优任务的机会不多,我可以在脚本中控制高优任务的数量。

方案

首先是线程池的实现代码:

priorityPool = createFixedPool(POOL_SIZE, "P")

创建时机放在了普通线程池中:

    /**
     * 获取异步任务连接池
     * @return
     */
    static ThreadPoolExecutor getFunPool() {
        if (asyncPool == null) {
            synchronized (ThreadPoolUtil.class) {
                if (asyncPool == null) {
                    asyncPool = createPool(POOL_SIZE, POOL_SIZE, ALIVE_TIME, new LinkedBlockingDeque<Runnable>(Constant.MAX_WAIT_TASK), getFactory("F"))
                    daemon()
                }
                priorityPool = createFixedPool(POOL_SIZE, "P")
//                priorityPool = createPool(1, POOL_MAX, ALIVE_TIME, new LinkedBlockingQueue<Runnable>(10), getFactory("P"), new ThreadPoolExecutor.DiscardOldestPolicy())
            }
        }
        return asyncPool
    }

下面是调用执行高优的异步任务的方法:

/**
 * 执行高优异步任务
 * @param runnable
 */
static void executeSyncPriority(Runnable runnable) {
    if (priorityPool == null) getFunPool()
    priorityPool.execute(runnable)
}

还有一个调用方法,用来普通线程池优先执行高优任务:

/**
 * 执行高优任务
 */
static void executePriority() {
    def locked = priorityLock.compareAndSet(false, true)//如果没有锁,则加锁
    if (locked) {//如果加锁成功
        while (priorityPool.getQueue().size() > 0) {
            def poll = priorityPool.getQueue().poll()
            def queue = (LinkedBlockingDeque<Runnable>) getFunPool().getQueue()
            if (poll != null) {
                queue.offerFirst(poll)
            }

        }
        priorityLock.set(false)//解锁
    }
}

这里用到了一个原子类,当做高优之行时候的锁 private static AtomicBoolean priorityLock = new AtomicBoolean(false) ,避免在这块浪费过多性能。这里没有 try-catch-finally ,此处没有使用,确实发生异常概率较小。

我重新修改了任务队列的实现,用的是 java.util.concurrent.LinkedBlockingDeque ,这样我就可以将高优任务直接插入到队列的最前头,可以优先执行高优任务。

对于异步关键字,我也进行了一些改动:

/**
 * 使用自定义同步器{@link FunPhaser}进行多线程同步
 *
 * @param f
 * @param phaser
 * @param log
 */
public static void fun(Closure f, FunPhaser phaser, boolean log) {
    if (phaser != null) phaser.register();
    ThreadPoolUtil.executeSync(() -> {
        try {
            ThreadPoolUtil.executePriority();
            f.call();
        } finally {
            if (phaser != null) {
                phaser.done();
                if (log) logger.info("async task {}", phaser.queryTaskNum());
            }
        }
    });
}

执行高优任务的关键字,我也进行了同样的封装,只不过换了个关键字和线程池:

/**
 * 提交高优任务
 *
 * @param f
 * @param phaser
 * @param log
 */
public static void funny(Closure f, FunPhaser phaser, boolean log) {
    if (phaser != null) phaser.register();
    ThreadPoolUtil.executeSyncPriority(() -> {
        try {
            f.call();
        } finally {
            if (phaser != null) {
                phaser.done();
                if (log) logger.info("priority async task {}", phaser.queryTaskNum());
            }
        }
    });
}

验证

我们修改一下开始的脚本:

static void main(String[] args) {
    setPoolMax(2)
    6.times {
        fun {
            sleep(1.0)// 模拟业务处理
            pushHomework()// 布置作业
        }
    }

}

/**
 * 布置作业
 */
static void pushHomework() {
    FunPhaser phaser = new FunPhaser()// 创建同步屏障
    4.times {
        fun {
            sleep(1.0)// 模拟业务处理
            output("布置作业")
        } , phaser
    }
    phaser.await()// 等待所有作业布置完成
}

执行的话,线程池的 F 线程全都全都是 TIME_WAITING 状态。当把 pushHomework() 方法改成高优关键字 funny 之后问题便可迎刃而解。

控制台输出如下:

22:47:17:160 P-1  布置作业
22:47:17:160 P-1  布置作业
22:47:17:160 P-1  priority async task 3
22:47:17:160 P-1  priority async task 4
22:47:18:178 F-2  布置作业
22:47:18:179 F-2  priority async task 3
22:47:19:183 F-2  布置作业

可以看出,已经开始有了 F 线程执行高优任务了。


↙↙↙阅读原文可查看相关链接,并与作者交流