FunTester Spring WebClient + 虚拟线程实战

FunTester · 2025年12月10日 · 130 次阅读

Spring WebClient 与虚拟线程:实战集成指南

在上一篇文章中,我们介绍了 JDK HttpClient 和虚拟线程的基础概念。现在让我们深入探讨如何将虚拟线程与 Spring WebClient 集成,以及在实际项目中的应用场景。

与 Spring WebClient 集成

你也可以增强 Spring WebClient,使其使用虚拟线程,将 Spring 强大的生态系统与虚拟线程的可扩展性结合起来。这样你既能享受 Spring 的便利,又能获得虚拟线程的性能优势。

自定义 WebClient 配置

下面是一个自定义的 WebClient 配置,让它使用虚拟线程:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.JdkClientHttpRequestFactory;
import org.springframework.web.reactive.function.client.WebClient;
import java.net.http.HttpClient;
import java.time.Duration;
import java.util.concurrent.Executors;

/**
 * WebClient 配置类,使用虚拟线程增强性能
 * WebClient configuration class with virtual thread enhancement
 */
@Configuration
public class WebClientConfig {

    /**
     * 创建使用虚拟线程的 WebClient Bean
     * Create WebClient Bean using virtual threads
     * @return WebClient 实例 / WebClient instance
     */
    @Bean
    public WebClient webClient() {
        // 创建使用虚拟线程的 JDK HttpClient
        // Create JDK HttpClient using virtual threads
        HttpClient httpClient = HttpClient.newBuilder()
            .version(HttpClient.Version.HTTP_2)  // 使用 HTTP/2 协议 / Use HTTP/2 protocol
            .connectTimeout(Duration.ofSeconds(10))  // 设置连接超时 / Set connection timeout
            .executor(Executors.newVirtualThreadPerTaskExecutor())  // 使用虚拟线程执行器 / Use virtual thread executor
            .build();

        // 创建基于 JDK HttpClient 的请求工厂
        // Create request factory based on JDK HttpClient
        JdkClientHttpRequestFactory requestFactory =
            new JdkClientHttpRequestFactory(httpClient);

        // 构建 WebClient,配置基础 URL 和默认请求头
        // Build WebClient with base URL and default headers
        return WebClient.builder()
            .clientConnector(new HttpComponentsClientHttpConnector(requestFactory))  // 设置客户端连接器 / Set client connector
            .baseUrl("https://api.example.com")  // 设置基础 URL / Set base URL
            .defaultHeader("User-Agent", "FunTester-Spring-VirtualThread-Client")  // 设置 User-Agent 请求头 / Set User-Agent header
            .build();
    }
}

实际服务实现

下面是一个利用这种配置的真实服务,展示了如何在业务代码中使用虚拟线程增强的 WebClient:

import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.concurrent.Executors;

/**
 * 用户服务类,使用虚拟线程增强的 WebClient
 * User service class using virtual thread enhanced WebClient
 */
@Service
public class UserService {

    // WebClient 实例
    // WebClient instance
    private final WebClient webClient;

    /**
     * 构造函数,注入 WebClient
     * Constructor, inject WebClient
     * @param webClient WebClient 实例 / WebClient instance
     */
    public UserService(WebClient webClient) {
        this.webClient = webClient;
    }

    /**
     * 根据用户 ID 获取用户信息
     * Get user info by user ID
     * @param userId 用户 ID / User ID
     * @return Mono<User> 响应式用户对象 / Reactive User object
     */
    public Mono<User> getUserById(Long userId) {
        return webClient.get()
            .uri("/users/{id}", userId)  // 设置请求路径 / Set request path
            .retrieve()  // 检索响应 / Retrieve response
            .bodyToMono(User.class)  // 转换为 User 对象 / Convert to User object
            .doOnError(error ->
                System.err.println("Error fetching user: FunTester - " + error.getMessage()));  // 错误处理 / Error handling
    }

    /**
     * 获取所有用户列表
     * Get all users list
     * @return Flux<User> 响应式用户流 / Reactive User stream
     */
    public Flux<User> getAllUsers() {
        return webClient.get()
            .uri("/users")  // 设置请求路径 / Set request path
            .retrieve()  // 检索响应 / Retrieve response
            .bodyToFlux(User.class);  // 转换为 User 流 / Convert to User stream
    }

    /**
     * 基于虚拟线程的批量处理用户信息
     * Batch process user info based on virtual threads
     * @param userIds 用户 ID 列表 / List of user IDs
     * @return 用户列表 / List of users
     */
    public List<User> getUsersBatch(List<Long> userIds) {
        // 创建虚拟线程执行器
        // Create virtual thread executor
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // 为每个用户 ID 提交一个虚拟线程任务
            // Submit a virtual thread task for each user ID
            var futures = userIds.stream()
                .map(id -> executor.submit(() ->
                    getUserById(id).block()))  // 阻塞等待结果,虚拟线程下成本低 / Block and wait for result, low cost with virtual threads
                .toList();

            // 收集所有任务结果
            // Collect all task results
            return futures.stream()
                .map(future -> {
                    try {
                        return future.get();  // 获取任务结果 / Get task result
                    } catch (Exception e) {
                        return null;  // 失败返回 null / Return null on failure
                    }
                })
                .filter(user -> user != null)  // 过滤掉 null 值 / Filter out null values
                .toList();
        }
    }
}

高级模式和最佳实践

连接池和资源管理

JDK HttpClient 自动管理连接池,但你仍然需要注意资源限制。虽然虚拟线程让你可以创建大量并发请求,但网络连接和服务器资源仍然是有限的:

// 创建使用虚拟线程的 HTTP 客户端,支持 HTTP/2 多路复用
// Create HTTP client using virtual threads with HTTP/2 multiplexing support
HttpClient client = HttpClient.newBuilder()
    .version(HttpClient.Version.HTTP_2)  // 使用 HTTP/2 协议 / Use HTTP/2 protocol
    .connectTimeout(Duration.ofSeconds(10))  // 设置连接超时 / Set connection timeout
    .executor(Executors.newVirtualThreadPerTaskExecutor())  // 使用虚拟线程执行器 / Use virtual thread executor
    // HTTP/2 多路复用允许一个连接上处理多个请求
    // HTTP/2 multiplexing allows multiple requests on a single connection
    .build();

HTTP/2 的多路复用能力意味着多个请求可以共享一个 TCP 连接,与 HTTP/1.1 相比,大幅减少了开销。这就像 HTTP/1.1 是单车道,一次只能过一辆车;HTTP/2 是多车道,可以同时过很多辆车,效率自然高很多。

错误处理和弹性

构建弹性 HTTP 客户端需要正确的错误处理和重试逻辑。在实际生产环境中,网络请求可能会失败,服务器可能会返回错误,这时候重试机制就很重要了。下面是一个带重试逻辑的 HTTP 客户端实现:

import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpTimeoutException;
import java.time.Duration;

/**
 * 具有重试机制的弹性 HTTP 客户端
 * Resilient HTTP client with retry mechanism
 */
public class ResilientHttpClient {

    // HTTP 客户端实例
    // HTTP client instance
    private final HttpClient httpClient;
    // 最大重试次数
    // Maximum retry count
    private final int maxRetries = 3;

    /**
     * 带重试机制的 HTTP 请求方法
     * HTTP request method with retry mechanism
     * @param url 请求 URL / Request URL
     * @return 响应内容 / Response content
     */
    public String fetchWithRetry(String url) {
        int attempt = 0;  // 当前尝试次数 / Current attempt count
        Exception lastException = null;  // 最后一次异常 / Last exception

        // 重试循环
        // Retry loop
        while (attempt < maxRetries) {
            try {
                // 构建 HTTP 请求
                // Build HTTP request
                HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(url))  // 设置请求 URI / Set request URI
                    .timeout(Duration.ofSeconds(5))  // 设置请求超时 / Set request timeout
                    .GET()  // 设置为 GET 请求 / Set as GET request
                    .build();

                // 发送请求并获取响应
                // Send request and get response
                HttpResponse<String> response = httpClient.send(request,
                    HttpResponse.BodyHandlers.ofString());

                // 检查响应状态码
                // Check response status code
                if (response.statusCode() >= 200 && response.statusCode() < 300) {
                    return response.body();  // 成功返回响应体 / Success, return response body
                } else if (response.statusCode() >= 500) {
                    // 服务器错误时重试
                    // Retry on server error
                    Thread.sleep(1000 * (attempt + 1)); // 指数退避策略 / Exponential backoff strategy
                    attempt++;
                    continue;
                } else {
                    throw new RuntimeException("FunTester - HTTP " + response.statusCode());
                }

            } catch (HttpTimeoutException e) {
                lastException = e;
                attempt++;
                System.out.println("Timeout on attempt FunTester - " + attempt);
            } catch (Exception e) {
                throw new RuntimeException("FunTester - Request failed", e);
            }
        }

        throw new RuntimeException("FunTester - Max retries exceeded", lastException);
    }
}

结构化并发

Java 21 还引入了结构化并发,这与虚拟线程完美搭配,用于管理复杂的并发操作。结构化并发就像给并发操作加了一个生命周期管理,确保所有子任务一起完成或失败,不会出现任务泄漏。这在处理多个相关 HTTP 请求时特别有用:

import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;

/**
 * 使用结构化并发获取聚合数据
 * Fetch aggregated data using structured concurrency
 */
public class StructuredHttpFetcher {

    /**
     * 获取所有相关数据
     * Fetch all related data
     * @param userId 用户 ID / User ID
     * @return 聚合数据 / Aggregated data
     */
    public AggregatedData fetchAllData(String userId) {
        // 创建结构化任务作用域,任一任务失败则取消其他任务
        // Create structured task scope, cancel other tasks if any task fails
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

            // 并行执行三个 HTTP 请求任务
            // Execute three HTTP request tasks in parallel
            Subtask<User> userTask = scope.fork(() -> fetchUser(userId));  // 获取用户信息 / Fetch user info
            Subtask<List<Order>> ordersTask = scope.fork(() -> fetchOrders(userId));  // 获取订单列表 / Fetch orders list
            Subtask<Profile> profileTask = scope.fork(() -> fetchProfile(userId));  // 获取用户资料 / Fetch user profile

            scope.join();           // 等待所有任务完成 / Wait for all tasks to complete
            scope.throwIfFailed();  // 如果任何任务失败则抛出异常 / Throw exception if any task failed

            // 聚合所有数据
            // Aggregate all data
            return new AggregatedData(
                userTask.get(),  // 获取用户任务结果 / Get user task result
                ordersTask.get(),  // 获取订单任务结果 / Get orders task result
                profileTask.get()  // 获取资料任务结果 / Get profile task result
            );
        } catch (Exception e) {
            throw new RuntimeException("FunTester - Failed to fetch data", e);
        }
    }
}

这种模式确保所有子任务一起完成或失败,并自动清理和传播错误。如果任何一个子任务失败,其他任务也会被取消,避免资源浪费。

实际用例

微服务通信

在微服务架构中,服务之间不断通过 HTTP 进行通信。虚拟线程消除了简单性(每个请求一个线程)与可扩展性之间的传统权衡。以前你要么用线程池限制并发,要么忍受资源消耗,现在虚拟线程让你可以轻松处理大量并发请求:

/**
 * 订单控制器,使用虚拟线程和结构化并发
 * Order controller using virtual threads and structured concurrency
 */
@RestController
@RequestMapping("/api")
public class OrderController {

    // HTTP 客户端实例
    // HTTP client instance
    private final HttpClient httpClient;

    /**
     * 构造函数,初始化使用虚拟线程的 HTTP 客户端
     * Constructor, initialize HTTP client using virtual threads
     */
    public OrderController() {
        this.httpClient = HttpClient.newBuilder()
            .executor(Executors.newVirtualThreadPerTaskExecutor())  // 使用虚拟线程执行器 / Use virtual thread executor
            .build();
    }

    /**
     * 获取订单详情,并行调用多个微服务
     * Get order details, call multiple microservices in parallel
     * @param orderId 订单 ID / Order ID
     * @return 订单详情 / Order details
     */
    @GetMapping("/orders/{orderId}/details")
    public OrderDetails getOrderDetails(@PathVariable String orderId) {
        // 这些调用在虚拟线程上并发运行
        // These calls run concurrently on virtual threads
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // 并行执行多个 HTTP 请求
            // Execute multiple HTTP requests in parallel
            var orderTask = scope.fork(() -> fetchOrder(orderId));  // 获取订单信息 / Fetch order info
            var userTask = scope.fork(() -> fetchUser(orderTask.get().getUserId()));  // 获取用户信息 / Fetch user info
            var inventoryTask = scope.fork(() -> fetchInventory(orderId));  // 获取库存信息 / Fetch inventory info
            var shippingTask = scope.fork(() -> fetchShipping(orderId));  // 获取物流信息 / Fetch shipping info

            scope.join();  // 等待所有任务完成 / Wait for all tasks to complete
            scope.throwIfFailed();  // 检查是否有任务失败 / Check if any task failed

            // 组装订单详情
            // Assemble order details
            return new OrderDetails(
                orderTask.get(),  // 订单信息 / Order info
                userTask.get(),  // 用户信息 / User info
                inventoryTask.get(),  // 库存信息 / Inventory info
                shippingTask.get()  // 物流信息 / Shipping info
            );
        } catch (Exception e) {
            throw new RuntimeException("FunTester - Failed to build order details", e);
        }
    }
}

API 网关聚合

API 网关通常需要从多个后端服务聚合数据。虚拟线程使这种模式高效且简单。以前你可能需要复杂的异步编排,现在用虚拟线程可以轻松实现并行请求多个服务:

/**
 * API 网关,聚合多个后端服务的数据
 * API gateway aggregating data from multiple backend services
 */
public class ApiGateway {

    // HTTP 客户端实例
    // HTTP client instance
    private final HttpClient httpClient;

    /**
     * 获取仪表板数据,并行调用多个服务
     * Get dashboard data by calling multiple services in parallel
     * @param userId 用户 ID / User ID
     * @return 仪表板数据 / Dashboard data
     */
    public DashboardData getDashboard(String userId) {
        // 创建虚拟线程执行器
        // Create virtual thread executor
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {

            // 并行提交多个服务调用任务
            // Submit multiple service call tasks in parallel
            var futures = List.of(
                executor.submit(() -> fetchService("user-service", userId)),  // 用户服务 / User service
                executor.submit(() -> fetchService("notification-service", userId)),  // 通知服务 / Notification service
                executor.submit(() -> fetchService("analytics-service", userId)),  // 分析服务 / Analytics service
                executor.submit(() -> fetchService("recommendation-service", userId))  // 推荐服务 / Recommendation service
            );

            // 收集所有服务调用结果
            // Collect all service call results
            List<String> results = futures.stream()
                .map(future -> {
                    try {
                        return future.get(Duration.ofSeconds(2));  // 设置超时时间 / Set timeout
                    } catch (Exception e) {
                        return "{}"; // 回退到空数据 / Fallback to empty data
                    }
                })
                .toList();

            // 聚合结果
            // Aggregate results
            return aggregateResults(results);
        }
    }
}

总结

通过将虚拟线程与 Spring WebClient 集成,我们可以在 Spring 生态系统中充分利用虚拟线程的性能优势。高级模式如结构化并发、错误处理和连接池管理,让我们能够构建更加健壮和高效的 HTTP 客户端。

在实际应用中,微服务通信和 API 网关聚合是虚拟线程发挥优势的典型场景。在下一篇文章中,我们将深入探讨性能调优、监控和迁移策略,帮助你将虚拟线程应用到生产环境中。


FunTester 原创精华
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册