注意: 本文内容于 2024-12-28 21:22:12 创建,可能不会在此平台上进行更新。如果您希望查看最新版本或更多相关内容,请访问原文地址:ReactiveStreams、Reactor、SpringWebFlux。感谢您的关注与支持!
ReactiveStreams是一个处理异步流的规范,定义了Publisher、Subscriber、Subscription、Processor接口。
Reactor是ReactiveStreams的实现,对于Publisher提供了两个核心实现——Mono与Flux。
SpringWebFlux是构建在Reactor之上的响应式Web框架。
本文源码
一、Reactive Streams
Reactive Streams 是一个用于处理异步流数据的标准规范,特别适合处理非阻塞、背压控制的场景。
所谓的背压控制,是指在异步数据流中,消费者根据自身的能力向生产者获取数据进行消费,以避免数据积压导致系统过载或者崩溃。
TCP中的拥塞控制,也可以看作是背压控制的一种实现。
1.1 API规范
Reactive Streams 的四大API接口如下
org.reactivestreams.Publisher
: 发布者接口,提供数据流。- void subscribe(Subscriber<? super T> subscriber)
org.reactivestreams.Subscriber
: 订阅者接口,接收数据流。- void onSubscribe(Subscription subscription)
- void onNext(T item)
- void onError(Throwable throwable)
- void onComplete()
org.reactivestreams.Subscription
: 订阅关系接口,提供控制机制。- void request(long n)
- void cancel()
org.reactivestreams.Processor
: 继承Publisher和Subscriber的接口。
简单绘制一个时序图,加深对整个链路的理解。
使用Publisher、Subscriber、Subscription实现一个简单的订阅功能,示例如下
以下代码,并没有异步相关的内容。只是为了学习整个API流转链路。
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;public class Example01 {private static final Logger log = LoggerFactory.getLogger(Example01.class);/*** 订阅关系*/public static Subscription getSubscription(Subscriber<? super String> subscriber, String... items) {return new Subscription() {private final AtomicBoolean canceled = new AtomicBoolean(false);private final AtomicInteger sendItems = new AtomicInteger(0);/*** request数据* 内部onNext会request后面的数据,而onComplete应该要等所有的数据消费完毕后,才会执行。* 故需要加锁保证线程安全,此处采取CAS。源码参考reactor.core.publisher.Operators.ScalarSubscription#request(long)*/@Overridepublic void request(long n) {if (n > 0) {if (canceled.get()) {return;}if (sendItems.get() >= items.length) {subscriber.onComplete();} else {subscriber.onNext(items[sendItems.getAndIncrement()]);}}}@Overridepublic void cancel() {canceled.compareAndSet(true, true);}};}/*** 发布者*/private static Publisher<String> getPublisher(String... items) {return new Publisher<String>() {@Overridepublic void subscribe(Subscriber<? super String> subscriber) {subscriber.onSubscribe(getSubscription(subscriber, items));}};}/*** 订阅者*/private static Subscriber<String> getSubscriber() {return new Subscriber<String>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription s) {this.subscription = s;log.info("Subscribed to {}", s);// 请求第一个元素subscription.request(1);}@Overridepublic void onNext(String s) {log.info("Received {}", s);// 请求下一个元素subscription.request(1);}@Overridepublic void onError(Throwable t) {log.error("Error occurred", t);}@Overridepublic void onComplete() {log.info("All items received");}};}public static void main(String[] args) {// 订阅Flux// Flux.just("first", "second", "third").delayElements(Duration.ofSeconds(2))// .subscribe(getSubscriber());/*** org.reactivestreams.Publisher: 发布者* org.reactivestreams.Subscriber: 订阅者* org.reactivestreams.Subscription: 发布者和订阅者之间的桥梁,数据流控制的核心机制。*/// 订阅自定义PublishergetPublisher("first", "second", "third", "fourth", "fifth").subscribe(getSubscriber());while (true) {}}
}
运行结果
1.2 API实现库
Reactive Streams实现如下
- Java9+ java.util.concurrent.Flow
- RxJava: Reactive Extension Java
- Reactor: Reactor Framework
Java9+中提供了java.util.concurrent.Flow
,在标准库中提供ReactiveStreams规范的接口。
ReactiveStreams内部也提供了适配JDK中Flow的适配器org.reactivestreams.FlowAdapters
。
RxJava以及Reactor,分别用于Java开发中不同领域。RxJava一般用于Android开发,Reactor一般用于Spring开发。
二、Reactor
Reactor提供了两个核心类
- reactor.core.publisher.Flux:发布0或N个元素的异步数据流
- reactor.core.publisher.Mono:发布0或1个元素的异步数据流
这两者都是Publisher,主要区别在于发送数据的数量。因此在使用上,相关的API都是差不多的。
2.1 Mono
Mono中的静态方法,用于创建Mono实例。
Mono实例中的成员方法如下
方法名 | 说明 |
---|---|
and | 合并多个 Mono 实例,等所有 Mono 完成后返回一个新的 Mono 。 |
as | 用指定的类型包裹当前 Mono ,通常用于类型转换。 |
block | 阻塞并获取 Mono 的结果,直到执行完成。 |
blockOptional | 类似于 block ,但返回 Optional 包裹的结果。 |
cache | 缓存当前 Mono 的值,使得未来的订阅者可以共享相同的结果。 |
cacheInvalidateIf | 缓存失效条件满足时重新缓存,适用于动态失效策略。 |
cacheInvalidateWhen | 在指定条件下使缓存失效。 |
cancelOn | 当给定的 Publisher 发出信号时,取消当前 Mono 。 |
cast | 强制类型转换为指定的类型。 |
checkpoint | 在流的执行过程中插入检查点,用于调试。 |
concatWith | 与另一个 Mono 或 Flux 连接,按顺序执行。 |
contextWrite | 修改 Mono 的上下文。 |
defaultIfEmpty | 如果 Mono 为空,返回默认值。 |
delayElement | 延迟发出元素的时间。 |
delaySubscription | 延迟订阅,等到指定的时间或事件发生才开始订阅。 |
delayUntil | 延迟直到指定的 Publisher 发出信号时才开始执行。 |
dematerialize | 将一个包含 Signal 的 Mono 转换为原始值的 Mono 。 |
doAfterSuccessOrError | 在执行成功或出错后执行的操作。 |
doAfterTerminate | 在 Mono 结束时执行的操作,不论成功或失败。 |
doFinally | 在 Mono 完成时执行的最终操作。 |
doFirst | 在 Mono 执行前执行的操作。 |
doOnCancel | 当订阅者取消时执行的操作。 |
doOnDiscard | 当元素被丢弃时执行的操作。 |
doOnEach | 对每个发出的信号执行操作。 |
doOnError | 当发生错误时执行的操作。 |
doOnNext | 每次元素发出时执行的操作。 |
doOnRequest | 在请求信号到达时执行的操作。 |
doOnSubscribe | 在订阅时执行的操作。 |
doOnSuccess | 当成功完成时执行的操作。 |
doOnSuccessOrError | 无论成功还是失败,都执行的操作。 |
doOnTerminate | 在终止时执行的操作。 |
elapsed | 返回每个信号的时间戳。 |
expand | 展开 Mono ,生成新的 Mono ,直到满足某个条件。 |
expandDeep | 深度展开 Mono ,通常递归调用直到满足条件。 |
filter | 过滤元素,只有符合条件的元素才会发出。 |
filterWhen | 使用 Publisher 的元素条件来过滤当前 Mono 。 |
flatMap | 转换元素,返回新的 Mono 或 Flux 。 |
flatMapIterable | 将每个元素转换为一个可迭代的元素。 |
flatMapMany | 将元素转换为 Flux 。 |
flux | 将 Mono 转换为 Flux 。 |
handle | 基于元素的条件来决定如何处理流。 |
hasElement | 判断是否包含元素。 |
hide | 隐藏 Mono 的实现细节,返回一个不可观察的 Mono 。 |
ignoreElement | 忽略元素,只关心是否完成。 |
log | 记录 Mono 中的信号,便于调试。 |
map | 将元素映射为另一个元素。 |
mapNotNull | 映射并排除空值。 |
materialize | 将信号转化为一个 Signal 对象。 |
mergeWith | 合并当前 Mono 和另一个 Mono 。 |
metrics | 获取流的度量信息。 |
name | 为 Mono 设置名称,用于调试和监控。 |
ofType | 根据类型过滤信号。 |
onErrorContinue | 在发生错误时继续执行。 |
onErrorMap | 将错误映射为其他类型。 |
onErrorResume | 在发生错误时恢复操作。 |
onErrorReturn | 在发生错误时返回默认值。 |
onErrorStop | 在发生错误时终止流。 |
onTerminateDetach | 在终止时解除与订阅者的连接。 |
or | 连接另一个 Mono ,如果当前 Mono 没有值或为空时执行。 |
publish | 启动 Mono 并返回一个共享的流。 |
publishOn | 指定在哪个线程调度上下文中执行 Mono 。 |
repeat | 重复执行 Mono ,直到满足某个条件。 |
repeatWhen | 基于另一个 Publisher 的信号来控制重复。 |
repeatWhenEmpty | 当 Mono 为空时重复执行。 |
retry | 在发生错误时重试操作。 |
retryWhen | 基于另一个 Publisher 来控制重试。 |
share | 共享执行的结果,避免重复执行。 |
single | 获取 Mono 中唯一的元素。 |
subscribe | 启动流的执行并订阅。 |
subscribeOn | 指定在哪个线程调度上下文中订阅 Mono 。 |
subscribeWith | 通过指定的 Subscriber 订阅 Mono 。 |
subscriberContext | 获取或修改订阅时的上下文。 |
switchIfEmpty | 如果 Mono 为空,则切换到另一个 Mono 。 |
tag | 为 Mono 打上标签,用于调试和日志。 |
take | 限制只获取前 N 个元素。 |
takeUntilOther | 当另一个 Publisher 发出信号时停止当前 Mono 。 |
then | 在当前 Mono 执行完后执行另一个操作。 |
thenEmpty | 在当前 Mono 执行完后返回一个空的 Mono 。 |
thenMany | 在当前 Mono 执行完后返回一个 Flux 。 |
thenReturn | 在当前 Mono 执行完后返回指定的值。 |
timed | 返回元素和其时间戳。 |
timeout | 如果 Mono 在指定时间内没有发出信号,则触发超时。 |
timestamp | 返回元素及其时间戳。 |
toFuture | 将 Mono 转换为 Future 。 |
toProcessor | 将 Mono 转换为 Processor ,适用于与 Flux 的结合。 |
toString | 返回 Mono 的字符串表示。 |
transform | 使用转换函数修改 Mono 。 |
transformDeferred | 延迟转换,直到订阅发生。 |
transformDeferredContextual | 延迟转换并访问上下文。 |
zipWhen | 与另一个 Mono 的信号配对,形成 Mono 的组合。 |
zipWith | 与另一个 Mono 的信号进行合并,形成 Mono 的组合。 |
2.2 Flux
Flux中的静态方法,用于创建Flux实例。
Flux实例中的成员方法如下
方法名 | 说明 |
---|---|
all | 判断 Flux 中的所有元素是否满足给定条件。 |
any | 判断 Flux 中是否有任何一个元素满足给定条件。 |
as | 将 Flux 转换为指定类型的 Publisher 。 |
blockFirst | 阻塞并返回 Flux 中的第一个元素。 |
blockLast | 阻塞并返回 Flux 中的最后一个元素。 |
buffer | 将 Flux 中的元素分成固定大小的缓冲区。 |
bufferTimeout | 按照指定的时间或缓冲区大小将元素分块。 |
bufferUntil | 在满足某个条件时开始一个新的缓冲区。 |
bufferUntilChanged | 将相邻相同的元素合并到同一个缓冲区。 |
bufferWhen | 根据外部 Publisher 切换缓冲区。 |
bufferWhile | 按照指定条件将元素分组为缓冲区。 |
cache | 缓存 Flux 的值,使得未来的订阅者可以共享相同的结果。 |
cancelOn | 当另一个 Publisher 发出信号时取消当前的 Flux 。 |
cast | 将 Flux 强制转换为指定的类型。 |
checkpoint | 在执行流中插入检查点,用于调试和分析。 |
collect | 收集流中的元素,按给定规则生成结果。 |
collectList | 收集 Flux 中的所有元素并返回一个 List 。 |
collectMap | 将 Flux 中的元素收集为一个 Map 。 |
collectMultimap | 将 Flux 中的元素收集为一个多值 Map 。 |
collectSortedList | 将 Flux 中的元素收集为排序的 List 。 |
concatMap | 将元素转换为 Mono ,按顺序处理。 |
concatMapDelayError | 与 concatMap 类似,但在错误发生时延迟处理。 |
concatMapIterable | 将每个元素转换为可迭代的元素,并按顺序合并。 |
concatWith | 与另一个 Flux 连接,按顺序执行。 |
concatWithValues | 连接多个值作为新的 Flux 。 |
contextWrite | 修改 Flux 的上下文。 |
count | 统计 Flux 中元素的数量。 |
defaultIfEmpty | 如果 Flux 为空,则返回默认值。 |
delayElements | 延迟元素的发出。 |
delaySequence | 延迟整个序列的发出。 |
delaySubscription | 延迟订阅,直到指定的时间或事件发生。 |
delayUntil | 延迟直到另一个 Publisher 发出信号。 |
dematerialize | 将一个包含 Signal 的 Flux 转换为原始元素的 Flux 。 |
distinct | 过滤掉重复的元素,保持唯一性。 |
distinctUntilChanged | 过滤掉相邻重复的元素。 |
doAfterTerminate | 在 Flux 完成后执行的操作。 |
doFinally | 在 Flux 终止时执行的操作。 |
doFirst | 在 Flux 执行前执行的操作。 |
doOnCancel | 在 Flux 被取消时执行的操作。 |
doOnComplete | 在 Flux 完成时执行的操作。 |
doOnDiscard | 在元素被丢弃时执行的操作。 |
doOnEach | 对 Flux 发出的每个元素执行操作。 |
doOnError | 在发生错误时执行的操作。 |
doOnNext | 每次 Flux 发出元素时执行的操作。 |
doOnRequest | 在请求信号到达时执行的操作。 |
doOnSubscribe | 在订阅时执行的操作。 |
doOnTerminate | 在 Flux 终止时执行的操作。 |
elapsed | 获取每个元素的时间戳和持续时间。 |
elementAt | 获取指定索引处的元素。 |
expand | 对每个元素进行展开,生成新的元素流。 |
expandDeep | 深度展开 Flux ,通常递归展开元素。 |
filter | 过滤出符合条件的元素。 |
filterWhen | 使用外部 Publisher 的信号过滤 Flux 中的元素。 |
flatMap | 将元素转换为 Flux ,并合并其发出的所有元素。 |
flatMapDelayError | 在发生错误时延迟元素的转换。 |
flatMapIterable | 将元素转换为可迭代的 Flux 。 |
flatMapSequential | 顺序地将元素转换为 Flux 。 |
flatMapSequentialDelayError | 顺序转换,并在发生错误时延迟。 |
getPrefetch | 获取 Flux 的预取量。 |
groupBy | 将元素按指定的键分组。 |
groupJoin | 类似 groupBy ,但用于联接多个流。 |
handle | 根据元素的条件进行流的处理。 |
hasElement | 判断 Flux 中是否包含某个元素。 |
hasElements | 判断 Flux 中是否包含多个元素。 |
hide | 隐藏 Flux 的实现细节,返回不可观察的流。 |
ignoreElements | 忽略 Flux 中的所有元素,只关心终止信号。 |
index | 返回元素在流中的索引。 |
join | 将多个 Flux 中的元素合并为一个字符串。 |
last | 获取 Flux 中的最后一个元素。 |
limitRate | 限制从流中请求的元素数量。 |
limitRequest | 限制从流中请求的最大元素数量。 |
log | 记录流中的元素,用于调试。 |
map | 将元素映射为新的类型。 |
mapNotNull | 映射并排除空值。 |
materialize | 将信号转换为 Signal 对象。 |
mergeComparingWith | 将两个 Flux 合并并根据比较条件排序。 |
mergeOrderedWith | 将两个有序的 Flux 合并。 |
mergeWith | 合并当前 Flux 和另一个 Flux 。 |
metrics | 获取流的度量信息。 |
name | 为 Flux 设置名称,便于调试。 |
next | 获取 Flux 中的下一个元素。 |
ofType | 根据类型过滤信号。 |
onBackpressureBuffer | 在背压时缓存元素。 |
onBackpressureDrop | 在背压时丢弃元素。 |
onBackpressureError | 在背压时触发错误。 |
onBackpressureLatest | 在背压时保留最新的元素。 |
onErrorContinue | 在发生错误时继续执行。 |
onErrorMap | 在错误时将其映射为其他类型。 |
onErrorResume | 在错误时恢复操作。 |
onErrorReturn | 在错误时返回默认值。 |
onErrorStop | 在错误时终止流。 |
onTerminateDetach | 在终止时分离与订阅者的连接。 |
or | 连接另一个 Flux ,如果当前 Flux 为空时执行。 |
parallel | 将 Flux 分发到多个线程进行并行处理。 |
publish | 启动 Flux 并返回一个共享流。 |
publishNext | 在流的每个元素发出时开始新的发布。 |
publishOn | 指定在哪个线程调度上下文中执行流。 |
reduce | 将流中的所有元素合并为单一值。 |
reduceWith | 使用指定初始值对元素进行合并。 |
repeat | 重复执行 Flux 直到满足某个条件。 |
repeatWhen | 基于另一个 Publisher 的信号来控制重复。 |
replay | 缓存并重播流中的元素。 |
retry | 在发生错误时重试操作。 |
retryWhen | 基于另一个 Publisher 来控制重试。 |
sample | 每隔指定时间间隔取一个元素。 |
sampleFirst | 获取流中的第一个元素。 |
sampleTimeout | 超过指定时间间隔时触发超时操作。 |
scan | 对流中的元素执行累加操作。 |
scanWith | 使用给定的初始值对元素执行累加操作。 |
share | 共享流的执行,避免重复执行。 |
shareNext | 将下一个发出的元素共享给多个订阅者。 |
single | 获取 Flux 中唯一的元素。 |
singleOrEmpty | 获取 Flux 中唯一的元素,如果为空返回空。 |
skip | 跳过流中的前 N 个元素。 |
skipLast | 跳过流中的最后 N 个元素。 |
skipUntil | 跳过直到满足某个条件的元素。 |
skipUntilOther | 跳过直到另一个 Flux 发出信号时的元素。 |
skipWhile | 跳过直到满足条件的元素。 |
sort | 对流中的元素进行排序。 |
startWith | 在流的开始处添加额外元素。 |
subscribe | 订阅并启动 Flux 。 |
subscribeOn | 指定在哪个线程调度上下文中订阅流。 |
subscribeWith | 通过指定的 Subscriber 订阅流。 |
subscriberContext | 获取或修改订阅时的上下文。 |
switchIfEmpty | 如果 Flux 为空,则切换到另一个 Flux 。 |
switchMap | 将元素转换为另一个 Flux 并切换执行。 |
switchOnFirst | 在流开始时选择一个 Flux 进行切换。 |
tag | 为 Flux 打标签,便于调试和日志。 |
take | 限制只获取前 N 个元素。 |
takeLast | 获取流中的最后 N 个元素。 |
takeUntil | 获取直到满足条件为止的元素。 |
takeUntilOther | 获取直到另一个 Flux 发出信号时的元素。 |
takeWhile | 获取满足条件的元素,直到条件不满足为止。 |
then | 在当前流完成后执行另一个操作。 |
thenEmpty | 在当前流完成后返回一个空流。 |
thenMany | 在当前流完成后返回另一个 Flux 。 |
timed | 返回每个元素的时间戳和持续时间。 |
timeout | 如果 Flux 在指定时间 |
三、SpringWebFlux
3.1 WebHandler与WebFilter
在SpringMVC中,有Servlet、Filter。
在SpringWebFlux中,有WebHandler、WebFilter,对标的其实就是Servlet API中的Servlet、Filter。甚至执行链也是相似的设计。
Servlet相关知识阅读Servlet - 言成言成啊
Filter相关知识阅读Filter和Listener - 言成言成啊
WebFilter的注册如下
@Bean
@Order(0) // 值越小,优先级越高
@ConditionalOnProperty(name = "allowAllCors.learnFilter", havingValue = "true")
public WebFilter aFilter() {/*** 在servlet中。请求的扭转是 aFilter-->bFilter-->servlet-->bFilter-->aFilter* 在webflux中同理。Filter对应WebFilter,Servlet对应WebHandler*/return (exchange, chain) -> {log.info("aFilter start");return chain.filter(exchange).doOnSuccess(t -> log.info("aFilter end"));};
}
3.2 实际案例
跨域配置
@Bean
@Order(Integer.MIN_VALUE)
@ConditionalOnProperty(name = "allowAllCors.personal", havingValue = "true")
public WebFilter personalCorsFilter(WebSocketHandlerAdapter webFluxWebSocketHandlerAdapter) {WebFilter webFilter = (exchange, chain) -> {ServerHttpRequest request = exchange.getRequest();ServerHttpResponse response = exchange.getResponse();HttpHeaders headers = response.getHeaders();//用*会导致范围过大,浏览器出于安全考虑,在allowCredentials为true时会不认*这个操作,因此可以使用如下代码,间接实现允许跨域headers.set(HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN, request.getHeaders().getFirst("origin"));headers.set(HttpHeaders.ACCESS_CONTROL_ALLOW_HEADERS, "*");headers.set(HttpHeaders.ACCESS_CONTROL_ALLOW_METHODS, "*");//允许跨域发送cookieheaders.set(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS, "true");if ("OPTIONS".equalsIgnoreCase(request.getMethod().name())) {response.setStatusCode(HttpStatus.OK);return Mono.empty();} else {return chain.filter(exchange);}};log.info("allowAllCors.personal is set to true");return webFilter;}
全局异常拦截/定义响应格式
首先,定义通用响应格式
import lombok.Data;
import reactor.core.publisher.Mono;@Data
public class Resp<T> {private int code;private String msg;private T data;public static <T> Resp<T> ok(T t) {Resp<T> resp = new Resp<>();resp.setCode(0);resp.setMsg("成功");resp.setData(t);return resp;}public static Resp<Void> failure(String msg) {Resp<Void> resp = new Resp<>();resp.setCode(1);resp.setMsg("失败: " + msg);return resp;}public static Resp<Void> error() {Resp<Void> resp = new Resp<>();resp.setCode(500);resp.setMsg("服务器内部错误");return resp;}public static <T> Mono<Resp<T>> getSuccessResp(Mono<T> mono) {return mono.map(Resp::ok);}public static Mono<Resp<Void>> getFailureResp(String msg) {return Mono.just(failure(msg));}public static Mono<Resp<Void>> getErrorResp() {return Mono.just(error());}
}
其次,定义自定义异常DIYException。
最后,配置全局异常拦截。
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;
import reactor.core.publisher.Mono;
import top.meethigher.utils.Resp;@RestControllerAdvice
@Slf4j
public class GlobalExceptionHandler {@ExceptionHandler(Exception.class)public Mono<Resp<Void>> handleException(Exception e) {log.error("api occurred exception", e);return Resp.getErrorResp();}@ExceptionHandler(DIYException.class)public Mono<Resp<Void>> handleDiyException(DIYException e) {log.error("api occurred exception", e);return Resp.getFailureResp(e.getMessage());}
}