我最近参与了一个项目,在该项目中,我不得不有效地处理通过AWS SQS Queue流入的大量消息。 在这篇文章(可能还有一篇)中,我将介绍使用出色的Project Reactor处理消息的方法。
以下是我要进行的设置:
设置本地AWS环境
在我进入代码之前,让我先做一些准备。 首先,如何获得SNS和SQS的本地版本。 最简单的方法之一是使用localstack 。 我使用这里描述的docker-compose版本
我将使用的第二个实用程序是AWS CLI。 该网站包含有关如何在本地安装的详细信息。
一旦这两个实用程序都到位,快速测试应验证设置:
# Create a queue aws --endpoint http: //localhost:4576 sqs create-queue --queue-name test-queue # Send a sample message aws --endpoint http: //localhost:4576 sqs send-message --queue-url http://localhost:4576/queue/test-queue --message-body "Hello world" # Receive the message aws --endpoint http: //localhost:4576 sqs receive-message --queue-url http://localhost:4576/queue/test-queue
项目反应堆的基础
Project Reactor实现了Reactive Streams规范,并提供了一种跨异步边界处理数据流的方法,该方法尊重背压。 这里有很多词,但本质上是这样想的:
1. SQS产生数据 2.应用程序将使用它并将其作为数据流进行处理 3.应用程序应以可持续的速度使用数据–不应输入太多数据。这正式称为 “背压”
AWS开发工具包2
我将用于消耗AWS SQS数据的库是
AWS开发工具包2 。 该库在幕后使用了非阻塞IO。
该库提供了拨打电话的同步版本以及异步版本。 考虑从SQS队列中获取记录的同步方式:
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest import software.amazon.awssdk.services.sqs.SqsClient val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build() val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
在这里,“ software.amazon.awssdk.services.sqs.SqsClient”用于查询sqs和同步检索一批结果。 另一方面,异步结果如下所示:
val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build() val messages: CompletableFuture<List<Message>> = sqsAsyncClient .receiveMessage(receiveMessageRequest) .thenApply { result -> result.messages() }
现在,输出为“ CompletableFuture”
无限循环,无背压
我最初创建消息流( Flux )的尝试非常简单–一个无限循环,它轮询AWS sqs并使用“ Flux.create”运算符从中创建Flux ,方法是:
fun listen(): Flux<Pair<String, () -> Unit>> { return Flux.create { sink: FluxSink<List<Message>> -> while (running) { try { val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build() val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages() LOGGER.info( "Received: $messages" ) sink.next(messages) } catch (e: InterruptedException) { LOGGER.error(e.message, e) } catch (e: Exception) { LOGGER.error(e.message, e) } } } .flatMapIterable(Function.identity()) .doOnError { t: Throwable -> LOGGER.error(t.message, t) } .retry() .map { snsMessage: Message -> val snsMessageBody: String = snsMessage.body() val snsNotification: SnsNotification = readSnsNotification(snsMessageBody) snsNotification.message to { deleteQueueMessage(snsMessage.receiptHandle(), queueUrl) } } }
它的工作方式是存在一个无限循环,该循环使用long-polling检查新消息。 消息可能并非在每次轮询时都可用,在这种情况下,会将空列表添加到流中。
然后,使用“ flatMapIterable”运算符将此列表中的最多5条消息映射到单个消息流,并通过从SNS包装器中提取消息来进一步映射(当消息从SNS转发到SQS时,SNS将包装器添加到消息),并在消息成功处理后删除消息的方法(deleteHandle)作为对返回。
这种方法可以很好地工作……但是,请想象一下有大量消息进入的情况,因为循环并没有真正意识到下游的吞吐量,它将继续将数据泵送到流中。 中间操作员的默认行为是根据最终使用者使用数据的方式来缓冲流入的数据。 由于此缓冲区是无界的,因此系统可能会达到不可持续的状态。
背压感知流
解决方法是使用其他运算符生成数据流–
助焊剂
使用此运算符的代码如下所示:
fun listen(): Flux<Pair<String, () -> Unit>> { return Flux.generate { sink: SynchronousSink<List<Message>> -> val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build() val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages() LOGGER.info( "Received: $messages" ) sink.next(messages) } .flatMapIterable(Function.identity()) .doOnError { t: Throwable -> LOGGER.error(t.message, t) } .retry() .map { snsMessage: Message -> val snsMessageBody: String = snsMessage.body() val snsNotification: SnsNotification = readSnsNotification(snsMessageBody) snsNotification.message to { deleteQueueMessage(snsMessage.receiptHandle(), queueUrl) } } }
这种工作方式是重复调用传递给“ Flux.generate”运算符的块–与while循环类似,在每个循环中,期望将一项添加到流中。 在这种情况下,添加到流中的项目恰好是一个列表,该列表像以前一样分解为单独的消息。
背压在这种情况下如何工作–
因此,请再次考虑下游使用者处理速度比生成端慢的情况。 在这种情况下,Flux本身将以调用generate运算符的速率减慢速度,因此要考虑下游系统的吞吐量。
结论
这应该建立一个良好的管道来处理来自SQS的消息,对此有更多细微差别,可以稍后在流中并行处理消息,我将在以后的文章中介绍。
这个例子的代码库可以在我的github仓库中找到
在这里 – https://github.com/bijukunjummen/boot-with-sns-sqs。 该代码具有完整的管道,其中包括处理消息并在处理后将其删除。
翻译自: https://www.javacodegeeks.com/2020/03/processing-sqs-messages-using-spring-boot-and-project-reactor.html