Flink四大基石之CheckPoint(检查点) 的使用详解

目录

一、Checkpoint 剖析

State 与 Checkpoint 概念区分

设置 Checkpoint 实战

执行代码所需的服务与遇到的问题

二、重启策略解读

重启策略意义

代码示例与效果展示

三、SavePoint 

与 Checkpoint 异同

操作步骤详解

四、总结


        在大数据流式处理领域,Apache Flink 凭借其卓越的性能和强大的功能占据重要地位。而理解 Flink 中的 Checkpoint(检查点)、重启策略以及 SavePoint(保存点)这些关键概念,对于保障流处理任务的稳定性、容错性以及可维护性至关重要。本文将深入剖析它们的原理、用法,并结合实际代码示例展示其效果,希望能帮助大家更好地掌握 Flink 相关知识。

一、Checkpoint 剖析

State 与 Checkpoint 概念区分

State(状态)

        在 Flink 中,State 代表某一个 Operator(算子)在某一时刻的状态,像常见的聚合算子 maxBysum 等操作过程中就会维护状态信息。比如在对数据流按某个字段做 sum 聚合时,它需要记住历史数据以便持续累加计算,并且这些状态数据默认存于内存之中,为算子的持续、准确运行提供依据。

Checkpoint(检查点 / 快照点)

        它是 Flink 中所有有状态的 Operator 在某一个特定时刻的 State 快照信息汇总,也就是 State 的存档记录。可以简单理解为对整个作业运行时状态拍了一张 “照片”,定格所有相关算子彼时的状态,方便后续在故障恢复等场景使用。

设置 Checkpoint 实战

以下是一段设置 Checkpoint 的 Flink Java 代码示例:

package com.bigdata.day06;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class _01CheckPointDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 在windows运行,将数据提交hdfs,会出现权限问题,使用这个语句解决。System.setProperty("HADOOP_USER_NAME", "root");// 在这个基础之上,添加快照// 第一句:开启快照,每隔1s保存一次快照env.enableCheckpointing(1000);// 第二句:设置快照保存的位置env.setStateBackend(new FsStateBackend("hdfs://bigdata01:9820/flink/checkpoint"));// 第三句: 通过webui的cancel按钮,取消flink的job时,不删除HDFS的checkpoint目录env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//2. source-加载数据DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String s) throws Exception {String[] arr = s.split(",");return Tuple2.of(arr[0], Integer.valueOf(arr[1]));}});//3. transformation-数据处理转换SingleOutputStreamOperator<Tuple2<String, Integer>> result = mapStream.keyBy(0).sum(1);result.print();//4. sink-数据输出//5. execute-执行env.execute();}
}

执行代码所需的服务与遇到的问题

启动本地的nc, 启动hdfs服务。

启动代码,发现有权限问题:

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=admin, access=WRITE, inode="/flink":root:supergroup:drwxr-xr-x

解决方案:

System.setProperty("HADOOP_USER_NAME", "root");

        在设置检查点之前,设置一句这样带权限的语句,如果是集群运行中,不存在该问题。可以不设置!!! 

查看快照情况:

        运行,刷新查看checkpoint保存的数据,它会先生成一个新的文件夹,然后再删除老的文件夹,在某一时刻,会出现两个文件夹同时存在的情况。

启动HDFS、Flink

start-dfs.sh
start-cluster.sh

数据是保存了,但是并没有起作用,想起作用需要在集群上运行,以下演示集群上的效果:

第一次运行的时候

在本地先clean, 再package ,再Wagon一下:

flink run -c 全类名 /opt/app/flink-test-1.0-SNAPSHOT.jarflink run -c com.bigdata.day06._01CheckPointDemo /opt/app/FlinkDemo-1.0-SNAPSHOT.jar记得,先启动nc ,再启动任务,否则报错!

通过nc -lk 9999 输入以下内容:

想查看运行结果,可以通过使用的slot数量判断一下:

取消flink job的运行

查看一下这次的单词统计到哪个数字了:

第二次运行的时候

flink run -c 全类名  -s hdfs://hadoop10:8020/flink-checkpoint/293395ef7e496bda2eddd153a18d5212/chk-34  /opt/app/flink-test-1.0-SNAPSHOT.jar启动:
flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink/checkpoint/bf416df7225b264fc34f8ff7e3746efe/chk-603  /opt/app/FlinkDemo-1.0-SNAPSHOT.jar

-s 指定从checkpoint目录恢复状态数据 ,注意每个人都不一样

从上一次离开时,截止的checkpoint目录

观察数据:输入一个hello,1 得到新的结果hello,8

二、重启策略解读

重启策略意义


        流式数据如同永不干涸的河流持续流淌,一旦因某条错误数据致使程序异常退出,后续海量数据丢失风险极高,对企业而言,这意味着数据资产受损、业务分析结果偏差等严重后果,重启策略应运而生。它作为独立策略,与 Checkpoint 虽无必然绑定关系(即便没配置 Checkpoint 也能单独配置重启策略),却在保障程序持续运行层面协同发挥关键作用。

        一个流在运行过程中,假如出现了程序异常问题,可以进行重启,比如,在代码中人为添加一些异常:

进行wordcount时,输入了一个bug,1 人为触发异常。

        注意:此时如果有checkpoint ,是不会出现异常的,需要将checkpoint的代码关闭,再重启程序。会发现打印了异常,那为什么checkpoint的时候不打印,因为并没有log4j的配置文件,需要搞一个这样的配置文件才行。

程序中添加log4j.properties的代码:

# Global logging configuration
#  Debug   info   warn  error
log4j.rootLogger=debug, stdout
# MyBatis logging configuration...
log4j.logger.org.mybatis.example.BlogMapper=TRACE
# Console output...
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n

        开启检查点之后,报错了程序还在运行是因为开启检查点之后,程序会进行自动重启(无限重启【程序错了才重启】)

//开启checkpoint,默认是无限重启,可以设置为不重启
//env.setRestartStrategy(RestartStrategies.noRestart());//重启3次,重启时间间隔是10s
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));//2分钟内重启3次,重启时间间隔是5s
env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(2,TimeUnit.MINUTES),Time.of(5,TimeUnit.SECONDS))
);env.execute("checkpoint自动重启");   //最后一句execute可以设置jobName,显示在8081界面

程序如果上传至服务器端运行,可以看到重启状态

代码示例与效果展示

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.concurrent.TimeUnit;public class Demo02 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 代码中不能有checkpoint,不是说checkpoint不好,而是太好了,它已经自带重试机制了。而且是无限重启的// 通过如下方式可将重试机制关掉// env.setRestartStrategy(RestartStrategies.noRestart());//// 两种办法// 第一种办法:重试3次,每一次间隔10S//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));// 第二种写法:在2分钟内,重启3次,每次间隔10senv.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(2,TimeUnit.MINUTES),Time.of(5,TimeUnit.SECONDS)));//2. source-加载数据DataStreamSource<String> streamSource = env.socketTextStream("bigdata01", 8899);streamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] arr = value.split(",");String word = arr[0];if(word.equals("bug")){throw new Exception("有异常,服务会挂掉.....");}// 将一个字符串变为int类型int num = Integer.valueOf(arr[1]);// 第二种将字符串变为数字的方法System.out.println(Integer.parseInt(arr[1]));Tuple2<String, Integer> tuple2 = new Tuple2<>(word,num);// 还有什么方法? 第二种创建tuple的方法Tuple2<String, Integer> tuple2_2 = Tuple2.of(word,num);return tuple2;}}).keyBy(tuple->tuple.f0).sum(1).print();//3. transformation-数据处理转换//4. sink-数据输出//5. execute-执行env.execute();}
}

        在此代码中人为在 map 函数里设置异常触发点(输入包含 “bug” 的数据时抛出异常)。若开启 Checkpoint,因它自带重试机制(默认无限重启),异常可能被掩盖,需关闭 Checkpoint 相关代码才能看到异常打印情况。同时,要完整看到重启策略效果(如按设定的次数、间隔重启),需打包代码上传至集群运行,本地测试难以呈现完整现象,且提交时务必确认使用的类名准确无误。

三、SavePoint 

与 Checkpoint 异同

相同点

        本质都是对 Flink 作业状态的一种保存方式,以便后续恢复作业时复用状态,保障数据处理连贯性。

不同点

        Checkpoint 是 Flink 自动按设定规则周期性完成 State 快照保存,旨在应对故障自动恢复场景;而 SavePoint 是手动触发的快照操作,提供更灵活的作业状态管理时机,比如在版本升级、业务规则调整需暂停并后续重启作业场景发挥优势。

操作步骤详解

提交作业并输入数据

        提交含重启策略代码打包成的 jar 包运行作业(类似 flink run -c 全类名 /opt/app/flink-test-1.0-SNAPSHOT.jar),输入数据观察单词对应数字变化。

执行 SavePoint 操作

以下是 -->  停止flink job,并且触发savepoint操作
flink stop --savepointPath  hdfs://bigdata01:9820/flink-savepoint  152e493da9cdeb327f6cbbad5a7f8e41后面的序号为Job 的ID以下是 -->  不会停止flink的job,只是完成savepoint操作
flink savepoint 79f53c5c0bb3563b6b6ed3011176c411 hdfs://bigdata01:9820/flink-savepoint

备注:如何正确停止一个 flink 的任务

flink stop 6a27b580aa5c6b57766ae6241d9270ce(任务编号)

查看与重启作业

        查看最近完成作业对应的 SavePoint,之后依据之前保存路径重启作业(flink run -c 全类名 -s hdfs://hadoop10:8020/flink-savepoint/savepoint-79f53c-64b5d94771eb /opt/app/flink-test-1.0-SNAPSHOT.jar),再次输入数据可看到基于之前状态的累加效果。

        此外,在集群运行 Flink 程序时,默认并行度常为 1,它不会按照机器的CPU核数,而是按照配置文件中的一个默认值运行的。比如:flink-conf.yaml

web-ui 界面提交作业:


这个图形化界面,跟我们使用如下命令是一个效果:

flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink/checkpoint/bf416df7225b264fc34f8ff7e3746efe/chk-603  /opt/app/FlinkDemo-1.0-SNAPSHOT.jar

四、总结

        通过对 Flink 中 Checkpoint、重启策略和 SavePoint 的详细解读与代码实践展示,我们明晰它们各自在保障流处理任务稳定、容错与灵活运维层面的独特价值。合理运用这些机制,能助我们打造更健壮、高效的 Flink 大数据处理应用,从容应对复杂多变的业务需求与运行环境挑战,后续大家可在实际项目中深入实践优化,挖掘其更大潜力。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/888220.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

字典树TRIE

模板 模板总共分为两部分 插入一个字符串查找一个字符串 int idx 0; int trie[3000010][150]; int ans[3000010];##原理 trie[上节点编号][下方连接的字母] 下方连接的字母的节点编号 trie[0][0]1;trie[0][1]5; trie[1][1]2; trie[2][1]4;trie[2][2]3; trie[5][2]6; tri…

【MySQL-6】MySQL的复合查询

1. 整体学习的思维导图 2. 回顾基本查询 使用scott数据库中的表&#xff0c;完成以下查询&#xff1a; 查询工资高于500或岗位为MANAGER的雇员&#xff0c;同时还要满足他们的姓名首字母为大写的J mysql> select * from emp where (sal>500 or jobMANAGER) and ename …

STL算法之其它算法_中

目录 lower_bound(应用于有序区间) upper_bound&#xff08;应用于有序区间&#xff09; binary_search&#xff08;应用于有序区间&#xff09; next_permutation prev_permutation lower_bound(应用于有序区间) 这是二分查找(binary search)的一种版本&#xff0c;试图在…

[高阶数据结构六]最短路径算法

1.前言 最短路径算法是在图论的基础上讲解的&#xff0c;如果你还不知道图论的相关知识的话&#xff0c;可以阅读下面几篇文章。 [高阶数据结构四] 初始图论_初始图结构-CSDN博客 [高阶数据结构五] 图的遍历和最小生成树_图的遍历和生成树求解-CSDN博客 本章重点&#xff1a;…

uniapp:封装商品列表为组件并使用

封装商品列表为组件并使用 商品组件封装 <template><!-- 商品列表 --><view class"goods_list"><view class"goods_item" v-for"item in goods" :key"item.id"><image :src"item.img_url">…

【AI系统】LLVM 架构设计和原理

LLVM 架构设计和原理 在上一篇文章中&#xff0c;我们详细探讨了 GCC 的编译过程和原理。然而&#xff0c;由于 GCC 存在代码耦合度高、难以进行独立操作以及庞大的代码量等缺点。正是由于对这些问题的意识&#xff0c;人们开始期待新一代编译器的出现。在本节&#xff0c;我们…

【C语言】结构体(二)

一&#xff0c;结构体的初始化 和其它类型变量一样&#xff0c;对结构体变量可以在定义时指定初始值 #include <stdio.h> #include <stdlib.h> struct books // 结构体类型 {char title[50];char author[50]; //结构体成员char subject[100];int book_id; }…

四、初识C语言(4)

一、作业&#xff1a;static修饰局部变量 #define _CRT_SECURE_NO_WARNINGS 1 #include <stdio.h> #include <string.h> //作业&#xff1a;static修饰局部变量 int sum (int a) {int c 0;static int b 3;c 1;b 2;return (abc); } int main() {int i 0;int a …

Linux 中的 ls 命令:从使用到源码解析

ls 命令是 Linux 系统中最常用和最基本的命令之一。下面将深入探讨 ls 命令的使用方法、工作原理、源码解析以及实际应用场景。 1. ls 命令的使用** ls 命令用于列出目录内容&#xff0c;显示文件和目录的详细信息。 1.1 基本用法 ls [选项] [文件或目录]例如&#xff1a; …

The selected directory is not a valid home for Go SDK

在idea里配置go语言的环境时&#xff0c;选择go语言的安装目录&#xff0c;一直提示这个 The selected directory is not a valid home for Go SDK后来查了一下&#xff0c;发现原来idea识别不出来 需要改一下配置文件&#xff0c;找到go环境的安装目录&#xff0c;我是默认安…

Leetcode581. 最短无序连续子数组(HOT100)

链接 我的代码&#xff1a; class Solution { public:int findUnsortedSubarray(vector<int>& nums) {vector<int> res nums;sort(res.begin(),res.end());int l 0,r nums.size()-1;while(nums[l]res[l]){l;if(lnums.size()){return 0;}}while(nums[r]res…

SQL优化与性能——数据库事务管理

数据库事务管理是数据库系统中至关重要的一部分&#xff0c;确保了数据的一致性、完整性、可靠性和隔离性。尤其在高并发、高负载的系统中&#xff0c;事务管理的设计和实现直接影响到系统的稳定性和性能。本章将详细探讨以下内容&#xff1a;事务的ACID特性、使用 BEGIN、COMM…

【Robocasa】Code Review

文章目录 OverviewalgoInitializationImportant Class MethodsTrain LoopTest Time ConfigsdemoConfig FactoryConfig StructureConfig Locking默认锁定状态配置修改的上下文管理器 dataset示例数据集对象参数说明 model基础模块EncoderCoreVisualCoreScanCore随机化器 (Random…

【单细胞数据库】癌症单细胞数据库CancerSEA

数据库地址&#xff1a;home (hrbmu.edu.cn) Cite Huating Yuan, Min Yan, Guanxiong Zhang, Wei Liu, Chunyu Deng, Gaoming Liao, Liwen Xu, Tao Luo, Haoteng Yan, Zhilin Long, Aiai Shi, Tingting Zhao, Yun Xiao, Xia Li, CancerSEA: a cancer single-cell state atlas…

React 的学习记录一:与 Vue 的相同点和区别

目录 一、学习目标 二、学习内容1️⃣——React的特点 1.组件化设计 2.单向数据流 3.声明式 UI 4.虚拟 DOM 5.Hooks 6.JSX 7.React Native 三、React与vue的比较总结 四、总结 一、学习目标 时间&#xff1a;两周 内容&#xff1a; React的特点React的入门React的…

数据库管理-第267期 23ai:Oracle Data Redaction演示(20241128)

数据库管理267期 2024-11-286 数据库管理-第267期 23ai&#xff1a;Oracle Data Redaction演示&#xff08;20241128&#xff09;1 示例表及数据2 创建编校策略2.1 名字全编校2.2 电话部分编校 3 DML演示3.1 场景13.2 场景2 总结 数据库管理-第267期 23ai&#xff1a;Oracle Da…

hue 4.11容器化部署,已结合Hive与Hadoop

配合《Hue 部署过程中的报错处理》食用更佳 官方配置说明页面&#xff1a; https://docs.gethue.com/administrator/configuration/connectors/ 官方配置hue.ini页面 https://github.com/cloudera/hue/blob/master/desktop/conf.dist/hue.ini docker部署 注意&#xff1a; …

Spring Boot自定义启动banner

在启动 Springboot 应用时&#xff0c;默认情况下会在控制台打印出 Springboot 相关的banner信息。 自定义banner 如果你想自定义一个独特的启动banner&#xff0c;该怎么做呢&#xff1f;Springboot 允许我们通过自定义启动banner来替换默认的banner。只需要在 resources 目…

leaflet 的基础使用

目录 一、创建dom节点 二、创建地图 三、添加底图&#xff08;天地图&#xff09;&#xff0c;在地图创建完成后添加底图 本章主要讲述leaflet在vue中的使用&#xff1a; leaflet 详情总目录&#xff1a;传送 一、创建dom节点 <div class"map" id"map_…

ubuntu的用户使用

ubuntu系统中的常规用户登录方式 在系统root用户是无法直接登录的,因为root用户的权限过大所以其安全性比较差 在登录系统时一般使用在安装系统时建立的普通用户登录 如果需要超级用户权限: Ubuntu用户密码破解 在系统安装完成后默认grub启动等待时间为0&#xff0c;建议改…