一、简介
Reactor 是运行在 JVM 上的编程框架,最大特点是完全非阻塞,能高效控制 “背压”,简单来说就是处理数据传输时速度不匹配的问题 。它能和 Java 8 里的一些功能直接搭配使用,像处理异步结果的 CompletableFuture、处理数据序列的 Stream 以及表示时间的 Duration 等。
在 Reactor 里,Flux 用来处理多个数据元素,Mono 处理零个或一个数据元素,并且它严格按照 “响应式扩展规范” 来设计。
此外,reactor-ipc 这个组件可以让不同进程间在不互相等待的情况下通信。它为 HTTP(含 Websockets)、TCP 和 UDP 这些网络协议,提供了支持背压的网络引擎,很适合用在微服务架构里,还能很好地处理响应式的编解码。
响应式编程模型
二、依赖
<dependencyManagement> <dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-bom</artifactId><version>2023.0.0</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>
<dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId> </dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId> <scope>test</scope></dependency>
</dependencies>
三、响应式编程
响应式编程是一种关注于数据流(data streams)和变化传递(propagation of change)的异步编程方式。 这意味着它可以用既有的编程语言表达静态(如数组)或动态(如事件源)的数据流。
了解历史:
在响应式编程的发展进程中,微软率先行动,在.NET生态系统里创建了响应式扩展库(Rx)。紧接着,RxJava在JVM上实现了响应式编程。后来,JVM平台诞生了一套标准的响应式编程规范,这套规范定义了一系列标准接口和交互规则,还被整合进了Java 9(借助Flow类)。
响应式编程常被视作面向对象编程中“观察者模式”的拓展。响应式流和“迭代子模式”也有相似之处,比如都存在类似Iterable - Iterator这样的对应关系。不过,它们的核心区别在于,Iterator采用的是“拉取”方式,也就是开发者决定何时调用next()方法获取元素,属于“命令式”编程范式;而响应式流基于“推送”方式,当有新数据产生时,由发布者(Publisher)主动通知订阅者(Subscriber),这种“推送”模式是响应式编程的关键。并且,对推送数据的处理是通过声明式的方式,即开发者只需描述“控制流程”,就能定义对数据流的处理逻辑,而不是像命令式编程那样一步步明确指令。
在响应式流里,除了数据推送机制,错误处理和完成信号的定义也很完善。发布者(Publisher)不仅能向订阅者(Subscriber)推送新数据(调用onNext方法),还能推送错误信号(调用onError方法)和完成信号(调用onComplete方法)。一旦出现错误或完成信号,响应式流就会终止 。可以用下边的表达式描述:
onNext x 0..N [onError | onComplete]
3.1 阻塞是对资源的浪费
现代应用面临着大量并发用户的挑战,尽管现代硬件处理能力发展迅速,但软件性能依旧是至关重要的因素。
从宏观角度看,提升程序性能主要有两种思路:
-
并行化:利用更多线程与硬件资源,以异步方式处理任务,借此提升整体处理效率。
-
优化执行效率:在现有资源的基础上,通过优化代码逻辑、算法等手段,提高单位时间内的任务处理量。
在 Java 开发中,开发者常常采用阻塞式编程方式编写代码。这种方式本身并无不妥,当程序出现性能瓶颈时,一般会通过增加处理线程来缓解,而新增线程中的代码依旧是阻塞式的。然而,这种资源使用方式极易引发资源竞争与并发问题。
更为严重的是,阻塞式编程会造成资源浪费。举例来讲,当程序遭遇延迟(常见于 I/O 操作,如数据库读写请求或网络调用)时,对应的线程只能进入空闲状态,等待数据返回,在此期间,线程资源被白白浪费。
由此可见,并行化并非解决性能问题的万能良方。它虽然能够挖掘硬件潜力,但同时也带来了复杂性,并且容易造成资源浪费 。
3.2 异步可以解决问题吗
第二种思路——提高执行效率——可以解决资源浪费问题。通过编写 异步非阻塞 的代码, (任务发起异步调用后)执行过程会切换到另一个 使用同样底层资源 的活跃任务,然后等 异步调用返回结果再去处理。
但是在 JVM 上如何编写异步代码呢?Java 提供了两种异步编程方式:
-
回调(Callbacks) :异步方法没有返回值,而是采用一个 callback 作为参数(lambda 或匿名类),当结果出来后回调这个 callback。常见的例子比如 Swings 的 EventListener。
-
Futures :异步方法 立即 返回一个 Future<T>,该异步方法要返回结果的是 T 类型,通过 Future封装。这个结果并不是 立刻 可以拿到,而是等实际处理结束才可用。比如, ExecutorService 执行 Callable<T> 任务时会返回 Future 对象。
这些技术够用吗?并非对于每个用例都是如此,两种方式都有局限性。
回调很难组合起来,因为很快就会导致代码难以理解和维护(即所谓的“回调地狱(callback hell)”)。
考虑这样一种情景:
-
在用户界面上显示用户的5个收藏,或者如果没有任何收藏提供5个建议。
-
这需要3个 服务(一个提供收藏的ID列表,第二个服务获取收藏内容,第三个提供建议内容):
回调地狱(Callback Hell)的例子:
userService.getFavorites(userId, new Callback<List<String>>() { public void onSuccess(List<String> list) { if (list.isEmpty()) { suggestionService.getSuggestions(new Callback<List<Favorite>>() {public void onSuccess(List<Favorite> list) { UiUtils.submitOnUiThread(() -> { list.stream().limit(5).forEach(uiList::show); });}public void onError(Throwable error) { UiUtils.errorPopup(error);}});} else {list.stream() .limit(5).forEach(favId -> favoriteService.getDetails(favId, new Callback<Favorite>() {public void onSuccess(Favorite details) {UiUtils.submitOnUiThread(() -> uiList.show(details));}public void onError(Throwable error) {UiUtils.errorPopup(error);}}));}}public void onError(Throwable error) {UiUtils.errorPopup(error);}
});
Reactor改造后:
userService.getFavorites(userId) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()) .take(5) .publishOn(UiUtils.uiThreadScheduler()) .subscribe(uiList::show, UiUtils::errorPopup);
如果你想确保“收藏的ID”的数据在800ms内获得(如果超时,从缓存中获取)呢?在基于回调的代码中, 会比较复杂。但 Reactor 中就很简单,在处理链中增加一个 timeout 的操作符即可。
userService.getFavorites(userId).timeout(Duration.ofMillis(800)) .onErrorResume(cacheService.cachedFavoritesFor(userId)) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()).take(5).publishOn(UiUtils.uiThreadScheduler()).subscribe(uiList::show, UiUtils::errorPopup);
额外扩展:
Futures 比回调要好一点,但即使在 Java 8 引入了 CompletableFuture,它对于多个处理的组合仍不够好用。 编排多个 Futures 是可行的,但却不易。此外,Future 还有一个问题:当对 Future 对象最终调用 get() 方法时,仍然会导致阻塞,并且缺乏对多个值以及更进一步对错误的处理。
考虑另外一个例子,我们首先得到 ID 的列表,然后通过它进一步获取到“对应的 name 和 statistics” 为元素的列表,整个过程用异步方式来实现。
CompletableFuture 处理组合的例子
CompletableFuture<List<String>> ids = ifhIds(); CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { Stream<CompletableFuture<String>> zip =l.stream().map(i -> { CompletableFuture<String> nameTask = ifhName(i); CompletableFuture<Integer> statTask = ifhStat(i); return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); });List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); return allDone.thenApply(v -> combinationList.stream().map(CompletableFuture::join) .collect(Collectors.toList()));
});List<String> results = result.join();
assertThat(results).contains("Name NameJoe has stats 103","Name NameBart has stats 104","Name NameHenry has stats 105","Name NameNicole has stats 106","Name NameABSLAJNFOAJNFOANFANSF has stats 121");
3.3 从命令式编程到响应式编程
类似 Reactor 这样的响应式库的目标就是要弥补上述“经典”的 JVM 异步方式所带来的不足, 此外还会关注一下几个方面:
-
可编排性(Composability) 以及 可读性(Readability)
-
使用丰富的 操作符 来处理形如 流 的数据
-
在 订阅(subscribe) 之前什么都不会发生
-
背压(backpressure) 具体来说即 消费者能够反向告知生产者生产内容的速度的能力
-
高层次 (同时也是有高价值的)的抽象,从而达到 并发无关 的效果
3.3.1 可编排性与可读性
可编排性,简单来说,就是能够有条不紊地组织多个异步任务。比如把前一个任务的成果,精准传递给下一个任务作为输入;或者像 “分而治之再汇总” 的 fork - join 模式那样,同时开展多个任务并最终整合结果;又或者把异步任务当作可复用的离散组件,在整个系统中灵活调用。
这种任务编排能力,和代码的可读性、可维护性紧密相连。当异步处理的任务数量增多,逻辑愈发复杂,编写代码就像在荆棘丛中摸索,阅读代码更是难上加难。就拿常见的回调模式来说,它看似简单,可一旦处理逻辑复杂起来,回调里嵌套回调,一层又一层,就会陷入 “回调地狱”。经历过这种痛苦的开发者都知道,这样的代码简直是一团乱麻,想要读懂、分析清楚,实在是太费劲了。
而 Reactor 在这方面表现出色,它提供了多种多样的编排操作。借助这些操作,代码能够清晰直观地展现处理流程,所有操作基本都维持在同一层级,尽可能避免了令人头疼的嵌套结构 。
3.3.2 就像装配流水线
想象一下,在响应式应用里处理数据的过程,就如同产品在装配流水线上作业。Reactor就好比这条流水线的传送带,同时还充当着装配工人和机器人的角色。数据就像原材料,从源头(也就是最初的Publisher)开始流动,经过一道道工序加工,最终变成成品,等待被传送给消费者(即Subscriber)。 在这个过程中,原材料会经历各种各样的中间处理环节,可能会和其他半成品进行组装。要是流水线某个地方出现问题,比如齿轮卡住了,或者某个产品的包装耗时太长,那么这个工位就可以给上游发送信号,让它们少送点原材料过来,甚至暂停供应。
3.3.3 操作符(Operators)
在 Reactor 中,操作符(operator)就像装配线中的工位(操作员或装配机器人)。每一个操作符 对 Publisher 进行相应的处理,然后将 Publisher 包装为一个新的 Publisher。就像一个链条, 数据源自第一个 Publisher,然后顺链条而下,在每个环节进行相应的处理。最终,一个订阅者 (Subscriber)终结这个过程。请记住,在订阅者(Subscriber)订阅(subscribe)到一个 发布者(Publisher)之前,什么都不会发生。
理解了操作符会创建新的 Publisher 实例这一点,能够帮助你避免一个常见的问题, 这种问题会让你觉得处理链上的某个操作符没有起作用。
虽然响应式流规范(Reactive Streams specification)没有规定任何操作符, 类似 Reactor 这样的响应式库所带来的最大附加价值之一就是提供丰富的操作符。包括基础的转换操作, 到过滤操作,甚至复杂的编排和错误处理操作。
3.3.4 subscribe() 之前什么都不会发生
在 Reactor 中,当你创建了一条 Publisher 处理链,数据还不会开始生成。事实上,你是创建了 一种抽象的对于异步处理流程的描述(从而方便重用和组装)。
当真正“订阅(subscrib)”的时候,你需要将 Publisher 关联到一个 Subscriber 上,然后才会触发整个链的流动。这时候,Subscriber 会向上游发送一个 request 信号,一直到达源头的 Publisher。
3.3.5 背压
在响应式编程里,向上游传递信号这一机制在实现背压方面发挥着关键作用。这就好比在真实的装配线上,要是某个工位处理产品的速度跟不上流水线的整体速度,它就会向上游发送反馈信号,提醒调整生产节奏。
响应式流规范中定义的相关机制,和上述装配线的例子十分相似。订阅者在接收数据时有两种方式:一种是毫无限制地接收,任由数据源头 “开足马力”,推送所有数据;另一种则是通过 request 机制,明确告知数据源头,自己一次最多能够处理 n 个元素。
不仅如此,中间环节的操作同样会对 request 产生影响。例如,有一种缓存(buffer)操作,它能把每 10 个元素打包成一批。这时,如果订阅者只请求 1 个元素,对于数据源头而言,其实需要生成 10 个元素,因为缓存操作是按批处理的。此外,预取策略也可以在此发挥作用,比如在订阅开始前,提前生成一些元素。
如此一来,原本单一的 “推送” 模式,就演变成了 “推送 + 拉取” 的混合模式。下游如果已经准备就绪,便可以主动从上游拉取 n 个元素;但要是上游的元素尚未准备好,下游也只能乖乖等待上游推送 。
3.3.6 热(Hot) vs 冷(Cold)
在 Rx 家族的响应式库中,响应式流分为“热”和“冷”两种类型,区别主要在于响应式流如何 对订阅者进行响应:
-
一个“冷”的序列,指对于每一个 Subscriber,都会收到从头开始所有的数据。如果源头 生成了一个 HTTP 请求,对于每一个订阅都会创建一个新的 HTTP 请求。
-
一个“热”的序列,指对于一个 Subscriber,只能获取从它开始 订阅 之后 发出的数据。不过注意,有些“热”的响应式流可以缓存部分或全部历史数据。 通常意义上来说,一个“热”的响应式流,甚至在即使没有订阅者接收数据的情况下,也可以发出数据(这一点同 “Subscribe() 之前什么都不会发生”的规则有冲突)。
什么问题都可以评论区留言,看见都会回复的
如果你觉得本篇文章对你有所帮助的,把“文章有帮助的”打在评论区
多多支持吧!!!
点赞加藏评论,是对小编莫大的肯定。抱拳了!