Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑

应用场景

有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的:

@StreamListener(value = TestTopic.INPUT)
public void receiveV1(String payload, @Header("version") String version) {
if("1.0".equals(version)) {
// Version 1.0
}
if("2.0".equals(version)) {
// Version 2.0
}
}

那么当消息处理逻辑复杂的时候,这段逻辑就会变得特别复杂。针对这个问题,在@StreamListener注解中提供了一个不错的属性condition,可以用来优化这样的处理结构。

动手试试

下面通过编写一个简单的例子来具体体会一下这个属性的用法:

@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {

public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}

@RestController
static class TestController {

@Autowired
private TestTopic testTopic;

/**
* 消息生产接口
*
* @param message
* @return
*/
@GetMapping("/sendMessage")
public String messageWithMQ(@RequestParam String message) {
testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "1.0").build());
testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "2.0").build());
return "ok";
}

}

/**
* 消息消费逻辑
*/
@Slf4j
@Component
static class TestListener {

@StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='1.0'")
public void receiveV1(String payload, @Header("version") String version) {
log.info("Received v1 : " + payload + ", " + version);
}

@StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='2.0'")
public void receiveV2(String payload, @Header("version") String version) {
log.info("Received v2 : " + payload + ", " + version);
}

}

interface TestTopic {

String OUTPUT = "example-topic-output";
String INPUT = "example-topic-input";

@Output(OUTPUT)
MessageChannel output();

@Input(INPUT)
SubscribableChannel input();

}

}

内容很简单,既包含了消息的生产,也包含了消息消费。在/sendMessage接口的定义中,发送了两条消息,一条消息的头信息中包含version=1.0,另外一条消息的头信息中包含version=2.0。在消息监听类TestListener中,对TestTopic.INPUT通道定义了两个@StreamListener,这两个监听逻辑有不同的condition,这里的表达式表示会根据消息头信息中的version值来做不同的处理逻辑分发。

在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名),比如:

spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-content-route
spring.cloud.stream.bindings.example-topic-output.destination=test-topic

完成了上面配置之后,就可以启动应用,并尝试访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了。此时可以看到类似下面的日志:

2018-12-24 15:50:33.361  INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener  : Received v1 : hello, 1.0
2018-12-24 15:50:33.363 INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener : Received v2 : hello, 2.0

从日志中可以看到,两条带有不同头信息的消息,分别通过不同的监听处理逻辑输出了对应的日志打印。

代码示例

本文示例读者可以通过查看下面仓库的中的stream-content-route项目:

  • Github
  • Gitee

如果您对这些感兴趣,欢迎star、follow、收藏、转发给予支持!

以下专题教程也许您会有兴趣

  • Spring Boot基础教程
  • Spring Cloud基础教程

money.jpg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/477686.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

图谱实战 | 再谈图谱表示:图网络表示GE与知识图谱表示KGE的原理对比与实操效果分析...

转载公众号 | 老刘说NLP知识图谱嵌入是一个经典话题,在之前的文章《知识表示技术:图谱表示VS图网络表示及基于距离函数的表示学习总结》中,围绕知识图谱嵌入学习这一主题,对比了知识图谱嵌入与图网络嵌入的异同。而在实际工作中&a…

LeetCode 1247. 交换字符使得字符串相同

1. 题目 有两个长度相同的字符串 s1 和 s2,且它们其中 只含有 字符 “x” 和 “y”,你需要通过「交换字符」的方式使这两个字符串相同。 每次「交换字符」的时候,你都可以在两个字符串中各选一个字符进行交换。 交换只能发生在两个不同的字…

图深度学习前沿工作汇总与解析

图深度学习除了可以应用于标准图推理任务以外,还广泛应用于推荐、疾病或药物预测、自然语言处理、计算机视觉、交通预测等领域。可见,基于图的深度学习不仅有助于挖掘现有图数据背后的丰富价值,而且还通过将关系数据自然地建模为图&#xff0…

Spring Cloud Stream消费失败后的处理策略(四):重新入队(RabbitMQ)

应用场景 之前我们已经通过《Spring Cloud Stream消费失败后的处理策略(一):自动重试》一文介绍了Spring Cloud Stream默认的消息重试功能。本文将介绍RabbitMQ的binder提供的另外一种重试功能:重新入队。 动手试试 准备一个会…

图谱实战 | 华农夏静波:深层语义知识图谱在药物重定位中的应用

转载公众号 | DataFunSummit分享嘉宾:夏静波 华中农业大学 副教授编辑整理:王金华 电科32所出品平台:DataFunTalk导读:自新冠病毒肺炎疫情发生以来,由于传统药物研发周期长,药物重定位(老药新用…

LeetCode 738. 单调递增的数字(贪心)

1. 题目 给定一个非负整数 N&#xff0c;找出小于或等于 N 的最大的整数&#xff0c;同时这个整数需要满足其各个位数上的数字是单调递增。 &#xff08;当且仅当每个相邻位数上的数字 x 和 y 满足 x < y 时&#xff0c;我们称这个整数是单调递增的。&#xff09; 示例 1…

晋升挂了,leader说不是我技术不行

本文授权转载自公众号“算法圈的小破事”&#xff0c;点击以上卡片进行关注大家好&#xff0c;我是在互联网危险边缘疯狂试探的皮皮虾。今天跟大家分享一个故事。晋升去年秋季&#xff0c;我参加了校招入职以来的第一场晋升答辩。答辩前&#xff0c;我比来比去&#xff0c;觉得…

Spring Cloud Stream消费失败后的处理策略(二):自定义错误处理逻辑

应用场景 上一篇《Spring Cloud Stream消费失败后的处理策略&#xff08;一&#xff09;&#xff1a;自动重试》介绍了默认就会生效的消息重试功能。对于一些因环境原因、网络抖动等不稳定因素引发的问题可以起到比较好的作用。但是对于诸如代码本身存在的逻辑错误等&#xff…

论文浅尝 | DEKR: 一个基于描述增强知识图谱的机器学习方法推荐系统

笔记整理&#xff1a;刘尧锟&#xff0c;天津大学硕士链接&#xff1a;https://dl.acm.org/doi/pdf/10.1145/3404835.3462900动机面对大量的机器学习&#xff08;ML&#xff09;方法&#xff0c;为给定的数据集和任务选择合适的方法是一个挑战。一般来说&#xff0c;ML方法或数…

Meta AI 发布 data2vec!统一模态的新里程碑!

文 | ZenMoore编 | 小轶如果让大家举一个最成功的自监督模型的例子&#xff0c;尤其对于各位 NLPer&#xff0c;肯定毫不犹豫地祭出我大 BERT. 想当年 BERT 打了一个名叫 MLM (Masked Language Model) 的响指&#xff0c;直接成了 NLP 灭霸。视觉界、语音界闻声而来&#xff0c…

LeetCode 946. 验证栈序列(栈)

1. 题目 给定 pushed 和 popped 两个序列&#xff0c;每个序列中的 值都不重复&#xff0c;只有当它们可能是在最初空栈上进行的推入 push 和弹出 pop 操作序列的结果时&#xff0c;返回 true&#xff1b;否则&#xff0c;返回 false 。 示例 1&#xff1a; 输入&#xff1a;…

Spring Cloud Stream消费失败后的处理策略(一):自动重试

之前写了几篇关于Spring Cloud Stream使用中的常见问题&#xff0c;比如&#xff1a; 如何处理消息重复消费如何消费自己生产的消息 下面几天就集中来详细聊聊&#xff0c;当消息消费失败之后该如何处理的几种方式。不过不论哪种方式&#xff0c;都需要与具体业务结合&#xf…

会议交流 | DataFunSummit 2022:图机器学习在线峰会

深度学习模型是当今人工智能研究的核心。众所周知&#xff0c;对欧几里得数据&#xff08;例如图像&#xff09;和序列数据&#xff08;例如文本&#xff09;具有颠覆性学习能力的深度学习技术不能直接适用于图结构数据。这种差距推动了图深度学习研究的浪潮&#xff0c;在学术…

专访邱锡鹏:人工智能开源社区的「先行者」

文 | 刘冰一、Echo源 | 极市平台邱锡鹏&#xff0c;复旦大学理学学士和博士。任职复旦大学计算机科学技术学院教授&#xff0c;博导。发表 CCF A/B 类论文 70 余篇&#xff0c;获得 ACL 2017 杰出论文奖&#xff08;CCF A类&#xff09;、CCL 2019 最佳论文奖。出版开源专著《神…

Spring Cloud Stream如何消费自己生产的消息

在上一篇《Spring Cloud Stream如何处理消息重复消费》中&#xff0c;我们通过消费组的配置解决了多实例部署情况下消息重复消费这一入门时的常见问题。本文将继续说说在另外一个被经常问到的问题&#xff1a;如果微服务生产的消息自己也想要消费一份&#xff0c;应该如何实现呢…

LeetCode 400. 第N个数字(数学)

1. 题目 在无限的整数序列 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, …中找到第 n 个数字。 注意: n 是正数且在32为整形范围内 ( n < 2^31)。 示例 1: 输入: 3 输出: 3示例 2: 输入: 11 输出: 0 说明: 第11个数字在序列 1, 2, 3, 4, 5, 6, 7, 8, 9, 1--0--, 11, ... 里是0&a…

图谱实战 | 开源知识图谱融合工具剖析:Dedupe与OpenEA工具实现思想、关键环节与实操分析...

转载公众号 | 老刘说NLP实体对齐旨在发现不同知识图谱中的共指实体&#xff0c;如百度百科的360与Wikipedia中的360 qihoo。实体对齐是知识融合的重要任务&#xff0c;通过实体对齐集成多源知识图谱可以为下游任务提供更加全面的知识表示。实际上&#xff0c;实体对齐本质上就是…

算法岗SSP offer收割指南!

文 | 林小平源 | 知乎前序在本文开始以前&#xff0c;林小平首先需要声明的是这篇超详细面经并不是笔者本人的求职笔记&#xff0c;它是笔者学校隔壁实验室22届毕业学弟的面试心路历程和经验心得。由于笔者和这位学弟经常讨论校招求职和职业发展的问题&#xff0c;并且在秋招以…

Spring Cloud Stream如何处理消息重复消费

最近收到好几个类似的问题&#xff1a;使用Spring Cloud Stream操作RabbitMQ或Kafka的时候&#xff0c;出现消息重复消费的问题。通过沟通与排查下来主要还是用户对消费组的认识不够。其实&#xff0c;在之前的博文以及《Spring Cloud微服务实战》一书中都有提到关于消费组的概…

LeetCode 481. 神奇字符串(找规律)

1. 题目 神奇的字符串 S 只包含 ‘1’ 和 ‘2’&#xff0c;并遵守以下规则&#xff1a; 字符串 S 是神奇的&#xff0c;因为串联字符 ‘1’ 和 ‘2’ 的连续出现次数会生成字符串 S 本身。 字符串 S 的前几个元素如下&#xff1a;S “1221121221221121122 …” 如果我们将…