在之前的 Java 线程池实践当中,我遇到了任务优先级的问题。最终采取的方案是新增一个线程池作为执行高优任务,然后将普通线程池的在执行任务执行,先去判断高优线程池是否有等待任务,如果有就先执行高优线程池等待队列中的任务。

虽然后期给普通异步线程池增加了双向链表,直接采取插队模式执行,但是也让异步任务更加复杂了。所以经过一些 java.util.concurrent 包的重新学习,得到了最优的答案,就是 java.util.concurrent.PriorityBlockingQueue

PriorityBlockingQueue 简介

java.util.concurrent.PriorityBlockingQueue 是 Java 并发包中的一个线程安全的优先级阻塞队列。它是基于优先级的元素顺序,具有以下特点:

  1. 线程安全: PriorityBlockingQueue 是线程安全的,可以在多线程环境下安全地进行操作,而不需要额外的同步手段。
  2. 无界队列: PriorityBlockingQueue 是一个无界队列,可以无限制地添加元素,因此不会因为队列满而阻塞生产者线程。
  3. 基于优先级的元素顺序: PriorityBlockingQueue 中的元素按照优先级顺序进行排序,具有较高优先级的元素会被优先出队。默认情况下,元素需要实现 Comparable 接口来定义它们的优先级,也可以在构造函数中提供一个 Comparator 来自定义优先级顺序。
  4. 阻塞操作: 当队列为空时,从 PriorityBlockingQueue 中获取元素的操作会阻塞线程,直到队列中有元素可用;当队列满时,向 PriorityBlockingQueue 中添加元素的操作也会阻塞线程,直到队列有足够的空间。
  5. 不支持空元素: PriorityBlockingQueue 不支持添加空元素,即元素不能为 null。

PriorityBlockingQueue 可以用于实现基于优先级的任务调度、事件处理等场景,其中优先级高的任务或事件会优先被处理。它提供了一种高效的方式来管理和处理具有不同优先级的元素。

多优先级线程池

下面是我自己的实现逻辑:

  1. 首先创建一个功能类,实现 java.lang.Comparablejava.lang.Runnable
  2. 制定优先级规则,通常定义一个属性 int 类型,代表优点等级。
  3. 封装 execute() 方法,用来向线程池提交任务。

具体代码如下:

/**  
 * 多优先级线程池  
 */  
static ThreadPoolExecutor levelPool = createPool(POOL_SIZE, POOL_SIZE, ALIVE_TIME, new PriorityBlockingQueue<PriorityTask>(), getFactory("L"), new ThreadPoolExecutor.DiscardPolicy())  


/**  
 * 执行优先级任务  
 * @param task  
 * @return  
 */  
static def executeLevel(PriorityTask task) {  
    levelPool.execute(task)  
}  

/**  
 * 执行优先级任务,设定优先级  
 * @param priority  
 * @param closure  
 * @return  
 */  
static def executeLevel(int priority, Closure closure) {  
    levelPool.execute(new PriorityTask(priority) {  

        @Override  
        void run() {  
            closure()  
        }  
    })  
}  

/**  
 * 执行优先级任务,使用默认优先级  
 * @param closure  
 * @return  
 */  
static def executeLevel(Closure closure) {  
    levelPool.execute(new PriorityTask(PRIORITY_LEVEL_DEFAULT) {  

        @Override  
        void run() {  
            closure()  
        }  
    })  
}  

/**  
 * 优先级任务,用于优先级线程池  
 */  
static abstract class PriorityTask implements Runnable, Comparable<PriorityTask> {  

    int priority  

    PriorityTask(int priority) {  
        this.priority = priority  
    }  


    /**  
     * 比较方法,用于优先级队列,优先级越高,越先执行  
     * @param o  
     * @return  
     */  
    @Override  
    int compareTo(PriorityTask o) {  
        return this.priority - o.priority  
    }  
}

测试

我们来写一个测试用例,往线程池提交优先级不断提升的任务,打印任务执行时间。

ThreadPoolUtil.getLevelPool().setCorePoolSize(1);  
ThreadPoolUtil.getLevelPool().setMaximumPoolSize(1);  
for (int i = 0; i < 10; i++) {  
    ThreadPoolUtil.executeLevel(new ThreadPoolUtil.PriorityTask(10 - i) {  
        @Override  
        public void run() {  
            SourceCode.sleep(0.01);  
            System.out.println(this.getPriority());  
        }  
    });  
}

控制台打印:

10
1
2
3
4
5
6
7
8
9

看起来是比较符合预期的。


↙↙↙阅读原文可查看相关链接,并与作者交流