死磕Nacos系列:Nacos是如何更新服务信息的?

前言

说到服务信息,我们还是得回到NamingService,因为这是和NacosServer进行服务注册的核心组件,内部提供了注册、获取Nacos实例的能力。至于其他组件,如Ribbon,在调用时需要所有实例信息来进行负载,那肯定就是通过NamingService的能力来获取到所有的实例。

NamingService

NamingService中获取实例主要有两类方法,一类是getAllInstances、另一类是selectInstances,它们最主要的区别就是selectInstances增加了对实例是否健康的过滤的支持。

既然如此,那我们直接来就看看selectInstances的逻辑:

@Override
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,boolean subscribe) throws NacosException {ServiceInfo serviceInfo;if (subscribe) {serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),StringUtils.join(clusters, ","));} else {serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),StringUtils.join(clusters, ","));}return selectInstances(serviceInfo, healthy);
}

上述代码有两个逻辑:

1、传入的subscribe,如果是true,就代表是订阅模式,就走hostReactor.getServiceInfo查询,反之走hostReactor.getServiceInfoDirectlyFromServer查询。
2、根据实例的健康状态进行过滤返回。

那我们就先走进HostReactor,对它提供的两个能力进行分析:

HostReactor

这个类是在实例化NamingService时在构造函数中实例化的一个对象。

而在HostReactor实例化时,其构造函数会创建一个定时线程池,核心线程数量可以通过spring.cloud.nacos.discovery.namingPollingThreadCount进行控制,默认是当前机器核心数的一半,最少为1。

也会实例化一个FailoverReactor,是一个容灾备份反应器,其内部实例化了一个单线程的定时线程池,内部由两个延迟定时任务组成:

  • SwitchRefresher

    每隔5秒去检查文件系统中是否有cacheDir + "/failover00-00---000-VIPSRV_FAILOVER_SWITCH-000---00-00"文件,如果有,那么就标记为容灾模式。

    cacheDir的取值如下:

    private void initCacheDir() {cacheDir = System.getProperty("com.alibaba.nacos.naming.cache.dir");if (StringUtils.isEmpty(cacheDir)) {cacheDir = System.getProperty("user.home") + "/nacos/naming/" + namespace;}
    }
    
  • FailoverFileReader

    如果是容灾模式,就从文件中读取Service信息。

  • DiskFileWriter

    初始延迟30分钟,后每隔1天serviceInfoMap(服务信息)写入到cacheDir + "/failover文件夹下。

整个FailoverReactor的示意图如下:

image-20231126131554348

在了解了HostReactor的基本情况后,我们来对上面调用的两个方法进行分析。

getServiceInfo

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());String key = ServiceInfo.getKey(serviceName, clusters);// 如果是容灾模式,就从failoverReactor中获取文件系统中缓存的ServiceInfo信息if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);}// 从HostReactor维护的serviceInfoMap中取ServiceInfoServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);// 如果HostReactor中没有if (null == serviceObj) {serviceObj = new ServiceInfo(serviceName, clusters);// 存入serviceInfoMapserviceInfoMap.put(serviceObj.getKey(), serviceObj);updatingMap.put(serviceName, new Object());// 查询服务端的这个Service的信息,如果有,则使用和接收到服务端Service推送一致的更新处理updateServiceNow(serviceName, clusters);updatingMap.remove(serviceName);} else if (updatingMap.containsKey(serviceName)) {// 如果原来的serviceInfoMap有数据,但updatingMap又存在,说明可能存在了并发问题,则需要锁住serviceObj一段时间,等待执行完成if (UPDATE_HOLD_INTERVAL > 0) {// hold a moment waiting for update finishsynchronized (serviceObj) {try {serviceObj.wait(UPDATE_HOLD_INTERVAL);} catch (InterruptedException e) {NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);}}}}// 用一个定时线程池去执行Servcie的主动更新scheduleUpdateIfAbsent(serviceName, clusters);// 从本地serviceInfoMap中获取ServiceInforeturn serviceInfoMap.get(serviceObj.getKey());
}

在现在就去更新服务信息的方法updateServiceNow中,最终会调用的processServiceJson方法,方法太长,直接说逻辑:

public ServiceInfo processServiceJson(String json) {....   
}

1、如果是以前就存在于本地serviceInfoMap中的数据,就分别计算出其中新增,修改,删除的实例,如果是修改的实例,需要更新心跳信息。

2、更新本地serviceInfoMap

3、使用EventDispatcher发布通知事件。

4、ServiceInfo写入本地磁盘。

在定时更新服务信息的方法scheduleUpdateIfAbsent中:

public void scheduleUpdateIfAbsent(String serviceName, String clusters) {if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {return;}synchronized (futureMap) {if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {return;}ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);}
}

如果futureMap中不存在这个Service的任务,才会使用addTask进行添加。addTask中就是将UpdateTask延迟1s执行一次。具体后续调用是在UpdateTask中实现的。

@Override
public void run() {long delayTime = DEFAULT_DELAY;try {ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));if (serviceObj == null) {updateService(serviceName, clusters);return;}if (serviceObj.getLastRefTime() <= lastRefTime) {updateService(serviceName, clusters);serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));} else {// if serviceName already updated by push, we should not override it// since the push data may be different from pull through force pushrefreshOnly(serviceName, clusters);}lastRefTime = serviceObj.getLastRefTime();if (!eventDispatcher.isSubscribed(serviceName, clusters) && !futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {// abort the update taskNAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);return;}if (CollectionUtils.isEmpty(serviceObj.getHosts())) {incFailCount();return;}delayTime = serviceObj.getCacheMillis();resetFailCount();} catch (Throwable e) {incFailCount();NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);} finally {executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);}
}

上述代码简化一下,就是以下几个点:

1、如果本地serviceInfoMap中没有这个Service,就去服务端查询并更新。

2、如果通过这样的拉模式下最后修改的时间是大于这个Service本身的修改时间的,才进行更新。

3、下一次执行的时间是根据失败次数来定的,比如第一次失败,那就是delayTime左移一位,失败几次就左移几次,最多左移6次,且最大延迟60s执行。

getServiceInfoDirectlyFromServer

public ServiceInfo getServiceInfoDirectlyFromServer(final String serviceName, final String clusters)throws NacosException {String result = serverProxy.queryList(serviceName, clusters, 0, false);if (StringUtils.isNotEmpty(result)) {return JacksonUtils.toObj(result, ServiceInfo.class);}return null;
}

这个方法就是直接请求服务端拿到ServiceInfo的数据。

思考

看到这里,我们会不会有个疑问?那就是HostReactor是通过定时执行去更新服务信息的,那如果在时间间隔内有其他Servcie信息的更新呢?那我们岂不是得等到下一次任务执行时才能得到更新后的信息?

Nacos是考虑到了的,通过用定时任务通过HTTP去拉数据,和接收服务端通过UDP推送的数据,一拉一推来保证数据的实时性。

HostReactor中的PushReceiver就是客户端侧对服务端侧推数据的处理器。

PushReceiver

HostReactor在实例化时,其构造方法中也会实例化一个PushReceiver,其内部是一个单线程的定时线程池,死循环,用来接收来自服务端的信息,以及向服务端发送ACK确认信息。

image-20231126122032248

核心代码如下:

@Override
public void run() {while (!closed) {try {// byte[] is initialized with 0 full filled by defaultbyte[] buffer = new byte[UDP_MSS];DatagramPacket packet = new DatagramPacket(buffer, buffer.length);udpSocket.receive(packet);String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);String ack;if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {hostReactor.processServiceJson(pushPacket.data);// send ack to serverack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"+ "\"\"}";} else if ("dump".equals(pushPacket.type)) {// dump data to serverack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"+ "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap()))+ "\"}";} else {// do nothing send ack onlyack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime+ "\", \"data\":" + "\"\"}";}udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,packet.getSocketAddress()));} catch (Exception e) {NAMING_LOGGER.error("[NA] error while receiving push data", e);}}
}

上述代码主要有以下几个逻辑:

1、死循环接收udp的数据。

2、如果是"dom"或者"service"类型的消息,会交由HostReactor进行处理。

3、向服务端发送ack确认信息。

总结

Nacos是通过定时任务使用HTTP拉数据,和接收服务端通过UDP推送的数据来实现更新服务信息的目的。

今天的内容中还涉及到了Nacos的容灾处理,可以通过在磁盘中配置达到开启本地容灾的模式。在获取实例时,就会去本地磁盘中的备份文件中去找服务实例的数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/173652.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【古诗生成AI实战】之四——模型包装器与模型的训练

在上一篇博客中&#xff0c;我们已经利用任务加载器task成功地从数据集文件中加载了文本数据&#xff0c;并通过预处理器processor构建了词典和编码器。在这一过程中&#xff0c;我们还完成了词向量的提取。 接下来的步骤涉及到定义模型、加载数据&#xff0c;并开始训练过程。…

如何快速检测硬盘健康程度?

当我们使用Windows11/10/8/7计算机时&#xff0c;可能会遇到各种各样的问题&#xff0c;比如蓝屏报错、系统崩溃或其他运行不正常的状况。很多时候都是因为硬盘错误或故障导致的。那么&#xff0c;我们该如何快速检测硬盘健康程度呢&#xff1f; 在驱动器属性中执行硬盘查错 硬…

【Cisco Packet Tracer】电子邮箱仿真搭建

本文使用Cisco Packet Tracer&#xff0c;搭建电子邮箱仿真系统&#xff0c;使得zhangsancisco.com可以和lisicisco.com可以互相发送邮件。 电子邮箱账号&#xff08;为了简单起见&#xff0c;账号密码设置一致&#xff09;&#xff1a;zhangsan/lisi 域名&#xff1a;cisco.…

office tool plus工具破解word、visio等软件步骤

第一步&#xff1a;下载工具 破解需要用到office tool plus软件 office tool plus软件下载地址&#xff1a;Office Tool Plus 官方网站 - 一键部署 Office 选择其中一个下载到本地&#xff08;本人选择的是第一个的云图小镇下载方式&#xff09; 第二步&#xff1a;启动工具 …

回归预测 | MATLAB实现SMA+WOA+BOA-LSSVM基于黏菌算法+鲸鱼算法+蝴蝶算法优化LSSVM回归预测

回归预测 | MATLAB实现SMAWOABOA-LSSVM基于黏菌算法鲸鱼算法蝴蝶算法优化LSSVM回归预测 目录 回归预测 | MATLAB实现SMAWOABOA-LSSVM基于黏菌算法鲸鱼算法蝴蝶算法优化LSSVM回归预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 MATLAB实现SMAWOABOA-LSSVM基于黏菌算法…

【腾讯云云上实验室】用向量数据库——实现高效文本检索功能

文章目录 前言Tencent Cloud VectorDB 简介Tencent Cloud VectorDB 使用实战申请腾讯云向量数据库腾讯云向量数据库使用步骤腾讯云向量数据库实现文本检索 结论和建议 前言 想必各位开发者一定使用过关系型数据库MySQL去存储我们的项目的数据&#xff0c;也有部分人使用过非关…

CloudCompare 源码编译

一、下载源码 二、cmake 编译 这里面有四个比较重要的地方 1、源码的位置 2、生成的位置 3、项目的位置 4、qt 的位置 三、编译 开始测试&#xff0c;先用那个项目做测试 没有问题 然后用build的那个打开 加入Qt 的相关库到qcc中 启动项目生成cloudcompare 启动 ok ,完成…

本地Nginx服务搭建结合内网穿透实现多个Windows Web站点公网访问

文章目录 1. 下载windows版Nginx2. 配置Nginx3. 测试局域网访问4. cpolar内网穿透5. 测试公网访问6. 配置固定二级子域名7. 测试访问公网固定二级子域名 1. 下载windows版Nginx 进入官方网站(http://nginx.org/en/download.html)下载windows版的nginx 下载好后解压进入nginx目…

动态规划学习——等差子序列问题

目录 一&#xff0c;最长等差子序列 1.题目 2.题目接口 3.解题思路及其代码 二&#xff0c;等差序列的划分——子序列 1.题目 2.题目接口 3.解题思路及其代码 一&#xff0c;最长等差子序列 1.题目 给你一个整数数组 nums&#xff0c;返回 nums 中最长等差子序列的长度…

NX二次开发UF_CURVE_create_arc_center_radius 函数介绍

文章作者&#xff1a;里海 来源网站&#xff1a;https://blog.csdn.net/WangPaiFeiXingYuan UF_CURVE_create_arc_center_radius Defined in: uf_curve.h int UF_CURVE_create_arc_center_radius(tag_t center, double radius, tag_t help_point, UF_CURVE_limit_p_t limit_p…

SparkDesk知识库 + ChuanhuChatGPT前端 = 实现轻量化知识库问答

上一篇 讯飞星火知识库文档问答Web API的使用&#xff08;二&#xff09; 把星火知识库搞明白了&#xff1b; 然后又花了时间学习了一下gradio的一些基础内容: 在Gradio实现两个下拉框进行联动案例解读&#xff1a;change/click/input实践&#xff08;三&#xff09; 在Gradio实…

数据结构与算法编程题27

计算二叉树深度 #define _CRT_SECURE_NO_WARNINGS#include <iostream> using namespace std;typedef char ElemType; #define ERROR 0 #define OK 1 #define Maxsize 100 #define STR_SIZE 1024typedef struct BiTNode {ElemType data;BiTNode* lchild, * rchild; }BiTNo…

2023中学生古诗文阅读专辑(初中适用)使用和备考的几点建议

上周六的2023年第八届小学生古诗文大会复选结束后&#xff0c;很多孩子和家长大呼“太难了”&#xff0c;平时刷的题好像都没用&#xff0c;蓦然回首&#xff0c;发现很多题目都在主办方出版的《古诗文阅读专辑》上&#xff0c;只是考得非常的细。 所以&#xff0c;昨天有家长在…

计算机毕业设计|基于SpringBoot+MyBatis框架的电脑商城的设计与实现(系统概述与环境搭建)

计算机毕业设计|基于SpringBootMyBatis框架的电脑商城的设计与实现&#xff08;系统概述与环境搭建&#xff09; 该项目分析着重于设计和实现基于SpringBootMyBatis框架的电脑商城。首先&#xff0c;通过深入分析项目所需数据&#xff0c;包括用户、商品、商品类别、收藏、订单…

基于C#实现十字链表

上一篇我们看了矩阵的顺序存储&#xff0c;这篇我们再看看一种链式存储方法“十字链表”&#xff0c;当然目的都是一样&#xff0c;压缩空间。 一、概念 既然要用链表节点来模拟矩阵中的非零元素&#xff0c;肯定需要如下 5 个元素(row,col,val,down,right)&#xff0c;其中&…

初识数据结构

归纳编程学习的感悟&#xff0c; 记录奋斗路上的点滴&#xff0c; 希望能帮到一样刻苦的你&#xff01; 如有不足欢迎指正&#xff01; 共同学习交流&#xff01; &#x1f30e;欢迎各位→点赞 &#x1f44d; 收藏⭐ 留言​&#x1f4dd; 熬过了我们不想要的生活&#xf…

麒麟V10服务器搭建FTP服务

概念 1.1介绍 FTP&#xff1a;File transfer protocol 文件传输协议 1.2原理 默认采用被动模式 被动模式FTP 为了解决服务器发起到客户的连接的问题&#xff0c;人们开发了一种不同的FTP连接方式。这就是所谓的被 动方式&#xff0c;或者叫做PASV&#xff0c;当客户端通…

Vue路由器(详细教程)

路由&#xff1a; 1.理解&#xff1a;一个路由(route)就是一组映射关系&#xff08;key-value)&#xff0c;多个路由需要路由器&#xff08;router&#xff09;进行管理。 2.前端路由&#xff1a;key是路径&#xff0c;value是组件。 1、先安装vue-router路由 npm i vue-route…

车载通信架构 —— 传统车内通信网络LIN总线(低成本覆盖低速场景)

车载通信架构 —— 传统车内通信网络LIN总线(低成本覆盖低速场景) 我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 屏蔽力是信息过载时代一个人的特殊竞争力,任何消耗你的人和事,多看一眼都是…

redisserver一闪而过 redis闪退解决版本

1.进入Redis根目录 2.输入redis-server 或 redis-server.exe redis.windows.conf 启动redis命令&#xff0c;看是否成功。 执 一闪而过的问题 可能是因为已启动或者其他问题&#xff0c;需要重启 先输入redis-cli.exe再输入shutdown再输入redis-server.exe redis.windows.c…