Java学习笔记-day05-响应式编程初探-自定义实现Reactive Streams规范

最近在学响应式编程,这里先记录下,响应式编程的一些基础内容

1.名词解释

Reactive Streams、Reactor、WebFlux以及响应式编程之间存在密切的关系,它们共同构成了在Java生态系统中处理异步和响应式编程的一系列工具和框架。

  1. Reactive Streams:

    • Reactive Streams 是一个规范,定义了一组接口和协议,用于处理异步数据流的背压。它包括发布者(Publisher)、订阅者(Subscriber)、订阅(Subscription)和处理器(Processor)等接口。
    • Reactive Streams 规范的目标是提供一种标准的方式来处理异步数据流,解决背压问题。Java标准库从Java 9开始提供了 java.util.concurrent.Flow 类,实现了Reactive Streams规范。
  2. Reactor:

    • Reactor 是一个基于Reactive Streams规范的响应式编程框架。它提供了一组用于构建异步、事件驱动、响应式应用程序的工具和库。Reactor 的核心是 Flux(表示一个包含零到多个元素的异步序列)和 Mono(表示一个包含零或一个元素的异步序列)。
    • Reactor 通过提供响应式的操作符,如mapfilterflatMap等,使得开发者能够方便地进行数据流的转换和处理。
  3. WebFlux:

    • WebFlux 是Spring Framework 5引入的响应式编程支持。它构建在 Reactor 之上,提供了一套用于构建异步、非阻塞、响应式的Web应用程序的API。WebFlux支持使用Reactive Streams处理HTTP请求和响应。
    • Spring WebFlux 可以用于构建反应式的RESTful服务,支持使用注解的方式定义路由和处理器函数。
  4. 响应式编程:

    • 响应式编程是一种编程范式,强调数据流和变化的传播。在这个范式中,数据源产生数据并通知观察者,观察者相应地处理这些数据。这种方式更容易处理异步操作和事件。
    • 在Java中,响应式编程通常涉及到使用类似于Reactor或RxJava的库,这些库提供了响应式的操作符和工具。

综上所述,Reactive Streams 提供了规范,Reactor 是一个实现了该规范的响应式编程框架,而WebFlux是Spring对于响应式编程的支持。它们共同致力于构建异步、非阻塞、响应式的应用程序。响应式编程则是一种更广义的编程范式,与Reactive Streams和Reactor等具体实现密切相关。

2.Reactive Streams 规范

2.1.Reactive Streams规范定义

java.util.concurrent.Flow 类中,定义了Reactive Streams规范
在这里插入图片描述

  • Publisher(发布者):负责生成数据流,并向订阅者发送数据。
  • Subscriber(订阅者):表示数据流的消费者,它订阅一个或多个发布者,并接收数据。
  • Subscription(订阅):表示订阅关系的接口,用于控制数据流的请求和取消。
  • Processor(处理器):充当发布者和订阅者的中间组件,可以对数据进行转换和处理。

2.2.API方法

1. Publisher(发布者):
interface Publisher<T> {void subscribe(Subscriber<? super T> subscriber);
}
  • subscribe(Subscriber<? super T> subscriber) 用于订阅数据流。当订阅者调用这个方法时,发布者将建立与订阅者的订阅关系,并开始推送数据。
2. Subscriber(订阅者):
interface Subscriber<T> {void onSubscribe(Subscription subscription);void onNext(T item);void onError(Throwable throwable);void onComplete();
}
  • onSubscribe(Subscription subscription) 在订阅关系建立时调用。通过这个方法,订阅者可以持有 Subscription 对象,以便后续请求数据和取消订阅。

  • onNext(T item) 在接收到新元素时调用。订阅者通过这个方法处理收到的数据。

  • onError(Throwable throwable) 在数据流中出现错误时调用。订阅者通过这个方法处理错误情况。

  • onComplete() 在数据流完成时调用。通知订阅者数据流结束,不再有新的元素。

3. Subscription(订阅):
interface Subscription {void request(long n);void cancel();
}
  • request(long n) 用于请求订阅者处理指定数量的元素。订阅者通过这个方法告知发布者它可以处理多少个元素。

  • cancel() 用于取消订阅关系。当订阅者不再需要接收数据时,调用此方法取消订阅。

4. Processor(处理器):
interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

Processor 接口是 SubscriberPublisher 的组合,表示一个中间处理组件,可以同时充当订阅者和发布者的角色。

  • Subscriber 部分的方法:onSubscribe(Subscription subscription), onNext(T item), onError(Throwable throwable), onComplete()

  • Publisher 部分的方法:subscribe(Subscriber<? super R> subscriber)。表示 Processor 可以被其他订阅者订阅。

5.泛型T

泛型T即为数据流

这些方法共同构成 Reactive Streams 协议,定义了发布者和订阅者之间的协作方式,以及订阅者如何处理数据流。在实际的使用中,这些方法的实现通常需要考虑异步处理、背压机制等方面,以确保响应式编程的目标得以实现。

2.3.工作流程

在 Reactive Streams 中,PublisherSubscriberSubscriptionProcessor 之间的协作流程如下:

有时间再补流程图
在这里插入图片描述

  1. Publisher(发布者):

    • Publisher 是异步产生数据流的组件,它通过 subscribe 方法允许订阅者订阅。subscribe 方法会接收一个 Subscriber 对象作为参数。
    • Publisher 有新数据准备好时,通过调用订阅者的 onNext 方法将数据推送给订阅者。
    interface Publisher<T> {void subscribe(Subscriber<? super T> subscriber);
    }
    
  2. Subscriber(订阅者):

    • Subscriber 是数据流的消费者,通过实现 Subscriber 接口来接收来自发布者的数据。订阅者通过调用 subscription.request(n) 请求一定数量的数据,处理数据时通过 onNext 方法接收元素。
    • 当订阅者无法处理更多的元素时,可以调用 subscription.cancel() 来取消订阅。
    interface Subscriber<T> {void onSubscribe(Subscription subscription);void onNext(T item);void onError(Throwable throwable);void onComplete();
    }
    
  3. Subscription(订阅):

    • Subscription 表示订阅关系,它在 onSubscribe 方法中被传递给订阅者。通过 Subscription,订阅者可以请求数据和取消订阅。
    • 订阅者通过 request(long n) 方法请求处理 n 个元素,通过 cancel() 方法取消订阅。
    interface Subscription {void request(long n);void cancel();
    }
    
  4. Processor(处理器):

    • Processor 是一个同时实现了 PublisherSubscriber 接口的中间组件,可以作为数据流的处理器,对数据进行转换和处理。
    • Processor 既能接收数据,也能发布数据。它将 onNextonErroronComplete 方法委托给下游的订阅者,并将数据推送给上游的发布者。
    interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }
    

这些接口一起构成了 Reactive Streams 的基本协议。发布者产生数据,订阅者订阅数据流并通过 onNext 方法接收元素,订阅者通过 request 方法请求处理一定数量的元素,同时可以通过 cancel 方法取消订阅。Processor 则可以用于在订阅者和发布者之间进行数据转换和处理。在 Reactive Streams 的实现中,这些接口的方法调用是异步进行的,以支持非阻塞的数据流处理。

3.自定义实现Reactive Streams规范

自己实现了一个,参考了SubmissionPublisher

  • 同步实现的
  • 功能不完善
  • 有bug
class MyPublisher implements Flow.Publisher<String>{MySubscription<String> subscription;public int request ;public void publish(String item){subscription.items.add(item);while (true) {if (request > 0) {for (int i = 0; i < request; i++) {if (!subscription.items.isEmpty()) {try {Object o = subscription.items.get(subscription.items.size() - 1);subscription.subscriber.onNext(o.toString());subscription.items.remove(o);}catch (Exception e){subscription.subscriber.onError(e);return;}}}}if (subscription.items.isEmpty()) {break;}}}@Overridepublic void subscribe(Flow.Subscriber<? super String> subscriber) {System.out.println("第一步:绑定订阅者" );MySubscription<String> subscription = new MySubscription<>(subscriber,this);this.subscription = subscription;subscriber.onSubscribe(subscription);}}class MySubscriber implements Flow.Subscriber<String>{private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("第二步:接收Subscription" );this.subscription = subscription;// 请求订阅者处理的元素数量subscription.request(1);}@Overridepublic void onNext(String item) {System.out.println("第四步:推送数据" );System.out.println("MySubscriber 消费了item = " + item);subscription.request(1);}@Overridepublic void onError(Throwable throwable) {System.out.println("出异常了 = " + throwable);}@Overridepublic void onComplete() {}}class MySubscription<T> implements Flow.Subscription{final Flow.Subscriber<? super T> subscriber;final MyPublisher publisher;List items = new ArrayList();public MySubscription(Flow.Subscriber<? super T> subscriber, MyPublisher publisher) {this.subscriber = subscriber;this.publisher = publisher;}@Overridepublic void request(long n) {this.publisher.request++;System.out.println("第三步:拉取请求" );}@Overridepublic void cancel() {}
}
public class FlowDemo {public static void main(String[] args) {MyPublisher myPublisher = new MyPublisher();MySubscriber mySubscriber = new MySubscriber();myPublisher.subscribe(mySubscriber);myPublisher.publish("111");myPublisher.publish("222");myPublisher.publish(null);}
}

4.Jdk实现Reactive Streams使用示例

class SimplePublisher implements Flow.Publisher<Integer> {private final SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();public void publishItems() {for (int i = 1; i <= 5; i++) {publisher.submit(i);}// 发布者完成发布publisher.close();}@Overridepublic void subscribe(Flow.Subscriber<? super Integer> subscriber) {publisher.subscribe(subscriber);}
}class SimpleSubscriber implements Flow.Subscriber<Integer> {private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {this.subscription = subscription;// 请求订阅者处理的元素数量subscription.request(1);}@Overridepublic void onNext(Integer item) {System.out.println("Received item: " + item);// 处理完一个元素后请求下一个subscription.request(1);}@Overridepublic void onError(Throwable throwable) {System.err.println("Error occurred: " + throwable.getMessage());}@Overridepublic void onComplete() {System.out.println("Processing completed.");}
}public class ReactiveStreamsExample {public static void main(String[] args) throws InterruptedException {// 创建发布者和订阅者SimplePublisher simplePublisher = new SimplePublisher();SimpleSubscriber simpleSubscriber = new SimpleSubscriber();// 订阅者订阅发布者simplePublisher.subscribe(simpleSubscriber);// 发布者发布数据simplePublisher.publishItems();// 睡一觉,确保数据处理完成Thread.sleep(3000);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/608447.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

3D人体姿态估计

3D人体姿态估计是指通过算法对输入的图像或视频进行分析&#xff0c;推断出人体的三维姿态信息。该技术可以应用于许多领域&#xff0c;如虚拟现实、运动分析、人机交互等。 1. 算法原理&#xff1a; 3D人体姿态估计利用深度学习模型作为算法的核心&#xff0c;通过网络学习人…

html js加载本地文件报错处理,跨域问题

这个问题是怎么来的&#xff1f;我写了一个本地html文件&#xff0c;里面通过three.js加载并显示一个本地三维模型&#xff0c;结果报错了。 报错如下&#xff1a; Access to XMLHttpRequest at file:///C:/model/quater.mtl from origin null has been blocked by CORS poli…

是面试官放水,还是公司实在是太缺人?这都没挂,字节原来这么容易进....

“字节是大企业&#xff0c;是不是很难进去啊&#xff1f;” “在字节做软件测试&#xff0c;能得到很好的发展吗&#xff1f; 一进去就有11.5K&#xff0c;其实也没有想的那么难” 直到现在&#xff0c;心情都还是无比激动&#xff01; 本人211非科班&#xff0c;之前在字节和…

uni-app发版及分包要求

uni-app发版及分包要求 发版 注意&#xff0c;小程序的接口不允许http&#xff0c;只支持https。仅仅是https还不够&#xff0c;正式版和体验版上的接口功能实现还需要将接口地址添加到开发管理——开发设置——服务器域名——request合法域名中去。否则&#xff0c;手机预览…

Spark---RDD(双值类型转换算子)

文章目录 1.RDD双值类型算子1.1 intersection1.2 union1.3 subtract1.4 zip 1.RDD双值类型算子 RDD双Value算子就是对两个RDD进行操作或行动&#xff0c;生成一个新的RDD。 1.1 intersection 对源 RDD 和参数 RDD 求交集后返回一个新的 RDD 函数定义&#xff1a; def inters…

解读 Sobit v2:铭文资产跨链更注重安全、易用性

铭文市场的发展正在从早期的“无序”进入到“有序”阶段&#xff0c;我们看到从 12 月份以来&#xff0c;比特币生态内的多个应用纷纷宣布获得融资。这表明&#xff0c;目前仍旧有大量的资金有意向铭文领域&#xff0c;同样铭文赛道新一轮浪潮或许正在酝酿。 另一方面&#xff…

【设计模式-01】Singleton单利模式

一、方式1(最常用&#xff0c;推荐使用) 单例实现方式一: 饿汉式 类加载到内存后&#xff0c;就实例化一个单例&#xff0c;JVM保证线程安全 简单实用&#xff0c;推荐使用。 唯一缺点: 不管用到与否&#xff0c;类装载时就完成加载。 /*** description: 单例实现方式一: 饿汉…

Java 求2个整数,3个整数 的 10等分比例值

10等份取整比 比如 1.5 &#xff1a; 4 &#xff1a; 4.5 会变成 1&#xff1a;4&#xff1a;5 &#xff0c;当然小数后一位的四舍五入是向上还是向下去整&#xff0c;这个根据自己需要调整即可。 代码 &#xff1a; public static Integer getIntTenPerNum(Integer nu…

YOLOv8改进 | Neck篇 | 利用ASF-YOLO改进特征融合层(适用于分割和目标检测)

一、本文介绍 本文给大家带来的改进机制是ASF-YOLO(发布于2023.12月份的最新机制),其是特别设计用于细胞实例分割。这个模型通过结合空间和尺度特征,提高了在处理细胞图像时的准确性和速度。在实验中,ASF-YOLO在2018年数据科学竞赛数据集上取得了卓越的分割准确性和速度,…

Java项目:115SSM宿舍管理系统

博主主页&#xff1a;Java旅途 简介&#xff1a;分享计算机知识、学习路线、系统源码及教程 文末获取源码 一、项目介绍 宿舍管理系统基于SpringSpringMVCMybatis开发&#xff0c;系统主要功能如下&#xff1a; 学生管理班级管理宿舍管理卫生管理维修登记访客管理 二、技术框…

网络安全新形势下的动态防御体系研究(上)

文章目录 前言一、网络安全的趋势二、网络安全背景&#xff08;一&#xff09;整体形势对网络安全防护提出新挑战&#xff08;二&#xff09;发展对网络安全防护提出新目标 三、网络安全现状分析&#xff08;一&#xff09;国外网络安全现状分析&#xff08;二&#xff09;国内…

短视频实景直播源码+短视频矩阵+多平台分发技术搭建

建立一个短视频实景直播平台&#xff0c;需要以下几个关键组成部分&#xff1a; 短视频实景直播源码&#xff1a;需要开发或购买适用于短视频实景直播的源码。这个源码可以包括实时视频流的采集和传输、直播界面的展示、弹幕功能、礼物打赏等特色功能。可以使用常见的开发框架如…

【教程】代码混淆详解

【教程】代码混淆详解 本文将对代码混淆进行详细解释&#xff0c;并介绍ProGuard代码混淆器以及Ipa Guard工具的使用方法。首先&#xff0c;我们将了解代码混淆的概念和作用&#xff0c;然后深入讨论ProGuard混淆文件的参数设置以及代码混淆的方法。接着&#xff0c;我们将介绍…

解决spring-session-data-redis包redis的session失效时间设置失败问题

这个属于是本人问题&#xff0c;小脑萎缩了 我使用了 EnableRedisHttpSession 这个注解 经过查询这个注解是需要过期时间的 EnableRedisHttpSession(maxInactiveIntervalInSeconds 3600,redisNamespace "tl") 像这样 可以在参数中设置过期时间&#xff0c;只要你…

Java_Swing程序设计

swing组件允许编程人员在跨平台时指定统一的外观和风格。 Swing组件通常被称为轻量级组件&#xff0c; JFrame在程序中的语法格式&#xff1a; JFrame jfnew JFrame(title); Container containerjf.getContentPane(); jf:JFrame类的对象 container:Container类的对象。 J…

腾讯云优惠券怎么获取(腾讯云优惠券在哪领取)

随着云计算技术的快速发展&#xff0c;越来越多的企业开始选择使用云服务来降低成本、提高效率。腾讯云作为国内领先的云服务提供商之一&#xff0c;也提供了丰富的优惠券政策来吸引更多的用户。本文将介绍如何获取腾讯云的优惠券&#xff0c;以及如何使用这些优惠券来获得更好…

基于SpringBoot的康复中心管理系统 JAVA简易版

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 普通用户模块2.2 护工模块2.3 管理员模块 三、系统展示四、核心代码4.1 查询康复护理4.2 新增康复训练4.3 查询房间4.4 查询来访4.5 新增用药 五、免责说明 一、摘要 1.1 项目介绍 基于JAVAVueSpringBootMySQL的康复中…

A借助AI工具提升电子邮件营销内容效果

随着互联网的普及和电子邮件的广泛应用&#xff0c;邮件营销已成为企业推广产品和服务的重要手段之一。为了提高邮件营销的效果&#xff0c;我们需要关注邮件内容的质量和吸引力。而百度文言一心等AI工具作为一款强大的在线写作工具&#xff0c;可以帮助我们提升邮件营销内容的…

MySql01:初识

1.mysql数据库2.配置环境变量3. 列的类型和属性&#xff0c;索引&#xff0c;注释3.1 类型3.2 属性3.3 主键(主键索引)3.4 注释 4.结构化查询语句分类&#xff1a;5.列类型--表列类型设置 1.mysql数据库 数据库&#xff1a; ​ 数据仓库&#xff0c;存储数据&#xff0c;以前我…

重置 Docker 中 Gitlab 的账号密码

1、首先进入Docker容器 docker exec -it gitlab bash 2、连接到 gitlab 的数据库 需要谨慎操作 gitlab-rails console -e production 等待加载完后会进入控制台 ------------------------------------------------------------------------------------------------------…