spring响应式编程系列:总体流程

目录

示例

程序流程

just

subscribe

new LambdaMonoSubscriber

​​​​​​​MonoJust.subscribe

​​​​​​​new Operators.ScalarSubscription  

​​​​​​​onSubscribe

​​​​​​​request

​​​​​​​onNext

时序图

类图

数据发布者

MonoJust

数据订阅者

LambdaSubscriber

订阅的消息体

ScalarSubscription


       

        想要了解响应式编程的总体流程,只要做到真正吃透一个简单的示例即可。

        如下所示:

示例

        首先,通过调用Mono.just创建一个单元素的数据发布者(Publisher);

        然后,通过调用mono.subscribe订阅数据发布者(Publisher)发布的数据。

        如下所示:

// 创建一个包含数据的 Mono
Mono<String> mono = Mono.just("Hello, Reactive World!");
// 订阅并消费 Mono
mono.subscribe(System.out::println);

程序流程

        点击Mono.just,如下所示:

​​​​​​​just

public static <T> Mono<T> just(T data) {

        return onAssembly(new MonoJust(data));

    }

        在这里,直接new一个MonoJust对象并返回。

        点击示例里的mono.subscribe,如下所示:

subscribe

public abstract class Mono<T> implements CorePublisher<T> {

    ... ...

    public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Context initialContext) {

        return (Disposable)this.subscribeWith(new LambdaMonoSubscriber(consumer, errorConsumer, completeConsumer, (Consumer)null, initialContext));

    }

      在这里,将示例里subscribe的参数作为LambdaMonoSubscriber的构造参数,然后new一个LambdaMonoSubscriber对象。

        LambdaMonoSubscriber对象的初始化参数,如下所示:

​​​​​​​new LambdaMonoSubscriber

final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable {

    final Consumer<? super T> consumer;

    final Consumer<? super Throwable> errorConsumer;

    final Runnable completeConsumer;

    final Consumer<? super Subscription> subscriptionConsumer;

    final Context initialContext;

    volatile Subscription subscription;

    ... ...

    LambdaMonoSubscriber(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer, @Nullable Context initialContext) {

        this.consumer = consumer;

        this.errorConsumer = errorConsumer;

        this.completeConsumer = completeConsumer;

        this.subscriptionConsumer = subscriptionConsumer;

        this.initialContext = initialContext == null ? Context.empty() : initialContext;

    }

​​​​​​​MonoJust.subscribe

final class MonoJust<T> extends Mono<T> implements ScalarCallable<T>, Fuseable, SourceProducer<T> {

    ... ...

public void subscribe(CoreSubscriber<? super T> actual) {

        actual.onSubscribe(Operators.scalarSubscription(actual, this.value));

    }

       在这里,来到了MonoJust对象的subscribe方法,该方法调用了LambdaMonoSubscriber对象的onSubscribe方法;

        同时,new一个Operators.ScalarSubscription对象,该对象封装了LambdaMonoSubscriber对象和数据发布者MonoJust发布的数据。

        如下所示:

​​​​​​​new Operators.ScalarSubscription  

public static <T> Subscription scalarSubscription(CoreSubscriber<? super T> subscriber, T value, String stepName) {

        return new Operators.ScalarSubscription(subscriber, value, stepName);

    }

        点击actual.onSubscribe,如下所示:

​​​​​​​onSubscribe

final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable {

    ... ...

    public final void onSubscribe(Subscription s) {

        if (Operators.validate(this.subscription, s)) {

            this.subscription = s;

            if (this.subscriptionConsumer != null) {

                try {

                    this.subscriptionConsumer.accept(s);

                } catch (Throwable var3) {

                    Exceptions.throwIfFatal(var3);

                    s.cancel();

                    this.onError(var3);

                }

            } else {

                s.request(9223372036854775807L);

            }

        }

    }

      在这里,LambdaMonoSubscriber对象调用了Operators.ScalarSubscription对象的request方法。

        如下所示:

​​​​​​​request

static final class ScalarSubscription<T> implements SynchronousSubscription<T>, InnerProducer<T> {

public void request(long n) {

            if (Operators.validate(n) && ONCE.compareAndSet(this, 0, 1)) {

                Subscriber<? super T> a = this.actual;

                a.onNext(this.value);

                if (this.once != 2) {

                    a.onComplete();

                }

            }

        }

        在这里,Operators.ScalarSubscription对象又调用了LambdaMonoSubscriber对象的onNext方法。

        LambdaMonoSubscriber对象的onNext方法如下所示:

​​​​​​​onNext

final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable {

public final void onNext(T x) {

        Subscription s = (Subscription)S.getAndSet(this, Operators.cancelledSubscription());

        if (s == Operators.cancelledSubscription()) {

            Operators.onNextDropped(x, this.initialContext);

        } else {

            if (this.consumer != null) {

                try {

                    this.consumer.accept(x);

                } catch (Throwable var5) {

                    Exceptions.throwIfFatal(var5);

                    s.cancel();

                    this.doError(var5);

                }

            }

            if (this.completeConsumer != null) {

                try {

                    this.completeConsumer.run();

                } catch (Throwable var4) {

                    Operators.onErrorDropped(var4, this.initialContext);

                }

            }

        }

}

        终于,在这里,调用了示例里subscribe()方法的回调函数了。

时序图

【说明】

  1. Mono和MonoJust是数据发布者,LambdaMonoSubscriber是数据消费者,ScalarSubscription是订阅的消息;
  2. 类的设计还是比较清晰的,就是方法的调用显示有点绕。
  3. 数据发布者,提供了just方法来生成数据发布者(Publisher);
  4. 数据订阅者,提供了onSubscribe和onNext方法来响应订阅事件和读取数据;
  5. 订阅的消息体,封装了数据订阅者和数据发布发布的数据,并且提供了request方法用来处理数据。
  6. 使用了观察者设计模式:LambdaMonoSubscriber是观察者模式中的观察者(Observer),它订阅(subscribe)一个发布者(MonoJust),MonoJust是观察者模式中的主题(Subject),它负责通知所有的 Subscriber。

类图

数据发布者

MonoJust

【说明】

  • Publisher

    定义了接口:void subscribe(Subscriber<? super T> var1)。

  • CorePublisher

    定义了接口:void subscribe(CoreSubscriber<? super T> subscriber)。

  • Mono

    是一个抽象类,实现了数据发布者通用的各种功能。

比如:使用了工厂方法设计模式来创建诸如MonoJust、MonoCreate、MonoDefer、MonoError等各种具体的数据发布者。

  • MonoJust

    是一个特定的数据发布者(Publisher),实现了接口void subscribe(CoreSubscriber<? super T> actual)。

数据订阅者

LambdaSubscriber

【说明】

  • Subscriber

    定义了如下接口:onSubscribe、onNext、onError、onComplete。

  • CoreSubscriber

    定义了如下接口:onSubscribe

  • LambdaMonoSubscriber

    关联了consumer、errorConsumer、completeConsumer、subscriptionConsumer这些对象,以完成订阅相关的各种操作。

订阅的消息体

ScalarSubscription

【说明】

  • Subscription

    提供了如下接口:void request(long var1)、void cancel();

  • ScalarSubscription

    封装了数据订阅者和数据发布者发布的数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/79447.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于slimBOXtv 9.16 V2-晶晨S905L3A/ S905L3AB-Mod ATV-Android9.0-线刷通刷固件包

基于slimBOXtv 9.16 V2-晶晨S905L3A&#xff0f; S905L3AB-Mod ATV-Android9.0-线刷通刷固件包&#xff0c;基于SlimBOXtv 9 修改而来&#xff0c;贴近于原生ATV&#xff0c;仅支持晶晨S905L3A&#xff0f; S905L3AB芯片刷机。 适用型号&#xff1a;M401A、CM311-1a、CM311-1s…

使用droidrun库实现AI控制安卓手机

使用droidrun库实现AI控制安卓手机 介绍 DroidRun 是一个框架&#xff0c;通过LLM代理控制 Android 设备。它允许您使用自然语言命令自动化 Android 设备交互。 安装环境 安装源码依赖 git clone https://github.com/droidrun/droidrun.git cd droidrun conda create --nam…

知识库建设全流程指南(AI时代优化版)

知识库建设全流程指南&#xff08;AI时代优化版&#xff09; ​​一、知识库建设的战略定位​​ ​​核心价值锚点​​ ​​AI时代基建​​&#xff1a;知识库是GEO优化的核心载体&#xff0c;决定内容被AI引用的概率权重​​动态护城河​​&#xff1a;结构化知识体系可抵御算…

2025年03月中国电子学会青少年软件编程(Python)等级考试试卷(五级)真题

青少年软件编程&#xff08;Python&#xff09;等级考试试卷&#xff08;五级&#xff09; 分数&#xff1a;100 题数&#xff1a;38 答案解析&#xff1a;https://blog.csdn.net/qq_33897084/article/details/147341437 一、单选题(共25题&#xff0c;共50分) 1. 以下哪个选…

基于RRT的优化器:一种基于快速探索随机树算法的新型元启发式算法

受机器人路径规划中常用的快速探索随机树&#xff08;RRT&#xff09;算法的搜索机制的启发&#xff0c;我们提出了一种新颖的元启发式算法&#xff0c;称为基于RRT的优化器&#xff08;RRTO&#xff09;。这是首次将RRT算法的概念与元启发式算法相结合。RRTO的关键创新是其三种…

进阶篇|CAN FD 与性能优化

引言 1. CAN vs. CAN FD 对比 2. CAN FD 帧结构详解

【随身WiFi】随身WiFi Debian系统优化教程

0.操作前必看 本教程基于Debian系统进行优化&#xff0c;有些操作对随身WiFi来说可能会带来负优化&#xff0c;根据需要选择。 所有操作需要在root用户环境下运行&#xff0c;否则都要加sudo 随身wifi Debian系统&#xff0c;可以去某安的随声WiFi模块自行搜索刷机 点赞&am…

【Pandas】pandas DataFrame where

Pandas2.2 DataFrame Indexing, iteration 方法描述DataFrame.head([n])用于返回 DataFrame 的前几行DataFrame.at快速访问和修改 DataFrame 中单个值的方法DataFrame.iat快速访问和修改 DataFrame 中单个值的方法DataFrame.loc用于基于标签&#xff08;行标签和列标签&#…

C++代码优化

前段时间写了一些代码&#xff0c;但是在运算过程中发现有些代码可以进行改进以提高运行效率&#xff0c;尤其是与PCL相关的部分&#xff0c;可以进行大幅度提高&#xff0e;特意在此进行记录&#xff0c;分享给大家&#xff0c;也供自己查看&#xff0e; pcl::PointCloud< …

RAG-分块策略

分块策略在检索增强生成&#xff08;RAG&#xff09;方法中起着至关重要的作用&#xff0c;它使文档能够被划分为可管理的部分&#xff0c;同时保持上下文。每种方法都有其特定的优势&#xff0c;适用于特定的用例。将大型数据文件拆分为更易于管理的段是提高LLM应用效率的最关…

Linux网络编程 深入解析TFTP协议:基于UDP的文件传输实战

知识点1【TFTP的概述】 学习通信的基本&#xff1a;通信协议&#xff08;具体发送上面样的报文&#xff09;、通信流程&#xff08;按照什么步骤发送&#xff09; 1、TFTP的概述 tftp&#xff1a;简单文件传输协议&#xff0c;**基于UDP&#xff0c;**不进行用户有效性验证 …

「数据可视化 D3系列」入门第十一章:力导向图深度解析与实现

D3.js 力导向图深度解析与实现 力导向图核心概念 力导向图是一种通过物理模拟来展示复杂关系网络的图表类型&#xff0c;特别适合表现社交网络、知识图谱、系统拓扑等关系型数据。其核心原理是通过模拟粒子间的物理作用力&#xff08;电荷斥力、弹簧引力等&#xff09;自动计…

音频格式转换

1. 下载ffmpeg https://www.gyan.dev/ffmpeg/builds/packages/ffmpeg-7.1.1-full_build.7z 2. 配置ffmpeg环境变量 3.安装pydub pip install pydub 4.编写转化工具代码 from pydub import AudioSegment def convertM4aToWav(m4a,wav):sound AudioSegment.from_file(m4a, f…

基于spring boot 集成 deepseek 流式输出 的vue3使用指南

本文使用deepseek API接口流式输出的文章。 环境要求 jdk17 spring boot 3.4 代码如下: package com.example.controller;import jakarta.annotation.PostConstruct; import org.springframework.ai.chat.messages.AssistantMessage; import org.springframework.ai.chat.mes…

微博辐射源和干扰机

微波辐射源和干扰机是电子战和通信领域中的两个重要概念&#xff0c;它们在军事、民用及科研中具有广泛应用。以下是两者的详细解析及其相互关系&#xff1a; ‌1. 微波辐射源‌ ‌定义‌&#xff1a; 微波辐射源是指能够主动发射微波&#xff08;频率范围通常为 ‌300 MHz&…

2025年4月16日华为留学生笔试第三题300分

📌 点击直达笔试专栏 👉《大厂笔试突围》 💻 春秋招笔试突围在线OJ 👉 笔试突围OJ 03. 智慧城市网络优化 问题描述 K小姐是一家智慧城市服务提供商的网络架构师。她负责规划城市边缘计算节点的布局,以提供更快速、稳定的网络服务。 城市内有 n n

多线程编程的简单案例——单例模式[多线程编程篇(3)]

目录 前言 1.wati() 和 notify() wait() 和 notify() 的产生原因 如何使用wait()和notify()? 案例一:单例模式 饿汉式写法: 懒汉式写法 对于它的优化 再次优化 结尾 前言 如何简单的去使用jconsloe 查看线程 (多线程编程篇1)_eclipse查看线程-CSDN博客 浅谈Thread类…

pytorch基本操作2

torch.clamp 主要用于对张量中的元素进行截断&#xff08;clamping&#xff09;&#xff0c;将其限制在一个指定的区间范围内。 函数定义 torch.clamp(input, minNone, maxNone) → Tensor 参数说明 input 类型&#xff1a;Tensor 需要进行截断操作的输入张…

一次制作参考网杂志的阅读书源的实操经验总结(附书源)

文章目录 一、背景介绍二、书源文件三、详解制作书源&#xff08;一&#xff09;打开Web服务&#xff08;二&#xff09;参考网结构解释&#xff08;三&#xff09;阅读书源 基础&#xff08;四&#xff09;阅读书源 发现&#xff08;五&#xff09;阅读书源 详细&#xff08;六…

并发设计模式实战系列(2):领导者/追随者模式

&#x1f31f; ​大家好&#xff0c;我是摘星&#xff01;​ &#x1f31f; 今天为大家带来的是并发设计模式实战系列&#xff0c;第二章领导者/追随者&#xff08;Leader/Followers&#xff09;模式&#xff0c;废话不多说直接开始~ 目录 领导者/追随者&#xff08;Leader/…