RocketMQ源码解析-主从同步原理(HA)

1、关键组件

主从同步的实现逻辑主要在HAService中,在它的构造函数中实例化了几个对象同时在start()方法内执行启动:

public class HAService {public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {this.defaultMessageStore = defaultMessageStore;this.acceptSocketService =new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());this.groupTransferService = new GroupTransferService();this.haClient = new HAClient();}......public void start() throws Exception {this.acceptSocketService.beginAccept();this.acceptSocketService.start();this.groupTransferService.start();this.haClient.start();}
}

首先了解一下HAService的构造函数中的内容究竟是干什么的:

  • AcceptSocketService:主要是处理从节点的连接,调用AcceptSocketService#beginAccept()方法,这一步主要是进行端口绑定,在端口上监听从节点的连接请求;调用AcceptSocketService#start()方法启动服务,这一步主要为了处理从节点的连接请求,与从节点建立连接(可以看做是运行在master节点的)。
  • GroupTransferService:主要用于在主从同步的时候,等待数据传输完毕(可以看做是运行在master节点的。
  • HAClient:里面与master节点建立连接,向master汇报主从同步进度并存储master发送过来的同步数据(可以看做是运行在slave从节点的)。

了解完HAService中的组件,而且看到在start()方法中启动了各个组件,那么HAService在何时被启动的呢?

还记得之前在记录broker时,看过BrokerController#initialize()初始化方法内,同时也构建了DefaultMessageStore对象,它作为HAService构造函数的入参,定义的start()方法中就包含HAService的启动

1).构建DefaultMessageStore以及start()启动

//BrokerController.class
public class BrokerController {private MessageStore messageStore;//broekr初始化public boolean initialize() throws CloneNotSupportedException {.......this.messageStore =  new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,this.brokerConfig);.......}//borker启动public void start() throws Exception {if (this.messageStore != null) {this.messageStore.start();}}}
}

2)实例化HAServer以及start()启动

//DefaultMessageStore.class
public class DefaultMessageStore implements MessageStore {private final HAService haService;......public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {......//实例化HAServiceif (!messageStoreConfig.isEnableDLegerCommitLog()) {this.haService = new HAService(this);} else {this.haService = null;}......}public void start() throws Exception {......if (!messageStoreConfig.isEnableDLegerCommitLog()) {//启动HAthis.haService.start();this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());}......}
}

2.主从同步流程 

2.1.绑定端口,监听连接请求

AcceptSocketService#beginAccept方法里面首先获取了ServerSocketChannel,然后进行端口绑定,并在selector上面注册了OP_ACCEPT事件的监听,监听从节点的连接请求:

class AcceptSocketService extends ServiceThread {/*** 监听从节点的连接** @throws Exception If fails.*/public void beginAccept() throws Exception {// 创建ServerSocketChannelthis.serverSocketChannel = ServerSocketChannel.open();// 获取selectorthis.selector = RemotingUtil.openSelector();this.serverSocketChannel.socket().setReuseAddress(true);// 绑定端口:10912this.serverSocketChannel.socket().bind(this.socketAddressListen);// 设置非阻塞this.serverSocketChannel.configureBlocking(false);// 注册OP_ACCEPT连接事件的监听this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);}
}

2.2master节点处理连接 

 因为继承了ServiceThread,所以被调用start()启动方法后,会另外开启一个线程执行run()代码,这块就是处理连接请求:

public class HAService {class AcceptSocketService extends ServiceThread {@Overridepublic void run() {log.info(this.getServiceName() + " service started");// 如果服务未停止while (!this.isStopped()) {try {this.selector.select(1000);// 获取监听到的事件Set<SelectionKey> selected = this.selector.selectedKeys();// 处理事件if (selected != null) {for (SelectionKey k : selected) {// 如果是连接事件if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();if (sc != null) {HAService.log.info("HAService receive new connection, "+ sc.socket().getRemoteSocketAddress());try {// 创建HAConnection,建立连接HAConnection conn = new HAConnection(HAService.this, sc);// 启动conn.start();//添加连接HAService.this.addConnection(conn);}...}}
}
  1.  从selector中获取到监听到的事件;
  2. 如果是OP_ACCEPT连接事件,创建与从节点的连接对象HAConnection,与从节点建立连接,然后调用HAConnectionstart方法进行启动,并创建的HAConnection对象加入到连接集合中,HAConnection中封装了Master节点和从节点的数据同步逻辑;

2.3HAClient

HAClient同样也继承了ServiceThread

public void run() {log.info(this.getServiceName() + " service started");//是否执行while (!this.isStopped()) {try {//连接Masterif (this.connectMaster()) {//判断时间间隔是否合法if (this.isTimeToReportOffset()) {// 发送同步偏移量,传入的参数是当前的主从复制偏移量currentReportedOffsetboolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);//返回不对则关闭连接if (!result) {this.closeMaster();}}......
}}
2.3.1slave与主节点建立连接

connectMaster()方法执行连接主节点操作

 class HAClient extends ServiceThread {// 当前的主从复制进度private long currentReportedOffset = 0;private boolean connectMaster() throws ClosedChannelException {if (null == socketChannel) {String addr = this.masterAddress.get();if (addr != null) {// 将地址转为SocketAddressSocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);if (socketAddress != null) {// 连接masterthis.socketChannel = RemotingUtil.connect(socketAddress);if (this.socketChannel != null) {// 注册OP_READ可读事件监听this.socketChannel.register(this.selector, SelectionKey.OP_READ);}}}// 获取CommitLog中当前最大的偏移量this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();// 更新上次写入时间this.lastWriteTimestamp = System.currentTimeMillis();}return this.socketChannel != null;}
}
2.3.2处理网络可读事件

processReadEvent()方法中处理了可读事件,也就是处理Master节点发送的同步数据, 首先从socketChannel中读取数据到byteBufferRead中,byteBufferRead是读缓冲区,读取数据的方法会返回读取到的字节数,对字节数大小进行判断: 

  class HAClient extends ServiceThread {// 读缓冲区,会将从socketChannel读入缓冲区private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);private boolean processReadEvent() {int readSizeZeroTimes = 0;while (this.byteBufferRead.hasRemaining()) {try {// 从socketChannel中读取数据到byteBufferRead中,返回读取到的字节数int readSize = this.socketChannel.read(this.byteBufferRead);if (readSize > 0) {// 重置readSizeZeroTimesreadSizeZeroTimes = 0;// 处理数据boolean result = this.dispatchReadRequest();if (!result) {log.error("HAClient, dispatchReadRequest error");return false;}} else if (readSize == 0) {// 记录读取到空数据的次数if (++readSizeZeroTimes >= 3) {break;}} else {log.info("HAClient, processReadEvent read socket < 0");return false;}} catch (IOException e) {log.info("HAClient, processReadEvent read socket exception", e);return false;}}return true;}}
  • 如果可读字节数大于0表示有数据需要处理,调用dispatchReadRequest方法进行处理;
  • 如果可读字节数为0表示没有可读数据,此时记录读取到空数据的次数,如果连续读到空数据的次数大于3次,将终止本次处理;
2.3.3消息写入ComitLog

dispatchReadRequest方法中会将从节点读取到的数据写入CommitLog,dispatchPosition记录了已经处理的数据在读缓冲区中的位置,从读缓冲区byteBufferRead获取剩余可读取的字节数,如果可读数据的字节数大于一个消息头的字节数(12个字节),表示有数据还未处理完毕,反之表示消息已经处理完毕结束处理。 

 private boolean dispatchReadRequest() {// 消息头大小final int msgHeaderSize = 8 + 4; // phyoffset + sizeint readSocketPos = this.byteBufferRead.position();// 开启循环不断读取数据while (true) {......// 如果可读取的字节数大于一个消息头的字节数 + 消息体大小if (diff >= (msgHeaderSize + bodySize)) {byte[] bodyData = new byte[bodySize];this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);this.byteBufferRead.get(bodyData);// 从读缓冲区中根据消息的位置,读取消息内容,将消息追加到从节点的CommitLog中HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);this.byteBufferRead.position(readSocketPos);// 更新dispatchPosition的值为消息头大小+消息体大小this.dispatchPosition += msgHeaderSize + bodySize;if (!reportSlaveMaxOffsetPlus()) {return false;}continue;}}if (!this.byteBufferRead.hasRemaining()) {this.reallocateByteBuffer();}break;}return true;}

2.4向Master发送主从同步消息拉取偏移量

HAClient#run()中与主节点建立连接后,会向主节点发送同步消息拉取偏移量,调用reportSlaveMaxOffset()

 private boolean reportSlaveMaxOffset(final long maxOffset) {this.reportOffset.position(0);this.reportOffset.limit(8); // 设置数据传输大小为8个字节this.reportOffset.putLong(maxOffset);// 设置同步偏移量this.reportOffset.position(0);this.reportOffset.limit(8);for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {try {// 向Master节点发送拉取偏移量this.socketChannel.write(this.reportOffset);} catch (IOException e) {log.error(this.getServiceName()+ "reportSlaveMaxOffset this.socketChannel.write exception", e);return false;}}// 更新发送时间lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();return !this.reportOffset.hasRemaining();}

2.5HAConnection

前面知道HAClientSlave节点会定时向Master节点汇报从节点的消息同步偏移量,那么Master节点是如何处理的呢?

 HAConnection中封装了Master节点与从节点的网络通信处理,分别在ReadSocketService(负责读Socket的服务)和WriteSocketService(负责读Socket的服务)。

暂时不做深究了有兴趣的可以去看看。这边值注意的一点是,消息消费时用的是netty,而主从同步时用的是java.nio下原生的SocketChannel 

3.有新消息写入之后的同步流程

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/591453.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

李宏毅机器学习第二十三周周报 Flow-based model

文章目录 week 23 Flow-based model摘要Abstract一、李宏毅机器学习1.引言2.数学背景2.1Jacobian2.2Determinant2.3Change of Variable Theorem 3.Flow-based Model4.GLOW 二、文献阅读1. 题目2. abstract3. 网络架构3.1 change of variable formula3.2 Coupling layers3.3Prop…

阿里云域名外部入库流程

注册商是阿里云&#xff0c;且在阿里云管理的&#xff0c;请使用此教程外部入库。 如您的域名注册商是阿里云但在聚名管理&#xff0c;请参考教程&#xff1a;https://www.west.cn/faq/list.asp?unid2539 在外部入库操作之前&#xff0c;请先登录阿里云获取账号ID。详细的账…

软件测试方法分类-按照是否手工执行划分

接上一篇,下来我们再细讲,第二个维度的分类, 软件测试方法分类-按照是否手工执行划分 按是否手工执行划分 1,手工测试(manualTesting) 手工测试是由人一个一个的输入用例,然后观察结果,和机器测试相对应,属于比较原始但是必须的一种。 2,自动化测试(automationTestin…

【刷题日志】深度理解除(/)与取模(%)附水仙花数以及变种水仙花数题解

文章目录 &#x1f680;前言&#x1f680;除与取模&#x1f680;水仙花数&#x1f680;变种水仙花数 &#x1f680;前言 本专栏文章都直奔刷题主题&#xff0c;阿辉都不会在废话了&#xff0c;加油&#xff0c;少年&#xff01;&#xff01;&#xff01; &#x1f680;除与取…

STM32CubeMX教程11 RTC 实时时钟 - 入侵检测和时间戳

目录 1、准备材料 2、实验目标 3、实验流程 3.0、前提知识 3.1、CubeMX相关配置 3.1.1、时钟树配置 3.1.2、外设参数配置 3.1.3、外设中断配置 3.2、生成代码 3.2.1、外设初始化调用流程 3.2.2、外设中断调用流程 3.2.3、添加其他必要代码 4、常用函数 5、烧录验…

探索 CodeWave低代码技术的魅力与应用

目录 前言1 低代码平台2 CodeWave简介3 CodeWave 的独特之处3.1 高保真还原交互视觉需求3.2 擅长复杂应用开发3.3 支持应用导出&独立部署3.4 金融级安全要求3.5 可集成性高3.6 可拓展性强 4 平台架构和核心功能4.1 数据模型设计4.2 页面设计4.3 逻辑设计4.4 流程设计4.5 接…

新能源汽车冷却系统的水道管口类型有哪些?格雷希尔针对这些管口密封的快速接头有哪些?

对于新能源汽车&#xff0c;不仅电池&#xff0c;还有电机、电控、充电单元部件&#xff0c;都需要处于适宜的工作温度&#xff0c;才能维持整车的正常运行。而这些部件在运行过程中会产生大量的热量&#xff0c;如果不及时散热会对汽车的性能、寿命产生影响&#xff0c;甚至可…

Scrapy爬虫中合理使用time.sleep和Request

概述 在Scrapy爬虫中&#xff0c;我们需要深入分析time.sleep和Request对象对并发请求的影响。time.sleep函数用于在发起请求之前等待一段时间&#xff0c;而Request对象用于发送HTTP请求。我们必须仔细考虑这些操作对其他并发请求的潜在影响&#xff0c;以及在异步情况下可能…

TTS | NaturalSpeech语音合成论文详解及项目实现【正在更新中】

----------------------------------&#x1f50a; 语音合成 相关系列直达 &#x1f50a; ------------------------------------- ✨NaturalSpeech&#xff1a;正在更新中~ ✨NaturalSpeech2&#xff1a;TTS | NaturalSpeech2语音合成论文详解及项目实现 本文主要是 讲解了Nat…

基于孔雀优化算法的航线规划

MATLAB2020a下正常运行 上传明细-CSDN创作中心

Excel中部分sheet页隐藏并设置访问密码

1、新建sheet1 2、新建sheet2 3、隐藏sheet2 4、保护工作簿、输密码 5、密码二次确认 6、隐藏的sheet2已经查看不了 7、想要查看时&#xff0c;按图示输入原密码即可 8、查看sheet2内容

【软件工程】航行敏捷之路:深度解析Scrum框架的精髓

&#x1f34e;个人博客&#xff1a;个人主页 &#x1f3c6;个人专栏&#xff1a; 软件工程 ⛳️ 功不唐捐&#xff0c;玉汝于成 目录 前言 正文 Scrum&#xff08;敏捷开发框架之一&#xff09; 详细介绍和解释&#xff1a; 优缺点&#xff1a; 优点&#xff1a; 缺点&…

【MySQL】数据库之高级SQL查询语句补充

目录 一、补充正则表达式的查询regexp 二、补充case的用法 三、补充空值和null值的区别 一、补充正则表达式的查询regexp 要知道 在MySQL中使用正则表达式&#xff0c;一定要在前面加上regexp 正则表达式 ^ 匹配文本的开始字符 ‘^bd’ 匹配以 bd 开头的字符串 …

开关电源输入输出电压测试方法:如何用开关电源智能测试系统测试输入输出电压?

一、用万用表测量输入输出电压 1. 连接万用表到电路中 2. 将万用表调到直流电压挡&#xff0c;连接红表笔到开关电源正极&#xff0c;连接黑表笔到开关电源负极。 3. 打开电源&#xff0c;读取万用表显示的电压值。 二、用示波器测量输入输出电压 1. 连接示波器到电路中 2. 将示…

网络安全—PKI公钥基础设施

文章目录 前提知识散列函数非对称加密数字签名 PKI受信任的人RA注册CA颁发IKE数字签名认证&#xff08;交换证书&#xff09;密钥管理 前提知识 散列函数 散列也可以叫哈希函数&#xff0c;MD5、SHA-1、SHA-2、、&#xff08;不管叫啥&#xff0c;都记得是同一个东西就行&…

图神经网络——图学习

图学习 0. 前言1. 图2. 图学习3. 图神经网络小结 0. 前言 近年来&#xff0c;从社交网络到分子生物学等各个领域&#xff0c;数据的图表示越来越普遍。图神经网络 (Graph Neural Network, GNN) 是专为处理图结构数据而设计的&#xff0c;要充分挖掘图表示的潜力&#xff0c;深…

log4cplus visual c++ 编译及调试小记

简介 最近在调试一款SATA加密设备&#xff0c;发现设备有时加密出来的数据&#xff0c;再解密时与明文对不上&#xff0c;怀疑是通信问题。因此&#xff0c;急需要在测试工具中加入通信日志。由于对第三方日志库都不熟悉&#xff0c;所以随便选了个log4cplus软件集成到现有工具…

Easy Rules规则引擎实战

文章目录 简介pom 规则抽象规则Rule基础规则BasicRule事实类Facts&#xff1a;map条件接口动作接口 四种规则定义方式注解方式RuleBuilder 链式Mvel和Spel表达式Yml配置 常用规则类DefaultRuleSpELRule&#xff08;Spring的表达式注入&#xff09; 组合规则UnitRuleGroup 规则引…

009、引用

1. 引用与借用 下面的示例重新定义了一个新的 calculate_length 函数。与之前不同的是&#xff0c;新的函数签名使用了 String 的引用作为参数而没有直接转移值的所有权&#xff1a; fn main() { let s1 String::from("hello"); let len calculate_length(&s1…

Java学习,一文掌握Java之SpringBoot框架学习文集(1)

&#x1f3c6;作者简介&#xff0c;普修罗双战士&#xff0c;一直追求不断学习和成长&#xff0c;在技术的道路上持续探索和实践。 &#x1f3c6;多年互联网行业从业经验&#xff0c;历任核心研发工程师&#xff0c;项目技术负责人。 &#x1f389;欢迎 &#x1f44d;点赞✍评论…