Spring响应式编程之Reactor操作符

操作符

  • 操作符Processo<T,R>
    • (1)创建操作符
    • (2)转换操作符
    • (3)组合操作符
    • (4)条件操作符
    • (5)错误处理操作符

操作符Processo<T,R>

操作符并不是响应式流规范的一部分,但为了改进响应式代码的可读性并降低开发成本,Reactor 库中的 API 提供了一组丰富的操作符,这些操作符为响应式流规范提供了最大的附加值。下面介绍一些常用的操作符。

(1)创建操作符

  • just:创建一个包含单个元素的Mono或多个元素的Flux;
  • empty:创建一个空的Flux或Mono;
  • defer:在订阅时动态创建一个新的Flux或Mono;
  • fromArray:从数组创建Flux;
  • fromIterable:从Iterable对象创建Flux;
  • range:创建一个从start到end的整数序列Flux;
  • interval:创建一个按时间间隔发布数据的Flux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;public class CreationExample {public static void main(String[] args) {// 示例 1: 使用 Mono 创建操作符Mono<String> monoJust = Mono.just("Hello, Mono");Mono<String> monoEmpty = Mono.empty();Mono<String> monoDefer = Mono.defer(() -> Mono.just("Deferred Mono"));// 订阅 Mono 并打印结果monoJust.subscribe(System.out::println);monoEmpty.subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("Completed"));monoDefer.subscribe(System.out::println);// 示例 2: 使用 Flux 创建操作符Flux<String> fluxJust = Flux.just("A", "B", "C");Flux<String> fluxFromArray = Flux.fromArray(new String[]{"A", "B", "C"});List<String> list = Arrays.asList("A", "B", "C");Flux<String> fluxFromIterable = Flux.fromIterable(list);Flux<String> fluxFromStream = Flux.fromStream(Stream.of("A", "B", "C"));Flux<Integer> fluxRange = Flux.range(1, 5);Flux<Long> fluxInterval = Flux.interval(Duration.ofSeconds(1));Flux<String> fluxDefer = Flux.defer(() -> Flux.just("Deferred Flux"));// 订阅 Flux 并打印结果fluxJust.subscribe(System.out::println);fluxFromArray.subscribe(System.out::println);fluxFromIterable.subscribe(System.out::println);fluxFromStream.subscribe(System.out::println);fluxRange.subscribe(System.out::println);fluxInterval.take(5).subscribe(System.out::println);fluxDefer.subscribe(System.out::println);}
}

(2)转换操作符

  • map:将Mono中的值或Flux中的每个元素转换为另一种类型;

  • flatmap:将Mono中的值或Flux中的每个元素转换为另一个Mono或另一个Publisher,并展平结果;

  • flatMapSequential:类似于flatMap,但保持顺序并并行处理;

  • flatMapMany:将Mono中的值转换为Flux;

  • collectList: 将Flux中的所有元素收集到一个List中,返回Mono<List<T>>;

  • collectMap:将Flux中的元素收集到一个Map中,返回Mono<Map<K,V>>;

  • reduce:聚合Flux中的元素,返回Mono;

  • buffer:将Flux中的元素收集到List中,按指定大小进行分组;

  • window:将Flux中的元素分组到Flux中,每组包含指定数量的元素;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import java.util.List;public class ConversionExample {public static void main(String[] args) {// 示例 1: 使用 Mono 转换操作符Mono<Integer> mono = Mono.just("123").map(Integer::parseInt).flatMap(i -> Mono.just(i * 2)).doOnNext(System.out::println);mono.subscribe();// 示例 2: 使用 Flux 转换操作符Flux<Integer> flux = Flux.just("1", "2", "3", "4", "5").map(Integer::parseInt).filter(i -> i % 2 == 0).flatMap(i -> Flux.just(i * 2)).concatMap(i -> Flux.just(i + 1)).buffer(2).doOnNext(System.out::println);flux.subscribe();}
}

(3)组合操作符

  • zipWith:将两个Mono的值组合成一个新的Mono;
  • zip:将多个Flux的元素组合成一个Flux;
  • then:在当前Mono或Flux完成后执行另一个Mono或Flux;
  • thenReturn:在当前Mono或Flux完成后返回一个指定的值;
  • thenMany:在当前Mono完成后返回一个Flux;
  • when:等待多个Publisher完成
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;public class CombinationExample {public static void main(String[] args) {// 示例 1: 使用 Mono 组合操作符Mono<String> mono1 = Mono.just("Hello");Mono<String> mono2 = Mono.just("World");Mono<String> combined = mono1.zipWith(mono2, (a, b) -> a + " " + b);combined.subscribe(System.out::println); // 输出: Hello WorldMono<Void> when = Mono.when(mono1, mono2);when.subscribe(null, Throwable::printStackTrace, () -> System.out.println("Completed")); // 输出: Completed// 示例 2: 使用 Flux 组合操作符Flux<String> flux1 = Flux.just("A", "B", "C");Flux<String> flux2 = Flux.just("1", "2", "3");Flux<String> merged = Flux.merge(flux1, flux2);merged.subscribe(System.out::println); // 输出: A 1 B 2 C 3Flux<String> concatenated = Flux.concat(flux1, flux2);concatenated.subscribe(System.out::println); // 输出: A B C 1 2 3Flux<String> zipped = Flux.zip(flux1, flux2, (a, b) -> a + b);zipped.subscribe(System.out::println); // 输出: A1 B2 C3Flux<String> combinedLatest = Flux.combineLatest(flux1, flux2, (a, b) -> a + b);combinedLatest.subscribe(System.out::println); // 输出: C3Flux<String> started = flux1.startWith("Start");started.subscribe(System.out::println); // 输出: Start A B C}
}

(4)条件操作符

  • hasElement:判断Mono是否包含元素;
  • hasElements:判断Flux是否包含元素;
  • hasElementWith:判断Mono是否包含与给定Predicate匹配的元素;
  • all:判断Flux中的所有元素是否都满足给定的条件;
  • any:判断Flux中是否有任意一个元素满足给定的条件;
  • isEmpty:判断Flux是否为空;
  • switchIfEmpty:如果Mono或Flux为空,则切换到另一个Mono或Flux;
  • defaultIfEmpty:如果Mono或Flux为空,则返回默认值;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;public class ConditionalExample {public static void main(String[] args) {// 示例 1: 使用 Mono 条件操作符Mono<String> mono = Mono.just("Hello");Mono<Boolean> hasElement = mono.hasElement();hasElement.subscribe(System.out::println); // 输出: trueMono<String> emptyMono = Mono.<String>empty();Mono<String> switchIfEmptyMono = emptyMono.switchIfEmpty(Mono.just("Default"));switchIfEmptyMono.subscribe(System.out::println); // 输出: DefaultMono<String> defaultIfEmptyMono = emptyMono.defaultIfEmpty("Default");defaultIfEmptyMono.subscribe(System.out::println); // 输出: Default// 示例 2: 使用 Flux 条件操作符Flux<Integer> flux = Flux.just(1, 2, 3, 4);Mono<Boolean> allMatch = flux.all(i -> i > 0);allMatch.subscribe(System.out::println); // 输出: trueMono<Boolean> anyMatch = flux.any(i -> i > 3);anyMatch.subscribe(System.out::println); // 输出: trueMono<Boolean> hasElements = flux.hasElements();hasElements.subscribe(System.out::println); // 输出: trueMono<Boolean> isEmpty = flux.isEmpty();isEmpty.subscribe(System.out::println); // 输出: falseFlux<Integer> emptyFlux = Flux.<Integer>empty();Flux<Integer> switchIfEmptyFlux = emptyFlux.switchIfEmpty(Flux.just(10, 20, 30));switchIfEmptyFlux.subscribe(System.out::println); // 输出: 10 20 30Flux<Integer> defaultIfEmptyFlux = emptyFlux.defaultIfEmpty(999);defaultIfEmptyFlux.subscribe(System.out::println); // 输出: 999}
}

(5)错误处理操作符

  • onErrorResume:当发生错误时,切换到另一个数据流;
  • onErrorReturn:当发生错误时,返回一个默认值;
  • onErrorMap:将错误映射为另一个错误;
  • retry重试操作一定次数;
  • retryWhen:当错误发生时,根据提供的Publisher逻辑重试;
  • doOnError:当发生错误时执行一些额外的逻辑;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;public class ErrorHandlingExample {public static void main(String[] args) {// 示例 1: 使用 Mono 错误处理操作符Mono<String> mono1 = Mono.error(new RuntimeException("Error")).onErrorResume(e -> Mono.just("Recovered"));mono1.subscribe(System.out::println); // 输出: RecoveredMono<String> mono2 = Mono.error(new RuntimeException("Error")).onErrorReturn("Default");mono2.subscribe(System.out::println); // 输出: DefaultMono<String> mono3 = Mono.error(new RuntimeException("Error")).onErrorMap(e -> new IllegalArgumentException("Mapped Error", e));mono3.subscribe(System.out::println, Throwable::printStackTrace); // 输出: Mapped ErrorMono<String> mono4 = Mono.error(new RuntimeException("Error")).retry(3);mono4.subscribe(System.out::println, Throwable::printStackTrace);Mono<String> mono5 = Mono.error(new RuntimeException("Error")).retryWhen(companion -> companion.take(3));mono5.subscribe(System.out::println, Throwable::printStackTrace);Mono<String> mono6 = Mono.error(new RuntimeException("Error")).doOnError(e -> System.out.println("Error occurred: " + e.getMessage()));mono6.subscribe(System.out::println, Throwable::printStackTrace);// 示例 2: 使用 Flux 错误处理操作符Flux<String> flux1 = Flux.just("A", "B").concatWith(Mono.error(new RuntimeException("Error"))).onErrorResume(e -> Flux.just("Recovered"));flux1.subscribe(System.out::println); // 输出: A B RecoveredFlux<String> flux2 = Flux.just("A", "B").concatWith(Mono.error(new RuntimeException("Error"))).onErrorReturn("Default");flux2.subscribe(System.out::println); // 输出: A B DefaultFlux<String> flux3 = Flux.just("A", "B").concatWith(Mono.error(new RuntimeException("Error"))).onErrorMap(e -> new IllegalArgumentException("Mapped Error", e));flux3.subscribe(System.out::println, Throwable::printStackTrace); // 输出: Mapped ErrorFlux<String> flux4 = Flux.just("A", "B").concatWith(Mono.error(new RuntimeException("Error"))).retry(3);flux4.subscribe(System.out::println, Throwable::printStackTrace);Flux<String> flux5 = Flux.just("A", "B").concatWith(Mono.error(new RuntimeException("Error"))).retryWhen(companion -> companion.take(3));flux5.subscribe(System.out::println, Throwable::printStackTrace);Flux<String> flux6 = Flux.just("A", "B").concatWith(Mono.error(new RuntimeException("Error"))).doOnError(e -> System.out.println("Error occurred: " + e.getMessage()));flux6.subscribe(System.out::println, Throwable::printStackTrace);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/32781.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Python机器学习实战】 | 基于线性回归以及支持向量机对汽车MPG与自重进行回归预测

&#x1f3a9; 欢迎来到技术探索的奇幻世界&#x1f468;‍&#x1f4bb; &#x1f4dc; 个人主页&#xff1a;一伦明悦-CSDN博客 ✍&#x1f3fb; 作者简介&#xff1a; C软件开发、Python机器学习爱好者 &#x1f5e3;️ 互动与支持&#xff1a;&#x1f4ac;评论 &…

【Android面试八股文】请你描述一下JVM的内存模型

文章目录 JVM内存模型1. 方法区(Method Area)运行时常量池(Runtime Constant Pool)2. 堆(Heap)3. 栈(Stack)4. 本地方法栈(Native Method Stack)5. 程序计数器(Program Counter Register)6. 直接内存(Direct Memory)JVM内存溢出的情况Java的口号是: “Write onc…

【Flink metric】Flink指标系统的系统性知识:以便我们实现特性化数据的指标监控与分析

文章目录 一. Registering metrics&#xff1a;向flink注册新自己的metrics1. 注册metrics2. Metric types:指标类型2.1. Counter2.2. Gauge2.3. Histogram(ing)4. Meter 二. Scope:指标作用域1. User Scope2. System Scope ing3. User Variables 三. Reporter ing四. System m…

tensorRT C++使用pt转engine模型进行推理

目录 1. 前言2. 模型转换3. 修改Binding4. 修改后处理 1. 前言 本文不讲tensorRT的推理流程&#xff0c;因为这种文章很多&#xff0c;这里着重讲从标准yolov5的tensort推理代码&#xff08;模型转pt->wts->engine&#xff09;改造成TPH-yolov5&#xff08;pt->onnx-…

如何关闭软件开机自启,提升电脑开机速度?

如何关闭软件开机自启&#xff0c;提升电脑开机速度&#xff1f;大家知道&#xff0c;很多软件在安装时默认都会设置为开机自动启动。但是&#xff0c;有很多软件在我们开机之后并不是马上需要用到的&#xff0c;开机启动的软件过多会导致电脑开机变慢。那么&#xff0c;如何关…

【break】大头哥哥做题

【break】大头哥哥做题 时间限制: 1000 ms 内存限制: 65536 KB 【题目描述】 【参考代码】 #include <iostream> using namespace std; int main(){ int sum 0;//求和int day 0;//天数 while(1){int a;cin>>a;if(a-1){break;//结束当前循环 }sum sum a; …

HTTP基本概念介绍

HTTP概述 HTTP : 超文本传输协议&#xff0c;HTTP是浏览器端Web通信的基础。 一&#xff0c; 两种架构 B/S架构&#xff1a;Browser/Server&#xff0c;浏览器/服务器架构。 B: 浏览器&#xff0c;比如Firefox 、Google 、Internet; S: 服务器&#xff0c;Apache&#xff0c…

[stm32]温湿度采集与OLED显示

一、I2C总线协议 I2C&#xff08;Inter-integrated circuit &#xff09;是一种允许从不同的芯片或电路与不同的主芯片通信的协议。它仅用于短距离通信&#xff0c;是一种用于两个或多个设备之间进行数据传输的串行总线技术&#xff0c;它可以让你在微处理器、传感器、存储器、…

6月20日(周四)A股行情总结:A股险守3000点,恒生科技指数跌1.6%

A股三大股指走弱&#xff0c;科创板逆势上扬&#xff0c;半导体板块走强&#xff0c;多股20CM涨停。中芯国际港股涨超1%。恒生科技指数跌超1%。离岸人民币对美元汇率小幅走低&#xff0c;20日盘中最低跌至7.2874&#xff0c;创下2023年11月中旬以来的新低&#xff0c;随后收复部…

287 寻找重复数-类似于环形链表II

题目 给定一个包含 n 1 个整数的数组 nums &#xff0c;其数字都在 [1, n] 范围内&#xff08;包括 1 和 n&#xff09;&#xff0c;可知至少存在一个重复的整数。 假设 nums 只有 一个重复的整数 &#xff0c;返回 这个重复的数 。 你设计的解决方案必须 不修改 数组 nums…

理解堆排序

堆排序&#xff08;Heapsort&#xff09;是一种基于堆这种数据结构的排序算法&#xff0c;但在实际实现中&#xff0c;堆通常是用数组来表示的。这种方法充分利用了数组的特性&#xff0c;使得堆的操作更加高效。下面通过详细解释和举例说明来帮助理解这种排序方式。 堆的数组…

Linux应急响应——知攻善防应急靶场-Linux(1)

文章目录 查看history历史指令查看开机自启动项异常连接和端口异常进程定时任务异常服务日志分析账户排查总结 靶场出处是知攻善防 Linux应急响应靶机 1 前景需要&#xff1a; 小王急匆匆地找到小张&#xff0c;小王说"李哥&#xff0c;我dev服务器被黑了",快救救我&…

手持弹幕LED滚动字幕屏夜店表白手灯接机微信抖音小程序开源版开发

手持弹幕LED滚动字幕屏夜店表白手灯接机微信抖音小程序开源版开发 专业版 插件版 手持弹幕小程序通常提供多种功能&#xff0c;以便用户在不同的场合如夜店、表白、接机等使用。以下是一些常见的功能列表&#xff1a; 文本输入&#xff1a; 输入要显示的文字内容&#xff0c;…

强化学习算法复现记录

目录 1.多智能体强化学习MADDPG tensorflow2版本IMAC tensorflow2版本 2.单智能体强化学习DQN pytorch版本PPO pytorch版本 1.多智能体强化学习 MADDPG tensorflow2版本 文章链接&#xff1a;tensorflow2实现多智能体强化学习算法MADDPG IMAC tensorflow2版本 文章链接&…

如何利用AopContext.currentProxy()解决事务管理中的方法调用问题

在Spring应用开发中&#xff0c;使用AOP&#xff08;面向切面编程&#xff09;来管理事务是非常常见的做法。然而&#xff0c;在某些场景下&#xff0c;尤其是在同一个类的方法内部&#xff0c;一个非事务方法直接调用另一个带有事务注解的方法时&#xff0c;可能会遇到事务不生…

初中英语优秀作文分析-005How to Plan Our Life Wisely-如何明智地规划我们的生活

PDF格式公众号回复关键字:SHCZYF005 记忆树 1 The “double reduction policy” reduces the burden on students and offers us more spare time than before, but how to plan our life wisely? 翻译 “双减政策”减轻了学生的负担&#xff0c;给了我们比以前更多的业余…

Linux进程概念(二)

上期我们已经学习了进程的基础的内容&#xff0c;已经对进程的基本概念有了了解&#xff0c;知道了进程的组成&#xff0c; 本期我们将以操作为主进一步探讨进程的相关概念。 目录 查看进程 创建进程 查看进程 查看进程主要有两种方式。 ps ajx指令 在当前目录下有名为tes…

SpringBoot-注解@ImportResource引入自定义spring的配置xml文件和配置类

1、注解ImportResource 我们知道Spring的配置文件是可以有很多个的&#xff0c;我们在web.xml中如下配置就可以引入它们&#xff1a; SprongBoot默认已经给我们配置好了Spring&#xff0c;它的内部相当于已经有一个配置文件&#xff0c;那么我们想要添加新的配置文件怎么办&am…

SkyWalking 极简入门

1. 概述 1.1 概念 SkyWalking 是什么&#xff1f; FROM Apache SkyWalking 分布式系统的应用程序性能监视工具&#xff0c;专为微服务、云原生架构和基于容器&#xff08;Docker、K8s、Mesos&#xff09;架构而设计。 提供分布式追踪、服务网格遥测分析、度量聚合和可视化一体…

【CSS in Depth 2 精译】1.5 渐进式增强

文章目录 1.5 渐进式增强1.5.1 利用层叠规则实现渐进式增强1.5.2 渐进式增强的选择器1.5.3 利用 supports() 实现特性查询启用浏览器实验特性 1.5 渐进式增强 要用好 CSS 这样一门不断发展演进中的语言&#xff0c;其中一个重要的因素就是要与时俱进&#xff0c;及时了解哪些功…