从零开发短视频电商 使用Spring WebClient发起远程Http调用

文章目录

    • 依赖
    • 使用
      • 创建WebClient实例
        • 创建带有超时的WebClient实例
        • 示例
      • 请求准备
      • 获取响应
    • 高级
      • 过滤器
        • 自定义过滤器
      • 自定义线程池
      • 自定义WebClient连接池
      • 开启日志
      • 错误处理
        • 最佳实践
    • 示例
      • 异步请求
      • 同步请求
      • 上传文件
      • 重试
      • 过滤错误
      • 错误处理
    • 参考

Spring WebClient 是 Spring WebFlux 项目中 Spring 5 中引入的异步、反应式 HTTP 客户端,用于替换旧的 RestTemplate,以便在使用 Spring Boot 框架构建的应用程序中进行 REST API 调用。

它支持同步、异步和流式场景。

它是一种基于 HTTP/1.1 协议的反应式、非阻塞解决方案。

依赖

为了使用WebClient,我们需要添加对 Spring WebFlux 启动器模块的依赖:

  <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency>

使用

创建WebClient实例

WebClient client = WebClient.create("http://localhost:8080");WebClient client = WebClient.builder().baseUrl("http://localhost:8080") // 基本 URL.defaultCookie("cookieKey", "cookieValue") // 定义默认cookie.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) // 定义默认header.defaultUriVariables(Collections.singletonMap("url", "http://localhost:8080")) // 定义默认 quringstring参数.build();WebClient.builder().baseUrl(host).exchangeStrategies(ExchangeStrategies.builder().codecs(codecs -> codecs.defaultCodecs().maxInMemorySize(500 * 1024))// 编解码器内存中数据的缓冲,默认值为 262,144 字节.build()).build();

创建带有超时的WebClient实例

默认的 30 秒超时

  • 通过 ChannelOption.CONNECT_TIMEOUT_MILLIS 选项设置连接超时时间
  • 使用 ReadTimeoutHandler 和 WriteTimeoutHandler 分别设置读取写入超时时间
  • 使用 responseTimeout 指令配置响应超时时间
HttpClient httpClient = HttpClient.create().option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) // 连接超时时间.responseTimeout(Duration.ofMillis(5000)) // 响应超时时间.doOnConnected(conn -> conn.addHandlerLast(new ReadTimeoutHandler(5000, TimeUnit.MILLISECONDS)) // 读超时.addHandlerLast(new WriteTimeoutHandler(5000, TimeUnit.MILLISECONDS))); // 写超时WebClient client = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient)).build();

示例

// 配置资源工厂,以控制连接池、线程池和其他资源的创建和管理
ReactorResourceFactory factory = new ReactorResourceFactory();
// 设置是否使用全局资源。设置为 false将创建独立的资源。默认情况下,为 true,表示使用全局资源。
factory.setUseGlobalResources(false);
// 配置连接池的提供者,控制连接的创建、分配和回收。 创建名为"httpClient"的连接池,最大连接数为 50 默认是 max(cpu,8)*2 。
factory.setConnectionProvider(ConnectionProvider.create("httpClient", 50));
// 用于配置事件循环资源,它管理底层事件循环线程。创建一个名为 "httpClient" 的事件循环资源,最大线程数为 50,而第三个参数 线程是否在 JVM 关闭时释放 max(cpu,4)
factory.setLoopResources(LoopResources.create("httpClient", 50, true));
WebClient.builder().baseUrl("")// 用于配置请求和响应的处理策略 通常用于配置序列化和反序列化。.exchangeStrategies(ExchangeStrategies.builder().codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(16 * 1024 * 1024)).build())// 用于配置编解码器。.codecs(clientCodecConfigurer -> {clientCodecConfigurer.defaultCodecs().maxInMemorySize(16 * 1024 * 1024); // 设置最大内存大小}).clientConnector(new ReactorClientHttpConnector(factory, client -> client// 设置连接建立的超时时间,单位为毫秒。.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)// 启用或禁用 HTTP 响应的压缩。默认情况下,压缩是禁用的。.compress(true)// 启用或禁用 "wiretap",这将允许你记录请求和响应的详细信息,用于调试和监控。.wiretap(true).responseTimeout(Duration.ofMillis(5000)) // 响应超时时间.doOnConnected(connection -> {// 添加读超时处理器,单位为毫秒connection.addHandlerLast(new ReadTimeoutHandler(10));// 添加写超时处理器,单位为毫秒connection.addHandlerLast(new WriteTimeoutHandler(8));})// 启用跟随重定向,默认情况下启用.followRedirect(true))).filter(ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {// 在请求发出之前记录请求信息System.out.println("Request: " + clientRequest.method() + " " + clientRequest.url());return Mono.just(clientRequest);}).andThen(ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {// 在响应接收后记录响应信息System.out.println("Response: " + clientResponse.statusCode());return Mono.just(clientResponse);}))).build();

请求准备

// 1.方法
UriSpec<RequestBodySpec> uriSpec = client.method(HttpMethod.POST);
UriSpec<RequestBodySpec> uriSpec = client.post();
// 2.uri
RequestBodySpec bodySpec = uriSpec.uri("/resource");
RequestBodySpec bodySpec = uriSpec.uri(uriBuilder -> uriBuilder.pathSegment("/resource").build());
//  "/products/2"
RequestBodySpec bodySpec = uriSpec.uri(uriBuilder - > uriBuilder.path("/products/{id}").build(2))
//  "/products/2/attributes/13"    
RequestBodySpec bodySpec = uriSpec.uri(uriBuilder - > uriBuilder.path("/products/{id}/attributes/{attributeId}").build(2, 13))
//	"/products/?name=AndroidPhone&color=black&deliveryDate=13/04/2019"    
RequestBodySpec bodySpec = uriSpec.uri(uriBuilder - > uriBuilder.path("/products/").queryParam("name", "AndroidPhone").queryParam("color", "black").queryParam("deliveryDate", "13/04/2019").build())
//	"/products/?name=AndroidPhone&color=black&deliveryDate=13%2F04%2F2019" 这种'/'符被转义了
RequestBodySpec bodySpec = uriSpec.uri(uriBuilder - > uriBuilder.path("/products/").queryParam("name", "{title}").queryParam("color", "{authorId}").queryParam("deliveryDate", "{date}").build("AndroidPhone", "black", "13/04/2019"))
// 数组参数 "/products/?category=Phones&category=Tablets"
webClient.get().uri(uriBuilder - > uriBuilder.path("/products/").queryParam("category", "Phones", "Tablets").build())
// 数组参数 "/products/?category=Phones,Tablets"
webClient.get().uri(uriBuilder - > uriBuilder.path("/products/").queryParam("category", String.join(",", "Phones", "Tablets")).build())    // 3.内容
RequestHeadersSpec<?> headersSpec = bodySpec.bodyValue("data");
RequestHeadersSpec<?> headersSpec = bodySpec.body(Mono.just(new Foo("name")), Foo.class);
RequestHeadersSpec<?> headersSpec = bodySpec.body(BodyInserters.fromValue("data"));
LinkedMultiValueMap map = new LinkedMultiValueMap();
map.add("key1", "value1");
map.add("key2", "value2");
RequestHeadersSpec<?> headersSpec = bodySpec.body(BodyInserters.fromMultipartData(map));
// 4.header标头
ResponseSpec responseSpec = headersSpec.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE).accept(MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML).acceptCharset(StandardCharsets.UTF_8).ifNoneMatch("*").ifModifiedSince(ZonedDateTime.now()).retrieve();

获取响应

发送请求并接收响应。我们可以通过使用exchangeToMono/exchangeToFlux或retrieve方法来实现。

// ExchangeToMono和ExchangeToFlux方法允许访问ClientResponse及其状态和标头
Mono<String> response = headersSpec.exchangeToMono(response -> {if (response.statusCode().equals(HttpStatus.OK)) {return response.bodyToMono(String.class);} else if (response.statusCode().is4xxClientError()) {return Mono.just("Error response");} else {return response.createException().flatMap(Mono::error);}
});
// 而retrieve方法是直接获取body
Mono<String> response = headersSpec.retrieve().bodyToMono(String.class);

需要注意的是 ResponseSpec.bodyToMono 方法,如果状态码为4xx(客户端错误)或5xx(服务器错误),它将抛出一个 WebClientException。

单个资源

Mono<Employee> employeeMono = client.get().uri("/employees/{id}", "1").retrieve().bodyToMono(Employee.class);employeeMono.subscribe(System.out::println);

多个资源

Flux<Employee> employeeFlux = client.get().uri("/employees").retrieve().bodyToFlux(Employee.class);employeeFlux.subscribe(System.out::println);

高级

过滤器

过滤器可以拦截、检查和修改客户端请求(或响应)。过滤器非常适合为每个请求添加功能,因为逻辑保留在一个位置。用例包括监视、修改、记录和验证客户端请求。

一个请求具有一个有序链,包含零个或多个过滤器。

在Spring Reactive中,过滤器是 ExchangeFilterFunction 的实例。过滤器函数有两个参数:要修改的 ClientRequest 和下一个 ExchangeFilterFunction。

通常,过滤器函数通过调用过滤器链中的下一个函数来返回:

ExchangeFilterFunction filterFunction = (clientRequest, nextFilter) -> {LOG.info("WebClient fitler executed");return nextFilter.exchange(clientRequest);
};// 添加过滤器
WebClient webClient = WebClient.builder().filter(filterFunction).build();WebClient.builder().filter((request, next) -> {  //过滤器,3次重试,header打印log.info(String.format("请求地址: %s", request.url()));log.info(String.format("请求头信息: %s", request.headers()));Mono<ClientResponse> exchange = next.exchange(request).retry(3);ClientResponse clientResponse = exchange.block();log.info(String.format("响应头信息: %s", clientResponse.headers().asHttpHeaders()));return exchange;}).clientConnector(connector).build();

自定义过滤器

让我们从一个对客户端发送的 HTTP GET 请求进行计数的过滤器开始。

过滤器检查请求方法并在 GET 请求的情况下增加“全局”计数器:

ExchangeFilterFunction countingFunction = (clientRequest, nextFilter) -> {HttpMethod httpMethod = clientRequest.method();if (httpMethod == HttpMethod.GET) {getCounter.incrementAndGet();}return nextFilter.exchange(clientRequest);
};

我们将定义的第二个过滤器将版本号附加到请求 URL 路径。我们利用ClientRequest.from()方法从当前请求对象创建一个新的请求对象并设置修改后的 URL。

随后,我们继续使用新修改的请求对象执行过滤器链:

ExchangeFilterFunction urlModifyingFilter = (clientRequest, nextFilter) -> {String oldUrl = clientRequest.url().toString();URI newUrl = URI.create(oldUrl + "/" + version);ClientRequest filteredRequest = ClientRequest.from(clientRequest).url(newUrl).build();return nextFilter.exchange(filteredRequest);
};

自定义线程池

WebClient 默认使用 Project Reactor 提供的线程池来执行异步操作。但是,你可以根据应用程序的需求进行自定义线程池配置。以下是一些相关原理和最佳实践:

  • 调度器(Schedulers):WebClient 使用调度器来管理线程,例如 Schedulers.elastic() 用于 CPU 密集型操作,Schedulers.parallel() 用于 I/O 密集型操作。你可以通过 publishOnsubscribeOn 方法来切换调度器。
webClient.get().uri("/todos/1").retrieve().bodyToMono(Todo.class).subscribeOn(Schedulers.elastic()) // 切换订阅线程.publishOn(Schedulers.parallel()) // 切换发布线程.subscribe(result -> {// 处理响应});
  • 自定义线程池:如果需要更精细的线程控制,你可以创建自定义的线程池,并在调度器中使用它。这对于控制并发度和资源管理非常有用。
ExecutorService customExecutorService = Executors.newFixedThreadPool(10);webClient.get().uri("/todos/1").retrieve().bodyToMono(Todo.class).subscribeOn(Schedulers.fromExecutor(customExecutorService)).subscribe(result -> {// 处理响应});

自定义WebClient连接池

WebClient 使用 Reactor Netty 作为底层的 HTTP 客户端,它管理着连接池。连接池是一组可重用的连接,以提高性能和资源利用率。以下是有关连接池的原理和最佳实践:

  • 连接池大小:连接池的大小可以通过 WebClient 的配置进行设置。默认情况下,它会根据应用程序的需求动态分配连接。你可以使用 HttpClientmaxConnections 方法来设置最大连接数。
HttpClient httpClient = HttpClient.create().maxConnections(50); // 设置最大连接数为 50WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient)).baseUrl("https://jsonplaceholder.typicode.com").build();
  • 连接超时:你可以配置连接超时和读写超时,以确保请求不会永远等待。这对于防止应用程序被慢速或不响应的服务挂起非常重要。
HttpClient httpClient = HttpClient.create().responseTimeout(Duration.ofSeconds(10)) // 设置响应超时时间为 10 秒.doOnConnected(connection -> {connection.addHandlerLast(new ReadTimeoutHandler(10)); // 设置读取超时connection.addHandlerLast(new WriteTimeoutHandler(10)); // 设置写入超时});WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient)).baseUrl("https://jsonplaceholder.typicode.com").build();
  • 默认情况下,WebClient 会自动管理连接池,无需手动配置。
  • 通常情况下,你不需要担心连接池的具体细节,因为它会在后台自动处理连接的创建、重用和关闭。
  • 如果你需要更精细的控制,可以考虑配置连接池的大小和超时设置。

开启日志

在开发和调试期间,启用详细的 WebClient 日志记录可以帮助你识别问题。你可以使用 Spring Boot 的日志配置来启用 WebClient 的日志记录。在 application.propertiesapplication.yml 中添加以下配置:

logging.level.org.springframework.web.reactive.function.client=DEBUG
logging.level.reactor.netty.http.client=DEBUG
logging.level.reactor.netty.tcp.client=DEBUG

这将为 WebClient 的 HTTP 请求和响应生成详细的日志信息,包括请求头、响应头和响应体。

WebClient 提供了一种简单的方法来记录请求和响应的日志,以便进行调试和监控。你可以通过添加过滤器来实现日志记录。以下是一个示例:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;@Configuration
public class WebClientConfig {@Beanpublic WebClient.Builder webClientBuilder() {return WebClient.builder().filter(logRequest()).filter(logResponse());}private ExchangeFilterFunction logRequest() {return ExchangeFilterFunction.ofRequestProcessor(request -> {// 记录请求日志System.out.println("Request: " + request.method() + " " + request.url());return Mono.just(request);});}private ExchangeFilterFunction logResponse() {return ExchangeFilterFunction.ofResponseProcessor(response -> {// 记录响应日志System.out.println("Response: " + response.statusCode());return Mono.just(response);});}// ... 省略其他配置
}

在上面的示例中,我们定义了两个过滤器 logRequestlogResponse,分别用于记录请求和响应的日志。你可以根据需要将日志输出到日志文件或其他监控工具。

错误处理

WebClient 提供了多种处理错误的方式。在上面的示例中,我们使用了 onStatus 方法来处理特定的 HTTP 状态码。这里有一些更多的错误处理策略和最佳实践:

  • onStatus 方法onStatus 方法允许你根据 HTTP 响应的状态码来处理错误。你可以根据需要定义不同的处理逻辑,例如重试、返回默认值或引发自定义异常。
  • onErrorResume 方法:使用 onErrorResume 方法,你可以在出现错误时返回一个备用的 Mono。这可以用于从缓存中获取数据或返回默认值。
  • 全局错误处理器:你可以注册一个全局错误处理器来处理 WebClient 的全局错误,例如连接失败、超时等。通过 ExchangeStrategies 可以定义一个全局错误处理器。
@Bean
public WebClient.Builder webClientBuilder() {return WebClient.builder().exchangeStrategies(ExchangeStrategies.builder().codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(16 * 1024 * 1024)).build()).clientConnector(new ReactorClientHttpConnector(HttpClient.newConnection().compress(true).resolver(DefaultAddressResolverGroup.INSTANCE))).baseUrl("https://jsonplaceholder.typicode.com").filter((request, next) -> next.exchange(request).doOnError(throwable -> {// 全局错误处理逻辑}));
}

WebClient 提供了丰富的错误处理机制,可以通过 onStatus 和其他方法来捕获和处理不同类型的错误。例如,onStatus 方法用于根据 HTTP 响应的状态码来处理错误。你可以在 onStatus 方法中返回一个 Mono.error,将错误包装成异常并传播。

最佳实践

  • 使用 onStatus 处理不同的 HTTP 状态码。这可以让你根据状态码执行不同的错误处理逻辑。
  • 使用 onErrorResume 来提供备用值或执行备用操作,以确保即使出现错误,也能返回有意义的响应。
  • 使用 onErrorReturn 来在发生错误时返回一个默认值。
.retrieve()
.onStatus(HttpStatus::is4xxClientError, response -> {// 处理 4xx 错误,或返回备用值return Mono.error(new CustomException("Client error: " + response.statusCode()));
})
.onStatus(HttpStatus::is5xxServerError, response -> {// 处理 5xx 错误,或返回备用值return Mono.error(new CustomException("Server error: " + response.statusCode()));
})
.bodyToMono(Todo.class)    
.onErrorResume(CustomClientException.class, ex -> {// 处理自定义客户端异常return Mono.just(createDefaultTodo()); // 返回一个默认值
})
.onErrorResume(CustomServerException.class, ex -> {// 处理自定义服务器异常return Mono.error(new CustomFallbackException("Fallback error: " + ex.getMessage()));
}) 
.onErrorResume(Exception.class, error -> {// 处理其他类型的异常,或返回备用值return Mono.just(new DefaultResponse());
})
.onErrorReturn(CustomClientException.class, createDefaultTodo()) // 在客户端错误时返回默认值
.onErrorReturn(CustomServerException.class, createDefaultTodo()) // 在服务器错误时返回默认值    
.doOnError(error -> {// 在发生错误时执行的操作System.err.println("Error occurred: " + error.getMessage());})

示例

异步请求

client.get().uri(URLConstants.URL).header(URLConstants.API_KEY_NAME, URLConstants.API_KEY_VALUE).retrieve().bodyToMono(String.class).subscribe(result->System.out.println(result));// 创建 WebClient 实例WebClient webClient = webClientBuilder().baseUrl("https://jsonplaceholder.typicode.com").build();// 执行 GET 请求Mono<ResponseEntity<String>> responseMono = webClient.get().uri("/posts/1").retrieve().toEntity(String.class);// 订阅响应并处理结果responseMono.subscribe(responseEntity -> {if (responseEntity.getStatusCode().is2xxSuccessful()) {System.out.println("Response Body: " + responseEntity.getBody());} else {System.err.println("Request failed with status code: " + responseEntity.getStatusCode());}});

以非阻塞的方式订阅subscribe(),该方法返回Mono的包装器。

同步请求

虽然Spring WebClient是异步的,但是我们仍然可以通过调用阻塞线程直到执行结束的方法block()来进行同步调用。方法执行后我们得到结果。

 String block = webClient.get().uri("https://jsonplaceholder.typicode.com/t3odos/1").retrieve().onStatus(HttpStatus::is4xxClientError, response -> {// 处理 4xx 错误,或返回备用值return Mono.error(new RuntimeException("Client error: " + response.statusCode()));}).onStatus(HttpStatus::is5xxServerError, response -> {// 处理 5xx 错误,或返回备用值return Mono.error(new RuntimeException("Server error: " + response.statusCode()));}).bodyToMono(String.class).onErrorResume(RuntimeException.class, ex -> {// 处理自定义异常 // 返回一个默认值return Mono.just("createDefaultTodo()");}).doOnError(error -> {// 在发生错误时执行的操作System.out.println("Error occurred: " + error.getMessage());}).block(Duration.ofSeconds(10));System.out.println(block);String result = client.post().uri("https://reqbin.com/echo/post/json").body(BodyInserters.fromValue(prepareRequest())).exchange().flatMap(response -> response.bodyToMono(String.class)).block();System.out.println("result::" + result);private String prepareRequest() {var values = new HashMap<String, String>() {{put("Id", "12345");put("Customer", "Roger Moose");put("Quantity", "3");put("Price", "167.35");}};var objectMapper = new ObjectMapper();String requestBody;try {requestBody = objectMapper.writeValueAsString(values);} catch (JsonProcessingException e) {e.printStackTrace();return null;}return requestBody;}}

创建了一个 JSON 字符串prepareRequest(),然后将该字符串作为请求正文发送到 HTTPPOST方法中。

exchange()与之前使用的retrieve()方法相比,该方法通过提供对来自 HTTP 客户端的响应的访问来提供更多控制。

上传文件

Mono<HttpStatus> httpStatusMono = webClient.post().uri(url).contentType(MediaType.APPLICATION_PDF).body(BodyInserters.fromResource(resource)).exchangeToMono(response -> {if (response.statusCode().equals(HttpStatus.OK)) {return response.bodyToMono(HttpStatus.class).thenReturn(response.statusCode());} else {throw new ServiceException("Error uploading file");}});
// 从多部分资源上传文件
MultipartBodyBuilder builder = new MultipartBodyBuilder();
builder.part("file", multipartFile.getResource());Mono<HttpStatus> httpStatusMono = webClient.post().uri(url).contentType(MediaType.MULTIPART_FORM_DATA).body(BodyInserters.fromMultipartData(builder.build())).exchangeToMono(response -> {if (response.statusCode().equals(HttpStatus.OK)) {return response.bodyToMono(HttpStatus.class).thenReturn(response.statusCode());} else {throw new ServiceException("Error uploading file");}});

重试

// 使用retry方法
public Mono<String> getData(String stockId) {return webClient.get().uri(PATH_BY_ID, stockId).retrieve().bodyToMono(String.class).retry(3); // 无论 Web 客户端返回什么错误,这都会重试最多 3 次。
}
// 使用retryWhen方法的可配置策略
public Mono<String> getData(String stockId) {return webClient.get().uri(PATH_BY_ID, stockId).retrieve().bodyToMono(String.class).retryWhen(Retry.max(3));
}
// 固定延迟重试
public Mono<String> getData(String stockId) {return webClient.get().uri(PATH_BY_ID, stockId).retrieve().bodyToMono(String.class)// 尝试之间有两秒的延迟,这可能会增加成功的机会.retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(2)));
}
// 不是按固定间隔重试
public Mono<String> getData(String stockId) {return webClient.get().uri(PATH_BY_ID, stockId).retrieve().bodyToMono(String.class)// 会逐渐增加尝试之间的延迟- 大约为 2 秒、4 秒,然后是 8 秒.retryWhen(Retry.backoff(3, Duration.ofSeconds(2)));
}
// 抖动重试 为计算的延迟间隔增加了随机性
public Mono<String> getData(String stockId) {return webClient.get().uri(PATH_BY_ID, stockId).accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(String.class).retryWhen(Retry.backoff(3, Duration.ofSeconds(2)).jitter(0.75));
}

过滤错误

服务中的任何错误都将导致重试尝试,包括 4xx 错误,例如400:Bad Request或401:Unauthorized。

显然,我们不应该重试此类客户端错误,因为服务器响应不会有任何不同。因此,让我们看看如何仅在出现特定错误的情况下应用重试策略

public Mono<String> getData(String stockId) {return webClient.get().uri(PATH_BY_ID, stockId).retrieve()// 当是5xx 错误的异常,返回我们自定义的异常.onStatus(HttpStatus::is5xxServerError, response -> Mono.error(new ServiceException("Server error", response.rawStatusCode()))).bodyToMono(String.class).retryWhen(Retry.backoff(3, Duration.ofSeconds(5))// 仅在抛出ServiceException时重试.filter(throwable -> throwable instanceof ServiceException));
}

所有重试尝试均不成功的时候。在这种情况下,该策略的默认行为是传播 RetryExhaustedException ,包装最后一个错误。

public Mono<String> getData(String stockId) {return webClient.get().uri(PATH_BY_ID, stockId).retrieve().onStatus(HttpStatus::is5xxServerError, response -> Mono.error(new ServiceException("Server error", response.rawStatusCode()))).bodyToMono(String.class).retryWhen(Retry.backoff(3, Duration.ofSeconds(5)).filter(throwable -> throwable instanceof ServiceException)// 一系列失败的重试结束后,请求将失败并出现 ServiceException       .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> {throw new ServiceException("External Service failed to process after max retries", HttpStatus.SERVICE_UNAVAILABLE.value());}));
}

错误处理

使用onStatus

onStatus是一种内置机制,可用于处理WebClient响应。这使我们能够根据特定响应(例如 400、500、503 等)或状态类别(例如 4XX 和 5XX 等)应用细粒度的功能:

WebClient.builder().build().post().uri("/some-resource").retrieve().onStatus(HttpStatus.INTERNAL_SERVER_ERROR::equals,response -> response.bodyToMono(String.class).map(Exception::new))

onStatus方法需要两个参数。第一个是接收状态代码的谓词。第二个参数的执行基于第一个参数的输出。第二个是将响应映射到Mono或异常的函数。

示例为,如果我们看到 INTERNAL_SERVER_ERROR (即 500),我们将使用bodyToMono获取主体,然后将其映射到新的Exception。

我们可以链接onStatus调用,以便能够为不同的状态条件提供功能:

Mono<String> response = WebClient.builder().build().post().uri("some-resource").retrieve().onStatus( HttpStatus.INTERNAL_SERVER_ERROR::equals,response -> response.bodyToMono(String.class).map(CustomServerErrorException::new)) .onStatus(HttpStatus.BAD_REQUEST::equals,response -> response.bodyToMono(String.class).map(CustomBadRequestException::new))... .bodyToMono(String.class);// do something with responsewebClient.get().uri("/todos/{id}", id).retrieve().onStatus(HttpStatus::is4xxClientError, response -> {// 处理 4xx 错误return Mono.error(new CustomException("Client error: " + response.statusCode()));}).onStatus(HttpStatus::is5xxServerError, response -> {// 处理 5xx 错误return Mono.error(new CustomException("Server error: " + response.statusCode()));}).bodyToMono(Todo.class);

现在onStatus调用映射到我们的自定义异常。我们为这两种错误状态分别定义了异常类型。onStatus方法允许我们使用我们选择的任何类型。

使用ExchangeFilterFunction

ExchangeFilterFunction是处理特定状态代码和获取响应正文的另一种方法 。与onStatus不同,交换过滤器非常灵活,适用于基于任何布尔表达式的过滤器功能。

我们可以受益于ExchangeFilterFunction的灵活性,涵盖与onStatus函数相同的类别。

处理返回的逻辑:

private static Mono<ClientResponse> exchangeFilterResponseProcessor(ClientResponse response) {HttpStatus status = response.statusCode();if (HttpStatus.INTERNAL_SERVER_ERROR.equals(status)) {return response.bodyToMono(String.class).flatMap(body -> Mono.error(new CustomServerErrorException(body)));}if (HttpStatus.BAD_REQUEST.equals(status)) {return response.bodyToMono(String.class).flatMap(body -> Mono.error(new CustomBadRequestException(body)));}return Mono.just(response);
}

接下来,我们将定义过滤器并使用对处理程序的方法引用:

ExchangeFilterFunction errorResponseFilter = ExchangeFilterFunction.ofResponseProcessor(WebClientStatusCodeHandler::exchangeFilterResponseProcessor);

与onStatus调用类似,我们在错误时映射到异常类型。但是,使用Mono.error会将此异常包装ReactiveException 中。处理错误时应牢记这种嵌套。

现在我们将其应用于WebClient的实例,以达到与onStatus链式调用相同的效果:

Mono<String> response = WebClient.builder().filter(errorResponseFilter).build().post().uri("some-resource").retrieve().bodyToMono(String.class);// do something with response

参考

  • https://reflectoring.io/comparison-of-java-http-clients/
  • https://reflectoring.io/spring-webclient/
  • https://docs.flydean.com/spring-framework-documentation5/webreactive/2.webclient
  • https://blog.hanqunfeng.com/2020/04/18/http-utils/#WebClientUtil

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/78130.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于Protege的知识建模实战

一.Protege简介、用途和特点 1.Protege简介 Protege是斯坦福大学医学院生物信息研究中心基于Java开发的本体编辑和本体开发工具&#xff0c;也是基于知识的编辑器&#xff0c;属于开放源代码软件。这个软件主要用于语义网中本体的构建&#xff0c;是语义网中本体构建的核心开发…

高阶导数的概念与公式

目录 高阶导数的概念 常用的高阶导数的公式 隐函数补充 反函数补充 高阶导数的概念 高阶导数是指一阶或二阶及以上的导数。这些导数可以通过连续进行一阶导数的计算来得到。然而&#xff0c;实际计算高阶导数时&#xff0c;存在一些问题&#xff0c;例如对抽象函数高阶导数…

遇见问题:使用mybaties向数据库中插入数据,idea显示插入成功,但是数据库中并没有数据变化?

遇见问题&#xff1a;使用mybaties向数据库中插入数据&#xff0c;idea显示插入成功&#xff0c;但是数据库中并没有数据变化? 可能的原因有几种&#xff1a; 没有提交事务&#xff1a;在使用 MyBatis 进行数据库操作时&#xff0c;需要手动提交事务。你可以在插入数据完成后…

北京映急物流有限公司 面试.net软件工程师岗位

请实现以下算法&#xff0c;语言不限&#xff0c;也可以是伪代码。 1.有一个数组 a[1000]存放了1000整数&#xff0c;这 1000 个数都大于等于 1&#xff0c;小于等于999&#xff0c;并且只有两个数是相同的,剩下的 998 个数均不相同。请写一个最优搜索算法&#xff0c;找出相同…

西门子S7-1200F或1500F系列安全PLC的组态步骤和基础编程(一)

西门子S7-1200F或1500F系列安全PLC的组态步骤和基础编程(一) 第一部分:组态配置 具体步骤可参考以下内容: 如下图所示,新建一个项目后,添加一个安全型PLC,这里以1516F-3 PN/DP为例进行说明, 如下图所示,添加CPU完成后,可以看到左侧的项目树中比普通的PLC多了几个选项…

Markdown和PlantUML的基本使用

首先需要在VS Code中安装Markdown extention和plantUML插件 测试标题 这是测试标题&#xff0c;使用一个#号配合标题 测试1级标题 这是1级测试标题&#xff0c;使用2个#号配合标题 测试2级标题 这是2级测试标题&#xff0c;使用3个#号配合标题 这里是多级列表 Part A S…

leetcode 232 用栈实现队列

请你仅使用两个栈实现先入先出队列。队列应当支持一般队列支持的所有操作&#xff08;push、pop、peek、empty&#xff09;&#xff1a; 实现 MyQueue 类&#xff1a; void push(int x) 将元素 x 推到队列的末尾int pop() 从队列的开头移除并返回元素int peek() 返回队列开头…

软件测试基础

什么是软件?程序,文档,服务,数据什么是软件测试?尽快尽早的发现中软件存在错误,贯穿整个软件生命周期的确定和验证的过程。项目流程:需求分析 概要设计 详细设计 编码 测试 验收项目的开发模型 瀑布模型 优点:规范了项目的流程 缺点:测试介入的太晚,…

【Docker】Docker简介

Docker简介 &#x1f4cb;导航 1. Docker简介1.1 什么是Docker&#xff1f;1.2 什么是容器&#xff1f;1.3 容器的优势&#xff1f;1.4 Docker的优势&#xff1f;1.5 虚拟技术与容器技术Docker的区别&#xff1f;1.6 为什么学习Docker? 2. 安装Docker3. Docker架构4. Docker命…

Vue3样式绑定

文章目录 Vue3样式绑定1. class 属性绑定1.1 v-bind:class 设置一个对象&#xff0c;从而动态的切换 class1.2 在对象中传入更多属性用来动态切换多个 class1.3 直接绑定数据里的一个对象1.4 绑定一个返回对象的计算属性。这是一个常用且强大的模式1. 5 数据语法1.6 errorClass…

算法通关村第十五关——位运算在查找重复元素中的妙用

前言 大部分算法默认给定的数据量都比较小&#xff0c;只有几个或者几十个元素&#xff0c;但是如果将数据量提高到百万甚至几十亿&#xff0c;那么处理逻辑就会发生很大差异。在海量数据中&#xff0c;普通数据结构都无效了&#xff0c;因为内存空间放不下&#xff0c;常规的…

JAsper:专注于营销领域的AIGC

【产品介绍】 Jasper 由 Dave Rogenmoser&#xff08;CEO&#xff09;、Chris Hull&#xff08;COO&#xff09;和 John Phillip Morgan&#xff08;CTO&#xff09;在2021 年成立&#xff0c;是一款领先的 AI 营销工具以及写作助手。整个jasper官网都会强调自己对营销领域的理…

了解冒泡排序

package com.mypackage.array;import java.util.Arrays;public class Demo07 {public static void main(String[] args) {int[] a {3,2,6,7,4,5,6,34,56,7};int[] sort1 sort1(a); //调用我们自己写的排序方法后&#xff0c;返回一个排序后的数组System.out.println(Array…

Spring Boot 下载文件(word/excel等)文件名中文乱码问题|构建打包不存在模版文件(templates等)

Spring Boot 下载文件(word/excel等)文件名中文乱码问题&#xff5c;构建打包不存在模版文件(templates等) 准备文件&#xff0c;这里我放在resource下的templates路径 在pom中配置构建打包的资源&#xff0c;更新maven 如果使用了assembly打包插件这样配置可能仍不生效&#…

台积电的战略布局:“曲线”抢单 | 百能云芯

郭明錤最新的分析引发了广泛关注&#xff0c;他指出台积电采取了一系列重大战略投资举措&#xff0c;旨在争夺未来的半导体订单&#xff0c;尤其是来自苹果和英伟达的12纳米订单。这些战略举措包括认购英特尔手中的IMS Nanofabrication Global股权以及投资安谋&#xff08;Arm&…

【C++学习笔记】野指针的定义与避免

1.野指针的定义 指向非法的内存地址指针叫作野指针&#xff08;Wild Pointer&#xff09;&#xff0c;也叫悬挂指针&#xff08;Dangling Pointer&#xff09;&#xff0c;意为无法正常使用的指针。 2.出现野指针的常见情形 2.1使用未初始化的指针 出现野指针最典型的情形就…

单个vue echarts页面

<template> <div ref"history" class"echarts"></div> </template> <script> export default{ data () { return {}; }, methods: { history(){ let myChart this.$echarts.init(this.$refs.history); // 绘制图表 myCha…

Linux:基础开发工具之yum,vim,gcc的使用

文章目录 yumvimgcc 本篇主要总结的是Linux下开发工具 yumvimgcc/g yum 什么是yum&#xff1f; 不管是在手机移动端还是pc端&#xff0c;不管是什么操作系统&#xff0c;当用户想要下载一些内容或者工具的时候&#xff0c;都需要到一个特定的位置进行下载&#xff0c;例如在…

点云从入门到精通技术详解100篇-从全局到局部的三维点云细节差异分析

目录 前言 国内外研究现状 细节差异分析相关研究 三维点云的相似性相关研究 存在的问题 三维点云对比的相关技术 2.1 三维点云的采集设备 2.2三维点云的存储格式 2.3三维点云的空间变换 2.4三维点云相似度分析 2.4.1点云特征的提取 2.4.2特征相似度计算 本文篇幅较长&#xff0…

获取Windows 10中的照片(旧版)下载

Windows 10中的新版照片应用&#xff0c;目前发现无法直接打开部分iOS设备上存储的照片。需要使用照片&#xff08;旧版&#xff09;才行。 但目前应用商店中无法直接搜索到照片&#xff08;旧版&#xff09;&#xff0c;因此笔者提供如下链接&#xff0c;可以直接访问并呼出W…