响应式流规范解析

在互联网应用构建过程中,我们知道可以采用异步非阻塞的编程模型来提高服务的响应能力。而为了实现异步非阻塞,我们可以引入数据流,并对数据的流量进行控制。我们来考虑一个场景,如果数据消费的速度跟不上数据发出的速度,会发生什么现象呢?让我们来看一下。


显然,如果我们不对生产者生产的数据流量进行控制,那么消费者中的数据就会不断的积压,从而导致出现服务不可用等异常情况。那么如何解决这一问题呢?这就需要引入响应式编程中的一个核心概念,即背压。

背压和响应式流

在数据流中,我们注意到当消费者无法及时处理超过其承受能力的数据量时,需要有一个反馈机制,告知生产者调整生产数据的速度。从数据的流转方向而言,这种反馈机制由位于数据流下游的消费者进行发起,代表着消费者处理数据的压力,所以被称为Back Pressure,翻译成中文就是背压。


一旦有了背压机制,位于下游的消费者就可以通知位于上游的生产者合理控制数据生产的速度,从而确保消费者能够正常处理数据流中的数据。这样,消费者就不会因为数据流量过大而出现问题。

明白了背压的概念,那么问题又来了,如何实现背压呢?这就需要引入一套完整的规范,基于这套规范,开发人员可以获取所有实现背压机制所需要的编程组件,这套规范就是响应式流(Reactive Stream)规范。

响应式流规范为如何实现基于背压的数据流提供了一种事实上的标准。在该规范中,我们通过这样一种机制来实现背压:消费者发送一种异步请求向生产者反馈自己所能处理的数据量,然后生产者同样通过一种异步响应的方式向消费者发送对应的数据量。

响应式流规范

从表现形式上讲,响应式流规范为开发人员提供了一批事先约定好的接口定义。通过这些接口中所包含的各个操作方法,数据流就能从生产者异步传递到消费者。同时,生产者所产生的数据也不会导致消费者无法正常消费。

响应式流是一个非常简洁的规范,只包含了Publisher、Subscriber、Subscription和Processor这四个核心接口。其中Publisher和Subscriber分别充当了生产者和消费者的角色。

我们先来看代表生产者的Publisher接口,该接口的作用就是生成一定数量的数据并进行发送,而发送数据的前提是接收到来自订阅者的请求。

public interface Publisher<T> {

    public void subscribe(Subscriber<? super T> s);

}

可以看到在Publisher接口的subscribe方法中传入了一个Subscriber,这个Subscriber代表的就是订阅者。另一方面,订阅者想要发送订阅请求的前提是需要明确该次请求的数据量,这部分内部被封装在一个订阅令牌中,Subscriber接口的定义如下所示。

public interface Subscriber<T> {

    public void onSubscribe(Subscription s);

    public void onNext(T t);

    public void onError(Throwable t);

    public void onComplete();

}

可以看到在Subscriber接口中包含了一个onSubscribe方法,该方法需要传入代表订阅令牌的Subscription参数。结合Publisher接口和Subscriber接口,我们注意到当执行Publisher接口的subscribe方法时,Subscriber接口的onSubscribe方法就会被执行,这是一种典型的回调处理机制。

而从方法命名上看,Subscriber接口的所有方法都是以on为前缀,代表了对数据流处理过程都是采用了回调处理机制。除了onSubscribe方法之外,onNext回调方法会根据Subscription参数中所包含的请求数量逐一发出数据。数据的发送过程会有两种情况,一种情况是所有数据都成功发送,这时候onComplete回调方法就会被触发;或者是数据发送过程出现了错误,那么就会执行onError回调方法中的处理流程。

接下来,让我们来到前面已经介绍到的Subscription接口。Subscription接口的作用就是一种令牌,相当于在Publisher和Subscriber之间建立了一种请求响应的桥梁,它的定义如下所示。

public interface Subscription {

    public void request(long n);

    public void cancel();

}

显然,Subscription接口的request方法用于向Subscriber请求n个数据,而cancel方法则可以用来对请求过程执行取消操作。

现在,我们已经掌握了响应式流规范中生产者和消费者之间的数据处理流程,让我们回到背压机制,看看在这种处理流程下如何实现背压。在整个数据流处理过程中,处于数据流下游的Subscriber通过Subscription接口的request方法向Publisher请求数据,这就是一种向上反馈的机制,也是实现背压的关键所在。Publisher、Subscriber和Subscription三者之间的交互关系如图所示。


至于响应式流规范中的最后一个接口Processor,它同时具备Publisher和Subscriber接口的所有能力,并提供了对数据流中数据进行转换和处理的能力,定义如下。

public interface Processor<T,R> extends Subscriber<T>,

Publisher<R> {

}

可以看到,Processor可以将来自Subscriber接口的数据类型从T转换为R并返回给Subscriber,这种转换关系如下图所示。


以上四个接口构成了响应式流规范的主体。虽然接口的定义并不复杂,但围绕数据流所展开的交互过程值得我们做进一步的总结。

上图中所示的交互方式共包含7个步骤。

  1. 当Publisher需要执行数据发布操作时,首先需要明确所发布数据的数量,这时候就应该创建一个Subscription接口的实例。
  2. 然后Publisher通过Subscriber的onSubscribe回调方法发送数据,这个过程中需要用到前面所创建的Subscription。
  3. 我们知道Subscription中包含了一个request方法,执行该方法将发起真正的数据请求。
  4. 一旦成功发起请求,Subscriber中的onNext回调方法就会执行,该方法会对发送的数据进行业务处理。
  5. 每当处理完一个数据之后,Subscription的request方法将再次被执行,直到所有数据都发送完毕。
  6. 同样,Publisher继续发送数据,而Subscriber中的onNext回调方法继续处理数据。
  7. 取决于数据发送过程是否顺利结束,系统会触发Subscriber的onComplete或onError回调方法,代表数据流是正常结束还是异常结束。

响应式流规范实现框架

响应式流只是一种规范,而不是一种实现框架或工具。围绕响应式流规范,业界也诞生了一批响应式编程框架,包括Spring 5中所引入的Project Reactor,以及RxJava、Akka和Vert.x等经典框架。

规范的作用就是为所有技术框架提供了一种能够相互协作的兼容模式。基于响应式流规范,开发人员可以在同一个应用程序中综合使用一组响应式编程框架。以Spring 5为例,默认使用了Project Reactor框架,但我们也可以引入RxJava来开发业务代码。下图展示了基于响应式流规范的几种代表性具体技术框架以及它们之间的一种可能交互过程。


对于开发人员而言,掌握如何使用这些响应式编程框架是日常开发过程中的一个方面。更重要的是,我们需要理解和掌握这些框架背后的设计思想和理念,而响应式流规范正是响应式编程思想和理念的精髓所在。

总结

在今天的内容中,我们对响应式流规范的方方面面进行了阐述。我们首先需要明确,数据流处理过程中的流量控制和背压机制是促使响应式流规范诞生的原因,响应式流规范的目的就是为了更好的处理消费者和生产者之间的数据交互过程。同时,我们也注意到想要实现背压机制,需要数据消费者具备一种向上反馈的能力,响应式流规范通过提供一组接口定义了这种能力,包括Publisher、Subscriber、Subscription和Processor。这些接口中的方法并不复杂,但却完整的描绘了整个数据流的高效处理过程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/23906.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于spring boot的超市管理系统【附:资料➕文档】

前言&#xff1a;我是源码分享交流Coding&#xff0c;专注JavaVue领域&#xff0c;专业提供程序设计开发、源码分享、 技术指导讲解、各类项目免费分享&#xff0c;定制和毕业设计服务&#xff01; 免费获取方式--->>文章末尾处&#xff01; 项目介绍&#xff1a; 网址 …

JWT及单点登录实现

JWT发展简史 JWT Token JSON Web Token (JWT&#xff0c;RFC 7519 (opens new window))&#xff0c;是为了在网络应用环境间传递声明而执行的一种基于 JSON 的开放标准&#xff08;(RFC 7519)。 ID Token OIDC (OpenID Connect) 协议 (opens new window)对 OAuth 2.0 协议 …

DEA统计代码行数插件Statistic

1.安装Statistic插件 直接在idea里面搜索Statistic即可 2.重启idea 3.查看代码行数 它可以统计各类文件的行数总和

Mysql基础进阶速成版

一&#xff1a;sql语句&#xff1a; 1.创建一张表&#xff1a;写成公式&#xff1a;创建函数(create table)表名(配置字段)。配置字段公式:字段名称字段类型&#xff0c;常用的类型有&#xff1a;整数类型int(8),int(16),int(32).....&#xff0c;小数类型float(8),float(16).…

【Linux】进程(7):地址空间

大家好&#xff0c;我是苏貝&#xff0c;本篇博客带大家了解Linux进程&#xff08;7&#xff09;&#xff1a;地址空间&#xff0c;如果你觉得我写的还不错的话&#xff0c;可以给我一个赞&#x1f44d;吗&#xff0c;感谢❤️ 目录 &#xff08;A&#xff09; 直接看代码&…

go语言接口之sort.Interface接口

排序操作和字符串格式化一样是很多程序经常使用的操作。尽管一个最短的快排程序只要15 行就可以搞定&#xff0c;但是一个健壮的实现需要更多的代码&#xff0c;并且我们不希望每次我们需要的时候 都重写或者拷贝这些代码。 幸运的是&#xff0c;sort包内置的提供了根据一些排序…

PostgreSQL和MySQL架构模型的区别

PostgreSQL - 多进程架构 PostgreSQL采用了一种多进程架构。在这种模型下&#xff0c;每个数据库连接被分配给一个新的服务器进程&#xff08;或者说是数据库进程&#xff09;。这种方式为每个连接提供了独立的内存空间和执行环境。这样做的优点是提高了系统的稳定性和隔离性&…

法国人工智能初创公司 Mistral 正在推出新的人工智能模型定制选项服务和 SDK

Mistral AI是一家成立于2023年的法国人工智能初创公司&#xff0c;由Artur Mensch、Timothe Lacroix和Guillaume Lample三位前Meta和Google DeepMind的研究人员创立。该公司专注于生成式AI技术&#xff0c;特别是用于构建在线聊天机器人、搜索引擎等应用。 Mistral AI在成立之…

python之List记录

1. 列表&#xff08;List&#xff09;基础 Python中的列表是一种可变、有序的元素集合&#xff0c;元素之间通过逗号分隔并包含在一对方括号内。列表的元素可以是不同类型的数据。 创建列表 # 创建一个包含不同类型元素的列表 my_list [1, 2.5, hello, True, [1, 2, 3]]访问…

[数据集][图像分类]城市异常情况路边倒树火灾水灾交通事故分类数据集15223张8类别

数据集类型&#xff1a;图像分类用&#xff0c;不可用于目标检测无标注文件 数据集格式&#xff1a;仅仅包含jpg图片&#xff0c;每个类别文件夹下面存放着对应图片 图片数量(jpg文件个数)&#xff1a;15223 分类类别数&#xff1a;8 类别名称:[“badroad”,“fallentree”,“f…

Android-Q升级-Camera记录

目录 代码环境 建立Android Q使用的camera仓 Camera底层适配 camx 原生接口变化 其他编译问题 chi-cdk 数据类型不匹配 case未加break的报错 libalRnBRT_GL_GBWRAPPER链接问题 vidhance编译错误 libarcsat链接问题 vendor/qcom/proprietary prebuilt_HY11 调试cam…

floor函数

添加链接描述\ #include<bits/stdc.h>using namespace std;const int N 10;int main() {int n;cin>>n;for(int i1;i<50000;i){if(floor(i*1.08)n){cout<<i;return 0;}}cout<<":(";return 0; }floor函数是向下取整 ceil是向上取整 round…

CarSim车辆运动轨迹绘制

CarSim车辆运动轨迹绘制 CarSim中与车辆位置有关的信息分别为Xo和Yo 输出到Simulink中 导入到工作空间中保存&#xff0c;low_carsim_path.mat &#xff0c;绘制结果曲线&#xff0c;low_carsim_path_comp.m data csvread(low_two_path.csv,1,0); low_two_path_x data(:,1)…

【CentOS】手动编译安装make、cmake、gcc、git

摘要 Centos7升级make和gcc版本到最新——CSDN make make 各个版本下载地址 http://ftp.gnu.org/pub/gnu/make 以4.4为例安装&#xff1a; # 下载 wget https://ftp.gnu.org/pub/gnu/make/make-4.4.tar.gz # 解压配置 tar zxf make-4.4.tar.gz cd make-4.4 ./configure --p…

分享我的新版FMEA培训心得

近日&#xff0c;我有幸参加了深圳天行健企业管理咨询公司举办的新版FMEA培训&#xff0c;这次学习不仅让我对FMEA有了更深入的理解&#xff0c;更使我在实际工作中找到了提升产品质量的新路径。 新版FMEA相较于传统版本&#xff0c;更加注重风险识别与预防&#xff0c;强调在…

Java 开发面试题精选:分布式锁相关一篇全搞定

面试路上&#xff0c;分布式锁始终是绕不开的坎&#xff1f;别怕&#xff0c;这篇精心准备的文章正是您的通关秘籍&#xff01;这篇文章聚焦面试官最青睐的提问点&#xff1a;从分布式锁基础概念到其实现机理&#xff0c;再到它在多场景下的应用智慧&#xff1b;深入剖析性能优…

C# FFmpeg 音视频开发总结

&#x1f3c6;作者&#xff1a;科技、互联网行业优质创作者 &#x1f3c6;专注领域&#xff1a;.Net技术、软件架构、人工智能、数字化转型、DeveloperSharp、微服务、工业互联网、智能制造 &#x1f3c6;欢迎关注我&#xff08;Net数字智慧化基地&#xff09;&#xff0c;里面…

golang通道(chan)选择(select)与关闭(close)使用示例

1.通道选择 创建两个双向通道 c1 : make(chan string) //双向通道 c2 : make(chan string) //双向通道 向通道写入数据 //协程1向通道1写数据go func() { c1 <- "hello world from c1" }()//协程2向通道2写数据go func() { c2 <- "hello world from …

【递归、搜索与回溯】递归算法

一、经验总结 递归 VS 迭代&#xff08;循环&#xff09; 递归和迭代都解决的是重复的子问题&#xff0c;因此两者是可以相互转化的。利用栈结构可以将递归算法转化为迭代算法。 递归和迭代各有其优缺点&#xff0c;选择时需根据具体场景和需求来决定。 递归的优点包括&#…

苹果眼镜(Vision Pro)专业咨询服务模式优化方案

一、精准定位&#xff1a; 专注于为Apple Vision Pro应用开发者提供一站式、全方位的专业咨询服务&#xff0c;致力于成为开发者在空间计算时代中不可或缺的合作伙伴&#xff0c;共同打造“下一个大事件”。 二、核心业务优化&#xff1a; visionOS策略咨询&#xff1a; 深入…