在软件开发不断发展的世界中,有效管理并发任务的能力至关重要。传统的线程方法可能变得繁琐且容易出错,特别是在处理大量异步操作时。这时,ExecutorService
登场了:它是 Java 并发框架中一个强大的抽象,旨在简化和优化异步任务执行。
本文我们将深入探讨 ExecutorService
其核心功能,探索各种线程池配置,以解决 Java 应用程序中的现实世界并发挑战。通过阅读、实践本文内容,你将掌握以下能力;
传统上,线程被用来实现并发。每个线程就像处理器上的一个单独核心,一次专注于一个任务。然而,直接管理线程可能是一个 杂技
行为。这种做法需要担心创建线程、处理它们的生命周期(启动、停止)以及当多个线程访问共享资源(如数据库或文件系统)时可能出现的同步问题。
这就是 ExecutorService 可靠的地方。它作为管理 异步任务 的高级抽象。异步任务本质上是可以独立提交和执行的工作,而不必阻塞主执行线程。ExecutorService 负责线程池管理,让开发者专注于任务的逻辑,而不是线程的复杂性。
使用 ExecutorService 与原始线程管理相比的好处:
ExecutorService
管理一个线程池。如果一个线程忙于处理图像,另一个可以下载文件,确保程序能够高效地处理不同的工作负载。ExecutorService
提供了处理任务执行期间可能发生的错误的方法。开发者不必编写单独的代码来捕获和处理个别线程抛出的异常。ExecutorService
控制线程池中的线程数量,防止创建太多线程,可能会压垮系统的资源。这就像有一个定义好的工作线程数量,以避免用太多任务超载处理器。现在我们已经理解了 ExecutorService 的魔力,让我们看看如何将其付诸实践。以下是如何创建一个ExecutorService
实例并提交任务进行异步执行的方法:
Java 的 Executors
类提供了各种工厂方法来创建不同类型的 ExecutorService
实例。这里是一个常见的例子:
ExecutorService executorService = Executors.newFixedThreadPool(5);
这段代码创建了一个大小为 5 的固定线程池的 ExecutorService
。这意味着 ExecutorService
将管理一个 5 个线程的池来执行您的任务。您可以根据您的需求选择其他配置,如newSingleThreadExecutor
(一个线程)或newCachedThreadPool
(动态调整线程池大小)。
有两种主要方式向 ExecutorService 提交任务:
submit(Callable<T> task)
: 这个方法接受一个Callable
对象作为输入。Callable
接口扩展了Runnable
,但允许开发者从任务执行中返回结果。当调用submit
时,ExecutorService
安排任务执行并返回一个Future
对象。execute(Runnable task)
: 这个方法接受一个Runnable
对象作为输入。Runnable
接口定义了一个要异步执行的单个方法run()
。与submit
不同,execute
不返回结果。当我们使用submit(Callable<T> task)
时,ExecutorService
返回一个Future
对象。这个Future
对象作为任务最终结果的占位符。它提供了几种管理任务执行的方法:
Callable
中的call()
方法产生的结果。true
;否则返回false
。mayInterruptIfRunning
参数指定是否应该中断当前运行的线程。ExecutorService
的核心是一个强大的概念——线程池。它就像一群等待被工头(ExecutorService)分配任务的工人(线程)。理解线程池对于有效利用 ExecutorService
至关重要。
工头(ExecutorService)有一群具有特定技能(任务类型)的工人(线程)。当建筑任务(提交)到达时,工头将它们分配给池中的可用工人。这确保了任务的有效执行,而不需要为每个单独的任务创建新的工人。
Executors
类提供了各种工厂方法,用于创建具有不同线程池配置的 ExecutorService
实例:
newFixedThreadPool(int nThreads)
: 这个方法创建了一个大小为nThreads
的固定线程池的 ExecutorService。这适用于工作负载可预测的场景。固定池大小确保了一致的并发级别,但如果工作负载超过了可用线程,任务可能会排队等待可用的工人。newSingleThreadExecutor()
: 这个方法创建了一个池中只有一个线程的 ExecutorService。这适用于需要严格顺序执行或彼此依赖的任务。然而,它限制了并发性,并且可能不适用于处理多个独立任务。newCachedThreadPool()
: 这个方法创建了一个动态调整大小的线程池的 ExecutorService。池大小可以根据需要增长以处理传入的任务。然而,这种灵活性带来了潜在的缺点:线程池的大小和配置显著影响应用程序的性能和资源利用。以下是如何找到平衡的方法:
Executors
类不直接控制排队行为。然而,一些底层实现可能使用有界队列(如果队列已满,则拒绝任务)或无界队列(即使队列已满,任务也会继续添加,可能导致 OutOfMemoryError
异常)。现在我们已经探讨了 ExecutorService
的核心功能,让我们深入了解一些高级主题,以获得全面、深刻的理解:
ExecutorService
不应该被突然遗弃。以下是两个关键方法,用于优雅地关闭它:
shutdown()
: 这个方法向 ExecutorService 发出信号,表示不应再提交新任务。队列中的任务或当前正在执行的任务将被允许完成,然后 ExecutorService 才会终止。这就像告知工头 ExecutorService
停止接受新的建筑工程任务,但允许正在进行的项目完成。shutdownNow()
: 这个方法尝试停止所有当前正在执行的任务,并防止任何新任务被提交。这就像工头紧急停止所有建筑活动。然而,要小心——突然停止任务可能导致工作不完整或数据不一致。当我们向一个已满的线程池的 ExecutorService
提交任务时,如果线程池无法接受新的任务,这些任务会被提交到线程池的阻塞队列中。如果阻塞队列也已满,就会触发一个名为 RejectedExecutionHandler
的机制。默认情况下,ThreadPoolExecutor
使用的拒绝策略是 AbortPolicy
,即抛出 RejectedExecutionException
异常,导致任务被拒绝。
为了处理被拒绝的任务,我们可以选择不同的策略:
RejectedExecutionException
异常,阻止任务被提交。RejectedExecutionHandler
接口,开发者可以定义自己的拒绝处理逻辑。例如,可以记录日志、报警,甚至将任务重新加入队列。选择适合的拒绝策略,能帮助你更好地控制线程池在高负载情况下的行为,确保系统的稳定性。
ExecutorService
提供了超出简单任务提交的功能:
invokeAll(Collection<? extends Callable<T>> tasks)
: 这个方法允许提交一系列 Callable
任务,并返回包含结果的 List<Future>
。它阻塞调用线程直到所有任务完成。这适用于等待一组任务的完成并检索它们各自的结果。invokeAny(Collection<? extends Callable<T>> tasks)
: 这个方法提交一系列 Callable
任务,但只等待第一个完成。它返回已完成任务的结果,如果所有任务都失败则抛出异常。这在您只需要一个组中任何一个任务的结果时非常有用,无论哪个任务首先完成。ExecutorService
在许多需要异步处理以提高性能和响应性的场景中表现出色。让我们通过一些代码示例来探索一些常见用例:
当需要从多个 API 并发地获取数据以提升 Web 应用程序的感知性能时,ExecutorService
可以发挥重要作用。通过它,我们可以高效地管理线程池,提交多个并行任务,从而在最短的时间内获取所有 API 的响应。这种方式不仅提升了数据获取速度,还减少了单个 API 请求的等待时间,从而显著改善用户体验。
以下是一个使用 ExecutorService
并发地从多个 API 获取数据的示例:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.List;
import java.util.ArrayList;
public class ConcurrentApiRequests {
public static void main(String[] args) {
// 创建一个固定大小的线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 假设我们有多个API的URL列表
List<String> apiUrls = List.of(
"https://funtester.com/posts/1",
"https://funtester.com/posts/2",
"https://funtester.com/posts/3"
);
// 存储每个任务的Future对象
List<Future<String>> futures = new ArrayList<>();
for (String url : apiUrls) {
Callable<String> task = () -> {
// 模拟发送HTTP请求(使用 HttpClient 或 OkHttp 等工具)
// 这里为了简化,直接返回URL
return fetchDataFromApi(url);
};
// 提交任务给线程池
Future<String> future = executor.submit(task);
futures.add(future);
}
// 处理所有的Future对象,获取结果
for (Future<String> future : futures) {
try {
// 获取任务执行的结果
String result = future.get();
System.out.println("API Response: " + result);
} catch (Exception e) {
e.printStackTrace();
}
}
// 关闭线程池
executor.shutdown();
try {
// 等待所有任务完成
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow(); // 强制关闭
}
} catch (InterruptedException e) {
executor.shutdownNow();
}
}
// 模拟API数据获取
private static String fetchDataFromApi(String url) throws Exception {
// 这里可以实际调用HttpClient或其他HTTP库发送请求并获取数据
// 为了简化,直接返回URL作为结果
Thread.sleep(1000); // 模拟网络延迟
return "Data from " + url;
}
}
关键步骤说明:
Executors.newFixedThreadPool(3)
创建一个固定大小为 3 的线程池,允许最多 3 个任务并发执行。Callable
任务,并提交给 ExecutorService
,返回一个 Future
对象。Future
用于异步获取任务的执行结果。Future
列表,调用 future.get()
获取每个任务的结果。此操作是阻塞的,但因为任务是并发执行的,整体性能大大提升。executor.shutdown()
关闭线程池,并使用 awaitTermination
等待所有任务完成。如果等待超时,则调用 shutdownNow
强制关闭。通过 ExecutorService
并发地从多个 API 获取数据,可以显著提升 Web 应用程序的感知性能,让用户感受到更快的响应速度和更流畅的交互体验。在实际应用中,根据 API 的特性和系统资源合理调整线程池配置,是获得最佳性能的关键。
在需要对一批上传的图像进行后台处理(如调整图像大小)时,ExecutorService
是一个非常有效的工具。它可以异步处理这些任务,而不会阻塞主线程,从而保持应用程序的响应性。
以下是一个使用 ExecutorService
来异步调整一批图像大小的示例:
import java.awt.Graphics2D;
import java.awt.Image;
import java.awt.image.BufferedImage;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.imageio.ImageIO;
public class ImageResizer {
public static void main(String[] args) {
// 创建一个固定大小的线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
// 模拟上传的图像文件列表
List<File> imageFiles = List.of(
new File("path/to/image1.jpg"),
new File("path/to/image2.jpg"),
new File("path/to/image3.jpg")
);
// 存储每个任务的Future对象
List<Future<File>> futures = new ArrayList<>();
for (File imageFile : imageFiles) {
Callable<File> task = () -> resizeImage(imageFile, 200, 200); // 将图像调整为200x200的大小
// 提交任务给线程池
Future<File> future = executor.submit(task);
futures.add(future);
}
// 图像大小调整方法
private static File resizeImage(File inputFile, int width, int height) throws IOException {
// 读取原始图像
BufferedImage originalImage = ImageIO.read(inputFile);
// 创建一个缩放后的图像
Image scaledImage = originalImage.getScaledInstance(width, height, Image.SCALE_SMOOTH);
BufferedImage resizedImage = new BufferedImage(width, height, BufferedImage.TYPE_INT_RGB);
// 在新的BufferedImage中绘制缩放后的图像
Graphics2D g2d = resizedImage.createGraphics();
g2d.drawImage(scaledImage, 0, 0, null);
g2d.dispose();
// 保存调整大小后的图像
File outputFile = new File("resized_" + inputFile.getName());
ImageIO.write(resizedImage, "jpg", outputFile);
return outputFile;
}
}
代码步骤说明:
Executors.newFixedThreadPool(4)
创建一个固定大小为 4 的线程池,允许最多 4 个图像处理任务并发执行。Callable<File>
任务,并提交给 ExecutorService
。每个任务返回一个 Future<File>
,用于异步获取处理结果。resizeImage
方法中,通过 Image.getScaledInstance
方法调整图像大小,并使用 Graphics2D
将缩放后的图像绘制到新的 BufferedImage
上,然后将其保存为新的文件。Future
列表,调用 future.get()
获取每个任务的结果。此操作阻塞当前线程直到任务完成,但由于任务是并发执行的,整个过程依然很高效。executor.shutdown()
关闭线程池,确保所有任务完成后再结束应用。这种方法适用于 Web 应用、桌面应用或服务器后台任务,例如用户上传多张图片时,应用可以迅速响应用户上传操作,而在后台调整图片大小,以便稍后用于展示或存储。
在应用程序中,某些任务可能需要在后台执行,例如发送电子邮件、记录数据、处理文件等。这些任务通常需要一定的时间完成,而如果在主线程中执行这些任务,可能会导致应用程序的 UI 变得不响应。为了保持 UI 的流畅性和用户体验,使用 ExecutorService
来异步处理这些后台任务是非常有效的。
下面是一个简单的示例,展示了如何使用 ExecutorService
来异步发送电子邮件和记录数据,而不阻塞主线程:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BackgroundTaskExample {
public static void main(String[] args) {
// 创建一个单线程的ExecutorService,用于处理后台任务
ExecutorService executor = Executors.newSingleThreadExecutor();
// 模拟UI操作,例如用户点击按钮后触发的任务
System.out.println("UI is responsive. Performing background tasks...");
// 异步发送电子邮件
executor.submit(() -> sendEmail("user@example.com", "Welcome", "Thank you for signing up!"));
// 异步记录数据
executor.submit(() -> logData("User signed up with email user@example.com"));
// 主线程继续执行其他操作
System.out.println("UI can continue to interact with the user...");
// 关闭ExecutorService
executor.shutdown();
}
// 模拟发送电子邮件的方法
private static void sendEmail(String recipient, String subject, String body) {
try {
// 模拟发送电子邮件的延迟
Thread.sleep(2000);
System.out.println("Email sent to " + recipient + " with subject: " + subject);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("Email sending was interrupted");
}
}
// 模拟记录数据的方法
private static void logData(String data) {
try {
// 模拟记录数据的延迟
Thread.sleep(1000);
System.out.println("Data logged: " + data);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("Data logging was interrupted");
}
}
}
代码说明:
Executors.newSingleThreadExecutor()
创建一个单线程的 ExecutorService
。这种类型的线程池适合处理多个任务但确保任务按顺序执行。executor.submit()
提交任务。这些任务在后台异步执行,主线程无需等待任务完成即可继续处理其他操作。Thread.sleep()
。executor.shutdown()
关闭 ExecutorService
。这种关闭方式允许已经提交的任务继续执行,而不会接受新任务。实际应用场景:
通过 ExecutorService
,我们可以有效地将这些后台任务与主线程分离,实现应用程序的高效运行和良好的用户体验。
选择正确的 ExecutorService
配置对优化性能和资源至关重要。以下是一些最佳实践:
FixedThreadPool
)提供稳定的性能。对于高度可变的工作负载,可选择缓存线程池(CachedThreadPool
),它会动态调整线程数量,但需注意无界队列可能导致资源耗尽。ExecutorService
后,务必调用 shutdown()
或 shutdownNow()
方法来关闭它。否则,线程池中的空闲线程和其他资源可能会持续占用内存,导致资源泄漏和性能下降。确保在不再需要线程池时进行正确关闭,以维护系统资源的健康。Future
的 get()
方法获取任务结果时也要处理 ExecutionException
。FunTester 原创精华