Nacos 进阶篇---集群:选举心跳健康检查劳动者(九)

一、引言

   本章将是我们第二阶段,开始学习集群模式下,Nacos 是怎么去操作的 ?

本章重点:

  • 在Nacos服务端当中,会去开启健康心跳检查定时任务。如果是在Nacos集群下,大家思考一下,有没有必要所有的集群实例都去执行开启健康心跳检查定时任务?还是只有当中一个实例去执行健康心跳检查定时任务,然后把结果同步给其他集群实例的节点 ?大家可以思考一下~
  • 既然Nacos有健康心跳检查定时任务,如果微服务健康实例状态发生了改变,这个时候Nacos是怎么把健康实例同步给其他Nacos 集群节点的 ?代码怎么实现的 ?

带着这些问题我们一起往下看吧 ~

二、目录 

目录

          一、引言

二、目录 

三、集群心跳健康检查架构分析

四、集群心跳健康检查选举源码分析

五、集群实例健康状态同步源码分析

六、本章总结


三、集群心跳健康检查架构分析

我们先来分析第一问题。 在Nacos集群下,所有的集群实例都去执行开启健康心跳检查定时任务?还是只有当中一个实例去执行健康心跳检查定时任务,然后把结果同步给其他集群实例的节点 ?

  • 如果是在Nacos集群下,所有的集群实例都去执行开启健康心跳检查定时任务。那么就会出现跑出来结果不一致的问题,那么以哪个集群实例结果为准呢 ?很明显这种方式很不合理。
  • 那么就是第二种方式的了,只有当中一个实例去执行健康心跳检查定时任务,然后把结果同步给其他集群实例的节点 。

第二种方式明显更加靠谱点,逻辑也更加简洁。在Nacos集群当中也是这么做的,所有集群实例都会开启健康心跳检查任务,但是真正执行健康心跳任务检查逻辑的只有一个实例,在执行完成后。会有一个定时任务,把结果同步给其他集群节点

那我们接下来看看源码当中,Nacos 是怎么去实现的~

四、集群心跳健康检查选举源码分析

既然是 ” 心跳健康检查 “ ,我们还是要看服务端实例注册接口中的 ClientBeatCheckTask 任务:

那我们直接看 ClientBeatCheckTask 当中的 run 方法,一开始有两个 if 判断方法:

// 集群下,判断自身节点是否需要执行心跳健康检查任务,如果不需要,直接 return
if (!getDistroMapper().responsible(service.getName())) {return;
}// 判断是否需要开启健康任务检查,默认为: true
if (!getSwitchDomain().isHealthCheckEnabled()) {return;
}

那么集群下是如何保证只有一台节点去执行定时任务的,关键点就在于第一个判断当中 responsible方法,那我们具体来看下代码:

public boolean responsible(String serviceName) {// 获取集群节点的数量final List<String> servers = healthyList;// 如果为单机模式,就直接返回为 trueif (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {return true;}// 没有可用健康集群的节点,就直接返回 falseif (CollectionUtils.isEmpty(servers)) {// means distro config is not ready yetreturn false;}int index = servers.indexOf(EnvUtil.getLocalAddress());int lastIndex = servers.lastIndexOf(EnvUtil.getLocalAddress());if (lastIndex < 0 || index < 0) {return true;}// 把 serviceName 的进行 hash操作,然后和 servers.size() 取模,最终只有一个集群节点能够返回 trueint target = distroHash(serviceName) % servers.size();return target >= index && target <= lastIndex;
}

通过这个方法我们可以得知,在Nacos集群下,只会有一个节点去执行定时任务。那么该节点定时执行完,怎么把结果同步给其他集群节点的呢 ?

我们一起来往下接着看~

五、集群实例健康状态同步源码分析

本节重点:在Nacos集群下,只会有一个节点去执行定时任务。那么该节点定时执行完,怎么把结果同步给其他集群节点的呢 ?

在 ServiceManager 类中,init() 方法被 @PostConstruct 注解修饰,在Spring 创建 Bean的时候,会去执行 init()方法。在这个方法当中,会去开启心跳健康检查同步的定时任务,我们一起来看下~

@PostConstruct
public void init() {// 同步心跳健康检查异结果异步任务GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);// 处理 同步心跳健康检查异结果异步任务  内存队列 + 异步任务GlobalExecutor.submitServiceUpdateManager(new UpdatedServiceProcessor());// 省略部分代码
}

那我们先来看下 同步心跳健康检查异结果异步任务代码,ServiceReporter 当中的 run() 方法:

我们可以把这块代码分成三个部分,这样看更容易理解:

第一部分:获取当前所有服务,key:命名空间 value:服务名称

// 获取全部服务,key:命名空间   value:服务名称
Map<String, Set<String>> allServiceNames = getAllServiceNames();if (allServiceNames.size() <= 0) {//ignorereturn;
}

第二部分:遍历 allServiceNames 中的每一个命名空间 ,封装请求参数 ,准备同步健康心跳检查结果

// 遍历 allServiceNames 中的每一个命名空间 ,封装请求参数 ,准备同步健康心跳检查结果
for (String namespaceId : allServiceNames.keySet()) {ServiceChecksum checksum = new ServiceChecksum(namespaceId);// 遍历每一个命名空间对应 serviceName 服务名称for (String serviceName : allServiceNames.get(namespaceId)) {if (!distroMapper.responsible(serviceName)) {continue;}Service service = getService(namespaceId, serviceName);if (service == null || service.isEmpty()) {continue;}service.recalculateChecksum();// 添加请求参数checksum.addItem(serviceName, service.getChecksum());}// 封装 Message 对象数据,把请求对象转换成JSONMessage msg = new Message();msg.setData(JacksonUtils.toJson(checksum));Collection<Member> sameSiteServers = memberManager.allMembers();if (sameSiteServers == null || sameSiteServers.size() <= 0) {return;}

第三部分:同步结果到其他集群节点

for (Member server : sameSiteServers) {// 判断是否是当前集群的节点,如果是就跳过if (server.getAddress().equals(NetUtils.localServer())) {continue;}// 重点:同步其他集群节点synchronizer.send(server.getAddress(), msg);
}

在 synchronizer.send(server.getAddress(), msg); 这个方法当中,会通过HTTP 方式给其他集群节点同步心跳任务健康检查结果:

@Override
public void send(final String serverIP, Message msg) {if (serverIP == null) {return;}// 创建请求参数Map<String, String> params = new HashMap<String, String>(10);params.put("statuses", msg.getData());params.put("clientIP", NetUtils.localServer());// 拼接 url 地址String url = "http://" + serverIP + ":" + EnvUtil.getPort() + EnvUtil.getContextPath()+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";if (IPUtil.containsPort(serverIP)) {url = "http://" + serverIP + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT+ "/service/status";}try {// 异步发送 http 请求,请求地址:http://ip/v1/ns/service/status , 同步心跳健康检查结果HttpClient.asyncHttpPostLarge(url, null, JacksonUtils.toJson(params), new Callback<String>() {// 代码省略});} catch (Exception e) {Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: " + serverIP, e);}}

通过代码可以得知,最终也是通过 HTTP 的方式来进行数据同步的,也能够看出请求地址是v1/ns/service/status。接下来我们一起来看下请求地址对应的接口代码逻辑,其实代码很好找,看下图:

这块代码就不细讲了,主要逻辑就是 判断服务状态是否有变动 ,有变动的话就 包装 ServiceKey 对象, 放入到 toBeUpdatedServicesQueue 阻塞队列当中。

代码如下:

public void addUpdatedServiceToQueue(String namespaceId, String serviceName, String serverIP, String checksum) {lock.lock();try {// 包装 ServiceKey 对象, 放入到 toBeUpdatedServicesQueue 阻塞队列当中toBeUpdatedServicesQueue.offer(new ServiceKey(namespaceId, serviceName, serverIP, checksum), 5, TimeUnit.MILLISECONDS);} catch (Exception e) {toBeUpdatedServicesQueue.poll();toBeUpdatedServicesQueue.add(new ServiceKey(namespaceId, serviceName, serverIP, checksum));Loggers.SRV_LOG.error("[DOMAIN-STATUS] Failed to add service to be updated to queue.", e);} finally {lock.unlock();}
}

我们刚刚分析的在 ServiceManager类中的 init 方法(代码如下),第一个线程任务就是同步心跳健康检查结果的异步任务,那么我们接下来分析第二个线程任务。

@PostConstruct
public void init() {// 同步心跳健康检查异结果异步任务GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);// 处理 同步心跳健康检查异结果异步任务  内存队列 + 异步任务GlobalExecutor.submitServiceUpdateManager(new UpdatedServiceProcessor());// 省略部分代码
}

第二个线程任务类是:UpdatedServiceProcessor,我们从run 方法中(代码如下),能够看出是一个 while 循环,并且是没有结束条件的。在循环的逻辑当中,会从toBeUpdatedServicesQueue阻塞队列中一直取任务,取到任务之后,又是提交了一个线程池任务。

@Override
public void run() {ServiceKey serviceKey = null;try {while (true) {try {// 从阻塞队列当中一直获取任务serviceKey = toBeUpdatedServicesQueue.take();} catch (Exception e) {Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while taking item from LinkedBlockingDeque.");}if (serviceKey == null) {continue;}// 把任务提交到线程池执行GlobalExecutor.submitServiceUpdate(new ServiceUpdater(serviceKey));}} catch (Exception e) {Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while update service: {}", serviceKey, e);}
}

那我们接着看 ServiceUpdater 当中的 run 方法,代码如下:

@Override
public void run() {try {// 调用更改健康状态方法updatedHealthStatus(namespaceId, serviceName, serverIP);} catch (Exception e) {Loggers.SRV_LOG.warn("[DOMAIN-UPDATER] Exception while update service: {} from {}, error: {}", serviceName,serverIP, e);}
}public void updatedHealthStatus(String namespaceId, String serviceName, String serverIP) {Message msg = synchronizer.get(serverIP, UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));// 解析参数JsonNode serviceJson = JacksonUtils.toObj(msg.getData());ArrayNode ipList = (ArrayNode) serviceJson.get("ips");Map<String, String> ipsMap = new HashMap<>(ipList.size());for (int i = 0; i < ipList.size(); i++) {String ip = ipList.get(i).asText();String[] strings = ip.split("_");ipsMap.put(strings[0], strings[1]);}Service service = getService(namespaceId, serviceName);if (service == null) {return;}// 是否改变标识boolean changed = false;// 遍历全部实例信息,更新健康状态List<Instance> instances = service.allIPs();for (Instance instance : instances) {boolean valid = Boolean.parseBoolean(ipsMap.get(instance.toIpAddr()));if (valid != instance.isHealthy()) {changed = true;instance.setHealthy(valid);Loggers.EVT_LOG.info("{} {SYNC} IP-{} : {}:{}@{}", serviceName,(instance.isHealthy() ? "ENABLED" : "DISABLED"), instance.getIp(), instance.getPort(),instance.getClusterName());}}// 如果实例健康状态改变了,那么就发布 服务改变事件,使用 upd 的方式通知客户端if (changed) {pushService.serviceChanged(service);if (Loggers.EVT_LOG.isDebugEnabled()) {StringBuilder stringBuilder = new StringBuilder();List<Instance> allIps = service.allIPs();for (Instance instance : allIps) {stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");}Loggers.EVT_LOG.debug("[HEALTH-STATUS-UPDATED] namespace: {}, service: {}, ips: {}", service.getNamespaceId(),service.getName(), stringBuilder.toString());}}}

在上面代码中,注意是先解析我们的 msg.getData()参数,然后获取注册表中全部的 Instance 实例列,进行遍历,在健康状态有变动的情况下,会直接更改它的 healthy 属性。在方法的最后,如果有更新 healthy属性的情况下,最终也会发布服务改变事件来通知客户端进行更新。

六、本章总结

在本章节我们首先知道了,在Nacos集群下,是只有一个集群节点去执行心跳健康检查定时任务的,然后把结果同步给其他集群的节点。那么是怎么同步给其他集群节点的呢 ?

在Nacos 服务端是有一个定时任务,来和其他集群节点进行数据同步的。通过源码分析,我们知道最终也是通过 HTTP 的方式进行同步的,采用了 异步任务 + 阻塞队列的方式 的设计架构。这样的好处就是快,先把任务都给接受放入到阻塞队列当中,就立马返回。然后后台会开启一条线程不断从阻塞队列当中获取任务进行处理。

把本章流程图补充完整:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/43347.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

无人直播系统源码开发:功能~优势~开发方法

自动直播通常是指通过自动化技术来实现实时内容分发的过程&#xff0c;它结合了流媒体技术和人工智能&#xff08;如机器学习&#xff09;。以下是自动直播实现的基本步骤&#xff1a; 内容采集&#xff1a;通过摄像头、手机等设备捕捉实时画面&#xff0c;并通过编码将其转换成…

rocketmq主从切换测试

服务器 192.168.1.23 nameserver、broker nameserver、brokerA&#xff0c;brokerB 192.168.1.35 nameserver、broker nameserver、brokerA&#xff0c;brokerB 192.168.1.88 nameserver nameserver 主从切换 关闭master&#xff1a;等待几秒钟23成为新的master slave消费测…

超市收银系统源码

今天给大家分享一套线上线下打通的收银系统&#xff0c;安卓/win双端线下收银台&#xff0c;可DIY、多模板的三端线上小程序商城&#xff0c;除此之外ERP进销存管理、商品管理、会员营销都很完善。 重点是系统支持OEM贴牌独立部署和全开源源码&#xff0c;非常适合一些正在寻找…

南航秋招指南,线上测评和线下考试

南航秋招简介 南航作为国内一流的航空公司&#xff0c;对人才的需求量非常旺盛&#xff0c;每年也有很多专业对口的工作提供给应届毕业生&#xff0c;对于应届毕业生而言&#xff0c;一定要抓住任何一个应聘机会&#xff0c;并且在规定的范围内进行简历的提交&#xff0c;以便…

CSS content 计数器

CSS content 计数器 CSS 计数器通过一个变量来设置&#xff0c;根据规则递增变量。 使用计数器自动编号 CSS 计数器根据规则来递增变量。 CSS 计数器使用到以下几个属性&#xff1a; counter-reset - 创建或者重置计数器&#xff0c;给计算器命名。注意声明计算器不能在自身…

孕产妇(产科)管理信息系统源码 三甲医院产科电子病历系统成品源代码

孕产妇&#xff08;产科&#xff09;管理信息系统源码 三甲医院产科电子病历系统成品源代码 医院智慧孕产是一种通过信息化手段,实现孕产期宣教、健康服务的院外延伸,对孕产妇健康管理具有重要意义,是医院智慧服务水平和能力的体现。实行涵盖婚前检查、孕期保健、产后康复的一…

如何把harmonos项目修改为openharmony项目

一开始分不清harmonyos和openharmony&#xff0c;在harmonyos直接下载的开发软件&#xff0c;后面发现不对劲&#xff0c;打脑阔 首先你要安装对应版本的开发软件&#xff0c;鸿蒙开发是由harmonyos和openharmony官网两个的&#xff0c;找到对应的地方下载对应版本的开发软件&…

C#-反射

一、概念 反射&#xff08;Reflection&#xff09;在C#中是一种非常重要的特性&#xff0c;它为开发者提供了在运行时获取和操作关于类型、成员、属性、方法等的详细信息的能力。通过反射&#xff0c;开发者可以在程序运行期间动态地创建对象、调用方法、设置属性值以及进行其…

【Java开发实训】day01

目录 1.Java开发步骤 2.目录的三个表达方法 3.Java的三种注释方法 4.文档注释的作用 &#x1f308;嗨&#xff01;我是Filotimo__&#x1f308;。很高兴与大家相识&#xff0c;希望我的博客能对你有所帮助。 &#x1f4a1;本文由Filotimo__✍️原创&#xff0c;首发于CSDN&…

运维锅总详解数据一致性

本文首先对数据一致性进行简要说明&#xff0c;然后画图分析展示9种数据一致性协议的工作流程&#xff0c;最后给出实现这9种协议的例子。希望对您理解数据一致性有所帮助&#xff01; 一、数据一致性简介 数据一致性是数据库和分布式系统中的一个关键概念&#xff0c;它确保…

【Mac】Folder Icons for mac(文件夹个性化图标修改软件)软件介绍

软件介绍 Folder Icons for Mac 是一款专为 macOS 设计的应用程序&#xff0c;主要用于个性化和定制你的文件夹图标。以下是它的主要特点和使用方法&#xff1a; 主要特点&#xff1a; 个性化文件夹图标 Folder Icons for Mac 允许用户为 macOS 上的任何文件夹定制图标。你…

怎样优化 PostgreSQL 中对布尔类型数据的查询?

文章目录 一、索引的合理使用1. 常规 B-tree 索引2. 部分索引 二、查询编写技巧1. 避免不必要的类型转换2. 逻辑表达式的优化 三、表结构设计1. 避免过度细分的布尔列2. 规范化与反规范化 四、数据分布与分区1. 数据分布的考虑2. 表分区 五、数据库参数调整1. 相关配置参数2. 定…

融云入驻首个数字生态出海基地,加速构建数字经济出海创新生态

7 月 3 日&#xff0c;“2024 全球数字经济大会”重要专题论坛“2024 数字生态出海发展论坛”在北京国家会议中心举行。 论坛由全球数字经济大会组委会主办&#xff0c;北京市经济和信息化局、北京市政务服务和数据管理局、大兴区人民政府共同承办。来自阿联酋、日本、古巴、…

Chain-of-Verification Reduces Hallucination in Lagrge Language Models阅读笔记

来来来&#xff0c;继续读文章了&#xff0c;今天这个是meta的研究员们做的一个关于如何减少LLM得出幻觉信息的工作&#xff0c;23年底发表。文章链接&#xff1a;https://arxiv.org/abs/2309.11495 首先&#xff0c;这个工作所面向的LLM的问答任务&#xff0c;是list-based q…

动态粒子发射特效404网站HTML源码

源码介绍 动态粒子发射404网站HTML源码&#xff0c;粒子内容可以进行修改&#xff0c;默认是4&#xff0c;0数字还有一个页面不存在英文&#xff0c;可以自行修改&#xff0c;喜欢的朋友可以拿去使用&#xff0c;源码是html&#xff0c;记事本打开修改即可&#xff0c;鼠标双击…

线程池的合理使用

线程池的合理使用 一、简介二、为什么要使用线程池三、核心参数四、如何合理配置线程参数1.1 corePoolSize && maximumPoolSize1.2 Handler 拒绝策略1.2.1AbortPolicy&#xff1a;优势&#xff1a;劣势&#xff1a; 1.2.2 DiscardPolicy&#xff1a;优势&#xff1a;劣…

海外媒体发稿-全媒体百科

全球知名媒体机构 在全球范围内&#xff0c;有许多知名的新闻机构负责报道世界各地的新闻事件。以下是一些国外常见的媒体机构&#xff1a; AP&#xff08;美联社&#xff09;合众国际社&#xff08;UPI&#xff09;AFP(法新社)EFE&#xff08;埃菲通讯社&#xff09;Europa …

Nginx理论篇与相关网络协议

Nginx是什么&#xff1f; Nginx是一款由C语言编写的高性能、轻量级的web服务器&#xff0c;一个线程能处理多个请求&#xff0c;支持万级并发。 优势&#xff1a;I/O多路复用。 I/O是什么&#xff1f; I指的是输入&#xff08;Input&#xff09;,O是指输出&#xff08;Outp…

【安全设备】日志审计

一、什么是日志审计 日志审计是一站式的日志数据管理平台&#xff0c;主要致力于提供事前预警、事后审计的安全能力&#xff0c; 通过对日志数据的全面采集、解析和深度的关联分析&#xff0c;及时发现各种安全威胁和异常行为事件。日志审计是指通过集中采集信息系统中的各类信…