在之前的 Java 线程池实践当中,我遇到了任务优先级的问题。最终采取的方案是新增一个线程池作为执行高优任务,然后将普通线程池的在执行任务执行,先去判断高优线程池是否有等待任务,如果有就先执行高优线程池等待队列中的任务。
虽然后期给普通异步线程池增加了双向链表,直接采取插队模式执行,但是也让异步任务更加复杂了。所以经过一些 java.util.concurrent
包的重新学习,得到了最优的答案,就是 java.util.concurrent.PriorityBlockingQueue
。
java.util.concurrent.PriorityBlockingQueue
是 Java 并发包中的一个线程安全的优先级阻塞队列。它是基于优先级的元素顺序,具有以下特点:
PriorityBlockingQueue
是线程安全的,可以在多线程环境下安全地进行操作,而不需要额外的同步手段。PriorityBlockingQueue
是一个无界队列,可以无限制地添加元素,因此不会因为队列满而阻塞生产者线程。PriorityBlockingQueue
中的元素按照优先级顺序进行排序,具有较高优先级的元素会被优先出队。默认情况下,元素需要实现 Comparable
接口来定义它们的优先级,也可以在构造函数中提供一个 Comparator
来自定义优先级顺序。PriorityBlockingQueue
中获取元素的操作会阻塞线程,直到队列中有元素可用;当队列满时,向 PriorityBlockingQueue
中添加元素的操作也会阻塞线程,直到队列有足够的空间。PriorityBlockingQueue
不支持添加空元素,即元素不能为 null。PriorityBlockingQueue
可以用于实现基于优先级的任务调度、事件处理等场景,其中优先级高的任务或事件会优先被处理。它提供了一种高效的方式来管理和处理具有不同优先级的元素。
下面是我自己的实现逻辑:
java.lang.Comparable
和 java.lang.Runnable
。int
类型,代表优点等级。具体代码如下:
/**
* 多优先级线程池
*/
static ThreadPoolExecutor levelPool = createPool(POOL_SIZE, POOL_SIZE, ALIVE_TIME, new PriorityBlockingQueue<PriorityTask>(), getFactory("L"), new ThreadPoolExecutor.DiscardPolicy())
/**
* 执行优先级任务
* @param task
* @return
*/
static def executeLevel(PriorityTask task) {
levelPool.execute(task)
}
/**
* 执行优先级任务,设定优先级
* @param priority
* @param closure
* @return
*/
static def executeLevel(int priority, Closure closure) {
levelPool.execute(new PriorityTask(priority) {
@Override
void run() {
closure()
}
})
}
/**
* 执行优先级任务,使用默认优先级
* @param closure
* @return
*/
static def executeLevel(Closure closure) {
levelPool.execute(new PriorityTask(PRIORITY_LEVEL_DEFAULT) {
@Override
void run() {
closure()
}
})
}
/**
* 优先级任务,用于优先级线程池
*/
static abstract class PriorityTask implements Runnable, Comparable<PriorityTask> {
int priority
PriorityTask(int priority) {
this.priority = priority
}
/**
* 比较方法,用于优先级队列,优先级越高,越先执行
* @param o
* @return
*/
@Override
int compareTo(PriorityTask o) {
return this.priority - o.priority
}
}
我们来写一个测试用例,往线程池提交优先级不断提升的任务,打印任务执行时间。
ThreadPoolUtil.getLevelPool().setCorePoolSize(1);
ThreadPoolUtil.getLevelPool().setMaximumPoolSize(1);
for (int i = 0; i < 10; i++) {
ThreadPoolUtil.executeLevel(new ThreadPoolUtil.PriorityTask(10 - i) {
@Override
public void run() {
SourceCode.sleep(0.01);
System.out.println(this.getPriority());
}
});
}
控制台打印:
10
1
2
3
4
5
6
7
8
9
看起来是比较符合预期的。