Flink四大基石之CheckPoint(检查点) 的使用详解

目录

一、Checkpoint 剖析

State 与 Checkpoint 概念区分

设置 Checkpoint 实战

执行代码所需的服务与遇到的问题

二、重启策略解读

重启策略意义

代码示例与效果展示

三、SavePoint 

与 Checkpoint 异同

操作步骤详解

四、总结


        在大数据流式处理领域,Apache Flink 凭借其卓越的性能和强大的功能占据重要地位。而理解 Flink 中的 Checkpoint(检查点)、重启策略以及 SavePoint(保存点)这些关键概念,对于保障流处理任务的稳定性、容错性以及可维护性至关重要。本文将深入剖析它们的原理、用法,并结合实际代码示例展示其效果,希望能帮助大家更好地掌握 Flink 相关知识。

一、Checkpoint 剖析

State 与 Checkpoint 概念区分

State(状态)

        在 Flink 中,State 代表某一个 Operator(算子)在某一时刻的状态,像常见的聚合算子 maxBysum 等操作过程中就会维护状态信息。比如在对数据流按某个字段做 sum 聚合时,它需要记住历史数据以便持续累加计算,并且这些状态数据默认存于内存之中,为算子的持续、准确运行提供依据。

Checkpoint(检查点 / 快照点)

        它是 Flink 中所有有状态的 Operator 在某一个特定时刻的 State 快照信息汇总,也就是 State 的存档记录。可以简单理解为对整个作业运行时状态拍了一张 “照片”,定格所有相关算子彼时的状态,方便后续在故障恢复等场景使用。

设置 Checkpoint 实战

以下是一段设置 Checkpoint 的 Flink Java 代码示例:

package com.bigdata.day06;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class _01CheckPointDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 在windows运行,将数据提交hdfs,会出现权限问题,使用这个语句解决。System.setProperty("HADOOP_USER_NAME", "root");// 在这个基础之上,添加快照// 第一句:开启快照,每隔1s保存一次快照env.enableCheckpointing(1000);// 第二句:设置快照保存的位置env.setStateBackend(new FsStateBackend("hdfs://bigdata01:9820/flink/checkpoint"));// 第三句: 通过webui的cancel按钮,取消flink的job时,不删除HDFS的checkpoint目录env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//2. source-加载数据DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String s) throws Exception {String[] arr = s.split(",");return Tuple2.of(arr[0], Integer.valueOf(arr[1]));}});//3. transformation-数据处理转换SingleOutputStreamOperator<Tuple2<String, Integer>> result = mapStream.keyBy(0).sum(1);result.print();//4. sink-数据输出//5. execute-执行env.execute();}
}

执行代码所需的服务与遇到的问题

启动本地的nc, 启动hdfs服务。

启动代码,发现有权限问题:

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=admin, access=WRITE, inode="/flink":root:supergroup:drwxr-xr-x

解决方案:

System.setProperty("HADOOP_USER_NAME", "root");

        在设置检查点之前,设置一句这样带权限的语句,如果是集群运行中,不存在该问题。可以不设置!!! 

查看快照情况:

        运行,刷新查看checkpoint保存的数据,它会先生成一个新的文件夹,然后再删除老的文件夹,在某一时刻,会出现两个文件夹同时存在的情况。

启动HDFS、Flink

start-dfs.sh
start-cluster.sh

数据是保存了,但是并没有起作用,想起作用需要在集群上运行,以下演示集群上的效果:

第一次运行的时候

在本地先clean, 再package ,再Wagon一下:

flink run -c 全类名 /opt/app/flink-test-1.0-SNAPSHOT.jarflink run -c com.bigdata.day06._01CheckPointDemo /opt/app/FlinkDemo-1.0-SNAPSHOT.jar记得,先启动nc ,再启动任务,否则报错!

通过nc -lk 9999 输入以下内容:

想查看运行结果,可以通过使用的slot数量判断一下:

取消flink job的运行

查看一下这次的单词统计到哪个数字了:

第二次运行的时候

flink run -c 全类名  -s hdfs://hadoop10:8020/flink-checkpoint/293395ef7e496bda2eddd153a18d5212/chk-34  /opt/app/flink-test-1.0-SNAPSHOT.jar启动:
flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink/checkpoint/bf416df7225b264fc34f8ff7e3746efe/chk-603  /opt/app/FlinkDemo-1.0-SNAPSHOT.jar

-s 指定从checkpoint目录恢复状态数据 ,注意每个人都不一样

从上一次离开时,截止的checkpoint目录

观察数据:输入一个hello,1 得到新的结果hello,8

二、重启策略解读

重启策略意义


        流式数据如同永不干涸的河流持续流淌,一旦因某条错误数据致使程序异常退出,后续海量数据丢失风险极高,对企业而言,这意味着数据资产受损、业务分析结果偏差等严重后果,重启策略应运而生。它作为独立策略,与 Checkpoint 虽无必然绑定关系(即便没配置 Checkpoint 也能单独配置重启策略),却在保障程序持续运行层面协同发挥关键作用。

        一个流在运行过程中,假如出现了程序异常问题,可以进行重启,比如,在代码中人为添加一些异常:

进行wordcount时,输入了一个bug,1 人为触发异常。

        注意:此时如果有checkpoint ,是不会出现异常的,需要将checkpoint的代码关闭,再重启程序。会发现打印了异常,那为什么checkpoint的时候不打印,因为并没有log4j的配置文件,需要搞一个这样的配置文件才行。

程序中添加log4j.properties的代码:

# Global logging configuration
#  Debug   info   warn  error
log4j.rootLogger=debug, stdout
# MyBatis logging configuration...
log4j.logger.org.mybatis.example.BlogMapper=TRACE
# Console output...
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n

        开启检查点之后,报错了程序还在运行是因为开启检查点之后,程序会进行自动重启(无限重启【程序错了才重启】)

//开启checkpoint,默认是无限重启,可以设置为不重启
//env.setRestartStrategy(RestartStrategies.noRestart());//重启3次,重启时间间隔是10s
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));//2分钟内重启3次,重启时间间隔是5s
env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(2,TimeUnit.MINUTES),Time.of(5,TimeUnit.SECONDS))
);env.execute("checkpoint自动重启");   //最后一句execute可以设置jobName,显示在8081界面

程序如果上传至服务器端运行,可以看到重启状态

代码示例与效果展示

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.concurrent.TimeUnit;public class Demo02 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 代码中不能有checkpoint,不是说checkpoint不好,而是太好了,它已经自带重试机制了。而且是无限重启的// 通过如下方式可将重试机制关掉// env.setRestartStrategy(RestartStrategies.noRestart());//// 两种办法// 第一种办法:重试3次,每一次间隔10S//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));// 第二种写法:在2分钟内,重启3次,每次间隔10senv.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(2,TimeUnit.MINUTES),Time.of(5,TimeUnit.SECONDS)));//2. source-加载数据DataStreamSource<String> streamSource = env.socketTextStream("bigdata01", 8899);streamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] arr = value.split(",");String word = arr[0];if(word.equals("bug")){throw new Exception("有异常,服务会挂掉.....");}// 将一个字符串变为int类型int num = Integer.valueOf(arr[1]);// 第二种将字符串变为数字的方法System.out.println(Integer.parseInt(arr[1]));Tuple2<String, Integer> tuple2 = new Tuple2<>(word,num);// 还有什么方法? 第二种创建tuple的方法Tuple2<String, Integer> tuple2_2 = Tuple2.of(word,num);return tuple2;}}).keyBy(tuple->tuple.f0).sum(1).print();//3. transformation-数据处理转换//4. sink-数据输出//5. execute-执行env.execute();}
}

        在此代码中人为在 map 函数里设置异常触发点(输入包含 “bug” 的数据时抛出异常)。若开启 Checkpoint,因它自带重试机制(默认无限重启),异常可能被掩盖,需关闭 Checkpoint 相关代码才能看到异常打印情况。同时,要完整看到重启策略效果(如按设定的次数、间隔重启),需打包代码上传至集群运行,本地测试难以呈现完整现象,且提交时务必确认使用的类名准确无误。

三、SavePoint 

与 Checkpoint 异同

相同点

        本质都是对 Flink 作业状态的一种保存方式,以便后续恢复作业时复用状态,保障数据处理连贯性。

不同点

        Checkpoint 是 Flink 自动按设定规则周期性完成 State 快照保存,旨在应对故障自动恢复场景;而 SavePoint 是手动触发的快照操作,提供更灵活的作业状态管理时机,比如在版本升级、业务规则调整需暂停并后续重启作业场景发挥优势。

操作步骤详解

提交作业并输入数据

        提交含重启策略代码打包成的 jar 包运行作业(类似 flink run -c 全类名 /opt/app/flink-test-1.0-SNAPSHOT.jar),输入数据观察单词对应数字变化。

执行 SavePoint 操作

以下是 -->  停止flink job,并且触发savepoint操作
flink stop --savepointPath  hdfs://bigdata01:9820/flink-savepoint  152e493da9cdeb327f6cbbad5a7f8e41后面的序号为Job 的ID以下是 -->  不会停止flink的job,只是完成savepoint操作
flink savepoint 79f53c5c0bb3563b6b6ed3011176c411 hdfs://bigdata01:9820/flink-savepoint

备注:如何正确停止一个 flink 的任务

flink stop 6a27b580aa5c6b57766ae6241d9270ce(任务编号)

查看与重启作业

        查看最近完成作业对应的 SavePoint,之后依据之前保存路径重启作业(flink run -c 全类名 -s hdfs://hadoop10:8020/flink-savepoint/savepoint-79f53c-64b5d94771eb /opt/app/flink-test-1.0-SNAPSHOT.jar),再次输入数据可看到基于之前状态的累加效果。

        此外,在集群运行 Flink 程序时,默认并行度常为 1,它不会按照机器的CPU核数,而是按照配置文件中的一个默认值运行的。比如:flink-conf.yaml

web-ui 界面提交作业:


这个图形化界面,跟我们使用如下命令是一个效果:

flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink/checkpoint/bf416df7225b264fc34f8ff7e3746efe/chk-603  /opt/app/FlinkDemo-1.0-SNAPSHOT.jar

四、总结

        通过对 Flink 中 Checkpoint、重启策略和 SavePoint 的详细解读与代码实践展示,我们明晰它们各自在保障流处理任务稳定、容错与灵活运维层面的独特价值。合理运用这些机制,能助我们打造更健壮、高效的 Flink 大数据处理应用,从容应对复杂多变的业务需求与运行环境挑战,后续大家可在实际项目中深入实践优化,挖掘其更大潜力。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/888220.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode Hot100 31~40

链表 31. K个一组翻转链表 题目不难理解 主要是怎么写出清晰易懂的代码 可以先分成K组 再排序 class Solution { public:ListNode* reverseKGroup(ListNode* head, int k) {ListNode* dummyHead new ListNode();dummyHead->next head;// 首先查看需要翻转几次int count…

字典树TRIE

模板 模板总共分为两部分 插入一个字符串查找一个字符串 int idx 0; int trie[3000010][150]; int ans[3000010];##原理 trie[上节点编号][下方连接的字母] 下方连接的字母的节点编号 trie[0][0]1;trie[0][1]5; trie[1][1]2; trie[2][1]4;trie[2][2]3; trie[5][2]6; tri…

Python学习第十五天--魔术方法

魔法方法就是可以给你的类增加魔力的特殊方法&#xff0c;它们总被双下划线所包围&#xff0c;像这种格式:"__方法名__"&#xff0c;这些方法很强大&#xff0c;充满魔力&#xff0c;可以让你实现很多功能。 使用dir()查看类的所有属性和方法 class A:passprint(di…

支持JT1078和GB28181的流媒体服务器-LKM启动配置文件参数说明

流媒体服务器地址&#xff1a;https://github.com/lkmio/lkm GB28181信令&#xff0c;模拟多个国标设备工具&#xff1a;https://github.com/lkmio/gb-cms 文章目录 gop_cachegop_buffer_sizeprobe_timeoutwrite_timeoutmw_latencylisten_ippublic_ipidle_timeoutreceive_timeo…

【MySQL-6】MySQL的复合查询

1. 整体学习的思维导图 2. 回顾基本查询 使用scott数据库中的表&#xff0c;完成以下查询&#xff1a; 查询工资高于500或岗位为MANAGER的雇员&#xff0c;同时还要满足他们的姓名首字母为大写的J mysql> select * from emp where (sal>500 or jobMANAGER) and ename …

STL算法之其它算法_中

目录 lower_bound(应用于有序区间) upper_bound&#xff08;应用于有序区间&#xff09; binary_search&#xff08;应用于有序区间&#xff09; next_permutation prev_permutation lower_bound(应用于有序区间) 这是二分查找(binary search)的一种版本&#xff0c;试图在…

GEE教程——Google Earth Engine 处理和分析刚果民主共和国(DR Congo)地区的 Landsat 8 图像(NDVI和NDWI)

目录 简介 函数 sort(property, ascending) Arguments: Returns: Collection size() Arguments: Returns: Integer 代码解释 代码 结果 简介 GEE教程——Google Earth Engine 处理和分析刚果民主共和国(DR Congo)地区的 Landsat 8 图像(NDVI和NDWI) 函数 sor…

[高阶数据结构六]最短路径算法

1.前言 最短路径算法是在图论的基础上讲解的&#xff0c;如果你还不知道图论的相关知识的话&#xff0c;可以阅读下面几篇文章。 [高阶数据结构四] 初始图论_初始图结构-CSDN博客 [高阶数据结构五] 图的遍历和最小生成树_图的遍历和生成树求解-CSDN博客 本章重点&#xff1a;…

Meta Reality Labs的VR/AR投资战略转向:内部视角与市场影响

最近,关于Meta(原Facebook)计划减少其在消费者虚拟现实(VR)领域的投资而增加对增强现实(AR)眼镜的投资的消息引起了广泛讨论。这一战略调整不仅反映了Meta对未来技术趋势的看法,也揭示了公司在面对激烈的市场竞争时所采取的新方向。本文将从不同角度探讨此次战略转向的…

ASP.NET Core项目中使用SqlSugar连接多个数据库的方式

之前学习ASP.NETCore及SqlSugar时都是只连接单个数据库处理数据&#xff0c;仅需在Program文件中添加ISqlSugarClient的单例即可&#xff08;如下代码所示&#xff09;。 builder.Services.AddSingleton<ISqlSugarClient>(s > {SqlSugarScope sqlSugar new SqlSugar…

flutter_quill如何设置Editor中的文字为富文本

比如一个场景 在输入框中&#xff0c;某某某 是一个颜色&#xff0c;其他文本是一个颜色 这里要注意 const QuillEditor({required this.controller,required this.focusNode,required this.scrollController,required this.scrollable,required this.padding,required this…

uniapp:封装商品列表为组件并使用

封装商品列表为组件并使用 商品组件封装 <template><!-- 商品列表 --><view class"goods_list"><view class"goods_item" v-for"item in goods" :key"item.id"><image :src"item.img_url">…

【AI系统】LLVM 架构设计和原理

LLVM 架构设计和原理 在上一篇文章中&#xff0c;我们详细探讨了 GCC 的编译过程和原理。然而&#xff0c;由于 GCC 存在代码耦合度高、难以进行独立操作以及庞大的代码量等缺点。正是由于对这些问题的意识&#xff0c;人们开始期待新一代编译器的出现。在本节&#xff0c;我们…

【C语言】结构体(二)

一&#xff0c;结构体的初始化 和其它类型变量一样&#xff0c;对结构体变量可以在定义时指定初始值 #include <stdio.h> #include <stdlib.h> struct books // 结构体类型 {char title[50];char author[50]; //结构体成员char subject[100];int book_id; }…

【docker】docker网络六种网络模式

Docker 网络模式总结 网络模式描述使用场景bridge默认的网络模式&#xff0c;容器之间通过虚拟网桥通信&#xff0c;容器与宿主机隔离。单机部署、本地开发、小型项目host容器与宿主机共享网络堆栈&#xff0c;容器直接使用宿主机的 IP 地址。高性能网络应用、日志处理、大量与…

四、初识C语言(4)

一、作业&#xff1a;static修饰局部变量 #define _CRT_SECURE_NO_WARNINGS 1 #include <stdio.h> #include <string.h> //作业&#xff1a;static修饰局部变量 int sum (int a) {int c 0;static int b 3;c 1;b 2;return (abc); } int main() {int i 0;int a …

【python】类方法和静态方法

类方法 通过classmethod装饰器实现 class A(object):bar 1classmethoddef class_foo(cls):print Hello, , clsprint cls.bar>>> A.class_foo() # 直接通过类来调用方法 Hello, <class __main__.A> 1在上面&#xff0c;我们使用了 classmethod 装饰方法 clas…

Linux 中的 ls 命令:从使用到源码解析

ls 命令是 Linux 系统中最常用和最基本的命令之一。下面将深入探讨 ls 命令的使用方法、工作原理、源码解析以及实际应用场景。 1. ls 命令的使用** ls 命令用于列出目录内容&#xff0c;显示文件和目录的详细信息。 1.1 基本用法 ls [选项] [文件或目录]例如&#xff1a; …

SQL 中SET @variable的使用

在 SQL 中&#xff0c;SET variable 用于声明和赋值用户定义的变量。具体来说&#xff0c; 符号用于表示一个局部变量&#xff0c;可以在 SQL 语句中存储和使用。它通常在存储过程、函数或简单的 SQL 查询中使用。 1. 声明并赋值给变量 你可以使用 SET 语句给一个变量赋值。例…

The selected directory is not a valid home for Go SDK

在idea里配置go语言的环境时&#xff0c;选择go语言的安装目录&#xff0c;一直提示这个 The selected directory is not a valid home for Go SDK后来查了一下&#xff0c;发现原来idea识别不出来 需要改一下配置文件&#xff0c;找到go环境的安装目录&#xff0c;我是默认安…