使用Spring Boot和Project Reactor处理SQS消息-第2部分

这是我关于使用Spring Boot和Project Reactor有效处理SQS消息的博客文章的后续文章

我在第一部分中列出了一些方法上的差距。

1.处理SQS客户端调用中的失败

2.该方法一次只能处理来自SQS的一条消息,如何并行化 3.它不处理错误,管道中的任何错误都会中断整个过程并停止从队列中读取更新的消息。

概括

回顾一下,上一篇文章演示了如何使用出色的Project Reactor创建管道来处理来自AWS SQS队列的消息

该练习的最终结果是一个管道,如下所示:

有了这个管道,让我现在讨论如何弥合差距:

处理SQS客户端故障

此功能生成从SQS读取的消息流。

 Flux.generate { sink: SynchronousSink<List<Message>> -> val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build() val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages() sink.next(messages)  } .flatMapIterable(Function.identity()) 

现在考虑上述“ sqsClient”存在连接问题的情况, Flux的行为是在发生错误的情况下终止了流。 当然,只要服务正在运行,这对于服务于处理消息的服务就不会起作用。

解决方法是在出现错误的情况下仅重试处理流程。

 Flux.generate { sink: SynchronousSink<List<Message>> -> val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build() val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages() sink.next(messages)  } .flatMapIterable(Function.identity()) .retry() 

如果出现任何错误,这将导致Flux重新建立消息流。

并行处理消息

Project Reactor提供了几种并行化处理管道的方式。 我第一次尝试并行处理是在处理链中添加“ subscribeOn”方法。

 Flux.generate { sink: SynchronousSink<List<Message>> -> val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build() val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages() sink.next(messages)  } .flatMapIterable(Function.identity()) .retry() .subscribeOn(Schedulers.newElastic( "sub" )) 

但是,这不是“ subscribeOn”的工作方式。 当我向该管道发送一些消息时,输出如下:

 2020 - 04 - 07 20 : 52 : 53.241 INFO 1137 --- [         sub- 3 ] sample.msg.MessageListenerRunner        : Processed Message hello  2020 - 04 - 07 20 : 52 : 53.434 INFO 1137 --- [         sub- 3 ] sample.msg.MessageListenerRunner        : Processed Message hello  2020 - 04 - 07 20 : 52 : 53.493 INFO 1137 --- [         sub- 3 ] sample.msg.MessageListenerRunner        : Processed Message hello  2020 - 04 - 07 20 : 52 : 53.538 INFO 1137 --- [         sub- 3 ] sample.msg.MessageListenerRunner        : Processed Message hello  2020 - 04 - 07 20 : 52 : 53.609 INFO 1137 --- [         sub- 3 ] sample.msg.MessageListenerRunner        : Processed Message hello  2020 - 04 - 07 20 : 52 : 53.700 INFO 1137 --- [         sub- 3 ] sample.msg.MessageListenerRunner        : Processed Message hello 

上面的“ sub-3”是处理消息的线程的名称,看起来所有消息都在“ sub-3”线程上进行处理,而没有其他线程在处理!

subscriptionOn只是通过从此调度程序池中借用“线程”来更改执行上下文,而不使用池本身中的所有线程。

那么如何使处理并行化呢? 这个StackOverflow答案提供了我在这里使用的一种非常好的方法,本质上是使用
flatMap运算符,然后在“ flatMap”运算符内添加“ subscribeOn”运算符。

该运算符急切地订阅其内部发布者,然后将结果展平,其诀窍是可以为内部订阅者提供他们自己的调度程序,并且对于每个订阅,最终将使用调度程序池中的线程。 这些并发订阅者的数量可以使用传递给flatMap运算符的“并发”参数来控制。

 Flux.generate { sink: SynchronousSink<List<Message>> -> val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build() val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages() sink.next(messages)  } .flatMapIterable(Function.identity()) .retry() .flatMap({ (message: String, deleteHandle: () -> Unit) -> task(message) .then(Mono.fromSupplier { Try.of { deleteHandle() } }) .then() .subscribeOn(taskScheduler) }, concurrency) 

处理多个消息时的输出如下所示–

 2020 - 04 - 08 21 : 03 : 24.582 INFO 17541 --- [ taskHandler- 4 ] sample.msg.MessageListenerRunner        : Processed Message hello  2020 - 04 - 08 21 : 03 : 24.815 INFO 17541 --- [ taskHandler- 4 ] sample.msg.MessageListenerRunner        : Processed Message hello  2020 - 04 - 08 21 : 03 : 24.816 INFO 17541 --- [ taskHandler- 5 ] sample.msg.MessageListenerRunner        : Processed Message hello  2020 - 04 - 08 21 : 03 : 24.816 INFO 17541 --- [ taskHandler- 6 ] sample.msg.MessageListenerRunner        : Processed Message hello  2020 - 04 - 08 21 : 03 : 24.816 INFO 17541 --- [ taskHandler- 7 ] sample.msg.MessageListenerRunner        : Processed Message hello  2020 - 04 - 08 21 : 03 : 24.817 INFO 17541 --- [ taskHandler- 8 ] sample.msg.MessageListenerRunner        : Processed Message hello 

现在查看日志中,除了线程名(taskHandler- *)之外还有更多!

处理下游错误

我以前使用“重试”运算符进行的修复之一是关于使用sqsClient连接处理上游错误。 但是,有可能在管道中处理消息并且任何步骤引发错误时,整个管道都会失败。 因此,重要的是要防止每一步失败。 我一直致力于确保错误不会传播的一种巧妙方法是使用出色的vavr库及其“尝试”类型 。 尝试类型具有两个结果–一个成功(成功)或一个异常(失败)。 这使其余的管道可以按可衡量的方式对上一步的结果进行操作:

 .flatMap({ (message: String, deleteHandle: () -> Unit) -> task(message) .then(Mono.fromSupplier { Try.of { deleteHandle() } }) .doOnNext { t -> t.onFailure { e -> LOGGER.error(e.message, e) } } .then() .subscribeOn(taskScheduler)  }, concurrency) 

上面的代码段演示了一种方法,在该方法中,我知道负责删除消息的“ deleteHandle”会引发异常,Try捕获了此异常,如果有错误记录了异常,则该异常不会缩短消息流。

结论

我最初的想法是,因为我已经采取了一种被动的方式来处理消息,所以我将在我的sqs消息处理管道中获得巨大的推动,但是,我的学习是,就像其他所有事情一样,需要对基于Project的反应堆进行仔细的了解和调整流以有效地处理消息。 我敢肯定,还有更多课程可供我学习,我将像我一样记录下来。

整个示例可在我的github存储库中找到 -https://github.com/bijukunjummen/boot-with-sns-sqs

翻译自: https://www.javacodegeeks.com/2020/04/processing-sqs-messages-using-spring-boot-and-project-reactor-part-2.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/339763.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java爬虫jsoup_Java爬虫之利用Jsoup自制简单的搜索引擎

内容导读在上述代码中&#xff0c;url为输入词条(暂时仅限于英文)&#xff0c;进入while循环可一直搜索&#xff0c;当输入为’exit’时退出。contentText为该词条的百度百科简介的网页形式&#xff0c;通过正则表达式将其中的文字提取出来。代码虽然简洁&#xff0c;但是功能还…

shader weaver_具有自定义汇编程序,Weaver和运行时的可插拔知识

shader weaver作为贝叶斯工作的一部分&#xff0c;我对Kie进行了很多重构&#xff0c;使其具有清晰的扩展点。 我想确保可以完成贝叶斯系统的所有工作部件&#xff0c;而无需在现有内核中添加任何代码。 因此&#xff0c;现在每种知识类型都可以拥有自己的包&#xff0c;汇编器…

matplotlib的默认字体_浅谈matplotlib默认字体设置探索

控制默认字体的设置根据官方文档https://matplotlib.org/tutorials/text/text_props.html#default-font可知&#xff1a;The base default font is controlled by a set of rcParams默认字体是由一组rcParams控制的。rcParamusage‘font.family"List of either names of f…

如何使用Apache Camel,Quarkus和GraalVM快速运行100个骆驼

今天&#xff0c;我继续在youtube上练习&#xff0c;并录制了10分钟的视频&#xff0c;介绍了如何创建一个新的Camel and Quarkus项目&#xff0c;该项目包括Rest和HTTP服务以及开箱即用的健康检查和指标。 然后比较以JVM模式运行示例与使用GraalVM编译的本机内存的使用情况。…

java空心菱形_java 空心菱形

分为两部分&#xff0c;先打印前四行&#xff0c;再打印后三行&#xff0c;int n 4;    //设初始值为4for(int i0;ifor(int j0;jSystem.out.print(" ");}for(int k0;kif(k0||k2*i) {    //打印前四行的*&#xff0c;中间部分输出空格System.out.print(&quo…

java接口版本控制_为什么要在Java中控制类和接口的可见性

java接口版本控制维护是软件开发的重要方面之一&#xff0c;并且经验证明&#xff0c;保持较低组件可视性的软件比暴露更多组件的软件更易于维护。 您不会在一开始就意识到它&#xff0c;但是在重新设计应用程序时会严重错过它。 由于保持向后兼容性是许多应用程序的“必须具备…

遮掩java_css之图片下方定位遮掩层

需要的效果如图&#xff0c;图片下方加个遮掩层&#xff1a;html&#xff1a;css&#xff1a;.listContent>div{width:300px;height: 300px;float: left;margin-top: 20px;margin-left: 20px;position:relative;}.mask{width:300px;height: 40px;background-color:#FFCCCC;p…

使用JDK的密码流的加密怪癖(以及如何做)

在我们的日常工作中&#xff0c;我们经常遇到经常性的主题&#xff0c;即将数据&#xff08;例如文件&#xff09;从一个位置传输到另一个位置。 这听起来像是一个非常简单的任务&#xff0c;但让我们通过声明这些文件可能包含机密信息并可以通过非安全的通信渠道进行传输这一事…

python中函数的定义实例_Python基础之函数的定义与使用实例

此文实例介绍了Python基础之函数的定义与使用。推荐给大伙学习一下&#xff0c;内容如下&#xff1a;Python 定义函数使用 def 关键字&#xff0c;一般格式如下&#xff1a;def 函数名(参数列表)&#xff1a;函数体让我们使用函数来输出"Hello World&#xff01;"&am…

log4j 程序日志_使用log4j监视和筛选应用程序日志到邮件

log4j 程序日志在今天的帖子中&#xff0c;我将向您展示如何将日志语句过滤为警告电子邮件。 这是出于监视我正在处理的一个应用程序的一些关键点的需要。 您可以使用一些工具来执行应用程序监视。 我不会详细介绍这些工具&#xff0c;但有时让应用程序发送警告电子邮件会更容易…

python切换消息窗_用Python切换窗口

The way that user had defined find_window only allows you to choose by the classname of the window用户定义它的方式是将这两个参数class_name和window_name传递给^{}(后者反过来只调用Win32 API函数^{})。所以&#xff0c;就这样做&#xff1a;windowmgr.find_window(No…

Java UnknownHostException –服务器的无效主机名–如何解决?

An UnknownHostException的快速指南&#xff0c;如果在为远程方法调用创建到远程主机的连接时发生java.net.UnknownHostException&#xff0c;则会抛出该快速指南。 UnknownHostException的预防方法。 1.简介 在本教程中&#xff0c;我们将学习什么是UnknownHostException以及…

mongodb连接java_如何从Java EE无状态应用程序连接到MongoDB

mongodb连接java在本文中&#xff0c;我将介绍如何从无状态Java EE应用程序连接到MongoDB&#xff0c;以利用与MongoDB Java驱动程序提供的数据库的内置连接池。 如果您开发的REST API对MongoDB执行操作&#xff0c;则可能是这种情况。 获取Java MongoDb驱动程序 要将Java连接…

学java专科_专科学历可以学习java开发吗

学习Java的热潮越来越高涨&#xff0c;除了转行而来的人&#xff0c;很多刚毕业的学生也加入到其中。很多人都觉得学习Java需要有一个高学历作为基础&#xff0c;一些专科生在学习之前会犹豫&#xff0c;他们是否能学习Java&#xff0c;首先学程序开发&#xff0c;入行Java开发…

具有InlfuxDB的Spring Boot和Micrometer第3部分:Servlet和JDBC

在上一个博客中&#xff0c;我们使用由InfluxDB支持的千分尺设置了反应式应用程序。 在本教程中&#xff0c;我们将使用传统的带JDBC阻塞式Servlet的Spring Stack。 我选择的数据库是postgresql。 我将使用与先前博客文章相同的脚本。 因此&#xff0c;我们将拥有初始化数据库…

java linkedlist实例_Java Linkedlist原理及实例详解

这篇文章主要介绍了Java Linkedlist原理及实例详解,文中通过示例代码介绍的非常详细&#xff0c;对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下定义&#xff1a;linkedlist属于链表结构&#xff0c;方便添加和删除元素&#xff0c;但查询不方便&#xff0c…

jsf xhtml调用方法_JSF的工作方式以及调试方法–可以使用polyglot吗?

jsf xhtml调用方法JSF不是我们通常认为的那样。 这也是一个调试起来可能有些棘手的框架&#xff0c;尤其是在初次遇到时。 在这篇文章中&#xff0c;让我们继续探讨为什么会出现这种情况&#xff0c;并提供一些JSF调试技术。 我们将讨论以下主题&#xff1a; JSF不是我们经常想…

java 分别编译_Java源文件和编译后的文件扩展名分别为()_学小易找答案

【单选题】( )下列关于逻辑运算符AND,描述正确的是哪一项?【单选题】如果声明一个类时使用abstract修饰符,则表明该类是()【填空题】要查询student表中name字段值以字符“m”开始,以字符“d”结束的记录应该在WHERE子句后跟 LIKE________。【填空题】不允许在关系中出现重复记…

将Auth0 OIDC(OAUTH 2)与授权(组和角色)集成

如果您正在使用Auth0对多个现有应用程序中的用户进行身份验证和授权&#xff0c;则可能需要将下一个Web应用程序与Auth0集成。 有多种方法可以执行此操作&#xff0c;例如&#xff0c;如果要将Jenkins与Auth0集成&#xff0c;则可以使用SAML v2&#xff1b;否则&#xff0c;可…

power of two java_LeetCode算法题-Power Of Two(Java实现)

这是悦乐书的第194次更新&#xff0c;第200篇原创01 看题和准备今天介绍的是LeetCode算法题中Easy级别的第56题(顺位题号是231)。给定一个整数&#xff0c;写一个函数来确定它是否是2的幂。例如&#xff1a;输入&#xff1a;1输出&#xff1a;true说明&#xff1a;2^0 1输入&a…