JDK-反应流(响应式流)

归档

  • GitHub: JDK-反应流(响应式流)

使用示例

  • https://github.com/zengxf/small-frame-demo/blob/master/multi-thread/reactive-test/reactor-demo/src/main/java/cn/zxf/reactor_demo/jdk/PubSubTest.java

JDK 版本

openjdk version "17" 2021-09-14
OpenJDK Runtime Environment (build 17+35-2724)
OpenJDK 64-Bit Server VM (build 17+35-2724, mixed mode, sharing)

原理

关键类

  • java.util.concurrent.SubmissionPublisher
// 提交式-发布者
public class SubmissionPublisher<T> implements Publisher<T>, AutoCloseable {BufferedSubscription<T> clients;    // 客户端(BufferedSubscriptions)链表final ReentrantLock lock;           // 锁定以排除多个源volatile boolean closed;            // 运行状态,仅在锁内更新boolean subscribed; // 在第一次调用订阅时设置 true,以初始化可能的拥有者Thread owner;       // 第一个要订阅的调用者线程,如果线程发生更改则为 nullvolatile Throwable closedException; // closeExceptionally 中的异常final Executor executor;            // 用于构造 BufferedSubscriptions 的参数final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;final int maxBufferCapacity;// 调用入口 ref: sign_demo_001public SubmissionPublisher() {this(ASYNC_POOL, Flow.defaultBufferSize(), null);   // ref: sign_cm_002}// sign_cm_002public SubmissionPublisher(Executor executor, int maxBufferCapacity,BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler) {... // 参数校验this.lock = new ReentrantLock();this.executor = executor;   // def: ForkJoinPoolthis.onNextHandler = handler;this.maxBufferCapacity = roundCapacity(maxBufferCapacity);  // def: 256}
}

订阅

  • java.util.concurrent.SubmissionPublisher
    // 订阅(添加订阅者)。调用入口 ref: sign_demo_010public void subscribe(Subscriber<? super T> subscriber) {if (subscriber == null) throw new NullPointerException();ReentrantLock lock = this.lock;int max = maxBufferCapacity;// INITIAL_CAPACITY = 32, 默认情况下计算完数组长度为 32Object[] array = new Object[max < INITIAL_CAPACITY ? max : INITIAL_CAPACITY]; // 初始化订阅关系,ref: sign_c_110 | sign_cm_110BufferedSubscription<T> subscription = new BufferedSubscription<T>(subscriber, executor, onNextHandler, array, max);lock.lock();try {if (!subscribed) {subscribed = true;owner = Thread.currentThread();}for (BufferedSubscription<T> b = clients, pred = null;;) {if (b == null) {    // 还没有初始化链Throwable ex;subscription.onSubscribe();if ((ex = closedException) != null)subscription.onError(ex);   // 有异常else if (closed)subscription.onComplete();  // 已关闭else if (pred == null)clients = subscription;     // 初始化链elsepred.next = subscription;   // 加入链break;}BufferedSubscription<T> next = b.next;if (b.isClosed()) {   // removeb.next = null;    // detachif (pred == null)clients = next;elsepred.next = next;}else if (subscriber.equals(b.subscriber)) {// 不能重复添加b.onError(new IllegalStateException("Duplicate subscribe"));break;}elsepred = b;   // 方便后面的加入链b = next;       // 方便遍历链}} finally {lock.unlock();}}
  • java.util.concurrent.SubmissionPublisher.BufferedSubscription
    // sign_c_110 订阅关系static final class BufferedSubscription<T> implements Subscription, ForkJoinPool.ManagedBlocker {long timeout;                      // Long.MAX_VALUE if untimed waitint head;                          // next position to takeint tail;                          // next position to putfinal int maxCapacity;             // max buffer sizevolatile int ctl;                  // atomic run state flagsObject[] array;                    // bufferfinal Subscriber<? super T> subscriber;final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;Executor executor;                 // null on errorBufferedSubscription<T> next;      // 组装链 // sign_cm_110BufferedSubscription(Subscriber<? super T> subscriber,   // 自定义的订阅者Executor executor,      // ForkJoinPoolBiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler, // nullObject[] array,         // len: 32int maxBufferCapacity   // 256) {this.subscriber = subscriber;this.executor = executor;this.onNextHandler = onNextHandler;this.array = array;this.maxCapacity = maxBufferCapacity;}}

提交数据

  • java.util.concurrent.SubmissionPublisher
    // 提交数据。调用入口 ref: sign_demo_020public int submit(T item) {return doOffer(item, Long.MAX_VALUE, null); // ref: sign_m_210}// sign_m_210private int doOffer(T item, long nanos, BiPredicate<Subscriber<? super T>, ? super T> onDrop) {if (item == null) throw new NullPointerException();int lag = 0;boolean complete, unowned;ReentrantLock lock = this.lock;lock.lock();try {Thread t = Thread.currentThread(), o;BufferedSubscription<T> b = clients;if ((unowned = ((o = owner) != t)) && o != null)owner = null;                     // disable biasif (b == null)complete = closed;else {  // 有订阅者才做处理complete = false;boolean cleanMe = false;BufferedSubscription<T> retries = null, rtail = null, next;do {next = b.next;int stat = b.offer(item, unowned);  // 依次发给订阅者句柄,ref: sign_m_220...} while ((b = next) != null);   // 遍历链...}} finally {lock.unlock();}...}
  • java.util.concurrent.SubmissionPublisher.BufferedSubscription
        // sign_m_220 添加到队列final int offer(T item, boolean unowned) {Object[] a;int stat = 0, cap = ((a = array) == null) ? 0 : a.length;int t = tail, i = t & (cap - 1), n = t + 1 - head;if (cap > 0) {boolean added;if (n >= cap && cap < maxCapacity)  // resize (扩容)added = growAndOffer(item, a, t);else if (n >= cap || unowned)       // need volatile CAS (CAS 替换)added = QA.compareAndSet(a, i, null, item);else {                              // can use release mode (设置值)QA.setRelease(a, i, item); added = true;}if (added) {        // 添加成功tail = t + 1;   // 改下标(可循环使用数组)stat = n;}}return startOnOffer(stat);  // 尝试启动,ref: sign_m_221}/*** sign_m_221 尝试在添加后启动消费者任务*/final int startOnOffer(int stat) {int c; // start or keep alive if requests exist and not activeif (((c = ctl) & (REQS | ACTIVE)) == REQS &&((c = getAndBitwiseOrCtl(RUN | ACTIVE)) & (RUN | CLOSED)) == 0)tryStart(); // 尝试启动,ref: sign_m_222...return stat;}// sign_m_222 尝试启动消费者任务final void tryStart() {try {Executor e;ConsumerTask<T> task = new ConsumerTask<T>(this);   // ref: sign_c_230if ((e = executor) != null)     // skip if disabled on errore.execute(task);            // 执行体,ref: sign_c_230} ... // catch}// sign_m_225 消费final void consume() {Subscriber<? super T> s;if ((s = subscriber) != null) {          // hoist checkssubscribeOnOpen(s); // 没开启,则开启并回调 Subscriber #onSubscribe() 方法,ref: sign_demo_110long d = demand;for (int h = head, t = tail;;) {int c, taken; boolean empty;if (((c = ctl) & ERROR) != 0) {closeOnError(s, null);      // 有异常,回调 Subscriber #onError() 方法,ref: sign_demo_130break;}else if ((taken = takeItems(s, d, h)) > 0) {    // 获取队列元素并处理,ref: sign_m_226head = h += taken;d = subtractDemand(taken);}...else if (t == (t = tail)) {      // stability checkif ((empty = (t == h)) && (c & COMPLETE) != 0) {closeOnComplete(s);      // 已完成,回调 Subscriber #onComplete() 方法,ref: sign_demo_140break;}...}}}}// sign_m_226 获取队列元素并处理final int takeItems(Subscriber<? super T> s, long d, int h) {Object[] a;int k = 0, cap;if ((a = array) != null && (cap = a.length) > 0) {int m = cap - 1, b = (m >>> 3) + 1;int n = (d < (long)b) ? (int)d : b;for (; k < n; ++h, ++k) {Object x = QA.getAndSet(a, h & m, null); // 获取元素...else if (!consumeNext(s, x)) // 通知订阅者,回调 Subscriber #onNext() 方法,ref: sign_demo_120break;}}return k;}
  • java.util.concurrent.SubmissionPublisher.ConsumerTask
    // sign_c_230 消费任务(通知订阅者)static final class ConsumerTask<T> extends ForkJoinTask<Void>implements Runnable, CompletableFuture.AsynchronousCompletionTask {final BufferedSubscription<T> consumer;ConsumerTask(BufferedSubscription<T> consumer) {this.consumer = consumer;}... // 其他方法// sign_c_230 执行体public final void run() { consumer.consume(); } // 消费,ref: sign_m_225}

关闭

  • 简单,略

背压

  • 看代码或调试时,没发现 publisher 暂停的代码,可用 JConsole 查看线程栈
...
java.base@17/jdk.internal.misc.Unsafe.park(Native Method)
... .locks.LockSupport.park(LockSupport.java:211)
... .SubmissionPublisher$BufferedSubscription.block(SubmissionPublisher.java:1495) // ref: sign_m_321
... .ForkJoinPool.unmanagedBlock(ForkJoinPool.java:3463)
... .ForkJoinPool.managedBlock(ForkJoinPool.java:3434)
... .SubmissionPublisher$BufferedSubscription.awaitSpace(SubmissionPublisher.java:1462) // ref: sign_m_320
...
  • java.util.concurrent.SubmissionPublisher
    // 提交数据。调用入口 ref: sign_demo_020private int doOffer(T item, long nanos, BiPredicate<Subscriber<? super T>, ? super T> onDrop) {int lag = 0;...try {Thread t = Thread.currentThread(), o;if ((unowned = ((o = owner) != t)) && o != null)...if (retries != null || cleanMe)lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);   // 背压处理,ref: sign_m_310} ... // finally...}// sign_m_310 背压处理private int retryOffer(T item, long nanos,BiPredicate<Subscriber<? super T>, ? super T> onDrop,BufferedSubscription<T> retries, int lag,boolean cleanMe) {for (BufferedSubscription<T> r = retries; r != null;) {BufferedSubscription<T> nextRetry = r.nextRetry;r.nextRetry = null;if (nanos > 0L)r.awaitSpace(nanos);    // 等待,ref: sign_m_320...}...return lag;}
  • java.util.concurrent.SubmissionPublisher.BufferedSubscription
    static final class BufferedSubscription<T> implements Subscription, ForkJoinPool.ManagedBlocker {// sign_m_320 帮助或阻止直到超时、关闭或空间可用final void awaitSpace(long nanos) {if (!isReleasable()) {ForkJoinPool.helpAsyncBlocker(executor, this);if (!isReleasable()) {timeout = nanos;try {ForkJoinPool.managedBlock(this);    // 最终会调用 block() 进行阻塞,ref: sign_m_321} ... // catch}}}// sign_m_321 阻塞实现 (实现 ManagedBlocker 方法)@Overridepublic final boolean block() {...while (!isReleasable()) {...else if (waiter == null)waiter = Thread.currentThread();    // 记录当前线程...elseLockSupport.park(this); // 阻塞}...}// 在订阅者获取元素时,进行通知,继 sign_m_226final int takeItems(Subscriber<? super T> s, long d, int h) {...if (waiting != 0)signalWaiter(); // 通知发布者,ref: sign_m_325...}// sign_m_325 通知发布者final void signalWaiter() {Thread w;waiting = 0;if ((w = waiter) != null)LockSupport.unpark(w);  // 唤醒发布者线程}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/42025.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

自动化鼠标和键盘操作的软件,KeymouseGo是一款功能强大、易于使用的自动化工具,它能够帮助我们从重复性工作中解放出来.

重复性工作常常让我们的双手疲惫不堪。幸运的是&#xff0c;技术的进步为我们提供了自动化解决方案。今天&#xff0c;我们将探讨一款名为KeymouseGo的开源免费软件&#xff0c;它能够记录并自动执行鼠标和键盘操作&#xff0c;从而帮助我们从单调重复的任务中解放出来。 软件…

贪心算法-以学籍管理系统为例

1.贪心算法介绍 1.算法思路 贪心算法的基本思路是从问题的某一个初始解出发一步一步地进行&#xff0c;根据某个优化测度&#xff0c;每一 步都要确保能获得局部最优解。每一步只考虑一 个数据&#xff0c;其选取应该满足局部优化的条件。若下 一个数据和部分最优解连在一起…

单选多选提交问卷,代码示例

&#xff45;&#xff4c;&#xff45;&#xff4d;&#xff45;&#xff4e;&#xff54;中 需要对接口返回的数据进行分析。多选问题使用checkbox&#xff0c;单选题使用radio。 多选时可以绑定&#xff4d;&#xff49;&#xff4e;&#xff0f;&#xff4d;&#xff41;&am…

CS61B Data Structure-Jonathan Lecture2 using objects - OBJECTS METHODS

Recall String s1; // Step 1: declare a String variable s1 new String(); // Step 2: assign it a value, a new empty string objectString s2 new String(); // 1&2 combined今日知识点 situation: pointing to the same object s1 "Yow!";s2 s1; //…

onclick和@click有什么区别,究竟哪个更好使?

哈喽小伙伴们大家好,我是爱学英语的程序员,今天来给大家分享一些关于vue中事件绑定相关的内容,希望对大家有所帮助. 场景是这样的:我要实现一个切换栏,默认激活的是第一个标签,当鼠标移动到第二个标签是,对应的内容让激活.起初,我第一时间想到的是用element plus的组件来实现这…

[leetcode hot 150]第一百一十七题,填充每个节点的下一个右侧节点

题目&#xff1a; 给定一个二叉树&#xff1a; struct Node {int val;Node *left;Node *right;Node *next; } 填充它的每个 next 指针&#xff0c;让这个指针指向其下一个右侧节点。如果找不到下一个右侧节点&#xff0c;则将 next 指针设置为 NULL 。 初始状态下&#x…

NVIDIA的vGPU技术或AMD的MxGPU技术

目录 将物理GPU资源切分为多个虚拟GPU(vGPU) 实现步骤 技术示例 优点与挑战 结论 NVIDIA的vGPU技术或AMD的MxGPU技术 NVIDIA的vGPU技术 AMD的MxGPU技术 将物理GPU资源切分为多个虚拟GPU(vGPU) 将物理GPU资源切分为多个虚拟GPU(vGPU)主要依赖于GPU虚拟化技术。这种…

pytorch LLM训练过程中的精度调试实践

pytorch LLM训练过程中的精度调试实践 1.查看权值的最大,最小值2.检测训练过程中的异常值A.通过hook module,检测异常值B.拦截算子,检测异常值,打印调用栈,保存输入参数,方便复现C.拦截算子,同时执行cpu计算,对比误差,找到第一个精度异常的算子D.以上的代码 3.根据上面dump的数…

dreamerV3 控制人形机器人行走举例

DreamerV3模型 DreamerV3 是一种先进的强化学习算法,它结合了模型预测控制(MPC)和深度学习,能够在复杂环境中实现高效的学习和控制。DreamerV3 通过构建环境的动态模型并使用该模型进行多步预测和优化,来学习复杂任务如人形机器人行走。 DreamerV3 原理简介 DreamerV3 …

flutter背景贴图的困难总结

需求&#xff1a;一张前景图&#xff0c;一张背景图。背景图可以放大缩小&#xff0c;可以平移。 复盘一下整个烦闷之旅。 困难一&#xff0c;保存成文件。 遇到了几个十分难受的问题。 现在回看是很简单&#xff0c;代码也没几行&#xff0c;可中间的思考过程是十分痛苦的&a…

FPGA_HDLBits:2.2Vectors2.3ModulesHierarchy

FPGA_HDLBits:2.2Vectors&2.3ModulesHierarchy 说明:仅对自己做的HDL Bits中的2.2-2.3章节题目的错误部分做的记录&#xff0c;正确的也就没有记录&#xff0c;可以理解为个人的错题本 对于reg [15:0]input input[0:7]是调用低位而不是取最高位&#xff0c;而且调的是最低…

SpringSecurity6.x使用教程

SpringSecurity6.x使用 SpringSecurity版本 SpringSecurity目前支持的版本如下图所示&#xff0c;可以看到5.x的版本过几年就不会再维护了&#xff0c;6.x将成为主流。 入门 引入依赖 <dependency><groupId>org.springframework.boot</groupId><arti…

CMS Made Simple v2.2.15 远程命令执行漏洞(CVE-2022-23906)

前言 CVE-2022-23906 是一个远程命令执行&#xff08;RCE&#xff09;漏洞&#xff0c;存在于 CMS Made Simple v2.2.15 中。该漏洞通过上传头像功能进行利用&#xff0c;攻击者可以上传一个经过特殊构造的图片文件来触发漏洞。 漏洞详情 CMS Made Simple v2.2.15 中的头像上…

【C++/STL】优先级队列的介绍与模拟实现仿函数

✨ 万物与我皆是自由诗 &#x1f30f; &#x1f4c3;个人主页&#xff1a;island1314 &#x1f525;个人专栏&#xff1a;C学习 &#x1f680; 欢迎关注&#xff1a;&#x1f44d;点赞 &#x1f442;&#x1…

关于string的‘\0‘与string,vector构造特点加部分特别知识点的讨论

目录 前言&#xff1a; 问题一&#xff1a;关于string的\0问题讨论 问题二&#xff1a;C标准库中的string内存是分配在堆上面吗&#xff1f; 问题三&#xff1a;string与vector的capacity大小设计的特点 问题四&#xff1a;string的流提取问题 问题五&#xff1a;迭代器失…

unity 使用UnityWebRequest从服务器下载

IEnumerator WinFile(string url){//连接urlusing(UnityWebRequest uwr UnityWebRequest.Get(url)){//等待下载yield return uwr.SendWebRequest();//判断是否连接失败以及是否返回一个错误状态码if (uwr.result UnityWebRequest.Result.ConnectionError || uwr.result Unit…

04.ffmpeg打印音视频媒体信息

目录 1、相关头文件 2、相关结构体 3、相关函数 4、函数详解 5、源码附上 1、相关头文件 #include <libavformat/avformat.h> 包含格式相关的函数和数据结构 #include <libavutil/avutil.h> 包含一些通用实用函数 2、相关结构体 AV…

【PWN · ret2syscall | GoPwn】[2024CISCN · 华中赛区]go_note

一道GoPwn&#xff0c;此外便是ret2syscall的利用。然而过程有不小的曲折&#xff0c;参考 返璞归真 师傅的wp&#xff0c;堪堪完成了复现。复现过程中&#xff0c;师傅也灰常热情回答我菜菜的疑问&#xff0c;感谢&#xff01;2024全国大学生信息安全竞赛&#xff08;ciscn&am…

RabbitMQ快速入门 - 图像化界面的简单操作

目录 1、RabbitMQ的安装 2、RabbitMQ基本介绍 3、简单案例 4、数据隔离 1、RabbitMQ的安装 官网链接&#xff1a;rabbitmq官网 &#xff08;官网很详细&#xff0c;也可以在官网学习啦~&#xff09; 基础入门&#xff1a;自主学习&#xff1a;最新版本&#xff1a;安装我…

缓存-缓存的使用与基本详解

1.缓存使用 为了系统性能的提升&#xff0c;我们一般都会将部分数据放入缓存中&#xff0c;加速访问。而db承担数据落盘工作。 哪些数据适合放入缓存&#xff1f; 即时性、数据一致性要求不高的访问量大且更新频率不高的数据&#xff08;读多&#xff0c;写少&#xff09; …