【Flink状态管理(八)】Checkpoint:CheckpointBarrier对齐后Checkpoint的完成、通知与对学习状态管理源码的思考

文章目录

  • 一. 调用StreamTask执行Checkpoint操作
    • 1. 执行Checkpoint总体代码流程
      • 1.1. StreamTask.checkpointState()
      • 1.2. executeCheckpointing
      • 1.3. 将算子中的状态快照操作封装在OperatorSnapshotFutures中
      • 1.4. 算子状态进行快照
      • 1.5. 状态数据快照持久化
  • 二. CheckpointCoordinator管理Checkpoint
    • 1. Checkpoint执行完毕后的确认过程
    • 2. 触发并完成Checkpoint操作
    • 3. 通知CheckpointComplete给TaskExecutor
  • 三. 状态管理学习小结

上文介绍了CheckpointBarrier的对齐操作,当CheckpointBarrier完成对齐操作后,接下来就是通过notifyCheckpoint()方法触发StreamTask节点的Checkpoint操作。

一. 调用StreamTask执行Checkpoint操作

如下代码,notifyCheckpoint()方法主要包含如下逻辑。

> 1. 判断toNotifyOnCheckpoint不为空。
> 2. 创建CheckpointMetaDataCheckpointMetrics实例,CheckpointMetaData用于存储
> Checkpoint的元信息,CheckpointMetrics用于记录和监控Checkpoint监控指标。
> 3. 触发StreamTask中算子的Checkpoint操作。
protected void notifyCheckpoint(CheckpointBarrier checkpointBarrier, long bufferedBytes, long alignmentDurationNanos) throws Exception {if (toNotifyOnCheckpoint != null) {// 创建CheckpointMetaData对象用于存储Meta信息CheckpointMetaData checkpointMetaData =new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());// 创建CheckpointMetrics对象用于记录监控指标CheckpointMetrics checkpointMetrics = new CheckpointMetrics().setBytesBufferedInAlignment(bufferedBytes).setAlignmentDurationNanos(alignmentDurationNanos);// 调用toNotifyOnCheckpoint.triggerCheckpointOnBarrier()方法触发Checkpoint操作toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData,checkpointBarrier.getCheckpointOptions(),checkpointMetrics);}
}

注意:StreamTask是唯一实现了Checkpoint方法的子类,即只有StreamTask才能触发当前Task实例中的Checkpoint操作。

 

接下来具体看Checkpoint执行细节

1. 执行Checkpoint总体代码流程

Checkpoint触发过程分为两种情况:一种是CheckpointCoordinator周期性地触发数据源节点中的Checkpoint操作;另一种是下游算子通过对齐CheckpointBarrier事件触发本节点算子的Checkpoint操作。

不管是哪种方式触发Checkpoint,最终都是调用StreamTask.performCheckpoint()方法实现StreamTask实例中状态数据的持久化操作。

 

在StreamTask.performCheckpoint()方法中,首先判断当前的Task是否运行正常,然后使用actionExecutor线程池执行Checkpoint操作,Checkpoint的实际执行过程如下。

  1. Checkpoint执行前的准备操作,让OperatorChain中所有的Operator执行Pre-barrier工作。
  2. 将CheckpointBarrier事件发送到下游的节点中。
  3. 算子状态数据进行快照

执行checkpointState()方法,对StreamTask中OperatorChain的所有算子进行状态数据的快照操作,该过程为异步非阻塞过程,不影响数据的正常处理进程,执行完成后会返回True到CheckpointInputGate中。

  1. task挂掉情况处理:
  • 如果isRunning的条件为false,表明Task不在运行状态,此时需要给OperatorChain中的所有算子发送CancelCheckpointMarker消息,这里主要借助recordWriter.broadcastEvent(message)方法向下游算子进行事件广播。
  • 当且仅当OperatorChain中的算子还没有执行完Checkpoint操作的时候,下游的算子接收到CancelCheckpointMarker消息后会立即取消Checkpoint操作。
private boolean performCheckpoint(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions,CheckpointMetrics checkpointMetrics,boolean advanceToEndOfTime) throws Exception {LOG.debug("Starting checkpoint ({}) {} on task {}",checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());final long checkpointId = checkpointMetaData.getCheckpointId();if (isRunning) {// 使用actionExecutor执行Checkpoint逻辑actionExecutor.runThrowing(() -> {if (checkpointOptions.getCheckpointType().isSynchronous()) {setSynchronousSavepointId(checkpointId);if (advanceToEndOfTime) {advanceToEndOfEventTime();}}//Checkpoint操作的准备工作operatorChain.prepareSnapshotPreBarrier(checkpointId);//将checkpoint barrier发送到下游的stream中operatorChain.broadcastCheckpointBarrier(checkpointId,checkpointMetaData.getTimestamp(),checkpointOptions);//对算子中的状态进行快照操作,此步骤是异步操作,//不影响streaming拓扑中数据的正常处理checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);});return true;} else {// 如果Task处于其他状态,则向下游广播CancelCheckpointMarker消息actionExecutor.runThrowing(() -> {final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());recordWriter.broadcastEvent(message);});return false;}
}

 

1.1. StreamTask.checkpointState()

接下来我们看StreamTask.checkpointState()方法的具体实现,如下代码。

  1. 创建CheckpointStateOutputStream实例。主要有如下两种实现类:
    • FsCheckpointStateOutputStream:文件类型系统
    • MemoryCheckpointOutputStream:内存的数据流输出。
  2. 创建CheckpointingOperation实例,CheckpointingOperation封装了Checkpoint执行的具体操作流程,以及checkpointMetaData、checkpointOptions、storage和checkpointMetrics等Checkpoint执行过程中需要的环境配置信息。
  3. 调用CheckpointingOperation.executeCheckpointing()方法执行Checkpoint操作。
private void checkpointState(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions,CheckpointMetrics checkpointMetrics) throws Exception {// 创建CheckpointStreamFactory实例CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(checkpointMetaData.getCheckpointId(),checkpointOptions.getTargetLocation());// 创建CheckpointingOperation实例CheckpointingOperation checkpointingOperation = new CheckpointingOperation(this,checkpointMetaData,checkpointOptions,storage,checkpointMetrics);// 执行Checkpoint操作checkpointingOperation.executeCheckpointing();
}

 

1.2. executeCheckpointing

如代码所示,CheckpointingOperation.executeCheckpointing()方法主要包含如下逻辑。

  1. 遍历所有StreamOperator算子,然后调用checkpointStreamOperator()方法为每个算子创建OperatorSnapshotFuture对象。这一步将所有算子的快照操作存储在OperatorSnapshotFutures集合中。
  2. 将OperatorSnapshotFutures存储到operatorSnapshotsInProgress的键值对集合中,其中Key为OperatorID,Value为该算子执行状态快照操作对应的OperatorSnapshotFutures对象
  3. 创建AsyncCheckpointRunnable线程对象,AsyncCheckpointRunnable实例中包含了创建好的OperatorSnapshotFutures集合。
  4. 调用StreamTask.asyncOperationsThreadPool线程池运行asyncCheckpointRunnable线程,执行operatorSnapshotsInProgress集合中算子的异步快照操作。
public void executeCheckpointing() throws Exception {//通过算子创建执行快照操作的OperatorSnapshotFutures对象for (StreamOperator<?> op : allOperators) {checkpointStreamOperator(op);}// 此处省略部分代码startAsyncPartNano = System.nanoTime();checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(owner,operatorSnapshotsInProgress,checkpointMetaData,checkpointMetrics,startAsyncPartNano);// 注册Closeable操作owner.cancelables.registerCloseable(asyncCheckpointRunnable);// 执行asyncCheckpointRunnableowner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);}

 

1.3. 将算子中的状态快照操作封装在OperatorSnapshotFutures中

如下代码,AbstractStreamOperator.snapshotState()方法将当前算子的状态快照操作封装在OperatorSnapshotFutures对象中,然后通过asyncOperationsThreadPool线程池异步触发所有的OperatorSnapshotFutures操作,方法主要步骤如下。

  1. 创建OperatorSnapshotFutures对象,封装当前算子对应的状态快照操作。
  2. 创建snapshotContext上下文对象,存储快照过程需要的上下文信息,并调用snapshotState()方法执行快照操作。

snapshotState()方法由StreamOperator子类实现,例如在AbstractUdfStreamOperator中会调用StreamingFunctionUtils.snapshotFunctionState(context,getOperatorStateBackend(),
userFunction)方法执行函数中的状态快照操作。

  1. 向snapshotInProgress中指定KeyedStateRawFuture和OperatorStateRawFuture,专门用于处理原生状态数据的快照操作
  • 如果operatorStateBackend不为空,则将operatorStateBackend.snapshot()方法块设定到OperatorStateManagedFuture中,并注册到snapshotInProgress中等待执行。
  • 如果keyedStateBackend不为空,则将keyedStateBackend.snapshot()方法块设定到KeyedStateManagedFuture中,并注册到snapshotInProgress中等待执行。
  1. 返回创建的snapshotInProgress异步Future对象,snapshotInProgress中封装了当前算子需要执行的所有快照操作。
public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,CheckpointStreamFactory factory) throws Exception {// 获取KeyGroupRangeKeyGroupRange keyGroupRange = null != keyedStateBackend ?keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;// 创建OperatorSnapshotFutures处理对象OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();// 创建snapshotContext上下文对象StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(checkpointId,timestamp,factory,keyGroupRange,getContainingTask().getCancelables());try {snapshotState(snapshotContext);// 设定KeyedStateRawFuture和OperatorStateRawFuturesnapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());// 如果operatorStateBackend不为空,设定OperatorStateManagedFutureif (null != operatorStateBackend) {snapshotInProgress.setOperatorStateManagedFuture(operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));}// 如果keyedStateBackend不为空,设定KeyedStateManagedFutureif (null != keyedStateBackend) {snapshotInProgress.setKeyedStateManagedFuture(keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));}} catch (Exception snapshotException) {// 此处省略部分代码}return snapshotInProgress;
}

这里可以看出,原生状态和管理状态的RunnableFuture对象会有所不同

  • RawState主要通过从snapshotContext中获取的RawFuture对象 管理状态的快照操作
  • ManagedState主要通过operatorStateBackend和keyedStateBackend进行状态的管理,并根据StateBackend的不同实现将状态数据写入内存或外部文件系统中。

 

1.4. 算子状态进行快照

我们知道所有的状态快照操作都会被封装到OperatorStateManagedFuture对象中,最终通过AsyncCheckpointRunnable线程触发执行。

下面我们看AsyncCheckpointRunnable线程的定义。如代码所示,AsyncCheckpointRunnable.run()方法主要逻辑如下。

  1. 调用FileSystemSafetyNet.initializeSafetyNetForThread()方法为当前线程初始化文件系统安全网,确保数据能够正常写入。
  2. 创建TaskStateSnapshot实例:

创建jobManagerTaskOperatorSubtaskStates和localTaskOperatorSubtaskStates对应的TaskStateSnapshot实例,其中jobManagerTaskOperatorSubtaskStates用于存储和记录发送给JobManager的Checkpoint数据,localTaskOperatorSubtaskStates用于存储TaskExecutor本地的状态数据。

  1. 执行所有状态快照线程操作

遍历operatorSnapshotsInProgress集合,获取OperatorSnapshotFutures并创建OperatorSnapshotFinalizer实例,用于执行所有状态快照线程操作。在OperatorSnapshotFinalizerz中会调用FutureUtils.runIfNotDoneAndGet()方法执行KeyedState和OperatorState的快照操作。

  1. 从finalizedSnapshots中获取JobManagerOwnedState和TaskLocalState,分别存储在jobManagerTaskOperatorSubtaskStates和localTaskOperatorSubtaskStates集合中。
  2. 调用checkpointMetrics对象记录Checkpoint执行的时间并汇总到Metric监控系统中。
  3. 如果AsyncCheckpointState为COMPLETED状态,则调用reportCompletedSnapshotStates()方法向JobManager汇报Checkpoint的执行结果。
  4. 如果出现其他异常情况,则调用handleExecutionException()方法进行处理。
public void run() {FileSystemSafetyNet.initializeSafetyNetForThread();try {// 创建TaskStateSnapshotTaskStateSnapshot jobManagerTaskOperatorSubtaskStates =new TaskStateSnapshot(operatorSnapshotsInProgress.size());TaskStateSnapshot localTaskOperatorSubtaskStates =new TaskStateSnapshot(operatorSnapshotsInProgress.size());for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) {OperatorID operatorID = entry.getKey();OperatorSnapshotFutures snapshotInProgress = entry.getValue();// 创建OperatorSnapshotFinalizer对象OperatorSnapshotFinalizer finalizedSnapshots =new OperatorSnapshotFinalizer(snapshotInProgress);jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(operatorID,finalizedSnapshots.getJobManagerOwnedState());localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(operatorID,finalizedSnapshots.getTaskLocalState());}final long asyncEndNanos = System.nanoTime();final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000L;checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING,CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {reportCompletedSnapshotStates(jobManagerTaskOperatorSubtaskStates,localTaskOperatorSubtaskStates,asyncDurationMillis);} else {LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",owner.getName(),checkpointMetaData.getCheckpointId());}} catch (Exception e) {handleExecutionException(e);} finally {owner.cancelables.unregisterCloseable(this);FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();}
}

至此,算子状态数据快照的逻辑基本完成,算子中的托管状态主要借助KeyedStateBackend和OperatorStateBackend管理。

KeyedStateBackend和OperatorStateBackend都实现了SnapshotStrategy接口,提供了状态快照的方法。SnapshotStrategy根据不同类型存储后端,主要有HeapSnapshotStrategy和RocksDBSnapshotStrategy两种类型。

 

1.5. 状态数据快照持久化

这里我们以HeapSnapshotStrategy为例,介绍在StateBackend中对状态数据进行状态快照持久化操作的步骤。如代码所示,

HeapSnapshotStrategy.processSnapshotMetaInfoForAllStates()方法中定义了对KeyedState以及OperatorState的状态处理逻辑。

  1. 遍历每个StateSnapshotRestore。
  2. 调用StateSnapshotRestore.stateSnapshot()方法,此时会创建StateSnapshot对象。
  3. 将创建的StateSnapshot添加到metaInfoSnapshots和cowStateStableSnapshots集合中,完成堆内存存储类型KvState的快照操作。
private void processSnapshotMetaInfoForAllStates(List metaInfoSnapshots,Map<StateUID, StateSnapshot> cowStateStableSnapshots,Map<StateUID, Integer> stateNamesToId,Map<String, ? extends StateSnapshotRestore> registeredStates,StateMetaInfoSnapshot.BackendStateType stateType) {for (Map.Entry<String, ? extends StateSnapshotRestore> kvState :registeredStates.entrySet()) {final StateUID stateUid = StateUID.of(kvState.getKey(), stateType);stateNamesToId.put(stateUid, stateNamesToId.size());StateSnapshotRestore state = kvState.getValue();if (null != state) {final StateSnapshot stateSnapshot = state.stateSnapshot();metaInfoSnapshots.add(stateSnapshot.getMetaInfoSnapshot());cowStateStableSnapshots.put(stateUid, stateSnapshot);}}
}

 

二. CheckpointCoordinator管理Checkpoint

1. Checkpoint执行完毕后的确认过程

当StreamTask中所有的算子完成状态数据的快照操作后,Task实例会立即将TaskStateSnapshot消息发送到管理节点的CheckpointCoordinator中,并在CheckpointCoordinator中完成后续的操作。如图所示,Checkpoint执行完毕后的确认过程如下。

在这里插入图片描述

  1. 调用StreamTask.reportCompletedSnapshotStates

当StreamTask中的所有算子都完成快照操作后,会调用StreamTask.reportCompletedSnapshotStates()方法将TaskStateSnapshot等Ack消息发送给TaskStateManager。TaskStateManager封装了CheckpointCoordinatorGateway,因此可以直接和CheckpointCoordinator组件进行RPC通信。

  1. 消息传递
  • 将消息传递给CheckpointCoordinatorGateway
    TaskStateManager通过CheckpointResponder.acknowledgeCheckpoint()方法将acknowledgedTaskStateSnapshot消息传递给CheckpointCoordinatorGateway接口实现者,实际上就是JobMasterRPC服务。
  • 消息传递给CheckpointCoordinator
    JobMaster接收到RpcCheckpointResponder返回的Ack消息后,会调用SchedulerNG.acknowledgeCheckpoint()方法将消息传递给调度器。调度器会将Ack消息封装成AcknowledgeCheckpoint,传递给CheckpointCoordinator组件继续处理。
  1. 管理PendingCheckpoint

当CheckpointCoordinator接收到AcknowledgeCheckpoint后,会从pendingCheckpoints集合中获取对应的PendingCheckpoint,然后判断当前Checkpoint中是否收到AcknowledgedTasks集合所有的Task实例发送的Ack确认消息。
如果notYetAcknowledgedTasks为空,则调用completePendingCheckpoint()方法完成当前PendingCheckpoint操作,并从pendingCheckpoints集合中移除当前的PendingCheckpoint。

  1. 添加CompletedCheckpoint:

紧接着,PendingCheckpoint会转换成CompletedCheckpoint,此时CheckpointCoordinator会在completedCheckpointStore集合中添加CompletedCheckpoint。

  1. 通知Checkpoint操作结束。

CheckpointCoordinator会遍历tasksToCommitTo集合中的ExecutionVertex节点并获取Execution对象,然后通过Execution向TaskManagerGateway发送CheckpointComplete消息,通知所有的Task实例本次Checkpoint操作结束。

  1. 通知同步

当TaskExecutor接收到CheckpointComplete消息后,会从TaskSlotTable中获取对应的Task实例,向Task实例中发送CheckpointComplete消息。所有实现CheckpointListener监听器的组件或算子都会获取Checkpoint完成的消息,然后完成各自后续的处理操作。

 

2. 触发并完成Checkpoint操作

CheckpointCoordinator组件接收到Task实例的Ack消息(快照完成了?)后,会触发并完成Checkpoint操作。如代码PendingCheckpoint.finalizeCheckpoint()方法的具体实现如下。

1)向sharedStateRegistry中注册operatorStates。
2)结束pendingCheckpoint中的Checkpoint操作并生成CompletedCheckpoint3)将completedCheckpoint添加到completedCheckpointStore中,
4)从pendingCheckpoint中移除checkpointId对应的PendingCheckpoint,
并触发队列中的Checkpoint请求。
5)向所有的ExecutionVertex节点发送CheckpointComplete消息,
通知Task实例本次Checkpoint操作完成。private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {final long checkpointId = pendingCheckpoint.getCheckpointId();final CompletedCheckpoint completedCheckpoint;// 首先向sharedStateRegistry中注册operatorStatesMap<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();sharedStateRegistry.registerAll(operatorStates.values());// 对pendingCheckpoint中的Checkpoint做结束处理并生成CompletedCheckpointtry {try {completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());}catch (Exception e1) {// 如果出现异常则中止运行并抛出CheckpointExecutionif (!pendingCheckpoint.isDiscarded()) {failPendingCheckpoint(pendingCheckpoint,CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);}throw new CheckpointException("Could not finalize the pending checkpoint " +checkpointId + '.',CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);}// 当完成finalization后,PendingCheckpoint必须被丢弃Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);// 将completedCheckpoint添加到completedCheckpointStore中try {completedCheckpointStore.addCheckpoint(completedCheckpoint);} catch (Exception exception) {// 如果completed checkpoint存储出现异常则进行清理executor.execute(new Runnable() {@Overridepublic void run() {try {completedCheckpoint.discardOnFailedStoring();} catch (Throwable t) {LOG.warn("Could not properly discard completed checkpoint {}.",completedCheckpoint.getCheckpointID(), t);}}});throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);}} finally {// 最后从pendingCheckpoints中移除checkpointId对应的PendingCheckpointpendingCheckpoints.remove(checkpointId);// 触发队列中的Checkpoint请求triggerQueuedRequests();}// 记录checkpointIdrememberRecentCheckpointId(checkpointId);// 清除之前的CheckpointsdropSubsumedCheckpoints(checkpointId);// 计算和前面Checkpoint操作之间的最低延时lastCheckpointCompletionRelativeTime = clock.relativeTimeMillis();LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());// 通知所有的ExecutionVertex节点Checkpoint操作完成final long timestamp = completedCheckpoint.getTimestamp();for (ExecutionVertex ev : tasksToCommitTo) {Execution ee = ev.getCurrentExecutionAttempt();if (ee != null) {ee.notifyCheckpointComplete(checkpointId, timestamp);}}
}

 

3. 通知CheckpointComplete给TaskExecutor

当TaskExecutor接收到来自CheckpointCoordinator的CheckpointComplete消息后,会调用Task.notifyCheckpointComplete()方法将消息传递到指定的Task实例中。Task线程会将CheckpointComplete消息通知给StreamTask中的算子。

如下代码,

/**
将notifyCheckpointComplete()转换成RunnableWithException线程并提交到Mailbox中运行,且在MailboxExecutor线程模型中获取和执行的优先级是最高的。
最终notifyCheckpointComplete()方法会在MailboxProcessor中运行。
**/public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {return mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).submit(() -> notifyCheckpointComplete(checkpointId),"checkpoint %d complete", checkpointId);
}

继续具体看StreamTask.notifyCheckpointComplete(),如下代码:

1)获取当前Task中算子链的算子,并发送Checkpoint完成的消息。
2)获取TaskStateManager对象,向其通知Checkpoint完成消息,这里主要调用
TaskLocalStateStore清理本地无用的Checkpoint数据。
3)如果当前Checkpoint是同步的Savepoint操作,直接完成并终止当前Task实例,并调用
resetSynchronousSavepointId()方法将syncSavepointId重置为空。private void notifyCheckpointComplete(long checkpointId) {try {boolean success = actionExecutor.call(() -> {if (isRunning) {LOG.debug("Notification of complete checkpoint for task {}", getName());// 获取当前Task中operatorChain所有的Operator,并通知每个Operator Checkpoint执行成功的消息for (StreamOperator<?> operator : operatorChain.getAllOperators()) {if (operator != null) {operator.notifyCheckpointComplete(checkpointId);}}return true;} else {LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());return true;}});// 获取TaskStateManager,并通知Checkpoint执行完成的消息getEnvironment().getTaskStateManager().notifyCheckpointComplete(checkpointId);// 如果是同步的Savepoint操作,则直接完成当前Taskif (success && isSynchronousSavepointId(checkpointId)) {finishTask();// Reset to "notify" the internal synchronous savepoint mailbox loop.resetSynchronousSavepointId();}} catch (Exception e) {handleException(new RuntimeException("Error while confirming checkpoint", e));}
}

算子接收到Checkpoint完成消息后,会根据自身需要进行后续的处理,默认在AbstractStreamOperator基本实现类中会通知keyedStateBackend进行后续操作。

对于AbstractUdfStreamOperator实例,会判断当前userFunction是否实现了CheckpointListener,如果实现了,则向UserFucntion通知Checkpoint执行完成的信息

例如在FlinkKafkaConsumerBase中会通过获取到的Checkpoint完成信息,将Offset提交至Kafka集群,确保消费的数据已经完成处理,详细实现可以参考FlinkKafkaConsumerBase.notifyCheckpointComplete()方法。

public void notifyCheckpointComplete(long checkpointId) throws Exception {super.notifyCheckpointComplete(checkpointId);if (userFunction instanceof CheckpointListener) {((CheckpointListener) userFunction).notifyCheckpointComplete(checkpointId);}
}

 

三. 状态管理学习小结

通过学习状态管理的源码,我们可以再来思考下如下几个场景问题,是不是有一点“庖丁解牛”的意思!

flink中状态存在的意义是什么,涉及到哪些场景。

  1. 实时聚合:比如,计算过去一小时内的平均销售额。这时,你会需要使用到Flink的状态来存储过去一小时内的所有销售数据。
  2. 窗口操作:Flink SQL支持滚动窗口、滑动窗口、会话窗口等。这些窗口操作都需要Flink的状态来存储在窗口期限内的数据。
  3. 状态的持久化与任务恢复:实时任务挂掉之后,为了快速从上一个点恢复任务,可以使用savepoint和checkpoint。
  4. 多流join:Flink至少存储一个流中的数据,以便于在新的记录到来时进行匹配。

 

其次通过学习Flink状态管理相关源码,可以进一步了解状态管理的细节操作,为解决更加复杂的问题打下理论基础

  1. 深入理解任务运行过程中,各算子状态的流转机制;
  2. 快速定位问题:在遇到实际问题时,能够快速反应出是哪块逻辑出现了问题;
  3. 应对故障:状态管理和Flink容错机制相关,可以了解Flink发生故障时如何保证状态的一致性和可恢复性
  4. 二次开发:可以自定义状态后端,或者拓展优化已有的例如RocksDB状态后端等;
  5. 性能优化:了解了Flink是如何有效的处理和管理状态,就可以优化任务性能,减少资源消耗。

 

参考:《Flink设计与实现:核心原理与源码解析》–张利兵

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/693996.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

图的遍历-----深度优先遍历(dfs),广度优先遍历(bfs)【java详解】

目录 简单介绍&#xff1a;什么是深度、广度优先遍历&#xff1f; 深度优先搜索&#xff08;DFS&#xff0c;Depth First Search&#xff09;&#xff1a; 大致图解&#xff1a; 广度优先搜索&#xff08;BFS&#xff0c;Breadth First Search&#xff09;&#xff1a; 大致图…

Python学习笔记——自定义函数(传递任意数量的实参)

Python允许函数从调用语句中收集任意数量的实参。例如下面自定义函数制作一个披萨&#xff0c;它需要接受很多配料&#xff0c;但无法预先确定顾客要点多少种配料。 下面行数只有一个形参*toppings&#xff0c;不管调用语句提供多少个实参&#xff0c;这个参数都会收集到&…

用 LangChain 和 Milvus 从零搭建 LLM 应用

如何从零搭建一个 LLM 应用&#xff1f;不妨试试 LangChain Milvus 的组合拳。 作为开发 LLM 应用的框架&#xff0c;LangChain 内部不仅包含诸多模块&#xff0c;而且支持外部集成&#xff1b;Milvus 同样可以支持诸多 LLM 集成&#xff0c;二者结合除了可以轻松搭建一个 LL…

在Discord上添加自己的服务器并邀请midjourney机器人加入

我开发的chatgpt网站&#xff1a; https://chat.xutongbao.top

vue2--多设备访问本地调试项目

背景 在vue2开发阶段&#xff0c;为了更好的和小伙伴对项目进行讨论&#xff0c;需要让小伙伴可以看到自己本地的项目。 方案 修改vue2项目配置 在本地进行项目调试时&#xff0c;都是使用的127.0.0.1:8080&#xff0c;为了让局域网中的其他小伙伴可以访问&#xff0c;需要…

阿里云国际-在阿里云服务器上快速搭建幻兽帕鲁多人服务器

幻兽帕鲁是最近流行的新型生存游戏。该游戏一夜之间变得极为流行&#xff0c;同时在线玩家数量达到了200万。然而&#xff0c;幻兽帕鲁的服务器难以应对大量玩家的压力。为解决这一问题&#xff0c;幻兽帕鲁允许玩家建立专用服务器&#xff0c;其提供以下优势&#xff1a; &am…

Docker中如何删除某个镜像

1. 停止使用镜像的容器 首先&#xff0c;您需要停止所有正在使用该镜像的容器。您可以使用 docker stop 命令来停止容器&#xff1a; docker stop 11184993a106如果有多个容器使用该镜像&#xff0c;您需要对每个容器都执行停止命令。您可以通过 docker ps -a | grep core-ba…

数据结构 计算结构体大小

一、规则&#xff1a; 操作系统制定对齐量&#xff1a; 64位操作系统&#xff0c;默认8Byte对齐 32位操作系统&#xff0c;默认4Byte对齐 结构体对齐规则&#xff1a; 1.结构体整体的大小&#xff0c;需要是最大成员对齐量的整数倍 2.结构体中每一个成员的偏移量需要存在…

Selenium常见问题解析

1、元素定位失败&#xff1a; 在使用Selenium自动化测试时&#xff0c;最常见的问题之一是无法正确地定位元素&#xff0c;这可能导致后续操作失败。解决方法包括使用不同的定位方式&#xff08;如xpath、CSS selector、id等&#xff09;&#xff0c;等待页面加载完全后再进行…

【Oracle】玩转Oracle数据库(三):数据库的创建和管理

前言 嘿&#xff0c;各位数据库小能手们&#xff01;今天我们要进入数据库的创世纪&#xff0c;探索Oracle数据库的创建和管理&#xff01;&#x1f527;&#x1f4bb; 在这篇博文【Oracle】玩转Oracle数据库&#xff08;三&#xff09;&#xff1a;数据库的创建和管理中&#…

详解动态内存管理!

目录 ​编辑 1.为什么要用动态内存分配 2.malloc和free 2.1 malloc 2.2 free 3.calloc和realloc 3.1 calloc 3.2 realloc 4.常见的动态内存的错误 4.1 对NULL的解引用操作 4.2 对动态内存开辟空间的越界访问 4.3 对非动态内存开辟空间用free释放 4.4 使用free释放动…

【北京游戏业:出海竞争实力全面】

本文将深入分析北京的游戏行业发展。在上海、广州、北京、深圳、成都、杭州、福建七大游戏产业中心城市中&#xff0c;北京无疑是出海竞争力最强的游戏产业集群。本文将全面剖析北京游戏行业的发展现状。 北京是中国游戏产业的发源地。拥有从游戏引擎到美术设计等完整的产业链…

如何在Shopee平台上选择爆款商品:借鉴爆款属性的有效策略

在当今激烈竞争的电商市场中&#xff0c;想要在Shopee平台上取得成功&#xff0c;卖家需要精心选择潜在的热销产品。借鉴爆款商品的属性是一种行之有效的策略&#xff0c;能够帮助卖家快速找到市场上的热门商品。通过分析市场趋势、竞品表现、社交媒体趋势等多方面因素&#xf…

【EI会议征稿通知】第四届自动化控制、算法与智能仿生学术会议(ACAIB 2024)

第四届自动化控制、算法与智能仿生学术会议(ACAIB 2024) 2024 4th Conference on Automation Control, Algorithm and Intelligent Bionics 第四届自动化控制、算法与智能仿生学术会议&#xff08;ACAIB 2024&#xff09;将于2024年6月7日-9日在中国银川举行。 本届大会由北…

leetcode hot100 完全平方数

本题中&#xff0c;是给一个整数n&#xff0c;让用完全平方数凑出这个整数&#xff0c;注意&#xff0c;题中给了n的范围&#xff0c;是大于等于1的&#xff0c;也就是说&#xff0c;dp[0]我们可以先不考虑。 整个问题可以抽象成完全背包问题的变形形式&#xff0c;物品就是这…

在 React 中使用 i18next支持多语言

基本用法 安装依赖包 npm i i18next react-i18next i18next-browser-languagedetector --savei18next 提供了翻译的基本能力; react-i18next 是 i18next 的一个插件&#xff0c;用来降低 react 的使用成本; i18next-browser-languagedetector 是用来检测浏览器语言的插件。 …

搜维尔科技:OptiTrack探索人类与技术之间关系的开创性表演

另一种蓝色通过 OptiTrack 释放创造力 总部位于荷兰的当代舞蹈团因其探索人类与技术之间关系的开创性表演而受到广泛赞誉。该公司由富有远见的编舞家大卫米登多普创立&#xff0c;不仅利用技术作为探索的主题&#xff0c;而且将其作为表达故事的动态工具。 “我一直对文化与…

【Java面试系列】JDK 1.8 新特性之 Stream API

目录 一、Stream 简介二、Stream 特点&#xff1a;Stream 注意点&#xff1a;1、什么是聚合操作2、Stream 流1、什么是流2、流的构成3、stream 流的两种操作4、惰性求值和及早求值方法5、Stream 流的并行 三、Stream操作的三个步骤1、创建流第一种&#xff1a;通过集合第二种&a…

ping 8.8.8.8和ping www.baidu.com都OK,但是打不开网页

ping 8.8.8.8和ping www.baidu.com都OK&#xff0c;但是打不开网页 打开设置 -> 网络 找到IPV4, DNS栏输入 8.8.8.8 , apply 设置里界面变成这样 然后网页就能加载了

mysql(五) buffer pool(缓存页数据与索引数据)

前面写了很多mysql的实用基础知识、接下来我将总结整理一些进阶和更深的一些知识、你知道的越多、不知道就越多、让我们一起学习。 目录 一、buffer pool的位置&#xff08;Innodb存储引擎内&#xff09; 二、Buffer Pool是什么&#xff1f; 1、降低磁盘访问的机制 2、Buf…