在软件开发不断发展的世界中,有效管理并发任务的能力至关重要。传统的线程方法可能变得繁琐且容易出错,特别是在处理大量异步操作时。这时,ExecutorService 登场了:它是 Java 并发框架中一个强大的抽象,旨在简化和优化异步任务执行。

本文我们将深入探讨 ExecutorService 其核心功能,探索各种线程池配置,以解决 Java 应用程序中的现实世界并发挑战。通过阅读、实践本文内容,你将掌握以下能力;

ExecutorService 概览

传统上,线程被用来实现并发。每个线程就像处理器上的一个单独核心,一次专注于一个任务。然而,直接管理线程可能是一个 杂技 行为。这种做法需要担心创建线程、处理它们的生命周期(启动、停止)以及当多个线程访问共享资源(如数据库或文件系统)时可能出现的同步问题。

这就是 ExecutorService 可靠的地方。它作为管理 异步任务 的高级抽象。异步任务本质上是可以独立提交和执行的工作,而不必阻塞主执行线程。ExecutorService 负责线程池管理,让开发者专注于任务的逻辑,而不是线程的复杂性。

使用 ExecutorService 与原始线程管理相比的好处:

基础功能

现在我们已经理解了 ExecutorService 的魔力,让我们看看如何将其付诸实践。以下是如何创建一个ExecutorService 实例并提交任务进行异步执行的方法:

创建 ExecutorService

Java 的 Executors 类提供了各种工厂方法来创建不同类型的 ExecutorService 实例。这里是一个常见的例子:

ExecutorService executorService = Executors.newFixedThreadPool(5);

这段代码创建了一个大小为 5 的固定线程池的 ExecutorService。这意味着 ExecutorService 将管理一个 5 个线程的池来执行您的任务。您可以根据您的需求选择其他配置,如newSingleThreadExecutor(一个线程)或newCachedThreadPool(动态调整线程池大小)。

提交任务

有两种主要方式向 ExecutorService 提交任务:

Future 的力量:管理任务执行

当我们使用submit(Callable<T> task)时,ExecutorService 返回一个Future对象。这个Future对象作为任务最终结果的占位符。它提供了几种管理任务执行的方法:

线程池机制

ExecutorService 的核心是一个强大的概念——线程池。它就像一群等待被工头(ExecutorService)分配任务的工人(线程)。理解线程池对于有效利用 ExecutorService 至关重要。

线程池在行动

工头(ExecutorService)有一群具有特定技能(任务类型)的工人(线程)。当建筑任务(提交)到达时,工头将它们分配给池中的可用工人。这确保了任务的有效执行,而不需要为每个单独的任务创建新的工人。

选择正确的配置

Executors类提供了各种工厂方法,用于创建具有不同线程池配置的 ExecutorService 实例:

平衡性能和资源

线程池的大小和配置显著影响应用程序的性能和资源利用。以下是如何找到平衡的方法:

ExecutorService 高级功能

现在我们已经探讨了 ExecutorService 的核心功能,让我们深入了解一些高级主题,以获得全面、深刻的理解:

优雅关闭

ExecutorService 不应该被突然遗弃。以下是两个关键方法,用于优雅地关闭它:

拒绝策略

当我们向一个已满的线程池的 ExecutorService 提交任务时,如果线程池无法接受新的任务,这些任务会被提交到线程池的阻塞队列中。如果阻塞队列也已满,就会触发一个名为 RejectedExecutionHandler 的机制。默认情况下,ThreadPoolExecutor 使用的拒绝策略是 AbortPolicy,即抛出 RejectedExecutionException 异常,导致任务被拒绝。

为了处理被拒绝的任务,我们可以选择不同的策略:

  1. AbortPolicy(默认策略):
  2. CallerRunsPolicy:
  3. DiscardPolicy:
  4. DiscardOldestPolicy:
  5. 自定义处理策略:

选择适合的拒绝策略,能帮助你更好地控制线程池在高负载情况下的行为,确保系统的稳定性。

多任务协调

ExecutorService 提供了超出简单任务提交的功能:

ExecutorService 实践

ExecutorService 在许多需要异步处理以提高性能和响应性的场景中表现出色。让我们通过一些代码示例来探索一些常见用例:

网络请求

当需要从多个 API 并发地获取数据以提升 Web 应用程序的感知性能时,ExecutorService 可以发挥重要作用。通过它,我们可以高效地管理线程池,提交多个并行任务,从而在最短的时间内获取所有 API 的响应。这种方式不仅提升了数据获取速度,还减少了单个 API 请求的等待时间,从而显著改善用户体验。

以下是一个使用 ExecutorService 并发地从多个 API 获取数据的示例:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.List;
import java.util.ArrayList;

public class ConcurrentApiRequests {
    public static void main(String[] args) {
        // 创建一个固定大小的线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);

        // 假设我们有多个API的URL列表
        List<String> apiUrls = List.of(
            "https://funtester.com/posts/1",
            "https://funtester.com/posts/2",
            "https://funtester.com/posts/3"
        );

        // 存储每个任务的Future对象
        List<Future<String>> futures = new ArrayList<>();

        for (String url : apiUrls) {
            Callable<String> task = () -> {
                // 模拟发送HTTP请求(使用 HttpClient 或 OkHttp 等工具)
                // 这里为了简化,直接返回URL
                return fetchDataFromApi(url);
            };

            // 提交任务给线程池
            Future<String> future = executor.submit(task);
            futures.add(future);
        }

        // 处理所有的Future对象,获取结果
        for (Future<String> future : futures) {
            try {
                // 获取任务执行的结果
                String result = future.get();
                System.out.println("API Response: " + result);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        // 关闭线程池
        executor.shutdown();
        try {
            // 等待所有任务完成
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // 强制关闭
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
        }
    }

    // 模拟API数据获取
    private static String fetchDataFromApi(String url) throws Exception {
        // 这里可以实际调用HttpClient或其他HTTP库发送请求并获取数据
        // 为了简化,直接返回URL作为结果
        Thread.sleep(1000); // 模拟网络延迟
        return "Data from " + url;
    }
}

关键步骤说明:

  1. 线程池创建: 使用 Executors.newFixedThreadPool(3) 创建一个固定大小为 3 的线程池,允许最多 3 个任务并发执行。
  2. 提交任务: 将每个 API 请求封装为一个 Callable 任务,并提交给 ExecutorService,返回一个 Future 对象。Future 用于异步获取任务的执行结果。
  3. 处理任务结果: 通过遍历 Future 列表,调用 future.get() 获取每个任务的结果。此操作是阻塞的,但因为任务是并发执行的,整体性能大大提升。
  4. 线程池关闭: 调用 executor.shutdown() 关闭线程池,并使用 awaitTermination 等待所有任务完成。如果等待超时,则调用 shutdownNow 强制关闭。

通过 ExecutorService 并发地从多个 API 获取数据,可以显著提升 Web 应用程序的感知性能,让用户感受到更快的响应速度和更流畅的交互体验。在实际应用中,根据 API 的特性和系统资源合理调整线程池配置,是获得最佳性能的关键。

图像处理

在需要对一批上传的图像进行后台处理(如调整图像大小)时,ExecutorService 是一个非常有效的工具。它可以异步处理这些任务,而不会阻塞主线程,从而保持应用程序的响应性。

以下是一个使用 ExecutorService 来异步调整一批图像大小的示例:

import java.awt.Graphics2D;
import java.awt.Image;
import java.awt.image.BufferedImage;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.imageio.ImageIO;

public class ImageResizer {
    public static void main(String[] args) {
        // 创建一个固定大小的线程池
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // 模拟上传的图像文件列表
        List<File> imageFiles = List.of(
            new File("path/to/image1.jpg"),
            new File("path/to/image2.jpg"),
            new File("path/to/image3.jpg")
        );

        // 存储每个任务的Future对象
        List<Future<File>> futures = new ArrayList<>();

        for (File imageFile : imageFiles) {
            Callable<File> task = () -> resizeImage(imageFile, 200, 200); // 将图像调整为200x200的大小

            // 提交任务给线程池
            Future<File> future = executor.submit(task);
            futures.add(future);
        }


    // 图像大小调整方法
    private static File resizeImage(File inputFile, int width, int height) throws IOException {
        // 读取原始图像
        BufferedImage originalImage = ImageIO.read(inputFile);

        // 创建一个缩放后的图像
        Image scaledImage = originalImage.getScaledInstance(width, height, Image.SCALE_SMOOTH);
        BufferedImage resizedImage = new BufferedImage(width, height, BufferedImage.TYPE_INT_RGB);

        // 在新的BufferedImage中绘制缩放后的图像
        Graphics2D g2d = resizedImage.createGraphics();
        g2d.drawImage(scaledImage, 0, 0, null);
        g2d.dispose();

        // 保存调整大小后的图像
        File outputFile = new File("resized_" + inputFile.getName());
        ImageIO.write(resizedImage, "jpg", outputFile);

        return outputFile;
    }
}

代码步骤说明:

  1. 线程池创建: 使用 Executors.newFixedThreadPool(4) 创建一个固定大小为 4 的线程池,允许最多 4 个图像处理任务并发执行。
  2. 提交任务: 将每个图像的大小调整操作封装为一个 Callable<File> 任务,并提交给 ExecutorService。每个任务返回一个 Future<File>,用于异步获取处理结果。
  3. 图像大小调整: 在 resizeImage 方法中,通过 Image.getScaledInstance 方法调整图像大小,并使用 Graphics2D 将缩放后的图像绘制到新的 BufferedImage 上,然后将其保存为新的文件。
  4. 处理任务结果: 通过遍历 Future 列表,调用 future.get() 获取每个任务的结果。此操作阻塞当前线程直到任务完成,但由于任务是并发执行的,整个过程依然很高效。
  5. 线程池关闭: 调用 executor.shutdown() 关闭线程池,确保所有任务完成后再结束应用。

这种方法适用于 Web 应用、桌面应用或服务器后台任务,例如用户上传多张图片时,应用可以迅速响应用户上传操作,而在后台调整图片大小,以便稍后用于展示或存储。

后台任务

在应用程序中,某些任务可能需要在后台执行,例如发送电子邮件、记录数据、处理文件等。这些任务通常需要一定的时间完成,而如果在主线程中执行这些任务,可能会导致应用程序的 UI 变得不响应。为了保持 UI 的流畅性和用户体验,使用 ExecutorService 来异步处理这些后台任务是非常有效的。

下面是一个简单的示例,展示了如何使用 ExecutorService 来异步发送电子邮件和记录数据,而不阻塞主线程:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BackgroundTaskExample {
    public static void main(String[] args) {
        // 创建一个单线程的ExecutorService,用于处理后台任务
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // 模拟UI操作,例如用户点击按钮后触发的任务
        System.out.println("UI is responsive. Performing background tasks...");

        // 异步发送电子邮件
        executor.submit(() -> sendEmail("user@example.com", "Welcome", "Thank you for signing up!"));

        // 异步记录数据
        executor.submit(() -> logData("User signed up with email user@example.com"));

        // 主线程继续执行其他操作
        System.out.println("UI can continue to interact with the user...");

        // 关闭ExecutorService
        executor.shutdown();
    }

    // 模拟发送电子邮件的方法
    private static void sendEmail(String recipient, String subject, String body) {
        try {
            // 模拟发送电子邮件的延迟
            Thread.sleep(2000);
            System.out.println("Email sent to " + recipient + " with subject: " + subject);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println("Email sending was interrupted");
        }
    }

    // 模拟记录数据的方法
    private static void logData(String data) {
        try {
            // 模拟记录数据的延迟
            Thread.sleep(1000);
            System.out.println("Data logged: " + data);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println("Data logging was interrupted");
        }
    }
}

代码说明:

  1. 线程池创建: 使用 Executors.newSingleThreadExecutor() 创建一个单线程的 ExecutorService。这种类型的线程池适合处理多个任务但确保任务按顺序执行。
  2. 异步任务提交: 使用 executor.submit() 提交任务。这些任务在后台异步执行,主线程无需等待任务完成即可继续处理其他操作。
  3. 任务实现: 任务的实现可以是任何需要耗时操作的代码,例如发送电子邮件或记录数据。为了模拟这些操作的延迟,使用了 Thread.sleep()
  4. 主线程的执行: 在任务执行的同时,主线程依然保持对 UI 的控制权,可以继续响应用户操作。通过这种方式,应用程序的响应性得以保持。
  5. 线程池关闭: 在所有任务提交完毕后,调用 executor.shutdown() 关闭 ExecutorService。这种关闭方式允许已经提交的任务继续执行,而不会接受新任务。

实际应用场景:

通过 ExecutorService,我们可以有效地将这些后台任务与主线程分离,实现应用程序的高效运行和良好的用户体验。

经验分享

最佳实践

选择正确的 ExecutorService 配置对优化性能和资源至关重要。以下是一些最佳实践:

避免的陷阱

监控和管理

FunTester 原创精华


↙↙↙阅读原文可查看相关链接,并与作者交流