FunTester 线程池异常处理的 5 中方式

FunTester · 2024年12月18日 · 43 次阅读

在我进行 Java 编程实践当中,特别是高性能编程时,线程池是无法逾越的高山。在最近攀登高山的路途上,我又双叒叕掌握了一些优雅地使用线程池的技巧。

通常我会将异步任务丢给线程池去处理,不怎么会额外处理异步任务执行中报错。一般来说,任务执行报错,会终止当前的线程,这样线程池会创建新的线程执行下一个任务,当然是在需要创建线程和可以创建新线程的前提下。

在我最近一次实践当中,发现一个定长 20 的线程池,已经创建过上万个线程,这让我大呼不可能。仔细一想,最终也在日志当中确认了大量的异步任务报错。所以不得不让我开始研究如何处理线程池中异步任务的异常了。

以下是我的研究报告,诚邀各位共赏。

就我的水平而言,总计发现 5 种常见的异常处理方式。

try-catch

在提交异步任务之前,通常我们会对异步任务检查异常进行处理,但是对于诸如 java.lang.RuntimeException 的非检查异常不会做更多操作。

当我们提交异步任务的时候,可以增加一个 try-catch 处理的话,就可以完全 hold 住异步任务的可能抛出的异常。

在我的框架设计当中,提交异步任务有且仅一个入口,代码如下:

/  
 * 异步执行某个代码块  
 * Java调用需要return,Groovy也不需要,语法兼容  
 *  
 * @param f  
 */  
public static void fun(Closure f) {  
    fun(f, null, true);  
}

/  
 * 使用自定义同步器{@link FunPhaser}进行多线程同步  
 *  
 * @param f      代码块  
 * @param phaser 同步器  
 */  
public static void fun(Closure f, FunPhaser phaser) {  
    fun(f, phaser, true);  
}  

/  
 * 使用自定义同步器{@link FunPhaser}进行多线程同步  
 *  
 * @param f  
 * @param phaser  
 * @param log  
 */  
public static void fun(Closure f, FunPhaser phaser, boolean log) {  
    if (phaser != null) phaser.register();  
    ThreadPoolUtil.executeSync(() -> {  
        try {  
            ThreadPoolUtil.executePriority();  //处理高优任务,可忽略
            f.call();  
        } finally {  
            if (phaser != null) {  
                phaser.done();  
                if (log) logger.info("async task {}", phaser.queryTaskNum());  
            }  
        }  
    });  
}

所以改造起来比较简单,只需要在最后的方法中,增加 catch 代码块即可。

/  
 * 使用自定义同步器{@link FunPhaser}进行多线程同步  
 * @param f  
 * @param phaser  
 * @param log  
 */  
public static void fun(Closure f, FunPhaser phaser, boolean log) {  
    if (phaser != null) phaser.register();  
    ThreadPoolUtil.executeSync(() -> {  
        try {  
            ThreadPoolUtil.executePriority();  
            f.call();  
        } catch (Exception e) {  
            logger.error("fun error", e);  
        } finally {  
            if (phaser != null) {  
                phaser.done();  
                if (log) logger.info("async task {}", phaser.queryTaskNum());  
            }  
        }  
    });  
}

Callable

在 Java 中,Callable 是一种可以抛出受检异常(Checked Exception)的任务接口。这与 Runnable 的不同之处在于,Callable 能够返回结果,并允许在任务执行过程中抛出异常。异常处理通常在获取任务结果时完成,以下是一些常见的处理方式。

Callable 异常处理的特点:

  1. 异常机制:
    • Callable 方法签名为 V call() throws Exception,允许直接抛出 Exception 或其子类。
    • 提交到线程池的 Callable 任务,如果抛出异常,会被封装到 ExecutionException 中。
  2. 获取异常:
    • 通过 Future.get() 获取结果时,若任务抛出异常,则会引发 ExecutionException
    • 开发者需要在调用 get() 时捕获和处理 ExecutionException

演示代码如下:

import java.util.concurrent.*;

public class CallableExceptionExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        Callable<String> task = () -> {
            if (true) {  // 模拟任务中出错
                throw new Exception("Simulated Exception");
            }
            return "Task Completed";
        };
        Future<String> future = executor.submit(task);
        try {
            // 获取任务结果,可能抛出 ExecutionException
            String result = future.get();
            System.out.println(result);
        } catch (ExecutionException e) {
            System.out.println("Caught an ExecutionException: " + e.getCause().getMessage());
        } catch (InterruptedException e) {
            System.out.println("Task was interrupted");
            Thread.currentThread().interrupt();  // 恢复中断状态
        }
    }
}

控制台输出:

Caught an ExecutionException: Simulated Exception

Callable 抛出异常时,线程池会捕获这个异常,并将其封装在 ExecutionException 中。如果任务在执行过程中被中断,会抛出 InterruptedException。建议在捕获时恢复线程的中断状态,以避免吞掉中断信号。

afterExecute()

在 Java 中,afterExecute()ThreadPoolExecutor 提供的一个钩子方法,允许开发者在每个任务执行完成后执行一些额外的逻辑。它可以用来捕获线程池任务中抛出的运行时异常和其他异常,从而进行集中处理或记录。

afterExecute() 的作用:

  1. 触发时机:
    • 每当线程池中某个任务完成后,无论是正常完成还是抛出异常,都会调用 afterExecute()
    • 默认实现为空,用户可以重写它以添加自定义行为。
  2. 异常捕获:
    • 如果任务在执行过程中抛出异常(例如 RuntimeExceptionError),它会作为参数传递给 afterExecute()Throwable 参数。
    • 需要手动从 Future 中获取异常,或者在异常处理逻辑中记录。

演示案例如下:

import java.util.concurrent.*;

public class AfterExecuteExample {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new CustomThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        // 提交一个会抛出异常的任务
        executor.submit(() -> {
            System.out.println("Task started");
            throw new RuntimeException("Task failed with exception");
        });

        executor.shutdown();
    }

    static class CustomThreadPoolExecutor extends ThreadPoolExecutor {
        public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t); // 调用父类实现以确保正常行为

            // 处理任务异常
            if (t != null) {
                System.err.println("Task threw an exception: " + t.getMessage());
            } else if (r instanceof Future<?>) {
                try {
                    ((Future<?>) r).get(); // 获取任务执行结果或捕获异常
                } catch (CancellationException e) {
                    System.err.println("Task was cancelled");
                } catch (ExecutionException e) {
                    System.err.println("Task threw an exception: " + e.getCause().getMessage());
                }
            }
        }
    }
}

最终控制台输出:

Task started
Task threw an exception: Task failed with exception

如果需要在任务开始和结束时都执行逻辑,可以同时重写 beforeExecute()afterExecute()。重写此方法时,建议注意线程中断信号的恢复,并确保异常记录逻辑不会引发额外的错误。

自定义 ThreadFactory

在 Java 中,如果需要自定义线程的异常处理行为,可以通过 自定义 ThreadFactory 创建线程并设置异常处理策略。线程的异常处理主要依赖于 Thread.UncaughtExceptionHandler 接口,该接口用于处理线程运行时未捕获的异常。

步骤概览:

  1. 创建自定义 ThreadFactory
    • 实现 ThreadFactory 接口,定制线程的创建逻辑。
    • 在创建线程时,设置自定义的 UncaughtExceptionHandler
  2. 实现异常处理逻辑:
    • 使用 java.lang.Thread#setUncaughtExceptionHandler,定义异常处理行为(如日志记录、发送警报等)。
  3. 将自定义 ThreadFactory 应用于线程池:
    • 在创建线程池时,通过 ExecutorsThreadPoolExecutor 使用自定义的 ThreadFactory
import java.util.concurrent.*;

public class CustomThreadFactoryExample {

    public static void main(String[] args) {
        // 使用自定义线程工厂创建线程池
        ExecutorService executor = Executors.newFixedThreadPool(2, new CustomThreadFactory());

        // 提交任务
        executor.submit(() -> {
            System.out.println("Task 1 started");
            throw new RuntimeException("Task 1 encountered an error");
        });

        executor.submit(() -> System.out.println("Task 2 completed"));

        executor.shutdown();
    }

    // 自定义 ThreadFactory 实现
    static class CustomThreadFactory implements ThreadFactory {
        private int threadId = 0;

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("CustomThread-" + threadId++);
            thread.setUncaughtExceptionHandler(new CustomExceptionHandler());
            return thread;
        }
    }

    // 自定义异常处理器
    static class CustomExceptionHandler implements Thread.UncaughtExceptionHandler {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            System.err.println("Thread " + t.getName() + " threw an exception: " + e.getMessage());
            // 这里可以添加更多处理逻辑,例如日志记录或警报通知
        }
    }
}

控制台输出:

Task 1 started
Thread CustomThread-0 threw an exception: Task 1 encountered an error
Task 2 completed

使用 Executors.newFixedThreadPool() 并传入自定义的 ThreadFactory,让线程池中的每个线程具备统一的异常处理行为。也可以通过线程标识为每个线程设置了自定义的 UncaughtExceptionHandler

全局异常处理

在 Java 中,Thread.setDefaultUncaughtExceptionHandler 是一个全局异常处理机制,用于处理所有未被捕获的线程异常。与每个线程单独设置的 Thread.setUncaughtExceptionHandler 不同,setDefaultUncaughtExceptionHandler 提供了一个全局级别的异常处理器,适用于所有线程(除非线程单独设置了自己的处理器)。

Thread.setDefaultUncaughtExceptionHandler 方法作用:

  • 用于设置一个全局的默认未捕获异常处理器。
  • 如果某个线程未显式设置自己的 UncaughtExceptionHandler,则会使用这个默认处理器。
  • 通常用于记录日志、发送报警等全局异常处理逻辑。

演示代码如下:

public class DefaultExceptionHandlerExample {

    public static void main(String[] args) {
        // 设置全局默认异常处理器
        Thread.setDefaultUncaughtExceptionHandler((thread, throwable) -> {
            System.err.println("Unhandled exception in thread: " + thread.getName());
            System.err.println("Exception: " + throwable.getMessage());
            throwable.printStackTrace();
        });

        // 创建一个线程,抛出未捕获的异常
        Thread thread1 = new Thread(() -> {
            throw new RuntimeException("Thread 1 failed!");
        });
        thread1.start();

        // 创建另一个线程,也抛出未捕获的异常
        Thread thread2 = new Thread(() -> {
            throw new RuntimeException("Thread 2 encountered an error!");
        });
        thread2.start();
    }
}

控制台如下:

Unhandled exception in thread: Thread-0
Exception: Thread 1 failed!
java.lang.RuntimeException: Thread 1 failed!
    at ...

Unhandled exception in thread: Thread-1
Exception: Thread 2 encountered an error!
java.lang.RuntimeException: Thread 2 encountered an error!
    at ...

全局异常处理只针对主线程和未显式设置 UncaughtExceptionHandler 的其他线程生效。

异常处理的优先级:

  • 如果线程显式设置了自己的 UncaughtExceptionHandler(通过 thread.setUncaughtExceptionHandler),那么会优先调用该处理器。
  • 如果线程未设置单独的处理器,则调用全局默认处理器。
  • 如果没有设置全局默认处理器,未捕获的异常将打印到标准错误输出流。

如果主线程抛出异常,Thread.setDefaultUncaughtExceptionHandler 无法捕获它。需要在 main 方法中显式处理。如果使用线程池(如 ExecutorService),未捕获的异常通常会被封装为 ExecutionException,不会触发默认处理器。需要使用 Future.get() 或重写 afterExecute() 来处理线程池的任务异常。

FunTester 原创精华
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册