Dubbo分层设计之Exchange层

前言

Dubbo 框架采用分层设计,自上而下共分为十层。Exchange 层位于倒数第三层,它在 协议层 的下方、数据传输层的上方。
第一次看源码的时候,大家应该都会有一个疑问:都已经有 Transport 层了,为啥还要定义 Exchange 层?
Dubbo 这么做自然有它的原因,今天我们一起解开这个疑惑。

理解Exchange

Exchange 层也叫 数据交换层,它和数据传输层有什么区别呢?
Transport 层是对 Netty、Mina 的统一封装,用来做网络数据传输的。一次 RPC 调用在 Dubbo 看来,本质上也就是一次请求报文和响应报文的传输过程。这么一看,好像完全没必要再单独抽象出 Exchange 层嘛。但是我们忽略了一个事情,那就是 Transport 层并没有实现 请求-应答 消息交换模式。

一般来说,我们发起一次 RPC 调用以后,业务线程会阻塞,期望拿到一个服务端发来的结果,再继续往下走。
Transport 层只有一个 tcp 长连接,tcp 本身是没有 Request、Response 概念的。它只具备消息收发的能力,至于收发的消息是 Request 还是 Response 它是不知道的,消息的语义需要靠上层来定义,一般是在协议头用一个专门的比特位来标记。以 HTTP 协议为例,它是七层协议,在传输层看来,报文是不分 Request、Response 的,这完全靠 HTTP 服务器自行实现。

正因如此,Dubbo 要直接基于 tcp 来实现 RPC 调用,就得自己实现 Request-Response 模型。

设计实现

首先我们要清楚 Dubbo 的调用流程,才好去理解这些接口的作用。
客户端发送 Request 和收到 Response 的流程是这样的:
image.png
服务端处理请求的流程是这样的:
image.png

Exchanger

Dubbo Exchange 层的核心 SPI 接口是org.apache.dubbo.remoting.exchange.Exchanger ,同样也分别提供了bindconnect 方法供服务端和客户端使用。

@SPI(HeaderExchanger.NAME)
public interface Exchanger {@Adaptive({Constants.EXCHANGER_KEY})ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;@Adaptive({Constants.EXCHANGER_KEY})ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
}

它和 Transporter 主要区别是:Transporter Channel 处理器是 ChannelHandler 接口,只具备消息收发的能力。Exchanger Channel 处理器是 ExchangeHandler,它在前者的基础上增加了reply 能力,也就是对于一个 Request,服务端可以回复一个 Response,这就是 请求-应答 模型。

public interface ExchangeHandler extends ChannelHandler, TelnetHandler {CompletableFuture<Object> reply(ExchangeChannel channel, Object request) throws RemotingException;
}

至于 ExchangeServer 和 ExchangeClient,只是在 Transport 层的 RemotingServer、Client 上做了一些封装。

HeaderExchanger

Exchanger 官方只提供了一种实现:HeaderExchanger,因为是在 Transport 上层,所以是基于 Transporter 二次封装。主要是创建了 HeaderExchangeClient 和 HeaderExchangeServer,核心是 HeaderExchangeHandler 实现。

public class HeaderExchanger implements Exchanger {public static final String NAME = "header";@Overridepublic ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);}@Overridepublic ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));}
}

HeaderExchangeServer

HeaderExchangeServer 是对 RemotingServer 的二次封装,主要是把传输层的 Channel 封装成了交换层的 ExchangeChannel。

@Override
public Collection<ExchangeChannel> getExchangeChannels() {Collection<ExchangeChannel> exchangeChannels = new ArrayList<ExchangeChannel>();Collection<Channel> channels = server.getChannels();if (CollectionUtils.isNotEmpty(channels)) {for (Channel channel : channels) {exchangeChannels.add(HeaderExchangeChannel.getOrAddChannel(channel));}}return exchangeChannels;
}

HeaderExchangeClient

HeaderExchangeClient 是对传输层 Client 的二次封装,主要是把 Client 封装成了 HeaderExchangeChannel,实现了 Request-Response 语义。

public HeaderExchangeClient(Client client, boolean startTimer) {Assert.notNull(client, "Client can't be null");this.client = client;this.channel = new HeaderExchangeChannel(client);if (startTimer) {URL url = client.getUrl();startReconnectTask(url);startHeartBeatTask(url);}
}

HeaderExchangeHandler

HeaderExchangeHandler 封装了协议层传过来的 ExchangeHandler,重写了receivedsent 方法,实现了对 Request、Response 对象的处理。

消息发送时,它会把 Channel 封装成 HeaderExchangeChannel 再交给后续 handler 处理。

@Override
public void sent(Channel channel, Object message) throws RemotingException {Throwable exception = null;try {// 封装ChannelExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);handler.sent(exchangeChannel, message);} catch (Throwable t) {exception = t;HeaderExchangeChannel.removeChannelIfDisconnected(channel);}if (message instanceof Request) {Request request = (Request) message;// 记录发送时间DefaultFuture.sent(channel, request);}......
}

收到消息时,会针对 Request、Response 分别做处理。如果收到的是 Request,会调用业务 handler 执行业务逻辑,再返回结果。

void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {Response res = new Response(req.getId(), req.getVersion());Object msg = req.getData();try {// 调用业务handler、执行业务逻辑CompletionStage<Object> future = handler.reply(channel, msg);future.whenComplete((appResult, t) -> {try {if (t == null) {res.setStatus(Response.OK);res.setResult(appResult);} else {res.setStatus(Response.SERVICE_ERROR);res.setErrorMessage(StringUtils.toString(t));}// 发送结果channel.send(res);} catch (RemotingException e) {logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);}});} catch (Throwable e) {res.setStatus(Response.SERVICE_ERROR);res.setErrorMessage(StringUtils.toString(e));channel.send(res);}
}

如果收到的是 Response,说明是服务端对客户端请求的响应结果,则会给 DefaultFuture 设置 Result,唤醒业务线程。

static void handleResponse(Channel channel, Response response) throws RemotingException {if (response != null && !response.isHeartbeat()) {DefaultFuture.received(channel, response);}
}
public static void received(Channel channel, Response response, boolean timeout) {try {DefaultFuture future = FUTURES.remove(response.getId());if (future != null) {Timeout t = future.timeoutCheckTask;if (!timeout) {t.cancel();}// 设置结果,业务线程被唤醒future.doReceived(response);}} finally {CHANNELS.remove(response.getId());}
}

HeaderExchangeChannel

最后是 HeaderExchangeChannel,它是 交换层 ExchangeChannel 的实现。ExchangeChannel 是对传输层 Channel 的增强。Channel 只定义了send() 数据发送的能力,ExchangeChannel 增加了request() 支持发送 Request,拿到 Response。

public interface ExchangeChannel extends Channel {CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException;CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException;ExchangeHandler getExchangeHandler();@Overridevoid close(int timeout);
}

HeaderExchangeChannel 主要是对 Channel 的一个二次封装,它会把实例化自身并放到 Channel 属性里

static HeaderExchangeChannel getOrAddChannel(Channel ch) {if (ch == null) {return null;}HeaderExchangeChannel ret = (HeaderExchangeChannel) ch.getAttribute(CHANNEL_KEY);if (ret == null) {ret = new HeaderExchangeChannel(ch);if (ch.isConnected()) {ch.setAttribute(CHANNEL_KEY, ret);}}return ret;
}

自定义Exchange

Dubbo Exchanger 也可以基于 SPI 一键替换,我们实现一个自定义的 Exchanger,加深理解。
首先,我们新建一个模块dubbo-extension-exchange-custom,并引入依赖:

<dependencies><dependency><groupId>org.apache.dubbo</groupId><artifactId>dubbo-remoting-api</artifactId><version>${dubbo.version}</version></dependency>
</dependencies>

新建 dubbo.extension.remoting.exchange.CustomExchanger,重写 Exchanger 接口,返回我们自定义的 Server、Client 实现。

public class CustomExchanger implements Exchanger {public static final String NAME = "custom";@Overridepublic ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {return new CustomeExchangeServer(Transporters.bind(url, new DecodeHandler(new CustomExchangeHandler(handler))));}@Overridepublic ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {return new CustomExchangeClient(Transporters.connect(url, new DecodeHandler(new CustomExchangeHandler(handler))));}
}

CustomeExchangeServer 本身只是对 RemotingServer 的一个封装,核心是把 Channel 封装成 ExchangeChannel

public class CustomeExchangeServer extends RemotingServerDelegate {public CustomeExchangeServer(RemotingServer server) {super(server);}@Overrideprotected ExchangeChannel toExchangeChannel(Channel channel) {return CustomExchangeChannel.getOrAddChannel(channel);}
}

CustomExchangeClient 也只是为了把 Client 封装成 ExchangeChannel,让 传输层的 Channel 拥有request能力

public class CustomExchangeClient extends ClientDelegate {private final ExchangeChannel exchangeChannel;public CustomExchangeClient(Client client) {super(client);this.exchangeChannel = new CustomExchangeChannel(client);}@Overridepublic CompletableFuture<Object> request(Object request) throws RemotingException {return exchangeChannel.request(request);}......省略几个request()
}

CustomExchangeHandler 是对业务 ExchangeHandler 的封装,增加对 Request、Response 对象的处理

public class CustomExchangeHandler implements ChannelHandlerDelegate {private final ExchangeHandler handler;public CustomExchangeHandler(ExchangeHandler handler) {this.handler = handler;}@Overridepublic ChannelHandler getHandler() {return handler;}@Overridepublic void connected(Channel channel) throws RemotingException {handler.connected(toExchangeChannel(channel));}@Overridepublic void disconnected(Channel channel) throws RemotingException {handler.disconnected(toExchangeChannel(channel));}@Overridepublic void sent(Channel channel, Object message) throws RemotingException {handler.sent(toExchangeChannel(channel), message);}@Overridepublic void received(Channel channel, Object message) throws RemotingException {System.err.println("CustomExchangeHandler received:" + message);ExchangeChannel exchangeChannel = toExchangeChannel(channel);if (message instanceof Request) {handleRequest(exchangeChannel, (Request) message);} else if (message instanceof Response) {handleResponse(exchangeChannel, (Response) message);} else {handler.received(exchangeChannel, message);}}private void handleResponse(ExchangeChannel exchangeChannel, Response response) {DefaultFuture.received(exchangeChannel, response);}private void handleRequest(ExchangeChannel exchangeChannel, Request req) {try {Response res = new Response(req.getId(), req.getVersion());CompletableFuture<Object> future = handler.reply(exchangeChannel, req.getData());future.whenComplete((r, e) -> {if (e == null) {res.setStatus((byte) 20);res.setResult(r);} else {res.setStatus((byte) 70);res.setErrorMessage(e.getMessage());}try {exchangeChannel.send(res);} catch (Exception exception) {exception.printStackTrace();}});} catch (Exception e) {e.printStackTrace();}}@Overridepublic void caught(Channel channel, Throwable exception) throws RemotingException {handler.caught(toExchangeChannel(channel), exception);}private ExchangeChannel toExchangeChannel(Channel channel) {return CustomExchangeChannel.getOrAddChannel(channel);}
}

最后是 CustomExchangeChannel,它是对 Channel 的封装,增加了request的能力,发送请求后支持返回一个 CompletableFuture,并在收到响应后设置结果。

public class CustomExchangeChannel extends ExchangeChannelDelegate {private static final String CHANNEL_KEY = CustomExchangeChannel.class.getName() + ".CHANNEL";public CustomExchangeChannel(Channel channel) {super(channel);}@Overridepublic CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {if (isClosed()) {throw new RemotingException(this.getLocalAddress(), (InetSocketAddress) null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");}System.err.println("CustomExchangeChannel request:" + request);Request req = new Request();req.setVersion(Version.getProtocolVersion());req.setTwoWay(true);req.setData(request);DefaultFuture future = DefaultFuture.newFuture(this, req, timeout, executor);send(req);return future;}public static CustomExchangeChannel getOrAddChannel(Channel channel) {CustomExchangeChannel exchangeChannel = (CustomExchangeChannel) channel.getAttribute(CHANNEL_KEY);if (exchangeChannel == null) {channel.setAttribute(CHANNEL_KEY, exchangeChannel = new CustomExchangeChannel(channel));}return exchangeChannel;}
}

尾巴

Dubbo Exchange 层在 Transport 层之上实现了 Request-Response 模型。传输层只有一个 tcp 连接,只具备单纯的消息收发能力,对于消息收发的格式和语义是不关心的。tcp 没有 Request-Response 的概念,Dubbo 基于 tcp 长连接实现 RPC 调用,就必须自己实现一套 Request-Response 消息交换模型,Exchange 层就是对这套请求应答模型的抽象。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/631510.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JAVA调试webservice接口

java程序调试webservice接口可以使用调用工具进行调试&#xff0c;对应的调试工具 可在这个目录下面下载 SoapUI 下载地址&#xff1a;https://www,soapui.org/ 一、webservice项目可以使用soapUi 可以使用工具进行调试的前提是这个接口地址必须在前端的浏览器能够正常的打…

uniapp 获取外设键盘输入(扫码器/扫码枪/读卡器等)

前言 在使用uniapp开发收银机等设备时&#xff0c;常常会用到使用键盘输入的外设&#xff0c;如使用扫码器/扫码枪读取条形码/二维码等&#xff0c;及使用读卡器读取卡ID&#xff08;需要读卡器支持键盘输入卡ID&#xff0c;此种方式只支持读取未加密的卡ID信息&#xff0c;读…

通过OpenIddict设计一个授权服务器03-客户凭证流程

在本部分中&#xff0c;我们将把 OpenIddict 添加到项目中&#xff0c;并实施第一个授权流程&#xff1a;客户端凭证流。 添加 OpenIddict 软件包 首先&#xff0c;我们需要安装 OpenIddict NuGet 软件包 dotnet add package OpenIddict dotnet add package OpenIddict.AspN…

uniapp+vue3打包问题记录

**背景&#xff1a;**打包app出现问题&#xff0c;只显示底部导航的文字&#xff0c;其他一片空白 1. pages.json文件&#xff1a;tabBar中的iconPath图标格式不支持svg&#xff0c;只支持&#xff1a;png, jpg, jpeg的格式&#xff0c;当图片改为.png的时候可以正常显示 2. …

【数学建模】2024年华数杯国际赛B题-光伏发电Photovoltaic Power 思路、代码、参考论文

1 问题背景 中国电力构成包括传统能源(如煤炭、石油、天然气)、可再生能源(如水电、风能、太阳能、核能)和其他形式的电力。这些发电模式在满足中国巨大的电力需求方面发挥着至关重要的作用。据最新数据显示&#xff0c;中国总发电量超过20万亿千瓦时&#xff0c;居世界第一。…

idea社区版 MybatisCodeHelperPro插件使用介绍

文章目录 一、插件介绍二、idea社区版安装MybatisCodeHelperPro插件三、问题记录1. DatabaseHelper插件 加载不了部分数据库链接的列信息2. DatabaseHelper插件 数据库列显示顺序错乱3. MybatisCodeHelperPro插件 数据库字段不提示4. MybatisCodeHelperPro插件 特殊字段增加反引…

Skywalking链路追踪

目录 一、简介1.1、APM系统1.2、SkyWalking 简介 二、快速入门2.1、下载、启动2.2、界面认识 三、持久化存储四、告警通知五、自定义追踪-细粒度追踪service方法 一、简介 1.1、APM系统 APM&#xff08;Application Performance Monitoring&#xff09;系统是一种用于监控和管…

VR风景园林虚拟仿真系统编辑工具支持可视化预览成本低

为了帮助更多人快速、高效地构建虚拟现实应用系统&#xff0c;提高开发效率&#xff0c;降低成本投入&#xff0c;VR虚拟现实交互系统编辑器作为一种用于创建和编辑虚拟现实应用程序的工具&#xff0c;采用可视化界面提示和简单操作就能快速制作VR虚拟现实交互系统。 VR虚拟现实…

使用jackson对java类中包含泛型属性的bean进行序列化和反序列化

最近在做项目时&#xff0c;需要对java的bean对象进行序列化转换为string字符串后&#xff0c;存入redis&#xff0c;然后再从redis中读取该string对象&#xff0c;反序列化为bean对象。正常的简单类型的对象使用jackson可以非常方便进行互为转换操作&#xff0c;但我们操作的对…

Qt OpenGL - 网格式的直角坐标系

Qt OpenGL - 网格式的直角坐标系 引言一、绘制3D网格1.1 绘制平行于y轴的线段1.2 绘制平行于三个轴的线段1.3 绘制不同的3D网格 二、网格式的直角坐标系三、参考链接 引言 在OpenGL进行3D可视化&#xff0c;只绘制三条坐标轴略显单薄&#xff0c;而绘制网格形式的坐标系则能更清…

leedcode刷题笔记day1

题目大意&#xff1a; 暴力解法 两个for循环&#xff08;也是我一看到题目想到的方法&#xff09; 枚举在数组中所有的不同的两个下标的组合逐个检查它们所对应的数的和是否等于 target 复杂度分析 时间复杂度:O(n2)&#xff0c;这里 n 为数组的长度 空间复杂度:O(1)&#x…

canvas绘制不同样式的五角星(图文示例)

查看专栏目录 canvas实例应用100专栏&#xff0c;提供canvas的基础知识&#xff0c;高级动画&#xff0c;相关应用扩展等信息。canvas作为html的一部分&#xff0c;是图像图标地图可视化的一个重要的基础&#xff0c;学好了canvas&#xff0c;在其他的一些应用上将会起到非常重…

Windows无法格式化U盘怎么办?

U盘通常用于保存数据。有时&#xff0c;您可能需要格式化以擦除硬盘或修复错误等。通常&#xff0c;格式化过程可以通过Windows文件资源管理器、磁盘管理或Diskpart命令顺利进行&#xff0c;但有时会遇到Windows无法格式化U盘的情况。那么&#xff0c;Windows无法格式化U盘怎么…

uni-app的数据缓存

数据缓存uni.setStorage 将数据存储在本地缓存中指定的 key 中&#xff0c;会覆盖掉原来该 key 对应的内容&#xff0c;这是一个异步接口。 参数名类型必填说明keyString是本地缓存中的指定的 keydataAny是需要存储的内容&#xff0c;只支持原生类型、及能够通过 JSON.string…

程序员第一次接私活需要注意什么?

终于有一篇只说大白话的程序员接私活指南文章了&#xff01;程序员接私活&#xff0c;首先要关注合法和合理性 先来说合法性&#xff0c;这是程序员接私活的基本原则。不合规的产品不要做&#xff0c;不合法的需求不要做&#xff0c;原以为自己在赚钱&#xff0c;结果搞了半天啥…

1.2 虚拟环境

1.2 虚拟环境 创建好应用目录之后&#xff0c;接下来该安装Flask了。安装Flask最便捷的方法是使用虚拟环境。 虚拟环境是Python解释器的一个私有副本&#xff0c;在这个环境中你可以安装私有包&#xff0c;而且不会影响系统中安装的全局Python解释器。 虚拟环境非常有用&…

Docker registry镜像仓库,私有仓库及harbor管理详解

目录 registry镜像仓库概述 Docker 镜像仓库&#xff08;Docker Registry&#xff09;&#xff1a; registry 容器&#xff1a; 私有仓库概述 搭建本地私有仓库示例 Harbor概述 harbor架构 详解构成 Harbor由容器构成 Harbor部署示例 环境准备 部署Docker-Compose服…

(2023版)斯坦福CS231n学习笔记:DL与CV教程 (4) | 神经网络与反向传播

前言 &#x1f4da; 笔记专栏&#xff1a;斯坦福CS231N&#xff1a;面向视觉识别的卷积神经网络&#xff08;23&#xff09;&#x1f517; 课程链接&#xff1a;https://www.bilibili.com/video/BV1xV411R7i5&#x1f4bb; CS231n: 深度学习计算机视觉&#xff08;2017&#xf…

1月18日代码随想录二叉树搜索、验证二叉搜索树

700.二叉搜索树中的搜索 给定二叉搜索树&#xff08;BST&#xff09;的根节点 root 和一个整数值 val。 你需要在 BST 中找到节点值等于 val 的节点。 返回以该节点为根的子树。 如果节点不存在&#xff0c;则返回 null 。 示例 1: 输入&#xff1a;root [4,2,7,1,3], val …

医院网络安全建设:三网整体设计和云数据中心架构设计

医院网络安全问题涉及到医院日常管理多个方面&#xff0c;一旦医院信息管理系统在正常运行过程中受到外部恶意攻击&#xff0c;或者出现意外中断等情况&#xff0c;都会造成海量医疗数据信息的丢失。由于医院信息管理系统中存储了大量患者个人信息和治疗方案信息等&#xff0c;…