最近生产上的xxl_job框架的一个执行器(nacos客户端)因为分配内存不大,导致频繁与nacos服务端的连接断开,而断开之后虽然客户端服务没有宕掉,但是就是无法重新注册到nacos的服务端上去。
问题定位:
查看服务器日志;发现日志中打印好多内存溢出的情况,回顾上个迭代同事新增了一个功能,就是定时同步底层数据。后面定位问题的时候发现这个定时任务要执行12个小时才会结束,跟踪代码的时候发现每次遍历的时候要初始化大量数据到jvm内存中,这就导致了内存资源紧张,后台日志不断报内存溢出和GC回收异常的问题。
解决方案:
(1)针对有问题的定时业务逻辑重新进行编码优化实现。
(2)扩大jvm分配给程序的启动内存。-Xms2g -Xmx2g统一改成 -Xms6g -Xmx6g.
通过这2个方案整改后,1周内生产再也没有出过类似的问题。
问题解决了,但是我想深究一下为什么nacos掉线后,就注册不上去了呢
然后我想出现这个问题的原因是自己的服务在jvm的分配的内存使用完毕后,在后台运行的向nacos服务端保持心跳的线程阻塞或者被杀死了,导致后面nacos服务器接收不到来自客户端的心跳,从而我的服务后面没有再次被nacos服务端发现。
基于以上情况,我试着从nacos客户端注册与心跳检测方面跟一下源码。
首先最重要的是要找到保持服务于nacos服务端保持心跳的源码,看看这个后台运行的保持心跳的线程的运行机制。
我猜想这个保持心跳的线程应该会在服务首次启动注册的时候激活。
所以先找到注册的接口,去到nacos官网,找到api接口界面的注册接口:https://nacos.io/zh-cn/docs/open-api.html
找到注册接口请求路径为/nacos/v1/ns/instance
然后去gitlab下载nacos的源码,我直接下载的最新的,然后根据/nacos/v1/ns/instance 找到如下方法:
可以看到服务注册成功后,还执行了个方法addBeatInfo
@Overridepublic void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {if (instance.isEphemeral()) {BeatInfo beatInfo = new BeatInfo();beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));beatInfo.setIp(instance.getIp());beatInfo.setPort(instance.getPort());beatInfo.setCluster(instance.getClusterName());beatInfo.setWeight(instance.getWeight());beatInfo.setMetadata(instance.getMetadata());beatInfo.setScheduled(false);beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);}
看一下这个方法是做什么的,可以看到BeatReactor是个线程池类
public BeatReactor(NamingProxy serverProxy, int threadCount) {this.serverProxy = serverProxy;executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setDaemon(true);thread.setName("com.alibaba.nacos.naming.beat.sender");return thread;}});}public void **addBeatInfo**(String serviceName, BeatInfo beatInfo) {NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());BeatInfo existBeat = null;//fix #1733if ((existBeat = dom2Beat.remove(key)) != null) {existBeat.setStopped(true);}dom2Beat.put(key, beatInfo);//这个方法就是启动的后台保持心跳的线程executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());}
看看
schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS)
这个方法的定义
/*** Creates and executes a one-shot action that becomes enabled* after the given delay.** @param command the task to execute* @param delay the time from now to delay execution* @param unit the time unit of the delay parameter* @return a ScheduledFuture representing pending completion of* the task and whose {@code get()} method will return* {@code null} upon completion* @throws RejectedExecutionException if the task cannot be* scheduled for execution* @throws NullPointerException if command is null*/public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
然后重点部分就是看BeatTask,BeatTask继承Runnable,run方法就是我们的重点,该方法调用了NamingProxy的sendBeat方法,服务端请求地址为/instance/beat的方法
class BeatTask implements Runnable {BeatInfo beatInfo;public BeatTask(BeatInfo beatInfo) {this.beatInfo = beatInfo;}@Overridepublic void run() {if (beatInfo.isStopped()) {return;}long nextTime = beatInfo.getPeriod();try {JSONObject result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);long interval = result.getIntValue("clientBeatInterval");boolean lightBeatEnabled = false;if (result.containsKey(CommonParams.LIGHT_BEAT_ENABLED)) {lightBeatEnabled = result.getBooleanValue(CommonParams.LIGHT_BEAT_ENABLED);}BeatReactor.this.lightBeatEnabled = lightBeatEnabled;if (interval > 0) {nextTime = interval;}int code = NamingResponseCode.OK;if (result.containsKey(CommonParams.CODE)) {code = result.getIntValue(CommonParams.CODE);}if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {Instance instance = new Instance();instance.setPort(beatInfo.getPort());instance.setIp(beatInfo.getIp());instance.setWeight(beatInfo.getWeight());instance.setMetadata(beatInfo.getMetadata());instance.setClusterName(beatInfo.getCluster());instance.setServiceName(beatInfo.getServiceName());instance.setInstanceId(instance.getInstanceId());instance.setEphemeral(true);try {serverProxy.registerService(beatInfo.getServiceName(),NamingUtils.getGroupName(beatInfo.getServiceName()), instance);} catch (Exception ignore) {}}} catch (NacosException ne) {NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",JSON.toJSONString(beatInfo), ne.getErrCode(), ne.getErrMsg());}executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);}}
找一下这个线程是每隔几秒运行一次的,是5秒
接下来我们把目光放到服务端,找到InstanceController的beat方法,如果是参数beat信息的话,说明是第一次发起心跳,则会带有服务实例信息,因为发起心跳成功则服务端会返回下次不要带beat信息的参数,这样客户端第二次就不会携带beat信息了。如果发现没有该服务,又没带beat信息,说明这个服务可能被移除过了,直接返回没找到。如果没有服务,但是发现有beat信息,那就从beat中获取服务实例信息,进行注册.
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {ObjectNode result = JacksonUtils.createEmptyJsonNode();//设置心跳间隔result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);RsInfo clientBeat = null;//判断有无心跳内容//如果存在心跳内容则不是轻量级心跳就转化为RsInfoif (StringUtils.isNotBlank(beat)) {clientBeat = JacksonUtils.toObj(beat, RsInfo.class);}String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));if (clientBeat != null) {if (StringUtils.isNotBlank(clientBeat.getCluster())) {clusterName = clientBeat.getCluster();} else {// fix #2533clientBeat.setCluster(clusterName);}ip = clientBeat.getIp();port = clientBeat.getPort();}String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);NamingUtils.checkServiceNameFormat(serviceName);Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);//获取实例的信息Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);//如果实例不存在if (instance == null) {if (clientBeat == null) {result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);return result;}Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "+ "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);//根据您心跳内容创建一个实例信息instance = new Instance();instance.setPort(clientBeat.getPort());instance.setIp(clientBeat.getIp());instance.setWeight(clientBeat.getWeight());instance.setMetadata(clientBeat.getMetadata());instance.setClusterName(clusterName);instance.setServiceName(serviceName);instance.setInstanceId(instance.getInstanceId());instance.setEphemeral(clientBeat.isEphemeral());//注册实例serviceManager.registerInstance(namespaceId, serviceName, instance);}//获取服务的信息Service service = serviceManager.getService(namespaceId, serviceName);if (service == null) {throw new NacosException(NacosException.SERVER_ERROR,"service not found: " + serviceName + "@" + namespaceId);}//不存在的话,要创建一个进行处理if (clientBeat == null) {clientBeat = new RsInfo();clientBeat.setIp(ip);clientBeat.setPort(port);clientBeat.setCluster(clusterName);}//开启心跳检查任务service.processClientBeat(clientBeat);result.put(CommonParams.CODE, NamingResponseCode.OK);//5秒间隔if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());}//告诉客户端不需要带上心跳信息了,变成轻量级心跳了result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());return result;
}
接下来我们看一下nacos服务端 开启心跳检查任务processClientBeat方法,该方法将ClientBeatProcessor放入到线程池中,接下来我们看下重点看下run方法
public void processClientBeat(final RsInfo rsInfo) {ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();clientBeatProcessor.setService(this);clientBeatProcessor.setRsInfo(rsInfo);//放入线程池中执行 HealthCheckReactor.scheduleNow(clientBeatProcessor);}
更新每次客户端发送的最新的心跳时间,这个主要是是放到内存map中
@Overridepublic void run() {Service service = this.service;if (Loggers.EVT_LOG.isDebugEnabled()) {Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());}String ip = rsInfo.getIp();String clusterName = rsInfo.getCluster();int port = rsInfo.getPort();Cluster cluster = service.getClusterMap().get(clusterName);List<Instance> instances = cluster.allIPs(true);for (Instance instance : instances) {if (instance.getIp().equals(ip) && instance.getPort() == port) {if (Loggers.EVT_LOG.isDebugEnabled()) {Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());}//更新每次客户端发送的最新的心跳时间,这个主要是是放到内存map中 instance.setLastBeat(System.currentTimeMillis());if (!instance.isMarked()) {if (!instance.isHealthy()) {instance.setHealthy(true);Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);getPushService().serviceChanged(service);}}}}}
那么,为什么nacos服务端为什么要设置注册到nacos上的服务最新的更新时间呢?
这涉及到nacos的健康检查机制
Nacos Server会开启一个定时任务来检查注册服务的健康情况,对于超过15秒没收到客户端的心跳实例会将它的 healthy属性置为false,此时当客户端不会将该实例的信息发现,如果某个服务的实例超过30秒没收到心跳,则剔除该实例,如果剔除的实例恢复,发送心跳则会恢复。
当有实例注册的时候,我们会看到有个service.init()的方法,该方法的实现主要是将ClientBeatCheckTask加入到线程池当中:
private void putServiceAndInit(Service service) throws NacosException {putService(service);/启动服务检查service.init();consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJSON());}public void init() {HealthCheckReactor.scheduleCheck(clientBeatCheckTask);for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {entry.getValue().setService(this);entry.getValue().init();}}
ClientBeatCheckTask中的run方法主要做两件事心跳时间超过15秒则设置该实例信息为不健康状况和心跳时间超过30秒则删除该实例信息,如下代码:
@Overridepublic void run() {try {if (!getDistroMapper().responsible(service.getName())) {return;}if (!getSwitchDomain().isHealthCheckEnabled()) {return;}List<Instance> instances = service.allIPs(true);// first set health status of instances:for (Instance instance : instances) {//如果心跳时间超过15秒则设置该实例信息为不健康状况if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {if (!instance.isMarked()) {if (instance.isHealthy()) {instance.setHealthy(false);Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(),UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());getPushService().serviceChanged(service);SpringContext.getAppContext().publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));}}}}if (!getGlobalConfig().isExpireInstance()) {return;}// then remove obsolete instances:for (Instance instance : instances) {if (instance.isMarked()) {continue;}//如果心跳时间超过30秒则删除该实例信息if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {// delete instanceLoggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JSON.toJSONString(instance));deleteIP(instance);}}} catch (Exception e) {Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);}}
到此完成删除实例的过程,整体的时序图如下: