Dubbo 3.x源码(26)—Dubbo服务引用源码(9)应用级服务发现订阅refreshServiceDiscoveryInvoker

基于Dubbo 3.1,详细介绍了Dubbo服务的发布与引用的源码。

此前我们学习了MigrationRuleHandler这个处理器,它用于通过动态更改规则来控制迁移行为。MigrationRuleListener的onrefer方法是Dubbo2.x 接口级服务发现与Dubbo3.x应用级服务发现之间迁移的关键。

我们最后讲到了MigrationRuleHandler的refreshInvoker方法,该方法除了刷新invoker迁移新规则之外,还负责远程服务发现订阅的逻辑,即消费者能发现远程服务提供方的地址列表,而应用级的服务引入订阅则是通过refreshServiceDiscoveryInvoker方法实现的。

我们此前学习了接口级服务引入的方法refreshInterfaceInvoker的源码,应用级的服务引入和此方法有很多相似之处,对于相同的方法,我们不再赘述。

Dubbo 3.x服务引用源码:

  1. Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
  2. Dubbo 3.x源码(18)—Dubbo服务引用源码(1)
  3. Dubbo 3.x源码(19)—Dubbo服务引用源码(2)
  4. Dubbo 3.x源码(20)—Dubbo服务引用源码(3)
  5. Dubbo 3.x源码(21)—Dubbo服务引用源码(4)
  6. Dubbo 3.x源码(22)—Dubbo服务引用源码(5)服务引用bean的获取以及懒加载原理
  7. Dubbo 3.x源码(23)—Dubbo服务引用源码(6)MigrationRuleListener迁移规则监听器
  8. Dubbo 3.x源码(24)—Dubbo服务引用源码(7)接口级服务发现订阅refreshInterfaceInvoker
  9. Dubbo 3.x源码(25)—Dubbo服务引用源码(8)notify订阅服务通知更新
  10. Dubbo 3.x源码(26)—Dubbo服务引用源码(9)应用级服务发现订阅refreshServiceDiscoveryInvoker
  11. Dubbo 3.x源码(27)—Dubbo服务引用源码(10)subscribeURLs订阅应用级服务url

Dubbo 3.x服务发布源码:

  1. Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
  2. Dubbo 3.x源码(12)—Dubbo服务发布导出源码(1)
  3. Dubbo 3.x源码(13)—Dubbo服务发布导出源码(2)
  4. Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3)
  5. Dubbo 3.x源码(15)—Dubbo服务发布导出源码(4)
  6. Dubbo 3.x源码(16)—Dubbo服务发布导出源码(5)
  7. Dubbo 3.x源码(17)—Dubbo服务发布导出源码(6)

1 refreshServiceDiscoveryInvoker刷新应用级inovker

该方法具有应用级别的远程服务发现、引入、订阅能力,大概逻辑为:

  1. 首先判断是否需要刷新serviceDiscoveryInvoker,即重新创建真实的服务级invoker:如果真实serviceDiscoveryInvoker不存在,或者已被销毁,或者内部没有Directory,则需要刷新。
  2. 一般情况下,当启动消费者并首次执行refer的时候,真实serviceDiscoveryInvoker为null,需要创建serviceDiscoveryInvoker。
  3. 通过注册中心操作类registryProtocol#getServiceDiscoveryInvoker方法来引入服务提供者serviceDiscoveryInvoker,这是消费者进行应用级别服务发现订阅的核心逻辑。
/*** MigrationInvoker的方法** 刷新应用invoker* @param latch 倒计数器*/
protected void refreshServiceDiscoveryInvoker(CountDownLatch latch) {/** 1 如果MigrationInvoker内部的真实serviceDiscoveryInvoker存在,那么清空真实serviceDiscoveryInvoker的directory的*/clearListener(serviceDiscoveryInvoker);/** 2 判断是否需要刷新服务级serviceDiscoveryInvoker* 如果真实invoker不存在,或者已被销毁,或者内部没有Directory* 一般情况下,当启动消费者并首次执行refer的时候,真实invoker为null,需要创建*/if (needRefresh(serviceDiscoveryInvoker)) {if (logger.isDebugEnabled()) {logger.debug("Re-subscribing instance addresses, current interface " + type.getName());}//如果不为null,则销毁if (serviceDiscoveryInvoker != null) {serviceDiscoveryInvoker.destroy();}/** 3 通过注册中心操作类registryProtocol获取真实serviceDiscoveryInvoker** 这是消费者进行应用级服务发现订阅的核心逻辑,设这里的registryProtocol类型为InterfaceCompatibleRegistryProtocol*/serviceDiscoveryInvoker = registryProtocol.getServiceDiscoveryInvoker(cluster, registry, type, url);}/** 设置监听器*/setListener(serviceDiscoveryInvoker, () -> {latch.countDown();if (reportService.hasReporter()) {reportService.reportConsumptionStatus(reportService.createConsumptionReport(consumerUrl.getServiceInterface(), consumerUrl.getVersion(), consumerUrl.getGroup(), "app"));}if (step == APPLICATION_FIRST) {calcPreferredInvoker(rule);}});
}

2 getServiceDiscoveryInvoker获取invoker

这是默认消费者进行应用级服务发现订阅的核心逻辑,这里的registryProtocol类型为InterfaceCompatibleRegistryProtocol。

  1. 调用父类RegistryProtocol#getRegistryUrl方法,将注册中心协议url转换为应用级服务发现协议url,即service-discovery-registry协议。
  2. 随后调用getRegistry方法,根据应用级服务发现协议url获取注册中心操作类Registry,service-discovery-registry协议对应着ServiceDiscoveryRegistry。
  3. 创建应用级动态注册心中目录ServiceDiscoveryRegistryDirectory,随后调用doCreateInvoker方法创建服务引入invoker。
/*** InterfaceCompatibleRegistryProtocol的方法* <p>* 获取应用级别invoker** @param cluster  集群操作对象* @param registry 注册中心对象,例如ListenerRegistryWrapper(ZookeeperRegistry)* @param type     接口类型* @param url      注册中心协议url,协议是真实注册中心协议,例如zookeeper* @return 真实invoker*/
@Override
public <T> ClusterInvoker<T> getServiceDiscoveryInvoker(Cluster cluster, Registry registry, Class<T> type, URL url) {//调用父类RegistryProtocol的getRegistryUrl方法,将注册中心协议url转换为应用级服务发现协议url,即service-discovery-registry协议//根据应用级服务发现协议url获取注册中心操作类Registry,service-discovery-registry对应着ServiceDiscoveryRegistryregistry = getRegistry(super.getRegistryUrl(url));/** 创建应用级动态注册心中目录ServiceDiscoveryRegistryDirectory*/DynamicDirectory<T> directory = new ServiceDiscoveryRegistryDirectory<>(type, url);/** 创建invoker*/return doCreateInvoker(directory, cluster, registry, type);
}

3 ServiceDiscoveryRegistryDirectory注册中心目录

ServiceDiscoveryRegistryDirectory是基于应用级注册中心的服务发现使用的服务目录,我们在接口级服务发现订阅refreshInterfaceInvoker部分已经讲过Directory的作用了,在此不再赘述。

public ServiceDiscoveryRegistryDirectory(Class<T> serviceType, URL url) {//父类DynamicDirectory的构造器super(serviceType, url);moduleModel = getModuleModel(url.getScopeModel());//服务提供者优先的属性Set<ProviderFirstParams> providerFirstParams = url.getOrDefaultApplicationModel().getExtensionLoader(ProviderFirstParams.class).getSupportedExtensionInstances();if (CollectionUtils.isEmpty(providerFirstParams)) {this.providerFirstParams = null;} else {if (providerFirstParams.size() == 1) {this.providerFirstParams = Collections.unmodifiableSet(providerFirstParams.iterator().next().params());} else {Set<String> params = new HashSet<>();for (ProviderFirstParams paramsFilter : providerFirstParams) {if (paramsFilter.params() == null) {break;}params.addAll(paramsFilter.params());}this.providerFirstParams = Collections.unmodifiableSet(params);}}//获取消费者需要查询过滤的协议String protocol = consumerUrl.getParameter(PROTOCOL_KEY, consumerUrl.getProtocol());//消费者协议服务keyconsumerProtocolServiceKey = new ProtocolServiceKey(consumerUrl.getServiceInterface(), consumerUrl.getVersion(), consumerUrl.getGroup(),!CommonConstants.CONSUMER.equals(protocol) ? protocol : null);
}

4 doCreateInvoker创建invoker

该方法由InterfaceCompatibleRegistryProtocol的父类RegistryProtocol实现。大概步骤为:

  1. 首先根据消费者信息转换为消费者注册信息url,内部包括消费者ip、指定引用的protocol(默认consumer协议)、指定引用的服务接口、指定引用的方法以及其他消费者信息。
  2. 调用registry.register方法将消费者注册信息url注册到注册中心。
  3. 调用directory.buildRouterChain方法构建服务调用路由链。
  4. 调用directory.subscribe方法进行服务发现、引入并订阅服务。
  5. 调用cluster.join方法进行集群容错能力包装。

接口级的服务发现同样是调用该方法,区别是应用级调用的方法中registry参数底层对象为ServiceDiscoveryRegistry类型,directory参数为ServiceDiscoveryRegistryDirectory类型,接口级调用的方法中registry参数底层对象为ZookeeperRegistry类型,directory参数为RegistryDirectory类型。


/*** RegistryProtocol的方法* 创建ClusterInvoker** @param directory 动态目录* @param cluster   集群* @param registry  注册中心* @param type      服务接口类型* @return ClusterInvoker*/
protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type) {//注册中心操作类directory.setRegistry(registry);//设置协议,Protocol$Adaptivedirectory.setProtocol(protocol);// all attributes of REFER_KEY 消费者服务引用参数Map<String, String> parameters = new HashMap<>(directory.getConsumerUrl().getParameters());//消费者信息转消费者注册信息urlURL urlToRegistry = new ServiceConfigURL(//获取protocol属性,只调用指定协议的服务提供方,其它协议忽略,默认值consumerparameters.get(PROTOCOL_KEY) == null ? CONSUMER : parameters.get(PROTOCOL_KEY),//消费者ipparameters.remove(REGISTER_IP_KEY),//端口0,//服务接口路径getPath(parameters, type),//服务引用参数parameters);urlToRegistry = urlToRegistry.setScopeModel(directory.getConsumerUrl().getScopeModel());urlToRegistry = urlToRegistry.setServiceModel(directory.getConsumerUrl().getServiceModel());//是否应该注册,默认trueif (directory.isShouldRegister()) {//设置注册的消费者urldirectory.setRegisteredConsumerUrl(urlToRegistry);/** 1 消费者注册信息url注册到注册中心*/registry.register(directory.getRegisteredConsumerUrl());}/** 2 构建服务路由器链*/directory.buildRouterChain(urlToRegistry);/** 3 服务发现并订阅服务*/directory.subscribe(toSubscribeUrl(urlToRegistry));/** 4 集群容错包装*/return (ClusterInvoker<T>) cluster.join(directory, true);
}

4.1 register注册应用级消费者信息

该方法的源码我们在此前学习provider****导出服务并且应用级服务注册到注册中心的时候就讲过了,即注册应用级别服务消费者和提供者信息是同一个方法。

与之前讲的接口级服务引入的注册不同的是,应用级服务引入的服务消费者url将不会注册到注册中心,这样减轻了注册中心的压力。

/*** ServiceDiscoveryRegistry的方法* * @param url  Registration information , is not allowed to be empty, e.g: dubbo://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin*/
@Override
public final void register(URL url) {//只注册提供者,如果是消费者url直接返回if (!shouldRegister(url)) { // Should Not Registerreturn;}//执行注册doRegister(url);
}

4.2 subscribe应用级服务发现和订阅

该方法首先将当前RegistryDirectory实例加入到节点目录变化的回调通知监听器集合中,用以接收通知。随后调用父类DynamicDirectory的subscribe方法订阅服务。

/*** ServiceDiscoveryRegistryDirectory的方法** 应用级服务发现并订阅服务** @param url 服务消费者url*/
@Override
public void subscribe(URL url) {//获取enable-configuration-listen属性,即是否支持配置监听,默认trueif (moduleModel.getModelEnvironment().getConfiguration().convert(Boolean.class, Constants.ENABLE_CONFIGURATION_LISTEN, true)) {//设置为trueenableConfigurationListen = true;//将当前ServiceDiscoveryRegistryDirectory加入到节点目录变化的回调通知监听器集合中getConsumerConfigurationListener(moduleModel).addNotifyListener(this);//引用配置监听器referenceConfigurationListener = new ReferenceConfigurationListener(this.moduleModel, this, url);} else {enableConfigurationListen = false;}//调用父类DynamicDirectory的subscribe方法super.subscribe(url);
}

DynamicDirectory的subscribe方法如下,可以看到最终还是依靠ServiceDiscoveryRegistry#subscribe方法实现应用级服务订阅的。

/*** DynamicDirectory的方法* 订阅服务** @param url 服务消费者url*/
public void subscribe(URL url) {//设置subscribeUrl属性setSubscribeUrl(url);//调用registry注册中心的subscribe方法实现服务订阅registry.subscribe(url, this);
}

4.2.1 ServiceDiscoveryRegistry#subscribe应用级服务订阅

ServiceDiscoveryRegistry实现了该方法,而ZookeeperRegistry没有实现改方法。

  1. 首先调用!shouldSubscribe方法判断是否不应该订阅,内部调用的!shouldRegister方法。也就是说应用级服务提供者只会注册不会订阅,而应用级的服务消费者只会订阅不会注册。
  2. 然后调用doSubscribe方法对应用级的服务消费者执行服务订阅。
/*** ServiceDiscoveryRegistry的方法** @param url      订阅者url* @param listener 通知监听器*/
@Override
public final void subscribe(URL url, NotifyListener listener) {//是否不应该订阅,内部调用的shouldRegister方法//也就是说应用级服务提供者只会注册不会订阅,而应用级的服务消费者只会订阅不会注册if (!shouldSubscribe(url)) { // Should Not Subscribereturn;}//应用级的服务消费者执行服务订阅doSubscribe(url, listener);
}

4.3 ServiceDiscoveryRegistry#doSubscribe执行服务发现订阅

doSubscribe方法执行应用级服务发现订阅,大概步骤为:

  1. 通过ZookeeperServiceDiscovery#subscribe方法执行应用级服务发现订阅。这里仅仅是将订阅者url加入到metadataInfo的subscribedServiceURLs缓存中,没有真正的订阅操作。
  2. 通serviceNameMapping#getAndListen从元数据中心获取当前服务接口映射的服务提供者服务名列表。在应用级服务提供者启动过程中,会将服务接口到服务名的映射关系发布到远程元数据中心。
  3. 调用subscribeURLs方法,继续执行服务url订阅,这里才会执行真正的订阅逻辑,将会根据服务名去注册中心找到服务实例,然后对服务实例分组并通过rpc调用服务实例的MetaDataService服务获取服务元数据。最后构建服务实例url,notify通知ServiceDiscoveryRegistryDirectory,进一步的创建invoker。
/*** ServiceDiscoveryRegistry的方法* <p>* 执行服务发现订阅** @param url      订阅者url* @param listener 通知监听器,ServiceDiscoveryRegistryDirectory*/@Overridepublic void doSubscribe(URL url, NotifyListener listener) {url = addRegistryClusterKey(url);/** 1 通过ZookeeperServiceDiscoveryy#subscribe方法执行应用级服务发现订阅* 这里仅仅是将订阅者url加入到metadataInfo的subscribedServiceURLs缓存中,没有真正的订阅操作。*/serviceDiscovery.subscribe(url, listener);boolean check = url.getParameter(CHECK_KEY, false);/** 2 从元数据中心获取当前服务接口映射的服务提供者服务名列表** 在应用级服务提供者启动过程中,会将服务接口到服务名的映射关系发布到远程元数据中心*///构建服务接口到服务名的映射关系key,就是serviceInterface,即服务接口全路径名String key = ServiceNameMapping.buildMappingKey(url);//获取对应的锁,每一个key对应一把锁Lock mappingLock = serviceNameMapping.getMappingLock(key);try {//加锁mappingLock.lock();/** 尝试从缓存获取需要订阅的服务名列表,作为初始化服务名列表,默认null* 这是一个Dubbo实现的LUR缓存,默认最多10000个mapping缓存,原理是很简单的继承LinkedHashMap的方式*/Set<String> subscribedServices = serviceNameMapping.getCachedMapping(url);try {//服务映射监听器MappingListener mappingListener = new DefaultMappingListener(url, subscribedServices, listener);/** 核心方法* 根据注册中心协议url,服务消费者url,服务映射监听器,从元数据中心获取当前服务接口对应的服务提供者服务名列表*/subscribedServices = serviceNameMapping.getAndListen(this.getUrl(), url, mappingListener);//存入监听器缓存mappingListeners.put(url.getProtocolServiceKey(), mappingListener);} catch (Exception e) {logger.warn("Cannot find app mapping for service " + url.getServiceInterface() + ", will not migrate.", e);}if (CollectionUtils.isEmpty(subscribedServices)) {logger.info("No interface-apps mapping found in local cache, stop subscribing, will automatically wait for mapping listener callback: " + url);
//                if (check) {
//                    throw new IllegalStateException("Should has at least one way to know which services this interface belongs to, subscription url: " + url);
//                }return;}/** 3 应用级别的服务url订阅** 这里才会执行真正的订阅*/subscribeURLs(url, listener, subscribedServices);} finally {mappingLock.unlock();}}

4.3.1 ZookeeperServiceDiscovery#subscribe基于zk的应用级服务订阅

该方法是父类AbstractServiceDiscovery实现的,内部调用metadataInfo的addService方法。

我们此前学习应用级服务注册的时候,ServiceDiscoveryRegistry#register方法同样是依靠父类AbstractServiceDiscovery实现,其内部也是调用的,内部调用metadataInfo的addService方法。

/*** AbstractServiceDiscovery的方法** 执行应用级订阅** @param url      订阅者url* @param listener 通知监听器,ServiceDiscoveryRegistryDirectory*/
@Override
public void subscribe(URL url, NotifyListener listener) {//添加订阅urlmetadataInfo.addSubscribedURL(url);
}

addSubscribedURL方法实际上仅仅将订阅者url加入到subscribedServiceURLs缓存map中就结束了,key为serviceKey,规则为{group}/{interfaceName}:{version},除此之外没有其他的操作。

/*** MetadataInfo的方法* 添加订阅url* @param url 订阅者url*/
public synchronized void addSubscribedURL(URL url) {//第一次添加时初始化subscribedServiceURLs集合,这里使用的是一个跳表if (subscribedServiceURLs == null) {subscribedServiceURLs = new ConcurrentSkipListMap<>();}//将url加入到subscribedServiceURLs集合addURL(subscribedServiceURLs, url);
}private boolean addURL(Map<String, SortedSet<URL>> serviceURLs, URL url) {//加入到serviceURLs集合, key为{group}/{interfaceName}:{version}SortedSet<URL> urls = serviceURLs.computeIfAbsent(url.getServiceKey(), this::newSortedURLs);// make sure the parameters of tmpUrl is variablereturn urls.add(url);
}

4.3.2 MetadataServiceNameMapping#getAndListen获取并订阅服务映射信息

我们此前在应用级服务提供者启动过程中讲过,在最后会将服务接口到服务名的映射关系发布到远程元数据中心。

而在应用级别消费者启动过程中,在引用服务的时候也会根据接口(服务接口到服务名的映射关系key,就是serviceInterface,即服务接口全路径名)去元数据中心查找当前要引入的服务接口对应的服务提供者服务名列表,并且还会进行订阅,当服务映射数据变更时会更新内存数据。

该方法的大概步骤为:

  1. 首先尝试从本地LUR缓存中获取mapping,如果没有获取到,那么将会创建一个异步监听器AsyncMappingTask,主动调用call方法同步拉取元数据中心的mapping映射信息,获取的数据不会通知监听器DefaultMappingListener立即更新缓存。
  2. 如果缓存已存在,那么将会创建一个异步监听器AsyncMappingTask提交到线程池,异步的获取的数据,并且会通知监听器DefaultMappingListener更新缓存。
/*** AbstractServiceNameMapping的方法** 获取并监听服务映射** @param registryURL   注册中心协议url* @param subscribedURL 服务消费者url* @param listener      通知监听器DefaultMappingListener,内部含有ServiceDiscoveryRegistryDirectory* @return 构建服务接口到服务名的映射关系key,就是serviceInterface,即服务接口全路径名*/
@Override
public Set<String> getAndListen(URL registryURL, URL subscribedURL, MappingListener listener) {//构建服务接口到服务名的映射关系key,就是serviceInterface,即服务接口全路径名String key = ServiceNameMapping.buildMappingKey(subscribedURL);//首先从本地LUR缓存中获取mappingSet<String> mappingServices = this.getCachedMapping(key);// Asynchronously register listener in case previous cache does not exist or cache expired.//如果缓存不存在if (CollectionUtils.isEmpty(mappingServices)) {try {logger.info("Local cache mapping is empty");/** 创建一个异步监听器,主动调用call方法同步拉取元数据中心的mapping映射信息,获取的数据不会通知监听器DefaultMappingListener*/mappingServices = (new AsyncMappingTask(listener, subscribedURL, false)).call();} catch (Exception e) {// ignore}//如果注册中心数据不存在if (CollectionUtils.isEmpty(mappingServices)) {//从url获取subscribed-services参数,表示手动指定的需要订阅的服务名,以,分割多个值String registryServices = registryURL.getParameter(SUBSCRIBED_SERVICE_NAMES_KEY);if (StringUtils.isNotEmpty(registryServices)) {logger.info(subscribedURL.getServiceInterface() + " mapping to " + registryServices + " instructed by registry subscribed-services.");//以,分割多个值mappingServices = parseServices(registryServices);}}//如果找到了mapping数据,那么存入LRU缓存中if (CollectionUtils.isNotEmpty(mappingServices)) {this.putCachedMapping(key, mappingServices);}}//如果存在本地缓存else {//获取异步mapping任务执行器mappingRefreshingExecutorExecutorService executorService = applicationModel.getFrameworkModel().getBeanFactory().getBean(FrameworkExecutorRepository.class).getMappingRefreshingExecutor();//创建一个异步监听器提交到线程池,获取的数据将会通知监听器DefaultMappingListener更新缓存executorService.submit(new AsyncMappingTask(listener, subscribedURL, true));}return mappingServices;
}
4.3.2.1 AsyncMappingTask#call从元数据中心获取mapping

AsyncMappingTask#call方法将会从元数据中心远程拉取服务映射信息。实际上内部仍然是调用MetadataServiceNameMapping的另一个重载的getAndListen方法从元数据中心获取服务接口到服务名的映射信息。

/*** AsyncMappingTask的方法** 从元数据中心远程拉取服务映射信息*/
@Override
public Set<String> call() throws Exception {synchronized (mappingListeners) {//构建空集合Set<String> mappedServices = emptySet();try {//key,目前是仅仅是服务接口全路径名String mappingKey = ServiceNameMapping.buildMappingKey(subscribedURL);//如果监听器不为空if (listener != null) {//调用getAndListen方法元数据中心远程拉取服务映射信息并且注册监听mappedServices = toTreeSet(getAndListen(subscribedURL, listener));//讲key和对应的listener加入到外部类的mappingListeners映射中Set<MappingListener> listeners = mappingListeners.computeIfAbsent(mappingKey, _k -> new HashSet<>());listeners.add(listener);//是否立即通知监听器,if (CollectionUtils.isNotEmpty(mappedServices)) {if (notifyAtFirstTime) {// guarantee at-least-once notification no matter what kind of underlying meta server is used.// listener notification will also cause updating of mapping cache.//通知更新缓存listener.onEvent(new MappingChangedEvent(mappingKey, mappedServices));}}} else {//没有监听器的情况mappedServices = get(subscribedURL);//直接存入缓存if (CollectionUtils.isNotEmpty(mappedServices)) {AbstractServiceNameMapping.this.putCachedMapping(mappingKey, mappedServices);}}} catch (Exception e) {logger.error("Failed getting mapping info from remote center. ", e);}return mappedServices;}
}
4.3.2.2 MetadataServiceNameMapping#getAndListen获取并监听服务映射信息

metadataReport.getServiceAppMapping方法从元数据中心远程拉取获取服务接口对应的服务名映射集合并且注册监听。

/*** MetadataServiceNameMapping的方法** 从元数据中心远程拉取服务映射信息并且注册监听** @param url 消费者url* @param mappingListener 监听器MappingListener* @return 服务映射信息*/
@Override
public Set<String> getAndListen(URL url, MappingListener mappingListener) {//服务接口String serviceInterface = url.getServiceInterface();// randomly pick one metadata report is ok for it's guaranteed all metadata report will have the same mapping data. //随机获取一个配置的注册中心idString registryCluster = getRegistryCluster(url);//首先获取注册中心id对应的元数据中心实例,如果没有则从metadataReports列表获取第一个元数据中心实例MetadataReport metadataReport = metadataReportInstance.getMetadataReport(registryCluster);if (metadataReport == null) {return Collections.emptySet();}//从元数据中心获取服务接口对应的服务名映射集合return metadataReport.getServiceAppMapping(serviceInterface, mappingListener, url);
}

获取元数据中实例时,首先获取随机注册中心id对应的元数据中心实例,如果没有则从metadataReports列表获取第一个元数据中心实例。

/*** MetadataReportInstance的方法** @param registryKey 注册中心id* @return 元数据中心实例*/
public MetadataReport getMetadataReport(String registryKey) {//首先获取注册中心id对应的元数据中心实例MetadataReport metadataReport = metadataReports.get(registryKey);//如果没有则从metadataReports列表获取第一个元数据中心实例if (metadataReport == null && metadataReports.size() > 0) {metadataReport = metadataReports.values().iterator().next();}return metadataReport;
}
4.3.2.3 ZookeeperMetadataReport#getServiceAppMapping获取服务映射

如果元数据中心是zookeeper,那么对应ZookeeperMetadataReport。他的getServiceAppMapping方法很简单,构建节点路径默认 dubbo/mapping/{serviceKey},例如/dubbo/mapping/org.apache.dubbo.demo.DemoService,然后注册监听器监听该节点的变化,最后获取获取节点的内容,也就是服务名映射字符串,并且通过,拆分为set集合。

/*** ZookeeperMetadataReport的方法* <p>* 从元数据中心远程拉取获取服务接口对应的服务名映射集合并且注册监听** @param serviceKey 服务接口* @param listener   监听器MappingListener* @param url        消费者url* @return 服务接口对应的服务名映射集合*/
@Override
public Set<String> getServiceAppMapping(String serviceKey, MappingListener listener, URL url) {//构建节点路径,默认 dubbo/mapping/{serviceKey},例如String path = buildPathKey(DEFAULT_MAPPING_GROUP, serviceKey);//监听节点变化MappingDataListener mappingDataListener = casListenerMap.computeIfAbsent(path, _k -> {MappingDataListener newMappingListener = new MappingDataListener(serviceKey, path);//添加监听器zkClient.addDataListener(path, newMappingListener);return newMappingListener;});mappingDataListener.addListener(listener);//获取节点的内容,也就是服务名映射字符串,通过,拆分为set集合return getAppNames(zkClient.getContent(path));
}

我们在应用级服务提供者注册的时候就讲过了服务映射数据在元数据中心的样子,现在再来看看。


可以很明显的看出来所谓的服务映射是什么意思,也就是一个服务接口到对应的服务名的关系节点。有了服务映射,那么应用级消费者就能通过服务接口来查询对应的服务应用名了,这在consumer应用级服务发现的时候很有用。

5 总结

本次我们学习了Dubbo3的应用级服务发现订阅refreshServiceDiscoveryInvoker方法的源码,下文我们将会学习应用级服务发现订阅后半部分的源码,即在获取到服务应用名之后,通过subscribeURLs方法进行应用级别的服务url订阅的源码。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/64432.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Qt Creator 为同一个项目切换不同的构建套件(Kit)

如下图所示&#xff0c;我只有一个构建套件&#xff1a; 切换构建套件(Kit)的步骤如下&#xff1a; 选中上图中的步骤②后&#xff0c;可以看到如下图所示的结果&#xff0c;构建套件就已经添加成功了&#xff1a; 此时&#xff0c;我们可以自由选择使用哪一个构建套件。 如…

vue3实现商城系统详情页(前端实现)

目录 写在前面 预览 实现 图片部分 详情部分 代码 源码地址 总结 写在前面 笔者不是上一个月毕业了么&#xff1f;找工作没找到&#xff0c;准备在家躺平两个月。正好整理一下当时的毕业设计&#xff0c;是一个商城系统。还是写篇文章记录下吧 预览 商品图片切换显示…

Java深拷贝和浅拷贝区别?

大家好&#xff0c;我是锋哥。今天分享关于【Java深拷贝和浅拷贝区别?】面试题。希望对大家有帮助&#xff1b; Java深拷贝和浅拷贝区别? 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 在Java中&#xff0c;深拷贝&#xff08;Deep Copy&#xff09;和浅拷贝&am…

React 第十七节 useMemo用法详解

概述 useMemo 是React 中的一个HOOK&#xff0c;用于根据依赖在每次渲染时候缓存计算结果&#xff1b; 大白话就是&#xff0c;只有依赖项发生变化时候&#xff0c;才会重新渲染为新计算的值&#xff0c;否则就还是取原来的值&#xff0c;有点类似 vue 中的 computed 计算属性…

全国数据资源入表年度发展报告(2024)(附下载)

近日&#xff0c;在“数据要素暨第二届数据资产价值大会”上&#xff0c;青岛、潍坊、湖州、广西等地的数据资产登记评价中心&#xff0c;联合发布了《全国数据资源入表年度发展报告&#xff08;2024&#xff09;》。 报告内容包括全国数据资源入表的总体发展概述、政策指引、…

【Mybatis】MyBatis 探秘:#{} 与 ${} 参传差异解码,数据库连接池筑牢数据交互根基

前言 &#x1f31f;&#x1f31f;本期讲解关于Spring IOC&DI的详细介绍~~~ &#x1f308;感兴趣的小伙伴看一看小编主页&#xff1a;GGBondlctrl-CSDN博客 &#x1f525; 你的点赞就是小编不断更新的最大动力 &#x1f386;那么…

解锁 draw.io 流程图制作工具的强大功能与应用(1/2)

一、draw.io 简介 &#xff08;一&#xff09;基本概述 draw.io 是一款由 JGraph 公司开发的基于网页的在线图表绘制工具。它最大的优势之一就是无需进行繁琐的下载和安装步骤&#xff0c;只要打开浏览器&#xff0c;访问其官网&#xff0c;就能立即开始使用。无论是在 Window…

ai论文写作免费平台:五款AI论文写作辅助工具的对比与分析

随着人工智能技术的飞速发展&#xff0c;越来越多的AI工具应运而生&#xff0c;为学术写作带来了前所未有的便利。本文将对千笔AI论文、笔灵AI论文、Smodin、以及Notion AI等五款AI论文写作辅助工具进行全面的对比与分析&#xff0c;以帮助用户更好地了解这些工具的优势和特点&…

三、基于langchain使用Qwen搭建金融RAG问答机器人--检索增强生成

经过前面2节数据准备后&#xff0c;现在来构建检索 加载向量数据库 from langchain.vectorstores import Chroma from langchain_huggingface import HuggingFaceEmbeddings import os# 定义 Embeddings embeddings HuggingFaceEmbeddings(model_name"m3e-base")#…

数据仓库工具箱—读书笔记02(Kimball维度建模技术概述02、事实表技术基础)

Kimball维度建模技术概述 记录一下读《数据仓库工具箱》时的思考&#xff0c;摘录一些书中关于维度建模比较重要的思想与大家分享&#x1f923;&#x1f923;&#x1f923; 第二章前言部分作者提到&#xff1a;技术的介绍应该通过涵盖各种行业的熟悉的用例展开&#xff08;赞同…

fabric.js

目录 一、在canvas上画简单的图形 二、在canvas上用路径(Path)画不规则图形 三、在canvas上插入图片并设置旋转属性(angle) 四、让元素动起来(animate) 五、图像过滤器(filters)让图片多姿多彩 六、颜色模式(Color)和相互转换(toRgb、toHex) 七、对图形的渐变填充(Gradi…

Liinux下VMware Workstation Pro的安装,建议安装最新版本17.61

建议安装最新版本17.61&#xff0c;否则可能有兼容性问题 下载VMware Workstation安装软件 从官网网站下载 https://support.broadcom.com/group/ecx/productdownloads?subfamilyVMwareWorkstationPro 选择所需版本 现在最新版本是17.61&#xff0c;否则可能有兼容性问题…

压力测试Jmeter简介

前提条件&#xff1a;要安装JDK 若不需要了解&#xff0c;请直接定位到左侧目录的安装环节。 1.引言 在现代软件开发中&#xff0c;性能和稳定性是衡量系统质量的重要指标。为了确保应用程序在高负载情况下仍能正常运行&#xff0c;压力测试变得尤为重要。Apache JMeter 是一…

前端的知识(部分)

11 前端的编写步骤 第一步:在HTML的页面中声明方法 第二步:在<script>中定义一个函数,其中声明一个data来为需要的数据 赋值一个初始值 第三步:编写这个方法实现对应的功能

LSTM详解

1. LSTM设计 LSTM(长短期记忆网络)详解 长短期记忆网络(LSTM, Long Short-Term Memory) 是一种特殊的循环神经网络(RNN),特别适合处理和预测序列数据中的长时间依赖关系。LSTM 通过引入“门机制”(如输入门、遗忘门、输出门)来解决标准 RNN 在长时间序列任务中梯度消…

我在广州学 Mysql 系列之 数据类型和运算符详解

ℹ️大家好&#xff0c;我是&#x1f606;练小杰&#xff0c;今天主要学习 Mysql的数据类型以及运算符操作~~ 上周五学习了“Mysql 系列之 数据“表”的基本操作”~ 想要了解更多&#x1f236;️MYSQL 数据库的命令行总结&#xff01;&#xff01;&#xff01; “我是你的敌人,…

python 配置 oracle instant client

1.问题描述 想用python连接oracle数据库&#xff0c;百度得知需要cx_Oracle这个第三方库 import cx_Oracle# 设置Oracle数据源名称 dsn cx_Oracle.makedsn(host, port, service_nameservice_name)# 创建数据库连接 connection cx_Oracle.connect(userusername, passwordpas…

使用FastGPT制做一个AI网站日志分析器

越来越的多网站面临每天上千次的扫描和各类攻击&#xff0c;及时发现攻击IP&#xff0c;并有效的屏蔽不良访问成为网站安全的重要保障&#xff0c;这里我们使用AI来完成对网站日志的日常分析。 我们来使用FastGPT来制做一个AI网站日志析器&#xff0c;下面就开始&#xff1a; …

RabbitMQ中的Work Queues模式

在现代分布式系统中&#xff0c;消息队列&#xff08;Message Queue&#xff09;是实现异步通信和解耦系统的关键组件之一。RabbitMQ 是一个广泛使用的开源消息代理软件&#xff0c;支持多种消息传递模式。其中&#xff0c;Work Queues&#xff08;工作队列&#xff09;模式是一…

【Python爬虫系列】_032.Scrapy_全站爬取

课 程 推 荐我 的 个 人 主 页:👉👉 失心疯的个人主页 👈👈入 门 教 程 推 荐 :👉👉 Python零基础入门教程合集 👈👈虚 拟 环 境 搭 建 :👉👉 Python项目虚拟环境(超详细讲解) 👈👈PyQt5 系 列 教 程:👉👉 Python GUI(PyQt5)教程合集 👈👈