Spring Cloud Stream消费失败后的处理策略(四):重新入队(RabbitMQ)

应用场景

之前我们已经通过《Spring Cloud Stream消费失败后的处理策略(一):自动重试》一文介绍了Spring Cloud Stream默认的消息重试功能。本文将介绍RabbitMQ的binder提供的另外一种重试功能:重新入队。

动手试试

准备一个会消费失败的例子,可以直接沿用前文的工程,也可以新建一个,然后创建如下代码的逻辑:

@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {

public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}

@RestController
static class TestController {

@Autowired
private TestTopic testTopic;

/**
* 消息生产接口
*
* @param message
* @return
*/
@GetMapping("/sendMessage")
public String messageWithMQ(@RequestParam String message) {
testTopic.output().send(MessageBuilder.withPayload(message).build());
return "ok";
}

}

/**
* 消息消费逻辑
*/
@Slf4j
@Component
static class TestListener {

private int count = 1;

@StreamListener(TestTopic.INPUT)
public void receive(String payload) {
log.info("Received payload : " + payload + ", " + count);
throw new RuntimeException("Message consumer failed!");
}

}

interface TestTopic {

String OUTPUT = "example-topic-output";
String INPUT = "example-topic-input";

@Output(OUTPUT)
MessageChannel output();

@Input(INPUT)
SubscribableChannel input();

}

}

内容很简单,既包含了消息的生产,也包含了消息消费。消息消费的时候主动抛出了一个异常来模拟消息的消费失败。

在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名)、并设置一下分组,比如:

spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-exception-handler
spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1
spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.requeue-rejected=true

spring.cloud.stream.bindings.example-topic-output.destination=test-topic

完成了上面配置之后,启动应用并访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了,此时可以看到程序不断的抛出了消息消费异常。这是由于这里我们多加了一个配置:spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.requeue-rejected=true。在该配置作用之下,消息消费失败之后,并不会将该消息抛弃,而是将消息重新放入队列,所以消息的消费逻辑会被重复执行,直到这条消息消费成功为止。

深入思考

在完成了上面的这个例子之后,可能读者会有下面两个常见问题:

问题一:之前介绍的Spring Cloud Stream默认提供的默认功能(spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts)与本文所说的重入队列实现的重试有什么区别?

Spring Cloud Stream默认提供的默认功能只是对处理逻辑的重试,它们的处理逻辑是由同一条消息触发的。而本文所介绍的重新入队史通过重新将消息放入队列而触发的,所以实际上是收到了多次消息而实现的重试。

问题二:如上面的例子那样,消费一直不成功,这些不成功的消息会被不断堆积起来,如何解决这个问题?

对于这个问题,我们可以联合前文介绍的DLQ队列来完善消息的异常处理。

我们只需要增加如下配置,自动绑定dlq队列:

spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true

然后改造一下消息处理程序,可以根据业务情况,为进入dlq队列增加一个条件,比如下面的例子:

@StreamListener(TestTopic.INPUT)
public void receive(String payload) {
log.info("Received payload : " + payload + ", " + count);
if (count == 3) {
count = 1;
throw new AmqpRejectAndDontRequeueException("tried 3 times failed, send to dlq!");
} else {
count ++;
throw new RuntimeException("Message consumer failed!");
}
}

设定了计数器count,当count为3的时候抛出AmqpRejectAndDontRequeueException这个特定的异常。此时,当只有当抛出这个异常的时候,才会将消息放入DLQ队列,从而不会造成严重的堆积问题。

代码示例

本文示例读者可以通过查看下面仓库的中的stream-exception-handler-4项目:

  • Github
  • Gitee

如果您对这些感兴趣,欢迎star、follow、收藏、转发给予支持!

以下专题教程也许您会有兴趣

  • Spring Boot基础教程
  • Spring Cloud基础教程

money.jpg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/477682.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

图谱实战 | 华农夏静波:深层语义知识图谱在药物重定位中的应用

转载公众号 | DataFunSummit分享嘉宾:夏静波 华中农业大学 副教授编辑整理:王金华 电科32所出品平台:DataFunTalk导读:自新冠病毒肺炎疫情发生以来,由于传统药物研发周期长,药物重定位(老药新用…

LeetCode 738. 单调递增的数字(贪心)

1. 题目 给定一个非负整数 N&#xff0c;找出小于或等于 N 的最大的整数&#xff0c;同时这个整数需要满足其各个位数上的数字是单调递增。 &#xff08;当且仅当每个相邻位数上的数字 x 和 y 满足 x < y 时&#xff0c;我们称这个整数是单调递增的。&#xff09; 示例 1…

晋升挂了,leader说不是我技术不行

本文授权转载自公众号“算法圈的小破事”&#xff0c;点击以上卡片进行关注大家好&#xff0c;我是在互联网危险边缘疯狂试探的皮皮虾。今天跟大家分享一个故事。晋升去年秋季&#xff0c;我参加了校招入职以来的第一场晋升答辩。答辩前&#xff0c;我比来比去&#xff0c;觉得…

Spring Cloud Stream消费失败后的处理策略(二):自定义错误处理逻辑

应用场景 上一篇《Spring Cloud Stream消费失败后的处理策略&#xff08;一&#xff09;&#xff1a;自动重试》介绍了默认就会生效的消息重试功能。对于一些因环境原因、网络抖动等不稳定因素引发的问题可以起到比较好的作用。但是对于诸如代码本身存在的逻辑错误等&#xff…

论文浅尝 | DEKR: 一个基于描述增强知识图谱的机器学习方法推荐系统

笔记整理&#xff1a;刘尧锟&#xff0c;天津大学硕士链接&#xff1a;https://dl.acm.org/doi/pdf/10.1145/3404835.3462900动机面对大量的机器学习&#xff08;ML&#xff09;方法&#xff0c;为给定的数据集和任务选择合适的方法是一个挑战。一般来说&#xff0c;ML方法或数…

Meta AI 发布 data2vec!统一模态的新里程碑!

文 | ZenMoore编 | 小轶如果让大家举一个最成功的自监督模型的例子&#xff0c;尤其对于各位 NLPer&#xff0c;肯定毫不犹豫地祭出我大 BERT. 想当年 BERT 打了一个名叫 MLM (Masked Language Model) 的响指&#xff0c;直接成了 NLP 灭霸。视觉界、语音界闻声而来&#xff0c…

LeetCode 946. 验证栈序列(栈)

1. 题目 给定 pushed 和 popped 两个序列&#xff0c;每个序列中的 值都不重复&#xff0c;只有当它们可能是在最初空栈上进行的推入 push 和弹出 pop 操作序列的结果时&#xff0c;返回 true&#xff1b;否则&#xff0c;返回 false 。 示例 1&#xff1a; 输入&#xff1a;…

Spring Cloud Stream消费失败后的处理策略(一):自动重试

之前写了几篇关于Spring Cloud Stream使用中的常见问题&#xff0c;比如&#xff1a; 如何处理消息重复消费如何消费自己生产的消息 下面几天就集中来详细聊聊&#xff0c;当消息消费失败之后该如何处理的几种方式。不过不论哪种方式&#xff0c;都需要与具体业务结合&#xf…

会议交流 | DataFunSummit 2022:图机器学习在线峰会

深度学习模型是当今人工智能研究的核心。众所周知&#xff0c;对欧几里得数据&#xff08;例如图像&#xff09;和序列数据&#xff08;例如文本&#xff09;具有颠覆性学习能力的深度学习技术不能直接适用于图结构数据。这种差距推动了图深度学习研究的浪潮&#xff0c;在学术…

专访邱锡鹏:人工智能开源社区的「先行者」

文 | 刘冰一、Echo源 | 极市平台邱锡鹏&#xff0c;复旦大学理学学士和博士。任职复旦大学计算机科学技术学院教授&#xff0c;博导。发表 CCF A/B 类论文 70 余篇&#xff0c;获得 ACL 2017 杰出论文奖&#xff08;CCF A类&#xff09;、CCL 2019 最佳论文奖。出版开源专著《神…

Spring Cloud Stream如何消费自己生产的消息

在上一篇《Spring Cloud Stream如何处理消息重复消费》中&#xff0c;我们通过消费组的配置解决了多实例部署情况下消息重复消费这一入门时的常见问题。本文将继续说说在另外一个被经常问到的问题&#xff1a;如果微服务生产的消息自己也想要消费一份&#xff0c;应该如何实现呢…

LeetCode 400. 第N个数字(数学)

1. 题目 在无限的整数序列 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, …中找到第 n 个数字。 注意: n 是正数且在32为整形范围内 ( n < 2^31)。 示例 1: 输入: 3 输出: 3示例 2: 输入: 11 输出: 0 说明: 第11个数字在序列 1, 2, 3, 4, 5, 6, 7, 8, 9, 1--0--, 11, ... 里是0&a…

图谱实战 | 开源知识图谱融合工具剖析:Dedupe与OpenEA工具实现思想、关键环节与实操分析...

转载公众号 | 老刘说NLP实体对齐旨在发现不同知识图谱中的共指实体&#xff0c;如百度百科的360与Wikipedia中的360 qihoo。实体对齐是知识融合的重要任务&#xff0c;通过实体对齐集成多源知识图谱可以为下游任务提供更加全面的知识表示。实际上&#xff0c;实体对齐本质上就是…

算法岗SSP offer收割指南!

文 | 林小平源 | 知乎前序在本文开始以前&#xff0c;林小平首先需要声明的是这篇超详细面经并不是笔者本人的求职笔记&#xff0c;它是笔者学校隔壁实验室22届毕业学弟的面试心路历程和经验心得。由于笔者和这位学弟经常讨论校招求职和职业发展的问题&#xff0c;并且在秋招以…

Spring Cloud Stream如何处理消息重复消费

最近收到好几个类似的问题&#xff1a;使用Spring Cloud Stream操作RabbitMQ或Kafka的时候&#xff0c;出现消息重复消费的问题。通过沟通与排查下来主要还是用户对消费组的认识不够。其实&#xff0c;在之前的博文以及《Spring Cloud微服务实战》一书中都有提到关于消费组的概…

LeetCode 481. 神奇字符串(找规律)

1. 题目 神奇的字符串 S 只包含 ‘1’ 和 ‘2’&#xff0c;并遵守以下规则&#xff1a; 字符串 S 是神奇的&#xff0c;因为串联字符 ‘1’ 和 ‘2’ 的连续出现次数会生成字符串 S 本身。 字符串 S 的前几个元素如下&#xff1a;S “1221121221221121122 …” 如果我们将…

图谱实战 | ​鲍捷:知识图谱技术在金融领域的分析和应用

转载公众号 | DataFunSummit分享嘉宾&#xff1a;鲍捷博士 文因互联编辑整理&#xff1a;松烨 博瑜科技出品平台&#xff1a;DataFunTalk导读&#xff1a;知识图谱标准件已经全面赋能主流金融场景&#xff0c;经历了7年时间的发展&#xff0c;在金融监管、银行、资管、证券等领…

珍爱生命,远离大厂政治斗争

本文授权转载自公众号“算法圈的小破事”&#xff0c;点击以上卡片进行关注大家好&#xff0c;我是在互联网危险边缘疯狂试探的皮皮虾&#xff0c;今天跟大家分享一个关于大厂政治斗争的故事。有人可能觉得&#xff0c;政治斗争那都是大佬之间的事情&#xff0c;跟我们江湖虾米…

Spring Cloud Finchley版中Consul多实例注册的问题处理

由于Spring Cloud对Etcd的支持一直没能从孵化器中出来&#xff0c;所以目前来说大多用户还在使用Eureka和Consul&#xff0c;之前又因为Eureka 2.0不在开源的消息&#xff0c;外加一些博眼球的标题党媒体使得Eureka的用户有所减少&#xff0c;所以&#xff0c;相信在选择Spring…

论文浅尝 | Continual Learning for Named Entity Recognition

笔记整理&#xff1a;李淑怡&#xff0c;天津大学硕士动机在许多真实任务下&#xff0c;常常需要引入新的实体类型&#xff0c;因此需要重新训练命名实体识别模型。当因为存储或安全问题限制对原始数据的访问时&#xff0c;那么为新实体类型重新标注原始数据的成本将会是高昂的…