Flink作业执行之 4.JobGraph

Flink作业执行之 4.JobGraph

1. 入口

前文了解了由Transformation到StreamGraph的过程,StreamGraph即作业的逻辑拓扑结构。
生成逻辑结构后,接下来的操作往往是对逻辑结构的优化。在很多组件中都是这样的处理,如hive、spark等都会执行“逻辑计划->优化计划->物理计划”的处理过程。

从StreamGraph到JobGraph的过程中很重要的事情是将节点组成节点链。

env.execute()方法中调用executeAsync方法完成JobGraph实例的生成。

@Internal
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {checkNotNull(streamGraph, "StreamGraph cannot be null.");// 根据启动方式得到具体的PipelineExecutor子实例final PipelineExecutor executor = getPipelineExecutor();CompletableFuture<JobClient> jobClientFuture =// execute方法内生成JobGraph,并提到了JobManager中executor.execute(streamGraph, configuration, userClassloader);// ...
}

PipelineExecutor接口根据提供的配置执行管道。接口内只有一个execute方法。execute方法负责两件事,首先创建JobGraph,然后提交JobGraph到JobManager。

接口实现体系如下,分别对应不同的作业提交方式。

在这里插入图片描述
在IDEA中启动作业时,使用的是LocalExecutor,其execute方法实现如下。

@Override
public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader)throws Exception {// ... // 生成JobGraph实例final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig, userCodeClassloader);// 将JobGraph提交到集群中return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory).submitJob(jobGraph, userCodeClassloader);
}

不同的ipelineExecutor实现,提交JobGraph的方式所有不同,但是创建JobGrpah的方式是相同。均使用FlinkPipelineTranslatortranslateToJobGraph方式完成。

2. StreamGraphTranslator

FlinkPipelineTranslator接口负责将Pipeline转化为JobGraph。根据批或流场景分别有不同的实现。

在这里插入图片描述

StreamingJobGraphGenerator负责完成StreamGraph到JobGraph的转换。

// FlinkPipelineTranslator(StreamGraphTranslator)
public JobGraph translateToJobGraph(Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) {checkArgument(pipeline instanceof StreamGraph, "Given pipeline is not a DataStream StreamGraph.");//调用StreamGraph方法StreamGraph streamGraph = (StreamGraph) pipeline;return streamGraph.getJobGraph(userClassloader, null);
}// StreamGraph
public JobGraph getJobGraph(ClassLoader userClassLoader, @Nullable JobID jobID) {return StreamingJobGraphGenerator.createJobGraph(userClassLoader, this, jobID);
}

回顾TransformationTranslator负责Transformation到StreamGraph转换,可以暂时归纳为,一般使用AaTranslator的封装完成由Aa到Bb的转换。方便对源码中某些类的理解。
StreamGraph和JobGraph生成过程封装类的相似之处,二者都提供了相应的Generator和Translator封装类,不同之处在于调用顺序的不同。

在这里插入图片描述

3. JobGraph

JobGraph表示flink数据流程序,处于JobManager接受的低级别。来自更高级别所API的所有程序都将转化为JobGraph。

JobGraph是从Client端到JobManager的最后一层封装,无论批或流作业都会转成JobGraph之后提交到JobManager。目的是为了统一处理批和流处理。

JobGraph同样表示DAG,包含节点JobVertex和边JobEdge。与StreamGraph相比,还多了表示中间结果集的IntermediateDataSet。因此本质上是将节点和中间结果集相连得到DAG。
IntermediateDataSet由JobVertex产生,由JobEdge消费其中的数据。

JobGraph中节点、边和中间结果集的关系示意如下。

在这里插入图片描述

节点之间的联系如下。

在这里插入图片描述
JobGraph中重要属性如下。

// job id和名称
private JobID jobID;
private final String jobName;
// 作业类型,流或批
private JobType jobType = JobType.BATCH;
// JobGraph中包含的节点
private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<JobVertexID, JobVertex>();

3.1. JobVertex

JobGraph中的节点,会将符合条件的多个StreamNode链接在一起生成一个JobVertex。

JobVertex中重要属性如下。

// id,使用hash值表示,区别于StreamNode的id,StreamNode的id直接使用Transformation的id,是自增数字。
private final JobVertexID id;
// 名称
private String name;
// 输入,节点所有的输入边
private final ArrayList<JobEdge> inputs = new ArrayList<>();
// 输出,节点输出的中间结果集
private final Map<IntermediateDataSetID, IntermediateDataSet> results = new LinkedHashMap<>();
// 节点中包含的全部算子id,按深度优先的顺序对节点进行存储
private final List<OperatorIDPair> operatorIDs;
// taskManager根据该类名创建执行任务的类的对象(反射)。取StreamNode中vertexClass值,即StreamTask子类。
private String invokableClassName;

JobVertex使用JobEdge表示输入,使用IntermediateDataSet表示输出。这点区别于StreamNode,StreamNode并没有严格的输入输出概念,只有上下游概念。

operatorIDs中元素个数等于JobVertex中包含的StreamNede节点个数。
OperatorIDPair类用于维护JobVertex中的节点信息,包含两个属性,表示节点id。

// 节点hash值
private final OperatorID generatedOperatorID;
private final OperatorID userDefinedOperatorID;

3.2. IntermediateDataSet

逻辑结构,表示JobVertex的输出,JobVertex产生结果集的个数与对应SteamNode的出边相同。

// id,类型同JobVertexID
private final IntermediateDataSetID id;
// 产生该结果集的JobVertex,在构造方法中完成赋值
private final JobVertex producer;
// 下游消费结果集的JobEdge,产生JobEdge后调用其addConsumer方法赋值
private final List<JobEdge> consumers = new ArrayList<>();

3.3. JobEdge

JobGraph中的JobEdge除表示边含义,连接上游生产的中间数据集和下游的JobVertex之外,更多的表示一种通信通道,上游JobVertex生产的中间结果集,由JobEdge消费数据至下游JobVertex。

// 下游节点
private final JobVertex target;
// 上游结果集
private final IntermediateDataSet source;
// 边的分布模式,决定生产任务的哪些子任务连接到那些消费子任务,
// ALL_TO_ALL:每个下游任务都消费这条边的数据,每个下游任务都消费这条边的数据,
// POINTWISE:一个或多个下游任务消费这条边的数据
private final DistributionPattern distributionPattern;

4. 生成JobGraph

4.1. StreamingJobGraphGenerator

上文提到在流场景中,StreamingJobGraphGenerator负责完成StreamGraph到JobGraph的转换。接下来重点了解下实现过程。

StreamingJobGraphGenerator中关键字段如下,主要为表示链接的信息。

// 生成JobGraph的streamGraph
private final StreamGraph streamGraph;
// 结果JobGraph
private final JobGraph jobGraph;
// JobGraph中的节点集合,key=StreamNode id
private final Map<Integer, JobVertex> jobVertices;
// 已经处理完成的StreamNode id
private final Collection<Integer> builtVertices;
// 物理边,无法连接的两个StreaNode之间的边将成为物理边
private final List<StreamEdge> physicalEdgesInOrder;
// 保存链接信息
private final Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;
// 保存链接的名称
private final Map<Integer, String> chainedNames;
// 保存节点信息
private final Map<Integer, StreamConfig> vertexConfigs;

createJobGraph方法中完成JobGraph的转换,该方法中涉及的内容较多。节点、边和中间结果集的创建将由setChaining方法完成。

4.1.1. 创建JobVertex

而StreamNode的id直接使用Transformation的id是自增数字,而JobVertex使用根StreamNode生成的hash值作为id,这点有所不同。因此首先会计算全部StreamNode的hash值,并维护StreamNode id和hash值的映射关系。为了向后兼容,还会为每个节点生成老版本的Hash值。

在代码中可以看到hasheslegacyHashes两个集合变量,分别存储新和老的hash值映射关系。

以StreamNode中的Source节点作为起点,通过递归方式处理全部节点。将连续且可进行连接的节点组合在一起生成一个JobVertex节点。举个例子,假如有上下游关系为1->2->3->4->5的节点,且3到4之间进行keyBy等操作,则1、2和3将链接在一起生成一个JobVertex节点,4和5将生成一个JobVertex节点。

通过Map<Integer, OperatorChainInfo>集合维护每个JobVertex中全部StreamNode的节点关系。
key为起点StreamNode id,key个数将和作业中生成算子链个数一致,初始时会将Source节点加入到集合中。
value放入一个封装类,用于createChain方法递归调用期间帮助维护operator chain的信息。

OperatorChainInfo类是定义在StreamingJobGraphGenerator中的内部类,其chainedOperatorHashes属性表示同一个算子链中全部的算子连接关系。

private static class OperatorChainInfo {// OperatorChain的起点idprivate final Integer startNodeId;// 上文提到的StreamNode id和hash值的映射关系private final Map<Integer, byte[]> hashes;// 老版本的hash值映射关系private final List<Map<Integer, byte[]>> legacyHashes;// 链接到一起的算子关系,key是算子链的开始节点,list部分存放链接关系// f0是当前节点的hash值,f1是legacyHashes相关的数据,大多数为nullprivate final Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes;private final Map<Integer, ChainedSourceInfo> chainedSources;private final List<OperatorCoordinator.Provider> coordinatorProviders;private final StreamGraph streamGraph;private OperatorChainInfo(int startNodeId,Map<Integer, byte[]> hashes,List<Map<Integer, byte[]>> legacyHashes,Map<Integer, ChainedSourceInfo> chainedSources,StreamGraph streamGraph) {this.startNodeId = startNodeId;this.hashes = hashes;this.legacyHashes = legacyHashes;this.chainedOperatorHashes = new HashMap<>();this.coordinatorProviders = new ArrayList<>();this.chainedSources = chainedSources;this.streamGraph = streamGraph;}
}

createChain方法负责递归操作,递归过程为后序位置的处理方式,后序位置指的是先向下进行递归调用,直到不满足继续递归条件为止,然后按照递归过程由内向外方向执行每个节点的处理逻辑,后序位置的递归示意如下。

public void recursion(){if(condition == true){// 递归调用recursion();}// 处理逻辑// ...
}

createChain方法内部通过3个集合变量来维护节点下游信息。
每次递归处理时,依次判断与当前节点直接相连的下游节点是否可以与当前节点连接到一起,并分别加入到各自的集合中。然后对各自集合进行遍历递归处理。

private List<StreamEdge> createChain(StreamNode) {// 物理出边,即算子链和算子链之间的边,算子链内部不存在边,一个算子链是一个整体。// 物流出边,只有存在无法和前节点链接的节点时,当前边才会成为出边List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();// 可以被链接的出边List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();// 不可以被链接的出边List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();StreanNode currentNode = ;// 遍历当前节点全部出边,判断是否可链接for (SteamEdge outEdge : 当前节点的全部出边) {if (isChainable(outEdge)) {// 可链接的边加入到chainableOutputs集合中chainableOutputs.add(outEdge);} else {// 无法连接的边加入到nonChainableOutputs集合中nonChainableOutputs.add(outEdege);}}for (StreamEdge chainable : chainableOutputs) {// 递归调用transitiveOutEdges.addAll(createChain(StreamNode));}for (StreamEdge nonChainable : nonChainableOutputs) {// 无法连接的将生成物理边,链接JobVertext节点transitiveOutEdges.add(nonChainable);// 递归调用createChain(StreamNode);}// ===下面逻辑为节点具体的转化逻辑===// 生成链接名称,并加入到chainedNames中chainedNames.put(key,name);// 将当前StreamNode加入到OperatorChainInfo的chainedOperatorHashes集合中chainedOperatorHashes.put(startNodeId,list);StreamConfig config = currentNodeId.equals(startNodeId)// 当处理到算子链的第一个节点时,生成JobVertex? createJobVertex(startNodeId, chainInfo): new StreamConfig(new Configuration());// StreamConfig设置config.set...;// 算子链的起点时if (currentNodeId.equals(startNodeId)) {// 遍历出边,在connect方法中生成jobEdge和ntermediateDataSetfor (StreamEdge edge : transitiveOutEdges) {NonChainedOutput output =opIntermediateOutputs.get(edge.getSourceId()).get(edge);connect(startNodeId, edge, output);}}return transitiveOutEdges
}

判断下游节点是否可以与当前节点进行链接由isChainable方法完成,首先需满足下游节点的入边为1的前提条件,满足该条件时,再根据上下游节点是否在相同slot、算子类型、上游算子ChainingStrategy类型、分区器、执行模型、并行度、StreamGraph是否允许链接等信息进行判断。

从每个算子链的最后一个节点开始处理链接关系。当处理到算子链的起点时,执行createJobVertex(startNodeId, chainInfo)逻辑来生成JobVertex,将JobVertex加入到JobGraph的jobVertices集合中。

4.1.2. 构键JobEdge和IntermediateDataSet

处理到算子链的起点时,遍历物理出边,依次生成JobEdge和IntermediateDataSet。connect方法完成此逻辑,核心逻辑如下伪代码所示。

private void connect(Integer headOfChain, StreamEdge edge, NonChainedOutput output) {// 将边加入到physicalEdgesInOrder集合中physicalEdgesInOrder.add(edge);// 上游算子链JobVertex headVertex = jobVertices.get(headOfChain);// 下游算子链JobVertex downStreamVertex = jobVertices.get(edge.getTargetId());// JobEdgeJobEdge jobEdge = downStreamVertex.connectNewDataSetAsInput(headVertex,DistributionPattern.POINTWISE,resultPartitionType,opIntermediateOutputs.get(edge.getSourceId()).get(edge).getDataSetId(),partitioner.isBroadcast());
}// 生成IntermediateDataSet和边,并将二者加入到JobVextex中相应的集合中
public JobEdge connectNewDataSetAsInput(JobVertex input,DistributionPattern distPattern,ResultPartitionType partitionType,IntermediateDataSetID intermediateDataSetId,boolean isBroadcast) {// 生成结果集的同时,将节点作为结果集的生产者IntermediateDataSet dataSet =input.getOrCreateResultDataSet(intermediateDataSetId, partitionType);JobEdge edge = new JobEdge(dataSet, this, distPattern, isBroadcast);this.inputs.add(edge);// 边加入到结果集的消费者中dataSet.addConsumer(edge);return edge;
}

将JobVextex添加到IntermediateDataSet的produder属性中,将JobEdge加入到consumer属性中,分别作为中间结果集的生产者和消费者。

以上即为JobGrpah、JobVertex、JobEdge和IntermediateDataSet生成的简要逻辑。

4.1.3. 举个例子

生成JobGraph的代码逻辑比较复杂,生成各自实例后,还有相应的环境、配置等信息。本文主要对骨架逻辑进行了简要说明。通过一个稍复杂一些的示例来理解下上述过程。
假如有如下结构的StreamGraph,示例中边上存在红色X标记表示节点之间无法连接成算子链。

在这里插入图片描述

  • 从source节点开始递归处理,当递归到节点2时,节点2存在3条出边,节点8无法与节点2链接,因此e2、e5加入到chainableOutputs,e9加入到nonChainableOutputs。然后分别遍历两个集合。
  • e2方向当递归到节点3,节点4无法与节点3进行连接,e3将会成为一条物理边,并从节点4开始生成新的JobVertex,4->sink1节点生成JobVertex1。
  • e5方向同理,e7将成为物理边,7->sink1节点生成JobVertex2。
  • e9方向,e9为物理边,8->sink2节点生成JobVertex3。
  • 当节点2的下游节点全部处理完成,回到e2节点,再回到Source节点时,Source作为最开始算子链的起点,将开始生成最后一个JobVertex4,包含source、2、3、5、6节点。JobVertex4包含3条出边,因此将生成3个中件结果集和3条边,链接上面生成的JobVertex1、JobVertex2、JobVertex3。

最终生成的JobGrhap示例如下所示。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/37560.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

os实训课程模拟考试(选择题复习)

目录 一、操作系统的基本功能和设计目标 &#xff08;1&#xff09;基础知识 &#xff08;2&#xff09;题目与答案 1、操作系统是一组 B &#xff08;单选&#xff09; 2、以下哪项不是操作系统关心的主要问题&#xff1f;D &#xff08;单选&#xff09; 3、下列关于…

使用SpringBoot整合filter

SpringBoot整合filter&#xff0c;和整合servlet类似&#xff0c;也有两种玩儿法 1、创建一个SpringBoot工程&#xff0c;在工程中创建一个filter过滤器&#xff0c;然后用注解WebFilter配置拦截的映射 2、启动类还是使用ServletComponentScan注解来扫描拦截器注解WebFilter 另…

Windows系统开启自带虚拟机功能Hyper-V

前言 最近有小伙伴咨询&#xff1a;Windows系统上有自带的虚拟机软件吗&#xff1f; 答案肯定是有的。它就是Hyper-V&#xff0c;但很多小伙伴都不知道怎么打开这个功能。 今天小白就带大家来看看如何正确打开这个Windows自带的虚拟机功能Hyper-V。 开始之前&#xff0c;你…

基于Spring Boot与Vue的智能房产匹配平台+文档

博主介绍&#xff1a;✌在职Java研发工程师、专注于程序设计、源码分享、技术交流、专注于Java技术领域和毕业设计✌ 温馨提示&#xff1a;文末有 CSDN 平台官方提供的老师 Wechat / QQ 名片 :) Java精品实战案例《700套》 2025最新毕业设计选题推荐&#xff1a;最热的500个选题…

学会python——在Excel中生成图表数据(python实例十五)

目录 1.认识Python 2.环境与工具 2.1 python环境 2.2 Visual Studio Code编译 3.生成表格数据 3.1 代码构思 3.2 代码示例 4.绘制图表 4.1 代码构思 4.2 代码示例 5.总结 1.认识Python Python 是一个高层次的结合了解释性、编译性、互动性和面向对象的脚本语言。 P…

Shell 脚本编程保姆级教程(下)

七、Shell 流程控制 7.1 if #!/bin/bash num1100 if test $[num1] 100 thenecho num1 是 100 fi 7.2 if else #!/bin/bash num1100 num2100 if test $[num1] -eq $[num2] thenecho 两个数相等&#xff01; elseecho 两个数不相等&#xff01; fi 7.3 if else-if else #!/…

电影APP——项目建议书参考

项目建议书 1. 前言1.1 实现目标1.2 项目应用范围1.3 项目名称 2. 概述2.1 国内外发展综述2.2 拟解决的问题2.2.1 业务问题2.2.2 技术需求 2.3 系统环境需求2.3.1 网络需求描述2.3.2 业务需求描述2.3.3 运行环境/用户描述 2.4 功能建议2.4.1应用场景描述2.4.2功能划分/功能模型…

性能之巅的巴比达内网穿透访问单位的web管理系统

在这个数字化飞速发展的时代&#xff0c;作为一名IT部门的小主管&#xff0c;我经常面临着一项挑战&#xff1a;如何在外网环境下高效、安全地访问我们单位内部部署的Web管理系统。这不仅仅是关乎我个人的工作效率&#xff0c;更是影响到整个团队能否快速响应市场需求的关键。直…

650V 1200V 碳化硅MOS TO247 封装 内阻30毫欧 40 80毫欧

650V 1200V 碳化硅MOS TO247 封装 内阻30毫欧 40 80毫欧

LangChain E-Mails with LLM

题意&#xff1a;通过LangChain使用大型语言模型&#xff08;LLM&#xff09;处理电子邮件 问题背景&#xff1a; I am quite new to LangChain and Python as im mainly doing C# but i am interested in using AI on my own data. So i wrote some python code using langch…

如何安装和卸载软件?

如何安装和卸载软件&#xff1f; &#x1f4bb; 如何安装和卸载软件&#xff1f;——默语的详细教程摘要引言正文内容&#x1f5a5;️ 在Windows上安装和卸载软件安装软件卸载软件 &#x1f34f; 在Mac上安装和卸载软件安装软件卸载软件 &#x1f914; QA环节&#x1f4dd; 表格…

QT QThread 线程类的使用及示例

QThread 是 Qt 框架提供的一个用于处理多线程的类&#xff0c;它允许开发者编写具有并发功能的应用程序&#xff0c;提高程序的响应速度、执行效率和用户体验。 在操作系统中&#xff0c;线程是进程内的执行单元&#xff0c;拥有独立的执行路径。每个线程有自己独立的栈空间&a…

从零开始学Spring Boot系列-集成Spring Security实现用户认证与授权

在Web应用程序中&#xff0c;安全性是一个至关重要的方面。Spring Security是Spring框架的一个子项目&#xff0c;用于提供安全访问控制的功能。通过集成Spring Security&#xff0c;我们可以轻松实现用户认证、授权、加密、会话管理等安全功能。本篇文章将指导大家从零开始&am…

日期类(java)

文章目录 第一代日期类 Date常用构造方法SimpleDateFormat 日期格式化类日期转字符串&#xff08;String -> Date)字符串转日期 (String->Date) 第二代日期类 Calendar常用字段与如何得到实例对象相关 API 第三代日期类&#xff08;LocalDate\TIme)日期&#xff0c;时间&…

springboot + Vue前后端项目(第二十一记)

项目实战第二十一记 写在前面1. springboot文件默认传输限制2. 安装视频插件包命令3. 前台Video.vue4. 创建视频播放组件videoDetail.vue5. 路由6. 效果图总结写在最后 写在前面 本篇主要讲解系统集成视频播放插件 1. springboot文件默认传输限制 在application.yml文件中添…

pip安装neuralcoref失败ERROR

最终解决的方法如下&#xff1a; git clone https://github.com/huggingface/neuralcoref.git cd neuralcoref pip install -r requirements.txt python setup.py install 原始步骤&#xff1a; 安装 neuralcoref 的依赖&#xff1a; 安装编译 neuralcoref 所需的依赖项&am…

boost asio异步服务器(4)处理粘包

粘包的产生 当客户端发送多个数据包给服务器时&#xff0c;服务器底层的tcp接收缓冲区收到的数据为粘连在一起的。这种情况的产生通常是服务器端处理数据的速率不如客户端的发送速率的情况。比如&#xff1a;客户端1s内连续发送了两个hello world&#xff01;,服务器过了2s才接…

MCU解决800V电动汽车牵引逆变器的常见设计挑战的3种方式

电动汽车 (EV) 牵引逆变器是电动汽车的。它将高压电池的直流电转换为多相&#xff08;通常为三相&#xff09;交流电以驱动牵引电机&#xff0c;并控制制动产生的能量再生。电动汽车电子产品正在从 400V 转向 800V 架构&#xff0c;这有望实现&#xff1a; 快速充电 – 在相同…

绝了!Stable Diffusion做AI治愈图片视频,用来做副业简直无敌!10分钟做一个爆款视频保姆教程

一 项目分析 这个治愈类视频的玩法是通过AI生成日常生活场景&#xff0c;制作的vlog&#xff0c;有这样的一个号&#xff0c;发布了几条作品&#xff0c;就涨粉了2000多&#xff0c;点赞7000多&#xff0c;非常的受欢迎。 下面给大家看下这种作品是什么样的&#xff0c;如图所…

探索高效开发神器:Blackbox AI(免费编程助手)

人不走空 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌赋&#xff1a;斯是陋室&#xff0c;惟吾德馨 &#x1f916; 想要代码生成&#xff1f;&#x1f44c; &#x1f4ac; 需要和AI聊天解决难题&#xff1f;&#…