Flink StreamGraph Generation Process

Contents

    • Overview
    • StreamGraph Core Objects
    • StreamGraph Generation Process

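Overview

In Flink, the StreamGraph is the logical representation of a data stream job: it describes how the stream transformations in the job are to be executed. It is the basis from which the Flink runtime generates the execution plan.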
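An application written with the DataStream API is first converted into Transformations, which are then mapped to a StreamGraph. The client converts the StreamGraph into a JobGraph and submits the JobGraph to the Flink cluster. The cluster converts the JobGraph into an ExecutionGraph, after which the job enters the scheduling and execution phase.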

StreamGraph Core Objects

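  • StreamNode
    A StreamNode is a node in the StreamGraph and is converted from a Transformation. Roughly speaking, one StreamNode represents one operator. Logically, a StreamGraph contains both physical and virtual StreamNodes. A StreamNode can have multiple inputs and multiple outputs.
    A physical StreamNode eventually becomes a physical operator. A virtual StreamNode is attached to a StreamEdge.
  • StreamEdge
    A StreamEdge is an edge in the StreamGraph that connects two StreamNodes. A StreamNode can have multiple outgoing and incoming edges. A StreamEdge carries information such as the side output, the partitioner, and the output field selection.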
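
The statement that a virtual StreamNode ends up attached to a StreamEdge can be illustrated with a toy model. The sketch below is not Flink code: VirtualNodeSketch, Edge, VirtualPartition, and the "forward" default partitioner name are all hypothetical. It only assumes that a virtual node records its physical input and a partitioner, and that adding an edge from a virtual node connects the physical input instead and moves the partitioner onto the edge.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a stream graph; every name here is illustrative, not a Flink class. */
final class VirtualNodeSketch {

    /** An edge between two physical nodes, carrying the partitioner name. */
    static final class Edge {
        final int sourceId;
        final int targetId;
        final String partitioner;

        Edge(int sourceId, int targetId, String partitioner) {
            this.sourceId = sourceId;
            this.targetId = targetId;
            this.partitioner = partitioner;
        }
    }

    /** A virtual node: remembers its physical input and the partitioner to apply. */
    static final class VirtualPartition {
        final int inputId;
        final String partitioner;

        VirtualPartition(int inputId, String partitioner) {
            this.inputId = inputId;
            this.partitioner = partitioner;
        }
    }

    private final Map<Integer, String> physicalNodes = new HashMap<>(); // id -> operator name
    private final Map<Integer, VirtualPartition> virtualNodes = new HashMap<>();
    final List<Edge> edges = new ArrayList<>();

    void addNode(int id, String operatorName) {
        physicalNodes.put(id, operatorName);
    }

    void addVirtualPartitionNode(int inputId, int virtualId, String partitioner) {
        virtualNodes.put(virtualId, new VirtualPartition(inputId, partitioner));
    }

    /** Connects two nodes; "forward" is a placeholder default partitioner. */
    void addEdge(int upstreamId, int downstreamId) {
        addEdge(upstreamId, downstreamId, "forward");
    }

    private void addEdge(int upstreamId, int downstreamId, String partitioner) {
        VirtualPartition virtual = virtualNodes.get(upstreamId);
        if (virtual != null) {
            // Skip the virtual node: connect its physical input instead and
            // carry the partitioner on the edge.
            addEdge(virtual.inputId, downstreamId, virtual.partitioner);
            return;
        }
        edges.add(new Edge(upstreamId, downstreamId, partitioner));
    }

    int physicalNodeCount() {
        return physicalNodes.size();
    }

    /** source(1) -> hash partitioning (virtual 2) -> map(3). */
    static VirtualNodeSketch demo() {
        VirtualNodeSketch graph = new VirtualNodeSketch();
        graph.addNode(1, "source");
        graph.addVirtualPartitionNode(1, 2, "hash");
        graph.addNode(3, "map");
        graph.addEdge(2, 3);
        return graph;
    }

    public static void main(String[] args) {
        for (Edge e : demo().edges) {
            System.out.println(e.sourceId + " -> " + e.targetId + " [" + e.partitioner + "]");
        }
    }
}
```

In this sketch the graph ends up with two physical nodes and a single edge from node 1 to node 3 that carries the "hash" partitioner. The virtual node 2 never appears as a node of its own.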

StreamGraph Generation Process

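The StreamGraph is generated in the Flink client. On submission, the client invokes the main method of the Flink application, which assembles the user's business logic into a pipeline of Transformations. The final call to StreamExecutionEnvironment.execute() then triggers construction of the StreamGraph.
The StreamGraph is therefore built before the job is submitted, and the entry point for generating it is in StreamExecutionEnvironment: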

    @Internal
    public StreamGraph getStreamGraph() {
        return this.getStreamGraph(this.getJobName());
    }

    @Internal
    public StreamGraph getStreamGraph(String jobName) {
        return this.getStreamGraph(jobName, true);
    }

    @Internal
    public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
        StreamGraph streamGraph = this.getStreamGraphGenerator().setJobName(jobName).generate();
        if (clearTransformations) {
            this.transformations.clear();
        }

        return streamGraph;
    }

    private StreamGraphGenerator getStreamGraphGenerator() {
        if (this.transformations.size() <= 0) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        } else {
            RuntimeExecutionMode executionMode = (RuntimeExecutionMode)this.configuration.get(ExecutionOptions.RUNTIME_MODE);
            return (new StreamGraphGenerator(this.transformations, this.config, this.checkpointCfg, this.getConfiguration()))
                    .setRuntimeExecutionMode(executionMode)
                    .setStateBackend(this.defaultStateBackend)
                    .setChaining(this.isChainingEnabled)
                    .setUserArtifacts(this.cacheFile)
                    .setTimeCharacteristic(this.timeCharacteristic)
                    .setDefaultBufferTimeout(this.bufferTimeout);
        }
    }
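
One consequence of the clearTransformations flag in the code above is that, by default, the list of transformations is emptied once the StreamGraph has been built, so a second attempt without new operators runs into the IllegalStateException. The following is a minimal, self-contained sketch of that control flow. ClearTransformationsSketch, addOperator, and the string used in place of a real graph are hypothetical stand-ins, not Flink classes or methods.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the environment; not a Flink class. */
final class ClearTransformationsSketch {
    private final List<String> transformations = new ArrayList<>();

    void addOperator(String name) {
        transformations.add(name);
    }

    /** Same control flow as getStreamGraph(jobName, clearTransformations) above. */
    String getStreamGraph(String jobName, boolean clearTransformations) {
        if (transformations.isEmpty()) {
            throw new IllegalStateException(
                    "No operators defined in streaming topology. Cannot execute.");
        }
        // A plain string stands in for the generated StreamGraph.
        String graph = jobName + ": " + String.join(" -> ", transformations);
        if (clearTransformations) {
            transformations.clear();
        }
        return graph;
    }

    /** Builds a graph once, then reports whether a second attempt is rejected. */
    static boolean secondCallFails(boolean clearTransformations) {
        ClearTransformationsSketch env = new ClearTransformationsSketch();
        env.addOperator("map");
        env.addOperator("sink");
        env.getStreamGraph("demo", clearTransformations);
        try {
            env.getStreamGraph("demo", false);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("cleared: second call fails = " + secondCallFails(true));
        System.out.println("kept:    second call fails = " + secondCallFails(false));
    }
}
```

Passing false for clearTransformations keeps the list intact, which is why the three-argument overload is useful when a graph is needed only for inspection.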

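The StreamGraph is actually built in StreamGraphGenerator. generate() walks the registered Transformations, and transform() first recurses into the inputs of each one, so the traversal effectively starts at the SinkTransformation (the output) and traces back to the SourceTransformation, constructing the StreamGraph as it goes. The StreamGraphGenerator source is listed below.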
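
Before reading the full class, it may help to see the traversal in miniature. The sketch below is a simplified model under stated assumptions, not Flink's implementation: TransformTraversalSketch and its Transformation class are hypothetical, each transformation yields exactly one node, and nodes and edges are recorded as plain strings. What it keeps from the real generator is the idea of recursing into the inputs first and remembering finished work in an alreadyTransformed map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy generator; all types here are illustrative stand-ins. */
final class TransformTraversalSketch {

    static final class Transformation {
        final int id;
        final String name;
        final List<Transformation> inputs;

        Transformation(int id, String name, Transformation... inputs) {
            this.id = id;
            this.name = name;
            this.inputs = Arrays.asList(inputs);
        }
    }

    private final Map<Transformation, Collection<Integer>> alreadyTransformed = new HashMap<>();
    final List<String> nodes = new ArrayList<>();
    final List<String> edges = new ArrayList<>();

    void generate(List<Transformation> transformations) {
        for (Transformation t : transformations) {
            transform(t);
        }
    }

    private Collection<Integer> transform(Transformation t) {
        Collection<Integer> cached = alreadyTransformed.get(t);
        if (cached != null) {
            return cached; // shared upstream operators are translated only once
        }
        // Translate all inputs first, so upstream nodes exist before this one.
        List<Integer> inputIds = new ArrayList<>();
        for (Transformation input : t.inputs) {
            inputIds.addAll(transform(input));
        }
        nodes.add(t.name);
        for (int inputId : inputIds) {
            edges.add(inputId + "->" + t.id);
        }
        Collection<Integer> ids = Collections.singletonList(t.id);
        alreadyTransformed.put(t, ids);
        return ids;
    }

    /** source(1) -> map(2) -> sinkA(3) and sinkB(4); only the sinks are registered. */
    static TransformTraversalSketch demo() {
        Transformation source = new Transformation(1, "source");
        Transformation map = new Transformation(2, "map", source);
        Transformation sinkA = new Transformation(3, "sinkA", map);
        Transformation sinkB = new Transformation(4, "sinkB", map);
        TransformTraversalSketch generator = new TransformTraversalSketch();
        generator.generate(Arrays.asList(sinkA, sinkB));
        return generator;
    }

    public static void main(String[] args) {
        TransformTraversalSketch generator = demo();
        System.out.println(generator.nodes); // [source, map, sinkA, sinkB]
        System.out.println(generator.edges); // [1->2, 2->3, 2->4]
    }
}
```

Although only the two sinks are handed to generate(), the source and map nodes are created first, and the map node is created once even though both sinks depend on it.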


    @Internal
    public class StreamGraphGenerator {
        private final List<Transformation<?>> transformations;
        private StateBackend stateBackend;
        private static final Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>> translatorMap;
        protected static Integer iterationIdCounter;
        private StreamGraph streamGraph;
        private Map<Transformation<?>, Collection<Integer>> alreadyTransformed;

        public StreamGraphGenerator(List<Transformation<?>> transformations, ExecutionConfig executionConfig, CheckpointConfig checkpointConfig) {
            this(transformations, executionConfig, checkpointConfig, new Configuration());
        }

        public StreamGraphGenerator(List<Transformation<?>> transformations, ExecutionConfig executionConfig, CheckpointConfig checkpointConfig, ReadableConfig configuration) {
            this.chaining = true;
            this.timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
            this.jobName = "Flink Streaming Job";
            this.savepointRestoreSettings = SavepointRestoreSettings.none();
            this.defaultBufferTimeout = -1L;
            this.runtimeExecutionMode = RuntimeExecutionMode.STREAMING;
            this.transformations = (List)Preconditions.checkNotNull(transformations);
            this.executionConfig = (ExecutionConfig)Preconditions.checkNotNull(executionConfig);
            this.checkpointConfig = new CheckpointConfig(checkpointConfig);
            this.configuration = (ReadableConfig)Preconditions.checkNotNull(configuration);
        }

        public StreamGraph generate() {
            this.streamGraph = new StreamGraph(this.executionConfig, this.checkpointConfig, this.savepointRestoreSettings);
            this.shouldExecuteInBatchMode = this.shouldExecuteInBatchMode(this.runtimeExecutionMode);
            this.configureStreamGraph(this.streamGraph);
            this.alreadyTransformed = new HashMap();
            Iterator var1 = this.transformations.iterator();

            while(var1.hasNext()) {
                Transformation<?> transformation = (Transformation)var1.next();
                this.transform(transformation);
            }

            StreamGraph builtStreamGraph = this.streamGraph;
            this.alreadyTransformed.clear();
            this.alreadyTransformed = null;
            this.streamGraph = null;
            return builtStreamGraph;
        }

        private Collection<Integer> transform(Transformation<?> transform) {
            if (this.alreadyTransformed.containsKey(transform)) {
                return (Collection)this.alreadyTransformed.get(transform);
            } else {
                LOG.debug("Transforming " + transform);
                if (transform.getMaxParallelism() <= 0) {
                    int globalMaxParallelismFromConfig = this.executionConfig.getMaxParallelism();
                    if (globalMaxParallelismFromConfig > 0) {
                        transform.setMaxParallelism(globalMaxParallelismFromConfig);
                    }
                }

                transform.getOutputType();
                TransformationTranslator<?, Transformation<?>> translator = (TransformationTranslator)translatorMap.get(transform.getClass());
                Collection transformedIds;
                if (translator != null) {
                    transformedIds = this.translate(translator, transform);
                } else {
                    transformedIds = this.legacyTransform(transform);
                }

                if (!this.alreadyTransformed.containsKey(transform)) {
                    this.alreadyTransformed.put(transform, transformedIds);
                }

                return transformedIds;
            }
        }

        private Collection<Integer> legacyTransform(Transformation<?> transform) {
            Collection transformedIds;
            if (transform instanceof FeedbackTransformation) {
                transformedIds = this.transformFeedback((FeedbackTransformation)transform);
            } else {
                if (!(transform instanceof CoFeedbackTransformation)) {
                    throw new IllegalStateException("Unknown transformation: " + transform);
                }

                transformedIds = this.transformCoFeedback((CoFeedbackTransformation)transform);
            }

            if (transform.getBufferTimeout() >= 0L) {
                this.streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
            } else {
                this.streamGraph.setBufferTimeout(transform.getId(), this.defaultBufferTimeout);
            }

            if (transform.getUid() != null) {
                this.streamGraph.setTransformationUID(transform.getId(), transform.getUid());
            }

            if (transform.getUserProvidedNodeHash() != null) {
                this.streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
            }

            if (!this.streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled() && transform instanceof PhysicalTransformation && transform.getUserProvidedNodeHash() == null && transform.getUid() == null) {
                throw new IllegalStateException("Auto generated UIDs have been disabled but no UID or hash has been assigned to operator " + transform.getName());
            } else {
                if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
                    this.streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
                }

                this.streamGraph.setManagedMemoryUseCaseWeights(transform.getId(), transform.getManagedMemoryOperatorScopeUseCaseWeights(), transform.getManagedMemorySlotScopeUseCases());
                return transformedIds;
            }
        }

        private <T> Collection<Integer> transformFeedback(FeedbackTransformation<T> iterate) {
            if (this.shouldExecuteInBatchMode) {
                throw new UnsupportedOperationException("Iterations are not supported in BATCH execution mode. If you want to execute such a pipeline, please set the '" + ExecutionOptions.RUNTIME_MODE.key() + "'=" + RuntimeExecutionMode.STREAMING.name());
            } else if (iterate.getFeedbackEdges().size() <= 0) {
                throw new IllegalStateException("Iteration " + iterate + " does not have any feedback edges.");
            } else {
                List<Transformation<?>> inputs = iterate.getInputs();
                Preconditions.checkState(inputs.size() == 1);
                Transformation<?> input = (Transformation)inputs.get(0);
                List<Integer> resultIds = new ArrayList();
                Collection<Integer> inputIds = this.transform(input);
                resultIds.addAll(inputIds);
                if (this.alreadyTransformed.containsKey(iterate)) {
                    return (Collection)this.alreadyTransformed.get(iterate);
                } else {
                    Tuple2<StreamNode, StreamNode> itSourceAndSink = this.streamGraph.createIterationSourceAndSink(iterate.getId(), getNewIterationNodeId(), getNewIterationNodeId(), iterate.getWaitTime(), iterate.getParallelism(), iterate.getMaxParallelism(), iterate.getMinResources(), iterate.getPreferredResources());
                    StreamNode itSource = (StreamNode)itSourceAndSink.f0;
                    StreamNode itSink = (StreamNode)itSourceAndSink.f1;
                    this.streamGraph.setSerializers(itSource.getId(), (TypeSerializer)null, (TypeSerializer)null, iterate.getOutputType().createSerializer(this.executionConfig));
                    this.streamGraph.setSerializers(itSink.getId(), iterate.getOutputType().createSerializer(this.executionConfig), (TypeSerializer)null, (TypeSerializer)null);
                    resultIds.add(itSource.getId());
                    this.alreadyTransformed.put(iterate, resultIds);
                    List<Integer> allFeedbackIds = new ArrayList();
                    Iterator var10 = iterate.getFeedbackEdges().iterator();

                    while(var10.hasNext()) {
                        Transformation<T> feedbackEdge = (Transformation)var10.next();
                        Collection<Integer> feedbackIds = this.transform(feedbackEdge);
                        allFeedbackIds.addAll(feedbackIds);
                        Iterator var13 = feedbackIds.iterator();

                        while(var13.hasNext()) {
                            Integer feedbackId = (Integer)var13.next();
                            this.streamGraph.addEdge(feedbackId, itSink.getId(), 0);
                        }
                    }

                    String slotSharingGroup = this.determineSlotSharingGroup((String)null, allFeedbackIds);
                    if (slotSharingGroup == null) {
                        slotSharingGroup = "SlotSharingGroup-" + iterate.getId();
                    }

                    itSink.setSlotSharingGroup(slotSharingGroup);
                    itSource.setSlotSharingGroup(slotSharingGroup);
                    return resultIds;
                }
            }
        }

        private <F> Collection<Integer> transformCoFeedback(CoFeedbackTransformation<F> coIterate) {
            if (this.shouldExecuteInBatchMode) {
                throw new UnsupportedOperationException("Iterations are not supported in BATCH execution mode. If you want to execute such a pipeline, please set the '" + ExecutionOptions.RUNTIME_MODE.key() + "'=" + RuntimeExecutionMode.STREAMING.name());
            } else {
                Tuple2<StreamNode, StreamNode> itSourceAndSink = this.streamGraph.createIterationSourceAndSink(coIterate.getId(), getNewIterationNodeId(), getNewIterationNodeId(), coIterate.getWaitTime(), coIterate.getParallelism(), coIterate.getMaxParallelism(), coIterate.getMinResources(), coIterate.getPreferredResources());
                StreamNode itSource = (StreamNode)itSourceAndSink.f0;
                StreamNode itSink = (StreamNode)itSourceAndSink.f1;
                this.streamGraph.setSerializers(itSource.getId(), (TypeSerializer)null, (TypeSerializer)null, coIterate.getOutputType().createSerializer(this.executionConfig));
                this.streamGraph.setSerializers(itSink.getId(), coIterate.getOutputType().createSerializer(this.executionConfig), (TypeSerializer)null, (TypeSerializer)null);
                Collection<Integer> resultIds = Collections.singleton(itSource.getId());
                this.alreadyTransformed.put(coIterate, resultIds);
                List<Integer> allFeedbackIds = new ArrayList();
                Iterator var7 = coIterate.getFeedbackEdges().iterator();

                while(var7.hasNext()) {
                    Transformation<F> feedbackEdge = (Transformation)var7.next();
                    Collection<Integer> feedbackIds = this.transform(feedbackEdge);
                    allFeedbackIds.addAll(feedbackIds);
                    Iterator var10 = feedbackIds.iterator();

                    while(var10.hasNext()) {
                        Integer feedbackId = (Integer)var10.next();
                        this.streamGraph.addEdge(feedbackId, itSink.getId(), 0);
                    }
                }

                String slotSharingGroup = this.determineSlotSharingGroup((String)null, allFeedbackIds);
                itSink.setSlotSharingGroup(slotSharingGroup);
                itSource.setSlotSharingGroup(slotSharingGroup);
                return Collections.singleton(itSource.getId());
            }
        }

        private Collection<Integer> translate(TransformationTranslator<?, Transformation<?>> translator, Transformation<?> transform) {
            Preconditions.checkNotNull(translator);
            Preconditions.checkNotNull(transform);
            List<Collection<Integer>> allInputIds = this.getParentInputIds(transform.getInputs());
            if (this.alreadyTransformed.containsKey(transform)) {
                return (Collection)this.alreadyTransformed.get(transform);
            } else {
                String slotSharingGroup = this.determineSlotSharingGroup(transform.getSlotSharingGroup(), (Collection)allInputIds.stream().flatMap(Collection::stream).collect(Collectors.toList()));
                Context context = new StreamGraphGenerator.ContextImpl(this, this.streamGraph, slotSharingGroup, this.configuration);
                return this.shouldExecuteInBatchMode ? translator.translateForBatch(transform, context) : translator.translateForStreaming(transform, context);
            }
        }

        private List<Collection<Integer>> getParentInputIds(@Nullable Collection<Transformation<?>> parentTransformations) {
            List<Collection<Integer>> allInputIds = new ArrayList();
            if (parentTransformations == null) {
                return allInputIds;
            } else {
                Iterator var3 = parentTransformations.iterator();

                while(var3.hasNext()) {
                    Transformation<?> transformation = (Transformation)var3.next();
                    allInputIds.add(this.transform(transformation));
                }

                return allInputIds;
            }
        }

        private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) {
            if (specifiedGroup != null) {
                return specifiedGroup;
            } else {
                String inputGroup = null;
                Iterator var4 = inputIds.iterator();

                while(var4.hasNext()) {
                    int id = (Integer)var4.next();
                    String inputGroupCandidate = this.streamGraph.getSlotSharingGroup(id);
                    if (inputGroup == null) {
                        inputGroup = inputGroupCandidate;
                    } else if (!inputGroup.equals(inputGroupCandidate)) {
                        return "default";
                    }
                }

                return inputGroup == null ? "default" : inputGroup;
            }
        }

        static {
            DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
            Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>> tmp = new HashMap();
            tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator());
            tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator());
            tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator());
            tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator());
            tmp.put(SourceTransformation.class, new SourceTransformationTranslator());
            tmp.put(SinkTransformation.class, new SinkTransformationTranslator());
            tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator());
            tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator());
            tmp.put(UnionTransformation.class, new UnionTransformationTranslator());
            tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator());
            tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator());
            tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator());
            tmp.put(TimestampsAndWatermarksTransformation.class, new TimestampsAndWatermarksTransformationTranslator());
            tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator());
            tmp.put(KeyedBroadcastStateTransformation.class, new KeyedBroadcastStateTransformationTranslator());
            translatorMap = Collections.unmodifiableMap(tmp);
            iterationIdCounter = 0;
        }
    }
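
The determineSlotSharingGroup method in the listing decides which slot sharing group a node lands in: an explicitly specified group wins, otherwise the node inherits the group shared by all of its inputs, and it falls back to "default" when there are no inputs or the inputs disagree. The sketch below restates that rule on its own. As a simplifying assumption it takes the input groups directly as strings instead of looking them up by node ID in the StreamGraph, and SlotSharingSketch is a hypothetical class name.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Standalone restatement of the slot sharing rule; not a Flink class. */
final class SlotSharingSketch {

    static String determineSlotSharingGroup(String specifiedGroup, List<String> inputGroups) {
        if (specifiedGroup != null) {
            return specifiedGroup; // an explicit setting always wins
        }
        String inputGroup = null;
        for (String candidate : inputGroups) {
            if (inputGroup == null) {
                inputGroup = candidate;
            } else if (!inputGroup.equals(candidate)) {
                return "default"; // inputs disagree
            }
        }
        // No inputs at all (for example a source) also means "default".
        return inputGroup == null ? "default" : inputGroup;
    }

    public static void main(String[] args) {
        System.out.println(determineSlotSharingGroup("etl", Arrays.asList("a", "b")));
        System.out.println(determineSlotSharingGroup(null, Collections.<String>emptyList()));
        System.out.println(determineSlotSharingGroup(null, Arrays.asList("g", "g")));
        System.out.println(determineSlotSharingGroup(null, Arrays.asList("g", "h")));
    }
}
```

The four calls in main cover the four cases in order: explicit group, no inputs, inputs that agree, and inputs that disagree.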

