Spring WebClient 与虚拟线程:实战集成指南
在上一篇文章中,我们介绍了 JDK HttpClient 和虚拟线程的基础概念。现在让我们深入探讨如何将虚拟线程与 Spring WebClient 集成,以及在实际项目中的应用场景。
与 Spring WebClient 集成
你也可以增强 Spring WebClient,使其使用虚拟线程,将 Spring 强大的生态系统与虚拟线程的可扩展性结合起来。这样你既能享受 Spring 的便利,又能获得虚拟线程的性能优势。
自定义 WebClient 配置
下面是一个自定义的 WebClient 配置,让它使用虚拟线程:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.JdkClientHttpRequestFactory;
import org.springframework.web.reactive.function.client.WebClient;
import java.net.http.HttpClient;
import java.time.Duration;
import java.util.concurrent.Executors;
/**
* WebClient 配置类,使用虚拟线程增强性能
* WebClient configuration class with virtual thread enhancement
*/
@Configuration
public class WebClientConfig {
/**
* 创建使用虚拟线程的 WebClient Bean
* Create WebClient Bean using virtual threads
* @return WebClient 实例 / WebClient instance
*/
@Bean
public WebClient webClient() {
// 创建使用虚拟线程的 JDK HttpClient
// Create JDK HttpClient using virtual threads
HttpClient httpClient = HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_2) // 使用 HTTP/2 协议 / Use HTTP/2 protocol
.connectTimeout(Duration.ofSeconds(10)) // 设置连接超时 / Set connection timeout
.executor(Executors.newVirtualThreadPerTaskExecutor()) // 使用虚拟线程执行器 / Use virtual thread executor
.build();
// 创建基于 JDK HttpClient 的请求工厂
// Create request factory based on JDK HttpClient
JdkClientHttpRequestFactory requestFactory =
new JdkClientHttpRequestFactory(httpClient);
// 构建 WebClient,配置基础 URL 和默认请求头
// Build WebClient with base URL and default headers
return WebClient.builder()
.clientConnector(new HttpComponentsClientHttpConnector(requestFactory)) // 设置客户端连接器 / Set client connector
.baseUrl("https://api.example.com") // 设置基础 URL / Set base URL
.defaultHeader("User-Agent", "FunTester-Spring-VirtualThread-Client") // 设置 User-Agent 请求头 / Set User-Agent header
.build();
}
}
实际服务实现
下面是一个利用这种配置的真实服务,展示了如何在业务代码中使用虚拟线程增强的 WebClient:
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.concurrent.Executors;
/**
* 用户服务类,使用虚拟线程增强的 WebClient
* User service class using virtual thread enhanced WebClient
*/
@Service
public class UserService {
// WebClient 实例
// WebClient instance
private final WebClient webClient;
/**
* 构造函数,注入 WebClient
* Constructor, inject WebClient
* @param webClient WebClient 实例 / WebClient instance
*/
public UserService(WebClient webClient) {
this.webClient = webClient;
}
/**
* 根据用户 ID 获取用户信息
* Get user info by user ID
* @param userId 用户 ID / User ID
* @return Mono<User> 响应式用户对象 / Reactive User object
*/
public Mono<User> getUserById(Long userId) {
return webClient.get()
.uri("/users/{id}", userId) // 设置请求路径 / Set request path
.retrieve() // 检索响应 / Retrieve response
.bodyToMono(User.class) // 转换为 User 对象 / Convert to User object
.doOnError(error ->
System.err.println("Error fetching user: FunTester - " + error.getMessage())); // 错误处理 / Error handling
}
/**
* 获取所有用户列表
* Get all users list
* @return Flux<User> 响应式用户流 / Reactive User stream
*/
public Flux<User> getAllUsers() {
return webClient.get()
.uri("/users") // 设置请求路径 / Set request path
.retrieve() // 检索响应 / Retrieve response
.bodyToFlux(User.class); // 转换为 User 流 / Convert to User stream
}
/**
* 基于虚拟线程的批量处理用户信息
* Batch process user info based on virtual threads
* @param userIds 用户 ID 列表 / List of user IDs
* @return 用户列表 / List of users
*/
public List<User> getUsersBatch(List<Long> userIds) {
// 创建虚拟线程执行器
// Create virtual thread executor
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
// 为每个用户 ID 提交一个虚拟线程任务
// Submit a virtual thread task for each user ID
var futures = userIds.stream()
.map(id -> executor.submit(() ->
getUserById(id).block())) // 阻塞等待结果,虚拟线程下成本低 / Block and wait for result, low cost with virtual threads
.toList();
// 收集所有任务结果
// Collect all task results
return futures.stream()
.map(future -> {
try {
return future.get(); // 获取任务结果 / Get task result
} catch (Exception e) {
return null; // 失败返回 null / Return null on failure
}
})
.filter(user -> user != null) // 过滤掉 null 值 / Filter out null values
.toList();
}
}
}
高级模式和最佳实践
连接池和资源管理
JDK HttpClient 自动管理连接池,但你仍然需要注意资源限制。虽然虚拟线程让你可以创建大量并发请求,但网络连接和服务器资源仍然是有限的:
// 创建使用虚拟线程的 HTTP 客户端,支持 HTTP/2 多路复用
// Create HTTP client using virtual threads with HTTP/2 multiplexing support
HttpClient client = HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_2) // 使用 HTTP/2 协议 / Use HTTP/2 protocol
.connectTimeout(Duration.ofSeconds(10)) // 设置连接超时 / Set connection timeout
.executor(Executors.newVirtualThreadPerTaskExecutor()) // 使用虚拟线程执行器 / Use virtual thread executor
// HTTP/2 多路复用允许一个连接上处理多个请求
// HTTP/2 multiplexing allows multiple requests on a single connection
.build();
HTTP/2 的多路复用能力意味着多个请求可以共享一个 TCP 连接,与 HTTP/1.1 相比,大幅减少了开销。这就像 HTTP/1.1 是单车道,一次只能过一辆车;HTTP/2 是多车道,可以同时过很多辆车,效率自然高很多。
错误处理和弹性
构建弹性 HTTP 客户端需要正确的错误处理和重试逻辑。在实际生产环境中,网络请求可能会失败,服务器可能会返回错误,这时候重试机制就很重要了。下面是一个带重试逻辑的 HTTP 客户端实现:
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpTimeoutException;
import java.time.Duration;
/**
* 具有重试机制的弹性 HTTP 客户端
* Resilient HTTP client with retry mechanism
*/
public class ResilientHttpClient {
// HTTP 客户端实例
// HTTP client instance
private final HttpClient httpClient;
// 最大重试次数
// Maximum retry count
private final int maxRetries = 3;
/**
* 带重试机制的 HTTP 请求方法
* HTTP request method with retry mechanism
* @param url 请求 URL / Request URL
* @return 响应内容 / Response content
*/
public String fetchWithRetry(String url) {
int attempt = 0; // 当前尝试次数 / Current attempt count
Exception lastException = null; // 最后一次异常 / Last exception
// 重试循环
// Retry loop
while (attempt < maxRetries) {
try {
// 构建 HTTP 请求
// Build HTTP request
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(url)) // 设置请求 URI / Set request URI
.timeout(Duration.ofSeconds(5)) // 设置请求超时 / Set request timeout
.GET() // 设置为 GET 请求 / Set as GET request
.build();
// 发送请求并获取响应
// Send request and get response
HttpResponse<String> response = httpClient.send(request,
HttpResponse.BodyHandlers.ofString());
// 检查响应状态码
// Check response status code
if (response.statusCode() >= 200 && response.statusCode() < 300) {
return response.body(); // 成功返回响应体 / Success, return response body
} else if (response.statusCode() >= 500) {
// 服务器错误时重试
// Retry on server error
Thread.sleep(1000 * (attempt + 1)); // 指数退避策略 / Exponential backoff strategy
attempt++;
continue;
} else {
throw new RuntimeException("FunTester - HTTP " + response.statusCode());
}
} catch (HttpTimeoutException e) {
lastException = e;
attempt++;
System.out.println("Timeout on attempt FunTester - " + attempt);
} catch (Exception e) {
throw new RuntimeException("FunTester - Request failed", e);
}
}
throw new RuntimeException("FunTester - Max retries exceeded", lastException);
}
}
结构化并发
Java 21 还引入了结构化并发,这与虚拟线程完美搭配,用于管理复杂的并发操作。结构化并发就像给并发操作加了一个生命周期管理,确保所有子任务一起完成或失败,不会出现任务泄漏。这在处理多个相关 HTTP 请求时特别有用:
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
/**
* 使用结构化并发获取聚合数据
* Fetch aggregated data using structured concurrency
*/
public class StructuredHttpFetcher {
/**
* 获取所有相关数据
* Fetch all related data
* @param userId 用户 ID / User ID
* @return 聚合数据 / Aggregated data
*/
public AggregatedData fetchAllData(String userId) {
// 创建结构化任务作用域,任一任务失败则取消其他任务
// Create structured task scope, cancel other tasks if any task fails
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 并行执行三个 HTTP 请求任务
// Execute three HTTP request tasks in parallel
Subtask<User> userTask = scope.fork(() -> fetchUser(userId)); // 获取用户信息 / Fetch user info
Subtask<List<Order>> ordersTask = scope.fork(() -> fetchOrders(userId)); // 获取订单列表 / Fetch orders list
Subtask<Profile> profileTask = scope.fork(() -> fetchProfile(userId)); // 获取用户资料 / Fetch user profile
scope.join(); // 等待所有任务完成 / Wait for all tasks to complete
scope.throwIfFailed(); // 如果任何任务失败则抛出异常 / Throw exception if any task failed
// 聚合所有数据
// Aggregate all data
return new AggregatedData(
userTask.get(), // 获取用户任务结果 / Get user task result
ordersTask.get(), // 获取订单任务结果 / Get orders task result
profileTask.get() // 获取资料任务结果 / Get profile task result
);
} catch (Exception e) {
throw new RuntimeException("FunTester - Failed to fetch data", e);
}
}
}
这种模式确保所有子任务一起完成或失败,并自动清理和传播错误。如果任何一个子任务失败,其他任务也会被取消,避免资源浪费。
实际用例
微服务通信
在微服务架构中,服务之间不断通过 HTTP 进行通信。虚拟线程消除了简单性(每个请求一个线程)与可扩展性之间的传统权衡。以前你要么用线程池限制并发,要么忍受资源消耗,现在虚拟线程让你可以轻松处理大量并发请求:
/**
* 订单控制器,使用虚拟线程和结构化并发
* Order controller using virtual threads and structured concurrency
*/
@RestController
@RequestMapping("/api")
public class OrderController {
// HTTP 客户端实例
// HTTP client instance
private final HttpClient httpClient;
/**
* 构造函数,初始化使用虚拟线程的 HTTP 客户端
* Constructor, initialize HTTP client using virtual threads
*/
public OrderController() {
this.httpClient = HttpClient.newBuilder()
.executor(Executors.newVirtualThreadPerTaskExecutor()) // 使用虚拟线程执行器 / Use virtual thread executor
.build();
}
/**
* 获取订单详情,并行调用多个微服务
* Get order details, call multiple microservices in parallel
* @param orderId 订单 ID / Order ID
* @return 订单详情 / Order details
*/
@GetMapping("/orders/{orderId}/details")
public OrderDetails getOrderDetails(@PathVariable String orderId) {
// 这些调用在虚拟线程上并发运行
// These calls run concurrently on virtual threads
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 并行执行多个 HTTP 请求
// Execute multiple HTTP requests in parallel
var orderTask = scope.fork(() -> fetchOrder(orderId)); // 获取订单信息 / Fetch order info
var userTask = scope.fork(() -> fetchUser(orderTask.get().getUserId())); // 获取用户信息 / Fetch user info
var inventoryTask = scope.fork(() -> fetchInventory(orderId)); // 获取库存信息 / Fetch inventory info
var shippingTask = scope.fork(() -> fetchShipping(orderId)); // 获取物流信息 / Fetch shipping info
scope.join(); // 等待所有任务完成 / Wait for all tasks to complete
scope.throwIfFailed(); // 检查是否有任务失败 / Check if any task failed
// 组装订单详情
// Assemble order details
return new OrderDetails(
orderTask.get(), // 订单信息 / Order info
userTask.get(), // 用户信息 / User info
inventoryTask.get(), // 库存信息 / Inventory info
shippingTask.get() // 物流信息 / Shipping info
);
} catch (Exception e) {
throw new RuntimeException("FunTester - Failed to build order details", e);
}
}
}
API 网关聚合
API 网关通常需要从多个后端服务聚合数据。虚拟线程使这种模式高效且简单。以前你可能需要复杂的异步编排,现在用虚拟线程可以轻松实现并行请求多个服务:
/**
* API 网关,聚合多个后端服务的数据
* API gateway aggregating data from multiple backend services
*/
public class ApiGateway {
// HTTP 客户端实例
// HTTP client instance
private final HttpClient httpClient;
/**
* 获取仪表板数据,并行调用多个服务
* Get dashboard data by calling multiple services in parallel
* @param userId 用户 ID / User ID
* @return 仪表板数据 / Dashboard data
*/
public DashboardData getDashboard(String userId) {
// 创建虚拟线程执行器
// Create virtual thread executor
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
// 并行提交多个服务调用任务
// Submit multiple service call tasks in parallel
var futures = List.of(
executor.submit(() -> fetchService("user-service", userId)), // 用户服务 / User service
executor.submit(() -> fetchService("notification-service", userId)), // 通知服务 / Notification service
executor.submit(() -> fetchService("analytics-service", userId)), // 分析服务 / Analytics service
executor.submit(() -> fetchService("recommendation-service", userId)) // 推荐服务 / Recommendation service
);
// 收集所有服务调用结果
// Collect all service call results
List<String> results = futures.stream()
.map(future -> {
try {
return future.get(Duration.ofSeconds(2)); // 设置超时时间 / Set timeout
} catch (Exception e) {
return "{}"; // 回退到空数据 / Fallback to empty data
}
})
.toList();
// 聚合结果
// Aggregate results
return aggregateResults(results);
}
}
}
总结
通过将虚拟线程与 Spring WebClient 集成,我们可以在 Spring 生态系统中充分利用虚拟线程的性能优势。高级模式如结构化并发、错误处理和连接池管理,让我们能够构建更加健壮和高效的 HTTP 客户端。
在实际应用中,微服务通信和 API 网关聚合是虚拟线程发挥优势的典型场景。在下一篇文章中,我们将深入探讨性能调优、监控和迁移策略,帮助你将虚拟线程应用到生产环境中。