java的反应式流

Java的反应式流是一种新的编程模型,它在异步和事件驱动的环境下工作。反应式流的目的是为了解决传统的单线程或者多线程编程模型在高并发和大流量情况下的性能瓶颈。

反应式流的核心是Observable和Observer,Observable表示一个数据流,而Observer则表示这个数据流的消费者。Observable在数据流上产生事件,而Observer则对这些事件进行响应。反应式流的数据流是一种推式的流,Observable发布事件时不需要等待Observer接收,Observable会把事件推送给Observer,而不是Observer去轮询Observable。

Java的反应式流通常基于Reactor或RxJava等库,这些库提供了丰富的函数式编程API和运算符,可以非常方便地处理异步事件。这些库都提供了类似于Observable和Observer的抽象概念,可以用来描述和处理异步数据流。同时还提供了常用的运算符,包括map、filter、reduce等,这些运算符可以方便地对数据流进行变换和过滤。

反应式流还有一个重要的概念是背压(backpressure),它是指在高并发和大流量情况下,消费者无法处理生产者产生的数据流,导致数据积压的情况。为了解决这个问题,反应式流引入了背压机制,生产者会在发送数据前先询问消费者的处理能力,如果消费者没有处理能力,生产者会等待一段时间或者缓存数据,等待消费者处理完数据后再继续发送。

反应式流已经被广泛应用于大规模的互联网应用中,包括机器学习、数据分析、网络爬虫等领域。它的优点在于处理高并发和大流量的数据流时,能够更加高效地利用系统资源,提高系统的性能和可扩展性。

总之,反应式流是Java编程中的一个重要概念,它可以帮助我们更好地处理异步和事件驱动的数据流,提高系统的性能和可扩展性。

不涉及任何库,就单纯用java的反应式流,完成发布订阅者模式:

package com.example.jdk9.react;import java.util.concurrent.Flow.*;public class PublisherSubscriberDemo {public static void main(String[] args) {SimplePublisher<String> publisher = new SimplePublisher<>();SimpleSubscriber<String> subscriber1 = new SimpleSubscriber<>();SimpleSubscriber<String> subscriber2 = new SimpleSubscriber<>();publisher.subscribe(subscriber1);publisher.subscribe(subscriber2);publisher.submit("hello");publisher.submit("world");publisher.close();}
}class SimplePublisher<T> implements Publisher<T> {private Subscription subscription;@Overridepublic void subscribe(Subscriber<? super T> subscriber) {subscriber.onSubscribe(new Subscription() {@Overridepublic void request(long n) {}@Overridepublic void cancel() {// nothing to do}});this.subscription = new Subscription() {private boolean cancelled = false;@Overridepublic void request(long n) {// nothing to do}@Overridepublic void cancel() {this.cancelled = true;}public boolean isCancelled() {return this.cancelled;}};subscriber.onSubscribe(this.subscription);}public void submit(T item) {subscriptionLimitedQueue.offer(item);subscription.request(1);}public void close() {while (!subscriptionLimitedQueue.isEmpty()) {subscriptionLimitedQueue.poll();}subscription.cancel();}private SubscriptionLimitedQueue<T> subscriptionLimitedQueue = new SubscriptionLimitedQueue<>(2);static class SubscriptionLimitedQueue<T> {private final int limit;private int size = 0;private Node<T> head;private Node<T> tail;public SubscriptionLimitedQueue(int limit) {this.limit = limit;}private static class Node<T> {final T item;Node<T> next;Node(T item, Node<T> next) {this.item = item;this.next = next;}}public void offer(T item) {Node<T> node = new Node<>(item, null);if (head == null) {head = node;tail = head;} else {tail.next = node;tail = tail.next;}size++;if (size > limit) {Node<T> newHead = head.next;head.next = null;head = newHead;size--;}}public boolean isEmpty() {return size == 0;}public T poll() {if (isEmpty()) {return null;}T item = head.item;Node<T> newHead = head.next;head.next = null;head = newHead;size--;return item;}}
}class SimpleSubscriber<T> implements Subscriber<T> {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {this.subscription = subscription;System.out.println("订阅成功");subscription.request(1);}@Overridepublic void onNext(T item) {System.out.println("Received item: " + item);subscription.request(1);}@Overridepublic void onError(Throwable throwable) {throwable.printStackTrace();}@Overridepublic void onComplete() {System.out.println("Done");}
}

这段代码演示了使用Flow API来发布和订阅消息的过程,它包含以下类和接口:

  1. Publisher<T>:发布者接口,表示能够发布指定类型的消息给订阅者。
  2. Subscriber<T>:订阅者接口,表示能够接收指定类型的消息。
  3. Subscription:订阅接口,表示订阅关系,能够请求一定数量的消息和取消订阅。
  4. SubmissionPublisher<T>:继承自Publisher<T>接口,实现了异步发布消息的能力。
  5. Flow API:一组用于处理数据流和异步操作的接口和类。

具体解释:

  1. SimplePublisher类是一个实现了Publisher接口的简单发布者类,它能够发布指定类型的消息给订阅者。它内部维护了一个SubscriptionLimitedQueue类的对象,用于限制消息队列的长度。
  2. SubscriptionLimitedQueue类是一个维护队列长度的类,用于实现限制消息队列长度的功能。
  3. SimpleSubscriber类是一个实现了Subscriber接口的简单订阅者类,它能够接收指定类型的消息,并将其输出到控制台中。
  4. main方法创建了一个SimplePublisher类的实例和一个SimpleSubscriber类的实例,然后将它们关联起来,最后向SimplePublisher类的实例中发布了两个消息,随后关闭了发布者。

运行结果:

例子:

第一步,引入依赖:

<dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId><version>3.5.11</version></dependency>

第二步,编写代码:

package com.example.jdk9.react;import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;public class ReactiveStreamExample {public static void main(String[] args) {Flux<Integer> stream = Flux.range(1, 10);stream.map(i -> i * 2).filter(i -> i % 3 == 0).flatMap(i -> Mono.just(i).zipWith(Mono.just(i * 3))).subscribe(System.out::println);}
}

上面的代码首先创建了一个从1到10的数字列表,然后通过map操作符将每个元素乘以2,再使用filter操作符过滤掉不能被3整除的元素。接下来,使用flatMap操作符来创建一个新的流,该流将原始元素和该元素乘以3的结果合并在一起。最后,使用subscribe方法来订阅这个流并打印出每个元素的值。

这个例子展示了Reactor库中的一些常见操作符,包括mapfilterflatMap。通过这些操作符的链式调用,我们可以轻松地对数据流进行复杂的操作。在实际的应用中,我们可以根据具体的需求选择不同的操作符来实现所需的数据处理逻辑。

使用Reactor 库实现发布订阅者模式:

package com.example.jdk9.react;import reactor.core.publisher.Flux;public class PublisherSubscriberExample {public static void main(String[] args) {// 创建发布者Flux<Integer> publisher = Flux.just(1, 2, 3, 4, 5);// 订阅者1:打印每个元素publisher.subscribe(System.out::println);// 订阅者2:计算元素的总和并打印publisher.reduce(0, Integer::sum).subscribe(total -> System.out.println("Sum = " + total));}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/134258.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【CSDN 每日一练 ★☆☆】【链表】删除排序链表中的重复元素

【CSDN 每日一练 ★☆☆】【链表】删除排序链表中的重复元素 链表 递归 题目 存在一个按升序排列的链表&#xff0c;给你这个链表的头节点 head &#xff0c;请你删除所有重复的元素&#xff0c;使每个元素 只出现一次 。 返回同样按升序排列的结果链表。 示例 示例 1&am…

ActiveMq学习⑨__基于zookeeper和LevelDB搭建ActiveMQ集群

引入消息中间件后如何保证其高可用&#xff1f; 基于zookeeper和LevelDB搭建ActiveMQ集群。集群仅提供主备方式的高可用集群功能&#xff0c;避免单点故障。 http://activemq.apache.org/masterslave LevelDB&#xff0c;5.6版本之后推出了LecelDB的持久化引擎&#xff0c;它使…

基于Qt QProcess获取linux启动的程序、QScreen 截屏、GIF动画实现

在Linux中,可以使用QProcess类来获取已启动的程序。以下是一个示例代码: #include <QCoreApplication>#include <QProcess>int main(int argc, char *argv[]){QCoreApplication a(argc, argv); // 创建一个QProcess对象 QProcess process; // 设置执行…

kubernetes集群编排——k8s调度

nodename vim nodename.yaml apiVersion: v1 kind: Pod metadata:name: nginxlabels:app: nginxspec:containers:- name: nginximage: nginxnodeName: k8s2 nodeName: k8s2 #找不到节点pod会出现pending&#xff0c;优先级最高 kubectl apply -f nodename.yamlkubectl get pod …

Linux之打印函数调用依赖关系(六十一)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 人生格言&#xff1a; 人生…

linux地址空间

地址空间 内存空间示意图虚拟地址空间虚拟地址进程地址空间生命周期图解为什么要有地址空间呢&#xff1f; 小结 内存空间示意图 进程是在内存中运行的&#xff0c;为了便于管理&#xff0c;不同的数据会存储在不同的区域&#xff0c;因此内存就被分为几部分&#xff0c;如下图…

微型计算机原理1

一、选择题 1.8086CPU的字长是&#xff08;&#xff09;位。 A. 32 B. 128 C. 64 D. 16 2 间接寻址方式中&#xff0c;操作数在(&#xff09;中。 A. 通用寄存器 B. 内存单元 C. 程序计数器 D.堆栈 3.在循环指令LOOP和串操作指令中,用作计数器的寄存器是() A. AX B. BX C. C…

软件测试/测试开发丨如何利用ChatGPT自动生成测试用例思维导图

点此获取更多相关资料 简介 思维导图是一种用图形方式表示思维和概念之间关系的工具&#xff1a; 有些公司会使用思维导图编写测试用例&#xff0c;这样做的优点是&#xff1a; 1.可视化和结构化。 2.易于理解&#xff0c;提高效率。 而 ChatGPT 是无法直接生成 xmind 格式…

Linux--进程间通信

1.进程间通信 进程间通信的背景&#xff1a; 进程之间是相互独立的&#xff0c;进程由内核数据结构和它所对应的代码和数据&#xff0c;通信成本比较高。 进程间通信目的&#xff1a; 数据传输&#xff1a;一个进程需要将它的数据发送给另一个进程 资源共享&#xff1a;多个进程…

[ Linux Busybox ] flash_eraseall 命令解析

文章目录 相关结构体flash_eraseall 函数实现flash_eraseall 实现流程图 文件路径&#xff1a;busybox-1.20.2/miscutils/flash_eraseall.c 相关结构体 MTD 相关信息结构体 struct mtd_info_user {__u8 type; // MTD 设备类型__u32 flags; // MTD设…

14.序列化和文件的输入/输出 保存对象

14.1 保存对象状态 你已经奏出完美的乐章&#xff0c;现在会想把它储存起来。你可以抓个文房四宝把它记下来&#xff0c;但也可以按下储存按钮(或按下File菜单上的Save)。然后你帮文件命名&#xff0c;并希望这个文件不会让屏幕变成蓝色的画面。 储存状态的选择有很多种&…

App备案-iOS云管理式证书 Distribution Managed 公钥及证书SHA-1指纹的获取方法

​ 根据近日工业和信息化部发布的《工业和信息化部关于开展移动互联网应用程序备案工作的通知》&#xff0c;相信不少要进行IOS平台App备案的朋友遇到了一个问题&#xff0c;就是apple不提供云管理式证书的下载&#xff0c;也就无法获取公钥及证书SHA-1指纹。 ​ 已经上架的应用…

aosp定制android系统

目录 AOSP 准备工作(配置) 确定机型和版本 初始化 git安装 curl安装 同步源码 环境变量 创建aosp目录 指定同步版本 解下来安装编译需要的依赖 编译aosp源码 刷入系统 AOSP 全称 Android Open Source Project 是指Android开源项目&#xff0c;它是由Google主导的…

【有源码】基于uniapp的农场管理小程序springboot基于微信小程序的农场检测系统(源码 调试 lw 开题报告ppt)

&#x1f495;&#x1f495;作者&#xff1a;计算机源码社 &#x1f495;&#x1f495;个人简介&#xff1a;本人七年开发经验&#xff0c;擅长Java、Python、PHP、.NET、微信小程序、爬虫、大数据等&#xff0c;大家有这一块的问题可以一起交流&#xff01; &#x1f495;&…

算法:分治法-力扣题最大子数组和

文章目录 概念应用步骤实现过程-快速排序为例具体实现步骤&#xff1a;代码实现&#xff1a; 力扣-2586统计范围内的元音字符题解 概念 分治法是一种算法思想&#xff0c;其核心思想是将一个大问题分割成若干个小问题来解决。通过对小问题的分别计算&#xff0c;最终得到大问题…

SEO是什么?独立站如何进行SEO优化

创建一个独立网站并不是难事&#xff0c;但要做好独立网站并进行SEO优化以增加自然流量可能是一个不小的挑战。今天&#xff0c;我们将分享一些关于独立网站SEO优化的技巧&#xff0c;并详细探讨如何提升流量。 在本文中&#xff0c;我们将主要关注谷歌SEO&#xff0c;但请不要…

2000-2022年上市公司专利申请、创新绩效数据

2000-2022年上市公司专利申请、创新绩效数据 1、时间&#xff1a;2000-2022年 2、指标&#xff1a;年份、股票代码、股票简称、行业名称、行业代码、省份、城市、区县、行政区划代码、城市代码、区县代码、首次上市年份、上市状态、专利申请总量、发明专利申请总量、实用新型…

Linux--vim

文章目录 Vim的介绍Vim的几种模式命令模式下的基本操作批量化注释Vim的简单配置使用插件 Vim的介绍 Vim是一个强大的文本编辑器&#xff0c;是从vi编辑器发展而来的&#xff0c;在vi编辑器的基础上进行了改进和拓展&#xff0c;具有强大的特性和功能。 Vim是一个自由开源软件&…

技术分享 | app自动化测试(Android)--显式等待机制

WebDriverWait类解析 WebDriverWait 用法代码 Python 版本 WebDriverWait( driver,timeout,poll_frequency0.5,ignored_exceptionsNone) 参数解析&#xff1a; driver&#xff1a;WebDriver 实例对象 timeout: 最长等待时间&#xff0c;单位秒 poll_frequency: 检测的间…

MySQL 8.0 Clone Plugin 详解

文章目录 前言1. 克隆插件安装2. 克隆插件的使用2.1 本地克隆2.2 远程克隆 3. 克隆任务监控4. 克隆插件实现4.1 Init 阶段4.2 File Copy4.3 Page Copy4.4 Redo Copy4.5 Done 5. 克隆插件的限制6. 克隆插件与 Xtrabackup 的异同7. 克隆插件相关参数后记 前言 克隆插件&#xff…