【flink状态管理(四)】MemoryStateBackend的实现

文章目录

  • 1.基于MemoryStateBackend创建KeyedStateBackend
    • 1.1. 状态初始化
    • 1.2. 创建状态
  • 2. 基于MemoryStateBackend创建OperatorStateBackend
  • 3.基于MemoryStateBackend创建CheckpointStorage

在Flink中,默认的StateBackend实现为MemoryStateBackend,本文以MemoryStateBackend为例说明StateBackend的设计与实现。

 
本文介绍MemoryStateBackend中如下三个主要组件的创建过程:

  • HeapKeyedStateBackend
  • OperatorStateBackend
  • MemoryBackendCheckpointStorage

FsStateBackend和RocksDBStateBackend这两种状态后端存储的实现,功能和MemoryStateBackend类似,区别在于内部创建的KeyedStateBackend和CheckpointStorage。

 

1.基于MemoryStateBackend创建KeyedStateBackend

1.1. 状态初始化

AbstractStreamOperator.keyedStatedBackend()方法定义了创建和初始化KeyedStatedBackend的逻辑,具体如下。

protected <K> AbstractKeyedStateBackend<K> keyedStateBackend(TypeSerializer<K> keySerializer,String operatorIdentifierText,PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates,CloseableRegistry backendCloseableRegistry,MetricGroup metricGroup) throws Exception {if (keySerializer == null) {return null;}String logDescription = "keyed state backend for " + operatorIdentifierText;//1. TaskInfo taskInfo = environment.getTaskInfo();final KeyGroupRange keyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(taskInfo.getMaxNumberOfParallelSubtasks(),taskInfo.getNumberOfParallelSubtasks(),taskInfo.getIndexOfThisSubtask());// 确保恢复状态过程中构建的数据流被关闭CloseableRegistry cancelStreamRegistryForRestore = new CloseableRegistry();backendCloseableRegistry.registerCloseable(cancelStreamRegistryForRestore);// 创建BackendRestorerProcedureBackendRestorerProcedure<AbstractKeyedStateBackend<K>, KeyedStateHandle> backendRestorer =new BackendRestorerProcedure<>((stateHandles) -> stateBackend.createKeyedStateBackend(environment,environment.getJobID(),operatorIdentifierText,keySerializer,taskInfo.getMaxNumberOfParallelSubtasks(),keyGroupRange,environment.getTaskKvStateRegistry(),TtlTimeProvider.DEFAULT,metricGroup,stateHandles,cancelStreamRegistryForRestore),backendCloseableRegistry,logDescription);try {return backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState());} finally {if (backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryForRestore)) {IOUtils.closeQuietly(cancelStreamRegistryForRestore);}}
}
  1. 获取当前Task的TaskInfo,并基于TaskInfo的参数创建KeyGroupRange,表示当前Task实例中存储的Key分组区间
  2. 创建CloseableRegistry并注册到backendCloseableRegistry中,用于确保在任务取消的情况下关闭在恢复状态过程中构造的数据流。
  3. 创建BackendRestorerProcedure,提供了stateBackend.createKeyedStateBackend()方法,也包含恢复历史状态数据的方法。
  4. 创建KeyedStateBackend,同时对状态数据进行恢复。prioritizedOperatorSubtaskStates是从TaskStateManager中根据OperatorID获取的算子历史状态,通过prioritizedOperatorSubtaskStates获取当前算子的PrioritizedManagedKeyedState,并基于这些状态数据恢复算子的状态。

 

1.2. 创建状态

接下来我们看MemoryStateBackend.createKeyedStateBackend()方法的具体实现。

public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env,JobID jobID,String operatorIdentifier,TypeSerializer<K> keySerializer,int numberOfKeyGroups,KeyGroupRange keyGroupRange,TaskKvStateRegistry kvStateRegistry,TtlTimeProvider ttlTimeProvider,MetricGroup metricGroup,@Nonnull Collection<KeyedStateHandle> stateHandles,CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {// 获取TaskStateManager实例TaskStateManager taskStateManager = env.getTaskStateManager();// 创建HeapPriorityQueueSetFactory实例HeapPriorityQueueSetFactory priorityQueueSetFactory =new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);// 创建HeapKeyedStateBackendBuilder实例HeapKeyedStateBackendreturn new HeapKeyedStateBackendBuilder<>(kvStateRegistry,keySerializer,env.getUserClassLoader(),numberOfKeyGroups,keyGroupRange,env.getExecutionConfig(),ttlTimeProvider,stateHandles,AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),taskStateManager.createLocalRecoveryConfig(),priorityQueueSetFactory,isUsingAsynchronousSnapshots(),cancelStreamRegistry).build();
}
  1. 从environment参数中获取TaskStateManager实例
  2. 创建HeapPriorityQueueSetFactory实例,用于生成HeapPriorityQueueSet优先级队列,存储TimerHeapInternalTimer等数据。
  3. 调用HeapKeyedStateBackendBuilder.build()方法创建HeapKeyedStateBackend。

 

2. 基于MemoryStateBackend创建OperatorStateBackend

和创建KeyedStateBackend的过程相似,AbstractStreamOperator.operatorStateBackend()方法实现了创建OperatorStateBackend的方法。

protected OperatorStateBackend operatorStateBackend(String operatorIdentifierText,PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates,CloseableRegistry backendCloseableRegistry) throws Exception {String logDescription = "operator state backend for " + operatorIdentifierText;CloseableRegistry cancelStreamRegistryForRestore = new CloseableRegistry();backendCloseableRegistry.registerCloseable(cancelStreamRegistryForRestore);BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> backendRestorer =new BackendRestorerProcedure<>((stateHandles) -> stateBackend.createOperatorStateBackend(environment,operatorIdentifierText,stateHandles,cancelStreamRegistryForRestore),backendCloseableRegistry,logDescription);try {return backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedOperatorState());} finally {if (backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryForRestore)) {IOUtils.closeQuietly(cancelStreamRegistryForRestore);}}
}
  1. 创建CloseableRegisty,确保在任务取消的情况下能够关闭在恢复状态时构造的数据流。
  2. 创建BackendRestorerProcedure,封装了stateBackend.createOperatorStateBackend()方法,并包含恢复历史状态数据的操作。
  3. 创建OperatorStateBackend,并恢复状态数据。

其中prioritizedOperatorSubtaskStates是从TaskStateManager中根据OperatorID获取的算子专有历史状态,可以通过prioritizedOperatorSubtaskStates获取当前算子中的PrioritizedManagedOperatorState,并基于这些状态数据恢复OperatorStateBackend中算子的状态。

 

3.基于MemoryStateBackend创建CheckpointStorage

在createCheckpointStorage()方法中,直接创建MemoryBackendCheckpointStorage实例并返回,没有涉及太多的流程

public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {return new MemoryBackendCheckpointStorage(jobId, getCheckpointPath(), getSavepointPath(), maxStateSize);
}

 

参考:《Flink设计与实现:核心原理与源码解析》–张利兵

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/675030.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

掌握虚拟化与网络配置之道:深入浅出VMware及远程管理技巧

目录 虚拟机介绍 虚拟机的关键字 服务器架构的发展 为什么用虚拟机VMware 虚拟机和阿里云的区别 功能角度 价格因素 应用场景 优势方面 找到windows的服务管理 配置VMware 关于VMware安装的几个服务 vmware如何修改各种网络配置 关于NAT的详细信息(了解) NAT(网…

C语言——oj刷题——实现字符串逆序

当我们需要逆序一个字符串的内容时&#xff0c;可以通过C语言编写一个函数来实现。下面将详细介绍如何通过C语言实现这个功能&#xff0c;并附上代码示例。 1、实现原理 要逆序一个字符串的内容&#xff0c;可以使用两个指针来交换字符串中对应位置的字符。具体实现原理如下&am…

Ribbon全方位解析:构建弹性的Java微服务

第1章 引言 大家好,我是小黑,咱们今天聊聊Ribbon,这货是个客户端负载均衡工具,用在Spring Cloud里面能让咱们的服务调用更加灵活和健壮。负载均衡,听起来挺高大上的,其实就是把外界的请求平摊到多个服务器上,避免某个服务器压力太大,其他的却在那儿闲着。 Ribbon的牛…

Netty连接通道中的Channel参数模型

ChannelOption(Channel中的连接参数) ChannelOption.SOBACKLOG ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数&#xff0c;服务端处理客户端连接请求是顺序处理的&#xff0c;所以同一时间只能处理一个客户端连接&#xff0c;多个客户端来的时候&…

大数据领域的数据仓库

在大数据领域&#xff0c;数据仓库&#xff08;Data Warehouse&#xff09;是一个用于存储、管理和分析大量数据的集中式系统。它从多个异构数据源收集数据&#xff0c;对数据进行清洗、转换和整合&#xff0c;然后将其存储在一个集中的位置&#xff0c;以支持复杂的查询、报告…

Windows命令行设置IP地址

Windows命令行设置IP地址 一、常规设置IP的方法 开始-控制面板-网络连接-本地连接-属性-常规Internet协议&#xff0c;选择自动获取&#xff0c;或手动设置IP和DNS。 二、命令行设置IP的方法 2.1. netsh命令介绍 2.1.1. 设置动态获取IP地址和和自动获取DNS&#xff08;DHCP&a…

传输层协议 ——— TCP协议

TCP协议 TCP协议谈谈可靠性为什么网络中会存在不可靠&#xff1f;TCP协议格式TCP如何将报头与有效载荷进行分离&#xff1f;序号与确认序号 确认应答机制&#xff08;ACK&#xff09;超时重传机制连接管理机制三次握手四次挥手 流量控制滑动窗口拥塞控制延迟应答捎带应答面向字…

使用ESP-01/ESP-01S接入Homekit远程控制电器

一、准备材料 ESP-01/ESP-01s 芯片 、 继电器模块 、 烧录器 二、下载固件和烧录软件 固件地址https://github.com/RavenSystem/esp-homekit-devices 烧录软件下载地址&#xff1a;https://drive.google.com/file/d/1_M4EzolaJWpYXts_FwUIqH8pZWqy-fye/view 三、烧录固件 …

基于gici多传感器融合定位的图优化代码学习

前言 本文是基于gici-open项目对因子图优化GraphC类 的学习&#xff0c;由于此项目的最小二乘估计部分采用了google的开源ceres库&#xff0c;可以从ceres的官方帮助文档处了解&#xff1a;Solving Non-linear Least Squares — Ceres Solver (ceres-solver.org) 在graph.h的…

springboot(ssm大学生计算机基础网络教学系统 在线课程系统Java系统

springboot(ssm大学生计算机基础网络教学系统 在线课程系统Java系统 开发语言&#xff1a;Java 框架&#xff1a;springboot&#xff08;可改ssm&#xff09; vue JDK版本&#xff1a;JDK1.8&#xff08;或11&#xff09; 服务器&#xff1a;tomcat 数据库&#xff1a;mys…

JavaScript事件

事件 事件-表单 元素获得焦点 onfocus&#xff1a; onfocus 事件在对象获得焦点时发生。鼠标点击获取焦点是发生 onblur&#xff1a; onblur 事件发生在对象失去焦点时&#xff0c;比如说你有个文本框&#xff0c;你鼠标点击进去&#xff0c;只有在离开的时候才会执行onblu…

【原创】Qt库open62541 MinGW编译

一、前言 为了统一公司的驱动层开发&#xff0c;准备采用OpcUA的方式转发底层数据&#xff0c;而服务器有Windows Server&#xff0c;也有CentOS&#xff0c;因此想用Qt开发一个基于MinGW的OpcUA Server&#xff0c;这样就能跨平台部署。这里记录一下&#xff0c;希望对你也有用…

Compose | UI组件(十五) | Scaffold - 脚手架

文章目录 前言一、Scaffold脚手架简介二、Scaffold的主要组件三、如何使用Scaffold四、Compose中Scaffold脚手架的具体例子例子1&#xff1a;基本Scaffold布局例子2&#xff1a;带有Drawer的Scaffold布局例子3&#xff1a;带有Snackbar的Scaffold布局 总结 前言 Compose中的Sca…

Python循环语句——for循环临时变量作用域

一、引言 在Python编程中&#xff0c;变量是程序运行的核心。其中&#xff0c;临时变量扮演着重要的角色&#xff0c;用于存储中间结果或临时数据。然而&#xff0c;这些临时变量并非随意存在&#xff0c;它们受到作用域的限制。了解临时变量的作用域对于编写高效、可维护的代…

神经网络 | 常见的激活函数

Hi&#xff0c;大家好&#xff0c;我是半亩花海。本文主要介绍神经网络中必要的激活函数的定义、分类、作用以及常见的激活函数的功能。 目录 一、激活函数定义 二、激活函数分类 三、常见的几种激活函数 1. Sigmoid 函数 &#xff08;1&#xff09;公式 &#xff08;2&a…

代码随想录算法训练营第三十天 回溯算法总结、332.重新安排行程、51. N皇后、37. 解数独

代码随想录算法训练营第三十天 | 回溯算法总结、**332.重新安排行程、**51. N皇后、37. 解数独 回溯算法总结 回溯就是递归的副产品&#xff0c;只要有递归就会有回溯 回溯就是一个暴力搜索法&#xff0c;并不是什么高效的算法 回溯算法的题目分类&#xff1a; 组合&#…

LeetCode 第28天

93. 复原 IP 地址 这题挺难的&#xff0c;实际上我觉得分割字符串的题都挺难的&#xff0c;即使知道了回溯算法&#xff0c;也是无从下手。因为要对字符串进行处理&#xff0c;关于分割点不知道怎么处理。关键部分理解在代码里。 class Solution { private: // 判断分割的子串…

GPT每日面试题—如何理解JS原型链

充分利用ChatGPT的优势&#xff0c;帮助我们快速准备前端面试。今日问题&#xff1a;如何理解JS原型链&#xff1f; Q&#xff1a;如果在前端面试中&#xff0c;被问到如何理解JS原型链&#xff0c;怎么回答比较好&#xff1f; A&#xff1a;当面试官问到如何理解 JavaScript …

12.03 校招 实习 内推 面经

绿*泡*泡VX&#xff1a; neituijunsir 交流裙 &#xff0c;内推/实习/校招汇总表格 1、自动驾驶一周资讯 - 英伟达自动驾驶中国团队扩招&#xff1b;地平线与安波福首个合作成果取得定点&#xff1b;通用汽车自动驾驶Cruise首席执行官辞职 自动驾驶一周资讯 - 英伟达自动驾…

问题:创业者在组建创业团队时,在个人特征和动机方面更应该注重创业者的( ) #知识分享#微信#媒体

问题&#xff1a;创业者在组建创业团队时&#xff0c;在个人特征和动机方面更应该注重创业者的&#xff08; &#xff09; 参考答案如图所示