Flink JobGraph构建过程

文章目录

  • 前言
  • JobGraph创建的过程
  • 总结


前言

StreamGraph构建过程中分析了StreamGraph的构建过程,在StreamGraph构建完毕之后会对StreamGraph进行优化构建JobGraph,然后再提交JobGraph。优化过程中,Flink会尝试将尽可能多的StreamNode聚合在一个JobGraph节点中,通过合并创建JobVertex,并生成JobEdge,以减少数据在不同节点之间流动所产生的序列化、反序列化、网络传输的开销。它包含的主要抽象概念有:

1、JobVertex:经过优化后符合条件的多个 StreamNode 可能会 chain 在一起生成一个JobVertex,即一个JobVertex 包含一个或多个 operator,JobVertex 的输入是 JobEdge,输出是IntermediateDataSet。

2、IntermediateDataSet:表示 JobVertex 的输出,即经过 operator 处理产生的数据集。producer 是JobVertex,consumer 是 JobEdge。

3、JobEdge:代表了job graph中的一条数据传输通道。source是IntermediateDataSet,target是 JobVertex。即数据通过JobEdge由IntermediateDataSet传递给目标JobVertex。
在这里插入图片描述


JobGraph创建的过程

AbstractJobClusterExecutor.execute -> PipelineExecutorUtils.getJobGraph  -> 
PipelineTranslator.translateToJobGraph -> StreamGraphTranslator.translateToJobGraph-> StreamGraph.getJobGraph ->  StreamingJobGraphGenerator.createJobGraph

createJobGraph()函数

private JobGraph createJobGraph() {preValidate();jobGraph.setJobType(streamGraph.getJobType());jobGraph.enableApproximateLocalRecovery(streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());// 为节点生成确定性哈希,以便在提交时识别它们(如果它们没有更改)。.Map<Integer, byte[]> hashes =defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);// Generate legacy version hashes for backwards compatibilityList<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());for (StreamGraphHasher hasher : legacyStreamGraphHashers) {legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));}setChaining(hashes, legacyHashes);setPhysicalEdges();markContainsSourcesOrSinks();setSlotSharingAndCoLocation();setManagedMemoryFraction(Collections.unmodifiableMap(jobVertices),Collections.unmodifiableMap(vertexConfigs),Collections.unmodifiableMap(chainedConfigs),id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());configureCheckpointing();jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =JobGraphUtils.prepareUserArtifactEntries(streamGraph.getUserArtifacts().stream().collect(Collectors.toMap(e -> e.f0, e -> e.f1)),jobGraph.getJobID());for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :distributedCacheEntries.entrySet()) {jobGraph.addUserArtifact(entry.getKey(), entry.getValue());}// 在最后完成ExecutionConfig时设置它try {jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());} catch (IOException e) {}jobGraph.setChangelogStateBackendEnabled(streamGraph.isChangelogStateBackendEnabled());addVertexIndexPrefixInVertexName();setVertexDescription();// Wait for the serialization of operator coordinators and stream config.try {FutureUtils.combineAll(vertexConfigs.values().stream().map(config ->config.triggerSerializationAndReturnFuture(serializationExecutor)).collect(Collectors.toList())).get();waitForSerializationFuturesAndUpdateJobVertices();} catch (Exception e) {throw new FlinkRuntimeException("Error in serialization.", e);}if (!streamGraph.getJobStatusHooks().isEmpty()) {jobGraph.setJobStatusHooks(streamGraph.getJobStatusHooks());}return jobGraph;}

在 StreamGraph 构建 JobGragh 的过程中,最重要的事情就是 operator 的 chain 优化,那么到底什
么样的情况的下 Operator 能chain 在一起呢?

// 1、下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
downStreamVertex.getInEdges().size() == 1;
// 2、上下游节点都在同一个 slot group 中
upStreamVertex.isSameSlotSharingGroup(downStreamVertex);
// 3、前后算子不为空
!(downStreamOperator == null || upStreamOperator == null);
// 4、上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source 默认HEAD!upStreamOperator.getChainingStrategy() == ChainingStrategy.NEVER;
// 5、下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter 等默认是
ALWAYS!downStreamOperator.getChainingStrategy() != ChainingStrategy.ALWAYS;
// 6、两个节点间物理分区逻辑是 ForwardPartitioner
(edge.getPartitioner() instanceof ForwardPartitioner);
// 7、两个算子间的shuffle方式不等于批处理模式
edge.getShuffleMode() != ShuffleMode.BATCH;
// 8、上下游的并行度一致
upStreamVertex.getParallelism() == downStreamVertex.getParallelism();
// 9、用户没有禁用 chain
streamGraph.isChainingEnabled();

构造边

private void connect(Integer headOfChain, StreamEdge edge, NonChainedOutput output) {physicalEdgesInOrder.add(edge);Integer downStreamVertexID = edge.getTargetId();JobVertex headVertex = jobVertices.get(headOfChain);JobVertex downStreamVertex = jobVertices.get(downStreamVertexID);StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());downStreamConfig.setNumberOfNetworkInputs(downStreamConfig.getNumberOfNetworkInputs() + 1);StreamPartitioner<?> partitioner = output.getPartitioner();ResultPartitionType resultPartitionType = output.getPartitionType();if (resultPartitionType == ResultPartitionType.HYBRID_FULL|| resultPartitionType == ResultPartitionType.HYBRID_SELECTIVE) {hasHybridResultPartition = true;}checkBufferTimeout(resultPartitionType, edge);JobEdge jobEdge;if (partitioner.isPointwise()) {jobEdge =downStreamVertex.connectNewDataSetAsInput(headVertex,DistributionPattern.POINTWISE,resultPartitionType,opIntermediateOutputs.get(edge.getSourceId()).get(edge).getDataSetId(),partitioner.isBroadcast());} else {jobEdge =downStreamVertex.connectNewDataSetAsInput(headVertex,DistributionPattern.ALL_TO_ALL,resultPartitionType,opIntermediateOutputs.get(edge.getSourceId()).get(edge).getDataSetId(),partitioner.isBroadcast());}// set strategy name so that web interface can show it.jobEdge.setShipStrategyName(partitioner.toString());jobEdge.setForward(partitioner instanceof ForwardPartitioner);jobEdge.setDownstreamSubtaskStateMapper(partitioner.getDownstreamSubtaskStateMapper());jobEdge.setUpstreamSubtaskStateMapper(partitioner.getUpstreamSubtaskStateMapper());if (LOG.isDebugEnabled()) {LOG.debug("CONNECTED: {} - {} -> {}",partitioner.getClass().getSimpleName(),headOfChain,downStreamVertexID);}}

总结

1、在StreamGraph构建完毕之后会开始构建JobGraph,然后再提交JobGraph。

2、StreamingJobGraphGenerator.createJobGraph()是构建JobGraph的核心实现,实现中首先会广度优先遍历StreamGraph,为其中的每个StreamNode生成一个Hash值,如果用户设置了operator的uid,那么就根据uid来生成Hash值,否则系统会自己为每个StreamNode生成一个Hash值。如果用户自己为operator提供了Hash值,也会拿来用。生成Hash值的作用主要应用在从checkpoint中的数据恢复

3、在生成Hash值之后,会调用setChaining()方法,创建operator chain、构建JobGraph顶点JobVertex、边JobEdge、中间结果集IntermediateDataSet的核心方法。

1)、创建StreamNode chain(operator chain)

从source开始,处理出边StreamEdge和target节点(edge的下游节点),递归的向下处理StreamEdge上和target StreamNode,直到找到那条过渡边,即不能再进行chain的那条边为止。那么这中间的StreamNode可以作为一个chain。这种递归向下的方式使得程序先chain完StreamGraph后面的节点,再处理头结点,类似于后序递归遍历。

2)、创建顶点JobVertex

顶点的创建在创建StreamNode chain的过程中,当已经完成了一个StreamNode chain的创建,在处理这个chain的头结点时会创建顶点JobVertex,顶点的JobVertexID根据头结点的Hash值而决定。同时JobVertex持有了chain上的所有operatorID。因为是后续遍历,所有JobVertex的创建过程是从后往前进行创建,即从sink端到source端

3)、创建边JobEdge和IntermediateDataSet

JobEdge的创建是在完成一个StreamNode chain,在处理头结点并创建完顶点JobVertex之后、根据头结点和过渡边进行connect操作时进行的,连接的是当前的JobVertex和下游的JobVertex,因为JobVertex的创建是由下至上的。

根据头结点和边从jobVertices中找到对应的JobGraph的上下游顶点JobVertex,获取过渡边的分区器,创建对应的中间结果集IntermediateDataSet和JobEdge。IntermediateDataSet由上游的顶点JobVertex创建,上游顶点JobVertex作为它的生产者producer,IntermediateDataSet作为上游顶点的输出。JobEdge中持有了中间结果集IntermediateDataSet和下游的顶点JobVertex的引用, JobEdge作为中间结果集IntermediateDataSet的消费者,JobEdge作为下游顶点JobVertex的input。整个过程就是
上游JobVertex——>IntermediateDataSet——>JobEdge——>下游JobVertex

4、接下来就是为顶点设置共享solt组、设置checkpoint配置等操作了,最后返回JobGraph,JobGraph的构建就完毕了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/726382.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue 导出前端数据报表为xlsx文件

文章目录 前言一、添加依赖包二、新建导出功能按钮组件三、使用示例前言 导出数据报表基本上有两种形式,第一种是前端请求数据之后,后端将数据导出为文件,前端再将文件下载下来;第二种是前端请求数据之后,前端将数据保存到文件。 本文将讲解第二种方法,前端请求数据之后…

Sharding-JDBC源码解析与vivo的定制开发

作者&#xff1a;vivo IT 平台团队 - Xiong Huanxin Sharding-JDBC是在JDBC层提供服务的数据库中间件&#xff0c;在分库分表场景具有广泛应用。本文对Sharding-JDBC的解析、路由、改写、执行、归并五大核心引擎进行了源码解析&#xff0c;并结合业务实践经验&#xff0c;总结…

vue3之Prop特性注意点

1、Ts中接收父组件传递参数prop的定义写法&#xff1a; <script setup lang"ts">defineProps<{title?: stringlikes?: number}>() </script>2、所有的 props 都遵循着单向绑定原则&#xff0c;props 因父组件的更新而变化&#xff0c;子组件中不…

基于 Vue3 学习状态管理器:pinia

pinia 基本概念 Pinia 是 Vue 的存储库&#xff0c;Pinia和Vuex一样都是是vue的全局状态管理器&#xff0c;它允许跨组件/页面共享状态。实际上&#xff0c;其实Pinia就是Vuex5&#xff0c;官网也说过&#xff0c;为了尊重原作者&#xff0c;所以取名 pinia&#xff0c;而没有…

raylib库在CodeBlocks上的配置

raylib下载 raylib | A simple and easy-to-use library to enjoy videogames programming CodeBlocks

监控易对多云平台的运维管理方案

随着企业信息化建设的不断深入&#xff0c;越来越多的企业开始采用多云策略&#xff0c;以充分利用不同云服务提供商的优势&#xff0c;实现业务的高可用性、灵活性和成本效益。然而&#xff0c;多云环境也带来了运维管理的复杂性&#xff0c;如何有效监控和管理多个云平台的资…

本地部署websocket服务端并结合内网穿透实现固定公网地址连接

文章目录 1. Java 服务端demo环境2. 在pom文件引入第三包封装的netty框架maven坐标3. 创建服务端,以接口模式调用,方便外部调用4. 启动服务,出现以下信息表示启动成功,暴露端口默认99995. 创建隧道映射内网端口6. 查看状态->在线隧道,复制所创建隧道的公网地址加端口号7. 以…

CGAL 5.6.1 - Algebraic Foundations

1. 引言 CGAL 的目标是精确计算非线性对象&#xff0c;特别是定义在代数曲线和曲面上的对象。因此&#xff0c;表示多项式、代数扩展和有限域的类型在相关的实现中扮演着更加重要的角色。为了跟上这些变化&#xff0c;我们引入了这个软件包。由于引入的框架必须特别支持多项式…

【杂言】迟到的 2024 展望

研一下开学已有半月&#xff0c;本来想在家写的新年展望拖到了现在。翻看 2021 年的展望&#xff0c;我发现 flag 基本达成了&#xff08;除了 12 点睡觉&#xff09;&#xff0c;所以给新的一年立下大方向也是很有必要的。也许等到 60 岁我再回看&#xff0c;也是一件趣事吧。…

docker常见命令

命令 说明 docker pull 拉取镜像 docker push 推送镜像到DockerRegistry docker images 查看本地镜像 docker rmi 删除本地镜像 docker run 创建并运行容器&#xff08;不能重复创建&#xff09; docker stop 停止指定容器 docker start 启动指定容器 docker r…

15、技巧之八: 如何确认WebDriver支持哪个版本的Xpath?【Selenium+Python3网页自动化总结】

Firefox目前支持XPath 1.0版本&#xff0c;目前没有计划支持XPath 2.0版本。 曾经想知道给定的WebDriver支持哪个版本的XPath吗&#xff1f;我们当然希望是2.0或3.0版本&#xff0c;但最可能的版本是1.0。正如Selenium维基所述&#xff0c;WebDriver在可能的情况下使用浏览器的…

window环境下使用k8s部署.net core项目

前提&#xff1a;已经部署镜像到Docker 在项目发布目录下新建.yaml文件&#xff0c;内容如下&#xff08;以下仅举例出两种方式内容&#xff0c;可按需自由配置&#xff09; --方式一(创建deployment 、服务、指定命名空间) # ------------------- 注意层级结构&#xff0c;…

如何下载网页中嵌套的PDF

项目场景&#xff1a; 网页中常有发布的PDF文件&#xff0c;只有浏览功能 问题描述 想下载的话有时候会截图&#xff0c;或者联系网站管理员 解决方案&#xff1a; 1.使用谷歌浏览器或者360浏览器极速模式&#xff0c;在当前页面按F12 2.网络&#xff08;Network&#xff09;…

OpenSSL 安全漏洞(CVE-2023-3817)

厂商补丁: 目前厂商已发布升级补丁以修复漏洞&#xff0c;补丁获取链接&#xff1a; https://www.openssl.org/news/secadv/20230731.txt OpenSSL安全建议[2023年7月31日] 检查DH q参数值花费过多时间(CVE-2023-3817) 严重程度:低 问题总结:DH键或参数过长可能导致检查速度很…

电脑远程桌面选项变成灰色没办法勾选怎么办?

有些人在使用Windows系统自带的远程桌面工具时&#xff0c;会发现系统属性远程桌面选项卡中勾选启用“允许远程连接到此计算机”。 导致此问题出现的原因主要是由于组策略或者注册表设置错误造成的。 修复远程桌面选项变灰的两种方法&#xff01; 方法一&#xff1a;设置本地组…

从spark streaming与structured streaming看spark core与spark sql的区别

导读 Spark中针对流式数据处理的方案有&#xff1a; Spark StreamingStructured Streaming 本文通过对比spark streaming与structured streaming&#xff0c;来深入理解spark core与spark sql的区别。 Spark Streaming 基于微批(DStream) Spark Streaming是基于微批(Micro batc…

linux-rpm命令

rpm命令管理程序包&#xff1a;安装、升级、卸载、查询和校验 1、忽略依赖关系安装/卸载包 安装&#xff1a;rpm -Uvh 软件包名 --nodeps 卸载&#xff1a;rpm -e 软件包名 --nodes&#xff01;&#xff01;&#xff01;&#xff01;慎用&#xff01;&#xff01;&#xff01…

Qt之Gui的事件转换

QGuiApplication的实现类QGuiApplicationPrivate方法processWindowSystemEvent处理window系统事件 static void processWindowSystemEvent(QWindowSystemInterfacePrivate::WindowSystemEvent *e);在QWindowSystemInterface的处理事件方法中会调用上面的processWindowSystemEv…

1.【Labview白话系列】Labview数组精讲

题主经过写文章一段时间的发现&#xff0c;许多同学对该软件的理解和编程能力是不太一样的&#xff0c;有些知识相对一些同学较为简单&#xff0c;但是有些同学提问就比较困难。那么针对这个问题&#xff0c;题主打算出一期说白话系列的专栏&#xff0c;在该栏目中用最通俗的大…