深入理解 Flink(六)Flink Job 提交和 Flink Graph 详解

Flink Program 编程套路回顾

1、获取执行环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2、通过执行环境对象,注册数据源 Source,得到数据抽象
DataStream ds = env.socketTextStream(...)
3、调用数据抽象的各种Transformation执行逻辑计算
DataStream resultDS = ds.flatMap(...).keyBy(...).sum(...);
4、将各种Transformation执行完毕之后得到的计算结果数据抽象注册 Sink
resultDS.addSink(...)
5、提交Job执行
env.execute(...)

Flink Job 提交脚本解析

# Submission to an already running Flink on YARN cluster
./bin/flink run --target yarn-session
# Submission spinning up a Flink on YARN cluster in Per-Job Mode
./bin/flink run --target yarn-per-job
# Submission spinning up Flink on YARN cluster in Application Mode
./bin/flink run-application --target yarn-application

具体可以参考官网:
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#advanced-cli

CliFrontend 提交分析

当用户把 Flink 应用程序打成 jar 使用 flink run … 的 shell 命令提交的时候,底层是通过 CliFrontend 来处理。底层的逻辑,就是通过反射来调用用户程序的 main() 方法执行。
需要注意的是,Application 模式下,会通过 YarnClusterDescriptor.deployInternal 方法在 yarn 中部署一个 application 集群,返回 YarnRestClusterClient 对象。yarn 中会启动一个 EmbeddedJobClient,执行 submitJob 方法提交 jobGraph。

ExecutionEnvironment 源码解析

StreamExecutionEnvironment 是 Flink 应用程序的执行入口,提供了一些重要的操作机制:

1、提供了 readTextFile(), socketTextStream(), createInput(), addSource() 等方法去对接数据源。
2、提供了 setParallelism() 设置应用程序的并行度。
3、StreamExecutionEnvironment 管理了 ExecutionConfig 对象,该对象负责 Job 执行的一些行为配置管理。还管理了 Configuration 管理一些其他的配置。这个所谓的其他配置,还包含了 Checkpoint 的配置,这个 chekcpoint 的配置参数,会单独解析出来,存储在 CheckpontConfig 中
4、StreamExecutionEnvironment 管理了一个 List<Transformation<?>> transformations 成员变量,该成员变量,主要用于保存 Job 的各种算子转化得到的 Transformation,把这些 Transformation 按照逻辑拼接起来,就能得到 StreamGragh, 注意转换顺序:
UserFunction ==> StreamOperator ==> Transformation ==> StreamNode
5、StreamExecutionEnvironment 提供了 execute() 方法主要用于提交 Job 执行。该方法接收的参数就是:StreamGraph

Flink on YARN Per-job 模式提交流程分析

入口类:ApplicatoinMaster: YarnJobClusterEntryPoint
在这里插入图片描述
在这里插入图片描述

Job提交流程源码分析

getStreamGraph(jobName) 生成 StreamGraph 解析

// 入口
StreamGraph streamGraph = getStreamGraph(jobName, true){// 通过 StreamGraphGenerator 来生成 StreamGraphStreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate(){streamGraph = new StreamGraph(....)for(Transformation<?> transformation : transformations) {transform(transformation);}}
}transform(transformation){// 先递归处理该 Transformation 的输入Collection<Integer> inputIds = transform(transform.getInput());// 将 Transformation 变成 Operator 设置到 StreamGraph 中,其实就是添加 StreamNodestreamGraph.addOperator(....);// 设置该 StreamNode 的并行度streamGraph.setParallelism(transform.getId(), parallelism);// 设置该 StreamNode 的入边 SreamEdgefor(Integer inputId : inputIds) {streamGraph.addEdge(inputId, transform.getId(), 0);// 内部实现// 构建 StreamNode 之间的 边(StreamEdge) 对象StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, ...){// TODO_MA 注释: 给 上游 StreamNode 设置 出边getStreamNode(edge.getSourceId()).addOutEdge(edge);}// TODO_MA 注释: 给 下游 StreamNode 设置 入边getStreamNode(edge.getTargetId()).addInEdge(edge);}
}

execute(StreamGraph) 解析

// 入口
JobClient jobClient = executeAsync(streamGraph){// 执行一个 SreamGraphexecutorFactory.getExecutor(configuration).execute(streamGraph, configuration){// 第一件事:由 StreamGraph 生成 JobGraghJobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);// 第二件事:通过 RestClusterClient 提交 JobGraph 到Flink集群clusterClient.submitJob(jobGraph)}
}// 通过 RestClusterClient 来提交 JobGraph
RestClusterClient.submitJob(JobGraph jobGraph){// 继续提交RestClusterClient.sendRetriableRequest(){// 通过 RestClient 提交RestClient.sendRequest(webMonitorHost, webMonitorPort, ...){// 继续提交RestClient.submitRequest(targetAddress,targetPort,httpRequest,responseType)}}
}

最终通过 channel 把请求数据,发给 WebMonitorEndpoint 中的 JobSubmitHandler 来执行处理。

小结

01、用户根据 Flink 应用程序的编写套路,写好应用程序,打成 jar 包,通过 flink run 的命令来执行提交
02、这个命令的底层,其实是执行: CliFrontend 组件来执行提交
03、这个 CliFrontend 的内部,会通过反射的技术,来转交执行到用户自定义应用程序的 main()
04、先获取 StreamExecutionEnvironment 执行环境对象实例
05、执行算子:其实就是从 算子 ---> function ---> StreamOperator ---> Transformation
06、执行 StreamExecutionEnvironment 的 executor 方法来执行提交
07、首先遍历 StreamExecutionEnvironment 的 transformations 这个 list 来生成 StreamGraph,之后会继续被构建成 JobGraph
08、具体的内部的提交是通过 RestClusterClient 来执行提交
09、在通过 RestClusterClient 提交之前,其实还会做一件事:把 SreamGraph 变成 JobGraph,也还会先把 JobGraph 持久化成为一个磁盘文件
10、在这个 RestClusterClient 的内部,其实是通过 RestClient 来提交
11、RestClient 其实在初始化的时候,就初始化了一个 Netty 客户端
12、通过封装一个 HttpRequest 对象,包含了需要提交的 JobGraph 文件和 Jar 包等,通过 Netty 客户端链接服务端,发送请求对象到服务端
13、Flink 主节点 JobManager 负责处理这个请求的是 WebMonitorEndpoint 中的 Netty 服务端,接收到 rest 请求会调用 Router 执行 route 处理,找到对应的 Handler 执行处理。提交 Job 对应的 Handler 是 JobSubmitHandler

在这里插入图片描述

WebMonitorEndpoint 处理 RestClient 的 JobSubmit 请求

最终处理这个请求: Flink 主节点 JobManager 负责处理这个请求的是 WebMonitorEndpoint 中的 Netty 服务端,接收到 rest 请求会调用 Router 执行 route 处理,找到对应的 Handler 执行处理。提交 Job 对应的 Handler 是 JobSubmitHandler。

// JobManager 服务端处理入口
JobSubmitHandler.handleRequest(){// 恢复得到 JobGraphJobGraph jobGraph = loadJobGraph(requestBody, nameToFile);// 通过 Dispatcher 提交 JobGraphDispatcher.submitJob(jobGraph, timeout);
}

JobMaster 启动源码剖析

关键方法: jobMasterServiceFactory.createJobMasterService
核心的工作是:

  • 创建 JobMaster 这个 RpcEndpoint 组件,负责通信。内部会创建一个 DefaultScheduler 调度组件,在初始化该调度组件的时候,会调用 ExecutionGraphFactory 的相关方法,来把 JobGraph 转换成 ExectionGraph
  • JobMaster 启动,跳转到 onStart() 方法。内部的主要工作,就是以下这三:
    • 启动心跳机制,维持和 ResourceManager,和 TaskExecutor 之间的心跳
    • 启动 SlotPoolImpl 这个 slot 管理组件。
    • 从 ZK 获取 ResourceManager 的地址,从而进行 JobMaster 向 ResourceManager 的注册
  • 启动的这个 JobMaster 负责这个 Job 中的所有的 Task 的 slot 的申请和 任务的派发,状态的跟踪,容错,还有 checkpoint等各种操作

JobMaster 和 ResourceManager/TaskExecutor 的心跳

在这里插入图片描述

JobMaster 向 ResourceManager 注册

// 启动 JobMaster
jobMaster.start(){JobMaster.onStart(){startJobExecution(){// 第一件大事:启动 JobMaster 必要的一些工作startJobMasterServices(){// 第一件事: 启动心跳机制this.taskManagerHeartbeatManager = createTaskManagerHeartbeatManager(heartbeatServices);this.resourceManagerHeartbeatManager = createResourceManagerHeartbeatManager(heartbeatServices);// 第二件事: 启动 SlotPoolImplslotPoolService.start(getFencingToken(), getAddress(), getMainThreadExecutor());// 第三件事: 从 ZK 获取 ResourceManager 的地址// 这儿就是 JobMaster 向 ResourceManager 执行注册的入口resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());}// 第二件大事:开始调度执行startScheduling();}}
}
ResourceManager.registerJobManager(){// ResourceManager 关于 JobMaster 的注册内部实现,重要的事情做了四件registerJobMasterInternal(jobMasterGateway, jobId, ....){// TODO_MA 马中华 注释: 生成 JobMaster 注册对象JobManagerRegistration jobManagerRegistration = new JobManagerRegistration(jobId, jobManagerResourceId, ....);// TODO_MA 马中华 注释: 完成注册jobManagerRegistrations.put(jobId, jobManagerRegistration);jmResourceIdRegistrations.put(jobManagerResourceId, jobManagerRegistration);// TODO_MA 马中华 注释: 加入心跳管理jobManagerHeartbeatManager.monitorTarget(jobManagerResourceId, new HeartbeatTarget<Void>() {});// TODO_MA 马中华 注释: 返回 JobMaster 注册成功return new JobMasterRegistrationSuccess(getFencingToken(), resourceId);}
}

Flink Graph 演变

在这里插入图片描述

StreamGraph 构建和提交源码解析

在这里插入图片描述
关于 StreamNode 的定义:

public class StreamNode {private final int id;private int parallelism;private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();private final Class<? extends AbstractInvokable> jobVertexClass;
}

关于 StreamEdge 的定义:

public class StreamEdge implements Serializable {private final String edgeId;private final int sourceId;private final int targetId;
}

JobGraph 构建和提交源码解析

JobGraph: StreamGraph 经过优化后生成了 JobGraph,提交给 Flink 集群的数据结构。它包含的主要抽象概念有:

1、JobVertex:经过优化后符合条件的多个 StreamNode 可能会 chain 在一起生成一个 JobVertex,即一个JobVertex 包含一个或多个 operator,JobVertex 的输入是 JobEdge,输出是 IntermediateDataSet。
2、IntermediateDataSet:表示 JobVertex 的输出,即经过 operator 处理产生的数据集。producer 是 JobVertex,consumer 是 JobEdge。
3、JobEdge:代表了 job graph 中的一条数据传输通道。source 是 IntermediateDataSet,target 是 JobVertex。即数据通过 JobEdge 由 IntermediateDataSet 传递给目标 JobVertex。

在 StreamGraph 构建 JobGragh 的过程中,最重要的事情就是 operator 的 chain 优化,那么到底什么样的情况的下 Operator 能chain 在一起呢 ?答案是要满足以下 9 个条件:

// 1、下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
downStreamVertex.getInEdges().size() == 1;
// 2、上下游节点都在同一个 slot group 中
upStreamVertex.isSameSlotSharingGroup(downStreamVertex);
// 3、前后算子不为空
!(downStreamOperator == null || upStreamOperator == null);
// 4、上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source 默认是 HEAD)
!upStreamOperator.getChainingStrategy() == ChainingStrategy.NEVER;
// 5、下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter 等默认是 ALWAYS)
!downStreamOperator.getChainingStrategy() != ChainingStrategy.ALWAYS;
// 6、两个节点间物理分区逻辑是 ForwardPartitioner
(edge.getPartitioner() instanceof ForwardPartitioner);
// 7、两个算子间的 shuffle 方式不等于批处理模式
edge.getShuffleMode() != ShuffleMode.BATCH;
// 8、上下游的并行度一致
upStreamVertex.getParallelism() == downStreamVertex.getParallelism();
// 9、用户没有禁用 chain
streamGraph.isChainingEnabled();

在这里插入图片描述
构建逻辑的重点代码:

1、在 connect 之间,调用的 createChain() 就是先执行优化,然后再生成 JobVertex
2、然后 调用 connect 之后,是为了组织关系1、先生成 IntermediateDataSet 和 JobEdge2、把 IntermediateDataSet 和 当前 JobVertex 设置为 JobEdge 的 source 和 target3、把 JobEdge 设置为这个 IntermediateDataSet 的消费者

关于 JobVertex 的定义:

public class JobVertex implements java.io.Serializable {private final JobVertexID id;private final ArrayList<IntermediateDataSet> results = new ArrayList<>();private final ArrayList<JobEdge> inputs = new ArrayList<>();private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;private String invokableClassName;
}

关于 IntermediateDataSet 的定义:

public class IntermediateDataSet implements java.io.Serializable {private final IntermediateDataSetID id;private final JobVertex producer;private final List<JobEdge> consumers = new ArrayList<JobEdge>();
}

关于 JobEdge 的定义:

public class JobEdge implements java.io.Serializable {private final JobVertex target;private IntermediateDataSet source;private IntermediateDataSetID sourceId;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/614795.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python统计分析——小提琴图(plt.violinplot)

参考资料&#xff1a;用python动手学统计学&#xff0c;帮助文档 使用matplotlib.pyplot.violinplot()函数绘制小提琴图 小提琴图是将数值型数据的核密度图与箱线图融合在一起&#xff0c;具体来说是用核密度估计的结果替换了箱子&#xff0c;而形成的一个形似小提琴的图形。 …

openssl3.2 - 在VS2019下源码调试openssl.exe

文章目录 openssl3.2 - 在VS2019下源码调试openssl.exe概述笔记先看一个用.bat调用openssl干活的实例VS2019调试参数设置设置 - 命令参数设置 - 工作目录设置 - 环境变量将命令行中需要的文件拷贝到exe目录单步调试备注END openssl3.2 - 在VS2019下源码调试openssl.exe 概述 …

Java面试题之JVM

Java面试题之JVM 1. JVM的组成部分及其作用&#xff1f;2. JVM的堆和栈的区别&#xff1f;3. 简述一下垃圾回收机制&#xff1f;(垃圾回收的原理&#xff1f;)4. 垃圾回收器都有什么&#xff1f;该怎么选择&#xff1f;5. 如何判断垃圾可以回收了&#xff1f;6. 垃圾回收算法有…

vue3的福音框架arco.design

前言&#xff1a; 在vue2于2023年底正式宣布不在维护&#xff0c;vue3使用越来越频繁的时刻&#xff0c;我们实现项目的辅助框架也越来越多。element, iview, antd 等经典框架继续风靡一时&#xff0c;不过也有很多好的框架&#xff0c;功能也强大&#xff0c;比如我们今天说的…

Python办公自动化 – 自动化文本翻译和Oracle数据库操作

Python办公自动化 – 自动化文本翻译和Oracle数据库操作 以下是往期的文章目录&#xff0c;需要可以查看哦。 Python办公自动化 – Excel和Word的操作运用 Python办公自动化 – Python发送电子邮件和Outlook的集成 Python办公自动化 – 对PDF文档和PPT文档的处理 Python办公自…

Unity中URP下实现能量罩(扭曲流光花纹)

文章目录 前言一、能量罩花纹1、在属性面板接收能量罩花纹纹理2、申明 纹理 和 采样器3、在顶点着色器&#xff0c;应用 Tilling 和 Offset4、在片元着色器&#xff0c;纹理采样后&#xff0c;与之前的结果相乘输出 二、能量罩流光1、在顶点着色器&#xff0c;记录原uv值2、在片…

【软件测试】学习笔记-从0到1:API测试怎么做

这篇文章是API测试的基础&#xff0c;先从0到1设计一个API测试用例&#xff0c;通过这个测试用例&#xff0c;体会到最基本的API测试是如何进行的&#xff0c;并介绍几款常用的API测试工具。 API测试的基本步骤 通常来讲&#xff0c;无论采用什么API测试工具&#xff0c;API测…

Spring Boot - Application Events 的发布顺序_ApplicationContextInitializedEvent

文章目录 Pre概述Code源码分析 Pre Spring Boot - Application Events 的发布顺序_ApplicationEnvironmentPreparedEvent Spring Boot - Application Events 的发布顺序_ApplicationEnvironmentPreparedEvent 概述 Spring Boot 的广播机制是基于观察者模式实现的&#xff0c…

Github

文章目录 Github 的作用基本概念创建仓库以及相关介绍创建文件、查看文件信息、编辑程序上传文件搜索文件下载/检出文件 Github 的作用 项目代码托管平台 基本概念 Repository 仓库&#xff0c;用于存放项目代码 *Star 收藏项目&#xff0c;方便下次查看&#xff08;有一百个st…

【谭浩强C程序设计精讲 7】数据的输入输出

文章目录 3.5 数据的输入输出3.5.1 输入输出举例3.5.2 有关数据输入输出的概念3.5.3 用 printf 函数输出数据1. printf 的一般格式2. 格式字符 3.5.4 用 scanf 函数输入数据1. scanf 函数的一般形式2. scanf 函数中的格式声明3. 使用 scanf 函数时应注意的问题 3.5.5 字符输入输…

UI自动化测试工具对企业具有重要意义

随着软件行业的不断发展&#xff0c;企业对高质量、高效率的软件交付有着越来越高的要求。在这个背景下&#xff0c;UI自动化测试工具成为了企业不可或缺的一部分。以下是UI自动化测试工具对企业的重要作用&#xff1a; 1. 提高软件质量 UI自动化测试工具能够模拟用户的操作&am…

K8s---存储卷(动态pv和pvc)

当我要发布pvc可以生成pv&#xff0c;还可以共享服务器上直接生成挂载目录。pvc直接绑定pv。 动态pv需要两个组件 1、卷插件&#xff1a;k8s本生支持的动态pv创建不包括nfs&#xff0c;需要声明和安装一个外部插件 Provisioner: 存储分配器。动态创建pv,然后根据pvc的请求自动…

易安联参与制定的《面向云计算的零信任体系》行业标准即将实施

中华人民共和国工业和信息化部公告2023年第38号文件正式发布行业标准&#xff1a;YD/T 4598.2-2023《面向云计算的零信任体系 第2部分&#xff1a;关键能力要求》及YD/T 4598.3-2023《面向云计算的零信任体系 第3部分&#xff1a;安全访问服务边缘能力要求》&#xff0c;并于20…

npm run dev,vite 配置 ip 访问

启动项目通过本地 ip 的方式访问 方式一.通过修改 package.json "scripts": {"dev": "vite --host 0.0.0.0",}, 方式二.通过修改 vite.config.ts export default defineConfig({plugins: [vue(), vueJsx()],server: { // 配置 host 与 port 方…

electron+vue网页直接播放RTSP视频流?

目前大部分摄像头都支持RTSP协议&#xff0c;但是在浏览器限制&#xff0c;最新版的浏览器都不能直接播放RTSP协议&#xff0c;Electron 桌面应用是基于 Chromium 内核的&#xff0c;所以也不能直接播放RTSP&#xff0c;但是我们又有这个需求怎么办呢&#xff1f; 市场上的方案…

SQL-修改表操作

目录 DDL-表操作-修改 添加字段 &#xff08;方括号内容可选&#xff09; 修改字段 修改指定字段的数据类型 修改字段名和字段类型 删除字段 修改表名 删除表 删除指定表&#xff0c;并重新创建该表 总结 &#x1f389;欢迎您来到我的MySQL基础复习专栏 ☆* o(≧▽≦…

大文件分片上传,断点续传,秒传 示例(待更新...)

1.html代码 <template><div class"card content-box"><el-upload ref"upload" class"upload-demo" action"https://run.mocky.io/v3/9d059bf9-4660-45f2-925d-ce80ad6c4d15":limit"1" :on-change"hand…

30道JVM综合面试题详解含答案(值得珍藏)

1. 描述一下JVM加载Class文件的原理机制? Java中的所有类&#xff0c;都需要由类加载器装载到JVM中才能运行。类加载器本身也是一个类&#xff0c;而它的工作就是把class文件从硬盘读取到内存中。在写程序的时候&#xff0c;我们几乎不需要关心类的加载&#xff0c;因为这些都…

网络编程的理论基础

文章目录 1 重点知识2 应用层3 再谈 "协议"4 HTTP协议4.1 认识URL4.2 urlencode和urldecode4.3 HTTP协议格式4.4 HTTP的方法4.5 HTTP的状态码4.6 HTTP常见Header4.7 最简单的HTTP服务器 3 传输层4 再谈端口号4.1 端口号范围划分4.2 认识知名端口号(Well-Know Port Nu…

环信IM Demo登录方式如何修改为自己项目的?

在环信即时通讯云IM 官网下载Demo&#xff0c;本地运行只有手机验证码的方式登录&#xff1f;怎么更改为自己项目的Appkey和用户去进行登录呢&#xff1f; &#x1f447;&#x1f447;&#x1f447;本文以Web端为例&#xff0c;教大家如何更改代码来实现 1、 VUE2 Demo vue2…