初见-响应式编程-002

 🤗 ApiHug × {Postman|Swagger|Api...} = 快↑ 准√ 省↓

  1. GitHub - apihug/apihug.com: All abou the Apihug   
  2. apihug.com: 有爱,有温度,有质量,有信任
  3. ApiHug - API design Copilot - IntelliJ IDEs Plugin | Marketplace

#Reactive

The Reactive Manifestoopen in new window:

Systems built as Reactive Systems are more flexible, loosely-coupled and scalable. This makes them easier to develop and amenable to change. They are significantly more tolerant of failure and when failure does occur they meet it with elegance rather than disaster. Reactive Systems are highly responsive, giving users effective interactive feedback.

  1. flexible,
  2. loosely-coupled
  3. scalable

  1. Responsive, 响应时间, 服务质量
  2. Resilient, 容错, 恢复能力
  3. Elastic, 弹性,动态扩容
  4. Message Driven, 事件驱动,低耦合

#Reactor

Reactor is an implementation of the Reactive Programming paradigm, which can be summed up as follows:

Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s). — https://en.wikipedia.org/wiki/Reactive_programmingopen in new window

最初 微软创建了 .NET 里的 Reactive Extensions (Rx); RxJava 实现了 JVM 上的 Reactive编程模式, 最终 JAVA 9 融入了 Flowopen in new window -- java.util.concurrent.Flowopen in new window

在 OO 编程里面 reactive 常被当做 Observer 观察者设计模式, 当然你可把 Reactive Stream 和你熟悉的 Iterator 设计模式做对比;两种实现里面都有Iterable-Iterator 两个概念, 主要的不一样在, Iterator 是一种 拉 pull 模式, 而 reactive 是一种push 推模式。

在两个流程中都有 next(), 在 reactive stream 更类似于 Publisher-Subscriber 模式,由Publishr 来控制新到的value 给 Subscriber; push 是 reactive 里面非常重要的一面;

程序实现着注重收到 value 后的计算逻辑 Operation, 而不是整个控制流程。

整个流程里面喂入数据 push 自然是整个响应流程里面最核心的流程, onNext 用来完成此动作, 还包含 错误 onErro() 和最终的结束处理 onComplete(), 整个流程可以被抽象为:

onNext x 0..N [onError | onComplete]

整个流程的处理可以说非常灵活, 可以有 0 个, 1个, N 个, 或者无限多的数据, 比如一个定时器。

回到问题的本质, WHY 我们为什么需要 异步的 reactive 模式呢?

#Asynchronicity 能解决

并行, 多核已经是常态来增加吞吐量和响应时间。

多线程用来最大化利用资源; 但是多线程异步可以解决问题通知, 带来了很大的挑战。 JVM 解决此引入两个概念:

  1. callback, 异步方法用来通知结果, 一般是一个内部类,或者一个 lamdba 表达式
  2. future,异步调用立即返回一个 Future<T>, 但是结果 T 尚不能立即获得, 结果获得后才能通过 poll 获得。 ExecutorService 跑 Callable<T> 时返回 Future。
#Callback地狱

callback 贯穿整个链路的调用过程:

userService.getFavorites(userId, new Callback<List<String>>() { public void onSuccess(List<String> list) { if (list.isEmpty()) { suggestionService.getSuggestions(new Callback<List<Favorite>>() {public void onSuccess(List<Favorite> list) { UiUtils.submitOnUiThread(() -> { list.stream().limit(5).forEach(uiList::show); });}public void onError(Throwable error) { UiUtils.errorPopup(error);}});} else {list.stream() .limit(5).forEach(favId -> favoriteService.getDetails(favId, new Callback<Favorite>() {public void onSuccess(Favorite details) {UiUtils.submitOnUiThread(() -> uiList.show(details));}public void onError(Throwable error) {UiUtils.errorPopup(error);}}));}}public void onError(Throwable error) {UiUtils.errorPopup(error);}
});

如果换成 reactor:

userService.getFavorites(userId) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()) .take(5) .publishOn(UiUtils.uiThreadScheduler()) .subscribe(uiList::show, UiUtils::errorPopup); 

reactor 不仅仅让整个流程更精简, 通知提供服务质量控制(类似熔断), 比如我们保证整个服务质量在 800ms内返回,超时后从 fallback cache 或者其他获取:

userService.getFavorites(userId).timeout(Duration.ofMillis(800)) .onErrorResume(cacheService.cachedFavoritesFor(userId)) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()).take(5).publishOn(UiUtils.uiThreadScheduler()).subscribe(uiList::show, UiUtils::errorPopup);
#Future

Future 避免了回调地狱, 但是依然不太容易进行组装, 虽然在 Java 8 里面引入了 CompletableFuture, 编制多个 Future 在一起虽然可以操作,但是Future 还是有其他问题:

  1. Future 的 get() 依然是阻塞的
  2. 不支持延迟计算
  3. 缺乏对多结果的支持, 更好的错误处理

这样一个业务场景; 从一个 ID 列表, 去查询他们的name + 统计, 所有的都是异步 CompletableFuture 例子:

CompletableFuture<List<String>> ids = ifhIds(); CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { Stream<CompletableFuture<String>> zip =l.stream().map(i -> { CompletableFuture<String> nameTask = ifhName(i); CompletableFuture<Integer> statTask = ifhStat(i); return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); });List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); return allDone.thenApply(v -> combinationList.stream().map(CompletableFuture::join) .collect(Collectors.toList()));});List<String> results = result.join(); 
assertThat(results).contains("Name NameJoe has stats 103","Name NameBart has stats 104","Name NameHenry has stats 105","Name NameNicole has stats 106",

reactor 更紧凑解决方案, 更精炼容易理解:

Flux<String> ids = ifhrIds(); Flux<String> combinations =ids.flatMap(id -> { Mono<String> nameTask = ifhrName(id); Mono<Integer> statTask = ifhrStat(id); return nameTask.zipWith(statTask, (name, stat) -> "Name " + name + " has stats " + stat);});Mono<List<String>> result = combinations.collectList(); List<String> results = result.block(); 
assertThat(results).containsExactly( "Name NameJoe has stats 103","Name NameBart has stats 104","Name NameHenry has stats 105","Name NameNicole has stats 106","Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);

#响应编程

除非上面我们看到, 让编码更清晰直观, reactor 等还在这些方面上花了很多心思:

  1. Composability and readability; 组件化, 可读性好。
  2. Data as a flow manipulated with a rich vocabulary of operators, 数据管道铺好, 自由搭配运算逻辑。
  3. Nothing happens until you subscribe, 延迟计算 subscribe 触发计算。
  4. Backpressure or the ability for the consumer to signal the producer that the rate of emission is too high, 背压控制, consumer 和 producer配合,消费端和生产端。
  5. High level but high value abstraction that is concurrency-agnostic; 润物细无声,抽象的让你感知不到并发。

Composability and Readability 包括可以自由的组建编制任务, 任务之间的依赖关系, 上下关系, 或者同步运行的 fork-join 风格, 高度抽象层使用异步任务。

流水线一样的操作, Reactor 既是传送带也是工作站, 原料从 Publisher 最终成品发送到消费端 Subscriber

Operators, 相当于流水线上的工作站, 整个流水线上就是上一个 Publisher 的产物, 然后包装发送到下一个 Publisher, 最终到一个 Subsccriber 里面。

延迟计算, 在Reactor 里当你写一个 Publisher 链条, 数据默认是不会启动起来, 你只是创建了一个异步处理的抽象流程(Spark 里的RDD, 或者像一个流程的 DSL)。

当你触发 subscribing 时候, 将 Publisher 和一个 Subscriber 绑定, 同事触发数据流, 流入到整个链中, 内部通过 Subscriber 触发一个 request 信号传递到上游, 最终到源的 Publisher

背压, 下游将信号传递给上游是用来实现 backpressure背压的一种方式, 依然用流水线的比方, 当下游的工作站赶不上上游的速度的时候需要反馈一个信号到上游去。

A subscriber can work in unbounded mode and let the source push all the data at its fastest achievable rate or it can use the request mechanism to signal the source that it is ready to process at most n elements.

可以 push-pull 方式混合, 下游容量自由控制上游的推送速度, 或者懒式的拉取。

Cold流和Hot流

  1. Cold流不论订阅者在何时订阅该数据流,总是能收到数据流中产生的全部消息。
  2. Hot流则是在持续不断地产生消息,订阅者只能获取到在其订阅之后产生的消息。

冷例子:

@Testpublic void cold_example() {Flux<String> source =Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple")).map(String::toUpperCase);source.subscribe(d -> System.out.println("Subscriber 1: " + d));source.subscribe(d -> System.out.println("Subscriber 2: " + d));
}

输出结果:

Subscriber 1: BLUE
Subscriber 1: GREEN
Subscriber 1: ORANGE
Subscriber 1: PURPLE
Subscriber 2: BLUE
Subscriber 2: GREEN
Subscriber 2: ORANGE
Subscriber 2: PURPLE

所有的 subscriber 都能得到结果。

热例子:

@Testpublic void hot_example() {Sinks.Many<String> hotSource = Sinks.unsafe().many().multicast().directBestEffort();Flux<String> hotFlux = hotSource.asFlux().map(String::toUpperCase);hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: " + d));hotSource.emitNext("blue", FAIL_FAST);hotSource.tryEmitNext("green").orThrow();hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: " + d));hotSource.emitNext("orange", FAIL_FAST);hotSource.emitNext("purple", FAIL_FAST);hotSource.emitComplete(FAIL_FAST);}

得到结果:

Subscriber 1 to Hot Source: BLUE
Subscriber 1 to Hot Source: GREEN
Subscriber 1 to Hot Source: ORANGE
Subscriber 2 to Hot Source: ORANGE
Subscriber 1 to Hot Source: PURPLE
Subscriber 2 to Hot Source: PURPLE

#结论

从 reactive 宣言我们看到响应式编程的 '理想', 初探 reactor, 我们看到 reactor 的强大表达力, 这些还只是管中窥豹, 更多的等待我们下面章节去探索和挖掘。

测试项目 Reactor_001_testopen in new window

#参考

  1. The Reactive Manifestoopen in new window
  2. reactive-streams-jvmopen in new window
  3. reactive-streamsopen in new window
  4. java 9 flowopen in new window
  5. Cold流和Hot流open in new window
  6. Flux 详实的流程图open in new window
  7. Mono 详实的流程图

我们

api-hug-contact

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/2516.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

BERT-CRF 微调中文 NER 模型

文章目录 数据集模型定义数据集预处理BIO 标签转换自定义Dataset拆分训练、测试集 训练验证、测试指标计算推理其它相关参数CRF 模块 数据集 CLUE-NER数据集&#xff1a;https://github.com/CLUEbenchmark/CLUENER2020/blob/master/pytorch_version/README.md 模型定义 imp…

【iOS开发】(五)react Native路由和导航20240421-22

【iOS开发】(五)react Native 路由和导航Navigation 20240421 在&#xff08;一&#xff09;&#xff08;二&#xff09;中我们 Reactnative搭建了开发环境、学习了 基础语法、状态管理&#xff0c;JSX、组件、状态和生命周期以及样式布局等。 在&#xff08;三&#xff09;&a…

MATLAB 数据类型

MATLAB 数据类型 MATLAB 不需要任何类型声明或维度语句。每当 MATLAB 遇到一个新的变量名&#xff0c;它就创建变量并分配适当的内存空间。 如果变量已经存在&#xff0c;那么MATLAB将用新内容替换原始内容&#xff0c;并在必要时分配新的存储空间。 例如&#xff0c; Tota…

Vue3中使用无缝滚动插件vue3-seamless-scroll

官网&#xff1a;https://www.npmjs.com/package/vue-seamless-scroll 1、实现效果文字描述&#xff1a; 表格中的列数据进行横向无缝滚动&#xff0c;某一列进行筛选的时候&#xff0c;重新请求后端的数据&#xff0c;进行刷新 2、安装&#xff1a;npm i vue3-seamless-scrol…

小程序 rich-text 解析富文本 图片过大时如何自适应?

在微信小程序中&#xff0c;用rich-text 解析后端返回的数据&#xff0c;当图片尺寸太大时&#xff0c;会溢出屏幕&#xff0c;导致横向出现滚动 查看富文本代码 图片是用 <img 标签&#xff0c;所以写个正则匹配一下图片标签&#xff0c;手动加上样式即可 // content 为后…

Python 面向对象——5.多态

本章学习链接如下&#xff1a; Python 面向对象——1.基本概念 Python 面向对象——2.类与对象实例属性补充解释&#xff0c;self的作用等 Python 面向对象——3.实例方法&#xff0c;类方法与静态方法 Python 面向对象——4.继承 1.基本概念 多态是面向对象编程&#x…

贪吃蛇(C语言版)

在我们学习完C语言 和单链表知识点后 我们开始写个贪吃蛇的代码 目标&#xff1a;使用C语言在Windows环境的控制台模拟实现经典小游戏贪吃蛇 贪吃蛇代码实现的基本功能&#xff1a; 地图的绘制 蛇、食物的创建 蛇的状态&#xff08;正常 撞墙 撞到自己 正常退出&#xf…

Python蜘蛛侠

目录 写在前面 蜘蛛侠 编写代码 代码分析 更多精彩 写在后面 写在前面 本期小编给大家推荐一个酷酷的Python蜘蛛侠&#xff0c;一起来看看叭~ 蜘蛛侠 蜘蛛侠&#xff08;Spider-Man&#xff09;是美国漫威漫画宇宙中的一位标志性人物&#xff0c;由传奇创作者斯坦李与艺…

探索ChatGPT在提高人脸识别与软性生物识准确性的表现与可解释性

概述 从GPT-1到GPT-3&#xff0c;OpenAI的模型不断进步&#xff0c;推动了自然语言处理技术的发展。这些模型在处理语言任务方面展现出了强大的能力&#xff0c;包括文本生成、翻译、问答等。 然而&#xff0c;当涉及到面部识别和生物特征估计等任务时&#xff0c;这些基于文…

设计模式-00 设计模式简介之几大原则

设计模式-00 设计模式简介之几大原则 本专栏主要分析自己学习设计模式相关的浅解&#xff0c;并运用modern cpp 来是实现&#xff0c;描述相关设计模式。 通过编写代码&#xff0c;深入理解设计模式精髓&#xff0c;并且很好的帮助自己掌握设计模式&#xff0c;顺便巩固自己的c…

用于车载T-BOX汽车级的RA8900CE

用于车载T-BOX等高精度计时的汽车级时钟模块RTC:RA8900CE.车载实时时钟芯片RA8900CE内置32.768Khz的晶体&#xff0c;实现年、月、日、星期、小时、分钟和秒精准计时。RA8900CE满足AEC-Q200认证&#xff0c;内置温补功能&#xff0c;保证实时时钟的稳定可靠&#xff0c;功耗低至…

【Linux】解决ubuntu20.04版本插入无线网卡没有wifi显示【无线网卡Realtek 8811cu】

ubuntu为Realtek 8811cu安装驱动&#xff0c;解决wifi连接问题 1、确认无线网卡的型号-Realtek 8810cu2、下载并配置驱动 一句话总结&#xff1a;先确定网卡的型号&#xff0c;然后根据网卡的型号区寻找对应的驱动下载&#xff0c;下载完成之后在ubuntu系统中进行编译&#xff…

HTTP慢连接攻击的原理和防范措施

随着互联网的快速发展&#xff0c;网络安全问题日益凸显&#xff0c;网络攻击事件频繁发生。其中&#xff0c;HTTP慢速攻击作为一种隐蔽且高效的攻击方式&#xff0c;近年来逐渐出现的越来越多。 为了防范这些网络攻击&#xff0c;我们需要先了解这些攻击情况&#xff0c;这样…

【笔试】03

FLOPS FLOPS 是 Floating Point Operations Per Second 的缩写&#xff0c;意为每秒浮点运算次数。它是衡量计算机性能的指标&#xff0c;特别是用于衡量计算机每秒能够执行多少浮点运算。在高性能计算领域&#xff0c;FLOPS 被广泛用来评估超级计算机、CPU、GPU 和其他处理器…

2024年区块链链游即将迎来大爆发

随着区块链技术的不断发展和成熟&#xff0c;其应用领域也在不断扩展。其中&#xff0c;区块链链游&#xff08;Blockchain Games&#xff09;作为区块链技术在游戏行业中的应用&#xff0c;备受关注。2024年&#xff0c;区块链链游行业即将迎来爆发&#xff0c;这一趋势不容忽…

Windows10如何关闭Edge浏览器的Copilot

在Windows10更新后&#xff0c;打开Edge浏览器&#xff0c;无论复制什么内容&#xff0c;都会弹出Copilot人工智能插件&#xff0c;非常令人反感&#xff0c;网上搜索的关闭方法都非常麻烦&#xff0c;比如&#xff1a;组策略和注册表。自己摸索得出最简便有效的关闭方法。 1、…

【java毕业设计】 基于Spring Boot+mysql的高校心理教育辅导系统设计与实现(程序源码)-高校心理教育辅导系统

基于Spring Bootmysql的高校心理教育辅导系统设计与实现&#xff08;程序源码毕业论文&#xff09; 大家好&#xff0c;今天给大家介绍基于Spring Bootmysql的高校心理教育辅导系统设计与实现&#xff0c;本论文只截取部分文章重点&#xff0c;文章末尾附有本毕业设计完整源码及…

一致性hash

一、什么是一致性hash 普通的hash算法 (hashcode % size )&#xff0c;如果size发生变化&#xff0c;几乎所有的历史数据都需要重hash、移动&#xff0c;代价非常大&#xff0c;常见的java中的hashmap就是如此。 那如果在hash表扩容或者收缩的时候size能够保持不变&#xff0…

gitee / github 配置git, 实现免密码登录

文章目录 怎么配置公钥和私钥验证配置成功问题 怎么配置公钥和私钥 以下内容参考自 github ssh 配置&#xff0c;gitee的配置也是一样的&#xff1b; 粘贴以下文本&#xff0c;将示例中使用的电子邮件替换为 GitHub 电子邮件地址。 ssh-keygen -t ed25519 -C "your_emai…

线性代数 --- 矩阵的对角化以及矩阵的n次幂

矩阵的对角化以及矩阵的n次幂 &#xff08;特征向量与特征值的应用&#xff09; 前言&#xff1a; 在上一篇文章中&#xff0c;我记录了学习矩阵的特征向量和特征值的学习笔记&#xff0c;所关注的是那些矩阵A作用于向量x后&#xff0c;方向不发生改变的x(仅有尺度的缩放)。线…