Java进击框架:Spring-WebFlux(九)
- 前言
- Mono和Flux
- Spring WebFlux
- 反应的核心
- DispatcherHandler
- 带注释的控制器
- WebFlux配置
- WebClient
- 配置
- retrieve()
- 交换
- 请求正文
- Filters
- 属性
- 语境
- 同步使用
- 测试
- RSocket
- 反应库
前言
Spring框架中包含的原始web框架Spring web MVC是专门为Servlet API和Servlet容器构建的。响应式堆栈web框架Spring WebFlux是在5.0版本中添加的。它是完全非阻塞的,支持响应式流回压,并运行在诸如Netty、Undertow和Servlet容器之类的服务器上。
这两个web框架都镜像了它们的源模块的名字(Spring -webmvc和Spring -webflux),并在Spring框架中共存。每个模块都是可选的。应用程序可以使用其中一个或另一个模块,或者在某些情况下,两者都使用——例如,Spring MVC控制器与响应式WebClient。
Mono和Flux
在 Reactor 中,Mono 和 Flux 都提供了许多操作符,用于转换、过滤、组合和处理异步数据。这些操作符可以帮助你对异步序列进行各种操作,例如映射数据、过滤数据、合并序列等。
create()
方法:用于创建 Mono 对象的方法之一。它允许你使用自定义的方式来创建一个 Mono,并在其中定义异步操作。
public static void main(String[] args) {Mono.create(sink ->{});}
你可以在表达式中使用一些方法进行操作。
currentContext()
方法返回一个 Context 对象,你可以使用它来访问当前的上下文信息,并且可以在 Reactor 流中传递这个上下文。例如,你可以在 Mono.create()
或者 Flux.create()
中使用 currentContext()
方法获取当前的上下文,并在异步操作中传递或者修改上下文信息。
public static void main(String[] args) {Mono<Object> objectMono = Mono.create(sink -> {Object user = sink.currentContext();System.out.println(user);});}
error()
是 Reactor 中用于向订阅者发送错误信号的方法。当你在使用 Mono.create()
或 Flux.create()
创建自定义的 Mono 或 Flux 时,你可以通过调用 sink.error()
来通知订阅者发生了错误。
public static void main(String[] args) {Mono<Object> objectMono = Mono.create(sink -> {try {// 在这里进行一些可能抛出异常的操作throw new RuntimeException();} catch (Exception e) {// 发送错误信号给 Monosink.error(e);}});objectMono.subscribe(result -> {// 处理成功的结果},error -> {// 处理错误System.out.println(error);});}
onCancel()
是用于在自定义的 Mono 或 Flux 中处理取消事件的方法。当订阅者取消订阅时,你可以通过调用 sink.onCancel()
来执行一些清理操作或者释放资源。
public static void main(String[] args) {Mono<String> mono = Mono.create(sink -> {// 在这里进行一些操作// 注册取消事件的处理sink.onCancel(() -> {// 执行一些清理操作或者释放资源System.out.println("释放资源");});});// 订阅 MonoDisposable disposable = mono.subscribe(result -> {// 处理结果});// 当取消订阅时,注册的 onCancel 处理会被执行disposable.dispose(); }
onDispose()
是用于在自定义的 Mono 或 Flux 中处理释放事件的方法。当订阅关系被主动释放时,你可以通过调用 sink.onDispose()
来执行一些清理操作或者释放资源。
public static void main(String[] args) {Mono<String> mono = Mono.create(sink -> {// 在这里进行一些操作// 注册释放事件的处理sink.onDispose(() -> {// 执行一些清理操作或者释放资源});});// 订阅 MonoDisposable disposable = mono.subscribe(result -> {// 处理结果});// 释放订阅关系disposable.dispose(); // 当释放订阅关系时,注册的 onDispose 处理会被执行}
onRequest()
方法是用于在自定义的 Mono 或 Flux 中处理请求事件的方法。当订阅者请求元素时,你可以通过调用 sink.onRequest()
来响应请求并提供相应的元素。
public static void main(String[] args) {// 创建一个 Mono 对象Mono<String> mono = Mono.create(sink -> {// 在生成数据时,监听订阅者请求数据的事件sink.onRequest(request -> {System.out.println("Request received: " + request);});});}
success()
方法用于发送完成信号给订阅者,并且不提供任何数据。当你的数据流完成时,可以使用 sink.success()
方法来通知订阅者数据流已经结束,并且订阅者不再需要等待或请求数据。
public static void main(String[] args) {// 创建一个 Mono 对象Mono<String> mono = Mono.create(sink -> {sink.success();});// 订阅 Monomono.subscribe(result -> System.out.println("Result: " + result),error -> System.err.println("Error: " + error.getMessage()),() -> System.out.println("Complete"));/** Output:* Complete*/}
just()
方法:创建 Mono 的一种简单方式。它接受一个参数,并返回一个包含该参数的 Mono。
public static void main(String[] args) {// 创建一个 Mono 对象Mono<String> mono = Mono.just("Hello, world!");mono.subscribe(item -> {System.out.println(item);});/** Output:* Hello, world!*/}
empty()
方法,用于创建一个空的 Mono,即不发出任何元素或错误信号。
public static void main(String[] args) {Mono<String> mono = Mono.empty();mono.subscribe(value -> System.out.println("Received value: " + value),error -> System.err.println("Error occurred: " + error.getMessage()),() -> System.out.println("Completed without emitting any value"));}
justOrEmpty()
方法可能包含一个非空的数值,也可能为空。如果提供的数值为 null,则会创建一个空的 Mono 对象;否则,将创建一个包含指定数值的 Mono 对象。
public static void main(String[] args) {// 创建一个可能为空的 Mono 对象Mono<String> monoWithNull = Mono.justOrEmpty(null);// 订阅 Mono 对象monoWithNull.subscribe(result -> System.out.println("Result (with null): " + result),error -> System.err.println("Error: " + error.getMessage()),() -> System.out.println("Complete (with null)"));// 创建一个包含数值的 Mono 对象Mono<String> monoWithValue = Mono.justOrEmpty("Hello, Reactor!");monoWithValue.subscribe(result -> System.out.println("Result (with value): " + result),error -> System.err.println("Error: " + error.getMessage()),() -> System.out.println("Complete (with value)"));}
never()
方法它永远不会发出任何元素、完成信号或错误信号。
public static void main(String[] args) {Mono<String> mono = Mono.never();mono.subscribe(value -> System.out.println("Received value: " + value),error -> System.err.println("Error occurred: " + error.getMessage()),() -> System.out.println("Completed"));}
from()
方法:可以用于将其他类型的数据转换为 Mono。
public static void main(String[] args) {Publisher<String> publisher = Flux.just("Hello", "World");Mono<String> mono = Mono.from(publisher);}
Mono.from()
方法有多个重载形式,可以接受不同类型的参数。下面是一些常见的用法示例:
从 CompletableFuture 转换:
public static void main(String[] args) {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");Mono<String> mono = Mono.fromFuture(future);}
从 Callable 转换:
public static void main(String[] args) {Callable<String> callable = () -> "Hello";Mono<String> mono = Mono.fromCallable(callable);}
fromDirect()
是一个静态方法,用于创建一个 Mono,该 Mono 直接从给定的数据源发出信号。
public static void main(String[] args) {Mono<Integer> mono = Mono.fromDirect(Mono.just(42));mono.subscribe(System.out::println); // 输出:42}
fromCompletionStage()
是一个静态方法,用于创建一个 Mono 或 Flux,该 Mono 或 Flux 会订阅给定的 CompletionStage 对象,并在 CompletionStage 完成时发出对应的信号。
public static void main(String[] args) {CompletableFuture<String> completableFuture = CompletableFuture.completedFuture("Hello");Mono<String> mono = Mono.fromCompletionStage(completableFuture);mono.subscribe(System.out::println); // 输出:Hello}
fromRunnable()
方法订阅时执行给定的 Runnable,并发出一个信号表示执行完成。
public static void main(String[] args) {Runnable runnable = () -> System.out.println("Executing runnable");Mono<Void> mono = Mono.fromRunnable(runnable);}
fromSupplier()
方法在订阅时执行给定的 Supplier,并发出一个信号表示执行完成,并且携带由 Supplier 生成的结果值。
public static void main(String[] args) {Supplier<String> supplier = () -> "Hello, from supplier";Mono<String> mono = Mono.fromSupplier(supplier);mono.subscribe(System.out::println); // 输出:Hello, from supplier}
firstWithSignal()
方法:用于从多个 Mono 或 Flux 中选择第一个发出信号的源。
public static void main(String[] args) {Mono<Integer> mono1 = Mono.just(1);Mono<Integer> mono2 = Mono.just(2);Mono<Integer> resultMono = Mono.firstWithSignal(mono1, mono2);resultMono.subscribe(System.out::println); // 输出:2}
error()
方法:用于创建一个 Mono,该 Mono 发出一个错误信号。
public static void main(String[] args) {Mono<String> mono = Mono.error(new RuntimeException("Something went wrong"));mono.subscribe(value -> System.out.println("Received value: " + value),error -> System.err.println("Error occurred: " + error.getMessage()));}
defer()
方法 :是一个静态方法,用于延迟创建 Mono 对象。
public static void main(String[] args) {AtomicInteger counter = new AtomicInteger();Mono<Integer> mono = Mono.defer(() -> Mono.just(counter.incrementAndGet()));mono.subscribe(value -> System.out.println("Subscriber 1: " + value));mono.subscribe(value -> System.out.println("Subscriber 2: " + value));}
deferContextual()
方法:是 Context 接口中的一个方法,它允许您基于当前上下文创建一个新的数据流。
public static void main(String[] args) {// 创建一个包含上下文信息的 Mono 对象Mono<String> monoWithContext = Mono.deferContextual(contextView -> {// 从上下文中获取信息String requestId = contextView.get("requestId");// 使用上下文信息创建新的 Mono 对象return Mono.just("Request ID: " + requestId);});// 在订阅时传入上下文信息monoWithContext.contextWrite(Context.of("requestId", "123")).subscribe(result -> System.out.println("Result: " + result));}
firstWithValue()
方法:用于从多个 Mono 对象中选择第一个非空值。
public static void main(String[] args) {// 创建两个可能为空的 Mono 对象Mono<String> mono1 = Mono.justOrEmpty(null);Mono<String> mono2 = Mono.justOrEmpty("Value 2");// 使用 Mono.firstWithValue() 方法选择第一个非空值Mono<String> firstWithValueMono = Mono.firstWithValue(mono1, mono2);// 订阅 MonofirstWithValueMono.subscribe(result -> System.out.println("Result: " + result),error -> System.err.println("Error: " + error.getMessage()),() -> System.out.println("Complete"));}
ignoreElements()
方法:它会忽略源 Mono 的所有元素,只关注源 Mono 是否完成或发出错误信号。
public static void main(String[] args) {Mono<Integer> mono = Mono.just(42);Mono<Integer> ignoreElementsMono = mono.ignoreElement();ignoreElementsMono.subscribe(value -> System.out.println("Completed without emitting any value"),error -> System.err.println("Error occurred: " + error.getMessage()),() -> System.out.println("Completed without emitting any value"));}
using()
方法:会管理一个资源的生命周期,并在处理完成时自动释放该资源。
public static void main(String[] args) {Mono<String> mono = Mono.using(() -> getResource(), // 提供资源的函数resource -> Mono.just(processResource(resource)), // 提供要包装为 Mono 的资源的函数resource -> releaseResource(resource) // 在 Mono 完成或取消时释放资源的动作);mono.subscribe(value -> System.out.println("Received value: " + value),error -> System.err.println("Error occurred: " + error.getMessage()),() -> System.out.println("Completed"));}
也可以使用usingWhen()
方法。
zip()
方法:用于将多个 Mono 组合成一个新的 Mono 的方法。它会等待所有的 Mono 都发出一个元素,然后将这些元素按顺序组合成一个新的元组或对象,并将其作为新的 Mono 的元素发出。
public static void main(String[] args) {Mono<String> mono1 = Mono.just("Hello");Mono<String> mono2 = Mono.just("World");Mono<String> zippedMono = Mono.zip(mono1, mono2).map(tuple -> tuple.getT1() + " " + tuple.getT2());zippedMono.subscribe(System.out::println);}
zipDelayError()
方法会等待所有的 Mono 或 Flux 都完成,并且会保留所有的错误,而不是在第一个错误发生时立即终止流。
public static void main(String[] args) {Mono<Integer> mono1 = Mono.just(1);Mono<Integer> mono2 = Mono.error(new RuntimeException("Something went wrong"));Mono<Object> zippedMono = Mono.zipDelayError(mono1, mono2).map(tuple -> tuple.getT1() + " " + tuple.getT2());;zippedMono.subscribe(result -> System.out.println("Result: " + result),error -> System.err.println("Error: " + error.getMessage()));}
delay()
方法:用于创建一个延迟发射元素的 Mono。它会在指定的时间段后发出一个默认值。
public static void main(String[] args) {Mono<Long> delayedMono = Mono.delay(Duration.ofSeconds(5));delayedMono.subscribe(value -> System.out.println("Received value: " + value),error -> System.err.println("Error occurred: " + error.getMessage()),() -> System.out.println("Completed"));}
sequenceEqual()
方法:用于比较两个 Mono 对象的序列是否相等。它返回一个表示比较结果的 Mono 布尔值。
public static void main(String[] args) {// 创建两个 Mono 对象Mono<Integer> mono1 = Mono.just(123);Mono<Integer> mono2 = Mono.just(321);// 使用 Mono.sequenceEqual() 方法比较两个 Mono 对象的序列是否相等Mono<Boolean> sequenceEqualMono = Mono.sequenceEqual(mono1, mono2);// 订阅 MonosequenceEqualMono.subscribe(result -> System.out.println("Are sequences equal? " + result),error -> System.err.println("Error: " + error.getMessage()),() -> System.out.println("Complete"));}
when()
方法:用于将多个 Mono 对象组合在一起,并在它们都完成时执行一个操作。它返回一个Mono<Void>
对象,该对象表示所有 Mono 对象都已完成。
public static void main(String[] args) {// 创建三个 Mono 对象Mono<Integer> mono1 = Mono.just(1);Mono<Integer> mono2 = Mono.just(2);Mono<Integer> mono3 = Mono.just(3);// 使用 Mono.when() 方法将它们组合在一起Mono<Void> whenMono = Mono.when(mono1, mono2, mono3);// 订阅 MonowhenMono.subscribe(result -> System.out.println("All Monos completed"),error -> System.err.println("Error: " + error.getMessage()),() -> System.out.println("Complete"));}
与 Mono.when()
方法不同的是,Mono.whenDelayError()
方法会延迟发出错误信号,直到所有 Mono 都完成。
public static void main(String[] args) {// 创建两个 Mono 对象,其中 mono1 会抛出一个异常Mono<Integer> mono1 = Mono.error(new RuntimeException("Error in mono1"));Mono<Integer> mono2 = Mono.just(2);// 使用 Mono.whenDelayError() 方法将它们组合在一起Mono<Void> whenDelayErrorMono = Mono.whenDelayError(mono1, mono2);// 订阅 MonowhenDelayErrorMono.subscribe(result -> System.out.println("All Monos completed"),error -> System.err.println("Error: " + error.getMessage()),() -> System.out.println("Complete"));}
除此之外,Flux还有一些方法,按需使用,这里就不详细介绍了。
Spring WebFlux
为什么要创建Spring WebFlux?
部分答案是需要一个非阻塞的web堆栈来处理少量线程的并发性,并使用更少的硬件资源进行扩展。Servlet非阻塞I/O远离Servlet API的其余部分,其中契约是同步的(Filter, Servlet)或阻塞(getParameter, getPart).这是一个新的通用API作为任何非阻塞运行时的基础的动机。这一点很重要,因为服务器(如Netty)在异步、非阻塞领域已经非常成熟。
答案的另一部分是函数式编程。就像Java 5中添加注释创造了机会一样(比如带注释的REST控制器或单元测试),Java 8中添加lambda表达式也为Java中的函数式API创造了机会。这对于非阻塞应用程序和延续风格的API(如CompletableFuture和ReactiveX),它允许异步逻辑的声明性组合。在编程模型层面上,Java 8使Spring WebFlux能够提供功能性web端点和带注释的控制器。
- 定义“反应性”
我们谈到了“无阻塞”和“功能性”,但是反应性是什么意思呢?
术语“反应式”指的是围绕对变化做出反应而构建的编程模型——对I/O事件做出反应的网络组件、对鼠标事件做出反应的UI控制器等。从这个意义上说,非阻塞是被动的,因为我们现在不是被阻塞,而是在操作完成或数据可用时对通知做出反应。
反应堆是Spring WebFlux的首选反应库。反应流对互操作性起着重要的作用。库和基础设施组件对它感兴趣,但作为应用程序API用处不大,因为它太低级了。应用程序需要更高级、更丰富、功能更强大的API来组成异步逻辑——类似于Java 8 StreamAPI,但不仅限于集合。这就是反应式库扮演的角色。
- 适应性
Spring MVC还是WebFlux?
这是一个很自然的问题,但却是一个不合理的二分法。实际上,两者共同努力来扩大可用选项的范围。这两者是为相互之间的连续性和一致性而设计的,它们可以并行使用,每一方的反馈都有利于双方。下图显示了两者之间的关系、共同点以及各自支持的独特功能:
我们建议您考虑以下几点:
-
如果您有一个运行良好的Spring MVC应用程序,则不需要进行更改。您有最多的库选择,因为从历史上看,大多数库都是阻塞的。
-
如果你已经在购买一个非阻塞的web堆栈,Spring WebFlux提供了与该领域其他产品相同的执行模型优势,还提供了服务器选择(Netty、Tomcat、Jetty、Undertow和Servlet容器)、编程模型选择(带注释的控制器和功能web端点)和响应式库选择(反应器、RxJava或其他)。
-
如果你对使用Java 8 lambda或Kotlin的轻量级、功能性web框架感兴趣,你可以使用Spring WebFlux功能性web端点。对于需求不太复杂的小型应用程序或微服务来说,这也是一个不错的选择,这些应用程序或微服务可以从更高的透明度和控制中受益。
-
在微服务架构中,你可以混合使用Spring MVC或Spring WebFlux控制器,也可以使用Spring WebFlux功能端点。在这两个框架中支持相同的基于注释的编程模型,可以更容易地重用知识,同时也可以为正确的工作选择正确的工具。
-
评估应用程序的一个简单方法是检查它的依赖项。如果您有阻塞持久性api (JPA、JDBC)或网络api要使用,Spring MVC至少是通用架构的最佳选择。反应器和RxJava在单独的线程上执行阻塞调用在技术上是可行的,但是您将无法充分利用非阻塞web堆栈。
-
如果您有一个调用远程服务的Spring MVC应用程序,请尝试响应式WebClient。您可以直接从Spring MVC控制器方法返回响应类型(Reactor、RxJava或其他)。每个呼叫的延迟时间越长,或者呼叫之间的相互依赖性越强,好处就越显著。Spring MVC控制器也可以调用其他响应式组件。
-
如果您有一个大型团队,请记住,在向非阻塞、函数式和声明式编程转变的过程中,学习曲线是陡峭的。一种不需要完全切换的实用方法是使用响应式WebClient。除此之外,从小事做起,衡量收益。我们预计,对于广泛的应用来说,这种转变是不必要的。如果您不确定要寻找什么好处,可以从了解非阻塞I/O是如何工作的(例如,单线程Node.js上的并发性)及其效果开始。
-
并发模型
Spring MVC和Spring WebFlux都支持带注释的控制器,但在并发模型和对阻塞和线程的默认假设方面有一个关键区别。
在Spring MVC(以及一般的servlet应用程序)中,假设应用程序可以阻塞当前线程(例如,对于远程调用)。出于这个原因,servlet容器使用一个大的线程池来吸收请求处理期间的潜在阻塞。
在Spring WebFlux(以及一般的非阻塞服务器)中,假设应用程序不会阻塞。因此,非阻塞服务器使用一个固定大小的小型线程池(事件循环工作线程)来处理请求。
“扩展”和“少量线程”听起来可能是矛盾的,但是从不阻塞当前线程(而是依靠回调)意味着您不需要额外的线程,因为没有阻塞调用需要吸收。
反应的核心
spring-web模块包含以下对响应式web应用的基本支持:
对于服务器请求处理,有两个级别的支持。
-
HttpHandler:用于非阻塞I/O和响应式流回压的HTTP请求处理的基本契约,以及用于反应器Netty、Undertow、Tomcat、Jetty和任何Servlet容器的适配器。
-
WebHandler API:稍高级的,用于请求处理的通用web API,在其之上构建具体的编程模型,如带注释的控制器和功能端点。
对于客户端,有一个基本的ClientHttpConnector合约来执行非阻塞I/O和响应式流回压的HTTP请求,以及用于反应器Netty、响应式Jetty HttpClient和Apache HttpComponents的适配器。应用程序中使用的高级WebClient建立在这个基本契约之上。
对于客户机和服务器,用于HTTP请求和响应内容的序列化和反序列化的编解码器。
- HttpHandler
HttpHandler是一个简单的契约,使用单一方法处理请求和响应。它是有意最小化的,其主要且唯一的目的是作为不同HTTP服务器API的最小抽象。
以下是一个简单的示例,演示如何使用HttpHandler处理HTTP请求:
public class MyHttpHandler implements HttpHandler {@Overridepublic Mono<Void> handle(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse) {return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN)//构建一个成功的响应,设置响应类型为text/plain.body(BodyInserters.fromValue("Hello, World!"))//并将内容设置为"Hello, World!"。.then();}public static void main(String[] args) {MyHttpHandler myHttpHandler = new MyHttpHandler();//启动了一个基于Reactor Netty的HTTP服务器,并将MyHttpHandler注册为请求处理器。ReactorHttpHandlerAdapter httpHandler = new ReactorHttpHandlerAdapter(myHttpHandler);//创建了一个HTTP服务器对象,并使用host()和port()方法分别设置主机名和端口号。HttpServer httpServer = HttpServer.create().host("localhost").port(8080);//将服务器绑定到指定的主机和端口上DisposableServer disposableServer = httpServer.handle(httpHandler).bindNow();//阻塞当前线程,直到服务器关闭。disposableServer.onDispose().block();}
}
- WebHandler应用程序接口
虽然HttpHandler有一个简单的目标,抽象不同HTTP服务器的使用,但WebHandler API旨在提供更广泛的web应用程序中常用的功能集,例如:
- 带有属性的用户会话。
- 请求属性。
- 请求的已解析区域设置或主体。
- 访问解析和缓存的表单数据。
- 多部分数据的抽象。
示例代码如下:
public class MyHttpHandler implements WebHandler {@Overridepublic Mono<Void> handle(ServerWebExchange serverWebExchange) {//如果是 /hello,则向客户端发送 "Hello, World!" 的响应;否则,返回 404 Not Found 的响应。if (serverWebExchange.getRequest().getPath().equals("/hello")) {String message = "Hello, World!";return serverWebExchange.getResponse().writeWith(Mono.just(serverWebExchange.getResponse().bufferFactory().wrap(message.getBytes()))).doOnError(error -> serverWebExchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR));} else {return Mono.error(new ResponseStatusException(HttpStatus.NOT_FOUND, "Resource not found"));}}public static void main(String[] args) {WebClient webClient = WebClient.create();Mono<String> response = webClient.method(HttpMethod.GET).uri("http://localhost:8080/hello").accept(MediaType.TEXT_PLAIN).retrieve().bodyToMono(String.class);response.subscribe(System.out::println);}
}
ServerWebExchange公开以下访问表单数据的方法:
public class MyHttpHandler implements WebHandler {@Overridepublic Mono<Void> handle(ServerWebExchange serverWebExchange) {Mono<MultiValueMap<String, Part>> formData = serverWebExchange.getMultipartData();}
}
ServerWebExchange公开以下访问多部分数据的方法:
public class MyHttpHandler implements WebHandler {@Overridepublic Mono<Void> handle(ServerWebExchange serverWebExchange) {Mono<MultiValueMap<String, Part>> multipartData = serverWebExchange.getMultipartData();}
}
DispatcherHandler
与Spring MVC类似,Spring WebFlux是围绕前端控制器模式设计的,其中中央WebHandler DispatcherHandler为请求处理提供共享算法,而实际工作则由可配置的委托组件执行。这个模型是灵活的,并且支持不同的工作流。
DispatcherHandler从Spring配置中发现它需要的委托组件。它本身也被设计成一个Spring bean,并实现了ApplicationContextAware来访问它运行的上下文。如果DispatcherHandler是用webHandler的bean名称声明的,那么它又会被WebHttpHandlerBuilder发现,后者将请求处理链组合在一起,如webHandler API中所述。
配置给WebHttpHandlerBuilder来构建处理链,如下面的例子所示:
ApplicationContext context = ...
HttpHandler handler = WebHttpHandlerBuilder.applicationContext(context).build();
得到的HttpHandler可以与服务器适配器一起使用。
DispatcherHandler委托特殊的bean处理请求并呈现适当的响应。所谓“特殊bean”,我们指的是实现WebFlux框架契约的spring管理对象实例。它们通常带有内置契约,但您可以自定义它们的属性、扩展它们或替换它们。比如:HandlerMapping、HandlerAdapter、HandlerResultHandler。
示例代码如下:
public class MyHandlerResultHandler implements HandlerResultHandler {@Overridepublic boolean supports(HandlerResult result) {// 判断是否支持处理给定的处理器方法结果类型// 这里可以根据需要进行判断,返回 true 或 falsereturn true; // 支持所有类型的处理器方法结果}@Overridepublic Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {// 处理给定的处理器方法结果// 这里可以对结果进行转换、序列化、应用其他后置处理等操作// 返回一个空的 Mono 表示处理完成return Mono.empty();}
}
@Configuration
public class WebFluxConfig implements WebFluxConfigurer {@Overridepublic void configureHandlerResultHandlers(List<HandlerResultHandler> handlers) {// 注册自定义的结果处理器handlers.add(new MyHandlerResultHandler());}
}
- 处理
DispatcherHandler按如下方式处理请求:
-
每个HandlerMapping要求查找匹配的处理程序,并使用第一个匹配项。
-
如果找到处理程序,它将通过适当的HandlerAdapter,它将执行的返回值公开为HandlerResult。
-
HandlerResult被赋予一个适当的HandlerResultHandler通过直接写入响应或使用视图呈现来完成处理。
带注释的控制器
大部分的内容在Spring Web章节中介绍过了,他们之间并没什么区别。
- 功能性端点
在WebFlux.fn, HTTP请求由HandlerFunction处理:一个接受ServerRequest并返回延迟的ServerResponse(即Mono<ServerResponse>)
的函数。请求和响应对象都具有不可变的契约,提供对JDK 8友好的HTTP请求和响应访问。HandlerFunction相当于基于注释的编程模型中@RequestMapping
方法的主体。
传入的请求被路由到一个带有RouterFunction的处理函数:这个函数接受ServerRequest并返回一个延迟的HandlerFunction(即Mono<HandlerFunction>
)。当router函数匹配时,返回一个handler函数;否则为空单声道。RouterFunction相当于@RequestMapping
注释,但主要区别在于,路由器函数不仅提供数据,还提供行为。
RouterFunctions.route()
提供了一个路由器构建器,可以方便地创建路由器,如下面的例子所示:
@Configuration
public class Config {@Autowiredprivate MyHandler handler;@Beanpublic RouterFunction<ServerResponse> route() {return RouterFunctions.route().GET("/hello", handler::handleRequest).build();}
}
@Component
public class MyHandler {public Mono<ServerResponse> handleRequest(ServerRequest request) {// 处理请求逻辑String name = request.queryParam("name").orElse("Anonymous");String message = "Hello, " + name + "!";// 构建响应return ServerResponse.ok().body(BodyInserters.fromValue(message));}}
- handler函数
ServerRequest和ServerResponse是不可变的接口,提供对HTTP请求和响应的JDK 8友好访问。请求和响应都为响应流提供反压力。请求体用Reactor Flux或Mono表示。响应体可以用任何响应流发布者表示,包括Flux和Mono。
- ServerRequest
ServerRequest提供对HTTP方法、URI、报头和查询参数的访问,而对主体的访问是通过主体方法提供的。
下面的例子将请求体提取为Mono<String>
:
public void handleRequest(ServerRequest request) {Mono<String> string = request.bodyToMono(String.class);//使用 Reactor 提供的操作符 subscribe 来订阅该 Mono 对象并获取其值string.subscribe(str->{System.out.println(str);});}
subscribe
方法是用于订阅响应式流的核心方法。它可以接收一个或多个回调函数,以便在 Mono 或 Flux 完成时执行特定的逻辑。
下面的例子将主体提取到Flux<Person>
,对象从一些序列化形式(如JSON或XML)进行解码:
public void handleRequest(ServerRequest request) {Flux<String> string = request.bodyToFlux(String.class);//使用 subscribe() 方法订阅 Flux<Person> 对象以触发数据流的处理。string.subscribe(str->{System.out.println(str);});}
上述示例也可以写成如下形式:
public void handleRequest(ServerRequest request) {Mono<String> stringMono = request.body(BodyExtractors.toMono(String.class));Flux<String> stringFlux = request.body(BodyExtractors.toFlux(String.class));}
下面的例子展示了如何访问表单数据:
public void handleRequest(ServerRequest request) {Mono<MultiValueMap<String, String>> multiValueMapMono = request.formData();multiValueMapMono.subscribe(item->{List<String> name = item.get("name");if (name != null && !name.isEmpty()) {String value = name.get(0);System.out.println("Value of fieldName: " + value);} else {System.out.println("fieldName not found in form data");}});}
下面的例子展示了如何以map的形式访问多部分数据:
public void handleRequest(ServerRequest request) {Mono<MultiValueMap<String, Part>> multiValueMapMono = request.multipartData();multiValueMapMono.subscribe(item->{for (String fieldName : item.keySet()) {List<Part> parts = item.get(fieldName);for (Part part : parts) {// 访问 part 的属性或内容System.out.println(fieldName + ": " + part.content());}}});}
下面介绍其他方法的用法:
(1)queryParam()
方法:从请求中获取查询参数。
public void handleRequest(ServerRequest request) {Optional<String> name = request.queryParam("name");//判断值是否存在if (name.isPresent()) {String s = name.get();System.out.println(s);}}
(2)methodName()
方法:用于获取 HTTP 请求的方法名,即请求的 HTTP 方法(例如 GET、POST、PUT、DELETE 等)。
public void handleRequest(ServerRequest request) {String name = request.methodName();System.out.println(name);}
request.method()
方法返回HttpMethod对象。
(3)attribute()
方法:用于获取 HTTP 请求的属性(Attribute)值。
public void handleRequest(ServerRequest request) {Optional<Object> name = request.attribute("name");System.out.println(name.get());}
除此之外可以通过attributes()
,获取所有的值。
public void handleRequest(ServerRequest request) {Map<String, Object> name = request.attributes();name.entrySet().stream().forEach(item->{System.out.println(item.getKey()+":"+item.getValue());});}
(4)checkNotModified()
方法:用于检查 HTTP 请求中的 If-Modified-Since 和 If-None-Match 头信息,以确定请求的资源是否已被修改。如果资源未被修改,服务器可以返回一个 304 Not Modified 响应,从而避免重新传输相同的内容。
public void handleRequest(ServerRequest request) {String data = dataService.getDataById(id);// 计算数据的ETag(这里使用数据的哈希值作为示例)String etag = Integer.toString(data.hashCode());request.checkNotModified(etag).handle((result, sink) -> {if (result != null) {// 资源未被修改,返回304 Not Modified状态码sink.next(ResponseEntity.status(HttpStatus.NOT_MODIFIED).build());} else {// 资源已被修改,返回200 OK状态码和资源内容sink.next(ResponseEntity.ok().cacheControl(CacheControl.maxAge(Duration.ofSeconds(30))).eTag(etag).body(data));}}).then();}
(5)cookies()
方法:用于获取客户端发送的所有 cookie。
public void handleRequest(ServerRequest request) {MultiValueMap<String, HttpCookie> cookies = request.cookies();if (!cookies.isEmpty()) {// 遍历每个 cookiefor (String name : cookies.keySet()) {List<HttpCookie> cookieValues = cookies.get(name);// 处理 cookie// 例如,打印 cookie 的名称和值for (HttpCookie cookie : cookieValues) {System.out.println("Cookie Name: " + cookie.getName());System.out.println("Cookie Value: " + cookie.getValue());}}} else {System.out.println("No cookies found.");}}
(6)headers()
方法:获取请求头信息,使用它来获取特定的请求头字段的值。
public Mono<ServerResponse> handleRequest(ServerRequest request) {ServerRequest.Headers headers = request.headers();System.out.println(headers.firstHeader("name"));}
(7)exchange()
方法:获得请求和响应信息。
public void handleRequest(ServerRequest request) {ServerWebExchange exchange = request.exchange();// 获取请求信息String method = exchange.getRequest().getMethodValue();String path = exchange.getRequest().getPath().toString();}
(8)path()
方法:获取请求地址。
public void handleRequest(ServerRequest request) {String path = request.path();System.out.println(path);}
(9)pathVariable()
方法:用于从请求的 URL 路径中提取参数值。
public void handleRequest(ServerRequest request) {///请求地址:hello/{id}String id = request.pathVariable("id");System.out.println(id);}
pathVariables()
方法来获取所有路径参数的值。
public void handleRequest(ServerRequest request) {Map<String, String> pathVariables = request.pathVariables();if (!pathVariables.isEmpty()) {for (Map.Entry<String, String> entry : pathVariables.entrySet()) {String name = entry.getKey();String value = entry.getValue();System.out.println("Path variable " + name + ": " + value);}} else {System.out.println("No path variables found");}}
(10)localAddress()
方法:获取请求的本地地址信息。
public void handleRequest(ServerRequest request) {Optional<InetSocketAddress> localAddress= request.localAddress();if (localAddress.isPresent()) {InetSocketAddress address = localAddress.get();String hostName = address.getHostName();int port = address.getPort();System.out.println("Local host: " + hostName);System.out.println("Local port: " + port);} else {System.out.println("Local address information not available");}}
你可以使用remoteAddress()
方法,来获取请求的远程地址信息。
(11)messageReaders()
方法:用于获取用于读取请求消息的 MessageReader 实例。
public void handleRequest(ServerRequest request) {List<HttpMessageReader<?>> messageReaders = request.messageReaders();for (HttpMessageReader<?> reader : messageReaders) {if (reader instanceof Decoder) {Decoder decoder = (Decoder) reader;System.out.println("Found decoder: " + decoder.getClass().getName());} else {System.out.println("Found message reader: " + reader.getClass().getName());}}}
(12)principal()
方法:获取请求的身份验证信息(Principal)。
public void handleRequest(ServerRequest request) {Mono<? extends Principal> principal = request.principal();principal.subscribe(item->{String name = item.getName();//验证});}
(13)uri()
方法:获取请求的 URI(Uniform Resource Identifier)信息。
public void handleRequest(ServerRequest request) {URI uri = request.uri();System.out.println(uri.getHost());}
(14)uriBuilder()
方法:构建请求 URI。
public void handleRequest(ServerRequest request) {// 构建请求的 URIUriBuilder uriBuilder = request.uriBuilder();uriBuilder.scheme("https");uriBuilder.host("example.com");uriBuilder.path("/path/to/resource");// 获取构建后的 URIURI uri = uriBuilder.build();System.out.println(uri);}
- ServerResponse
ServerResponse提供对HTTP响应的访问,由于它是不可变的,因此可以使用构建方法来创建它。您可以使用构建器来设置响应状态、添加响应头或提供响应体。下面的示例创建一个带有JSON内容的200 (OK)响应。
public Mono<ServerResponse> handleRequest() {Mono<Person> person = ... // 这里是获取 Person 对象的逻辑return ServerResponse.ok()//成功状态码.contentType(MediaType.APPLICATION_JSON)//设置响应类型.bodyValue(person,Person.class);//返回值}
你可以使用 ServerResponse.created(location).build()
方法来构建一个创建资源成功的响应。该方法返回一个 ServerResponse.Builder
对象,你可以通过调用 build()
方法来构建最终的响应。
public ServerResponse handleRequest() {URI location = ... // 这里是获取新资源的位置 URI 的逻辑return ServerResponse.created(location).build();}
HTTP/1.1 201 Created
Location: https://example.com/users/123
根据所使用的编解码器,可以通过传递提示参数来定制正文的序列化或反序列化方式。例如,指定一个Jackson JSON视图:
// 定义一个简单的 POJO 类
class User {private String name;private int age;// 使用 @JsonView 注解指定不同的视图@JsonView(Views.Public.class)public String getName() {return name;}// 使用 @JsonView 注解指定不同的视图@JsonView(Views.Internal.class)public int getAge() {return age;}public void setName(String name) {this.name = name;}public void setAge(int age) {this.age = age;}
}// 定义视图接口
interface Views {// 空标记接口,用于公开的视图interface Public {}// 空标记接口,用于内部的视图interface Internal extends Public {}
}// 定义一个处理器类
class UserHandler {public Mono<ServerResponse> getUser(ServerRequest request) {// 创建一个 User 对象User user = new User();user.setName("John");user.setAge(30);// 返回带有 JSON 视图的 HTTP 200 OK 响应return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).hint(Jackson2CodecSupport.JSON_VIEW_HINT, Views.Public.class) // 指定视图.body(Mono.just(user), User.class);}
}// 定义路由配置类
class RouterConfig {public RouterFunction<ServerResponse> routes(UserHandler handler) {return RouterFunctions.route().GET("/user", handler::getUser).build();}
}
下面介绍一些其他的用法:
(1)ok()
方法:用于创建 HTTP 200 OK 响应的静态工厂方法。
ServerResponse.ok()
你可以通过链式调用 body()
方法来设置响应的主体内容,并返回最终的 ServerResponse 对象。例如:
public Mono<ServerResponse> handleRequest(ServerRequest request) {// 处理逻辑//方式一:ServerResponse.ok().body("hello world",String.class);//方式二:ServerResponse.ok().bodyValue("hello world");//方式三return ServerResponse.ok().body(BodyInserters.fromValue("Hello, world!"));}
不需要设置响应主体内容的情况,或者响应主体内容已经在其他地方设置好了。调用build()
方法创建一个表示 HTTP 200 OK 响应的 Mono<ServerResponse>
对象,如下:
public Mono<ServerResponse> handleRequest(ServerRequest request) {// 处理逻辑return ServerResponse.ok().build();}
你还可以设置响应的内容类型,如下:
public Mono<ServerResponse> handleRequest(ServerRequest request) {// 处理逻辑return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyValue("hello world");}
contentLength()
方法用于设置响应体的长度。
public Mono<ServerResponse> handleRequest(ServerRequest request) {// 处理逻辑// 创建一个成功响应,设置内容类型为 application/json,设置长度为 100return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).contentLength(100L).bodyValue("hello world");}
如果你需要渲染的模板名称和模型数据,可以使用render()
方法。
public Mono<ServerResponse> handleRequest(ServerRequest request) {// 处理逻辑// 创建一个成功响应,并渲染模板Map data = new HashMap();return ServerResponse.ok().render("templateName", data/* 模型数据 */);}
调用 hint()
方法来设置响应的提示信息。hint()
方法接受两个参数,第一个参数是提示信息的键,第二个参数是提示信息的值。
public Mono<ServerResponse> handleRequest(ServerRequest request) {// 处理逻辑// 创建一个成功响应,并设置响应体的长度提示信息return ServerResponse.ok().hint("content-length", 100L).build();}
(2)status()
方法:用于创建自定义状态码的响应的静态工厂方法。传入一个 HttpStatus 枚举值或自定义的状态码来设置响应的状态码。例如:
public Mono<ServerResponse> handleRequest(ServerRequest request) {// 处理逻辑return ServerResponse.status(HttpStatus.OK).build();}
除此之外,提供了一系列静态方法来创建不同类型的响应。
notFound()
方法:表示资源未找到的响应。
public Mono<ServerResponse> handleRequest(ServerRequest request) {// 处理逻辑return ServerResponse.notFound().build();}
unprocessableEntity()
方法:创建一个表示请求无法处理的响应。
public Mono<ServerResponse> handleRequest(ServerRequest request) {// 处理逻辑return ServerResponse.unprocessableEntity().build();}
badRequest()
方法:表示服务器无法处理客户端发送的请求(HTTP 400 Bad Request)。
public Mono<ServerResponse> handleRequest(ServerRequest request) {// 处理逻辑return ServerResponse.badRequest().build();}
noContent()
方法:响应表示服务器成功处理了请求,但没有返回任何内容(HTTP 204 No Content)。
public Mono<ServerResponse> handleRequest(ServerRequest request) {// 处理逻辑return ServerResponse.noContent().build();}
accepted()
方法:表示服务器已经接受请求 (HTTP 202 Accepted )。
public Mono<ServerResponse> handleRequest(ServerRequest request) {// 处理逻辑return ServerResponse.accepted().build();}
你可以进一步通过链式调用其他方法来添加头信息、设置响应的内容类型等。
public Mono<ServerResponse> handleRequest(ServerRequest request) {// 处理逻辑return ServerResponse.accepted().header("X-Info-Message", "ok").build();}
(3)created()
方法:用于创建一个表示资源已创建的响应。这通常用于在 RESTful 服务中返回成功创建资源的响应。
public Mono<ServerResponse> handleRequest(ServerRequest request) {// 处理逻辑// 创建一个表示资源已创建的响应,并设置响应头和内容return ServerResponse.created(request.uri()).contentType(MediaType.APPLICATION_JSON).body(BodyInserters.fromValue("Resource created successfully"));}
在上述代码中,我们使用 created()
方法创建一个表示资源已创建的响应,并通过 request.uri()
方法设置了响应头中的 Location 字段为当前请求的 URI。然后,我们使用 contentType()
方法设置响应的内容类型为 JSON,并使用 body()
方法将响应体设置为一个包含成功消息的字符串。
(4)from()
方法:用于创建一个新的 ServerResponse 对象,该对象与传入的 ServerResponse 对象具有相同的状态码、头信息和主体内容。
public Mono<ServerResponse> handleRequest(ServerRequest request) {// 处理逻辑Mono<ServerResponse> originalResponse = ServerResponse.ok().bodyValue("Hello World");Mono<ServerResponse> newResponse = ServerResponse.from(originalResponse.block()).header("X-Custom-Header", "Custom Value").build();return newResponse;}
(5)permanentRedirect()
方法:表示所请求的资源已被永久移动到了新的位置(HTTP 301)。
public Mono<ServerResponse> handleRequest(ServerRequest request) {// 处理逻辑URI newUri = URI.create("https://example.com/new-url");return ServerResponse.permanentRedirect(newUri).header("Location", newUri.toString()).build();}
(6)temporaryRedirect()
方法:用于创建一个 HTTP 307 临时重定向的 ServerResponse 对象。
HTTP 307 临时重定向响应表示所请求的资源已被临时移动到了新的位置。与 HTTP 301 不同,客户端在收到该响应后不会自动更新其链接,而是会保留原始请求的方法和请求体,并将请求发送到新的位置。
public Mono<ServerResponse> handleRequest(ServerRequest request) {// 处理逻辑URI newUri = URI.create("https://example.com/new-url");return ServerResponse.temporaryRedirect(newUri).header("Location", newUri.toString()).build();}
(7)seeOther()
方法:用于创建一个 HTTP 303 查看其它位置的 ServerResponse 对象。
public Mono<ServerResponse> handleRequest(ServerRequest request) {// 处理逻辑URI newUri = URI.create("https://example.com/new-url");return ServerResponse.seeOther(newUri).header("Location", newUri.toString()).build();}
- 处理程序类
我们可以编写一个lambda形式的处理函数,如下例所示:
HandlerFunction<ServerResponse> helloWorld =request -> ServerResponse.ok().bodyValue("Hello World");
这很方便,但在应用程序中我们需要多个函数,而多个内联lambda可能会变得混乱。因此,将相关的处理程序函数组合到一个处理程序类中是很有用的,该处理程序类在基于注释的应用程序中具有与@Controller
类似的作用。例如,下面的类公开了一个响应式Person存储库:
public class PersonHandler {private final PersonRepository repository;public PersonHandler(PersonRepository repository) {this.repository = repository;}public Mono<ServerResponse> listPeople(ServerRequest request) { Flux<Person> people = repository.allPeople();return ok().contentType(APPLICATION_JSON).body(people, Person.class);}public Mono<ServerResponse> createPerson(ServerRequest request) { Mono<Person> person = request.bodyToMono(Person.class);return ok().build(repository.savePerson(person));}public Mono<ServerResponse> getPerson(ServerRequest request) { int personId = Integer.valueOf(request.pathVariable("id"));return repository.getPerson(personId).flatMap(person -> ok().contentType(APPLICATION_JSON).bodyValue(person)).switchIfEmpty(ServerResponse.notFound().build());}
}
- 验证
功能性端点可以使用Spring的验证工具对请求体应用验证。
public class PersonValidator implements Validator {private static final String REQUIRED_FIELD = "field.required";private static final String INVALID_EMAIL = "email.invalid";@Overridepublic boolean supports(Class<?> clazz) {return Person.class.equals(clazz);}@Overridepublic void validate(Object target, Errors errors) {Person person = (Person) target;// 验证 name 字段不为空if (person.getName() == null || person.getName().trim().equals("")) {errors.rejectValue("name", REQUIRED_FIELD, "Name cannot be empty");}// 验证 email 字段为有效的邮箱格式if (person.getEmail() != null && !person.getEmail().matches(".+@.+\\..+")) {errors.rejectValue("email", INVALID_EMAIL, "Invalid email format");}}
}
public class PersonHandler {private final Validator validator = new PersonValidator();//创造Validator实例private final PersonRepository repository; // 假设已定义了 PersonRepository 类public Mono<ServerResponse> createPerson(ServerRequest request) {Mono<Person> personMono = request.bodyToMono(Person.class).doOnNext(this::validate);return personMono.flatMap(person -> ServerResponse.ok().build(repository.savePerson(person)));}private void validate(Person person) {//用于在数据绑定和验证过程中收集和处理验证错误Errors errors = new BeanPropertyBindingResult(person, "person");validator.validate(person, errors);if (errors.hasErrors()) {throw new ServerWebInputException(errors.toString());}}
}
doOnNext()
方法允许你在每个元素被处理之前注册一个回调函数。
- RouterFunction
路由器函数用于将请求路由到相应的HandlerFunction。通常,你不会自己编写路由器函数,而是使用RouterFunctions实用工具类中的一个方法来创建一个。RouterFunctions.route()
(不带参数)为你提供了一个流畅的构建器来创建路由器函数。route(RequestPredicate, HandlerFunction)
提供了一种直接创建路由器的方法。
通常,建议使用route()
构建器,因为它为典型的映射场景提供了方便的快捷方式,而不需要难以发现的静态导入。例如,路由器函数构建器提供了GET(String, HandlerFunction)
方法来为GET请求创建映射;和POST(String, HandlerFunction)
用于POST。
RouterFunctions.route()
是 Spring WebFlux 中用于定义路由的静态方法之一。它允许你基于请求的条件来定义路由规则,以及指定处理请求的处理器函数。
@Configuration
public class MyWebFluxConfig {@Beanpublic RouterFunction<ServerResponse> route(MyWebFluxHandler handler) {return RouterFunctions.route().GET("/users", handler::handleRequest).POST("/users",handler::handleRequest).build();}
}
@Component
public class MyWebFluxHandler {public Mono<ServerResponse> handleRequest(ServerRequest request) {// 处理逻辑URI newUri = URI.create("https://example.com/new-url");return ServerResponse.temporaryRedirect(newUri).header("Location", newUri.toString()).build();}public Mono<ServerResponse> handleRequest2(ServerRequest request) {// 处理逻辑URI newUri = URI.create("https://example.com/new-url");return ServerResponse.temporaryRedirect(newUri).header("Location", newUri.toString()).build();}}
在上面的示例中,我们定义了两个路由规则:一个用于处理 GET /users
请求,另一个用于处理 POST /users
请求。
下面介绍其他的方法
(1)changeParser()
方法:允许你在处理请求的过程中更改请求体数据解析器。
@Configuration
public class MyWebFluxConfig {@Beanpublic RouterFunction<ServerResponse> route(MyWebFluxHandler handler) {RouterFunction<ServerResponse> build = RouterFunctions.route().GET("/users", handler::handleRequest).POST("/users", handler::handleRequest2).build();RouterFunction<ServerResponse> serverResponseRouterFunction = RouterFunctions.changeParser(build, PathPatternParser.defaultInstance);return serverResponseRouterFunction;}
}
(2)nest()
方法:用于在路由函数中创建嵌套的子路由器。
@Configuration
public class MyWebFluxConfig {@Beanpublic RouterFunction<ServerResponse> route(MyWebFluxHandler handler) {// 创建主路由器return RouterFunctions.route().path("/api", builder -> builder.nest(RequestPredicates.method(HttpMethod.GET), subBuilder ->RouterFunctions.route().GET("/subEndpoint", serverRequest -> ServerResponse.ok().build()).build())).build();}}
通过这样的配置,当收到 GET 请求的路径为 "/api/subEndpoint"
时,将通过 ServerResponse.ok().build()
方法返回一个 HTTP 200 OK 的响应。
(3)resources()
方法:它允许你将请求映射到类路径下的静态资源,比如 HTML 文件、图片、CSS 文件等。
@Configuration
public class StaticResourceConfig {@Beanpublic RouterFunction<ServerResponse> staticResourceRouter() {return RouterFunctions.resources("/static/**", new ClassPathResource("static/"));}
}
通过这样的配置,当收到以 "/static/"
开头的请求时,WebFlux 将会尝试去加载类路径下 "static/"
目录中对应的静态资源,并返回给客户端。
与 resources()
方法不同的是,RouterFunctions.resourceLookupFunction()
提供了更灵活的方式来解析和加载静态资源,可以通过自定义的 ResourceLookupFunction 接口实现。
public class CustomResourceLookupFunction implements ResourceLookupFunction {private final ResourceLoader resourceLoader;public CustomResourceLookupFunction(ResourceLoader resourceLoader) {this.resourceLoader = resourceLoader;}@Overridepublic Resource lookup(String resourcePath) {try {return resourceLoader.getResource("classpath:" + resourcePath);} catch (Exception e) {// 处理资源加载异常throw new RuntimeException("Failed to load resource: " + resourcePath, e);}}
}
@Configuration
public class StaticResourceConfig {@Beanpublic RouterFunction<ServerResponse> staticResourceRouter(ResourceLoader resourceLoader) {ResourceLookupFunction resourceLookupFunction = new CustomResourceLookupFunction(resourceLoader);return RouterFunctions.resourceLookupFunction("/static/**", resourceLookupFunction);}
}
(4)toHttpHandler()
方法:用于将 RouterFunction 转换为标准的 HttpHandler。
@Configuration
public class MyWebFluxConfig {@Beanpublic RouterFunction<ServerResponse> route(MyWebFluxHandler handler) {// 创建主路由器RouterFunction<ServerResponse> build = RouterFunctions.route().GET("/users", handler::handleRequest).POST("/users", handler::handleRequest2).build();HttpHandler httpHandler = RouterFunctions.toHttpHandler(build);return build;}}
(5)toWebHandler()
方法:用于将 RouterFunction 转换为 WebHandler 对象,可以用于处理 HTTP 请求。
@Configuration
public class MyWebFluxConfig {@Beanpublic RouterFunction<ServerResponse> route(MyWebFluxHandler handler) {// 创建主路由器RouterFunction<ServerResponse> build = RouterFunctions.route().GET("/users", handler::handleRequest).POST("/users", handler::handleRequest2).build();WebHandler webHandler = RouterFunctions.toWebHandler(build);return build;}
}
- 路线
路由器函数按顺序求值:如果第一个路由不匹配,则求第二个,依此类推。因此,在通用路由之前声明更具体的路由是有意义的。在将路由器函数注册为Spring bean时,这一点也很重要,稍后将对此进行描述。注意,这种行为不同于基于注释的编程模型,在基于注释的编程模型中,“最具体”的控制器方法是自动选择的。
当使用路由器函数构建器时,所有定义的路由都被组合成一个RouterFunction,并从build()
中返回。还有其他方法可以将多种路由器功能组合在一起:
@Configuration
public class MyWebFluxConfig {@Beanpublic RouterFunction<ServerResponse> route(MyWebFluxHandler handler) {RouterFunction<ServerResponse> otherRoute = RouterFunctions.route().GET("v1",this::handleGetRequest).POST("v2",this::handlePostRequest).build();// 创建主路由器RouterFunction<ServerResponse> build = RouterFunctions.route().GET("/users", handler::handleRequest).POST("/users", handler::handleRequest2).add(otherRoute).build();return build;}public Mono<ServerResponse> handleGetRequest(ServerRequest request) {return ServerResponse.ok().bodyValue("GET Request Handled");}public Mono<ServerResponse> handlePostRequest(ServerRequest request) {return ServerResponse.ok().bodyValue("POST Request Handled");}}
- 嵌套路由
一组路由器函数通常有一个共享谓词,例如共享路径。在上面的示例中,共享谓词将是匹配/person
的路径谓词,由三个路由使用。
@Configuration
public class MyWebFluxConfig {@Beanpublic RouterFunction<ServerResponse> route(MyWebFluxHandler handler) {// 创建主路由器RouterFunction<ServerResponse> build = RouterFunctions.route().path("/person", builder -> builder.GET("/users", serverRequest -> handleRequest(serverRequest)).POST("/users", serverRequest -> handleRequest2(serverRequest))).build();return build;}public Mono<ServerResponse> handleRequest(ServerRequest request) {// 处理逻辑URI newUri = URI.create("https://example.com/new-url");return ServerResponse.temporaryRedirect(newUri).header("Location", newUri.toString()).build();}public Mono<ServerResponse> handleRequest2(ServerRequest request) {// 处理逻辑URI newUri = URI.create("https://example.com/new-url");return ServerResponse.temporaryRedirect(newUri).header("Location", newUri.toString()).build();}
}
除此之外你也可以使用nest()
方法。
- 过滤处理程序函数
在WebFlux中,你可以使用filter()函数来过滤处理程序函数。filter()
函数接受一个HandlerFilterFunction接口的实现作为参数,并在处理请求之前或之后执行额外的逻辑。
@Configuration
public class MyWebFluxConfig {@Beanpublic RouterFunction<ServerResponse> route(MyWebFluxHandler handler) {// 创建主路由器RouterFunction<ServerResponse> build = RouterFunctions.route().path("/person", builder -> builder.GET("/users", serverRequest -> handleRequest(serverRequest)).POST("/users", serverRequest -> handleRequest2(serverRequest))).filter(((serverRequest, handlerFunction) -> {// 在处理请求之前执行额外的逻辑System.out.println("Before handling the request");// 通过调用下一个处理程序函数继续处理请求//进行权限等校验return handlerFunction.handle(serverRequest);})).build();return build;}
}
WebFlux配置
WebFlux Java配置声明了用带注释的控制器或功能端点处理请求所需的组件,并提供了一个API来定制配置。这意味着您不需要了解由Java配置创建的底层bean。
对于配置API中无法提供的更高级的自定义,您可以通过advanced configuration Mode获得对配置的完全控制。
- 启用WebFlux配置
您可以使用@EnableWebFlux
注释,如下例所示:
@Configuration
@EnableWebFlux
public class WebConfig {
}
- WebFlux配置API
在Java配置中,您可以实现WebFluxConfigurer接口,如下例所示:
@Configuration
public class WebFluxConfig implements WebFluxConfigurer {
}
- 转换,格式
默认情况下,安装了各种数字和日期类型的格式化程序,并支持通过字段上的@NumberFormat
和@DateTimeFormat
进行自定义。
要在Java配置中注册自定义格式化程序和转换器,请使用以下命令:
@Configuration
public class WebFluxConfig implements WebFluxConfigurer {@Overridepublic void addFormatters(FormatterRegistry registry) {registry.addFormatter(new DateFormatter());}private static class DateFormatter implements org.springframework.format.Formatter<Date> {private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd", Locale.ENGLISH);@Overridepublic Date parse(String text, Locale locale) throws ParseException {return dateFormat.parse(text);}@Overridepublic String print(Date date, Locale locale) {return dateFormat.format(date);}}
}
- 验证
默认情况下,如果类路径上存在Bean验证(例如,Hibernate验证器),LocalValidatorFactoryBean将被注册为全局验证器,以便与@Controller
方法参数上的@Valid
和@Validated
一起使用。
在你的Java配置中,你可以自定义全局Validator实例,如下面的例子所示:
@Configuration
public class WebFluxConfig implements WebFluxConfigurer {@Overridepublic Validator getValidator() {// 返回你自定义的验证器实例return new MyValidator();}private static class Person {private String name;private int age;// getters and setters}// 自定义的验证器类private static class MyValidator implements Validator {@Overridepublic boolean supports(Class<?> clazz) {return Person.class.isAssignableFrom(clazz);}@Overridepublic void validate(Object target, Errors errors) {ValidationUtils.rejectIfEmpty(errors, "name", "Name must not be empty");Person person = (Person) target;if (person.getAge() < 0) {errors.rejectValue("age", "Age must be greater than or equal to 0");}}}
}
- 内容类型解析器
您可以配置Spring WebFlux如何确定请求的媒体类型@Controller
请求中的实例。默认情况下,只有Accept标头,但您也可以启用基于参数的查询策略。
以下示例显示了如何自定义请求的内容类型解析:
@Configuration
public class WebFluxConfig implements WebFluxConfigurer {@Overridepublic void configureContentTypeResolver(RequestedContentTypeResolverBuilder builder) {builder.fixedResolver(MediaType.APPLICATION_JSON);}
}
- HTTP消息编解码器
以下示例显示了如何自定义请求和响应正文的读写方式:
@Configuration
public class WebFluxConfig implements WebFluxConfigurer {@Overridepublic void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {configurer.defaultCodecs().maxInMemorySize(512 * 1024);}
}
ServerCodecConfigurer提供一组默认的读取器和编写器。您可以使用它来添加更多的读取器和写入器,自定义默认的读取器和写入器,或者完全替换默认的读取器和写入器。
- 查看解析器
以下示例显示使用FreeMarker
@Configuration
public class WebFluxConfig implements WebFluxConfigurer {@Overridepublic void configureViewResolvers(ViewResolverRegistry registry) {registry.freeMarker();}// Configure Freemarker...@Beanpublic FreeMarkerConfigurer freeMarkerConfigurer() {FreeMarkerConfigurer configurer = new FreeMarkerConfigurer();configurer.setTemplateLoaderPath("classpath:/templates");return configurer;}
}
你也可以插入任何ViewResolver实现,如下例所示:
@Configuration
public class WebFluxConfig implements WebFluxConfigurer {@Overridepublic void configureViewResolvers(ViewResolverRegistry registry) {registry.viewResolver(new CustomViewResolver());}private static class CustomViewResolver implements ViewResolver {@Overridepublic View resolveViewName(String s, Locale locale) throws Exception {return null;}}
}
- 静态资源
在下一个示例中,给定一个以/resources
开头的请求,相对路径用于查找和提供相对于类路径上的/static
的静态资源。资源的有效期为一年,以确保最大限度地使用浏览器缓存并减少浏览器发出的HTTP请求。Last-Modified标头也会被求值,如果存在,则返回304状态码。下面的清单显示了该示例:
@Configuration
public class WebFluxConfig implements WebFluxConfigurer {@Overridepublic void addResourceHandlers(ResourceHandlerRegistry registry) {registry.addResourceHandler("/resources/**").addResourceLocations("/public", "classpath:/static/").setCacheControl(CacheControl.maxAge(365, TimeUnit.DAYS));}
}
- 路径匹配
您可以自定义与路径匹配相关的选项。下面的示例显示了如何使用PathMatchConfigurer:
@Configuration
public class WebFluxConfig implements WebFluxConfigurer {@Overridepublic void configurePathMatching(PathMatchConfigurer configurer) {configurer.setUseCaseSensitiveMatch(true) // 设置用例敏感匹配.setUseTrailingSlashMatch(true); // 启用尾部斜杠匹配}
}
- WebSocketService
WebFlux的Java配置声明了一个WebSocketHandlerAdapter bean,它为调用WebSocket处理程序提供了支持。这意味着为了处理WebSocket握手请求,剩下要做的就是通过SimpleUrlHandlerMapping将WebSocketHandler映射到URL。
在某些情况下,可能需要使用提供的WebSocketService服务创建WebSocketHandlerAdapter bean,该服务允许配置WebSocket服务器属性。例如:
@Configuration
public class WebFluxConfig implements WebFluxConfigurer {@Overridepublic WebSocketService getWebSocketService() {TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();strategy.setMaxSessionIdleTimeout(0L);return new HandshakeWebSocketService(strategy);}
}
- 高级配置模式
@EnableWebFlux
导入DelegatingWebFluxConfiguration:
为WebFlux应用程序提供默认的Spring配置
检测并委托WebFluxConfigurer实现来定制该配置。
对于高级模式,你可以删除@EnableWebFlux
并直接从DelegatingWebFluxConfiguration扩展,而不是实现WebFluxConfigurer,如下例所示:
@Configuration
public class WebConfig extends DelegatingWebFluxConfiguration {// ...
}
WebClient
Spring WebFlux包含一个客户端来执行HTTP请求。WebClient具有基于Reactor的实用、流畅的API,它支持异步逻辑的声明式组合,而无需处理线程或并发。它是完全无阻塞的,支持流式传输,并依赖于相同的编解码器它们也用于在服务器端对请求和响应内容进行编码和解码。
配置
创建一个WebClient是通过静态工厂方法之一:
// 创建一个 WebClient 实例
WebClient webClient = WebClient.create();
// 创建一个具有基本 URL 的 WebClient 实例
WebClient webClient = WebClient.create("https://api.example.com");
WebClient.builder()
是 WebClient 类中的静态方法,用于创建一个 WebClient.Builder 实例,通过该实例可以配置和构建 WebClient 对象。
WebClient.Builder builder = WebClient.builder();
以下是一个示例代码,演示如何创建具有基本 URL 的 WebClient 实例:
public class WebClientExample {public static void main(String[] args) {// 创建一个 WebClient.Builder 实例WebClient.Builder webClientBuilder = WebClient.builder();// 配置 WebClient.BuilderWebClient webClient = webClientBuilder.baseUrl("https://api.example.com").defaultHeader("Authorization", "Bearer token").build();// 发送 GET 请求,并处理响应Mono<String> response = webClient.get().uri("/data").accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(String.class);// 订阅响应并输出结果response.subscribe(result -> System.out.println("Response: " + result),error -> System.err.println("Error: " + error.getMessage()));}
}
编解码器具有限制用于在内存中缓冲数据以避免应用程序内存问题。默认情况下,它们被设置为256KB。若要更改默认编解码器的限制,请使用以下内容:
public class WebClientExample {public static void main(String[] args) {// 创建一个 WebClient.Builder 实例WebClient.Builder webClientBuilder = WebClient.builder();// 配置 WebClient.BuilderWebClient.Builder webClient = webClientBuilder.codecs(item->item.defaultCodecs().maxInMemorySize(2 * 1024 * 1024));}
}
- JDK HttpClient
以下示例显示了如何自定义JDK HttpClient:
HttpClient httpClient = HttpClient.newBuilder().followRedirects(Redirect.NORMAL).connectTimeout(Duration.ofSeconds(20)).build();ClientHttpConnector connector =new JdkClientHttpConnector(httpClient, new DefaultDataBufferFactory());WebClient webClient = WebClient.builder().clientConnector(connector).build();
retrieve()
retrieve()
方法可用于声明如何提取响应。例如:
public static void main(String[] args) {WebClient client = WebClient.create("https://example.org");Mono<ResponseEntity<Person>> result = client.get().uri("/persons/{id}", 1).accept(MediaType.APPLICATION_JSON).retrieve().toEntity(Person.class);//转换为一个包含响应体和状态码的实体对象。//订阅result.subscribe(item->{Person body = item.getBody();});}
或者只得到body:
public static void main(String[] args) {WebClient client = WebClient.create("https://example.org");Mono<Person> result = client.get().uri("/persons/{id}", 1).accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(Person.class);result.subscribe(item->{});}
默认情况下,4xx或5xx响应会导致WebClientResponseException,包括特定HTTP状态代码的子类。若要自定义错误响应的处理,请使用onStatus处理程序如下:
public static void main(String[] args) {WebClient client = WebClient.create("https://example.org");Mono<String> resultMono = client.get().uri("/todos/1").retrieve().onStatus(status -> status.is4xxClientError(),clientResponse -> Mono.error(new RuntimeException("Client error"))).onStatus(status -> status.is5xxServerError(),clientResponse -> Mono.error(new RuntimeException("Server error"))).bodyToMono(String.class);resultMono.subscribe(result -> System.out.println("Result: " + result),error -> System.err.println("Error: " + error.getMessage()));}
交换
exchangeToMono()
和exchangeToFlux()
方法对于需要更多控制的高级情况非常有用,例如根据响应状态对响应进行不同的解码:
public static void main(String[] args) {WebClient client = WebClient.create("https://example.org");Mono<ClientResponse> responseMono = client.method(HttpMethod.POST).uri("/posts").contentType(MediaType.APPLICATION_JSON).body(BodyInserters.fromValue("{ \"title\": \"foo\", \"body\": \"bar\", \"userId\": 1 }")).exchangeToMono(response -> {if (response.statusCode().equals(HttpStatus.CREATED)) {return Mono.just(response);} else {return response.createException().flatMap(Mono::error);}});responseMono.subscribe(response -> {System.out.println("Status code: " + response.statusCode());System.out.println("Response body: " + response.bodyToMono(String.class).block());});}
在上述示例中,我们创建了一个WebClient对象,并通过method()
方法指定了HTTP请求的方法,通过uri()
方法指定了请求的URI,通过contentType()
方法指定了请求的Content-Type,通过body()
方法设置了请求体的内容。
然后,我们使用exchangeToMono()
方法发起HTTP请求,并将响应转换为一个Mono<ClientResponse>
对象。在exchangeToMono()
方法中,我们可以根据响应的状态码进行处理。如果响应的状态码是HttpStatus.CREATED,则返回原始响应;否则,我们使用createException()
方法创建一个异常,然后使用flatMap()
方法将其转换为一个错误的Mono对象。
最后,我们订阅responseMono对象,并通过ClientResponse对象来获取响应数据。
请求正文
请求体可以从ReactiveAdapterRegistry处理的任何异步类型编码,如Mono或Kotlin Coroutines Deferred,如下例所示:
public static void main(String[] args) {WebClient client = WebClient.create("https://example.org");Mono<Void> result = client.post()//设置请求方法为 POST。.uri("/persons/{id}", id)//设置请求的 URI,并使用占位符 {id} 将 id 变量的值作为路径参数传递。.contentType(MediaType.APPLICATION_JSON).body(personMono, Person.class)//将 personMono 对象作为请求的主体,并指定主体的类型为 Person.class。.retrieve()//发送请求并获取响应。.bodyToMono(Void.class);//响应的主体转换为 Mono<Void> 对象。}
或者,如果你有实际值,你可以使用bodyValue快捷方法,如下例所示:
public static void main(String[] args) {WebClient client = WebClient.create("https://example.org");Person person = new Person();Mono<Void> result = client.post()//设置请求方法为 POST。.uri("/persons/{id}", id)//设置请求的 URI,并使用占位符 {id} 将 id 变量的值作为路径参数传递。.contentType(MediaType.APPLICATION_JSON).bodyValue(person)//传入对象.retrieve()//发送请求并获取响应。.bodyToMono(Void.class);//响应的主体转换为 Mono<Void> 对象。}
- 格式数据
若要发送表单数据,您可以提供MultiValueMap<String, String>
作为body。请注意,内容将自动设置为FormHttpMessageWriter编码的application/x-www-form-URL
。下面的示例显示了如何使用MultiValueMap<String, String>
:
MultiValueMap<String, String> formData = ... ;
WebClient client = WebClient.create("https://example.org");
Mono<Void> result = client.post().uri("/path", id).bodyValue(formData).retrieve().bodyToMono(Void.class);
- 多部分数据
要发送多部分数据,您需要提供MultiValueMap<String, ?>
,其值要么是表示部分内容的Object实例,要么是表示部分内容和报头的HttpEntity实例。MultipartBodyBuilder提供了一个方便的API来准备多部分请求。下面的示例展示了如何创建MultiValueMap<String, ?>
:
MultipartBodyBuilder builder = new MultipartBodyBuilder();
builder.part("fieldPart", "fieldValue");
builder.part("filePart1", new FileSystemResource("...logo.png"));
builder.part("jsonPart", new Person("Jason"));
builder.part("myPart", part); // Part from a server requestMultiValueMap<String, HttpEntity<?>> parts = builder.build();
一旦MultiValueMap准备好了,把它传递给WebClient最简单的方法就是通过body方法,如下面的例子所示:
MultipartBodyBuilder builder = new MultipartBodyBuilder();
Mono<Void> result = client.post().uri("/path", id).body(builder.build()).retrieve().bodyToMono(Void.class);
要按顺序传输多部分数据,您可以通过PartEvent对象。
(1)可以通过以下方式创建表单字段FormPartEvent::create
。
(2)文件上传可以通过以下方式创建FilePartEvent::create
。
Resource resource = ...
Mono<String> result = webClient.post().uri("https://example.com").body(Flux.concat(FormPartEvent.create("field", "field value"),FilePartEvent.create("file", resource)), PartEvent.class).retrieve().bodyToMono(String.class);
Filters
您可以通过WebClient注册一个客户端过滤器(ExchangeFilterFunction)。Builder来拦截和修改请求,如下例所示:
WebClient client = WebClient.builder().filter((request, next) -> {ClientRequest filtered = ClientRequest.from(request).header("foo", "bar").build();return next.exchange(filtered);}).build();
这可以用于横切关注点,例如身份验证。下面的例子使用一个过滤器通过一个静态工厂方法进行基本身份验证:
public static void main(String[] args) {WebClient client = WebClient.builder().filter(basicAuthentication("user", "password")).build();}
属性
您可以向请求添加属性。如果您希望通过过滤器链传递信息并影响过滤器对给定请求的行为,这将非常方便。例如:
WebClient client = WebClient.builder().filter((request, next) -> {Optional<Object> usr = request.attribute("myAttribute");// ...}).build();client.get().uri("https://example.org/").attribute("myAttribute", "...").retrieve().bodyToMono(Void.class);
请注意,您可以配置defaultRequest在全局回调WebClient.Builder允许您将属性插入到所有请求中的级别,例如,可以在Spring MVC应用程序中使用该级别来填充基于ThreadLocal数据。
语境
属性提供一种方便的方式将信息传递给过滤器链,但它们仅影响当前请求。如果您想要传递传播到嵌套的附加请求的信息,例如通过flatMap,或在之后执行,例如通过concatMap那你就需要使用反应堆Context。
反应堆Context需要在反应链的末端填充,以便应用于所有操作。例如:
public static void main(String[] args) {WebClient client = WebClient.builder().build();Mono<String> initialRequestMono = client.get().uri("https://api.example.com/authenticate").retrieve().bodyToMono(String.class).contextWrite(Context.of("authToken", "myAuthToken"));Mono<String> dataRequestMono = initialRequestMono.flatMap(authToken -> client.get().uri("https://api.example.com/data").headers(headers -> headers.setBearerAuth(authToken)).retrieve().bodyToMono(String.class));}
在上述示例中,我们首先创建了一个WebClient实例。然后,我们使用get()
方法创建第一个HTTP GET请求,并通过uri()
方法设置请求URI。我们使用retrieve()
方法执行请求并获取响应,然后使用bodyToMono()
方法将响应体转换为一个新的Mono对象。
接下来,我们创建一个名为initialRequestMono的新Mono对象,它在写入上下文时包含一个authToken键和相应的值。该键/值对将在后续操作符中使用。
我们通过flatMap()
方法创建一个新的Mono对象,该对象执行第二个HTTP GET请求以获取数据。我们还使用headers()
方法设置请求头,以使用上一步骤中获取的身份验证令牌。我们使用bodyToMono()
方法将响应体转换为一个新的Mono对象。
同步使用
WebClient可通过在结果末尾阻塞以同步方式使用:
public static void main(String[] args) {WebClient client = WebClient.builder().build();String response = client.get().uri("https://api.example.com/data").retrieve().bodyToMono(String.class).block();System.out.println("Response: " + response);}
你可以通过在Mono对象上调用.block()
方法来阻塞等待结果并以同步方式使用WebClient。这将阻塞当前线程直到Mono发出值或错误,并返回最终结果。
测试
测试使用WebClient,您可以使用模拟web服务器,如OkHttp MockWebServer。
RSocket
RSocket是一种新的异步、双向、基于流的通信协议,旨在提供更好的网络通信性能和更丰富的通信模式。它基于Reactive Stream和TCP之上,可以在各种语言和框架之间进行交互。
相比传统的HTTP协议,RSocket具有以下特点:
- 异步:RSocket支持异步通信模式,可以实现更高效的线程利用和更快的响应速度。
- 双向:RSocket支持双向通信,客户端和服务端都可以主动发起请求和响应。
- 基于流:RSocket使用基于流的数据传输方式,可以在单个连接上发送多个消息,从而提高通信效率。
- 多种通信模式:RSocket支持多种通信模式,包括请求/响应、流式请求/响应和流式推送等。
以下是一个简单的RSocket示例代码(使用Spring Boot和Reactor):
@SpringBootApplication
public class RSocketClientApplication {public static void main(String[] args) {SpringApplication.run(RSocketClientApplication.class, args);RSocket rsocket = RSocketFactory.connect().transport(TcpClientTransport.create("localhost", 7000)).start().block();// 发送请求/响应消息Mono<String> response = rsocket.requestResponse(DefaultPayload.create("Hello, RSocket!")).map(Payload::getDataUtf8);System.out.println(response.block()); // 输出响应数据// 发送流式请求/响应消息Flux<String> stream = rsocket.requestStream(DefaultPayload.create("Hello, RSocket!")).map(Payload::getDataUtf8);stream.subscribe(System.out::println); // 输出流式响应数据// 发送流式推送消息Flux<Payload> payloadFlux = Flux.range(1, 10).map(i -> DefaultPayload.create("Message " + i));rsocket.fireAndForget(payloadFlux).block(); // 发送流式推送消息}
}
后续有机会再详细了解。
反应库
spring-webflux
依赖于reactor-core
,并在内部使用它来组成异步逻辑和提供响应式流支持。一般来说,WebFlux api会返回Flux或Mono(因为它们是在内部使用的),并宽容地接受任何Reactive Streams Publisher实现作为输入。使用Flux和Mono是很重要的,因为它有助于表达基数——例如,是期望一个异步值还是多个异步值,这对于决策(例如,在编码或解码HTTP消息时)是必不可少的。
对于带注释的控制器,WebFlux会透明地适应应用程序选择的响应式库。这是在ReactiveAdapterRegistry的帮助下完成的,它为响应式库和其他异步类型提供了可插拔的支持。注册中心内置了对RxJava 3、Kotlin协程和SmallRye Mutiny的支持,但是您也可以注册其他的。