聊聊Flink:Flink的分区机制

一、前言

flink任务在执行过程中,一个流(stream)包含一个或多个分区(Stream partition)。TaskManager中的一个slot的subtask就是一个stream partition(流分区),一个Job的流(stream)分布在多个不同的Slot上执行。每一个算子可以包含一个或多个子任务(subtask),这些subtask执行在不同的分区中,本质是在不同的线程、不同的物理机或不同的容器中彼此互不依赖地执行。

1.1 Flink数据传输

  • 组件之间的通信消息传输,即Client、JobManager、TaskManager之间的信息传递,采用Akka框架(主要用作组件间的协同,如心跳检测、状态上报、指标统计、作业提交和部署等)。
  • 算子之间的流数据传输
    • 本地线程内的流数据传输(同一个SubTask中):同一个SubTask内的两个Operator(属于同一个OperatorChain)之间的数据传输是方法调用,即上游算子处理完数据后,直接调用下游算子的processElement方法。
    • 本地线程间的流数据传输(同一个TaskManager的不同SubTask中):即同一个TaskManager(JVM进程)中的不同Task(线程,本质上是SubTask)的算子之间的数据传输,通过本地内存进行数据传递,存在数据序列化和反序列过程。
    • 跨网络的流数据传输(不同TaskManager的SubTask中):采用Netty框架,通过Socket传递,也存在数据序列化和反序列过程。

flink中的重分区算子定义上下游subtask之间数据传递的方式,SubTask之间进行数据传递模式有两种,一种是one-to-oneforwarding)模式,另一种是redistributing的模式。

1.2 重分区算子数据传递的两种方式

  • One-to-one:数据不需要重新分布,上游SubTask生产的数据与下游SubTask受到的数据完全一致,数据不需要重分区,也就是数据不需要经过IO,比如下图中source->map的数据传递形式就是One-to-One方式。常见的map、fliter、flatMap等算子的SubTask的数据传递都是one-to-one的对应关系。类似于spark中的窄依赖。
  • Redistributing:数据需要通过shuffle过程重新分区,需要经过IO,比如上图中的map->keyBy。创建的keyBy、broadcast、rebalance、shuffle等算子的SubTask的数据传递都是Redistributing方式,但它们具体数据传递方式是不同的。类似于spark中的宽依赖。

在这里插入图片描述

flink中的重分区算子除了keyBy以外,还有broadcast、rebalance、shuffle、rescale、global、partitionCustom等多种算子,它们的分区方式各不相同。需要注意的是,这些算子中除了keyBy能将DataStream转化为KeyedStream外,其它重分区算子均不会改变Stream的类型。

二、分区策略

数据在算子之间流动需要依靠分区策略(分区器),Flink目前内置了以下几种分区策略和自定义分区策略。已实现的分区策略对应的API为:
在这里插入图片描述
自定义分区策略的API为CustomPartitionerWrapper。

各个API的继承关系如下图所示:
在这里插入图片描述
ChannelSelector是分区策略的顶层接口,其决定了记录应该写入哪个逻辑通道,通道可理解为下游算子的某个实例,或下游并行算子的某个子任务。该接口的定义源码如下:
在这里插入图片描述

抽象类StreamPartitioner实现了ChannelSelector接口,是一个用于流程序的特殊的ChannelSelector,其中定义了一些通用的分区策略方法。Flink中的所有分区策略(分区器)都继承了StreamPartitioner类,并且实现了各自独有的分区规则。

三、内置分区策略

3.1 BinaryHashPartitioner

该分区策略位于Blink的Table API的org.apache.flink.table.runtime.partitioner包中,是一种针对BinaryRowData的哈希分区器。BinaryRowData是RowData的实现,可以显著减少Java对象的序列化/反序列化。RowData用于表示结构化数据类型,运行时通过Table API或SQL管道传递的所有顶级记录都是RowData的实例。关于BinaryHashPartitioner,我们这里不做过多讲解。

3.2 BroadcastPartitioner

广播分区策略将上游数据记录输出到下游算子的每个并行实例中,即下游每个分区都会有上游的所有数据。使用DataStream的broadcast()方法即可设置该DataStream向下游发送数据时使用广播分区策略。

来一段代码演示下:

/*** 微信公众号:老周聊架构*/
public class PartitionerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5, 6);//1.分区策略前的操作//输出dataStream每个元素及所属的子任务编号dataStream.map(new RichMapFunction<Integer, Object>() {@Overridepublic Object map(Integer value) throws Exception {System.out.println(String.format("元素值: %s, 分区策略前,子任务编号: %s", value,getRuntimeContext().getIndexOfThisSubtask()));return value;}});//2.设置分区策略//设置DataStream向下游发送数据时使用的策略DataStream<Integer> dataStreamAfter = dataStream.broadcast();//3.分区策略后的操作dataStreamAfter.map(new RichMapFunction<Integer, Object>() {@Overridepublic Object map(Integer value) throws Exception {System.out.println(String.format("元素值: %s, 分区策略后,子任务编号: %s", value,getRuntimeContext().getIndexOfThisSubtask()));return value;}}).print();env.execute("PartitionerTest Job");}
}

直接IDEA控制台输出:
在这里插入图片描述

从输出结果可以看出,数据共分为3个分区(编号为0、1、2)。执行分区策略前,每个元素所属的分区:
在这里插入图片描述
执行分区策略后,每个元素所属的分区如下:
在这里插入图片描述
对比表发现,广播分区策略将上游每个元素分别发送到了下游算子的所有分区,这种策略会把数据复制多份,向下游算子的每个分区发送一份。
在这里插入图片描述
我们把上面的任务提交到Flink,同样也可以看出前面分区前每个子任务两条数据,分区后每个子任务六条数据。
在这里插入图片描述
在这里插入图片描述
3.3 ForwardPartitioner

转发分区策略只将元素转发给本地运行的下游算子的实例,即将元素发送到与当前算子实例在同一个TaskManager的下游算子实例,而不需要进行网络传输。要求上下游算子并行度一样,这样上下游算子可以同属一个子任务。

这里把上面的代码调整下:

dataStream.forward()

IDEA控制台输出:
在这里插入图片描述
从输出结果可以看出,数据共分为3个分区(编号为0、1、2)。执行分区策略前,每个元素所属的分区:
在这里插入图片描述

执行分区策略后,每个元素所属的分区如下:
在这里插入图片描述

对比发现,转发分区策略将上游同一个分区的元素发送到了下游同一个分区中。使用数据流图表示如下图:

在这里插入图片描述
在上下游的算子没有指定分区策略的情况下,如果上下游的算子并行度一致,则默认使用ForwardPartitioner,否则使用RebalancePartitioner。在StreamGraph类的源码中可以看到该规则:
在这里插入图片描述
对于ForwardPartitioner,必须保证上下游算子并行度一致,否则会抛出异常。
在这里插入图片描述
3.4 GlobalPartitioner

全局分区策略将上游所有元素发送到下游子任务编号等于0的分区算子实例上(下游第一个实例)。

这里把上面的代码调整下:

dataStream.global()

IDEA控制台输出:
在这里插入图片描述
分区前:
在这里插入图片描述

分区后:
在这里插入图片描述

全局分区策略将上游所有分区中的所有元素发送到了下游编号为0的分区中:
在这里插入图片描述

3.5 .KeyGroupStreamPartitioner

Key分区策略根据元素Key的Hash值输出到下游算子指定的实例。keyBy()算子底层正是使用的该分区策略,底层最终会调用KeyGroupStreamPartitioner的selectChannel()方法,计算每个Key对应的通道索引(通道编号,可理解为分区编号),根据通道索引将Key发送到下游相应的分区中。selectChannel()方法源码如下:
在这里插入图片描述
在这里插入图片描述

总的来说,Flink底层计算通道索引(分区编号)的流程如下:

  • 计算Key的HashCode值。
  • 将Key的HashCode值进行特殊的Hash处理,即MathUtils.murmurHash(keyHash),返回一个非负哈希码。
  • 将非负哈希码除以最大并行度取余数,得到keyGroupId,即Key组索引。
  • 使用公式keyGroupId×parallelism/maxParallelism得到分区编号。parallelism为当前算子的并行度,即通道数量;maxParallelism为系统默认支持的最大并行度,即128。

3.6 RebalancePartitioner

平衡分区策略使用循环遍历下游分区的方式,将上游元素均匀分配给下游算子的每个实例。每个下游算子的实例都具有相等的负载。当数据流中的元素存在数据倾斜时,使用该策略对性能有很大的提升。

这里把上面的代码调整下:

dataStream.setParallelism(2);

dataStreamAfter.setParallelism(3);

dataStream.rebalance()

IDEA控制台输出:
在这里插入图片描述

分区前:
在这里插入图片描述
分区后:

在这里插入图片描述

平衡分区策略将上游所有元素均匀发送到了下游算子的所有分区:
在这里插入图片描述

3.7 RescalePartitioner

重新调节分区策略基于上下游算子的并行度,将元素以循环的方式输出到下游算子的每个实例。类似于平衡分区策略,但又与平衡分区策略不同。

上游算子将元素发送到下游哪一个算子实例,取决于上游和下游算子的并行度。例如,如果上游算子的并行度为2,而下游算子的并行度为4,那么一个上游算子实例将把元素均匀分配给两个下游算子实例,而另一个上游算子实例将把元素均匀分配给另外两个下游算子实例。相反,如果下游算子的并行度为2,而上游算子的并行度为4,那么两个上游算子实例将分配给一个下游算子实例,而另外两个上游算子实例将分配给另一个下游算子实例。

假设上游算子并行度为2,分区编号为A和B,下游算子并行度为4,分区编号为1、2、3、4,那么A将把数据循环发送给1和2,B则把数据循环发送给3和4。假设上游算子并行度为4,编号为A、B、C、D,下游算子并行度为2,编号为1、2,那么A和B把数据发送给1,C和D则把数据发送给2。

这里把上面的代码调整下:

dataStream.rescale()

同时将第一个map算子的并行度设置为2,第二个map算子的并行度设置为4。

IDEA控制台输出:
在这里插入图片描述

分区前:
在这里插入图片描述

分区后:
在这里插入图片描述
在这里插入图片描述

接下来改变map算子的并行度,将第一个map算子的并行度设置为4,第二个map算子的并行度设置为2。

在这里插入图片描述

如果想将元素均匀地输出到下游算子的每个实例,以实现负载均衡,同时又不希望使用平衡分区策略的全局负载均衡,则可以使用重新调节分区策略。该策略会尽可能避免数据在网络间传输,而能否避免还取决于TaskManager的Task Slot数量、上下游算子的并行度等。

3.8 ShufflePartitioner

随机分区策略将上游算子元素输出到下游算子的随机实例中。元素会被均匀分配到下游算子的每个实例。这种策略可以实现计算任务的负载均衡。

这里把上面的代码调整下:

dataStream.shuffle()

这里就不做过多演示了。我们下面来看下自定义分区策略。

四、自定义分区策略

自定义分区策略的API为CustomPartitionerWrapper。该策略允许开发者自定义规则将上游算子元素发送到下游指定的算子实例中。

4.1 新建自定义分区器

新建分区器类MyCustomPartitioner并实现接口Partitioner(Object表示分区Key的数据类型),实现其中未实现的方法partition(),在该方法中添加相应的分区逻辑。

/*** 自定义分区策略* 微信公众号:老周聊架构*/
public class MyCustomPartitioner implements Partitioner {@Overridepublic int partition(Object key, int numPartitions) {if (key.equals("chinese")) {return 0;} else if (key.equals("math")) {return 1;} else {return 2;}}
}

上述代码通过partition()方法取得分区编号,将Key值等于chinese的元素分配到编号为0的分区,将Key值等于math的元素分配到编号为1的分区,其余元素分配到编号为2的分区。

4.2 使用自定义分区器

调用DataStream的partitionCustom()方法传入自定义分区器类MyCustomPartitioner的实例,可以对DataStream按照自定义规则进行重新分区,代码如下:

/*** 自定义分区策略* 微信公众号:老周聊架构*/
public class CustomPartitionerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);DataStream<String> dataStream = env.fromElements("chinese,98", "math,88", "english,96");//1.分区策略前的操作//输出dataStream每个元素及所属的子任务编号SingleOutputStreamOperator<Map<String, Integer>> dataStreamBefore =dataStream.map(new RichMapFunction<String, Map<String, Integer>>() {@Overridepublic Map<String, Integer> map(String value) throws Exception {System.out.println(String.format("元素值: %s, 分区策略前,子任务编号: %s", value,getRuntimeContext().getIndexOfThisSubtask()));Map<String, Integer> map = new HashMap<>();map.put(value.split(",")[0], Integer.parseInt(value.split(",")[1]));return map;}}).setParallelism(2);//2.设置分区策略//设置DataStream向下游发送数据时使用的策略DataStream<Map<String, Integer>> dataStreamAfter = dataStreamBefore.partitionCustom(new MyCustomPartitioner(), value -> value);//3.分区策略后的操作dataStreamAfter.map(new RichMapFunction<Map<String, Integer>, Map<String, Integer>>() {@Overridepublic Map<String, Integer> map(Map<String, Integer> value) throws Exception {System.out.println(String.format("元素值: %s, 分区策略后,子任务编号: %s", value,getRuntimeContext().getIndexOfThisSubtask()));return value;}}).setParallelism(3).print();env.execute("CustomPartitionerTest Job");}
}

分区前:
在这里插入图片描述

分区后:

在这里插入图片描述
自定义分区策略将上游所有元素按照自定义的规则发送到了下游的3个分区中。

把任务给到Flink上去跑,发现:
在这里插入图片描述

这是因为泛型擦除,下面的DataStream泛型需要指定类型,不能
在这里插入图片描述

小知识:

在编译之后程序会采取去泛型化的措施。也就是说Java中的泛型,只在编译阶段有效。在编译过程中,正确检验泛型结果后,在运行时会将泛型的相关信息擦除,编译器只会在对象进入JVM和离开JVM的边界处添加类型检查和转换的方法,泛型的信息不会进入到运行时阶段,这就是所谓的Java类型擦除

类型加好以后,再跑一下任务,会出现任务成功。

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/60734.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IO流实用案例:用字节流--输入流(Inpustream)、输出流(OutputStream)写一个拷贝图片的案例--超简单!

案例背景&#xff1a; 我的电脑桌面有一张白敬亭的照片&#xff0c;我们需要把这张照片拷贝到我的电脑D:\学习软件\copyBJT目录下&#xff0c;当前我们这个目录是没有东西的。 代码演示以及注释&#xff1a; ublic class StreamCopy {public static void main(String[] args)…

ArkTS学习笔记:ArkTS起步

ArkTS是HarmonyOS的主力应用开发语言&#xff0c;基于TypeScript扩展&#xff0c;强化了静态检查和分析&#xff0c;旨在提升程序稳定性和性能。它采用静态类型&#xff0c;禁止运行时改变对象布局&#xff0c;并对UI开发框架能力进行扩展&#xff0c;支持声明式UI描述和自定义…

卡尔曼滤波:从理论到应用的简介

卡尔曼滤波&#xff08;Kalman Filter&#xff09;是一种递归算法&#xff0c;用于对一系列噪声观测数据进行动态系统状态估计。它广泛应用于导航、控制系统、信号处理、金融预测等多个领域。本文将介绍卡尔曼滤波的基本原理、核心公式和应用案例。 1. 什么是卡尔曼滤波&#x…

【已解决】git push一直提示输入用户名及密码、fatal: Could not read from remote repository的问题

问题描述&#xff1a; 在实操中&#xff0c;git push代码到github上一直提示输入用户名及密码&#xff0c;并且跳出的输入框输入用户名和密码后&#xff0c;报错找不到远程仓库 实际解决中&#xff0c;发现我环境有两个问题解决&#xff1a; git push一直提示输入用户名及密码…

【Rust 编程语言工具】rustup-init.exe 安装与使用指南

rustup-init.exe 是用于安装和管理 Rust 编程语言工具链的 Windows 可执行文件。Rust 是一种系统级编程语言&#xff0c;旨在提供安全、并发和高性能的功能。rustup-init.exe 是官方提供的安装器&#xff0c;用于将 Rust 安装到 Windows 操作系统中&#xff0c;并配置相关环境。…

Mac 使用mac 原生工具将mp4视频文件提取其中的 mp3 音频文件

简介 Hello! 非常感谢您阅读海轰的文章,倘若文中有错误的地方,欢迎您指出~ ଘ(੭ˊᵕˋ)੭ 昵称:海轰 标签:程序猿|C++选手|学生 简介:因C语言结识编程,随后转入计算机专业,获得过国家奖学金,有幸在竞赛中拿过一些国奖、省奖…已保研 学习经验:扎实基础 + 多做笔…

项目中用户数据获取遇到bug

项目跟练的时候 Uncaught (in promise) TypeError: Cannot read properties of undefined (reading ‘code’) at Proxy.userInfo (user.ts:57:17) 因此我想要用result接受信息的时候会出错&#xff0c;报错显示为result.code没有该值 导致我无法获取到相应的数据 解决如下 给…

【视觉SLAM】1-概述

读书笔记 文章目录 1. 经典视觉SLAM框架2. 数学表述2.1 运动方程2.2 观测方程2.3 问题抽象 1. 经典视觉SLAM框架 传感器信息读取&#xff1a;相机图像、IMU等多源数据&#xff1b;前端视觉里程计&#xff08;Visual Odometry&#xff0c;VO&#xff09;&#xff1a;估计相机的相…

Isaac Sim+SKRL机器人并行强化学习

目录 Isaac Sim介绍 OmniIssacGymEnvs安装 SKRL安装与测试 基于UR5的机械臂Reach强化学习测评 机器人控制 OMNI GYM环境编写 SKRL运行文件 训练结果与速度对比 结果分析 运行体验与建议 Isaac Sim介绍 Isaac Sim是英伟达出的一款机器人仿真平台&#xff0c;适用于做机…

Python学习------第八天

函数 函数的传入参数 掌握函数返回值的作用 掌握函数返回值的定义语法 函数的嵌套调用&#xff1a; 函数的局部变量和全局变量 局部变量的作用&#xff1a;在函数体内部&#xff0c;临时保存数据&#xff0c;即当函数调用完成后&#xff0c;则销毁局部变量。 money 5000000 n…

机器学习基础04

目录 1.朴素贝叶斯-分类 1.1贝叶斯分类理论 1.2条件概率 1.3全概率公式 1.4贝叶斯推断 1.5朴素贝叶斯推断 1.6拉普拉斯平滑系数 1.7API 2.决策树-分类 2.1决策树 2.2基于信息增益的决策树建立 2.2.1信息熵 2.2.2信息增益 2.2.3信息增益决策树建立步骤 2.3基于基…

The Internals of PostgreSQL 翻译版 持续更新...

为了方便自己快速学习&#xff0c;整理了翻译版本&#xff0c;目前翻译的还不完善&#xff0c;后续会边学习边完善。 文档用于自己快速参考&#xff0c;会持续修正&#xff0c;能力有限,无法确保正确!!! 《The Internals of PostgreSQL 》 不是 《 PostgreSQL14 Internals 》…

Android 无签名系统 debug 版本APK push到设备引起的开机异常问题分析(zygote进程)

问题背景 前置操作&#xff1a; 替换原system/priv-app 目录下已有的应用包未未签名的debug版本&#xff0c;然后重启。 现象&#xff1a; 无法正常开机&#xff0c;卡在开机动画&#xff0c;并且pm没有起来&#xff0c;因为执行adb install 命令是返回“cmd: Cant find se…

深度学习推荐系统的工程实现

参考自《深度学习推荐系统》——王喆&#xff0c;用于学习和记录。 介绍 之前章节主要从理论和算法层面介绍了推荐系统的关键思想。但算法和模型终究只是“好酒”&#xff0c;还需要用合适的“容器”盛载才能呈现出最好的味道&#xff0c;这里的“容器”指的就是实现推荐系统…

attention 注意力机制 学习笔记-GPT2

注意力机制 这可能是比较核心的地方了。 gpt2 是一个decoder-only模型&#xff0c;也就是仅仅使用decoder层而没有encoder层。 decoder层中使用了masked-attention 来进行注意力计算。在看代码之前&#xff0c;先了解attention-forward的相关背景知识。 在普通的self-atten…

Java 内存区域详解

对于 Java 程序员来说&#xff0c;在虚拟机自动内存管理机制下&#xff0c;不再需要像 C/C程序开发程序员这样为每一个 new 操作去写对应的 delete/free 操作&#xff0c;不容易出现内存泄漏和内存溢出问题。正是因为 Java 程序员把内存控制权利交给 Java 虚拟机&#xff0c;一…

FluentUI使用

首先向Qt Qml FluentUI组件库的作者zhuzichu520致敬&#xff01; 一、源码下载地址&#xff1a; 1&#xff09;GitHub - zhuzichu520/FluentUI: FluentUI for QML 2&#xff09;GitCode - 全球开发者的开源社区,开源代码托管平台 二、Qt6下载地址&#xff1a; qt-online-i…

【UE5】在材质Custom写函数的方法

UE材质的Custom本身会构建为函数&#xff0c;所以并不能在Custom定义函数&#xff0c;但当然还是有办法的 总结一些在custom写函数的方法 常规办法 常规办法就是使用结构体作为函数使用 以一个Lerp功能函数演示 让我们看看写法&#xff1a; struct VolBlendFunc //定义结…

分享 pdf 转 word 的免费平台

背景 找了很多 pdf 转 word 的平台都骗进去要会员&#xff0c;终于找到一个真正免费的&#xff0c;遂分享。 网址 PDF转Word转换器 - 100%免费市面上最优质的PDF转Word转换器 - 免费且易于使用。无附加水印 - 快速将PDF转成Word。https://smallpdf.com/cn/pdf-to-word

【LeetCode】每日一题 2024_11_14 统计好节点的数目(图/树的 DFS)

前言 每天和你一起刷 LeetCode 每日一题~ LeetCode 启动&#xff01; 题目&#xff1a;统计好节点的数目 代码与解题思路 先读题&#xff1a;题目要求我们找出好节点的数量&#xff0c;什么是好节点&#xff1f;“好节点的所有子节点的数量都是相同的”&#xff0c;拿示例一…