【源码解析】Apache RocketMQ发送消息源码

send message源码解析

引入

send message方法作为我们经常使用的方法,平时我们很难去关注他底层到底做了什么。大部分人只知道通过send message方法可以将消息发送到broker,然后供消费者进行消费。其实不然,消息从客户端发送到broker,需要中间需要经过很多步骤,比如:首先客户端需要向nameserver拿路由,拿到路由后才能将消息发送到对应的broker。消息到了broker,需要先进行校验,校验无误后,再写到commitLog,写完commitLog后,再根据具体的策略判断是否需要同步到slave节点,同步完slave节点完后才response给客户端。

源码阅读入口

// 客户端入口
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl// NameServer入口
org.apache.rocketmq.namesrv.processor.ClientRequestProcessor#getRouteInfoByTopic// Broker端入口
org.apache.rocketmq.broker.processor.SendMessageProcessor#processRequest

源码解析

mid

  • RocketMQ中的客户端send方法提供了单条发送发送也批量发送的API,不管是单条发送还是批量发送本质都是一样的,批量发送会把消息集合包装一下了,具体可以看batch里面的实现,将消息集合封装了MessageBatch对象,当然MessageBatch继承Message。然后再尝试去topicPublishInfoTable中拿路由,如果没有就请求NameServer(忽略经过Proxy层),需要注意的是,请求NameServer获取路由的这个过程是同步的,同一时间只有一个线程可以请求NameServer,需要等到NameServer返回之后才会执行后续的操作。拿到路由后再根据轮询策略选中其中一个broker进行发送。这就是发送消息客户端大致的逻辑,总体来说是还是比较简单的。
  • CONSUMER_SEND_MSG_BACK是消费者发过来的RETRY消息,本次重点不在这里,后续单独讲下这里。当消息到达Broker’端,先根据请求头构建出一个MappingContext对象,再把request对象封装成sendMessageContext;执行注册到sendMessageProcessor里面的钩子方法sendMessageBefore;之后根据是否是batch消息,如果是batch消息,执行sendBatchMessage,不是执行sendMessage方法,其实本质上还是一样的,只是sendBatchMessage中间构建的是messageExtBatch对象,而sendMessage构建的是messageExtBrokerInner对象。MessageExtBatch是MessageExtBrokerInner的子类,所以两者后续还是共用一套逻辑;然后根据是否开启异步写入执行asyncPutMessage或者putMessage,同步的putMessage实际上还是调用的asynPutMessage,只是要等到asyncPutMessage有返回值之后才执行后续的逻辑。我们这里以asyncPutMessage为主,还是先执行注册到SendMessageProcessor里面的钩子方法SendMessageAfter,然后再先判断时候是否是HA(高可用,高可用是需要等到消息写入slave节点成功之后才说明消息发送成功,一般使用在一些金融场景,对消息可靠性要求较高。),然后再然后分配offset(这个offset是由consumeQueue分配的),分配完offset之后,分配完了之后,再将消息体append到commitLog的分配的buf中,返回的状态码PUT_OK执行handleDiskFlush方法,如果是配置的是同步刷盘就等到刷盘成功后返回,如果是异步刷盘,wakeup对应的FlushManager就算写入完成。
  • 上述执行成功后,执行handleHA方法,如果是不是HA模式执行response PUT_OK,否则,构建一个GroupCommitRequest对象put到haService里面,对应slave节点写完最终才算发送成功。

参考:
· https://rocketmq.apache.org/
· 基于Apache Rocket 5.1.0
· https://github.com/apache/rocketmq

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/601157.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ssm基于vue.js的购物商场的设计与实现论文

摘 要 传统办法管理信息首先需要花费的时间比较多,其次数据出错率比较高,而且对错误的数据进行更改也比较困难,最后,检索数据费事费力。因此,在计算机上安装购物商场软件来发挥其高效地信息处理的作用,可以…

cissp 第10章 : 物理安全要求

10.1 站点与设施设计的安全原则 物理控制是安全防护的第一条防线,而人员是最后一道防线。 10.1.1 安全设施计划 安全设施计划通过关键路径分析完成。 关键路径分析用于找出关键应用、流程、运营以及所有必要支撑元索间的关系。 技术融合指的是各种技术、解决方案…

性能优化-OpenMP基础教程(三)

本文主要介绍OpenMP并行编程的环境变量和实战、主要对比理解嵌套并行的效果。 🎬个人简介:一个全栈工程师的升级之路! 📋个人专栏:高性能(HPC)开发基础教程 🎀CSDN主页 发狂的小花 &…

书生·浦语大模型全链路开源体系 学习笔记 第一课

背景 大模型是发展人工通用人工智能的一个重要途径,能够解决多种任务和多种模态,展示了一个更面向更高阶的智能的潜在途径。大模型的发展历程是从专用模型到通用模型的过程,从语音识别、图像识别、人脸识别等专用模型,到通用的大…

Java8内置四大核心函数式接口

先来看几个例子,主要练习策略模式: 用策略模式的做法 定义个接口 其实像这样的接口并不需要我们自己创建 java8推出的Lambda表达式主要就是为了简化开发,而Lambda表达式 的应用主要是针对与函数式接口,自然也推出了对应的一些接口 /*** Java8 内置的四大核心函数式接口** C…

【C++】STL 算法 ③ ( 函数对象中存储状态 | 函数对象作为参数传递时值传递问题 | for_each 算法的 函数对象 参数是值传递 )

文章目录 一、函数对象中存储状态1、函数对象中存储状态简介2、示例分析 二、函数对象作为参数传递时值传递问题1、for_each 算法的 函数对象 参数是值传递2、代码示例 - for_each 函数的 函数对象 参数在外部不保留状态3、代码示例 - for_each 函数的 函数对象 返回值 一、函数…

权威认可!甄知科技猪齿鱼产品荣获信创产品评估证书

近日,依据《信息技术应用创新产品评估规范 第1部分:应用软件》(T/SSIA 2001-2022),经过严格评估,甄知科技旗下自主研发的猪齿鱼数智化开发管理平台 V2.0.0,通过信创测试认证,获得上海…

差分约束算法

差分约束 差分约束系统包含 m m m个涉及 n n n个变量的差额限制条件,这些差额限制条件每个都是形式为 x i − x j ≤ b ∈ [ 1 , m ] x_i-x_j\leq b_{\in[1,m]} xi​−xj​≤b∈[1,m]​的简单线性不等式。 通常我们要求解出一组可行解。 最短路差分约束 如果我们…

ubuntu 22 virt-manger(kvm)安装winxp; ubuntu22体验 firebird3.0

安装 、启动 virt-manager sudo apt install virt-manager sudo systemctl start libvirtdsudo virt-manager安装windowsXP 安装过程截图如下 要点1 启用 “包括寿终正寝的操作系统” win_xp.iso 安装过程 : 从winXp.iso启动, 执行完自己重启从硬盘重启&#xff0c…

稿件代写3个不可或缺的步骤让你事半功倍-华媒舍

作为一个需求频繁的作者,你可能会面临大量的稿件代写任务。但是,你是否曾经为提高文章质量而苦恼过?是否希望在有限的时间内完成更多的代写任务?本篇文章将向你介绍三个不可或缺的稿件代写步骤,帮助你事半功倍&#xf…

Redis高级特性和应用(慢查询、Pipeline、事务、Lua)

Redis的慢查询 许多存储系统(例如 MySQL)提供慢查询日志帮助开发和运维人员定位系统存在的慢操作。所谓慢查询日志就是系统在命令执行前后计算每条命令的执行时间,当超过预设阀值,就将这条命令的相关信息(例如:发生时间,耗时,命令的详细信息)记录下来,Redis也提供了类似…

Huggy Lingo: 利用机器学习改进 Hugging Face Hub 上的语言元数据

太长不看版: Hub 上有不少数据集没有语言元数据,我们用机器学习来检测其语言,并使用 librarian-bots 自动向这些数据集提 PR 以添加其语言元数据。 Hugging Face Hub 已成为社区共享机器学习模型、数据集以及应用的存储库。随着 Hub 上的数据集越来越多&…

[概率论]四小时不挂猴博士

贝叶斯公式是什么 贝叶斯公式是概率论中的一个重要定理,用于计算在已知一些先验信息的情况下,更新对事件发生概率的估计。贝叶斯公式的表达式如下: P(A|B) P(B|A) * P(A) / P(B) 其中,P(A|B)表示在事件B发生的条件下事件A发生的概…

基于sumo实现交通灯控制算法的模板

基于sumo实现交通灯控制算法的模板 目录 在windows安装run hello world networkroutesviewsettings & configurationsimulation 交通灯控制系统 介绍文件生成器类(FileGenerator)道路网络(Network)辅助函数生成道路网络&am…

php 数组中的元素进行排列组合

需求背景:计算出数组[A,B,C,D]各种排列组合,希望得到的是数据如下图 直接上代码: private function finish_combination($array, &$groupResult [], $splite ,){$result [];$finish_result [];$this->diffArrayItems($array, $…

12、DolphinScheduler

1、DolphinScheduler简介 1.1、 DolphinScheduler概述 Apache DolphinScheduler是一个分布式、易扩展的可视化DAG工作流任务调度平台。致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用。 1.2、 DolphinScheduler核心架构 Dolph…

USB -- STM32F103缓冲区描述表及USB数据存放位置讲解(续)

目录 链接快速定位 前沿 1 0x40005C00和0x40006000地址的区别和联系 2 USB_BTABLE寄存器介绍 3 USB缓冲区描述表(SRAM)介绍 3.1 发送缓冲区地址寄存器n(n[0..7]) 3.2 发送数据字节数寄存器n(n[0..7]&#xff09…

从C++习题中思考

目录 一.开始1.1 二.变量和基本类型1.11.21.31.31.41.5 C Peimer习题集第5版练习。 一.开始 1.1 编写程序&#xff0c;提示用户输入2个整数&#xff0c;打印出这两个整数指定的范围内的所有整数。 方式1&#xff1a;使用while循环。 #include<iostream> using namespac…

socket实现视频通话-WebRTC

最近喜欢研究视频流&#xff0c;所以思考了双向通信socket&#xff0c;接下来我们就一起来看看本地如何实现双向视频通讯的功能吧~ 客户端获取视频流 首先思考如何获取视频流呢&#xff1f; 其实跟录音的功能差不多&#xff0c;都是查询电脑上是否有媒体设备&#xff0c;如果…

C语言学习NO.11-字符函数strlen,strlen函数的使用,与三种strlen函数的模拟实现

&#xff08;一&#xff09;strlen函数的使用 strlen函数的演示 #include <stdio.h> #include <string.h>int main() {char arr1[] "abcdef";char arr2[] "good";printf("arr1 %d,arr2 %d",strlen(arr1),strlen(arr2));return …