Spring,Reactor和ElasticSearch:从回调到React流

Spring 5(以及Boot 2,将在数周内到货)是一次革命。 不是“ XML上的注释 ”或“ Java上的注释类 ”的革命。 这是一个真正的革命性框架,可以编写全新的应用程序类别。 近年来,我对此框架感到有些恐惧。 “ Spring Cloud是简化了Spring Boot使用的框架,Spring简化了Spring使用的框架,是简化了企业开发的框架。” start.spring.io (也称为“ 开始...点弹簧...点I ... O ”)列出了120个可以添加到服务中的不同模块(!)。 这几天的春天变成了一个庞大的伞式项目,我可以想象为什么有些人(仍然!)更喜欢Java EE(或者这些天所谓的东西)。

但是Spring 5带来了革命性的革命。 它不再只是阻止servlet API和各种Web框架的包装器。 在Project Reactor之上的Spring 5允许编写高性能,超快速和可伸缩的服务器,完全避免了servlet堆栈。 该死的,CLASSPATH上没有Jetty甚至servlet API! 在Spring 5 Web-flux的核心,我们将找到Netty ,这是一个用于编写异步客户端和服务器的低级框架。 最终,Spring成为React框架家族的一等公民。 Java开发人员可以实现快速服务,而不必离开自己的舒适区,也可以使用https://doc.akka.io/docs/akka-http/current/或https://www.playframework.com/ 。 Spring 5是用于构建高度可扩展且具有弹性的应用程序的完全被动的现代工具。 尽管如此,诸如控制器,bean,依赖注入之类的基本原理都是相同的。 而且,升级路径很顺利,我们可以逐步添加功能,而不是学习全新的外来框架。 足够多的谈话,让我们写一些代码。

在本文中,我们将编写一个简单的无头应用程序,该应用程序可以在ElasticSearch中大量索引文档。 我们的目标是即使服务器速度变慢,也只需要几个线程即可实现数千个并发连接。 但是,与Spring Data MongoDB不同, Spring Data ElasticSearch本身不支持非阻塞存储库。 好吧,后者似乎不再被维护,当前版本已经3年了。 许多文章定位Spring 5 +的MongoDB其仓库返回无阻塞流( FluxFlowable的RxJava)。 这一点会更高级。

ElasticSearch 6 Java API使用RESTful接口,并使用非阻塞HTTP客户端实现。 不幸的是,它使用回调而不是像CompletableFuture这样的理智的东西。 因此,让我们自己构建客户端适配器。

使用Fluxes和Monos的ElasticSearch客户端

本文的源代码可在react reactive-elastic-search分支上的github.com/nurkiewicz/elastic-flux上找到。

我们想通过返回FluxMono来构建一个支持Project Reactor的ElasticSearch Java客户端。 当然,如果基础流是完全异步的并且不消耗线程,则将获得最大的好处。 幸运的是,Java API就是这样。 首先,让我们将ElasticSearch的客户端设置为Spring Bean:

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;@Bean
RestHighLevelClient restHighLevelClient() {return new RestHighLevelClient(RestClient.builder(new HttpHost("localhost", 9200)).setRequestConfigCallback(config -> config.setConnectTimeout(5_000).setConnectionRequestTimeout(5_000).setSocketTimeout(5_000)).setMaxRetryTimeoutMillis(5_000));
}

在现实生活中,我们显然会将其中的大多数参数化。 我们将为简单的JSON文档建立索引,目前,它们的内容并不重要:

@Value
class Doc {private final String username;private final String json;
}

我们将编写的代码包装RestHighLevelClient并通过返回Mono<IndexResponse>使它更高级Mono非常类似于CompletableFuture但有两个例外:

  • 很懒–只要您不订阅,就不会开始计算
  • CompletableFuture不同, Mono可以正常完成而不会发出任何值

第二个区别总是对我有些误导。 在RxJava 2.x中,有两种不同的类型: Single (总是带有值或错误)和Maybe (例如Mono )。 太糟糕的Reactor并没有做到这一点。 没关系,适配器层是什么样的? 普通的Elastic API如下所示:

client.indexAsync(indexRequest, new ActionListener() {@Overridepublic void onResponse(IndexResponse indexResponse) {//got response}@Overridepublic void onFailure(Exception e) {//got error}
});

您可以看到问题所在: callback hell 。 与其将自定义ActionListener公开为该逻辑的参数,不如将其包装在Mono

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;private Mono<IndexResponse> indexDoc(Doc doc) {return Mono.create(sink -> {IndexRequest indexRequest = new IndexRequest("people", "person", doc.getUsername());indexRequest.source(doc.getJson(), XContentType.JSON);client.indexAsync(indexRequest, new ActionListener<IndexResponse>() {@Overridepublic void onResponse(IndexResponse indexResponse) {sink.success(indexResponse);}@Overridepublic void onFailure(Exception e) {sink.error(e);}});});
}

我们必须创建包装了JSON文档的IndexRequest ,并通过RESTful API发送它。 但这不是重点。 我们正在使用Mono.create()方法,它有一些缺点,但稍后会介绍更多。 Mono是懒惰的,因此仅调用indexDoc()还不够,没有对ElasticSearch发出HTTP请求。 但是,每次有人订阅此单元素源时,都会执行create()内部的逻辑。 关键行是sink.success()sink.error() 。 它们将结果从ElasticSearch(来自后台异步线程)传播到流中。 在实践中如何使用这种方法? 非常简单!

Doc doc = //...
indexDoc(doc).subscribe(indexResponse -> log.info("Got response"));

当然,React流处理的真正能力来自于组合多个流。 但是我们迈出了第一步:将基于回调的异步API转换为通用流。 如果您不愿意使用MongoDB,它会在存储库中内置对诸如MonoFlux类的React式类型的支持。 Cassandra和Redis也是如此 。 在下一篇文章中,我们将学习如何生成一些虚假数据并对其进行索引。

翻译自: https://www.javacodegeeks.com/2018/01/spring-reactor-elasticsearch-callbacks-reactive-streams.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/334065.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

关于压缩工具 7z(7-zip) 的选项 -so(从标准输出流写入数据)的解读

文章目录一、选项介绍二、选项示例&#xff08;一&#xff09;解压缩 archive.gz 输出流并将该输出流写入到 Doc.txt 文件&#xff08;二&#xff09;压缩 Doc.txt 输出流并将该输出流写入到 archive.gz 压缩档案一、选项介绍 选项 -so 使 7-Zip 从 stdout&#xff08;标准输出…

python删除重复文字_python如何删除文件中重复的字段

本文实例为大家分享了python如何删除文件中重复字段的具体代码&#xff0c;供大家参考&#xff0c;具体内容如下原文件内容放在list中&#xff0c;新文件内容按行查找&#xff0c;如果没有出现在list中则写入第三个文件中。import csvfiletxt1 E:/gg/log/log1.txtfilecsv1 E:…

css 深度选择器 ,CSS的coped私有作用域和深度选择器

大家都知道当 编译前&#xff1a;.example {color: red;}编译后&#xff1a;.example[data-v-f3f3eg9] {color: red;}看完你肯定就会明白了&#xff0c;其实是在你写的组件的样式&#xff0c;添加了一个属性而已&#xff0c;这样就实现了所谓的私有作用域。但是也会有弊端&…

关于压缩工具 7z(7-zip) 的选项 -u(更新压缩档案中的文件)的解读

文章目录一、选项介绍&#xff08;一&#xff09;参量说明二、示例一、选项介绍 更新选项 -u 指定压缩档案中文件的更新及创建的方式。可以和此选项结合使用的命令&#xff1a;a (添加)&#xff0c; d (删除)&#xff0c; u (更新) 。 语法格式&#xff1a;-u[-][action_set]…

营销自动化权威指南_免费电子书:自动化根本原因分析的完整指南

营销自动化权威指南自动化根本原因分析入门指南。 我们在OverOps过夜&#xff0c;梦想着拥有一个自动化的世界。 看起来似乎有些怪异&#xff0c;但是您能想象您和您的团队会比那些不眠之夜和无休止的日子在日志文件中筛选以找出问题出在哪里的快乐多了吗&#xff1f; 这些乏…

光立方原理讲解_90%人不理解什么是防眩光射灯 防昡晕 防炫光,。怎么选项led防眩灯...

关于LED射灯“防眩光(防炫光)”实在想和各位朋友讲解一下防眩光原理 &#xff0c;因为我们碰到90%以上装修朋友&#xff0c;总是把眩光理解为简单的不刺眼的光。我可以负责的告诉大家 &#xff0c;市面上没有不刺眼的射灯&#xff0c;不管是LED芯片直接射出来的光&#xff0c;还…

打印机设置虚拟服务器,虚拟机打印机服务器设置

虚拟机打印机服务器设置 内容精选换一换迁移前&#xff0c;您需要设置目的端服务器。该目的端用来接收源端的数据&#xff0c;同时您也可以使用该目的端进行迁移测试和启动目的端。只有“迁移阶段”为“已就绪”时才可设置目的端。或单击“操作”列的“更多 > 设置目的端”&…

关于压缩工具 7z(7-zip) 的选项 -w(设置工作目录)的解读

文章目录一、选项介绍二、语法格式三、选项示例一、选项介绍 选项 -w 为文件压缩设置临时的工作目录。可以和此选项结合使用的命令&#xff1a;a (添加)&#xff0c; d (删除)&#xff0c; u (更新) 。 默认情况下&#xff0c;7-Zip 新建一个压缩档案时&#xff0c;会临时在当…

nginx fastcgi python_Nginx + webpy 和FastCGI搭建webpy环境

web.py 是一个轻量级Python web框架&#xff0c;它简单而且功能大。web.py是一个开源项目。1、所需要的软件&#xff1a;Nginx nginx-1.4.7.tar.gz (需要包含fastcgi和rewrite模块)。Webpy 0.32Spawn-fcgi 1.6.2Flup注意&#xff1a;Flup是最常见的忘记装的软件&#xff0c;需要…

datastore_使用Spring Session和JDBC DataStore进行会话管理

datastore在Web应用程序中&#xff0c;用户会话管理对于管理用户状态至关重要。 在本文中&#xff0c;我们将学习在集群环境中管理用户会话所采用的方法&#xff0c;以及如何使用Spring Session以更简单和可扩展的方式实现该方法。 通常在生产环境中&#xff0c;我们将有多个服…

关于压缩工具 7z(7-zip) 的选项 -x(排除文件)的解读

文章目录一、选项介绍二、选项语法三、选项示例一、选项介绍 选项 -x 用来指定某一文件或某一类文件从操作中排除&#xff0c;此选项可同时排除多个类型。可以和此选项结合使用的命令&#xff1a;a (添加)&#xff0c; d (删除)&#xff0c; e (释放)&#xff0c; l (列表)&am…

华为服务器芯片总在pc,服务器芯片 华为

弹性云服务器 ECS弹性云服务器(Elastic Cloud Server)是一种可随时自助获取、可弹性伸缩的云服务器&#xff0c;帮助用户打造可靠、安全、灵活、高效的应用环境&#xff0c;确保服务持久稳定运行&#xff0c;提升运维效率三年低至5折&#xff0c;多种配置可选了解详情Linux云服…

python figure函数 gui_python 在一个GUI内创建了2个figure,为什么只能显示第二个图?...

建议你把代码格式化一下&#xff0c;编辑框上有代码块选项&#xff0c;选中代码再点击如下图标就可以格式化。说说这段代码的问题。因为tk我没怎么用过&#xff0c;说明如果有错&#xff0c;帮忙指正。def figure(self):f1Figure((4,3))self.f11f1.add_subplot(111)self.canvas…

java 哈希算法_选择Java密码算法第1部分-哈希

java 哈希算法抽象 这是涵盖Java加密算法的三部分博客系列文章的第1部分。 该系列涵盖如何实现以下功能&#xff1a; 使用SHA–512散列 使用AES–256的单密钥对称加密 使用RSA–4096的公钥/私钥非对称加密 第一篇文章详细介绍了如何实现SHA–512哈希。 让我们开始吧。 免责…

在 Linux 下打包命令 tar 和压缩命令 7z 的配合使用示例

文章目录一、压缩命令&#xff08;结合 tar 命令&#xff09;二、解压命令&#xff08;结合 tar 命令&#xff09;一、压缩命令&#xff08;结合 tar 命令&#xff09; tar cf – /home/test | 7z a -si test.tar.7z上面命令将 /home/test 文件夹压缩为 test.tar.7z 文件。实际…

上传书籍进度信息到服务器...,使用HttpWebRequest实现大文件上传资料.pdf

Twilight Software Development Studio © 2011使用HttpWebRequest 实现大文件上传Author:xuzhihongCreate Date:2011-06-03Descriptions: WinForm 程序使用HttpWebRequest 实现大文件上传Url: /blog/static/2673158720115991432899/概述&#xff1a;通常在WinForm 程序中都…

做一个公众号大概要多少钱_公众号流量主一个月可以赚多少钱?

我是小郁儿&#xff0c;点击上方“关注”&#xff0c;每天为你分享自媒体运营与个人精进干货。细心的读者会发现&#xff0c;阅读我文章的时候&#xff0c;最下面不再出现广告卡片了&#xff0c;因为我前几天已经把流量主功能关掉。起初是因为我在看自己发的视频时&#xff0c;…

Linux 命令之 7za -- 文件压缩命令

文章目录一、命令介绍二、7za 和 7z 的区别三、常用选项四、命令示例&#xff08;一&#xff09;压缩指定目录下的内容&#xff08;二&#xff09;解压缩指定的压缩包到指定的目录下一、命令介绍 7za 一种文件压缩命令&#xff0c;7z格式具有高压缩比率&#xff0c;它采用了多…

从Speedment 3.0.17或更高版本的事务轻松返回值

交易次数 在我以前的文章中&#xff0c;我写了关于如何使用Speedment轻松使用事务的方法&#xff0c;其中我们原子地更新了两个银行帐户。 众所周知&#xff0c;事务是一种将多个数据库操作组合到一个原子执行的单个操作中的方法。 但是事务不仅与更新数据库有关&#xff0c;而…

华为路由设置虚拟服务器命令,华为路由器配置ip命令

基本命令  en 进入特权模式conf 进入全局配置模式in s0 进入 serial 0 端口配置ip add xxx.xxx.xxx.xxx xxx.xxx.xxx.xxx 添加ip 地址和掩码&#xff0c;电信分配enca hdlc/ppp 捆绑链路协议 hdlc 或者  ip unn e0  exit 回到全局配置模式in e0 进入以太接口配置ip add x…