Spring Cloud Stream消费失败后的处理策略(二):自定义错误处理逻辑

应用场景

上一篇《Spring Cloud Stream消费失败后的处理策略(一):自动重试》介绍了默认就会生效的消息重试功能。对于一些因环境原因、网络抖动等不稳定因素引发的问题可以起到比较好的作用。但是对于诸如代码本身存在的逻辑错误等,无论重试多少次都不可能成功的问题,是无法修复的。对于这样的情况,前文中说了可以利用日志记录消息内容,配合告警来做补救,但是很显然,这样做非常原始,并且太过笨拙,处理复杂度过高。所以,我们需要需求更好的办法,本文将介绍针对该类问题的一种处理方法:自定义错误处理逻辑。

动手试试

准备一个会消费失败的例子,可以直接沿用前文的工程,也可以新建一个,然后创建如下代码的逻辑:

@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {

public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}

@RestController
static class TestController {

@Autowired
private TestTopic testTopic;

/**
* 消息生产接口
*
* @param message
* @return
*/
@GetMapping("/sendMessage")
public String messageWithMQ(@RequestParam String message) {
testTopic.output().send(MessageBuilder.withPayload(message).build());
return "ok";
}

}

/**
* 消息消费逻辑
*/
@Slf4j
@Component
static class TestListener {

@StreamListener(TestTopic.INPUT)
public void receive(String payload) {
log.info("Received payload : " + payload);
throw new RuntimeException("Message consumer failed!");
}

}

interface TestTopic {

String OUTPUT = "example-topic-output";
String INPUT = "example-topic-input";

@Output(OUTPUT)
MessageChannel output();

@Input(INPUT)
SubscribableChannel input();

}

}

内容很简单,既包含了消息的生产,也包含了消息消费。消息消费的时候主动抛出了一个异常来模拟消息的消费失败。

在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名)、并设置一下分组,比如:

spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-exception-handler
spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1

spring.cloud.stream.bindings.example-topic-output.destination=test-topic

完成了上面配置之后,启动应用并访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了,此时可以看到消费失败后抛出了异常,跟上一篇文章的结果一样,消息消费失败,记录了日志,消息信息丢弃。

下面,针对消息消费失败,在TestListener中针对消息消费逻辑创建一段错误处理逻辑,比如:

@Slf4j
@Component
static class TestListener {

@StreamListener(TestTopic.INPUT)
public void receive(String payload) {
log.info("Received payload : " + payload);
throw new RuntimeException("Message consumer failed!");
}

/**
* 消息消费失败的降级处理逻辑
*
* @param message
*/
@ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors")
public void error(Message<?> message) {
log.info("Message consumer failed, call fallback!");
}

}

通过使用@ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors")指定了某个通道的错误处理映射。其中,inputChannel的配置中对应关系如下:

  • test-topic:消息通道对应的目标(destination,即:spring.cloud.stream.bindings.example-topic-input.destination的配置)
  • stream-exception-handler:消息通道对应的消费组(group,即:spring.cloud.stream.bindings.example-topic-input.group的配置)

再启动应用并访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中,此时可以看到日志如下:

2018-12-11 12:00:35.500  INFO 75269 --- [ctor-http-nio-3] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2018-12-11 12:00:35.512 INFO 75269 --- [ctor-http-nio-3] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory.publisher#311db1cb:0/SimpleConnection@40370d8c [delegate=amqp://guest@127.0.0.1:5672/, localPort= 54391]
2018-12-11 12:00:35.527 INFO 75269 --- [ption-handler-1] c.d.stream.TestApplication$TestListener : Received: hello,
2018-12-11 12:00:38.541 INFO 75269 --- [ption-handler-1] c.d.stream.TestApplication$TestListener : Message consumer failed, call fallback!

虽然消费逻辑中输出了消息内容之后抛出了异常,但是会进入到error函数中,执行错误处理逻辑(这里只是答应了一句话),用户可以根据需要读取消息内容以及异常详情做更进一步的细化处理。

深入思考

由于error逻辑是通过编码方式来实现的,所以这段逻辑相对来说比较死。通常,只有业务上有明确的错误处理逻辑的时候,这种方法才可以比较好的被应用到。不然能做的可能也只是将消息记录下来,然后具体的分析原因后再去做补救措施。所以这种方法也不是万能的,主要适用于有明确错误处理方案的方式来使用(这种场景并不多),另外。。。

注意:有坑! 这个方案在目前版本(2.0.x)其实还有一个坑,这种方式并不能很好的处理异常消息,会有部分消息得不到正确的处理,由于应用场景也不多,所以目前不推荐使用这种方法来做(完全可以用原始的异常捕获机制来处理,只是没有这种方式那么优雅)。目前看官方issue是在Spring Cloud Stream的2.1.0版本中会修复,后续发布之后可以使用该功能,具体点击查看:Issue #1357。

而对于没有特定的错误处理方案的,也只能通过记录和后续处理来解决,可能这样的方式也只是比从日志中抓去简单那么一些,并没有得到很大的提升。但是,不要紧,因为下一篇我们将继续介绍其他更好的处理方案。

代码示例

本文示例读者可以通过查看下面仓库的中的stream-exception-handler-2项目:

  • Github
  • Gitee

如果您对这些感兴趣,欢迎star、follow、收藏、转发给予支持!

以下专题教程也许您会有兴趣

  • Spring Boot基础教程
  • Spring Cloud基础教程

money.jpg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/477678.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

论文浅尝 | DEKR: 一个基于描述增强知识图谱的机器学习方法推荐系统

笔记整理&#xff1a;刘尧锟&#xff0c;天津大学硕士链接&#xff1a;https://dl.acm.org/doi/pdf/10.1145/3404835.3462900动机面对大量的机器学习&#xff08;ML&#xff09;方法&#xff0c;为给定的数据集和任务选择合适的方法是一个挑战。一般来说&#xff0c;ML方法或数…

Meta AI 发布 data2vec!统一模态的新里程碑!

文 | ZenMoore编 | 小轶如果让大家举一个最成功的自监督模型的例子&#xff0c;尤其对于各位 NLPer&#xff0c;肯定毫不犹豫地祭出我大 BERT. 想当年 BERT 打了一个名叫 MLM (Masked Language Model) 的响指&#xff0c;直接成了 NLP 灭霸。视觉界、语音界闻声而来&#xff0c…

LeetCode 946. 验证栈序列(栈)

1. 题目 给定 pushed 和 popped 两个序列&#xff0c;每个序列中的 值都不重复&#xff0c;只有当它们可能是在最初空栈上进行的推入 push 和弹出 pop 操作序列的结果时&#xff0c;返回 true&#xff1b;否则&#xff0c;返回 false 。 示例 1&#xff1a; 输入&#xff1a;…

Spring Cloud Stream消费失败后的处理策略(一):自动重试

之前写了几篇关于Spring Cloud Stream使用中的常见问题&#xff0c;比如&#xff1a; 如何处理消息重复消费如何消费自己生产的消息 下面几天就集中来详细聊聊&#xff0c;当消息消费失败之后该如何处理的几种方式。不过不论哪种方式&#xff0c;都需要与具体业务结合&#xf…

会议交流 | DataFunSummit 2022:图机器学习在线峰会

深度学习模型是当今人工智能研究的核心。众所周知&#xff0c;对欧几里得数据&#xff08;例如图像&#xff09;和序列数据&#xff08;例如文本&#xff09;具有颠覆性学习能力的深度学习技术不能直接适用于图结构数据。这种差距推动了图深度学习研究的浪潮&#xff0c;在学术…

专访邱锡鹏:人工智能开源社区的「先行者」

文 | 刘冰一、Echo源 | 极市平台邱锡鹏&#xff0c;复旦大学理学学士和博士。任职复旦大学计算机科学技术学院教授&#xff0c;博导。发表 CCF A/B 类论文 70 余篇&#xff0c;获得 ACL 2017 杰出论文奖&#xff08;CCF A类&#xff09;、CCL 2019 最佳论文奖。出版开源专著《神…

Spring Cloud Stream如何消费自己生产的消息

在上一篇《Spring Cloud Stream如何处理消息重复消费》中&#xff0c;我们通过消费组的配置解决了多实例部署情况下消息重复消费这一入门时的常见问题。本文将继续说说在另外一个被经常问到的问题&#xff1a;如果微服务生产的消息自己也想要消费一份&#xff0c;应该如何实现呢…

LeetCode 400. 第N个数字(数学)

1. 题目 在无限的整数序列 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, …中找到第 n 个数字。 注意: n 是正数且在32为整形范围内 ( n < 2^31)。 示例 1: 输入: 3 输出: 3示例 2: 输入: 11 输出: 0 说明: 第11个数字在序列 1, 2, 3, 4, 5, 6, 7, 8, 9, 1--0--, 11, ... 里是0&a…

图谱实战 | 开源知识图谱融合工具剖析:Dedupe与OpenEA工具实现思想、关键环节与实操分析...

转载公众号 | 老刘说NLP实体对齐旨在发现不同知识图谱中的共指实体&#xff0c;如百度百科的360与Wikipedia中的360 qihoo。实体对齐是知识融合的重要任务&#xff0c;通过实体对齐集成多源知识图谱可以为下游任务提供更加全面的知识表示。实际上&#xff0c;实体对齐本质上就是…

算法岗SSP offer收割指南!

文 | 林小平源 | 知乎前序在本文开始以前&#xff0c;林小平首先需要声明的是这篇超详细面经并不是笔者本人的求职笔记&#xff0c;它是笔者学校隔壁实验室22届毕业学弟的面试心路历程和经验心得。由于笔者和这位学弟经常讨论校招求职和职业发展的问题&#xff0c;并且在秋招以…

Spring Cloud Stream如何处理消息重复消费

最近收到好几个类似的问题&#xff1a;使用Spring Cloud Stream操作RabbitMQ或Kafka的时候&#xff0c;出现消息重复消费的问题。通过沟通与排查下来主要还是用户对消费组的认识不够。其实&#xff0c;在之前的博文以及《Spring Cloud微服务实战》一书中都有提到关于消费组的概…

LeetCode 481. 神奇字符串(找规律)

1. 题目 神奇的字符串 S 只包含 ‘1’ 和 ‘2’&#xff0c;并遵守以下规则&#xff1a; 字符串 S 是神奇的&#xff0c;因为串联字符 ‘1’ 和 ‘2’ 的连续出现次数会生成字符串 S 本身。 字符串 S 的前几个元素如下&#xff1a;S “1221121221221121122 …” 如果我们将…

图谱实战 | ​鲍捷:知识图谱技术在金融领域的分析和应用

转载公众号 | DataFunSummit分享嘉宾&#xff1a;鲍捷博士 文因互联编辑整理&#xff1a;松烨 博瑜科技出品平台&#xff1a;DataFunTalk导读&#xff1a;知识图谱标准件已经全面赋能主流金融场景&#xff0c;经历了7年时间的发展&#xff0c;在金融监管、银行、资管、证券等领…

珍爱生命,远离大厂政治斗争

本文授权转载自公众号“算法圈的小破事”&#xff0c;点击以上卡片进行关注大家好&#xff0c;我是在互联网危险边缘疯狂试探的皮皮虾&#xff0c;今天跟大家分享一个关于大厂政治斗争的故事。有人可能觉得&#xff0c;政治斗争那都是大佬之间的事情&#xff0c;跟我们江湖虾米…

Spring Cloud Finchley版中Consul多实例注册的问题处理

由于Spring Cloud对Etcd的支持一直没能从孵化器中出来&#xff0c;所以目前来说大多用户还在使用Eureka和Consul&#xff0c;之前又因为Eureka 2.0不在开源的消息&#xff0c;外加一些博眼球的标题党媒体使得Eureka的用户有所减少&#xff0c;所以&#xff0c;相信在选择Spring…

论文浅尝 | Continual Learning for Named Entity Recognition

笔记整理&#xff1a;李淑怡&#xff0c;天津大学硕士动机在许多真实任务下&#xff0c;常常需要引入新的实体类型&#xff0c;因此需要重新训练命名实体识别模型。当因为存储或安全问题限制对原始数据的访问时&#xff0c;那么为新实体类型重新标注原始数据的成本将会是高昂的…

Allen AI提出MERLOT,视频理解领域新SOTA!

文 | Yimin_饭煲2021年&#xff0c;多模态领域大概是人工智能研究者们关注者最多的一个领域了。随着各种模态数据集的增长和算力的发展&#xff0c;研究者们开始不断地尝试在一个模型中融合来自各个模态的信息。而在多模态领域的研究中&#xff0c;和视频相关的任务被认为是最复…

基于HMM的中文词性标注 POSTagging

文章目录1. 词性标注1.1 概念1.2 任务1.3 预处理1.4 初步统计预览2. 最大概率模型2.1 训练2.2 预测2.3 结果评估2.4 结果可视化3. 二元隐马尔科夫BiHMM模型3.1 训练3.2 预测3.3 结果评估3.4 结果可视化4. 结果讨论思考本文的代码是在徐老师的代码基础上&#xff0c;自己加了些注…

图谱实战 | 58同城周超:基于招聘场景下的知识图谱构建及应用

转载公众号 | DataFunSummit分享嘉宾&#xff1a;周超 58同城 NLP资深算法工程师编辑整理&#xff1a;吴祺尧 加州大学圣地亚哥分校出品平台&#xff1a;DataFunTalk导读&#xff1a;知识图谱作为一种富信息工程&#xff0c;已经深入到各行各业中&#xff0c;也为产业效率的提升…

2022年薪百万赛道:高性能神经网络与AI芯片应用

随着大数据的发展&#xff0c;计算机芯片算力的提升&#xff0c;人工智能近两年迎来了新一轮的爆发。而人工智能实现超级算力的核心就是AI芯片。AI芯片也被称为人工智能加速器&#xff0c;即专门用于处理人工智能应用中的大量计算任务的模块。2020年我国人工智能芯片市场规模约…