消息长度_填坑笔记:RocketMQ消息订阅失败问题?

前语:不要为了读文章而读文章,一定要带着问题来读文章,勤思考。

5b1f76d6429fca6df7474092347c3ef0.png

作者:kinnylee   来源:http://1t.click/g26

# 背景介绍

项目组使用阿里RocketMQ,对同一个消费组设置不同的tag订阅关系,出现消息丢失的问题,本文从rocketmq源码研究消息发布与订阅原理,并分析导致该问题的原因。

# 官方说明

  • 告诉使用者:同一个消费组,必须保持订阅关系一致

  • 为什么?它没有说!只能从源码找答案

    5a601bcc7dbb69a9fe0c86fdbd49c5f9.png

# 问题复现

  • 启动消费者1,消费组为group1,订阅topicA的消息,tag设置为tag1 || tag2

  • 启动消费者2,消费组也为group1,也订阅topicA的消息,但是tag设置为tag3

  • 启动生产者,生产者发送含有tag1,tag2,tag3的消息各10条

  • 消费者1没有收到任何消息,消费者2收到部分消息

# 结论

  • 同一个消费组中,设置不同tag时,后启动的消费者会覆盖先启动的消费者设置的tag

  • tag决定了消息过滤的条件,经过服务端和客户端两层过滤,最后只有后启动的消费者才能收到部分消息

# 原理说明

1、消息如何保存

CommitLog

  • 保存所有topic的原始消息

  • CommitLog分为多个文件,每个文件默认最大为1G

  • 每条记录包括:消息长度和消息文本(消息体,属性,uid等等)

  • 因每条消息长度不一致,每个commitLog的记录长度也不一致

ab0f95dadae9778f73502f6adce67d5e.png

ConsumerQueue

  • 保存某个Topic下某个Queue的索引信息

  • 每条记录包括:消息在commitLog中的offset,消息大小,消息tag的哈希值

  • 每条记录长度固定为20byte

  • producer发送消息后,先保存到commitLog,再异步建立该条消息对应的topic + queue对应的ConsumerQueue索引

  • 第三部分的Hash(tag)是服务端过滤消息的重要依据

dc242eb187c912692a1f566452056979.png

2、consumer如何订阅消息?

注册订阅信息

  • consumer订阅时,会将订阅信息注册到到服务端

  • 保存订阅信息的是Map类,key为topic,value主要是tag

  • subVersion取当前时间。

这里的key是topic,subVersion版本号,这两点很关键!后面有用到!

b7b7dd01e84346cd45f9f9f390aee6fd.png

拉取消息并过滤

  • 拉取消息时,首先从服务端获取订阅关系,得到tag的hash集合codeSet

  • 然后从ConsumerQueue获取一条记录,判断记录的hashCode是否在codeSet中,以达到消息过滤的目的,决定是否将该消息发送给consumer

  • 总之一句话:tag决定了消息是否发到客户端

3、消息过滤

服务端过滤

  • 过滤:tag的hash值过滤

  • 优点:

    • 减少不必要消息占用流量

  • 缺点:

    • Hash存在冲突,过滤不完全准确

ca15655467dd0d7172736efa72364640.png

客户端过滤

  • 服务端过滤存在不准确性,客户端再次精确过滤

  • 客户度过滤:tag的字符串值做对比。不相等的不返回给消费者

原因总结

  • 同一个consumer group的订阅关系,保存在RebalanceImpl类的Map中。key为topic

  • 不同的消费者启动后,依次注册订阅关系,因为tag不一样,导致Map中同一topic的tag被覆盖。比如:消费者1订阅tag1,消费者2订阅tag2。最后map中只保存tag2.

  • 过滤的核心是是tag,tag被更新,过滤条件被改变。服务端过滤后只返回tag2的消息

  • 客户端接收消息后,再次过滤。先启动的消费者1订阅tagA,但是服务端返回tag2,所以消费者1收不到任何消息。消费者2能收到一半的消息(集群模式,假设消息平均分配,另外一半分给tag2)

# 源码分析

1、订阅关系数据结构

3016b0c72b15688e7e8012794392865f.png

2、消费者1启动时注册的订阅关系

d79594d55b976f9435915c2c3fcbec31.png

3、消费者2后启动覆盖订阅关系

93cb0c2abfc9b243cd666f008f0c93c4.png

4、服务端过滤时取出ConsumerQueue的Hash(tag)

bf9d0918563f139712eca903b2968ae0.png

5、对比消息的Hash(tag)和之前保存的订阅关系

a11fb3933a03f3288b4c0b638d6c7da3.png

7、客户端过滤

01989a3c650b9fdc9f5c797690b4ccff.png

热文推荐

这份5G PPT这几天在我的朋友圈刷屏了。

作为一名Java程序员,你竟然不知道Intrumentation!

c97a46ad5adfd4dcfbc75ec8e44e641a.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/420287.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前端学习(1341):mongoose验证规则延伸

const mongoose require(mongoose); mongoose.connect(mongodb://localhost/playground, { useUnifiedTopology: true }).then(() > console.log(数据库连接成功)).catch(err > console.log(err, 数据库连接失败))//创建集合规则 const postSchema new mongoose.Schema…

艾创机器人_世界教育机器人大赛 2019赛季世界锦标赛落幕曲靖代表队获多个奖项...

来源:曲靖日报-曲靖新闻网本报讯12月14日,世界教育机器人大赛(WER)2019赛季世界锦标赛在上海隆重开赛。来自中国、美国、英国、日本、菲律宾、墨西哥等30多个国家和地区的1万余名选手在大赛中一决高低。刚刚结束的第六届曲靖市青少年机器人竞赛余温未散,从比赛中脱颖…

前端学习(1342):mongoose验证规则拿到错误信息

const mongoose require(mongoose); mongoose.connect(mongodb://localhost/playground, { useUnifiedTopology: true }).then(() > console.log(数据库连接成功)).catch(err > console.log(err, 数据库连接失败))//创建集合规则 const postSchema new mongoose.Schema…

单片机人流统计装置的程序_单片机其实不难

对于大学读电子方面专业的同学们,肯定知道有这么一个神奇的元器件,它枯燥难懂,但也十分吸引人,它就是我们今天要讲的元器件--单片机单片机作为工业控制领域里面最核心的部件,它存在于每一台机器,小到扫地机…

MySQL索引篇

index存储引擎索引InnoDB中的索引MyISAM索引存储引擎 以前一直认为关系型数据库中的索引不重要,知道最近学了MySQL高级篇,才发现,对MySQL一知半解。都是听人泛泛而谈。 首先MySQL服务器是怎么存数据的,怎么取到的,内…

sublime加入input函数_【挑战自学Python编程】第八天:while循环以及input()函数

摘要01 while循环02 input函数03 终端04 使用while循环与input()函数01 while循环在正式讲Python中的while前,希望大家先关注单词一下while,翻译为中文意思是:当。(这里我们只需要这一种意思即可)下面我们开始看while循…

文件循环读取_一个案例轻松认识Python文件处理提取文件中的数字

1、文件打开使用 open() 函数打开文件。它需要两个参数,第一个参数是文件路径或文件名,第二个是文件的打开模式。模式通常是下面这样的:"r",以只读模式打开,你只能读取文件但不能编辑/删除文件的任何内容&qu…

前端学习(1344):用户的增删改查操作1

const http require(http); const mongoose require(mongoose);//数据库连接 mongoose.connect(mongodb://localhost/playground, { useUnifiedTopology: true }).then(() > console.log(数据库连接成功)).catch(() > console.log(数据库连接失败));const userSchema …

索引常用注意事项

索引1. 索引怎么建好?2. 索引容易失效的场景3. 连接查询索引优化4. order by,group by5. 覆盖索引6. 索引下推1. 索引怎么建好? 单表 主键必须唯一,且单调递增有唯一键的,尽量建立唯一键where条件用得比较多的字段查询…

毫米波雷达_最新的7个毫米波雷达应用案例

毫米波雷达传感器如何做到"全天候"?毫米波雷达使用的技术是毫米波(millimeterwave),通常缩写为MMW,波长为1~10毫米,频率为30~300GHz的电磁波。根据波的传播理论,频率越高,波长越短&am…

前端学习(1345):用户的增删改查操作2

//创建http连接 const http require(http); //创建服务器 const app http.createServer(); //第三方模块导入 const mongoose require(mongoose); //获取连接 const url require(url); //数据库连接地址 mongoose.connect(mongodb://localhost/playground, { useUnifiedTop…

太阳光是平行光吗_“彩虹的形成是因为光的色散和光沿直线传播是一回事吗?”...

-1感谢某不愿透露姓名的高中同学提供支持。0请先解释一下你的这个问题提法可能的歧义:究竟是“是因为”后面的词语组成一个整体,还是“和”前面的词语组成一个整体呢?不讲清楚的话,答案会有一些差距。1“彩虹的形成是因为光的色散…

前端学习(1346):用户的增删改查操作3增加

//创建http连接 const http require(http); //创建服务器 const app http.createServer(); //第三方模块导入 const mongoose require(mongoose); //获取连接 const url require(url); // const querystring require(querystring); //数据库连接地址 mongoose.connect(mon…

闪电shader_【Shader案例】怎样做出自然的闪电

(本次案例的效果)最近下载了一套特效,其中一个关于闪电制作的shader想法特别聪明,这里特地附上原代码,并教你怎么把源代码转换成连连看,下面开始正文。这是原本shader定义的属性:_TintColor(闪电的颜色)_MainTex(一张R…

MySQL8数据恢复

binlog数据恢复恢复流程恢复流程 先登录MySQL flush log flush log这样会在MySQL binglog日志目录重新生成二进制文件 查看日志文件名 binlog.000033是我刚生成的日志,现在操作的是binlog.000032这个文件。相当于备份了下。 基于位置恢复(先查看位置…

前端学习(1347):用户的增删改查操作4修改

//创建http连接 const http require(http); //创建服务器 const app http.createServer(); //第三方模块导入 const mongoose require(mongoose); //获取连接 const url require(url); // const querystring require(querystring); //数据库连接地址 mongoose.connect(mon…

前端学习(1348):用户的增删改查操作5修改

//创建http连接 const http require(http); //创建服务器 const app http.createServer(); //第三方模块导入 const mongoose require(mongoose); //获取连接 const url require(url); // const querystring require(querystring); //数据库连接地址 mongoose.connect(mon…

parallelStream与stream

并行流🍭 多线程并发🍭 多线程并发 🍕stream与parallelStream 下面的代码分别用了parallelStream与stream进行迭代。获取对应的每一项值,和对应的线程名称。 package top.lel.jvm.sdk.stream;import java.util.List; import jav…

前端学习(1349):用户的增删改查操作6删除

//创建http连接 const http require(http); //创建服务器 const app http.createServer(); //第三方模块导入 const mongoose require(mongoose); //获取连接 const url require(url); // const querystring require(querystring); //数据库连接地址 mongoose.connect(mon…