Spring WebFlux:响应式编程

在软件开发领域,随着互联网应用的规模和复杂性不断增加,传统的编程模型逐渐暴露出一些局限性,尤其是在面对高并发、大规模数据流处理等场景时。为了应对这些挑战,响应式编程(Reactive Programming)应运而生,它提供了一种更为高效、灵活的编程范式,以适应不断变化的系统需求。

1.Spring WebFlux 简介

WebFlux提供了一个非阻塞、异步的Web框架,允许开发者构建高性能、可伸缩的 Web 应用程序,特别适合处理大量并发连接,如在微服务架构和云环境中。

WebFlux是Spring Framework 5引入的一个重要组件,它代表了Spring对于响应式编程(Reactive Programming)的支持。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

1.1.异步非阻塞

异步非阻塞是一种编程模式,它允许程序在等待某个操作完成时继续执行其他任务。这种模式是基于事件循环,可以在单个线程上处理多个I/O操作,大幅提高了系统的吞吐量和伸缩性。

  • 服务器端处理: WebFlux 使用 Netty 或 Undertow 这类非阻塞服务器,它们能够处理大量连接而不阻塞线程。这意味着服务器可以在单个线程中同时处理多个请求,提高了资源利用率和吞吐量。
  • 数据库访问: 通过使用 R2DBC( Reactive Relational Database Connectivity)或其他支持响应式编程的数据库客户端,可以在数据库查询中实现异步操作,从而避免线程等待数据库响应造成的阻塞。
  • 异步API调用: 在处理外部服务调用时,可以利用 WebClient 进行异步HTTP请求,WebClient 是完全非阻塞的,能够在等待响应的同时处理其他任务。

1.2.响应式流(Reactive Streams)

响应式流是一个规范,它定义了异步流处理的接口和行为。

1.2.1.响应式编程

响应式编程是一种编程范式,是一种异步的、事件驱动的编程范式,它特别适用于构建能够处理实时数据流的应用程序。在这种模型中,数据和事件作为流进行处理,允许开发者以声明式的方式构建复杂的异步逻辑。

Spring WebFlux 遵循这一规范,使用 Publisher 作为响应式流的源头,Subscriber 作为流的消费者。这种模型允许开发者以声明式的方式处理异步数据流,同时保持了流的控制和背压管理。

  • 数据流处理: Spring WebFlux 集成了 Reactor 库,使用 Flux 和 Mono 类型来处理数据流。Flux 表示0到N个元素的异步序列,Mono 表示0或1个元素的异步结果,两者都支持背压策略,能够智能调整数据生产速度以匹配消费者的处理能力。
  • 事件驱动编程: 应用程序可以轻松地处理各种事件,如消息队列中的消息、WebSocket连接上的事件等,通过响应式流模型,可以高效地处理这些事件而不会阻塞主线程。

1.2.2.背压管理

背压是响应式流中的一个重要概念,用于控制生产者和消费者之间的数据流速率。Project Reactor 提供了多种背压策略,帮助开发者处理数据流过载的问题。

1.2.3.Project Reactor

Project Reactor 用于创建和操作响应式流的一组丰富的API。

Project Reactor 是一个基于Java 8的响应式编程库,由Pivotal团队开发,专为配合Spring框架使用而设计。

响应式类型:Mono 和 Flux
  • Mono:代表0到1个元素的响应式类型,适合表示单个结果或无结果的异步操作。
  • Flux:代表0到N个元素的响应式类型,用于表示多个结果的异步序列。
操作符与响应式数据流

Project Reactor 提供了大量操作符,用于处理响应式流中的元素。这些操作符包括:

  • Map:将流中的每个元素应用一个函数,并发布结果。
  • Filter:根据条件过滤流中的元素。
  • FlatMap:将流中的每个元素转换为另一个流,并将结果流合并为一个流。
  • SwitchIfEmpty:如果源流为空,则切换到备选的Mono或Flux。

1.2.4.与传统Spring MVC的比较

Spring MVC是一个基于Servlet API的Web框架,它采用阻塞I/O模型,每个请求/响应对都与一个线程绑定。这在并发量较低时表现良好,但在高并发场景下,线程资源的消耗会急剧增加。

相比之下,Spring WebFlux基于响应式流,它不依赖于Servlet API,可以在如Netty、Undertow等非Servlet服务器上运行。这种模型使得WebFlux能够以非阻塞的方式处理并发请求,有效利用资源,提高性能。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

1.3.函数式编程

函数式编程是一种编程范式 , 它强调的是将任务看作一系列可组合的函数调用。通过声明式的方式定义处理流程,让代码更简洁、易读,也更适合处理复杂的异步逻辑。WebFlux采用函数式编程范式,利用Lambda表达式简化了编程模型,路由和请求处理采用函数式编程的方式进行定义,这与传统的基于注解的控制器方法截然不同。

  • 路由与处理: WebFlux 提供了函数式编程模型,允许开发者使用 Java 8 的 Lambda 表达式来定义路由规则和处理函数,使得代码更为简洁、可读性强。例如,可以使用 RouterFunctions.route() 方法来定义路由,使用 ServerResponse 来构建响应。
  • 链式操作与组合: 利用响应式类型 Flux 和 Mono 的丰富操作符,如 map(), filter(), flatMap() 等,可以轻松地构建复杂的异步数据处理流程,而无需显式管理回调或线程。

1.3.1.请求路由

使用 RouterFunction 定义请求路由

RouterFunction 是Spring WebFlux中用于定义请求路由的函数接口。通过实现 RouterFunction,可以精确控制请求的匹配和处理。

路由谓语和处理器
  • 路由谓语:用于匹配HTTP请求的特定属性,如路径、方法、头部等。

  • 处理器:一旦路由谓语匹配成功,处理器将负责处理请求并返回响应。

1.3.2.函数式端点

Spring WebFlux 还引入了函数式端点的概念,允许开发者以简单的函数形式处理请求和生成响应,这些函数通常返回 ServerResponse

1.3.3.函数式编程与响应式流

函数式编程是响应式编程模型的一个重要组成部分。它提倡使用无副作用的函数、不可变数据结构,并且推崇声明式编程。这些原则与响应式流的概念相契合,响应式流强调数据流的声明式处理,以及在数据流中应用各种操作符来转换、过滤和组合数据。

2.Spring WebFlux 应用搭建

2.1 环境准备

项目依赖配置

基于Maven的Springboot项目,pom.xml文件中的依赖配置可能如下所示:

 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

2.2 定义路由与处理器

2.2.1.创建 RouterFunction Bean

在Spring WebFlux中,使用RouterFunction来定义请求的路由。

首先,创建一个配置类,并在其中定义路由:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;@Configuration
public class WebFluxConfig {@Beanpublic RouterFunction<ServerResponse> route(MyHandler handler) {return RouterFunctions.route(GET("/hello"), handler::hello);}
}
  • RouterFunctions.route() 是用来创建路由规则的起点。
  • GET("/hello") 是来自RequestPredicates的静态方法,定义了一个谓词,用于匹配HTTP GET方法并且路径为"/hello"的请求。
  • handler::hello 是一个方法引用,指向MyHandler类中名为hello的方法。这意味着当匹配到上述HTTP请求条件时,会调用handler.hello()方法来处理请求,并期待它返回一个ServerResponse对象作为响应。
2.2.2.使用 HandlerFunction 处理请求

创建一个处理器类,它将包含处理请求的方法。这些方法可以返回Mono<ServerResponse>Flux<ServerResponse>,这取决于它们处理的是单个响应还是响应流:

import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;@Component
public class MyHandler {public Mono<ServerResponse> hello(ServerRequest request) {String name = request.queryParam("name").orElse("World");String message = "Hello, " + name + "!";return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN).body(Mono.just(message), String.class);}
}

hello的方法,用于处理HTTP请求并返回一个响应。

  • Mono<ServerResponse>:返回类型是一个Mono对象,这是Reactor库中的一个类,用于表示0到1个元素的异步序列。在这里,它最终会包含一个ServerResponse对象,即HTTP响应。使用Mono是为了支持非阻塞和响应式编程。

  • ServerRequest request:输入参数,表示接收到的HTTP请求信息。

提取查询参数:

String name = request.queryParam("name").orElse("World");

这行代码从ServerRequest对象中尝试获取名为"name"的查询参数。如果请求中包含了这个参数,比如http://example.com/hello?name=John,那么name变量就会被赋值为"John";如果没有提供,则使用默认值"World"。

构建HTTP响应的:

  • ServerResponse.ok():创建一个表示成功(HTTP状态码200 OK)的基础响应。
  • .contentType(MediaType.TEXT_PLAIN):设置响应的内容类型为纯文本(PLAIN TEXT)。
  • .body(Mono.just(message), String.class):指定响应体的内容。这里使用Mono.just(message)来包装问候消息字符串,表明响应体是一个包含单个元素的异步序列,其类型为String。这确保了整个处理过程是非阻塞的。

2.3 全局异常处理

全局异常处理是任何Web应用程序的重要部分

import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class GlobalExceptionHandler {public Mono<Void> handleException(ServerWebExchange exchange, Throwable ex) {ServerHttpResponse response = exchange.getResponse();response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);DataBuffer buffer = response.bufferFactory().wrap("{\"error\": \"Internal Server Error\"}".getBytes());return response.writeWith(Mono.just(buffer));}
}

这段代码定义了一个全局异常处理器,用于在Spring WebFlux应用中捕获未处理的异常,并向客户端返回统一的错误响应。下面是各部分的详细说明:

@Order(Ordered.HIGHEST_PRECEDENCE)

  • @Order注解用于指定组件的执行顺序,特别是当有多个相同类型的组件(比如多个异常处理器)需要按特定顺序执行时。Ordered.HIGHEST_PRECEDENCE是一个常量,值为Integer.MIN_VALUE,意味着这个异常处理器将具有最高优先级,会在所有其他相同类型的组件之前执行。换句话说,如果有其他异常处理器也能够处理相同的异常类型,但没有指定这么高的优先级,那么这个GlobalExceptionHandler将会优先处理异常。

handleException 方法

  • 参数

    • ServerWebExchange exchange:封装了HTTP请求和响应的所有信息,是WebFlux处理请求时的核心对象。
    • Throwable ex:抛出的异常对象,即需要被处理的异常。
  • 功能

    1. 设置响应状态码:首先通过exchange.getResponse()获取响应对象,并将其状态码设置为HttpStatus.INTERNAL_SERVER_ERROR(HTTP 500),表示服务器遇到了未知错误。

    2. 构建响应体:使用response.bufferFactory().wrap()方法创建一个包含错误信息的DataBuffer对象。这里的消息体内容为{"error": "Internal Server Error"},表示发生了内部服务器错误。

    3. 写入响应体并完成响应:最后,通过response.writeWith(Mono.just(buffer))将构建好的错误信息缓冲区写入响应,并返回一个Mono<Void>表示这是一个无返回值的异步操作,完成响应的发送。

3.应用细节

3.1.RouterFunction

RouterFunction 是 Spring WebFlux 提供的一种用于定义和处理 HTTP 路由的功能性接口,它是构建响应式 Web 应用的基础组件之一。与传统的基于注解(如 @GetMapping, @PostMapping 等)的控制器相比,RouterFunction 提供了一种更为灵活和强大的方式来定义路由逻辑,特别适合函数式编程风格。下面详细介绍其用法:

3.1.1. 基本概念

  • RouterFunction: 表示一个路由处理逻辑,它将HTTP请求与相应的处理逻辑关联起来。一个RouterFunction可以匹配一个请求,返回下一个RouterFunction或一个处理结果(HandlerFunction)。
  • HandlerFunction: 表示一个处理逻辑,它接受一个ServerRequest并返回一个Mono<ServerResponse>。即,它负责处理请求并产生响应。

3.1.2. 创建 RouterFunction

创建 RouterFunction 通常涉及定义路由规则和处理逻辑。以下是一个简单的示例,展示如何定义一个路由来处理 GET 请求:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
@Configuration
public class GreetingRouter {@Beanpublic  RouterFunction<ServerResponse> routingFunction() {return route(GET("/hello2"), request -> ok().bodyValue("Hello, Spring WebFlux!"));}
}

这段代码定义了一个简单的RouterFunction<ServerResponse>实例,用于处理一个HTTP GET请求到/hello路径的路由逻辑。

return route(GET("/hello"), request -> ok().bodyValue("Hello, Spring WebFlux!"));
  • routeRouterFunctions类中的一个静态方法,用于创建一个基础的路由定义。它接受两个参数:

    • 谓词(Predicate)GET("/hello")是一个谓词,用于匹配HTTP请求的方法和路径。这里匹配所有GET方法并且路径为/hello的请求。
    • 处理器函数(HandlerFunction)request -> ok().bodyValue("Hello, Spring WebFlux!")是一个Lambda表达式,代表了处理请求的具体逻辑。它接受一个ServerRequest对象作为输入,并返回一个Mono<ServerResponse>表示响应结果。
  • request -> ...:Lambda表达式定义了如何根据请求生成响应。

  • ok():这是ServerResponse的静态工厂方法,用于创建一个表示成功(HTTP状态码200 OK)的响应。

  • .bodyValue("Hello, Spring WebFlux!"):此方法设置了响应体的内容为给定的字符串,并且指定了内容类型(默认情况下,如果没有显式指定,会根据内容推断)。

3.1.3. 注册 RouterFunction

要在 Spring Boot 应用中注册 RouterFunction,通常在配置类中声明为 @Bean,以便 Spring 自动发现并配置:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;@Configuration
public class WebConfig {@Beanpublic RouterFunction<ServerResponse> routes(GreetingHandler handler) {return RouterFunctions.route(GET("/hello"), handler::sayHello);}
}

其中,GreetingHandler 是一个包含业务逻辑的 HandlerFunction

3.1.4. 组合 RouterFunction

RouterFunction 可以组合起来形成复杂的路由结构。这使得路由配置更加模块化和可维护。

  • 谓词组合:可以使用 and, or 等逻辑运算符组合谓词,以实现更复杂的路由匹配逻辑。
    @Beanpublic RouterFunction<ServerResponse> route(MyHandler handler) {return RouterFunctions.route(GET("/haha").and(accept(MediaType.TEXT_PLAIN)), handler::haha).andRoute(GET("/reactor").and(accept(MediaType.APPLICATION_JSON)), handler::reactorExample);}
    // 处理简单的文本请求public Mono<ServerResponse> haha(ServerRequest request) {return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN).bodyValue("haha!");}// 处理复杂的响应式请求public Mono<ServerResponse> reactorExample(ServerRequest request) {return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyValue("{'message': 'This is a reactive response!'}");}

3.1.5. 处理请求参数

RouterFunction 可以方便地处理请求参数,无论是路径参数、查询参数还是请求体。例如,处理带路径参数的请求:

public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {return route(GET("/users/{id}"), handler::getUserById);
}

3.1.6. 错误处理

Spring WebFlux 支持全局或局部的错误处理,可以通过提供一个处理特定异常类型的 RouterFunction 实现:

3.1.6.1.自带异常处理
    @Beanpublic RouterFunction<ServerResponse> route(MyHandler handler) {return RouterFunctions.route().GET("/greeting", handler::reactorExample)// 全局错误处理.onError(Exception.class, (exception, request) -> {System.out.println("An exception occurred: " + exception.getMessage());return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).contentType(MediaType.TEXT_PLAIN).bodyValue("Oops! Something went wrong.");}).build();}
3.1.6.2.全局异常处理
    // 错误处理路由private RouterFunction<ServerResponse> errorRoute() {return RouterFunctions.route(RequestPredicates.all(),request -> ServerResponse.status(HttpStatus.BAD_REQUEST).contentType(MediaType.TEXT_PLAIN).bodyValue("这是一条用于未匹配请求的回退处理。"));}

RequestPredicates.all(),意味着这个谓词将匹配所有的HTTP请求,无论方法(GET、POST等)或路径是什么。

3.1.7. 高级用法

  • 过滤器与拦截器:可以插入自定义的过滤器或拦截器来处理请求的预处理或后处理逻辑。
  • 条件路由:基于环境、配置或其他条件动态选择路由。

3.2.请求接参

Spring WebFlux 完全支持接收 RESTful 风格的传参。RESTful 风格的接口通常通过URL路径、查询参数、请求头以及请求体来传递参数。在Spring WebFlux中,你可以使用函数式和注解两种方式来定义端点以接收这些参数。下面是几种常见的接收参数方式:

3.2.1. 路径变量(Path Variables)

在路由定义中,你可以使用 {variableName} 来标记路径变量,然后在处理器方法中通过 @PathVariable 注解接收它们。

@Bean
public RouterFunction<ServerResponse> userRoute(UserHandler handler) {return RouterFunctions.route(GET("/users/{id}"), handler::getUserById);
}@Component
public class UserHandler {public Mono<ServerResponse> getUserById(ServerRequest request) {String id = request.pathVariable("id");// 处理逻辑...}
}

3.2.2. 查询参数(Query Parameters)

查询参数可以直接从 ServerRequest 中获取,或者使用 @RequestParam 注解。

public Mono<ServerResponse> searchUsers(ServerRequest request) {String keyword = request.queryParam("keyword").orElse("");// 处理逻辑...
}

或者使用 @RequestParam

public Mono<ServerResponse> searchUsers(@RequestParam(name = "keyword", defaultValue = "") String keyword) {// 处理逻辑...
}

3.2.3. 请求体(Request Body)

对于POST、PUT等方法,你可能需要从请求体中读取数据。可以使用 @RequestBody 注解,并指定相应的对象类型来自动绑定JSON或表单数据。

public Mono<ServerResponse> createUser(@RequestBody UserDTO user) {// 处理逻辑...
}

3.2.4. 请求头(Request Headers)

请求头可以通过 ServerRequest.headers() 获取,或者使用 @RequestHeader 注解。

public Mono<ServerResponse> handleRequestWithHeader(@RequestHeader("Authorization") String authHeader) {// 处理逻辑...
}

3.3.响应内容

3.3.1.ServerResponse对象

在Spring WebFlux中,响应的返回内容通常是通过ServerResponse对象来构建和管理的,它代表了即将发送给客户端的HTTP响应。

3.3.1.1. 状态码(Status Code)

每个HTTP响应都会有一个状态码,用于表示请求的处理结果。在Spring WebFlux中,你可以通过如下方式设置状态码:

  • 使用静态方法,如ServerResponse.ok()表示200 OK,ServerResponse.created()表示201 Created等。
  • 或者直接指定状态码,如ServerResponse.status(HttpStatus.NOT_FOUND)表示404 Not Found。
3.3.1.2. 响应体(Body)

响应体是响应的主要内容,可以是文本、JSON、XML等各种格式的数据。构建响应体的方式有多种:

  • 直接值:使用.bodyValue("响应内容"),适合于简单的字符串或对象直接转换为响应体。
  • 流(Stream).body(fromPublisher(publisher, MediaType)),当响应内容来自一个publisher(如Flux或Mono)时,这种方式非常适合处理异步数据流。
  • 对象转换:结合.body(toEntity(object)).bodyValue(object)结合Jackson等库自动将Java对象转换为JSON等格式的响应体。
3.3.1.3. 内容类型(Content-Type)

通过.contentType(MediaType)方法指定响应的内容类型,如MediaType.APPLICATION_JSON_UTF8表示JSON格式,MediaType.TEXT_PLAIN表示纯文本等。

3.3.1.4. 头部(Headers)

可以在响应中添加自定义头部信息,如.header("X-Custom-Header", "value")

3.3.1.5. 构建响应

一旦定义好状态码、响应体、内容类型和头部等信息,通过.build()方法完成ServerResponse的构建。这一步是必要的,它将之前设置的所有配置整合成一个待发送的响应对象。

3.3.1.6.示例

假设我们要构建一个返回JSON格式数据的响应:

import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;public Mono<ServerResponse> getUserInfo(ServerRequest request) {// 假设getUserDetails()是一个Mono<User>,User是自定义的Java对象Mono<User> userDetails = userService.getUserDetails(request.pathVariable("id"));return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(userDetails, User.class) // 自动转换User对象为JSON.switchIfEmpty(ServerResponse.notFound().build()); // 如果找不到用户,返回404
}

在这个例子中,我们首先指定了响应的状态码为200 OK,内容类型为JSON,然后将从数据库查询到的用户详情(User对象)转换为JSON响应体。如果查询结果为空(即用户不存在),则通过.switchIfEmpty()方法切换到返回404 Not Found的响应。

3.3.2.响应式类型

在响应式编程领域,尤其是在使用Spring WebFlux和Reactor框架时,MonoFlux是两个核心的响应式类型,它们都是Reactor库提供的对Reactive Streams规范的实现。这两种类型都实现了Publisher接口,这意味着它们可以作为异步数据流的生产者,在响应式系统中扮演着至关重要的角色。

3.3.2.1. Mono
  • 概念Mono代表0或1个元素的异步序列。换句话说,它要么发出一个元素,要么发出一个完成信号(表示没有元素),或者发出一个错误信号。这使得它非常适合用于表示单个结果或者空结果的场景,比如数据库查询返回单行记录,或者执行一个可能会失败的操作。

  • 典型用途:数据库查询的单一结果、网络请求的响应、计算单一值的操作等。

  • 操作符Mono提供了丰富的操作符,如mapflatMapzipthen等,用于处理单个数据流的变换、组合和错误处理。

    1. map
    • 功能:对Mono中的元素应用一个函数进行转换。

      Mono<String> monoStr = Mono.just("Hello");
      Mono<Integer> monoLength = monoStr.map(String::length);
      
    2. flatMap
    • 功能:将Mono中的元素转换为另一个MonoFlux,然后扁平化这个结果流,使其成为单一流。

      Mono<User> monoUser = ...; // 假设获取用户信息
      Mono<Address> monoAddress = monoUser.flatMap(user -> getAddressForUser(user.getId()));
      
    3. then
    • 功能:忽略Mono中的元素,仅当前Mono完成时,执行下一个Mono

      Mono<Void> saveUser = userRepository.save(newUser);
      Mono<User> findUser = saveUser.then(userRepository.findById(newUser.getId()));
      
    4. zipWith
    • 功能:将两个Mono的输出按照某种方式结合(通常是元组),只有当两个Mono都成功时才会触发。

      Mono<User> monoUser = ...;
      Mono<Order> monoOrder = ...;
      Mono<Tuple2<User, Order>> combined = monoUser.zipWith(monoOrder);
      
3.3.2.2. Flux
  • 概念Flux代表0到N个元素的异步序列。它可以发出任意数量的元素,包括无限数量,直到它完成或者遇到错误。这使得Flux非常适合处理集合、事件流或者任何可能有多次数据推送的场景。

  • 典型用途:处理列表或集合的数据、实时数据流(如WebSocket消息)、数据库查询的多行结果等。

  • 操作符Flux同样提供了丰富的操作符,如mapfilterconcatMapbufferwindow等,用于处理数据流的变换、过滤、合并、窗口化等操作。

    1. map
    • 功能:对流中的每个元素应用一个函数进行转换。

      Flux<String> names = Flux.fromIterable(Arrays.asList("Alice", "Bob", "Charlie"));
      Flux<Integer> lengths = names.map(String::length);
      
    2. filter
    • 功能:根据条件从流中筛选元素。

      Flux<String> names = ...;
      Flux<String> longNames = names.filter(name -> name.length() > 5);
      
    3. flatMap
    • 功能:将流中的每个元素转换为一个新的流,然后将这些流合并成一个单一的流。

      Flux<User> users = ...;
      Flux<Order> orders = users.flatMap(user -> getOrderListForUser(user.getId()));
      
    4. buffer
    • 功能:将流中的元素收集到缓冲区中,达到一定条件(如数量或时间)后作为一个列表或数组发出。

      Flux<Integer> numbers = Flux.interval(Duration.ofMillis(100)).take(10);
      Flux<List<Integer>> buffered = numbers.buffer(3); // 每3个元素打包一次
      
    5. concatMap
    • 功能:类似于flatMap,但保证按源流的顺序依次处理每个元素产生的流。

      Flux<User> users = ...;
      Flux<Order> ordersSequential = users.concatMap(user -> getOrderListForUser(user.getId()));
      
3.3.3.3.共同特点
  • 响应式MonoFlux都是非阻塞的,支持背压(Backpressure),能够在数据生产者和消费者之间自动调节数据流动的速度,防止生产过快导致消费端处理不过来的情况。

  • 链式操作:两者都支持链式调用操作符,允许以声明式的方式构建复杂的异步数据处理流程,而无需显式地管理线程或回调。

  • 异步和事件驱动:它们的设计哲学鼓励编写异步和事件驱动的代码,提高了系统的可伸缩性和资源利用率。

3.3.3.4.转换关系

MonoFlux之间可以通过操作符互相转换,例如,Mono可以通过flatMapMany转换为Flux,而多个MonoFlux可以通过concatmerge等操作合并。

4.WebClient

WebClient是Spring Framework 5引入的一个非阻塞、响应式的HTTP客户端,它属于Spring WebFlux模块的一部分。它设计用于构建高性能、异步的Web服务客户端,特别适合用于处理大量的并发请求和与响应式服务交互。下面是对WebClient的详细讲解:

核心特性

  1. 非阻塞WebClient基于非阻塞IO,这意味着它在等待数据时不会占用线程,从而能够高效地处理大量并发连接,减少资源消耗。

  2. 响应式:遵循Reactor项目中的响应式编程模型,使用MonoFlux作为返回类型,便于处理异步数据流和背压机制。

  3. 链式调用:提供了一套流畅的API,允许通过链式调用来构建HTTP请求,易于理解和维护。

  4. 配置灵活:支持多种配置选项,包括基础URL、默认头、超时设置、SSL配置等。

  5. 内容协商:自动处理内容类型和编码,支持JSON、XML等多种数据格式的序列化和反序列化。

  6. 过滤器:允许添加自定义过滤器来修改请求或响应,实现日志记录、认证、重试逻辑等。

基本使用

创建WebClient实例
// 最简单的创建方式
WebClient client = WebClient.create();// 配置化创建
WebClient client = WebClient.builder().baseUrl("http://example.com").defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE).build();
发送GET请求
Mono<String> response = client.get().uri("/api/data").retrieve() // 获取响应体.bodyToMono(String.class); // 将响应体转换为String类型
发送POST请求
Mono<String> response = client.post().uri("/api/data").body(BodyInserters.fromObject(someObject)) // 发送对象.retrieve().bodyToMono(String.class);
处理响应
  • 使用.block()方法会阻塞当前线程直到响应到来,这通常只在测试或者非响应式环境中使用。
  • 在响应式环境中,应该通过订阅MonoFlux来处理响应,或者将其与其它响应式流进行组合。
错误处理

可以使用.onErrorResume等操作符来优雅地处理错误情况,例如重试逻辑或返回默认值。

Mono<String> withErrorHandling = client.get().uri("/api/data").retrieve().onErrorResume(WebClientResponseException.class, e -> {// 处理错误,比如返回默认值return Mono.just("Default Value");}).bodyToMono(String.class);

总结

WebClient是一个强大且灵活的工具,它在现代Web应用和服务间通信中扮演着重要角色,特别是当需要构建高性能、可扩展的系统时。通过充分利用响应式编程的优势,开发者可以构建出更加高效、易维护的客户端逻辑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/12288.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

电商核心技术揭秘56:客户关系管理与忠诚度提升

相关系列文章 电商技术揭秘相关系列文章合集&#xff08;1&#xff09; 电商技术揭秘相关系列文章合集&#xff08;2&#xff09; 电商技术揭秘相关系列文章合集&#xff08;3&#xff09; 文章目录 引言客户关系管理&#xff08;CRM&#xff09;的重要性提升顾客体验数据驱…

Intel HDSLB 高性能四层负载均衡器 — 快速入门和应用场景

目录 文章目录 目录前言与背景传统 LB 技术的局限性HDSLB 的特点和优势HDSLB 的性能参数基准性能数据对标竞品 HDSLB 的应用场景HDSLB 的发展前景参考文档 前言与背景 在云计算、SDN、NFV 高速发展并普遍落地的今天&#xff0c;随着上云业务的用户数量越来越多、数据中心的规模…

umi项目配置之项目构建时配置umirc.ts

对于 umi 中能使用的自定义配置&#xff0c;你可以使用项目根目录的 .umirc.ts 文件或者 config/config.ts&#xff0c;值得注意的是这两个文件功能一致&#xff0c;仅仅是存在目录不同&#xff0c;2 选 1 &#xff0c;.umirc.ts 文件优先级较高 umi 的配置文件是一个正常的 n…

【vivado】 IBERT GT收发器误码率测试

一、前言 IBERT(Integrated Bit Error Ratio Tester),集成误码率测试仪。作为用户来说可以使用这个工具对自己设计的板子中的高速串行收发器进行简单测试&#xff0c;从而判断设计的接口是否有问题。因为这个工具是直接集成到FPGA上&#xff0c;这样一来直接使用这个工具来测试…

STL----push,insert,empalce

push_back和emplace_back的区别 #include <iostream> #include <vector>using namespace std; class testDemo { public:testDemo(int n) :num(n) {cout << "构造函数" << endl;}testDemo(const testDemo& other) :num(other.num) {cou…

实验十 智能手机互联网程序设计(微信程序方向)实验报告

实验目的和要求 完成以下页面设计。 二、实验步骤与结果&#xff08;给出对应的代码或运行结果截图&#xff09; Wxml <view class"container"> <view class"header"> <view class"logo"…

景源畅信:抖音小店比较冷门的品类分享?

在抖音小店的世界里&#xff0c;热门品类总是吸引着众多商家和消费者的目光。然而&#xff0c;就像星空中的繁星&#xff0c;虽不那么耀眼却依然存在的冷门品类同样值得我们关注。它们或许不似服装、美妆那样日进斗金&#xff0c;但正是这些小众市场的存在&#xff0c;为平台带…

Linux 服务器配置共享文件夹(NFS)

一、准备三台 linux 服务器 三台服务器: manger:172.16.11.178 ap1:172.16.11.179 ap2:172.16.11.180 /root/serverfiles/ 为共享目录 二、配置步骤 1、在服务端01的机器上安装nfs和rpcbind程序 yum -y install nfs* yum -y install rpcbind* 2、在安装完nfs以及rpcb…

基于IDEA快速创建一个SpringMVC项目并且配置Tomcat

1&#xff0c;打开IDEA&#xff0c;新建Maven项目【使用web模板创建】 使用社区版的同学创建普通的maven项目&#xff0c;并配置项目的webapp&#xff0c;详情可参考 快速创建一个SpringMVC项目&#xff08;IDEA&#xff09; 2&#xff0c;在main目录下创建Java和resource目录…

2024洗地机爆款榜单,哪个牌子洗地机值得买?助你轻松选对洗地机

随着现代生活节奏的加快&#xff0c;人们对于家庭清洁的需求也越来越高。家用洗地机作为一种高效清洁工具&#xff0c;能够帮助您轻松应对家庭地板的清洁问题&#xff0c;节省时间和精力。然而&#xff0c;在选择洗地机时&#xff0c;究竟哪个牌子的洗地机值得买呢&#xff1f;…

Milvus 简介与核心特性

一、Milvus 概述 Milvus 是一个开源的向量数据库&#xff0c;由 Zilliz 公司发起并维护。它专为处理非结构化数据而设计&#xff0c;能够存储、检索和分析大量的向量数据。Milvus 的名字来源于拉丁语&#xff0c;意为“一万”&#xff0c;象征着其处理大规模数据集的能力。 M…

linux安装Openresty

安装必要的依赖库 指定仓库地址 下载openresty 添加环境变量 vi /etc/profile i export NGINX_HOME/usr/local/openresty/nginx/ export PATH${NGINX_HOME}/sbin:$PATH esc :wq source /etc/profile #启动 nginx # 重启 nginx -s reload #关闭 nginx -s stop

宿舍管理系统代码详解(主页面)

本篇将对管理系统的主页面的代码进行详细的介绍。 目录 一、主页面前端代码 1.样式展示 2.代码详解 &#xff08;1&#xff09;template部分 &#xff08;2&#xff09;script部分 &#xff08;3&#xff09;路由导航守卫 &#xff08;4&#xff09;在vue中引用vue 一、主页…

运维别卷系列 - 云原生监控平台 之 01.prometheus 入门和部署

文章目录 [toc]什么是 PrometheusPrometheus 架构及其一些生态系统组件Prometheus 的工作模式Prometheus 的适用场景Prometheus 的不适用场景Prometheus 词汇表 Prometheus 启动参数Prometheus 配置文件通用占位符定义配置文件示例解释服务发现 Prometheus 部署创建 namespace创…

Unity里的Time

Time and frame rate management Time类&#xff1a; Time script reference page. 一些常见的属性有&#xff1a; Time.time 返回从游戏开始经历的时间.Time.deltaTime 返回从上帧结束到现在经历的时间&#xff0c;和帧率成反比Time.timeScale 控制时间流逝的因子Time.fixe…

嵌入式学习-通用定时器

简介 框图介绍 时钟选择 计数器部分 输入捕获和输出比较框图 嵌入式学习全文参考&#xff08;小向是个der&#xff09;做笔记&#xff1a;https://blog.csdn.net/qq_41954556/article/details/129735708

C#中json数据序列化和反序列化的最简单方法(C#对象和字符串的相互转换)

文章目录 将C#对象转换为json字符串Newtonsoft模块的安装用Newtonsoft将对象转换为json字符串 将json字符串转换为C#对象 将C#对象转换为json字符串 本介绍将基于C#中的第三方库Newtonsoft进行&#xff0c;因此将分为Newtonsoft模块的安装和使用两部分。该模块的优势在于只需要…

Python以docker形式部署,flask简易服务器。

公司大部分都是springboot 服务器&#xff0c;有时候用到python写的一些模型&#xff0c;部署在linux上进行处理 首先项目这样&#xff1a; flask就不说了&#xff0c;快捷服务器&#xff0c; # -*- coding: utf-8 -*-from flask import Flask, request# 实例化Flask对象 app…

LVM - Linux磁盘逻辑卷管理器概念讲解及实践

1、lvm概念 逻辑卷管理器(LogicalVolumeManager)本质上是一个虚拟设备驱动,是在内核中块设备和物理设备之间添加的一个新的抽象层次,它可以将几块磁盘(物理卷,PhysicalVolume)组合起来形成一个存储池或者卷组(VolumeGroup)。LVM可以每次从卷组中划分出不同大小的逻辑卷(Logi…

【核弹】我的第一款IDEA插件

SuperHotSwap 插件名称叫做&#xff1a;SuperHotSwap&#xff08;超级热更新&#xff09; 开发初心&#xff1a;旨在做出一款最便捷的IDEA热更新插件&#xff0c;减少用户操作步骤&#xff0c;提供零配置的可视化操作更新。 为什么要写这个插件&#xff1a; 每次改一下Mappe…