主从同步机制

        RocketMQ的Broker分为Master和Slave两个角色,为了保证高可用性,Master角色的机器接收到消息后,要把内容同步到Slave机器上,这样一旦Master宕机,Slave机器依然可以提供服务。下面分析Master和Slave角色机器间同步功能实现的源码。

1 同步属性信息

Slave需要和Master同步的不只是消息本身,一些元数据信息也需要同步,比如TopicConfig信息、ConsumerOffset信息、DelayOffset和SubscriptionGroupConfig信息。Broker在启动的时候,判断自己的角色是否是Slave,是的话就启动定时同步任务,如代码清单12-1所示。

代码清单12-1 Slave角色定时同步元数据信息

if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
    if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
        this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
        this.updateMasterHAServerAddrPeriodically = false;
    } else {
        this.updateMasterHAServerAddrPeriodically = true;
    }
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.slaveSynchronize.syncAll();
            } catch (Throwable e) {
                log.error("ScheduledTask syncAll slave exception", e);
            }
        }
    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}

 

在syncAll函数里,调用syncTopicConfig()、getAllConsumerOffset()、syncDelayOffset()和syncSubscriptionGroupConfig()进行元数据同步。我们以syncConsumerOffset为例,来看看底层的具体实现,如代码清单12-2所示。

代码清单12-2 getAllConsumerOffset具体实现

public ConsumerOffsetSerializeWrapper getAllConsumerOffset(
    final String addr) throws InterruptedException, RemotingTimeoutException,
    RemotingSendRequestException, RemotingConnectException, MQBroker-Exception {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return ConsumerOffsetSerializeWrapper.decode(response.getBody(), ConsumerOffsetSerializeWrapper.class);
        }
        default:
            break;
    }
    throw new MQBrokerException(response.getCode(), response.getRemark());
}

 

getAllConsumerOffset()的基本逻辑是组装一个RemotingCommand,底层通过Netty将消息发送到Master角色的Broker,然后获取Offset信息。

2 同步消息体

下面介绍Master和Slave之间同步消息体内容的方法,也就是同步CommitLog内容的方法。CommitLog和元数据信息不同:首先,CommitLog的数据量比元数据要大;其次,对实时性和可靠性要求也不一样。元数据信息是定时同步的,在两次同步的时间差里,如果出现异常可能会造成Master上的元数据内容和Slave上的元数据内容不一致,不过这种情况还可以补救(手动调整Offset,重启Consumer等)。CommitLog在高可靠性场景下如果没有及时同步,一旦Master机器出故障,消息就彻底丢失了。所以有专门的代码来实现Master和Slave之间消息体内容的同步。

主要的实现代码在Broker模块的org.apache.rocketmq.store.ha包中,里面包括HAService、HAConnection和WaitNotifyObject这三个类。

HAService是实现commitLog同步的主体,它在Master机器和Slave机器上执行的逻辑不同,默认是在Master机器上执行,见代码清单12-3。

代码清单12-3 根据Broker角色,确定是否设置HaMasterAddress

if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
    if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig
        .getHaMasterAddress().length() >= 6) {
        this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
        this.updateMasterHAServerAddrPeriodically = false;
    } else {
        this.updateMasterHAServerAddrPeriodically = true;
    }

 

当Broker角色是Slave的时候,MasterAddr的值会被正确设置,这样HAService在启动的时候,在HAClient这个内部类中,connectMaster会被正确执行,如代码清单12-4所示。

代码清单12-4 Slave角色连接Master

private boolean connectMaster() throws ClosedChannelException {
    if (null == socketChannel) {
        String addr = this.masterAddress.get();
        if (addr != null) {
            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }
        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

        this.lastWriteTimestamp = System.currentTimeMillis();
    }
    return this.socketChannel != null;
}

 

从代码中可以看出,HAClient试图通过Java NIO函数去连接Master角色的Broker。Master角色有相应的监听代码,如代码清单12-5所示。

代码清单12-5 监听Slave的HA连接

public void beginAccept() throws Exception {
    this.serverSocketChannel = ServerSocketChannel.open();
    this.selector = RemotingUtil.openSelector();
    this.serverSocketChannel.socket().setReuseAddress(true);
    this.serverSocketChannel.socket().bind(this.socketAddressListen);
    this.serverSocketChannel.configureBlocking(false);
    this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}

CommitLog的同步,不是经过netty command的方式,而是直接进行TCP连接,这样效率更高。连接成功以后,通过对比Master和Slave的Offset,不断进行同步。

3 sync_master和async_master

sync_master和async_master是写在Broker配置文件里的配置参数,这个参数影响的是主从同步的方式。从字面意思理解,sync_master是同步方式,也就是Master角色Broker中的消息要立刻同步过去;async_master是异步方式,也就是Master角色Broker中的消息是通过异步处理的方式同步到Slave角色的机器上的。下面结合代码来分析,sync_master下的消息同步如代码清单12-6所示。

代码清单12-6 sync_master下的消息同步

public void handleHA(AppendMessageResult result,
    PutMessageResult putMessageResult, MessageExt messageExt) {
    if (BrokerRole.SYNC_MASTER == this.defaultMessageStore
        .getMessageStoreConfig().getBrokerRole()) {
        HAService service = this.defaultMessageStore.getHaService();
        if (messageExt.isWaitStoreMsgOK()) {
            // Determine whether to wait
            if (service.isSlaveOK(result.getWroteOffset() + result
                .getWroteBytes())) {
                GroupCommitRequest request = new GroupCommitRequest
                    (result.getWroteOffset() + result
                    .getWroteBytes());
                service.putRequest(request);
                service.getWaitNotifyObject().wakeupAll();
                boolean flushOK =
                    request.waitForFlush(this.defaultMessageStore
                        .getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do sync transfer other node, wait return, " +
                        "but failed, topic: " + messageExt
                        .getTopic() + " tags: "
                        + messageExt.getTags() + " client address: " +
                        messageExt.getBornHostNameString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus
                        .FLUSH_SLAVE_TIMEOUT);
                }
            }
            // Slave problem
            else {
                // Tell the producer, slave not available
                putMessageResult.setPutMessageStatus(PutMessageStatus
                    .SLAVE_NOT_AVAILABLE);
            }
        }
    }
}

 

在CommitLog类的putMessage函数末尾,调用handleHA函数。代码中的关键词是wakeupAll和waitForFlush,在同步方式下,Master每次写消息的时候,都会等待向Slave同步消息的过程,同步完成后再返回,如代码清单12-7所示。(putMessage函数比较长,仅列出关键的代码)。

代码清单12-7 putMessage中调用handleHA

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // Set the storage time
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body BODY CRC (consider the most appropriate setting
    // on the client)
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore
        .getStoreStatsService();

    String topic = msg.getTopic();
    int queueId = msg.getQueueId();

  ……

    handleDiskFlush(result, putMessageResult, msg);
    handleHA(result, putMessageResult, msg);

    return putMessageResult;
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/181750.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Nature子刊最新研究:Hi-C宏基因组揭示土壤-噬菌体-宿主相互作用

土壤中有大量的噬菌体。然而,大多数宿主未知,无法获得其基因组特征。2023年11月23日,最新发表于《Nature communications》期刊题为“Hi-C metagenome sequencing reveals soil phage–host interactions”的文章,通过高通量染色体…

2023 最新版navicat 下载与安装 步骤及演示 (图示版)

2023 最新版navicat 下载与安装 步骤演示 -图示版 1. 下载Navicat2 .安装navicat 160 博主 默语带您 Go to New World. ✍ 个人主页—— 默语 的博客👦🏻 《java 面试题大全》 🍩惟余辈才疏学浅,临摹之作或有不妥之处&#xff0c…

?.的用法

使用场景: undefined和null是两个比较特殊的数据类型,是不能用点操作符去访问属性的,否则将会报错。 空值合并操作符?? 使用 ?? 时用于提供默认值,只有当左侧表达式的结果为 null 或 undefined 时,才会返回右侧表达式的值。 …

时钟控制模块

时钟控制模块 锁相环电路简单的理解 https://www.bilibili.com/video/BV1yS4y1n7vV/?spm_id_from333.337.search-card.all.click&vd_source712cdb762d6632543eeeadb56271617a一 时钟是从哪里来的 时钟晶振(32.768KHz)供给RTC使用在IMX6ULL的T16和…

计算机图形学:直线的扫描转换算法解析与实现

直线的扫描转换: DDA算法: 推理: 在计算机显示图形时,由于显示计算机的分辨率是有限的所以我们在绘制图形时需要将图形从连续量转换成离散量才能完成图形的绘制,直线的扫描转换就是将连续量转换为离散量的过程。 对…

wsj0数据集原始文件.wv1.wv2转换成wav文件

文章目录 准备一、获取WSJO数据集二、安装sph2pipe三、转换代码四、结果展示 ​ 最近做语音分离实验需要用到wsj0-2mix数据集,但是从李宏毅语音分离教程里面获取的wsj0-2mix只有一部分。从网上获取到了完整的WSJO数据集后,由于原始的语音文件后缀是wv1或…

【前端】JS实现SQL格式化

sqlFormatter sql-formatter - npm (npmjs.com) const sqlFormatter require(/utils/sqlFormatter)let sql select count(1) as cnt from t_user where id < 7;// 格式化 // let sqlF sqlFormatter.format(sql);let sqlF sqlFormatter.format(sql, {language:mysql,})…

怎么在NAS里找照片?教你一招,精准定位

每次拍照 咔咔一顿拍 好多文档 咔咔一顿存 需要到的时候 却依稀只记得时间和部分关键词 那么怎么快速在NAS里精准定位 找到“命中注定”的它呢 嘿还真有 铁威马的Terra Search 精准搜索 快速定位 So easy&#xff01; 01 什么是Terra Search Terra Search 通过建立数据…

中国信通院发布《中国算力发展指数白皮书》(2023)

加gzh“大数据食铁兽”&#xff0c;回复“20231129”&#xff0c;获取材料完整版 导读 2023 年白皮书在 2022 年的基础上&#xff0c;加强了全球和我国算力发展的研究&#xff0c;客观评估我国整体、各省份及各城市现阶段的算力发展水平进一步给出我国算力二十强市榜单&…

网关路由器双栈配置中的IPv6相关选项解析

1、引言 讲知识往往是枯燥无味的&#xff0c;我们先从问题入手。家庭网关&#xff08;光猫&#xff09;、路由器是我们每个人或多或少都有所接触的2种设备。现在一般都是光纤入户&#xff0c;通常每个家庭配备一个光猫和一台家用路由器。 目前有许多网络服务已经提供了IPv6支…

ASUS(华硕) B760M-AYW WIFI D4_解决wifi不能使用

1、最近新购买了一套 diy电脑主机&#xff0c;选用的是 ASUS B760M-AYW WIFI D4电脑主板 win10 系统&#xff0c;到货后 发现右下角电脑图标处及网络适配器中 没有wifi选项 首先 在官网和旗舰店客服处&#xff0c;确认了 该主板 有集成wifi模块&#xff0c;鲨鱼鳍天线未安装…

Motion Plan之带动力学约束路径搜索

Motion Plan之搜索算法笔记 Motion Plan之基于采样的路径规划算法笔记 为什么要动力学规划&#xff1a; 前面几章介绍的路径规划&#xff0c;我们只是认为机器人是质点&#xff0c;这节课要说的就是&#xff0c;如何在考虑机器人的运动学模型下再去找一个安全可行的路径。考虑…

C#学习相关系列之abstract和virtual用法

一、abstract抽象类用法 1、抽象类的用途 一个类设计的目的是用来被其他类继承的&#xff0c;它代表一类对象的所具有的公共属性或方法&#xff0c;那么这个类就应该设置为抽象类。 抽象类是一种特殊的类&#xff0c;它不能被实例化&#xff0c;只能作为基类来派生出其它的具体…

后端项目连接数据库-添加MyBatis依赖并检测是否成功

一.在pom.xml添加Mybatis相关依赖 在Spring Boot项目中&#xff0c;编译时会自动加载项目依赖&#xff0c;然后使用依赖包。 需要在根目录下pom.xml文件中添加Mybatis依赖项 <!-- Mybatis整合Spring Boot的依赖项 --> <dependency><groupId>org.mybatis.s…

JDK 21 虚拟线程相关知识简介

什么是虚拟线程 虚拟线程是一种轻量级线程&#xff0c;也可以称为协程。它是一种抽象的概念&#xff0c;可以理解为在程序中同时执行多个线程的并发执行。虚拟线程是由Java虚拟机&#xff08;JVM&#xff09;来实现的&#xff0c;它并不与特定的操作系统线程绑定&#xff0c;而…

UDP的特点及应用场景

目录 UDP特点 应用场景 总结 User Datagram Protocol&#xff08;UDP&#xff0c;用户数据报协议&#xff09;是互联网协议套件中的一种传输层协议。与TCP不同&#xff0c;UDP是一种无连接的、不可靠的协议。 UDP特点 要知道UDP可以用来做什么&#xff0c;首先我们要知道它…

UE Web Remote Control

前言 最近在研究UE自启WEB服务和网页通信以此来通过网页与UE进行数据交互&#xff0c;这样最好的方式就是可以摒弃掉整个繁琐的通信连接流程如TCP UDP&#xff0c;但是找到的一些方法都不是很适用&#xff0c;尤其是WEBUI这个插件它只适合内嵌到UE本身才能完成交互&#xff0c;…

LeetCode(37)矩阵置零【矩阵】【中等】

目录 1.题目2.答案3.提交结果截图 链接&#xff1a; 73. 矩阵置零 1.题目 给定一个 m x n 的矩阵&#xff0c;如果一个元素为 0 &#xff0c;则将其所在行和列的所有元素都设为 0 。请使用 原地 算法。 示例 1&#xff1a; 输入&#xff1a;matrix [[1,1,1],[1,0,1],[1,1,1]…

仓储货架生产厂家|拥有编码器+激光传感器的海格里斯HEGERLS料箱式四向穿梭车

随着高新科技的迅猛发展&#xff0c;仓储物流行业已慢慢朝着无人化、自动化、智能化、密集化方向快速发展&#xff0c;用户的需求量也随之日益提升。在众多仓储物流设备中&#xff0c;四向穿梭车越来越得到各大中小企业所青睐和投入使用。四向穿梭车不但具有良好的可延性与适配…

2022年土地出让数据,超多字段,附数据可视化

分享一个土地出让数据&#xff0c;详细信息如下&#xff1a; 数据名称: 2022年土地出让数据 数据格式: Shp、excel 数据时间: 2022年 数据几何类型: 点 数据坐标系: WGS84坐标系 数据来源&#xff1a;网络公开数据 部分字段如下&#xff1a; 如需获取可搜“吧唧数…