spring响应式编程系列:异步生产数据

目录

示例

大致流程

create

new MonoCreate

subscribe

new LambdaMonoSubscriber

monoCreate.subscribe

accept

success

onNext

时序图

类图

数据发布者

MonoCreate

数据订阅者

LambdaMonoSubscriber

订阅的消息体

DefaultMonoSink

        本篇文章我们来研究如何将现有异步 API(如回调式接口)适配到 Reactor 的响应式流中。

        默认情况下,Mono.create的代码块执行在订阅时的线程上,但如果在该代码块中启动其他线程或使用异步API,那么数据生产就会变成异步的。示例如下所示:

示例

Mono<String> mono = Mono.create(sink -> {
    // 模拟一个异步API操作
    new Thread(() -> {
        try {
            Thread.sleep(1000); // 模拟耗时操作
            log.info("success");
            sink.success("Hello, World!"); // 成功时发射数据
        } catch (InterruptedException e) {
            sink.error(e); // 发生错误时发射错误信号
        }
    }).start();
});
log.info("main start");
mono.subscribe(x -> log.info("main finish"));
Thread.sleep(5000);

        在这里,通过Mono.create模拟一个异步API操作,API操作成功后,调用sink.success("Hello, World!")进行数据发布者发送数据,从而触发数据的订阅。

        接下来,让我们一起看看程序的流程是怎么处理的。

        点击create()方法,如下所示:

大致流程

create

public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) {
    return onAssembly(new MonoCreate<>(callback));
}

        在这里,new一个MonoCreate对象并返回。

        点击MonoCreate,如下所示:

new MonoCreate

final class MonoCreate<T> extends Mono<T> implements SourceProducer<T> {
   static final Disposable TERMINATED = OperatorDisposables.DISPOSED;
   static final Disposable CANCELLED = Disposables.disposed();
   final Consumer<MonoSink<T>> callback;
   MonoCreate(Consumer<MonoSink<T>> callback) {
      this.callback = callback;
   }

        在这里,将create()方法的回调接口参数赋值给callback属性。因此,Mono.create的参数就作为数据发布者的一个属性信息了。

        点击示例里的mono.subscribe(),如下所示:

subscribe

public final Disposable subscribe(
      @Nullable Consumer<? super T> consumer,
      @Nullable Consumer<? super Throwable> errorConsumer,
      @Nullable Runnable completeConsumer,
      @Nullable Context initialContext) {
   return subscribeWith(new LambdaMonoSubscriber<>(consumer, errorConsumer,
         completeConsumer, null, initialContext));
}

        在这里,new一个LambdaMonoSubscriber对象,如下所示:

new LambdaMonoSubscriber

LambdaMonoSubscriber(@Nullable Consumer<? super T> consumer,
      @Nullable Consumer<? super Throwable> errorConsumer,
      @Nullable Runnable completeConsumer,
      @Nullable Consumer<? super Subscription> subscriptionConsumer,
      @Nullable Context initialContext) {
   this.consumer = consumer;
   this.errorConsumer = errorConsumer;
   this.completeConsumer = completeConsumer;
   this.subscriptionConsumer = subscriptionConsumer;
   this.initialContext = initialContext == null ? Context.empty() : initialContext;
}

        在这里,将subscribe的回调接口参数赋值给consumer 属性,因此,mono.subscribe的参数就作为数据消费者的属性了。

        点击上一步的subscribeWith()方法,如下所示:

monoCreate.subscribe

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
   DefaultMonoSink<T> emitter = new DefaultMonoSink<>(actual);
   actual.onSubscribe(emitter);
   try {
      callback.accept(emitter);
   }
   catch (Throwable ex) {
      emitter.error(Operators.onOperatorError(ex, actual.currentContext()));
   }
}

        在这里,首先调用了数据消费者的onSubscribe()方法,这个与《spring响应式编程系列:总体流程》一样。

        另外,调用了callback.accept()方法,也就是Mono.create()的回调接口参数。

accept

Mono<String> mono = Mono.create(sink -> {
    // 模拟一个异步操作
    new Thread(() -> {
        try {
            Thread.sleep(1000); // 模拟耗时操作
            log.info("success");
            sink.success("Hello, World!"); // 成功时发射数据
        } catch (InterruptedException e) {
            sink.error(e); // 发生错误时发射错误信号
        }
    }).start();
});

        在这里,模拟了耗时操作,然后调用sink.success()方法。

      通常,可以将sink对象保存在线程共享环境里,等其它的业务操作执行完成后,再调用sink.success()方法,即可发射数据发布者数据,从而触发消费者订阅。

        点击sink.success(),如下所示:

​​​​​​​success

public void success(@Nullable T value) {

... ...
     for (; ; ) {
      int s = state;
      if (s == HAS_REQUEST_HAS_VALUE || s == NO_REQUEST_HAS_VALUE) {
         Operators.onNextDropped(value, actual.currentContext());
         return;
      }
      if (s == HAS_REQUEST_NO_VALUE) {
         if (STATE.compareAndSet(this, s, HAS_REQUEST_HAS_VALUE)) {
            try {
               actual.onNext(value);
               actual.onComplete();
            }
            catch (Throwable t) {
               actual.onError(t);
            }
            finally {
               disposeResource(false);
            }
         } else {
            Operators.onNextDropped(value, actual.currentContext());
         }
         return;
      }
      ... ...
   }
}

        在这里,调用了数据订阅者的onNext()方法,如下所示:

​​​​​​​onNext

public final void onNext(T x) {
   Subscription s = S.getAndSet(this, Operators.cancelledSubscription());
   if (s == Operators.cancelledSubscription()) {
      Operators.onNextDropped(x, this.initialContext);
      return;
   }
   if (consumer != null) {
      try {
         consumer.accept(x);
      }
      catch (Throwable t) {
         Exceptions.throwIfFatal(t);
         s.cancel();
         doError(t);
      }
   }
   if (completeConsumer != null) {
      try {
         completeConsumer.run();
      }
      catch (Throwable t) {
         Operators.onErrorDropped(t, this.initialContext);
      }
   }
}

时序图

  1. 类关系的设计,与《spring响应式编程系列:总体流程》类似,主要包括数据发布者对象、数据订阅者对象及订阅的消息体对象;
  2. Mono和MonoCreate是数据发布者,LambdaMonoSubscriber是数据订阅者,DefaultMonoSink是订阅的消息体;
  3. 不同点在于,DefaultMonoSink可以通过示例里的Mono.create暴露给业务侧,业务侧的相关业务执行完成之后,可以通过调用该对象success方法,来触发订阅者的回调函数。

​​​​​​​类图

数据发布者

MonoCreate

        MonoCreate与《spring响应式编程系列:总体流程》介绍的类似,都是继承于Mono类,并且实现了CorePublisher和Publisher接口。

        不同点在于,该数据发布者多了一个属性,如下所示:

        final Consumer<MonoSink<T>> callback;

        该属性是一个可以接收所订阅消息体(类型为MonoSink<T>)参数的回调函数,在这里可以将该消息体与对应的业务建立绑定关系,为后续业务执行结束后的回调做准备。

数据订阅者

LambdaMonoSubscriber

        LambdaMonoSubscriber与《spring响应式编程系列:总体流程》介绍的一样。

订阅的消息体

DefaultMonoSink

        DefaultMonoSink与《spring响应式编程系列:总体流程​​​​​​​》介绍的类似,都实现了Subscription接口。

        不同点在于,DefaultMonoSink实现了MonoSink接口,该接口提供了供业务侧调用 的接口方法,如下所示:

void success(@Nullable T value);

        业务侧的相关业务执行完成之后,可以通过调用该接口方法,来触发订阅者的回调函数。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/80610.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MCP Python SDK构建的**SQLite浏览器**的完整操作指南

以下是使用MCP Python SDK构建的SQLite浏览器的完整操作指南&#xff1a; 一、环境准备 安装依赖 # 安装MCP SDK及SQLite支持 pip install mcp sqlite3创建测试数据库 sqlite3 test.db <<EOF CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT); IN…

【Python爬虫基础篇】--3.cookie和session

目录 1.cookie 1.1.定义 1.2.参数 1.3.分类 2.session 3.使用cookie登录微博 4.使用session登录 1.cookie 由于http是一个无状态的协议&#xff0c;请求与请求之间无法相互传递或者记录一些信息&#xff0c;cookie和session正是为了解决这个问题而产生。 例子&#xff1…

风车邮箱系统详细使用指南:Windows与Ubuntu双平台解析

风车邮箱系统V1.2使用手册 风车邮箱系统详细使用指南&#xff1a;Windows与Ubuntu双平台解析 前言 在日常网络活动中&#xff0c;我们经常需要一个临时邮箱来注册各类网站或接收验证码&#xff0c;但不想使用自己的真实邮箱。「风车无线邮箱系统」作为一款优秀的临时邮箱工具…

同样的接口用postman/apifox能跑通,用jmeter跑就报错500

之前没用过jmeter,第一次用调试压测脚本遇到了问题 一样的接口用postman能跑通&#xff0c;用jmeter跑就报错500&#xff0c;百度很多文章都说是该接口需要加一个‘内容编码’改成utf-8,我加了还是不行 后来我就想到apifox好像有隐藏的header&#xff0c;然后开始比较apifox的…

1656打印路径-Floyd回溯/图论-链表/数据结构

蓝桥账户中心 1.税收&#xff1a; “城市的税收”&#xff1a;所以是中介点的税收&#xff0c;经过该点后加上 2.路径&#xff1a; 用数组存储前驱节点从而串成链表 pre[ i ][ j ]代表的是从 i 到 j 的最短路径上 j 的前驱节点是什么 那么便可以pre[ i ][ j ]k 把k加入pa…

Eigen矩阵操作类 (Map, Block, 视图类)

1. Map 类&#xff1a;内存映射&#xff08;零拷贝操作&#xff09; 核心功能 将现有的 C/C 数组或缓冲区映射为 Eigen 矩阵/向量&#xff0c;不复制数据&#xff0c;直接操作原内存。 模板参数 cpp Map<Matrix<Scalar, Rows, Cols, Options, MaxRows, MaxCols>&…

多系统安装经验,移动硬盘,ubuntu grub修改/etc/fstab 移动硬盘需要改成nfts格式才能放steam游戏

总结&#xff1a;我硬盘会自动挂载&#xff0c;直接格式化nfts&#xff0c;steam就能装里面了 机械硬盘装系统真的不行&#xff0c;超级慢游戏还跑不了 --------------------------------------------------------------------底下都不用看 笔记本一个系统&#xff0c;移动硬盘…

JFLAP SOFTWARE 编译原理用(自动机绘图)

csdn全是蛆虫&#xff0c;2mb的软件&#xff0c;都在那里搞收费&#xff0c;我就看不惯&#xff0c;我就放出来&#xff0c;那咋了&#xff01;&#xff01;&#xff01; https://pan.baidu.com/s/1IuEfHScynjCCUF5ScF26KA 通过网盘分享的文件&#xff1a;JFLAP7.1.jar 链接: h…

[Windows] Disk Sorter文件分类管理软件 v16.7.18

[Windows] Disk Sorter文件分类管理 链接&#xff1a;https://pan.xunlei.com/s/VOOl0sDntAdHvlMkc7N0ZOD-A1?pwd966n# Disk Sorter是一个功能强大的文件分类管理软件&#xff0c;允许对本地磁盘、网络共享、NAS设备和企业存储系统中的文件进行分类&#xff0c;并且支持生成…

STM32提高篇: 蓝牙通讯

STM32提高篇: 蓝牙通讯 一.蓝牙通讯介绍1.蓝牙技术类型 二.蓝牙协议栈1.蓝牙芯片架构2.BLE低功耗蓝牙协议栈框架 三.ESP32-C3中的蓝牙功能1.广播2.扫描3.通讯 四.发送和接收 一.蓝牙通讯介绍 蓝牙&#xff0c;是一种利用低功率无线电&#xff0c;支持设备短距离通信的无线电技…

6.1.多级缓存架构

目录 一、多级缓存基础与核心概念 缓存的定义与价值 • 缓存的应用场景&#xff08;高并发、低延迟、减轻数据库压力&#xff09; • 多级缓存 vs 单级缓存的优劣对比 多级缓存核心组件 • 本地缓存&#xff08;Caffeine、Guava Cache&#xff09; • 分布式缓存&#xff08;…

MySQL的MVCC【学习笔记】

MVCC 事务的隔离级别分为四种&#xff0c;其中Read Committed和Repeatable Read隔离级别&#xff0c;部分实现就是通过MVCC&#xff08;Multi-Version Concurrency Control&#xff0c;多版本并发控制&#xff09; 版本链 版本链是通过undo日志实现的&#xff0c; 事务每次修改…

基于OpenMV+STM32+OLED与YOLOv11+PaddleOCR的嵌入式车牌识别系统开发笔记

基于OpenMV、STM32与OLED的嵌入式车牌识别系统开发笔记 基于OpenMV、STM32与OLED的嵌入式车牌识别系统开发笔记系统架构全景 一、实物演示二、OpenMV端设计要点1. 硬件配置优化2. 智能帧率控制算法3. 数据传输协议设计 三、PyTorch后端核心实现&#xff1a;YOLOv11与PaddleOCR的…

C#中常见的设计模式

文章目录 引言设计模式的分类创建型模式 (Creational Patterns)1. 单例模式 (Singleton)2. 工厂方法模式 (Factory Method)3. 抽象工厂模式 (Abstract Factory)4. 建造者模式 (Builder) 结构型模式 (Structural Patterns)5. 适配器模式 (Adapter)6. 装饰器模式 (Decorator)7. 外…

Nacos简介—3.Nacos的配置简介

大纲 1.Nacos生产集群Web端口与数据库配置 2.Nacos生产集群的Distro协议核心参数 3.Nacos打通CMDB实现跨机房的就近访问 4.Nacos基于SPI动态扩展机制来获取CMDB的数据 5.基于Nacos SPI机制开发CMDB动态扩展 6.Nacos基于CMDB来实现多机房就近访问 7.Nacos生产集群Prometh…

Jest 快照测试

以下是关于 Jest 快照测试的系统化知识总结,从基础使用到底层原理全面覆盖: 一、快照测试核心原理 1. 工作机制三阶段 #mermaid-svg-GC46t2NBvGv7RF0M {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-GC46t2NBvGv…

第十六届蓝桥杯大赛软件赛省赛 C/C++ 大学B组 [京津冀]

由于官方没有公布题目的数据, 所以代码仅供参考 1. 密密摆放 题目链接&#xff1a;P12337 [蓝桥杯 2025 省 AB/Python B 第二场] 密密摆放 - 洛谷 题目描述 小蓝有一个大箱子&#xff0c;内部的长宽高分别是 200、250、240&#xff08;单位&#xff1a;毫米&#xff09;&…

Spring 学习笔记之 @Transactional 异常不回滚汇总

使用springboot时&#xff0c;只要引入spring-jdbc/jpa相关的依赖后&#xff0c;在想要启用事务的方法上加上Transactional注解就能开启事务&#xff0c;碰到异常就能自动回滚。大大的提高了编码的便捷性性&#xff0c;同时也不侵入代码&#xff0c;保持了代码的简洁性。 默认情…

React 与 Vue 虚拟 DOM 实现原理深度对比:从理论到实践

在现代前端开发中&#xff0c;React 和 Vue 作为最流行的两大框架&#xff0c;都采用了虚拟 DOM&#xff08;Virtual DOM&#xff09; 技术来优化渲染性能。虚拟 DOM 的核心思想是通过 JavaScript 对象模拟真实 DOM&#xff0c;减少直接操作 DOM 的开销&#xff0c;从而提高页面…

WordPress AI 原创文章自动生成插件 24小时全自动生成SEO原创文章 | 多语言支持 | 智能配图与排版

为什么选择Linkreate AI内容生成插件&#xff1f; ✓ 全自动化工作流程 - 从关键词挖掘到文章发布一站式完成 ✓ 多语言支持 - 轻松覆盖全球市场&#xff08;中/英等多语种&#xff09; ✓ 智能SEO优化 - 自动生成搜索引擎友好的内容结构 ✓ AI智能配图 - 每篇文章自动匹配高质…