Flink 数据类型 TypeInformation信息

Flink流应用程序处理的是以数据对象表示的事件流。所以在Flink内部,我么需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送它们;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点,Flink需要明确知道应用程序所处理的数据类型。并为每个数据类型生成特定的序列化器、反序列化器和比较器。Flink支持非常完善的数据类型,数据类型描述信息都是由TypeInformation定义,比较常用的TypeInformationBasicTypeInfoTupleTypeInfoCaseClassTypeInfoPojoTypeInfo类等。TypeInformation主要作用是为了在 Flink系统内有效地对数据结构类型进行管理,能够在分布式计算过程中对数据的类型进行管理和推断。同时基于对数据的类型信息管理,Flink内部对数据存储也进行了相应的性能优化。Flink能够支持任意的JavaScala的数据类型,不用像Hadoop中的org.apache.hadoop.io.Writable而实现特定的序列化和反序列化接口,从而让用户能够更加容易使用已有的数据结构类型。另外使用TypeInformation管理数据类型信息,能够在数据处理之前将数据类型推断出来,而不是真正在触发计算后才识别出,这样能够及时有效地避免用户在使用Flink编写应用的过程中的数据类型问题。

原生数据类型

Flink通过实现BasicTypeInfo数据类型,能够支持任意Java 原生基本类型(装箱)或String类型,例如IntegerStringDouble等,如以下代码所示,通过从给定的元素集中创建DataStream数据集。

//创建 Int 类型的数据集
DataStreamSource<Integer> integerDataStreamSource = env.fromElements(1, 2, 3, 4, 5);
//创建 String 的类型的数据集
DataStreamSource<String> stringDataStreamSource = env.fromElements("Java", "Scala");

Flink实现另外一种TypeInfomationBasicArrayTypeInfo,对应的是Java基本类型数组(装箱)或String对象的数组,如下代码通过使用 Array数组和List集合创建DataStream数据集。

List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5);
//通过 List 集合创建数据集
DataStreamSource<Integer> integerDataStreamSource1 = env.fromCollection(integers);

Java Tuples类型

通过定义TupleTypeInfo来描述Tuple类型数据,FlinkJava接口中定义了元祖类Tuple供用户使用。Flink Tuples是固定长度固定类型的Java Tuple实现,不支持空值存储。目前支持任意的Flink Java Tuple类型字段数量上限为25,如果字段数量超过上限,可以通过继承Tuple类的方式进行拓展。如下代码所示,创建Tuple数据类型数据集。

//通过实例化 Tuple2 创建具有两个元素的数据集
DataStreamSource<Tuple2<String, Integer>> tuple2DataStreamSource = env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2));
//通过实例化 Tuple3 创建具有三个元素的数据集
DataStreamSource<Tuple3<String, Integer, Long>> tuple3DataStreamSource = env.fromElements(new Tuple3<>("a", 1, 3L), new Tuple3<>("b", 2, 3L));

Scala Case Class类型

Flink通过实现CaseClassTypeInfo支持任意的Scala Case Class,包括Scala tuples类型,支持的字段数量上限为22,支持通过字段名称和位置索引获取指标,不支持存储空值。如下代码实例所示,定义WordCount Case Class数据类型,然后通过fromElements方法创建input数据集,调用keyBy()方法对数据集根据 word字段重新分区。

//定义 WordCount Case Class 数据结构
case class WordCount(word: Sring, count: Int)
//通过 fromElements 方法创建数据集
val input = env.fromElements(WordCount("hello", 1),WordCount("word",2))
val keyStream1 = input.keyBy("word")//根据word字段为分区字段,
val keyStream2 = input.keyBy(0)//也可以通过制定position分区

通过使用Scala Tuple创建DataStream数据集,其他的使用方式和Case Class相似。需要注意的是,如果根据名称获取字段,可以使用 Tuple中的默认字段名称。

//通过实例化Scala Tuple2 创建具有两个元素的数据集
val tupleStream: DataStream[Tuple2[String,Int]] = env.fromElements(("a",1),("b",2));
//使用默认名字段获取字段,表示第一个 tuple字段,相当于下标0
tuple2DataStreamSource.keyBy("_1");

POJOs 类型

POJOs类可以完成复杂数据结构的定义,Flink通过实现PojoTypeInfo来描述任意的POJOs,包括JavaScala类。在Flink中使用POJOs类可以通过字段名称获取字段,例如dataStream.join(otherStream).where("name").equalTo("personName"),对于用户做数据处理则非常透明和简单,如代码所示。如果在Flink中使用POJOs数据类型,需要遵循以下要求:
【1】POJOs类必须是Public修饰且必须独立定义,不能是内部类;
【2】POJOs类中必须含有默认空构造器;
【3】POJOs类中所有的 Fields必须是Public或者具有Public修饰的gettersetter方法;
【4】POJOs类中的字段类型必须是Flink支持的。

//类和属性具有 public 修饰
public class Persion{public String name;public Integer age;//具有默认的空构造器public Persion(){}public Persion(String name,Integer age){this.name = name;this.age = age;};
}

定义好POJOs Class后,就可以在 Flink环境中使用了,如下代码所示,使用fromElements接口构建Person类的数据集。POJOs类仅支持字段名称指定字段,如代码中通过Person name来指定Keyby字段。

DataStreamSource<Persion> persionDataStreamSource = env.fromElements(new Persion("zzx", 18), new Persion("fj", 16));
persionData.keyBy("name").sum("age");

Flink Value类型

Value数据类型实现了org.apache.flink.types.Value,其中包括read()write()两个方法完成序列化和反序列化操作,相对于通用的序列化工具会有着比较高效的性能。目前Flink提供了內建的Value类型有IntValue、DoubleValue以及StringValue等,用户可以结合原生数据类型和Value类型使用。

特殊数据类型

Flink中也支持一些比较特殊的数据数据类型,例如Scala中的ListMapEitherOptionTry数据类型,以及Java中Either数据类型,还有HadoopWritable数据类型。如下代码所示,创建MapList类型数据集。这种数据类型使用场景不是特别广泛,主要原因是数据中的操作相对不像POJOs类那样方便和透明,用户无法根据字段位置或者名称获取字段信息,同时要借助Types Hint帮助Flink推断数据类型信息,关于Tyeps Hmt介绍可以参考下一小节。

//创建 map 类型数据集
Map map = new HashMap<>();
map.put("name","zzx");
map.put("age",12);
env.fromElements(map);
//创建 List 类型数据集
env.fromElements(Arrays.asList(1,2,3,4,5),Arrays.asList(3,4,5));

TypeInformation信息获取: 通常情况下Flink都能正常进行数据类型推断,并选择合适的serializers以及comparators。但在某些情况下却无法直接做到,例如定义函数时如果使用到了泛型,JVM就会出现类型擦除的问题,使得Flink并不能很容易地获取到数据集中的数据类型信息。同时在Scala APIJava API中,Flink分别使用了不同的方式重构了数据类型信息。

Scala API类型信息

Scala API通过使用Manifest和类标签,在编译器运行时获取类型信息,即使是在函数定义中使用了泛型,也不会像Java API出现类型擦除的问题,这使得Scala API具有非常精密的类型管理机制。同时在Flink中使用到Scala Macros框架,在编译代码的过程中推断函数输入参数和返回值的类型信息,同时在Flink中注册成TypeInformation以支持上层计算算子使用。
当使用Scala API开发 Flink应用,如果使用到Flink已经通过TypeInformation定义的数据类型,TypeInformation类不会自动创建,而是使用隐式参数的方式引入,代码不会直接抛出编码异常,但是当启动Flink应用程序时就会报could not find implicit value for evidence parameter of type TypeInformation的错误。这时需要将TypeInformation类隐式参数引入到当前程序环境中,代码实例如下:

import org.apache.flink.api.scala._

Java API类型信息

由于Java的泛型会出现类型擦除问题,Flink通过Java反射机制尽可能重构类型信息,例如使用函数签名以及子类的信息等。同时类型推断在当输出类型依赖于输入参数类型时相对比较容易做到,但是如果函数的输出类型不依赖于输入参数的类型信息,这个时候就需要借助于类型提示Ctype Himts来告诉系统函数中传入的参数类型信息和输出参数信息。如代码清单通过在returns方法中传入TypeHint实例指定输出参数类型,帮助Flink系统对输出类型进行数据类型参数的推断和收集。

//定义泛型函数,输入参数 T,O 输出参数为 O
class MyMapFucntion<T,O> implements MapFunction<T,O>{@Overridepublic O map(T t) throws Exception {//定义计算逻辑return null;}
}//通过 List 集合创建数据集
DataStreamSource<Integer> input = env.fromCollection(integers);
input.flatMap(new MyMapFucntion<String,Integer>()).returns(new TypeHint<Integer>() {//通过returns方法指定返回参数类型
})

在使用Java API定义POJOs类型数据时,PojoTypeInformationPOJOs类中的所有字段创建序列化器,对于标准的类型,例如IntegerStringLong等类型是通过Flink自带的序列化器进行数据序列化,对于其他类型数据都是直接调用Kryo序列化工具来进行序列化。通常情况下,如果Kryo序列化工具无法对POJOs类序列化时,可以使用AvroPOJOs类进行序列化,如下代码通过在ExecutionConfig中调用 enableForceAvro()来开启Avro序列化。

//获取运行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//开启 avro 序列化
env.getConfig().enableForceAvro();

如果用户想使用Kryo序列化工具来序列化POJOs所有字段,则在ExecutionConfig中调用enableForceKryo()来开启Kryo序列化。

//获取运行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//开启 Kryo 序列化
env.getConfig().enableForceKryo();

如果默认的Kryo序列化类不能序列化POJOs对象,通过调用ExecutionConfigaddDefaultKryoSerializer()方法向Kryo中添加自定义的序列化器。

public void addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)

自定义TypeInformation

除了使用已有的TypeInformation所定义的数据格式类型之外,用户也可以自定义实现TypeInformation,来满足的不同的数据类型定义需求。Flink提供了可插拔的 Type Information Factory让用户将自定义的TypeInformation注册到Flink类型系统中。如下代码所示只需要通过实现org.apache.flink.api.common.typeinfo.TypeInfoFactory接口,返回相应的类型信息。通过@TypeInfo注解创建数据类型,定义CustomTuple数据类型。

@TypeInfo(CustomTypeInfoFactory.class)
public class CustomTuple<T0,T1>{public T0 field0;public T1 field1;
}

然后定义CustomTypeInfoFactory类继承于TypeInfoFactory,参数类型指定CustomTuple。最后重写createTypeInfo方法,创建的CustomTupleTypeInfo就是CustomTuple数据类型TypeInformation

public class CustomTypeInfoFactory extends TypeInfoFactory<CustomTuple>{@Overridepublic TypeInfomation<CustomTuple> createTypeInfo(Type t, Map<String,TypeInfoFactory<?>> genericParameters){return new CustomTupleTypeInfo(genericParameters.get("T0"),genericParameters.get("T1");}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/232934.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【大麦小米学量化】使用xtquant调用迅投MiniQMT客户端定时操作逆回购,再也不担心忘了赚零花钱了(含完整源代码)

文章目录 前言一、逆回购是什么&#xff1f;1. 什么是逆回购&#xff1f;2. 最低参与金额是多少&#xff1f;3. 逆回购交易是否安全&#xff1f;4. 逆回购交易适合什么类型的客户&#xff1f; 二、讯投XtQuant是什么&#xff1f;1. XtQuant运行依赖环境2. XtQuant运行逻辑 三、…

Linux Docker本地部署WBO在线协作白板结合内网穿透远程访问

文章目录 前言1. 部署WBO白板2. 本地访问WBO白板3. Linux 安装cpolar4. 配置WBO公网访问地址5. 公网远程访问WBO白板6. 固定WBO白板公网地址 前言 WBO在线协作白板是一个自由和开源的在线协作白板&#xff0c;允许多个用户同时在一个虚拟的大型白板上画图。该白板对所有线上用…

LeetCode刷题--- 全排列 II

个人主页&#xff1a;元清加油_【C】,【C语言】,【数据结构与算法】-CSDN博客 个人专栏 力扣递归算法题 http://t.csdnimg.cn/yUl2I 【C】 http://t.csdnimg.cn/6AbpV 数据结构与算法 http://t.csdnimg.cn/hKh2l 前言&#xff1a;这个专栏主要讲述递归递归、搜…

搭建git服务器(本地局域网)

搭建git服务器&#xff08;本地局域网&#xff09; 创建仓库 (假定在/home/git目录下创建仓库) git init --bare sample.git克隆远程仓库到本地 git clone git192.168.0.100:/home/git/sample.git已有项目&#xff0c;绑定远程仓库 # 查看远程仓库绑定 git remote -v# 解除…

拼多多ID取商品详情API:电商行业的核心价值与实时数据获取策略

一、引言 在当今的电商行业中&#xff0c;数据是驱动业务决策和优化用户体验的关键因素。拼多多作为中国电商市场的主要参与者&#xff0c;其根据ID取商品详情原数据的API在电商行业中具有显著的重要性。本文将深入探讨这个话题&#xff0c;并介绍如何实现实时数据获取。 二、…

UE学习记录09----C++实现Actor 单击/双击事件

思路&#xff1a;重载Actor的单击事件&#xff0c;通过计时器判断时间段内鼠标惦记的次数 // Fill out your copyright notice in the Description page of Project Settings.#pragma once#include "CoreMinimal.h" #include "GameFramework/Actor.h" #in…

回归预测 | MATLAB实现GA-LSSVM基于遗传算法优化最小二乘向量机的多输入单输出数据回归预测模型 (多指标,多图)

回归预测 | MATLAB实现GA-LSSVM基于遗传算法优化最小二乘向量机的多输入单输出数据回归预测模型 &#xff08;多指标&#xff0c;多图&#xff09; 目录 回归预测 | MATLAB实现GA-LSSVM基于遗传算法优化最小二乘向量机的多输入单输出数据回归预测模型 &#xff08;多指标&#…

doNet Core中解压zip

doNet Core4.0解压zip文件 1、ZipInputStream.cs public class ZipHelper { /// /// 解压缩一个 zip 文件。 /// /// The ziped file. /// The STR directory. /// 是否覆盖已存在的文件。 public static void UnZip(string zipedFile, string strDirectory, bool overWrit…

ISCTF(b)

test_nc nc_shell ls cat flag 这两道题比较像 你说爱我&#xff1f;尊嘟假嘟 打开后重复出现 “ 你说爱我 ” “ 尊嘟 ” “ 假嘟 ” 。判断为 Ook 加密 , 将 “ 你说爱我 ” 替换为 “Ook.” &#xff1b; “ 尊嘟 ” 替换为&#xff1a; “Ook!” &#xff1b; “ 假嘟…

mysql函数(二)之常见字符串函数

MySQL中常见的字符串函数有以下几种&#xff1a; CONCAT()&#xff1a;将两个或多个字符串连接在一起。 用法&#xff1a;CONCAT(string1, string2, ...) 效果图&#xff1a; LENGTH()&#xff1a;返回字符串的长度。 用法&#xff1a;LENGTH(string) 效果图&#xff1a; U…

教你如何使用天眼销查企业客户

天眼销是一款能够帮助客户获取最新的企业联系方式、工商信息等关键数据的平台。 平台基于先进的技术和大数据解决方案&#xff0c;深入挖掘和分析企业信息&#xff0c;能够高效地收集、整理和存储各类企业数据&#xff0c;为用户提供360度视角和洞察&#xff1b;提供全面、准确…

【算法与数据结构】LeetCode55、45、跳跃游戏 I 、II

文章目录 一、跳跃游戏I二、跳跃游戏II三、完整代码 所有的LeetCode题解索引&#xff0c;可以看这篇文章——【算法和数据结构】LeetCode题解。 一、跳跃游戏I 思路分析&#xff1a;本题目标是根据跳跃数组的元素&#xff0c;判断最终能够到达数组末端。我们引入了一个跳跃范围…

跨境电商的未来工作方式:远程团队与全球协作

随着数字化时代的来临&#xff0c;跨境电商行业在不断演变&#xff0c;其未来工作方式也呈现出新的趋势。本文将探讨跨境电商未来的工作方式&#xff0c;聚焦于远程团队与全球协作的发展&#xff0c;以揭示这一变革如何重新定义企业的组织结构和工作模式。 远程团队的崛起 近年…

【重点!】【二分查找】33.搜索旋转排序数组

题目 法1&#xff1a;二分查找 根据mid来分段&#xff0c;此思路需要牢记&#xff01;&#xff01;&#xff01; class Solution {public int search(int[] nums, int target) {if (nums.length 0) {return -1;}int left 0, right nums.length - 1, mid 0;while (left &…

Leetcode—859.亲密字符串【简单】

2023每日刷题&#xff08;六十三&#xff09; Leetcode—859.亲密字符串 &#x1f4a9;山实现代码 class Solution { public:bool buddyStrings(string s, string goal) {int len1 s.size(), len2 goal.size();int cnt 0;int flag 0;int flag2 0;int odd -1;int a[26] …

双指针——找到字符串中的所有字母异位词

https://leetcode.cn/problems/find-all-anagrams-in-a-string/description/?envTypestudy-plan-v2&envIdtop-100-liked 双指针&#xff0c;每次都统计出来p长度的滑动窗口里的数字,拿Arrays.equals进行对比,然后滑动一小格&#xff0c;减1加1继续比对即可。 class Solut…

VS2019, mfc,c++和halcon 2022调试的时候,查询halcon变量的值, 一直提示未为 halconcpp.dll 加载任何符号

在调试看值的过程中&#xff0c;发现这里看不到变量的值。 可以使用halcon变量检查工具查看。

Leetcode—96.不同的二叉搜索树【中等】

2023每日刷题&#xff08;六十四&#xff09; Leetcode—96.不同的二叉搜索树 算法思想 实现代码 class Solution { public:int numTrees(int n) {vector<int> G(n 1, 0);G[0] 1;G[1] 1;for(int i 2; i < n; i) {for(int j 1; j < i; j) {G[i] G[j - 1] * …

多目标跟踪学习

本文来源&#xff1a; 目标跟踪那些事儿-技术和课程介绍_哔哩哔哩_bilibili 为该视频的学习笔记 目的&#xff1a;我的学习目的主要是了解现有的跟踪算法&#xff0c;并着重了解卡尔曼滤波算法&#xff0c;利用卡尔曼滤波算法进行多目标跟踪等后续一系列估计算法。老师视频中提…

harmonyOS 自定义组件基础演示讲解

上文 HarmonyOS组件属性控制 链式编程格式推荐我们讲了一些系统组件 可以传入一些事件和参数 来达到一些不同的效果 其实 我们还可以用自己写的组件 那么 组件这么写&#xff1f; 其实 我们的 page 内部结果 就是一个组件 harmonyOS的概念 万物皆组件 那么 我们就可以在他下面…