SpringWebFlux初步认识

反应式编程的认识

基础认识

命令式编程:后一行代码需要等待前一行代码执行完毕,也就是后面的执行任务步骤依赖于前面的执行任务。
反应式编程: 定义了一组如何处理数据的任务,这些任务是可以并行进行的,可以在处理数据的一部分子集的完成之后,就立马将这部分数据子集传递给下一个任务,同时继续处理另外的数据子集。

命令式编程的理念很简单:你可以一次一个的按照顺序将代码编写为需要遵循的指令列表,在某项指令开始执行之后,程序在开始下一项任务之前需要等待当前指令的完成。在整个处理数据的过程中,需要处理的数据都必须是完全可用的,它们是做作为一个整体看待的

反应式编程本质上是函数式和声明式的,相对于要求将需要被处理的数据看作是一个整体来看待,反应流可以在数据可用的时候立即开始处理;相对于描述一组将一次执行的步骤,反应式编程描述了数据将会流经的管道或者流。

反应式流规范

反应式流是一种规范,旨在提供无阻塞回压的异步流处理标准。
反应式流可以总结为四个接口Publisher,Subscriber,Subscription,Processor。Publisher负责生成数据,Subscriber负责接收数据,Subscription是描述订阅关系的。 Processor是Publisher和Subscriber的结合,既可以是Publisher也可以是Subscriber。

public interface Publisher<T> {//  传入一个Subscriber对象,表示Subscriber订阅这个Publisher的事件public void subscribe(Subscriber<? super T> s);
}
public interface Subscriber<T> {//  Publisher调用onSubscribe方法,会将Subscription对象传给Subscriber,Subsciber通过Subscription来管理订阅情况public void onSubscribe(Subscription s);//  Publisher发布的每个数据都会通过onNext方法传给Subscriber,如果有错误就会调用onError方法,如果Publisher没有更多的数据了,就会调用onComplete方法。public void onNext(T t);public void onError(Throwable t);public void onComplete();
}
public interface Subscription {//  Subscriber调用request方法来向Publisher请求发送数据,参数是用来表明能接收多少的数据,回压发挥作用的地方,Publisher发布的数据都会通过onNetx方法传递给Subscriber对象public void request(long n);//  Subscriber调用cancel方法来取消订阅public void cancel();
}
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

publier就是看作一个事件发布者,subscribe就是订阅这个事件的订阅者。先调用Publisher对象的subscribe方法,将Subscriber对象作为方法参数传进去,当Publisher开始发布事件的时候,会调用持有的Subscriber对象的onSubscribe方法,将Subscription对象传进去,这样Subscriber对象就有了subscription对象,Subscriber对象可以调用subscription的request方法去请求指定数量的数据,publisher会通过他所持有的Subscriber对象去调用onNetx方法,将数据传递给subscriber,如果有了错误就会调用onError方法,如果没有数据了就会调用onComplete方法,来告诉Subscriber对象数据发送完毕了。

Reactor

Reactor是反应式流规范的一种具体实现。

    public void test1() {String s1 = "zrl";String upperCase = s1.toUpperCase();String union = "Hello, " + upperCase;System.out.println(union);}public void test2() {Mono.just("zrl").map(x -> x.toUpperCase()).map(y -> "hello, " + y).subscribe(System.out::println);}

使用命令式编程模型,每行代码执行一个步骤,并且肯定在同一个线程中执行,每一步在执行完成之前都会阻塞执行线程执行下一步。

第二个方法中则不同,虽然看起来保持着按步骤执行的模型,但实际是数据会流经处理管线,在处理管线中,不能判断是在哪一个线程中执行操作,有可能在同一个线程中也可能在不同的线程中。

反应式操作

Flux和Mono是Reactor提供的最基础的构建块,这两个都实现了Publisher接口,Flux代表零个,一个或者多个数据项的管道,Mono是一个特殊的反应式类型,针对数据项不超过一个的场景。(StepVerifier:验证管道数据的类)

Flux和Mono共有500多个操作,这些操作可以大致归类为:

  • 创建操作
  • 组合操作
  • 转换操作
  • 逻辑操作
创建操作:
        // 使用固定值Flux<String> f1 = Flux.just("1","2","3","4","5","6","7");// 使用数组Flux<String> f2 = Flux.fromArray(new String[]{"1","2","3","4","5","6","7"});// 使用IterableSet<String> s1 = new HashSet<>();Flux<String> f3 = Flux.fromIterable(s1);//  streamFlux<String> f5 = Flux.fromStream(s1.stream());// 使用 Supplier<Stream<String>>Flux<String> f6 = Flux.fromStream(() -> s1.stream());// 使用Publisher(Mono 和 Flux 都实现了Publisher接口)Flux<String> f7 = Flux.from(f1);Flux<String> f8 = Flux.from(Mono.just("123"));
just,fromArray,fromIterable,fromStream;range,range是一个生成计时的方法,interval也是一个生成数字的,但是可以指定间隔时间。
组合操作:
 List<String> weekStrList = new ArrayList<>();weekStrList.add("周一");weekStrList.add("周二");weekStrList.add("周三");weekStrList.add("周四");weekStrList.add("周五");weekStrList.add("周六");weekStrList.add("周日");List<Integer>  weekNumList = new ArrayList<>();weekNumList.add(1);weekNumList.add(2);weekNumList.add(3);weekNumList.add(4);weekNumList.add(5);weekNumList.add(6);weekNumList.add(7);List<String> monthStrList = new ArrayList<>();monthStrList.add("一月");monthStrList.add("二月");monthStrList.add("三月");monthStrList.add("四月");monthStrList.add("五月");monthStrList.add("六月");monthStrList.add("七月");monthStrList.add("八月");monthStrList.add("九月");Flux<String> weekStrFlux = Flux.fromIterable(weekStrList);Flux<String> monthStrFlux = Flux.fromIterable(monthStrList);weekStrFlux.mergeWith(monthStrFlux).subscribe(System.out::print);
// 周一周二周三周四周五周六周日一月二月三月四月五月六月七月八月九月

看结果使用mergeWith方法是控制的合并的Flux的顺序,在调用mergeWitn方法的那块,如果将weekStrFlux和monthStrFlux调换下,就会发现打印的是 (一月二月。。。。周一周二)。
如果是想两个Flux之间交叉着合并,可以使用zip方法:

Flux<String> weekFlux = Flux.fromIterable(weekStrList);Flux<String> monthFlux = Flux.fromIterable(monthStrList);Flux.zip(weekFlux, monthFlux).subscribe(x -> {System.out.println("第一个是:"+x.getT1()+"     第二个是:"+x.getT2());});第一个是:周一     第二个是:一月
第一个是:周二     第二个是:二月
第一个是:周三     第二个是:三月
第一个是:周四     第二个是:四月
第一个是:周五     第二个是:五月
第一个是:周六     第二个是:六月
第一个是:周日     第二个是:七月

看打印结果可以看出来合并结果的数量是按最少的来。

first方法也是一个组合操作,它可以在多个Flux之间选择优先发布值的Flux中取值作为新的元素,存在多个合并的源Flux的发布数据的速度有快有慢的情况下就可以用first方法。

还有skip方法,以指定跳过的条目数和时间。take方法和skip方法有点相反,skip是跳过前几个,而take方法是只取前几个,也可以指定条目数量或者前多少秒发布的数据,也有filter方法,和stream流方法中一样,传入一个Predicate。

转换操作
 Flux.fromIterable(weekStrList).map(x -> {return  "转换操作"+x;}).subscribe(x -> {System.out.println(x);});转换操作周一
转换操作周二
转换操作周三
转换操作周四
转换操作周五
转换操作周六
转换操作周日Flux.fromIterable(weekStrList).flatMap(x -> {return Mono.just(x).map(y -> {return  y+"flatMap";});}).subscribe(x -> {System.out.println(x);});周一flatMap
周二flatMap
周三flatMap
周四flatMap
周五flatMap
周六flatMap
周日flatMap

转换操作的方法有map,flatMap,将已发布的数据项转换为其他形式的数据类型,两者不同的是,map操作是同步执行的,flatMap操作是异步的,flatMap并不像map操作那样简单的将一个对象转换到另一个对象,而是将每一个对象转换为一个新得Mono或者Flux,形成得Mono或者Flux会扁平为新得Flux,

逻辑操作

逻辑操作有any和all方法。

        Flux.fromIterable(weekNumList).any(x -> {return x > 5;}).doOnNext(x -> {System.out.println(x.booleanValue());}).subscribe();
trueFlux.fromIterable(weekNumList).all(x -> {return x > 5;}).doOnNext(x -> {System.out.println(x.booleanValue());}).subscribe();false

结束

上述那些方法都可以和subscribeOn方法结合使用时,flatMap操作可以释放Reactor反应式的异步能力,subscribeOn方法是用来描述如何并发的处理订阅,Scheduler支持的并发模型:

Parallel Scheduler:
并行调度器通常用于执行可以并行处理的任务,它可能使用多个线程来同时执行多个任务。
在Reactor中,可以使用Schedulers.parallel()来获取一个并行调度器实例。
并行调度器通常用于需要高吞吐量的场景,例如批量处理或数据转换。

Elastic Scheduler:
弹性调度器会根据需要动态地创建和销毁线程,以应对工作负载的变化。
使用Schedulers.elastic()可以获取一个弹性调度器实例。
弹性调度器适用于处理大量短生命周期的异步任务,它可以根据需要扩展或缩小线程池的大小。

Single Scheduler:
单线程调度器确保任务按照它们被提交的顺序依次执行。
使用Schedulers.single()可以获取一个单线程调度器实例。
单线程调度器适用于需要保证任务执行顺序的场景,例如按顺序处理事件或日志。

Immediate Scheduler:
立即调度器在提交任务的同一个线程中立即执行任务,不进行任何异步处理。
这通常用于不需要异步执行的任务,或者作为其他调度器的回退选项。

Custom Scheduler:
除了上述内置的调度器之外,你还可以创建自定义的Scheduler实现,以满足特定的并发模型或性能需求。
自定义调度器可以基于Java的ExecutorService或其他并发工具构建,并集成到Reactor的响应式流中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/781744.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CQI-17:2021 V2 英文 、中文版。特殊过程:电子组装制造-锡焊系统评审标准

锡焊作为一个特殊的工艺过程&#xff0c;由于其材料特性的差异性、工艺参数的复杂性和过程控制的不确定性&#xff0c;长期以来一直视为汽车零部件制造业的薄弱环节&#xff0c;并将很大程度上直接导致整车产品质量的下降和召回风险的上升。 美国汽车工业行动集团AIAG的特别工…

C++蓝桥考级一级到十八级的考点内容整理

以下是C蓝桥考级一级到十八级的考点内容整理&#xff1a; C一级考点内容 C程序基本结构 初步了解C编程了解C程序基本结构&#xff1a;头文件、命名空间、主函数、基本输入输出 cin、cout C二级考点内容 数据类型与变量 掌握编程中数学表达式的计算方式基础数据类型、变量的…

2024年2月游戏手柄线上电商(京东天猫淘宝)综合热销排行榜

鲸参谋监测的线上电商&#xff08;京东天猫淘宝&#xff09;游戏手柄品牌销售数据已出炉&#xff01;2月游戏手柄销售数据呈现出强劲的增长势头。 根据鲸参谋数据显示&#xff0c;今年2月游戏手柄月销售量累计约43万件&#xff0c;同比去年上涨了78%&#xff1b;销售额累计达1…

武汉星起航:跨境电商获各大企业鼎力支持,共筑繁荣生态

随着全球化和数字化的深入发展&#xff0c;跨境电商行业逐渐成为连接国内外市场的重要桥梁。在这一进程中&#xff0c;各大企业纷纷加大对跨境电商行业的支持力度&#xff0c;通过投资、合作与创新&#xff0c;共同推动行业的繁荣与发展。武汉星起航将探讨各大企业对跨境电商行…

Linux安装python3

Linux安装python3 本文章中使用的安装包等相关文件&#xff1a; 链接: https://pan.baidu.com/s/1C4PTB6IqXtHM6XSOEMkefg 提取码: wyeq 1.编译环境安装 yum -y install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gcc mak…

Linux 基于chrony进行时钟同步方案验证

Linux 基于chrony进行时钟同步方案验证 1. 背景介绍2. 验证过程2.1 追踪配置2.2 追平记录2.2 追平时间换算 3. 疑问和思考3.1 如何统计追踪1s需要花费多长时间&#xff1f; 4. 参考文档 chrony是一个Linux系统中用于时钟同步的工具。它使用NTP&#xff08;网络时间协议&#xf…

在 Linux 中通过 SSH 执行远程命令时,无法自动加载环境变量(已解决)

问题场景 目前我的环境变量都存储在 /etc/profile 文件中&#xff0c;当我通过远程 SSH 执行一些命令时&#xff0c;提示命令找不到&#xff0c;如下所示&#xff1a; 问题出现原因 这里找到了一张出自尚硅谷的图片&#xff0c;很好的解释了该问题&#xff1a; 这是由于 Linu…

【BlossomRPC】一个完整的含源码和文档的RPC项目

文章目录 手把手教你写一个RPC协议如何自定义一个RPC协议&#xff1f;编解码器的实现服务暴露与发现服务端与客户端服务解析注解的实现服务端与客户端请求Handler接入注册中心 RPC项目源码 配置中心项目源码 ⭐网关项目源码⭐ 手把手教你写一个RPC协议 如何自定义一个RPC协议…

Java解决数位递增的数

Java解决数位递增的数 01 题目 一个正整数如果任何一个数位不大于右边相邻的数位&#xff0c;则称为 个数位递增的数。 例如 1135 是一个数位递增的数&#xff0c;而 1024 不是一个数位递增的数。 给定正整数n&#xff0c;请问在整数 1 至 n 中有多少个数位递增的数? 输入描…

Java数据结构-链表OJ题

目录 1. 移除链表元素2. 反转链表3. 返回中间结点4. 返回倒数第k个结点5. 合并两个有序链表6. 分割链表7. 回文链表8. 找相交链表的公共结点9. 判断链表是否有环10. 返回链表环的入口 老铁们好&#xff0c;学习完链表这个数据结构之后&#xff0c;怎么能少了OJ题呢&#xff1f;…

HTLM 之 vscode 插件推荐

文章目录 vscode 插件live Serverprettiersetting 保存这个文档的更改Material Theme / Material Theme icon vscode 插件 live Server prettier setting 搜索 format default 保存这个文档的更改 cmds // mac ctrls // win Material Theme / Material Theme icon 来更换…

Mysql中的那些锁

表锁和行锁 表锁&#xff1a;一锁锁整张表&#xff0c;mysql中锁定颗粒度最大的一种&#xff0c;针对非索引字段加的锁。MyISAM和InnoDb都支持。 行锁&#xff1a;一锁只锁整行&#xff0c;锁定颗粒度最小&#xff0c;针对索引字段加的锁。MyISAM不支持&#xff0c;InnoDb支持…

【No.21】蓝桥杯组合数学|数位排序|加法计数原理|乘法计数原理|排列数|组合数|抽屉原理|小蓝吃糖果|二项式定理|杨辉三角|归并排序(C++)

组合数学 数位排序 【问题描述】 小蓝对一个数的数位之和很感兴趣,今天他要按照数位之和给数排序。当两个数各个数位之和不同时,将数位和较小的排在前面,当数位之和相等时,将数值小的排在前面。 例如,2022 排在 409 前面, 因为 2022 的数位之和是 6,小于 409 的数位 之和 13。…

数据结构:Trie(前缀树/字典树)

文章目录 一、介绍Trie1.1、Trie的结点结构1.2、Trie的整体结构 二、Trie的操作2.1、Trie插入操作2.2、Trie查找操作2.3、Trie前缀匹配操作2.4、Trie删除操作 三、实战3.1、实现Trie&#xff08;前缀树&#xff09; 一、介绍Trie Trie 又称字典树、前缀树和单词查找树&#xff…

C++11 shared_from_this学习

最近学习网络变成发现一些C源码库中封装对象时会公有继承enable_shared_from_this&#xff1b; 用一个案例进行说明&#xff0c;案例代码如下&#xff1a; #include <iostream> #include <memory> #include <stdio.h>using namespace std;class C : public…

RPC(Remote Procedure Call)远程过程调用

定义 RPC&#xff08;Remote Procedure Call&#xff09;即远程过程调用&#xff0c;是一种计算机通信协议&#xff0c;它允许程序在不同的计算机之间进行通信和交互&#xff0c;就像本地调用一样。 为什么需要 RPC&#xff1f; 回到 RPC 的概念&#xff0c;RPC 允许一个程序…

快速上手Spring Cloud 十七:深入浅出的学习之旅

快速上手Spring Cloud 一&#xff1a;Spring Cloud 简介 快速上手Spring Cloud 二&#xff1a;核心组件解析 快速上手Spring Cloud 三&#xff1a;API网关深入探索与实战应用 快速上手Spring Cloud 四&#xff1a;微服务治理与安全 快速上手Spring Cloud 五&#xff1a;Spring …

2000-2021年各省人口密度数据(原始数据+结果)

2000-2021年各省人口密度数据&#xff08;原始数据结果&#xff09; 1、时间&#xff1a;2000-2021年 2、指标&#xff1a;年末常住人口、行政区划面积、人口密度 3、来源&#xff1a;国家统计局、统计年鉴 4、范围&#xff1a;31省 5、计算说明&#xff1a;人口密度年末常…

Python数据分析四

一、Python的字符串下表取值 在Python中&#xff0c;可以通过索引&#xff08;indexing&#xff09;来获取字符串中特定位置的字符。字符串的索引从0开始&#xff0c;即第一个字符的索引为0&#xff0c;第二个字符的索引为1&#xff0c;以此类推。 下面是一个示例代码&#x…

基于重写ribbon负载实现灰度发布

项目结构如下 代码如下&#xff1a; pom&#xff1a; <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocat…