Checkpoint机制和生产配置

1.前提

在将Checkpoint之前,先回顾一下flink处理数据的流程:

在这里插入图片描述

2. 概述

Checkpoint机制,又叫容错机制,可以保证流式任务中,不会因为异常时等原因,造成任务异常退出。可以保证任务正常运行。
(1)能在集群异常时,保持已计算的数据,下次恢复时能在已保存数据的基础上,继续计算(类似于快照);
(2)避免数据丢失(通过Barrier实现)

3.机制运行流程

在这里插入图片描述
解释:

(1)主节点上的检查点协调器(CheckpointCoordinator)会周期性地发送一个个地Barrier(栅栏,前面说的 偏移量做标识),Barrier会混在数据里,随着数据流,流向source算子;

(2)source算子在摄入数据的时候,如果碰到Barrier栅栏,不会去处理,Barrier就会让先算子去汇报当前的状态

(3)处理完之后,Barrier就会随着数据流,流向下一个算子;

(4)下一个算子收到Barrier,同样会停下手里的工作,也会向检查点协调器汇报当前的状态,把状态往主节点传递一份(备份,防止算子出错,状态丢失)
(5)上一步处理完之后,Barrier又会随着数据流向下一个算子,以此类推。
(6)等Barrier流经所有的算子之后,这一轮的快照就算制作完成

4. 状态后端

状态后端,StateBackend,就是Flink存储状态的介质(存储状态的地方)。Flink提供了三种状态后端的存储方式:

  • MemoryStateBackend(内存,使用HashMapStateBackend实现,生产一般不用)
  • FsStateBackend(文件系统,比如说HDFS,生产常用)
  • RocksDBStateBackend(RocksDB数据库,生产常用)
  • 同时也可以把状态外置到 Hbase和Redis,解决大状态存储问题
MemoryStateBackend

内存,掉电易失。不安全。基本不用。
在这里插入图片描述
配置如下:

state.backend: hashmap
# 可选,当不指定 checkpoint 路径时,默认自动使用 JobManagerCheckpointStorage
state.checkpoint-storage: jobmanager
FsStateBackend

FsStateBackend,文件系统的状态后端,就是把状态保存在文件系统中,常用来保存状态的文件系统有HDFS;
工作中常用;
在这里插入图片描述
配置如下:

state.backend: hashmap 
state.checkpoints.dir: file:///checkpoint-dir/ # 默认为FileSystemCheckpointStorage 
state.checkpoint-storage: filesystem
RocksDBStateBackend

RocksDBStateBackend,把状态保存在RocksDB数据库中。

RocksDB,是一个小型文件系统的数据库。

配置如下:

state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem

特点:可以保持巨大的状态,且支持增量状态保存。

5.重启策略

5.1 重启策略概述

Flink流式任务,需要长期运行,就算遇到一些数据异常问题等,也不能随便退出。

Flink为了让任务能够在遇到异常退出时,能够重新启动,正常运行,Flink提出了重启策略的概念。

5.2 Flink的重启策略

Flink支持四种类型的重启策略:

  • none:没有重启。任务一旦遇到异常,就退出。

  • fixed-delay:固定延迟重启策略。也就是说,可以配置一个重启的次数。超过次数后,才会退出。

  • failure-rate:失败率重启策略。也就是说,任务的失败频率。超过该频率后才退出。在设定的频率之内,不会退出。

  • exponential-delay:指数延迟重启策略。也就是说,任务在失败后,下一次的延迟时间是随着指数增长的。

5.3案例演示
模拟异常的代码
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** Flink 代码实现流处理,进行单词统计。数据源来自于socket数据。* todo 演示Flink遇到异常重启。*/
public class RestartStrategy {public static void main(String[] args) throws Exception {//1.构建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.STREAMING);env.setParallelism(1);//2.数据输入(数据源)//从socket读取数据,socket = hostname + portDataStreamSource<String> source = env.socketTextStream("node1", 9999);//3.数据处理//3.1 使用flatMap进行扁平化处理SingleOutputStreamOperator<String> flatMapStream = source.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] words = value.split(" ");for (String word : words) {if (word.equals("evil")) {//evil:恶魔,魔鬼,程序如果碰到魔鬼就退出。throw new Exception("魔鬼来了,程序退出");}out.collect(word);}}});//3.2 使用map进行转换,转换成(单词,1)SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = flatMapStream.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}});//3.3使用keyBy进行单词分组KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}});//3.4 使用reduce(sum)进行聚合操作,sum:就是根据第一个元素(Integer)进行sum操作SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedStream.sum(1);//4.数据输出result.print();//5.启动流式任务env.execute();}
}
5.4Checkpoint配置

修改flink-conf.yaml文件

execution.checkpointing.interval: 5000
#设置有且仅有一次模式 目前支持 EXACTLY_ONCE、AT_LEAST_ONCE        
execution.checkpointing.mode: EXACTLY_ONCE
state.backend: hashmap
#设置checkpoint的存储方式
state.checkpoint-storage: filesystem
#设置checkpoint的存储位置
state.checkpoints.dir: hdfs://node1:8020/checkpoints
#设置savepoint的存储位置
state.savepoints.dir: hdfs://node1:8020/checkpoints
#设置checkpoint的超时时间 即一次checkpoint必须在该时间内完成 不然就丢弃
execution.checkpointing.timeout: 600000
#设置两次checkpoint之间的最小时间间隔
execution.checkpointing.min-pause: 500
#设置并发checkpoint的数目
execution.checkpointing.max-concurrent-checkpoints: 1
#开启checkpoints的外部持久化这里设置了清除job时保留checkpoint,默认值时保留一个 假如要保留3个
state.checkpoints.num-retained: 3
#默认情况下,checkpoint不是持久化的,只用于从故障中恢复作业。当程序被取消时,它们会被删除。但是你可以配置checkpoint被周期性持久化到外部,类似于savepoints。这些外部的checkpoints将它们的元数据输出到外#部持久化存储并且当作业失败时不会自动
清除。这样,如果你的工作失败了,你就会有一个checkpoint来恢复。
#ExternalizedCheckpointCleanup模式配置当你取消作业时外部checkpoint会产生什么行为:
#RETAIN_ON_CANCELLATION: 当作业被取消时,保留外部的checkpoint。注意,在此情况下,您必须手动清理checkpoint状态。
#DELETE_ON_CANCELLATION: 当作业被取消时,删除外部化的checkpoint。只有当作业失败时,检查点状态才可用。
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION#------------------------------------# 设置固定延迟策略
restart-strategy: fixed-delay
# 尝试重启次数
restart-strategy.fixed-delay.attempts: 3
# 两次连续重启的间隔时间
restart-strategy.fixed-delay.delay: 3 s
fixed-delay重启策略

提交命令:

#1.启动HDFS
#2.把jar包上传到Linux
#3.配置Flink的Checkpoint和重启策略
#4.提交任务cd $FLINK_HOMEbin/flink run -c test.RestartStrategy /root/original-gz_flinkbase-1.0-SNAPSHOT.jar
#5.在socket中数据单词
nc -lk 9999
hadoop
hive
flink
evil

运行结果:
在这里插入图片描述

6.官方推荐的配置

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/3686.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

全志ARM-修改开发板内核启动日志

修改开发板内核日志输出级别&#xff1a; 默认输出级别为1&#xff0c;需要用超级用户权限修改 sudo vi /boot/orangepiEvn.txt 把第一行内核启动输出权限改为7&#xff0c;第二行把输出方式该为“serial”串口输出

后端面试真题--计算机基础篇

计算机基础 1、计算机网络&#xff08;常考&#xff09;2、操作系统3、数据结构4、Linux二级目录三级目录 1、计算机网络&#xff08;常考&#xff09; 1、介绍一下网络七层架构&#xff0c;TCP/IP的四层架构&#xff0c;每一层都有什么协议 2、讲讲TCP/IP的流量控制和拥塞控制…

前端实现将二进制文件流,并下载为excel文件

目录 一、关于二进制流二、项目实践三、常见问题及解决 一、关于二进制流 含义&#xff1a;二进制流是一种计算机文件格式&#xff0c;它的数据以二进制形式存储&#xff0c;与文本文件不同。 二进制文件可以包含任意类型的数据&#xff0c;例如&#xff1a;图像、音频、视频…

机器学习笔记 - scikit-learn中的metric参数和metric_params参数如何使用?

在 scikit-learn 库中的一些算法里面,这里是指需要使用距离度量或相似度度量的算法,这里拿KNeighborsclassifier举例来说,会有metric参数,这个参数用于指定距离度量方法。这个参数有多个可选项,常用的有以下几种: 1.euclidean:欧氏距离。这是默认的距离度量方法。 2.manha…

c++输入年月日,计算是该年的第几天

直接把每个月的天数存到数组直接相加会很方便&#xff0c;不用过多的判断。 注意&#xff1a;润年判断方法&#xff08;可以被4整除但不能被100整除&#xff0c;或者可以被400整除&#xff09; #include <iostream> using namespace std;int main() {int year,month,da…

PyCharm 中的特殊标记

在使用 PyCharm 开发 Python 项目的时候&#xff0c;经常会有一些特殊的标记&#xff0c;有些是 IDE 提示的代码规范&#xff0c;有些则为了方便查找而自定义的标记。 我在之前写过一些关于异常捕获的文章&#xff1a;Python3 PyCharm 捕获异常报 Too broad exception clause …

LeetCode——滑动窗口

滑动窗口 包含特定元素的子串&#xff08;要匹配到的目标&#xff09;&#xff0c;或最长[这个好像没啥意思]、或最短、或等长 思考&#xff1a;&#xff08;暂时感受&#xff09; 1&#xff09;什么时候扩充窗口——串没走完就得扩呀&#xff1b; 2&#xff09;窗口扩充后…

2024年Java JDK下载安装教程,附详细图文

文章目录 简介一、JDK的下载二、JDK的安装三、设置环境变量(不一定需要执行&#xff09; 简介 博主介绍&#xff1a;✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f34…

4D Solver

好的,我将详细解释一下4D Solver是如何将视频、深度数据与MetaHuman人物表情网格数据相结合的: 4D Solver实际上是一种先进的计算机视觉算法,它能将2D视频图像数据与3D几何网格数据相匹配和对齐。 具体过程分为以下几个步骤: 标定(Calibration) 首先需要对捕捉设备(iPhone或…

Golang操作Redis

一. Redis介绍 1.1 简介 Redis是完全开源免费的&#xff0c;遵循BSD协议&#xff0c;是一个高性能的key-value数据库。 Redis与其它的key-value缓存产品有以下三个特点&#xff1a; Redis支持数据持久化&#xff0c;可以见内存中的数据报错在磁盘中&#xff0c;重启的时候可以…

python hashlib模块介绍

hashlib是Python标准库中的一个模块,用于提供常见哈希函数的接口。它可以用来计算数据的哈希值,包括MD5、SHA-1、SHA-224、SHA-256、SHA-384和SHA-512等算法。 下面是hashlib模块的一些常见方法和特点: 哈希对象的创建:使用hashlib模块的相应函数创建哈希对象,例如md5()、…

助力实现更可持续未来的智能解决方案:AI如何改变世界

人工智能已然成为今年的热门话题。由于生成式AI应用的快速采用&#xff0c;新闻头条充斥着有关AI如何彻底改变我们的政策制定、就业和经济走向的预测。您知道AI也是我们应对各种可持续发展挑战的先锋吗&#xff1f;AI通过分析大量数据&#xff0c;并提供有用的见解和工具&#…

【Unity动画系统】Animator组件的属性

介绍Animator组件的全部属性 Controller&#xff1a;动画控制器 Avatar&#xff1a;人物骨骼 Apply Root Motion&#xff1a;有一些动画片段自带位移&#xff0c;如果希望自带的位移应用在游戏对象上&#xff0c;那么就勾选&#xff1b;如果自己编写脚本&#xff0c;那么就不…

Swift 中如何四舍五入

在 Swift 中&#xff0c;你可以使用不同的方法来进行四舍五入操作&#xff0c;具体取决于你的需求和场景。以下是几种常见的方法&#xff1a; round()函数 round()函数可以对浮点数进行四舍五入&#xff0c;并返回最接近的整数。 let number 3.75let roundedNumber round(n…

深度学习和强化学习的区别

深度学习和强化学习是机器学习领域中两个非常重要的研究方向&#xff0c;它们有着不同的应用、原理和目标。虽然这两者可以结合使用&#xff08;例如在深度强化学习中&#xff09;&#xff0c;但它们的基本概念和方法有着根本的区别。 深度学习&#xff08;Deep Learning&…

【静态分析】静态分析笔记08 - 指针分析 - 上下文敏感

参考&#xff1a; 【课程笔记】南大软件分析课程8——指针分析-上下文敏感&#xff08;课时11/12&#xff09; - 简书 ------------------------------------------------------------------------------------------------------------- 1. 上下文不敏感的问题 说明&#…

C# 图像处理 添加水印

方法1&#xff0c;使用自带的画刷进行绘制水印 示例代码 public partial class Form1 : Form{public Form1(){InitializeComponent();}string photoPathstring.Empty;Bitmap image null;private void button1_Click(object sender, EventArgs e) //选择照片{OpenFileDialog d…

SQL中PIVOT函数的用法

文章目录 前言1. SQL Server2. MySQL3. Oracle 前言 PIVOT 是 SQL Server 中的一个功能&#xff0c;用于将行转换为列。然而&#xff0c;MySQL 和 Oracle 不直接支持 PIVOT 语法。但是&#xff0c;你可以使用条件聚合或其他技术来模拟 PIVOT 的行为。 语法&#xff1a; -- 从子…

设计模式:开放-封闭原则(Open-Closed Principle,OCP)介绍

开放-封闭原则&#xff08;Open-Closed Principle&#xff0c;OCP&#xff09;是面向对象设计原则之一&#xff0c;它指导我们编写可扩展和易维护的代码。该原则的核心思想是&#xff1a; 软件实体&#xff08;类、模块、函数等&#xff09;应该对扩展开放&#xff0c;对修改封…

Anagrams

描述 Most crossword puzzle&#xff08;猜字谜&#xff09; fans are used to anagrams&#xff08;字谜&#xff09;--groups of words with the same letters in different orders--for example OPTS, SPOT, STOP, POTS and POST. Some words however do not have this att…