Spring Cloud Stream如何消费自己生产的消息

在上一篇《Spring Cloud Stream如何处理消息重复消费》中,我们通过消费组的配置解决了多实例部署情况下消息重复消费这一入门时的常见问题。本文将继续说说在另外一个被经常问到的问题:如果微服务生产的消息自己也想要消费一份,应该如何实现呢?

常见错误

在放出标准答案前,先放出一个常见的错误姿势和告警信息(以便您可以通过搜索引擎找到这里^_^)。以下错误基于Spring Boot 2.0.5、Spring Cloud Finchley SR1。

首先,根据入门示例,为了生产和消费消息,需要定义两个通道:一个输入、一个输出。比如下面这样:

public interface TestTopic {

String OUTPUT = "example-topic";
String INPUT = "example-topic";

@Output(OUTPUT)
MessageChannel output();

@Input(INPUT)
SubscribableChannel input();

}

通过INPUTOUTPUT使用相同的名称,让生产消息和消费消息指向相同的Topic,从而实现消费自己发出的消息。

接下来,创建一个HTTP接口,并通过上面定义的输出通道触来生产消息,比如:

@Slf4j
@RestController
public class TestController {

@Autowired
private TestTopic testTopic;

@GetMapping("/sendMessage")
public String messageWithMQ(@RequestParam String message) {
testTopic.output().send(MessageBuilder.withPayload(message).build());
return "ok";
}

}

已经有生产消息的实现,下面来创建对输入通道的监听,以实现消息的消费逻辑。

@Slf4j
@Component
public class TestListener {

@StreamListener(TestTopic.INPUT)
public void receive(String payload) {
log.info("Received: " + payload);
throw new RuntimeException("BOOM!");
}

}

最后,在应用主类中,使用@EnableBinding注解来开启它,比如:

@EnableBinding(TestTopic.class)
@SpringBootApplication
public class TestApplication {

public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}

}

看似天衣无缝的操作,然而在启动的瞬间,你可能收到了下面这样的错误:

org.springframework.beans.factory.BeanDefinitionStoreException: Invalid bean definition with name 'example-topic' defined in com.didispace.stream.TestTopic: bean definition with this name already exists - Root bean: class [null]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=com.didispace.stream.TestTopic; factoryMethodName=input; initMethodName=null; destroyMethodName=null
at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerBindingTargetBeanDefinition(BindingBeanDefinitionRegistryUtils.java:64) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerOutputBindingTargetBeanDefinition(BindingBeanDefinitionRegistryUtils.java:54) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.lambda$registerBindingTargetBeanDefinitions$0(BindingBeanDefinitionRegistryUtils.java:86) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:562) ~[spring-core-5.0.9.RELEASE.jar:5.0.9.RELEASE]
at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:541) ~[spring-core-5.0.9.RELEASE.jar:5.0.9.RELEASE]
at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerBindingTargetBeanDefinitions(BindingBeanDefinitionRegistryUtils.java:76) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.cloud.stream.config.BindingBeansRegistrar.registerBeanDefinitions(BindingBeansRegistrar.java:45) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.lambda$loadBeanDefinitionsFromRegistrars$1(ConfigurationClassBeanDefinitionReader.java:358) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]
at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) ~[na:1.8.0_151]
at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsFromRegistrars(ConfigurationClassBeanDefinitionReader.java:357) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]
at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsForConfigurationClass(ConfigurationClassBeanDefinitionReader.java:145) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]
at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitions(ConfigurationClassBeanDefinitionReader.java:117) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]
at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:328) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]
at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:233) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]
at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanDefinitionRegistryPostProcessors(PostProcessorRegistrationDelegate.java:271) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]
at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(PostProcessorRegistrationDelegate.java:91) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:694) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:532) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]
at org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext.refresh(ReactiveWebServerApplicationContext.java:61) ~[spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:780) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:412) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:333) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1277) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1265) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at com.didispace.stream.TestApplication.main(TestApplication.java:13) [classes/:na]

正确姿势

根据错误提示:Invalid bean definition with name 'example-topic' defined in com.didispace.stream.TestTopic: bean definition with this name already exists,没有启动成功的原因是已经存在了一个名为example-topic的Bean,那么为什么会重复创建这个Bean呢?

实际上,在F版的Spring Cloud Stream中,当我们使用@Output@Input注解来定义消息通道时,都会根据传入的通道名称来创建一个Bean。而在上面的例子中,我们定义的@Output@Input名称是相同的,因为我们系统输入和输出是同一个Topic,这样才能实现对自己生产消息的消费。

既然这样,我们定义相同的通道名是行不通了,那么我们只能通过定义不同的通道名,并为这两个通道配置相同的目标Topic来将这一对输入输出指向同一个实际的Topic。对于上面的错误程序,只需要做如下两处改动:

第一步:修改通道名,使用不同的名字

public interface TestTopic {

String OUTPUT = "example-topic-output";
String INPUT = "example-topic-input";

@Output(OUTPUT)
MessageChannel output();

@Input(INPUT)
SubscribableChannel input();

}

第二步:在配置文件中,为这两个通道设置相同的Topic名称,比如:

spring.cloud.stream.bindings.example-topic-input.destination=aaa-topic
spring.cloud.stream.bindings.example-topic-output.destination=aaa-topic

这样,这两个输入输出通道就会都指向名为aaa-topic的Topic了。

最后,再启动该程序,没有报错。然后访问接口:localhost:8080/sendMessage?message=hello-didi,可以在控制台中看到如下信息:

2018-11-17 23:24:10.425  INFO 32039 --- [ctor-http-nio-2] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2018-11-17 23:24:10.453 INFO 32039 --- [ctor-http-nio-2] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory.publisher#266753da:0/SimpleConnection@627fba83 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 60752]
2018-11-17 23:24:10.458 INFO 32039 --- [ctor-http-nio-2] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (aaa-topic.anonymous.fNUxZ8C0QIafxrhkFBFI1A) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2018-11-17 23:24:10.483 INFO 32039 --- [IafxrhkFBFI1A-1] com.didispace.stream.TestListener : Received: hello-didi

消费自己生产的消息成功了!读者也还可以访问一下应用的/actuator/beans端点,看看当前Spring上下文中有哪些Bean,应该可以看到有下面Bean,也就是上面分析的两个通道的Bean对象

"example-topic-output": {
"aliases": [],
"scope": "singleton",
"type": "org.springframework.integration.channel.DirectChannel",
"resource": null,
"dependencies": []
},
"example-topic-input": {
"aliases": [],
"scope": "singleton",
"type": "org.springframework.integration.channel.DirectChannel",
"resource": null,
"dependencies": []
},

后记

其实大部分开发者在使用Spring Cloud Stream时候碰到的问题都源于对Spring Cloud Stream的核心概念还是不够理解。所以,还是推荐读一下下面的文章和示例:

  • 入门示例
  • 核心概念
  • 消费组
  • 消费分区

代码示例

本文示例读者可以通过查看下面仓库的中的stream-consumer-self项目:

  • Github
  • Gitee

如果您对这些感兴趣,欢迎star、follow、收藏、转发给予支持!

以下专题教程也许您会有兴趣

  • Spring Boot基础教程
  • Spring Cloud基础教程

money.jpg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/477671.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode 400. 第N个数字(数学)

1. 题目 在无限的整数序列 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, …中找到第 n 个数字。 注意: n 是正数且在32为整形范围内 ( n < 2^31)。 示例 1: 输入: 3 输出: 3示例 2: 输入: 11 输出: 0 说明: 第11个数字在序列 1, 2, 3, 4, 5, 6, 7, 8, 9, 1--0--, 11, ... 里是0&a…

图谱实战 | 开源知识图谱融合工具剖析:Dedupe与OpenEA工具实现思想、关键环节与实操分析...

转载公众号 | 老刘说NLP实体对齐旨在发现不同知识图谱中的共指实体&#xff0c;如百度百科的360与Wikipedia中的360 qihoo。实体对齐是知识融合的重要任务&#xff0c;通过实体对齐集成多源知识图谱可以为下游任务提供更加全面的知识表示。实际上&#xff0c;实体对齐本质上就是…

算法岗SSP offer收割指南!

文 | 林小平源 | 知乎前序在本文开始以前&#xff0c;林小平首先需要声明的是这篇超详细面经并不是笔者本人的求职笔记&#xff0c;它是笔者学校隔壁实验室22届毕业学弟的面试心路历程和经验心得。由于笔者和这位学弟经常讨论校招求职和职业发展的问题&#xff0c;并且在秋招以…

Spring Cloud Stream如何处理消息重复消费

最近收到好几个类似的问题&#xff1a;使用Spring Cloud Stream操作RabbitMQ或Kafka的时候&#xff0c;出现消息重复消费的问题。通过沟通与排查下来主要还是用户对消费组的认识不够。其实&#xff0c;在之前的博文以及《Spring Cloud微服务实战》一书中都有提到关于消费组的概…

LeetCode 481. 神奇字符串(找规律)

1. 题目 神奇的字符串 S 只包含 ‘1’ 和 ‘2’&#xff0c;并遵守以下规则&#xff1a; 字符串 S 是神奇的&#xff0c;因为串联字符 ‘1’ 和 ‘2’ 的连续出现次数会生成字符串 S 本身。 字符串 S 的前几个元素如下&#xff1a;S “1221121221221121122 …” 如果我们将…

图谱实战 | ​鲍捷:知识图谱技术在金融领域的分析和应用

转载公众号 | DataFunSummit分享嘉宾&#xff1a;鲍捷博士 文因互联编辑整理&#xff1a;松烨 博瑜科技出品平台&#xff1a;DataFunTalk导读&#xff1a;知识图谱标准件已经全面赋能主流金融场景&#xff0c;经历了7年时间的发展&#xff0c;在金融监管、银行、资管、证券等领…

珍爱生命,远离大厂政治斗争

本文授权转载自公众号“算法圈的小破事”&#xff0c;点击以上卡片进行关注大家好&#xff0c;我是在互联网危险边缘疯狂试探的皮皮虾&#xff0c;今天跟大家分享一个关于大厂政治斗争的故事。有人可能觉得&#xff0c;政治斗争那都是大佬之间的事情&#xff0c;跟我们江湖虾米…

Spring Cloud Finchley版中Consul多实例注册的问题处理

由于Spring Cloud对Etcd的支持一直没能从孵化器中出来&#xff0c;所以目前来说大多用户还在使用Eureka和Consul&#xff0c;之前又因为Eureka 2.0不在开源的消息&#xff0c;外加一些博眼球的标题党媒体使得Eureka的用户有所减少&#xff0c;所以&#xff0c;相信在选择Spring…

论文浅尝 | Continual Learning for Named Entity Recognition

笔记整理&#xff1a;李淑怡&#xff0c;天津大学硕士动机在许多真实任务下&#xff0c;常常需要引入新的实体类型&#xff0c;因此需要重新训练命名实体识别模型。当因为存储或安全问题限制对原始数据的访问时&#xff0c;那么为新实体类型重新标注原始数据的成本将会是高昂的…

Allen AI提出MERLOT,视频理解领域新SOTA!

文 | Yimin_饭煲2021年&#xff0c;多模态领域大概是人工智能研究者们关注者最多的一个领域了。随着各种模态数据集的增长和算力的发展&#xff0c;研究者们开始不断地尝试在一个模型中融合来自各个模态的信息。而在多模态领域的研究中&#xff0c;和视频相关的任务被认为是最复…

基于HMM的中文词性标注 POSTagging

文章目录1. 词性标注1.1 概念1.2 任务1.3 预处理1.4 初步统计预览2. 最大概率模型2.1 训练2.2 预测2.3 结果评估2.4 结果可视化3. 二元隐马尔科夫BiHMM模型3.1 训练3.2 预测3.3 结果评估3.4 结果可视化4. 结果讨论思考本文的代码是在徐老师的代码基础上&#xff0c;自己加了些注…

图谱实战 | 58同城周超:基于招聘场景下的知识图谱构建及应用

转载公众号 | DataFunSummit分享嘉宾&#xff1a;周超 58同城 NLP资深算法工程师编辑整理&#xff1a;吴祺尧 加州大学圣地亚哥分校出品平台&#xff1a;DataFunTalk导读&#xff1a;知识图谱作为一种富信息工程&#xff0c;已经深入到各行各业中&#xff0c;也为产业效率的提升…

2022年薪百万赛道:高性能神经网络与AI芯片应用

随着大数据的发展&#xff0c;计算机芯片算力的提升&#xff0c;人工智能近两年迎来了新一轮的爆发。而人工智能实现超级算力的核心就是AI芯片。AI芯片也被称为人工智能加速器&#xff0c;即专门用于处理人工智能应用中的大量计算任务的模块。2020年我国人工智能芯片市场规模约…

API网关 Zuul1.0 和 2.0 我们该如何选择?

介绍 在今年5月中&#xff0c;Netflix终于开源了它的支持异步调用模式的Zuul网关2.0版本&#xff0c;真可谓千呼万唤始出来。从Netflix的官方博文[附录1]中&#xff0c;我们获得的信息也比较令人振奋&#xff1a; The Cloud Gateway team at Netflix runs and operates more t…

LeetCode 623. 在二叉树中增加一行(BFS/DFS)

文章目录1. 题目2. 解题2.1 BFS2.2 DFS1. 题目 给定一个二叉树&#xff0c;根节点为第1层&#xff0c;深度为 1。在其第 d 层追加一行值为 v 的节点。 添加规则&#xff1a;给定一个深度值 d &#xff08;正整数&#xff09;&#xff0c;针对深度为 d-1 层的每一非空节点 N&a…

论文浅尝 | KR-GCN: 知识感知推理的可解释推荐系统

论文作者&#xff1a;马婷&#xff0c;中国科学院信息工程研究所直博生动机抽取并利用知识图谱(KG)中的多跳关系路径可以提高推荐系统的性能&#xff0c;并提供可解释性。然而&#xff0c;现有的工作仍面临着两个主要的挑战&#xff1a;用户偏好的错误传播和模型的弱解释性。提…

吴恩达,确诊新冠阳性!

编 | 好困 袁榭源 | 新智元【导读】当代人工智能领域最权威的学者之一吴恩达&#xff0c;于2022年2月8日晨在自己推特上宣布新冠检测结果阳性&#xff0c;不过症状轻微。北京时间&#xff0c;2022年2月8日早上6点&#xff0c;吴恩达新冠病毒检测呈阳性。吴恩达表示&#xff0c;…

Spring Cloud Config采用Git存储时两种常用的配置策略

由于Spring Cloud Config默认采用了Git存储&#xff0c;相信很多团队在使用Spring Cloud的配置中心时也会采用这样的策略。即便大家都使用了Git存储&#xff0c;可能还有各种不同的配置方式&#xff0c;本文就来介绍一下两种常用的配置策略。 第一种&#xff1a;多个项目公用一…

图谱实战 | 城市大脑知识图谱构建及应用研究

转载公众号 | 专知随着城市大脑建设进程的推进,城市中积累了大量的物联网(IoT)设备和数据,利用海量设备数据对问题 进行分析和溯源,对于城市大脑建设具有重要意义。该文基于资源描述框架和智能物联网协议概念,提出一种以城市物联网本体为基础的城市大脑知识图谱建设方法,城市大…

震惊!三个万引大佬嘴仗,原来是为了他……?

文&#xff5c;白鹡鸰想把小轶挂到三作编&#xff5c;小轶已把白鹡鸰挂到三作这本应是白鹡鸰在小轶追杀下游刃有余拖稿的一天&#xff0c;结果小轶再次把一篇新论文喂到了我的嘴边。象征性地打开论文&#xff0c;草草扫过去&#xff0c;嗯&#xff0c;迁移学习&#xff0c;嗯&a…