自定义Dubbo RPC通信协议

前言

Dubbo 协议层的核心SPI接口是org.apache.dubbo.rpc.Protocol,通过扩展该接口和围绕的相关接口,就可以让 Dubbo 使用我们自定义的协议来通信。默认的协议是 dubbo,本文提供一个 Grpc 协议的实现。

设计思路

Google 提供了 Java 的 Grpc 实现,所以我们站在巨人的肩膀上即可,就不用重复造轮子了。

首先,我们要实现 Protocol 接口,服务暴露时开启我们的 GrpcServer,绑定本地端口,用于后续处理连接和请求。
服务端如何处理grpc请求呢???
方案一,是把暴露的所有服务 Invoker 都封装成grpc的 Service,全部统一让 GrpcServer 处理,但是这么做太麻烦了。方案二,是提供一个 DispatcherService,统一处理客户端发来的grpc请求,再根据参数查找要调用的服务,执行本地调用返回结果。本文采用方案二。
客户端引用服务时,我们创建 GrpcInvoker 对象,和服务端建立连接并生成 DispatcherService 的本地存根 Stub 对象,发起 RPC 调用时只需把 RpcInvocation 转换成 Protobuf 消息发出去即可。

实现GrpcProtocol

项目结构

首先,我们新建一个dubbo-extension-protocol-grpc模块,引入必要的依赖。

<dependencies><dependency><groupId>org.apache.dubbo</groupId><artifactId>dubbo-rpc-api</artifactId><version>${dubbo.version}</version></dependency><dependency><groupId>io.grpc</groupId><artifactId>grpc-all</artifactId><version>1.56.1</version></dependency>
</dependencies>

项目结构:

main
--java
----dubbo.extension.rpc.grpc
------message
--------RequestData.java
--------ResponseData.java
------Codec.java
------DispatcherService.java
------DispatcherServiceGrpc.java
------GrpcExporter.java
------GrpcInvoker.java
------GrpcProtocol.java
------GrpcProtocolServer.java
--resources
----META-INF/dubbo
------org.apache.dubbo.rpc.Protocol

服务&消息定义

然后是定义grpc的 Service 和消息格式
DispatcherService.proto 请求分发服务的定义

syntax = "proto3";option java_multiple_files = true;
option java_package = "dubbo.extension.rpc.grpc";
option java_outer_classname = "DispatcherServiceProto";
option objc_class_prefix = "HLW";import "RequestData.proto";
import "ResponseData.proto";service DispatcherService {rpc dispatch (RequestData) returns (ResponseData) {}
}

RequestData.proto 请求消息的定义,主要是对 Invocation 的描述

syntax = "proto3";option java_multiple_files = true;
option java_package = "dubbo.extension.rpc.grpc.message";
option java_outer_classname = "RequestDataProto";
option objc_class_prefix = "HLW";message RequestData {string targetServiceUniqueName = 1;string methodName = 2;string serviceName = 3;repeated bytes parameterTypes = 4;string parameterTypesDesc = 5;repeated bytes arguments = 6;bytes attachments = 7;
}

ResponseData.proto 响应消息的定义,主要是对 AppResponse 的描述

syntax = "proto3";option java_multiple_files = true;
option java_package = "dubbo.extension.rpc.grpc.message";
option java_outer_classname = "ResponseataProto";
option objc_class_prefix = "HLW";message ResponseData {int32 status = 1;string errorMessage = 2;bytes result = 3;bytes attachments = 4;
}

使用protobuf-maven-plugin插件把 proto 文件生成对应的 Java 类。

协议实现

新建 GrpcProtocol 类,继承 AbstractProtocol,实现 Protocol 协议细节。
核心是:服务暴露时开启 Grpc 服务,引用服务时生成对应的 Invoker。

public class GrpcProtocol extends AbstractProtocol {@Overrideprotected <T> Invoker<T> protocolBindingRefer(Class<T> type, URL url) throws RpcException {return new GrpcInvoker<>(type, url);}@Overridepublic int getDefaultPort() {return 18080;}@Overridepublic <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {GrpcExporter<T> exporter = new GrpcExporter<>(invoker);exporterMap.put(invoker.getInterface().getName(), exporter);openServer(invoker.getUrl());return exporter;}private void openServer(URL url) {String key = serviceKey(url);ProtocolServer protocolServer = serverMap.get(key);if (protocolServer == null) {synchronized (serverMap) {protocolServer = serverMap.get(key);if (protocolServer == null) {serverMap.put(key, createServer(url));}}}}private ProtocolServer createServer(URL url) {return new GrpcProtocolServer(url, exporterMap);}
}

新建 GrpcProtocolServer 类实现 ProtocolServer 接口,核心是启动 GrpcServer,并添加 DispatcherService 处理请求。

public class GrpcProtocolServer implements ProtocolServer {private final Server server;public GrpcProtocolServer(URL url, Map<String, Exporter<?>> exporterMap) {server = ServerBuilder.forPort(url.getPort()).addService(new DispatcherService(exporterMap)).build();try {server.start();} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic String getAddress() {return null;}@Overridepublic void setAddress(String address) {}@Overridepublic void close() {server.shutdown();}
}

新建 DispatcherService 类实现 Grpc Service,用来处理客户端的grpc请求。核心是把 RequestData 解码成 RpcInvocation,再查找本地 Invoker 调用并返回结果。

public class DispatcherService extends DispatcherServiceGrpc.DispatcherServiceImplBase {private final Map<String, Exporter<?>> exporterMap;public DispatcherService(Map<String, Exporter<?>> exporterMap) {this.exporterMap = exporterMap;}@Overridepublic void dispatch(RequestData request, StreamObserver<ResponseData> responseObserver) {RpcInvocation invocation = Codec.decodeInvocation(request);ResponseData responseData;try {Invoker<?> invoker = exporterMap.get(invocation.getServiceName()).getInvoker();Object returnValue = invoker.invoke(invocation).get().getValue();responseData = Codec.encodeResponse(returnValue, null);} catch (Exception e) {responseData = Codec.encodeResponse(null, e);}responseObserver.onNext(responseData);responseObserver.onCompleted();}
}

新建 GrpcInvoker 类实现 Invoker 接口,服务引用时会创建它,目的是发起 RPC 调用时通过 Stub 发一个请求到 DispatcherService,实现grpc协议的 RPC 调用。

public class GrpcInvoker<T> extends AbstractInvoker<T> {private static final Map<String, DispatcherServiceGrpc.DispatcherServiceFutureStub> STUB_MAP = new ConcurrentHashMap<>();public GrpcInvoker(Class<T> type, URL url) {super(type, url);}private DispatcherServiceGrpc.DispatcherServiceFutureStub getStub() {String key = getUrl().getAddress();DispatcherServiceGrpc.DispatcherServiceFutureStub stub = STUB_MAP.get(key);if (stub == null) {synchronized (STUB_MAP) {stub = STUB_MAP.get(key);if (stub == null) {STUB_MAP.put(key, stub = createClient(getUrl()));}}}return stub;}private DispatcherServiceGrpc.DispatcherServiceFutureStub createClient(URL url) {ManagedChannel channel = ManagedChannelBuilder.forAddress(url.getHost(), url.getPort()).usePlaintext().build();return DispatcherServiceGrpc.newFutureStub(channel);}@Overrideprotected Result doInvoke(Invocation invocation) throws Throwable {RequestData requestData = Codec.encodeInvocation((RpcInvocation) invocation);ResponseData responseData = getStub().dispatch(requestData).get();return Codec.decodeResponse(responseData, invocation);}
}

最后是编解码器 Codec,它的作用是对 RequestData、ResponseData 对象的编解码。对于请求来说,要编解码的是 RpcInvocation;对于响应来说,要编解码的是返回值和异常信息。
方法实参是 Object[] 类型,附带参数是 Map 类型,本身不能直接通过 Protobuf 传输,我们会先利用 Serialization 序列化成字节数组后再传输。

public class Codec {private static final Serialization serialization = ExtensionLoader.getExtensionLoader(Serialization.class).getDefaultExtension();public static RequestData encodeInvocation(RpcInvocation invocation) {RequestData.Builder builder = RequestData.newBuilder().setTargetServiceUniqueName(invocation.getTargetServiceUniqueName()).setMethodName(invocation.getMethodName()).setServiceName(invocation.getServiceName());for (Class<?> parameterType : invocation.getParameterTypes()) {builder.addParameterTypes(serialize(parameterType));}builder.setParameterTypesDesc(invocation.getParameterTypesDesc());for (Object argument : invocation.getArguments()) {builder.addArguments(serialize(argument));}builder.setAttachments(serialize(invocation.getAttachments()));return builder.build();}public static RpcInvocation decodeInvocation(RequestData requestData) {RpcInvocation invocation = new RpcInvocation();invocation.setTargetServiceUniqueName(requestData.getTargetServiceUniqueName());invocation.setMethodName(requestData.getMethodName());invocation.setServiceName(requestData.getServiceName());List<ByteString> parameterTypesList = requestData.getParameterTypesList();Class<?>[] parameterTypes = new Class[parameterTypesList.size()];for (int i = 0; i < parameterTypesList.size(); i++) {parameterTypes[i] = (Class<?>) deserialize(parameterTypesList.get(i));}invocation.setParameterTypes(parameterTypes);invocation.setParameterTypesDesc(requestData.getParameterTypesDesc());List<ByteString> argumentsList = requestData.getArgumentsList();Object[] arguments = new Object[argumentsList.size()];for (int i = 0; i < argumentsList.size(); i++) {arguments[i] = deserialize(argumentsList.get(i));}invocation.setArguments(arguments);invocation.setAttachments((Map<String, String>) deserialize(requestData.getAttachments()));return invocation;}public static Result decodeResponse(ResponseData responseData, Invocation invocation) {AppResponse appResponse = new AppResponse();if (responseData.getStatus() == 200) {appResponse.setValue(deserialize(responseData.getResult()));appResponse.setAttachments((Map<String, String>) deserialize(responseData.getAttachments()));} else {appResponse.setException(new RuntimeException(responseData.getErrorMessage()));}return new AsyncRpcResult(CompletableFuture.completedFuture(appResponse), invocation);}private static Object deserialize(ByteString byteString) {try {InputStream inputStream = new ByteArrayInputStream(byteString.toByteArray());ObjectInput objectInput = serialization.deserialize(null, inputStream);return objectInput.readObject();} catch (Exception e) {throw new RuntimeException(e);}}private static ByteString serialize(Object obj) {try {ByteArrayOutputStream outputStream = new ByteArrayOutputStream();ObjectOutput output = serialization.serialize(null, outputStream);output.writeObject(obj);output.flushBuffer();return ByteString.copyFrom(outputStream.toByteArray());} catch (Exception e) {throw new RuntimeException(e);}}public static ResponseData encodeResponse(Object returnValue, Throwable throwable) {ResponseData.Builder builder = ResponseData.newBuilder();if (throwable == null) {builder.setStatus(200);builder.setResult(serialize(returnValue));builder.setAttachments(serialize(new HashMap<>()));//先忽略} else {builder.setStatus(500);builder.setErrorMessage(throwable.getMessage());}return builder.build();}
}

实现完毕,最后是让 Dubbo 可以加载到我们自定义的 GrpcProtocol,可以通过 SPI 的方式。新建META-INF/dubbo/org.apache.dubbo.rpc.Protocol文件,内容:

grpc=dubbo.extension.rpc.grpc.GrpcProtocol

服务提供方使用自定义协议:

ProtocolConfig protocolConfig = new ProtocolConfig("grpc", 10880);

消费方使用自定义协议:

ReferenceConfig#setUrl("grpc://127.0.0.1:10880");

尾巴

Protocol 层关心的是如何暴露服务和引用服务,以及如何让双方使用某个具体的协议来通信,以完成 RPC 调用。如果你觉得官方提供的 dubbo 协议无法满足你的业务,就可以通过扩展 Protocol 接口来实现你自己的私有协议。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/632851.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VSCode使用Makefile Tools插件开发C/C++程序

提起Makefile&#xff0c;可能有人会觉得它已经过时了&#xff0c;毕竟现在有比它更好的工具&#xff0c;比如CMake&#xff0c;XMake&#xff0c;Meson等等&#xff0c;但是在Linux下很多C/C源码都是直接或者间接使用Makefile文件来编译项目的&#xff0c;可以说Makefile是基石…

C++从零开始的打怪升级之路(day14)

这是关于一个普通双非本科大一学生的C的学习记录贴 在此前&#xff0c;我学了一点点C语言还有简单的数据结构&#xff0c;如果有小伙伴想和我一起学习的&#xff0c;可以私信我交流分享学习资料 那么开启正题 今天分享的内容是string类 这里给上官方的文档链接&#xff0c;…

qt学习:进度条,水平滑动条,垂直滑动条+rgb调试实战

目录 水平滑动条&#xff0c;垂直滑动条 常用信号 进度条 常用信号 修改进度条 例子 rgb调色 配置ui界面 编写3个进度条的事件函数 添加链表容器和按钮索引 在.h里的类定义 初始化链表容器和按钮索引 编写添加颜色的按钮点击事件函数 效果 水平滑动条&#xff0c…

【Flink】FlinkSQL读取Mysql表中时间字段相差13个小时

问题:Flink版本1.13,在我们使用FlinkSQL读取Mysql中数据的时候,发现读取出来的时间字段中的数据和Mysql表中的数据相差13个小时,Mysql建表语句及插入的数据如下; CREATE TABLE `mysql_example` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT 自增ID, `name` v…

Java-初识正则表达式 以及 练习

目录 什么是正则表达式&#xff1f; 1. 正则表达式---字符类&#xff08;一个大括号匹配一个字符&#xff09;&#xff1a; 2. 正则表达式---预字符类&#xff08;也是匹配一个字符&#xff09;&#xff1a; 正则表达式---数量词 &#xff08;可以匹配多个字符&#xff09;…

【前后端的那些事】15min快速实现图片上传,预览功能(ElementPlus+Springboot)

文章目录 Element Plus SpringBoot实现图片上传&#xff0c;预览&#xff0c;删除效果展示 1. 后端代码1.1 controller1.2 service 2. 前端代码2.1 路由创建2.2 api接口2.2 文件创建 3. 前端上传组件封装 前言&#xff1a;最近写项目&#xff0c;发现了一些很有意思的功能&…

网络安全产品之认识WEB应用防火墙

随着B/S架构的广泛应用&#xff0c;Web应用的功能越来越丰富&#xff0c;蕴含着越来越有价值的信息&#xff0c;应用程序漏洞被恶意利用的可能性越来越大&#xff0c;因此成为了黑客主要的攻击目标。传统防火墙无法解析HTTP应用层的细节&#xff0c;对规则的过滤过于死板&#…

速盾网络:高防ip是什么

速盾网络&#xff1a;高防IP是什么 在当今信息化社会中&#xff0c;网络安全问题日益突出&#xff0c;各种网络攻击威胁层出不穷。为了保护企业的网络安全&#xff0c;提高网络业务的稳定性&#xff0c;高防IP应运而生。那么&#xff0c;什么是高防IP呢&#xff1f; 高防IP是…

Jackson标签的高阶使用样例--多继承/子类、对象id、JsonIdentityInfo、JsonTypeInfo、JsonSubTypes

1. 背景 最近笔者在开发大数据平台XSailboat 的 数据资产目录 模块。它的其中一个功能是能定义并查看资产数据。我们支持的资产类型不仅有关系数据库表&#xff0c;也支持Kafka主题&#xff0c;hdfs上的文件等。对于Kafka主题&#xff0c;hdfs文件等&#xff0c;它们没有强模式…

CSS Position总结:定位属性的实战技巧

CSS Position总结&#xff1a;定位属性的实战技巧 大家好&#xff0c;我是免费搭建查券返利机器人赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01;在今天的文章中&#xff0c;我们将深入研究CSS中一个至关重要的属…

移动云助力智慧交通数智化升级

智慧交通是在整个交通运输领域充分利用物联网、空间感知、云计算、移动互联网等新一代信息技术&#xff0c;综合运用交通科学、系统方法、人工智能、知识挖掘等理论与工具&#xff0c;以全面感知、深度融合、主动服务、科学决策为目标&#xff0c;推动交通运输更安全、更高效、…

软件设计师5--CISC与RISC

软件设计师5--CISC与RISC 考点1&#xff1a;CISC与RISC有什么不同考点2&#xff1a;CISC与RISC比较&#xff0c;分哪些维度例题&#xff1a; 考点1&#xff1a;CISC与RISC有什么不同 考点2&#xff1a;CISC与RISC比较&#xff0c;分哪些维度 例题&#xff1a; 1、以下关于RISC…

微信小程序安卓系统下Input输入内容上移错位问题的解决办法

在较长的表单中&#xff0c;页面可能需要滑动&#xff0c; 在这种情况下&#xff0c;在苹果手机上使用Input显示正常&#xff0c;但是在安卓手机上就会出现输入内容上移错位的问题,严重影响使用 需要设置一个状态控制scroll-view是否允许滑动&#xff0c;当Input获取焦点是&am…

SD-WAN组网设计原则:灵活、安全、高效

在实现按需、灵活和安全的SD-WAN组网方案中&#xff0c;我们必须遵循一系列关键的设计原则&#xff0c;以确保网络的可靠性和效率。通过以下几点设计原则&#xff0c;SD-WAN能够满足企业对灵活性、安全性和高效性的迫切需求。 灵活的Overlay网络互联 SD-WAN通过IP地址在站点之间…

Unicode编码

文章目录 前言一、Unicode &#xff1f;二、前端工程师使用Unicode三、Javascript中处理 Unicode总结 前言 一、Unicode &#xff1f; Unicode 是一种字符编码标准&#xff0c;旨在为世界上所有的字符&#xff08;包括各种语言、符号和特殊字符&#xff09;提供唯一的数字标识…

UniAPP社区论坛项目实战--社区服务 API 接口文档

社区服务 API 接口文档 社区服务 API 接口文档基础相关信息一、广告管理1.1 查询所有广告位1.2 获取一个广告位的广告列表1.3 批量获取广告列表 二、动态管理2.1 批量获取动态列表信息2.2 获取指定 ID 动态详情2.3 创建一条动态2.4 点赞、取消点赞、点赞列表 当前动态2.5 评论指…

深度求索开源国内首个 MoE 大模型 | DeepSeekMoE:在专家混合语言模型中实现终极专家专业化

文章目录 一、前言二、主要内容三、总结 &#x1f349; CSDN 叶庭云&#xff1a;https://yetingyun.blog.csdn.net/ 一、前言 在大语言模型时代&#xff0c;混合专家模型&#xff08;MoE&#xff09;是一种很有前途的架构&#xff0c;用于在扩展模型参数时管理计算成本。然而&a…

旅游项目day03

1. 前端整合后端发短信接口 2. 注册功能 后端提供注册接口&#xff0c;接受前端传入的参数&#xff0c;创建新的用户对象&#xff0c;保存到数据库。 接口设计&#xff1a; 实现步骤&#xff1a; 手机号码唯一性校验&#xff08;后端一定要再次校验手机号唯一性&#xff09…

Vray渲染效果图材质参数设置

渲染是创造出引人入胜视觉效果的关键步骤&#xff0c;在视觉艺术领域尤为重要。不过&#xff0c;渲染作为一个资源密集型的过程&#xff0c;每当面对它时&#xff0c;我们往往都会遭遇到时间消耗和资源利用的巨大挑战。幸运的是&#xff0c;有几种方法能够帮助我们优化渲染&…

【51单片机】数码管的静态与动态显示(含消影)

数码管在现实生活里是非常常见的设备&#xff0c;例如 这些数字的显示都是数码管的应用。 目录 静态数码管&#xff1a;器件介绍&#xff1a;数码管的使用&#xff1a;译码器的使用&#xff1a;缓冲器&#xff1a; 实现原理&#xff1a;完整代码&#xff1a; 动态数码管&#…