使用Spring Boot和Project Reactor处理SQS消息

我最近参与了一个项目,在该项目中,我不得不有效地处理通过AWS SQS Queue流入的大量消息。 在这篇文章(可能还有一篇)中,我将介绍使用出色的Project Reactor处理消息的方法。

以下是我要进行的设置:

设置本地AWS环境

在我进入代码之前,让我先做一些准备。 首先,如何获得SNS和SQS的本地版本。 最简单的方法之一是使用localstack 。 我使用这里描述的docker-compose版本

我将使用的第二个实用程序是AWS CLI。 该网站包含有关如何在本地安装的详细信息。

一旦这两个实用程序都到位,快速测试应验证设置:

 # Create a queue  aws --endpoint http: //localhost:4576 sqs create-queue --queue-name test-queue  # Send a sample message  aws --endpoint http: //localhost:4576 sqs send-message --queue-url http://localhost:4576/queue/test-queue --message-body "Hello world"  # Receive the message  aws --endpoint http: //localhost:4576 sqs receive-message --queue-url http://localhost:4576/queue/test-queue 

项目反应堆的基础

Project Reactor实现了Reactive Streams规范,并提供了一种跨异步边界处理数据流的方法,该方法尊重背压。 这里有很多词,但本质上是这样想的:

1. SQS产生数据 2.应用程序将使用它并将其作为数据流进行处理 3.应用程序应以可持续的速度使用数据–不应输入太多数据。这正式称为 “背压”

AWS开发工具包2

我将用于消耗AWS SQS数据的库是
AWS开发工具包2 。 该库在幕后使用了非阻塞IO。

该库提供了拨打电话的同步版本以及异步版本。 考虑从SQS队列中获取记录的同步方式:

 import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest  import software.amazon.awssdk.services.sqs.SqsClient  val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build()  val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages() 

在这里,“ software.amazon.awssdk.services.sqs.SqsClient”用于查询sqs和同步检索一批结果。 另一方面,异步结果如下所示:

 val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build()  val messages: CompletableFuture<List<Message>> = sqsAsyncClient .receiveMessage(receiveMessageRequest) .thenApply { result -> result.messages() } 

现在,输出为“ CompletableFuture”

无限循环,无背压

我最初创建消息流( Flux )的尝试非常简单–一个无限循环,它轮询AWS sqs并使用“ Flux.create”运算符从中创建Flux ,方法是:

 fun listen(): Flux<Pair<String, () -> Unit>> { return Flux.create { sink: FluxSink<List<Message>> -> while (running) { try { val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build() val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages() LOGGER.info( "Received: $messages" ) sink.next(messages) } catch (e: InterruptedException) { LOGGER.error(e.message, e) } catch (e: Exception) { LOGGER.error(e.message, e) } } } .flatMapIterable(Function.identity()) .doOnError { t: Throwable -> LOGGER.error(t.message, t) } .retry() .map { snsMessage: Message -> val snsMessageBody: String = snsMessage.body() val snsNotification: SnsNotification = readSnsNotification(snsMessageBody) snsNotification.message to { deleteQueueMessage(snsMessage.receiptHandle(), queueUrl) } }  } 

它的工作方式是存在一个无限循环,该循环使用long-polling检查新消息。 消息可能并非在每次轮询时都可用,在这种情况下,会将空列表添加到流中。

然后,使用“ flatMapIterable”运算符将此列表中的最多5条消息映射到单个消息流,并通过从SNS包装器中提取消息来进一步映射(当消息从SNS转发到SQS时,SNS将包装器添加到消息),并在消息成功处理后删除消息的方法(deleteHandle)作为对返回。

这种方法可以很好地工作……但是,请想象一下有大量消息进入的情况,因为循环并没有真正意识到下游的吞吐量,它将继续将数据泵送到流中。 中间操作员的默认行为是根据最终使用者使用数据的方式来缓冲流入的数据。 由于此缓冲区是无界的,因此系统可能会达到不可持续的状态。

背压感知流

解决方法是使用其他运算符生成数据流–
助焊剂
使用此运算符的代码如下所示:

 fun listen(): Flux<Pair<String, () -> Unit>> { return Flux.generate { sink: SynchronousSink<List<Message>> -> val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build() val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages() LOGGER.info( "Received: $messages" ) sink.next(messages) } .flatMapIterable(Function.identity()) .doOnError { t: Throwable -> LOGGER.error(t.message, t) } .retry() .map { snsMessage: Message -> val snsMessageBody: String = snsMessage.body() val snsNotification: SnsNotification = readSnsNotification(snsMessageBody) snsNotification.message to { deleteQueueMessage(snsMessage.receiptHandle(), queueUrl) } }  } 

这种工作方式是重复调用传递给“ Flux.generate”运算符的块–与while循环类似,在每个循环中,期望将一项添加到流中。 在这种情况下,添加到流中的项目恰好是一个列表,该列表像以前一样分解为单独的消息。

背压在这种情况下如何工作–

因此,请再次考虑下游使用者处理速度比生成端慢的情况。 在这种情况下,Flux本身将以调用generate运算符的速率减慢速度,因此要考虑下游系统的吞吐量。

结论

这应该建立一个良好的管道来处理来自SQS的消息,对此有更多细微差别,可以稍后在流中并行处理消息,我将在以后的文章中介绍。

这个例子的代码库可以在我的github仓库中找到
在这里 – https://github.com/bijukunjummen/boot-with-sns-sqs。 该代码具有完整的管道,其中包括处理消息并在处理后将其删除。

翻译自: https://www.javacodegeeks.com/2020/03/processing-sqs-messages-using-spring-boot-and-project-reactor.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/339740.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

初级测试开发面试题_初级开发人员在编写单元测试时常犯的错误

初级测试开发面试题自从我编写第一个单元测试以来已经有10年了。 从那时起&#xff0c;我不记得我已经编写了成千上万的单元测试。 老实说&#xff0c;我在源代码和测试代码之间没有任何区别。 对我来说是同一回事。 测试代码是源代码的一部分。 在过去的3-4年中&#xff0c;我…

使用SoapUI调用安全WCF SOAP服务–第1部分,该服务

在这个由三部分组成的传奇中&#xff0c;我将演示如何使用SoapUI API工具来调用安全的SOAP服务。 首先&#xff0c;我将专注于创建服务&#xff0c;在接下来的文章中它将充当被测系统。 使用基本身份验证传输安全性机制维护对该服务中资源的访问。 Windows Communication Foun…

java简单系统_Java简单学生管理系统

Java简单学生管理系统这个不需要手动输入&#xff0c;笔记记录//studentpublic class student(){private String id;//学号private String name;//姓名private int age;//年龄public String getId() {return id;}public void setId(String id) {this.id id;}public String get…

kafka java编程demo_Kafka简单客户端编程实例

今天&#xff0c;我们给大家带来一篇如何利用Kafka的API进行客户端编程的文章&#xff0c;这篇文章很简单&#xff0c;就是利用Kafka的API创建一个生产者和消费者&#xff0c;生产者不断向Kafka写入消息&#xff0c;消费者则不断消费Kafka的消息。下面是具体的实例代码。一、创…

java我的世界极限生存_我的世界 1.7.10 极限生存整合包

整合包介绍&#xff1a;最近总有人觉得Minecraft很无聊&#xff0c;没有什么可玩的&#xff0c;或者觉得生存太简单 那么就来试试这个吧&#xff0c;全部是增强怪物的MOD&#xff0c;保证不无聊&#xff0c;保证不简单 基本上没有增加一些新的东西&#xff0c;只增加了几种怪物…

具有InlfuxDB的Spring Boot和Micrometer第1部分:基础项目

对于那些关注此博客的人来说&#xff0c;难怪我倾向于大量使用InfluxDB。 我喜欢这样一个事实&#xff0c;它是一个真正的单一用途的数据库&#xff08;时间序列&#xff09;&#xff0c;具有许多功能&#xff0c;并且还带有企业支持。 Spring也是我选择的工具之一。 因此&…

PIT,JUnit 5和Gradle –仅需额外的一行配置

在Gradle&#xff08;带有gradle-pitest-plugin 1.4.7&#xff09;中发现简单&#xff0c;经过改进的PIT和JUnit 5配置。 不可否认&#xff0c;如今JUnit 5越来越受欢迎。 虽然为JUnit 5提供了一个专用于PIT的插件&#xff0c;并且gradle-pitest-plugin支持了很多年&#xff0…

apache camel_使用WildFly 8在Java EE7中自举Apache Camel

apache camel从Camel版本2.10开始&#xff0c;支持CDI&#xff08;JSR-299&#xff09;和DI&#xff08;JSR-330&#xff09;。 这为在Java EE容器中以及在独立的Java SE或CDI容器中开发和部署Apache Camel项目提供了新的机会。 是时候尝试一下并熟悉它了。 骆驼到底是什么&am…

Hibernate中保存与持久性以及saveOrUpdate之间的区别

保存与保存或更新与持久保存在Hibernate中 save和saveOrUpdate之间的区别是什么或save和persist之间的区别是任何Hibernate面试中常见的面试问题&#xff0c;就像Hibernate中get和load方法之间的区别一样。 Hibernate Session类提供了几种方法&#xff0c;可以通过诸如save&am…

java中的语句有哪些_java中的循环语句有哪些

Java中有三种主要的循环结构&#xff1a;while 循环do…while 循环for 循环顺序结构的程序语句只能被执行一次。如果您想要同样的操作执行多次,&#xff0c;就需要使用循环结构。一、while循环语法&#xff1a;while( 布尔表达式 ) {     //循环内容   }只要符合布尔表达…

php无法新数据类型,新手入门PHP必知的七种数据类型

想要入门PHP&#xff0c;首先要学会搭建环境&#xff0c;其次是学习基础语法。PHP的基础包括数据类型&#xff0c;运算符&#xff0c;变量和常量等。在这篇文章中&#xff0c;我们主要了解什么是数据类型。数据类型是指同种数据的一个统称&#xff0c;一般会描述为XX数据类型。…

攻防世界web高手进阶php_rce,php_rce 攻防世界xctf web

php_rce首先了解ThinkPHP5.x rec 漏洞分析与复现https://blog.csdn.net/qq_40884727/article/details/101452478var_pathinfo的默认配置为s,我们可以通过$_GET[‘s’]来传参于是构造payloadhttp://111.198.29.45:30600/index.php?sindex/\think\App/invokefunction&functi…

具有InlfuxDB的Spring Boot和Micrometer第2部分:添加InfluxDB

自从我们添加了基本应用程序以来&#xff0c;是时候启动InfluxDB实例了。 我们将按照之前的教程进行操作&#xff0c;并添加一个docker实例。 docker run –rm -p 8086&#xff1a;8086 –name influxdb-本地influxdb 是时候在我们的pom上添加微米InfluxDB依赖项了 < dep…

使用比较器的nulls对具有null值的列表进行排序

你好朋友&#xff0c; 在本教程中&#xff0c;我们将看到如何使用Java 8 Comparator.nullsFirst在列表中的项目很少为空时如何对项目列表进行排序&#xff0c;以便将null视为列表中的最小元素。 –什么是比较器 – nullsFirst方法在Comparator中做什么 –排序具有非空名称的…

Jar Hell变得轻松–用jHades揭开类路径的神秘面纱

Java开发人员将不得不面对的最困难的问题是类路径错误&#xff1a; ClassNotFoundException &#xff0c; NoClassDefFoundError &#xff0c;Jar Hell&#xff0c; Xerces Hell和公司。 在本文中&#xff0c;我们将探究这些问题的根本原因&#xff0c;并了解最小的工具&#…

分度器中硒定位器的完整指南(示例)

在测试网站的功能时&#xff0c;特别是Web元素&#xff08;例如单选按钮&#xff0c;文本框&#xff0c;下拉列表等&#xff09;&#xff0c;您需要确保能够访问这些元素。 Selenium定位器正是出于这个目的&#xff0c;通过使用此命令&#xff0c;我们可以识别这些Web元素DOM&a…

wildfly管理控制台_WildFly 9 –别希望您的控制台像这样!

wildfly管理控制台每个人都可能听到这个消息。 周一发布了第一个WildFly 9.0.0.Alpha1版本。 您可以从wildfly.org网站上下载它&#xff0c;最大的变化是它是由一个新的功能配置工具构建的&#xff0c;该工具位于现在单独的核心发行版上&#xff0c;还包含一个新的Servlet发行版…

azure mysql sql,UiPath连接Azure Sql Server数据库

一、创建数据库在Azure中创建SQL数据库image更改防火墙设置&#xff0c;并设置客户端IP访问规则image二、安装数据源驱动在本地安装数据源驱动程序&#xff0c;保证可以正常接入到远程的数据库。如果不安装驱动程序&#xff0c;则会出现以下报错&#xff1a;[Microsoft][ODBC D…

linux 误删除mysql表能恢复吗,Linux误删数据恢复

引子指在键上飘&#xff0c;难免会湿手套。当你按下shiftdel键后&#xff0c;会不会突然心里凉透&#xff0c;当你执行rm -rf后&#xff0c;会不会马上去搜索哪个国家入境不需要签证。或者你还会遇到如下的情况&#xff1a;root4xem7:~# aliasalias cdrm -rfalias ddocker数据恢…

Apache Camel 3.1 –更多骆驼核心优化(第3部分)

我以前曾在博客中介绍过我们在下一个Camel 3.1版本中所做的优化 博客第1部分 博客第2部分 今天&#xff0c;我想简短介绍一下我们已经完成的最新开发&#xff0c;因为我们准备在本周末或下半年准备好构建和发布Camel 3.1。 从第2部分开始&#xff0c;我们设法在路由过程中将…