Flink 状态管理与容错机制(CheckPoint SavePoint)的关系

一、什么是状态

无状态计算的例子: 例如一个加法算子,第一次输入2+3=5那么以后我多次数据2+3的时候得到的结果都是5。得出的结论就是,相同的输入都会得到相同的结果,与次数无关。
有状态计算的例子: 访问量的统计,我们都知道Nginx的访问日志一个请求一条日志,基于此我们就可以统计访问量。如下,/api/a这个url第一此访问的时候,返回的结果就是 count1,但当第二次访问的时候,返回的结果变成了2。为什么Flink知道之前已经处理过一次 hello world,这就是state发挥作用了,这里是被称为keyed state存储了之前需要统计的数据,keyby接口的调用会创建keyed streamkey进行划分,这是使用keyed state的前提。得出的结论就是,相同的输入得到不同的结果,与次数有关。这就是有状态的数据。
[点击并拖拽以移动] ​

什么场景下会大量使用到这种状态数据啦?简单举几个例子:
【1】去重的需求中,比如说我们只想知道这100个同事都属于那几个部门的等等。
【2】窗口计算,已进入未触发的数据。比如,我们一分钟统计一次,1-2之间的1.5这个时候的数据对于2来说就是一个有状态的数据,因为2的结果与1.5有关。
【3】机器学习/深度学习,训练的模型及参数。这对于机器学习的同学深入感触。比如,第一次输入hello,机器会给我一个反馈,那么下次会基于这个反馈做进一步的学习处理。那么上一步的结果对于我而言就是一种有状态的输入。
【4】访问历史数据,需要与昨日进行对比。昨日的数据对于今日而言也属于一种状态。你品,你细品。

为什么要管理状态,用内存不香吗?首先流失作业是有它的标准的,不是什么东西随随便便就说自己这个是流失处理。首先,7*24小时运行,高可靠,你内存不行吧,你的容量总有用完的时候吧。其次,数据不丢失不重,恰好计算一次,你内存要实现需要备份和恢复,你还总伴随着小部分数据的丢失吧。最后,数据实时产生,不延迟,你内存不够横向扩展时,你需要延迟吧。

理想的状态管理就是下面描述的样子,Flink也都帮我们实现了。
[点击并拖拽以移动] ​

二、状态的类型

Managed State & Raw State

Managed StateRaw State
状态管理方式Flink Runtime 管理 —自动存储,自动恢复 —内存管理上有优化用户自己管理(Flink不知道你在State中存储的数据结构的) —要自己实例化
状态数据结构已知的数据结构 —value,list,map…字节数据 —byte[]
推荐使用场景大多数情况下均可使用自定义 Operator 时可以使用(当Managed State 不够时使用)

Managed Stated 分为: Keyed StatedOperator State
【1】Keyed Stated: 只能用于keyBy生成的KeyedStream上的算子。每一个key对应一个State,一个Operator实例处理多个Key,访问相应的多个State。相同Key会在相同的实例中处理。整个过程如果没有keyBy操作,它是没有KeyedStream的,而Keyed Stated只能应用在KeyedStream 上。

并发改变: State随着Key在实例间迁移。例如:实例A中之前处理KeyAKeyB,后面我扩展了实例B,那么 实例A就只需要处理KeyAKeyB就交给 实例B进行处理。安装状态进行分离,可以理解为分布式。

通过 RuntimeContext 访问,说明Operator是一个Rich Function,否则是拿不到RuntimeContext

支持的数据结构: ValueStateListStateReducingStateAggregatingStateMapState

【2】Operator State: 可以用于所有的算子,常用于source上,例如FlinkKafkaConsumer。一个Operator实例对应一个State,所以一个Operator中会处理多个key,可以理解为集群。

并发改变: Operator State没有key,并发改变的时候就需要重新分配。内置了两种方案:均匀分配和合并后每个得到全量。

访问方式: 实现CheckpointedFunctionListCheckpointed接口。

支持的数据结构: ListState

三、Keyed State 使用示例

什么是 keyed state: 对于keyed state,有两个特点:
【1】只能应用于KeyedStream 的函数与操作中,例如Keyed UDF, window state
【2】keyed state是已经分区 / 划分好的,每一个 key 只能属于某一个 keyed state
对于如何理解已经分区的概念,我们需要看一下keyby的语义,大家可以看到下图左边有三个并发,右边也是三个并发,左边的词进来之后,通过keyby会进行相应的分发。例如对于hello wordhello这个词通过hash运算永远只会到右下方并发的task上面去。
[点击并拖拽以移动] ​

什么是 operator state
【1】又称为non-keyed state,每一个operator state都仅与一个operator的实例绑定。
【2】常见的operator statesource state,例如记录当前sourceoffset再看一段使用operator stateword count代码:
[点击并拖拽以移动] ​

这里的fromElements会调用FromElementsFunction的类,其中就使用了类型为list stateoperator state。如下几种Keyed State之间的依赖关系,都是state的子类。它们的访问方式和数据结构都有一定的区别。
[点击并拖拽以移动] ​

状态数据类型访问接口备注
ValueState单个值[update(T) 修改/T value 获取]例如 WordCount 用 word 做 key,state就是单个的数值。这个单个也可以是字符串、对象等都有可能。访问方式只有上面两种。
MapStateMapput(UK key, UV value) putAll(Map<UK,UV> map) remove(UK key) boolean contains(UK key) UV get(UK key) Iterable<Map.Entry> entries() Iterable<Map.Entry> iterator() Iterable keys() Iterable values()能够操作具体的对象的key
ListStateListadd/ addAll(List) update(List) Iterable get()
ReducingState单个值add/ addAll(List) update(List) T get()与 List 是同一个父类,这个add是直接将数据更新进了 Reducing的结果里面。举个例子,例如我们统计1分钟的结果,list是先将数据添加到list中,等到1分钟的时候全来出来统计。而 Reducing是来一条就统计一条结果。好处是节省内存。
AggregatingState单个值add(IN)/OUT get()与 List 是同一个父类,与Reducing的不同是,Reducing输入和输出的类型都是相同的。而Aggregating 是可以不同的。例如,我要计算一个平局值,Reducing是算好返回,而Aggregating会返回总和和个数。

举个ValueState的案例

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//获取数据流
DataStream<Event> events = env.addSource(source);DataStream<Alert> alerts = events// 生成 keyedStata 通过 sourceAddress.keyBy(Event::sourceAddress)// StateMachineMapper 状态机.flatMap(new StateMachineMapper());//我么看下状态机怎么写   实现 RichFlatMapFunction
@SuppressWarnings("serial")
static class StateMachineMapper extends RichFlatMapFunction<Event, Alert> {private ValueState<LeaderLatch.State> currentState;@Overridepublic void open(Configuration conf) {// 获取一个 valueStatecurrentState = getRuntimeContext().getState(new ValueStateDescriptor<>("state", State.class));}//来一条数据处理一条@Overridepublic void flatMap(Event evt, Collector<Alert> out) throws Exception {// 获取 valueState state = currentState.value();if (state == null) {state = State.Initial;//State 是本地的变量}// 把事件对状态的影响加上去,得到一个状态State nextState = state.transition(evt.type());//判断状态是否合法if (nextState == State.InvalidTransition) {//扔出去out.collect(new Alert(evt.sourceAddress(), state, evt.type()));}//是否不能继续转化了,例如取消的订单else if (nextState.isTerminal()) {// 从 state 中清楚掉currentState.clear();}else {// 修改状态currentState.update(nextState);}}
}

四、CheckPoint 与 state 的关系

Checkpoint是从source触发到下游所有节点完成的一次全局操作。下图可以有一个对Checkpoint的直观感受,红框里面可以看到一共触发了 569KCheckpoint,然后全部都成功完成,没有fail的。
[点击并拖拽以移动] ​

**state 其实就是 Checkpoint 所做的主要持久化备份的主要数据,**看下图的具体数据统计,其state也就9kb大小 。
[点击并拖拽以移动] ​

五、状态如何保存和恢复

Checkpoint定时制作分布式快照,对程序的状态进行备份。发生故障时,将整个作业的Task都回滚到最后一次成功Checkpoint中的状态,然后从保存的点继续处理。

必要条件: 数据源支持重发(如果不重发,丢失的消息就真的丢了)

一致性语义: 恰好一次(如果p相同,单线程,多个线程时,可能有的算子对其已经计算了一次了,有的没有就需要注意),至少一次。

//  获取运行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//状态数据
//两个checkpoint 触发间隔设置1S,越频繁追的数据就越少,io消耗也越大
env.enableCheckpointing(1000);
//EXACTLY_ONCE语义说明 Checkpoint是要对替的,这样消息不会重复,也不会对丢。
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//两个checkpoint 最少等待500ms 例如第一个checkpoint做了700ms按理300ms后就要做下一个checkpoint。但是它们之间的等待时间300ms<500ms 此时,就会延长200ms减少checkpoint过于频繁,影响业务。
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
//checkpoint多久超时,如果这个checkpoint在1分钟内还没做完,那就失败了
env.getCheckpointConfig().setCheckpointTimeout(60000);
//同时最多有多少个checkpoint进行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//当重新分配并发度,拆分task时,是否保存checkpoint。如果不保存就需要使用savepoint来保存数据,放到外部的介质中。
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

Checkpoint vs Savepoint

CheckpointSavepoint
触发管理方式由Flink自动触发并管理由用户手动触发并管理
主要用途在 Task 发生异常时快速恢复,例如网络抖动导致的超时异常有计划的进行备份,使作业能停止后再恢复,例如修改代码、调整并发。
特点轻量、自动从故障中服务、在作业停止后默认清除持久、以标准格式存储,允许代码或配置发生变化、手动触发 savepoint 恢复。

可选的状态存储方式:
【1】MemoryStateBackend:构造方法:

MemoryStateBackend(int maxStateSize, boolean asynchronousSnapshots)

存储方式: StateTaskManager内存。CheckpointJobManager内存。
容量限制: 单个State maxStateSize默认5MmaxStateSize <= akka.framesize默认10M。总大小不超过JobManager内存。
推荐使用场景: 本地测试,几乎无状态的作业,比如ETL/JobManager不容易挂,或影响不大的情况。不推荐在生产场景使用。

【2】FsStateBackend: 构造方法:

FsStateBackend(URL checkpointDataUri, boolean asynchronousSnapshots)

存储方式: StateTaskManager内存。Checkpoint:外部文件系统(本地或HDFS)。
容量限制: 单个TaskManagerState总量不超过它的内存。总大小不超过配置的文件系统容量(会定期清理)。
推荐使用场景: 常规使用状态的作业,例如分钟级窗口聚合、join。需要开启HA的作业。可以在生产环境使用。

【3】RocksDBStateBackend: 构造方法:

RocksDBStateBackend(URL checkpointDataUri, boolean enableIncrementalCheckpointing)

存储方式: StateTaskManager上的KV数据库(实际使用内存+磁盘)。Checkpoint:外部文件系统(本地或HDFS)。
容量限制: 单个TaskManagerState总量不超过它的内存+磁盘,单个key 最大2G。总大小不超过配置的文件系统容量。
推荐使用场景: 超大状态的作业,例如天级窗口聚合。需要开启HA的作业。对状态读写性能要求比较高的作业。可以在生产环境使用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/237802.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

工业5G路由器提升驾考效率,实现智慧驾考物联网

为了提高驾考的考试效率&#xff0c;更好地服务广大学员&#xff0c;车管所驻考场监控中心结合物联网技术采用智慧驾考系统&#xff0c;实现考场监控、考试员远程监考、学员视频实时回传、自动评判等功能&#xff0c;为驾考公平公正安全提供保障。 该系统由智能监控管理平台和…

深入了解Spring MVC工作流程

目录 1. MVC架构简介 2. Spring MVC的工作流程 2.1. 客户端请求的处理 2.2. 视图解析和渲染 2.3. 响应生成与返回 3. Spring MVC的关键组件 3.1. DispatcherServlet 3.2. HandlerMapping 3.3. Controller 3.4. ViewResolver 4. 结论 Spring MVC&#xff08;Model-Vi…

基于Java版本与鸿鹄企业电子招投标系统的二次开发实践-鸿鹄企业电子招投标系统源代码+支持二开+鸿鹄电子招投标系统

随着市场竞争的加剧和企业规模的扩大&#xff0c;招采管理逐渐成为企业核心竞争力的重要组成部分。为了提高招采工作的效率和质量&#xff0c;我们提出了一种基于电子化平台的解决方案。该方案旨在通过电子化招投标&#xff0c;使得招标采购的质量更高、速度更快&#xff0c;同…

TCP_滑动窗口介绍

简介 TCP协议中有两个窗口&#xff0c;滑动窗口和拥塞窗口&#xff0c;两者均是一种流控机制&#xff1b;滑动窗口是接收方的流控机制&#xff0c;拥塞窗口是发送方的流控机制。 本文介绍滑动窗口&#xff0c;接收方为TCP连接设置了接收缓存。当TCP连接接收到正确、按序的字节…

推荐几个靠谱的视频素材网站,让你的作品更吸引人~

视频素材是视频制作的重要组成部分&#xff0c;它可以提升视频的质量&#xff0c;增加视频的吸引力&#xff0c;让你的视频更容易获得观众的喜爱和关注。如果你想往视频行业发展&#xff0c;或者只是想做一个视频自媒体&#xff0c;那么你一定会遇到一个问题&#xff1a;视频素…

应用在水箱液位检测中的电容传感芯片

水箱水位检测原理通常包括使用传感器来检测水位的变化。常见的传感器类型包括液位开关、液位计和液位传感器。液位开关是一种简单的传感器&#xff0c;它可以检测水位是否达到预定的高度。当水位升高时&#xff0c;开关会打开&#xff1b;当水位降低时&#xff0c;开关会关闭。…

2023年全球架构师峰会(ArchSummit北京站2023)-核心PPT资料下载

一、峰会简介 ArchSummit聚焦业界强大的技术成果&#xff0c;秉承“实践第一、案例为主”的原则&#xff0c;展示先进技术在行业中的典型实践&#xff0c;以及技术在企业转型、发展中的推动作用。旨在帮助技术管理者、CTO、架构师做好技术选型、技术团队组建与管理&#xff0c…

pip 常用指令 pip help 命令用法介绍

&#x1f4d1;pip 常用命令归类整理 pip help 是一个用于获取 Python 包管理器 pip 的帮助信息的命令。它可以帮助我们了解 pip 的各种命令和参数的用法。 pip help 命令的参数主要包括 pip 的各种子命令&#xff0c;例如 install、uninstall、freeze、list 等。你可以使用 p…

MATLAB中var函数用法

目录 语法 说明 示例 矩阵方差 数组方差 指定方差权重向量 指定方差的维度 数组页的方差 排除缺失值的方差 方差和均值 var函数的功能是求取方差。 语法 V var(A) V var(A,w) V var(A,w,"all") V var(A,w,dim) V var(A,w,vecdim) V var(___,nanfla…

2022复盘2023规划(技术篇)

2022复盘&2023规划&#xff08;技术篇&#xff09; 说明&#xff1a; 以下是我对我自身知识掌握度的分级解释 Lv0 简单了解技术点的应用场景 Lv1 前端&#xff1a; 掌握基础知识&#xff0c;能进行简单组件的开发与页面绘制&#xff1b; 后端&#xff1a; 掌握知识点基础&a…

Fabric:使用GoLand+Fabric-SDK-Go操作Fabric网络

遇到bug, 未完待续!!! 写在最前 前序博客已经介绍了使用命令的方式在Fabric上创建通道以及部署执行链码的方法&#xff0c;但这个过程太繁琐&#xff0c;尤其是当Fabric网络中peer节点和组织Org过多时&#xff0c;需要频繁的更改环境变量。 Hyperledger Fabric官方提供了Fabri…

Ubuntu 22.04 LTS上安装Docker-ce

在Ubuntu 22.04 LTS上安装Docker-ce Docker是一个开源平台&#xff0c;用于自动化应用程序的部署、扩展和管理。它使用容器技术&#xff0c;使开发、测试和部署过程更加简化和可靠。本文将介绍在Ubuntu 22.04 LTS上安装Docker-ce的步骤。 步骤1&#xff1a;更新软件包列表 …

5~80V降5V 0.3A同步整流降压型DC-DC转换器-AH7550

AH7550是一种TEL&#xff1a;l86*4884*3702*高压、高效率的同步整流降压型DC-DC开关转换器&#xff0c;恒定120KHz开关频率&#xff0c;输出电流能力高达0.6A&#xff0c;AH7550支持5V~80V的宽输入操作电压范围&#xff0c;同时支持最大占空比90%输出&#xff0c;芯片内置环路补…

小型家用超声波清洗机适合清洗眼镜吗?小型超声波清洗机排行榜

在快节奏的现代生活中&#xff0c;我们的眼睛时常与各种物品接触&#xff0c;如眼镜、隐形眼镜等。为了保持眼睛的健康和舒适&#xff0c;定期清洗眼镜变得尤为重要。而随着科技的发展&#xff0c;小型家用超声波清洗机逐渐进入人们的视野&#xff0c;它能否成为我们清洗眼镜的…

学生备考护眼灯什么牌子好推荐?高性能护眼台灯推荐

作为一名电器测评师&#xff0c;对各类电器好物都了解得比较多&#xff0c;最近也会被很多的人询问护眼台灯哪个牌子好&#xff0c;问其原因才知很多的人有使用护眼台灯出现眼睛酸痛刺眼的现象&#xff0c;这是因为使用了不专业护眼台灯导致的&#xff0c;一般这类护眼台灯使用…

基于springboot的日记本系统源码+数据库+安装使用说明

之前写的SpringBoot日记本系统备受好评&#xff0c;考虑到还是有很多小伙伴不会部署&#xff0c;所以这一篇文章就单独来讲一下部署步骤吧。 需要资源 idea&#xff08;破不破解都行&#xff09; MySQL&#xff08;最好5.7以上版本&#xff0c;最好8.0&#xff09; Navicat…

C/C++ 获取系统时间time_t的使用

time_t&#xff1a;它通常是一个长整型&#xff08;long int&#xff09;&#xff0c;用于表示从特定参考点&#xff08;通常是 1970年1月1日00:00:00UTC&#xff09;经过的秒数。这被广泛用作时间戳。 但是time_t获取的时间是一个长整型&#xff0c;可以通过ctime()函数将其转…

探索 Vue3 ( 三 ) Teleport传送组件

Teleport Vue 3.0新特性之一。 Teleport 是一种能够将我们的模板渲染至指定DOM节点&#xff0c;不受父级style、v-show等属性影响&#xff0c;但data、prop数据依旧能够共用的技术&#xff1b; Teleport中的内容允许我们控制在任意的 DOM 中&#xff0c;完全不受父级style样式…

Gartner2023数据库魔力象限发布 阿里云依旧领导者 腾讯退出 EDB/Yugabyte进入

这是一个跨越数年的系列&#xff0c;历史文章参考&#xff1a; * 数据库魔力象限2022&#xff1a;阿里领先、腾讯再次进入 * 2021 藏在魔力象限中的数据库江湖 * Gartner云计算魔力象限2018 概述 Gartner云数据库魔力象限&#xff08;后简称“象限”或“MQ”&#xff09;一…

Postman报:400 Bad Request

● 使用Postman发送Post请求报400&#xff0c;入参为JSON&#xff1b; 二、分析 1、Postman请求并没有请求到后台Api&#xff08;由于语法错误&#xff0c;服务器无法理解请求&#xff09;&#xff1b; 2、入参出错范围&#xff1a;cookie、header、body、form-data、x-www-f…