RocketMQ源码学习笔记:Broker接受消息和发送消息

这是本人学习的总结,主要学习资料如下

  • 马士兵教育
  • rocketMq官方文档

目录

  • 1、Overview
  • 2、技术亮点
    • 2.1、消息写入时的自旋锁和可重入锁
    • 2.2、堆外内存机制
      • 2.2.1、Overview
      • 2.2.2、源码
        • 2.2.2.1、开启堆外内存的条件
        • 2.2.2.2、堆外内存的初始化
        • 2.2.2.3、写消息到堆外内存
        • 2.2.2.4、堆外内存同步数据到磁盘

1、Overview

这是Broker中类的架构图。

在这里插入图片描述

发送和接收消息的代码流程是从上到下的,比如接受消息的流程就是SendMessageProcessor#processRequest-> DefaultMessageStore#asyncPutMessage -> CommitLog#asyncPutMessage -> MappedFile#appendMessage

2、技术亮点

2.1、消息写入时的自旋锁和可重入锁

CommitLog的构造方法中,初始化了这么一个锁。在写入消息时会调用这个锁的lock()unlock()方法。

this.putMessageLock = defaultMessageStore.getMessageStoreConfig()
.isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() 
: new PutMessageSpinLock();

默认情况下是自旋锁,我们也可以配置成可重入锁。

我们看看PutMessageSpinLock怎么实现的。

public class PutMessageSpinLock implements PutMessageLock {//true: Can lock, false : in lock.private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);@Overridepublic void lock() {boolean flag;do {flag = this.putMessageSpinLock.compareAndSet(true, false);}while (!flag);}@Overridepublic void unlock() {this.putMessageSpinLock.compareAndSet(false, true);}
}

这个自旋实现的很简单,就是不断地循环然后通过CAS加锁解锁,所以这个锁不会阻塞线程,不涉及操作系统上下文切换,只是CPU空转。

PutMessageReentrantLock则更简单,它直接使用ReentrantLock来加锁解锁。所以可能会导致线程阻塞或者挂起。

官方文档建议,异步刷盘时使用自旋锁,同步刷盘使用可重入锁。

因为异步刷盘速度快,消息到Borker内存就可以返回发送成功,占有锁的时间较少,自旋锁能有最大的效率。

同步刷盘需要等到消息写入磁盘后才能返回发送成功,占有所得时间较长,用自旋锁会导致大量线程空转占用CPU。所以需要用可重入锁将获取锁失败的线程挂起。

2.2、堆外内存机制

2.2.1、Overview

堆外内存机制用于高并发的场景。

因为高并发会在JVM中产生大量的对象,很可能会频繁地触发GC导致STW暂停业务线程。

堆外内存是指从内存中开辟一个新的空间,这个空间的回收不受GC的控制,完全交给开发者。

这片堆外内存会被当成一个缓存,Broker接受到的消息对象会存放到堆外内存中,然后定时从把消息从堆外内存中刷到磁盘。

因为堆外内存的垃圾回收不受GC控制,而是交给开发者,所以就能保证垃圾回收的频率够低,保证业务线程尽可能少地暂停。

这是消息写入时,普通模式和开启堆外内存时的流程图。
在这里插入图片描述


2.2.2、源码

2.2.2.1、开启堆外内存的条件

MessageStoreConfig中可以看到什么情况才会被RocketMQ认为当前开启了堆外内存。

public boolean isTransientStorePoolEnable() {return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType()&& BrokerRole.SLAVE != getBrokerRole();
}

可以看到,三个条件同时满足才能开启堆外内存。

  1. Broker.conf中设置transientStorePoolEnable=true
  2. 刷盘方式是异步刷盘:第二个刷盘方式必须是异步刷盘。这是因为同步刷盘要求数据写到磁盘后才返回ACK给生产者,这需要较长的时间。但堆外内存的意义就是为了满足高并发,同步刷盘与之相违背,所以只能是异步刷盘。
  3. 当前的Broker必须是Master:因为主从架构中,从节点只能只能被消费者读消息不能被生产者写消息,而堆外内存只是一个写数据时的缓存,读数据还是得从磁盘中读。所以从节点开启堆外内存没意义,反而会占用内存影响性能。

2.2.2.2、堆外内存的初始化

初始化的内容比较简单,靠外部配置就足够的话,一般是在BrokerStartup#createBrokerController中。比较复杂的则是在BrokerController#createBrokerController中。

堆外内存和DefaultMessageStore有关,初始化在DefaultMessageStore的构造方法。下面是相关代码。

if (messageStoreConfig.isTransientStorePoolEnable()) {this.transientStorePool.init();
}
public void init() {// poolSize = 5 by defaultfor (int i = 0; i < poolSize; i++) {ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);final long address = ((DirectBuffer) byteBuffer).address();Pointer pointer = new Pointer(address);LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));availableBuffers.offer(byteBuffer);}
}

从代码中可以看到,初始化具体做的事是新建默认5个ByteBuffer对象,然后存放在availableBuffers中,availableBuffers是一个队列。

这5个ByteBuffer是映射到堆外内存的缓存,后续将通过这几个缓存向堆外内存中放数据。

之后在初始化BrokerController#intialize()中,会通过线程AllocateMappedFileService调用MappedFile#init()方法,将上面初始化好的availableBuffers通过transientStorePool#borrowBuffer()传给MappedFilewriteBuffer。这样写消息时,MappedFile就可以通过writeBuffer向堆外内存写数据。

// init with off-heap
public void init(final String fileName, final int fileSize,final TransientStorePool transientStorePool) throws IOException {init(fileName, fileSize);this.writeBuffer = transientStorePool.borrowBuffer();this.transientStorePool = transientStorePool;
}

2.2.2.3、写消息到堆外内存

写消息的流程是SendMessageProcessor#processRequest-> DefaultMessageStore#asyncPutMessage -> CommitLog#asyncPutMessage -> MappedFile#appendMessage -> MappdFile#appendMessagInner,即使开启堆外内存也是一样。

所以我们可以来看看MappedFile#appendMessagInner怎么实现写消息到堆外内存。这里截取关键片段。

 // 如果writeBuffer不为空,则开启了堆外内存,否则用正常的mappedByteBufferByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();byteBuffer.position(currentPos);AppendMessageResult result;if (messageExt instanceof MessageExtBrokerInner) {result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,(MessageExtBrokerInner) messageExt, putMessageContext);} else if (messageExt instanceof MessageExtBatch) {result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,(MessageExtBatch) messageExt, putMessageContext);} else {return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}

代码中可以看到,关键点是writeBuffer。之前提到开启了堆外内存,那初始化时会将堆外内存的映射缓存传给MappedFile中的workBuffer;如果没开启堆外内存则writeBuffer为null。

所以开启堆外内存就向writeBuffer写数据到堆外内存;没有开启就向mappedByteBuffer写数据到磁盘。


2.2.2.4、堆外内存同步数据到磁盘

堆外内存只是一个缓存,最终数据应该同步到磁盘。RocketMQ设置了一个定时线程做这个工作,叫CommitRealTimeService

默认200ms同步一次。因为MQ就是用于异步的场景,所以写完数据至少200ms后才能读到也是可以忍耐的。

具体的代码不展示,没什么值得说的地方。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/863171.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MAS0902量产工具分享,MAS0902A开卡教程,MAS0901量产工具下载

MAS0902和MAS1102都是基于SATA3.2技术开发的DRAM-less SSD控制芯片&#xff0c;简单来说就是SATA协议无缓存主控。下面是我摸索的麦光黑金300 240G SSD开卡修复简易教程&#xff0c;也就是MAS0902量产过程&#xff1a; 注意&#xff1a;开卡转接线必须要用ASM1153E或JMS578主控…

鉴权开发框架Django REST framework的应用场景

目录 一、鉴权开发框架介绍二、Django REST framework是什么三、如何实现认证、权限与限流功能四、Django REST framework的应用场景 一、鉴权开发框架介绍 鉴权开发框架是一种用于实现身份验证和授权的软件开发工具。它可以帮助开发者快速构建安全、可靠的身份验证和授权系统…

24级中国科学技术大学843信号与系统考研分数线,中科大843初复试科目,参考书,大纲,真题,苏医工生医电子信息与通信工程。

(上岸难度&#xff1a;★★★★☆&#xff0c;考试大纲、真题、经验帖等考研资讯和资源加群960507167/博睿泽电子信息通信考研咨询&#xff1a;34342183) 一、专业目录及考情分析 说明: ①复试成绩:满分100分。上机满分50分&#xff0c;面试满分150分&#xff0c;复试成绩(上机…

ros1仿真导航机器人 基础传感器数据读取

仅为学习记录和一些自己的思考&#xff0c;不具有参考意义。 1 仿真环境 gazebo、rviz、ros1 2 机器人模型 <?xml version"1.0"?> <robot name"wpb_home_gazebo"><link name"base_footprint"><visual><origin …

C++进阶

C进阶 一、细节1.cout与输出缓冲区2.constexpr3.NULL和nullptr是不同的类型4.关于inline5.函数杂合用法6.const char*、char const*、char * const7.进程地址空间&#xff0c;所谓静态区常量区不准8.位运算9.多态9.1 内存切片9.2 转型9.3 构造函数和析构函数里是静态绑定9.4 dy…

DP:解决路径问题

文章目录 二维DP模型如何解决路径问题有关路径问题的几个问题1.不同路径2.不同路径Ⅱ3.下降路径最小和4.珠宝的最高价值5.地下城游戏 总结 二维DP模型 二维动态规划&#xff08;DP&#xff09;模型是一种通过引入两个维度的状态和转移方程来解决复杂问题的技术。它在许多优化和…

docker容器内为什么能解析宿主机的hosts文件

Docker容器可以通过特定的网络设置来解析宿主机的hosts文件&#xff0c;这是因为Docker容器在创建网络时&#xff0c;会自动将宿主机的DNS配置信息传递给容器。 当你启动一个Docker容器时&#xff0c;如果没有指定任何DNS相关的选项&#xff0c;Docker默认会使用宿主机的DNS配…

Hi3861 OpenHarmony嵌入式应用入门--LiteOS MessageQueue

CMSIS 2.0接口中的消息&#xff08;Message&#xff09;功能主要涉及到实时操作系统&#xff08;RTOS&#xff09;中的线程间通信。在CMSIS 2.0标准中&#xff0c;消息通常是通过消息队列&#xff08;MessageQueue&#xff09;来进行处理的&#xff0c;以实现不同线程之间的信息…

【机器学习300问】135、决策树算法ID3的局限性在哪儿?C4.5算法做出了怎样的改进?

ID3算法是一种用于创建决策树的机器学习算法&#xff0c;该算法基于信息论中的信息增益概念来选择最优属性进行划分。信息增益是原始数据集熵与划分后数据集熵的差值&#xff0c;熵越小表示数据集的纯度越高。有关ID3算法的详细步骤和算法公式在我之前的文章中谈到&#xff0c;…

探索 Electron:将 Web 技术带入桌面应用

Electron是一个开源的桌面应用程序开发框架&#xff0c;它允许开发者使用Web技术&#xff08;如 HTML、CSS 和 JavaScript&#xff09;构建跨平台的桌面应用程序&#xff0c;它的出现极大地简化了桌面应用程序的开发流程&#xff0c;让更多的开发者能够利用已有的 Web 开发技能…

VMware Workstation 安装 Centos 虚拟机

1. 下载 VMware Workstation 直接上网找官网下载即可 2. 下载 Centos 镜像 阿里巴巴开源镜像站-OPSX镜像站-阿里云开发者社区 3.打开 VMware 创建虚拟机 3.1点击创建虚拟机 3.2 选择自定义安装 3.3 选择使用 Workstation 的版本 版本越高兼容性越低但性能越好&#xff0c;一…

智慧校园-实训管理系统总体概述

智慧校园实训管理系统&#xff0c;专为满足高等教育与职业教育的特定需求而设计&#xff0c;它代表了实训课程管理领域的一次数字化飞跃。此系统旨在通过革新实训的组织结构、执行流程及评估标准&#xff0c;来增强学生的实践操作技能和教师的授课效率&#xff0c;为社会输送具…

数据结构-分析期末选择题考点(图)

我是梦中传彩笔 欲书花叶寄朝云 目录 图的常见考点&#xff08;一&#xff09;图的概念题 图的常见考点&#xff08;二&#xff09;图的邻接矩阵、邻接表 图的常见考点&#xff08;三&#xff09;拓扑排序 图的常见考点&#xff08;四&#xff09;关键路径 图的常见考点&#x…

c语言实现贪吃蛇小游戏

源码 /** * FileName: snakec* Author:PowerKing * Version&#xff1a;V1.0* Date:2024.6.28* Description: 贪吃蛇小游戏*/#include <curses.h> #include <stdlib.h> #include <pthread.h> #include <unistd.h>/*贪吃蛇游戏 */#define UP 1…

S32K3 工具篇2:如何在S32DS中使用Segger JLINK下载

S32K3 工具篇2&#xff1a;如何在S32DS中使用Segger JLINK下载 一&#xff0c; S32DS中JLINK下载1.1 Segger JLINK 驱动1.2 S32DS JLINK驱动路径配置1.3 S32DS JLINK debug configuration1.4 S32DS JLINK debug S32K3板子结果 二&#xff0c; JLINK驱动实现S32K344代码下载2.1 …

高考落幕,暑期西北行,甘肃美食等你来尝

高考结束&#xff0c;暑期来临&#xff0c;西北之旅成为许多人的热门选择。而来到甘肃&#xff0c;除了领略壮丽的自然风光和深厚的历史文化&#xff0c;甘肃特产和传统面点以其独特的风味和传统的制作工艺也为游客们带来了一场地道的甘肃美食体验。 平凉的美食&#x…

005-GeoGebra基础篇-GeoGebra的点

新手刚开始操作GeoGebra的时候一般都会恨之入骨&#xff0c;因为有些操作不进行学习确实有些难以凭自己发现。 目录 一、点的基本操作1. 通过工具界面添加点2. 关于点的选择&#xff08;对象选择通用方法&#xff09;&#xff08;1&#xff09;选择工具法&#xff08;2&#xf…

Vue3使用jsbarcode生成条形码,以及循环生成条形码

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;我是前端菜鸟的自我修养&#xff01;今天给大家分享Vue3使用jsbarcode生成条形码&#xff0c;以及循环生成条形码&#xff0c;介绍了JsBarcode插件的详细使用方法&#xff0c;并提供具体代码帮助大家深入理解&#xff0c;彻…

【Docker】集群容器监控和统计 CAdvisor+lnfluxDB+Granfana的基本用法

集群容器监控和统计组合&#xff1a;CAdvisorlnfluxDBGranfana介绍 CAdvisor&#xff1a;数据收集lnfluxDB&#xff1a;数据存储Granfana&#xff1a;数据展示 ‘三剑客’ 安装 通过使用compose容器编排&#xff0c;进行安装。特定目录下新建文件docker-compose.yml文件&am…

日志分析-windows系统日志分析

日志分析-windows系统日志分析 使用事件查看器分析Windows系统日志 cmd命令 eventvwr 筛选 清除日志、注销并重新登陆&#xff0c;查看日志情况 Windows7和Windowserver2008R2的主机日志保存在C:\Windows\System32\winevt\Logs文件夹下&#xff0c;Security.evtx即为W…