Flink StreamTask启动和执行源码分析

文章目录

  • 前言
  • StreamTask 部署启动
  • Task 线程启动
  • StreamTask 初始化
  • StreamTask 执行


前言

Flink的StreamTask的启动和执行是一个复杂的过程,涉及多个关键步骤。以下是StreamTask启动和执行的主要流程:

  1. 初始化:StreamTask的初始化阶段涉及多个任务,包括Operator的配置、task特定的初始化以及初始化算子的State等。在这个阶段,Flink将业务处理函数抽象为operator,并通过operatorChain将业务代码串起来执行,以完成业务逻辑的处理。同时,还会调用具体task的init方法进行初始化。
  2. 读取数据和事件:StreamTask通过mailboxProcessor读取数据和事件。
  3. 运行业务逻辑:在StreamTask的beforeInvoke方法中,主要调用生成operatorChain并执行相关的业务逻辑。这些业务逻辑可能包括Source算子和map算子等,它们将被Chain在一起并在一个线程内同步执行。
  4. 资源清理:在执行完业务逻辑后,StreamTask会进行关闭和资源清理的操作,这部分在afterInvoke阶段完成。

值得注意的是,从资源角度来看,每个TaskManager内部有多个slot,每个slot内部运行着一个subtask,即每个slot内部运行着一个StreamTask。这意味着StreamTask是由TaskManager(TM)部署并执行的本地处理单元。

总的来说,Flink的StreamTask启动和执行是一个由多个阶段和组件协同工作的过程,涉及数据的读取、业务逻辑的执行以及资源的清理等多个方面。这些步骤确保了StreamTask能够高效、准确地处理数据流,并满足实时计算和分析的需求。


StreamTask 部署启动

当 TaskExecutor 接收提交 Task 执行的请求,则调用:

TaskExecutor.submitTask(TaskDeploymentDescriptor tdd, 
JobMasterId jobMasterId,Time timeout){// 构造 Task 对象Task task = new Task(jobInformation, taskInformation, ExecutionAttemptId,AllocationId, SubtaskIndex, ....);// 启动 Task 的执行task.startTaskThread();
}

Task对象的构造方法

public Task(.....){
// 封装一个 Task信息对象 TaskInfo,(TaskInfo, JobInfo,JobMasterInfo)
this.taskInfo = new TaskInfo(....);
// 各种成员变量赋值
......
// 一个Task的执行有输入也有输出: 关于输出的抽象: ResultPartition 和
ResultSubPartitionPipelinedSubpartition// 初始化 ResultPartition 和 ResultSubPartition
final ResultPartitionWriter[] resultPartitionWriters =
shuffleEnvironment.createResultPartitionWriters(....);
this.consumableNotifyingPartitionWriters =
ConsumableNotifyingResultPartitionWriterDecorator.decorate(....);
// 一个Task的执行有输入也有输出: 关于输入的抽象: InputGate 和 InputChannel(从上有
一个Task节点拉取数据)
// InputChannel 可能有两种实现: Local Remote
// 初始化 InputGate 和 InputChannel
final IndexedInputGate[] gates = shuffleEnvironment.createInputGates(.....);
// 初始化一个用来执行 Task 的线程,目标对象,就是 Task 自己
executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
}

Task 线程启动

Task 的启动,是通过启动 Task 对象的内部 executingThread 来执行 Task 的,具体逻辑在 run 方法中:

private void doRun() {// ----------------------------//  Initial State transition// ----------------------------while (true) {ExecutionState current = this.executionState;if (current == ExecutionState.CREATED) {if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {// success, we can start our workbreak;}} else if (current == ExecutionState.FAILED) {// we were immediately failed. tell the TaskManager that we reached our final statenotifyFinalState();if (metrics != null) {metrics.close();}return;} else if (current == ExecutionState.CANCELING) {if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {// we were immediately canceled. tell the TaskManager that we reached our final// statenotifyFinalState();if (metrics != null) {metrics.close();}return;}} else {if (metrics != null) {metrics.close();}throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');}}// all resource acquisitions and registrations from here on// need to be undone in the endMap<String, Future<Path>> distributedCacheEntries = new HashMap<>();TaskInvokable invokable = null;try {// ----------------------------//  Task Bootstrap - We periodically//  check for canceling as a shortcut// ----------------------------// activate safety net for task threadLOG.debug("Creating FileSystem stream leak safety net for task {}", this);FileSystemSafetyNet.initializeSafetyNetForThread();// first of all, get a user-code classloader// this may involve downloading the job's JAR files and/or classesLOG.info("Loading JAR files for task {}.", this);userCodeClassLoader = createUserCodeClassloader();final ExecutionConfig executionConfig =serializedExecutionConfig.deserializeValue(userCodeClassLoader.asClassLoader());if (executionConfig.getTaskCancellationInterval() >= 0) {// override task cancellation interval from Flink config if set in ExecutionConfigtaskCancellationInterval = executionConfig.getTaskCancellationInterval();}if (executionConfig.getTaskCancellationTimeout() >= 0) {// override task cancellation timeout from Flink config if set in ExecutionConfigtaskCancellationTimeout = executionConfig.getTaskCancellationTimeout();}if (isCanceledOrFailed()) {throw new CancelTaskException();}// ----------------------------------------------------------------// register the task with the network stack// this operation may fail if the system does not have enough// memory to run the necessary data exchanges// the registration must also strictly be undone// ----------------------------------------------------------------LOG.debug("Registering task at network: {}.", this);setupPartitionsAndGates(partitionWriters, inputGates);for (ResultPartitionWriter partitionWriter : partitionWriters) {taskEventDispatcher.registerPartition(partitionWriter.getPartitionId());}// next, kick off the background copying of files for the distributed cachetry {for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :DistributedCache.readFileInfoFromConfig(jobConfiguration)) {LOG.info("Obtaining local cache file for '{}'.", entry.getKey());Future<Path> cp =fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId, executionId);distributedCacheEntries.put(entry.getKey(), cp);}} catch (Exception e) {throw new Exception(String.format("Exception while adding files to distributed cache of task %s (%s).",taskNameWithSubtask, executionId),e);}if (isCanceledOrFailed()) {throw new CancelTaskException();}// ----------------------------------------------------------------//  call the user code initialization methods// ----------------------------------------------------------------TaskKvStateRegistry kvStateRegistry =kvStateService.createKvStateTaskRegistry(jobId, getJobVertexId());Environment env =new RuntimeEnvironment(jobId,vertexId,executionId,executionConfig,taskInfo,jobConfiguration,taskConfiguration,userCodeClassLoader,memoryManager,ioManager,broadcastVariableManager,taskStateManager,aggregateManager,accumulatorRegistry,kvStateRegistry,inputSplitProvider,distributedCacheEntries,partitionWriters,inputGates,taskEventDispatcher,checkpointResponder,operatorCoordinatorEventGateway,taskManagerConfig,metrics,this,externalResourceInfoProvider);// Make sure the user code classloader is accessible thread-locally.// We are setting the correct context class loader before instantiating the invokable// so that it is available to the invokable during its entire lifetime.executingThread.setContextClassLoader(userCodeClassLoader.asClassLoader());// When constructing invokable, separate threads can be constructed and thus should be// monitored for system exit (in addition to invoking thread itself monitored below).FlinkSecurityManager.monitorUserSystemExitForCurrentThread();try {// now load and instantiate the task's invokable codeinvokable =loadAndInstantiateInvokable(userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);} finally {FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();}//  actual task core work// we must make strictly sure that the invokable is accessible to the cancel() call// by the time we switched to running.this.invokable = invokable;restoreAndInvoke(invokable);// make sure, we enter the catch block if the task leaves the invoke() method due// to the fact that it has been canceledif (isCanceledOrFailed()) {throw new CancelTaskException();}// ----------------------------------------------------------------//  finalization of a successful execution// ----------------------------------------------------------------// finish the produced partitions. if this fails, we consider the execution failed.for (ResultPartitionWriter partitionWriter : partitionWriters) {if (partitionWriter != null) {partitionWriter.finish();}}// try to mark the task as finished// if that fails, the task was canceled/failed in the meantimeif (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {throw new CancelTaskException();}} catch (Throwable t) {}  }

StreamTask 初始化

StreamTask 初始化指的就是 SourceStreamTask 和 OneInputStreamTask 的实例对象的构建!Task 这个类,只是一个笼统意义上的 Task,就是一个通用 Task 的抽象,不管是批处理的,还是流式处理的,不管是 源Task, 还是逻辑处理 Task, 都被抽象成 Task 来进行调度执行!

StreamTask 执行

核心步骤如下:

public final void invoke() throws Exception {
// Task 正式工作之前
beforeInvoke();
// Task 开始工作: 针对数据执行正儿八经的逻辑处理
runMailboxLoop();
// Task 要结束
afterInvoke();
// Task 最后执行清理
cleanUpInvoke();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/735422.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

蓝桥杯2023年-接龙数列(dp)

题目描述 对于一个长度为 K 的整数数列&#xff1a;A1, A2, . . . , AK&#xff0c;我们称之为接龙数列当且仅当 Ai 的首位数字恰好等于 Ai−1 的末位数字 (2 ≤ i ≤ K)。 例如 12, 23, 35, 56, 61, 11 是接龙数列&#xff1b;12, 23, 34, 56 不是接龙数列&#xff0c;因为 …

前端学习之行内和块级标签

行内标签 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>span</title> </head> <body><!-- 行内标签特点&#xff1a;1、不换行,一行可以放多个2、默认宽度内容撑开代表&#…

[2023年]-hadoop面试真题(一)

&#xff08;北京&#xff09;HDFS底层存储原理? (北京) HDFS读写数据流程? (北京) HDFS如何管理元数据或者checkpoint的理解 ? (北京) HDFS常用命令 ? (北京) hadoop调优 (北京) HDFS扩容原理 (北京) HDFS有哪些进程,分别是什么? (北京) HDFS中大量小文件对…

Go实现日志2——支持结构化和hook

代码保存在&#xff1a;https://github.com/liwook/Go-projects/tree/main/log/sulogV2​​​​​​​ 1.日志结构化 日志记录的事件以结构化格式(键值对&#xff0c;或通常是 JSON)表示&#xff0c;随后可以通过编程方式对其进行解析&#xff0c;便于对日志进行监控、警报、…

Googlenet网络架构

原文链接&#xff1a;[1409.4842v1] Going Deeper with Convolutions (arxiv.org) 图源&#xff1a;深入解读GoogLeNet网络结构&#xff08;附代码实现&#xff09;-CSDN博客 表截自原文 以下&#x1f4d2;来自博客深入解读GoogLeNet网络结构&#xff08;附代码实现&#xff0…

【顶刊|修正】多区域综合能源系统热网建模及系统运行优化【复现+延伸】

目录 主要内容 部分代码 结果一览 下载链接 主要内容 该程序复现《多区域综合能源系统热网建模及系统运行优化》模型并进一步延伸&#xff0c;基于传热学的基本原理建立了区域热网能量传输通用模型&#xff0c;对热网热损方程线性化实现热网能量流建模&#xff0…

使用docker-compose编排ruoyi项目

目录 一、开始部署 1.拉取ruoyi代码 2.拉取node镜像 3.拉取maven镜像 4.在/root/ruoyi/java下写一个Dockerfile用于后端Java环境 5.拉取MySQL&#xff0c;Redis&#xff0c;Nginx镜像 6.在/root/java目录下写一个nginx.conf 7.在/root/ruoyi目录下写docker-compose.yml文…

Idea导入Maven项目

方法一&#xff1a;使用Maven面板 方法二&#xff1a;在项目结构中设置&#xff0c;在最后一步中选择pom.xml。

js【详解】bind()、call()、apply()( 含手写 bind,手写 call,手写 apply )

必备知识点&#xff1a;js 【详解】函数中的 this 指向_js function this-CSDN博客 https://blog.csdn.net/weixin_41192489/article/details/123093256 bind、call、apply 的相同点 都是Function原型上的方法用途都是改变 this 的指向第一个参数都是新的 this bind、call、app…

前端学习之列表标签

目录 有序列表 结果 无序标签 结果 数据标签 结果 有序列表 &#xff08;注&#xff1a;注释是解释&#xff09; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Document</title> </…

SpringBoot实现 PDF 添加水印

方案一&#xff1a;使用 Apache PDFBox 库 ①、依赖 <dependency><groupId>org.apache.pdfbox</groupId><artifactId>pdfbox</artifactId><version>2.0.24</version> </dependency>②、添加水印 public class PdfoxWaterma…

蓝桥集训之日期差值

蓝桥集训之日期差值 模版&#xff1a;判断闰年 总天数 月份天数 #include <iostream>#include <cstring>#include <algorithm>using namespace std;const int months[]{0, 31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31};int is_leap(int y){if(y % 10…

【JavaEE初阶系列】——计算机是如何工作的

目录 &#x1f388;冯诺依曼体系 ❗外存和内存的概念 ❗CPU中央处理器—人类当今科技领域巅峰之作之一 &#x1f6a9;如何衡量cpu &#x1f6a9;指令&#xff08;Instruction&#xff09; &#x1f388;操作系统&#xff08;Operating System&#xff09; &#x1f388;…

关于GPU显卡的介绍

一.关于英伟达历代产品架构 显卡是一种计算机硬件设备,也被称为显示适配器或图形处理器。目前的硬件部分主要由主板、芯片、存储器、散热器&#xff08;散热片、风扇&#xff09;等部分。显卡的主要芯片是显卡的主要处理单元。显卡上也有和计算机存储器相似的存储器&#xff0…

聊聊.NET中的连接池

在.NET中&#xff0c;连接池被广泛用于管理和优化不同类型资源的连接。连接池可以减少建立和关闭连接所需的时间和资源消耗&#xff0c;从而提高了应用程序的性能和响应能力。 HttpClient中的连接池 System.Net.Http.HttpClient 类用于发送 HTTP 请求以及从 URI 所标识的资源…

安全测试报告-模板内容

1. 概述 为检验XXXX平台 系统的安全性&#xff0c;于 XXXX年 XX 月 XX 日至 XXXX年 XX 月 XX日对目标系统进行了安全测试。在此期间测试人员将使用各 种非破坏性质的攻击手段&#xff0c;对目标系统做深入的探测分析&#xff0c;进而挖掘系统中的安 全漏洞和风险隐患。研发团队…

代码讲解:如何把3D数据转换成旋转的视频?

目录 3D数据集下载 读取binvox文件 使用matplotlib创建图 动画效果 完整代码 3D数据集下载 这里以shapenet数据集为例&#xff0c;可以访问外网的可以去直接申请下载&#xff1b;我也准备了一个备份在百度网盘的数据集&#xff0c;可以参考&#xff1a; ShapeNet简介和下…

Java适配器模式源码剖析及使用场景

文章目录 一、适配器模式介绍二、大白话理解三、 项目案例四、Java源码 一、适配器模式介绍 适配器模式(Adapter Pattern)是一种结构型设计模式,它作用于将一个类的接口转换成客户端所期望的另一种接口,从而使原本由于接口不兼容而无法一起工作的那些类可以在一起工作。它属于…

Vue3中Vue Router的使用区别

在 Vue 3 中&#xff0c;useRouter 和 useRoute 是两个用于 Vue Router 的 Composition API 函数&#xff0c;它们的用途和返回的对象不同&#xff0c;接下来详细了解一下它们的区别以及如何正确使用它们。 useRouter useRouter 用于获取 router 实例&#xff0c;这个实例提供…

macOS14.4安装FFmpeg及编译FFmpeg源码

下载二进制及源码包 二进制 使用brew安装ffmpeg : brew install ffmpeg 成功更新到ffmpeg6.1 下载FFmpeg源码