Table of Contents
- Preface
- StreamTask Deployment and Startup
- Task Thread Startup
- StreamTask Initialization
- StreamTask Execution
Preface
Starting and executing a StreamTask in Flink is a fairly involved process with several key steps. The main flow is:
- Initialization: this phase covers several jobs, including configuring the operators, task-specific initialization, and initializing operator state. Here Flink abstracts the user's processing functions as operators and strings them together through the operatorChain, so that the business logic is executed as one unit; the concrete task's init method is also called in this phase.
- Reading data and events: the StreamTask reads data and events through the mailboxProcessor.
- Running the business logic: StreamTask.beforeInvoke() is mainly responsible for building the operatorChain and running the related business logic. That logic may consist of, say, a Source operator and a map operator, which are chained together and executed synchronously within a single thread (a minimal example follows at the end of this preface).
- Resource cleanup: once the business logic has finished, the StreamTask shuts down and cleans up its resources; this happens in the afterInvoke phase.
It is worth noting that, from a resource point of view, each TaskManager contains multiple slots and each slot runs one subtask, i.e. one StreamTask. In other words, a StreamTask is the local processing unit that the TaskManager (TM) deploys and executes.
In short, starting and executing a StreamTask is a process in which several phases and components cooperate, covering reading the data, executing the business logic, and cleaning up resources. These steps ensure that a StreamTask can process its data stream efficiently and correctly and meet the demands of real-time computation and analytics.
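To make this concrete, the minimal job below (a made-up example, not taken from any particular source) contains a source and a map; with parallelism 1 and no repartitioning they are chained into one operator chain and therefore driven by one StreamTask inside one slot.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Minimal job whose source and map operators get chained: at runtime the whole
// chain is executed by a single StreamTask inside one TaskManager slot.
public class ChainedJobExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.fromElements(1, 2, 3)
                // chainable with the source, so both operators share one StreamTask
                .map(new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer value) {
                        return value * 2;
                    }
                })
                .print();

        env.execute("chained source + map");
    }
}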
StreamTask Deployment and Startup
When the TaskExecutor receives a request to submit a Task for execution, it calls (shown here in simplified form):
TaskExecutor.submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
    // construct the Task object
    Task task = new Task(jobInformation, taskInformation, executionAttemptId,
            allocationId, subtaskIndex, ....);
    // start executing the Task
    task.startTaskThread();
}
The constructor of the Task object:
public Task(.....) {
    // wrap the task description into a TaskInfo object (TaskInfo, JobInfo, JobMasterInfo)
    this.taskInfo = new TaskInfo(....);

    // assign the various member variables
    ......

    // a Task has both input and output. The output side is abstracted as
    // ResultPartition and ResultSubpartition (PipelinedSubpartition).
    // Initialize the ResultPartitions and ResultSubpartitions.
    final ResultPartitionWriter[] resultPartitionWriters =
            shuffleEnvironment.createResultPartitionWriters(....);
    this.consumableNotifyingPartitionWriters =
            ConsumableNotifyingResultPartitionWriterDecorator.decorate(....);

    // the input side is abstracted as InputGate and InputChannel (which pull data
    // from the upstream Task node). An InputChannel has two implementations: Local and Remote.
    // Initialize the InputGates and InputChannels.
    final IndexedInputGate[] gates = shuffleEnvironment.createInputGates(.....);

    // create the thread that will execute this Task; its Runnable target is the Task itself
    executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
}
Task Thread Startup
A Task is started by starting its internal executingThread; the Task itself is the Runnable target, and its run() method delegates to doRun(), which holds the actual logic.
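For reference, the hand-off from startTaskThread() to doRun() looks roughly like this (condensed; the exact code depends on the Flink version):

// Task implements Runnable; startTaskThread() simply starts the internal thread
public void startTaskThread() {
    executingThread.start();
}

// run() delegates to doRun() and records the final execution state when the thread exits
@Override
public void run() {
    try {
        doRun();
    } finally {
        terminationFuture.complete(executionState);
    }
}

The body of doRun() then looks as follows: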
private void doRun() {
    // ----------------------------
    //  Initial State transition
    // ----------------------------
    while (true) {
        ExecutionState current = this.executionState;
        if (current == ExecutionState.CREATED) {
            if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
                // success, we can start our work
                break;
            }
        } else if (current == ExecutionState.FAILED) {
            // we were immediately failed. tell the TaskManager that we reached our final state
            notifyFinalState();
            if (metrics != null) {
                metrics.close();
            }
            return;
        } else if (current == ExecutionState.CANCELING) {
            if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
                // we were immediately canceled. tell the TaskManager that we reached our final
                // state
                notifyFinalState();
                if (metrics != null) {
                    metrics.close();
                }
                return;
            }
        } else {
            if (metrics != null) {
                metrics.close();
            }
            throw new IllegalStateException(
                    "Invalid state for beginning of operation of task " + this + '.');
        }
    }

    // all resource acquisitions and registrations from here on
    // need to be undone in the end
    Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
    TaskInvokable invokable = null;

    try {
        // ----------------------------
        //  Task Bootstrap - We periodically
        //  check for canceling as a shortcut
        // ----------------------------

        // activate safety net for task thread
        LOG.debug("Creating FileSystem stream leak safety net for task {}", this);
        FileSystemSafetyNet.initializeSafetyNetForThread();

        // first of all, get a user-code classloader
        // this may involve downloading the job's JAR files and/or classes
        LOG.info("Loading JAR files for task {}.", this);

        userCodeClassLoader = createUserCodeClassloader();
        final ExecutionConfig executionConfig =
                serializedExecutionConfig.deserializeValue(userCodeClassLoader.asClassLoader());

        if (executionConfig.getTaskCancellationInterval() >= 0) {
            // override task cancellation interval from Flink config if set in ExecutionConfig
            taskCancellationInterval = executionConfig.getTaskCancellationInterval();
        }

        if (executionConfig.getTaskCancellationTimeout() >= 0) {
            // override task cancellation timeout from Flink config if set in ExecutionConfig
            taskCancellationTimeout = executionConfig.getTaskCancellationTimeout();
        }

        if (isCanceledOrFailed()) {
            throw new CancelTaskException();
        }

        // ----------------------------------------------------------------
        // register the task with the network stack
        // this operation may fail if the system does not have enough
        // memory to run the necessary data exchanges
        // the registration must also strictly be undone
        // ----------------------------------------------------------------

        LOG.debug("Registering task at network: {}.", this);

        setupPartitionsAndGates(partitionWriters, inputGates);

        for (ResultPartitionWriter partitionWriter : partitionWriters) {
            taskEventDispatcher.registerPartition(partitionWriter.getPartitionId());
        }

        // next, kick off the background copying of files for the distributed cache
        try {
            for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
                    DistributedCache.readFileInfoFromConfig(jobConfiguration)) {
                LOG.info("Obtaining local cache file for '{}'.", entry.getKey());
                Future<Path> cp =
                        fileCache.createTmpFile(
                                entry.getKey(), entry.getValue(), jobId, executionId);
                distributedCacheEntries.put(entry.getKey(), cp);
            }
        } catch (Exception e) {
            throw new Exception(
                    String.format(
                            "Exception while adding files to distributed cache of task %s (%s).",
                            taskNameWithSubtask, executionId),
                    e);
        }

        if (isCanceledOrFailed()) {
            throw new CancelTaskException();
        }

        // ----------------------------------------------------------------
        //  call the user code initialization methods
        // ----------------------------------------------------------------

        TaskKvStateRegistry kvStateRegistry =
                kvStateService.createKvStateTaskRegistry(jobId, getJobVertexId());

        Environment env =
                new RuntimeEnvironment(
                        jobId,
                        vertexId,
                        executionId,
                        executionConfig,
                        taskInfo,
                        jobConfiguration,
                        taskConfiguration,
                        userCodeClassLoader,
                        memoryManager,
                        ioManager,
                        broadcastVariableManager,
                        taskStateManager,
                        aggregateManager,
                        accumulatorRegistry,
                        kvStateRegistry,
                        inputSplitProvider,
                        distributedCacheEntries,
                        partitionWriters,
                        inputGates,
                        taskEventDispatcher,
                        checkpointResponder,
                        operatorCoordinatorEventGateway,
                        taskManagerConfig,
                        metrics,
                        this,
                        externalResourceInfoProvider);

        // Make sure the user code classloader is accessible thread-locally.
        // We are setting the correct context class loader before instantiating the invokable
        // so that it is available to the invokable during its entire lifetime.
        executingThread.setContextClassLoader(userCodeClassLoader.asClassLoader());

        // When constructing invokable, separate threads can be constructed and thus should be
        // monitored for system exit (in addition to invoking thread itself monitored below).
        FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
        try {
            // now load and instantiate the task's invokable code
            invokable =
                    loadAndInstantiateInvokable(
                            userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);
        } finally {
            FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
        }

        // actual task core work

        // we must make strictly sure that the invokable is accessible to the cancel() call
        // by the time we switched to running.
        this.invokable = invokable;

        restoreAndInvoke(invokable);

        // make sure, we enter the catch block if the task leaves the invoke() method due
        // to the fact that it has been canceled
        if (isCanceledOrFailed()) {
            throw new CancelTaskException();
        }

        // ----------------------------------------------------------------
        //  finalization of a successful execution
        // ----------------------------------------------------------------

        // finish the produced partitions. if this fails, we consider the execution failed.
        for (ResultPartitionWriter partitionWriter : partitionWriters) {
            if (partitionWriter != null) {
                partitionWriter.finish();
            }
        }

        // try to mark the task as finished
        // if that fails, the task was canceled/failed in the meantime
        if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
            throw new CancelTaskException();
        }
    } catch (Throwable t) {
    }
}
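The call that actually hands control over to the StreamTask is restoreAndInvoke(invokable). In heavily condensed form (state-change notifications and error handling omitted; the exact shape differs between Flink versions), it does roughly the following, which leads straight into the StreamTask lifecycle discussed next:

// rough sketch of restoreAndInvoke(), not the literal Flink source
private void restoreAndInvoke(TaskInvokable finalInvokable) throws Exception {
    // DEPLOYING -> INITIALIZING: let the invokable (e.g. a StreamTask) restore its state
    transitionState(ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
    finalInvokable.restore();

    // INITIALIZING -> RUNNING: run the invokable's main work, i.e. StreamTask.invoke()
    transitionState(ExecutionState.INITIALIZING, ExecutionState.RUNNING);
    finalInvokable.invoke();
}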
StreamTask Initialization
StreamTask initialization refers to constructing the concrete instances of SourceStreamTask and OneInputStreamTask. The Task class itself is only a generic abstraction: whether batch or streaming, source task or downstream processing task, everything is abstracted as a Task so that it can be scheduled and executed uniformly.
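The bridge between the generic Task and the concrete StreamTask subclass is the invokable class name carried in the deployment descriptor (nameOfInvokableClass in doRun() above): the Task resolves that name in the user-code classloader and instantiates it through a constructor taking the runtime Environment. The snippet below is a hypothetical, stripped-down reproduction of that mechanism for illustration (the class InvokableLoader and its signature are made up); it is not Flink's actual loadAndInstantiateInvokable.

import java.lang.reflect.Constructor;

// Hypothetical helper mirroring how a class name such as
// "org.apache.flink.streaming.runtime.tasks.OneInputStreamTask" becomes an instance:
// resolve the class in the user-code classloader, then call its single-argument
// constructor (in Flink that argument is the runtime Environment).
public final class InvokableLoader {

    private InvokableLoader() {}

    public static <T, A> T instantiate(
            ClassLoader userCodeClassLoader,
            String className,
            Class<T> expectedSupertype,
            Class<A> constructorParamType,
            A constructorArg)
            throws ReflectiveOperationException {

        // load the class through the user-code classloader and verify its supertype
        Class<? extends T> clazz =
                Class.forName(className, true, userCodeClassLoader).asSubclass(expectedSupertype);

        // look up the constructor with exactly one parameter of the declared type
        Constructor<? extends T> ctor = clazz.getConstructor(constructorParamType);

        // create the instance, e.g. a SourceStreamTask or a OneInputStreamTask
        return ctor.newInstance(constructorArg);
    }
}

Constructing that instance runs the StreamTask subclass's constructor, which is exactly the "StreamTask initialization" this section refers to.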
StreamTask Execution
The core steps are as follows:
public final void invoke() throws Exception {
    // before the Task starts its real work
    beforeInvoke();

    // the Task does its work: run the actual processing logic over the data
    runMailboxLoop();

    // the Task is about to finish
    afterInvoke();

    // final cleanup of the Task
    cleanUpInvoke();
}
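runMailboxLoop() drives the mailbox model mentioned in the preface: control actions such as checkpoint triggers and timer firings arrive as mails, and whenever no mail is pending a default action processes the next piece of input. The class below is a toy, single-threaded model of that idea, written purely for illustration; it is not Flink's MailboxProcessor and every name in it is made up.

import java.util.ArrayDeque;
import java.util.Queue;

// Toy model of the single-threaded mailbox pattern behind runMailboxLoop():
// pending "mails" are drained first, then one unit of input is processed.
final class ToyMailboxLoop {
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private boolean running = true;

    // in real Flink a mail can come from another thread (e.g. a checkpoint trigger);
    // this toy version is single-threaded, so no synchronization is shown
    void putMail(Runnable mail) {
        mailbox.offer(mail);
    }

    void stop() {
        running = false;
    }

    // defaultAction stands for "poll one record from the input and push it
    // through the operator chain"
    void run(Runnable defaultAction) {
        while (running) {
            Runnable mail;
            while ((mail = mailbox.poll()) != null) {
                mail.run();          // drain pending control mails first
            }
            if (running) {
                defaultAction.run(); // then process one unit of input data
            }
        }
    }

    public static void main(String[] args) {
        ToyMailboxLoop loop = new ToyMailboxLoop();
        int[] processed = {0};
        loop.run(() -> {
            processed[0]++;               // "process one record"
            if (processed[0] == 3) {
                loop.putMail(loop::stop); // a mail asks the loop to terminate
            }
        });
        System.out.println("records processed: " + processed[0]);
    }
}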