Flink 数据序列化

为 Flink 量身定制的序列化框架

大家都知道现在大数据生态非常火,大多数技术组件都是运行在JVM上的,Flink也是运行在JVM上,基于JVM的数据分析引擎都需要将大量的数据存储在内存中,这就不得不面临JVM的一些问题,比如Java对象存储密度较低等。针对这些问题,最常用的方法就是实现一个显式的内存管理,也就是说用自定义的内存池来进行内存的分配回收,接着将序列化后的对象存储到内存块中。
现在Java生态圈中已经有许多序列化框架,比如说Java serialization, Kryo,Apache Avro等等。但是Flink依然是选择了自己定制的序列化框架,那么到底有什么意义呢?若Flink选择自己定制的序列化框架,对类型信息了解越多,可以在早期完成类型检查,更好的选取序列化方式,进行数据布局,节省数据的存储空间,直接操作二进制数据。
在这里插入图片描述Flink在其内部构建了一套自己的类型系统,Flink现阶段支持的类型分类如图所示,从图中可以看到Flink类型可以分为基础类型Basic、数组Arrays、复合类型Composite、辅助类型Auxiliary、泛型和其它类型GenericFlink支持任意的Java或是Scala类型。不需要像Hadoop一样去实现一个特定的接口org.apache.hadoop.io.WritableFlink能够自动识别数据类型。
在这里插入图片描述
TypeInformation的思维导图如图所示,从图中可以看出,在Flink中每一个具体的类型都对应了一个具体的TypeInformation实现类,例如BasicTypeInformation中的IntegerTypeInformationFractionalTypeInformation都具体的对应了一个TypeInformation。然后还有BasicArrayTypeInformationCompositeType以及一些其它类型,也都具体对应了一个TypeInformation

TypeInformationFlink类型系统的核心类。对于用户自定义的Function来说,Flink需要一个类型信息来作为该函数的输入输出类型,即TypeInfomation。该类型信息类作为一个工具来生成对应类型的序列化器TypeSerializer,并用于执行语义检查,比如当一些字段在作为joingrouping的键时,检查这些字段是否在该类型中存在。

Flink 的序列化过程

Flink序列化过程中,进行序列化操作必须要有序列化器,那么序列化器从何而来?
每一个具体的数据类型都对应一个TypeInformation的具体实现,每一个TypeInformation都会为对应的具体数据类型提供一个专属的序列化器。通过 Flink的序列化过程图可以看到TypeInformation会提供一个createSerialize()方法,通过这个方法就可以得到该类型进行数据序列化操作与反序化操作的对象TypeSerializer
在这里插入图片描述对于大多数数据类型 Flink可以自动生成对应的序列化器,能非常高效地对数据集进行序列化和反序列化,比如,BasicTypeInfoWritableTypeInfo等,但针对GenericTypeInfo类型,Flink会使用Kyro进行序列化和反序列化。其中,TuplePojoCaseClass类型是复合类型,它们可能嵌套一个或者多个数据类型。在这种情况下,它们的序列化器同样是复合的。它们会将内嵌类型的序列化委托给对应类型的序列化器。

简单的介绍下Pojo的类型规则,即在满足一些条件的情况下,才会选用Pojo的序列化进行相应的序列化与反序列化的一个操作。即类必须是Public的,且类有一个public的无参数构造函数,该类(以及所有超类)中的所有非静态no-static、非瞬态no-transient字段都是public的(和非最终的final)或者具有公共gettersetter方法,该方法遵循gettersetterJava bean命名约定。当用户定义的数据类型无法识别为POJO类型时,必须将其作为GenericType处理并使用Kryo进行序列化。

Flink自带了很多TypeSerializer子类,大多数情况下各种自定义类型都是常用类型的排列组合,因而可以直接复用,如果内建的数据类型和序列化方式不能满足你的需求,Flink的类型信息系统也支持用户拓展。若用户有一些特殊的需求,只需要实现 TypeInformationTypeSerializerTypeComparator即可定制自己类型的序列化和比较大小方式,来提升数据类型在序列化和比较时的性能。

序列化就是将数据结构或者对象转换成一个二进制串的过程,在Java里面可以简单地理解成一个byte数组。而反序列化恰恰相反,就是将序列化过程中所生成的二进制串转换成数据结构或者对象的过程。下面就以内嵌型的Tuple3这个对象为例,简述一下它的序列化过程。
[点击并拖拽以移动] ​

Tuple3包含三个层面,一是int类型,一是double类型,还有一个是PersonPerson包含两个字段,一是int型的ID,另一个是 String 类型的name,它在序列化操作时,会委托相应具体序列化的序列化器进行相应的序列化操作。从图中可以看到Tuple3 会把 int类型通过IntSerializer进行序列化操作,此时int只需要占用四个字节就可以了。根据int占用四个字节,这个能够体现出Flink可序列化过程中的一个优势,即在知道数据类型的前提下,可以更好的进行相应的序列化与反序列化操作。相反,如果采用Java的序列化,虽然能够存储更多的属性信息,但一次占据的存储空间会受到一定的损耗。Person类会被当成一个Pojo对象来进行处理,PojoSerializer序列化器会把一些属性信息使用一个字节存储起来。同样,其字段则采取相对应的序列化器进行相应序列化,在序列化完的结果中,可以看到所有的数据都是由MemorySegment去支持。

MemorySegment具有什么作用呢? MemorySegmentFlink中会将对象序列化到预分配的内存块上,它代表1个固定长度的内存,默认大小为32 kbMemorySegment代表Flink中的一个最小的内存分配单元,相当于是Java的一个byte数组。 每条记录都会以序列化的形式存储在一个或多个MemorySegment中。

Flink 序列化的最佳实践

Flink常见的应用场景有四种,即注册子类型、注册自定义序列化器、添加类型提示、手动创建TypeInformation,具体如下:
【1】注册子类型: 如果函数签名只描述了超类型,但是它们实际上在执行期间使用了超类型的子类型,那么让Flink了解这些子类型会大大提高性能。可以在StreamExecutionEnvironmentExecutionEnvironment中调用.registertype (clazz) 注册子类型信息。
【2】注册自定义序列化器: 对于不适用于自己的序列化框架的数据类型,Flink会使用Kryo来进行序列化,并不是所有的类型都与Kryo无缝连接,具体注册方法在下文介绍。
【3】添加类型提示: 有时,当Flink用尽各种手段都无法推测出泛型信息时,用户需要传入一个类型提示TypeHint,这个通常只在Java API中需要。
【4】手动创建TypeInformation 在某些API调用中,这可能是必需的,因为Java的泛型类型擦除导致Flink无法推断数据类型。
其实在大多数情况下,用户不必担心序列化框架和注册类型,因为Flink已经提供了大量的序列化操作,不需要去定义自己的一些序列化器,但是在一些特殊场景下,需要去做一些相应的处理。

实践 - 类型声明: 类型声明去创建一个类型信息的对象是通过哪种方式?通常是用TypeInformation.of()方法来创建一个类型信息的对象,具体说明如下:
【1】对于非泛型类,直接传入class对象即可

PojoTypeInfo<Person> typeInfo = (PojoTypeInfo<Person>) TypeInformation.of(Person.class);

【2】对于泛型类,需要通过TypeHint来保存泛型类型信息

final TypeInfomation<Tuple2<Integer,Integer>> resultType = TypeInformation.of(new TypeHint<Tuple2<Integer,Integer>>(){});

【3】预定义常量:BasicTypeInfo,这个类定义了一系列常用类型的快捷方式,对于StringBooleanByteShortIntegerLongFloatDoubleChar等基本类型的类型声明,可以直接使用。而且Flink还提供了完全等价的Typesorg.apache.flink.api.common.typeinfo.Types。特别需要注意的是,flink-table模块也有一个Typesorg.apache.flink.table.api.Types,用于table模块内部的类型定义信息,用法稍有不同。使用IDE 的自动import时一定要小心。
【4】自定义TypeInfoTypeInfoFactory 通过自定义TypeInfo为任意类提供Flink原生内存管理(而非Kryo),使存
储更紧凑,运行时也更高效。需要注意在自定义类上使用@TypeInfo注解,随后创建相应的TypeInfoFactory并覆盖createTypeInfo()方法。

@TypeInfo(MyTupleTypeInfoFactory.class)
public class MyTuple<T0,T1>{public T0 myfield0;public T1 myfield1;
}public class MyTupleTypeInfoFactory extends TypeInfoFactory<MyTuple>{@Overridepublic TypeInformation<MyTuple> createTypeInfo(Type t, Map<String, TypeInfomation<?>> genericParameters){return new MyTupleTypeInfo(genericParameters.get("T0").genericParameters.get("T1"));}
}

实践 - 注册子类型

Flink认识父类,但不一定认识子类的一些独特特性,因此需要单独注册子类型。StreamExecutionEnvironmentExecutionEnvironment提供registerType()方法用来向Flink注册子类信息。

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.registerType(typeClass);

registerType()方法内部,会使用TypeExtractor来提取类型信息,如上所示,获取到的类型信息属于PojoTypeInfo及其子类,那么需要将其注册到一起,否则统一交给Kryo去处理,Flink并不过问 ( 这种情况下性能会变差 )。

实践 - Kryo 序列化

对于Flink无法序列化的类型(例如用户自定义类型,没有registerType,也没有自定义TypeInfoTypeInfoFactory),默认会交给 Kryo处理,如果Kryo仍然无法处理(例如GuavaThriftProtobuf等第三方库的一些类),有两种解决方案:
【1】强制使用Avro来代替Kryo

env.getConfig().enableForceAvro();

【2】为Kryo增加自定义的Serializer以增强Kryo的功能

env.getConfig().addDefaultKryoSerializer(clazz, serializer);

注:如果希望完全禁用Kryo100%使用Flink的序列化机制),可以通过Kryoenv.getConfig().disableGenericTypes()的方式完成,但注意一切无法处理的类都将导致异常,这种对于调试非常有效。

Flink 通信层的序列化

FlinkTask之间如果需要跨网络传输数据记录, 那么就需要将数据序列化之后写入NetworkBufferPool,然后下层的Task读出之后再进行反序列化操作,最后进行逻辑处理。为了使得记录以及事件能够被写入 Buffer,随后在消费时再从Buffer中读出,
Flink提供了数据记录序列化器RecordSerializer与反序列化器RecordDeserializer以及事件序列化器EventSerializer

Function发送的数据被封装成SerializationDelegate,它将任意元素公开为IOReadableWritable以进行序列化,通过setInstance()来传入要序列化的数据。在Flink通信层的序列化中,有几个问题值得关注,具体如下:
【1】何时确定Function的输入输出类型?
[点击并拖拽以移动] ​

在构建StreamTransformation的时候通过TypeExtractor工具确定Function的输入输出类型。TypeExtractor类可以根据方法签名、子类信息等蛛丝马迹自动提取或恢复类型信息。
【2】何时确定Function的序列化 / 反序列化器?
构造StreamGraph时, 通过TypeInfomationcreateSerializer()方法获取对应类型的序列化器TypeSerializer,并在addOperator()的过程中执行setSerializers() 操作,设置StreamConfigTYPESERIALIZERIN1TYPESERIALIZERIN2TYPESERIALIZEROUT_1属性。
【3】何时进行真正的序列化 / 反序列化操作? 这个过程与TypeSerializer又是怎么联系在一起的呢?
构造StreamGraph时, 通过TypeInfomationcreateSerializer()方法获取对应类型的序列化器TypeSerializer,并在addOperator()的过程中执行setSerializers()操作,设置StreamConfigTYPESERIALIZERIN1TYPESERIALIZERIN2TYPESERIALIZEROUT_1属性。
【4】何时进行真正的序列化 / 反序列化操作? 这个过程与TypeSerializer又是怎么联系在一起的呢?
[点击并拖拽以移动] ​

大家都应该清楚TaskStreamTask两个概念,Task是直接受TaskManager管理和调度的,而Task又会调用StreamTask,而StreamTask中真正封装了算子的处理逻辑。在run()方法中,首先将反序列化后的数据封装成StreamRecord交给算子处理;然后将处理结果通过Collector发送给下游 ( 在构建Collector时已经确定了SerializtionDelegate),并通过RecordWriter写入器将序列化后的结果写入DataOutput;最后序列化的操作交给SerializerDelegate处理,实际还是通过TypeSerializerserialize()方法完成。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/241597.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【小白攻略】php 小数转为百分比,保留两位小数的函数

php 小数转为百分比 首先&#xff0c;最简单直观的方法是利用PHP内置的number_format函数。该函数可以对一个数字进行格式化&#xff0c;并可以设置小数点后的精度。通过将小数乘以100&#xff0c;再用number_format函数将结果格式化为百分比形式&#xff0c;即可达到将小数转为…

【线性代数】决定张成空间的最少向量线性无关吗?

答1&#xff1a; 是的&#xff0c;张成空间的最少向量是线性无关的。 在数学中&#xff0c;张成空间&#xff08;span space&#xff09;是一个向量空间&#xff0c;它由一组向量通过线性组合&#xff08;即每个向量乘以一个标量&#xff09;生成。如果这组向量是线性无关的&…

HackTheBox - Medium - Linux - Format

Format Format 是一种中等难度的 Linux 机器&#xff0c;它突出显示了由解决方案的结构方式引起的安全问题。立足点涉及PHP源代码审查&#xff0c;发现和利用本地文件读/写漏洞&#xff0c;并利用Nginx中的错误配置在Redis Unix套接字上执行命令。横向移动包括浏览 Redis 数据…

旅游品牌网站搭建的作用是什么

我国旅游业规模非常高&#xff0c;各地大小旅游景区也是非常多&#xff0c;尤其节假日更是可以达到峰值&#xff0c;无论周边游还是外地游对所要去的景区&#xff0c;消费者总是需要来回了解很多&#xff0c;浏览器查或旅行社咨询等。 对旅游企业而言&#xff0c;传统线下方式…

Linux6.1、IO基础(C库函数文件类接口)

个人主页&#xff1a;Lei宝啊 愿所有美好如期而遇 1、文件打开及关闭 我们先了解文件的打开&#xff0c;path是文件路径&#xff0c;mode是打开文件的模式 r模式是只读&#xff0c;流位于文件的开头&#xff0c;也就是说从文件内容的开头开始读。r模式是读和写&#xff0c;同…

研究论文 2022-Oncoimmunology:AI+癌RNA-seq数据 识别细胞景观

Wang, Xin, et al. "Deep learning using bulk RNA-seq data expands cell landscape identification in tumor microenvironment." Oncoimmunology 11.1 (2022): 2043662. https://www.tandfonline.com/doi/full/10.1080/2162402X.2022.2043662 被引次数&#xff1…

VSCode软件与SCL编程

原创 NingChao NCLib 博途工控人平时在哪里技术交流博途工控人社群 VSCode简称VSC&#xff0c;是Visual studio code的缩写&#xff0c;是由微软开发的跨平台的轻量级编辑器&#xff0c;支持几乎所有主流的开发语言的语法高亮、代码智能补全、插件扩展、代码对比等&#xff0c…

图解LRU缓存

图解LRU缓存 OJ链接 介绍 LRU 缓存机制可以通过哈希表辅以双向链表实现&#xff0c;我们用一个哈希表和一个双向链表维护所有在缓存中的键值对。 双向链表按照被使用的顺序存储了这些键值对&#xff0c;靠近尾部的键值对是最近使用的&#xff0c;而靠近头部的键值对是最久未…

适配器模式学习

适配器模式&#xff08;Adapter&#xff09;将一个类的接口转换成客户希望的另外一个接口。Adapter 模式使得原本由于接口不兼容而不能一起工作的那些类可以一起工作。 适配器模式分为类适配器模式和对象适配器模式两种&#xff0c;前者类之间的耦合度比后者高&#xff0c;且要…

【常见的语法糖(详解)】

&#x1f7e9; 说几个常见的语法糖 &#x1f7e2;关于语法糖的典型解析&#x1f7e2;如何解语法糖&#xff1f;&#x1f7e2;糖块一、switch 支持 String 与枚举&#x1f4d9;糖块二、泛型&#x1f4dd;糖块三、自动装箱与拆箱&#x1f341;糖块四、方法变长参数&#x1f5a5;️…

STM32——CAN协议

文章目录 一.CAN协议的基本特点1.1 特点1.2 电平标准1.3 基本的五个帧1.4 数据帧 二.数据帧解析2.1 帧起始和仲裁段2.2 控制段2.3 数据段和CRC段2.4 ACK段和帧结束 三.总线仲裁四.位时序五.STM32CAN控制器原理与配置5.1 STM32CAN控制器介绍5.2 CAN的模式5.3 CAN框图 六 手册寄存…

LangChain 30 ChatGPT LLM将字符串作为输入并返回字符串Chat Model将消息列表作为输入并返回消息

LangChain系列文章 LangChain 实现给动物取名字&#xff0c;LangChain 2模块化prompt template并用streamlit生成网站 实现给动物取名字LangChain 3使用Agent访问Wikipedia和llm-math计算狗的平均年龄LangChain 4用向量数据库Faiss存储&#xff0c;读取YouTube的视频文本搜索I…

深度学习(八):bert理解之transformer

1.主要结构 transformer 是一种深度学习模型&#xff0c;主要用于处理序列数据&#xff0c;如自然语言处理任务。它在 2017 年由 Vaswani 等人在论文 “Attention is All You Need” 中提出。 Transformer 的主要特点是它完全放弃了传统的循环神经网络&#xff08;RNN&#x…

智能优化算法应用:基于爬行动物算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于爬行动物算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于爬行动物算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.爬行动物算法4.实验参数设定5.算法结果6.…

PHP函数定义和分类

函数的含义和定义格式 在PHP中&#xff0c;允许程序员将常用的流程或者变量等组件组织成一个固定的格式实现特定功能&#xff0c;也就是说函数是具有特定功能特定格式的代码段。 函数的定义格式如下&#xff1a; function 函数名(参数1&#xff0c;参数2&#xff0c;参数n) {…

Web前端 ---- 【Vue】vue路由守卫(全局前置路由守卫、全局后置路由守卫、局部路由path守卫、局部路由component守卫)

目录 前言 全局前置路由守卫 全局后置路由守卫 局部路由守卫之path守卫 局部路由守卫之component守卫 前言 本文介绍Vue2最后的知识点&#xff0c;关于vue的路由守卫。也就是鉴权&#xff0c;不是所有的组件任何人都可以访问到的&#xff0c;需要权限&#xff0c;而根据权限…

Hadoop入门学习笔记——六、连接到Hive

视频课程地址&#xff1a;https://www.bilibili.com/video/BV1WY4y197g7 课程资料链接&#xff1a;https://pan.baidu.com/s/15KpnWeKpvExpKmOC8xjmtQ?pwd5ay8 Hadoop入门学习笔记&#xff08;汇总&#xff09; 目录 六、连接到Hive6.1. 使用Hive的Shell客户端6.2. 使用Beel…

vue3(五)-基础入门之计算属性

一、计算属性 1.计算属性与普通方法的的区别&#xff1a; 计算属性在需要渲染数据时调用一次&#xff0c;而后将结果缓存起来。只有计算属性所依赖的数据发生改变时才会重新调用函数&#xff0c;否则每次渲染相同的数据都只会从缓存中读取。 普通方法在每次数据需要渲染时都会…

CGAL的网格简化

1、介绍 曲面网格简化是减少曲面网格中使用的面数&#xff0c;同时尽可能保持整体形状、体积和边界的过程。它是细分法的反面。 这里提出的算法可以使用称为边折叠的方法简化任何有向2流形曲面&#xff0c;具有任意数量的连接组件&#xff0c;有或没有边界&#xff08;边界或孔…

为什么react call api in cDidMount

为什么react call api in cDM 首先&#xff0c;放到constructor或者cWillMount不是语法错误 参考1 参考2 根据上2个参考&#xff0c;总结为&#xff1a; 1、官网就是这么建议的&#xff1a; 2、17版本后的react 由于fiber的出现导致 cWM 会调用多次&#xff01; cWM 方法已…