【flink状态管理(2)各状态初始化入口】状态初始化流程详解与源码剖析

文章目录

    • 1. 状态初始化总流程梳理
    • 2.创建StreamOperatorStateContext
    • 3. StateInitializationContext的接口设计。
    • 4. 状态初始化举例:UDF状态初始化

在TaskManager中启动Task线程后,会调用StreamTask.invoke()方法触发当前Task中算子的执行,在invoke()方法中会调用restoreInternal()方法,这中间包括创建和初始化算子中的状态数据。
另外在invoke中,可以通过判断任务状态来判断是否需要初始化状态。

        // Allow invoking method 'invoke' without having to call 'restore' before it.if (!isRunning) {LOG.debug("Restoring during invoke will be called.");restoreInternal();}

StreamTask调用initializeStateAndOpenOperators()方法对当前Task中所有算子的状态数据进行初始化。

RegularOperatorChain.
public void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {  Iterator var2 = this.getAllOperators(true).iterator();  while(var2.hasNext()) {  StreamOperatorWrapper<?, ?> operatorWrapper = (StreamOperatorWrapper)var2.next();  StreamOperator<?> operator = operatorWrapper.getStreamOperator();  operator.initializeState(streamTaskStateInitializer);  operator.open();  }  }

 
找到了算子状态初始化的位置,我们继续了解状态是如何初始化的。

1. 状态初始化总流程梳理

AbstractStreamOperator.initializeState中描述了状态初始化的总体流程,如下代码以及注释:

# AbstractStreamOperator.initializeStatepublic final void initializeState(StreamTaskStateInitializer streamTaskStateManager)  throws Exception {  //1. 获取类型序列化器final TypeSerializer<?> keySerializer =  config.getStateKeySerializer(getUserCodeClassloader());  //2. get containingTaskfinal StreamTask<?, ?> containingTask = Preconditions.checkNotNull(getContainingTask());  final CloseableRegistry streamTaskCloseableRegistry =  Preconditions.checkNotNull(containingTask.getCancelables());  //3. create StreamOperatorStateContextfinal StreamOperatorStateContext context =  streamTaskStateManager.streamOperatorStateContext(  getOperatorID(),  getClass().getSimpleName(),  getProcessingTimeService(),  this,  keySerializer,  streamTaskCloseableRegistry,  metrics,  config.getManagedMemoryFractionOperatorUseCaseOfSlot(  ManagedMemoryUseCase.STATE_BACKEND,  runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),  runtimeContext.getUserCodeClassLoader()),  isUsingCustomRawKeyedState());  //4. create stateHandlerstateHandler =  new StreamOperatorStateHandler(  context, getExecutionConfig(), streamTaskCloseableRegistry);  timeServiceManager = context.internalTimerServiceManager();  //5. initialize OperatorStatestateHandler.initializeOperatorState(this);  //6. set KeyedStateStore in runtimeContextruntimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));  
}

在StreamOperator初始化状态数据的过程中,首先从StreamTask中获取创建状态需要的组件,例如托管状态的管理后端KeyedStateBackend、OperatorStateBackend以及原生状态管理的KeyedStateInputs和OperatorStateInputs组件。

状态数据操作过程中使用的管理组件最终都会封装成StateInitializationContext并传递给子类使用,例如在AbstractUdfStreamOperator中,就会使用StateInitializationContext中的信息初始化用户定义的UDF中的状态数据。

2.创建StreamOperatorStateContext

接下来看如何在Task实例初始化时创建这些组件,并将其存储在StreamOperatorStateContext中供算子使用,如下代码:

StreamTaskStateInitializerImpl
@Override  
public StreamOperatorStateContext streamOperatorStateContext(  @Nonnull OperatorID operatorID,  @Nonnull String operatorClassName,  @Nonnull ProcessingTimeService processingTimeService,  @Nonnull KeyContext keyContext,  @Nullable TypeSerializer<?> keySerializer,  @Nonnull CloseableRegistry streamTaskCloseableRegistry,  @Nonnull MetricGroup metricGroup,  double managedMemoryFraction,  boolean isUsingCustomRawKeyedState)  throws Exception {  //1. 获取task实例信息TaskInfo taskInfo = environment.getTaskInfo();  OperatorSubtaskDescriptionText operatorSubtaskDescription =  new OperatorSubtaskDescriptionText(  operatorID,  operatorClassName,  taskInfo.getIndexOfThisSubtask(),  taskInfo.getNumberOfParallelSubtasks());  final String operatorIdentifierText = operatorSubtaskDescription.toString();  final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates =  taskStateManager.prioritizedOperatorState(operatorID);  CheckpointableKeyedStateBackend<?> keyedStatedBackend = null;  OperatorStateBackend operatorStateBackend = null;  CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null;  CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null;  InternalTimeServiceManager<?> timeServiceManager;  try {  // 创建keyed类型的状态后端// -------------- Keyed State Backend --------------  keyedStatedBackend =  keyedStatedBackend(  keySerializer,  operatorIdentifierText,  prioritizedOperatorSubtaskStates,  streamTaskCloseableRegistry,  metricGroup,  managedMemoryFraction);  //创建operator类型的状态后端// -------------- Operator State Backend --------------  operatorStateBackend =  operatorStateBackend(  operatorIdentifierText,  prioritizedOperatorSubtaskStates,  streamTaskCloseableRegistry);  //创建原生类型状态后端// -------------- Raw State Streams --------------  rawKeyedStateInputs =  rawKeyedStateInputs(  prioritizedOperatorSubtaskStates  .getPrioritizedRawKeyedState()  .iterator());  streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);  rawOperatorStateInputs =  rawOperatorStateInputs(  prioritizedOperatorSubtaskStates  .getPrioritizedRawOperatorState()  .iterator());  streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);  //创建Internal Timer Service Manager// -------------- Internal Timer Service Manager --------------  if (keyedStatedBackend != null) {  // if the operator indicates that it is using custom raw keyed state,  // then whatever was written in the raw keyed state snapshot was NOT written            // by the internal timer services (because there is only ever one user of raw keyed            // state);            // in this case, timers should not attempt to restore timers from the raw keyed            // state.            final Iterable<KeyGroupStatePartitionStreamProvider> restoredRawKeyedStateTimers =  (prioritizedOperatorSubtaskStates.isRestored()  && !isUsingCustomRawKeyedState)  ? rawKeyedStateInputs  : Collections.emptyList();  timeServiceManager =  timeServiceManagerProvider.create(  keyedStatedBackend,  environment.getUserCodeClassLoader().asClassLoader(),  keyContext,  processingTimeService,  restoredRawKeyedStateTimers);  } else {  timeServiceManager = null;  }  // -------------- Preparing return value --------------  return new StreamOperatorStateContextImpl(  prioritizedOperatorSubtaskStates.getRestoredCheckpointId(),  operatorStateBackend,  keyedStatedBackend,  timeServiceManager,  rawOperatorStateInputs,  rawKeyedStateInputs);  } catch (Exception ex) {  。。。。
}

流程梳理:

  1. 从environment中获取TaskInfo,并基于Task实例创建OperatorSubtaskDescriptionText。Operator中Task实例的描述信息包含OperatorID、OperatorClassName等,最终用于创建OperatorStateBackend的状态存储后端。
  2. 创建KeyedStateBackend,KeyedStateBackend是KeyedState的状态管理后端,提供创建和管理KeyedState的方法。
  3. 创建OperatorStateBackend,OperatorStateBackend是OperatorState的状态管理后端,提供获取和管理OperatorState的接口。
  4. 创建KeyGroupStatePartitionStreamProvider实例,提供创建和获取原生KeyedState的方法。
  5. 创建StatePartitionStreamProvider实例,提供创建和获取原生OperatorState的方法。
  6. 将所有创建出来的托管状态管理后端keyedStatedBackend和operatorStateBackend、原生状态存储后端rawKeyedStateInputs和rawOperatorStateInputs及timeServiceManager实例,全部封装在StreamOperatorStateContextImpl上下文对象中,并返回给AbstractStreamOperator使用。

 
小结
StreamTaskStateInitializer.streamOperatorStateContext()方法包含创建托管状态和原生状态管理后端的全过程。StreamOperator的实现类能够从StreamOperatorStateContext中获取这些状态管理组件并使用它们创建指定类型的状态,最终状态数据会存储在状态管理后端指定的物理介质上,例如堆内存或RocksDB。

StateInitializationContext会被用于算子和UserDefinedFunction中,实现算子或函数中的状态数据操作。

 

3. StateInitializationContext的接口设计。

StateInitializationContext接口同时继承了ManagedInitializationContext接口和FunctionInitializationContext接口。StateInitializationContext接口的默认实现类为StateInitializationContextImpl。
在这里插入图片描述

  1. ManagedInitializationContext接口提供了托管状态使用的KeyedStateStore和OperatorStateStore获取方法,即KeyedStateBackend和OperatorStateBackend的封装类。算子进行初始化时,会通过KeyedStateStore和OperatorStateStore提供的方法创建和管理指定类型的托管状态。

  2. FunctionInitializationContext提供了用户自定义函数状态数据初始化需要的方法。它和ManagedInitializationContext保持一致,这主要是为了和算子使用的上下文进行区分,但两者的操作基本一致。

  3. StateInitializationContext提供了对托管状态数据的管理,并在内部继承和拓展了获取及管理原生状态数据的方法,如getRawOperatorStateInputs()、getRawKeyedStateInputs()等

  4. StateInitializationContextImpl具备操作管理状态和原生状态的能力。基于它可以获取不同类型的状态管理后端,并基于状态管理操作状态数据。

在这里插入图片描述

 

4. 状态初始化举例:UDF状态初始化

在AbstractStreamOperator中调用initializeState(StateInitializationContext context)抽象方法初始化Operator中的状态。这里以AbstractUdfStreamOperator为例说明具体算子、UDF是如何进行状态初始化的。

AbstractUdfStreamOperator.initializeState()方法实际上调用了StreamingFunctionUtils.restoreFunctionState()方法对User-Defined Function中的状态数据进行初始化和恢复,实际上就是将上文创建的StateInitializationContext上下文信息提供给Function接口使用。

public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);StreamingFunctionUtils.restoreFunctionState(context, userFunction);
}

恢复函数内部的状态数据涉及Checkpoint的实现,我们会在之后介绍如何在StreamingFunctionUtils.restoreFunctionState()方法中恢复函数中的状态数据。

 
《Flink设计与实现:核心原理与源码解析》张利兵

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/672855.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

常用的前端模块化标准总结

1、模块化标准出现以前使用的模块化方案&#xff1a; 1&#xff09;文件划分&#xff1a; 将不同的模块定义在不同的文件中&#xff0c;然后使用时通过script标签引入这些文件 缺点&#xff1a; 模块变量相当于是定义在全局的&#xff0c;容易造成变量名冲突&#xff08;即不…

flink反压及解决思路和实操

1. 反压原因 反压其实就是 task 处理不过来&#xff0c;算子的 sub-task 需要处理的数据量 > 能够处理的数据量&#xff0c;比如&#xff1a; 当前某个 sub-task 只能处理 1w qps 的数据&#xff0c;但实际上到来 2w qps 的数据&#xff0c;但是实际只能处理 1w 条&#…

Qt信号和槽机制(什么是信号和槽,connect函数的形式,按钮的常用信号,QWidget的常用槽,自定义槽函数案例 点击按钮,输出文本)

一.什么是信号和槽 信号槽式Qt中的一个很重要的机制。信号槽实际上是观察者模式,当发生了感兴趣的事件&#xff0c;某一个操作就会被自动触发。当某个事件发生之后&#xff0c;比如按钮检测到自己被点击了一下&#xff0c;它就会发出一个信号。这种发出类似广播。如果有对象对…

ArcGIS学习(五)坐标系-2

3.不同基准面坐标系之间的转换 在上一关中,我们学习了ArcGIS中的投影(投影栅格)工具,并以"WGS1984地理坐标系与WGS1984的UTM投影坐标系的转换”为例进行讲解。 "WGS1984地理坐标系与WGS1984的UTM投影坐标系的转换”代表的是同一个基准面下的两个坐标的转换。 …

人工智能 | 深度学习的进展

深度学习的进展 深度学习是人工智能领域的一个重要分支&#xff0c;它利用神经网络模拟人类大脑的学习过程&#xff0c;通过大量数据训练模型&#xff0c;使其能够自动提取特征、识别模式、进行分类和预测等任务。近年来&#xff0c;深度学习在多个领域取得了显著的进展&#…

cesium mapboxgl+threebox glb 朝向问题

一、3Dbuilder打开glb 二、cesium在pitch和heading都为0的情况下&#xff0c;不设置模型的朝向 三、mapboxglthreebox在pitch和bearing都为0的情况下&#xff0c;不设置模型的朝向 四、对于地图默认视角&#xff0c;cesium设置pitch-90、heading0的时候和mapboxglthreebox设置p…

光学PCIe 6.0技术引领AI时代超大规模集群

随着云计算、大数据和人工智能技术的快速发展&#xff0c;超大规模数据中心正经历一场前所未有的变革。传统的集中式架构逐渐转变为解聚式&#xff08;disaggregated&#xff09;架构&#xff0c;这种架构将计算、存储和网络资源从单一的物理服务器中分离出来&#xff0c;形成独…

前端vite+vue3——自动化配置路由布局

文章目录 ⭐前言&#x1f496;vue3系列文章 ⭐ 自动化配置路由&#x1f496;引入vite版本自定义目录映射&#x1f496;自动化读取文件下的路由&#x1f496;main入口加载路由&#x1f496;入口app.vue配置&#x1f496;layout基础布局配置&#x1f496;效果 ⭐总结⭐结束 ⭐前言…

Linux系统安装apache服务器并发布网站,以及配置

这个如果单纯是发布一个静态页面的话&#xff0c;真的好方便 第一步&#xff0c;安装httpd—apache服务器yum install -y httpd 第二步&#xff0c;启动服务 systemctl start httpd 这里其实服务器已经搭建好了。 设置服务系统启动时自动启动 systemctl enable httpd 查看apac…

算法专题:滑动窗口

参考练习习题总集 文章目录 3. 无重复字符的最长子串30. 串联所有单词的子串76. 最小覆盖子串187. 重复的DNA序列219. 存在重复元素 II220. 存在重复元素 III396. 旋转函数424. 替换后的最长重复字符438. 找到字符串中所有字母异位词 滑动窗口太简单了&#xff0c;没啥说的自己…

信号的处理机制

信号的处理机制 信号的来源信号的处理方式注册信号处理函数信号的未决和阻塞实时信号实时信号的特性使用实时信号注意事项 为什么需要可重入的信号处理函数可重入函数的特性例子在信号处理中保持可重入性 Unix/Linux系统中的信号处理机制提供了一种方式&#xff0c;允许操作系统…

ip、子网掩码和A、B、C段

文章目录 概要ip和子网掩码的关系如何进一步理解两者之间关系示例问题根据IP地址和子网掩码求网络号、主机号A段、B段、C段 概要 ip、子网掩码、C段相关知识 ip和子网掩码的关系 IP地址和子网掩码在网络中密切关联&#xff0c;共同用于确定一个设备属于哪个网络以及如何划分网…

text-generation-webui搭建大模型运行环境与踩坑记录

text-generation-webui搭建大模型运行环境 text-generation-webui环境初始化准备模型启动项目Bug说明降低版本启动项目 text-generation-webui text-generation-webui是一个基于Gradio的LLM Web UI开源项目&#xff0c;可以利用其快速搭建部署各种大模型环境。 环境初始化 下载…

python数据容器之列表相关的操作

列表是Python中最常用的数据容器之一&#xff0c;它可以存储多个元素&#xff0c;并且可以进行增加、删除、修改、查找等操作。下面是一些常见的列表操作&#xff1a; 创建列表&#xff1a;使用方括号 [] 或者 list() 函数来创建一个列表。例如&#xff1a; fruits [apple, …

【漏洞复现】EPON上行A8-C政企网关未授权下载漏洞

Nx01 产品简介 EPON上行A8-C政企网关是一款终端产品&#xff0c;提供企业网络解决方案。 Nx02 漏洞描述 EPON上行A8-C政企网关配置文件未授权下载漏洞&#xff0c;攻击者在未授权状态下下载配置文件&#xff0c;获取配置文件内敏感信息。 Nx03 产品主页 fofa-query: "Z…

Retinexformer论文精读笔记

Retinexformer论文精读笔记 论文为2023年ICCV的Retinexformer: One-stage Retinex-based Transformer for Low-light Image Enhancement。论文链接&#xff1a;browse.arxiv.org/pdf/2303.06705.pdf&#xff0c;代码链接&#xff1a;caiyuanhao1998/Retinexformer: “Retinexfo…

Mac 下载安装Java、maven并配置环境变量

下载Java8 下载地址&#xff1a;https://www.oracle.com/java/technologies/downloads/ 根据操作系统选择版本 没有oracle账号需要注册、激活登录 mac直接选择.dmg文件进行下载&#xff0c;下载后安装。 默认安装路径&#xff1a;/Library/Java/JavaVirtualMachines/jdk-1…

【C#】.net core 6.0 创建默认Web应用,以及默认结构讲解,适合初学者

欢迎来到《小5讲堂》 大家好&#xff0c;我是全栈小5。 这是《C#》系列文章&#xff0c;每篇文章将以博主理解的角度展开讲解&#xff0c; 特别是针对知识点的概念进行叙说&#xff0c;大部分文章将会对这些概念进行实际例子验证&#xff0c;以此达到加深对知识点的理解和掌握。…

Redis面试题44

人工智能是否有可能取代人类工作岗位&#xff1f; 答&#xff1a;人工智能在某些领域可能会取代一些人类工作岗位&#xff0c;但同时也会创造新的工作岗位。以下是一些关于人工智能是否取代人类工作岗位的观点&#xff1a; 替代观点&#xff1a;人工智能的发展和应用可能导致一…

标注工具体积3D数据集

史上最全 | 计算机视觉2D/3D标注工具汇总&#xff01; 3D点云标注有哪些好用的开源工具 https://www.appen.com.cn/blog/3d-annotation-tool/ 3D LABELING TOOLBOX 超全的3D视觉数据集汇总 常用的图像标注工具汇总 OpenCV探索之路&#xff08;二十五&#xff09;&#xf…