Spring Cloud Stream消费失败后的处理策略(一):自动重试

之前写了几篇关于Spring Cloud Stream使用中的常见问题,比如:

  • 如何处理消息重复消费
  • 如何消费自己生产的消息

下面几天就集中来详细聊聊,当消息消费失败之后该如何处理的几种方式。不过不论哪种方式,都需要与具体业务结合,解决不同业务场景可能出现的问题。

今天第一节,介绍一下Spring Cloud Stream中默认就已经配置了的一个异常解决方案:重试!

应用场景

依然要明确一点,任何解决方案都要结合具体的业务实现来确定,不要有了锤子看什么问题都是钉子。那么重试可以解决什么问题呢?由于重试的基础逻辑并不会改变,所以通常重试只能解决因环境不稳定等外在因素导致的失败情况,比如:当我们接收到某个消息之后,需要调用一个外部的Web Service做一些事情,这个时候如果与外部系统的网络出现了抖动,导致调用失败而抛出异常。这个时候,通过重试消息消费的具体逻辑,可能在下一次调用的时候,就能完成整合业务动作,从而解决刚才所述的问题。

动手试试

先通过一个小例子来看看Spring Cloud Stream默认的重试机制是如何运作的。之前在如何消费自己生产的消息一文中的例子,我们可以继续沿用,或者也可以精简一些,都写到一个主类中,比如下面这样:

@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {

public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}

@RestController
static class TestController {

@Autowired
private TestTopic testTopic;

/**
* 消息生产接口
* @param message
* @return
*/
@GetMapping("/sendMessage")
public String messageWithMQ(@RequestParam String message) {
testTopic.output().send(MessageBuilder.withPayload(message).build());
return "ok";
}

}

/**
* 消息消费逻辑
*/
@Slf4j
@Component
static class TestListener {

@StreamListener(TestTopic.INPUT)
public void receive(String payload) {
log.info("Received: " + payload);
throw new RuntimeException("Message consumer failed!");
}

}

interface TestTopic {

String OUTPUT = "example-topic-output";
String INPUT = "example-topic-input";

@Output(OUTPUT)
MessageChannel output();

@Input(INPUT)
SubscribableChannel input();

}

}

内容很简单,既包含了消息的生产,也包含了消息消费。与之前例子不同的就是在消息消费逻辑中,主动的抛出了一个异常来模拟消息的消费失败。

在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名),比如:

spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-output.destination=test-topic

完成了上面配置之后,就可以启动应用,并尝试访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了。此时可以看到类似下面的日志:

2018-12-10 11:20:21.345  INFO 30499 --- [w2p2yKethOsqg-1] c.d.stream.TestApplication$TestListener  : Received: hello
2018-12-10 11:20:22.350 INFO 30499 --- [w2p2yKethOsqg-1] c.d.stream.TestApplication$TestListener : Received: hello
2018-12-10 11:20:24.354 INFO 30499 --- [w2p2yKethOsqg-1] c.d.stream.TestApplication$TestListener : Received: hello
2018-12-10 11:20:54.651 ERROR 30499 --- [w2p2yKethOsqg-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Exception thrown while invoking com.didispace.stream.TestApplication$TestListener#receive[1 args]; nested exception is java.lang.RuntimeException: Message consumer failed!, failedMessage=GenericMessage [payload=byte[5], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=test-topic, amqp_receivedExchange=test-topic, amqp_deliveryTag=2, deliveryAttempt=3, amqp_consumerQueue=test-topic.anonymous.EuqBJu66Qw2p2yKethOsqg, amqp_redelivered=false, id=a89adf96-7de2-f29d-20b6-2fcb0c64cd8c, amqp_consumerTag=amq.ctag-XFy6vXU2w4RB_NRBzImWTA, contentType=application/json, timestamp=1544412051638}]
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:63)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1100(AmqpInboundChannelAdapter.java:60)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:214)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:211)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Message consumer failed!
at com.didispace.stream.TestApplication$TestListener.receive(TestApplication.java:65)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114)
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
... 27 more

从日志中可以看到,一共输出了三次Received: hello,也就是说消息消费逻辑执行了3次,然后抛出了最终执行失败的异常。

设置重复次数

默认情况下Spring Cloud Stream会重试3次,我们也可以通过配置的方式修改这个默认配置,比如下面的配置可以将重试次数调整为1次:

spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1

对于一些纯内部计算逻辑,不需要依赖外部环境,如果出错通常是代码逻辑错误的情况下,不论我们如何重试都会继续错误的业务逻辑可以将该参数设置为0,避免不必要的重试影响消息处理的速度。

深入思考

完成了上面的基础尝试之后,再思考下面两个问题:

问题一:如果在重试过程中消息处理成功了,还会有异常信息吗?

答案是不会。因为重试过程是消息处理的一个整体,如果某一次重试成功了,会任务对所收到消息的消费成功了。

这个问题可以在上述例子中做一些小改动来验证,比如:

@Slf4j
@Component
static class TestListener {

int counter = 1;

@StreamListener(TestTopic.INPUT)
public void receive(String payload) {
log.info("Received: " + payload + ", " + counter);
if (counter == 3) {
counter = 1;
return;
} else {
counter++;
throw new RuntimeException("Message consumer failed!");
}
}

}

通过加入一个计数器,当重试是第3次的时候,不抛出异常来模拟消费逻辑处理成功了。此时重新运行程序,并调用接口localhost:8080/sendMessage?message=hello,可以获得如下日志结果,并没有异常打印出来。

2018-12-10 16:07:38.390  INFO 66468 --- [L6MGAj-MAj7QA-1] c.d.stream.TestApplication$TestListener  : Received: hello, 1
2018-12-10 16:07:39.398 INFO 66468 --- [L6MGAj-MAj7QA-1] c.d.stream.TestApplication$TestListener : Received: hello, 2
2018-12-10 16:07:41.402 INFO 66468 --- [L6MGAj-MAj7QA-1] c.d.stream.TestApplication$TestListener : Received: hello, 3

也就是,虽然前两次消费抛出了异常,但是并不影响最终的结果,也不会打印中间过程的异常,避免了对日志告警产生误报等问题。

问题二:如果重试都失败之后应该怎么办呢?

如果消息在重试了还是失败之后,目前的配置唯一能做的就是将异常信息记录下来,进行告警。由于日志中有消息的消息信息描述,所以应用维护者可以根据这些信息来做一些补救措施。

当然,这样的做法显然不是最好的,因为太过麻烦。那么怎么做才好呢?且听下回分解!

代码示例

本文示例读者可以通过查看下面仓库的中的stream-exception-handler-1项目:

  • Github
  • Gitee

如果您对这些感兴趣,欢迎star、follow、收藏、转发给予支持!

以下专题教程也许您会有兴趣

  • Spring Boot基础教程
  • Spring Cloud基础教程

money.jpg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/477674.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

会议交流 | DataFunSummit 2022:图机器学习在线峰会

深度学习模型是当今人工智能研究的核心。众所周知,对欧几里得数据(例如图像)和序列数据(例如文本)具有颠覆性学习能力的深度学习技术不能直接适用于图结构数据。这种差距推动了图深度学习研究的浪潮,在学术…

专访邱锡鹏:人工智能开源社区的「先行者」

文 | 刘冰一、Echo源 | 极市平台邱锡鹏,复旦大学理学学士和博士。任职复旦大学计算机科学技术学院教授,博导。发表 CCF A/B 类论文 70 余篇,获得 ACL 2017 杰出论文奖(CCF A类)、CCL 2019 最佳论文奖。出版开源专著《神…

Spring Cloud Stream如何消费自己生产的消息

在上一篇《Spring Cloud Stream如何处理消息重复消费》中,我们通过消费组的配置解决了多实例部署情况下消息重复消费这一入门时的常见问题。本文将继续说说在另外一个被经常问到的问题:如果微服务生产的消息自己也想要消费一份,应该如何实现呢…

LeetCode 400. 第N个数字(数学)

1. 题目 在无限的整数序列 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, …中找到第 n 个数字。 注意: n 是正数且在32为整形范围内 ( n < 2^31)。 示例 1: 输入: 3 输出: 3示例 2: 输入: 11 输出: 0 说明: 第11个数字在序列 1, 2, 3, 4, 5, 6, 7, 8, 9, 1--0--, 11, ... 里是0&a…

图谱实战 | 开源知识图谱融合工具剖析:Dedupe与OpenEA工具实现思想、关键环节与实操分析...

转载公众号 | 老刘说NLP实体对齐旨在发现不同知识图谱中的共指实体&#xff0c;如百度百科的360与Wikipedia中的360 qihoo。实体对齐是知识融合的重要任务&#xff0c;通过实体对齐集成多源知识图谱可以为下游任务提供更加全面的知识表示。实际上&#xff0c;实体对齐本质上就是…

算法岗SSP offer收割指南!

文 | 林小平源 | 知乎前序在本文开始以前&#xff0c;林小平首先需要声明的是这篇超详细面经并不是笔者本人的求职笔记&#xff0c;它是笔者学校隔壁实验室22届毕业学弟的面试心路历程和经验心得。由于笔者和这位学弟经常讨论校招求职和职业发展的问题&#xff0c;并且在秋招以…

Spring Cloud Stream如何处理消息重复消费

最近收到好几个类似的问题&#xff1a;使用Spring Cloud Stream操作RabbitMQ或Kafka的时候&#xff0c;出现消息重复消费的问题。通过沟通与排查下来主要还是用户对消费组的认识不够。其实&#xff0c;在之前的博文以及《Spring Cloud微服务实战》一书中都有提到关于消费组的概…

LeetCode 481. 神奇字符串(找规律)

1. 题目 神奇的字符串 S 只包含 ‘1’ 和 ‘2’&#xff0c;并遵守以下规则&#xff1a; 字符串 S 是神奇的&#xff0c;因为串联字符 ‘1’ 和 ‘2’ 的连续出现次数会生成字符串 S 本身。 字符串 S 的前几个元素如下&#xff1a;S “1221121221221121122 …” 如果我们将…

图谱实战 | ​鲍捷:知识图谱技术在金融领域的分析和应用

转载公众号 | DataFunSummit分享嘉宾&#xff1a;鲍捷博士 文因互联编辑整理&#xff1a;松烨 博瑜科技出品平台&#xff1a;DataFunTalk导读&#xff1a;知识图谱标准件已经全面赋能主流金融场景&#xff0c;经历了7年时间的发展&#xff0c;在金融监管、银行、资管、证券等领…

珍爱生命,远离大厂政治斗争

本文授权转载自公众号“算法圈的小破事”&#xff0c;点击以上卡片进行关注大家好&#xff0c;我是在互联网危险边缘疯狂试探的皮皮虾&#xff0c;今天跟大家分享一个关于大厂政治斗争的故事。有人可能觉得&#xff0c;政治斗争那都是大佬之间的事情&#xff0c;跟我们江湖虾米…

Spring Cloud Finchley版中Consul多实例注册的问题处理

由于Spring Cloud对Etcd的支持一直没能从孵化器中出来&#xff0c;所以目前来说大多用户还在使用Eureka和Consul&#xff0c;之前又因为Eureka 2.0不在开源的消息&#xff0c;外加一些博眼球的标题党媒体使得Eureka的用户有所减少&#xff0c;所以&#xff0c;相信在选择Spring…

论文浅尝 | Continual Learning for Named Entity Recognition

笔记整理&#xff1a;李淑怡&#xff0c;天津大学硕士动机在许多真实任务下&#xff0c;常常需要引入新的实体类型&#xff0c;因此需要重新训练命名实体识别模型。当因为存储或安全问题限制对原始数据的访问时&#xff0c;那么为新实体类型重新标注原始数据的成本将会是高昂的…

Allen AI提出MERLOT,视频理解领域新SOTA!

文 | Yimin_饭煲2021年&#xff0c;多模态领域大概是人工智能研究者们关注者最多的一个领域了。随着各种模态数据集的增长和算力的发展&#xff0c;研究者们开始不断地尝试在一个模型中融合来自各个模态的信息。而在多模态领域的研究中&#xff0c;和视频相关的任务被认为是最复…

基于HMM的中文词性标注 POSTagging

文章目录1. 词性标注1.1 概念1.2 任务1.3 预处理1.4 初步统计预览2. 最大概率模型2.1 训练2.2 预测2.3 结果评估2.4 结果可视化3. 二元隐马尔科夫BiHMM模型3.1 训练3.2 预测3.3 结果评估3.4 结果可视化4. 结果讨论思考本文的代码是在徐老师的代码基础上&#xff0c;自己加了些注…

图谱实战 | 58同城周超:基于招聘场景下的知识图谱构建及应用

转载公众号 | DataFunSummit分享嘉宾&#xff1a;周超 58同城 NLP资深算法工程师编辑整理&#xff1a;吴祺尧 加州大学圣地亚哥分校出品平台&#xff1a;DataFunTalk导读&#xff1a;知识图谱作为一种富信息工程&#xff0c;已经深入到各行各业中&#xff0c;也为产业效率的提升…

2022年薪百万赛道:高性能神经网络与AI芯片应用

随着大数据的发展&#xff0c;计算机芯片算力的提升&#xff0c;人工智能近两年迎来了新一轮的爆发。而人工智能实现超级算力的核心就是AI芯片。AI芯片也被称为人工智能加速器&#xff0c;即专门用于处理人工智能应用中的大量计算任务的模块。2020年我国人工智能芯片市场规模约…

API网关 Zuul1.0 和 2.0 我们该如何选择?

介绍 在今年5月中&#xff0c;Netflix终于开源了它的支持异步调用模式的Zuul网关2.0版本&#xff0c;真可谓千呼万唤始出来。从Netflix的官方博文[附录1]中&#xff0c;我们获得的信息也比较令人振奋&#xff1a; The Cloud Gateway team at Netflix runs and operates more t…

LeetCode 623. 在二叉树中增加一行(BFS/DFS)

文章目录1. 题目2. 解题2.1 BFS2.2 DFS1. 题目 给定一个二叉树&#xff0c;根节点为第1层&#xff0c;深度为 1。在其第 d 层追加一行值为 v 的节点。 添加规则&#xff1a;给定一个深度值 d &#xff08;正整数&#xff09;&#xff0c;针对深度为 d-1 层的每一非空节点 N&a…

论文浅尝 | KR-GCN: 知识感知推理的可解释推荐系统

论文作者&#xff1a;马婷&#xff0c;中国科学院信息工程研究所直博生动机抽取并利用知识图谱(KG)中的多跳关系路径可以提高推荐系统的性能&#xff0c;并提供可解释性。然而&#xff0c;现有的工作仍面临着两个主要的挑战&#xff1a;用户偏好的错误传播和模型的弱解释性。提…

吴恩达,确诊新冠阳性!

编 | 好困 袁榭源 | 新智元【导读】当代人工智能领域最权威的学者之一吴恩达&#xff0c;于2022年2月8日晨在自己推特上宣布新冠检测结果阳性&#xff0c;不过症状轻微。北京时间&#xff0c;2022年2月8日早上6点&#xff0c;吴恩达新冠病毒检测呈阳性。吴恩达表示&#xff0c;…