Spring Cloud Stream如何处理消息重复消费

最近收到好几个类似的问题:使用Spring Cloud Stream操作RabbitMQ或Kafka的时候,出现消息重复消费的问题。通过沟通与排查下来主要还是用户对消费组的认识不够。其实,在之前的博文以及《Spring Cloud微服务实战》一书中都有提到关于消费组的概念以及作用。

那么什么是消费组呢?为什么要用消费组?它解决什么问题呢?摘录一段之前博文的内容,来解答这些疑问:

通常在生产环境,我们的每个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上。默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理(出现上述重复消费问题)。但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能。

详细也可查看原文:消息驱动的微服务(消费组)。

下面,通过一个例子来看看如何使用消费组:

问题重现

构建消息消费端

第一步:创建绑定接口,绑定example-topic输入通道(默认情况下,会绑定到RabbitMQ的同名Exchange或Kafaka的同名Topic)。

interface ExampleBinder {

String NAME = "example-topic";

@Input(NAME)
SubscribableChannel input();

}

第二步:对上述输入通道创建监听与处理逻辑。

@EnableBinding(ExampleBinder.class)
public class ExampleReceiver {

private static Logger logger = LoggerFactory.getLogger(ExampleReceiver.class);

@StreamListener(ExampleBinder.NAME)
public void receive(String payload) {
logger.info("Received: " + payload);
}

}

第三步;创建应用主类和配置文件

@SpringBootApplication
public class ExampleApplication {

public static void main(String[] args) {
SpringApplication.run(ExampleApplication.class, args);
}

}
spring.application.name=stream-consumer-group
server.port=0

这里设置server.port=0,以方便在本地启动多实例来重现问题。

完成上述操作之后,启动两个该应用的实例,以备后续调用。

构建消息生产端

比较简单,需要注意的是,使用@Output创建一个同名的输出绑定,这样发出的消息才能被上述启动的实例接收到。具体实现如下:

@RunWith(SpringRunner.class)
@EnableBinding(value = {ExampleApplicationTests.ExampleBinder.class})
public class ExampleApplicationTests {

@Autowired
private ExampleBinder exampleBinder;

@Test
public void exampleBinderTester() {
exampleBinder.output().send(MessageBuilder.withPayload("Produce a message from : http://blog.didispace.com").build());
}

public interface ExampleBinder {

String NAME = "example-topic";

@Output(NAME)
MessageChannel output();

}

}

启动上述测试用例之后,可以发现之前启动的两个实例都收到的消息,并在日志中打印了:Received: Produce a message from : http://blog.didispace.com。消息重复消费的问题成功重现!

使用消费组解决问题

如何解决上述消息重复消费的问题呢?我们只需要在配置文件中增加如下配置即可:

spring.cloud.stream.bindings.example-topic.group=aaa

当我们指定了某个绑定所指向的消费组之后,往当前主题发送的消息在每个订阅消费组中,只会有一个订阅者接收和消费,从而实现了对消息的负载均衡。只所以之前会出现重复消费的问题,是由于默认情况下,任何订阅都会产生一个匿名消费组,所以每个订阅实例都会有自己的消费组,从而当有消息发送的时候,就形成了广播的模式。

另外,需要注意上述配置中example-topic是在代码中@Output@Input中传入的名字。

代码示例

本文示例读者可以通过查看下面仓库的中的stream-consumer-group项目:

  • Github
  • Gitee

如果您对这些感兴趣,欢迎star、follow、收藏、转发给予支持!

以下专题教程也许您会有兴趣

  • Spring Boot基础教程
  • Spring Cloud基础教程

money.jpg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/477667.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode 481. 神奇字符串(找规律)

1. 题目 神奇的字符串 S 只包含 ‘1’ 和 ‘2’,并遵守以下规则: 字符串 S 是神奇的,因为串联字符 ‘1’ 和 ‘2’ 的连续出现次数会生成字符串 S 本身。 字符串 S 的前几个元素如下:S “1221121221221121122 …” 如果我们将…

图谱实战 | ​鲍捷:知识图谱技术在金融领域的分析和应用

转载公众号 | DataFunSummit分享嘉宾:鲍捷博士 文因互联编辑整理:松烨 博瑜科技出品平台:DataFunTalk导读:知识图谱标准件已经全面赋能主流金融场景,经历了7年时间的发展,在金融监管、银行、资管、证券等领…

珍爱生命,远离大厂政治斗争

本文授权转载自公众号“算法圈的小破事”,点击以上卡片进行关注大家好,我是在互联网危险边缘疯狂试探的皮皮虾,今天跟大家分享一个关于大厂政治斗争的故事。有人可能觉得,政治斗争那都是大佬之间的事情,跟我们江湖虾米…

Spring Cloud Finchley版中Consul多实例注册的问题处理

由于Spring Cloud对Etcd的支持一直没能从孵化器中出来,所以目前来说大多用户还在使用Eureka和Consul,之前又因为Eureka 2.0不在开源的消息,外加一些博眼球的标题党媒体使得Eureka的用户有所减少,所以,相信在选择Spring…

论文浅尝 | Continual Learning for Named Entity Recognition

笔记整理:李淑怡,天津大学硕士动机在许多真实任务下,常常需要引入新的实体类型,因此需要重新训练命名实体识别模型。当因为存储或安全问题限制对原始数据的访问时,那么为新实体类型重新标注原始数据的成本将会是高昂的…

Allen AI提出MERLOT,视频理解领域新SOTA!

文 | Yimin_饭煲2021年,多模态领域大概是人工智能研究者们关注者最多的一个领域了。随着各种模态数据集的增长和算力的发展,研究者们开始不断地尝试在一个模型中融合来自各个模态的信息。而在多模态领域的研究中,和视频相关的任务被认为是最复…

基于HMM的中文词性标注 POSTagging

文章目录1. 词性标注1.1 概念1.2 任务1.3 预处理1.4 初步统计预览2. 最大概率模型2.1 训练2.2 预测2.3 结果评估2.4 结果可视化3. 二元隐马尔科夫BiHMM模型3.1 训练3.2 预测3.3 结果评估3.4 结果可视化4. 结果讨论思考本文的代码是在徐老师的代码基础上,自己加了些注…

图谱实战 | 58同城周超:基于招聘场景下的知识图谱构建及应用

转载公众号 | DataFunSummit分享嘉宾:周超 58同城 NLP资深算法工程师编辑整理:吴祺尧 加州大学圣地亚哥分校出品平台:DataFunTalk导读:知识图谱作为一种富信息工程,已经深入到各行各业中,也为产业效率的提升…

2022年薪百万赛道:高性能神经网络与AI芯片应用

随着大数据的发展,计算机芯片算力的提升,人工智能近两年迎来了新一轮的爆发。而人工智能实现超级算力的核心就是AI芯片。AI芯片也被称为人工智能加速器,即专门用于处理人工智能应用中的大量计算任务的模块。2020年我国人工智能芯片市场规模约…

API网关 Zuul1.0 和 2.0 我们该如何选择?

介绍 在今年5月中,Netflix终于开源了它的支持异步调用模式的Zuul网关2.0版本,真可谓千呼万唤始出来。从Netflix的官方博文[附录1]中,我们获得的信息也比较令人振奋: The Cloud Gateway team at Netflix runs and operates more t…

LeetCode 623. 在二叉树中增加一行(BFS/DFS)

文章目录1. 题目2. 解题2.1 BFS2.2 DFS1. 题目 给定一个二叉树,根节点为第1层,深度为 1。在其第 d 层追加一行值为 v 的节点。 添加规则:给定一个深度值 d (正整数),针对深度为 d-1 层的每一非空节点 N&a…

论文浅尝 | KR-GCN: 知识感知推理的可解释推荐系统

论文作者:马婷,中国科学院信息工程研究所直博生动机抽取并利用知识图谱(KG)中的多跳关系路径可以提高推荐系统的性能,并提供可解释性。然而,现有的工作仍面临着两个主要的挑战:用户偏好的错误传播和模型的弱解释性。提…

吴恩达,确诊新冠阳性!

编 | 好困 袁榭源 | 新智元【导读】当代人工智能领域最权威的学者之一吴恩达,于2022年2月8日晨在自己推特上宣布新冠检测结果阳性,不过症状轻微。北京时间,2022年2月8日早上6点,吴恩达新冠病毒检测呈阳性。吴恩达表示,…

Spring Cloud Config采用Git存储时两种常用的配置策略

由于Spring Cloud Config默认采用了Git存储,相信很多团队在使用Spring Cloud的配置中心时也会采用这样的策略。即便大家都使用了Git存储,可能还有各种不同的配置方式,本文就来介绍一下两种常用的配置策略。 第一种:多个项目公用一…

图谱实战 | 城市大脑知识图谱构建及应用研究

转载公众号 | 专知随着城市大脑建设进程的推进,城市中积累了大量的物联网(IoT)设备和数据,利用海量设备数据对问题 进行分析和溯源,对于城市大脑建设具有重要意义。该文基于资源描述框架和智能物联网协议概念,提出一种以城市物联网本体为基础的城市大脑知识图谱建设方法,城市大…

震惊!三个万引大佬嘴仗,原来是为了他……?

文|白鹡鸰想把小轶挂到三作编|小轶已把白鹡鸰挂到三作这本应是白鹡鸰在小轶追杀下游刃有余拖稿的一天,结果小轶再次把一篇新论文喂到了我的嘴边。象征性地打开论文,草草扫过去,嗯,迁移学习,嗯&a…

LeetCode 611. 有效三角形的个数(双指针)

1. 题目 给定一个包含非负整数的数组,你的任务是统计其中可以组成三角形三条边的三元组个数。 示例 1: 输入: [2,2,3,4] 输出: 3 解释: 有效的组合是: 2,3,4 (使用第一个 2) 2,3,4 (使用第二个 2) 2,2,3注意: 数组长度不超过1000。 数组里整数的范围为 [0, 1000]…

Eureka 2.0 开源流产,真的对你影响很大吗?

最近连续发烧四天,偶尔刷两下朋友圈都能看到好几条来自不同号的关于《Eureka 2.0开源工作宣告停止,继续使用风险自负》的推文。主要内容如下: 近日,知名服务注册与服务发现工具 Eureka 的 GitHub Wiki 上显示其 2.0 版本的开源工作…

会议交流 | 知识图谱开源开放及生态——7月12日TF65

转载公众号 | 中国计算机学会本期会议邀请到来自阿里巴巴等头部企业的代表,以及来自北京大学、南京大学和浙江大学的研究人员,一起深入探讨开放知识图谱、知识图谱开源工具等所面临的机遇和挑战,并进一步了解知识图谱开放和开源工具构建的应用…

LeetCode 650. 只有两个键的键盘(DP)

1. 题目 最初在一个记事本上只有一个字符 ‘A’。你每次可以对这个记事本进行两种操作: Copy All (复制全部) : 你可以复制这个记事本中的所有字符(部分的复制是不允许的)。Paste (粘贴) : 你可以粘贴你上一次复制的字符。 给定一个数字 n 。你需要使用最少的操作…