Nacos源码分析:心跳机制、健康检查、服务发现、AP集群

文章目录

    • 心跳机制与服务健康检查
      • NacosClient端
      • NacosServer端
      • NacosServer端健康检查
    • 服务发现
      • NacosClient端
      • NacosServer端
    • AP集群
      • 从源码启动集群
      • 心跳设计原理
      • 各节点状态同步
      • 服务实例数据同步
      • 服务实例状态变动同步



心跳机制与服务健康检查

官方文档:发送某个实例的心跳接口信息

心跳机制 在线流程图

健康检查在线流程图

在这里插入图片描述



健康检查:

在这里插入图片描述


NacosClient端

现在先回到NacosClient进行服务注册时的代码,通过调用NamingService.registerInstance(...)方法进行服务注册

public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {// 该方法 往NacosServer中注册一个微服务实例NamingUtils.checkInstanceIsLegal(instance);String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);// 是否为临时实例,如果是则进行延迟发送心跳if (instance.isEphemeral()) {// 创建一个BeatInfo对象,其中存放的该instance实例所有重要的信息,比如服务名、ip、端口、集群、权重等等// 其中有一个关键信息 实例心跳间隔时长,配置为preserved.heart.beat.interval,如果没有默认值为5s  会保存在period属性中BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);// 该方法会执行一次延时任务,在该延迟任务中它会使用延迟任务嵌套调用自己beatReactor.addBeatInfo(groupedServiceName, beatInfo);}// 调用该方法进行微服务注册,instance对象中保存着微服务的各种信息,比如ip、端口、访问权重、健康状态、是否上线等等serverProxy.registerService(groupedServiceName, groupName, instance);
}// 首先是构建一个BeatInfo对象
public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {BeatInfo beatInfo = new BeatInfo();beatInfo.setServiceName(groupedServiceName);beatInfo.setIp(instance.getIp());beatInfo.setPort(instance.getPort());beatInfo.setCluster(instance.getClusterName());beatInfo.setWeight(instance.getWeight());beatInfo.setMetadata(instance.getMetadata());beatInfo.setScheduled(false);// 实例心跳间隔时长,配置为preserved.heart.beat.interval,如果没有指定默认值为5sbeatInfo.setPeriod(instance.getInstanceHeartBeatInterval());return beatInfo;
}// 接下来就是执行延迟任务
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());BeatInfo existBeat = null;//fix #1733if ((existBeat = dom2Beat.remove(key)) != null) {existBeat.setStopped(true);}dom2Beat.put(key, beatInfo);// 执行一次延迟任务,在该BeatTask任务中它会使用延迟任务嵌套调用自己executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}

再来看BeatTask类中run()方法具体的实现

public void run() {if (beatInfo.isStopped()) {return;}// 获取心跳间隔时间long nextTime = beatInfo.getPeriod();try {// 调用NacosServer的/v1/ns/instance/beat,发送一次心跳请求JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);......} catch (NacosException ex) {NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());}// 继续调用自己executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}// 发送心跳请求
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {if (NAMING_LOGGER.isDebugEnabled()) {NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());}Map<String, String> params = new HashMap<String, String>(8);Map<String, String> bodyMap = new HashMap<String, String>(2);if (!lightBeatEnabled) {bodyMap.put("beat", JacksonUtils.toJson(beatInfo));}params.put(CommonParams.NAMESPACE_ID, namespaceId);params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());params.put("ip", beatInfo.getIp());params.put("port", String.valueOf(beatInfo.getPort()));String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);return JacksonUtils.toObj(result);
}



NacosServer端

接收客户端的心跳接口/nacos/v1/ns/instance/beat,该接口找到instance所属的service后,再调用processClientBeat()方法进行处理

@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {ObjectNode result = JacksonUtils.createEmptyJsonNode();result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());// beat是JSON格式字符串:实例心跳内容String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);RsInfo clientBeat = null;if (StringUtils.isNotBlank(beat)) {clientBeat = JacksonUtils.toObj(beat, RsInfo.class);}String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));if (clientBeat != null) {if (StringUtils.isNotBlank(clientBeat.getCluster())) {clusterName = clientBeat.getCluster();} else {// fix #2533clientBeat.setCluster(clusterName);}ip = clientBeat.getIp();port = clientBeat.getPort();}String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);NamingUtils.checkServiceNameFormat(serviceName);Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);// 获取到发送心跳的实例Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);// 实例为null的处理逻辑if (instance == null) {if (clientBeat == null) {result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);return result;}Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "+ "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);instance = new Instance();instance.setPort(clientBeat.getPort());instance.setIp(clientBeat.getIp());instance.setWeight(clientBeat.getWeight());instance.setMetadata(clientBeat.getMetadata());instance.setClusterName(clusterName);instance.setServiceName(serviceName);instance.setInstanceId(instance.getInstanceId());instance.setEphemeral(clientBeat.isEphemeral());serviceManager.registerInstance(namespaceId, serviceName, instance);}Service service = serviceManager.getService(namespaceId, serviceName);if (service == null) {throw new NacosException(NacosException.SERVER_ERROR,"service not found: " + serviceName + "@" + namespaceId);}if (clientBeat == null) {clientBeat = new RsInfo();clientBeat.setIp(ip);clientBeat.setPort(port);clientBeat.setCluster(clusterName);}// 更新该服务实例最后发送心跳时间,这里是其他线程异步进行的处理service.processClientBeat(clientBeat);result.put(CommonParams.CODE, NamingResponseCode.OK);if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());}result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());return result;
}

修改instance的LastBeat时间

// 这里会有一个线程,直接去执行ClientBeatProcessor中的run()方法,所以核心就是直接去看run()方法的执行逻辑
public void processClientBeat(final RsInfo rsInfo) {ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();clientBeatProcessor.setService(this);clientBeatProcessor.setRsInfo(rsInfo);HealthCheckReactor.scheduleNow(clientBeatProcessor);
}public static ScheduledFuture<?> scheduleNow(Runnable task) {return GlobalExecutor.scheduleNamingHealth(task, 0, TimeUnit.MILLISECONDS);
}public void run() {Service service = this.service;if (Loggers.EVT_LOG.isDebugEnabled()) {Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());}String ip = rsInfo.getIp();String clusterName = rsInfo.getCluster();int port = rsInfo.getPort();// 先得到集群Cluster cluster = service.getClusterMap().get(clusterName);// 再从集群中去实例集合List<Instance> instances = cluster.allIPs(true);// 再遍历集合,通过ip+端口找到对应的实例for (Instance instance : instances) {if (instance.getIp().equals(ip) && instance.getPort() == port) {if (Loggers.EVT_LOG.isDebugEnabled()) {Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());}// 修改该实例最后一次心跳时间instance.setLastBeat(System.currentTimeMillis());if (!instance.isMarked()) {if (!instance.isHealthy()) {instance.setHealthy(true);Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",cluster.getService().getName(), ip, port, cluster.getName(),UtilsAndCommons.LOCALHOST_SITE);getPushService().serviceChanged(service);}}}}
}



NacosServer端健康检查

健康检查在线流程图

健康检查机制是在服务注册的流程中进行的,接下来回忆一下服务注册的流程:

NacosClient端通过调用/v1/ns/instance接口发送post请求会往NacosServer端进行服务实例注册。

而在这个过程中,如果该服务实例是本服务中第一个进行的注册,此时NacosServer都还没有service,那么此时就会去创建一个service。

还会调用该服务的初始化方法service.init();

public void init() {// NacosClient客户端的健康检查HealthCheckReactor.scheduleCheck(clientBeatCheckTask);for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {entry.getValue().setService(this);entry.getValue().init();}
}public static void scheduleCheck(ClientBeatCheckTask task) {// GlobalExecutor.scheduleNamingHealth()就是一个定时任务,延时5秒执行一次,往后每隔5秒执行一次task任务futureMap.putIfAbsent(task.taskKey(), GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}

所以接下来服务实例健康检查的详细代码就在clientBeatCheckTask.run()方法中

public void run() {try {// nacos集群相关判断// 具体的实现是对我们是serviceName进行哈希运算,在对nacos集群节点数进行取模// 最终只允许一个nacos节点对某一个service进行健康检查if (!getDistroMapper().responsible(service.getName())) {return;}if (!getSwitchDomain().isHealthCheckEnabled()) {return;}// 获取该服务所有的临时实例List<Instance> instances = service.allIPs(true);// first set health status of instances:// 首先检查实例的健康状态for (Instance instance : instances) {// 当前时间 - 上一次心跳时间 > 阀值默认15sif (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {if (!instance.isMarked()) {// 修改该实例的健康状态,如果已经是不健康的实例了也就没必要重复修改了if (instance.isHealthy()) {instance.setHealthy(false);Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",instance.getIp(), instance.getPort(), instance.getClusterName(),service.getName(), UtilsAndCommons.LOCALHOST_SITE,instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());getPushService().serviceChanged(service);ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));}}}}if (!getGlobalConfig().isExpireInstance()) {return;}// then remove obsolete instances:// 删除过时的实例for (Instance instance : instances) {if (instance.isMarked()) {continue;}// 当前时间 - 上一次心跳时间 > 阀值默认30s// 再进行删除实例操作if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {// delete instanceLoggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),JacksonUtils.toJson(instance));// 服务实例的删除下线// 这里其实就是往本服务器发送一个delete请求,调用的还是/v1/ns/instance接口deleteIp(instance);}}} catch (Exception e) {Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);}}



服务发现

官方文档:获取全部实例 NamingService接口方法说明

官方文档:查询服务下的实例列表接口信息

服务发现 在线流程图

在这里插入图片描述



NacosClient端

首先是NacosClient这边的流程,我们知道NamingService它是一个最主要的接口,它其中有两类方法是用来获取服务实例的:

  • getAllInstances(...) 获取全部实例
  • selectInstances(...) 根据条件获取过滤后的实例列表。可以获取健康或不健康的服务实例

下面方法主要就是调用getServiceInfo()方法

public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,boolean subscribe) throws NacosException {ServiceInfo serviceInfo;if (subscribe) {// 会调用getServiceInfo(...)方法获取服务实例列表serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),StringUtils.join(clusters, ","));} else {serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),StringUtils.join(clusters, ","));}List<Instance> list;if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {return new ArrayList<Instance>();}return list;
}

下面方法中的核心代码就是updateServiceNow(serviceName, clusters);。这里会先从本地缓存中找ServiceInfo对象,如果没有找到就会去向NacosServer发送请求查询所有服务列表,并将查询到的服务信息保存至本地缓存serviceInfoMap中

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());String key = ServiceInfo.getKey(serviceName, clusters);if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);}// 这里会先从NacosClient客户端自己本地缓存serviceInfoMap中取ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);// 刚开始NacosClient客户端本地缓存是为空的,那么就会进入到这里的逻辑中if (null == serviceObj) {// 先创建一个对象存入缓存中serviceObj = new ServiceInfo(serviceName, clusters);serviceInfoMap.put(serviceObj.getKey(), serviceObj);updatingMap.put(serviceName, new Object());// 向NacosServer发送请求查询所有服务列表,并将查询到的服务信息保存至本地缓存serviceInfoMap中updateServiceNow(serviceName, clusters);updatingMap.remove(serviceName);} else if (updatingMap.containsKey(serviceName)) {// 在多线程的环境下,如果上面真正进行发送请求处理结果这个过程中的话,那么这个线程先调用wait()方法进行等待,等上面方法执行完后notifyAll唤醒if (UPDATE_HOLD_INTERVAL > 0) {// hold a moment waiting for update finishsynchronized (serviceObj) {try {serviceObj.wait(UPDATE_HOLD_INTERVAL);} catch (InterruptedException e) {NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);}}}}// 创建定时任务入口,拉取NacosServer端的服务实例列表并更新本地缓存serviceInfoMap,将该Future添加进futureMap集合中scheduleUpdateIfAbsent(serviceName, clusters);return serviceInfoMap.get(serviceObj.getKey());
}// 只是调用updateService(String serviceName, String clusters)方法
private void updateServiceNow(String serviceName, String clusters) {try {updateService(serviceName, clusters);} catch (NacosException e) {NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);}
}// 调用发送请求的方法,并将返回数据保存至NacosClient客户端本地缓存serviceInfoMap中
public void updateService(String serviceName, String clusters) throws NacosException {// 当try{}语句段中主要流程执行完后,该对象会对外层方面中阻塞的线程进行唤醒ServiceInfo oldService = getServiceInfo0(serviceName, clusters);try {// 向NacosServer发送请求/nacos/v1/ns/instance/list,查询所有服务列表// pushReceiver.getUdpPort(),如果服务实例发生了改变,NacosServer会通过udpPort主动推送最新的数据给NacosClientString result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);// 将查询到的服务信息保存至NacosClient客户端本地缓存serviceInfoMap中if (StringUtils.isNotEmpty(result)) {processServiceJson(result);}} finally {if (oldService != null) {synchronized (oldService) {oldService.notifyAll();}}}
}// 发送请求
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)throws NacosException {final Map<String, String> params = new HashMap<String, String>(8);params.put(CommonParams.NAMESPACE_ID, namespaceId);params.put(CommonParams.SERVICE_NAME, serviceName);params.put("clusters", clusters);// udp端口,如果服务实例发生了改变,NacosServer会通过udpPort主动推送最新的数据给NacosClientparams.put("udpPort", String.valueOf(udpPort));params.put("clientIP", NetUtils.localIP());params.put("healthyOnly", String.valueOf(healthyOnly));return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
}

接下来是延时任务的代码

public void scheduleUpdateIfAbsent(String serviceName, String clusters) {if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {return;}synchronized (futureMap) {if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {return;}// 创建一个延迟任务,在任务最后会延迟嵌套调用本任务ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);}
}// UpdateTask类的run()方法具体代码如下,主要还是通过updateService()方法发送请求并更新NacosClient客户端本地缓存serviceInfoMap
public void run() {// 定时从NacosServer端拉取最新的服务实例列表long delayTime = DEFAULT_DELAY;try {ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));// 如果本地缓存中没有当前ServiceInfo,那么就直接向NacosServer发送一个查询服务实例列表请求,// 并将结果封装为一个新的ServiceInfo对象并存入NacosClient客户端的本地缓存serviceInfoMap中if (serviceObj == null) {updateService(serviceName, clusters);return;}// 发送查询服务实例列表请求,更新NacosClient客户端的本地缓存serviceInfoMapif (serviceObj.getLastRefTime() <= lastRefTime) {updateService(serviceName, clusters);serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));} else {// if serviceName already updated by push, we should not override it// since the push data may be different from pull through force pushrefreshOnly(serviceName, clusters);}lastRefTime = serviceObj.getLastRefTime();if (!notifier.isSubscribed(serviceName, clusters) && !futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {// abort the update taskNAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);return;}if (CollectionUtils.isEmpty(serviceObj.getHosts())) {incFailCount();return;}delayTime = serviceObj.getCacheMillis();resetFailCount();} catch (Throwable e) {incFailCount();NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);} finally {// 嵌套执行本任务,执行时间和失败次数有关,最长是60秒调用一次executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);}
}



NacosServer端

NacosServer端就会去处理/v1/ns/instance/list请求,NacosServer服务端的代码也很简单:

  • 就是先获取请求参数,其中包括udp端口
  • 通过请求参数中的namespaceId和serviceName去服务注册表serviceMap中获取到对应是Service
  • 再取出该service中所有的实例集合instances
  • 将它根据健康实例与非健康实例分开,赋值给Map<Boolean, List<Instance>> ipMap
  • 遍历imMap,将结果封装成一个Map返回给客户端

具体源码如下

@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {// 处理请求参数String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);NamingUtils.checkServiceNameFormat(serviceName);String agent = WebUtils.getUserAgent(request);String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);// udp端口,如果服务实例发生了改变,NacosServer会通过udpPort主动推送最新的数据给NacosClientint udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));String env = WebUtils.optional(request, "env", StringUtils.EMPTY);boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));String app = WebUtils.optional(request, "app", StringUtils.EMPTY);String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));// 调用doSrvIpxt()进行instances实例的获取return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,healthyOnly);
}
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {ClientInfo clientInfo = new ClientInfo(agent);ObjectNode result = JacksonUtils.createEmptyJsonNode();// 先根据namespaceId+serviceName从注册表serviceMap中取出ServiceService service = serviceManager.getService(namespaceId, serviceName);long cacheMillis = switchDomain.getDefaultCacheMillis();// now try to enable the pushtry {if (udpPort > 0 && pushService.canEnablePush(agent)) {pushService.addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),pushDataSource, tid, app);cacheMillis = switchDomain.getPushCacheMillis(serviceName);}} catch (Exception e) {Loggers.SRV_LOG.error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);cacheMillis = switchDomain.getDefaultCacheMillis();}// service为null就表示当前服务没有实例,直接返回if (service == null) {if (Loggers.SRV_LOG.isDebugEnabled()) {Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);}result.put("name", serviceName);result.put("clusters", clusters);result.put("cacheMillis", cacheMillis);result.replace("hosts", JacksonUtils.createEmptyArrayNode());return result;}checkIfDisabled(service);List<Instance> srvedIPs;// 获取service服务的所有实例instance集合srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));// filter ips using selector:// 过滤筛选ipif (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {srvedIPs = service.getSelector().select(clientIP, srvedIPs);}// 如果该服务没有相应的实例,也直接返回if (CollectionUtils.isEmpty(srvedIPs)) {if (Loggers.SRV_LOG.isDebugEnabled()) {Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);}if (clientInfo.type == ClientInfo.ClientType.JAVA&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {result.put("dom", serviceName);} else {result.put("dom", NamingUtils.getServiceName(serviceName));}result.put("name", serviceName);result.put("cacheMillis", cacheMillis);result.put("lastRefTime", System.currentTimeMillis());result.put("checksum", service.getChecksum());result.put("useSpecifiedURL", false);result.put("clusters", clusters);result.put("env", env);result.set("hosts", JacksonUtils.createEmptyArrayNode());result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));return result;}Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);ipMap.put(Boolean.TRUE, new ArrayList<>());ipMap.put(Boolean.FALSE, new ArrayList<>());// 将实例集合分为健康实例和不健康实例,存入IPMap中for (Instance ip : srvedIPs) {ipMap.get(ip.isHealthy()).add(ip);}if (isCheck) {result.put("reachProtectThreshold", false);}double threshold = service.getProtectThreshold();if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);if (isCheck) {result.put("reachProtectThreshold", true);}ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));ipMap.get(Boolean.FALSE).clear();}if (isCheck) {result.put("protectThreshold", service.getProtectThreshold());result.put("reachLocalSiteCallThreshold", false);return JacksonUtils.createEmptyJsonNode();}ArrayNode hosts = JacksonUtils.createEmptyArrayNode();for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {List<Instance> ips = entry.getValue();// 是否只要健康服务实例if (healthyOnly && !entry.getKey()) {continue;}// 遍历所有服务实例,并存入ArrayNode hosts中for (Instance instance : ips) {// remove disabled instance:if (!instance.isEnabled()) {continue;}ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();ipObj.put("ip", instance.getIp());ipObj.put("port", instance.getPort());// deprecated since nacos 1.0.0:ipObj.put("valid", entry.getKey());ipObj.put("healthy", entry.getKey());ipObj.put("marked", instance.isMarked());ipObj.put("instanceId", instance.getInstanceId());ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));ipObj.put("enabled", instance.isEnabled());ipObj.put("weight", instance.getWeight());ipObj.put("clusterName", instance.getClusterName());if (clientInfo.type == ClientInfo.ClientType.JAVA&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {ipObj.put("serviceName", instance.getServiceName());} else {ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));}ipObj.put("ephemeral", instance.isEphemeral());hosts.add(ipObj);}}// 处理响应数据result.replace("hosts", hosts);if (clientInfo.type == ClientInfo.ClientType.JAVA&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {result.put("dom", serviceName);} else {result.put("dom", NamingUtils.getServiceName(serviceName));}result.put("name", serviceName);result.put("cacheMillis", cacheMillis);result.put("lastRefTime", System.currentTimeMillis());result.put("checksum", service.getChecksum());result.put("useSpecifiedURL", false);result.put("clusters", clusters);result.put("env", env);result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));return result;
}



AP集群

从源码启动集群

第一步,运行sql脚本: distribution/conf 目录下的 nacos-mysql.sql 脚本

在这里插入图片描述

第二步:修改 console\src\main\resources 目录下的 application.properties 文件里的mysql配置

在这里插入图片描述

第三步,创建一个目录,用来存放某个nacos节点运行时的数据,同时在目录下创建一个conf目录,在该目录下创建一个cluster.conf文件,文件内容为各个nacos节点的ip和端口

在这里插入图片描述

然后将目录复制三份,并修改目录名

在这里插入图片描述

第四步,运行NacosServer,需要添加两个JVM参数-Dserver.port=8848 -Dnacos.home=D:\nacos-cluster\nacos-8848,其他两个也要做相应的修改

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

结果

在这里插入图片描述



心跳设计原理

我们知道服务实例注册进NacosServer之后,它会定时的发送心跳给NacosServer。在NacosServer端中,每一个service都有一个心跳健康检查任务定时执行。

那么在集群模式下,我现有有两种方案:

  • 方案一,我在每一台Nacos节点上都有定时心跳检测任务
  • 方案二,我只在一台Nacos节点上有定时心跳检测任务,如果有不健康的instance出现或者下线后,这台节点再同步数据给其他节点

Nacos集群,它的实现是只允许一个service在一台nacos节点上进行健康检查机制。就比如下面的代码中

服务实例健康检查的详细代码就在clientBeatCheckTask.run()方法中,第一个if判断就会对serviceName进行哈希运算,在对nacos集群节点数进行取模

public void run() {try {// nacos集群相关判断// 具体的实现是对我们是serviceName进行哈希运算,在对nacos集群节点数进行取模// 最终只允许一个nacos节点对某一个service进行健康检查if (!getDistroMapper().responsible(service.getName())) {return;}if (!getSwitchDomain().isHealthCheckEnabled()) {return;}// 获取该服务所有的临时实例List<Instance> instances = service.allIPs(true);// first set health status of instances:// 首先检查实例的健康状态for (Instance instance : instances) {// 当前时间 - 上一次心跳时间 > 阀值默认15sif (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {if (!instance.isMarked()) {// 修改该实例的健康状态,如果已经是不健康的实例了也就没必要重复修改了if (instance.isHealthy()) {instance.setHealthy(false);Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",instance.getIp(), instance.getPort(), instance.getClusterName(),service.getName(), UtilsAndCommons.LOCALHOST_SITE,instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());getPushService().serviceChanged(service);ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));}}}}if (!getGlobalConfig().isExpireInstance()) {return;}// then remove obsolete instances:// 删除过时的实例for (Instance instance : instances) {if (instance.isMarked()) {continue;}// 当前时间 - 上一次心跳时间 > 阀值默认30s// 再进行删除实例操作if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {// delete instanceLoggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),JacksonUtils.toJson(instance));// 服务实例的删除下线// 这里其实就是往本服务器发送一个delete请求,调用的还是/v1/ns/instance接口deleteIp(instance);}}} catch (Exception e) {Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);}}
public boolean responsible(String serviceName) {final List<String> servers = healthyList;if (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {return true;}if (CollectionUtils.isEmpty(servers)) {// means distro config is not ready yetreturn false;}int index = servers.indexOf(EnvUtil.getLocalAddress());int lastIndex = servers.lastIndexOf(EnvUtil.getLocalAddress());if (lastIndex < 0 || index < 0) {return true;}// 对我们是serviceName进行哈希运算,在对nacos集群节点数进行取模int target = distroHash(serviceName) % servers.size();return target >= index && target <= lastIndex;
}



各节点状态同步

Nacos集群中,存在一个定时任务用来同步各个节点之间的状态,方便感知到其他节点是否已经挂了。

入口在ServerListManager的内部类ServerStatusReporter.run()方法中

// ServerListManager它使用了@Component注解,表示它是一个bean,同时它还使用了@PostConstruct注解,进行初始化方法的执行,
@Component("serverListManager")
public class ServerListManager extends MemberChangeListener {...@PostConstructpublic void init() {// 初始化方法中执行延迟任务,对Nacos集群同步各节点之间的健康状态GlobalExecutor.registerServerStatusReporter(new ServerStatusReporter(), 2000);GlobalExecutor.registerServerInfoUpdater(new ServerInfoUpdater());}...
}

各节点状态同步详细代码如下所示

private class ServerStatusReporter implements Runnable {@Overridepublic void run() {try {if (EnvUtil.getPort() <= 0) {return;}int weight = Runtime.getRuntime().availableProcessors() / 2;if (weight <= 0) {weight = 1;}long curTime = System.currentTimeMillis();String status = LOCALHOST_SITE + "#" + EnvUtil.getLocalAddress() + "#" + curTime + "#" + weight+ "\r\n";// 获取所有的NacosServer节点,这里的servers就是从我们./conf/cluster.conf文件中获取的信息List<Member> allServers = getServers();if (!contains(EnvUtil.getLocalAddress())) {Loggers.SRV_LOG.error("local ip is not in serverlist, ip: {}, serverlist: {}",EnvUtil.getLocalAddress(), allServers);return;}if (allServers.size() > 0 && !EnvUtil.getLocalAddress().contains(IPUtil.localHostIP())) {// 使用for循环,向哥节点发送心跳for (Member server : allServers) {if (Objects.equals(server.getAddress(), EnvUtil.getLocalAddress())) {continue;}// This metadata information exists from 1.3.0 onwards "version"if (server.getExtendVal(MemberMetaDataConstants.VERSION) != null) {Loggers.SRV_LOG.debug("[SERVER-STATUS] target {} has extend val {} = {}, use new api report status",server.getAddress(), MemberMetaDataConstants.VERSION,server.getExtendVal(MemberMetaDataConstants.VERSION));continue;}Message msg = new Message();msg.setData(status);// 发送心跳synchronizer.send(server.getAddress(), msg);}}} catch (Exception e) {Loggers.SRV_LOG.error("[SERVER-STATUS] Exception while sending server status", e);} finally {// 嵌套调用自己GlobalExecutor.registerServerStatusReporter(this, switchDomain.getServerStatusSynchronizationPeriodMillis());}}
}



服务实例数据同步

服务中新增/移除了一个instance实例,对该实例同步至其他节点。

我们在新增/移除实例时,其实他们都调用了这个方法consistencyService.put(key, instances);

在这里插入图片描述

在这里插入图片描述

最终都是会进入到下面的代码中,onPut(key, value);就是往阻塞队列中添加数据,而下面这一行就是对nacos集群中各个节点进行实例数据同步

public void put(String key, Record value) throws NacosException {// 将key和所有服务实例封装的Record对象封装成一个datum对象,并保存到一个map集合中。// 同时还有一个key和DataOperation枚举操作类型添加进阻塞队列的操作。// 后续肯定有一个线程从这个注释队列中取出数据,然后根据key把datum对象取出来onPut(key, value);// 各个nacos节点进行实例数据同步distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,globalConfig.getTaskDispatchPeriod() / 2);
}



服务实例状态变动同步

服务中的某一个实例停止运行,过段时间后,该服务实例的状态就变为不健康。

这个变动同步至其他各个节点是通过ServiceManager的内部类ServiceReporter.run()来实现的

@Component
public class ServiceManager implements RecordListener<Service> {...@PostConstructpublic void init() {// 执行延迟任务,同步服务实例的健康状态GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);GlobalExecutor.submitServiceUpdateManager(new UpdatedServiceProcessor());if (emptyServiceAutoClean) {Loggers.SRV_LOG.info("open empty service auto clean job, initialDelay : {} ms, period : {} ms",cleanEmptyServiceDelay, cleanEmptyServicePeriod);// delay 60s, period 20s;// This task is not recommended to be performed frequently in order to avoid// the possibility that the service cache information may just be deleted// and then created due to the heartbeat mechanismGlobalExecutor.scheduleServiceAutoClean(new EmptyServiceAutoClean(), cleanEmptyServiceDelay,cleanEmptyServicePeriod);}try {Loggers.SRV_LOG.info("listen for service meta change");consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);} catch (NacosException e) {Loggers.SRV_LOG.error("listen for service meta change failed!");}}....
}

每一个命名空间向各节点发送一次请求,发送数据为当前命名空间下所有服务的所有实例信息

private class ServiceReporter implements Runnable {@Overridepublic void run() {try {Map<String, Set<String>> allServiceNames = getAllServiceNames();if (allServiceNames.size() <= 0) {//ignorereturn;}// 遍历命名空间for (String namespaceId : allServiceNames.keySet()) {ServiceChecksum checksum = new ServiceChecksum(namespaceId);// 遍历该命名空间下的所有service服务for (String serviceName : allServiceNames.get(namespaceId)) {if (!distroMapper.responsible(serviceName)) {continue;}Service service = getService(namespaceId, serviceName);if (service == null || service.isEmpty()) {continue;}// 将该服务下的所有实例信息进行字符串拼接,在MD5加密,对结果保存至checksum中service.recalculateChecksum();checksum.addItem(serviceName, service.getChecksum());}// 循环结束,checksum保存的当前命名空间下所有服务的所有实例信息Message msg = new Message();msg.setData(JacksonUtils.toJson(checksum));// 获取所有nacos集群节点Collection<Member> sameSiteServers = memberManager.allMembers();if (sameSiteServers == null || sameSiteServers.size() <= 0) {return;}for (Member server : sameSiteServers) {if (server.getAddress().equals(NetUtils.localServer())) {continue;}// 发送请求synchronizer.send(server.getAddress(), msg);}}} catch (Exception e) {Loggers.SRV_LOG.error("[DOMAIN-STATUS] Exception while sending service status", e);} finally {GlobalExecutor.scheduleServiceReporter(this, switchDomain.getServiceStatusSynchronizationPeriodMillis(),TimeUnit.MILLISECONDS);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/867494.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于GWO灰狼优化的多目标优化算法matlab仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.本算法原理 4.1灰狼优化算法原理 4.2 多目标优化问题(MOP)的帕累托最优解 4.3 基于GWO的多目标优化算法 5.完整程序 1.程序功能描述 基于GWO灰狼优化的多目标优化算法matlab仿真&#xff0c;目标函数…

Linux多进程和多线程(六)进程间通信-共享内存

多进程(六) 共享内存共享内存的创建 示例: 共享内存删除 共享内存映射 共享内存映射的创建解除共享内存映射示例:写入和读取共享内存中的数据 写入: ### 读取: 大致操作流程: 多进程(六) 共享内存 共享内存是将分配的物理空间直接映射到进程的⽤户虚拟地址空间中, 减少数据在…

Java | Leetcode Java题解之第217题存在重复元素

题目&#xff1a; 题解&#xff1a; class Solution {public boolean containsDuplicate(int[] nums) {Set<Integer> set new HashSet<Integer>();for (int x : nums) {if (!set.add(x)) {return true;}}return false;} }

C#开发的自定义提示和对话框窗体 - 开源研究系列文章

上次开发了《LUAgent服务器端工具》&#xff0c;然后就开发了自定义的提示和对话框窗体&#xff0c;因为这个是无边框窗体&#xff0c;所以不使用默认的MessageBox了&#xff0c;界面美观并且用户体验更好一些。然后就写了此文&#xff0c;让其他读者能够使用或者复用此类库的代…

非对称加密算法原理与应用2——RSA私钥加密文件

作者:私语茶馆 1.相关章节 (1)非对称加密算法原理与应用1——秘钥的生成-CSDN博客 第一章节讲述的是创建秘钥对,并将公钥和私钥导出为文件格式存储。 本章节继续讲如何利用私钥加密内容,包括从密钥库或文件中读取私钥,并用RSA算法加密文件和String。 2.私钥加密的概述…

git pull拉取显示Already up-to-date,但文件并没有更新

1、问题&#xff1a; 使用git pull拉取远程仓库代码&#xff0c;显示更新成功&#xff08;Already up-to-date&#xff09;&#xff0c;但是本地代码没有更新 这是因为本地有尚未提交的更改&#xff0c;和远程代码有冲突导致无法更新 2、解决方法&#xff1a; 可以使用git s…

axios的使用,处理请求和响应,axios拦截器

1、axios官网 https://www.axios-http.cn/docs/interceptors 2、安装 npm install axios 3、在onMouunted钩子函数中使用axios来发送请求&#xff0c;接受响应 4.出现的问题&#xff1a; &#xff08;1&#xff09; 但是如果发送请求请求时间过长&#xff0c;回出现请求待处…

进程控制-exec函数

让父子进程来执行不相干的操作 能够替换进程地址空间的代码.text段 执行另外的程序&#xff0c;不需要创建额外的的地址空间 当前程序中调用另外一个应用程序 指定执行目录下的程序 int execl(const char *path, const char *arg&#xff0c;/* (char *) NULL */); /* pat…

3.python

闯关 3作业 本节关卡&#xff1a; 学习 python 虚拟环境的安装 Python 的基本语法 学会 vscode 远程连接 internstudio 打断点调试 python 程序

数据库管理-第216期 Oracle的高可用-01(20240703)

数据库管理216期 2024-07-03 数据库管理-第216期 Oracle的高可用-01&#xff08;20240703&#xff09;1 MAA简介2 MAA等级2.1 BRONZE2.2 SILVER2.3 GOLD2.4 PLATINUM 3 业务延续性总结 数据库管理-第216期 Oracle的高可用-01&#xff08;20240703&#xff09; 作者&#xff1a;…

cs224n作业4

NMT结构图&#xff1a;&#xff08;具体结构图&#xff09; LSTM基础知识 nmt_model.py&#xff1a; 参考文章&#xff1a;LSTM输出结构描述 #!/usr/bin/env python3 # -*- coding: utf-8 -*-""" CS224N 2020-21: Homework 4 nmt_model.py: NMT Model Penchen…

雨量监测站:守护大地的晴雨表

雨量监测站是一种专门用于测量和记录降雨量的设施。它通常由雨量计、数据采集器、传输装置和数据处理系统组成。雨量计负责感应雨滴的接触&#xff0c;通过一定的机制将降雨量转化为电信号或数字信号。数据采集器则负责收集这些信号&#xff0c;并将其传输至数据处理系统进行分…

FastAPI+vue3+Primeflex教学20240706,渲染阶乘案例

子绝父相 相对定位是相对于自己原本的位置定位。 绝对定位&#xff0c;如果父元素设置了相对定位&#xff0c;则相对于父元素进行绝对定位&#xff0c;否则相对于最近的设置了相对定位的元素进行绝对定位&#xff0c;或者相对于根元素进行绝对定位。 定位有四个方向&#xff0…

38 IO流

目录 C语言的输入和输出流是什么CIO流stringstream的简单介绍 1. C语言的输入与输出 C语言中我们用到的最频繁的输出方式是scanf和printf&#xff0c;scanf&#xff1a;从标准输入设备&#xff08;键盘&#xff09;读取数据&#xff0c;并将值存在变量中。printf&#xff1a;…

高级计算机体系结构--期末教材复习

Chap2 性能评测和并行编程性能评测并行编程为什么需要三次 barrier改进方法 Chap3 互连网络交换和路由二维网格中 XY 路由 死锁、活锁及饿死死锁避免的方法&#xff1a;虚通道、转弯模型二维网格中最小 西向优先、北向最后和负向优先算法转弯模型&#xff1a;超立方体的部分自适…

安装 tesseract

安装 tesseract 1. Ubuntu-24.04 安装 tesseract2. Ubuntu-24.04 安装支持语言3. Windows 安装 tesseract4. Oracle Linux 8 安装 tesseract 1. Ubuntu-24.04 安装 tesseract sudo apt install tesseract-ocr sudo apt install libtesseract-devreference: https://tesseract-…

绝区贰--及时优化降低 LLM 成本和延迟

前言 大型语言模型 (LLM) 为各行各业带来了变革性功能&#xff0c;让用户能够利用尖端的自然语言处理技术处理各种应用。然而&#xff0c;这些强大的 AI 系统的便利性是有代价的 — 确实如此。随着 LLM 变得越来越普及&#xff0c;其计算成本和延迟可能会迅速增加&#xff0c;…

ctfshow web 36d 练手赛

不知所措.jpg 没啥用然后测试了网站可以使用php伪达到目的 ?filephp://filter/convert.base64-encode/resourcetest/../index.<?php error_reporting(0); $file$_GET[file]; $file$file.php; echo $file."<br />"; if(preg_match(/test/is,$file)){inclu…

如何处理 PostgreSQL 中由于表连接顺序不当导致的性能问题?

文章目录 一、理解表连接和连接顺序二、识别由于表连接顺序不当导致的性能问题三、影响表连接顺序的因素四、解决方案手动调整连接顺序创建合适的索引分析数据分布和优化查询逻辑 五、示例分析手动调整连接顺序创建索引优化查询逻辑 六、总结 在 PostgreSQL 中&#xff0c;表连…

论文回顾 | CVPR 2021 | How to Calibrate Your Event Camera | 基于图像重建的事件相机校准新方法

论文速览 | CVPR 2021 | How to Calibrate Your Event Camera | 基于图像重建的事件相机校准新方法 1 引言 在计算机视觉和机器人领域,相机校准一直是一个基础而又重要的问题。传统的相机校准方法主要依赖于从已知校准图案中提取角点,然后通过优化算法求解相机的内参和外参。这…