RocketMQ生产者消息发送出去了,消费者一直接收不到怎么办?(Rocket MQ订阅关系一致性)

问题: 使用RocketMQ消息队列,生产者将数据发送出去了,但是生产者一致没接收到(或者是间隔好几分钟,突然接收到一条数据)怎么办?并且通过rocket web控制台查看消息的状态为NOT_ONELINE或者NOT_CONSUME,(如下图) 这种诡异现象该怎么解决?
在这里插入图片描述

1. 先说解决方案

这种情况99%是由于订阅关系不一致导致的,可以排查下程序看看是否有多个消费者使用了同一个group,并且订阅了不同的主题。逻辑图展示如下:
在这里插入图片描述
这种情况只需要将不同的消费者的group区分一下即可, 逻辑关系图变成如下这种:
在这里插入图片描述

到此为止,是不是惊奇的发现,问题解决了?

2. 注意事项:订阅关系一致性

看下Rocket MQ官方文档给出的说明:

定义
消费者分组是 Apache RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组。

和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。在 Apache RocketMQ 中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。

在这里插入图片描述
这里面只描述出了Tag的一致,事实上下面这种订阅关系也是错误的,同一个group中的两个消费者分别订阅了不同的主题, 违背了定义中的消费行为一致原则:

//Consumer c1
Consumer c1 = ConsumerBuilder.build(groupA);
c1.subscribe(topicA);
//Consumer c2Consumer 
c2 = ConsumerBuilder.build(groupA);
c2.subscribe(topicB);

3. 剖析源码实现,分析原因

从GitHub下载rocketmq源码通过idea打开之后,从官方提供的example进来:
在这里插入图片描述
进入到DefaultMQPushConsumer构造方法中,可以发现初始化了一个DefaultMQPushConsumerImpl类:

    public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,AllocateMessageQueueStrategy allocateMessageQueueStrategy) {this.consumerGroup = consumerGroup;this.namespace = namespace;this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;// 这里初始化一个默认的push类型的Consumer实现类defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);}

然后继续进入到DefaultMQPushConsumerImpl类中, 可以看见有一个成员变量MQClientInstance mQClientFactory, 在DefaultMQPushConsumerImpl类的start()(启动消费者)方法中会通过MQClientManager初始化MQClientInstance类.
在这里插入图片描述
接着跳转到MQClientInstance构造方法中, 会发现有这样一行代码, 初始化了一个rebalanceService. 这个rebalanceService就是RocketMQ隔一段时间进行rebalance的核心实现.
在这里插入图片描述
继续剖析RebalanceService类, 发现其实现了Runnable接口, 话不多说, 直接看其 run()方法中做了什么事.

呀! 原来是隔一段时间调用一次上述咱们提到的DefaultMQPushConsumerImpl类中的doRebalance()方法, 搞了半天又绕回来了. … … … … … …

直接进入到这里面, 看看rebalance的逻辑:
在这里插入图片描述
集群部署模式下, 会进行rebalance操作, 根据topic名称和group名称获取到所有的consumer列表.

case CLUSTERING: {Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);// 这里根据topic名称和Group进行获取到所有的consumerList<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);if (null == mqSet) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {this.messageQueueChanged(topic, Collections.<MessageQueue>emptySet(), Collections.<MessageQueue>emptySet());log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}}

但是进去这行代码里面发现, topic名称仅仅用来获取Broker的网络地址, 真正获取到所有Consumer列表的是通过Group名称获取的, 看到这里相信大家基本上能够恍然大悟. 回归到上面的问题: 如果一个一个Group中的多个消费者分别订阅了不同的主题, 即: 消费行为不一致, 无论这个属于当前Group中的消费者是否订阅了这个主题, 都会参与rebalance.
在这里插入图片描述
画图解释一下, 假设在同一个Group下, 两个Consumer都分别订阅了Topic1和Topic2, 这种情况订阅关系一致,
在这里插入图片描述
假设消费者1消费Topic2的速度比较快, 经过一次rebalance之后, Consumer订阅的队列逻辑有可能成为这样的:
在这里插入图片描述
此时由于订阅关系的一致性, 整体系统并不会出现问题. 接下来看一种情况, 同一个消费组中的Consumer1 订阅了Topic1, Consumer2订阅了Topic2, 初始情况逻辑关系是这样:
在这里插入图片描述
由于进行rebalance是通过Group获取对应的消费者客户端ID, 因此rebalance之后可能出现Consumer1 指向了Topic2中的某一个队列, 同理, Consumer2指向了Topic1中的队列. 但是这与Consumer中设定的topic不一致, 因此会出现RocketMQ中消息状态为为NOT_COMSUME_YET

(个人通过对源码的简单梳理总结的文章, 如有错误欢迎指正)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/125508.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

韦东山D1S板子——利用xfel工具初始化内置64MB内存,并直接下载程序到内存运行

1、前言 &#xff08;1&#xff09;最近使用韦东山老师的D1S板子学习RISC-V架构知识&#xff0c;我是结合《RISC-V体系结构编程与实践》这本书的进行学习&#xff0c;其中韦东山老师对书中的代码做了部分移植&#xff0c;到MMU模块就没有在移植书中代码&#xff1b; &#xff0…

java毕业设计基于springboot+vue高校本科学生综评系统

项目介绍 本系统是利用Spring Boot框架而设计的一款结合用户的实际情况而设计的平台&#xff0c;利用VUE技术来将可供学生和管理员来使用的所有界面来显示出来&#xff0c;利用Java语言技术来编程实现用户和管理员所执行的各类操作业务逻辑&#xff0c;以MySQL数据库来存取系统…

把Qt6.2.4内置的标签打印了一遍

2023年10月31日&#xff0c;周二晚上 #include <QGridLayout> #include <QPushButton> #include <QLabel> #include <QApplication> #include <QStyle>int main(int argc, char *argv[]) {QApplication a(argc, argv);QWidget widget;widget.set…

Python 数学函数和 math 模块指南

Python 提供了一组内置的数学函数&#xff0c;包括一个广泛的数学模块&#xff0c;可以让您对数字执行数学任务。 内置数学函数。min() 和 max() 函数可用于在可迭代对象中查找最低或最高值&#xff1a; 示例&#xff1a;查找可迭代对象中的最低或最高值&#xff1a; x min…

Webpack常见的插件和模式

文章目录 一、认识插件Plugin1.认识Plugin 二、CleanWebpackPlugin三、HtmlWebpackPlugin1.生成index.html分析2.自定义HTML模板3.自定义模板数据填充 四、DefinePlugin1.DefinePlugin的介绍2.DefinePlugin的使用 五、Mode配置 一、认识插件Plugin 1.认识Plugin Webpack的另一…

MySQL主从复制原理

1、MySQL主从复制的三个步骤及其原理图 slave会从master读取binlog来进行数据同步 MySQL复制过程分成三步&#xff1a; 1、master将改变记录到二进制日志&#xff08;binary log&#xff09;。这些记录过程叫做二进制日志事件&#xff0c;binary log events。 2、slave将ma…

AcWing 第127场周赛 构造矩阵

构造题目&#xff0c;考虑去除掉最后一行最后一列先进行考虑&#xff0c;假设除了最后一行和最后一列都已经排好了&#xff08;你可以随便排&#xff09;&#xff0c;那么分析知最后一个数字由限制以外其他都已经确定了&#xff0c;无解的情况是k为-1 并且n&#xff0c;m的奇偶…

【多线程面试题十七】、如果不使用synchronized和Lock,如何保证线程安全?

文章底部有个人公众号&#xff1a;热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享&#xff1f; 踩过的坑没必要让别人在再踩&#xff0c;自己复盘也能加深记忆。利己利人、所谓双赢。 面试官&#xff1a;如果不使用synchronized…

【Linux】虚拟机部署与发布J2EE项目(Linux版本)

&#x1f389;&#x1f389;欢迎来到我的CSDN主页&#xff01;&#x1f389;&#x1f389; &#x1f3c5;我是Java方文山&#xff0c;一个在CSDN分享笔记的博主。&#x1f4da;&#x1f4da; &#x1f31f;推荐给大家我的专栏《微信小程序开发实战》。&#x1f3af;&#x1f3a…

零资源的大语言模型幻觉预防

零资源的大语言模型幻觉预防 摘要1 引言2 相关工作2.1 幻觉检测和纠正方法2.2 幻觉检测数据集 3 方法论3.1 概念提取3.2 概念猜测3.2.1 概念解释3.2.2 概念推理 3.3 聚合3.3.1 概念频率分数3.3.2 加权聚合 4 实验5 总结 摘要 大语言模型&#xff08;LLMs&#xff09;在各个领域…

FedGNN: Federated Graph Neural Network for Privacy-Preserving Recommendation

FedGNN&#xff1a;用于隐私保护推荐的联邦图神经网络 参考笔记 ICML-21-workshop 本文的主要创新工作 在具有局部差分隐私的模型训练中保护模型梯度&#xff0c;并提出一种伪交互项目采样技术来保护用户与之交互的项目。提出了一种保护隐私的用户-项目图扩展方法&#xff0…

Go学习第十七章——Gin中间件与路由

Go web框架——Gin中间件与路由 1 单独注册中间件1.1 入门案例1.2 多个中间件1.3 中间件拦截响应1.4 中间件放行 2 全局注册中间件3 自定义参数传递4 路由分组4.1 入门案例4.2 路由分组注册中间件4.3 综合使用 5 使用内置的中间件6 中间件案例权限验证耗时统计 1 单独注册中间件…

【c++|opencv】一、基础操作---3.访问图像元素

every blog every motto: You can do more than you think. https://blog.csdn.net/weixin_39190382?typeblog 0. 前言 访问图像元素 1. 访问图像像素 1.1 访问某像素 //灰度图像&#xff1a; image.at<uchar>(j, i) //j为行数&#xff0c;i为列数 //BGR彩色图像 i…

思维训练第三课 反意疑问句

系列文章目录 文章目录 系列文章目录前言一、什么是反意疑问句二、反意疑问句的回答&#x1f49a;主系表/主谓宾&#xff08;肯定&#xff09;&#xff0c;否定提问1、一般现在时2、一般过去时3、一般将来时4、现在完成时 &#x1f49b; 主谓宾1、一般现在2、一般过去3、一般将…

大数据可视化BI分析工具Apache Superset实现公网远程访问

大数据可视化BI分析工具Apache Superset实现公网远程访问 文章目录 大数据可视化BI分析工具Apache Superset实现公网远程访问前言1. 使用Docker部署Apache Superset1.1 第一步安装docker 、docker compose1.2 克隆superset代码到本地并使用docker compose启动 2. 安装cpolar内网…

【mfc/VS2022】计图实验:绘图工具设计知识笔记3

实现类对串行化的支持 如果要用CArchive类保存对象的话&#xff0c;那么这个对象的类必须支持串行化。一个可串行化的类通常有一个Serialize成员函数。要想使一个类可串行化&#xff0c;要经历以下5个步骤&#xff1a; 1、从CObject派生类 2、重写Serialize成员函数 3、使用DE…

java基础 集合2

前9点&#xff0c;在另一篇作品中&#xff0c;可以从集合1开始观看 9.List遍历方式&#xff1a; 10.Arraylist底层原理&#xff1a; 11.Linklist底层原理&#xff1a; 1.LinkedList做队列和栈&#xff1a; package day01;import java.util.ArrayList; import java.util.I…

MySQL扩展语句和约束方式

一、扩展语句 复制&#xff0c;通过like这个语法直接复制bbb的表结构。只是复制表结构&#xff0c;不能复制表里面的数据 把bbb表里面的数据&#xff0c;复制到test&#xff0c;两个表数据结构要一致 创建一张表&#xff0c;test1,数据从bbb来&#xff0c;表结构也是bbb delete…

医院室内地图导航技术分析与作用

随着科技的不断发展&#xff0c;医疗行业的服务水平也在逐步提高。为了方便患者和医务人员&#xff0c;医院室内地图导航技术应运而生。这种技术运用了多种元素&#xff0c;包括模型地图、室内3D电子地图、路线指引、对接医院系统、位置分享和寻车导航等&#xff0c;为医院提供…

论文阅读——DistilBERT

ArXiv&#xff1a;https://arxiv.org/abs/1910.01108 Train Loss: DistilBERT&#xff1a; DistilBERT具有与BERT相同的一般结构&#xff0c;层数减少2倍&#xff0c;移除token类型嵌入和pooler。从老师那里取一层来初始化学生。 The token-type embeddings and the pooler a…