在我进行 Java
编程实践当中,特别是高性能编程时,线程池是无法逾越的高山。在最近攀登高山的路途上,我又双叒叕掌握了一些优雅地使用线程池的技巧。
通常我会将异步任务丢给线程池去处理,不怎么会额外处理异步任务执行中报错。一般来说,任务执行报错,会终止当前的线程,这样线程池会创建新的线程执行下一个任务,当然是在需要创建线程和可以创建新线程的前提下。
在我最近一次实践当中,发现一个定长 20 的线程池,已经创建过上万个线程,这让我大呼不可能。仔细一想,最终也在日志当中确认了大量的异步任务报错。所以不得不让我开始研究如何处理线程池中异步任务的异常了。
以下是我的研究报告,诚邀各位共赏。
就我的水平而言,总计发现 5 种常见的异常处理方式。
在提交异步任务之前,通常我们会对异步任务检查异常进行处理,但是对于诸如 java.lang.RuntimeException
的非检查异常不会做更多操作。
当我们提交异步任务的时候,可以增加一个 try-catch
处理的话,就可以完全 hold 住异步任务的可能抛出的异常。
在我的框架设计当中,提交异步任务有且仅一个入口,代码如下:
/
* 异步执行某个代码块
* Java调用需要return,Groovy也不需要,语法兼容
*
* @param f
*/
public static void fun(Closure f) {
fun(f, null, true);
}
/
* 使用自定义同步器{@link FunPhaser}进行多线程同步
*
* @param f 代码块
* @param phaser 同步器
*/
public static void fun(Closure f, FunPhaser phaser) {
fun(f, phaser, true);
}
/
* 使用自定义同步器{@link FunPhaser}进行多线程同步
*
* @param f
* @param phaser
* @param log
*/
public static void fun(Closure f, FunPhaser phaser, boolean log) {
if (phaser != null) phaser.register();
ThreadPoolUtil.executeSync(() -> {
try {
ThreadPoolUtil.executePriority(); //处理高优任务,可忽略
f.call();
} finally {
if (phaser != null) {
phaser.done();
if (log) logger.info("async task {}", phaser.queryTaskNum());
}
}
});
}
所以改造起来比较简单,只需要在最后的方法中,增加 catch
代码块即可。
/
* 使用自定义同步器{@link FunPhaser}进行多线程同步
* @param f
* @param phaser
* @param log
*/
public static void fun(Closure f, FunPhaser phaser, boolean log) {
if (phaser != null) phaser.register();
ThreadPoolUtil.executeSync(() -> {
try {
ThreadPoolUtil.executePriority();
f.call();
} catch (Exception e) {
logger.error("fun error", e);
} finally {
if (phaser != null) {
phaser.done();
if (log) logger.info("async task {}", phaser.queryTaskNum());
}
}
});
}
在 Java 中,Callable
是一种可以抛出受检异常(Checked Exception)的任务接口。这与 Runnable
的不同之处在于,Callable
能够返回结果,并允许在任务执行过程中抛出异常。异常处理通常在获取任务结果时完成,以下是一些常见的处理方式。
Callable 异常处理的特点:
Callable
方法签名为 V call() throws Exception
,允许直接抛出 Exception
或其子类。Callable
任务,如果抛出异常,会被封装到 ExecutionException
中。Future.get()
获取结果时,若任务抛出异常,则会引发 ExecutionException
。get()
时捕获和处理 ExecutionException
。演示代码如下:
import java.util.concurrent.*;
public class CallableExceptionExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
Callable<String> task = () -> {
if (true) { // 模拟任务中出错
throw new Exception("Simulated Exception");
}
return "Task Completed";
};
Future<String> future = executor.submit(task);
try {
// 获取任务结果,可能抛出 ExecutionException
String result = future.get();
System.out.println(result);
} catch (ExecutionException e) {
System.out.println("Caught an ExecutionException: " + e.getCause().getMessage());
} catch (InterruptedException e) {
System.out.println("Task was interrupted");
Thread.currentThread().interrupt(); // 恢复中断状态
}
}
}
控制台输出:
Caught an ExecutionException: Simulated Exception
当 Callable
抛出异常时,线程池会捕获这个异常,并将其封装在 ExecutionException
中。如果任务在执行过程中被中断,会抛出 InterruptedException
。建议在捕获时恢复线程的中断状态,以避免吞掉中断信号。
在 Java 中,afterExecute()
是 ThreadPoolExecutor
提供的一个钩子方法,允许开发者在每个任务执行完成后执行一些额外的逻辑。它可以用来捕获线程池任务中抛出的运行时异常和其他异常,从而进行集中处理或记录。
afterExecute()
的作用:
afterExecute()
。RuntimeException
或 Error
),它会作为参数传递给 afterExecute()
的 Throwable
参数。Future
中获取异常,或者在异常处理逻辑中记录。演示案例如下:
import java.util.concurrent.*;
public class AfterExecuteExample {
public static void main(String[] args) {
ThreadPoolExecutor executor = new CustomThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
// 提交一个会抛出异常的任务
executor.submit(() -> {
System.out.println("Task started");
throw new RuntimeException("Task failed with exception");
});
executor.shutdown();
}
static class CustomThreadPoolExecutor extends ThreadPoolExecutor {
public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t); // 调用父类实现以确保正常行为
// 处理任务异常
if (t != null) {
System.err.println("Task threw an exception: " + t.getMessage());
} else if (r instanceof Future<?>) {
try {
((Future<?>) r).get(); // 获取任务执行结果或捕获异常
} catch (CancellationException e) {
System.err.println("Task was cancelled");
} catch (ExecutionException e) {
System.err.println("Task threw an exception: " + e.getCause().getMessage());
}
}
}
}
}
最终控制台输出:
Task started
Task threw an exception: Task failed with exception
如果需要在任务开始和结束时都执行逻辑,可以同时重写 beforeExecute()
和 afterExecute()
。重写此方法时,建议注意线程中断信号的恢复,并确保异常记录逻辑不会引发额外的错误。
在 Java 中,如果需要自定义线程的异常处理行为,可以通过 自定义 ThreadFactory
创建线程并设置异常处理策略。线程的异常处理主要依赖于 Thread.UncaughtExceptionHandler
接口,该接口用于处理线程运行时未捕获的异常。
步骤概览:
ThreadFactory
:
ThreadFactory
接口,定制线程的创建逻辑。UncaughtExceptionHandler
。java.lang.Thread#setUncaughtExceptionHandler
,定义异常处理行为(如日志记录、发送警报等)。ThreadFactory
应用于线程池:
Executors
或 ThreadPoolExecutor
使用自定义的 ThreadFactory
。import java.util.concurrent.*;
public class CustomThreadFactoryExample {
public static void main(String[] args) {
// 使用自定义线程工厂创建线程池
ExecutorService executor = Executors.newFixedThreadPool(2, new CustomThreadFactory());
// 提交任务
executor.submit(() -> {
System.out.println("Task 1 started");
throw new RuntimeException("Task 1 encountered an error");
});
executor.submit(() -> System.out.println("Task 2 completed"));
executor.shutdown();
}
// 自定义 ThreadFactory 实现
static class CustomThreadFactory implements ThreadFactory {
private int threadId = 0;
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("CustomThread-" + threadId++);
thread.setUncaughtExceptionHandler(new CustomExceptionHandler());
return thread;
}
}
// 自定义异常处理器
static class CustomExceptionHandler implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.err.println("Thread " + t.getName() + " threw an exception: " + e.getMessage());
// 这里可以添加更多处理逻辑,例如日志记录或警报通知
}
}
}
控制台输出:
Task 1 started
Thread CustomThread-0 threw an exception: Task 1 encountered an error
Task 2 completed
使用 Executors.newFixedThreadPool()
并传入自定义的 ThreadFactory
,让线程池中的每个线程具备统一的异常处理行为。也可以通过线程标识为每个线程设置了自定义的 UncaughtExceptionHandler
。
在 Java 中,Thread.setDefaultUncaughtExceptionHandler
是一个全局异常处理机制,用于处理所有未被捕获的线程异常。与每个线程单独设置的 Thread.setUncaughtExceptionHandler
不同,setDefaultUncaughtExceptionHandler
提供了一个全局级别的异常处理器,适用于所有线程(除非线程单独设置了自己的处理器)。
Thread.setDefaultUncaughtExceptionHandler
方法作用:
UncaughtExceptionHandler
,则会使用这个默认处理器。演示代码如下:
public class DefaultExceptionHandlerExample {
public static void main(String[] args) {
// 设置全局默认异常处理器
Thread.setDefaultUncaughtExceptionHandler((thread, throwable) -> {
System.err.println("Unhandled exception in thread: " + thread.getName());
System.err.println("Exception: " + throwable.getMessage());
throwable.printStackTrace();
});
// 创建一个线程,抛出未捕获的异常
Thread thread1 = new Thread(() -> {
throw new RuntimeException("Thread 1 failed!");
});
thread1.start();
// 创建另一个线程,也抛出未捕获的异常
Thread thread2 = new Thread(() -> {
throw new RuntimeException("Thread 2 encountered an error!");
});
thread2.start();
}
}
控制台如下:
Unhandled exception in thread: Thread-0
Exception: Thread 1 failed!
java.lang.RuntimeException: Thread 1 failed!
at ...
Unhandled exception in thread: Thread-1
Exception: Thread 2 encountered an error!
java.lang.RuntimeException: Thread 2 encountered an error!
at ...
全局异常处理只针对主线程和未显式设置 UncaughtExceptionHandler
的其他线程生效。
异常处理的优先级:
UncaughtExceptionHandler
(通过 thread.setUncaughtExceptionHandler
),那么会优先调用该处理器。如果主线程抛出异常,Thread.setDefaultUncaughtExceptionHandler
无法捕获它。需要在 main
方法中显式处理。如果使用线程池(如 ExecutorService
),未捕获的异常通常会被封装为 ExecutionException
,不会触发默认处理器。需要使用 Future.get()
或重写 afterExecute()
来处理线程池的任务异常。
FunTester 原创精华