Flink——Flink检查点(checkpoint)、保存点(savepoint)的区别与联系

Flink checkpoint

Checkpoint是Flink实现容错机制最核心的功能,能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot,从而将这些状态数据定期持久化存储下来,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些Snapshot进行恢复,从而修正因为故障带来的程序数据状态中断。

  1. Checkpoint指定触发生成时间间隔后,每当需要触发Checkpoint时,会向Flink程序运行时的多个分布式的Stream Source中插入一个Barrier标记
  2. 当一个Operator接收到一个Barrier时,它会暂停处理Steam中新接收到的数据记录
  3. 每个Stream中都会存在对应的Barrier,该Operator要等到所有的输入Stream中的Barrier都到达。当所有Stream中的Barrier都已经到达该Operator,这时所有的Barrier在时间上看来是同一个时刻点(表示已经对齐)
  4. 该Operator会将数据记录(Outgoing Records)发射(Emit)出去,作为下游Operator的输入
  5. 最后将Barrier对应Snapshot发射(Emit)出去作为此次Checkpoint的结果数据

开启checkpoint

 

        //1.1 开启CKenv.enableCheckpointing(5000);env.getCheckpointConfig().setCheckpointTimeout(10000);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc-test/ck"));

ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION,表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint处理。

上面代码配置了执行Checkpointing的时间间隔为1分钟。

保存多个checkpoint
默认情况下,如果设置了Checkpoint选项,则Flink只保留最近成功生成的1个Checkpoint

Flink可以支持保留多个Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的个数:

state.checkpoints.num-retained: 20

如果希望会退到某个Checkpoint点,只需要指定对应的某个Checkpoint路径即可实现。

从checkpoint 恢复
如果Flink程序异常失败,或者最近一段时间内数据处理错误,我们可以将程序从某一个Checkpoint点,比如chk-860进行回放,执行如下命令

bin/flink run -s hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860/_metadata flink-app-jobs.jar
  • 所有的Checkpoint文件都在以Job ID为名称的目录里面
  • 当Job停掉后,重新从某个Checkpoint点(chk-860)进行恢复时,重新生成Job ID
  • Checkpoint编号会从该次运行基于的编号继续连续生成:chk-861、chk-862、chk-863

checkpoint的建议

  • Checkpoint 间隔不要太短
    • 过短的间对于底层分布式文件系统而言,会带来很大的压力。
    • Flink 作业处理 record 与执行 checkpoint 存在互斥锁,过于频繁的checkpoint,可能会影响整体的性能。
  • 合理设置超时时间

Flink savepoint

Savepoint会在Flink Job之外存储自包含(self-contained)结构的Checkpoint,它使用Flink的Checkpointing机制来创建一个非增量的Snapshot,里面包含Streaming程序的状态,并将Checkpoint的数据存储到外部存储系统中

Flink程序中包含两种状态数据:

用户定义的状态(User-defined State)是基于Flink的Transformation函数来创建或者修改得到的状态数据
系统状态(System State),是指作为Operator计算一部分的数据Buffer等状态数据,比如在使用Window Function时,在Window内部缓存Streaming数据记录
Flink提供了API来为程序中每个Operator设置ID,这样可以在后续更新/升级程序的时候,可以在Savepoint数据中基于Operator ID来与对应的状态信息进行匹配,从而实现恢复。

设置Operator ID:

DataStream<String> stream = env.// Stateful source (e.g. Kafka) with ID.addSource(new StatefulSource()).uid("source-id") // ID for the source operator.shuffle()// Stateful mapper with ID.map(new StatefulMapper()).uid("mapper-id") // ID for the mapper// Stateless printing sink.print(); // Auto-generated ID

创建Savepoint

创建一个Savepoint,需要指定对应Savepoint目录,有两种方式来指定

需要配置Savepoint的默认路径,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,设置Savepoint存储目录

state.savepoints.dir: hdfs://namenode01.td.com/flink/flink-savepoints

手动执行savepoint命令的时候,指定Savepoint存储目录

bin/flink savepoint :jobId [:targetDirectory]

使用默认配置

bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038

为正在运行的Flink Job指定一个目录存储Savepoint数据

bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038 hdfs://namenode01.td.com/tmp/flink/savepoints

从Savepoint恢复

bin/flink run -s :savepointPath [:runArgs]

以上面保存的Savepoint为例,恢复Job运行

bin/flink run -s hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f flink-app-jobs.jar

会启动一个新的Flink Job,ID为cdbae3af1b7441839e7c03bab0d0eefd

Savepoint 目录结构

1bbc5是Flink Job ID字符串前6个字符,后面bd967f90709b是随机生成的字符串
_metadata文件包含了Savepoint的元数据信息
其他文件内容都是序列化的状态信息

总结

checkpoint和savepoint是Flink为我们提供的作业快照机制,它们都包含有作业状态的持久化副本。

用几句话总结一下。

  1. checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从早先打下的checkpoint恢复运行,且不影响作业逻辑的准确性。而savepoint的侧重点是“维护”,即Flink作业需要在人工干预下手动重启、升级、迁移或A/B测试时,先将状态整体写入可靠存储,维护完毕之后再从savepoint恢复现场。

  2. savepoint是“通过checkpoint机制”创建的,所以savepoint本质上是特殊的checkpoint。

  3. checkpoint面向Flink Runtime本身,由Flink的各个TaskManager定时触发快照并自动清理,一般不需要用户干预;savepoint面向用户,完全根据用户的需要触发与清理。

  4. checkpoint的频率往往比较高(因为需要尽可能保证作业恢复的准确度),所以checkpoint的存储格式非常轻量级,但作为trade-off牺牲了一切可移植(portable)的东西,比如不保证改变并行度和升级的兼容性。savepoint则以二进制形式存储所有状态数据和元数据,执行起来比较慢而且“贵”,但是能够保证portability,如并行度改变或代码升级之后,仍然能正常恢复。

  5. checkpoint是支持增量的(通过RocksDB),特别是对于超大状态的作业而言可以降低写入成本。savepoint并不会连续自动触发,所以savepoint没有必要支持增量。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/83743.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

FPGA设计时序约束一、主时钟与生成时钟

​目录 一、主时钟create_clock 1.1 定义 1.2 约束设置格式 1.3 Add this clock to the existing clock 1.4 示例 1.5 差分信号 二、生成时钟generate_clock 2.1 定义 2.2 格式 2.2.1 by clock frequency 2.2.2 by clock edges 2.2.3 示例 2.2.4 自动生成时钟 2.…

MongoDB-1入门介绍

NoSQL NoSQL(NoSQL Not Only SQL)&#xff0c;意即反SQL运动&#xff0c;指的是非关系型的数据库 优点 1、对数据库高并发读写。 2、对海量数据的高效率存储和访问。 3、对数据库的高可扩展性和高可用性。 弱点&#xff1a; 1、数据库事务一致性需求 2、数据库的写实时性…

flink集群与资源@k8s源码分析-集群

0 介绍 本文是flink集群与资源@k8s源码分析系列的第二篇-集群 1 场景 下面详细分析各用例 2 启动k8s集群 k8s集群支持session和application模式,job模式将会被废弃,本文分析session模式集群 Configuration作为配置容器,几乎所有的构建需要从配置类获取配置项,这里不显示…

将docker镜像打成tar包

# 打包 docker save -o zookeeper.tar bitnami/zookeeper:3.9.0-debian-11-r11# 解压 docker load -i zookeeper.tar

day27IO(异常File综合案例)

1. 异常 1.1 异常概念 异常&#xff0c;就是不正常的意思。在生活中:医生说,你的身体某个部位有异常,该部位和正常相比有点不同,该部位的功能将受影响.在程序中的意思就是&#xff1a; 异常 &#xff1a;指的是程序在执行过程中&#xff0c;出现的非正常的情况&#xff0c;最…

事务碰上锁好似那油锅里进了火

目录 前言 场景 代码复现 提出疑问 该怎么解决呢 1.使用编程式事务 2.将事务独立出一个方法 前言 很多时候我们谈起事务都是如虎色变&#xff0c;一想起来都是脑袋懵懵的 事务的隔离级别及传播机制是什么Spring的事务底层实现原理了解吗哪几种情况下事务会失效 …

【探索C++】C++对C语言的扩展

(꒪ꇴ꒪ )&#xff0c;Hello我是祐言QAQ我的博客主页&#xff1a;C/C语言&#xff0c;数据结构&#xff0c;Linux基础&#xff0c;ARM开发板&#xff0c;网络编程等领域UP&#x1f30d;快上&#x1f698;&#xff0c;一起学习&#xff0c;让我们成为一个强大的攻城狮&#xff0…

JavaScript基础知识12——运算符:算数运算符,比较运算符

哈喽&#xff0c;大家好&#xff0c;我是雷工。 以下为JavaScript基础知识学习笔记。 一、算数运算符 1、算术运算符&#xff1a;即进行数学计算的符号。 2、有哪些算数运算符&#xff1a; &#xff1a;加法 -&#xff1a;减法 *&#xff1a;乘法 /:除法 %:取余&#xff08;…

系统架构设计师-数据库系统(3)

目录 一、数据控制 1、安全性 2、完整性 3、并发控制 4、故障恢复 二、数据库设计概述 1、数据库设计关注的问题 2、数据库性能优化 3、规范化与反规范化 一、数据控制 1、安全性 2、完整性 &#xff08;1&#xff09;实体完整性约束&#xff1a;规定基本关系的主属性不能取空…

Python开发利器之VS Code

Python官方提供了一个Python集成开发环境&#xff08;IDE&#xff09;&#xff1a; IDLE (Integrated Development and Learning Environment)。 它提供了一个图形用户界面&#xff0c;可以让开发者编写、调试和执行Python程序。IDLE包含Python解释器、代码编辑器、调试器和文件…

RK3568平台开发系列讲解(工具命令篇)ADB的安装

🚀返回专栏总目录 文章目录 一、ADB介绍二、Windows 下安装 adb 工具沉淀、分享、成长,让自己和他人都能有所收获!😄 一、ADB介绍 adb 全称 Android Debug Bridge,直译过来就是 Android 调试桥,它是一个通用的命令行工具。adb 做为 Android 设备与 PC 端连接的一个桥梁…

揭秘:Wasserstein GAN与梯度惩罚(WGAN-GP)

一、说明 什么是梯度惩罚&#xff1f;为什么它比渐变裁剪更好&#xff1f;如何实施梯度惩罚&#xff1f;在提起GAN对抗网络中&#xff0c;就不能避免Wasserstein距离的概念&#xff0c;本篇为系列读物&#xff0c;目的是揭示围绕Wasserstein-GAN建模的一些重要概念进行探讨。 图…

浅谈DBT的一些不足之处

DBT的好处是显而易见的&#xff0c;它支持连接多达41种数据库。而且不需要你写DDL语句&#xff0c;只要写select语句&#xff0c;DBT会自动帮你推断schema结构&#xff0c;将数据写入到数据库中&#xff1a; 但是使用了一段时间之后&#xff0c;发现DBT也存在着如下这些不足之处…

/usr/bin/ld: cannot find -lmysqlcllient

文章目录 1. question: /usr/bin/ld: cannot find -lmysqlcllient2. solution 1. question: /usr/bin/ld: cannot find -lmysqlcllient 2. solution 在 使用编译命令 -lmysqlclient时&#xff0c;如果提示这个信息。 先确认一下 有没有安装mysql-devel 执行如下命令 yum inst…

【Linux】Ubuntu美化主题【教程】

【Linux】Ubuntu美化主题【教程】 文章目录 【Linux】Ubuntu美化主题【教程】1. 安装优化工具Tweak2.下载自己喜欢的主题3. 下载自己喜欢的iconReference 1. 安装优化工具Tweak 首先安装优化工具Tweak sudo apt-get install gnome-tweak-tool安装完毕后在菜单中打开Tweak 然后…

**20.迭代器模式(Iterator)

意图&#xff1a;提供一种方法顺序访问一个聚合对象中的各个元素&#xff0c;而又不需要暴露该对象的内部表示。 上下文&#xff1a;集合对象内部结构常常变化各异。对于这些集合对象&#xff0c;能否在不暴露其内部结构的同时&#xff0c;让外部Client透明地访问其中包含的元素…

nbcio-boot移植到若依ruoyi-nbcio平台里一formdesigner部分(三)

因为这个版本的若依plus不支持本地文件上传&#xff0c;所以需要增加这些本地上传文件的后端代码 和前端代码修改。 1、后端部分 先配置跳过测试吧&#xff0c;平时编译也不需要这个 <!--添加配置跳过测试--><plugin><groupId>org.apache.maven.plugins<…

Python常用库(六):科学计算库-Numpy[上篇]:创建、访问、赋值

1.Numpy 1.1 介绍 NumPy是Python中非常流行且重要的科学计算库&#xff0c;提供了一个强大的多维数组对象(ndarray)和许多数学操作&#xff0c;包括矩阵运算、线性代数、微积分等等。 numpy是Python中一个非常有用的工具&#xff0c;特别是在需要进行数值计算、线性代数计算、…

淘宝分布式文件存储系统( 三 ) -TFS

淘宝分布式文件存储系统( 三 ) ->>TFS 目录 : 文件重新映射的接口介绍文件映射 mmap_file.cpp的实现进行测试 文件重新映射 (增加 或者 减少 文件映射区域的大小) mremap() 函数的原型如下 #include <sys/mman.h> void *mremap( void * old_address , size_…

阿里测开面试大全(一)附答案完整版

万字长文&#xff0c;建议收藏 1 什么是POM&#xff0c;为什么要使用它&#xff1f; POM是Page Object Model的简称&#xff0c;它是一种设计思想&#xff0c;而不是框架。大概的意思是&#xff0c;把一个一个页面&#xff0c;当做一个对象&#xff0c;页面的元素和元素之间操…