聊聊Flink:Flink的分区机制

一、前言

flink任务在执行过程中,一个流(stream)包含一个或多个分区(Stream partition)。TaskManager中的一个slot的subtask就是一个stream partition(流分区),一个Job的流(stream)分布在多个不同的Slot上执行。每一个算子可以包含一个或多个子任务(subtask),这些subtask执行在不同的分区中,本质是在不同的线程、不同的物理机或不同的容器中彼此互不依赖地执行。

1.1 Flink数据传输

  • 组件之间的通信消息传输,即Client、JobManager、TaskManager之间的信息传递,采用Akka框架(主要用作组件间的协同,如心跳检测、状态上报、指标统计、作业提交和部署等)。
  • 算子之间的流数据传输
    • 本地线程内的流数据传输(同一个SubTask中):同一个SubTask内的两个Operator(属于同一个OperatorChain)之间的数据传输是方法调用,即上游算子处理完数据后,直接调用下游算子的processElement方法。
    • 本地线程间的流数据传输(同一个TaskManager的不同SubTask中):即同一个TaskManager(JVM进程)中的不同Task(线程,本质上是SubTask)的算子之间的数据传输,通过本地内存进行数据传递,存在数据序列化和反序列过程。
    • 跨网络的流数据传输(不同TaskManager的SubTask中):采用Netty框架,通过Socket传递,也存在数据序列化和反序列过程。

flink中的重分区算子定义上下游subtask之间数据传递的方式,SubTask之间进行数据传递模式有两种,一种是one-to-oneforwarding)模式,另一种是redistributing的模式。

1.2 重分区算子数据传递的两种方式

  • One-to-one:数据不需要重新分布,上游SubTask生产的数据与下游SubTask受到的数据完全一致,数据不需要重分区,也就是数据不需要经过IO,比如下图中source->map的数据传递形式就是One-to-One方式。常见的map、fliter、flatMap等算子的SubTask的数据传递都是one-to-one的对应关系。类似于spark中的窄依赖。
  • Redistributing:数据需要通过shuffle过程重新分区,需要经过IO,比如上图中的map->keyBy。创建的keyBy、broadcast、rebalance、shuffle等算子的SubTask的数据传递都是Redistributing方式,但它们具体数据传递方式是不同的。类似于spark中的宽依赖。

在这里插入图片描述

flink中的重分区算子除了keyBy以外,还有broadcast、rebalance、shuffle、rescale、global、partitionCustom等多种算子,它们的分区方式各不相同。需要注意的是,这些算子中除了keyBy能将DataStream转化为KeyedStream外,其它重分区算子均不会改变Stream的类型。

二、分区策略

数据在算子之间流动需要依靠分区策略(分区器),Flink目前内置了以下几种分区策略和自定义分区策略。已实现的分区策略对应的API为:
在这里插入图片描述
自定义分区策略的API为CustomPartitionerWrapper。

各个API的继承关系如下图所示:
在这里插入图片描述
ChannelSelector是分区策略的顶层接口,其决定了记录应该写入哪个逻辑通道,通道可理解为下游算子的某个实例,或下游并行算子的某个子任务。该接口的定义源码如下:
在这里插入图片描述

抽象类StreamPartitioner实现了ChannelSelector接口,是一个用于流程序的特殊的ChannelSelector,其中定义了一些通用的分区策略方法。Flink中的所有分区策略(分区器)都继承了StreamPartitioner类,并且实现了各自独有的分区规则。

三、内置分区策略

3.1 BinaryHashPartitioner

该分区策略位于Blink的Table API的org.apache.flink.table.runtime.partitioner包中,是一种针对BinaryRowData的哈希分区器。BinaryRowData是RowData的实现,可以显著减少Java对象的序列化/反序列化。RowData用于表示结构化数据类型,运行时通过Table API或SQL管道传递的所有顶级记录都是RowData的实例。关于BinaryHashPartitioner,我们这里不做过多讲解。

3.2 BroadcastPartitioner

广播分区策略将上游数据记录输出到下游算子的每个并行实例中,即下游每个分区都会有上游的所有数据。使用DataStream的broadcast()方法即可设置该DataStream向下游发送数据时使用广播分区策略。

来一段代码演示下:

/*** 微信公众号:老周聊架构*/
public class PartitionerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5, 6);//1.分区策略前的操作//输出dataStream每个元素及所属的子任务编号dataStream.map(new RichMapFunction<Integer, Object>() {@Overridepublic Object map(Integer value) throws Exception {System.out.println(String.format("元素值: %s, 分区策略前,子任务编号: %s", value,getRuntimeContext().getIndexOfThisSubtask()));return value;}});//2.设置分区策略//设置DataStream向下游发送数据时使用的策略DataStream<Integer> dataStreamAfter = dataStream.broadcast();//3.分区策略后的操作dataStreamAfter.map(new RichMapFunction<Integer, Object>() {@Overridepublic Object map(Integer value) throws Exception {System.out.println(String.format("元素值: %s, 分区策略后,子任务编号: %s", value,getRuntimeContext().getIndexOfThisSubtask()));return value;}}).print();env.execute("PartitionerTest Job");}
}

直接IDEA控制台输出:
在这里插入图片描述

从输出结果可以看出,数据共分为3个分区(编号为0、1、2)。执行分区策略前,每个元素所属的分区:
在这里插入图片描述
执行分区策略后,每个元素所属的分区如下:
在这里插入图片描述
对比表发现,广播分区策略将上游每个元素分别发送到了下游算子的所有分区,这种策略会把数据复制多份,向下游算子的每个分区发送一份。
在这里插入图片描述
我们把上面的任务提交到Flink,同样也可以看出前面分区前每个子任务两条数据,分区后每个子任务六条数据。
在这里插入图片描述
在这里插入图片描述
3.3 ForwardPartitioner

转发分区策略只将元素转发给本地运行的下游算子的实例,即将元素发送到与当前算子实例在同一个TaskManager的下游算子实例,而不需要进行网络传输。要求上下游算子并行度一样,这样上下游算子可以同属一个子任务。

这里把上面的代码调整下:

dataStream.forward()

IDEA控制台输出:
在这里插入图片描述
从输出结果可以看出,数据共分为3个分区(编号为0、1、2)。执行分区策略前,每个元素所属的分区:
在这里插入图片描述

执行分区策略后,每个元素所属的分区如下:
在这里插入图片描述

对比发现,转发分区策略将上游同一个分区的元素发送到了下游同一个分区中。使用数据流图表示如下图:

在这里插入图片描述
在上下游的算子没有指定分区策略的情况下,如果上下游的算子并行度一致,则默认使用ForwardPartitioner,否则使用RebalancePartitioner。在StreamGraph类的源码中可以看到该规则:
在这里插入图片描述
对于ForwardPartitioner,必须保证上下游算子并行度一致,否则会抛出异常。
在这里插入图片描述
3.4 GlobalPartitioner

全局分区策略将上游所有元素发送到下游子任务编号等于0的分区算子实例上(下游第一个实例)。

这里把上面的代码调整下:

dataStream.global()

IDEA控制台输出:
在这里插入图片描述
分区前:
在这里插入图片描述

分区后:
在这里插入图片描述

全局分区策略将上游所有分区中的所有元素发送到了下游编号为0的分区中:
在这里插入图片描述

3.5 .KeyGroupStreamPartitioner

Key分区策略根据元素Key的Hash值输出到下游算子指定的实例。keyBy()算子底层正是使用的该分区策略,底层最终会调用KeyGroupStreamPartitioner的selectChannel()方法,计算每个Key对应的通道索引(通道编号,可理解为分区编号),根据通道索引将Key发送到下游相应的分区中。selectChannel()方法源码如下:
在这里插入图片描述
在这里插入图片描述

总的来说,Flink底层计算通道索引(分区编号)的流程如下:

  • 计算Key的HashCode值。
  • 将Key的HashCode值进行特殊的Hash处理,即MathUtils.murmurHash(keyHash),返回一个非负哈希码。
  • 将非负哈希码除以最大并行度取余数,得到keyGroupId,即Key组索引。
  • 使用公式keyGroupId×parallelism/maxParallelism得到分区编号。parallelism为当前算子的并行度,即通道数量;maxParallelism为系统默认支持的最大并行度,即128。

3.6 RebalancePartitioner

平衡分区策略使用循环遍历下游分区的方式,将上游元素均匀分配给下游算子的每个实例。每个下游算子的实例都具有相等的负载。当数据流中的元素存在数据倾斜时,使用该策略对性能有很大的提升。

这里把上面的代码调整下:

dataStream.setParallelism(2);

dataStreamAfter.setParallelism(3);

dataStream.rebalance()

IDEA控制台输出:
在这里插入图片描述

分区前:
在这里插入图片描述
分区后:

在这里插入图片描述

平衡分区策略将上游所有元素均匀发送到了下游算子的所有分区:
在这里插入图片描述

3.7 RescalePartitioner

重新调节分区策略基于上下游算子的并行度,将元素以循环的方式输出到下游算子的每个实例。类似于平衡分区策略,但又与平衡分区策略不同。

上游算子将元素发送到下游哪一个算子实例,取决于上游和下游算子的并行度。例如,如果上游算子的并行度为2,而下游算子的并行度为4,那么一个上游算子实例将把元素均匀分配给两个下游算子实例,而另一个上游算子实例将把元素均匀分配给另外两个下游算子实例。相反,如果下游算子的并行度为2,而上游算子的并行度为4,那么两个上游算子实例将分配给一个下游算子实例,而另外两个上游算子实例将分配给另一个下游算子实例。

假设上游算子并行度为2,分区编号为A和B,下游算子并行度为4,分区编号为1、2、3、4,那么A将把数据循环发送给1和2,B则把数据循环发送给3和4。假设上游算子并行度为4,编号为A、B、C、D,下游算子并行度为2,编号为1、2,那么A和B把数据发送给1,C和D则把数据发送给2。

这里把上面的代码调整下:

dataStream.rescale()

同时将第一个map算子的并行度设置为2,第二个map算子的并行度设置为4。

IDEA控制台输出:
在这里插入图片描述

分区前:
在这里插入图片描述

分区后:
在这里插入图片描述
在这里插入图片描述

接下来改变map算子的并行度,将第一个map算子的并行度设置为4,第二个map算子的并行度设置为2。

在这里插入图片描述

如果想将元素均匀地输出到下游算子的每个实例,以实现负载均衡,同时又不希望使用平衡分区策略的全局负载均衡,则可以使用重新调节分区策略。该策略会尽可能避免数据在网络间传输,而能否避免还取决于TaskManager的Task Slot数量、上下游算子的并行度等。

3.8 ShufflePartitioner

随机分区策略将上游算子元素输出到下游算子的随机实例中。元素会被均匀分配到下游算子的每个实例。这种策略可以实现计算任务的负载均衡。

这里把上面的代码调整下:

dataStream.shuffle()

这里就不做过多演示了。我们下面来看下自定义分区策略。

四、自定义分区策略

自定义分区策略的API为CustomPartitionerWrapper。该策略允许开发者自定义规则将上游算子元素发送到下游指定的算子实例中。

4.1 新建自定义分区器

新建分区器类MyCustomPartitioner并实现接口Partitioner(Object表示分区Key的数据类型),实现其中未实现的方法partition(),在该方法中添加相应的分区逻辑。

/*** 自定义分区策略* 微信公众号:老周聊架构*/
public class MyCustomPartitioner implements Partitioner {@Overridepublic int partition(Object key, int numPartitions) {if (key.equals("chinese")) {return 0;} else if (key.equals("math")) {return 1;} else {return 2;}}
}

上述代码通过partition()方法取得分区编号,将Key值等于chinese的元素分配到编号为0的分区,将Key值等于math的元素分配到编号为1的分区,其余元素分配到编号为2的分区。

4.2 使用自定义分区器

调用DataStream的partitionCustom()方法传入自定义分区器类MyCustomPartitioner的实例,可以对DataStream按照自定义规则进行重新分区,代码如下:

/*** 自定义分区策略* 微信公众号:老周聊架构*/
public class CustomPartitionerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);DataStream<String> dataStream = env.fromElements("chinese,98", "math,88", "english,96");//1.分区策略前的操作//输出dataStream每个元素及所属的子任务编号SingleOutputStreamOperator<Map<String, Integer>> dataStreamBefore =dataStream.map(new RichMapFunction<String, Map<String, Integer>>() {@Overridepublic Map<String, Integer> map(String value) throws Exception {System.out.println(String.format("元素值: %s, 分区策略前,子任务编号: %s", value,getRuntimeContext().getIndexOfThisSubtask()));Map<String, Integer> map = new HashMap<>();map.put(value.split(",")[0], Integer.parseInt(value.split(",")[1]));return map;}}).setParallelism(2);//2.设置分区策略//设置DataStream向下游发送数据时使用的策略DataStream<Map<String, Integer>> dataStreamAfter = dataStreamBefore.partitionCustom(new MyCustomPartitioner(), value -> value);//3.分区策略后的操作dataStreamAfter.map(new RichMapFunction<Map<String, Integer>, Map<String, Integer>>() {@Overridepublic Map<String, Integer> map(Map<String, Integer> value) throws Exception {System.out.println(String.format("元素值: %s, 分区策略后,子任务编号: %s", value,getRuntimeContext().getIndexOfThisSubtask()));return value;}}).setParallelism(3).print();env.execute("CustomPartitionerTest Job");}
}

分区前:
在这里插入图片描述

分区后:

在这里插入图片描述
自定义分区策略将上游所有元素按照自定义的规则发送到了下游的3个分区中。

把任务给到Flink上去跑,发现:
在这里插入图片描述

这是因为泛型擦除,下面的DataStream泛型需要指定类型,不能
在这里插入图片描述

小知识:

在编译之后程序会采取去泛型化的措施。也就是说Java中的泛型,只在编译阶段有效。在编译过程中,正确检验泛型结果后,在运行时会将泛型的相关信息擦除,编译器只会在对象进入JVM和离开JVM的边界处添加类型检查和转换的方法,泛型的信息不会进入到运行时阶段,这就是所谓的Java类型擦除

类型加好以后,再跑一下任务,会出现任务成功。

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/60734.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IO流实用案例:用字节流--输入流(Inpustream)、输出流(OutputStream)写一个拷贝图片的案例--超简单!

案例背景&#xff1a; 我的电脑桌面有一张白敬亭的照片&#xff0c;我们需要把这张照片拷贝到我的电脑D:\学习软件\copyBJT目录下&#xff0c;当前我们这个目录是没有东西的。 代码演示以及注释&#xff1a; ublic class StreamCopy {public static void main(String[] args)…

ArkTS学习笔记:ArkTS起步

ArkTS是HarmonyOS的主力应用开发语言&#xff0c;基于TypeScript扩展&#xff0c;强化了静态检查和分析&#xff0c;旨在提升程序稳定性和性能。它采用静态类型&#xff0c;禁止运行时改变对象布局&#xff0c;并对UI开发框架能力进行扩展&#xff0c;支持声明式UI描述和自定义…

卡尔曼滤波:从理论到应用的简介

卡尔曼滤波&#xff08;Kalman Filter&#xff09;是一种递归算法&#xff0c;用于对一系列噪声观测数据进行动态系统状态估计。它广泛应用于导航、控制系统、信号处理、金融预测等多个领域。本文将介绍卡尔曼滤波的基本原理、核心公式和应用案例。 1. 什么是卡尔曼滤波&#x…

Mac解压包安装MongoDB8并设置launchd自启动

记录一下在mac上安装mongodb8过程&#xff0c;本机是M3芯片所以下载m芯片的安装包&#xff0c;intel芯片的类似操作。 首先下载安装程序包。 # M芯片下载地址 https://fastdl.mongodb.org/osx/mongodb-macos-arm64-8.0.3.tgz # intel芯片下载地址 https://fastdl.mongodb.org…

【已解决】git push一直提示输入用户名及密码、fatal: Could not read from remote repository的问题

问题描述&#xff1a; 在实操中&#xff0c;git push代码到github上一直提示输入用户名及密码&#xff0c;并且跳出的输入框输入用户名和密码后&#xff0c;报错找不到远程仓库 实际解决中&#xff0c;发现我环境有两个问题解决&#xff1a; git push一直提示输入用户名及密码…

【Rust 编程语言工具】rustup-init.exe 安装与使用指南

rustup-init.exe 是用于安装和管理 Rust 编程语言工具链的 Windows 可执行文件。Rust 是一种系统级编程语言&#xff0c;旨在提供安全、并发和高性能的功能。rustup-init.exe 是官方提供的安装器&#xff0c;用于将 Rust 安装到 Windows 操作系统中&#xff0c;并配置相关环境。…

mysql 示例验证demo

确保引入了正确的头文件&#xff1a; 在 MySQL C Connector 中&#xff0c;ResultSet 和 Statement 类型的声明需要包含相关的头文件。你需要包括 resultset.h 和 statement.h。 更新代码&#xff1a; 你需要确保程序正确地包含了这些头文件&#xff0c;并且按照正确的顺序使…

【5.线性表-链式表示-王道课后算法题】

王道数据结构-第二章-链式表示算法题 1.在带头结点的单链表L中&#xff0c;删除所有值为x的结点&#xff0c;并释放其空间&#xff0c;假设值为x的结点不唯一&#xff0c;试编写算法以实现上述操作。2. 试编写在带头结点的单链表L中删除一个最小值结点的高效算法(假设该结点唯一…

manjaro蓝牙鼠标无法连接问题解决

问题描述&#xff1a; 我这边出现的问题是使用图形Bluetooth Manager连接&#xff08;华为无线鼠标&#xff09;蓝牙鼠标&#xff0c;会出现连接成功&#xff0c;然后鼠标触发任何动作都会导致连接断开&#xff0c;过一会儿之后会一直出现连接、断开、连接、断开死循环 我的解决…

SSL 证书申请以及配置流程

SSL 证书申请以及配置流程 手动申请免费 SSL 证书的简明指南 如果你希望手动为你的网站申请免费的 SSL 证书&#xff0c;Let’s Encrypt 提供了一个很棒的免费服务。而 Certbot 则是官方推荐的工具&#xff0c;可以帮助你完成证书的申请和配置。以下是如何一步步完成的详细说…

Mac 使用mac 原生工具将mp4视频文件提取其中的 mp3 音频文件

简介 Hello! 非常感谢您阅读海轰的文章,倘若文中有错误的地方,欢迎您指出~ ଘ(੭ˊᵕˋ)੭ 昵称:海轰 标签:程序猿|C++选手|学生 简介:因C语言结识编程,随后转入计算机专业,获得过国家奖学金,有幸在竞赛中拿过一些国奖、省奖…已保研 学习经验:扎实基础 + 多做笔…

项目中用户数据获取遇到bug

项目跟练的时候 Uncaught (in promise) TypeError: Cannot read properties of undefined (reading ‘code’) at Proxy.userInfo (user.ts:57:17) 因此我想要用result接受信息的时候会出错&#xff0c;报错显示为result.code没有该值 导致我无法获取到相应的数据 解决如下 给…

【视觉SLAM】1-概述

读书笔记 文章目录 1. 经典视觉SLAM框架2. 数学表述2.1 运动方程2.2 观测方程2.3 问题抽象 1. 经典视觉SLAM框架 传感器信息读取&#xff1a;相机图像、IMU等多源数据&#xff1b;前端视觉里程计&#xff08;Visual Odometry&#xff0c;VO&#xff09;&#xff1a;估计相机的相…

vue3 中,字段必须在 onShow 前定义?

在Vue 3中&#xff0c;如果你在组件的 setup() 函数中使用了生命周期钩子&#xff0c;比如 onShow&#xff0c;你可能遇到了一个错误&#xff0c;提示你在 onShow 前定义了某个字段。这个错误通常意味着你尝试在组件的 setup() 函数中访问了一个在 onShow 钩子函数被调用之前尚…

Isaac Sim+SKRL机器人并行强化学习

目录 Isaac Sim介绍 OmniIssacGymEnvs安装 SKRL安装与测试 基于UR5的机械臂Reach强化学习测评 机器人控制 OMNI GYM环境编写 SKRL运行文件 训练结果与速度对比 结果分析 运行体验与建议 Isaac Sim介绍 Isaac Sim是英伟达出的一款机器人仿真平台&#xff0c;适用于做机…

图像处理技术椒盐噪声

椒盐噪声&#xff0c;也称为脉冲噪声&#xff0c;是图像中经常见到的一种噪声。它是一种随机出现的白点或者黑点&#xff0c;可能是亮的区域有黑色像素或是在暗的区域有白色像素&#xff08;或是两者皆有&#xff09;。这些白点和黑点会在图像中随机分布&#xff0c;导致图像中…

前端数据可视化库介绍Echarts、D3.js、Plotly、Matplotlib

目录 一、Echarts 1. 简介 2. 优点 3. 缺点 4. 代码示例 二、D3.js 1. 简介 2. 优点 3.缺点 4. 代码示例 三、Plotly 1.简介 2.优点 3.缺点 四、Matplotlib 1.简介 2.优点 3.缺点 一、Echarts 1. 简介 Echarts 是一个由百度开源的数据可视化库&#xff0c;它…

Python学习------第八天

函数 函数的传入参数 掌握函数返回值的作用 掌握函数返回值的定义语法 函数的嵌套调用&#xff1a; 函数的局部变量和全局变量 局部变量的作用&#xff1a;在函数体内部&#xff0c;临时保存数据&#xff0c;即当函数调用完成后&#xff0c;则销毁局部变量。 money 5000000 n…

机器学习基础04

目录 1.朴素贝叶斯-分类 1.1贝叶斯分类理论 1.2条件概率 1.3全概率公式 1.4贝叶斯推断 1.5朴素贝叶斯推断 1.6拉普拉斯平滑系数 1.7API 2.决策树-分类 2.1决策树 2.2基于信息增益的决策树建立 2.2.1信息熵 2.2.2信息增益 2.2.3信息增益决策树建立步骤 2.3基于基…

The Internals of PostgreSQL 翻译版 持续更新...

为了方便自己快速学习&#xff0c;整理了翻译版本&#xff0c;目前翻译的还不完善&#xff0c;后续会边学习边完善。 文档用于自己快速参考&#xff0c;会持续修正&#xff0c;能力有限,无法确保正确!!! 《The Internals of PostgreSQL 》 不是 《 PostgreSQL14 Internals 》…