25、Flink 支持的数据类型及序列化详解

数据类型及序列化
1.概览
a)概述

Flink 以其独特的方式来处理数据类型序列化,包括它自身的类型描述符泛型类型提取以及类型序列化框架

支持的数据类型

Java Tuples and Scala Case Classes
Java POJOs
Primitive Types
Regular Classes
Values
Hadoop Writables
Special Types

Tuples and Case Classes

元组是包含固定数量的具有各种类型的字段的复合类型,Java API 提供从 Tuple1 到 Tuple25 的类,元组的每个字段都可以是任意的 Flink 类型,包括元组,从而产生嵌套的元组;

元组的字段可以使用字段名称 tuple.f4 直接访问,也可以使用通用的 getter 方法 tuple.getField(int位置) 字段索引从0开始。

DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(new Tuple2<String, Integer>("hello", 1),new Tuple2<String, Integer>("world", 2));wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {@Overridepublic Integer map(Tuple2<String, Integer> value) throws Exception {return value.f1;}
});wordCounts.keyBy(value -> value.f0);

POJOs

如果 Java 和 Scala 类满足以下要求,Flink 会将它们视为特殊的 POJO 数据类型:

  • 类必须是公开的

  • 它必须有一个不带参数的公共构造函数(默认构造函数)

  • 所有字段要么是公共的,要么必须可以通过 getter 和 setter 函数访问;对于名为 foo 的字段,getter 和 setter 方法必须命名为 getFoo() 和 setFoo()

  • 字段的类型必须被已注册的序列化器支持

POJO 通常用 PojoTypeInfo 表示,并用 PojoSerializer 进行序列化(可以配置回退使用 Kryo 序列化器);除了 POJO 是 Avro 类型(Avro 特定记录)或作为 “Avro 反射类型” ,此时 POJO 由 AvroTypeInfo 表示,并使用 AvroSerializer 进行序列化;如果需要,还可以注册自定义的序列化程序。

Flink 可以分析 POJO 类型的结构,例如了解 POJO 的字段;POJO 类型比一般类型更易于使用,Flink 可以更有效地处理 POJO 类型。

可以通过 org.apache.flink.types.PojoTestUtils#assertSerializedAsPojo() 测试[类是否符合 POJO 要求],如果想确保 POJO 的任何字段都不会使用 Kryo 进行序列化,可以使用用 assertSerializedAsPojoWithoutKryo。

测试所编写的类是否为 POJO 类型

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-test-utils</artifactId><version>1.19.0</version>
</dependency>// 测试所编写的类是否为 POJO 类型
PojoTestUtils.assertSerializedAsPojo(WordWithCount.class);// 确保 POJO 的任何字段都不会使用 Kryo 进行序列化
PojoTestUtils.assertSerializedAsPojoWithoutKryo(WordWithCount.class);

示例:带有两个公共字段的简单POJO。

public class WordWithCount {public String word;public int count;public WordWithCount() {}public WordWithCount(String word, int count) {this.word = word;this.count = count;}
}DataStream<WordWithCount> wordCounts = env.fromElements(new WordWithCount("hello", 1),new WordWithCount("world", 2));wordCounts.keyBy(value -> value.word);

Primitive Types

Flink 支持所有 Java 和 Scala 基本类型,如 Integer、String 和 Double。

General Class Types

Flink 支持大多数 Java 和 Scala 类(API和自定义),对于包含无法序列化字段的类,如文件指针、I/O流或其它本机资源,会受限制;遵循 JavaBeans 约定的类可以正常使用。

Flink 将所有未标识为 POJO 类型的类,作为通用类的类型进行处理,Flink 将这些数据类型视为黑匣子,无法访问其内容;一般类型使用序列化框架 Kryo 进行序列化和反序列化。

Values

值类型需要手动描述它们的序列化和反序列化器,它们不是通过通用的序列化框架,而是通过实现 org.apache.flink.type 自定义读取和写入的方法;

当通用序列化器效率很低时,使用 Value 类型是合理的;示例将元素的稀疏向量用数组实现,其中数组元素大部分为零,就可以对非零元素使用特殊编码,而通用序列化器只需写入所有数组元素。

org.apache.flink.CopyableValue 接口以类似的方式在内部支持手动克隆的逻辑。

Flink 提供了与基本数据类型相对应的预定义值类型(ByteValue、ShortValue、IntValue、LongValue、FloatValue、DoubleValue、StringValue、CharValue、BooleanValue)这些值类型充当基本数据类型的变体,它们的值可以更改,允许重用对象并减轻 GC 的压力。

Hadoop Writables

使用实现了 org.apache.hadoop 接口的类型,使用 write() 和 readFields() 方法定义序列化和反序列化逻辑。

Special Types

特殊的类型,包括 Scala 的 Either,Option 和 Try;Java API 有 Either 的自定义实现,表示两种可能类型的值。

b)类型擦除和类型推断-Java

类型擦除:Java 编译器在编译后会丢弃许多泛型的类型信息,因此在运行时,对象的实例不再知道其泛型类型,例如 DataStream<String> 和 DataStream<Long> 的实例在 JVM 中看起来是相同的。

Flink 在调用程序的 main 方法时需要知道类型信息,Flink Java API 试图重建丢弃的类型信息,并将其显式存储在数据集和 operator 中;可以通过 DataStream.getType() 检索类型,该方法返回 TypeInformation 的一个实例,这是 Flink 表示类型的内部方式。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.socketTextStream("localhost", 8888);// String
System.out.println(source.getType());

类型推断有其局限性,有时需要手动指定数据的类型,例如从集合创建数据集 StreamExecutionEnvironment.fromCollection(),可以在其中传递描述类型的参数;像 MapFunction<I,O> 这样的通用函数有时也需要额外的类型信息。

c)Flink中的类型处理

Flink 试图推断出在分布式计算过程中交换和存储的数据类型信息,在大多数情况下,Flink 会推断出所有必要的信息,使用户无需关心序列化框架和无需注册数据类型。

Flink 对数据类型了解得越多,序列化方案就越好;这对于 Flink 中的内存使用模式非常重要(尽可能在堆内/外处理序列化数据,使序列化成本非常低);

当程序对 DataStream 进行调用时以及在调用 execute()、print()、count() 或 collect() 之前,需要有关数据类型的信息。

d)常见问题

注册子类型:如果函数签名只描述超类型,但在执行过程中实际使用了超类型的子类型,那让 Flink 知道这些子类型会大大提高性能,需要在 StreamExecutionEnvironment 上为每个子类型调用.registerType(clazz)。

// 注册子类型[废弃]-PipelineOptions.SERIALIZATION_CONFIG[替代]
StreamExecutionEnvironment.registerType(String.class);

注册自定义序列化程序:Flink 会为无法处理的类型使用 Kryo 序列化器,如果 kryo 序列化器也不能处理,需要在StreamExecutionEnvironment 上调用 .getConfig().addDefaultKryoSerializer(clazz,serializer) 注册自定义的序列化器。

添加类型提示(TypeHints):当 Flink 无法推断出通用类型时,必须传递类型提示,通常只有在 Java API 中需要。

手动创建类型信息(TypeInformation):Java 的通用类型擦除会导致 Flink 无法推断数据类型。

e)Flink 的 TypeInformation 类

TypeInformation 类是所有类型描述符的基类,它揭示了类型的一些基本属性,并可以为类型生成序列化器和比较器(Flink 中的比较器不仅定义顺序,还用于处理 keys)。

在内部,Flink 对类型区分如下

基本类型:Java 基本类型及其装箱形式以及 void、String、Date、BigDecimal 和 BigInteger;

基本数组和对象数组

复合类型

Flink 的 Java 元组(Flink Java API的一部分):最多25个字段,不支持空字段;

Scala case classes (包括 Scala 元组):不支持 null 字段;

Row:具有任意数量的字段和支持空字段的元组;

POJO:遵循特定 bean 模式的类;

辅助类型:Option,Either,Lists,Maps,…;

泛型类型:Flink 本身不会序列化这些类型,而是由 Kryo 进行序列化;

f)POJO 类型规则

如果满足以下条件,Flink 将数据类型识别为 POJO 类型(并允许 “按名称” 字段引用)

该类是公共的和独立的(没有非静态内部类);

该类有一个公共的无参数构造函数;

类(和所有超类)中的所有非静态、非 transient 字段要么是公共的,要么有一个公共的 getter 和 setter 方法;

注意:当用户定义的数据类型无法识别为 POJO 类型时,必须将其处理为 GenericType 并使用 Kryo 进行序列化。

g)创建 TypeInformation 或 TypeSerializer

由于 Java 会擦除泛型类型信息,因此需要将类型传递给 TypeInformation

对于非泛型类型,可以传递类:

TypeInformation<String> info = TypeInformation.of(String.class);

对于泛型类型,需要通过 TypeHint “捕获” 泛型类型信息:

TypeInformation<Tuple2<String, Double>> info = TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});

在内部,创建了 TypeHint 的一个匿名子类,该子类捕获泛型信息以将其保留到运行时。

有两种方法可以创建 TypeSerializer

  • 对 TypeInformation 对象调用 typeInfo.createSerializer(config),config 参数的类型为 ExecutionConfig,包含有关程序注册的自定义序列化器的信息;尽量将正确的 ExecutionConfig 传递给程序,可以通过调用 getExecutionConfig() 从 DataStream 中获取它。

  • 在函数(如RichMapFunction)内部使用 getRuntimeContext().createSerializer(typeInfo) 来获取它。

h)在 Java API 中的 Type Information

Java 会擦除通用类型信息;Flink 试图通过反射重建尽可能多的类型信息,使用 Java 保留的少数比特(主要是函数签名和子类信息);

函数的返回类型取决于其输入类型的情况以及一些简单的类型推断:

public class AppendOne<T> implements MapFunction<T, Tuple2<T, Long>> {public Tuple2<T, Long> map(T value) {return new Tuple2<T, Long>(value, 1L);}
}

在某些情况下,Flink 无法重建所有泛型类型信息,此时用户必须提供类型提示。

Java API中的类型提示

在 Flink 无法重建擦除的通用类型信息时,Java API 提供所谓的类型提示,类型提示告诉系统函数生成的数据流或数据集的类型:

DataStream<SomeType> result = stream.map(new MyGenericNonInferrableFunction<Long, SomeType>()).returns(SomeType.class);

return 语句指定生成的类型。

Java 8 Lambda的类型提取

Java 8 lambdas 的类型提取与非 lambdas 不同,因为 lambdas 与扩展函数接口的实现类无关。

Flink 试图使用 Java 的泛型签名来确定参数类型和返回类型;但并非所有编译器都会为 Lambda 生成这些签名,有时也需要手动指定数据类型。

POJO类型的序列化

PojoTypeInfo 为 POJO 中的所有字段创建序列化程序;int、long、String 等标准类型由 Flink 自带的序列化程序处理,对于其它类型使用 Kryo,如果 Kryo 无法处理该类型,可以要求 PojoTypeInfo 使用 Avro 序列化 POJO,调用方法如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceAvro();

注意:Flink 会使用 Avro 序列化器自动序列化 Avro 生成的 POJO,如果希望 Kryo 序列化器处理整个 POJO 类型,配置如下

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceKryo();

如果 Kryo 无法序列化 POJO,可以向 Kryo 添加自定义序列化程序。

env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass);

禁用回退 Kryo

如果程序需要避免使用 Kryo 作为泛型类型的回退,确保通过 Flink 自己的序列化程序或通过用户自定义的自定义序列化程序有效地序列化所有类型。

注意:遇到要使用 Kryo 的数据类型时,以下设置会引发异常:

env.getConfig().disableGenericTypes();
i)使用 Factory 定义类型信息-待验证

类型信息工厂允许将用户定义的类型信息插入 Flink 类型系统,需要实现 org.apache.flink.api.common.typeinfo.TypeInfoFactory 以返回自定义的类型信息。

如果相应的类型已经用 @org.apache.flink.api.common.typeinfo.TypeInfo 注解,则在类型提取阶段会调用工厂。

类型信息工厂可以在 Java 和 Scala API 中使用,在类型层次结构中,向上遍历时将选择最近的工厂,内置工厂具有最高优先级;工厂的优先级也高于 Flink 的内置类型。

示例:使用 Java 中的工厂对自定义类型 MyTuple 进行注解并为其提供自定义类型信息

@TypeInfo(MyTupleTypeInfoFactory.class)
public class MyTuple<T0, T1> {public T0 myfield0;public T1 myfield1;
}

自定义类型信息的工厂

public class MyTupleTypeInfoFactory extends TypeInfoFactory<MyTuple> {@Overridepublic TypeInformation<MyTuple> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {return new MyTupleTypeInfo(genericParameters.get("T0"), genericParameters.get("T1"));}
}

方法 createTypeInfo(Type,Map<String,TypeInformation<?>) 为工厂的目标类型创建类型信息;这些参数提供了有关类型本身的附加信息,以及类型的泛型类型参数。

如果类型包含需要从 Flink 函数的输入类型派生的泛型参数,请确保还实现了org.apache.Flink.api.common.typeinfo.TypeInformation#getGenericParameters 用于泛型参数到类型信息的双向映射。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/10147.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PMP证书如何备考?

每个过了PMP考试的考生&#xff1a;“你是如何学习和准备的”&#xff1f;答案基本分三类&#xff1a; 第一种是“临时抱佛脚”式&#xff1b;第二种是“持续抗战式”&#xff1b;第三种是“疲劳作战式”。 第一种比较符合人性和期望—20世纪三大管理定义之一的帕金斯定律&am…

qt窗口置顶

设置Qt::WindowStaysOnTopHint this->setWindowFlags(Qt::Tool| Qt::FramelessWindowHint|Qt::WindowStaysOnTopHint|Qt::X11BypassWindowManagerHint);Qt::WindowStaysOnTopHint帮助文档 Informs the window system that the window should stay on top of all other wind…

java双亲委派

双亲委派&#xff08;Parent Delegation&#xff09;是Java类加载机制中的一种设计模式&#xff0c;用于确保类的加载安全性和一致性。在双亲委派模式下&#xff0c;一个类加载器在加载类时首先委托给其父类加载器&#xff0c;只有在父类加载器无法加载该类时&#xff0c;才由子…

springmvc数据绑定

数据绑定 数据绑定流程 springmvc框架将ServletRequest对象及目标方法的入参实例传递给WebDataBinderFactory实例&#xff0c;以创建DataBinder实例对象 DataBinder调用装配在springmvc上下文中的ConversionService组件进行数据类型转换、数据格式化工作。将Servlet中的请求信息…

Frida逆向与利用自动化

title: Frida逆向与利用自动化 date: 2022-05-01 21:22:20 tags: frida categories:安卓逆向 toc_number: trueKali kali里面时间老是不对,其实只是时区不对而已,一个命令就搞定: dpkg-reconfigure tzdata 然后选择Asia→Shanghai,然后重启即可。 KaliLinux默认不带中文 a…

ctfshow web入门 php反序列化 web267--web270

web267 查看源代码发现这三个页面 然后发现登录页面直接admin/admin登录成功 然后看到了 ///backdoor/shell unserialize(base64_decode($_GET[code]))EXP <?php namespace yii\rest{class IndexAction{public $checkAccess;public $id;public function __construct(){…

C语言-STM32:初始定时器(通用定时器)

STM32定时器的作用&#xff1a; STM32定时器是一个重要的硬件资源&#xff0c;主要用于以下几种用途&#xff1a; 计数&#xff1a;它可以用来计数外部事件的次数&#xff0c;例如脉冲或信号的周期。定时&#xff1a;定时器可以设置一个特定的时间间隔后产生中断&#xff0c;用…

【Java处理word文档】

Java处理word文档 前言一、word是什么&#xff1f;二、Java处理word2.1、依赖包2.2、加载word样式2.3、读入文件2.4、单一样式段落2.5、复合样式段落2.6、将段落写入word2.7、word表格2.8、超链接2.9、写入样式及文件保存 总结示例源码样式xmlWordConstantWordStylesUtilWordUt…

Windows Server 2012 R2 新增D盘分区

我们经常搭建windows版本的游戏时会要在D盘上操作&#xff0c;今天就介绍下新的服务器如何新增一个D盘。 在"开始"图标右边有个”服务器管理器“&#xff0c;单击点开 点开服务器管理器后&#xff0c;点击“工具”打开“计算机管理” 打开计算机管理后点击“存储”-…

QT切换控件布局

1、切换前垂直布局 2、切换后水平布局 3、关键代码 qDebug() << "开始切换布局";QWidget *widget centralWidget();QLayout *layout widget->layout();if(layout){while(layout->count()){QLayoutItem *item layout->takeAt(0);if(item->layout…

【DDR 终端稳压器】Sink and Source DDR Termination Regulator [C] S0 S1 S2 S3 S4 S5 6状态

TPS51200A-Q1 器件通过 EN 功能提供 S3 支持。EN引脚可以连接到终端应用中的SLP_S3信号。当EN 高电平&#xff08;S0 状态&#xff09;时&#xff0c;REFOUT 和 VO 引脚均导通。当EN 低电平&#xff08;S3状态&#xff09;时&#xff0c;VO引脚关断并通过内部放电MOSFET放电时…

关于SwapBuffers(HDC);的效率问题

最近笔者在尝试使用SwapBuffers(hdc);函数时&#xff0c;遇到了严重的效率问题&#xff0c;通过查阅资料发现了是因为windows中开启垂直同步的原因&#xff0c;由于垂直同步会强制画面的刷新率匹配显示器的刷新率&#xff0c;因此开启垂直同步之后&#xff0c;若画面的刷新率快…

决策树的学习(Decision Tree)

1.对于决策树的概念&#xff1a; **本质上&#xff1a;**决策树就是模拟树的结构基于 if-else的多层判断 2.目的&#xff1a; 对实例进行分类的树形结构&#xff0c;通过多层判断&#xff0c;将所提供的数据归纳为一种分类规则。 3.优点&#xff1a; 1.计算量小&#xff0c;…

源代码防泄密的重要性

​源代码”作为互联网企业的核心资产之一&#xff0c;其安全性至关重要。源代码泄露不仅可能导致企业丧失技术优势&#xff0c;还可能引发知识产权纠纷、增加竞争对手的市场竞争力&#xff0c;甚至可能被用于恶意目的&#xff0c;如开发恶意软件等。因此&#xff0c;保护源代码…

婉转之声舞台之梦

【婉转之声&#xff0c;舞台之梦】&#x1f3a4; 当旋律响起&#xff0c;每个音符承载着梦想与热情。#婉拒歌手# #歌手2024# 话题引爆网络&#xff0c;观众的期待值冲破天际&#xff0c;但谁才是那婉拒名单上的神秘巨星&#xff1f;[嘻嘻] 今夜&#xff0c;我们不谈已成的荣耀&…

QT的TcpServer

Server服务器端 QT版本5.6.1 界面设计 工程文件&#xff1a; 添加 network 模块 头文件引入TcpServer类和TcpSocket&#xff1a;QTcpServer和QTcpSocket #include <QTcpServer> #include <QTcpSocket>创建server对象并实例化&#xff1a; /*h文件中*/QTcpServer…

使用Django中的Session和Cookie来传递数据

在Django中&#xff0c;Session和Cookie是两种常用的机制&#xff0c;用于在服务器端和客户端之间传递数据。下面我将简要介绍如何在Django中使用Session和Cookie来传递数据。 1、问题背景 在 Django 中&#xff0c;可以使用 request.POST 来获取表单提交的数据。但是&#xf…

Activity7框架使用学习记录

用于记录在项目中使用工作流框架 PROC_DEF_ID: 流程定义的id bpmn文件中对流程图定义的id TASK_DEF_KEY:任务定义的id bpmn文件中对每个任务定义的id PROC_INST_ID:流程实例id 启动一个流程时对流程定义的id PROC_DEF_ID&#xff08;流程定义的ID&#xff09;&#xff1a;在…

iOS plist文件增删改查

一. plist简介 plist文件&#xff0c;即属性列表文件&#xff0c;全名是Property List&#xff0c;这种文件的扩展名为.plist&#xff0c;因此&#xff0c;通常被叫做plist文件。它是一种用来存储串行化后的对象的文件&#xff0c;在iOS开发中通常用来存储用户设置&#xff0c…

《自卑与超越》

Ⅰ 内容简介 《自卑与超越》是阿德勒从个体心理学观点出发&#xff0c;阐明人生道路和人生意义的通俗性读物。但通俗中包含着极深的哲理和巨大的学术价值。在《自卑与超越》中&#xff0c;作者提出&#xff1a;每个人都有不同程度的自卑感&#xff0c;因为没有一个人对其现时的…