RocketMQ系统性学习-RocketMQ原理分析之Broker接收消息的处理流程

🌈🌈🌈🌈🌈🌈🌈🌈
【11来了】文章导读地址:点击查看文章导读!
🍁🍁🍁🍁🍁🍁🍁🍁

Broker接收消息的处理流程?

既然要分析 Broker 接收消息,那么如何找到 Broker 接收消息并进行处理的程序入口呢?

那么消息既然是从生产者开始发送,消息是有单条消息和批量消息之分的,那么消息肯定是有一个标识,当 Broker 接收到消息之后,肯定是需要通过判断消息的标识来区分单条消息和批量消息,那么只需要找到发送消息的标识,再全局搜索,就可以找到这个标识在哪里被处理,被处理的地方一定就是 Broker 接收消息处理的位置了!

那么还是先找到发送消息的位置:DefaultMQProducer # send(Message msg) ,通过层层调用(这里在生产者发送消息流程中讲了)到达了 DefaultMQProducerImpl # this.sendKernelImpl()

在这个方法中就调用到了 MQ 客户端的发送消息的方法 this.mQClientFactory.getMQClientAPIImpl().sendMessage()

在这里真正的通过 Netty 去发送消息到 Broker 中去:

  1. 通过判断消息的类型构造一个 RemotiongCommand 类型的 request 参数

    这里有 4 个构造 request 参数的方法,如下图会走到第三个方法中,那么这里的请求标识为 RequestCode.SEND_MESSAGE_V2

    在这里插入图片描述

  2. this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request) 方法中通过 Netty 将消息发送出去,那么这个方法需要传入一个 request 参数

在上边构造了 request 并且通过 Netty 发送出去,request 的标识为 RequestCode.SEND_MESSAGE_V2 ,那么我们只需要找到处理该标识的 request 的位置,那就是 Broker 处理消息的位置,在 IDEA 中通过 Ctrl+Shift+F 全局搜索这个标识即可:

在这里插入图片描述

可以发现有三个进行 case 判断的地方:

  • 第一个在 PlainAccessResource 类中
  • 第二个在 SendMessageActivity 类中
  • 第三个在 SendMessageRequestHeader 类中

这里第三个 case 判断的地方就是 Broker 处理消息的位置(可以在三个 case 中都 debug,看断点走到哪里就知道了)

那么我们就在第三个 case 判断的位置打上断点

在这里插入图片描述

接下来启动 NameServer,再以 Debug 的方式启动 Broker,再启动生产者,根据调用堆栈信息来找到 Broker 处理消息的整个调用链:

在这里插入图片描述

根据这个堆栈信息,可以发现,调用链是从 NettyServerHandler 的 channelRead0 转移过来的,那么也就是再 NettyServerHandler 这个 Netty 的服务端接收到消息并进行处理,那么我们就在这个堆栈信息中找 Broker 是在哪里对消息进行处理了呢?

就是在 SendMessageProcessor # processRequest 方法中(也就是堆栈顶第3个方法),在这个方法中:

  1. 通过 parseRequestHeader(request) 先对请求头进行解码,也就是根据请求头 RequestCode.SEND_MESSAGE_V2 的类型做一些相应的处理
  2. 接下来通过 buildMsgContext(ctx, requestHeader, request) 创建消息的上下文对象
  3. this.executeSendMessageHookBefore(sendMessageContext) 执行一些消息发送前的钩子(扩展点)
  4. 核心:this.sendMessage() 真正去发送消息

那么在 this.sendMessage() 中就是真正发送消息的逻辑了:

  1. 首先是 preSend(ctx, request, requestHeader) 进行预发送,这里其实就是对发送的消息进行一些检查(Topic 是否合法?Topic 是否与系统默认 Topic 冲突?Topic 的一些配置是否存在?等等信息)

  2. 如果 queueIdInt < 0 是 true 的话,表明生产者没有指定要发送到哪个队列,那么就通过 99999999 % 队列个数 来选择一个队列发送

  3. 将超过最大重试次数的消息发送到 DLQ 死信队列中去

    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig, oriProps)) {return response;
    }
    
  4. 接下来判断 Broker 是否开启了 异步模式,如果开启的话,通过 asyncPutMessage() 处理

    如果没有开启 异步模式,通过 putMessage() 处理,这里其实还是调用了 asyncPutMessage(),只不过通过 get() 阻塞等待结果(复用代码)

那么在发送消息的时候,无论是否异步,都会进入到 DefaultMessageStore # asyncPutMessage() 方法中,我们就点进去看看进行了哪些处理:

  1. 执行一些钩子函数,作为扩展点:putMessageHook.executeBeforePutMessage(msg)

  2. 提交文件的写请求:CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg)

    在这个写文件的方法中,主要做一些文件的写操作,以及将文件写入到磁盘中

    1. 获取文件对象:this.mappedFileQueue.getLastMappedFile()
    2. 追加写文件的操作: mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext)
    3. 最后进行刷盘以及高可用的一些处理:handleDiskFlushAndHA(putMessageResult, msg, needAckNums, needHandleHA)
  3. 打印写文件消耗的时间 this.getSystemClock().now() - beginTime

那么 Broker 总体的接收消息的处理流程就是上边将的这么多了,当然还有一些边边角角的内容没有细说,先了解整体的处理流程,不要提前去学习太多的细节!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/237186.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【git学习笔记 01】打标签

文章目录 一、声明二、对标签的基本认知什么是标签&#xff1f;为什么要打标签&#xff1f;如何生成类似github中readme的图标 三、标签相关命令四、示例操作 一、声明 本帖持续更新中如有纰漏&#xff0c;望批评指正&#xff01;参考视频链接&#xff0c;非常感谢原作者&…

5 分钟内搭建一个免费问答机器人:Milvus + LangChain

搭建一个好用、便宜又准确的问答机器人需要多长时间&#xff1f; 答案是 5 分钟。只需借助开源的 RAG 技术栈、LangChain 以及好用的向量数据库 Milvus。必须要强调的是&#xff0c;该问答机器人的成本很低&#xff0c;因为我们在召回、评估和开发迭代的过程中不需要调用大语言…

Backtrader 文档学习-Data Feeds(下)

Backtrader 文档学习-Data Feeds&#xff08;下&#xff09; 1. Data Resampling 当数据仅在单个时间范围内可用&#xff0c;需要在不同的时间范围内进行分析时&#xff0c;就需要进行一些重采样。 “重采样”实际上应该称为“上采样”&#xff0c;因为它是从一个源时间区间到…

C++的泛型编程—模板

目录 一.什么是泛型编程&#xff1f; ​编辑 ​编辑 二.函数模板 函数模板的实例化 当不同类型形参传参时的处理 使用多个模板参数 三.模板参数的匹配原则 四.类模板 1.定义对象时要显式实例化 2.类模板不支持声明与定义分离 3.非类型模板参数 4.模板的特化 函数模板…

MySQL的安装及如何连接到Navicat和IntelliJ IDEA

MySQL的安装及如何连接到Navicat和IntelliJ IDEA 文章目录 MySQL的安装及如何连接到Navicat和IntelliJ IDEA1 MySQL安装1.1 下载1.2 安装(解压)1.3 配置1.3.1 添加环境变量1.3.2 新建配置文件1.3.3 初始化MySQL1.3.4 注册MySQL服务1.3.5 启动MySQL服务1.3.6 修改默认账户密码 1…

Windows中安装nvm进行Node版本控制

1.nvm介绍 nvm英文全程也叫node.js version management&#xff0c;是一个node.js的版本管理工具。nvm和npm都是node.js版本管理工具&#xff0c;但是为了解决node各种不同之间版本存在不兼容的问题&#xff0c;因此可以通过nvm安装和切换不同版本的node。 2.nvm下载 可在点…

6个免费设计资源站,设计师们赶紧收藏!

本期给大家分享5个免费的设计资源站&#xff0c;设计师必备的设计设计神奇&#xff0c;绝对能帮助你在工作中事半功倍&#xff0c;赶紧收藏吧~ 1、菜鸟图库 https://www.sucai999.com/?vNTYwNDUx 菜鸟图库是我推荐过很多次的网站&#xff0c;主要是站内素材多&#xff0c;像…

PHPStorm一站式配置

phpstorm安装好之后&#xff0c;先别急着编码。工欲善其事&#xff0c;必先利其器&#xff0c;配置好下面这些之后让编码事半功倍。 主题 Appearance & Behavior -> Appearance -> Theme 选中 [Light with Light Header] 亮色较为护眼 关闭更新 Appearance & …

C#学习笔记 - C#基础知识 - C#从入门到放弃 - C# 方法

C# 入门基础知识 - 方法 第8节 方法8.1 C# 函数/方法简介8.2 方法的声明及调用8.2.1 参数列表方法的声明及调用8.2.2 参数数组方法的声明及调用8.2.3、引用参数与值参数 8.3 静态方法和实例方法8.3.1 静态、实例方法的区别8.2.3 静态、实例方法的声明及其调用 8.4 虚方法8.4.1 …

Linux学习(3)——基本命令-文件

1、cat&#xff1a;查看文件内容--上下合并文件 注意&#xff1a;cat只能查看普通的文本文件 如果文件内容过多会显示不全 选项效果-n显示行号包括空行-b跳过空白行编号&#xff1b;注意&#xff0c;在一行打了空格不算空白行&#xff0c;enter键直接跳过这一行才算-s将所有连续…

【JAVA】CyclicBarrier源码解析以及示例

文章目录 前言CyclicBarrier源码解析以及示例主要成员变量核心方法 应用场景任务分解与合并应用示例 并行计算应用示例 游戏开发应用示例输出结果 数据加载应用示例 并发工具的协同应用示例 CyclicBarrier和CountDownLatch的区别循环性&#xff1a;计数器的变化&#xff1a;用途…

[c]用指针进行四个数排序

#include<stdio.h> void swap(int*p1,int*p2)//定义函数&#xff0c;实现两个数值交换 {int temp;temp*p1;*p1*p2;*p2temp; } void psort( int *pa, int *pb,int *pc,int *pd) {int i1;for(i1;i<3;i)//对四个数排序&#xff0c;至少3次循环&#xff0c;交换过后是升序…

DDPM推导笔记

各位佬看文章之前&#xff0c;可以先去看看这个视频&#xff0c;并给这位up主点赞投币&#xff0c;这位佬讲解的太好了&#xff1a;大白话AI 1.前置知识的学习 1.1 正态分布特性 ​ &#xff08;1&#xff09;正态分布的概率密度函数 f ( x ) 1 2 π σ e − ( x − μ ) …

Android13音频录制适配

Android13音频录制适配 前言&#xff1a; 之前写过一篇音频录制的文章&#xff0c;当时是在Android10以下的手机可以成功录制和播放&#xff0c;但是Android10及以上手机提示创建文件失败&#xff0c;最近做过Android13的适配&#xff0c;索性一起把之前的录音也适配了&#…

Python 时间日期处理库函数

标准库 datetime >>> import datetime >>> date datetime.date(2023, 12, 20) >>> print(date) 2023-12-20 >>> date datetime.datetime(2023, 12, 20) >>> print(date) 2023-12-20 00:00:00 >>> print(date.strfti…

gem5 RubyPort: mem_request_port作用与连接 simple-MI_example.py

简介 回答这个问题&#xff1a;RubyPort的口下&#xff0c;一共定义了六个口&#xff0c;分别是mem_request_port&#xff0c;mem_response_port&#xff0c;pio_request_port&#xff0c;pio_response_port&#xff0c;in_ports, interrupt_out_ports&#xff0c;他们分别有什…

【异常】jdk21升级,asm报错Unsupported class file major version 65 springboot2 升级JDK21

【异常】jdk21升级&#xff0c;asm报错Unsupported class file major version 65 错误信息 Caused by: org.springframework.core.NestedIOException: ASM ClassReader failed to parse class file - probably due to a new Java class file version that isnt supported yet…

Java对接腾讯多人音视频房间示例

最近在对接腾讯的多人音视频房间&#xff0c;做一个类似于腾讯会议的工具&#xff0c;至于为什么不直接用腾讯会议&#xff0c;这个我也不知道&#xff0c;当然我也不敢问 首先是腾讯官方的文档地址&#xff1a;https://cloud.tencent.com/document/product/1690 我是后端所以…

CSS自适应分辨率 amfe-flexible 和 postcss-pxtorem:大屏高宽自适应问题

前言 继上篇《CSS自适应分辨率 amfe-flexible 和 postcss-pxtorem》。 发现一个有趣的问题&#xff0c;文件 rem.js 中按照宽度设置自适应&#xff0c;适用于大多数页面&#xff0c;但当遇到大屏就不那么合适了。 问题 使用宽度&#xff0c;注意代码第2 和 4 行&#xff1a;…

JAVA面试题分享一百九十九:RabbitMQ 发布确认高级

目录 一、前言 二、发布确认SpringBoot版本 介绍 实战 添加配置类 消息生产者 消息消费者 消息生产者发布消息后的回调接口 三、回退消息 介绍 四、实战 修改配置文件 修改回调接口 五、备份交换机 介绍 实战 修改高级确认发布 配置类 报警消费者 一、前言 …