Flink状态的理解

Flink是一个带状态的数据处理系统;系统在处理数据的过程中,各算子所记录的状态会随着数据的处理而不断变化;

1. 状态

所谓状态State,一般指一个具体的 Task 的状态,即线程处理过程中需要保存的历史数据或历史累计数据,默认保存在 Java 的堆内存中。
在这里插入图片描述
根据算子是否存在按照Key进行分区,State可以划分为keyed state 和 Non-keyed state(Operator State、算子状态)

  • operator state是task级别的state,说白了就是每个task对应一个state, 在逻辑上,由算子task下所有subtask共享
    在这里插入图片描述
    Operator State的经常被用在Source或Sink算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证Flink应用的Exactly-Once语义。
  • keyed state 是基于KeyedStream上的状态,这个状态是跟特定的Key 绑定的。KeyedStream流上的每一个Key,都对应一个State
    在这里插入图片描述

2. 状态数据结构

状态数据由Flink内置状态机制管理。keyed state提供了5种数据结构

2.1 keyed state 数据结构

状态状态描述
ValueState保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的key
ListState保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索
MapState维护了一个映射列表
ReducingState保存一个单值,表示添加到状态的所有值的聚合
AggregateState保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与添加到状态的元素的类型不同

2.2 operator state

状态状态描述
ListStateListState的快照存储数据,系统重启后,list数据的重分配模式为: round-robin 轮询平均分配
UnionListStateUnionListState的快照存储数据,在系统重启后,list数据的重分配模式为: 广播模式; 在每个subtask上都拥有一份完整的数据;

3. 状态后端

默认情况下,state会保存在taskmanager的JVM堆内存,checkpoint会存储在JobManager的内存中。然而,状态数据的存储和checkpoint的存储位置可以改变,由state Backend(状态后端)配置实现

老版本(flink-1.12版及以前) Fsstatebackend MemoryStatebackend RocksdbStateBackend

flink1.12版本之后,可用的状态后端类型有两种
HashMapStateBackend、EmbeddedRocksDBStateBackend
而且其所生成的快照文件也统一了格式,因而在job重新部署或者版本升级时,可以任意替换statebackend

  • HashMapStateBackend
    ※ 状态数据是以java对象形式存储在heap内存中;
    ※ 内存空间不够时,也会溢出部分数据到本地磁盘文件;
    ※ 可以支撑大规模的状态数据;(只不过在状态数据规模超出内存空间时,读写效率就会明显降低)

  • EmbeddedRocksDBStateBackend

    ※ RocksDB使用一套日志结构的数据库引擎,它是Flink中内置的第三方状态管理器, 为了更好的性能,这套引擎是用C++编写的。 Key和value是任意大小的字节流。

    ※ 它需要配置一个远端的filesystem uri(一般是HDFS),在做checkpoint的时候,会把本地的数据直接复制到fileSystem中。fail over的时候从fileSystem中恢复到本地, RocksDB克服了state受内存限制的缺点,同时又能够持久化到远端文件系统中,比较适合在生产中使用。

    ※ 使用RocksDB + HDFS进行state存储:首先state先在taskManger的本地存储到RocksDB,然后异步写入到HDFS中,状态数量仅仅受限于本地磁盘容量限制
    在这里插入图片描述

4. 状态数据容错

Flink是一个stateful(带状态)的数据处理系统;系统在处理数据的过程中,各算子所记录的状态会随着数据的处理而不断变化;

一旦系统崩溃,需要重启后能够恢复出崩溃前的状态才能进行数据的接续处理;因此,必须要一种机制能对系统内的各种状态进行持久化容错;Flink用checkpoint机制实现状态数据的容错

4.1 checkpoint

Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常。

  • checkpoint默认关闭,需要手工开启。开启后,默认Exactly-once快照模式。还有一种快照模式为At-least-once

  • checkPoint的位置设置flink-conf.yaml#state.checkpoints.dir

  • Flink支持不同的重启策略,以在故障发生时控制作业如何重启,集群在启动时会伴随一个默认的重启策略。配置参数flink-conf.yaml#restart-strategy

♦ 如果没有启用 checkpointing,则使用无重启 (no restart) 策略 ♦ 如果启用了 checkpointing,但没有配置重启策略,则使用固定 间隔 (fixed-delay) 策略,尝试重启次数默认值是:Integer.MAX_VALUE。
♦ 另一种重启策略为Failure rate,某时间段内失败了N次就重启

全局配置

# 每隔3s重启一次,重试间隔为10s
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
#5分钟内若失败了3次则认为该job失败,重试间隔为10s
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
# 不重启
restart-strategy: none

单个JOB内配置

Configuration conf = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
//开启状态检查点机制(它将会定期对整个系统中各个task的状态进行快照持久化,以便失败重启后还能从失败之前的状态恢复)env.enableCheckpointing(1000,CheckpointingMode.EXACTLY_ONCE);
// checkpoint机制触发后,持久化保存各task状态数据的存储位置(生产中用hdfs
env.getCheckpointConfig().setCheckpointStorage("hdfs://node01:8020/tmp/flink/state");
// 指定状态后端存储(内存)
env.setStateBackend(new HashMapStateBackend());
// 开启自动重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.milliseconds(2000)));
// env.setRestartStrategy(RestartStrategies.failureRateRestart(3, org.apache.flink.api.common.time.Time.seconds(100), org.apache.flink.api.common.time.Time.seconds(10)));
// env.setRestartStrategy(RestartStrategies.noRestart())
  • checkpoint个数默认保留最近成功生成的一个,支持保留多个,通过参数flink-conf.yaml#state.checkpoints.num-retained控制,如果希望回退到某个Checkpoint点,只需要指定对应的某个Checkpoint路径即可实现
flink run -t yarn-per-job  -yjm 1024 -ytm 1024 -s hdfs://node01:8020/tmp/flink/state/715b120fe8736a3af7842ea0a5264c46/chk-6/_metadata

4.2 savePoint

savePoint是检查点一种特殊实现,底层其实也是使用Checkpoint的机制。

savePoint是用户以手工命令的方式触发checkpoint,并将结果持久化到指定的存储目录中。

作业升级、代码修改、任务迁移和维护,都可以使用savePoint

  1. savePoint的存储位置
    savePoint的存储位置flink-conf.yaml#state.savepoints.dir,不是必须设置,但设置了后, 后面创建指定Job的Savepoint时,可以不用在手动执行命令时指定Savepoint的位置。

  2. savePoint的手动触发:

#【针对on standAlone模式】
bin/flink savepoint jobId [targetDirectory]

#【针对on yarn模式需要指定-yid参数】
bin/flink savepoint jobId [targetDirectory] -yid yarnAppId

jobId 需要触发savepoint的jobId编号
targetDirectory 指定savepoint存储数据目录
-yid 指定yarnAppId
例如: flink savepoint 84e766231bbe4b9ff3667f9a0d80b867 -yid application_1619059559839_0001

  1. 查看HDFS上savepoint目录

#Savepoint directory /flink/savepoints/savepoint-:shortjobid-:savepointid/

#Savepoint file contains the checkpoint meta data /savepoints/savepoint-:shortjobid-:savepointid/_metadata

在这里插入图片描述
4. 触发savepoint并且停止作业

##语法: bin/flink stop jobId -yid yarnAppId

##例如: flink stop 84e766231bbe4b9ff3667f9a0d80b867 -yid application_1619059559839_0001

  1. 从指定的savepoint启动job

##语法: bin/flink run -s savepointPath [runArgs]

##例如: flink run -t yarn-per-job -yjm 1024 -ytm 1024
-s hdfs://node01:8020/flink/savepoints/savepoint-84e766-0591f3377ad0
-c com.loess.checkpoint.TestCheckPoint flink-study-1.0-SNAPSHOT.jar

  1. 清除savepoint数据

bin/flink savepoint -d savepointPath

##也可以手动删除某个savepoint,这通过常规的文件系统操作就可以做到,不影响其它的savepoints和checkpoints

  1. savePoint使用建议

为了能够在作业的不同版本之间以及 Flink 的不同版本之间顺利升级,推荐通过 uid(String) 方法手动的给算子赋予 ID,这些
ID 将用于确定每一个算子的状态范围。

不手动给各算子指定 ID,则会由 Flink 自动给每个算子生成一个 ID。只要这些 ID 没有改变就能从保存点
(savepoint)将程序恢复回来。而这些自动生成的 ID
依赖于程序的结构,并且对代码的更改敏感。当程序改变时,ID会随之变化,所以建议用户手动设置 ID

DataStream<String> stream = env.// Stateful source (e.g. Kafka) with ID.addSource(new StatefulSource()).uid("source-id") // ID for the source operator.shuffle()// Stateful mapper with ID.map(new StatefulMapper()).uid("mapper-id") // ID for the mapper// Stateless printing sink.print(); // Auto-generated ID

4.3 savePoint与checkPoint的区别

  • checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从checkpoint来恢复运行,且不影响作业逻辑的准确性。而savepoint的侧重点是“维护”,即Flink作业需要在人工干预下手动重启、升级、迁移或A/B测试时,先将状态整体写入可靠存储,维护完毕之后再从savepoint恢复现场。
  • savepoint是通过checkpoint机制创建的,所以savepoint本质上是特殊的checkpoint。
  • checkpoint面向Flink Runtime本身,由Flink的各个TaskManager定时触发快照并自动清理,一般不需要用户干预;savepoint面向用户,完全根据用户的需要触发与清理。
  • checkpoint是支持增量的(通过RocksDB),特别是对于超大状态的作业而言可以降低写入成本。savepoint并不会连续自动触发,所以savepoint没有必要支持增量。
    在这里插入图片描述

5. checkpoint机制原理

checkPoint是所有 Operator / Task 的状态在某个时间点的一份拷贝(一份快照), 这个时间点应该是所有 Operator / Task 任务都恰好处理完一个相同的输入数据的时候。
在这里插入图片描述
若某个subTask挂了,则此时的状态都被清空,从checkpoint恢复最近一次的状态,重新启动应用程序,计算输入流

5.1 Barrier机制

Barrier是一种特殊事件,用来作为快照信号,由checkpoint 协调器向数据流中注入该信号,subtask任务收到该信号后,就会执行状态的快照。
在这里插入图片描述

  1. 首先是JobManager中的checkpoint Coordinator(协调器) 向任务中的所有source Task周期性发送barrier(栅栏)进行快照请求。
  2. source Task接受到barrier后, 会把当前自己的状态进行snapshot(可以保存在HDFS上)。
  3. source向checkpoint coordinator确认snapshot已经完成。
  4. source继续向下游transformation operator发送 barrier。
  5. transformation operator重复source的操作,直到sink operator向协调器确认snapshot完成。
  6. coordinator确认完成本周期的snapshot已经完成。

5.2 Barrier对齐

对于下游算子来说,可能有多个与之相连的上游输入,我们将算子之间的边 称为通道。Source要将一个ID为n的Checkpoint Barrier向所有下游算子广播,这也意味着下游算子的多个输入里都有同一个Checkpoint Barrier,而且 不同输入里Checkpoint Barrier的流入进度可能不同。因此Checkpoint Barrier传播的过程需要进行对齐(Barrier Alignment)
在这里插入图片描述
算子对齐分为四部:
(1). 算子子任务在某个输入通道中收到第一个ID为n的Checkpoint Barrier,但是其他输入通道中ID为n的Checkpoint Barrier还未到达,该算子子任务开始准备进行对齐

(2). 算子子任务将第一个输入通道的数据缓存下来,同时继续处理其他输入通道的数据,这个过程被称为对齐。

(3). 第二个输入通道的Checkpoint Barrier抵达该算子子任务,该算子子任务执行快照,将状态写入State Backend,然后将ID为n的Checkpoint Barrier向下游所有输出通道广播。

(4). 对于这个算子子任务,快照执行结束,继续处理各个通道中新流入数据,包括刚才缓存起来的数据。
在这里插入图片描述

6.快照性能优化方案

每次进行Checkpoint前,都需要暂停处理新流入数据,然后开始执行快照,假如状态比较大,一次快照可能长达几秒甚至几分钟。

Checkpoint Barrier对齐时,必须等待所有上游通道都处理完,假如某个上游通道处理很慢,这可能造成整个数据流堵塞。

两种优化方案
① Flink提供了异步快照(Asynchronous Snapshot)的机制。当实际执行快照时,Flink可以立即向下广播Checkpoint Barrier,表示自己已经执行完自己部分的快照。一旦数据同步完成,再给Checkpoint Coordinator发送确认信息。通过基于 Chandy-Lamport 算法的分布式快照,将检查点的保存和数据处理分离开,不暂停整个应用。
② Flink允许跳过对齐这一步,或者说一个算子子任务不需要等待所有上游通道的Checkpoint Barrier,直接将Checkpoint Barrier广播,执行快照并继续处理后续流入数据。为了保证数据一致性,Flink必须将那些较慢的数据流中的元素也一起快照,一旦重启,这些元素会被重新处理一遍

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/9466.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据结构【排序】

第七章 排序 一、排序 1.定义&#xff1a;将无序的数排好序 &#xff1b; 2.稳定性&#xff1a; Kᵢ和Kⱼ中&#xff0c;Kᵢ优先于Kⱼ那么在排序后的记录中仍然保持Kᵢ优先&#xff1b; 3.评价标准&#xff1a;执行时间和所需的辅助空间&#xff0c;其次是算法的稳定性&#xf…

[算法很美打卡] 多维数组篇 (打卡第一天)

文章目录 顺时针打印二维数组0所在的行列清零 顺时针打印二维数组 package 每日算法学习打卡.算法打卡.七月份.七月二十六号;public class test1 {public static void main(String[] args) {int[][] matrix {{1,2},{5,6},{9,10},{13,14},};print(matrix);}static void print(i…

LangChain Agents深入剖析及源码解密上(一)

LangChain Agents深入剖析及源码解密上(一) LangChain Agents深入剖析及源码解密上 Agent工作原理详解 本节会结合AutoGPT的案例,讲解LangChain代理(Agent)为核心的内容。我们前面已经谈了代理本身的很多内容,也看了绝大部分的源代码,例如:ReAct的源代码,还有mrkl的源代…

P3183 [HAOI2016] 食物链

题目描述 如图所示为某生态系统的食物网示意图&#xff0c;据图回答第 1 小题 现在给你 n 个物种和 m 条能量流动关系&#xff0c;求其中的食物链条数。物种的名称为从 1 到 n 编号 M 条能量流动关系形如 1,2,3​,…,am−1​,bm−1​,am​,bm​。其中 ai​ 和 bi​ 表示能量从物…

Java反射 -- 详细介绍 (框架核心)

反射 是 Java框架 的核心 &#xff0c;无论是Tomcat、SpringMVC、Spring IOC、Spring AOP、动态代理 &#xff0c;都使用到了 反射 反射的作用简单讲就是 无需 new 对象&#xff0c;就可以动态获取到一个类的全部信息&#xff0c;包括 属性、方法&#xff0c;构造器&#xff0…

边写代码边学习之全连接Dense

1. 全连接原理 全连接神经网络&#xff08;Fully Connected Neural Network&#xff09;是一种最基本的神经网络结构&#xff0c;也被称为多层感知器&#xff08;Multilayer Perceptron&#xff0c;MLP&#xff09;。其原理是模拟人脑神经元之间的连接方式&#xff0c;通过多个…

pytorch学习——多层感知机

一.感知机 感知机——神经网络基本单元&#xff0c;最简单的深度网络称为多层感知机。多层感知机由多层神经元组成&#xff0c;每一层与它上一层相连&#xff0c;从中接收输入&#xff0c; 同时每一层也与它的下一层相连&#xff0c;影响当前层的神经元。 解释&#xff1a;如果…

生态系统景观指数-聚集度指数AI计算

景观指数是景观生态学的常见指标&#xff0c;可用于不同生态系统的特征识别。景观指数是反映景观结构与空间格局的定量指标&#xff0c;目前已成为景观生态学领域常用的分析景观格局、度量空间异质性的重要方法。不同水平下的指数结果往往代表不同含义&#xff0c;应在把握指数…

决策树学习

决策树学习 决策树决策树基础适用决策树学习的经典目标问题样本的表示训练样本决策树的概念发展历史 经典决策树算法ID3算法属性选择和节点混杂度&#xff08;Impurity&#xff09;ID3 Q1: 哪个属性是最佳属性&#xff1f;当前最佳属性节点选择熵&#xff08;Entropy&#xff0…

如何高效实现文件传输:小文件采用零拷贝、大文件采用异步io+直接io

一般会如何实现文件传输&#xff1f; 服务器提供文件传输功能&#xff0c;需要将磁盘上的文件读取出来&#xff0c;通过网络协议发送到客户端。如果需要你自己编码实现这个文件传输功能&#xff0c;你会怎么实现呢&#xff1f; 通常&#xff0c;你会选择最直接的方法&#xf…

交叉编译----宿主机x86 ubuntu 64位-目标机ARMv8 aarch64

1.交叉编译是什么&#xff0c;为什么要交叉编译 编译&#xff1a;在一个平台上生成在该平台上的可执行代码交叉编译&#xff1a;在一个平台上生成在另一个平台上的可执行代码交叉编译的例子&#xff1a;如51单片机的可执行代码&#xff08;hex文件&#xff09;是在集成环境kei…

AT15透明屏有哪些特点?

AT15透明屏是一种新型的显示技术&#xff0c;它采用了透明材料制成的屏幕&#xff0c;可以实现透明显示效果。这种屏幕可以广泛应用于各种领域&#xff0c;如商业广告、展览展示、智能家居等。 AT15透明屏的特点之一是其高透明度。 由于采用了透明材料制成&#xff0c;AT15透明…

深度对话|Sui资产所有权如何让游戏体验更好

近日&#xff0c;我们采访了Mysten Labs的游戏产品总监Bill Allred&#xff0c;共同探讨了为什么Sui非常适合游戏。他分享了对Sui关键创新的看法&#xff0c;以及它为游戏开发者带来的价值&#xff0c;Sui的关键创新帮助开发者将他们所想象的游戏变为现实。 你能谈谈某些游戏的…

【微服务系统设计】系统设计基础:速率限制器

什么是速率限制器&#xff1f; 速率限制是指防止操作的频率超过定义的限制。在大型系统中&#xff0c;速率限制通常用于保护底层服务和资源。速率限制一般在分布式系统中作为一种防御机制&#xff0c;使共享资源能够保持可用性。 速率限制通过限制在给定时间段内可以到达您的 A…

vue解决跨域访问问题(个人学习笔记六)

目录 友情提醒第一章、跨越问题解决1.1&#xff09;什么是跨域问题&#xff1f;1.2&#xff09;第一种解决方式&#xff1a;后端设置允许跨域访问1.3&#xff09;第二种解决方式&#xff1a;前端配置代理 第二章、配置代理服务器2.1&#xff09;配置简单代理服务器2.2&#xff…

端口复用与重映射

端口复用和重映射 STM32F1有很多的内置外设&#xff0c;这些外设的外部引脚都是与GPIO复用的。也就是说&#xff0c;一个GPIO如果可以复用为内置外设的功能引脚&#xff0c;那么当这个GPIO作为内置外设使用的时候&#xff0c;就叫做复用。 大家都知道&#xff0c;MCU都有串口…

kotlin 编写一个简单的天气预报app(一)

使用Android Studio开发天气预报APP 今天我来分享一下如何使用Android Studio开发一个天气预报APP。在文中&#xff0c;我们将使用第三方接口获取实时天气数据&#xff0c;并显示在APP界面上。 步骤一&#xff1a;创建新项目 首先&#xff0c;打开Android Studio并创建一个新…

MySQL 中一条 SQL 的查询与更新

MySQL 中一条 SQL 的查询与更新 1 SQL 的查询1.1 MySQL 的逻辑架构图1.2 连接器1.3 查询缓存1.4 分析器1.5 优化器1.6 执行器 2 SQL 的更新2.1 redo log&#xff08;重做日志&#xff09;2.2 binlog&#xff08;归档日志&#xff09;2.3 redo log 和 binlog 日志的差异2.4 示例…

React 中 {} 的应用

列表渲染&#xff1a; 1.使用数组的 map 方法&#xff08;因为map 可以映射&#xff09; 2、列表要添加 key 属性&#xff0c;值要唯一 // 导入 // 用来获取 dom元素 import { createRoot } from "react-dom/client";// 内容 const contentArr [{ id: 1, name: &…