在我进行 Java 编程实践当中,特别是高性能编程时,线程池是无法逾越的高山。在最近攀登高山的路途上,我又双叒叕掌握了一些优雅地使用线程池的技巧。

通常我会将异步任务丢给线程池去处理,不怎么会额外处理异步任务执行中报错。一般来说,任务执行报错,会终止当前的线程,这样线程池会创建新的线程执行下一个任务,当然是在需要创建线程和可以创建新线程的前提下。

在我最近一次实践当中,发现一个定长 20 的线程池,已经创建过上万个线程,这让我大呼不可能。仔细一想,最终也在日志当中确认了大量的异步任务报错。所以不得不让我开始研究如何处理线程池中异步任务的异常了。

以下是我的研究报告,诚邀各位共赏。

就我的水平而言,总计发现 5 种常见的异常处理方式。

try-catch

在提交异步任务之前,通常我们会对异步任务检查异常进行处理,但是对于诸如 java.lang.RuntimeException 的非检查异常不会做更多操作。

当我们提交异步任务的时候,可以增加一个 try-catch 处理的话,就可以完全 hold 住异步任务的可能抛出的异常。

在我的框架设计当中,提交异步任务有且仅一个入口,代码如下:

/  
 * 异步执行某个代码块  
 * Java调用需要return,Groovy也不需要,语法兼容  
 *  
 * @param f  
 */  
public static void fun(Closure f) {  
    fun(f, null, true);  
}

/  
 * 使用自定义同步器{@link FunPhaser}进行多线程同步  
 *  
 * @param f      代码块  
 * @param phaser 同步器  
 */  
public static void fun(Closure f, FunPhaser phaser) {  
    fun(f, phaser, true);  
}  

/  
 * 使用自定义同步器{@link FunPhaser}进行多线程同步  
 *  
 * @param f  
 * @param phaser  
 * @param log  
 */  
public static void fun(Closure f, FunPhaser phaser, boolean log) {  
    if (phaser != null) phaser.register();  
    ThreadPoolUtil.executeSync(() -> {  
        try {  
            ThreadPoolUtil.executePriority();  //处理高优任务,可忽略
            f.call();  
        } finally {  
            if (phaser != null) {  
                phaser.done();  
                if (log) logger.info("async task {}", phaser.queryTaskNum());  
            }  
        }  
    });  
}

所以改造起来比较简单,只需要在最后的方法中,增加 catch 代码块即可。

/  
 * 使用自定义同步器{@link FunPhaser}进行多线程同步  
 * @param f  
 * @param phaser  
 * @param log  
 */  
public static void fun(Closure f, FunPhaser phaser, boolean log) {  
    if (phaser != null) phaser.register();  
    ThreadPoolUtil.executeSync(() -> {  
        try {  
            ThreadPoolUtil.executePriority();  
            f.call();  
        } catch (Exception e) {  
            logger.error("fun error", e);  
        } finally {  
            if (phaser != null) {  
                phaser.done();  
                if (log) logger.info("async task {}", phaser.queryTaskNum());  
            }  
        }  
    });  
}

Callable

在 Java 中,Callable 是一种可以抛出受检异常(Checked Exception)的任务接口。这与 Runnable 的不同之处在于,Callable 能够返回结果,并允许在任务执行过程中抛出异常。异常处理通常在获取任务结果时完成,以下是一些常见的处理方式。

Callable 异常处理的特点:

  1. 异常机制:
  2. 获取异常:

演示代码如下:

import java.util.concurrent.*;

public class CallableExceptionExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        Callable<String> task = () -> {
            if (true) {  // 模拟任务中出错
                throw new Exception("Simulated Exception");
            }
            return "Task Completed";
        };
        Future<String> future = executor.submit(task);
        try {
            // 获取任务结果,可能抛出 ExecutionException
            String result = future.get();
            System.out.println(result);
        } catch (ExecutionException e) {
            System.out.println("Caught an ExecutionException: " + e.getCause().getMessage());
        } catch (InterruptedException e) {
            System.out.println("Task was interrupted");
            Thread.currentThread().interrupt();  // 恢复中断状态
        }
    }
}

控制台输出:

Caught an ExecutionException: Simulated Exception

Callable 抛出异常时,线程池会捕获这个异常,并将其封装在 ExecutionException 中。如果任务在执行过程中被中断,会抛出 InterruptedException。建议在捕获时恢复线程的中断状态,以避免吞掉中断信号。

afterExecute()

在 Java 中,afterExecute()ThreadPoolExecutor 提供的一个钩子方法,允许开发者在每个任务执行完成后执行一些额外的逻辑。它可以用来捕获线程池任务中抛出的运行时异常和其他异常,从而进行集中处理或记录。

afterExecute() 的作用:

  1. 触发时机:
  2. 异常捕获:

演示案例如下:

import java.util.concurrent.*;

public class AfterExecuteExample {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new CustomThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        // 提交一个会抛出异常的任务
        executor.submit(() -> {
            System.out.println("Task started");
            throw new RuntimeException("Task failed with exception");
        });

        executor.shutdown();
    }

    static class CustomThreadPoolExecutor extends ThreadPoolExecutor {
        public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t); // 调用父类实现以确保正常行为

            // 处理任务异常
            if (t != null) {
                System.err.println("Task threw an exception: " + t.getMessage());
            } else if (r instanceof Future<?>) {
                try {
                    ((Future<?>) r).get(); // 获取任务执行结果或捕获异常
                } catch (CancellationException e) {
                    System.err.println("Task was cancelled");
                } catch (ExecutionException e) {
                    System.err.println("Task threw an exception: " + e.getCause().getMessage());
                }
            }
        }
    }
}

最终控制台输出:

Task started
Task threw an exception: Task failed with exception

如果需要在任务开始和结束时都执行逻辑,可以同时重写 beforeExecute()afterExecute()。重写此方法时,建议注意线程中断信号的恢复,并确保异常记录逻辑不会引发额外的错误。

自定义 ThreadFactory

在 Java 中,如果需要自定义线程的异常处理行为,可以通过 自定义 ThreadFactory 创建线程并设置异常处理策略。线程的异常处理主要依赖于 Thread.UncaughtExceptionHandler 接口,该接口用于处理线程运行时未捕获的异常。

步骤概览:

  1. 创建自定义 ThreadFactory
  2. 实现异常处理逻辑:
  3. 将自定义 ThreadFactory 应用于线程池:
import java.util.concurrent.*;

public class CustomThreadFactoryExample {

    public static void main(String[] args) {
        // 使用自定义线程工厂创建线程池
        ExecutorService executor = Executors.newFixedThreadPool(2, new CustomThreadFactory());

        // 提交任务
        executor.submit(() -> {
            System.out.println("Task 1 started");
            throw new RuntimeException("Task 1 encountered an error");
        });

        executor.submit(() -> System.out.println("Task 2 completed"));

        executor.shutdown();
    }

    // 自定义 ThreadFactory 实现
    static class CustomThreadFactory implements ThreadFactory {
        private int threadId = 0;

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("CustomThread-" + threadId++);
            thread.setUncaughtExceptionHandler(new CustomExceptionHandler());
            return thread;
        }
    }

    // 自定义异常处理器
    static class CustomExceptionHandler implements Thread.UncaughtExceptionHandler {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            System.err.println("Thread " + t.getName() + " threw an exception: " + e.getMessage());
            // 这里可以添加更多处理逻辑,例如日志记录或警报通知
        }
    }
}

控制台输出:

Task 1 started
Thread CustomThread-0 threw an exception: Task 1 encountered an error
Task 2 completed

使用 Executors.newFixedThreadPool() 并传入自定义的 ThreadFactory,让线程池中的每个线程具备统一的异常处理行为。也可以通过线程标识为每个线程设置了自定义的 UncaughtExceptionHandler

全局异常处理

在 Java 中,Thread.setDefaultUncaughtExceptionHandler 是一个全局异常处理机制,用于处理所有未被捕获的线程异常。与每个线程单独设置的 Thread.setUncaughtExceptionHandler 不同,setDefaultUncaughtExceptionHandler 提供了一个全局级别的异常处理器,适用于所有线程(除非线程单独设置了自己的处理器)。

Thread.setDefaultUncaughtExceptionHandler 方法作用:

演示代码如下:

public class DefaultExceptionHandlerExample {

    public static void main(String[] args) {
        // 设置全局默认异常处理器
        Thread.setDefaultUncaughtExceptionHandler((thread, throwable) -> {
            System.err.println("Unhandled exception in thread: " + thread.getName());
            System.err.println("Exception: " + throwable.getMessage());
            throwable.printStackTrace();
        });

        // 创建一个线程,抛出未捕获的异常
        Thread thread1 = new Thread(() -> {
            throw new RuntimeException("Thread 1 failed!");
        });
        thread1.start();

        // 创建另一个线程,也抛出未捕获的异常
        Thread thread2 = new Thread(() -> {
            throw new RuntimeException("Thread 2 encountered an error!");
        });
        thread2.start();
    }
}

控制台如下:

Unhandled exception in thread: Thread-0
Exception: Thread 1 failed!
java.lang.RuntimeException: Thread 1 failed!
    at ...

Unhandled exception in thread: Thread-1
Exception: Thread 2 encountered an error!
java.lang.RuntimeException: Thread 2 encountered an error!
    at ...

全局异常处理只针对主线程和未显式设置 UncaughtExceptionHandler 的其他线程生效。

异常处理的优先级:

如果主线程抛出异常,Thread.setDefaultUncaughtExceptionHandler 无法捕获它。需要在 main 方法中显式处理。如果使用线程池(如 ExecutorService),未捕获的异常通常会被封装为 ExecutionException,不会触发默认处理器。需要使用 Future.get() 或重写 afterExecute() 来处理线程池的任务异常。

FunTester 原创精华


↙↙↙阅读原文可查看相关链接,并与作者交流