【响应式编程】Reactor 常用操作符与使用指南

文章目录

    • 一、创建操作符
      • 1. `just` —— 创建包含指定元素的流
      • 2. `fromIterable` —— 从集合创建 Flux
      • 3. `empty` —— 创建空的 Flux 或 Mono
      • 4. `fromArray` —— 从数组创建 Flux
      • 5. `fromStream` —— 从 Java 8 Stream 创建 Flux
      • 6. `create` —— 使用 FluxSink 手动发射元素
      • 7. `generate` —— 使用状态生成元素,适用于同步场景
      • 8. `fromFuture` —— 从 CompletableFuture 创建 Mono
      • 9. `interval` —— 创建周期性发射元素的 Flux
      • 10. `timer` —— 创建延迟发射的 Mono
    • 二、转换操作符
      • 1. `map` —— 映射每个元素为新值
      • 2. `flatMap` —— 扁平化异步流,将每个元素映射为异步 Publisher
      • 3. `concatMap` —— 顺序执行映射为 Publisher 的异步流
    • 三、过滤操作符
      • 1. `filter` —— 按条件过滤元素
      • 2. `take` —— 获取前 N 个元素
      • 3. `skip` —— 跳过前 N 个元素
    • 四、组合操作符
      • 1. `concat` —— 按顺序合并多个 Flux
      • 2. `merge` —— 并发合并多个 Flux(无序)
      • 3. `zip` —— 按索引组合多个 Flux 的元素
    • 五、错误处理操作符
      • 1. `onErrorReturn` —— 出错时返回默认值
      • 2. `onErrorResume` —— 出错时切换备用流
      • 3. `retry` —— 出错时重试指定次数
    • 六、延迟执行与懒加载:`Mono.defer` 和 `Flux.defer`:被订阅时才执行
      • `Mono.defer` —— 懒加载 Mono,直到subscribe时才创建执行
      • `Flux.defer` —— 懒加载 Flux,每次订阅时重新执行逻辑

Reactor 是一个用于构建反应式应用程序的 Java 库,提供了丰富的操作符(算子)来处理反应式流(FluxMono)。本文详细介绍了 Reactor 中常用的创建、转换、过滤、组合和错误处理操作符,以及一些高级用法示例。


一、创建操作符

1. just —— 创建包含指定元素的流

Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
Mono<String> mono = Mono.just("Hello");

2. fromIterable —— 从集合创建 Flux

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
Flux<Integer> flux = Flux.fromIterable(list);

3. empty —— 创建空的 Flux 或 Mono

Flux<Integer> emptyFlux = Flux.empty();
Mono<String> emptyMono = Mono.empty();

4. fromArray —— 从数组创建 Flux

Integer[] numbers = {1, 2, 3, 4, 5};
Flux<Integer> flux = Flux.fromArray(numbers);

5. fromStream —— 从 Java 8 Stream 创建 Flux

Stream<Integer> stream = Arrays.asList(1, 2, 3, 4, 5).stream();
Flux<Integer> flux = Flux.fromStream(stream);

6. create —— 使用 FluxSink 手动发射元素

Flux<Integer> flux = Flux.create(sink -> {for (int i = 0; i < 5; i++) {sink.next(i);}sink.complete();
});

7. generate —— 使用状态生成元素,适用于同步场景

Flux<Integer> flux = Flux.generate(() -> 0, (state, sink) -> {sink.next(state);if (state == 4) sink.complete();return state + 1;
});

8. fromFuture —— 从 CompletableFuture 创建 Mono

CompletableFuture<String> future = CompletableFuture.completedFuture("Hello");
Mono<String> mono = Mono.fromFuture(future);

9. interval —— 创建周期性发射元素的 Flux

Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1));

10. timer —— 创建延迟发射的 Mono

Mono<Long> timerMono = Mono.timer(Duration.ofSeconds(2));

 

二、转换操作符

1. map —— 映射每个元素为新值

Flux<Integer> squared = Flux.just(1, 2, 3).map(n -> n * n);

2. flatMap —— 扁平化异步流,将每个元素映射为异步 Publisher

Flux<Integer> result = Flux.just(1, 2, 3).flatMap(n -> Mono.just(n * 2));

3. concatMap —— 顺序执行映射为 Publisher 的异步流

Flux<Integer> result = Flux.just(1, 2, 3).concatMap(n -> Mono.just(n * 2));

 

三、过滤操作符

1. filter —— 按条件过滤元素

Flux<Integer> evens = Flux.just(1, 2, 3, 4).filter(n -> n % 2 == 0);

2. take —— 获取前 N 个元素

Flux<Integer> firstThree = Flux.just(1, 2, 3, 4, 5).take(3);

3. skip —— 跳过前 N 个元素

Flux<Integer> skipped = Flux.just(1, 2, 3, 4, 5).skip(2);

 

四、组合操作符

1. concat —— 按顺序合并多个 Flux

Flux<Integer> combined = Flux.concat(Flux.just(1, 2), Flux.just(3, 4));

2. merge —— 并发合并多个 Flux(无序)

Flux<Integer> merged = Flux.merge(Flux.just(1, 2), Flux.just(3, 4));

3. zip —— 按索引组合多个 Flux 的元素

Flux<String> zipped = Flux.zip(Flux.just(1, 2), Flux.just(3, 4), (a, b) -> a + ":" + b);

 

五、错误处理操作符

1. onErrorReturn —— 出错时返回默认值

Flux<Integer> result = Flux.just(1, 2, 3).map(n -> {if (n == 2) throw new RuntimeException("error");return n;}).onErrorReturn(-1);

2. onErrorResume —— 出错时切换备用流

Flux<Integer> result = Flux.just(1, 2, 3).map(n -> {if (n == 2) throw new RuntimeException("error");return n;}).onErrorResume(e -> Mono.just(-1));

3. retry —— 出错时重试指定次数

Flux<Integer> result = Flux.just(1, 2, 3).map(n -> {if (n == 2) throw new RuntimeException("error");return n;}).retry(2);

 

六、延迟执行与懒加载:Mono.deferFlux.defer:被订阅时才执行

Mono.defer —— 懒加载 Mono,直到subscribe时才创建执行

Mono<String> deferredMono = Mono.defer(() -> {System.out.println("Generating value...");return Mono.just("Deferred Result");
});

只有当 subscribe() 被调用时,Mono.defer 中的逻辑才会真正执行。这对于需要确保执行时机晚于前一步完成场景特别重要,比如:

Mono.defer(() -> readQaResultType()).subscribe(result -> System.out.println("QA Result: " + result));

在这段代码中,读取 qaResultType 的操作只会在前面的步骤(例如数据预处理)完全完成后才被触发

Flux.defer —— 懒加载 Flux,每次订阅时重新执行逻辑

Flux<Integer> deferredFlux = Flux.defer(() -> {System.out.println("Evaluating source...");return Flux.just(1, 2, 3);
});

每次订阅都会重新生成数据,适用于带有状态的源或依赖最新上下文的处理逻辑。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/76612.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

从静态绑定驱动模型到现代设备模型 —— 一次驱动架构的进化之旅

&#x1f50d; B站相应的视屏教程&#xff1a; &#x1f4cc; 内核&#xff1a;博文视频 - 从静态绑定驱动模型到现代设备模型 在 Linux 内核的发展历程中&#xff0c;设备驱动结构经历了从"硬编码 手动注册"的早期实现方式&#xff0c;到"设备模型统一管理&qu…

Embedding质量评估、空间塌缩、 Alignment Uniformity

Embedding质量的评估和空间塌缩的解决是自然语言处理&#xff08;NLP&#xff09;和推荐系统领域的关键问题。以下是综合多篇研究的总结&#xff1a; 一、Embedding质量评估方法 基准测试与任务指标 MTEB/C-MTEB&#xff1a;使用多语言或中文的基准测试集&#xff08;如58个数据…

批量给dwg显示略缩图_c#插件实现(com)

如果&#xff0c;cad文件无略缩图&#xff1a; AutoCAD2021版本以上&#xff0c;命令行输入"netload "加载此dll插件&#xff0c;然后输入 “lst”&#xff0c;选择文件夹&#xff0c;即可一键实现给dwg增加略缩图。 效果如下&#xff1a; 附部分代码&#xff1a; …

婴幼儿托育服务与管理实训室:托育未来的基石

在社会对婴幼儿托育服务的重视程度不断加深的当下&#xff0c;专业托育人才的需求急剧增长。婴幼儿托育服务与管理专业作为培育这类人才的关键途径&#xff0c;要求学生熟练掌握婴幼儿身心发展、饮食营养以及卫生保健等基础知识&#xff0c;同时具备全面的照护与管理能力。要实…

(自用)若依生成左树右表

第一步&#xff1a; 在数据库创建树表和单表&#xff1a; SQL命令&#xff1a; 商品表 CREATE TABLE products (product_id INT AUTO_INCREMENT PRIMARY KEY,product_name VARCHAR(255) , price DECIMAL(10, 2) , stock INT NOT NULL, category_id INT NOT NULL); 商品分类…

Linux:DNS服务配置(课堂实验总结)

遇到的问题&#xff0c;都有解决方案&#xff0c;希望我的博客能为你提供一点帮助。 操作系统&#xff1a;rocky Linux 9.5 ​​一、配置DNS服务器的核心步骤​​ 步骤 1&#xff1a;安装 BIND 软件​​ ​​检查是否安装​​&#xff1a; rpm -qa | grep "^bind"…

搭建一个Spring Boot聚合项目

1. 创建父项目 打开IntelliJ IDEA&#xff0c;选择 New Project。 在创建向导中选择 Maven&#xff0c;确保选中 Create from archetype&#xff0c;选择 org.apache.maven.archetypes:maven-archetype-quickstart。 填写项目信息&#xff1a; GroupId&#xff1a;com.exampl…

若依前后端分离版运行教程、打包教程、部署教程

后端打包教程 注意&#xff1a;需要先运行redis 2、前端运行教程 2.1安装依赖 2.2运行 打开浏览器查看,地址&#xff1a;http://localhost:80 3、前端打包教程 3.1打包 3.2运行打包好的文件&#xff0c;先找到打包好的文件 这是nginx的文件结构 将打包好的文件放到html目录下…

SpringAi 会话记忆功能

在使用chatGPT&#xff0c;豆包等产品后&#xff0c;就会发现他们的会话有“记忆”功能。 那么我们用API接口的话&#xff0c;这个是怎么实现的呢&#xff1f; 属于比较粗暴的方式&#xff0c;把之前的内容与新的提示词一起再次发给大模型。让我们看到他们有记忆功能。 下面介绍…

基于Python的经济循环模型构建与可视化案例

一、代码结构概览 该代码构建了一个包含经济数据生成、可视化分析和政策模拟的交互式经济系统仿真平台&#xff0c;主要包括三大模块&#xff1a; 多部门经济数据生成&#xff1a;模拟包含产业关联的复杂经济数据 增强型可视化&#xff1a;提供多维度的经济数据分析视图 Das…

第十六届蓝桥杯大赛软件赛省赛 Python 大学 B 组 部分题解

题面链接Htlang/2025lqb_python_b 个人觉得今年这套题整体比往年要简单许多&#xff0c;但是G题想简单了出大问题&#xff0c;预估50101015120860&#xff0c;道阻且长&#xff0c;再接再厉 A: 攻击次数 答案&#xff1a;103&#xff1f;181&#xff1f;题目没说明白每回合是…

C++基础精讲-05

文章目录 1.构造函数初始化列表1.1 初始化列表的使用1.2 有参构造函数的默认值 2.对象所占空间大小2.1 大小的计算2.2 内存对齐机制 3. 析构函数3.1 基本概念3.2 总结 4.valgrind工具集4.1 介绍4.2 memcheck的使用 5. 拷贝构造函数5.1 拷贝构造函数定义5.2 浅拷贝/深拷贝5.3 拷…

文章记单词 | 第28篇(六级)

一&#xff0c;单词释义 shirt /ʃɜːt/ n. 衬衫&#xff1b;衬衣commonly /ˈkɒmənli/ adv. 通常地&#xff1b;一般地&#xff1b;普遍地pick /pɪk/ v. 挑选&#xff1b;采摘&#xff1b;捡起&#xff1b;选择&#xff1b;n. 选择&#xff1b;鹤嘴锄&#xff1b;精华com…

安装低版本Pytorch GPU

网上很多教程都是自动安装&#xff0c;不指定版本&#xff0c;其实有大问题。而且torch、torchvision、torchaudio的版本必须是对应&#xff0c;所以一旦版本不对&#xff0c;就可能会出现各种问题。 其实Pytorch官网就已经给出了安装低版本的教程 登入Pytorch官网 点击previo…

2025认证杯挑战赛B题【 谣言在社交网络上的传播 】原创论文讲解(含完整python代码)

大家好呀&#xff0c;从发布赛题一直到现在&#xff0c;总算完成了认证杯数学中国数学建模网络挑战赛第一阶段B题目谣言在社交网络上的传播完整的成品论文。 本论文可以保证原创&#xff0c;保证高质量。绝不是随便引用一大堆模型和代码复制粘贴进来完全没有应用糊弄人的垃圾半…

并发编程--互斥锁与读写锁

并发编程–互斥锁与读写锁 文章目录 并发编程--互斥锁与读写锁1. 基本概念2. 互斥锁2.1 基本逻辑2.2 函数接口2.3示例代码12.4示例代码2 3. 读写锁3.1 基本逻辑3.2示例代码 1. 基本概念 互斥与同步是最基本的逻辑概念&#xff1a; 互斥指的是控制两个进度使之互相排斥&#x…

亲手打造可视化故事线管理工具:开发全流程、难点突破与开发过程经验总结

亲手打造可视化故事线管理工具&#xff1a;开发全流程、难点突破与开发过程经验总结 作为还没入门的业余编程爱好者&#xff0c;奋战了2天&#xff0c;借助AI开发一款FLASK小工具&#xff0c;功能还在完善中&#xff08;时间轴可以跟随关联图缩放&#xff0c;加了一个用C键控制…

网络攻防技术-虚拟机安装和nmap端口扫描

文章是博主上实验课做的实验和心得体会&#xff0c;有些高深的地方我可能也比较一知半解&#xff0c;欢迎来交流。全文参考课程所习得&#xff0c;纯粹梳理知识点和分享&#xff0c;如有不妥请联系修改。 文章侧重实验部分&#xff0c;也会讲述实验相关的理论知识。理论后期如果…

中断的硬件框架

今天呢&#xff0c;我们来讲讲中断的硬件框架&#xff0c;这里会去举3个开发板&#xff0c;去了解中断的硬件框架&#xff1a; 中断路径上的3个部件&#xff1a; 中断源 中断源多种多样&#xff0c;比如GPIO、定时器、UART、DMA等等。 它们都有自己的寄存器&#xff0c;可以…

动手学深度学习:手语视频在VGG模型中的测试

前言 其他所有部分同上一篇AlexNet一样&#xff0c;所以就不再赘诉&#xff0c;直接看VGG搭建部分。 模型 VGG是第一个采取块进行模块化搭建的模型。 def vgg_block(num_convs,in_channels,out_channels):layers[]for _ in range(num_convs):layers.append(nn.Conv2d(in_ch…