Java 9 响应式流(Reactive Streams)

前言

首先出现响应式编程理念,然后出现响应式编程实现,再然后出现响应式规范,响应流主要解决处理元素流的问题—如何将元素流从发布者传递到订阅者,不而不需要发布者阻塞,或者要求订阅者有无限的缓冲区,有限的缓冲区在到达缓冲上界的时候,对到达的元素进行丢弃或者拒绝,订阅者可以异步通知发布者降低或提升数据生产发布的速率,它是响应式编程实现效果的核心特点。

而响应式规范则是一种倡议,遵循此倡议的系统可以让数据在各个响应式系统中都实现响应式的处理数据,规范在Java中的形式就是接口,也就是我们本篇的主题Flow 类,对于一项标准而言,它的目的自然是用更少的协议来描述交互。

Java 9 的Flow统一了规范

响应式流从2013年开始,作为提供非阻塞背压的异步流处理标准的倡议。 它旨在解决处理元素流的问题——如何将元素流从发布者传递到订阅者,而不需要发布者阻塞,或订阅者有无限制的缓冲区或丢弃。

响应式流模型非常简单——订阅者向发布者发送多个元素的异步请求。 发布者向订阅者异步发送多个或稍少的元素。

在2015年,出版了用于处理响应式流的规范和Java API。 有关响应式流的更多信息,请访问http://www.reactive-streams.org/。 Java API 的响应式流只包含四个接口:

Publisher<T>
Subscriber<T>
Subscription
Processor<T,R>

发布者(publisher)是潜在无限数量的有序元素的生产者。 它根据收到的要求向当前订阅者发布(或发送)元素。

订阅者(subscriber)从发布者那里订阅并接收元素。 发布者向订阅者发送订阅令牌(subscription token)。 使用订阅令牌,订阅者从发布者哪里请求多个元素。 当元素准备就绪时,发布者向订阅者发送多个或更少的元素。 订阅者可以请求更多的元素。 发布者可能有多个来自订阅者的元素待处理请求。

订阅(subscription)表示订阅者订阅的一个发布者的令牌。 当订阅请求成功时,发布者将其传递给订阅者。 订阅者使用订阅令牌与发布者进行交互,例如请求更多的元素或取消订阅。

处理者(processor)充当订阅者和发布者的处理阶段。 Processor接口继承了PublisherSubscriber接口。 它用于转换发布者——订阅者管道中的元素。 Processor<T,R>订阅类型T的数据元素,接收并转换为类型R的数据,并发布变换后的数据。 

JDK 9在java.util.concurrent包中提供了一个与响应式流兼容的API,它在java.base模块中。 API由两个类组成:

Flow
SubmissionPublisher<T>

Flow类是final的。 它封装了响应式流Java API和静态方法。 由响应式流Java API指定的四个接口作为嵌套静态接口包含在Flow类中:

Flow.Processor<T,R>
Flow.Publisher<T>
Flow.Subscriber<T>
Flow.Subscription

这四个接口包含与上面代码所示的相同的方法。 Flow类包含defaultBufferSize()静态方法,它返回发布者和订阅者使用的缓冲区的默认大小。 目前,它返回256。

SubmissionPublisher<T>类是Flow.Publisher<T>接口的实现类。 该类实现了AutoCloseable接口,因此可以使用try-with-resources块来管理其实例。 JDK 9不提供Flow.Subscriber<T>接口的实现类; 需要自己实现。 但是,SubmissionPublisher<T>类包含可用于处理此发布者发布的所有元素的consume(Consumer<? super T> consumer)方法。

简单的案例

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Consumer;/*** 使用java9 Flow中的API实现响应式编程*/
public class Reactor03 {static class SampleSubScriber<T> implements Flow.Subscriber<T> {final Consumer<? super T> consumer;Flow.Subscription subscription;SampleSubScriber(Consumer<? super T> consumer) {this.consumer = consumer;}@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("建立订阅关系");this.subscription = subscription;// 订阅者只处理两条信息subscription.request(2);}@Overridepublic void onNext(T item) {System.out.println("收到发送者的消息"+ item);consumer.accept(item);// 消费过后,还可以继续要求发布者发送消息// subscription.request(2);// 如果不要求发布者发送消息,由于onSubscribe()设置了只处理2条消息,那边发布者就不会继续发布第三条了。}@Overridepublic void onError(Throwable throwable) {throwable.printStackTrace();}@Overridepublic void onComplete() {}}public static void main(String[] args) {SampleSubScriber subScriber = new SampleSubScriber<>(msg -> System.out.println("hello ...... " + msg));ExecutorService executorService = Executors.newFixedThreadPool(1);SubmissionPublisher<Boolean> submissionPublisher = new SubmissionPublisher<>(executorService, Flow.defaultBufferSize());submissionPublisher.subscribe(subScriber);submissionPublisher.submit(true);submissionPublisher.submit(true);submissionPublisher.submit(true);executorService.shutdown();}
}

感谢下述博文的作者:

参考资料:

https://www.cnblogs.com/IcanFixIt/p/7245377.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/693355.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

GitCode配置ssh

下载SSH windows设置里选“应用” 选“可选功能” 添加功能 安装这个 坐等安装&#xff0c;安装好后可以关闭设置。 运行 打开cmd 执行如下指令&#xff0c;启动SSH服务。 net start sshd设置开机自启动 把OpenSSH服务添加到Windows自启动服务中&#xff0c;可避免每…

java中实体pojo对于布尔类型属性命名尽量别以is开头,否则 fastjson可能会导致属性读取不到

假如我们有一个场景&#xff0c;就是需要将一个对象以字符串的形式&#xff0c;也就是jsonString存到一个地方&#xff0c;比如mysql&#xff0c;或者redis的String结构。现在有一个实体&#xff0c;我们自己创建的&#xff0c;叫做CusPojo.java 有两个属性是布尔类型的&#x…

STM32引脚重定义问题

最近在搞资源管理&#xff0c;发现有些引脚不能用 比如这个PE引脚。我想用他输出PWM&#xff0c;但是不能用&#xff0c;我也重定义了&#xff0c;还是不能用。回去翻看了技术手册。 RCC_APB2PeriphClockCmd(RCC_APB2Periph_AFIO, ENABLE); //重映射引脚功能&#xff0c;需…

C语言中各种运算符用法

C语言中有许多不同的运算符&#xff0c;用于执行各种不同的操作。 以下是C语言中常见的运算符及其用法&#xff1a; 算术运算符&#xff1a; 加法运算符&#xff08;&#xff09;&#xff1a;用于将两个值相加。减法运算符&#xff08;-&#xff09;&#xff1a;用于将一个值减…

NoSQL 数据库管理工具,搭载强大支持:Redis、Memcached、SSDB、LevelDB、RocksDB,为您的数据存储提供无与伦比的灵活性与性能!

NoSQL 数据库管理工具&#xff0c;搭载强大支持&#xff1a;Redis、Memcached、SSDB、LevelDB、RocksDB&#xff0c;为您的数据存储提供无与伦比的灵活性与性能&#xff01; 【官网地址】&#xff1a;http://www.redisant.cn/nosql 介绍 直观的用户界面 从单一应用程序中同…

String 必知必会底层逻辑

String 是不可变的 String 类中使用 final 关键字修饰字符数组来保存字符串 public final class String implements java.io.Serializable, Comparable<String>, CharSequence {private final char value[];//... }final关键字的作用&#xff1a; 不可变性&#xff1a…

openai DALL-E 3 论文 提升图像生成的关键:更好的图像描述

摘要 我们展示了通过训练高度描述性的生成图像标题&#xff0c;可以显着改善文本到图像模型的提示跟随能力。 现有的文本到图像模型在跟随详细的图像描述方面存在困难&#xff0c;经常忽略单词或混淆提示的含义。 我们假设这个问题源于训练数据集中存在嘈杂和不准确的图像标…

阅读笔记(ECCV2020)Content-Aware Unsupervised Deep Homography Estimation

Zhang J, Wang C, Liu S, et al. Content-aware unsupervised deep homography estimation[C]//Computer Vision–ECCV 2020: 16th European Conference, Glasgow, UK, August 23–28, 2020, Proceedings, Part I 16. Springer International Publishing, 2020: 653-669. Part…

人工智能|深度学习——基于对抗网络的室内定位系统

代码下载&#xff1a; 基于CSI的工业互联网深度学习定位.zip资源-CSDN文库 摘要 室内定位技术是工业互联网相关技术的关键一环。该技术旨在解决于室外定位且取得良好效果的GPS由于建筑物阻挡无法应用于室内的问题。实现室内定位技术&#xff0c;能够在真实工业场景下实时追踪和…

w29pikachu-ssrf实例

实验环境 php&#xff1a;7.3.4nts apache&#xff1a;2.4.39 浏览器&#xff1a;谷歌实验步骤 ssrf&#xff08;curl&#xff09; 打开ssrf(curl) 点击文字&#xff0c;跳转404页面&#xff0c;从反馈信息来看是找不到对应的页面。 查看源码&#xff0c;发现有个RD变量影…

【Flink网络通讯(一)】Flink RPC框架的整体设计

文章目录 1. Akka基本概念与Actor模型2. Akka相关demo2.1. 创建Akka系统2.2. 根据path获取Actor并与之通讯 3. Flink RPC框架与Akka的关系4.运行时RPC整体架构设计5. RpcEndpoint的设计与实现 我们从整体的角度看一下Flink RPC通信框架的设计与实现&#xff0c;了解其底层Akka通…

利用nbsp设置空格

想要实现上面效果&#xff0c;一开始直接<el-col :span"8" >{{ item.name }} </el-col> 或者<el-col :span"8" >{{ item.name }}</el-col>或者<el-col :span"8" >{{ item.name }}</el-col> 都无…

深入浅出JVM(三)之HotSpot虚拟机类加载机制

HotSpot虚拟机类加载机制 类的生命周期 什么叫做类加载? 类加载的定义: JVM把描述类的数据从Class文件加载到内存,并对数据进行校验,解析和初始化,最终变成可以被JVM直接使用的Java类型(因为可以动态产生,这里的Class文件并不是具体存在磁盘中的文件,而是二进制数据流) 一个…

善于利用GPT确实可以解决许多难题

当我设计一个导出Word文档的功能时&#xff0c;我面临了一个挑战。在技术选型时&#xff0c;我选择了poi-tl这个模板引擎&#xff0c;因为在网上看到了很多关于它的推荐。poi-tl可以根据模板快速导出Word文档。虽然之前没有做过类似的功能&#xff0c;而且项目中也没有用过&…

开年喜报!Walrus成功入选CNCF云原生全景图

近日&#xff0c;数澈软件 Seal &#xff08;以下简称“Seal”&#xff09;旗下开源应用管理平台 Walrus 成功入选云原生计算基金会全景图&#xff08;CNCF Landscape&#xff09;并收录至 “App Definition and Development - Application Definition & Image Build”板块…

go语言内存泄漏检查工具

和其它语言一样&#xff0c;go语言也提供了一些内存泄漏分析的工具&#xff0c;用来帮助查找和分析内存泄漏问题。有以下一些常用的工具和技术&#xff1a; 1、go tool pprof&#xff1a; Go内置了一个性能分析工具&#xff08;pprof&#xff09;&#xff0c;它可以用于分析内…

Encoder-decoder 与Decoder-only 模型之间的使用区别

承接上文&#xff1a;Transformer Encoder-Decoer 结构回顾 笔者以huggingface T5 transformer 对encoder-decoder 模型进行了简单的回顾。 由于笔者最近使用decoder-only模型时发现&#xff0c;其使用细节和encoder-decoder有着非常大的区别&#xff1b;而huggingface的借口为…

热阻基础理论 --NMOS温度评估

热阻基础理论 器件 温度差 功率 * 热阻 MOS应用实例 1.假如MOS管悬挂或者外壳贴到散热器上&#xff0c;就意味着用CASE到空气的散热热阻会很大&#xff0c; 如下图中的20。 2. 假如MOS管金属面焊接到PCB上&#xff0c;就意味着用CASE到空气的散热热阻会很校&#xff0c; 如…

spark 的group by ,join数据倾斜调优

背景 spark任务中最常见的耗时原因就是数据分布不均匀&#xff0c;从而导致有些task运行时间很长&#xff0c;长尾效应导致的整个job运行耗时很长 数据倾斜调优 首先我们要定位数据倾斜&#xff0c;我们可以通过在spark ui界面中查看某个stage下的task的耗时&#xff0c;如果…

浅谈数仓发展

引言 随着信息化时代的到来&#xff0c;数据成为企业最宝贵的资产之一。为了更好地管理和利用数据&#xff0c;数仓&#xff08;Data Warehouse&#xff09;作为数据管理的核心架构扮演着至关重要的角色。本文将深入探讨数仓的历史、发展以及未来趋势&#xff0c;分析传统数仓和…