nacos上的注册过的服务实例掉线分析

最近生产上的xxl_job框架的一个执行器(nacos客户端)因为分配内存不大,导致频繁与nacos服务端的连接断开,而断开之后虽然客户端服务没有宕掉,但是就是无法重新注册到nacos的服务端上去。

问题定位:

查看服务器日志;发现日志中打印好多内存溢出的情况,回顾上个迭代同事新增了一个功能,就是定时同步底层数据。后面定位问题的时候发现这个定时任务要执行12个小时才会结束,跟踪代码的时候发现每次遍历的时候要初始化大量数据到jvm内存中,这就导致了内存资源紧张,后台日志不断报内存溢出和GC回收异常的问题。

解决方案:

(1)针对有问题的定时业务逻辑重新进行编码优化实现。
(2)扩大jvm分配给程序的启动内存。-Xms2g -Xmx2g统一改成 -Xms6g -Xmx6g.
通过这2个方案整改后,1周内生产再也没有出过类似的问题。

问题解决了,但是我想深究一下为什么nacos掉线后,就注册不上去了呢

然后我想出现这个问题的原因是自己的服务在jvm的分配的内存使用完毕后,在后台运行的向nacos服务端保持心跳的线程阻塞或者被杀死了,导致后面nacos服务器接收不到来自客户端的心跳,从而我的服务后面没有再次被nacos服务端发现。

基于以上情况,我试着从nacos客户端注册与心跳检测方面跟一下源码。

首先最重要的是要找到保持服务于nacos服务端保持心跳的源码,看看这个后台运行的保持心跳的线程的运行机制。

我猜想这个保持心跳的线程应该会在服务首次启动注册的时候激活。

所以先找到注册的接口,去到nacos官网,找到api接口界面的注册接口:https://nacos.io/zh-cn/docs/open-api.html

找到注册接口请求路径为/nacos/v1/ns/instance

然后去gitlab下载nacos的源码,我直接下载的最新的,然后根据/nacos/v1/ns/instance 找到如下方法:
可以看到服务注册成功后,还执行了个方法addBeatInfo

@Overridepublic void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {if (instance.isEphemeral()) {BeatInfo beatInfo = new BeatInfo();beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));beatInfo.setIp(instance.getIp());beatInfo.setPort(instance.getPort());beatInfo.setCluster(instance.getClusterName());beatInfo.setWeight(instance.getWeight());beatInfo.setMetadata(instance.getMetadata());beatInfo.setScheduled(false);beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);}

看一下这个方法是做什么的,可以看到BeatReactor是个线程池类

public BeatReactor(NamingProxy serverProxy, int threadCount) {this.serverProxy = serverProxy;executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setDaemon(true);thread.setName("com.alibaba.nacos.naming.beat.sender");return thread;}});}public void **addBeatInfo**(String serviceName, BeatInfo beatInfo) {NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());BeatInfo existBeat = null;//fix #1733if ((existBeat = dom2Beat.remove(key)) != null) {existBeat.setStopped(true);}dom2Beat.put(key, beatInfo);//这个方法就是启动的后台保持心跳的线程executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());}

看看
schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS)
这个方法的定义

/*** Creates and executes a one-shot action that becomes enabled* after the given delay.** @param command the task to execute* @param delay the time from now to delay execution* @param unit the time unit of the delay parameter* @return a ScheduledFuture representing pending completion of*         the task and whose {@code get()} method will return*         {@code null} upon completion* @throws RejectedExecutionException if the task cannot be*         scheduled for execution* @throws NullPointerException if command is null*/public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);

然后重点部分就是看BeatTask,BeatTask继承Runnable,run方法就是我们的重点,该方法调用了NamingProxy的sendBeat方法,服务端请求地址为/instance/beat的方法

class BeatTask implements Runnable {BeatInfo beatInfo;public BeatTask(BeatInfo beatInfo) {this.beatInfo = beatInfo;}@Overridepublic void run() {if (beatInfo.isStopped()) {return;}long nextTime = beatInfo.getPeriod();try {JSONObject result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);long interval = result.getIntValue("clientBeatInterval");boolean lightBeatEnabled = false;if (result.containsKey(CommonParams.LIGHT_BEAT_ENABLED)) {lightBeatEnabled = result.getBooleanValue(CommonParams.LIGHT_BEAT_ENABLED);}BeatReactor.this.lightBeatEnabled = lightBeatEnabled;if (interval > 0) {nextTime = interval;}int code = NamingResponseCode.OK;if (result.containsKey(CommonParams.CODE)) {code = result.getIntValue(CommonParams.CODE);}if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {Instance instance = new Instance();instance.setPort(beatInfo.getPort());instance.setIp(beatInfo.getIp());instance.setWeight(beatInfo.getWeight());instance.setMetadata(beatInfo.getMetadata());instance.setClusterName(beatInfo.getCluster());instance.setServiceName(beatInfo.getServiceName());instance.setInstanceId(instance.getInstanceId());instance.setEphemeral(true);try {serverProxy.registerService(beatInfo.getServiceName(),NamingUtils.getGroupName(beatInfo.getServiceName()), instance);} catch (Exception ignore) {}}} catch (NacosException ne) {NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",JSON.toJSONString(beatInfo), ne.getErrCode(), ne.getErrMsg());}executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);}}

找一下这个线程是每隔几秒运行一次的,是5秒

接下来我们把目光放到服务端,找到InstanceController的beat方法,如果是参数beat信息的话,说明是第一次发起心跳,则会带有服务实例信息,因为发起心跳成功则服务端会返回下次不要带beat信息的参数,这样客户端第二次就不会携带beat信息了。如果发现没有该服务,又没带beat信息,说明这个服务可能被移除过了,直接返回没找到。如果没有服务,但是发现有beat信息,那就从beat中获取服务实例信息,进行注册.

@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {ObjectNode result = JacksonUtils.createEmptyJsonNode();//设置心跳间隔result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);RsInfo clientBeat = null;//判断有无心跳内容//如果存在心跳内容则不是轻量级心跳就转化为RsInfoif (StringUtils.isNotBlank(beat)) {clientBeat = JacksonUtils.toObj(beat, RsInfo.class);}String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));if (clientBeat != null) {if (StringUtils.isNotBlank(clientBeat.getCluster())) {clusterName = clientBeat.getCluster();} else {// fix #2533clientBeat.setCluster(clusterName);}ip = clientBeat.getIp();port = clientBeat.getPort();}String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);NamingUtils.checkServiceNameFormat(serviceName);Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);//获取实例的信息Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);//如果实例不存在if (instance == null) {if (clientBeat == null) {result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);return result;}Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "+ "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);//根据您心跳内容创建一个实例信息instance = new Instance();instance.setPort(clientBeat.getPort());instance.setIp(clientBeat.getIp());instance.setWeight(clientBeat.getWeight());instance.setMetadata(clientBeat.getMetadata());instance.setClusterName(clusterName);instance.setServiceName(serviceName);instance.setInstanceId(instance.getInstanceId());instance.setEphemeral(clientBeat.isEphemeral());//注册实例serviceManager.registerInstance(namespaceId, serviceName, instance);}//获取服务的信息Service service = serviceManager.getService(namespaceId, serviceName);if (service == null) {throw new NacosException(NacosException.SERVER_ERROR,"service not found: " + serviceName + "@" + namespaceId);}//不存在的话,要创建一个进行处理if (clientBeat == null) {clientBeat = new RsInfo();clientBeat.setIp(ip);clientBeat.setPort(port);clientBeat.setCluster(clusterName);}//开启心跳检查任务service.processClientBeat(clientBeat);result.put(CommonParams.CODE, NamingResponseCode.OK);//5秒间隔if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());}//告诉客户端不需要带上心跳信息了,变成轻量级心跳了result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());return result;
}

接下来我们看一下nacos服务端 开启心跳检查任务processClientBeat方法,该方法将ClientBeatProcessor放入到线程池中,接下来我们看下重点看下run方法

public void processClientBeat(final RsInfo rsInfo) {ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();clientBeatProcessor.setService(this);clientBeatProcessor.setRsInfo(rsInfo);//放入线程池中执行 HealthCheckReactor.scheduleNow(clientBeatProcessor);}

更新每次客户端发送的最新的心跳时间,这个主要是是放到内存map中

@Overridepublic void run() {Service service = this.service;if (Loggers.EVT_LOG.isDebugEnabled()) {Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());}String ip = rsInfo.getIp();String clusterName = rsInfo.getCluster();int port = rsInfo.getPort();Cluster cluster = service.getClusterMap().get(clusterName);List<Instance> instances = cluster.allIPs(true);for (Instance instance : instances) {if (instance.getIp().equals(ip) && instance.getPort() == port) {if (Loggers.EVT_LOG.isDebugEnabled()) {Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());}//更新每次客户端发送的最新的心跳时间,这个主要是是放到内存map中         instance.setLastBeat(System.currentTimeMillis());if (!instance.isMarked()) {if (!instance.isHealthy()) {instance.setHealthy(true);Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);getPushService().serviceChanged(service);}}}}}

那么,为什么nacos服务端为什么要设置注册到nacos上的服务最新的更新时间呢?

这涉及到nacos的健康检查机制

Nacos Server会开启一个定时任务来检查注册服务的健康情况,对于超过15秒没收到客户端的心跳实例会将它的 healthy属性置为false,此时当客户端不会将该实例的信息发现,如果某个服务的实例超过30秒没收到心跳,则剔除该实例,如果剔除的实例恢复,发送心跳则会恢复。

当有实例注册的时候,我们会看到有个service.init()的方法,该方法的实现主要是将ClientBeatCheckTask加入到线程池当中:

private void putServiceAndInit(Service service) throws NacosException {putService(service);/启动服务检查service.init();consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJSON());}public void init() {HealthCheckReactor.scheduleCheck(clientBeatCheckTask);for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {entry.getValue().setService(this);entry.getValue().init();}}

ClientBeatCheckTask中的run方法主要做两件事心跳时间超过15秒则设置该实例信息为不健康状况和心跳时间超过30秒则删除该实例信息,如下代码:

@Overridepublic void run() {try {if (!getDistroMapper().responsible(service.getName())) {return;}if (!getSwitchDomain().isHealthCheckEnabled()) {return;}List<Instance> instances = service.allIPs(true);// first set health status of instances:for (Instance instance : instances) {//如果心跳时间超过15秒则设置该实例信息为不健康状况if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {if (!instance.isMarked()) {if (instance.isHealthy()) {instance.setHealthy(false);Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(),UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());getPushService().serviceChanged(service);SpringContext.getAppContext().publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));}}}}if (!getGlobalConfig().isExpireInstance()) {return;}// then remove obsolete instances:for (Instance instance : instances) {if (instance.isMarked()) {continue;}//如果心跳时间超过30秒则删除该实例信息if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {// delete instanceLoggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JSON.toJSONString(instance));deleteIP(instance);}}} catch (Exception e) {Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);}}

到此完成删除实例的过程,整体的时序图如下:

经过源码的研究,我发现问题的根本原因就是我的服务里面维持向nacos服务器端发送心跳的定时任务不再执行了,本质上讲 就是内存溢出的原因导致这个线程不再执行了

应该从jvm和垃圾回收层面找问题,或者说JVM内存溢出造成的tomcat假死

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/75686.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

怎样获取字符串数组的长度_使用sizeof(array) / sizeof(array[0])

使用sizeof() C、C中没有提供直接获取数组长度的函数&#xff0c;对于存放字符串的字符数组提供了一个strlen函数获取长度&#xff0c;那么对于其他类型的数组如何获取他们的长度呢&#xff1f; 其中一种方法是使用sizeof(array) / sizeof(array[0]), 在C语言中习惯上在使用时…

如何将两台Mac显示器设置为单个屏幕配置

​能够在扩展模式下将两个或多个外部显示器连接到Mac是一种解放的屏幕体验&#xff0c;但当每个显示器仍然像独立显示器一样工作时&#xff0c;会导致沮丧。 在这里&#xff0c;我们向你展示如何通过对系统设置进行一些简单的更改&#xff0c;使两个扩展屏幕看起来像一个屏幕。…

【MyBatis】四、MyBatis中的动态SQL标签

动态SQL 动态SQL语句是动态的拼接Mybatis中SQL语句的情况&#xff0c;可以动态的在Mybatis中使用SQL if语句 if语句的xml文件&#xff1a; <!-- List<Emp> getEmpByCondition(Emp emp);--><select id"getEmpByCondition" resultType"Emp&…

C++(20):自定义类型的format

C++(20):format格式化字符串_风静如云的博客-CSDN博客 介绍了如何使用format。 对于自定义类型可以定义适配其需求的format: #include <format> #include <string> #include <iostream> using namespace std; template<class T> class KeyValue { …

【代码随想录day24】不同的二叉搜索树

题目 给你一个整数 n &#xff0c;求恰由 n 个节点组成且节点值从 1 到 n 互不相同的 二叉搜索树 有多少种&#xff1f;返回满足题意的二叉搜索树的种数。 示例 1&#xff1a; 输入&#xff1a;n 3 输出&#xff1a;5示例 2&#xff1a; 输入&#xff1a;n 1 输出&#xf…

数据结构与算法基础-学习-32-选择排序之简单选择排序、堆排序

目录 一、简单选择排序基本思路 二、简单选择排序基本操作 三、简单选择排序算法思路 四、简单选择排序代码 1、SimpleSelectSortSentrySqQueue 五、简单选择排序算法分析 1、记录移动次数 2、记录比较次数 六、简单选择排序Linux环境编译测试 七、堆的定义 八、堆调…

【前端】for循环中 使用 v-model:value 导致引用重复

如果您在循环中使用 v-model:value 导致引用重复的问题&#xff0c;通常是因为 Vue.js 会生成相同的 v-model 绑定&#xff0c;导致引用冲突。为了解决这个问题&#xff0c;您可以采取以下一些方法之一&#xff1a; 使用唯一的属性名&#xff1a; 在循环中&#xff0c;确保每个…

catface,使用Interface定义Controller,实现基于Http协议的RPC调用

catface 前言cat-client 模块EnableCatClientCatClientCatMethodCatNoteCatResponesWrapperCatClientConfigurationCatClientProviderCatClientFactoryCatSendInterceptorCatHttpCatPayloadResolverCatObjectResolverCatLoggerProcessorCatResultProcessorCatSendProcessorAbst…

A6120 Emerson 机箱地震振动监测器

A6120 Emerson 机箱地震振动监测器 艾默生过程管理公司宣布&#xff0c;PlantWeb数字工厂架构已经安装在化学工业CATCH(技术能力评估中心)培训中心&#xff0c;该中心位于北林肯郡格里姆斯比附近的Stallingborough。这座价值820万英镑的设施是为了满足行业对培训中心的需求而开…

教师如何私密发布成绩查询?

随着科技的不断发展&#xff0c;教育领域也逐渐引入了各种在线工具来提高教学效果和管理效率。其中&#xff0c;易查分作为一款功能强大的在线查询工具&#xff0c;帮助老师们更好的利用该工具进行成绩查询。 好消息&#xff01;博主给大家争取到的易查分福利&#xff0c;只需要…

C++中的红黑树

红黑树 搜索二叉树搜索二叉树的模拟实现平衡搜索二叉树(AVL Tree)平衡搜索二叉树的模拟实现红黑树(Red Black Tree)红黑树的模拟实现 红黑树的应用(Map 和 Set)Map和Set的封装 搜索二叉树 搜索二叉树的概念&#xff1a;二叉搜索树又称二叉排序树&#xff0c;它或者是一棵空树&…

电脑文件批量重命名:高效操作技巧

随着时间的推移&#xff0c;我们积累的文件和文件夹数量越来越多&#xff0c;需要对它们进行合理的命名和管理&#xff0c;以便更方便地查找和利用。而文件批量重命名功能可以帮助我们更高效地管理文件夹。下面介绍五种方式&#xff0c;帮助你更好地利用文件批量重命名工具&…

leetcode刷题--栈与递归

文章目录 1. 682 棒球比赛2. 71 简化路径3. 388 文件的最长绝对路径4. 150 逆波兰表达式求值5. 227. 基本计算器II6. 224. 基本计算器7. 20. 有效的括号8. 636. 函数的独占时间9. 591. 标签验证器10. 32.最长有效括号12. 341. 扁平化嵌套列表迭代器13. 394.字符串解码 1. 682 棒…

算法之双指针题型:

双指针例题小总结&#xff1a; 力扣27&#xff1a; 移除元素 力扣题目链接 双指针分为&#xff1a; 快慢双指针&#xff1a;同一个起点&#xff0c;同向出发 相向双指针&#xff1a;从两端出发&#xff0c;方向相反&#xff0c;终会相遇 经典的双指针&#xff08;快慢双指…

消息队列理解

rocketMQ RocketMQ消息存储原理_码上得天下的博客-CSDN博客 领域模型概述 | RocketMQ kafka Kafka基本架构介绍-腾讯云开发者社区-腾讯云 看完这篇Kafka&#xff0c;你也许就会了Kafka_心的步伐的博客-CSDN博客 Apache Kafka

技术解码 | GB28181/SIP/SDP 协议--EasyGBS国标GB28181平台国标视频技术SDP解析

EasyGBS去年更换了新内核&#xff0c;新内核版本的平台性能更加稳定&#xff0c;我们也在不断对平台进行持续的功能优化和升级&#xff0c;始终保持EasyGBS平台在安防视频监控市场的技术先进性。EasyGBS拥有视频直播、录像存储、检索与回放、云台控制、告警上报、语音对讲、平台…

Tomcat服务的部署及配置优化

文章目录 1. Tomcat的相关介绍1.1 Tomcat简介1.2 Tomcat的核心组件1.2.1 Web容器1.2.2 Servlet容器1.2.3 JSP容器 1.3 Tomcat的功能组件1.3.1 connector连接器1.3.2 container容器1.3.2.1 子容器及其相关功能 1.4 主要作用1.5 Tmocat处理请求的过程 2. Tomcata服务部署2.1 安装…

jira获取issue条目transitions id,以用来进行流转实用脚本

官方文档链接地址&#xff1a; The Jira Cloud platform REST API GET Get transitions Returns either all transitions or a transition that can be performed by the user on an issue, based on the issues status. Note, if a request is made for a transition that do…

Httpservletrequest与Httpservletresponse

目录 一、Httpservletrequest 1.1什么是Httpservletrequest 1.2Httpservletrequest中的方法 二、Httpservletresponse 1.1什么是Httpservletresponse 1.2Httpservletresponse的方法 一、Httpservletrequest 1.1什么是Httpservletrequest HttpServletRequest&#xff08;…

CPU和GPU性能优化

在Unity游戏开发中&#xff0c;优化CPU和GPU的性能是非常重要的&#xff0c;可以提高游戏的运行效率、降低功耗和延迟&#xff0c;并提高用户体验。以下是一些优化CPU和GPU性能的方法&#xff1a; 1.优化游戏逻辑和算法 减少不必要的计算和内存操作&#xff0c;例如避免频繁的…