Flink JobGraph构建过程

文章目录

  • 前言
  • JobGraph创建的过程
  • 总结


前言

StreamGraph构建过程中分析了StreamGraph的构建过程,在StreamGraph构建完毕之后会对StreamGraph进行优化构建JobGraph,然后再提交JobGraph。优化过程中,Flink会尝试将尽可能多的StreamNode聚合在一个JobGraph节点中,通过合并创建JobVertex,并生成JobEdge,以减少数据在不同节点之间流动所产生的序列化、反序列化、网络传输的开销。它包含的主要抽象概念有:

1、JobVertex:经过优化后符合条件的多个 StreamNode 可能会 chain 在一起生成一个JobVertex,即一个JobVertex 包含一个或多个 operator,JobVertex 的输入是 JobEdge,输出是IntermediateDataSet。

2、IntermediateDataSet:表示 JobVertex 的输出,即经过 operator 处理产生的数据集。producer 是JobVertex,consumer 是 JobEdge。

3、JobEdge:代表了job graph中的一条数据传输通道。source是IntermediateDataSet,target是 JobVertex。即数据通过JobEdge由IntermediateDataSet传递给目标JobVertex。
在这里插入图片描述


JobGraph创建的过程

AbstractJobClusterExecutor.execute -> PipelineExecutorUtils.getJobGraph  -> 
PipelineTranslator.translateToJobGraph -> StreamGraphTranslator.translateToJobGraph-> StreamGraph.getJobGraph ->  StreamingJobGraphGenerator.createJobGraph

createJobGraph()函数

private JobGraph createJobGraph() {preValidate();jobGraph.setJobType(streamGraph.getJobType());jobGraph.enableApproximateLocalRecovery(streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());// 为节点生成确定性哈希,以便在提交时识别它们(如果它们没有更改)。.Map<Integer, byte[]> hashes =defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);// Generate legacy version hashes for backwards compatibilityList<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());for (StreamGraphHasher hasher : legacyStreamGraphHashers) {legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));}setChaining(hashes, legacyHashes);setPhysicalEdges();markContainsSourcesOrSinks();setSlotSharingAndCoLocation();setManagedMemoryFraction(Collections.unmodifiableMap(jobVertices),Collections.unmodifiableMap(vertexConfigs),Collections.unmodifiableMap(chainedConfigs),id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());configureCheckpointing();jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =JobGraphUtils.prepareUserArtifactEntries(streamGraph.getUserArtifacts().stream().collect(Collectors.toMap(e -> e.f0, e -> e.f1)),jobGraph.getJobID());for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :distributedCacheEntries.entrySet()) {jobGraph.addUserArtifact(entry.getKey(), entry.getValue());}// 在最后完成ExecutionConfig时设置它try {jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());} catch (IOException e) {}jobGraph.setChangelogStateBackendEnabled(streamGraph.isChangelogStateBackendEnabled());addVertexIndexPrefixInVertexName();setVertexDescription();// Wait for the serialization of operator coordinators and stream config.try {FutureUtils.combineAll(vertexConfigs.values().stream().map(config ->config.triggerSerializationAndReturnFuture(serializationExecutor)).collect(Collectors.toList())).get();waitForSerializationFuturesAndUpdateJobVertices();} catch (Exception e) {throw new FlinkRuntimeException("Error in serialization.", e);}if (!streamGraph.getJobStatusHooks().isEmpty()) {jobGraph.setJobStatusHooks(streamGraph.getJobStatusHooks());}return jobGraph;}

在 StreamGraph 构建 JobGragh 的过程中,最重要的事情就是 operator 的 chain 优化,那么到底什
么样的情况的下 Operator 能chain 在一起呢?

// 1、下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
downStreamVertex.getInEdges().size() == 1;
// 2、上下游节点都在同一个 slot group 中
upStreamVertex.isSameSlotSharingGroup(downStreamVertex);
// 3、前后算子不为空
!(downStreamOperator == null || upStreamOperator == null);
// 4、上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source 默认HEAD!upStreamOperator.getChainingStrategy() == ChainingStrategy.NEVER;
// 5、下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter 等默认是
ALWAYS!downStreamOperator.getChainingStrategy() != ChainingStrategy.ALWAYS;
// 6、两个节点间物理分区逻辑是 ForwardPartitioner
(edge.getPartitioner() instanceof ForwardPartitioner);
// 7、两个算子间的shuffle方式不等于批处理模式
edge.getShuffleMode() != ShuffleMode.BATCH;
// 8、上下游的并行度一致
upStreamVertex.getParallelism() == downStreamVertex.getParallelism();
// 9、用户没有禁用 chain
streamGraph.isChainingEnabled();

构造边

private void connect(Integer headOfChain, StreamEdge edge, NonChainedOutput output) {physicalEdgesInOrder.add(edge);Integer downStreamVertexID = edge.getTargetId();JobVertex headVertex = jobVertices.get(headOfChain);JobVertex downStreamVertex = jobVertices.get(downStreamVertexID);StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());downStreamConfig.setNumberOfNetworkInputs(downStreamConfig.getNumberOfNetworkInputs() + 1);StreamPartitioner<?> partitioner = output.getPartitioner();ResultPartitionType resultPartitionType = output.getPartitionType();if (resultPartitionType == ResultPartitionType.HYBRID_FULL|| resultPartitionType == ResultPartitionType.HYBRID_SELECTIVE) {hasHybridResultPartition = true;}checkBufferTimeout(resultPartitionType, edge);JobEdge jobEdge;if (partitioner.isPointwise()) {jobEdge =downStreamVertex.connectNewDataSetAsInput(headVertex,DistributionPattern.POINTWISE,resultPartitionType,opIntermediateOutputs.get(edge.getSourceId()).get(edge).getDataSetId(),partitioner.isBroadcast());} else {jobEdge =downStreamVertex.connectNewDataSetAsInput(headVertex,DistributionPattern.ALL_TO_ALL,resultPartitionType,opIntermediateOutputs.get(edge.getSourceId()).get(edge).getDataSetId(),partitioner.isBroadcast());}// set strategy name so that web interface can show it.jobEdge.setShipStrategyName(partitioner.toString());jobEdge.setForward(partitioner instanceof ForwardPartitioner);jobEdge.setDownstreamSubtaskStateMapper(partitioner.getDownstreamSubtaskStateMapper());jobEdge.setUpstreamSubtaskStateMapper(partitioner.getUpstreamSubtaskStateMapper());if (LOG.isDebugEnabled()) {LOG.debug("CONNECTED: {} - {} -> {}",partitioner.getClass().getSimpleName(),headOfChain,downStreamVertexID);}}

总结

1、在StreamGraph构建完毕之后会开始构建JobGraph,然后再提交JobGraph。

2、StreamingJobGraphGenerator.createJobGraph()是构建JobGraph的核心实现,实现中首先会广度优先遍历StreamGraph,为其中的每个StreamNode生成一个Hash值,如果用户设置了operator的uid,那么就根据uid来生成Hash值,否则系统会自己为每个StreamNode生成一个Hash值。如果用户自己为operator提供了Hash值,也会拿来用。生成Hash值的作用主要应用在从checkpoint中的数据恢复

3、在生成Hash值之后,会调用setChaining()方法,创建operator chain、构建JobGraph顶点JobVertex、边JobEdge、中间结果集IntermediateDataSet的核心方法。

1)、创建StreamNode chain(operator chain)

从source开始,处理出边StreamEdge和target节点(edge的下游节点),递归的向下处理StreamEdge上和target StreamNode,直到找到那条过渡边,即不能再进行chain的那条边为止。那么这中间的StreamNode可以作为一个chain。这种递归向下的方式使得程序先chain完StreamGraph后面的节点,再处理头结点,类似于后序递归遍历。

2)、创建顶点JobVertex

顶点的创建在创建StreamNode chain的过程中,当已经完成了一个StreamNode chain的创建,在处理这个chain的头结点时会创建顶点JobVertex,顶点的JobVertexID根据头结点的Hash值而决定。同时JobVertex持有了chain上的所有operatorID。因为是后续遍历,所有JobVertex的创建过程是从后往前进行创建,即从sink端到source端

3)、创建边JobEdge和IntermediateDataSet

JobEdge的创建是在完成一个StreamNode chain,在处理头结点并创建完顶点JobVertex之后、根据头结点和过渡边进行connect操作时进行的,连接的是当前的JobVertex和下游的JobVertex,因为JobVertex的创建是由下至上的。

根据头结点和边从jobVertices中找到对应的JobGraph的上下游顶点JobVertex,获取过渡边的分区器,创建对应的中间结果集IntermediateDataSet和JobEdge。IntermediateDataSet由上游的顶点JobVertex创建,上游顶点JobVertex作为它的生产者producer,IntermediateDataSet作为上游顶点的输出。JobEdge中持有了中间结果集IntermediateDataSet和下游的顶点JobVertex的引用, JobEdge作为中间结果集IntermediateDataSet的消费者,JobEdge作为下游顶点JobVertex的input。整个过程就是
上游JobVertex——>IntermediateDataSet——>JobEdge——>下游JobVertex

4、接下来就是为顶点设置共享solt组、设置checkpoint配置等操作了,最后返回JobGraph,JobGraph的构建就完毕了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/726382.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Sharding-JDBC源码解析与vivo的定制开发

作者&#xff1a;vivo IT 平台团队 - Xiong Huanxin Sharding-JDBC是在JDBC层提供服务的数据库中间件&#xff0c;在分库分表场景具有广泛应用。本文对Sharding-JDBC的解析、路由、改写、执行、归并五大核心引擎进行了源码解析&#xff0c;并结合业务实践经验&#xff0c;总结…

基于 Vue3 学习状态管理器:pinia

pinia 基本概念 Pinia 是 Vue 的存储库&#xff0c;Pinia和Vuex一样都是是vue的全局状态管理器&#xff0c;它允许跨组件/页面共享状态。实际上&#xff0c;其实Pinia就是Vuex5&#xff0c;官网也说过&#xff0c;为了尊重原作者&#xff0c;所以取名 pinia&#xff0c;而没有…

raylib库在CodeBlocks上的配置

raylib下载 raylib | A simple and easy-to-use library to enjoy videogames programming CodeBlocks

本地部署websocket服务端并结合内网穿透实现固定公网地址连接

文章目录 1. Java 服务端demo环境2. 在pom文件引入第三包封装的netty框架maven坐标3. 创建服务端,以接口模式调用,方便外部调用4. 启动服务,出现以下信息表示启动成功,暴露端口默认99995. 创建隧道映射内网端口6. 查看状态->在线隧道,复制所创建隧道的公网地址加端口号7. 以…

CGAL 5.6.1 - Algebraic Foundations

1. 引言 CGAL 的目标是精确计算非线性对象&#xff0c;特别是定义在代数曲线和曲面上的对象。因此&#xff0c;表示多项式、代数扩展和有限域的类型在相关的实现中扮演着更加重要的角色。为了跟上这些变化&#xff0c;我们引入了这个软件包。由于引入的框架必须特别支持多项式…

docker常见命令

命令 说明 docker pull 拉取镜像 docker push 推送镜像到DockerRegistry docker images 查看本地镜像 docker rmi 删除本地镜像 docker run 创建并运行容器&#xff08;不能重复创建&#xff09; docker stop 停止指定容器 docker start 启动指定容器 docker r…

window环境下使用k8s部署.net core项目

前提&#xff1a;已经部署镜像到Docker 在项目发布目录下新建.yaml文件&#xff0c;内容如下&#xff08;以下仅举例出两种方式内容&#xff0c;可按需自由配置&#xff09; --方式一(创建deployment 、服务、指定命名空间) # ------------------- 注意层级结构&#xff0c;…

电脑远程桌面选项变成灰色没办法勾选怎么办?

有些人在使用Windows系统自带的远程桌面工具时&#xff0c;会发现系统属性远程桌面选项卡中勾选启用“允许远程连接到此计算机”。 导致此问题出现的原因主要是由于组策略或者注册表设置错误造成的。 修复远程桌面选项变灰的两种方法&#xff01; 方法一&#xff1a;设置本地组…

从spark streaming与structured streaming看spark core与spark sql的区别

导读 Spark中针对流式数据处理的方案有&#xff1a; Spark StreamingStructured Streaming 本文通过对比spark streaming与structured streaming&#xff0c;来深入理解spark core与spark sql的区别。 Spark Streaming 基于微批(DStream) Spark Streaming是基于微批(Micro batc…

1.【Labview白话系列】Labview数组精讲

题主经过写文章一段时间的发现&#xff0c;许多同学对该软件的理解和编程能力是不太一样的&#xff0c;有些知识相对一些同学较为简单&#xff0c;但是有些同学提问就比较困难。那么针对这个问题&#xff0c;题主打算出一期说白话系列的专栏&#xff0c;在该栏目中用最通俗的大…

pycharm手动安装常用插件

下载插件 &#xff08;1&#xff09;下载地址&#xff1a;JetBrains Marketplace 这里以语言包为例子 2、中文语言包 进入pycharm中的设置&#xff0c;点击plugins,选从磁盘中安装插件

六、矩阵问题

73、矩阵置零&#xff08;中等&#xff09; 题目描述 给定一个 m x n 的矩阵&#xff0c;如果一个元素为 0 &#xff0c;则将其所在行和列的所有元素都设为 0 。请使用 原地 算法。 示例 1&#xff1a; 输入&#xff1a;matrix [[1,1,1],[1,0,1],[1,1,1]] 输出&#xff1a…

应用案例 | Softing echocollect e网关助力汽车零部件制造商构建企业数据库,提升生产效率和质量

为了提高生产质量和效率&#xff0c;某知名汽车零部件制造商采用了Softing echocollect e多协议数据采集网关——从机器和设备中获取相关数据&#xff0c;并直接将数据存储在中央SQL数据库系统中用于分析处理&#xff0c;从而实现了持续监控和生产过程的改进。 一 背景 该企业…

【国家机关办公建筑 大型公共建筑的能耗监测、集中统一管理】安科瑞能耗监测系统整体解决方案

背景 为全面推进大型公建节能管理工作&#xff0c;需建立大型公建节能监管体系&#xff0c;逐步建立起全国联网的大型公建能耗监测平台&#xff0c;在大型公建安装分项计量装置&#xff0c;通过远程传输等手段及时采集分析能耗数据&#xff0c;实现对大型公建的实时动态监测、汇…

Docker数据卷的挂载

目录 1 概念 2 常用命令 3 操作步骤(主要讲在创建容器时的挂载) 3.1 挂载在默认目录 3.2 挂载在自定义目录 4 附加内容(查看容器的挂载情况) 1 概念 数据卷&#xff08;volume&#xff09;是一个虚拟目录&#xff0c;是容器内目录与宿主机目录之间映射的桥梁。这样容器内…

微服务day05-Gateway网关

Gateway网关 为了防止微服务能被任何身份的人访问&#xff0c;需要对访问微服务的人做身份认证和权限校验。网关的功能就是对访问用户进行身份认证和权限校验。网关具有3种功能&#xff1a; 身份验证和权限校验&#xff1a;网关作为微服务入口&#xff0c;需要校验用户是是否…

git 如何将多个提交点合并为一个提交点 commit

文章目录 核心命令详细使用模式总结示例 核心命令 git merge branch2 是将分支branch2的提交点合并到本地当前分支。 而在执行这条命令的时候&#xff0c;加一个选项--squash就表示在合并的时候将多个提交点合并为一个提交点。 git merge --squash branch2 先看squash单词的意…

React Hooks 完全指南:无类组件革命

目录 ​编辑 前言 Hooks的前世 函数组件 类组件 状态和生命周期的管理 Hooks用途以及相应代码 状态管理 用于生命周期管理和副作用操作的 Hooks 用于上下文管理的 Hooks 其他用途的 Hooks 前言 React Hooks 是在 React 16.8 版本中引入的一个非常强大的新特性&…

建筑外窗遮阳系数测试的太阳光模拟器

太阳光模拟器是一种用于测试建筑外窗遮阳系数的高科技设备。它能够模拟太阳光照射房屋的情景&#xff0c;帮助建筑师和设计师更好地了解建筑外窗的遮阳性能&#xff0c;从而提高建筑的能源效率和舒适度。 这种模拟器的工作原理非常简单&#xff0c;它通过使用高亮度的光源和精…

scrapy 爬虫:多线程爬取去微博热搜排行榜数据信息,进入详情页面拿取第一条微博信息,保存到本地text文件、保存到excel

如果想要保存到excel中可以看我的这个爬虫 使用Scrapy 框架开启多进程爬取贝壳网数据保存到excel文件中&#xff0c;包括分页数据、详情页数据&#xff0c;新手保护期快来看&#xff01;&#xff01;仅供学习参考&#xff0c;别乱搞_爬取贝壳成交数据c端用户登录-CSDN博客 最终…