响应式编程初探-自定义实现Reactive Streams规范

最近在学响应式编程,这里先记录下,响应式编程的一些基础内容

1.名词解释

Reactive Streams、Reactor、WebFlux以及响应式编程之间存在密切的关系,它们共同构成了在Java生态系统中处理异步和响应式编程的一系列工具和框架。

  1. Reactive Streams:

    • Reactive Streams 是一个规范,定义了一组接口和协议,用于处理异步数据流的背压。它包括发布者(Publisher)、订阅者(Subscriber)、订阅(Subscription)和处理器(Processor)等接口。
    • Reactive Streams 规范的目标是提供一种标准的方式来处理异步数据流,解决背压问题。Java标准库从Java 9开始提供了 java.util.concurrent.Flow 类,定义了Reactive Streams规范。
  2. Reactor:

    • Reactor 是一个基于Reactive Streams规范的响应式编程框架。它提供了一组用于构建异步、事件驱动、响应式应用程序的工具和库。Reactor 的核心是 Flux(表示一个包含零到多个元素的异步序列)和 Mono(表示一个包含零或一个元素的异步序列)。
    • Reactor 通过提供响应式的操作符,如mapfilterflatMap等,使得开发者能够方便地进行数据流的转换和处理。
  3. WebFlux:

    • WebFlux 是Spring Framework 5引入的响应式编程支持。它构建在 Reactor 之上,提供了一套用于构建异步、非阻塞、响应式的Web应用程序的API。WebFlux支持使用Reactive Streams处理HTTP请求和响应。
    • Spring WebFlux 可以用于构建反应式的RESTful服务,支持使用注解的方式定义路由和处理器函数。
  4. 响应式编程:

    • 响应式编程是一种编程范式,强调数据流和变化的传播。在这个范式中,数据源产生数据并通知观察者,观察者相应地处理这些数据。这种方式更容易处理异步操作和事件。
    • 在Java中,响应式编程通常涉及到使用类似于Reactor或RxJava的库,这些库提供了响应式的操作符和工具。

综上所述,Reactive Streams 提供了规范,Reactor 是一个实现了该规范的响应式编程框架,而WebFlux是Spring对于响应式编程的支持。它们共同致力于构建异步、非阻塞、响应式的应用程序。响应式编程则是一种更广义的编程范式,与Reactive Streams和Reactor等具体实现密切相关。

2.Reactive Streams 规范

2.1.Reactive Streams规范定义

java.util.concurrent.Flow 类中,定义了Reactive Streams规范
在这里插入图片描述

  • Publisher(发布者):负责生成数据流,并向订阅者发送数据。
  • Subscriber(订阅者):表示数据流的消费者,它订阅一个或多个发布者,并接收数据。
  • Subscription(订阅):表示订阅关系的接口,用于控制数据流的请求和取消。
  • Processor(处理器):充当发布者和订阅者的中间组件,可以对数据进行转换和处理。

2.2.API方法

1. Publisher(发布者):
interface Publisher<T> {void subscribe(Subscriber<? super T> subscriber);
}
  • subscribe(Subscriber<? super T> subscriber) 用于订阅数据流。当订阅者调用这个方法时,发布者将建立与订阅者的订阅关系,并开始推送数据。
2. Subscriber(订阅者):
interface Subscriber<T> {void onSubscribe(Subscription subscription);void onNext(T item);void onError(Throwable throwable);void onComplete();
}
  • onSubscribe(Subscription subscription) 在订阅关系建立时调用。通过这个方法,订阅者可以持有 Subscription 对象,以便后续请求数据和取消订阅。

  • onNext(T item) 在接收到新元素时调用。订阅者通过这个方法处理收到的数据。

  • onError(Throwable throwable) 在数据流中出现错误时调用。订阅者通过这个方法处理错误情况。

  • onComplete() 在数据流完成时调用。通知订阅者数据流结束,不再有新的元素。

3. Subscription(订阅):
interface Subscription {void request(long n);void cancel();
}
  • request(long n) 用于请求订阅者处理指定数量的元素。订阅者通过这个方法告知发布者它可以处理多少个元素。

  • cancel() 用于取消订阅关系。当订阅者不再需要接收数据时,调用此方法取消订阅。

4. Processor(处理器):
interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

Processor 接口是 SubscriberPublisher 的组合,表示一个中间处理组件,可以同时充当订阅者和发布者的角色。

  • Subscriber 部分的方法:onSubscribe(Subscription subscription), onNext(T item), onError(Throwable throwable), onComplete()

  • Publisher 部分的方法:subscribe(Subscriber<? super R> subscriber)。表示 Processor 可以被其他订阅者订阅。

5.泛型T

泛型T即为数据流

这些方法共同构成 Reactive Streams 协议,定义了发布者和订阅者之间的协作方式,以及订阅者如何处理数据流。在实际的使用中,这些方法的实现通常需要考虑异步处理、背压机制等方面,以确保响应式编程的目标得以实现。

2.3.工作流程

在 Reactive Streams 中,PublisherSubscriberSubscriptionProcessor 之间的协作流程如下:

有时间再补流程图
在这里插入图片描述

  1. Publisher(发布者):

    • Publisher 是异步产生数据流的组件,它通过 subscribe 方法允许订阅者订阅。subscribe 方法会接收一个 Subscriber 对象作为参数。
    • Publisher 有新数据准备好时,通过调用订阅者的 onNext 方法将数据推送给订阅者。
    interface Publisher<T> {void subscribe(Subscriber<? super T> subscriber);
    }
    
  2. Subscriber(订阅者):

    • Subscriber 是数据流的消费者,通过实现 Subscriber 接口来接收来自发布者的数据。订阅者通过调用 subscription.request(n) 请求一定数量的数据,处理数据时通过 onNext 方法接收元素。
    • 当订阅者无法处理更多的元素时,可以调用 subscription.cancel() 来取消订阅。
    interface Subscriber<T> {void onSubscribe(Subscription subscription);void onNext(T item);void onError(Throwable throwable);void onComplete();
    }
    
  3. Subscription(订阅):

    • Subscription 表示订阅关系,它在 onSubscribe 方法中被传递给订阅者。通过 Subscription,订阅者可以请求数据和取消订阅。
    • 订阅者通过 request(long n) 方法请求处理 n 个元素,通过 cancel() 方法取消订阅。
    interface Subscription {void request(long n);void cancel();
    }
    
  4. Processor(处理器):

    • Processor 是一个同时实现了 PublisherSubscriber 接口的中间组件,可以作为数据流的处理器,对数据进行转换和处理。
    • Processor 既能接收数据,也能发布数据。它将 onNextonErroronComplete 方法委托给下游的订阅者,并将数据推送给上游的发布者。
    interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }
    

这些接口一起构成了 Reactive Streams 的基本协议。发布者产生数据,订阅者订阅数据流并通过 onNext 方法接收元素,订阅者通过 request 方法请求处理一定数量的元素,同时可以通过 cancel 方法取消订阅。Processor 则可以用于在订阅者和发布者之间进行数据转换和处理。在 Reactive Streams 的实现中,这些接口的方法调用是异步进行的,以支持非阻塞的数据流处理。

3.自定义实现Reactive Streams规范

自己实现了一个,参考了SubmissionPublisher

  • 同步实现的
  • 功能不完善
  • 有bug
class MyPublisher implements Flow.Publisher<String>{MySubscription<String> subscription;public int request ;public void publish(String item){subscription.items.add(item);while (true) {if (request > 0) {for (int i = 0; i < request; i++) {if (!subscription.items.isEmpty()) {try {Object o = subscription.items.get(subscription.items.size() - 1);subscription.subscriber.onNext(o.toString());subscription.items.remove(o);}catch (Exception e){subscription.subscriber.onError(e);return;}}}}if (subscription.items.isEmpty()) {break;}}}@Overridepublic void subscribe(Flow.Subscriber<? super String> subscriber) {System.out.println("第一步:绑定订阅者" );MySubscription<String> subscription = new MySubscription<>(subscriber,this);this.subscription = subscription;subscriber.onSubscribe(subscription);}}class MySubscriber implements Flow.Subscriber<String>{private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("第二步:接收Subscription" );this.subscription = subscription;// 请求订阅者处理的元素数量subscription.request(1);}@Overridepublic void onNext(String item) {System.out.println("第四步:推送数据" );System.out.println("MySubscriber 消费了item = " + item);subscription.request(1);}@Overridepublic void onError(Throwable throwable) {System.out.println("出异常了 = " + throwable);}@Overridepublic void onComplete() {}}class MySubscription<T> implements Flow.Subscription{final Flow.Subscriber<? super T> subscriber;final MyPublisher publisher;List items = new ArrayList();public MySubscription(Flow.Subscriber<? super T> subscriber, MyPublisher publisher) {this.subscriber = subscriber;this.publisher = publisher;}@Overridepublic void request(long n) {this.publisher.request++;System.out.println("第三步:拉取请求" );}@Overridepublic void cancel() {}
}
public class FlowDemo {public static void main(String[] args) {MyPublisher myPublisher = new MyPublisher();MySubscriber mySubscriber = new MySubscriber();myPublisher.subscribe(mySubscriber);myPublisher.publish("111");myPublisher.publish("222");myPublisher.publish(null);}
}

4.Jdk实现Reactive Streams使用示例

class SimplePublisher implements Flow.Publisher<Integer> {private final SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();public void publishItems() {for (int i = 1; i <= 5; i++) {publisher.submit(i);}// 发布者完成发布publisher.close();}@Overridepublic void subscribe(Flow.Subscriber<? super Integer> subscriber) {publisher.subscribe(subscriber);}
}class SimpleSubscriber implements Flow.Subscriber<Integer> {private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {this.subscription = subscription;// 请求订阅者处理的元素数量subscription.request(1);}@Overridepublic void onNext(Integer item) {System.out.println("Received item: " + item);// 处理完一个元素后请求下一个subscription.request(1);}@Overridepublic void onError(Throwable throwable) {System.err.println("Error occurred: " + throwable.getMessage());}@Overridepublic void onComplete() {System.out.println("Processing completed.");}
}public class ReactiveStreamsExample {public static void main(String[] args) throws InterruptedException {// 创建发布者和订阅者SimplePublisher simplePublisher = new SimplePublisher();SimpleSubscriber simpleSubscriber = new SimpleSubscriber();// 订阅者订阅发布者simplePublisher.subscribe(simpleSubscriber);// 发布者发布数据simplePublisher.publishItems();// 睡一觉,确保数据处理完成Thread.sleep(3000);}
}

学习打卡:Java学习笔记-day05-响应式编程初探-自定义实现Reactive Streams规范

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/624992.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

参与直播领取龙年大礼盒!23年Coremail社区年终福利大放送

2023年终福利大放送 Coremail 管理员社区是由 Coremail 邮件安全团队、服务团队及多条产品线共同维护&#xff0c;集 7*24h 在线自助查询、技术问答交流、大咖互动分享、资料下载等功能于一体&#xff0c;专属于 Coremail 邮件管理员、安全员成长互动的知识库社区。 转眼间&am…

数据库|数据库范式(待完成)

文章目录 数据库的范式数据库的基本操作什么是数据库的范式产生的背景&#xff08;没有规范化的坏处/带来的问题&#xff09;规范化表格设计的要求五大范式的作用——树立标准打个比方——桥的承载能力1NF&#xff08;1范式&#xff09;如何转换成合适的一范式 2NF&#xff08;…

迈向高效LLM微调:低秩适应(LoRA)技术的原理与实践

在快速发展的人工智能领域中&#xff0c;以高效和有效的方式使用大型语言模型&#xff08;LLM&#xff09;变得越来越重要。在本文中&#xff0c;您将学习如何以计算高效的方式使用低秩适应&#xff08;LoRA&#xff09;对LLM进行调整&#xff01; 为什么需要微调&#xff1f;…

吼!原来教师这样发布学生期末成绩,轻松没烦恼

​随着科技的进步和教育的不断创新&#xff0c;教师发布学生期末成绩的方式也在逐渐发生变化。传统的方式&#xff0c;如纸质成绩单和口头通知&#xff0c;已经不能满足现代教育的需求。那么&#xff0c;教师应该如何更有效地发布学生期末成绩呢&#xff1f; 一、电子成绩单 电…

2024年【北京市安全员-C3证】复审考试及北京市安全员-C3证证考试

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 北京市安全员-C3证复审考试考前必练&#xff01;安全生产模拟考试一点通每个月更新北京市安全员-C3证证考试题目及答案&#xff01;多做几遍&#xff0c;其实通过北京市安全员-C3证模拟考试题很简单。 1、【多选题】《…

使用scipy处理图片——任意比例缩放

大纲 缩小放大代码地址 在《使用numpy处理图片——缩放图片》一文中&#xff0c;我们每2个取1个像素来达到图像缩小的效果。这就要求缩小的比例只能是整数倍&#xff0c;而不能支持缩小到0.3倍或者放大到1.5倍这样的效果。 为了支持任意倍数的缩放功能&#xff0c;我们需要使用…

【方法】Excel表格如何“限制编辑区域”?

在制作Excel表格的时候&#xff0c;你是否遇到这些情况&#xff1f;有时候需要限定部分区域让他人协助填写&#xff0c;有时候会有很多数据或公式&#xff0c;要防止误改&#xff0c;否则会引起错误。要保护好这些区域&#xff0c;我们可以给Excel表格设置“限制编辑区域”。 …

微信小程序------WXML模板语法之条件渲染和列表渲染

目录 前言 一、条件渲染 1.wx:if 2. 结合 使用 wx:if 3. hidden 4. wx:if 与 hidden 的对比 二、列表渲染 1. wx:for 2. 手动指定索引和当前项的变量名* 3. wx:key 的使用 前言 上一期我们讲解wxml模版语法中的数据绑定和事件绑定&#xff08;上一期链接&#xff1a;…

PDF修改技巧之:如何简单方便的编辑PDF文件?

在当今精通技术的世界中&#xff0c;PDF 的使用已变得普遍&#xff0c;尤其是在商业和教育方面。如果您在审阅 PDF 文件时遇到语法或其他错误怎么办&#xff1f; 尽管 PDF 文件不像 Word 或在线文档那样容易编辑&#xff0c;但借助高级工具&#xff0c;您一定可以进行编辑。 …

MySQL的安装

一&#xff1a;MySQL的安装 步骤一&#xff1a; 下载mysql&#xff0c;地址&#xff1a;MySQL :: Download MySQL Installer 在MySQL的官网对其进行下载&#xff1a; 也可以下滑&#xff0c;在下面点击此社区服务器安装进行下载&#xff1a; 步骤二&#xff1a; 进入到下载…

Redis之bigkey

目录 1、什么是bigkey&#xff1f; 2、bigkey大的小 3、bigkey有哪些危害&#xff1f; 4、bigkey如何产生&#xff1f; 5、bigkey如何发现&#xff1f; 6、bigkey如何删除&#xff1f; 7、BigKey调优&#xff0c;惰性释放lazyfree 8、生产上限制keys * /flushdb/flushal…

使用WAF防御网络上的隐蔽威胁之CSRF攻击

在网络安全领域&#xff0c;除了常见的XSS&#xff08;跨站脚本&#xff09;攻击外&#xff0c;CSRF&#xff08;跨站请求伪造&#xff09;攻击也是一种常见且危险的威胁。这种攻击利用用户已经验证的身份在没有用户知情的情况下&#xff0c;执行非授权的操作。了解CSRF攻击的机…

2.3数据链路层01

2.3数据链路层 2.3.1数据链路层概述 1、数据链路层在网络体系结构中所处的地位 如下图所示&#xff1a;主机H1给主机H2发送数据&#xff0c;中间要经过三个路由器、电话网、局域网、广域网等多种网络。 从五层协议原理体系结构的角度来看&#xff0c;主机应该具有体系结构中…

数据结构初阶之插入排序与希尔排序详解

个人主页&#xff1a;点我进入主页 专栏分类&#xff1a;C语言初阶 C语言程序设计————KTV C语言小游戏 C语言进阶 C语言刷题 数据结构初阶 Linux 欢迎大家点赞&#xff0c;评论&#xff0c;收藏。 一起努力,共赴大厂。 目录 一.前言 二.插入排序 …

深入浅出Pytorch宝典1.0

文章目录 前言1. 张量操作2. 自动微分3. 数据加载和处理4. 模型构建和训练5. 预训练模型和迁移学习6. 调试和性能7. 高级特性总结 torch中主要的数据对象主要特点和功能张量的创建 数据处理和转换1.torch.tensor() 创建一个新的张量&#xff08;Tensor&#xff09;2.torch.zero…

YOLOv8训练自己的数据集

文章目录 1. 创建数据集文件结构数据集标注脚本分割数据集转换数据格式 2. 配置文件2.1 数据集配置2.2 选择需要的模型 3. 模型训练4. 测试 1. 创建数据集 环境&#xff1a; Ultralytics YOLOv8.0.230 &#x1f680; Python-3.8.18 torch-2.3.0.dev20231226cu118 CUDA:0 (NVIDI…

【DDR】基于Verilog的DDR控制器的简单实现(三)——读操作

上一节 【DDR】基于Verilog的DDR控制器的简单实现&#xff08;二&#xff09;——写操作 本文继续以美光(Micron&#xff09;公司生产的DDR3芯片MT41J512M8RH-093&#xff08;芯片手册&#xff09;为例&#xff0c;说明DDR芯片的读操作过程。下图为读操作指令格式&#xff08;…

市场复盘总结 20240115

仅用于记录当天的市场情况&#xff0c;用于统计交易策略的适用情况&#xff0c;以便程序回测 短线核心&#xff1a;不参与任何级别的调整&#xff0c;采用龙空龙模式 昨日主题投资 连板进级率 0% 失效 二进三&#xff1a; 进级率 中位数50% 最常用的二种方法&#xff1a; 方…

记录centos7.9 离线安装fastllm 编译遇到的问题

centos7.9 安装fastllm 编译步骤 Step1安装cmake: 参考: https://bitsanddragons.wordpress.com/2022/09/19/error-cmake-3-1-or-higher-is-required-you-are-running-version-on-centos-7-x/ ​ 问题1&#xff1a;/lib64/libstdc.so.6: version GLIBCXX_3.4.20‘ not found …

解决Qt的release构建下无法进入断点调试的问题

在工作的时候遇到了第三方库只提供release版本的库的情况&#xff0c;我需要在这基础上封装一层自家库&#xff0c;在调试的时候遇到如下问题&#xff0c;但是在Qt环境下&#xff0c;release的库只能在进行release构建和调试。 卡在了一直进不了断点的情况。提示内容如下&#…