在我进行 Java 编程实践当中,特别是高性能编程时,线程池是无法逾越的高山。在最近攀登高山的路途上,我又双叒叕掌握了一些优雅地使用线程池的技巧。
通常我会将异步任务丢给线程池去处理,不怎么会额外处理异步任务执行中报错。一般来说,任务执行报错,会终止当前的线程,这样线程池会创建新的线程执行下一个任务,当然是在需要创建线程和可以创建新线程的前提下。
在我最近一次实践当中,发现一个定长 20 的线程池,已经创建过上万个线程,这让我大呼不可能。仔细一想,最终也在日志当中确认了大量的异步任务报错。所以不得不让我开始研究如何处理线程池中异步任务的异常了。
以下是我的研究报告,诚邀各位共赏。
就我的水平而言,总计发现 5 种常见的异常处理方式。
try-catch
在提交异步任务之前,通常我们会对异步任务检查异常进行处理,但是对于诸如 java.lang.RuntimeException 的非检查异常不会做更多操作。
当我们提交异步任务的时候,可以增加一个 try-catch 处理的话,就可以完全 hold 住异步任务的可能抛出的异常。
在我的框架设计当中,提交异步任务有且仅一个入口,代码如下:
/
* 异步执行某个代码块
* Java调用需要return,Groovy也不需要,语法兼容
*
* @param f
*/
public static void fun(Closure f) {
fun(f, null, true);
}
/
* 使用自定义同步器{@link FunPhaser}进行多线程同步
*
* @param f 代码块
* @param phaser 同步器
*/
public static void fun(Closure f, FunPhaser phaser) {
fun(f, phaser, true);
}
/
* 使用自定义同步器{@link FunPhaser}进行多线程同步
*
* @param f
* @param phaser
* @param log
*/
public static void fun(Closure f, FunPhaser phaser, boolean log) {
if (phaser != null) phaser.register();
ThreadPoolUtil.executeSync(() -> {
try {
ThreadPoolUtil.executePriority(); //处理高优任务,可忽略
f.call();
} finally {
if (phaser != null) {
phaser.done();
if (log) logger.info("async task {}", phaser.queryTaskNum());
}
}
});
}
所以改造起来比较简单,只需要在最后的方法中,增加 catch 代码块即可。
/
* 使用自定义同步器{@link FunPhaser}进行多线程同步
* @param f
* @param phaser
* @param log
*/
public static void fun(Closure f, FunPhaser phaser, boolean log) {
if (phaser != null) phaser.register();
ThreadPoolUtil.executeSync(() -> {
try {
ThreadPoolUtil.executePriority();
f.call();
} catch (Exception e) {
logger.error("fun error", e);
} finally {
if (phaser != null) {
phaser.done();
if (log) logger.info("async task {}", phaser.queryTaskNum());
}
}
});
}
Callable
在 Java 中,Callable 是一种可以抛出受检异常(Checked Exception)的任务接口。这与 Runnable 的不同之处在于,Callable 能够返回结果,并允许在任务执行过程中抛出异常。异常处理通常在获取任务结果时完成,以下是一些常见的处理方式。
Callable 异常处理的特点:
- 异常机制:
-
Callable方法签名为V call() throws Exception,允许直接抛出Exception或其子类。 - 提交到线程池的
Callable任务,如果抛出异常,会被封装到ExecutionException中。
-
- 获取异常:
- 通过
Future.get()获取结果时,若任务抛出异常,则会引发ExecutionException。 - 开发者需要在调用
get()时捕获和处理ExecutionException。
- 通过
演示代码如下:
import java.util.concurrent.*;
public class CallableExceptionExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
Callable<String> task = () -> {
if (true) { // 模拟任务中出错
throw new Exception("Simulated Exception");
}
return "Task Completed";
};
Future<String> future = executor.submit(task);
try {
// 获取任务结果,可能抛出 ExecutionException
String result = future.get();
System.out.println(result);
} catch (ExecutionException e) {
System.out.println("Caught an ExecutionException: " + e.getCause().getMessage());
} catch (InterruptedException e) {
System.out.println("Task was interrupted");
Thread.currentThread().interrupt(); // 恢复中断状态
}
}
}
控制台输出:
Caught an ExecutionException: Simulated Exception
当 Callable 抛出异常时,线程池会捕获这个异常,并将其封装在 ExecutionException 中。如果任务在执行过程中被中断,会抛出 InterruptedException。建议在捕获时恢复线程的中断状态,以避免吞掉中断信号。
afterExecute()
在 Java 中,afterExecute() 是 ThreadPoolExecutor 提供的一个钩子方法,允许开发者在每个任务执行完成后执行一些额外的逻辑。它可以用来捕获线程池任务中抛出的运行时异常和其他异常,从而进行集中处理或记录。
afterExecute() 的作用:
- 触发时机:
- 每当线程池中某个任务完成后,无论是正常完成还是抛出异常,都会调用
afterExecute()。 - 默认实现为空,用户可以重写它以添加自定义行为。
- 每当线程池中某个任务完成后,无论是正常完成还是抛出异常,都会调用
- 异常捕获:
- 如果任务在执行过程中抛出异常(例如
RuntimeException或Error),它会作为参数传递给afterExecute()的Throwable参数。 - 需要手动从
Future中获取异常,或者在异常处理逻辑中记录。
- 如果任务在执行过程中抛出异常(例如
演示案例如下:
import java.util.concurrent.*;
public class AfterExecuteExample {
public static void main(String[] args) {
ThreadPoolExecutor executor = new CustomThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
// 提交一个会抛出异常的任务
executor.submit(() -> {
System.out.println("Task started");
throw new RuntimeException("Task failed with exception");
});
executor.shutdown();
}
static class CustomThreadPoolExecutor extends ThreadPoolExecutor {
public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t); // 调用父类实现以确保正常行为
// 处理任务异常
if (t != null) {
System.err.println("Task threw an exception: " + t.getMessage());
} else if (r instanceof Future<?>) {
try {
((Future<?>) r).get(); // 获取任务执行结果或捕获异常
} catch (CancellationException e) {
System.err.println("Task was cancelled");
} catch (ExecutionException e) {
System.err.println("Task threw an exception: " + e.getCause().getMessage());
}
}
}
}
}
最终控制台输出:
Task started
Task threw an exception: Task failed with exception
如果需要在任务开始和结束时都执行逻辑,可以同时重写 beforeExecute() 和 afterExecute()。重写此方法时,建议注意线程中断信号的恢复,并确保异常记录逻辑不会引发额外的错误。
自定义 ThreadFactory
在 Java 中,如果需要自定义线程的异常处理行为,可以通过 自定义 ThreadFactory 创建线程并设置异常处理策略。线程的异常处理主要依赖于 Thread.UncaughtExceptionHandler 接口,该接口用于处理线程运行时未捕获的异常。
步骤概览:
- 创建自定义
ThreadFactory:- 实现
ThreadFactory接口,定制线程的创建逻辑。 - 在创建线程时,设置自定义的
UncaughtExceptionHandler。
- 实现
- 实现异常处理逻辑:
- 使用
java.lang.Thread#setUncaughtExceptionHandler,定义异常处理行为(如日志记录、发送警报等)。
- 使用
- 将自定义
ThreadFactory应用于线程池:- 在创建线程池时,通过
Executors或ThreadPoolExecutor使用自定义的ThreadFactory。
- 在创建线程池时,通过
import java.util.concurrent.*;
public class CustomThreadFactoryExample {
public static void main(String[] args) {
// 使用自定义线程工厂创建线程池
ExecutorService executor = Executors.newFixedThreadPool(2, new CustomThreadFactory());
// 提交任务
executor.submit(() -> {
System.out.println("Task 1 started");
throw new RuntimeException("Task 1 encountered an error");
});
executor.submit(() -> System.out.println("Task 2 completed"));
executor.shutdown();
}
// 自定义 ThreadFactory 实现
static class CustomThreadFactory implements ThreadFactory {
private int threadId = 0;
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("CustomThread-" + threadId++);
thread.setUncaughtExceptionHandler(new CustomExceptionHandler());
return thread;
}
}
// 自定义异常处理器
static class CustomExceptionHandler implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.err.println("Thread " + t.getName() + " threw an exception: " + e.getMessage());
// 这里可以添加更多处理逻辑,例如日志记录或警报通知
}
}
}
控制台输出:
Task 1 started
Thread CustomThread-0 threw an exception: Task 1 encountered an error
Task 2 completed
使用 Executors.newFixedThreadPool() 并传入自定义的 ThreadFactory,让线程池中的每个线程具备统一的异常处理行为。也可以通过线程标识为每个线程设置了自定义的 UncaughtExceptionHandler。
全局异常处理
在 Java 中,Thread.setDefaultUncaughtExceptionHandler 是一个全局异常处理机制,用于处理所有未被捕获的线程异常。与每个线程单独设置的 Thread.setUncaughtExceptionHandler 不同,setDefaultUncaughtExceptionHandler 提供了一个全局级别的异常处理器,适用于所有线程(除非线程单独设置了自己的处理器)。
Thread.setDefaultUncaughtExceptionHandler 方法作用:
- 用于设置一个全局的默认未捕获异常处理器。
- 如果某个线程未显式设置自己的
UncaughtExceptionHandler,则会使用这个默认处理器。 - 通常用于记录日志、发送报警等全局异常处理逻辑。
演示代码如下:
public class DefaultExceptionHandlerExample {
public static void main(String[] args) {
// 设置全局默认异常处理器
Thread.setDefaultUncaughtExceptionHandler((thread, throwable) -> {
System.err.println("Unhandled exception in thread: " + thread.getName());
System.err.println("Exception: " + throwable.getMessage());
throwable.printStackTrace();
});
// 创建一个线程,抛出未捕获的异常
Thread thread1 = new Thread(() -> {
throw new RuntimeException("Thread 1 failed!");
});
thread1.start();
// 创建另一个线程,也抛出未捕获的异常
Thread thread2 = new Thread(() -> {
throw new RuntimeException("Thread 2 encountered an error!");
});
thread2.start();
}
}
控制台如下:
Unhandled exception in thread: Thread-0
Exception: Thread 1 failed!
java.lang.RuntimeException: Thread 1 failed!
at ...
Unhandled exception in thread: Thread-1
Exception: Thread 2 encountered an error!
java.lang.RuntimeException: Thread 2 encountered an error!
at ...
全局异常处理只针对主线程和未显式设置 UncaughtExceptionHandler 的其他线程生效。
异常处理的优先级:
- 如果线程显式设置了自己的
UncaughtExceptionHandler(通过thread.setUncaughtExceptionHandler),那么会优先调用该处理器。 - 如果线程未设置单独的处理器,则调用全局默认处理器。
- 如果没有设置全局默认处理器,未捕获的异常将打印到标准错误输出流。
如果主线程抛出异常,Thread.setDefaultUncaughtExceptionHandler 无法捕获它。需要在 main 方法中显式处理。如果使用线程池(如 ExecutorService),未捕获的异常通常会被封装为 ExecutionException,不会触发默认处理器。需要使用 Future.get() 或重写 afterExecute() 来处理线程池的任务异常。
FunTester 原创精华