Flink-序列化

一、概述

几乎每个Flink作业都必须在其运算符之间交换数据,由于这些记录不仅可以发送到同一JVM中的另一个实例,还可以发送到单独的进程,因此需要先将记录序列化为字节。类似地,Flink的堆外状态后端基于本地嵌入式RocksDB实例,该实例以本机C++代码实现,因此也需要在每次状态访问时转换为字节。如果执行不正确,仅有线和状态序列化就很容易消耗作业的大量性能,因此,每当您查看Flink作业的分析器输出时,您很可能会在使用CPU周期的顶部看到序列化。

因此,序列化对我们的Flink作业至关重要

本质上,Flink试图推断有关作业数据类型的信息以进行连接和状态序列化,并能够通过引用单个字段名称来使用分组、连接和聚合操作,例如stream. keyBy("ruleId")或dataSet.connect(另一个).where("name").equalTo("个性化名称")。它还允许优化序列化格式以及减少不必要的去序列化(主要是在某些批处理操作以及SQL/表API中)。

二、序列化器选择

Flink的开箱即用序列化大致有以下几种:

  • Flink为基本类型(Java原语及其装箱形式)、数组、复合类型(元组、Scala案例类、行)和一些辅助类型(Option, Either, Lists, Maps…)提供了特殊的序列化程序
  • POJO:一个公共的、独立的类,具有公共的无参数构造函数和类层次结构中的所有非静态、非瞬态字段,要么是公共的,要么是公共的getter-和setter-method;
  • 泛型类型:不被识别为POJO然后通过Kryo序列化的用户定义数据类型。
  • 自定义序列化程序:可以为用户定义的数据类型注册自定义序列化程序。这包括编写自己的序列化程序或通过Kryo集成其他序列化系统,如Google Pro buf或Apache Thrift。

PojoSerializer

如果我们的数据类型没有被专门的序列化程序覆盖,但遵循POJO规则,Flink将使用PojoSerializer序列化,PojoSerializer使用Java反射来访问对象的字段。它快速、通用、特定于Flink,并支持开箱即用的状态模式演变。如果复合数据类型不能序列化为POJO,我们可以在集群日志中找到以下消息(或类似消息):

15:45:51,460 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class … cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on “Data Types & Serialization” for details of the effect on performance.

这意味着,PojoSerializer将不会被使用,而是Flink将回退到Kryo进行序列化。当然还会有一些情况可能导致Kryo意外回退的情况。

Tuple Data Types

Flink带有一组预定义的元组类型,它们都具有固定的长度,并包含一组可能不同类型的强类型字段。有Tuple0、Tuple1<T0>、…、Tuple25<T0、T1、…、T24>的实现,它们可以作为易于使用的包装器,为我们需要在计算之间传递的每个对象组合节省POJO的创建。除了Tuple0之外,这些都是使用TupleSerializer和相应字段的序列化器序列化和反序列化的。由于元组类完全在Flink的控制之下,因此可以通过直接访问适当的字段来执行这两个操作而无需反射。

在使用元组而不是POJO时,这当然是一个(性能)优势。然而,元组在代码中并不那么灵活,描述性肯定也较差。

Row Data Types 

行类型主要由Flink的Table和SQLAPI使用。Row将任意数量的对象组合在一起,类似于上面的元组。这些字段不是强类型的,可能都是不同的类型。由于缺少字段类型,Flink的类型提取不能自动提取类型信息,Row的用户需要手动告诉Flink该行的字段类型。然后RowSerializer将利用这些类型进行高效的序列化。

行类型信息可以通过两种方式提供:

1、让源或运算符实现ResultTypeQueryable<Row>

public static class RowSource implements SourceFunction<Row>, ResultTypeQueryable<Row> {// ...@Overridepublic TypeInformation<Row> getProducedType() {return Types.ROW(Types.INT, Types.STRING, Types.OBJECT_ARRAY(Types.STRING));}
}

在构建作业图时使用SingleOutputStreamOperator#returns()提供类型

DataStream<Row> sourceStream =env.addSource(new RowSource()).returns(Types.ROW(Types.INT, Types.STRING, Types.OBJECT_ARRAY(Types.STRING)));

如果您未能提供“行”的类型信息,Flink会根据上述规则识别“行”不是有效的POJO类型,并回退到Kryo序列化,这样性能就会下降。

flink 自带的TupleSerializer性能最高,其中一部分原因来源于不需要使用反射来访问 Tuple 中的字段。PojoSerializer 比 TupleSerializer性能差一些,但是比 kryo 的序列化方式性能要高几倍

Avro 

Flink通过将org. apache.flink:flink-avro依赖项添加到作业中来提供对Apache Avro序列化框架(当前使用版本1.8.2)的内置支持。然后,Flink的AvroSerializer可以使用Avro的Specific、Generic和 Reflect数据序列化,并利用Avro的性能和灵活性,特别是在类随时间变化时演变模式方面。

Avro Specific

通过检查给定类型的类型层次结构是否包含SpecificRecordBase类,将自动检测Avro特定记录。可以指定具体的Avro类型,或者——如果我们想更通用并在运算符中允许不同的类型——在我们用户函数中、在ResultTypeQueryable#getProducedType()中或在SingleOutputStreamOperator中使用SpecificRecordBase类型(或子类型)。由于特定记录使用生成的Java代码,因此它们是强类型的,并允许通过已知的getter和setter直接访问字段。

:如果您将Flink类型指定为“SpecificRecord”而不是“SpecificRecordBase”,Flink不会将其视为Avro类型。相反,它将使用Kryo对任何可能相当慢的对象进行解/序列化

Avro Generic

不幸的是,Avro的GenericRecord类型不能自动使用,因为它们需要用户指定模式(手动或从某些模式注册表中检索)。使用该模式,我们可以通过以下任一选项提供正确的类型信息,就像上面的行类型一样:

  • implement ResultTypeQueryable<GenericRecord>:
public static class AvroGenericSource implements SourceFunction<GenericRecord>, ResultTypeQueryable<GenericRecord> {private final GenericRecordAvroTypeInfo producedType;public AvroGenericSource(Schema schema) {this.producedType = new GenericRecordAvroTypeInfo(schema);}@Overridepublic TypeInformation<GenericRecord> getProducedType() {return producedType;}
}
  • 在构建作业图时使用SingleOutputStreamOperator#returns()
DataStream<GenericRecord> sourceStream =env.addSource(new AvroGenericSource()).returns(new GenericRecordAvroTypeInfo(schema));

如果没有这种类型信息,Flink将回退到Kryo进行序列化,这将一遍又一遍地将模式序列化到每条记录中。因此,序列化的形式将更大,创建成本更高。

注意:由于Avro的Schema类不可序列化,因此不能按原样发送。我们可以通过将其转换为字符串并在需要时解析它来解决这个问题。如果在初始化时只这样做一次,那么直接发送实际上没有区别。

Avro Reflect

使用Avro的第三种方法是将Flink的PojoSerializer(根据上述规则用于POJO)交换为Avro的基于反射的序列化器。这可以通过调用以下代码实现:

env.getConfig().enableForceAvro();

Kryo

任何不属于上述类别或被Flink提供的特殊序列化程序覆盖的类或对象都将被解/序列化,并回退到Kryo(当前版本2.24.0),这是Java中一个强大的通用序列化框架。Flink将此类类型称为泛型类型,我们在调试代码时可能会偶然发现GenericTypeInfo。如果使用Kryo序列化,请确保向kryo注册使用的类型:

env.getConfig().registerKryoType(MyCustomType.class);

注册类型会将它们添加内部map(class->tag)中,这样在序列化过程中,Kryo就不必将完全限定的类名作为前缀添加到序列化形式中。相反,Kryo使用这些(整数)标签来识别底层类并减少序列化开销。

注意:Flink将在其检查点和保存点中存储来自类型注册的Kryo serializer mappings,并在作业(重新)启动时保留它们。

禁用Kryo

如果需要,您可以通过调用禁用Kryo回退,即序列化泛型类型的能力

env.getConfig().disableGenericTypes();

这对于找出这些回退的应用位置并用更好的序列化程序替换它们非常有用。如果我们的作业有任何具有此配置的泛型类型,它将失败

Apache Thrift(通过Kryo)

除了上面的变体之外,Flink还允许我们向Kryo注册其他类型的序列化框架。从留档(com.twitter:chill-thrift 和 org.apache.thrift:libthrift)添加适当的依赖项后,可以像下面这样使用Apache Thrift:

env.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);

这仅在未禁用泛型类型并且MyCustomType是Thrift生成的数据类型时才有效。如果数据类型不是由Thrift生成的,Flink将在运行时失败。

Protobuf(通过Kryo)

在类似于Apache Thrift的方式中,添加正确的依赖项(com.twitter:chill-protobuf 和 com.google.protobuf:protobuf-java)后,Google Protobuf可以注册为自定义序列化程序:

env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, ProtobufSerializer.class);

只要泛型类型没有被禁用,这就可以工作(这将永久禁用Kryo)。如果MyCustomType不是Protobuf生成的类,Flink作业将在运行时失败。

三、状态模式演变

在仔细研究上述每个序列化程序的性能之前,我们想强调的是,性能并不是实际Flink作业中的一切。例如,用于存储状态的类型应该能够在作业的整个生命周期内发展其模式(添加/删除/更改字段),而不会丢失以前的状态。这就是Flink所说的状态模式演变。目前,从Flink 1.10开始,只有两个序列化程序支持开箱即用的模式演变:POJO和Avro。

对于其他任何事情,如果我们想更改状态模式,必须实现自己的自定义序列化程序或使用状态处理器API为新代码修改状态。

四、性能对比

有这么多的序列化选项,要做出正确的选择其实并不容易。我们已经看到了上面概述的每一个的一些技术优势和劣势。由于序列化程序是我们Flink作业的核心,并且通常也作用在热路径上(每个记录调用),所以让我们在https://github.com/dataArtisans/flink-benchmarks的Flink基准项目的帮助下实际更深入地了解它们的性能。这个项目在Flink之上添加了一些微基准(有些比其他更低级)来跟踪性能回归和改进。Flink用于监控序列化堆栈性能的持续基准在SerializationFrameworkMiniBenchmarks.java中实现。

不过,这只是所有可用序列化基准测试的一个子集,我们将在SerializationFrameworkAllBenchmarks.java中找到完整的集合。所有这些都使用可能涵盖平均用例的小型POJO的相同定义。本质上(没有构造函数、getter和setter),这些是它用于评估性能的数据类型:

public class MyPojo {public int id;private String name;private String[] operationNames;private MyOperation[] operations;private int otherId1;private int otherId2;private int otherId3;private Object someObject;
}
public class MyOperation {int id;protected String name;
}

这被适当地映射到tuples、行、Avro specific、Thrift和Protobuf 表示,并通过并行度=4的简单Flink作业发送,其中数据类型在网络通信期间使用,如下所示:

env.setParallelism(4);
env.addSource(new PojoSource(RECORDS_PER_INVOCATION, 10)).rebalance().addSink(new DiscardingSink<>());

在通过SerializationFrameworkAllBenchmarks.java中定义的jmh微基准测试运行后,得到了官方给出的Flink 1.10以下性能结果(以每毫秒的操作数为单位):

从图中我们可以得到以下信息:

  • 从POJO到Kryo的默认回退将性能降低了75%。与使用POJO相比,向Kryo注册类型显着提高了其性能,仅减少了64%的操作。

  • Avro GenericRecord和SpecificRecord的序列化速度大致相同。

  • Avro Reflect序列化甚至比Kryo默认值(-45%)还要慢。

  • Tuples 是最快的,紧随其后的是Rows。两者都利用基于直接访问的快速专用序列化代码,无需Java反射。

  • 使用(嵌套)Tuples 而不是POJO可能会使工作速度提高42%(但灵活性较低!)。为PojoSerializer(FLINK-3599)生成代码实际上可能会缩小这一差距(或者至少更接近RowSerializer)。

  • 如果不能使用POJO,请尝试使用为其生成特定代码的序列化框架之一来定义用到的数据类型:Protobuf 、Avro、Thrift(按性能顺序)。

注意与所有基准测试一样,请记住,这些数字只能提示Flink在特定场景中的序列化器性能。它们可能因您的数据类型而异,但粗略的分类可能是相同的。如果你不放心,可以使用你的数据类型验证结果。

五、结论

我们研究了Flink如何对不同类型的数据类型执行序列化,并详细说明了技术上的优缺点。对于Flink状态下使用的数据类型,推荐使用POJO或Avro类型,目前,它们是唯一支持开箱即用状态演进的类型,并允许在有状态应用程序随着时间的推移而开发。POJO通常在反序列化方面更快,而Avro可能支持更灵活的模式演进,并且可以更好地与外部系统集成。但是请注意,我们可以对外部组件和内部组件甚至状态和网络通信使用不同的序列化程序

最快的反序列化是通过Flink的内部元组和行序列化器实现的,这些元组和行序列化器可以直接访问这些类型的字段,而无需通过反射。与元组相比,吞吐量降低了大约30%,Protobuf 和POJO类型本身的性能不会太差,并且更加灵活和可维护。Avro(specific and generic)记录以及Thrift数据类型分别进一步降低了20%和30%的性能。所以我们要想方设法避免Kryo,因为这会进一步降低约50%甚至更多的吞吐量!

那么如何避免Kryo的常见陷阱和障碍呢?如何充分利用PojoSerializer等序列化技术的调整呢?敬请关注,我们一起跟着官网壮大自己。

 -------------------------------------------------------------------------------------------------------------------------------

大多数高校硕博生毕业要求需要参加学术会议,发表EI或者SCI检索的学术论文会议论文:
可访问艾思科蓝官网,浏览即将召开的学术会议列表。会议如下:

 第八届大数据与应用统计国际学术研讨会(ISBDAS 2025)

https://ais.cn/u/fEzmy2

第二届生成式人工智能与信息安全国际学术会议(GAIIS 2025)

https://ais.cn/u/uAbENn

第四届电子技术与人工智能国际学术会议(ETAI 2025)

https://ais.cn/u/vqM7Nj

第四届网络安全、人工智能与数字经济国际学术会议(CSAIDE 2025)

https://ais.cn/u/ZrERn2

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/69654.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用DeepSeek和Kimi快速自动生成PPT

目录 步骤1&#xff1a;在DeepSeek中生成要制作的PPT主要大纲内容。 &#xff08;1&#xff09;在DeepSeek网页端生成 &#xff08;2&#xff09;在本地部署DeepSeek后&#xff0c;使用chatBox生成PPT内容 步骤2&#xff1a;将DeepSeek成的PPT内容复制到Kimi中 步骤3&…

第41天:Web开发-JS应用微信小程序源码架构编译预览逆向调试嵌套资产代码审计

#知识点 1、安全开发-微信小程序-搭建&开发&架构&安全 2、安全开发-微信小程序-编译调试&反编译&泄露 一、小程序创建&#xff08;了解即可&#xff09; 1、下载微信开发者工具 2、创建小程序模版引用 https://developers.weixin.qq.com/miniprogram/dev/d…

Arduino 第十一章:温度传感器

Arduino 第十一章&#xff1a;LM35 温度传感器 一、LM35 简介 LM35 是美国国家半导体公司&#xff08;现德州仪器&#xff09;生产的一款精密集成电路温度传感器。与基于热力学原理的传统温度传感器不同&#xff0c;LM35 能直接将温度转换为电压输出&#xff0c;且输出电压与…

Oracle常用导元数据方法

1 说明 前两天领导发邮件要求导出O库一批表和索引的ddl语句做国产化测试&#xff0c;涉及6个系统&#xff0c;6千多张表&#xff0c;还好涉及的用户并不多&#xff0c;要不然很麻烦。 如此大费周折原因&#xff0c;是某国产库无法做元数据迁移。。。额&#xff0c;只能我手动导…

2022java面试总结,1000道(集合+JVM+并发编程+Spring+Mybatis)的Java高频面试题

1、面试题模块汇总 面试题包括以下十九个模块&#xff1a; Java 基础、容器、多线程、反射、对象拷贝、Java Web 模块、异常、网络、设计模式、Spring/Spring MVC、Spring Boot/Spring Cloud、Hibernate、Mybatis、RabbitMQ、Kafka、Zookeeper、MySql、Redis、JVM 。如下图所示…

Curser2_解除机器码限制

# Curser1_无限白嫖试用次数 文末有所需工具下载地址 Cursor Device ID Changer 一个用于修改 Cursor 编辑器设备 ID 的跨平台工具集。当遇到设备 ID 锁定问题时&#xff0c;可用于重置设备标识。 功能特性 ✨ 支持 Windows 和 macOS 系统&#x1f504; 自动生成符合格式的…

carbon 加入 GitCode:Golang 时间处理的 “瑞士军刀”

在 Golang 的开发生态中&#xff0c;时间处理领域长期存在着诸多挑战。高效、精准的时间处理对于各类软件应用的稳定运行与功能拓展至关重要。近日&#xff0c;carbon 正式加入 GitCode&#xff0c;为 Golang 开发者带来一款强大且便捷的时间处理利器&#xff0c;助力项目开发迈…

算法学习--链表

引言&#xff1a;为什么进行链表的学习&#xff1f; 考察能力独特&#xff1a;链表能很好地考察应聘者对指针操作、内存管理的理解和运用能力&#xff0c;还能检验代码的鲁棒性&#xff0c;比如处理链表的插入、删除操作时对边界条件的处理。数据结构基础&#xff1a;链表是很多…

域名劫持原理与实践

了解域名及域名劫持 由于点分十进制的IP地址难于记忆&#xff0c;便出现了域名。由于网络传输中最终还是基于IP&#xff0c;所以必须通过一种机制将IP和域名一一对应起来&#xff0c;这便是DNS。全球总共有13台根域名服务器。 域名劫持是互联网攻击中常见的一种攻击方式&…

【论文翻译】DeepSeek-V3论文翻译——DeepSeek-V3 Technical Report——第二部分:(训练硬件)基础设施

论文原文链接&#xff1a;DeepSeek-V3/DeepSeek_V3.pdf at main deepseek-ai/DeepSeek-V3 GitHub 特别声明&#xff0c;本文不做任何商业用途&#xff0c;仅作为个人学习相关论文的翻译记录。本文对原文内容直译&#xff0c;一切以论文原文内容为准&#xff0c;对原文作者表示…

MapReduce到底是个啥?

在聊 MapReduce 之前不妨先看个例子&#xff1a;假设某短视频平台日活用户大约在7000万左右&#xff0c;若平均每一个用户产生3条行为日志&#xff1a;点赞、转发、收藏&#xff1b;这样就是两亿条行为日志&#xff0c;再假设每条日志大小为100个字节&#xff0c;那么一天就会产…

Error: llama runner process has terminated: exit status 0xc0000409 问题解决办法

在大模型部署过程中&#xff0c;格式转换环节若使用了高版本的 llama.cpp 库&#xff0c;而系统当前运行的版本较低&#xff0c;就会出现版本不兼容的情况。 这种不匹配会阻碍模型的正常运行&#xff0c;进而导致报错。建议你密切关注模型所需的版本要求&#xff0c;及时将系统…

代码随想录-训练营-day20

今天我们继续回溯&#xff1a; 39. 组合总和 - 力扣&#xff08;LeetCode&#xff09; 这个题和我们之前的组合题相比&#xff0c;最大的区别在于我们可以无限次的重复取用某值了&#xff0c;这就让我们的递归参数与之前不同&#xff0c;除此之外&#xff0c;本质上这个题与21…

ubuntu 本地部署deepseek r1 蒸馏模型

本文中的文件路径或网络代理需要根据自身环境自行删改 一、交互式chat页面 1.1 open-webui 交互窗口部署&#xff1a;基于docker安装&#xff0c;且支持联网搜索 Open WebUI 是一个可扩展、功能丰富且用户友好的自托管 AI 平台&#xff0c;旨在完全离线操作。它支持各种 LLM…

数据库 绪论

目录 数据库基本概念 一.基本概念 1.信息 2.数据 3.数据库&#xff08;DB&#xff09; 4.数据库管理系统&#xff08;DBMS&#xff09; 5.数据库系统&#xff08;DBS&#xff09; 二.数据管理技术的发展 1.人工管理阶段 2.文件系统阶段 3.数据库系统阶段 4.数据库管…

数据中台是什么?:架构演进、业务整合、方向演进

文章目录 1. 引言2. 数据中台的概念与沿革2.1 概念定义2.2 历史沿革 3. 数据中台的架构组成与关键技术要素解析3.1 架构组成3.2 关键技术要素 4. 数据中台与其他平台的对比详细解析 5. 综合案例&#xff1a;金融行业数据中台落地实践5.1 背景5.2 解决方案5.3 成果与价值 6. 方向…

【DeepSeek】DeepSeek概述 | 本地部署deepseek

目录 1 -> 概述 1.1 -> 技术特点 1.2 -> 模型发布 1.3 -> 应用领域 1.4 -> 优势与影响 2 -> 本地部署 2.1 -> 安装ollama 2.2 -> 部署deepseek-r1模型 1 -> 概述 DeepSeek是由中国的深度求索公司开发的一系列人工智能模型&#xff0c;以其…

如何使用C++将处理后的信号保存为PNG和TIFF格式

在信号处理领域&#xff0c;我们常常需要将处理结果以图像的形式保存下来&#xff0c;方便后续分析和展示。C提供了多种库来处理图像数据&#xff0c;本文将介绍如何使用stb_image_write库保存为PNG格式图像以及使用OpenCV库保存为TIFF格式图像。 1. PNG格式保存 使用stb_ima…

查出 product 表中所有 detail 字段包含 xxx 的完整记录

您可以使用以下 SQL 查询语句来查出 product 表中所有 detail 字段包含 oss.kxlist.com 的完整记录&#xff1a; SELECT * FROM product WHERE INSTR(detail, oss.kxlist.com) > 0;下面是detail字段包含的完整内容 <p><img style"max-width:100%;" src…

微服务 day01 注册与发现 Nacos OpenFeign

目录 1.认识微服务&#xff1a; 单体架构&#xff1a; 微服务架构&#xff1a; 2.服务注册和发现 1.注册中心&#xff1a; 2.服务注册&#xff1a; 3.服务发现&#xff1a; 发现并调用服务&#xff1a; 方法1&#xff1a; 方法2&#xff1a; 方法3:OpenFeign OpenFeig…