Simple RPC - 02 通用高性能序列化和反序列化设计与实现

文章目录

  • 概述
  • 设计实现
    • 通用的序列化接口
    • 通用的序列化实现【推荐】 vs 专用的序列化实现
    • 专用序列化接口定义
    • 序列化实现

在这里插入图片描述

概述

网络传输和序列化这两部分的功能相对来说是非常通用并且独立的,在设计的时候,只要能做到比较好的抽象,这两部的实现,它的通用性是非常强的。不仅可以用于 RPC 框架中,同样可以直接拿去用于实现消息队列,或者其他需要互相通信的分布式系统中。

我们先来实现序列化和反序列化部分,因为后面的部分会用到序列化和反序列化。


设计实现

通用的序列化接口

首先我们需要设计一个可扩展的,通用的序列化接口,为了方便使用,我们直接使用静态类的方式来定义这个接口(严格来说这并不是一个接口)


public class SerializeSupport {public static  <E> E parse(byte [] buffer) {// ...}public static <E> byte [] serialize(E  entry) {// ...}
}
  • parse 方法用于反序列化
  • serialize 方法用于序列化

比如

// 序列化
MyClass myClassObject = new MyClass();
byte [] bytes = SerializeSupport.serialize(myClassObject);
// 反序列化
MyClass myClassObject1 = SerializeSupport.parse(bytes);

通用的序列化实现【推荐】 vs 专用的序列化实现

在讲解序列化和反序列化的时候说过,可以使用通用的序列化实现,也可以自己来定义专用的序列化实现。

  • 专用的序列化性能最好,但缺点是实现起来比较复杂,你要为每一种类型的数据专门编写序列化和反序列化方法。
  • 一般的 RPC 框架采用的都是通用的序列化实现,比如 gRPC 采用的是 Protobuf 序列化实现,Dubbo 支持 hession2 等好几种序列化实现

为什么这些 RPC 框架不像消息队列一样,采用性能更好的专用的序列化实现呢?这个原因很简单,消息队列它需要序列化数据的类型是固定的,只是它自己的内部通信的一些命令。但 RPC 框架,它需要序列化的数据是,用户调用远程方法的参数,这些参数可能是各种数据类型,所以必须使用通用的序列化实现,确保各种类型的数据都能被正确的序列化和反序列化。


我们这里还是采用专用的序列化实现,主要的目的是一起来实践一下,如何来实现序列化和反序列化

专用序列化接口定义

public interface Serializer<T> {/*** 计算对象序列化后的长度,主要用于申请存放序列化数据的字节数组* @param entry 待序列化的对象* @return 对象序列化后的长度*/int size(T entry);/*** 序列化对象。将给定的对象序列化成字节数组* @param entry 待序列化的对象* @param bytes 存放序列化数据的字节数组* @param offset 数组的偏移量,从这个位置开始写入序列化数据* @param length 对象序列化后的长度,也就是{@link Serializer#size(java.lang.Object)}方法的返回值。*/void serialize(T entry, byte[] bytes, int offset, int length);/*** 反序列化对象* @param bytes 存放序列化数据的字节数组* @param offset 数组的偏移量,从这个位置开始写入序列化数据* @param length 对象序列化后的长度* @return 反序列化之后生成的对象*/T parse(byte[] bytes, int offset, int length);/*** 用一个字节标识对象类型,每种类型的数据应该具有不同的类型值*/byte type();/*** 返回序列化对象类型的Class对象。*/Class<T> getSerializeClass();
}

这个接口中,除了 serialize 和 parse 这两个序列化和反序列化两个方法以外,还定义了下面这几个方法:

  • size 方法计算序列化之后的数据长度,用于事先来申请存放序列化数据的字节数组;
  • type 方法定义每种序列化实现的类型,这个类型值也会写入到序列化之后的数据中,主要的作用是在反序列化的时候,能够识别是什么数据类型的,以便找到对应的反序列化实现类;
  • getSerializeClass 这个方法返回这个序列化实现类对应的对象类型,目的是,在执行序列化的时候,通过被序列化的对象类型找到对应序列化实现类

序列化实现

利用这个 Serializer 接口,我们就可以来实现 SerializeSupport 这个支持任何对象类型序列化的通用静态类了。

首先我们定义两个 Map,这两个 Map 中存放着所有实现 Serializer 接口的序列化实现类

private static Map<Class<?>/*序列化对象类型*/, Serializer<?>/*序列化实现*/> serializerMap = new HashMap<>();
private static Map<Byte/*序列化实现类型*/, Class<?>/*序列化对象类型*/> typeMap = new HashMap<>();
  • serializerMap 中的 key 是序列化实现类对应的序列化对象的类型,它的用途是在序列化的时候,通过被序列化的对象类型,找到对应的序列化实现类
  • typeMap 的作用和 serializerMap 是类似的,它的 key 是序列化实现类的类型,用于在反序列化的时候,从序列化的数据中读出对象类型,然后找到对应的序列化实现类

理解了这两个 Map 的作用,实现序列化和反序列化这两个方法就很容易了。这两个方法的实现思路是一样的,都是通过一个类型在这两个 Map 中进行查找,查找的结果就是对应的序列化实现类的实例,也就是 Serializer 接口的实现,然后调用对应的序列化或者反序列化方法就可以了。

public class SerializeSupport {private static final Logger logger = LoggerFactory.getLogger(SerializeSupport.class);private static Map<Class<?>/*序列化对象类型*/, Serializer<?>/*序列化实现*/> serializerMap = new HashMap<>();private static Map<Byte/*序列化实现类型*/, Class<?>/*序列化对象类型*/> typeMap = new HashMap<>();static {for (Serializer serializer : ServiceSupport.loadAll(Serializer.class)) {registerType(serializer.type(), serializer.getSerializeClass(), serializer);logger.info("Found serializer, class: {}, type: {}.",serializer.getSerializeClass().getCanonicalName(),serializer.type());}}private static byte parseEntryType(byte[] buffer) {return buffer[0];}private static <E> void registerType(byte type, Class<E> eClass, Serializer<E> serializer) {serializerMap.put(eClass, serializer);typeMap.put(type, eClass);}@SuppressWarnings("unchecked")private static  <E> E parse(byte [] buffer, int offset, int length, Class<E> eClass) {Object entry =  serializerMap.get(eClass).parse(buffer, offset, length);if (eClass.isAssignableFrom(entry.getClass())) {return (E) entry;} else {throw new SerializeException("Type mismatch!");}}public static  <E> E parse(byte [] buffer) {return parse(buffer, 0, buffer.length);}private static  <E> E parse(byte[] buffer, int offset, int length) {byte type = parseEntryType(buffer);@SuppressWarnings("unchecked")Class<E> eClass = (Class<E> )typeMap.get(type);if(null == eClass) {throw new SerializeException(String.format("Unknown entry type: %d!", type));} else {return parse(buffer, offset + 1, length - 1,eClass);}}public static <E> byte [] serialize(E  entry) {@SuppressWarnings("unchecked")Serializer<E> serializer = (Serializer<E>) serializerMap.get(entry.getClass());if(serializer == null) {throw new SerializeException(String.format("Unknown entry class type: %s", entry.getClass().toString()));}byte [] bytes = new byte [serializer.size(entry) + 1];bytes[0] = serializer.type();serializer.serialize(entry, bytes, 1, bytes.length - 1);return bytes;}
}

所有的 Serializer 的实现类是怎么加载到 SerializeSupport 的那两个 Map 中的呢?这里面利用了 Java 的一个 SPI 类加载机制

public class ServiceSupport {private final static Map<String, Object> singletonServices = new HashMap<>();public synchronized static <S> S load(Class<S> service) {return StreamSupport.stream(ServiceLoader.load(service).spliterator(), false).map(ServiceSupport::singletonFilter).findFirst().orElseThrow(ServiceLoadException::new);}public synchronized static <S> Collection<S> loadAll(Class<S> service) {return StreamSupport.stream(ServiceLoader.load(service).spliterator(), false).map(ServiceSupport::singletonFilter).collect(Collectors.toList());}@SuppressWarnings("unchecked")private static <S>  S singletonFilter(S service) {if(service.getClass().isAnnotationPresent(Singleton.class)) {String className = service.getClass().getCanonicalName();Object singletonInstance = singletonServices.putIfAbsent(className, service);return singletonInstance == null ? service : (S) singletonInstance;} else {return service;}}
}

到这里,我们就封装好了一个通用的序列化的接口,

  • 对于使用序列化的模块来说,它只要依赖 SerializeSupport 这个静态类,调用它的序列化和反序列化方法就可以了,不需要依赖任何序列化实现类。

  • 对于序列化实现的提供者来说,也只需要依赖并实现 Serializer 这个接口就可以了。

比如,我们的 HelloService 例子中的参数是一个 String 类型的数据,我们需要实现一个支持 String 类型的序列化实现

public class StringSerializer implements Serializer<String> {@Overridepublic int size(String entry) {return entry.getBytes(StandardCharsets.UTF_8).length;}@Overridepublic void serialize(String entry, byte[] bytes, int offset, int length) {byte [] strBytes = entry.getBytes(StandardCharsets.UTF_8);System.arraycopy(strBytes, 0, bytes, offset, strBytes.length);}@Overridepublic String parse(byte[] bytes, int offset, int length) {return new String(bytes, offset, length, StandardCharsets.UTF_8);}@Overridepublic byte type() {return Types.TYPE_STRING;}@Overridepublic Class<String> getSerializeClass() {return String.class;}
}

在把 String 和 byte 数组做转换的时候,一定要指定编码方式,确保序列化和反序列化的时候都使用一致的编码,我们这里面统一使用 UTF8 编码。否则,如果遇到执行序列化和反序列化的两台服务器默认编码不一样,就会出现乱码。我们在开发过程用遇到的很多中文乱码问题,绝大部分都是这个原因

还有一个更复杂的序列化实现 MetadataSerializer,用于将注册中心的数据持久化到文件中

/*** Size of the map                     2 bytes*      Map entry:*          Key string:*              Length:                2 bytes*              Serialized key bytes:  variable length*          Value list*              List size:              2 bytes*              item(URI):*                  Length:             2 bytes*                  serialized uri:     variable length*              item(URI):*              ...*      Map entry:*      ...**/
public class MetadataSerializer implements Serializer<Metadata> {@Overridepublic int size(Metadata entry) {return Short.BYTES +                   // Size of the map                  2 bytesentry.entrySet().stream().mapToInt(this::entrySize).sum();}@Overridepublic void serialize(Metadata entry, byte[] bytes, int offset, int length) {ByteBuffer buffer = ByteBuffer.wrap(bytes, offset, length);buffer.putShort(toShortSafely(entry.size()));entry.forEach((k,v) -> {byte [] keyBytes = k.getBytes(StandardCharsets.UTF_8);buffer.putShort(toShortSafely(keyBytes.length));buffer.put(keyBytes);buffer.putShort(toShortSafely(v.size()));for (URI uri : v) {byte [] uriBytes = uri.toASCIIString().getBytes(StandardCharsets.UTF_8);buffer.putShort(toShortSafely(uriBytes.length));buffer.put(uriBytes);}});}private int entrySize(Map.Entry<String, List<URI>> e) {// Map entry:return Short.BYTES +       // Key string length:               2 bytese.getKey().getBytes().length +    // Serialized key bytes:   variable lengthShort.BYTES + // List size:              2 bytese.getValue().stream() // Value list.mapToInt(uri -> {return Short.BYTES +       // Key string length:               2 bytesuri.toASCIIString().getBytes(StandardCharsets.UTF_8).length;    // Serialized key bytes:   variable length}).sum();}@Overridepublic Metadata parse(byte[] bytes, int offset, int length) {ByteBuffer buffer = ByteBuffer.wrap(bytes, offset, length);Metadata metadata = new Metadata();int sizeOfMap = buffer.getShort();for (int i = 0; i < sizeOfMap; i++) {int keyLength = buffer.getShort();byte [] keyBytes = new byte [keyLength];buffer.get(keyBytes);String key = new String(keyBytes, StandardCharsets.UTF_8);int uriListSize = buffer.getShort();List<URI> uriList = new ArrayList<>(uriListSize);for (int j = 0; j < uriListSize; j++) {int uriLength = buffer.getShort();byte [] uriBytes = new byte [uriLength];buffer.get(uriBytes);URI uri  = URI.create(new String(uriBytes, StandardCharsets.UTF_8));uriList.add(uri);}metadata.put(key, uriList);}return metadata;}@Overridepublic byte type() {return Types.TYPE_METADATA;}@Overridepublic Class<Metadata> getSerializeClass() {return Metadata.class;}private short toShortSafely(int v) {assert v < Short.MAX_VALUE;return (short) v;}
}

到这里序列化的部分就实现完成了。我们这个序列化的实现,对外提供服务的就只有一个 SerializeSupport 静态类,并且可以通过扩展支持序列化任何类型的数据,这样一个通用的实现,不仅可以用在我们这个 RPC 框架的例子中,完全可以把这部分直接拿过去用在业务代码中


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/112865.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

全光谱护眼灯有哪些?2023全光谱护眼台灯推荐

随着电子设备的不断普及&#xff0c;手机、平板电脑、显示器、电视机等几乎是家家户户的必备品&#xff0c;也正因为眼睛有那么多时间、那么多机会去盯着屏幕&#xff0c;所以如今近视低龄化现象也越来越严重了。随着科技的不断发展&#xff0c;台灯的发展也越来越多样化&#…

成都瀚网科技有限公司:开抖音店铺有哪些注意事项?

成功经营一个小店不仅仅是发布产品视频那么简单&#xff0c;还需要注意一些重要的事情。开抖音店铺需要注意以下几点&#xff1a; 1、开抖音店铺有哪些注意事项&#xff1f; 合规管理&#xff1a;在抖音开店&#xff0c;首先要确保自己的运营合规。遵守相关法律法规及平台规定&…

Elasticsearch学习笔记

1.核心概念 bucket: 一个数据分组&#xff08;类似于sql group by以后的数据&#xff09;metric&#xff1a;对bucket执行的某种聚合分析的操作&#xff0c;比如说求平均值&#xff0c;最大值&#xff0c;最小值。一些系列的统计方法(类似 select count(1) MAX MIN AVG) 请…

CUDA学习笔记5——CUDA程序错误检测

CUDA程序错误检测 所有CUDA的API函数都有一个类型为cudaError_t的返回值&#xff0c;代表了一种错误信息&#xff1b;只有返回cudaSuccess时&#xff0c;才是成功调用。 cudaGetLastError()用来检测核函数的执行是否出错cudaGetErrorString()输出错误信息 #include <stdi…

【lesson13】进程控制初识

文章目录 进程创建 进程创建 请你描述一下&#xff0c;fork创建子进程操作系统都做了什么&#xff1f; fork创建子进程&#xff0c;系统里多了一个进程&#xff0c;进程 内核数据结构 进程代码数据&#xff0c;内核数据结构由OS维护&#xff0c;进程代码数据一般由磁盘维护。…

【三维重建】DreamGaussian:高斯splatting的单视图3D内容生成(原理+代码)

文章目录 摘要一、前言二、相关工作2.1 3D表示2.2 Text-to-3D2.3 Image-to-3D 三、本文方法3.1生成式 高斯 splitting3.2 高效的 mesh 提取3.3 UV空间的纹理优化 四. 实验4.1实施细节4.2 定性比较4.3 定量比较4.4 消融实验 总结&#xff08;特点、局限性&#xff09; 五、安装与…

【框架源码篇 01】Spring源码-手写IOC

Spring源码手写篇-手写IoC 一、IoC分析 1.Spring的核心 在Spring中非常核心的内容是 IOC和 AOP. 2.IoC的几个疑问? 2.1 IoC是什么&#xff1f; IoC:Inversion of Control 控制反转&#xff0c;简单理解就是&#xff1a;依赖对象的获得被反转了。 2.2 IoC有什么好处? IoC带…

[ROS2系列] ORBBEC(奥比中光)AstraPro相机在ROS2进行rtabmap 3D建图

目录 背景&#xff1a; 一、驱动AstraPro摄像头 二、安装rtabmap error1&#xff1a;缺包 三、尝试 四、参数讲解 五、运行 error2: Did not receive data since 5 seconds! 六、效果​编辑 error4: 背景&#xff1a; 1、设备&#xff1a;pc&#xff1b;jeston agx …

语音芯片KT142C两种音频输出方式PWM和DAC的区别

目录 语音芯片KT142C两种音频输出方式PWM和DAC的区别 一般的语音芯片&#xff0c;输出方式&#xff0c;无外乎两种&#xff0c;即dac输出&#xff0c;或者PWM输出 其中dac的输出&#xff0c;一般应用场景都是外挂功放芯片&#xff0c;实现声音的放大&#xff0c;比如常用的音箱…

WMS透明仓库:实现仓储的全方位可视化与优化

一、WMS透明仓库的定义与特点 1. WMS透明仓库的定义&#xff1a;WMS透明仓库是一种基于信息技术的仓库管理系统&#xff0c;通过实时数据采集、分析和可视化&#xff0c;将仓库内外的物流流程、库存状态、人员活动等信息以透明的方式展示给相关利益方。 2. 实时数据采集&…

性能评测 | GreatDB VIP PLUGIN方案 VS MySQL InnoDB Cluster高可用方案

前言 最近&#xff0c;我们与许多数据库用户进行了沟通和调研&#xff0c;了解到&#xff0c;目前仍有相当一部分投产的MySQL高可用或故障转移方案&#xff0c;用到了读写分离功能或业务接入VIP&#xff08;Virtual IP Address&#xff09;的方式&#xff0c;来屏蔽后端数据库架…

MySQL 性能分析

MySQL 性能分析 对 mysql 进行性能分析&#xff0c;主要就是提升查询的效率&#xff0c;其中索引占主导地位。对 mysql 进行性能分析主要有如下几种方式&#xff1a; 方式一&#xff1a;查看 sql 执行频次 show global status like ‘Com_______’; // global 表示全局 show s…

[每周一更]-(第68期):Excel常用函数及常用操作

日常工作&#xff0c;偶尔也会存在excel表格入库的情况&#xff0c;针对复杂的入库情况&#xff0c;一般都是代码编号&#xff0c;读文件-写db形式&#xff1b;但是有些简单就直接操作&#xff0c;但是 这些简单的入库不仅仅是直接入库&#xff0c;而是内容中有部分需要进行映射…

防水款无源NFC卡片

产品参数&#xff1a; PN29_T 产品参数 产品型号 PN29_T 尺寸(mm) 85.8*41*2.9mm 显示技术 电子墨水屏 显示区域(mm) 29(H) * 66.9(V) 分辨率(像素) 296*128 像素尺寸(mm) 0.227*0.226 显示颜色 黑/白 视觉角度 180 工作温度 0-50℃ 电池 无需电池 工作…

Stable Diffusion原理

一、Diffusion扩散理论 1.1、 Diffusion Model&#xff08;扩散模型&#xff09; Diffusion扩散模型分为两个阶段&#xff1a;前向过程 反向过程 前向过程&#xff1a;不断往输入图片中添加高斯噪声来破坏图像反向过程&#xff1a;使用一系列马尔可夫链逐步将噪声还原为原始…

“智能+”时代,深维智信如何借助阿里云打造AI内容生成系统

云布道师 前言&#xff1a; 随着数字经济的发展&#xff0c;线上数字化远程销售模式越来越成为一种主流&#xff0c;销售流程也演变为线上视频会议、线下拜访等多种方式的结合。根据 Gartner 报告&#xff0c;到 2025 年 60% 的 B2B 销售组织将从基于经验和直觉的销售转变为数…

stable diffusion如何解决gradio外链无法开启的问题

问题确认 为了确认gradio开启不了是gradio库的问题还是stable diffusion的问题&#xff0c;可以先执行这样一段demo代码 import gradio as grdef greet(name):return "Hello " name "!"demo gr.Interface(fngreet, inputs"text", outputs&q…

Unity之ShaderGraph如何实现飘动的红旗

前言 今天我们来实现一个飘动的红旗 如图所示&#xff1a; 关键节点 SimpleNoise&#xff1a;根据输入UV生成简单噪声或Value噪声。生成的噪声的大小由输入Scale控制。 Split&#xff1a;将输入向量In拆分为四个Float输出R、G、B和A。这些输出向量由输入In的各个通道定义&…

UiPath:一家由生成式AI驱动的流程自动化软件公司

来源&#xff1a;猛兽财经 作者&#xff1a;猛兽财经 总结&#xff1a; &#xff08;1&#xff09;UiPath(PATH)的股价并没有因为生成式AI的炒作而上涨&#xff0c;但很可能会成为主要受益者。 &#xff08;2&#xff09;即使在严峻的宏观环境下&#xff0c;UiPath的收入还在不…

Stable Diffusion WebUI几种解决手崩溃的方法

1. 添加与手相关负面提示词 如何提价提示词呢? 首先有一个embeddings模型文件bad-hands-5,我们可以去各个大模型网站去搜,我是在C站上面下载的。 附上C站地址:https://civitai.com/ 下载好之后,你需要将文件放入stable-diffusion-webui\embeddings目录中。位置如下所示…