文章目录
- Nacos CP集群
- 说明
- Raft协议
- leader选举
- 重新选举leader
- 多个Candidate情况
- 更新操作,日志复制
- 网络分区
- 源码实现
- 服务注册
- leader选举
- leader心跳包
Nacos CP集群
说明
CAP原则
- C 一致性 Consistency
- A 可用性 Availability
- 分区容错性 Partition tolerance
分区容错性:在集群架构下,如果出现了网络中断,某些节点之间不能交互了,此时整个集群服务就不可用了,这是肯定不行的。分区容错性就是出现这种情况但是集群还能提供服务。
一致性和可用性就是当出现网络分区之后,集群中某个时间点数据不一致,此时我是应该先暂时停掉某些节点来保证集群各个节点一致性嘞(或先不对外提供服务,网络分区恢复数据同步完成后再提供服务),还是优先保证集群的可用性(能容忍短时间内的数据不一致性)
在NacosClient端,在配置文件中有一个spring.cloud.nacos.discovery.ephemeral = true
配置项来指定当前服务实例是否为临时实例,默认是临时实例,如果我们改为false则变为持久化实例,它就会进行CP的架构流程
在AP架构下,服务实例数据是写入在内存中的。而CP架构下的持久化实例除了会写入在内存中,它还会写入在file文件中,
文件保存的默认路径是\$nacosHome\data\naming\data\namespaceId\XXXX文件
BASE原则
- BA 基本可用 Basically Available
- S 软状态 Soft State
- E 最终一致性 Eventual Consistency
CAP原则是三选二、BASE原则是CAP的折中,C、A、P三个都要,但不用100%保证每一个原则
分布式系统肯定优先保证P,多数时候是在C和A之间做权衡选择
Raft协议
Raft协议和Zookeepr使用的ZAB协议很相似,主要包括两部分:
- leader选举,半数以上节点投票同意
- 集群写入数据同步,两阶段提交,先记录日志,待提交状态,同步其他节点,半数以上节点写入成功再变为提交状态,响应客户端
Raft协议详细情况:
leader选举
-
刚开始,集群中各个节点都是follower 追随者/属下 状态
-
如果follower节点没有收到leader的消息,那么它就可以成为leader
-
集群中各个节点都有自己的一个随机倒计时,一般是150ms~300ms之间
-
当某个倒计时到了之后,它会变为候选者Candidate状态,首先给自己投一票
-
然后发送请求给其他follower 节点
-
如果接收节点尚未在这一阶段投票,那么它将投票给候选人,follower 节点将回复投票,并且会重新进行随机倒计时
-
当接收到半数以上节点的投票响应后,它就会从Candidate状态变为leader。
-
leader开始定时向其follower 发送心跳消息,心跳包中包含了一些数据
-
follower接收到leader的心跳包之后会返回一些信息,并重置倒计时时间
与ZAB协议的区别是,ZAB的各个节点都会发起投票,它没有倒计时休眠设计,各个节点拿到选票后还要进行pk,pk完后当半数以上的节点支持某个节点成为leader它才会真正成为leader
重新选举leader
leader停止后,将会进行重新选举流程
-
follower接收不到leader的心跳包了,所以倒计时会一直进行
-
当倒计时到了之后,就会变为Candidate候选者,并先给自己投一票,在去给其他follower节点发送请求
-
其他follower节点当前还未在这一阶段投票,就会把票投给请求方
-
当Candidate节点接收到半数以上节点的票后就会变为leader,接下来它便开始发送心跳包了
-
接收follower节点的心跳响应
多个Candidate情况
各个节点都有一个随机的倒计时,可能会出现一种特殊情况,多个节点的随机倒计时一样,在同一时间了从follower状态变为了Candidate候选者状态,并先给自己投一票,在给其他follower节点发送投票请求。
每一个Candidate候选者节点都比另一个节点先到达一个follower追随者节点
现在每个候选人都有2票,不能再获得更多的选票,这种情况下就根本选不出leader
此时就会进行新的一轮选举,因为各个节点的倒计时都还在进行,在新的一轮选举中,总不会还出现了随机倒计时一样的情况吧
此时某个Candidate就会变为leader,并开始发送心跳包了
更新操作,日志复制
-
更新操作只能在leader节点进行,leader节点需要将所有的更新操作,复制给其他follower节点。
-
首先一个客户端发送一个更新操作给leader
-
进行更新操作时,会先写入进一个日志中,该日志项当前未提交状态,因此不会更新leader节点的值
-
leader节点将在下一次心跳时将更改发送给追随者follower 节点
-
然后leader等待,直到大多数节点都写了该数据,follower节点这里也是写的日志,并响应给leader
-
此时leader节点才会将该条记录变为提交状态,并真正进行更新操作
-
响应给客户端
-
leader在通知其他follower 节点该数据已经是提交状态了,其他节点也在将各自日志中的数据进行真正的更新操作
网络分区
Raft协议可以在网络分区时保持一致,比如集群的初始状态如下所示,NodeB为leader,并正在进行发送心跳包
网络分区后,A和B在一边,CDE三个节点在另一边
此时只有NodeA能接收到leader的心跳了,其他三个节点都没有接收到心跳包
其他三个节点的倒计时因为一直没有接收到leader的心跳包,所以也就不会重新开始,那么此时就会出现某一个节点当倒计时到时间后就会从follower状态变为Candidate状态,并开始进行投票机制,它先给自己投一票,然后再发送请求,收到另外的两票,再从Candidate状态变为leader
添加另一个客户端,并尝试更新两个leader。
一个客户端尝试将leader节点的值设置为3,该leader记录日志后,进行日志复制,一直接收不到超过半数节点的响应,当前的更新也就一直是未提交状态
此时其他客户端尝试将leader节点的值设置为8,这将会成功,因为它可以复制到大多数节点
现在让我们修复网络分区
此时节点B将看到更高的选举期限并退出。两个节点A和B都将回滚它们未提交的条目,并匹配新的leader的日志。
此时整个集群的数据一致了。
源码实现
服务注册
服务实例进行注册,调用NacosServer端的接口时,此时的实例会分为临时实例和持久化实例。注册实例时会从controller层进入到service层,会进入到ServiceManager.addInstance()
方法中来
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)throws NacosException {// 根据ephemeral的值来决定生成上面key,默认情况下NacosClient传递过来的都是true,一般微服务的实例都是临时实例,不是持久化实例// 持久化实例 key = 一些字符串常量 + namespaceId + “##” + serviceName// 临时实例 key = 一些字符串常量 + “ephemeral” + namespaceId + “##” + serviceNameString key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);Service service = getService(namespaceId, serviceName);synchronized (service) {List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);Instances instances = new Instances();instances.setInstanceList(instanceList);// 这里会进入到下一个逻辑中consistencyService.put(key, instances);}
}
上方会调用进DelegateConsistencyServiceImpl
实现类的put()
方法中,在这里就会根据key中是否为临时实例,进而决定调用不同实现类的put()方法
public void put(String key, Record value) throws NacosException {// mapConsistencyService(key)根据是否为临时实例,进而去调用不同实现类的put()方法mapConsistencyService(key).put(key, value);
}// 根据是否为临时实例,返回不同的实现类
private ConsistencyService mapConsistencyService(String key) {return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}
-
这里如果是临时实例就会进入到
DistroConsistencyServiceImpl
实现类的put()方法中 -
如果是持久化实例就会进入到
PersistentConsistencyServiceDelegateImpl
实现类的put()方法中
我们需要知道一个点,Nacos服务注册中心是对内存的操作,数据库中存储的是配置中心的数据
PersistentConsistencyServiceDelegateImpl
实现类中有一些小改动,我们可以先直接去看他原生的Raft协议RaftConsistencyServiceImpl
类
下面代码具体的实现,RaftConsistencyServiceImpl
类(弃用了,了解即可):
- 在这个类中如果新增了数据它会先进行写文件的操作
- 然后直接发布一个ValueChangeEvent事件
- 处理该事件的方法中会对内存中的数据进行更改
- 发布完事件后再去通知其他节点。
相当于这里没有使用二阶段提交,而是直接一阶段就完成了。这是有一些问题的,就比如其他节点并没有更新成功,但本机却以及更新完成了。
下面的代码可以学一下,这里就使用了CountDownLatch来实现的半数以上节点的正常响应
// RaftConsistencyServiceImpl类
public void put(String key, Record value) throws NacosException {checkIsStopWork();try {// 核心方法入口raftCore.signalPublish(key, value);} catch (Exception e) {Loggers.RAFT.error("Raft put failed.", e);throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value,e);}
}------------------------------------------------------------------------------------------------------
// 进入到RaftCore.signalPublish()方法
// 写文件 --> 直接更新内存,未向nacos其他节点同步数据 --> 再向nacos其他节点同步数据
public void signalPublish(String key, Record value) throws Exception {if (stopWork) {throw new IllegalStateException("old raft protocol already stop work");}// 当前节点如果不是leaderif (!isLeader()) {ObjectNode params = JacksonUtils.createEmptyJsonNode();params.put("key", key);params.replace("value", JacksonUtils.transferToJsonNode(value));Map<String, String> parameters = new HashMap<>(1);parameters.put("key", key);final RaftPeer leader = getLeader();// 直接把请求转发给leader,让leader去进行写数据raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);return;}// 是leader节点的处理逻辑OPERATE_LOCK.lock();try {final long start = System.currentTimeMillis();final Datum datum = new Datum();datum.key = key;datum.value = value;if (getDatum(key) == null) {datum.timestamp.set(1L);} else {datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());}ObjectNode json = JacksonUtils.createEmptyJsonNode();json.replace("datum", JacksonUtils.transferToJsonNode(datum));json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));// 核心方法,进行发布// 该方法中会先对更新操作写入到磁盘文件中,然后发布一个ValueChangeEvent事件,处理该事件的方法中会对内存中的数据进行更改// 相当于这里没有进行两阶段提交,直接一阶段onPublish(datum, peers.local());final String content = json.toString();// 这里再去通知其他节点,// peers就是各个Nacos集群节点,取peers.size() / 2 + 1 个数的CountDownLatchfinal CountDownLatch latch = new CountDownLatch(peers.majorityCount());// 遍历除了自己之外的nacos节点for (final String server : peers.allServersIncludeMyself()) {if (isLeader(server)) {latch.countDown();continue;}// 调用各个节点 发送请求final String url = buildUrl(server, API_ON_PUB);HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {@Overridepublic void onReceive(RestResult<String> result) {if (!result.ok()) {Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",datum.key, server, result.getCode());return;}// 各个节点响应的回调方法,如果是正常响应则countDown()latch.countDown();}@Overridepublic void onError(Throwable throwable) {Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);}@Overridepublic void onCancel() {}});}// 如果并没有半数以上的节点正常响应,那么这里就会抛异常,但此时我本机的内存数据已经更改完成了。if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {// only majority servers return success can we consider this update successLoggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);}long end = System.currentTimeMillis();Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);} finally {OPERATE_LOCK.unlock();}
}------------------------------------------------------------------------------------------------------// 先写文件 ---> 发布事件 ----> 处理事件的类会去更新内存中注册表的数据
public void onPublish(Datum datum, RaftPeer source) throws Exception {if (stopWork) {throw new IllegalStateException("old raft protocol already stop work");}RaftPeer local = peers.local();if (datum.value == null) {Loggers.RAFT.warn("received empty datum");throw new IllegalStateException("received empty datum");}if (!peers.isLeader(source.ip)) {Loggers.RAFT.warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source),JacksonUtils.toJson(getLeader()));throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");}if (source.term.get() < local.term.get()) {Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source),JacksonUtils.toJson(local));throw new IllegalStateException("out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());}local.resetLeaderDue();// if data should be persisted, usually this is true:if (KeyBuilder.matchPersistentKey(datum.key)) {// 写文件 一般是写入到\nacosHome\data\naming\data\namespaceId\XXXX文件raftStore.write(datum);}datums.put(datum.key, datum);if (isLeader()) {local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);} else {if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {//set leader term:getLeader().term.set(source.term.get());local.term.set(getLeader().term.get());} else {local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);}}raftStore.updateTerm(local.term.get());// 文件写完之后会发布一个ValueChangeEvent事件,而处理该事件的是在PersistentNotifier类中onEvent()方法// 在处理事件的方法中就会真正去更新内存中的数据,也就是服务注册表中的数据NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}
leader选举
上面服务注册是调用的RaftCore.signalPublish()
方法,我们现在看看RaftCore.init()
方法
@PostConstruct
public void init() throws Exception {Loggers.RAFT.info("initializing Raft sub-system");final long start = System.currentTimeMillis();// 该节点刚启动,需要从磁盘文件中读取数据// 持久化实例是会在文件中保存一份,如果当且节点宕机了再重启,肯定是需要重新读取文件进行数据恢复的raftStore.loadDatums(notifier, datums);...// 执行两个定时任务,每隔500ms执行一次,一个是选举主节点,另一个是发送心跳包masterTask = GlobalExecutor.registerMasterElection(new MasterElection());heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());...
}
对于Leader的选举核心就是要看MasterElection
这个定时任务:
具体实现流程总结如下:
-
检验选举时间leaderDueMs属性是<=0
-
重置选举时间和发送心跳包时间
-
重置集群各个节点投票
-
选举周期+1
-
先投票给自己、修改为candidate
-
向各个follower节点发送投票请求,请求参数就是本机节点信息
/raft/vote 接口提供方的逻辑:
- 如果收到的候选节点term小于等于本地节点term,则本地节点的voteFor更新为自己(意思是这一票投给我自己,我更适合做leader)
- 否则这个follower将重置它的election timeout;更新它的voteFor为收到的候选者节点ip(意思是这一票就投给你了)
- 更新它的term为收到的候选者节点term;
- 将本地节点作为响应数据返回
-
发送投票请求后,处理响应数据
- 遍历所有的节点,若节点的voteFor不为null,则将voteFor添加进ips中
- 记录被选举次数最多的节点和次数
- 选举最多次数 > 集群节点半数+1 就把选举次数最多的节点状态改为leader
public class MasterElection implements Runnable {@Overridepublic void run() {try {if (stopWork) {return;}if (!peers.isReady()) {return;}// 每执行一次任务就减一次任务执行的间隔时间,直到减到小于等于0就可以开始投票了RaftPeer local = peers.local();local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;if (local.leaderDueMs > 0) {return;}// reset timeout// 重置选举时间local.resetLeaderDue();// 重置心跳时间local.resetHeartbeatDue();// 核心方法 进行投票sendVote();} catch (Exception e) {Loggers.RAFT.warn("[RAFT] error while master election {}", e);}}--------------------------------------------------------------------------------------------------------------------// 真正发送请求 进行leader选举投票private void sendVote() {RaftPeer local = peers.get(NetUtils.localServer());Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()),local.term);// 重置集群各个节点投票peers.reset();// 选举周期+1local.term.incrementAndGet();// 先投票给自己local.voteFor = local.ip;// 把自己的状态从follower改为候选者Candidatelocal.state = RaftPeer.State.CANDIDATE;// 请求参数为local对象Map<String, String> params = new HashMap<>(1);params.put("vote", JacksonUtils.toJson(local));// 遍历除了自己之外的集群节点for (final String server : peers.allServersWithoutMySelf()) {// 获取各个节点的url,发送异步post请求final String url = buildUrl(server, API_VOTE);try {/*** /raft/vote 接口提供方的逻辑:* 如果收到的候选节点term小于等于本地节点term,则本地节点的voteFor更新为自己(意思是这一票投给我自己,我更适合做leader)* 否则这个follower将重置它的election timeout;更新它的voteFor为收到的候选者节点ip(意思是这一票就投给你了)* 更新它的term为收到的候选者节点term;* 将本地节点作为响应数据返回*/HttpClient.asyncHttpPost(url, null, params, new Callback<String>() {@Overridepublic void onReceive(RestResult<String> result) {if (!result.ok()) {Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", result.getCode(), url);return;}// 解析其他节点的响应数据RaftPeer peer = JacksonUtils.toObj(result.getData(), RaftPeer.class);Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer));// 去处理其他节点的响应数据// 遍历所有的节点,若节点的voteFor不为null,则将voteFor添加进ips中// 记录被选举次数最多的节点和次数// 选举最多次数 > 集群节点半数+1 就把选举次数最多的节点状态改为leaderpeers.decideLeader(peer);}@Overridepublic void onError(Throwable throwable) {Loggers.RAFT.error("error while sending vote to server: {}", server, throwable);}@Overridepublic void onCancel() {}});} catch (Exception e) {Loggers.RAFT.warn("error while sending vote to server: {}", server);}}}
}
// 遍历所有的节点,若节点的voteFor不为null,则将voteFor添加进ips中
// 记录被选举次数最多的节点和次数
// 选举最多次数 > 集群节点半数+1 就把选举次数最多的节点状态改为leader
public RaftPeer decideLeader(RaftPeer candidate) {peers.put(candidate.ip, candidate);SortedBag ips = new TreeBag();// 记录被选举最多次数int maxApproveCount = 0;// 记录被选举次数最多的节点String maxApprovePeer = null;for (RaftPeer peer : peers.values()) {if (StringUtils.isEmpty(peer.voteFor)) {continue;}ips.add(peer.voteFor);if (ips.getCount(peer.voteFor) > maxApproveCount) {maxApproveCount = ips.getCount(peer.voteFor);maxApprovePeer = peer.voteFor;}}// 当前收到的选票 > 集群节点半数+1if (maxApproveCount >= majorityCount()) {RaftPeer peer = peers.get(maxApprovePeer);// 把maxApprovePeer的节点状态改为leaderpeer.state = RaftPeer.State.LEADER;if (!Objects.equals(leader, peer)) {leader = peer;ApplicationUtils.publishEvent(new LeaderElectFinishedEvent(this, leader, local()));Loggers.RAFT.info("{} has become the LEADER", leader.ip);}}return leader;
}
leader心跳包
在RaftCore.init()
方法,执行两个定时任务,每隔5秒执行一次,一个是选举主节点,另一个是发送心跳包
@PostConstruct
public void init() throws Exception {Loggers.RAFT.info("initializing Raft sub-system");final long start = System.currentTimeMillis();// 该节点刚启动,需要从磁盘文件中读取数据// 持久化实例是会在文件中保存一份,如果当且节点宕机了再重启,肯定是需要重新读取文件进行数据恢复的raftStore.loadDatums(notifier, datums);...// 执行两个定时任务,每隔5秒执行一次,一个是选举主节点,另一个是发送心跳包masterTask = GlobalExecutor.registerMasterElection(new MasterElection());heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());...
}
HeartBeat定时任务发送心跳包的具体实现:
具体实现流程总结如下:
- 发送心跳包时间heartbeatDueMs是否 <= 0
- 重置heartbeatDueMs 时间
- 当前的节点如果不是leader那么就不能发送心跳
- 处理请求参数,将本机节点信息、本机中所有的Datum.key 和它所对应的时间戳进行压缩,封装为请求参数作为心跳包发送个各个节点
public class HeartBeat implements Runnable{@Overridepublic void run() {try {if (stopWork) {return;}if (!peers.isReady()) {return;}// 每执行一次任务就减少一次 任务执行的间隔时间500ms ,直到减少到小于等于0就开始向其他节点发送心跳RaftPeer local = peers.local();local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;if (local.heartbeatDueMs > 0) {return;}// 重置heartbeatDueMs 时间local.resetHeartbeatDue();// 核心方法,发送心跳的逻辑sendBeat();} catch (Exception e) {Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);}}--------------------------------------------------------------------------------------------------------------------private void sendBeat() throws IOException, InterruptedException {RaftPeer local = peers.local();// 当前的节点如果不是leader那么就不能发送心跳if (EnvUtil.getStandaloneMode() || local.state != RaftPeer.State.LEADER) {return;}if (Loggers.RAFT.isDebugEnabled()) {Loggers.RAFT.debug("[RAFT] send beat with {} keys.", datums.size());}local.resetLeaderDue();// build data// 心跳包最终的数据ObjectNode packet = JacksonUtils.createEmptyJsonNode();// 当前节点信息packet.replace("peer", JacksonUtils.transferToJsonNode(local));// array存放当前节点中所有服务实例集合对应的datum.keyArrayNode array = JacksonUtils.createEmptyArrayNode();if (switchDomain.isSendBeatOnly()) {Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", switchDomain.isSendBeatOnly());}if (!switchDomain.isSendBeatOnly()) {// 遍历当前所有的datumsfor (Datum datum : datums.values()) {// 创建一个空的ObjectNodeObjectNode element = JacksonUtils.createEmptyJsonNode();// 这里只存放datum.keyif (KeyBuilder.matchServiceMetaKey(datum.key)) {element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));} else if (KeyBuilder.matchInstanceListKey(datum.key)) {element.put("key", KeyBuilder.briefInstanceListkey(datum.key));}// 存放datum对应的timestamp时间戳,也就相当于一个版本号element.put("timestamp", datum.timestamp.get());// 添加进array中array.add(element);}}packet.replace("datums", array);// 接下来对请求参数packet进行封装并压缩// broadcastMap<String, String> params = new HashMap<String, String>(1);params.put("beat", JacksonUtils.toJson(packet));String content = JacksonUtils.toJson(params);ByteArrayOutputStream out = new ByteArrayOutputStream();GZIPOutputStream gzip = new GZIPOutputStream(out);gzip.write(content.getBytes(StandardCharsets.UTF_8));gzip.close();byte[] compressedBytes = out.toByteArray();String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);// compressedContent存放的就是压缩之后的请求参数if (Loggers.RAFT.isDebugEnabled()) {Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}", content.length(),compressedContent.length());}// 向除了自己的其他节点发送心跳包,心跳包中包含的数据就是当前节点信息 + 当前节点中所有的datum.keyfor (final String server : peers.allServersWithoutMySelf()) {try {final String url = buildUrl(server, API_BEAT);if (Loggers.RAFT.isDebugEnabled()) {Loggers.RAFT.debug("send beat to server " + server);}// /raft/beat 发送请求HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new Callback<String>() {@Overridepublic void onReceive(RestResult<String> result) {if (!result.ok()) {Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}", result.getCode(), server);MetricsMonitor.getLeaderSendBeatFailedException().increment();return;}peers.update(JacksonUtils.toObj(result.getData(), RaftPeer.class));if (Loggers.RAFT.isDebugEnabled()) {Loggers.RAFT.debug("receive beat response from: {}", url);}}@Overridepublic void onError(Throwable throwable) {Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server,throwable);MetricsMonitor.getLeaderSendBeatFailedException().increment();}@Overridepublic void onCancel() {}});} catch (Exception e) {Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);MetricsMonitor.getLeaderSendBeatFailedException().increment();}}}
}
我们接下来再来看看/raft/beat
接口的具体实现,看看follower节点是如何处理leader节点的心跳包的,会进入到RaftController.beat()
方法
具体实现流程总结如下:
-
解析请求参数
-
接收到的心跳包如果不是leader发送的则抛异常
-
如果本机的term 大于 心跳包term,则心跳包不进行处理
-
当前节点的状态如果不是follower,那么就把自己的节点状态改为follower,voteFor改为请求参数的ip
-
重置本机的选举时间和心跳发送时间
-
更新leader信息,将remote设置为新leader,更新原有leader的节点信息(leader会通过心跳通知其他节点更新leader)
-
创建一个Map集合:
Map<String, Integer> receivedKeysMap
- 该集合中存放的是Datum.key,其中Integer的值为0表示Datum.key是本机的,如果为1则表示是远程心跳包中的
- 经过下面的处理之后,receivedKeysMap中可能还会存在一些Integer的值为0,而这些数据意味着在leader节点中这些数据已经被删除了
- 所以在最后会有一个利用该集合进行移除Datum的操作
-
将本机中的datum对象都存入receivedKeysMap集合中,value都是0
-
创建一个批处理集合batch
-
遍历心跳包中的beatDatums
- 每一次遍历都把datumKey存入到receivedKeysMap中去,value都是1
- 远程心跳包中的datumKey是否在我本机的datums集合中存在,并且本机该datumKey对应的时间戳也比远程心跳包中的时间戳更大。这就表示我不需要处理这一条datumKey
- 和上面的if判断逻辑相反,则需要添加进batch这个集合中
- 批量处理逻辑,表示我的batch集合还能继续存放数据
假如远程心跳包传递过来了100条datumKey 难道我要一条一条的去处理吗?所以肯定是需要批处理的
processedCount的数值最终是 等于 beatDatums集合中的个数,表示真正处理beatDatums中的最后一个元素
第二个条件是 假如的传递过来的总数只有10条嘞?那岂不是就直接遍历完跳出循环了,batch集合中存放的10条数据都还没有进行处理 - 当batch集合存放的数据超过50后,调用远程心跳包发送过来的leader节点ip,把我需要的datumKey集合发送给leader。 然后对leader返回的数据进行处理,写入到本机的磁盘文件和内存中
-
循环遍历结束后,处理receivedKeysMap集合value为0的Datum。移除Datum,先删除内存中datums,在删除磁盘文件
// 在RaftController.beat()方法中不会进行什么重要的处理,会直接调用到receivedBeat(JsonNode beat)方法中
public RaftPeer receivedBeat(JsonNode beat) throws Exception {if (stopWork) {throw new IllegalStateException("old raft protocol already stop work");}// 解析请求参数final RaftPeer local = peers.local();final RaftPeer remote = new RaftPeer();JsonNode peer = beat.get("peer");remote.ip = peer.get("ip").asText();remote.state = RaftPeer.State.valueOf(peer.get("state").asText());remote.term.set(peer.get("term").asLong());remote.heartbeatDueMs = peer.get("heartbeatDueMs").asLong();remote.leaderDueMs = peer.get("leaderDueMs").asLong();remote.voteFor = peer.get("voteFor").asText();// 接收到的心跳包如果不是leader发送的则抛异常if (remote.state != RaftPeer.State.LEADER) {Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}", remote.state,JacksonUtils.toJson(remote));throw new IllegalArgumentException("invalid state from master, state: " + remote.state);}// 如果本机的term 大于 心跳包term,则心跳包不进行处理if (local.term.get() > remote.term.get()) {Loggers.RAFT.info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}",remote.term.get(), local.term.get(), JacksonUtils.toJson(remote), local.leaderDueMs);throw new IllegalArgumentException("out of date beat, beat-from-term: " + remote.term.get() + ", beat-to-term: " + local.term.get());}// 当前节点的状态如果不是follower,那么就把自己的节点状态改为follower,voteFor改为请求参数的ipif (local.state != RaftPeer.State.FOLLOWER) {Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JacksonUtils.toJson(remote));// mk followerlocal.state = RaftPeer.State.FOLLOWER;local.voteFor = remote.ip;}// 心跳中的核心数据包final JsonNode beatDatums = beat.get("datums");// 重置选举时间和心跳发送时间local.resetLeaderDue();local.resetHeartbeatDue();// 更新leader信息,将remote设置为新leader,更新原有leader的节点信息(leader会通过心跳通知其他节点更新leader)peers.makeLeader(remote);if (!switchDomain.isSendBeatOnly()) {// 该集合中存放的是Datum.key,其中Integer的值为0表示Datum.key是本机的,如果为1则表示是远程心跳包中的// 经过下面方法的处理之后,receivedKeysMap中可能还会存在一些Integer的值为0,而这些数据意味着在leader节点中这些数据已经被删除了// 所以在该方法的最后会有一个利用 该集合 进行移除Datum的操作Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());// 将本机中的datum对象都存入集合中,value都是0for (Map.Entry<String, Datum> entry : datums.entrySet()) {receivedKeysMap.put(entry.getKey(), 0);}// now check datums// 创建一个批处理集合List<String> batch = new ArrayList<>();int processedCount = 0;if (Loggers.RAFT.isDebugEnabled()) {Loggers.RAFT.debug("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}",beatDatums.size(), datums.size(), remote.ip, remote.term, local.term);}// 遍历心跳包中的beatDatumsfor (Object object : beatDatums) {processedCount = processedCount + 1;JsonNode entry = (JsonNode) object;String key = entry.get("key").asText();final String datumKey;// 得到datumKeyif (KeyBuilder.matchServiceMetaKey(key)) {datumKey = KeyBuilder.detailServiceMetaKey(key);} else if (KeyBuilder.matchInstanceListKey(key)) {datumKey = KeyBuilder.detailInstanceListkey(key);} else {// ignore corrupted key:continue;}long timestamp = entry.get("timestamp").asLong();// 把datumKey存入到receivedKeysMap中去receivedKeysMap.put(datumKey, 1);try {// 远程心跳包中的datumKey是否在我本机的datums集合中存在,并且本机该datumKey对应的时间戳也比远程心跳包中的时间戳更大// 这就表示我不需要处理这一条datumKeyif (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp&& processedCount < beatDatums.size()) {continue;}// 和上面的if判断逻辑相反,则需要添加进batch这个集合中if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {batch.add(datumKey);}// 批量处理逻辑,表示我的batch集合还能继续存放数据// 假如远程心跳包传递过来了100条datumKey 难道我要一条一条的去处理吗?所以肯定是需要批处理的// processedCount的数值最终是 等于 beatDatums集合中的个数,表示真正处理beatDatums中的最后一个元素// 第二个条件是 假如的传递过来的总数只有10条嘞?那岂不是就直接遍历完跳出循环了,batch集合中存放的10条数据都还没有进行处理if (batch.size() < 50 && processedCount < beatDatums.size()) {continue;}// 当batch集合存放的数据超过50后String keys = StringUtils.join(batch, ",");if (batch.size() <= 0) {continue;}Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}"+ ", datums' size is {}, RaftCore.datums' size is {}", getLeader().ip, batch.size(),processedCount, beatDatums.size(), datums.size());// update datum entry// 调用远程心跳包发送过来的leader节点ip,把我需要的datumKey集合发送给leader// 然后对leader返回的数据进行处理,写入到本机的磁盘文件和内存中String url = buildUrl(remote.ip, API_GET);Map<String, String> queryParam = new HashMap<>(1);queryParam.put("keys", URLEncoder.encode(keys, "UTF-8"));HttpClient.asyncHttpGet(url, null, queryParam, new Callback<String>() {@Overridepublic void onReceive(RestResult<String> result) {if (!result.ok()) {return;}List<JsonNode> datumList = JacksonUtils.toObj(result.getData(), new TypeReference<List<JsonNode>>() {});// 从leader中接收到本机需要的datum集合,进行遍历for (JsonNode datumJson : datumList) {Datum newDatum = null;OPERATE_LOCK.lock();try {// 先获取本机中老的DatumDatum oldDatum = getDatum(datumJson.get("key").asText());// 如果远程发送过来Datum的时间戳比本机的时间戳还小,那么就不用处理if (oldDatum != null && datumJson.get("timestamp").asLong() <= oldDatum.timestamp.get()) {Loggers.RAFT.info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}",datumJson.get("key").asText(),datumJson.get("timestamp").asLong(), oldDatum.timestamp);continue;}// 解析成newDatumif (KeyBuilder.matchServiceMetaKey(datumJson.get("key").asText())) {Datum<Service> serviceDatum = new Datum<>();serviceDatum.key = datumJson.get("key").asText();serviceDatum.timestamp.set(datumJson.get("timestamp").asLong());serviceDatum.value = JacksonUtils.toObj(datumJson.get("value").toString(), Service.class);newDatum = serviceDatum;}if (KeyBuilder.matchInstanceListKey(datumJson.get("key").asText())) {Datum<Instances> instancesDatum = new Datum<>();instancesDatum.key = datumJson.get("key").asText();instancesDatum.timestamp.set(datumJson.get("timestamp").asLong());instancesDatum.value = JacksonUtils.toObj(datumJson.get("value").toString(), Instances.class);newDatum = instancesDatum;}if (newDatum == null || newDatum.value == null) {Loggers.RAFT.error("receive null datum: {}", datumJson);continue;}// 先把newDatum写入到磁盘文件中// 文件目录一般是 \nacosHome\data\naming\data\namespaceId\XXXX文件raftStore.write(newDatum);// 更新内存中的数据,先修改datums的数据datums.put(newDatum.key, newDatum);// 再去修改注册标中的数据notifier.notify(newDatum.key, DataOperation.CHANGE, newDatum.value);// 本机重置leaderDueMs时间local.resetLeaderDue();if (local.term.get() + 100 > remote.term.get()) {getLeader().term.set(remote.term.get());local.term.set(getLeader().term.get());} else {local.term.addAndGet(100);}raftStore.updateTerm(local.term.get());Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",newDatum.key, newDatum.timestamp, JacksonUtils.toJson(remote), local.term);} catch (Throwable e) {Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum,e);} finally {OPERATE_LOCK.unlock();}}try {TimeUnit.MILLISECONDS.sleep(200);} catch (InterruptedException e) {Loggers.RAFT.error("[RAFT-BEAT] Interrupted error ", e);}return;}@Overridepublic void onError(Throwable throwable) {Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader", throwable);}@Overridepublic void onCancel() {}});batch.clear();} catch (Exception e) {Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);}}// 此时receivedKeysMap集合中Integer还为0的Datum意味着:leader节点中已经删除了这些DatumList<String> deadKeys = new ArrayList<>();for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {if (entry.getValue() == 0) {deadKeys.add(entry.getKey());}}for (String deadKey : deadKeys) {try {// 移除Datum,先删除内存中datums,在删除磁盘文件deleteDatum(deadKey);} catch (Exception e) {Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);}}}return local;
}