响应式编程Reactor API大全(上)

Reactor 是一个基于响应式编程的库,主要用于构建异步和事件驱动的应用程序。Reactor 提供了丰富的 API,包括创建、转换、过滤、组合等操作符,用于处理异步数据流。以下是一些 Reactor 的主要 API 示例:

pom依赖

   <dependencyManagement><dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-bom</artifactId><version>2023.0.0</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId></dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.junit.jupiter</groupId><artifactId>junit-jupiter</artifactId><version>5.7.2</version><scope>test</scope></dependency></dependencies>

1. 创建 Mono 和 Flux

  • Mono: 用于表示包含零个或一个元素的异步序列。
  • Flux: 用于表示包含零个或多个元素的异步序列。
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;public class ReactorCreateExample {public static void main(String[] args) {// 创建包含单个元素的 MonoMono<String> mono = Mono.just("Hello, Reactor!");// 创建包含多个元素的 FluxFlux<Integer> flux = Flux.fromArray(new Integer[]{1, 2, 3, 4, 5});mono.subscribe(System.out::println); // 输出: Hello, Reactor!flux.subscribe(System.out::println); // 输出: 1, 2, 3, 4, 5}
}

2. 转换操作符

使用转换操作符对数据流进行转换或处理。

import reactor.core.publisher.Flux;public class ReactorTransformExample {public static void main(String[] args) {Flux<Integer> source = Flux.range(1, 5);// 对每个元素进行平方操作Flux<Integer> squared = source.map(x -> x * x);squared.subscribe(System.out::println); // 输出: 1, 4, 9, 16, 25}
}

3. 过滤操作符

使用过滤操作符筛选数据流中的元素。

import reactor.core.publisher.Flux;public class ReactorFilterExample {public static void main(String[] args) {Flux<Integer> source = Flux.range(1, 5);// 筛选偶数Flux<Integer> evenNumbers = source.filter(x -> x % 2 == 0);evenNumbers.subscribe(System.out::println); // 输出: 2, 4}
}

4. 组合操作符

使用组合操作符组合多个数据流。

import reactor.core.publisher.Flux;public class ReactorCombineExample {public static void main(String[] args) {Flux<Integer> source1 = Flux.range(1, 3);Flux<Integer> source2 = Flux.range(4, 3);// 合并两个数据流Flux<Integer> merged = Flux.concat(source1, source2);merged.subscribe(System.out::println); // 输出: 1, 2, 3, 4, 5, 6}
}

这些只是 Reactor API 的一小部分示例。Reactor 提供了丰富的操作符和方法,用于处理复杂的异步数据流。开发人员可以根据具体需求选择适当的操作符进行组合,以构建出符合业务逻辑的异步处理链。

5. 错误处理

Reactor 提供了多种处理错误的方式,例如使用 onErrorResume, onErrorReturn, doOnError 等方法。

import reactor.core.publisher.Flux;public class ReactorErrorHandlingExample {public static void main(String[] args) {Flux<Integer> source = Flux.just(1, 2, 0, 4, 5);// 处理除零异常并提供默认值Flux<Integer> result = source.map(x -> 10 / x).onErrorResume(ex -> Flux.just(-1));result.subscribe(System.out::println); // 输出: 10, 5, -1}
}

6. 背压处理

Reactor 提供了背压处理的支持,允许生产者和消费者之间实现合理的数据流控制。使用 onBackpressureBuffer 或者其他背压操作符可以处理高速生产者和慢速消费者之间的数据流。

import reactor.core.publisher.Flux;public class ReactorBackpressureExample {public static void main(String[] args) {Flux<Integer> source = Flux.range(1, 1000);// 设置缓冲区大小Flux<Integer> buffered = source.onBackpressureBuffer(10);buffered.subscribe(data -> {// 模拟慢速消费者try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(data);},error -> System.err.println("Error: " + error),() -> System.out.println("Done"));}
}
  • TODO:未能实现没有背压和有背压的对比

7. 使用 Reactor WebFlux 处理 Web 请求

Reactor 还提供了 WebFlux 模块,用于处理响应式的 Web 请求。以下是一个简单的示例:

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;@RestController
public class WebFluxController {@GetMapping("/hello")public Mono<ResponseEntity<String>> hello() {return Mono.just(ResponseEntity.ok("Hello, Reactor WebFlux!"));}
}

8. Reactor 核心概念

Reactor 中有一些核心概念,了解这些概念有助于更好地使用 Reactor API。

  • Publisher(发布者): 代表一个生产数据的源头,通常是 MonoFlux

  • Subscriber(订阅者): 用于消费数据流的组件。通过 subscribe 方法订阅 Publisher

  • Subscription(订阅): 代表 SubscriberPublisher 之间的连接。Subscriber 可以使用 Subscription 来请求数据,取消订阅等。

  • Processor(处理器): 既是 Publisher 又是 Subscriber,用于在两者之间进行转换和处理。

public class ReactorCoreConceptsExample {public static void main(String[] args) {// 创建发布者Flux<Integer> source = Flux.range(1, 5);// 创建处理器,并进行数据处理UnicastProcessor<Integer> processor = UnicastProcessor.create();source.map(value -> value * 2)  // Example: doubling the values.subscribe(processor);// 创建订阅者CustomSubscriber<Integer> subscriber = new CustomSubscriber<>();// 订阅并处理数据processor.subscribe(subscriber);}// 自定义订阅者static class CustomSubscriber<T> extends BaseSubscriber<T> {@Overrideprotected void hookOnNext(T value) {System.out.println("Processed Value: " + value);}@Overrideprotected void hookOnError(Throwable throwable) {System.err.println("Error: " + throwable);}@Overrideprotected void hookOnComplete() {System.out.println("Done");}}
}
  • UnicastProcessor.create()已弃用,可以使用Sinks.many().unicast().onBackpressureBuffer()

9. Reactor 调度器

Reactor 提供了多种调度器,用于控制异步操作的执行线程。例如,Schedulers.boundedElastic() 创建了一个弹性线程池,可以根据需要动态调整线程数。

public class ReactorSchedulersExample {public static void main(String[] args) {Flux.range(1, 5).publishOn(Schedulers.boundedElastic())  // 在弹性线程池上发布.map(x -> x * x).subscribeOn(Schedulers.parallel())  // 在并行线程池上订阅.subscribe(System.out::println);}
}
  • 经测试,大概率只使用了一个线程

11. 组合多个 Mono 或 Flux

使用 zip 操作符可以组合多个 MonoFlux,将它们的元素进行组合。

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;public class ReactorZipExample {public static void main(String[] args) {Mono<String> mono1 = Mono.just("Hello");Mono<String> mono2 = Mono.just("Reactor");// 将两个 Mono 合并为一个 FluxFlux<String> result = Flux.zip(mono1, mono2).map(tuple -> tuple.getT1() + " " + tuple.getT2());result.subscribe(System.out::println); // 输出: Hello Reactor}
}

12. 超时操作

使用 timeout 操作符可以在指定的时间内等待数据流产生结果,如果超时,则触发错误。

public class ReactorTimeoutExample {public static void main(String[] args) throws InterruptedException {Flux<Integer> source = Flux.just(1, 2, 3).delayElements(Duration.ofSeconds(2)); // 模拟延迟// 在指定时间内等待数据流产生结果,否则触发超时source.timeout(Duration.ofSeconds(1)).subscribe(data -> System.out.println("Received: " + data),error -> System.err.println("Error: " + error),() -> System.out.println("Done"));//睡一会,等待任务执行完成Thread.sleep(3333);}
}

13. 并行操作

使用 parallel 操作符可以将一个数据流并行处理,提高处理速度。

public class ReactorParallelExample {public static void main(String[] args) throws InterruptedException {Flux.range(1, 10).parallel().runOn(Schedulers.parallel()).map(x -> x * x).sequential().subscribe(System.out::println);//睡一会,等待任务执行完成Thread.sleep(1111);}
}

14. 与 Java Stream 集成

Reactor 与 Java Stream 可以方便地进行集成。

import reactor.core.publisher.Flux;
import java.util.stream.Stream;public class ReactorJavaStreamIntegrationExample {public static void main(String[] args) {Flux<Integer> flux = Flux.fromStream(Stream.of(1, 2, 3, 4, 5));flux.subscribe(System.out::println); // 输出: 1, 2, 3, 4, 5}
}

15. 使用 Mono 和 Flux 进行条件操作

Reactor 提供了条件操作符,例如 switchIfEmptyfilter,用于根据条件处理数据流。

public class ReactorConditionalOperatorsExample {public static void main(String[] args) {Flux<Integer> empty = Flux.range(1, 0);Flux<Integer> source = Flux.range(1, 5);// 如果数据流为空,则切换到另一个数据流empty.switchIfEmpty(Flux.range(6, 3)).subscribe(System.out::println); // 输出: 6,7,8// 使用 filter 过滤元素source.filter(x -> x % 2 == 0).subscribe(System.out::println); // 输出: 2, 4}
}

16. 使用 Reactor StepVerifier 进行测试

代码需要写在test测试目录下!!!

Reactor 提供了 StepVerifier 类,用于测试异步操作的行为。

public class ReactorTestingExample {public static void main(String[] args) {Flux<Integer> flux = Flux.range(1, 5);// 使用 StepVerifier 验证数据流的行为StepVerifier.create(flux).expectNext(1, 1, 3, 4, 5)//正确顺序应该是12345.expectComplete().verify();}
}

17. 使用 Mono 和 Flux 进行重试

Reactor 提供了 retryWhen 方法,结合 Backoff 操作符,用于在发生错误时进行重试。

public class ReactorRetryExample {public static void main(String[] args) throws InterruptedException {Mono<Object> source = Mono.fromCallable(() -> {throw new RuntimeException("Simulated error");})//最大重试次数为3次,初始重试间隔为1秒,并且采用指数回退策略,直到达到最大的回退时间(这里是5秒)。.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(5)));source.subscribe(data -> System.out.println("Received: " + data),error -> System.err.println("Error: " + error.getMessage()));//得多睡会儿,让它跑完最大重试时间Thread.sleep(999999);}
}

19. 使用 Reactor Context 进行上下文传递

Reactor 提供了 Context 类,用于在操作链中传递上下文信息。这对于在异步操作中共享信息非常有用。

import reactor.core.publisher.Mono;
import reactor.util.context.Context;public class ReactorContextExample {public static void main(String[] args) {Mono<String> mono = Mono.deferContextual(contextView ->Mono.just("Hello, " + contextView.get("user")));String result = mono.contextWrite(Context.of("user", "John")).block();System.out.println(result); // 输出: Hello, John}
}

20. 使用 Reactor 的 doOn 方法进行副作用处理

doOn 系列方法允许在数据流的不同生命周期阶段执行副作用操作,如日志记录、统计等。

import reactor.core.publisher.Flux;public class ReactorDoOnExample {public static void main(String[] args) {Flux<Integer> source = Flux.range(1, 5);source.doOnNext(value -> System.out.println("Processing element: " + value)).doOnComplete(() -> System.out.println("Processing complete")).subscribe(System.out::println);}
}

21. 使用 Reactor 的 transform 方法进行操作链重用

transform 方法允许对操作链进行重用,将一系列操作组合为一个新的 Function

import reactor.core.publisher.Flux;import java.util.function.Function;public class ReactorTransformExample {public static void main(String[] args) {Flux<Integer> source = Flux.range(1, 5);// 定义一个操作链Function<Flux<Integer>, Flux<Integer>> customTransform = flux ->flux.filter(x -> x % 2 == 0).map(x -> x * 2);// 使用 transform 应用自定义操作链source.transform(customTransform).subscribe(System.out::println); // 输出: 4, 8}
}

学习打卡:Java学习笔记-day06-响应式编程Reactor API大全(上)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/612879.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【算法】激光炸弹(二维数组前缀和)

题目 地图上有 N 个目标&#xff0c;用整数 Xi,Yi 表示目标在地图上的位置&#xff0c;每个目标都有一个价值 Wi。 注意&#xff1a;不同目标可能在同一位置。 现在有一种新型的激光炸弹&#xff0c;可以摧毁一个包含 RR 个位置的正方形内的所有目标。 激光炸弹的投放是通过…

中间捕获事件:IntermediateCatchingEvent(TimerEvent)

一&#xff1a;TimerEvent https://monday.blog.csdn.net/article/details/134435415 应用场景&#xff1a; 定时启动流程&#xff1a;该类型节点作为流程的开始节点&#xff0c;不需要显式启动流程&#xff0c;只需要部署。节点延时审批。节点超时处理&#xff1a;对在指定…

基于YOLOv5的行人检测系统

若需要完整工程源代码&#xff0c;请私信作者 目标检测在计算机视觉领域中的重要性&#xff0c;特别是在人群流量监测方面的应用。其中&#xff0c;YOLO&#xff08;You Only Look Once&#xff09;系列算法在目标检测领域取得了显著的进展&#xff0c;从YOLO到YOLOv5的发展历…

Rust类型之字符串

字符串 Rust 中的字符串类型是String。虽然字符串只是比字符多了一个“串”字&#xff0c;但是在Rust中这两者的存储方式完全不一样&#xff0c;字符串不是字符的数组&#xff0c;String内部存储的是Unicode字符串的UTF8编码&#xff0c;而char直接存的是Unicode Scalar Value…

Sqlmap注入参数

Sqlmap注入参数 &#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f32d;&#x1f32d;&#x1f32d;&#x1f32d;&#x1f32d;&#x1f32d;&#x1f32d;❤️❤️❤️❤️❤️❤️❤️&#x1f968;&#x1f968;&…

机器学习如何改变缺陷检测的格局?

机器学习在缺陷检测中扮演着重要的角色&#xff0c;它能够通过自动学习和识别各种缺陷的模式和特征&#xff0c;改变缺陷检测的格局。以下是机器学习在缺陷检测中的一些应用和优势&#xff1a; 自动化检测&#xff1a;机器学习技术可以自动化处理大量的数据&#xff0c;通过学…

Python基础学习(一)

Python基础语法学习记录 输出 将结果或内容呈现给用户 print("休对故人思故国&#xff0c;且将新火试新茶&#xff0c;诗酒趁年华") # 输出不换行&#xff0c;并且可以指定以什么字符结尾 print("青山依旧在",end ",") print("几度夕阳红…

服务消费端Directory目录的创建与更新

1 Directory目录概述 Directory代表多个invoker&#xff0c;其内部维护了一个list&#xff0c;并且这个list的内容是动态变化的&#xff08;对于消费端来说&#xff0c;每个invoker代表一个服务提供者&#xff09;。 在Dubbo中&#xff0c;RegistryDirectory和StaticDirector…

MySQL从0到1全教程【1】MySQL数据库的基本概念以及MySQL8.0版本的部署

1 MySQL数据库的相关概念 1.1 数据库中的专业术语 1.1.1 数据库 (DB) 数据库是指:保存有组织的数据的容器(通常是一个文数据库 (database)件或一组文件)。 1.1.2 数据库管理系统 (DBMS) 数据库管理系统(DBMS)又称为数据库软件(产品)&#xff0c;用于管理DB中的数据 注意:…

【前端素材】bootstrap5实现美食餐饮网站RegFood

一、需求分析 美食餐饮网站是指专门提供关于美食和餐饮的信息、服务和资源的在线平台。这类网站通常提供以下功能&#xff1a; 餐厅搜索和预订&#xff1a;用户可以在网站上搜索附近的餐厅&#xff0c;并预订桌位。网站会提供餐厅的详细信息&#xff0c;包括菜单、地址、电话号…

LeetCode878. Nth Magical Number

文章目录 一、题目二、题解 一、题目 A positive integer is magical if it is divisible by either a or b. Given the three integers n, a, and b, return the nth magical number. Since the answer may be very large, return it modulo 109 7. Example 1: Input: n …

JavaWeb- Tomcat

一、概念 老规矩&#xff0c;先看维基百科&#xff1a;Apache Tomcat (called "Tomcat" for short) is a free and open-source implementation of the Jakarta Servlet, Jakarta Expression Language, and WebSocket technologies.[2] It provides a "pure Ja…

微信小程序基本使用2:wxs,组件的使用以及弹窗、滚动条

WXS WXS&#xff08;WeiXin Script&#xff09;是小程序的一套脚本语言&#xff0c;结合 WXML&#xff0c;可以构建出页面的结构。 可以在模版中内联少量处理脚本&#xff0c;丰富模板的数据预处理能力。 wsx 在IOS设备上性能是JavaScript的2-20倍 内嵌式 <view><…

SpringBoot+Vue药品ADR不良反应智能监测系统源码

药品不良反应&#xff08;Adverse Drug Reaction&#xff0c;ADR&#xff09;是指合格药品在正常用法用量下出现的与用药目的无关的有害反应&#xff0c;不包括超说明书用药、药品质量问题等导致的不良后果。 ADR智能监测系统开发环境 ❀技术架构&#xff1a;B/S ❀开发语言&…

补充一:C#中的Queue

队列是一种基本的数据结构&#xff0c;按照先进先出&#xff08;FIFO&#xff09;的原则组织元素。在队列中&#xff0c;新元素从队尾入队&#xff0c;而从队头出队&#xff0c;确保了先进入队列的元素首先被处理。这使得队列特别适合模拟排队、任务调度等场景。 在编程中&…

深度剖析Redis:从基础到高级应用

目录 引言 1、 Redis基础 1.1 Redis数据结构 1.1.1 字符串&#xff08;String&#xff09; 1.1.2 列表&#xff08;List&#xff09; 1.1.3 集合&#xff08;Set&#xff09; 1.1.4 散列&#xff08;Hash&#xff09; 1.1.5 有序集合&#xff08;Sorted Set&#xff09;…

常见类型的yaml文件如何编写?--kind: Job|CronJob

本次介绍两个关联度很高的类型&#xff0c;Job和CronJob。 Job基本说明 在 Kubernetes 中&#xff0c;Job 是一种用于运行一次性任务的资源对象。它用于确保在集群内部执行某个任务&#xff0c;即使任务运行失败或其中一个 Pod 发生故障时&#xff0c;也会进行重试。Job 可以…

CRM系统进行市场营销,这些功能可以派上用场。

现如今的企业想要做好营销&#xff0c;不仅仅依赖于一句玄之又玄的slogan亦或是电子邮件的狂轰乱炸。要想做好市场活动营销需要一个前提——那就是CRM管理系统发挥作用的地方。但CRM系统关于营销的功能太多了——对于不太了解的人来说很容易不知所措。那么&#xff0c;CRM系统做…

如何上传苹果ipa安装包?

目录 引言 摘要 第二步&#xff1a;打开appuploader工具 第二步&#xff1a;打开appuploader工具&#xff0c;第二步&#xff1a;打开appuploader工具 第五步&#xff1a;交付应用程序&#xff0c;在iTunes Connect中查看应用程序 总结 引言 在将应用程序上架到苹果应用…

PTA——换硬币

将一笔零钱换成5分、2分和1分的硬币&#xff0c;要求每种硬币至少有一枚&#xff0c;有几种不同的换法&#xff1f; 输入格式: 输入在一行中给出待换的零钱数额x∈(8,100)。 输出格式: 要求按5分、2分和1分硬币的数量依次从大到小的顺序&#xff0c;输出各种换法。每行输出…