1 Directory目录概述
Directory代表多个invoker,其内部维护了一个list,并且这个list的内容是动态变化的(对于消费端来说,每个invoker代表一个服务提供者)。
在Dubbo中,RegistryDirectory和StaticDirectory都是Directory的实现类。
RegistryDirectory是一个动态服务目录,可以感知注册中心配置的变化,其持有的Invoker列表会随着注册中心内容的变化而变化。每次变化后,RegistryDirectory都会动态地增删Invoker,并调用Router的route方法进行路由,过滤掉不符合路由规则的Invoker。相反,StaticDirectory是一个静态服务目录,它内部存放的Invoker是不会变动的。
RegistryDirectory是Dubbo中默认使用的Directory,适用于服务提供者和消费者都动态变化的情况。而StaticDirectory适用于服务提供者和消费者相对固定,不需要频繁变动的场景。
2 RegistryDirectory的创建
RegistryDirectory是在服务消费端启动时创建的。
消费端启动时,通过 ReferenceConfig#get() 创建对服务提供方的远程调用代理类。最终在通过RegistryProtocol#refer() 创建invoker时创建了RegistryDirectory。具体实现细节如下所示。
public T get(boolean check) {// ...return ref;} protected synchronized void init(boolean check) {// ...// 创建对服务提供方的远程调用代理类ref = createProxy(referenceParameters);// ...}private T createProxy(Map<String, String> referenceParameters) {// ...// 创建invokercreateInvoker();// ...// create service proxyreturn (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));}private void createInvoker() {if (urls.size() == 1) {URL curUrl = urls.get(0);invoker = protocolSPI.refer(interfaceClass, curUrl);// ...} else {List<Invoker<?>> invokers = new ArrayList<>();URL registryUrl = null;for (URL url : urls) {invokers.add(protocolSPI.refer(interfaceClass, url));if (UrlUtils.isRegistry(url)) {// use last registry urlregistryUrl = url;}}// ...}}
创建invoker的核心方法为
invoker = protocolSPI.refer(interfaceClass, curUrl);
服务注册和发现使用是register协议,因此上述方法实际上将调用 RegistryProtocol#refer方法,实现如下所示。
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {url = getRegistryUrl(url);Registry registry = getRegistry(url);if (RegistryService.class.equals(type)) {return proxyFactory.getInvoker((T) registry, type, url);}// group="a,b" or group="*"Map<String, String> qs = (Map<String, String>) url.getAttribute(REFER_KEY);String group = qs.get(GROUP_KEY);if (StringUtils.isNotEmpty(group)) {if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {return doRefer(Cluster.getCluster(url.getScopeModel(), MergeableCluster.NAME), registry, type, url, qs);}}Cluster cluster = Cluster.getCluster(url.getScopeModel(), qs.get(CLUSTER_KEY));// 创建invokerreturn doRefer(cluster, registry, type, url, qs);}protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) {// ...// 创建invokerClusterInvoker<T> migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);return interceptInvoker(migrationInvoker, url, consumerUrl);}
最终将调用 InterfaceCompatibleRegistryProtocol#getInvoker() 方法创建RegistryDirectory。
// InterfaceCompatibleRegistryProtocol.getInvoker
public <T> ClusterInvoker<T> getInvoker(Cluster cluster, Registry registry, Class<T> type, URL url) {DynamicDirectory<T> directory = new RegistryDirectory<>(type, url);return doCreateInvoker(directory, cluster, registry, type);
}// RegistryProtocol#doCreateInvoker
protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type) {directory.setRegistry(registry);directory.setProtocol(protocol);// all attributes of REFER_KEYMap<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());URL urlToRegistry = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);if (directory.isShouldRegister()) {directory.setRegisteredConsumerUrl(urlToRegistry);registry.register(directory.getRegisteredConsumerUrl());}// 1、建立路由规则链directory.buildRouterChain(urlToRegistry);// 2、订阅服务提供者地址,生成invokerdirectory.subscribe(toSubscribeUrl(urlToRegistry));// 3、包装机器容错策略到invokerreturn (ClusterInvoker<T>) cluster.join(directory);
}
3 RegistryDirectory中invoker列表的更新
创建完RegistryDirectory后会调用subscribe()方法订阅需要调用的服务提供者的地址列表。主要操作如下。
(1)假设使用的服务注册中心为Zookeeper,则会调用Zookeeper的subscribe()方法去Zookeeper订阅服务提供者的地址列表,并且创建一个监听器。
(2)当Zookeeper服务端发现服务提供者的地址列表发生变化后,zkClient会回调该监听器的notify()方法,推送服务提供者的地址列表,刷新RegistryDirectory中的invoker列表。
(3)服务消费端启动时则是创建完监听器后,同步调用notify()方法,刷新RegistryDirectory中的invoker列表。
具体实现细节如下所示。
// org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe
public void doSubscribe(final URL url, final NotifyListener listener) {try {checkDestroyed();if (ANY_VALUE.equals(url.getServiceInterface())) {// ...zkClient.create(root, false, true);List<String> services = zkClient.addChildListener(root, zkListener);// ...} else {CountDownLatch latch = new CountDownLatch(1);try {List<URL> urls = new ArrayList<>();// 创建监听器for (String path : toCategoriesPath(url)) {ConcurrentMap<NotifyListener, ChildListener> listeners = ConcurrentHashMapUtils.computeIfAbsent(zkListeners, url, k -> new ConcurrentHashMap<>());ChildListener zkListener = ConcurrentHashMapUtils.computeIfAbsent(listeners, listener, k -> new RegistryChildListenerImpl(url, k, latch));if (zkListener instanceof RegistryChildListenerImpl) {((RegistryChildListenerImpl) zkListener).setLatch(latch);}// create "directories".zkClient.create(path, false, true);// Add children (i.e. service items).List<String> children = zkClient.addChildListener(path, zkListener);if (children != null) {// The invocation point that may cause 1-1.urls.addAll(toUrlsWithEmpty(url, path, children));}}// 回调方法notify(url, listener, urls);} finally {// tells the listener to run only after the sync notification of main thread finishes.latch.countDown();}}} catch (Throwable e) {throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);}
}// org.apache.dubbo.registry.support.FailbackRegistry#notify
protected void notify(URL url, NotifyListener listener, List<URL> urls) {if (url == null) {throw new IllegalArgumentException("notify url == null");}if (listener == null) {throw new IllegalArgumentException("notify listener == null");}try {doNotify(url, listener, urls);} catch (Exception t) {// Record a failed registration request to a failed listlogger.error(REGISTRY_FAILED_NOTIFY_EVENT, "", "", "Failed to notify addresses for subscribe " + url + ", cause: " + t.getMessage(), t);}
}protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {super.notify(url, listener, urls);
}// org.apache.dubbo.registry.support.AbstractRegistry#notify
protected void notify(URL url, NotifyListener listener, List<URL> urls) {// ...Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());for (Map.Entry<String, List<URL>> entry : result.entrySet()) {String category = entry.getKey();List<URL> categoryList = entry.getValue();categoryNotified.put(category, categoryList);// 主要方法listener.notify(categoryList);if (localCacheEnabled) {saveProperties(url);}}
}// org.apache.dubbo.registry.integration.RegistryDirectory#notify
public synchronized void notify(List<URL> urls) {// ...refreshOverrideAndInvoker(providerURLs);
}protected synchronized void refreshOverrideAndInvoker(List<URL> urls) {// mock zookeeper://xxx?mock=return nullthis.directoryUrl = overrideWithConfigurator(getOriginalConsumerUrl());refreshInvoker(urls);
}
刷新invoker列表缓存(urlInvokerMap)的最终实现细节如下所示
protected volatile Map<URL, Invoker<T>> urlInvokerMap;// RegistryDirectory#refreshInvoker
private void refreshInvoker(List<URL> invokerUrls) {// ...// use local reference to avoid NPE as this.urlInvokerMap will be set null concurrently at destroyAllInvokers().Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;// can't use local reference as oldUrlInvokerMap's mappings might be removed directly at toInvokers().Map<URL, Invoker<T>> oldUrlInvokerMap = null;if (localUrlInvokerMap != null) {// the initial capacity should be set greater than the maximum number of entries divided by the load factor to avoid resizing.oldUrlInvokerMap = new LinkedHashMap<>(Math.round(1 + localUrlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR));localUrlInvokerMap.forEach(oldUrlInvokerMap::put);}// 刷新invoker列表缓存-urlInvokerMapMap<URL, Invoker<T>> newUrlInvokerMap = toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map// ...
}private Map<URL, Invoker<T>> toInvokers(Map<URL, Invoker<T>> oldUrlInvokerMap, List<URL> urls) {Map<URL, Invoker<T>> newUrlInvokerMap = new ConcurrentHashMap<>(urls == null ? 1 : (int) (urls.size() / 0.75f + 1));if (urls == null || urls.isEmpty()) {return newUrlInvokerMap;}String queryProtocols = this.queryMap.get(PROTOCOL_KEY);for (URL providerUrl : urls) {if (!checkProtocolValid(queryProtocols, providerUrl)) {continue;}URL url = mergeUrl(providerUrl);// Cache key is url that does not merge with consumer side parameters,// regardless of how the consumer combines parameters,// if the server url changes, then refer againInvoker<T> invoker = oldUrlInvokerMap == null ? null : oldUrlInvokerMap.remove(url);if (invoker == null) { // Not in the cache, refer againtry {boolean enabled = true;if (url.hasParameter(DISABLED_KEY)) {enabled = !url.getParameter(DISABLED_KEY, false);} else {enabled = url.getParameter(ENABLED_KEY, true);}if (enabled) {invoker = protocol.refer(serviceType, url);}} catch (Throwable t) {// Thrown by AbstractProtocol.optimizeSerialization()if (t instanceof RpcException && t.getMessage().contains("serialization optimizer")) {// 4-2 - serialization optimizer class initialization failed.logger.error(PROTOCOL_FAILED_INIT_SERIALIZATION_OPTIMIZER, "typo in optimizer class", "","Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);} else {// 4-3 - Failed to refer invoker by other reason.logger.error(PROTOCOL_FAILED_REFER_INVOKER, "", "","Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);}}if (invoker != null) { // Put new invoker in cachenewUrlInvokerMap.put(url, invoker);}} else {newUrlInvokerMap.put(url, invoker);}}return newUrlInvokerMap;
}