【Flink状态管理(二)各状态初始化入口】状态初始化流程详解与源码剖析

文章目录

    • 1. 状态初始化总流程梳理
    • 2.创建StreamOperatorStateContext
    • 3. StateInitializationContext的接口设计。
    • 4. 状态初始化举例:UDF状态初始化

在TaskManager中启动Task线程后,会调用StreamTask.invoke()方法触发当前Task中算子的执行,在invoke()方法中会调用restoreInternal()方法,这中间包括创建和初始化算子中的状态数据。
另外在invoke中,可以通过判断任务状态来判断是否需要初始化状态。

        // Allow invoking method 'invoke' without having to call 'restore' before it.if (!isRunning) {LOG.debug("Restoring during invoke will be called.");restoreInternal();}

StreamTask调用initializeStateAndOpenOperators()方法对当前Task中所有算子的状态数据进行初始化。

RegularOperatorChain.
public void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {  Iterator var2 = this.getAllOperators(true).iterator();  while(var2.hasNext()) {  StreamOperatorWrapper<?, ?> operatorWrapper = (StreamOperatorWrapper)var2.next();  StreamOperator<?> operator = operatorWrapper.getStreamOperator();  operator.initializeState(streamTaskStateInitializer);  operator.open();  }  }

 
找到了算子状态初始化的位置,我们继续了解状态是如何初始化的。

1. 状态初始化总流程梳理

AbstractStreamOperator.initializeState中描述了状态初始化的总体流程,如下代码以及注释:

# AbstractStreamOperator.initializeStatepublic final void initializeState(StreamTaskStateInitializer streamTaskStateManager)  throws Exception {  //1. 获取类型序列化器final TypeSerializer<?> keySerializer =  config.getStateKeySerializer(getUserCodeClassloader());  //2. get containingTaskfinal StreamTask<?, ?> containingTask = Preconditions.checkNotNull(getContainingTask());  final CloseableRegistry streamTaskCloseableRegistry =  Preconditions.checkNotNull(containingTask.getCancelables());  //3. create StreamOperatorStateContextfinal StreamOperatorStateContext context =  streamTaskStateManager.streamOperatorStateContext(  getOperatorID(),  getClass().getSimpleName(),  getProcessingTimeService(),  this,  keySerializer,  streamTaskCloseableRegistry,  metrics,  config.getManagedMemoryFractionOperatorUseCaseOfSlot(  ManagedMemoryUseCase.STATE_BACKEND,  runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),  runtimeContext.getUserCodeClassLoader()),  isUsingCustomRawKeyedState());  //4. create stateHandlerstateHandler =  new StreamOperatorStateHandler(  context, getExecutionConfig(), streamTaskCloseableRegistry);  timeServiceManager = context.internalTimerServiceManager();  //5. initialize OperatorStatestateHandler.initializeOperatorState(this);  //6. set KeyedStateStore in runtimeContextruntimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));  
}

在StreamOperator初始化状态数据的过程中,首先从StreamTask中获取创建状态需要的组件,例如托管状态的管理后端KeyedStateBackend、OperatorStateBackend以及原生状态管理的KeyedStateInputs和OperatorStateInputs组件。

状态数据操作过程中使用的管理组件最终都会封装成StateInitializationContext并传递给子类使用,例如在AbstractUdfStreamOperator中,就会使用StateInitializationContext中的信息初始化用户定义的UDF中的状态数据。

2.创建StreamOperatorStateContext

接下来看如何在Task实例初始化时创建这些组件,并将其存储在StreamOperatorStateContext中供算子使用,如下代码:

StreamTaskStateInitializerImpl
@Override  
public StreamOperatorStateContext streamOperatorStateContext(  @Nonnull OperatorID operatorID,  @Nonnull String operatorClassName,  @Nonnull ProcessingTimeService processingTimeService,  @Nonnull KeyContext keyContext,  @Nullable TypeSerializer<?> keySerializer,  @Nonnull CloseableRegistry streamTaskCloseableRegistry,  @Nonnull MetricGroup metricGroup,  double managedMemoryFraction,  boolean isUsingCustomRawKeyedState)  throws Exception {  //1. 获取task实例信息TaskInfo taskInfo = environment.getTaskInfo();  OperatorSubtaskDescriptionText operatorSubtaskDescription =  new OperatorSubtaskDescriptionText(  operatorID,  operatorClassName,  taskInfo.getIndexOfThisSubtask(),  taskInfo.getNumberOfParallelSubtasks());  final String operatorIdentifierText = operatorSubtaskDescription.toString();  final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates =  taskStateManager.prioritizedOperatorState(operatorID);  CheckpointableKeyedStateBackend<?> keyedStatedBackend = null;  OperatorStateBackend operatorStateBackend = null;  CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null;  CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null;  InternalTimeServiceManager<?> timeServiceManager;  try {  // 创建keyed类型的状态后端// -------------- Keyed State Backend --------------  keyedStatedBackend =  keyedStatedBackend(  keySerializer,  operatorIdentifierText,  prioritizedOperatorSubtaskStates,  streamTaskCloseableRegistry,  metricGroup,  managedMemoryFraction);  //创建operator类型的状态后端// -------------- Operator State Backend --------------  operatorStateBackend =  operatorStateBackend(  operatorIdentifierText,  prioritizedOperatorSubtaskStates,  streamTaskCloseableRegistry);  //创建原生类型状态后端// -------------- Raw State Streams --------------  rawKeyedStateInputs =  rawKeyedStateInputs(  prioritizedOperatorSubtaskStates  .getPrioritizedRawKeyedState()  .iterator());  streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);  rawOperatorStateInputs =  rawOperatorStateInputs(  prioritizedOperatorSubtaskStates  .getPrioritizedRawOperatorState()  .iterator());  streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);  //创建Internal Timer Service Manager// -------------- Internal Timer Service Manager --------------  if (keyedStatedBackend != null) {  // if the operator indicates that it is using custom raw keyed state,  // then whatever was written in the raw keyed state snapshot was NOT written            // by the internal timer services (because there is only ever one user of raw keyed            // state);            // in this case, timers should not attempt to restore timers from the raw keyed            // state.            final Iterable<KeyGroupStatePartitionStreamProvider> restoredRawKeyedStateTimers =  (prioritizedOperatorSubtaskStates.isRestored()  && !isUsingCustomRawKeyedState)  ? rawKeyedStateInputs  : Collections.emptyList();  timeServiceManager =  timeServiceManagerProvider.create(  keyedStatedBackend,  environment.getUserCodeClassLoader().asClassLoader(),  keyContext,  processingTimeService,  restoredRawKeyedStateTimers);  } else {  timeServiceManager = null;  }  // -------------- Preparing return value --------------  return new StreamOperatorStateContextImpl(  prioritizedOperatorSubtaskStates.getRestoredCheckpointId(),  operatorStateBackend,  keyedStatedBackend,  timeServiceManager,  rawOperatorStateInputs,  rawKeyedStateInputs);  } catch (Exception ex) {  。。。。
}

流程梳理:

  1. 从environment中获取TaskInfo,并基于Task实例创建OperatorSubtaskDescriptionText。Operator中Task实例的描述信息包含OperatorID、OperatorClassName等,最终用于创建OperatorStateBackend的状态存储后端。
  2. 创建KeyedStateBackend,KeyedStateBackend是KeyedState的状态管理后端,提供创建和管理KeyedState的方法。
  3. 创建OperatorStateBackend,OperatorStateBackend是OperatorState的状态管理后端,提供获取和管理OperatorState的接口。
  4. 创建KeyGroupStatePartitionStreamProvider实例,提供创建和获取原生KeyedState的方法。
  5. 创建StatePartitionStreamProvider实例,提供创建和获取原生OperatorState的方法。
  6. 将所有创建出来的托管状态管理后端keyedStatedBackend和operatorStateBackend、原生状态存储后端rawKeyedStateInputs和rawOperatorStateInputs及timeServiceManager实例,全部封装在StreamOperatorStateContextImpl上下文对象中,并返回给AbstractStreamOperator使用。

 
小结
StreamTaskStateInitializer.streamOperatorStateContext()方法包含创建托管状态和原生状态管理后端的全过程。StreamOperator的实现类能够从StreamOperatorStateContext中获取这些状态管理组件并使用它们创建指定类型的状态,最终状态数据会存储在状态管理后端指定的物理介质上,例如堆内存或RocksDB。

StateInitializationContext会被用于算子和UserDefinedFunction中,实现算子或函数中的状态数据操作。

 

3. StateInitializationContext的接口设计。

StateInitializationContext接口同时继承了ManagedInitializationContext接口和FunctionInitializationContext接口。StateInitializationContext接口的默认实现类为StateInitializationContextImpl。
在这里插入图片描述

  1. ManagedInitializationContext接口提供了托管状态使用的KeyedStateStore和OperatorStateStore获取方法,即KeyedStateBackend和OperatorStateBackend的封装类。算子进行初始化时,会通过KeyedStateStore和OperatorStateStore提供的方法创建和管理指定类型的托管状态。

  2. FunctionInitializationContext提供了用户自定义函数状态数据初始化需要的方法。它和ManagedInitializationContext保持一致,这主要是为了和算子使用的上下文进行区分,但两者的操作基本一致。

  3. StateInitializationContext提供了对托管状态数据的管理,并在内部继承和拓展了获取及管理原生状态数据的方法,如getRawOperatorStateInputs()、getRawKeyedStateInputs()等

  4. StateInitializationContextImpl具备操作管理状态和原生状态的能力。基于它可以获取不同类型的状态管理后端,并基于状态管理操作状态数据。

在这里插入图片描述

 

4. 状态初始化举例:UDF状态初始化

在AbstractStreamOperator中调用initializeState(StateInitializationContext context)抽象方法初始化Operator中的状态。这里以AbstractUdfStreamOperator为例说明具体算子、UDF是如何进行状态初始化的。

AbstractUdfStreamOperator.initializeState()方法实际上调用了StreamingFunctionUtils.restoreFunctionState()方法对User-Defined Function中的状态数据进行初始化和恢复,实际上就是将上文创建的StateInitializationContext上下文信息提供给Function接口使用。

public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);StreamingFunctionUtils.restoreFunctionState(context, userFunction);
}

恢复函数内部的状态数据涉及Checkpoint的实现,我们会在之后介绍如何在StreamingFunctionUtils.restoreFunctionState()方法中恢复函数中的状态数据。

 
《Flink设计与实现:核心原理与源码解析》张利兵

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/674393.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Netty中解决粘包/半包

目录 什么是TCP粘包半包&#xff1f; TCP 粘包/半包发生的原因 解决粘包半包 channelRead和channelReadComplete区别 什么是TCP粘包半包&#xff1f; 假设客户端分别发送了两个数据包 D1 和 D2 给服务端&#xff0c;由于服务端一次读取到的字节数是不确定的&#xff0c;故可…

Zoho Mail企业邮箱商业扩展第3部分:计算财务状况

在Zoho Mail商业扩展系列的压轴篇章中&#xff0c;王雪琳利用Zoho Mail的集成功能成功地完成了各项工作&#xff0c;并顺利地建立了自己的营销代理机构。让我们快速回顾一下她的成功之路。 一、使用Zoho Mail成功方法概述 首先她通过Zoho Mail为其电子邮件地址设置了自定义域…

spring boot打完jar包后使用命令行启动,提示xxx.jar 中没有主清单属性

在对springBoot接口中间件开发完毕后&#xff0c;本地启动没有任何问题&#xff0c;在使用package命令打包也没异常&#xff0c;打完包后使用命令行&#xff1a;java -jar xxx.jar启动发现报异常&#xff1a;xxx.jar 中没有主清单属性&#xff0c;具体解决方法如下&#xff1a;…

TI毫米波雷达开发——High Accuracy Demo 串口数据接收及TLV协议解析 matlab 源码

TI毫米波雷达开发——串口数据接收及TLV协议解析 matlab 源码 前置基础源代码功能说明功能演示视频文件结构01.bin / 02.binParseData.mread_file_and_plot_object_location.mread_serial_port_and_plot_object_location.m函数解析configureSport(comportSnum)readUartCallback…

day06.C++排序(整理)

一.直接插入排序 void Insertsort(int *a,int n){int i,j;for( i1;i<n;i){if(a[i]<a[i-1]){int tempa[i];//哨兵for( ji-1;temp<a[j];j--){a[j1]a[j];//记录后移}a[j1]temp;//插入到正确位置}} }二.希尔排序 void Shellsort(int *a,int n){for(int dltan/2;dlta>…

(2024 了,这文也太水了)审查 GAN 的 FID 和 SID 指标

Reviewing FID and SID Metrics on Generative Adversarial Networks 公和众和号&#xff1a;EDPJ&#xff08;进 Q 交流群&#xff1a;922230617 或加 VX&#xff1a;CV_EDPJ 进 V 交流群&#xff09; 目录 0. 摘要 2. 相关工作 3. 方法 4. 实验 0. 摘要 生成对抗网络&…

如何利用IP定位技术锁定网络攻击者

在当今高度互联的数字世界中&#xff0c;网络安全威胁日益猖獗。为了维护网络空间的安全与稳定&#xff0c;追踪并锁定网络攻击者成为了关键一环。而IP定位技术&#xff0c;作为一种重要的追踪手段&#xff0c;正发挥着越来越重要的作用。 IP定位技术&#xff0c;简而言之&…

Django模板(二)

标签if 标签在渲染过程中提供使用逻辑的方法,比如:if和for 标签被 {% 和 %} 包围,如下所示: 由于在模板中,没有办法通过代码缩进判断代码块,所以控制标签都需要有结束的标签 if判断标签{% if %} {% endif %} : # athlete_list 不为空 {% if athlete_list %}# 输出 ath…

vue实现购物车案例

话不多说&#xff0c;先上效果图。 安装elementui组件库&#xff0c;可直接食用。 <template><div><!-- 购物车部分 --><el-container><el-header><h1>购物车案例一条龙</h1></el-header><el-main><!-- 折叠面板…

【FPGA开发】Modelsim和Vivado的使用

本篇文章包含的内容 一、FPGA工程文件结构二、Modelsim的使用三、Vivado的使用3.1 建立工程3.2 分析 RTL ANALYSIS3.2.1 .xdc约束&#xff08;Constraints&#xff09;文件的产生 3.3 综合 SYNTHESIS3.4 执行 IMPLEMENTATION3.5 烧录程序3.6 程序固化3.6.1 SPI约束3.6.2 .bin文…

特征工程:特征提取和降维-上

目录 一、前言 二、正文 Ⅰ.主成分分析 Ⅱ.核主成分分析 三、结语 一、前言 前面介绍的特征选择方法获得的特征&#xff0c;是从原始数据中抽取出来的&#xff0c;并没有对数据进行变换。而特征提取和降维&#xff0c;则是对原始数据的特征进行相应的数据变换&#xff0c;并…

Rust开发WASM,WASM Runtime运行

安装wasm runtime curl https://wasmtime.dev/install.sh -sSf | bash 查看wasmtime的安装路径 安装target rustup target add wasm32-wasi 创建测试工程 cargo new wasm_wasi_demo 编译工程 cargo build --target wasm32-wasi 运行 wasmtime ./target/wasm32-wasi/d…

猫头虎分享已解决Bug ‍ || TypeError: Object of type ‘int64‘ is not JSON serializable

博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试题大全》 — 面试准备的宝典&#xff01;《IDEA开发秘籍》 — 提升你的IDEA技能&#xff01;《100天精通鸿蒙》 …

Appium使用初体验之参数配置,简单能够运行起来

一、服务器配置 Appium Server配置与Appium Server GUI&#xff08;可视化客户端&#xff09;中的配置对应&#xff0c;尤其是二者如果不在同一台机器上&#xff0c;那么就需要配置Appium Server GUI所在机器的IP&#xff08;Appium Server GUI的HOST也需要配置本机IP&#xf…

spring boot和spring cloud项目中配置文件application和bootstrap加载顺序

在前面的文章基础上 https://blog.csdn.net/zlpzlpzyd/article/details/136060312 日志配置 logback-spring.xml <?xml version"1.0" encoding"UTF-8"?> <configuration scan"true" scanPeriod"10000000 seconds" debug…

探索C语言中的联合体与枚举:数据多面手的完美组合!

​ ✨✨ 欢迎大家来到贝蒂大讲堂✨✨ &#x1f388;&#x1f388;养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; 所属专栏&#xff1a;C语言学习 贝蒂的主页&#xff1a;Betty‘s blog 1. 联合体的定义 联合体又叫共用体&#xff0c;它是一种特殊的数据类型&…

2024智慧城市新纪元:引领未来,重塑都市生活

随着科技的飞速发展和数字化转型的不断深入&#xff0c;2024年智慧城市领域迎来了全新的发展格局。 这一年&#xff0c;智慧城市的建设更加注重人性化、可持续性和创新性&#xff0c;为城市居民带来了前所未有的便捷与舒适。以下将重点关注智慧城市的几个核心内容&#xff0c;…

《幻兽帕鲁》攻略:0基础入门及游戏基础操作 幻兽帕鲁基础设施 幻兽帕鲁基础攻击力 Mac苹果电脑玩幻兽帕鲁 幻兽帕鲁加班加点

今天就跟大家聊聊《幻兽帕鲁》攻略&#xff1a;0基础入门及游戏基础操作。 如果想在苹果电脑玩《幻兽帕鲁》记得安装CrossOver哦。 以下纯干货&#xff1a; CrossOver正版安装包&#xff08;免费试用&#xff09;&#xff1a;https://souurl.cn/Y1gDao 一、基础操作 二、界面…

Logback - 日志框架

引言 在当今的企业级应用开发中&#xff0c;日志管理是一个不可或缺的部分。它不仅帮助我们进行错误跟踪&#xff0c;还能有效监控应用程序的运行状态&#xff0c;为性能优化提供数据支撑。Spring Boot作为一个简化Spring应用开发的框架&#xff0c;自带了强大的日志管理功能。…

雾计算:去中心化计算的未来之旅

雾计算是去中心化计算的基石&#xff0c;它将重塑我们的数字格局。通过使计算和存储更接近数据源&#xff0c;它改变了我们处理物联网生成数据的方式。通过雾计算探索未来&#xff0c;揭示了减少延迟、增强隐私和高效网络利用等好处。 随着传感器和可穿戴设备等物联网设备的数…