【Flink】Flink 的八种分区策略(源码解读)

Flink 的八种分区策略(源码解读)

  • 1.继承关系图
    • 1.1 接口:ChannelSelector
    • 1.2 抽象类:StreamPartitioner
    • 1.3 继承关系图
  • 2.分区策略
    • 2.1 GlobalPartitioner
    • 2.2 ShufflePartitioner
    • 2.3 BroadcastPartitioner
    • 2.4 RebalancePartitioner
    • 2.5 RescalePartitioner
    • 2.6 ForwardPartitioner
    • 2.7 KeyGroupStreamPartitioner
    • 2.8 CustomPartitionerWrapper

Flink 包含 8 种分区策略,这 8 种分区策略(分区器)分别如下面所示,本文将从源码的角度解读每个分区器的实现方式。

在这里插入图片描述

  • GlobalPartitioner
  • ShufflePartitioner
  • RebalancePartitioner
  • RescalePartitioner
  • BroadcastPartitioner
  • ForwardPartitioner
  • KeyGroupStreamPartitioner
  • CustomPartitionerWrapper

1.继承关系图

1.1 接口:ChannelSelector

public interface ChannelSelector<T extends IOReadableWritable> {/*** 初始化channels数量,channel可以理解为下游Operator的某个实例(并行算子的某个subtask).*/void setup(int numberOfChannels);/***根据当前的record以及Channel总数,*决定应将record发送到下游哪个Channel。*不同的分区策略会实现不同的该方法。*/int selectChannel(T record);/***是否以广播的形式发送到下游所有的算子实例*/boolean isBroadcast();
}

1.2 抽象类:StreamPartitioner

public abstract class StreamPartitioner<T> implementsChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {private static final long serialVersionUID = 1L;protected int numberOfChannels;@Overridepublic void setup(int numberOfChannels) {this.numberOfChannels = numberOfChannels;}@Overridepublic boolean isBroadcast() {return false;}public abstract StreamPartitioner<T> copy();
}

1.3 继承关系图

在这里插入图片描述

2.分区策略

2.1 GlobalPartitioner

该分区器会将所有的数据都发送到下游的某个算子实例(subtask id = 0)。

在这里插入图片描述

/*** 发送所有的数据到下游算子的第一个task(ID = 0)* @param <T>*/
@Internal
public class GlobalPartitioner<T> extends StreamPartitioner<T> {private static final long serialVersionUID = 1L;@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {//只返回0,即只发送给下游算子的第一个taskreturn 0;}@Overridepublic StreamPartitioner<T> copy() {return this;}@Overridepublic String toString() {return "GLOBAL";}
}

2.2 ShufflePartitioner

随机选择一个下游算子实例进行发送。

在这里插入图片描述

/*** 随机的选择一个channel进行发送* @param <T>*/
@Internal
public class ShufflePartitioner<T> extends StreamPartitioner<T> {private static final long serialVersionUID = 1L;private Random random = new Random();@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {//产生[0,numberOfChannels)伪随机数,随机发送到下游的某个taskreturn random.nextInt(numberOfChannels);}@Overridepublic StreamPartitioner<T> copy() {return new ShufflePartitioner<T>();}@Overridepublic String toString() {return "SHUFFLE";}
}

2.3 BroadcastPartitioner

发送到下游所有的算子实例。

在这里插入图片描述

/*** 发送到所有的channel*/
@Internal
public class BroadcastPartitioner<T> extends StreamPartitioner<T> {private static final long serialVersionUID = 1L;/*** Broadcast模式是直接发送到下游的所有task,所以不需要通过下面的方法选择发送的通道*/@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");}@Overridepublic boolean isBroadcast() {return true;}@Overridepublic StreamPartitioner<T> copy() {return this;}@Overridepublic String toString() {return "BROADCAST";}
}

2.4 RebalancePartitioner

通过循环的方式依次发送到下游的 task

在这里插入图片描述

/***通过循环的方式依次发送到下游的task* @param <T>*/
@Internal
public class RebalancePartitioner<T> extends StreamPartitioner<T> {private static final long serialVersionUID = 1L;private int nextChannelToSendTo;@Overridepublic void setup(int numberOfChannels) {super.setup(numberOfChannels);//初始化channel的id,返回[0,numberOfChannels)的伪随机数nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);}@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {//循环依次发送到下游的task,比如:nextChannelToSendTo初始值为0,numberOfChannels(下游算子的实例个数,并行度)值为2//则第一次发送到ID = 1的task,第二次发送到ID = 0的task,第三次发送到ID = 1的task上...依次类推nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;return nextChannelToSendTo;}public StreamPartitioner<T> copy() {return this;}@Overridepublic String toString() {return "REBALANCE";}
}

2.5 RescalePartitioner

基于上下游 Operator 的并行度,将记录以循环的方式输出到下游 Operator 的每个实例。

举例:

  • 上游并行度是 2,下游是 4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。
  • 若上游并行度是 4,下游并行度是 2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。

在这里插入图片描述

@Internal
public class RescalePartitioner<T> extends StreamPartitioner<T> {private static final long serialVersionUID = 1L;private int nextChannelToSendTo = -1;@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {if (++nextChannelToSendTo >= numberOfChannels) {nextChannelToSendTo = 0;}return nextChannelToSendTo;}public StreamPartitioner<T> copy() {return this;}@Overridepublic String toString() {return "RESCALE";}
}

Flink 中的执行图可以分成四层:StreamGraphJobGraphExecutionGraph物理执行图

  • StreamGraph:是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。
  • JobGraph:StreamGraph 经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化 / 反序列化 / 传输消耗。
  • ExecutionGraph:JobManager 根据 JobGraph 生成 ExecutionGraph。ExecutionGraph 是 JobGraph 的并行化版本,是调度层最核心的数据结构。
  • 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个 TaskManager 上部署 Task 后形成的 “”,并不是一个具体的数据结构。

StreamingJobGraphGenerator 就是 StreamGraph 转换为 JobGraph。在这个类中,把 ForwardPartitionerRescalePartitioner 列为 POINTWISE 分配模式,其他的为 ALL_TO_ALL 分配模式。代码如下:

if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) {jobEdge = downStreamVertex.connectNewDataSetAsInput(headVertex,// 上游算子(生产端)的实例(subtask)连接下游算子(消费端)的一个或者多个实例(subtask)DistributionPattern.POINTWISE,resultPartitionType);} else {jobEdge = downStreamVertex.connectNewDataSetAsInput(headVertex,// 上游算子(生产端)的实例(subtask)连接下游算子(消费端)的所有实例(subtask)DistributionPattern.ALL_TO_ALL,resultPartitionType);}

2.6 ForwardPartitioner

发送到下游对应的第一个 task,保证上下游算子并行度一致,即上游算子与下游算子是 1 : 1 1:1 1:1 的关系。

在这里插入图片描述

/*** 发送到下游对应的第一个task* @param <T>*/
@Internal
public class ForwardPartitioner<T> extends StreamPartitioner<T> {private static final long serialVersionUID = 1L;@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {return 0;}public StreamPartitioner<T> copy() {return this;}@Overridepublic String toString() {return "FORWARD";}
}

在上下游的算子没有指定分区器的情况下,如果上下游的算子并行度一致,则使用 ForwardPartitioner,否则使用 RebalancePartitioner,对于 ForwardPartitioner,必须保证上下游算子并行度一致,否则会抛出异常。

//在上下游的算子没有指定分区器的情况下,如果上下游的算子并行度一致,则使用ForwardPartitioner,否则使用RebalancePartitioner
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {partitioner = new RebalancePartitioner<Object>();
}if (partitioner instanceof ForwardPartitioner) {//如果上下游的并行度不一致,会抛出异常if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {throw new UnsupportedOperationException("Forward partitioning does not allow " +"change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +" You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");}
}

2.7 KeyGroupStreamPartitioner

根据 key 的分组索引选择发送到相对应的下游 subtask

在这里插入图片描述

  • org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner
/*** 根据key的分组索引选择发送到相对应的下游subtask* @param <T>* @param <K>*/
@Internal
public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implements ConfigurableStreamPartitioner {
...@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {K key;try {key = keySelector.getKey(record.getInstance().getValue());} catch (Exception e) {throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);}//调用KeyGroupRangeAssignment类的assignKeyToParallelOperator方法,代码如下所示return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);}
...
}
  • org.apache.flink.runtime.state.KeyGroupRangeAssignment
public final class KeyGroupRangeAssignment {
.../*** 根据key分配一个并行算子实例的索引,该索引即为该key要发送的下游算子实例的路由信息,* 即该key发送到哪一个task*/public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {Preconditions.checkNotNull(key, "Assigned key must not be null!");return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));}/***根据key分配一个分组id(keyGroupId)*/public static int assignToKeyGroup(Object key, int maxParallelism) {Preconditions.checkNotNull(key, "Assigned key must not be null!");//获取key的hashcodereturn computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);}/*** 根据key分配一个分组id(keyGroupId),*/public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {//与maxParallelism取余,获取keyGroupIdreturn MathUtils.murmurHash(keyHash) % maxParallelism;}//计算分区index,即该key group应该发送到下游的哪一个算子实例public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {return keyGroupId * parallelism / maxParallelism;}
...

2.8 CustomPartitionerWrapper

通过 Partitioner 实例的 Partition 方法(自定义的)将记录输出到下游。

public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {private static final long serialVersionUID = 1L;Partitioner<K> partitioner;KeySelector<T, K> keySelector;public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {this.partitioner = partitioner;this.keySelector = keySelector;}@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {K key;try {key = keySelector.getKey(record.getInstance().getValue());} catch (Exception e) {throw new RuntimeException("Could not extract key from " + record.getInstance(), e);}//实现Partitioner接口,重写partition方法return partitioner.partition(key, numberOfChannels);}@Overridepublic StreamPartitioner<T> copy() {return this;}@Overridepublic String toString() {return "CUSTOM";}
}

比如:

public class CustomPartitioner implements Partitioner<String> {// key: 根据key的值来分区// numPartitions: 下游算子并行度@Overridepublic int partition(String key, int numPartitions) {return key.length() % numPartitions;//在此处定义分区策略}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/734028.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

树莓派4B Ubuntu20.04 Python3.9安装ROS踩坑记录

问题描述 在使用sudo apt-get update命令更新时发现无法引入apt-pkg,使用python3 -c "import apt_pkg"发现无法引入&#xff0c;应该是因为&#xff1a;20.04的系统默认python是3.8&#xff0c;但是我换成了3.9所以没有编译文件&#xff0c;于是使用sudo update-alte…

three.js如何实现简易3D机房?(四)点击事件+呼吸灯效果

接上一篇&#xff1a; three.js如何实现简易3D机房&#xff1f;&#xff08;三&#xff09;显示信息弹框/标签&#xff1a;http://t.csdnimg.cn/5W2wA 目录 八、点击事件 1.实现效果 2.获取相交点 3.呼吸灯效果 4.添加点击事件 5.问题解决 八、点击事件 1.实现效果 2.…

蓝桥杯大赛软件python赛道真题:蛇形填数

真题链接&#xff1a;https://www.lanqiao.cn/problems/594/learning/ 题目描述&#xff1a; 本题为填空题&#xff0c;只需要算出结果后&#xff0c;在代码中使用输出语句将所填结果输出即可。 如下图所示&#xff0c;小明用从1开始的正整数“蛇形”填充无限大的矩阵。 1 2 6 …

【打工日常】使用docker部署个人实时在线文档协助编辑器

一、Etherpad介绍 Etherpad是一个高度可定制的开源在线编辑器&#xff0c;提供真正实时的协作编辑。放在自己的服务器里面&#xff0c;可以更大程度的保护自己工作的隐私&#xff0c;并且Etherpad允许您实时协作编辑文档&#xff0c;就像在浏览器中运行的实时多人编辑器一样这样…

SpringBoot轻松搞定接口防抖(防重复提交)

大家好&#xff0c;我是月夜枫&#xff0c;作为一名合格的码农&#xff0c;在开发后端Java业务系统&#xff0c;包括各种管理后台和小程序等。在这些项目中&#xff0c;我设计过单/多租户体系系统&#xff0c;对接过许多开放平台&#xff0c;也搞过消息中心这类较为复杂的应用&…

基于Java的高校实验室管理系统(Vue.js+SpringBoot)

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、研究内容2.1 实验室类型模块2.2 实验室模块2.3 实验管理模块2.4 实验设备模块2.5 实验订单模块 三、系统设计3.1 用例设计3.2 数据库设计 四、系统展示五、样例代码5.1 查询实验室设备5.2 实验放号5.3 实验预定 六、免责说明 一、摘…

【STM32】HAL库 CubeMX 教程 --- 通用定时器 TIM2 定时

实验目标&#xff1a; 通过CUbeMXHAL&#xff0c;配置TIM2&#xff0c;1s中断一次&#xff0c;闪烁LED。 一、常用型号的TIM时钟频率 1. STM32F103系列&#xff1a; 所有 TIM 的时钟频率都是72MHz&#xff1b;F103C8不带基本定时器&#xff0c;F103RC及以上才带基本定时器。…

[BUG] docker运行Java程序时配置代理-Dhttp.proxyHost后启动报错

[BUG] docker运行Java程序时配置代理-Dhttp.proxyHost后启动报错 bug现象描述 版本&#xff1a;2.0.4&#xff08;客户端和服务端都是&#xff09; 环境&#xff1a;私有云环境&#xff0c;只有少量跳板机器可以访问公网&#xff0c;其他机器均通过配置代理方式访问公网 bug现…

9、Linux-安装JDK、Tomcat和MySql

目录 一、安装JDK 1、传输JDK文件&#xff08;.tar.gz&#xff09; 2、解压 3、备份环境变量 4、配置环境变量 5、重新加载环境变量 6、验证&#xff08;java -version&#xff09; 二、安装Tomcat 1、传输文件&#xff0c;解压到/usr/local 2、进入Tomcat的bin目录 …

[PyQt5]PyQt5连接MYSQL时显示Driver not loaded解决方案

在第一次用PyQt5的 QSqlDatabase.addDatabase 连接mysql的时候&#xff0c;可能会出现Driver not loaded的情况&#xff0c;如下&#xff1a; from PyQt5.QtSql import QSqlQuery, QSqlDatabase from PyQt5.QtWidgets import QApplication import sysapp QApplication(sys.ar…

GO语言接入支付宝

GO语言接入支付宝 今天就go语言接入支付宝写一个教程 使用如下库&#xff0c;各种接口较为齐全 "github.com/smartwalle/alipay/v3"先简单介绍下加密&#xff1a; 试想&#xff0c;当用户向支付宝付款时&#xff0c;若不进行任何加密&#xff0c;那么黑客就可以任…

【杂记】IDEA和Eclipse如何查看GC日志

1.Eclipse查看GC日志 1.1 右击代码编辑区 -> Run As -> Run Configurations 1.2 点击Arguments栏 -> VM arguments:区域填写XX参数 -> Run 1.3 控制台输出GC详细日志 2.IDEA查看GC日志 2.1 鼠标右击代码编辑器空白区域&#xff0c;选择Edit 项目名.main()... 2.…

模型驱动软件开发

MDE 模型驱动工程&#xff08;MDE, Model-Driven Engineering&#xff09;是软件工程的一个分支&#xff0c;它将模型与建模拓展到软件开发的所有方面&#xff0c;形成一个多维建模空间&#xff0c;从而将工程活动建立在这些模型的映射和转换之上。[1] MDE的基本原则是将模型视…

实验二(一):IPV4编址及IPV4路由基础实验

一实验介绍 1.关于本实验 IPv4( Internet Protocol Version 4)是 TCP/IP 协议族中最为核心的协议之一。 它工作在 TCP/IP参考模型的网际互联层&#xff0c;该层与 OSI参考模型的网络层相对应。 网络层提供了无连接数据传输服务&#xff0c;即网络在发送分组时不需要先建立连…

学会与自己和解

最近半年来&#xff0c;在学习智能驾驶方面的技术&#xff0c;但有些文档和资料不方便分享&#xff0c;有一段时间没有写 写文档啦&#xff01;那就写一些技术之外的东西吧&#xff0c;最近也一直在学心理建设&#xff0c;学会与自己和解 行动 唯有自己先行动起来&#xff0c;…

使用Migration升级数据库

使用Migration升级数据库 package com.tiger.room2;import android.content.Context;import androidx.annotation.NonNull; import androidx.room.Database; import androidx.room.Room; import androidx.room.RoomDatabase; import androidx.room.migration.Migration; impo…

Jmeter---非GUI命令行的执行生成报告、使用ant插件执行接口测试脚本生成报告

非GUI命令行的执行 1. 在jmx后缀的文件目录下打开命令行 2. 运行&#xff1a; jmeter -n -t filename.jmx&#xff08;-n : 非GUI的方式 -t: 指定需要执行的脚本&#xff09; 生成jtl报告 运行&#xff1a; jmeter -n -t filename.jmx -l result_filename.jtl 生成html报…

【Java探索之旅】数据类型与变量,字面常量,整型变量

&#x1f3a5; 屿小夏 &#xff1a; 个人主页 &#x1f525;个人专栏 &#xff1a; Java入门到精通 &#x1f304; 莫道桑榆晚&#xff0c;为霞尚满天&#xff01; 文章目录 &#x1f4d1;前言一、字面常量二、数据类型三、变量3.1 变量概念3.2 语法格式 四、整型变量4.1 整型变…

覆盖element-ui的el-menu样式记录:背景图片、菜单图标、菜单高亮与鼠标悬浮高亮、调整子菜单等样式

页面中修改el-menu 设置background-color"transparent"&#xff0c;menu菜单下的背景图片则能正常显示了 <el-menuclass"el-menu-demo"mode"horizontal"background-color"transparent"><el-menu-item index"1">…

【Python+Selenium学习系列5】Selenium特殊元素定位之-鼠标悬停操作

前言 Selenium模拟用户在浏览器中的操作&#xff0c;比如点击按钮。在某些场景下&#xff0c;我们需要模拟鼠标悬停的操作&#xff0c;来触发一些隐藏的元素。本文将介绍Python Selenium实现鼠标悬停操作。 鼠标悬停&#xff0c;即当光标与其名称表示的元素重叠时触发的事件&…