在性能测试的实践当中,异步任务是离不开的。Java 异步编程提高了应用程序的性能和响应性,通过避免线程阻塞提高了资源利用率,并简化了并发编程的复杂性。改善用户体验,避免死锁和线程阻塞等问题。异步编程利用 CompletableFuture、Future 等工具和 API 简化了开发流程,提高了系统的稳定性和可靠性。
缘起
我也参照了 Go
语言的 go
关键字,自定义了 fun
关键字Java 自定义异步功能实践 。但是在使用过程中,遇到了一个略显尴尬的问题,就是如果当一个异步任务中,又增加一个异步任务,且使用集合点设置。那么就会阻塞线程池,导致大量任务阻塞的情况。
比如一个学校,200 个班级,每个班级有一个班主任,要给 30 个学生发作业,之后再报告作业分发已完成。按照之前的思路,我会包装两个异步且设置集合点的任务,伪代码如下:
static void main(String[] args) {
200.times {
fun {
sleep(1.0)// 模拟业务处理
pushHomework()// 布置作业
}
}
}
/**
* 布置作业
*/
static void pushHomework() {
FunPhaser phaser = new FunPhaser()// 创建同步屏障
30.times {
fun {
sleep(1.0)// 模拟业务处理
output("布置作业")
} , phaser
}
phaser.await()// 等待所有作业布置完成
}
最终的结果就是,等于最大线程数的任务会阻塞在 pushHomework()
方法中,而 pushHomework()
方法需要完成的异步任务又全都等待在线程池的等待队列中。
初解
一开始我的思路采取优先级策略。如果区分任务的优先级,让有可能阻塞在等待队列的高优任务优先执行即可。所以我先想使用 java.util.concurrent.PriorityBlockingQueue
当做 java.util.concurrent.BlockingQueue
的实现当做异步线程池的等待队列。
但也无法解决问题,因为依然存在阻塞的问题,只不过概率变小了而已。看来不得不使用单独的异步线程池来实现了。
关于线程池的选择有两种选择:
- 选择最大线程数较小的线程池,只是作为辅助功能,防止阻塞。在普通异步任务执行时,优先执行高优任务,利用普通线程池优先执行高优任务。
- 选择最小线程数较大的线程池,大概率是缓存线程池。单独用来执行高优任务。同时也可以利用普通的线程池执行高优任务。
关于我的选择,也没有选择。根据实际的情况使用吧。高优任务的多少、需要限制的频率等等因素。我自己的项目用的是第二种,原因是我用到高优任务的机会不多,我可以在脚本中控制高优任务的数量。
方案
首先是线程池的实现代码:
priorityPool = createFixedPool(POOL_SIZE, "P")
创建时机放在了普通线程池中:
/**
* 获取异步任务连接池
* @return
*/
static ThreadPoolExecutor getFunPool() {
if (asyncPool == null) {
synchronized (ThreadPoolUtil.class) {
if (asyncPool == null) {
asyncPool = createPool(POOL_SIZE, POOL_SIZE, ALIVE_TIME, new LinkedBlockingDeque<Runnable>(Constant.MAX_WAIT_TASK), getFactory("F"))
daemon()
}
priorityPool = createFixedPool(POOL_SIZE, "P")
// priorityPool = createPool(1, POOL_MAX, ALIVE_TIME, new LinkedBlockingQueue<Runnable>(10), getFactory("P"), new ThreadPoolExecutor.DiscardOldestPolicy())
}
}
return asyncPool
}
下面是调用执行高优的异步任务的方法:
/**
* 执行高优异步任务
* @param runnable
*/
static void executeSyncPriority(Runnable runnable) {
if (priorityPool == null) getFunPool()
priorityPool.execute(runnable)
}
还有一个调用方法,用来普通线程池优先执行高优任务:
/**
* 执行高优任务
*/
static void executePriority() {
def locked = priorityLock.compareAndSet(false, true)//如果没有锁,则加锁
if (locked) {//如果加锁成功
while (priorityPool.getQueue().size() > 0) {
def poll = priorityPool.getQueue().poll()
def queue = (LinkedBlockingDeque<Runnable>) getFunPool().getQueue()
if (poll != null) {
queue.offerFirst(poll)
}
}
priorityLock.set(false)//解锁
}
}
这里用到了一个原子类,当做高优之行时候的锁 private static AtomicBoolean priorityLock = new AtomicBoolean(false)
,避免在这块浪费过多性能。这里没有 try-catch-finally
,此处没有使用,确实发生异常概率较小。
我重新修改了任务队列的实现,用的是 java.util.concurrent.LinkedBlockingDeque
,这样我就可以将高优任务直接插入到队列的最前头,可以优先执行高优任务。
对于异步关键字,我也进行了一些改动:
/**
* 使用自定义同步器{@link FunPhaser}进行多线程同步
*
* @param f
* @param phaser
* @param log
*/
public static void fun(Closure f, FunPhaser phaser, boolean log) {
if (phaser != null) phaser.register();
ThreadPoolUtil.executeSync(() -> {
try {
ThreadPoolUtil.executePriority();
f.call();
} finally {
if (phaser != null) {
phaser.done();
if (log) logger.info("async task {}", phaser.queryTaskNum());
}
}
});
}
执行高优任务的关键字,我也进行了同样的封装,只不过换了个关键字和线程池:
/**
* 提交高优任务
*
* @param f
* @param phaser
* @param log
*/
public static void funny(Closure f, FunPhaser phaser, boolean log) {
if (phaser != null) phaser.register();
ThreadPoolUtil.executeSyncPriority(() -> {
try {
f.call();
} finally {
if (phaser != null) {
phaser.done();
if (log) logger.info("priority async task {}", phaser.queryTaskNum());
}
}
});
}
验证
我们修改一下开始的脚本:
static void main(String[] args) {
setPoolMax(2)
6.times {
fun {
sleep(1.0)// 模拟业务处理
pushHomework()// 布置作业
}
}
}
/**
* 布置作业
*/
static void pushHomework() {
FunPhaser phaser = new FunPhaser()// 创建同步屏障
4.times {
fun {
sleep(1.0)// 模拟业务处理
output("布置作业")
} , phaser
}
phaser.await()// 等待所有作业布置完成
}
执行的话,线程池的 F
线程全都全都是 TIME_WAITING
状态。当把 pushHomework()
方法改成高优关键字 funny
之后问题便可迎刃而解。
控制台输出如下:
22:47:17:160 P-1 布置作业
22:47:17:160 P-1 布置作业
22:47:17:160 P-1 priority async task 3
22:47:17:160 P-1 priority async task 4
22:47:18:178 F-2 布置作业
22:47:18:179 F-2 priority async task 3
22:47:19:183 F-2 布置作业
可以看出,已经开始有了 F
线程执行高优任务了。