这是我关于使用Spring Boot和Project Reactor有效处理SQS消息的博客文章的后续文章
我在第一部分中列出了一些方法上的差距。
1.处理SQS客户端调用中的失败
2.该方法一次只能处理来自SQS的一条消息,如何并行化 3.它不处理错误,管道中的任何错误都会中断整个过程并停止从队列中读取更新的消息。
概括
回顾一下,上一篇文章演示了如何使用出色的Project Reactor创建管道来处理来自AWS SQS队列的消息
该练习的最终结果是一个管道,如下所示:
有了这个管道,让我现在讨论如何弥合差距:
处理SQS客户端故障
此功能生成从SQS读取的消息流。
Flux.generate { sink: SynchronousSink<List<Message>> -> val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build() val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages() sink.next(messages) } .flatMapIterable(Function.identity())
现在考虑上述“ sqsClient”存在连接问题的情况, Flux的行为是在发生错误的情况下终止了流。 当然,只要服务正在运行,这对于服务于处理消息的服务就不会起作用。
解决方法是在出现错误的情况下仅重试处理流程。
Flux.generate { sink: SynchronousSink<List<Message>> -> val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build() val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages() sink.next(messages) } .flatMapIterable(Function.identity()) .retry()
如果出现任何错误,这将导致Flux重新建立消息流。
并行处理消息
Project Reactor提供了几种并行化处理管道的方式。 我第一次尝试并行处理是在处理链中添加“ subscribeOn”方法。
Flux.generate { sink: SynchronousSink<List<Message>> -> val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build() val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages() sink.next(messages) } .flatMapIterable(Function.identity()) .retry() .subscribeOn(Schedulers.newElastic( "sub" ))
但是,这不是“ subscribeOn”的工作方式。 当我向该管道发送一些消息时,输出如下:
2020 - 04 - 07 20 : 52 : 53.241 INFO 1137 --- [ sub- 3 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 07 20 : 52 : 53.434 INFO 1137 --- [ sub- 3 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 07 20 : 52 : 53.493 INFO 1137 --- [ sub- 3 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 07 20 : 52 : 53.538 INFO 1137 --- [ sub- 3 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 07 20 : 52 : 53.609 INFO 1137 --- [ sub- 3 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 07 20 : 52 : 53.700 INFO 1137 --- [ sub- 3 ] sample.msg.MessageListenerRunner : Processed Message hello
上面的“ sub-3”是处理消息的线程的名称,看起来所有消息都在“ sub-3”线程上进行处理,而没有其他线程在处理!
subscriptionOn只是通过从此调度程序池中借用“线程”来更改执行上下文,而不使用池本身中的所有线程。
那么如何使处理并行化呢? 这个StackOverflow答案提供了我在这里使用的一种非常好的方法,本质上是使用
flatMap运算符,然后在“ flatMap”运算符内添加“ subscribeOn”运算符。
该运算符急切地订阅其内部发布者,然后将结果展平,其诀窍是可以为内部订阅者提供他们自己的调度程序,并且对于每个订阅,最终将使用调度程序池中的线程。 这些并发订阅者的数量可以使用传递给flatMap运算符的“并发”参数来控制。
Flux.generate { sink: SynchronousSink<List<Message>> -> val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build() val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages() sink.next(messages) } .flatMapIterable(Function.identity()) .retry() .flatMap({ (message: String, deleteHandle: () -> Unit) -> task(message) .then(Mono.fromSupplier { Try.of { deleteHandle() } }) .then() .subscribeOn(taskScheduler) }, concurrency)
处理多个消息时的输出如下所示–
2020 - 04 - 08 21 : 03 : 24.582 INFO 17541 --- [ taskHandler- 4 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 08 21 : 03 : 24.815 INFO 17541 --- [ taskHandler- 4 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 08 21 : 03 : 24.816 INFO 17541 --- [ taskHandler- 5 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 08 21 : 03 : 24.816 INFO 17541 --- [ taskHandler- 6 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 08 21 : 03 : 24.816 INFO 17541 --- [ taskHandler- 7 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 08 21 : 03 : 24.817 INFO 17541 --- [ taskHandler- 8 ] sample.msg.MessageListenerRunner : Processed Message hello
现在查看日志中,除了线程名(taskHandler- *)之外还有更多!
处理下游错误
我以前使用“重试”运算符进行的修复之一是关于使用sqsClient连接处理上游错误。 但是,有可能在管道中处理消息并且任何步骤引发错误时,整个管道都会失败。 因此,重要的是要防止每一步失败。 我一直致力于确保错误不会传播的一种巧妙方法是使用出色的vavr库及其“尝试”类型 。 尝试类型具有两个结果–一个成功(成功)或一个异常(失败)。 这使其余的管道可以按可衡量的方式对上一步的结果进行操作:
.flatMap({ (message: String, deleteHandle: () -> Unit) -> task(message) .then(Mono.fromSupplier { Try.of { deleteHandle() } }) .doOnNext { t -> t.onFailure { e -> LOGGER.error(e.message, e) } } .then() .subscribeOn(taskScheduler) }, concurrency)
上面的代码段演示了一种方法,在该方法中,我知道负责删除消息的“ deleteHandle”会引发异常,Try捕获了此异常,如果有错误记录了异常,则该异常不会缩短消息流。
结论
我最初的想法是,因为我已经采取了一种被动的方式来处理消息,所以我将在我的sqs消息处理管道中获得巨大的推动,但是,我的学习是,就像其他所有事情一样,需要对基于Project的反应堆进行仔细的了解和调整流以有效地处理消息。 我敢肯定,还有更多课程可供我学习,我将像我一样记录下来。
整个示例可在我的github存储库中找到 -https://github.com/bijukunjummen/boot-with-sns-sqs
翻译自: https://www.javacodegeeks.com/2020/04/processing-sqs-messages-using-spring-boot-and-project-reactor-part-2.html