Dubbo的路由策略剖析

1 概述

Dubbo中的路由策略的作用是服务消费端使用路由策略对服务提供者列表进行过滤和选择,最终获取符合路由规则的服务提供者。

Dubbo中的路由策略主要分为两类,StateRouter和普通Router。

StateRouter (如TagStateRouter、ConditionStateRouter等)是在Dubbo 3中引入的一种路由实现,是Dubbo 3中主要使用的路由策略。它可根据服务提供者的状态信息(如负载、响应时间等)进行路由决策。

普通Router是指根据预设的路由规则进行服务调用的路由选择。

2 路由策略的加载

服务消费端启动时,会加载远程服务对应的路由规则,并创建路由规则链

具体而言,在服务消费端启动时,会将远程服务转换为Invoker,在此过程中,会调用 RegistryProtocol 的 doCreateInvoker 方法,其中会调用 RegistryDirectory 的 buildRouterChain() 方法创建路由规则链 RouterChain。具体实现细节如下所示。

// org.apache.dubbo.registry.integration.RegistryProtocol#getInvoker
public <T> ClusterInvoker<T> getInvoker(Cluster cluster, Registry registry, Class<T> type, URL url) {// FIXME, this method is currently not used, create the right registry before enable.DynamicDirectory<T> directory = new RegistryDirectory<>(type, url);return doCreateInvoker(directory, cluster, registry, type);
}// org.apache.dubbo.registry.integration.RegistryProtocol#doCreateInvoker
protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type) {directory.setRegistry(registry);directory.setProtocol(protocol);// all attributes of REFER_KEYMap<String, String> parameters = new HashMap<>(directory.getConsumerUrl().getParameters());URL urlToRegistry = new ServiceConfigURL(parameters.get(PROTOCOL_KEY) == null ? CONSUMER : parameters.get(PROTOCOL_KEY),parameters.remove(REGISTER_IP_KEY),0,getPath(parameters, type),parameters);urlToRegistry = urlToRegistry.setScopeModel(directory.getConsumerUrl().getScopeModel());urlToRegistry = urlToRegistry.setServiceModel(directory.getConsumerUrl().getServiceModel());if (directory.isShouldRegister()) {directory.setRegisteredConsumerUrl(urlToRegistry);registry.register(directory.getRegisteredConsumerUrl());}// 1、建立路由规则链directory.buildRouterChain(urlToRegistry);// 2、订阅服务提供者地址,生成invokerdirectory.subscribe(toSubscribeUrl(urlToRegistry));// 3、封装集群容错策略到invokerreturn (ClusterInvoker<T>) cluster.join(directory, true);
}

以下为加载路由规则和创建路由规则链的实现细节。

// org.apache.dubbo.registry.integration.DynamicDirectory#buildRouterChain
public void buildRouterChain(URL url) {this.setRouterChain(RouterChain.buildChain(getInterface(), url));
}// org.apache.dubbo.rpc.cluster.RouterChain#buildChain
public static <T> RouterChain<T> buildChain(Class<T> interfaceClass, URL url) {SingleRouterChain<T> chain1 = buildSingleChain(interfaceClass, url);SingleRouterChain<T> chain2 = buildSingleChain(interfaceClass, url);return new RouterChain<>(new SingleRouterChain[]{chain1, chain2});
}public static <T> SingleRouterChain<T> buildSingleChain(Class<T> interfaceClass, URL url) {ModuleModel moduleModel = url.getOrDefaultModuleModel();List<RouterFactory> extensionFactories = moduleModel.getExtensionLoader(RouterFactory.class).getActivateExtension(url, ROUTER_KEY);// 加载 Router 路由规则List<Router> routers = extensionFactories.stream().map(factory -> factory.getRouter(url)).sorted(Router::compareTo).collect(Collectors.toList());// 加载 StateRouter 路由规则List<StateRouter<T>> stateRouters = moduleModel.getExtensionLoader(StateRouterFactory.class).getActivateExtension(url, ROUTER_KEY).stream().map(factory -> factory.getRouter(interfaceClass, url)).collect(Collectors.toList());boolean shouldFailFast = Boolean.parseBoolean(ConfigurationUtils.getProperty(moduleModel, Constants.SHOULD_FAIL_FAST_KEY, "true"));RouterSnapshotSwitcher routerSnapshotSwitcher = ScopeModelUtil.getFrameworkModel(moduleModel).getBeanFactory().getBean(RouterSnapshotSwitcher.class);// 创建路由链return new SingleRouterChain<>(routers, stateRouters, shouldFailFast, routerSnapshotSwitcher);
}public SingleRouterChain(List<Router> routers, List<StateRouter<T>> stateRouters, boolean shouldFailFast, RouterSnapshotSwitcher routerSnapshotSwitcher) {initWithRouters(routers);initWithStateRouters(stateRouters);this.shouldFailFast = shouldFailFast;this.routerSnapshotSwitcher = routerSnapshotSwitcher;
}

以下为构建 StateRouter 路由链的实现细节。

private void initWithStateRouters(List<StateRouter<T>> stateRouters) {StateRouter<T> stateRouter = TailStateRouter.getInstance();for (int i = stateRouters.size() - 1; i >= 0; i--) {StateRouter<T> nextStateRouter = stateRouters.get(i);// 设置当前路由节点的下一个路由节点nextStateRouter.setNextRouter(stateRouter);stateRouter = nextStateRouter;}this.headStateRouter = stateRouter;this.stateRouters = Collections.unmodifiableList(stateRouters);
}

3 路由策略的使用

服务消费端根据负载均衡策略获取服务提供者前,会先根据路由策略获取符合路由规则的服务提供者。具体细节如下所示。

(1)以下是服务消费端发起远程调用的主要过程

org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker#invoke

public Result invoke(final Invocation invocation) throws RpcException {checkWhetherDestroyed();// binding attachments into invocation.
//        Map<String, Object> contextAttachments = RpcContext.getClientAttachment().getObjectAttachments();
//        if (contextAttachments != null && contextAttachments.size() != 0) {
//            ((RpcInvocation) invocation).addObjectAttachmentsIfAbsent(contextAttachments);
//        }InvocationProfilerUtils.enterDetailProfiler(invocation, () -> "Router route.");// 1、获取服务提供者列表-InvokersList<Invoker<T>> invokers = list(invocation);InvocationProfilerUtils.releaseDetailProfiler(invocation);checkInvokers(invokers, invocation);// 2、获取负载均衡策略。根据url参数找LoadBalance扩展,默认RandomLoadBalanceLoadBalance loadbalance = initLoadBalance(invokers, invocation);RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);InvocationProfilerUtils.enterDetailProfiler(invocation, () -> "Cluster " + this.getClass().getName() + " invoke.");try {// 3、执行远程调用。子类实现,会有不同的集群容错方式return doInvoke(invocation, invokers, loadbalance);} finally {InvocationProfilerUtils.releaseDetailProfiler(invocation);}
}protected List<Invoker<T>> list(Invocation invocation) throws RpcException {return getDirectory().list(invocation);
}

(2)以下是根据路由策略获取符合路由规则的服务提供者

org.apache.dubbo.rpc.cluster.directory.AbstractDirectory#list

public List<Invoker<T>> list(Invocation invocation) throws RpcException {if (destroyed) {throw new RpcException("Directory of type " + this.getClass().getSimpleName() + " already destroyed for service " + getConsumerUrl().getServiceKey() + " from registry " + getUrl());}BitList<Invoker<T>> availableInvokers;SingleRouterChain<T> singleChain = null;try {try {if (routerChain != null) {routerChain.getLock().readLock().lock();}// use clone to avoid being modified at doList().if (invokersInitialized) {availableInvokers = validInvokers.clone();} else {availableInvokers = invokers.clone();}// 获取路由规则链if (routerChain != null) {singleChain = routerChain.getSingleChain(getConsumerUrl(), availableInvokers, invocation);singleChain.getLock().readLock().lock();}} finally {if (routerChain != null) {routerChain.getLock().readLock().unlock();}}// 根据路由规则信息和invoker列表,获取经过路由规则筛选后的服务提供者列表List<Invoker<T>> routedResult = doList(singleChain, availableInvokers, invocation);if (routedResult.isEmpty()) {// 2-2 - No provider available.logger.warn(CLUSTER_NO_VALID_PROVIDER, "provider server or registry center crashed", "","No provider available after connectivity filter for the service " + getConsumerUrl().getServiceKey()+ " All routed invokers' size: " + routedResult.size()+ " from registry " + this+ " on the consumer " + NetUtils.getLocalHost()+ " using the dubbo version " + Version.getVersion() + ".");}return Collections.unmodifiableList(routedResult);} finally {if (singleChain != null) {singleChain.getLock().readLock().unlock();}}
}

org.apache.dubbo.registry.integration.DynamicDirectory#doList

public List<Invoker<T>> doList(SingleRouterChain<T> singleRouterChain,BitList<Invoker<T>> invokers, Invocation invocation) {if (forbidden && shouldFailFast) {// 1. No service provider 2. Service providers are disabledthrow new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " +this + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() +", please check status of providers(disabled, not registered or in blacklist).");}if (multiGroup) {return this.getInvokers();}try {// 执行路由策略,获取服务提供者列表// Get invokers from cache, only runtime routers will be executed.List<Invoker<T>> result = singleRouterChain.route(getConsumerUrl(), invokers, invocation);return result == null ? BitList.emptyList() : result;} catch (Throwable t) {// 2-1 - Failed to execute routing.logger.error(CLUSTER_FAILED_SITE_SELECTION, "", "","Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);return BitList.emptyList();}
}

org.apache.dubbo.rpc.cluster.SingleRouterChain#route 

public List<Invoker<T>> route(URL url, BitList<Invoker<T>> availableInvokers, Invocation invocation) {if (invokers.getOriginList() != availableInvokers.getOriginList()) {logger.error(INTERNAL_ERROR, "", "Router's invoker size: " + invokers.getOriginList().size() +" Invocation's invoker size: " + availableInvokers.getOriginList().size(),"Reject to route, because the invokers has changed.");throw new IllegalStateException("reject to route, because the invokers has changed.");}if (RpcContext.getServiceContext().isNeedPrintRouterSnapshot()) {return routeAndPrint(url, availableInvokers, invocation);} else {return simpleRoute(url, availableInvokers, invocation);}
}public List<Invoker<T>> simpleRoute(URL url, BitList<Invoker<T>> availableInvokers, Invocation invocation) {BitList<Invoker<T>> resultInvokers = availableInvokers.clone();// 1. route state routerresultInvokers = headStateRouter.route(resultInvokers, url, invocation, false, null);if (resultInvokers.isEmpty() && (shouldFailFast || routers.isEmpty())) {printRouterSnapshot(url, availableInvokers, invocation);return BitList.emptyList();}if (routers.isEmpty()) {return resultInvokers;}List<Invoker<T>> commonRouterResult = resultInvokers.cloneToArrayList();// 2. route common routerfor (Router router : routers) {// Copy resultInvokers to a arrayList. BitList not supportRouterResult<Invoker<T>> routeResult = router.route(commonRouterResult, url, invocation, false);commonRouterResult = routeResult.getResult();if (CollectionUtils.isEmpty(commonRouterResult) && shouldFailFast) {printRouterSnapshot(url, availableInvokers, invocation);return BitList.emptyList();}// stop continue routingif (!routeResult.isNeedContinueRoute()) {return commonRouterResult;}}if (commonRouterResult.isEmpty()) {printRouterSnapshot(url, availableInvokers, invocation);return BitList.emptyList();}return commonRouterResult;
}

使用基于BitMap实现的BitList,对不同路由策略之间的结果取交集(&),得到最终的路由结果。具体实现如下所示。

org.apache.dubbo.rpc.cluster.router.state.AbstractStateRouter#route

public final BitList<Invoker<T>> route(BitList<Invoker<T>> invokers, URL url, Invocation invocation, boolean needToPrintMessage, Holder<RouterSnapshotNode<T>> nodeHolder) throws RpcException {if (needToPrintMessage && (nodeHolder == null || nodeHolder.get() == null)) {needToPrintMessage = false;}RouterSnapshotNode<T> currentNode = null;RouterSnapshotNode<T> parentNode = null;Holder<String> messageHolder = null;// pre-build current nodeif (needToPrintMessage) {parentNode = nodeHolder.get();currentNode = new RouterSnapshotNode<>(this.getClass().getSimpleName(), invokers.clone());parentNode.appendNode(currentNode);// set parent node's output size in the first child invoke// initial node output size is zero, first child will override itif (parentNode.getNodeOutputSize() < invokers.size()) {parentNode.setNodeOutputInvokers(invokers.clone());}messageHolder = new Holder<>();nodeHolder.set(currentNode);}BitList<Invoker<T>> routeResult;routeResult = doRoute(invokers, url, invocation, needToPrintMessage, nodeHolder, messageHolder);if (routeResult != invokers) {// 对不同的路由策略之间的结果取交集(&)routeResult = invokers.and(routeResult);}// check if router support call continue route by itselfif (!supportContinueRoute()) {// use current node's result as next node's parameterif (!shouldFailFast || !routeResult.isEmpty()) {routeResult = continueRoute(routeResult, url, invocation, needToPrintMessage, nodeHolder);}}// post-build current nodeif (needToPrintMessage) {currentNode.setRouterMessage(messageHolder.get());if (currentNode.getNodeOutputSize() == 0) {// no child callcurrentNode.setNodeOutputInvokers(routeResult.clone());}currentNode.setChainOutputInvokers(routeResult.clone());nodeHolder.set(parentNode);}return routeResult;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/21459.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

实力!云起无垠晋级“第九届安全创客汇”年度10强

2024年5月28日&#xff0c;第九届“安全创客汇”复赛在重庆圆满落幕。在本次国内最具影响力的网络安全创业大赛中&#xff0c;云起无垠凭借其技术的创新性和巨大市场价值&#xff0c;成功跻身年度十强。 随着人工智能技术的不断发展&#xff0c;特别是在大模型技术的推动下&…

【图像处理与机器视觉】XJTU期末考点

题型 选择&#xff1a;1 分10 填空&#xff1a;1 分15 简答题&#xff08;也含有计算和画图&#xff09;&#xff1a;10 分*4 计算题&#xff1a;15 分20 分 考点 选择题&#xff08;部分&#xff09; 数字图像处理基础 p(x,y),q(s,t)两个像素之间的距离由公式&#xff1a…

湖南(品牌调研)源点咨询 企业品牌调研侧重点分析

本文由湖南长沙&#xff08;市场调研&#xff09;源点咨询编辑发布 企业建立品牌&#xff0c;往往都需进行科学性的品牌调研。因为只有这样&#xff0c;才能让企业更好的把握市场的发展趋势&#xff0c;进而为品牌的建立和发展提供更有价值的数据参考&#xff01;那么品牌的调…

江淮集团分享:江淮集团数据管理实践

下文为江淮集团信息化管理部副部长丁志海的演讲全文&#xff1a; 大家下午好。我是来自江淮汽车的丁志海&#xff0c;我做IT、做信息化做这一块有二十多年了。这次得帆邀请我来讲讲数据管理的实践经验。我就想说一说我的感受&#xff0c;为什么我们当初选择得帆&#xff0c;和一…

微信小程序计算器

微信小程序计算器 index.wxml <view classscreen>{{result}}</view><view classtest-bg><view classbtnGroup><view classitem grey bindtapclickButton id"{{C}}">AC</view><view classitem grey bindtapclickButton id&q…

AI精选付费资料包【37GB】

课程介绍 一、人工智能论文合集 二、AI必读经典书籍 三、超详细人工智能学习大纲 四、机器学习基础算法教程 五、深度学习神经网络基础教程 六、计算机视觉实战项目 课程获取 资料&#xff1a;AI精选付费资料包&#xff08;37.4GB&#xff09;获取&#xff1a;扫码关注公z号…

esp8266阿里云上线(小程序控制)

此wechatproject已上传在页面最上方 由图可见&#xff0c;项目只有两个页面&#xff0c;一个是获取该产品下的设备信息列表&#xff0c;一个是某设备对应的详情控制页面&#xff0c;由于这个项目只利用esp8266板子上自带的led&#xff0c;功能简单&#xff0c;只需要控制开关即…

leetcode 575.分糖果

思路&#xff1a;开两个数组&#xff0c;一个用来存储非负数的糖果个数&#xff0c;一个用来存储负数的糖果个数&#xff0c;这两个数组都是状态数组&#xff0c;而不是计数数组 如果当前能够吃的种类大于现有的种类&#xff0c;现有的种类个数就是答案&#xff1b; 如果当前…

Update! 基于RockyLinux9.3离线安装Zabbix6.0

链接&#xff1a; Ansible离线部署 之 Zabbixhttp://mp.weixin.qq.com/s?__bizMzk0NTQ3OTk3MQ&mid2247487434&idx1&sn3128800a0219c5ebc5a3f89d2c8ccf50&chksmc3158786f4620e90afe440bb32fe68541191cebbabc2d2ef196f7300e84cde1e1b57383c521a&scene21#we…

YOLOv9改进策略 | Conv篇 | 利用YOLOv10提出的SCDown魔改YOLOv9进行下采样(附代码 + 结构图 + 添加教程)

一、本文介绍 本文给大家带来的改进机制是利用YOLOv10提出的SCDown魔改YOLOv9进行下采样,其是更高效的下采样。具体而言,其首先利用点卷积调整通道维度,然后利用深度卷积进行空间下采样。这将计算成本减少到和参数数量减少到。同时,这最大限度地保留了下采样过程中的信息,…

创新指南|提高人才回报率的重要举措和指标

员工是组织最大的投资&#xff0c;也是最深层的价值源泉。人才系统必须同时强调生产力和价值创造。让合适的人才担任合适的职位&#xff0c;并为员工提供成功所需的支持和机会&#xff0c;这是实现回报的关键。本文将介绍组织可以采取的五项行动&#xff0c;以最大化企业的人才…

postgresql常用命令#postgresql认证

PostgreSQL 是一个功能强大的开源关系数据库管理系统&#xff0c;提供了一系列命令行工具来管理和操作数据库。以下是一些常用的 PostgreSQL 命令&#xff0c;涵盖数据库和用户管理、数据操作以及查询和维护等方面。 #PostgreSQL培训 #postgresql认证 #postgreSQL考试 #PG考试…

汽车识别项目

窗口设计 这里的代码放在py文件最前面或者最后面都无所谓 # 创建主窗口 window tk.Tk() window.title("图像目标检测系统") window.geometry(1000x650) # 设置窗口大小# 创建背景画布并使用grid布局管理器 canvas_background tk.Canvas(window, width1000, height…

【Hive SQL 每日一题】统计各个商品今年销售额与去年销售额的增长率及排名变化

文章目录 测试数据需求说明需求实现分步解析 测试数据 -- 创建商品表 DROP TABLE IF EXISTS products; CREATE TABLE products (product_id INT,product_name STRING );INSERT INTO products VALUES (1, Product A), (2, Product B), (3, Product C), (4, Product D), (5, Pro…

英码科技推出鸿蒙边缘计算盒子:提升国产化水平,增强AI应用效能,保障数据安全

当前&#xff0c;随着国产化替代趋势的加强&#xff0c;鸿蒙系统Harmony OS也日趋成熟和完善&#xff0c;各行各业都在积极拥抱鸿蒙&#xff1b;那么&#xff0c;边缘计算要加快实现全面国产化&#xff0c;基于鸿蒙系统开发AI应用势在必行。 关于鸿蒙系统及其优势 鸿蒙系统是华…

Linux 问题定位查看日志文件常用命令

Linux 问题定位查看日志文件常用命令 查看日志文件的前100行中是否包含关键词&#xff1a; head -n 100 /var/log/file.log | grep "keyword"查看日志文件的最后100行中是否包含关键词&#xff1a; tail -n 100 /var/log/file.log | grep "keyword"使用l…

ROS2从入门到精通4-3:全局路径规划插件开发案例(以A*算法为例)

目录 0 专栏介绍1 路径规划插件的意义2 全局规划插件编写模板2.1 构造规划插件类2.2 注册并导出插件2.3 编译与使用插件 3 全局规划插件开发案例(A*算法)常见问题 0 专栏介绍 本专栏旨在通过对ROS2的系统学习&#xff0c;掌握ROS2底层基本分布式原理&#xff0c;并具有机器人建…

2023-2025年最值得选择的Java毕业设计选题大全:1000个热门选题推荐✅✅✅

&#x1f497;博主介绍&#xff1a;✌全网粉丝1W,CSDN作者、博客专家、全栈领域优质创作者&#xff0c;博客之星、平台优质作者、专注于Java、小程序技术领域和毕业项目实战✌&#x1f497; &#x1f31f;文末获取源码数据库&#x1f31f; 感兴趣的可以先收藏起来&#xff0c;还…

以字节为单位管理文件系统

《操作系统》书上说&#xff1a;文件系统是块设备&#xff0c;一般是4KB一块。本文尝试以字节为单位管理文件系统。 关键是空闲空间的管理。在传统的文件系统中&#xff0c;用的是bitmap&#xff0c;在新的方法中&#xff0c;用的是“双B树”。 空闲空间用一张两列多行的表来…

冥想第一千一百七十八天

1.周末&#xff0c;早上先骑着电车到绿谷公园拿了姐给的精油&#xff0c;40分钟到家。 2.早上带着媳妇吃了饭&#xff0c;等丈母娘和小侄子。一起去荥泽水乡特别的推荐。感受特别好玩。 3.晚上带着丈母娘和小侄子吃了饭&#xff0c;给送到中原福塔。回来都都12点了。 4.累的&am…