服务消费端Directory目录的创建与更新

1 Directory目录概述

Directory代表多个invoker,其内部维护了一个list,并且这个list的内容是动态变化的(对于消费端来说,每个invoker代表一个服务提供者)。

在Dubbo中,RegistryDirectory和StaticDirectory都是Directory的实现类。

RegistryDirectory是一个动态服务目录,可以感知注册中心配置的变化,其持有的Invoker列表会随着注册中心内容的变化而变化。每次变化后,RegistryDirectory都会动态地增删Invoker,并调用Router的route方法进行路由,过滤掉不符合路由规则的Invoker。相反,StaticDirectory是一个静态服务目录,它内部存放的Invoker是不会变动的。

RegistryDirectory是Dubbo中默认使用的Directory,适用于服务提供者和消费者都动态变化的情况。而StaticDirectory适用于服务提供者和消费者相对固定,不需要频繁变动的场景。

2 RegistryDirectory的创建

RegistryDirectory是在服务消费端启动时创建的。

消费端启动时,通过 ReferenceConfig#get() 创建对服务提供方的远程调用代理类。最终在通过RegistryProtocol#refer() 创建invoker时创建了RegistryDirectory。具体实现细节如下所示。

     public T get(boolean check) {// ...return ref;}   protected synchronized void init(boolean check) {// ...// 创建对服务提供方的远程调用代理类ref = createProxy(referenceParameters);// ...}private T createProxy(Map<String, String> referenceParameters) {// ...// 创建invokercreateInvoker();// ...// create service proxyreturn (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));}private void createInvoker() {if (urls.size() == 1) {URL curUrl = urls.get(0);invoker = protocolSPI.refer(interfaceClass, curUrl);// ...} else {List<Invoker<?>> invokers = new ArrayList<>();URL registryUrl = null;for (URL url : urls) {invokers.add(protocolSPI.refer(interfaceClass, url));if (UrlUtils.isRegistry(url)) {// use last registry urlregistryUrl = url;}}// ...}}

创建invoker的核心方法为 

invoker = protocolSPI.refer(interfaceClass, curUrl);

服务注册和发现使用是register协议,因此上述方法实际上将调用 RegistryProtocol#refer方法,实现如下所示。

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {url = getRegistryUrl(url);Registry registry = getRegistry(url);if (RegistryService.class.equals(type)) {return proxyFactory.getInvoker((T) registry, type, url);}// group="a,b" or group="*"Map<String, String> qs = (Map<String, String>) url.getAttribute(REFER_KEY);String group = qs.get(GROUP_KEY);if (StringUtils.isNotEmpty(group)) {if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {return doRefer(Cluster.getCluster(url.getScopeModel(), MergeableCluster.NAME), registry, type, url, qs);}}Cluster cluster = Cluster.getCluster(url.getScopeModel(), qs.get(CLUSTER_KEY));// 创建invokerreturn doRefer(cluster, registry, type, url, qs);}protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) {// ...// 创建invokerClusterInvoker<T> migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);return interceptInvoker(migrationInvoker, url, consumerUrl);}

最终将调用 InterfaceCompatibleRegistryProtocol#getInvoker() 方法创建RegistryDirectory。

// InterfaceCompatibleRegistryProtocol.getInvoker
public <T> ClusterInvoker<T> getInvoker(Cluster cluster, Registry registry, Class<T> type, URL url) {DynamicDirectory<T> directory = new RegistryDirectory<>(type, url);return doCreateInvoker(directory, cluster, registry, type);
}// RegistryProtocol#doCreateInvoker
protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type) {directory.setRegistry(registry);directory.setProtocol(protocol);// all attributes of REFER_KEYMap<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());URL urlToRegistry = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);if (directory.isShouldRegister()) {directory.setRegisteredConsumerUrl(urlToRegistry);registry.register(directory.getRegisteredConsumerUrl());}// 1、建立路由规则链directory.buildRouterChain(urlToRegistry);// 2、订阅服务提供者地址,生成invokerdirectory.subscribe(toSubscribeUrl(urlToRegistry));// 3、包装机器容错策略到invokerreturn (ClusterInvoker<T>) cluster.join(directory);
}

3 RegistryDirectory中invoker列表的更新

创建完RegistryDirectory后会调用subscribe()方法订阅需要调用的服务提供者的地址列表。主要操作如下。

(1)假设使用的服务注册中心为Zookeeper,则会调用Zookeeper的subscribe()方法去Zookeeper订阅服务提供者的地址列表,并且创建一个监听器。

(2)当Zookeeper服务端发现服务提供者的地址列表发生变化后,zkClient会回调该监听器的notify()方法,推送服务提供者的地址列表,刷新RegistryDirectory中的invoker列表。

(3)服务消费端启动时则是创建完监听器后,同步调用notify()方法,刷新RegistryDirectory中的invoker列表。

具体实现细节如下所示。

// org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe
public void doSubscribe(final URL url, final NotifyListener listener) {try {checkDestroyed();if (ANY_VALUE.equals(url.getServiceInterface())) {// ...zkClient.create(root, false, true);List<String> services = zkClient.addChildListener(root, zkListener);// ...} else {CountDownLatch latch = new CountDownLatch(1);try {List<URL> urls = new ArrayList<>();// 创建监听器for (String path : toCategoriesPath(url)) {ConcurrentMap<NotifyListener, ChildListener> listeners = ConcurrentHashMapUtils.computeIfAbsent(zkListeners, url, k -> new ConcurrentHashMap<>());ChildListener zkListener = ConcurrentHashMapUtils.computeIfAbsent(listeners, listener, k -> new RegistryChildListenerImpl(url, k, latch));if (zkListener instanceof RegistryChildListenerImpl) {((RegistryChildListenerImpl) zkListener).setLatch(latch);}// create "directories".zkClient.create(path, false, true);// Add children (i.e. service items).List<String> children = zkClient.addChildListener(path, zkListener);if (children != null) {// The invocation point that may cause 1-1.urls.addAll(toUrlsWithEmpty(url, path, children));}}// 回调方法notify(url, listener, urls);} finally {// tells the listener to run only after the sync notification of main thread finishes.latch.countDown();}}} catch (Throwable e) {throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);}
}// org.apache.dubbo.registry.support.FailbackRegistry#notify
protected void notify(URL url, NotifyListener listener, List<URL> urls) {if (url == null) {throw new IllegalArgumentException("notify url == null");}if (listener == null) {throw new IllegalArgumentException("notify listener == null");}try {doNotify(url, listener, urls);} catch (Exception t) {// Record a failed registration request to a failed listlogger.error(REGISTRY_FAILED_NOTIFY_EVENT, "", "", "Failed to notify addresses for subscribe " + url + ", cause: " + t.getMessage(), t);}
}protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {super.notify(url, listener, urls);
}// org.apache.dubbo.registry.support.AbstractRegistry#notify
protected void notify(URL url, NotifyListener listener, List<URL> urls) {// ...Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());for (Map.Entry<String, List<URL>> entry : result.entrySet()) {String category = entry.getKey();List<URL> categoryList = entry.getValue();categoryNotified.put(category, categoryList);// 主要方法listener.notify(categoryList);if (localCacheEnabled) {saveProperties(url);}}
}// org.apache.dubbo.registry.integration.RegistryDirectory#notify
public synchronized void notify(List<URL> urls) {// ...refreshOverrideAndInvoker(providerURLs);
}protected synchronized void refreshOverrideAndInvoker(List<URL> urls) {// mock zookeeper://xxx?mock=return nullthis.directoryUrl = overrideWithConfigurator(getOriginalConsumerUrl());refreshInvoker(urls);
}

刷新invoker列表缓存(urlInvokerMap)的最终实现细节如下所示

protected volatile Map<URL, Invoker<T>> urlInvokerMap;// RegistryDirectory#refreshInvoker
private void refreshInvoker(List<URL> invokerUrls) {// ...// use local reference to avoid NPE as this.urlInvokerMap will be set null concurrently at destroyAllInvokers().Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;// can't use local reference as oldUrlInvokerMap's mappings might be removed directly at toInvokers().Map<URL, Invoker<T>> oldUrlInvokerMap = null;if (localUrlInvokerMap != null) {// the initial capacity should be set greater than the maximum number of entries divided by the load factor to avoid resizing.oldUrlInvokerMap = new LinkedHashMap<>(Math.round(1 + localUrlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR));localUrlInvokerMap.forEach(oldUrlInvokerMap::put);}// 刷新invoker列表缓存-urlInvokerMapMap<URL, Invoker<T>> newUrlInvokerMap = toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map// ...
}private Map<URL, Invoker<T>> toInvokers(Map<URL, Invoker<T>> oldUrlInvokerMap, List<URL> urls) {Map<URL, Invoker<T>> newUrlInvokerMap = new ConcurrentHashMap<>(urls == null ? 1 : (int) (urls.size() / 0.75f + 1));if (urls == null || urls.isEmpty()) {return newUrlInvokerMap;}String queryProtocols = this.queryMap.get(PROTOCOL_KEY);for (URL providerUrl : urls) {if (!checkProtocolValid(queryProtocols, providerUrl)) {continue;}URL url = mergeUrl(providerUrl);// Cache key is url that does not merge with consumer side parameters,// regardless of how the consumer combines parameters,// if the server url changes, then refer againInvoker<T> invoker = oldUrlInvokerMap == null ? null : oldUrlInvokerMap.remove(url);if (invoker == null) { // Not in the cache, refer againtry {boolean enabled = true;if (url.hasParameter(DISABLED_KEY)) {enabled = !url.getParameter(DISABLED_KEY, false);} else {enabled = url.getParameter(ENABLED_KEY, true);}if (enabled) {invoker = protocol.refer(serviceType, url);}} catch (Throwable t) {// Thrown by AbstractProtocol.optimizeSerialization()if (t instanceof RpcException && t.getMessage().contains("serialization optimizer")) {// 4-2 - serialization optimizer class initialization failed.logger.error(PROTOCOL_FAILED_INIT_SERIALIZATION_OPTIMIZER, "typo in optimizer class", "","Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);} else {// 4-3 - Failed to refer invoker by other reason.logger.error(PROTOCOL_FAILED_REFER_INVOKER, "", "","Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);}}if (invoker != null) { // Put new invoker in cachenewUrlInvokerMap.put(url, invoker);}} else {newUrlInvokerMap.put(url, invoker);}}return newUrlInvokerMap;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/612871.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL从0到1全教程【1】MySQL数据库的基本概念以及MySQL8.0版本的部署

1 MySQL数据库的相关概念 1.1 数据库中的专业术语 1.1.1 数据库 (DB) 数据库是指:保存有组织的数据的容器(通常是一个文数据库 (database)件或一组文件)。 1.1.2 数据库管理系统 (DBMS) 数据库管理系统(DBMS)又称为数据库软件(产品)&#xff0c;用于管理DB中的数据 注意:…

【前端素材】bootstrap5实现美食餐饮网站RegFood

一、需求分析 美食餐饮网站是指专门提供关于美食和餐饮的信息、服务和资源的在线平台。这类网站通常提供以下功能&#xff1a; 餐厅搜索和预订&#xff1a;用户可以在网站上搜索附近的餐厅&#xff0c;并预订桌位。网站会提供餐厅的详细信息&#xff0c;包括菜单、地址、电话号…

LeetCode878. Nth Magical Number

文章目录 一、题目二、题解 一、题目 A positive integer is magical if it is divisible by either a or b. Given the three integers n, a, and b, return the nth magical number. Since the answer may be very large, return it modulo 109 7. Example 1: Input: n …

JavaWeb- Tomcat

一、概念 老规矩&#xff0c;先看维基百科&#xff1a;Apache Tomcat (called "Tomcat" for short) is a free and open-source implementation of the Jakarta Servlet, Jakarta Expression Language, and WebSocket technologies.[2] It provides a "pure Ja…

微信小程序基本使用2:wxs,组件的使用以及弹窗、滚动条

WXS WXS&#xff08;WeiXin Script&#xff09;是小程序的一套脚本语言&#xff0c;结合 WXML&#xff0c;可以构建出页面的结构。 可以在模版中内联少量处理脚本&#xff0c;丰富模板的数据预处理能力。 wsx 在IOS设备上性能是JavaScript的2-20倍 内嵌式 <view><…

SpringBoot+Vue药品ADR不良反应智能监测系统源码

药品不良反应&#xff08;Adverse Drug Reaction&#xff0c;ADR&#xff09;是指合格药品在正常用法用量下出现的与用药目的无关的有害反应&#xff0c;不包括超说明书用药、药品质量问题等导致的不良后果。 ADR智能监测系统开发环境 ❀技术架构&#xff1a;B/S ❀开发语言&…

补充一:C#中的Queue

队列是一种基本的数据结构&#xff0c;按照先进先出&#xff08;FIFO&#xff09;的原则组织元素。在队列中&#xff0c;新元素从队尾入队&#xff0c;而从队头出队&#xff0c;确保了先进入队列的元素首先被处理。这使得队列特别适合模拟排队、任务调度等场景。 在编程中&…

深度剖析Redis:从基础到高级应用

目录 引言 1、 Redis基础 1.1 Redis数据结构 1.1.1 字符串&#xff08;String&#xff09; 1.1.2 列表&#xff08;List&#xff09; 1.1.3 集合&#xff08;Set&#xff09; 1.1.4 散列&#xff08;Hash&#xff09; 1.1.5 有序集合&#xff08;Sorted Set&#xff09;…

常见类型的yaml文件如何编写?--kind: Job|CronJob

本次介绍两个关联度很高的类型&#xff0c;Job和CronJob。 Job基本说明 在 Kubernetes 中&#xff0c;Job 是一种用于运行一次性任务的资源对象。它用于确保在集群内部执行某个任务&#xff0c;即使任务运行失败或其中一个 Pod 发生故障时&#xff0c;也会进行重试。Job 可以…

CRM系统进行市场营销,这些功能可以派上用场。

现如今的企业想要做好营销&#xff0c;不仅仅依赖于一句玄之又玄的slogan亦或是电子邮件的狂轰乱炸。要想做好市场活动营销需要一个前提——那就是CRM管理系统发挥作用的地方。但CRM系统关于营销的功能太多了——对于不太了解的人来说很容易不知所措。那么&#xff0c;CRM系统做…

如何上传苹果ipa安装包?

目录 引言 摘要 第二步&#xff1a;打开appuploader工具 第二步&#xff1a;打开appuploader工具&#xff0c;第二步&#xff1a;打开appuploader工具 第五步&#xff1a;交付应用程序&#xff0c;在iTunes Connect中查看应用程序 总结 引言 在将应用程序上架到苹果应用…

PTA——换硬币

将一笔零钱换成5分、2分和1分的硬币&#xff0c;要求每种硬币至少有一枚&#xff0c;有几种不同的换法&#xff1f; 输入格式: 输入在一行中给出待换的零钱数额x∈(8,100)。 输出格式: 要求按5分、2分和1分硬币的数量依次从大到小的顺序&#xff0c;输出各种换法。每行输出…

四道面试题

一.网络的七层模型 网络的七层模型&#xff0c;也被称为OSI七层协议模型&#xff0c;是一种用于理解和描述网络通信过程的概念模型。这个模型将网络通信过程划分为七个层次&#xff0c;从低到高分别是&#xff1a;物理层、数据链路层、网络层、传输层、会话层、表示层和应用层…

DUET: Cross-Modal Semantic Grounding for Contrastive Zero-Shot Learning论文阅读

文章目录 摘要1.问题的提出引出当前研究的不足与问题属性不平衡问题属性共现问题 解决方案 2.数据集和模型构建数据集传统的零样本学习范式v.s. DUET学习范式DUET 模型总览属性级别对比学习正负样本解释&#xff1a; 3.结果分析VIT-based vision transformer encoder.消融研究消…

【XR806开发板试用】+ FreeRtos开发环境搭建

获取SDK SDK可以通过官网直接下载。 下载完成之后&#xff0c;通过gzip命令解压文件 gzip -d xr806_sdk.tar.gz 获取编译链工具 还是按照官网操作指南&#xff0c;下载 gcc-arm-none-eabi-8-2019-q3-update 下载之后进行解压&#xff0c;同理。 注意修改GCC路径&#xff0c…

既然所有ERP系统都很烂,那创业公司有没有机会?

既然所有ERP系统都烂,那创业公司有没机会? 得一点点把这问题捋顺了再回答—— 先说说“都很烂”这个判断是否准确谈谈国产ERP和国际ERP厂商&#xff0c;新创公司是否有优势&#xff1f;最后聊一下创业本身&#xff0c;如何创业、风险如何…… 一些人可能对传统的ERP系统感到…

初识 Elasticsearch 应用知识,一文读懂 Elasticsearch 知识文集(2)

&#x1f3c6;作者简介&#xff0c;普修罗双战士&#xff0c;一直追求不断学习和成长&#xff0c;在技术的道路上持续探索和实践。 &#x1f3c6;多年互联网行业从业经验&#xff0c;历任核心研发工程师&#xff0c;项目技术负责人。 &#x1f389;欢迎 &#x1f44d;点赞✍评论…

MySQL进阶篇(三) 索引

一、插入数据 1. insert &#xff08;1&#xff09;优化方案一&#xff0c;批量插入数据 Insert into tb_test values(1,Tom),(2,Cat),(3,Jerry);&#xff08;2&#xff09;优化方案二&#xff0c;手动控制事务 start transaction; insert into tb_test values(1,Tom),(2,Cat…

JS栈和堆:数据是如何存储的

JS栈和堆&#xff1a;数据是如何存储的 背景JavaScript 是什么类型的语言JavaScript 的数据类型内存空间栈空间和堆空间再谈闭包 背景 JS有多种数据类型&#xff1a;数字型&#xff0c;字符串型&#xff0c;数组型等&#xff0c;虽然 JavaScript 并不需要直接去管理内存&#…

Apache ActiveMQ RCE CNVD-2023-69477 CVE-2023-46604

漏洞简介 Apache ActiveMQ官方发布新版本&#xff0c;修复了一个远程代码执行漏洞&#xff0c;攻击者可构造恶意请求通过Apache ActiveMQ的61616端口发送恶意数据导致远程代码执行&#xff0c;从而完全控制Apache ActiveMQ服务器。 影响版本 Apache ActiveMQ 5.18.0 before …