Dubbo服务提供端处理请求的过程剖析

1 处理请求的过程概述

(1)消费端发起TCP连接后,服务提供方的NettyServer的connected方法将被调用;

(2)因为Netty默认的线程模型为All,因此AllChannelHandler类把接收到的所有消息(包括请求事件、响应事件、连接事件、断开事件,心跳事件等)包装成ChannelEventRunnable任务,并将其投递到线程池中;

(3)接着执行线程池中的任务,并最终将调用DubboProtocol的connected方法。

2 处理请求的实现细节

2.1 NettyServer的connected方法被调用

消费端发起TCP连接后,服务提供方的NettyServer的connected方法将被调用。connected方法为NettyServer父类AbstractServer的connected方法。

其中的依次调用关系为:AbstractServer的connected()->AbstractPeer的connected()->ChannelHandler的connected()。具体实现如下所示。

(1)AbstractServer的connected()

    public void connected(Channel ch) throws RemotingException {// If the server has entered the shutdown process, reject any new connectionif (this.isClosing() || this.isClosed()) {logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");ch.close();return;}if (accepts > 0 && getChannelsSize()> accepts) {logger.error(INTERNAL_ERROR, "unknown error in remoting module", "", "Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);ch.close();return;}super.connected(ch);}

(2)AbstractPeer的connected()

    private final ChannelHandler handler;public void connected(Channel ch) throws RemotingException {if (closed) {return;}handler.connected(ch);}

2.2 消息被投递到线程池中

调用ChannelHandler的connected()时,因为Netty默认的线程模型为All,因此AllChannelHandler类(ChannelHandler的子类)把接收到的所有消息包装成ChannelEventRunnable任务,并将其投递到线程池中。具体实现如下所示。

public class AllChannelHandler extends WrappedChannelHandler {public AllChannelHandler(ChannelHandler handler, URL url) {super(handler, url);}@Overridepublic void connected(Channel channel) throws RemotingException {ExecutorService executor = getSharedExecutorService();try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));} catch (Throwable t) {throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);}}@Overridepublic void disconnected(Channel channel) throws RemotingException {ExecutorService executor = getSharedExecutorService();try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));} catch (Throwable t) {throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);}}@Overridepublic void received(Channel channel, Object message) throws RemotingException {ExecutorService executor = getPreferredExecutorService(message);try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {if(message instanceof Request && t instanceof RejectedExecutionException){sendFeedback(channel, (Request) message, t);return;}throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}}@Overridepublic void caught(Channel channel, Throwable exception) throws RemotingException {ExecutorService executor = getSharedExecutorService();try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));} catch (Throwable t) {throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);}}
}

2.3 执行线程池中的任务

2.3.1 ChannelEventRunnable的run方法

执行线程池中的任务时,将执行ChannelEventRunnable的run方法,其实现细节具体如下所示。

    public void run() {InternalThreadLocalMap internalThreadLocalMap = InternalThreadLocalMap.getAndRemove();try {if (state == ChannelState.RECEIVED) {try {handler.received(channel, message);} catch (Exception e) {logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is " + message, e);}} else {switch (state) {case CONNECTED:try {handler.connected(channel);} catch (Exception e) {logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);}break;case DISCONNECTED:try {handler.disconnected(channel);} catch (Exception e) {logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);}break;case SENT:try {handler.sent(channel, message);} catch (Exception e) {logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is " + message, e);}break;case CAUGHT:try {handler.caught(channel, exception);} catch (Exception e) {logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is: " + message + ", exception is " + exception, e);}break;default:logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "unknown state: " + state + ", message is " + message);}}} finally {InternalThreadLocalMap.set(internalThreadLocalMap);}}

2.3.2 执行connected方法

执行handler.connected(channel)时,将调用HeaderExchangeHandler#connected方法,具体实现如下所示。

    public void connected(Channel channel) throws RemotingException {ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);handler.connected(exchangeChannel);channel.setAttribute(Constants.CHANNEL_SHUTDOWN_TIMEOUT_KEY,ConfigurationUtils.getServerShutdownTimeout(channel.getUrl().getOrDefaultApplicationModel()));}

接着在执行handler.connected(exchangeChannel)时,将调用DubboProtocol#connected方法,实现如下所示。

public void connected(Channel channel) throws RemotingException {invoke(channel, ON_CONNECT_KEY);
}private void invoke(Channel channel, String methodKey) {Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);if (invocation != null) {try {if (Boolean.TRUE.toString().equals(invocation.getAttachment(STUB_EVENT_KEY))) {tryToGetStubService(channel, invocation);}received(channel, invocation);} catch (Throwable t) {logger.warn(PROTOCOL_FAILED_REFER_INVOKER, "", "", "Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);}}
}public void received(Channel channel, Object message) throws RemotingException {if (message instanceof Invocation) {reply((ExchangeChannel) channel, message);} else {super.received(channel, message);}
}

执行接口请求最终将调用DubboProtocol#reply方法,具体实现如下所示。

public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {if (!(message instanceof Invocation)) {throw new RemotingException(channel, "Unsupported request: "+ (message == null ? null : (message.getClass().getName() + ": " + message))+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());}Invocation inv = (Invocation) message;// 1、获取调用方法对应的InvokerInvoker<?> invoker = inv.getInvoker() == null ? getInvoker(channel, inv) : inv.getInvoker();// switch TCCLif (invoker.getUrl().getServiceModel() != null) {Thread.currentThread().setContextClassLoader(invoker.getUrl().getServiceModel().getClassLoader());}// need to consider backward-compatibility if it's a callbackif (Boolean.TRUE.toString().equals(inv.getObjectAttachmentWithoutConvert(IS_CALLBACK_SERVICE_INVOKE))) {String methodsStr = invoker.getUrl().getParameters().get("methods");boolean hasMethod = false;if (methodsStr == null || !methodsStr.contains(",")) {hasMethod = inv.getMethodName().equals(methodsStr);} else {String[] methods = methodsStr.split(",");for (String method : methods) {if (inv.getMethodName().equals(method)) {hasMethod = true;break;}}}if (!hasMethod) {logger.warn(PROTOCOL_FAILED_REFER_INVOKER, "", "", new IllegalStateException("The methodName " + inv.getMethodName()+ " not found in callback service interface ,invoke will be ignored."+ " please update the api interface. url is:"+ invoker.getUrl()) + " ,invocation is :" + inv);return null;}}// 2、获取上下文对象,并设置对端地址RpcContext.getServiceContext().setRemoteAddress(channel.getRemoteAddress());// 3、执行invoker调用链Result result = invoker.invoke(inv);// 4、返回结果return result.thenApply(Function.identity());
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/594733.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PaddlePaddle初使用

模型导出与预测 # -c 后面设置训练算法的yml配置文件 # -o 配置可选参数 # Global.pretrained_model 参数设置待转换的训练模型地址&#xff0c;不用添加文件后缀 .pdmodel&#xff0c;.pdopt或.pdparams。 # Global.save_inference_dir参数设置转换的模型将保存的地址。pytho…

数据库攻防学习之MySQL

MySQL 0x01mysql学习 MySQL 是瑞典的MySQL AB公司开发的一个可用于各种流行操作系统平台的关系数据库系统&#xff0c;它具有客户机/服务器体系结构的分布式数据库管理系统。可以免费使用使用&#xff0c;用的人数很多。 0x02环境搭建 这里演示用&#xff0c;phpstudy搭建的…

华为端口隔离高级用法经典案例

最终效果&#xff1a; pc4不能ping通pc5&#xff0c;pc5能ping通pc4 pc1不能和pc2、pc3通&#xff0c;但pc2和pc3能互通 vlan batch 2 interface Vlanif1 ip address 10.0.0.254 255.255.255.0 interface Vlanif2 ip address 192.168.2.1 255.255.255.0 interface MEth0/0/1 i…

基于SSM的校园快递管理系统

目录 前言 开发环境以及工具 项目功能介绍 学生&#xff1a; 管理员&#xff1a; 详细设计 获取源码 前言 本项目是一个基于IDEA和Java语言开发的基于SSM的校园快递管理系统应用。应用包含学生端和管理员端等多个功能模块。 欢迎使用我们的校园快递管理系统&#xff01;我…

厦门大学OpenHarmony技术俱乐部开创“1+N”新模式,加速推动产学研融合

12月29日,OpenHarmony技术俱乐部再添重将——在多方见证下,厦门大学OpenHarmony技术俱乐部在翔安校区益海嘉里楼报告厅正式揭牌成立,现场出席领导及师生代表近千人。 成立仪式现场 OpenHarmony技术俱乐部 携手厦门大学共绘开源生态新图景 OpenHarmony是由开放原子开源基金…

揭示AUTOSAR中隐藏的漏洞

AUTOSAR是一个普遍采用的软件框架&#xff0c;用于各种汽车零部件&#xff0c;如ABS, ECU,自动照明、环境控制、充电控制器、信息娱乐系统等。AUTOSAR的创建目的是促进汽车零部件之间形成标准接口&#xff0c;可以在不同制造商之间互通。 因此&#xff0c;任何配备微控制器(MC…

CGAL的无限制的Delaunay图

本章描述了构建L∞距离下线段Delaunay图的算法和几何特征。这些特征还包括绘制L∞距离下线段Delaunay图对偶&#xff08;即L∞距离下线段Voronoi图&#xff09;边缘的方法。L∞算法和特征依赖于欧几里得&#xff08;或L2&#xff09;距离下的线段Delaunay图算法和特征。L∞度量…

redis 面试问题 (更新中 ing)

reids 是做什么的 为什么那么快 有哪些使用场景 1.数据缓存 2.计数器 3.限时 4.限流 5.分布式锁 6.队列 7.发布 订阅 redis有哪些 数据结构 常用的 string 对字符串 、整数、浮点数list 链表 &#xff0c;字符串set 不重复集合 &#xff0c;交集、并集 差集hash 无序散列…

月报总结|Moonbeam 12月份大事一览

一转眼已经到年底啦。本月&#xff0c;Moonbeam基金会发布四个最新战略重点&#xff1a;跨链解决方案、游戏、真实世界资产&#xff08;RWA&#xff09;、新兴市场。其中在新兴市场方面&#xff0c;紧锣密鼓地推出与巴西公司Grupo RO的战略合作。 用户教育方面&#xff0c;为了…

大创项目推荐 深度学习人脸表情识别算法 - opencv python 机器视觉

文章目录 0 前言1 技术介绍1.1 技术概括1.2 目前表情识别实现技术 2 实现效果3 深度学习表情识别实现过程3.1 网络架构3.2 数据3.3 实现流程3.4 部分实现代码 4 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 深度学习人脸表情识别系…

如何使用Python3 Boto3删除AWS CloudFormation的栈(Stacks)

文章目录 小结问题及解决有关Json文件的输入和输出使用Python3及正则表达式查找字符串包含某个子字符串使用Python3 Boto3删除AWS CloudFormation的栈&#xff08;Stacks&#xff09; 参考 小结 本文记录了使用Python3的Boto3包删除AWS CloudFormation的栈&#xff08;Stacks&…

Git从远程仓库拉取指定的分支

一、git clone 命令获取 使用git管理代码版本的时候&#xff0c;本地分支默认与远程同名分支建立追踪关系。文章开始也提到git clone <url>命令默认将整个远程版本库克隆到本地&#xff0c;但是git clone -b <分支名称>命令可以将指定的某一个远程分支拉取到我们本…

优雅地展示20w单细胞热图|非Doheatmap 超大数据集 细胞数太多

单细胞超大数据集的热图怎么画&#xff1f;昨天刚做完展示20万单细胞的热图要这么画吗&#xff1f; 今天就有人发消息问我为啥他画出来的热图有问题。 问题起源 昨天分享完 20万单细胞的热图要这么画吗&#xff1f;&#xff0c;就有人问为啥他的数据会出错。我们先来看下他的…

imgaug库指南(一):从入门到精通的【图像增强】之旅

文章目录 引言imgaug简介安装和导入imgaug代码示例imgaug的强大之处和用途小结结尾 引言 在深度学习和计算机视觉的世界里&#xff0c;数据是模型训练的基石&#xff0c;其质量与数量直接影响着模型的性能。然而&#xff0c;获取大量高质量的标注数据往往需要耗费大量的时间和…

(JAVA)-反射

什么是反射&#xff1f; 反射允许对成员变量&#xff0c;成员方法和构造方法的信息进行编程访问。 说简单点就是反射能将类里面的构造方法&#xff0c;成员变量,修饰符,返回值&#xff0c;注解&#xff0c;类型&#xff0c;甚至异常等类里面的所有东西都能够获取出来。 关于C…

团子杂记:SAP PS or 项目管理软件(PMIS )? PPM/P6

众所周知SAP的PS模块在项目型企业的SAP应用中扮演着核心角色&#xff0c;整个项目端到端的业务执行、财务核算、控制及分析都是通过PS作为主线&#xff0c;依赖于PS中的项目对象&#xff08;如WBS元素、网络活动等&#xff09;实现的。 在实施SAP的过程中&#xff0c;可以看到…

Nest 框架:解锁企业级 Web 应用开发的秘密武器(上)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

RKE安装k8s及部署高可用rancher之证书私有证书但是内置的ssl不放到外置的LB中 4层负载均衡

先决条件# Kubernetes 集群 参考RKE安装k8s及部署高可用rancher之证书在外面的LB&#xff08;nginx中&#xff09;-CSDN博客CLI 工具Ingress Controller&#xff08;仅适用于托管 Kubernetes&#xff09; 创建集群k8s [rootnginx locale]# cat rancher-cluster.yml nodes:- …

网络安全—模拟ARP欺骗

文章目录 网络拓扑安装使用编辑数据包客户机攻击机验证 仅做实验用途&#xff0c;禁止做违法犯罪的事情&#xff0c;后果自负。当然现在的计算机多无法被欺骗了&#xff0c;开了防火墙ARP欺骗根本无效。 网络拓扑 均使用Windows Server 2003系统 相关配置可以点击观看这篇文章…

【iOS安全】JS 调用Objective-C中WKWebview Handler的三种方式

有三种实现途径 1. WKScriptMessageHandler OC部分&#xff1a;注册并实现Handler 将OC中的方法"nativeMethod"注册为JavaScript Message Handler&#xff0c;从而WebView中的JavaScript代码可以调用该方法 // Register in Objective-C code - (void)setupWKWebVi…