Ratf协议图解、Nacos CP集群源码分析

文章目录

    • Nacos CP集群
      • 说明
      • Raft协议
        • leader选举
        • 重新选举leader
        • 多个Candidate情况
        • 更新操作,日志复制
        • 网络分区
      • 源码实现
        • 服务注册
        • leader选举
        • leader心跳包

Nacos CP集群

说明

CAP原则

  • C 一致性 Consistency
  • A 可用性 Availability
  • 分区容错性 Partition tolerance

分区容错性:在集群架构下,如果出现了网络中断,某些节点之间不能交互了,此时整个集群服务就不可用了,这是肯定不行的。分区容错性就是出现这种情况但是集群还能提供服务。

一致性和可用性就是当出现网络分区之后,集群中某个时间点数据不一致,此时我是应该先暂时停掉某些节点来保证集群各个节点一致性嘞(或先不对外提供服务,网络分区恢复数据同步完成后再提供服务),还是优先保证集群的可用性(能容忍短时间内的数据不一致性)

在这里插入图片描述



在NacosClient端,在配置文件中有一个spring.cloud.nacos.discovery.ephemeral = true配置项来指定当前服务实例是否为临时实例,默认是临时实例,如果我们改为false则变为持久化实例,它就会进行CP的架构流程



在AP架构下,服务实例数据是写入在内存中的。而CP架构下的持久化实例除了会写入在内存中,它还会写入在file文件中,

文件保存的默认路径是\$nacosHome\data\naming\data\namespaceId\XXXX文件



BASE原则

  • BA 基本可用 Basically Available
  • S 软状态 Soft State
  • E 最终一致性 Eventual Consistency

CAP原则是三选二、BASE原则是CAP的折中,C、A、P三个都要,但不用100%保证每一个原则

分布式系统肯定优先保证P,多数时候是在C和A之间做权衡选择



Raft协议

Raft协议和Zookeepr使用的ZAB协议很相似,主要包括两部分:

  • leader选举,半数以上节点投票同意
  • 集群写入数据同步,两阶段提交,先记录日志,待提交状态,同步其他节点,半数以上节点写入成功再变为提交状态,响应客户端



Raft协议详细情况:

leader选举
  • 刚开始,集群中各个节点都是follower 追随者/属下 状态

  • 如果follower节点没有收到leader的消息,那么它就可以成为leader

  • 集群中各个节点都有自己的一个随机倒计时,一般是150ms~300ms之间



  • 当某个倒计时到了之后,它会变为候选者Candidate状态,首先给自己投一票




  • 然后发送请求给其他follower 节点



  • 如果接收节点尚未在这一阶段投票,那么它将投票给候选人,follower 节点将回复投票,并且会重新进行随机倒计时



  • 当接收到半数以上节点的投票响应后,它就会从Candidate状态变为leader。



  • leader开始定时向其follower 发送心跳消息,心跳包中包含了一些数据



  • follower接收到leader的心跳包之后会返回一些信息,并重置倒计时时间

    在这里插入图片描述

与ZAB协议的区别是,ZAB的各个节点都会发起投票,它没有倒计时休眠设计,各个节点拿到选票后还要进行pk,pk完后当半数以上的节点支持某个节点成为leader它才会真正成为leader



重新选举leader

leader停止后,将会进行重新选举流程

  • follower接收不到leader的心跳包了,所以倒计时会一直进行




  • 当倒计时到了之后,就会变为Candidate候选者,并先给自己投一票,在去给其他follower节点发送请求

  • 其他follower节点当前还未在这一阶段投票,就会把票投给请求方

  • 当Candidate节点接收到半数以上节点的票后就会变为leader,接下来它便开始发送心跳包了




  • 接收follower节点的心跳响应



多个Candidate情况

各个节点都有一个随机的倒计时,可能会出现一种特殊情况,多个节点的随机倒计时一样,在同一时间了从follower状态变为了Candidate候选者状态,并先给自己投一票,在给其他follower节点发送投票请求。




每一个Candidate候选者节点都比另一个节点先到达一个follower追随者节点




现在每个候选人都有2票,不能再获得更多的选票,这种情况下就根本选不出leader




此时就会进行新的一轮选举,因为各个节点的倒计时都还在进行,在新的一轮选举中,总不会还出现了随机倒计时一样的情况吧




此时某个Candidate就会变为leader,并开始发送心跳包了



更新操作,日志复制
  • 更新操作只能在leader节点进行,leader节点需要将所有的更新操作,复制给其他follower节点。

  • 首先一个客户端发送一个更新操作给leader

    在这里插入图片描述


  • 进行更新操作时,会先写入进一个日志中,该日志项当前未提交状态,因此不会更新leader节点的值

    在这里插入图片描述


  • leader节点将在下一次心跳时将更改发送给追随者follower 节点

    在这里插入图片描述


  • 然后leader等待,直到大多数节点都写了该数据,follower节点这里也是写的日志,并响应给leader

  • 此时leader节点才会将该条记录变为提交状态,并真正进行更新操作

    在这里插入图片描述


  • 响应给客户端

    在这里插入图片描述

  • leader在通知其他follower 节点该数据已经是提交状态了,其他节点也在将各自日志中的数据进行真正的更新操作



网络分区

Raft协议可以在网络分区时保持一致,比如集群的初始状态如下所示,NodeB为leader,并正在进行发送心跳包

在这里插入图片描述


网络分区后,A和B在一边,CDE三个节点在另一边

此时只有NodeA能接收到leader的心跳了,其他三个节点都没有接收到心跳包

在这里插入图片描述



其他三个节点的倒计时因为一直没有接收到leader的心跳包,所以也就不会重新开始,那么此时就会出现某一个节点当倒计时到时间后就会从follower状态变为Candidate状态,并开始进行投票机制,它先给自己投一票,然后再发送请求,收到另外的两票,再从Candidate状态变为leader

在这里插入图片描述



添加另一个客户端,并尝试更新两个leader。

在这里插入图片描述



一个客户端尝试将leader节点的值设置为3,该leader记录日志后,进行日志复制,一直接收不到超过半数节点的响应,当前的更新也就一直是未提交状态

在这里插入图片描述



此时其他客户端尝试将leader节点的值设置为8,这将会成功,因为它可以复制到大多数节点

在这里插入图片描述



现在让我们修复网络分区

此时节点B将看到更高的选举期限并退出。两个节点A和B都将回滚它们未提交的条目,并匹配新的leader的日志。

在这里插入图片描述

此时整个集群的数据一致了。



源码实现

服务注册

服务实例进行注册,调用NacosServer端的接口时,此时的实例会分为临时实例和持久化实例。注册实例时会从controller层进入到service层,会进入到ServiceManager.addInstance()方法中来

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)throws NacosException {// 根据ephemeral的值来决定生成上面key,默认情况下NacosClient传递过来的都是true,一般微服务的实例都是临时实例,不是持久化实例// 持久化实例  key = 一些字符串常量 + namespaceId + “##” + serviceName// 临时实例   key = 一些字符串常量 + “ephemeral” + namespaceId + “##” + serviceNameString key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);Service service = getService(namespaceId, serviceName);synchronized (service) {List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);Instances instances = new Instances();instances.setInstanceList(instanceList);// 这里会进入到下一个逻辑中consistencyService.put(key, instances);}
}

上方会调用进DelegateConsistencyServiceImpl实现类的put()方法中,在这里就会根据key中是否为临时实例,进而决定调用不同实现类的put()方法

public void put(String key, Record value) throws NacosException {// mapConsistencyService(key)根据是否为临时实例,进而去调用不同实现类的put()方法mapConsistencyService(key).put(key, value);
}// 根据是否为临时实例,返回不同的实现类
private ConsistencyService mapConsistencyService(String key) {return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}
  • 这里如果是临时实例就会进入到DistroConsistencyServiceImpl实现类的put()方法中

  • 如果是持久化实例就会进入到PersistentConsistencyServiceDelegateImpl实现类的put()方法中

我们需要知道一个点,Nacos服务注册中心是对内存的操作,数据库中存储的是配置中心的数据

PersistentConsistencyServiceDelegateImpl实现类中有一些小改动,我们可以先直接去看他原生的Raft协议RaftConsistencyServiceImpl

下面代码具体的实现,RaftConsistencyServiceImpl类(弃用了,了解即可):

  • 在这个类中如果新增了数据它会先进行写文件的操作
  • 然后直接发布一个ValueChangeEvent事件
  • 处理该事件的方法中会对内存中的数据进行更改
  • 发布完事件后再去通知其他节点。

相当于这里没有使用二阶段提交,而是直接一阶段就完成了。这是有一些问题的,就比如其他节点并没有更新成功,但本机却以及更新完成了。

下面的代码可以学一下,这里就使用了CountDownLatch来实现的半数以上节点的正常响应

// RaftConsistencyServiceImpl类
public void put(String key, Record value) throws NacosException {checkIsStopWork();try {// 核心方法入口raftCore.signalPublish(key, value);} catch (Exception e) {Loggers.RAFT.error("Raft put failed.", e);throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value,e);}
}------------------------------------------------------------------------------------------------------
// 进入到RaftCore.signalPublish()方法
// 写文件 --> 直接更新内存,未向nacos其他节点同步数据 -->  再向nacos其他节点同步数据
public void signalPublish(String key, Record value) throws Exception {if (stopWork) {throw new IllegalStateException("old raft protocol already stop work");}// 当前节点如果不是leaderif (!isLeader()) {ObjectNode params = JacksonUtils.createEmptyJsonNode();params.put("key", key);params.replace("value", JacksonUtils.transferToJsonNode(value));Map<String, String> parameters = new HashMap<>(1);parameters.put("key", key);final RaftPeer leader = getLeader();// 直接把请求转发给leader,让leader去进行写数据raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);return;}// 是leader节点的处理逻辑OPERATE_LOCK.lock();try {final long start = System.currentTimeMillis();final Datum datum = new Datum();datum.key = key;datum.value = value;if (getDatum(key) == null) {datum.timestamp.set(1L);} else {datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());}ObjectNode json = JacksonUtils.createEmptyJsonNode();json.replace("datum", JacksonUtils.transferToJsonNode(datum));json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));// 核心方法,进行发布// 该方法中会先对更新操作写入到磁盘文件中,然后发布一个ValueChangeEvent事件,处理该事件的方法中会对内存中的数据进行更改// 相当于这里没有进行两阶段提交,直接一阶段onPublish(datum, peers.local());final String content = json.toString();// 这里再去通知其他节点,// peers就是各个Nacos集群节点,取peers.size() / 2 + 1 个数的CountDownLatchfinal CountDownLatch latch = new CountDownLatch(peers.majorityCount());// 遍历除了自己之外的nacos节点for (final String server : peers.allServersIncludeMyself()) {if (isLeader(server)) {latch.countDown();continue;}// 调用各个节点 发送请求final String url = buildUrl(server, API_ON_PUB);HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {@Overridepublic void onReceive(RestResult<String> result) {if (!result.ok()) {Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",datum.key, server, result.getCode());return;}// 各个节点响应的回调方法,如果是正常响应则countDown()latch.countDown();}@Overridepublic void onError(Throwable throwable) {Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);}@Overridepublic void onCancel() {}});}// 如果并没有半数以上的节点正常响应,那么这里就会抛异常,但此时我本机的内存数据已经更改完成了。if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {// only majority servers return success can we consider this update successLoggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);}long end = System.currentTimeMillis();Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);} finally {OPERATE_LOCK.unlock();}
}------------------------------------------------------------------------------------------------------// 先写文件 --->  发布事件  ----> 处理事件的类会去更新内存中注册表的数据
public void onPublish(Datum datum, RaftPeer source) throws Exception {if (stopWork) {throw new IllegalStateException("old raft protocol already stop work");}RaftPeer local = peers.local();if (datum.value == null) {Loggers.RAFT.warn("received empty datum");throw new IllegalStateException("received empty datum");}if (!peers.isLeader(source.ip)) {Loggers.RAFT.warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source),JacksonUtils.toJson(getLeader()));throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");}if (source.term.get() < local.term.get()) {Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source),JacksonUtils.toJson(local));throw new IllegalStateException("out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());}local.resetLeaderDue();// if data should be persisted, usually this is true:if (KeyBuilder.matchPersistentKey(datum.key)) {// 写文件 一般是写入到\nacosHome\data\naming\data\namespaceId\XXXX文件raftStore.write(datum);}datums.put(datum.key, datum);if (isLeader()) {local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);} else {if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {//set leader term:getLeader().term.set(source.term.get());local.term.set(getLeader().term.get());} else {local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);}}raftStore.updateTerm(local.term.get());// 文件写完之后会发布一个ValueChangeEvent事件,而处理该事件的是在PersistentNotifier类中onEvent()方法// 在处理事件的方法中就会真正去更新内存中的数据,也就是服务注册表中的数据NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}



leader选举

上面服务注册是调用的RaftCore.signalPublish()方法,我们现在看看RaftCore.init()方法

@PostConstruct
public void init() throws Exception {Loggers.RAFT.info("initializing Raft sub-system");final long start = System.currentTimeMillis();// 该节点刚启动,需要从磁盘文件中读取数据// 持久化实例是会在文件中保存一份,如果当且节点宕机了再重启,肯定是需要重新读取文件进行数据恢复的raftStore.loadDatums(notifier, datums);...// 执行两个定时任务,每隔500ms执行一次,一个是选举主节点,另一个是发送心跳包masterTask = GlobalExecutor.registerMasterElection(new MasterElection());heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());...
}

对于Leader的选举核心就是要看MasterElection这个定时任务:

具体实现流程总结如下:

  • 检验选举时间leaderDueMs属性是<=0

  • 重置选举时间和发送心跳包时间

  • 重置集群各个节点投票

  • 选举周期+1

  • 先投票给自己、修改为candidate

  • 向各个follower节点发送投票请求,请求参数就是本机节点信息

    /raft/vote 接口提供方的逻辑:

    • 如果收到的候选节点term小于等于本地节点term,则本地节点的voteFor更新为自己(意思是这一票投给我自己,我更适合做leader)
    • 否则这个follower将重置它的election timeout;更新它的voteFor为收到的候选者节点ip(意思是这一票就投给你了)
    • 更新它的term为收到的候选者节点term;
    • 将本地节点作为响应数据返回
  • 发送投票请求后,处理响应数据

    • 遍历所有的节点,若节点的voteFor不为null,则将voteFor添加进ips中
    • 记录被选举次数最多的节点和次数
    • 选举最多次数 > 集群节点半数+1 就把选举次数最多的节点状态改为leader
public class MasterElection implements Runnable {@Overridepublic void run() {try {if (stopWork) {return;}if (!peers.isReady()) {return;}// 每执行一次任务就减一次任务执行的间隔时间,直到减到小于等于0就可以开始投票了RaftPeer local = peers.local();local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;if (local.leaderDueMs > 0) {return;}// reset timeout// 重置选举时间local.resetLeaderDue();// 重置心跳时间local.resetHeartbeatDue();// 核心方法 进行投票sendVote();} catch (Exception e) {Loggers.RAFT.warn("[RAFT] error while master election {}", e);}}--------------------------------------------------------------------------------------------------------------------// 真正发送请求 进行leader选举投票private void sendVote() {RaftPeer local = peers.get(NetUtils.localServer());Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()),local.term);// 重置集群各个节点投票peers.reset();// 选举周期+1local.term.incrementAndGet();// 先投票给自己local.voteFor = local.ip;// 把自己的状态从follower改为候选者Candidatelocal.state = RaftPeer.State.CANDIDATE;// 请求参数为local对象Map<String, String> params = new HashMap<>(1);params.put("vote", JacksonUtils.toJson(local));// 遍历除了自己之外的集群节点for (final String server : peers.allServersWithoutMySelf()) {// 获取各个节点的url,发送异步post请求final String url = buildUrl(server, API_VOTE);try {/*** /raft/vote 接口提供方的逻辑:* 如果收到的候选节点term小于等于本地节点term,则本地节点的voteFor更新为自己(意思是这一票投给我自己,我更适合做leader)* 否则这个follower将重置它的election timeout;更新它的voteFor为收到的候选者节点ip(意思是这一票就投给你了)* 更新它的term为收到的候选者节点term;* 将本地节点作为响应数据返回*/HttpClient.asyncHttpPost(url, null, params, new Callback<String>() {@Overridepublic void onReceive(RestResult<String> result) {if (!result.ok()) {Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", result.getCode(), url);return;}// 解析其他节点的响应数据RaftPeer peer = JacksonUtils.toObj(result.getData(), RaftPeer.class);Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer));// 去处理其他节点的响应数据// 遍历所有的节点,若节点的voteFor不为null,则将voteFor添加进ips中// 记录被选举次数最多的节点和次数// 选举最多次数 > 集群节点半数+1  就把选举次数最多的节点状态改为leaderpeers.decideLeader(peer);}@Overridepublic void onError(Throwable throwable) {Loggers.RAFT.error("error while sending vote to server: {}", server, throwable);}@Overridepublic void onCancel() {}});} catch (Exception e) {Loggers.RAFT.warn("error while sending vote to server: {}", server);}}}
}
// 遍历所有的节点,若节点的voteFor不为null,则将voteFor添加进ips中
// 记录被选举次数最多的节点和次数
// 选举最多次数 > 集群节点半数+1  就把选举次数最多的节点状态改为leader
public RaftPeer decideLeader(RaftPeer candidate) {peers.put(candidate.ip, candidate);SortedBag ips = new TreeBag();// 记录被选举最多次数int maxApproveCount = 0;// 记录被选举次数最多的节点String maxApprovePeer = null;for (RaftPeer peer : peers.values()) {if (StringUtils.isEmpty(peer.voteFor)) {continue;}ips.add(peer.voteFor);if (ips.getCount(peer.voteFor) > maxApproveCount) {maxApproveCount = ips.getCount(peer.voteFor);maxApprovePeer = peer.voteFor;}}// 当前收到的选票 > 集群节点半数+1if (maxApproveCount >= majorityCount()) {RaftPeer peer = peers.get(maxApprovePeer);// 把maxApprovePeer的节点状态改为leaderpeer.state = RaftPeer.State.LEADER;if (!Objects.equals(leader, peer)) {leader = peer;ApplicationUtils.publishEvent(new LeaderElectFinishedEvent(this, leader, local()));Loggers.RAFT.info("{} has become the LEADER", leader.ip);}}return leader;
}



leader心跳包

RaftCore.init()方法,执行两个定时任务,每隔5秒执行一次,一个是选举主节点,另一个是发送心跳包

@PostConstruct
public void init() throws Exception {Loggers.RAFT.info("initializing Raft sub-system");final long start = System.currentTimeMillis();// 该节点刚启动,需要从磁盘文件中读取数据// 持久化实例是会在文件中保存一份,如果当且节点宕机了再重启,肯定是需要重新读取文件进行数据恢复的raftStore.loadDatums(notifier, datums);...// 执行两个定时任务,每隔5秒执行一次,一个是选举主节点,另一个是发送心跳包masterTask = GlobalExecutor.registerMasterElection(new MasterElection());heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());...
}

HeartBeat定时任务发送心跳包的具体实现:

具体实现流程总结如下:

  • 发送心跳包时间heartbeatDueMs是否 <= 0
  • 重置heartbeatDueMs 时间
  • 当前的节点如果不是leader那么就不能发送心跳
  • 处理请求参数,将本机节点信息、本机中所有的Datum.key 和它所对应的时间戳进行压缩,封装为请求参数作为心跳包发送个各个节点
public class HeartBeat implements Runnable{@Overridepublic void run() {try {if (stopWork) {return;}if (!peers.isReady()) {return;}// 每执行一次任务就减少一次 任务执行的间隔时间500ms ,直到减少到小于等于0就开始向其他节点发送心跳RaftPeer local = peers.local();local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;if (local.heartbeatDueMs > 0) {return;}// 重置heartbeatDueMs 时间local.resetHeartbeatDue();// 核心方法,发送心跳的逻辑sendBeat();} catch (Exception e) {Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);}}--------------------------------------------------------------------------------------------------------------------private void sendBeat() throws IOException, InterruptedException {RaftPeer local = peers.local();// 当前的节点如果不是leader那么就不能发送心跳if (EnvUtil.getStandaloneMode() || local.state != RaftPeer.State.LEADER) {return;}if (Loggers.RAFT.isDebugEnabled()) {Loggers.RAFT.debug("[RAFT] send beat with {} keys.", datums.size());}local.resetLeaderDue();// build data// 心跳包最终的数据ObjectNode packet = JacksonUtils.createEmptyJsonNode();// 当前节点信息packet.replace("peer", JacksonUtils.transferToJsonNode(local));// array存放当前节点中所有服务实例集合对应的datum.keyArrayNode array = JacksonUtils.createEmptyArrayNode();if (switchDomain.isSendBeatOnly()) {Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", switchDomain.isSendBeatOnly());}if (!switchDomain.isSendBeatOnly()) {// 遍历当前所有的datumsfor (Datum datum : datums.values()) {// 创建一个空的ObjectNodeObjectNode element = JacksonUtils.createEmptyJsonNode();// 这里只存放datum.keyif (KeyBuilder.matchServiceMetaKey(datum.key)) {element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));} else if (KeyBuilder.matchInstanceListKey(datum.key)) {element.put("key", KeyBuilder.briefInstanceListkey(datum.key));}// 存放datum对应的timestamp时间戳,也就相当于一个版本号element.put("timestamp", datum.timestamp.get());// 添加进array中array.add(element);}}packet.replace("datums", array);// 接下来对请求参数packet进行封装并压缩// broadcastMap<String, String> params = new HashMap<String, String>(1);params.put("beat", JacksonUtils.toJson(packet));String content = JacksonUtils.toJson(params);ByteArrayOutputStream out = new ByteArrayOutputStream();GZIPOutputStream gzip = new GZIPOutputStream(out);gzip.write(content.getBytes(StandardCharsets.UTF_8));gzip.close();byte[] compressedBytes = out.toByteArray();String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);// compressedContent存放的就是压缩之后的请求参数if (Loggers.RAFT.isDebugEnabled()) {Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}", content.length(),compressedContent.length());}// 向除了自己的其他节点发送心跳包,心跳包中包含的数据就是当前节点信息 + 当前节点中所有的datum.keyfor (final String server : peers.allServersWithoutMySelf()) {try {final String url = buildUrl(server, API_BEAT);if (Loggers.RAFT.isDebugEnabled()) {Loggers.RAFT.debug("send beat to server " + server);}// /raft/beat  发送请求HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new Callback<String>() {@Overridepublic void onReceive(RestResult<String> result) {if (!result.ok()) {Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}", result.getCode(), server);MetricsMonitor.getLeaderSendBeatFailedException().increment();return;}peers.update(JacksonUtils.toObj(result.getData(), RaftPeer.class));if (Loggers.RAFT.isDebugEnabled()) {Loggers.RAFT.debug("receive beat response from: {}", url);}}@Overridepublic void onError(Throwable throwable) {Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server,throwable);MetricsMonitor.getLeaderSendBeatFailedException().increment();}@Overridepublic void onCancel() {}});} catch (Exception e) {Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);MetricsMonitor.getLeaderSendBeatFailedException().increment();}}}
}

我们接下来再来看看/raft/beat 接口的具体实现,看看follower节点是如何处理leader节点的心跳包的,会进入到RaftController.beat()方法

具体实现流程总结如下:

  • 解析请求参数

  • 接收到的心跳包如果不是leader发送的则抛异常

  • 如果本机的term 大于 心跳包term,则心跳包不进行处理

  • 当前节点的状态如果不是follower,那么就把自己的节点状态改为follower,voteFor改为请求参数的ip

  • 重置本机的选举时间和心跳发送时间

  • 更新leader信息,将remote设置为新leader,更新原有leader的节点信息(leader会通过心跳通知其他节点更新leader)

  • 创建一个Map集合:Map<String, Integer> receivedKeysMap

    • 该集合中存放的是Datum.key,其中Integer的值为0表示Datum.key是本机的,如果为1则表示是远程心跳包中的
    • 经过下面的处理之后,receivedKeysMap中可能还会存在一些Integer的值为0,而这些数据意味着在leader节点中这些数据已经被删除了
    • 所以在最后会有一个利用该集合进行移除Datum的操作
  • 将本机中的datum对象都存入receivedKeysMap集合中,value都是0

  • 创建一个批处理集合batch

  • 遍历心跳包中的beatDatums

    • 每一次遍历都把datumKey存入到receivedKeysMap中去,value都是1
    • 远程心跳包中的datumKey是否在我本机的datums集合中存在,并且本机该datumKey对应的时间戳也比远程心跳包中的时间戳更大。这就表示我不需要处理这一条datumKey
    • 和上面的if判断逻辑相反,则需要添加进batch这个集合中
    • 批量处理逻辑,表示我的batch集合还能继续存放数据
      假如远程心跳包传递过来了100条datumKey 难道我要一条一条的去处理吗?所以肯定是需要批处理的
      processedCount的数值最终是 等于 beatDatums集合中的个数,表示真正处理beatDatums中的最后一个元素
      第二个条件是 假如的传递过来的总数只有10条嘞?那岂不是就直接遍历完跳出循环了,batch集合中存放的10条数据都还没有进行处理
    • 当batch集合存放的数据超过50后,调用远程心跳包发送过来的leader节点ip,把我需要的datumKey集合发送给leader。 然后对leader返回的数据进行处理,写入到本机的磁盘文件和内存中
  • 循环遍历结束后,处理receivedKeysMap集合value为0的Datum。移除Datum,先删除内存中datums,在删除磁盘文件

// 在RaftController.beat()方法中不会进行什么重要的处理,会直接调用到receivedBeat(JsonNode beat)方法中
public RaftPeer receivedBeat(JsonNode beat) throws Exception {if (stopWork) {throw new IllegalStateException("old raft protocol already stop work");}// 解析请求参数final RaftPeer local = peers.local();final RaftPeer remote = new RaftPeer();JsonNode peer = beat.get("peer");remote.ip = peer.get("ip").asText();remote.state = RaftPeer.State.valueOf(peer.get("state").asText());remote.term.set(peer.get("term").asLong());remote.heartbeatDueMs = peer.get("heartbeatDueMs").asLong();remote.leaderDueMs = peer.get("leaderDueMs").asLong();remote.voteFor = peer.get("voteFor").asText();// 接收到的心跳包如果不是leader发送的则抛异常if (remote.state != RaftPeer.State.LEADER) {Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}", remote.state,JacksonUtils.toJson(remote));throw new IllegalArgumentException("invalid state from master, state: " + remote.state);}// 如果本机的term 大于 心跳包term,则心跳包不进行处理if (local.term.get() > remote.term.get()) {Loggers.RAFT.info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}",remote.term.get(), local.term.get(), JacksonUtils.toJson(remote), local.leaderDueMs);throw new IllegalArgumentException("out of date beat, beat-from-term: " + remote.term.get() + ", beat-to-term: " + local.term.get());}// 当前节点的状态如果不是follower,那么就把自己的节点状态改为follower,voteFor改为请求参数的ipif (local.state != RaftPeer.State.FOLLOWER) {Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JacksonUtils.toJson(remote));// mk followerlocal.state = RaftPeer.State.FOLLOWER;local.voteFor = remote.ip;}// 心跳中的核心数据包final JsonNode beatDatums = beat.get("datums");// 重置选举时间和心跳发送时间local.resetLeaderDue();local.resetHeartbeatDue();// 更新leader信息,将remote设置为新leader,更新原有leader的节点信息(leader会通过心跳通知其他节点更新leader)peers.makeLeader(remote);if (!switchDomain.isSendBeatOnly()) {// 该集合中存放的是Datum.key,其中Integer的值为0表示Datum.key是本机的,如果为1则表示是远程心跳包中的// 经过下面方法的处理之后,receivedKeysMap中可能还会存在一些Integer的值为0,而这些数据意味着在leader节点中这些数据已经被删除了// 所以在该方法的最后会有一个利用 该集合 进行移除Datum的操作Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());// 将本机中的datum对象都存入集合中,value都是0for (Map.Entry<String, Datum> entry : datums.entrySet()) {receivedKeysMap.put(entry.getKey(), 0);}// now check datums// 创建一个批处理集合List<String> batch = new ArrayList<>();int processedCount = 0;if (Loggers.RAFT.isDebugEnabled()) {Loggers.RAFT.debug("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}",beatDatums.size(), datums.size(), remote.ip, remote.term, local.term);}// 遍历心跳包中的beatDatumsfor (Object object : beatDatums) {processedCount = processedCount + 1;JsonNode entry = (JsonNode) object;String key = entry.get("key").asText();final String datumKey;// 得到datumKeyif (KeyBuilder.matchServiceMetaKey(key)) {datumKey = KeyBuilder.detailServiceMetaKey(key);} else if (KeyBuilder.matchInstanceListKey(key)) {datumKey = KeyBuilder.detailInstanceListkey(key);} else {// ignore corrupted key:continue;}long timestamp = entry.get("timestamp").asLong();// 把datumKey存入到receivedKeysMap中去receivedKeysMap.put(datumKey, 1);try {// 远程心跳包中的datumKey是否在我本机的datums集合中存在,并且本机该datumKey对应的时间戳也比远程心跳包中的时间戳更大// 这就表示我不需要处理这一条datumKeyif (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp&& processedCount < beatDatums.size()) {continue;}// 和上面的if判断逻辑相反,则需要添加进batch这个集合中if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {batch.add(datumKey);}// 批量处理逻辑,表示我的batch集合还能继续存放数据// 假如远程心跳包传递过来了100条datumKey 难道我要一条一条的去处理吗?所以肯定是需要批处理的// processedCount的数值最终是 等于 beatDatums集合中的个数,表示真正处理beatDatums中的最后一个元素// 第二个条件是 假如的传递过来的总数只有10条嘞?那岂不是就直接遍历完跳出循环了,batch集合中存放的10条数据都还没有进行处理if (batch.size() < 50 && processedCount < beatDatums.size()) {continue;}// 当batch集合存放的数据超过50后String keys = StringUtils.join(batch, ",");if (batch.size() <= 0) {continue;}Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}"+ ", datums' size is {}, RaftCore.datums' size is {}", getLeader().ip, batch.size(),processedCount, beatDatums.size(), datums.size());// update datum entry// 调用远程心跳包发送过来的leader节点ip,把我需要的datumKey集合发送给leader// 然后对leader返回的数据进行处理,写入到本机的磁盘文件和内存中String url = buildUrl(remote.ip, API_GET);Map<String, String> queryParam = new HashMap<>(1);queryParam.put("keys", URLEncoder.encode(keys, "UTF-8"));HttpClient.asyncHttpGet(url, null, queryParam, new Callback<String>() {@Overridepublic void onReceive(RestResult<String> result) {if (!result.ok()) {return;}List<JsonNode> datumList = JacksonUtils.toObj(result.getData(), new TypeReference<List<JsonNode>>() {});// 从leader中接收到本机需要的datum集合,进行遍历for (JsonNode datumJson : datumList) {Datum newDatum = null;OPERATE_LOCK.lock();try {// 先获取本机中老的DatumDatum oldDatum = getDatum(datumJson.get("key").asText());// 如果远程发送过来Datum的时间戳比本机的时间戳还小,那么就不用处理if (oldDatum != null && datumJson.get("timestamp").asLong() <= oldDatum.timestamp.get()) {Loggers.RAFT.info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}",datumJson.get("key").asText(),datumJson.get("timestamp").asLong(), oldDatum.timestamp);continue;}// 解析成newDatumif (KeyBuilder.matchServiceMetaKey(datumJson.get("key").asText())) {Datum<Service> serviceDatum = new Datum<>();serviceDatum.key = datumJson.get("key").asText();serviceDatum.timestamp.set(datumJson.get("timestamp").asLong());serviceDatum.value = JacksonUtils.toObj(datumJson.get("value").toString(), Service.class);newDatum = serviceDatum;}if (KeyBuilder.matchInstanceListKey(datumJson.get("key").asText())) {Datum<Instances> instancesDatum = new Datum<>();instancesDatum.key = datumJson.get("key").asText();instancesDatum.timestamp.set(datumJson.get("timestamp").asLong());instancesDatum.value = JacksonUtils.toObj(datumJson.get("value").toString(), Instances.class);newDatum = instancesDatum;}if (newDatum == null || newDatum.value == null) {Loggers.RAFT.error("receive null datum: {}", datumJson);continue;}// 先把newDatum写入到磁盘文件中// 文件目录一般是 \nacosHome\data\naming\data\namespaceId\XXXX文件raftStore.write(newDatum);// 更新内存中的数据,先修改datums的数据datums.put(newDatum.key, newDatum);// 再去修改注册标中的数据notifier.notify(newDatum.key, DataOperation.CHANGE, newDatum.value);// 本机重置leaderDueMs时间local.resetLeaderDue();if (local.term.get() + 100 > remote.term.get()) {getLeader().term.set(remote.term.get());local.term.set(getLeader().term.get());} else {local.term.addAndGet(100);}raftStore.updateTerm(local.term.get());Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",newDatum.key, newDatum.timestamp, JacksonUtils.toJson(remote), local.term);} catch (Throwable e) {Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum,e);} finally {OPERATE_LOCK.unlock();}}try {TimeUnit.MILLISECONDS.sleep(200);} catch (InterruptedException e) {Loggers.RAFT.error("[RAFT-BEAT] Interrupted error ", e);}return;}@Overridepublic void onError(Throwable throwable) {Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader", throwable);}@Overridepublic void onCancel() {}});batch.clear();} catch (Exception e) {Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);}}// 此时receivedKeysMap集合中Integer还为0的Datum意味着:leader节点中已经删除了这些DatumList<String> deadKeys = new ArrayList<>();for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {if (entry.getValue() == 0) {deadKeys.add(entry.getKey());}}for (String deadKey : deadKeys) {try {// 移除Datum,先删除内存中datums,在删除磁盘文件deleteDatum(deadKey);} catch (Exception e) {Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);}}}return local;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/41859.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【密码学】密码学五要素

密码学五要素是密码系统的基本组成部分&#xff0c;这五个要素共同构成了密码系统的框架。在实际应用中&#xff0c;密码系统的安全性依赖于密钥的安全管理以及算法的强度。 如果任何一方被泄露或破解&#xff0c;那么整个密码系统都将面临风险。因此&#xff0c;在设计和使用密…

生物化学笔记:电阻抗基础+电化学阻抗谱EIS+电化学系统频率响应分析

视频教程地址 引言 方法介绍 稳定&#xff1a;撤去扰动会到原始状态&#xff0c;反之不稳定&#xff0c;还有近似稳定的 阻抗谱图形&#xff08;Nyquist和Bode图&#xff09; 阻抗谱图形是用于分析电化学系统和材料的工具&#xff0c;主要有两种类型&#xff1a;Nyquist图和B…

《第一行代码》小结

文章目录 一. Android总览1. 系统架构2. 开发环境3. 在红米手机上运行4. 项目资源详解4.1 整体结构4.2 res文件4.3 build.gradle文件 二. Activity0. 常用方法小结1. 创建一个Activity 一. Android总览 1. 系统架构 应用层&#xff1a;所有安装在手机上的应用程序 应用框架层&…

罗剑锋的C++实战笔记学习(一):const、智能指针、lambda表达式

1、const 1&#xff09;、常量 const一般的用法就是修饰变量、引用、指针&#xff0c;修饰之后它们就变成了常量&#xff0c;需要注意的是const并未区分出编译期常量和运行期常量&#xff0c;并且const只保证了运行时不直接被修改 一般的情况&#xff0c;const放在左边&…

解决Docker Desktop启动异常 Docker Desktop- WSL distro terminated abruptly

异常 当打开Docker Desktop时候&#xff0c;启动docker引擎时&#xff0c;提示 加粗样式文本信息 Docker Desktop - WSL distro terminated abruptly A WSL distro Docker Desktop relies on has exited unexpectedly. This usually happensas a result of an external entit…

Vue2基础 14:自定义指令

自定义指令 1 函数式1.1 案例--v-text放大10倍 2 对象式2.1 案例--v-fbind默认获取焦点&#xff08;函数式&#xff09;2.2 案例--v-fbind默认获取焦点&#xff08;对象式&#xff09; 3 自定义指令容易犯的错4 全局指令写法&#xff08;参考过滤器写法&#xff09;&#xff1a…

Go 依赖注入设计模式

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…

系统重装

待更新 重置win11 双系统删除其中一个&#xff0c;并将格式化后的空间并入

跟着峰哥学java 第四天 商品分类 前后端显示

1.后端 1.1mybatis-plus分页查询配置 在商品热卖数据中&#xff0c;只让其显示八条数据 将要使用分页 也就是service.page方法 此时需要配置 mp拦截器 Configuration public class MybatisPlusConfig {Beanpublic PaginationInterceptor paginationInterceptor() {return …

模型训练之数据集

我们知道人工智能的四大要素&#xff1a;数据、算法、算力、场景。我们训练模型离不开数据 目标 一、数据集划分 定义 数据集&#xff1a;训练集是一组训练数据。 样本&#xff1a;一组数据中一个数据 特征&#xff1a;反映样本在某方面的表现、属性或性质事项 训练集&#…

星辰宇宙动态页面vue版,超好看的前端页面。附源码与应用教程(若依)

本代码的html版本&#xff0c;来源自“山羊の前端小窝”作者&#xff0c;我对此进行了vue版本转换以及相关应用。特此与大家一起分享~ 1、直接上效果图&#xff1a; 带文字版&#xff1a;文字呼吸式缩放。 纯净版&#xff1a; 默认展示效果&#xff1a; 缩放与旋转后&#xf…

mysql5.6的安装步骤

1.下载mysql 下载地址&#xff1a;https://downloads.mysql.com/archives/community/ 在这里我们下载zip的包 2.解压mysql包到指定目录 3. 添加my.ini文件 # For advice on how to change settings please see # http://dev.mysql.com/doc/refman/5.6/en/server-configurat…

tongweb+ths6011测试websocket(by lqw)

本次使用的tongweb版本7049m4&#xff0c;测试包ws_example.war&#xff08;在tongweb安装目录的samples/websocket下&#xff09;&#xff0c;ths版本6011 首先在tongweb控制台部署一下ws_example.war,部署后测试是否能访问&#xff1a; 然後ths上的httpserver.conf的參考配…

本地部署到服务器上的资源路径问题

本地部署到服务器上的资源路径问题 服务器端的源代码的静态资源目录层级 当使用Thymeleaf时&#xff0c;在templates的目录下为返回的html页面&#xff0c;下面以两个例子解释当将代码部署到tomcat时访问资源的路径配置问题 例子一 index.html&#xff08;在templates的根目录…

VBA初学:零件成本统计之三(获取材料外协的金额)

第三步&#xff0c;从K3的数据库中获取金额 我这里是使用循环&#xff0c;通过任务单号将金额汇总出来&#xff0c;如果使用数组的话&#xff0c;还要按任务单写GROUP&#xff0c;还要去对应&#xff0c;不如循环直接一点 获取材料和外协金额的表格Sub getje()Dim rowcount A…

leetcode-每日一题

3101. 交替子数组计数https://leetcode.cn/problems/count-alternating-subarrays/ 给你一个 二进制数组 nums 。 如果一个 子数组 中 不存在 两个 相邻 元素的值 相同 的情况&#xff0c;我们称这样的子数组为 交替子数组 。 返回数组 nums 中交替子数组的数量。 示例 …

3-2 梯度与反向传播

3-2 梯度与反向传播 主目录点这里 梯度的含义 可以看到红色区域的变化率较大&#xff0c;梯度较大&#xff1b;绿色区域的变化率较小&#xff0c;梯度较小。 在二维情况下&#xff0c;梯度向量的方向指向函数增长最快的方向&#xff0c;而其大小表示增长的速率。 梯度的计算 …

如何第一次从零上传项目到GitLab

嗨&#xff0c;我是兰若&#xff0c;今天想给大家说下&#xff0c;如何上传一个完整的项目到与LDAP集成的GitLab&#xff0c;也就是说这个项目之前是不在git上面的&#xff0c;这是第一次上传&#xff0c;这样上传上去之后&#xff0c;其他小伙伴就可以根据你这个项目的git地址…

Lua语言入门

目录 Lua语言1 搭建Lua开发环境1.1 安装Lua解释器WindowsLinux 1.2 IntelliJ安装Lua插件在线安装本地安装 2 Lua语法2.1 数据类型2.2 变量全局变量局部变量命名规范局部变量作用域 2.3 注释单行注释多行注释 2.4 赋值2.5 操作符数学操作符比较操作符逻辑操作符连接操作符取长度…

moonlight+sunshine+ParsecVDisplay ipad8-windows 局域网串流

1.sunshine PC 安装 2.设置任意账户密码登录 3.setting 里 network启用UPNP IPV4IPV6 save apply 4.ParsecVDisplay虚拟显示器安装 5.ipad appstore download moonlight 6.以ipad 8 为例 2160*1620屏幕分辨率 7.ParsecVDisplay里面 custom设置2160*1620 240hz&#xff0c;…