SpringCloud Alibaba 深入源码 - Nacos 和 Eureka 的区别(健康检测、服务的拉取和订阅)

目录

一、Nacos 和 Eureka 的区别

1.1、以 Nacos 注册流程来解析区别


一、Nacos 和 Eureka 的区别


1.1、以 Nacos 注册流程来解析区别

a)首先,我们的服务启动时。都会把自己的信息提交给注册中心,然后注册中心就会把信息保存下来.

注册的信息实际上就是一个嵌套 Map,结构为 Map<String, Map<String, Service>>,第一层 key 就是 namespace_id,起到环境隔离的作用. value 由是一个嵌套 Map<String, Service>.

第二层的 key 表示 group 分组,key 就是分组名,value 就是分组下的某一个服务,实际上就是一个类,内部又维护了一个  Map<String,Cluster> .

第三层的 key 就是集群的名称,value 就是  Cluster ,也是一个类,包含了集群的具体信息.  

因为一个集群中可能包含多个实例,也就是具体的节点信息(例如实例的IP、Port、健康状态),那么 Cluster 这个类中又维护了 两个 Set<Instance>,分别是临时实例和非临时实例(此处,Eureka 就没有做区分,只有临时实例).

b)那么当服务消费者要去消费时,就可以从注册中心拉取服务信息.  这个过程也被称为“服务发现”.  但是他这个拉去动作不是每次都要做的(压力太大),而是将拉取到的服务信息缓存到一个列表中,这样接下来的一段时间里,就不用去拉去了,而是直接从缓存列表中拿. 

当然这个缓存一直不更新也不行,因此会每隔 30 秒取重新拉取一次(多长时间不用记,都是可以配置的),进行更新.

c)消费者拿到服务列表后,就可以通过 负载均衡(LoadBalancer)从列表中挑选一个发起远程调用就可以了. 

d)截至目前为止,Nacos 和 Eureka 还没什么太大的差别,那差别在哪呢?差别就在于服务提供者的健康检测机制.

e)在 nacos 中,将服务分成了临时实例和非临时实例:

  • 临时实例:当临时实例进行心跳检测的时候,如果心跳不跳了,nacos 就会把它从服务中直接剔除.  (这里的心跳检测机制和 Eureka 是一样的,但非临时实例就不一样了)
  • 非临时实例:nacos 就不会要求你给我发心跳了,而是通过 nacos 主动发请求询问,定时向实例发送请求:“你还活着吗?”,即使真的挂了,nacos 也仅仅只是把它标记为 不健康,不会剔除,而是等待它恢复健康.

而 Eureka 只提供了心跳模式的健康监测,而没有主动检测功能。

主动询问进行健康检测,效率岂不是很低?

对于非临时实例,所有的健康检测任务都不是立即执行的,都会被放入一个阻塞队列中,如下源码

@Override
public void process(HealthCheckTask task) {// 获取所有 非临时实例的 集合List<Instance> ips = task.getCluster().allIPs(false);if (CollectionUtils.isEmpty(ips)) {return;}for (Instance ip : ips) {// 封装健康检测信息到 BeatBeat beat = new Beat(ip, task);// 放入一个阻塞队列中taskQueue.add(beat);MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet();}
}

可以看出,检测任务不是立即执行,这里也采用了异步指定的策略,会把任务放到线程池中取执行,如下:

public void run() {while (true) {try {// 处理任务processTask();// ...} catch (Throwable e) {SRV_LOG.error("[HEALTH-CHECK] error while processing NIO task", e);}}
}

 通过 processTask 来处理健康检测的任务:

private void processTask() throws Exception {// 将任务封装为一个 TaskProcessor,并放入集合Collection<Callable<Void>> tasks = new LinkedList<>();do {Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS);if (beat == null) {return;}tasks.add(new TaskProcessor(beat));} while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64);// 批量处理集合中的任务for (Future<?> f : GlobalExecutor.invokeAllTcpSuperSenseTask(tasks)) {f.get();}
}

 任务被封装到了TaskProcessor中去执行了,TaskProcessor是一个Callable,其中的call方法:

@Override
public Void call() {// 获取检测任务已经等待的时长long waited = System.currentTimeMillis() - beat.getStartTime();if (waited > MAX_WAIT_TIME_MILLISECONDS) {Loggers.SRV_LOG.warn("beat task waited too long: " + waited + "ms");}SocketChannel channel = null;try {// 获取实例信息Instance instance = beat.getIp();// 通过NIO建立TCP连接channel = SocketChannel.open();channel.configureBlocking(false);// only by setting this can we make the socket close event asynchronouschannel.socket().setSoLinger(false, -1);channel.socket().setReuseAddress(true);channel.socket().setKeepAlive(true);channel.socket().setTcpNoDelay(true);Cluster cluster = beat.getTask().getCluster();int port = cluster.isUseIPPort4Check() ? instance.getPort() : cluster.getDefCkport();channel.connect(new InetSocketAddress(instance.getIp(), port));// 注册连接、读取事件SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);key.attach(beat);keyMap.put(beat.toString(), new BeatKey(key));beat.setStartTime(System.currentTimeMillis());GlobalExecutor.scheduleTcpSuperSenseTask(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);} catch (Exception e) {beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(),"tcp:error:" + e.getMessage());if (channel != null) {try {channel.close();} catch (Exception ignore) {}}}return null;
}

这差别就像是亲生儿子和非亲生儿子,亲生儿子我还会去主动关怀一下,诶,你还活着么?而非临时实例,就是你不心跳了,就把你扔了~

f)还有一个差别在于服务消费者,Eureka 采用的是定时拉取(每 30 秒一次),那如果在 30 秒内服务提供者挂了,消费肯定是不知道的,因此 Eureka 这里更新的时效性也比较差.

我们的 微服务 定时拉取的基本逻辑就是先从本地缓存读:

  • 如果本地缓存没有,就通过 Nacos 客户端构造请求去 nacos 服务器中读取.
  • 如果本地缓存有,就开启定时更新功能(就是创建一个定时任务,每隔一段时间去拉取一次),并返回缓存结果.

核心源码如下:

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());// 由 服务名@@集群名拼接 keyString key = ServiceInfo.getKey(serviceName, clusters);if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);}// 读取本地服务列表的缓存,缓存是一个Map,格式:Map<String, ServiceInfo>ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);// 判断缓存是否存在if (null == serviceObj) {// 不存在,创建空ServiceInfoserviceObj = new ServiceInfo(serviceName, clusters);// 放入缓存serviceInfoMap.put(serviceObj.getKey(), serviceObj);// 放入待更新的服务列表(updatingMap)中updatingMap.put(serviceName, new Object());// 立即更新服务列表updateServiceNow(serviceName, clusters);// 从待更新列表中移除updatingMap.remove(serviceName);} else if (updatingMap.containsKey(serviceName)) {// 缓存中有,但是需要更新if (UPDATE_HOLD_INTERVAL > 0) {// hold a moment waiting for update finish 等待5秒中,待更新完成synchronized (serviceObj) {try {serviceObj.wait(UPDATE_HOLD_INTERVAL);} catch (InterruptedException e) {NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);}}}}// 开启定时更新服务列表的功能scheduleUpdateIfAbsent(serviceName, clusters);// 返回缓存中的服务信息return serviceInfoMap.get(serviceObj.getKey());
}

定时更新方法如下:

public void updateService(String serviceName, String clusters) throws NacosException {ServiceInfo oldService = getServiceInfo0(serviceName, clusters);try {// 基于ServerProxy发起远程调用,查询服务列表String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);if (StringUtils.isNotEmpty(result)) {// 处理查询结果processServiceJson(result);}} finally {if (oldService != null) {synchronized (oldService) {oldService.notifyAll();}}}
}public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)throws NacosException {// 准备请求参数final Map<String, String> params = new HashMap<String, String>(8);params.put(CommonParams.NAMESPACE_ID, namespaceId);params.put(CommonParams.SERVICE_NAME, serviceName);params.put("clusters", clusters);params.put("udpPort", String.valueOf(udpPort));params.put("clientIP", NetUtils.localIP());params.put("healthyOnly", String.valueOf(healthyOnly));// 发起请求,地址与API接口一致return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
}

 而 nacos 这里的消费者不光进行服务的定时拉取,nacos 还会主动进行消息的订阅推送,一旦发现有服务挂了,就立刻推送一条消息给服务消费者,告诉你服务要更新了.

Nacos 具体是通过什么实现消息订阅推送机制呢?

a)首先 PushPeceiver 这个类(我们自己的微服务配置的 Nacos 客户端),会以 UDP 的方式与 Nacos 服务端建立连接,监听 Nacos 服务端推送的服务变更数据.

b)一旦 Nacos 服务列表发生变更,就会发送 UDP 广播给所有的微服务订阅者.

c)当订阅者接收到通知以后,就可以将接收到的服务信息缓存到本地缓存列表.

d)那么之后再拉取服务的时候,会优先从缓存里读取,缓存里有就直接返回缓存,如果没有,再去拉取或者订阅.

PushPeceiver 构造函数如下:

public PushReceiver(HostReactor hostReactor) {try {this.hostReactor = hostReactor;// 创建 UDP客户端String udpPort = getPushReceiverUdpPort();if (StringUtils.isEmpty(udpPort)) {this.udpSocket = new DatagramSocket();} else {this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));}// 准备线程池this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setDaemon(true);thread.setName("com.alibaba.nacos.naming.push.receiver");return thread;}});// 开启线程任务,准备接收变更数据this.executorService.execute(this);} catch (Exception e) {NAMING_LOGGER.error("[NA] init udp socket failed", e);}
}

PushReceiver 构造函数中基于线程池来运行任务。这是因为 PushReceiver 本身也是一个Runnable,其中的run方法业务逻辑就是:

@Override
public void run() {while (!closed) {try {// byte[] is initialized with 0 full filled by defaultbyte[] buffer = new byte[UDP_MSS];DatagramPacket packet = new DatagramPacket(buffer, buffer.length);// 接收推送数据udpSocket.receive(packet);// 解析为json字符串String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());// 反序列化为对象PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);String ack;if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {// 交给 HostReactor去处理hostReactor.processServiceJson(pushPacket.data);// send ack to server 发送ACK回执,略。。} catch (Exception e) {if (closed) {return;}NAMING_LOGGER.error("[NA] error while receiving push data", e);}}
}

通知数据的处理由交给了 HostReactor 的 processServiceJson 方法:

public ServiceInfo processServiceJson(String json) {// 解析出ServiceInfo信息ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);String serviceKey = serviceInfo.getKey();if (serviceKey == null) {return null;}// 查询缓存中的 ServiceInfoServiceInfo oldService = serviceInfoMap.get(serviceKey);// 如果缓存存在,则需要校验哪些数据要更新boolean changed = false;if (oldService != null) {// 拉取的数据是否已经过期if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "+ serviceInfo.getLastRefTime());}// 放入缓存serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);// 中间是缓存与新数据的对比,得到newHosts:新增的实例;remvHosts:待移除的实例;// modHosts:需要修改的实例if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {// 发布实例变更的事件NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),serviceInfo.getClusters(), serviceInfo.getHosts()));DiskCache.write(serviceInfo, cacheDir);}} else {// 本地缓存不存在changed = true;// 放入缓存serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);// 直接发布实例变更的事件NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),serviceInfo.getClusters(), serviceInfo.getHosts()));serviceInfo.setJsonFromServer(json);DiskCache.write(serviceInfo, cacheDir);}// 。。。return serviceInfo;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/639116.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ELK日志分析

目录 一、ELK概述 &#xff08;一&#xff09;ELK的定义 &#xff08;二&#xff09;ELK工具 1.ElasticSearch 2.Kiabana 3.Logstash &#xff08;1&#xff09;定义 &#xff08;2&#xff09;插件 ① input ② filter ③ output &#xff08;三&#xff09;可以添…

快速排序(三)——hoare法

目录 ​一.前言 二.快速排序 hoare排法​ 三.结语 一.前言 本文给大家带来的是快速排序&#xff0c;快速排序是一种很强大的排序方法&#xff0c;相信大家在学习完后一定会有所收获。 码字不易&#xff0c;希望大家多多支持我呀&#xff01;&#xff08;三连&#xff0b;关…

Spring Boot3整合Druid(监控功能)

目录 1.前置条件 2.导依赖 错误依赖&#xff1a; 正确依赖&#xff1a; 3.配置 1.前置条件 已经初始化好一个spring boot项目且版本为3X&#xff0c;项目可正常启动。 作者版本为3.2.2最新版 2.导依赖 错误依赖&#xff1a; 这个依赖对于spring boot 3的支持不够&#…

H5嵌入小程序适配方案

时间过去了两个多月&#xff0c;2024已经到来&#xff0c;又老了一岁。头发也掉了好多。在这两个月时间里都忙着写页面&#xff0c;感觉时间过去得很快。没有以前那么轻松了。也不是遇到了什么难点技术&#xff0c;而是接手了一个很烂得项目。能有多烂&#xff0c;一个页面发起…

开源无代码应用程序生成器Saltcorn

什么是 Saltcorn &#xff1f; Saltcorn 是一个无需编写任何代码即可构建数据库 Web 应用程序的平台。它配备了一个吸睛的仪表板&#xff0c;丰富的生态系统、视图生成器以及支持主题的界面&#xff0c;使用直观的点击、拖放用户界面来构建整个应用程序。 软件的特点&#xff1…

智慧文旅运营综合平台:重塑文化旅游产业的新引擎

目录 一、建设意义 二、包含内容 三、功能架构 四、典型案例 五、智慧文旅全套解决方案 - 210份下载 在数字化浪潮席卷全球的今天&#xff0c;智慧文旅运营综合平台作为文化旅游产业与信息技术深度融合的产物&#xff0c;正逐渐显现出其强大的生命力和广阔的发展前景。 该…

海外抖音TikTok、正在内测 AI 生成歌曲功能,依靠大语言模型 Bloom 进行文本生成歌曲

近日&#xff0c;据外媒The Verge报道&#xff0c;TikTok正在测试一项新功能&#xff0c;利用大语言模型Bloom的AI能力&#xff0c;允许用户上传歌词文本&#xff0c;并使用AI为其添加声音。这一创新旨在为用户提供更多创作音乐的工具和选项。 Bloom 是由AI初创公司Hugging Fac…

C语言——内存函数介绍和模拟实现(memcpy、memmove、memset、memcmp)

之前我们讲过一些字符串函数&#xff08;http://t.csdnimg.cn/ZcvCo&#xff09;&#xff0c;今天我们来讲一讲几个内存函数&#xff0c;那么可能有人要问了&#xff0c;都有字符串函数了&#xff0c;怎么又来个内存函数&#xff0c;这不是一样的么&#xff1f; 我们要知道之前…

第十二站(20天):C++泛型编程

模板 C提供了模板(template)编程的概念。所谓模板&#xff0c;实际上是建立一个通用函数或类&#xff0c; 其 类内部的类型和函数的形参类型不具体指定 &#xff0c;用一个虚拟的类型来代表。这种通用的方式称 为模板。 模板是泛型编程的基础, 泛型编程即以一种独立于任何特定…

C++面试:跳表

目录 跳表介绍 跳表的特点&#xff1a; 跳表的应用场景&#xff1a; C 代码示例&#xff1a; 跳表的特性 跳表示例 总结 跳表&#xff08;Skip List&#xff09;是一种支持快速搜索、插入和删除的数据结构&#xff0c;具有相对简单的实现和较高的查询性能。下面是跳表…

职业规划,软件开发工程师的岗位任职资格

软件工程师是指从事软件开发的人&#xff0c;主要的工作涉及到项目培训和项目设计两个方面。在实际工作中&#xff0c;软件工程师是一个广义的概念&#xff0c;包括了很多与软件相关的人员。除开最基础的编程语言&#xff0c;还有数据库语言等等。从事这份工作&#xff0c;需要…

记录一下uniapp 集成腾讯im特别卡(已解决)

uniapp的项目运行在微信小程序 , 安卓 , ios手机三端 , 之前这个项目集成过im,不过版本太老了,0.x的版本, 现在需要添加客服功能,所以就升级了 由于是二开 , 也为了方便 , 沿用之前的webview嵌套腾讯IM的方案 , 选用uniapp集成ui ,升级之后所有安卓用户反馈点击进去特别卡,几…

HR人才测评,如何做技术研发人员基本素质测评?

技术研发人员的基本素质测评&#xff0c;可以从以下几个方面考虑&#xff1a; 1. 技术能力&#xff1a;首要的因素是技术能力&#xff0c;包括编程能力、算法能力、架构设计能力、代码调试和优化能力等。在测评中可以通过技术测试、编程练习、项目经验等方式来考察。 2. 学习…

Java - 深入理解加密解密和签名算法

文章目录 应用的接口安全性问题可能来源加密解密Why保护数据隐私防止未经授权的访问防止数据泄露 对称加密 VS 单向加密 VS 非对称加密一、对称加密二、单向加密&#xff08;哈希加密&#xff09;三、非对称加密 常用的对称加密算法1. AES&#xff08;高级加密标准&#xff09;…

Django从入门到精通(二)

目录 三、视图 3.1、文件or文件夹 3.2、相对和绝对导入urls 3.3、视图参数requests 3.4、返回值 3.5、响应头 3.6、FBV和CBV FBV 四、静态资源 4.1、静态文件 4.2、媒体文件 五、模板 5.1、寻找html模板 5.2、模板处理的本质 5.3、常见模板语法 5.4、内置模板函…

对称密码算法有什么优点

对称密码算法是一种加密和解密数据的方法&#xff0c;其中加密和解密使用相同的密钥。这种方法的一个关键特点是加密和解密的速度非常快&#xff0c;因此它在许多需要高速加密的应用中非常有用。 对称密码算法的优点主要在于其效率和安全性。由于加密和解密使用相同的密钥&…

自定义注解与拦截器实现不规范sql拦截(拦截器实现篇)

最近考虑myBatis中sql语句使用规范的问题&#xff0c;如果漏下条件或者写一些不规范语句会对程序性能造成很大影响。最好的方法就是利用代码进行限制&#xff0c;通过拦截器进行sql格式的判断在自测环节就能找到问题。写了个简单情景下的demo&#xff0c;并通过idea插件来将myB…

UE5 Windows打包时报错“SDK Not Found”解决方案

在Unreal Engine 5.0.3 Windows平台下打包时报错&#xff1a;“Windows的SDK未正常安装&#xff0c;而其是生成数据的必需项。请检查主工具栏中“启动“菜单SDK部分来更新SDK。” 解决方案&#xff1a; 1、打开 Visual Studio Installer&#xff0c;点击“修改”按钮&#xf…

EtherNet/IP开发:C++搭建基础模块,EtherNet/IP源代码

这里是CIP资料的协议层级图&#xff0c;讲解协议构造。 ODVA&#xff08;www.ODVA.org&#xff09;成立于1995年&#xff0c;是一个全球性协会&#xff0c;其成员包括世界领先的自动化公司。结合其成员的支持&#xff0c;ODVA的使命是在工业自动化中推进开放、可互操作的信息和…

到店商详架构变迁

一、项目背景 到店商详是平台为京东到店业务提供的专属商详页面&#xff0c;将传统电商购物路径打造成以LBS门店属性的本地生活服务交易链路。 二、架构变迁 1、 主站商详扩展点 **优点&#xff1a;**到店侧仅关注业务&#xff0c;无需过度关注服务部署、性能优化等。 **缺…