目录
一、服务发现
二、getServices():获取服务列表
2.1、获取服务列表
2.2、总结图
三、getInstances(serviceId):获取服务实例列表
3.1、从缓存中获取服务信息
3.2、缓存为空,执行订阅服务
3.2.1、调度更新,往线程池中提交一个UpdateTask任务
3.2.2、订阅服务
3.2.3、处理服务信息
3.3、非订阅模式,通过grpc发送ServiceQueryRequest服务查询请求
3.4、筛选满足条件的实例
3.5、总结图
一、服务发现
在discovery-provider项目的pom.xml中,我们引入了如下依赖:
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId><version>2.2.9.RELEASE</version>
</dependency>
SpringCloud很多功能都是基于SpringBoot项目的自动配置原理来扩展实现的,下面我们查看spring-cloud-starter-alibaba-nacos-discovery-2.2.9.RELEASE.jar包路径下的spring.factories的"org.springframework.boot.autoconfigure.EnableAutoConfiguration"自动装配类配置,如下图:
如上图,跟客户端服务发现有关的有两个自动配置类:NacosDiscoveryClientConfiguration和NacosDiscoveryAutoConfiguration。
相关源码如下:
@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
@ConditionalOnBlockingDiscoveryEnabled
@ConditionalOnNacosDiscoveryEnabled
@AutoConfigureBefore({ SimpleDiscoveryClientAutoConfiguration.class,CommonsClientAutoConfiguration.class })
// 在NacosDiscoveryAutoConfiguration自动装配类执行完成后才执行
@AutoConfigureAfter(NacosDiscoveryAutoConfiguration.class)
public class NacosDiscoveryClientConfiguration {// 创建DiscoveryClient bean对象@Beanpublic DiscoveryClient nacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {return new NacosDiscoveryClient(nacosServiceDiscovery);}@Bean@ConditionalOnMissingBean@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled",matchIfMissing = true)public NacosWatch nacosWatch(NacosServiceManager nacosServiceManager,NacosDiscoveryProperties nacosDiscoveryProperties) {return new NacosWatch(nacosServiceManager, nacosDiscoveryProperties);}}@Configuration(proxyBeanMethods = false)
// spring.cloud.discovery.enabled=true时才生效,缺省值为true
@ConditionalOnDiscoveryEnabled
// spring.cloud.nacos.discovery.enabled=true时才生效,缺省值为true
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryAutoConfiguration {@Bean@ConditionalOnMissingBeanpublic NacosDiscoveryProperties nacosProperties() {// 匹配配置文件中以“spring.cloud.nacos.discovery”为前缀的那些属性,// 如namespace、username、password、serverAddr等属性return new NacosDiscoveryProperties();}// 创建NacosServiceDiscovery bean对象@Bean@ConditionalOnMissingBeanpublic NacosServiceDiscovery nacosServiceDiscovery(NacosDiscoveryProperties discoveryProperties,NacosServiceManager nacosServiceManager) {return new NacosServiceDiscovery(discoveryProperties, nacosServiceManager);}}
在NacosDiscoveryAutoConfiguration自动配置类中,创建了一个NacosServiceDiscovery的bean对象,然后在NacosDiscoveryClientConfiguration自动装配时,创建DiscoveryClient的bean对象,传入前面创建的NacosServiceDiscovery对象。
重点关注NacosDiscoveryClient这个类,NacosDiscoveryClient的源码如下:
public class NacosDiscoveryClient implements DiscoveryClient {private static final Logger log = LoggerFactory.getLogger(NacosDiscoveryClient.class);/*** Nacos Discovery Client Description.*/public static final String DESCRIPTION = "Spring Cloud Nacos Discovery Client";private NacosServiceDiscovery serviceDiscovery;public NacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {this.serviceDiscovery = nacosServiceDiscovery;}@Overridepublic String description() {return DESCRIPTION;}@Overridepublic List<ServiceInstance> getInstances(String serviceId) {try {return serviceDiscovery.getInstances(serviceId);}catch (Exception e) {throw new RuntimeException("Can not get hosts from nacos server. serviceId: " + serviceId, e);}}@Overridepublic List<String> getServices() {try {return serviceDiscovery.getServices();}catch (Exception e) {log.error("get service name from nacos server fail,", e);return Collections.emptyList();}}}
可以看到,NacosDiscoveryClient实现了SpringCloud的DiscoveryClient接口,重点是getInstances()和getServices()方法,而且都是由NacosServiceDiscovery类去实现。
public class NacosServiceDiscovery {// 跟配置文件中以“spring.cloud.nacos.discovery”前缀的属性配置对应上private NacosDiscoveryProperties discoveryProperties;// nacos服务管理器对象private NacosServiceManager nacosServiceManager;public NacosServiceDiscovery(NacosDiscoveryProperties discoveryProperties,NacosServiceManager nacosServiceManager) {this.discoveryProperties = discoveryProperties;this.nacosServiceManager = nacosServiceManager;}/*** 返回指定group和servic的所有实例*/public List<ServiceInstance> getInstances(String serviceId) throws NacosException {// 配置文件中配置的group组名String group = discoveryProperties.getGroup();// namingService(): 通过反射创建一个NacosNamingService对象// 最终会调用NacosNamingService#selectInstances()方法List<Instance> instances = namingService().selectInstances(serviceId, group,true);// 将Instance包装成NacosServiceInstance对象返回return hostToServiceInstanceList(instances, serviceId);}/*** 返回指定group的所有服务名称*/public List<String> getServices() throws NacosException {// 配置文件中配置的group组名String group = discoveryProperties.getGroup();// namingService(): 通过反射创建一个NacosNamingService对象// 最终会调用NamingGrpcClientProxy#getServiceList()方法ListView<String> services = namingService().getServicesOfServer(1,Integer.MAX_VALUE, group);// 返回所有服务名称return services.getData();}public static List<ServiceInstance> hostToServiceInstanceList(List<Instance> instances, String serviceId) {List<ServiceInstance> result = new ArrayList<>(instances.size());for (Instance instance : instances) {ServiceInstance serviceInstance = hostToServiceInstance(instance, serviceId);if (serviceInstance != null) {result.add(serviceInstance);}}return result;}public static ServiceInstance hostToServiceInstance(Instance instance,String serviceId) {if (instance == null || !instance.isEnabled() || !instance.isHealthy()) {return null;}NacosServiceInstance nacosServiceInstance = new NacosServiceInstance();nacosServiceInstance.setHost(instance.getIp());nacosServiceInstance.setPort(instance.getPort());nacosServiceInstance.setServiceId(serviceId);Map<String, String> metadata = new HashMap<>();metadata.put("nacos.instanceId", instance.getInstanceId());metadata.put("nacos.weight", instance.getWeight() + "");metadata.put("nacos.healthy", instance.isHealthy() + "");metadata.put("nacos.cluster", instance.getClusterName() + "");if (instance.getMetadata() != null) {metadata.putAll(instance.getMetadata());}metadata.put("nacos.ephemeral", String.valueOf(instance.isEphemeral()));nacosServiceInstance.setMetadata(metadata);if (metadata.containsKey("secure")) {boolean secure = Boolean.parseBoolean(metadata.get("secure"));nacosServiceInstance.setSecure(secure);}return nacosServiceInstance;}private NamingService namingService() {return nacosServiceManager.getNamingService();}}
接下来,我们分析前面介绍到的两个重要方法:getInstances(serviceId)和getServices()。
二、getServices():获取服务列表
2.1、获取服务列表
// namingService(): 通过反射创建一个NacosNamingService对象
// NamingFactory#createNamingService(java.util.Properties)
public static NamingService createNamingService(Properties properties) throws NacosException {try {Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");Constructor constructor = driverImplClass.getConstructor(Properties.class);return (NamingService) constructor.newInstance(properties);} catch (Throwable e) {throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);}
}
//getServices()调用栈大体如下:namingService().getServicesOfServer(1, Integer.MAX_VALUE, group);NacosNamingService#getServicesOfServerclientProxy.getServiceList(pageNo, pageSize, groupName, selector)NamingClientProxyDelegate#getServiceListgrpcClientProxy.getServiceList(pageNo, pageSize, groupName, selector);// 最终会调用NamingGrpcClientProxy#getServiceList()方法
public ListView<String> getServiceList(int pageNo, int pageSize, String groupName, AbstractSelector selector)throws NacosException {// 构建ServiceListRequest请求(服务列表请求),指定命名空间ID、服务组名ServiceListRequest request = new ServiceListRequest(namespaceId, groupName, pageNo, pageSize);if (selector != null) {if (SelectorType.valueOf(selector.getType()) == SelectorType.label) {request.setSelector(JacksonUtils.toJson(selector));}}// 发送服务列表请求给Nacos服务端,接下来由服务端处理ServiceListResponse response = requestToServer(request, ServiceListResponse.class);// 组装返回值出去ListView<String> result = new ListView<>();result.setCount(response.getCount());result.setData(response.getServiceNames());return result;
}
接下来,我们看看服务端怎么处理这个服务列表请求的。通过对ServiceListRequest类引用的追踪,我们发现是在com.alibaba.nacos.naming.remote.rpc.handler.ServiceListRequestHandler#handle这个方法中对客户端提交的服务列表请求进行处理的。
// 处理客户端提交的服务列表请求
public ServiceListResponse handle(ServiceListRequest request, RequestMeta meta) throws NacosException {// ServiceManager.getInstance()通过单例返回一个ServiceManager对象/*** 获取指定命令空间下的所有服务,在ServiceManager中存在一个map保存着每个命名空间中的所有服务。* ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps = new ConcurrentHashMap<>(1 << 2)* key: namespaceId* value: Set<Service>* 注册实例的时候,就往这个map写入了数据** ServiceManager.getInstance().getSingletons(request.getNamespace())相当于执行:* namespaceSingletonMaps.getOrDefault(namespace, new HashSet<>(1))*/Collection<Service> serviceSet = ServiceManager.getInstance().getSingletons(request.getNamespace());// 构建响应结果ServiceListResponse result = ServiceListResponse.buildSuccessResponse(0, new LinkedList<>());if (!serviceSet.isEmpty()) {// 过滤指定分组的Service,添加groupServiceName,格式如:groupA@@serviceACollection<String> serviceNameSet = selectServiceWithGroupName(serviceSet, request.getGroupName());// 按分页裁剪serviceNameSetList<String> serviceNameList = ServiceUtil.pageServiceName(request.getPageNo(), request.getPageSize(), serviceNameSet);result.setCount(serviceNameSet.size());result.setServiceNames(serviceNameList);}return result;
}
从源码可以看出,Nacos服务端从ServiceManager管理器中的一个map(namespaceSingletonMaps)中拿出指定命名空间那些Service,并根据筛选条件过滤满足条件的Service,然后组装好groupServiceName(格式如:groupA@@serviceA)并返回。
2.2、总结图
三、getInstances(serviceId):获取服务实例列表
// 调用栈如下:
// namingService().selectInstances(serviceId, group,true);// NamingService#selectInstances(serviceName, groupName, healthy, true)// NamingService#selectInstances(serviceName, groupName, new ArrayList<String>(), healthy, true)
// getInstances(serviceId)方法最终会调用NacosNamingService#selectInstances()获取实例信息。
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,boolean subscribe) throws NacosException {ServiceInfo serviceInfo;// 集群名称,使用逗号分隔String clusterString = StringUtils.join(clusters, ",");// 是否订阅,默认是订阅的if (subscribe) {/*** 1.从缓存中获取ServiceInfo* ConcurrentMap<String, ServiceInfo> serviceInfoMap* key: groupName@@serviceName 或者 groupName@@serviceName@@clusterString* value: ServiceInfo*/// 示例:serviceName=discovery-provider groupName=DEFAULT_GROUPserviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);// 2.缓存为空,执行订阅服务if (null == serviceInfo) {serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);}} else {// 3.非订阅,通过grpc发送ServiceQueryRequest服务查询请求serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);}// 4.筛选满足条件的实例return selectInstances(serviceInfo, healthy);
}
上述流程的基本逻辑为:
- 如果是订阅模式,则直接从本地缓存获取服务信息(ServiceInfo),然后从中获取实例列表,这是因为订阅机制会自动同步服务器实例的变化到本地。如果本地缓存中没有,那说明是首次调用,则进行订阅,在订阅完成后会获得到服务信息。
- 如果是非订阅模式,那就直接请求服务器端,获得服务信息。
3.1、从缓存中获取服务信息
public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());// 组装服务名(带组名):groupName@@serviceName// 例如:DEFAULT_GROUP@@discovery-providerString groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);// 如果指定了集群,那么key还会加上"@@clusters"String key = ServiceInfo.getKey(groupedServiceName, clusters);if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);}// ConcurrentMap<String, ServiceInfo> serviceInfoMap// 从缓存中获取服务信息return serviceInfoMap.get(key);
}
3.2、缓存为空,执行订阅服务
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);// clientProxy在构造方法中初始化为:NamingClientProxyDelegate
this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties, changeNotifier);
实际上调用的是NamingClientProxyDelegate#subscribe()方法:
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);// 服务名称(带组名) 格式:groupName@@serviceNameString serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);// 如果集群名称非空,key还需要拼接上集群名称String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);// 调度更新,往线程池中提交一个UpdateTask任务serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);// 获取缓存中的服务信息ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);if (null == result || !isSubscribed(serviceName, groupName, clusters)) {// 缓存中不存在对应的服务信息 或者 SubscriberRedoData还未注册,则执行订阅result = grpcClientProxy.subscribe(serviceName, groupName, clusters);}// 处理服务信息:获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件serviceInfoHolder.processServiceInfo(result);return result;
}
主要逻辑有下面三个,分析如下。
3.2.1、调度更新,往线程池中提交一个UpdateTask任务
public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {if (!asyncQuerySubscribeService) {return;}// 组装key 格式:groupName@@serviceName@@clustersString serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);// Map<String, ScheduledFuture<?>> futureMap = new HashMap<>()// futureMap用于保存UpdateTask线程池任务的执行结果if (futureMap.get(serviceKey) != null) {return;}synchronized (futureMap) {// double check双重检查,如果非空,直接返回,也就是相同的groupName@@serviceName@@clusters,只会存在一个UpdateTask任务if (futureMap.get(serviceKey) != null) {return;}// 往线程池中添加一个更新任务// UpdateTask实现了Runnable接口,需要关注其run()方法ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));futureMap.put(serviceKey, future);}
}private synchronized ScheduledFuture<?> addTask(UpdateTask task) {// 延迟1s执行UpdateTaskreturn executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}
可以看到,使用了一个map来保存线程池任务的响应,延迟1s执行调度更新任务。我们看下UpdateTask的源码:
public class UpdateTask implements Runnable {long lastRefTime = Long.MAX_VALUE;private boolean isCancel;private final String serviceName;private final String groupName;private final String clusters;private final String groupedServiceName;private final String serviceKey;/*** the fail situation. 1:can't connect to server 2:serviceInfo's hosts is empty*/private int failCount = 0;public UpdateTask(String serviceName, String groupName, String clusters) {this.serviceName = serviceName;this.groupName = groupName;this.clusters = clusters;this.groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);this.serviceKey = ServiceInfo.getKey(groupedServiceName, clusters);}@Overridepublic void run() {long delayTime = DEFAULT_DELAY;try {if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);isCancel = true;return;}ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);if (serviceObj == null) {// 使用grpc向服务端发送ServiceQueryRequest服务查询请求serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);// 处理服务信息:获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件serviceInfoHolder.processServiceInfo(serviceObj);lastRefTime = serviceObj.getLastRefTime();return;}if (serviceObj.getLastRefTime() <= lastRefTime) {// 使用grpc向服务端发送ServiceQueryRequest服务查询请求serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);// 处理服务信息:获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件serviceInfoHolder.processServiceInfo(serviceObj);}lastRefTime = serviceObj.getLastRefTime();if (CollectionUtils.isEmpty(serviceObj.getHosts())) {// 记录失败次数incFailCount();return;}// TODO multiple time can be configured.delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;// 重置失败次数resetFailCount();} catch (NacosException e) {handleNacosException(e);} catch (Throwable e) {handleUnknownException(e);} finally {if (!isCancel) {// 注意:延时时间最长为60s,时长和失败次数相关(失败次数越大,延时时间越长)executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60),TimeUnit.MILLISECONDS);}}}private void handleNacosException(NacosException e) {incFailCount();int errorCode = e.getErrCode();if (NacosException.SERVER_ERROR == errorCode) {handleUnknownException(e);}NAMING_LOGGER.warn("Can't update serviceName: {}, reason: {}", groupedServiceName, e.getErrMsg());}private void handleUnknownException(Throwable throwable) {incFailCount();NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, throwable);}private void incFailCount() {int limit = 6;if (failCount == limit) {return;}failCount++;}private void resetFailCount() {failCount = 0;}
}
run()方法主要逻辑就是使用grpc向服务端发送ServiceQueryRequest服务查询请求,然后处理服务信息,获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件。
这里有重试机制,最多重试6次,延时时间最长为60s,时长和失败次数相关(失败次数越大,延时时间越长)。
3.2.2、订阅服务
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {if (NAMING_LOGGER.isDebugEnabled()) {NAMING_LOGGER.debug("[GRPC-SUBSCRIBE] service:{}, group:{}, cluster:{} ", serviceName, groupName, clusters);}// 缓存SubscriberRedoData重做数据,定时使用redoData重新订阅,// 具体实现在RedoScheduledTask(由NamingGrpcRedoService定时调度),最终调用的也是NamingGrpcClientProxy#doSubscribe// 缓存重做数据保存在map中:private final ConcurrentMap<String, SubscriberRedoData> subscribes = new ConcurrentHashMap<>();redoService.cacheSubscriberForRedo(serviceName, groupName, clusters);// 使用grpc发送服务订阅请求return doSubscribe(serviceName, groupName, clusters);
}public void cacheSubscriberForRedo(String serviceName, String groupName, String cluster) {String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster);SubscriberRedoData redoData = SubscriberRedoData.build(serviceName, groupName, cluster);// private final ConcurrentMap<String, SubscriberRedoData> subscribes = new ConcurrentHashMap<>();synchronized (subscribes) {subscribes.put(key, redoData);}
}
订阅服务首先会缓存SubscriberRedoData重做数据,实际上就是保存在一个map中,后续可以定时使用SubscriberRedoData重做数据来重新订阅,然后使用grpc发送服务订阅请求。
我们来看下如何订阅服务的。
public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {// 构建一个SubscribeServiceRequest客户端订阅请求// 服务端处理代码: com.alibaba.nacos.naming.remote.rpc.handler.SubscribeServiceRequestHandler.handleSubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters,true);// grpc请求Nacos服务端进行订阅SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);// 标记SubscriberRedoData重做数据为已订阅redoService.subscriberRegistered(serviceName, groupName, clusters);return response.getServiceInfo();
}
通过grpc向Nacos服务端发起一个订阅请求,服务端真正的处理是在:com.alibaba.nacos.naming.remote.rpc.handler.SubscribeServiceRequestHandler#handle()方法。
public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {String namespaceId = request.getNamespace();String serviceName = request.getServiceName();String groupName = request.getGroupName();String app = request.getHeader("app", "unknown");String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);// 构建一个Service服务,指定为临时实例Service service = Service.newService(namespaceId, groupName, serviceName, true);// 构建Subscriber订阅者对象Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(),namespaceId, groupedServiceName, 0, request.getClusters());// serviceStorage.getData(service): 从缓存中获取serviceInfo// metadataManager.getServiceMetadata(service).orElse(null): 从内存(map)获取ServiceMetadata// ServiceUtil.selectInstancesWithHealthyProtection(): 仅包含有保护机制的健康实例ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),metadataManager.getServiceMetadata(service).orElse(null), subscriber.getCluster(), false,true, subscriber.getIp());if (request.isSubscribe()) {// 订阅服务clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());NotifyCenter.publishEvent(new SubscribeServiceTraceEvent(System.currentTimeMillis(),meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));} else {// 取消订阅服务clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());NotifyCenter.publishEvent(new UnsubscribeServiceTraceEvent(System.currentTimeMillis(),meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));}return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
}
我们重点关注订阅服务的方法:
public void subscribeService(Service service, Subscriber subscriber, String clientId) {Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);Client client = clientManager.getClient(clientId);if (!clientIsLegal(client, clientId)) {return;}// 添加到订阅者列表中,实际上就是保存在map中// 订阅者列表: protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);client.addServiceSubscriber(singleton, subscriber);client.setLastUpdatedTime();// 发布客户端订阅事件NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}
将service添加到订阅者列表中,然后发布客户端订阅事件,这个在之前分析过,客户端订阅事件是在com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#handleClientOperation进行处理,
核心逻辑就是将服务添加到ClientServiceIndexesManager的subscriberIndexes订阅者列表中:
private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
3.2.3、处理服务信息
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {String serviceKey = serviceInfo.getKey();if (serviceKey == null) {return null;}// 获取老的服务ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());if (isEmptyOrErrorPush(serviceInfo)) {//empty or error push, just ignorereturn oldService;}// 重新存入客户端缓存中serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);// 对比下服务信息是否发生变更boolean changed = isChangedServiceInfo(oldService, serviceInfo);if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));}MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());if (changed) {NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),JacksonUtils.toJson(serviceInfo.getHosts()));// 如果发生改变,发送实例变更事件,处理源码在:InstancesChangeNotifierNotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),serviceInfo.getClusters(), serviceInfo.getHosts()));// 同步serviceInfo数据到本地文件DiskCache.write(serviceInfo, cacheDir);}return serviceInfo;
}
3.3、非订阅模式,通过grpc发送ServiceQueryRequest服务查询请求
真正执行的是com.alibaba.nacos.client.naming.remote.NamingClientProxyDelegate#queryInstancesOfService():
public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,boolean healthyOnly) throws NacosException {return grpcClientProxy.queryInstancesOfService(serviceName, groupName, clusters, udpPort, healthyOnly);
}public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,boolean healthyOnly) throws NacosException {// 构建服务查询请求// Nacos服务端处理是在:com.alibaba.nacos.naming.remote.rpc.handler.ServiceQueryRequestHandler.handleServiceQueryRequest request = new ServiceQueryRequest(namespaceId, serviceName, groupName);request.setCluster(clusters);request.setHealthyOnly(healthyOnly);request.setUdpPort(udpPort);// 通过grpc请求Nacos服务端处理QueryServiceResponse response = requestToServer(request, QueryServiceResponse.class);return response.getServiceInfo();
}
queryInstancesOfService()核心就是构建了一个服务查询请求,通过grpc请求Nacos服务端,接下来我们直接看服务端的处理代码,具体是在:com.alibaba.nacos.naming.remote.rpc.handler.ServiceQueryRequestHandler#handle。
public QueryServiceResponse handle(ServiceQueryRequest request, RequestMeta meta) throws NacosException {String namespaceId = request.getNamespace();String groupName = request.getGroupName();String serviceName = request.getServiceName();Service service = Service.newService(namespaceId, groupName, serviceName);String cluster = null == request.getCluster() ? "" : request.getCluster();boolean healthyOnly = request.isHealthyOnly();// 从缓存中获取serviceInfoServiceInfo result = serviceStorage.getData(service);// 从内存(map)获取ServiceMetadataServiceMetadata serviceMetadata = metadataManager.getServiceMetadata(service).orElse(null);// 获取有保护机制的健康实例result = ServiceUtil.selectInstancesWithHealthyProtection(result, serviceMetadata, cluster, healthyOnly, true,meta.getClientIp());return QueryServiceResponse.buildSuccessResponse(result);
}
3.4、筛选满足条件的实例
private List<Instance> selectInstances(ServiceInfo serviceInfo, boolean healthy) {List<Instance> list;if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {return new ArrayList<>();}// 遍历所有实例,直接移除掉不满足条件的实例Iterator<Instance> iterator = list.iterator();while (iterator.hasNext()) {Instance instance = iterator.next();// 筛选出健康、启用、权重大于0的实例if (healthy != instance.isHealthy() || !instance.isEnabled() || instance.getWeight() <= 0) {iterator.remove();}}return list;
}
3.5、总结图