Flink 容错

Flink的容错机制是确保数据流应用程序在出现故障时能够恢复一致状态的关键组成部分。其核心是通过创建分布式数据流和操作符状态的一致快照来实现,这种快照被称为检查点(Checkpoint)。

1. 检查点(Checkpoint)

  • 保存机制: Flink定期对整个Job任务进行快照,将快照产生的备份数据保存到指定的StateBackend中。这种保存是周期性的,可以根据配置的时间间隔触发。
  • 恢复机制: 当出现故障时,Flink会回退到最后一个成功的检查点,并重新启动所有的算子。这样可以确保即使在故障发生后,应用程序的状态也只会反映数据流中的每个记录一次,实现精确一次(exactly-once)的语义。
  • 控制节点: JobManager中的检查点协调器负责向source节点的数据插入barrier标记,以触发检查点的保存。

2. 检查点分界线(Barrier)

  • 作用:barrier标记表示这个标记之前的所有数据已经将状态更改存入当前检查点。后续的算子节点只要遇到它就开始对状态做持久化快照保存。
  • 策略
    • 精确一次:等待所有并行分区的barrier都到齐,才可以开始状态的保存。处理多次的结果是一样的。
    • 至少一次:对先到的数据进行处理,但可能导致从source重复发送已经处理过的数据。

3. 容错机制的配置

  • 启用检查点:通过StreamExecutionEnvironment.enableCheckpointing(long interval, CheckpointingMode mode)方法启用检查点,并设置时间间隔和模式(如EXACTLY_ONCE)。
  • 其他配置:还包括检查点超时、最大并发检查点数、检查点之间的最小暂停时间、检查点目录等。

4. 状态后端(State Backend)

  • 作用:决定状态在Checkpoint时如何持久化以及持久化在哪里。
  • 类型
    • HashMapStateBackend:将数据以Java对象的形式存储在堆中,适用于有较大状态、较长窗口和较大key/value状态的Job。
    • EmbeddedRocksDBStateBackend:将正在运行中的状态数据保存在RocksDB数据库中,使用异步方式生成快照。

Flink的容错机制通过检查点和状态后端确保了数据流应用程序在故障发生后的恢复能力。通过合理的配置和使用,可以确保应用程序在故障后能够恢复到一致的状态,并继续处理数据,从而实现精确一次的数据处理语义。

Checkpointing

Checkpointing 机制是 Flink 实现容错(Fault Tolerance)和状态一致性(State Consistency)的核心组件。Checkpointing 允许 Flink 在分布式数据流处理过程中捕获操作符(operators)的状态,以便在发生故障时能够恢复并继续处理数据,从而确保数据处理的“恰好一次”(Exactly-Once)语义。

条件

  1. 持久化的数据源:
  • Flink Checkpointing 机制需要与持久化的数据源进行交互,以确保在发生故障时能够从数据源中重新消费指定时间段的记录。
  • 持久化消息队列(如 Apache Kafka, RabbitMQ, Amazon Kinesis, Google PubSub 等)或文件系统(如 HDFS, S3, GFS, NFS, Ceph 等)可以满足这样的需求。
  1. 状态的持久化存储:
  • Flink Checkpointing 需要将操作符的状态进行持久化存储,以便在发生故障时能够恢复状态。
  • 状态通常保存在分布式文件系统中,如 HDFS、S3 等。
  1. Flink 集群配置:
  • Flink 集群必须正确配置并运行,以支持 Checkpointing 机制。
  • 特别是 JobManager 和 TaskManager 的角色需要明确,并确保它们之间的通信畅通无阻。
  1. Checkpointing 启用与配置:
  • Flink 默认不启用 Checkpointing,需要用户显式地在代码中调用 enableCheckpointing() 方法来启用它。
  • 在启用 Checkpointing 时,还需要配置一些关键参数,如:
    • 检查点的时间间隔(通常以毫秒为单位)。
    • 检查点的超时时间(超过该时间后,检查点将被视为失败)。
    • 状态后端(用于存储和恢复状态)。
  1. 状态后端的选择:
  • 根据应用的需求和集群的配置,选择合适的状态后端来存储和恢复状态。
  • 不同的状态后端具有不同的性能和一致性保证。
  1. 容错级别:
  • Flink 支持不同的容错级别,包括“恰好一次”(Exactly-Once)、“至少一次”(At-Least-Once)和“最多一次”(At-Most-Once)。
  • Checkpointing 机制主要用于实现“恰好一次”的容错级别。
  1. 网络和存储稳定性:
  • Flink Checkpointing 需要依赖稳定的网络和存储系统来确保检查点的正确生成和恢复。
  • 如果网络或存储系统不稳定,可能会导致检查点失败或数据丢失。

Flink Checkpointing 的前提条件包括持久化的数据源、状态的持久化存储、正确的 Flink 集群配置、Checkpointing 的启用与配置、合适的状态后端选择、适当的容错级别以及稳定的网络和存储系统。这些条件共同确保了 Flink Checkpointing 机制能够正常运行并提供可靠的数据处理容错能力。

开启&配置

  1. 开启 Checkpointing
    需要首先调用 enableCheckpointing 方法来开启 Checkpointing。这个方法有两个参数:
  • interval:检查点的时间间隔(以毫秒为单位)。例如,如果你想每 1000 毫秒(即 1 秒)生成一个检查点,你可以这样设置:env.enableCheckpointing(1000);
  • mode(可选):Checkpointing 模式。目前 Flink 支持 EXACTLY_ONCE 和 AT_LEAST_ONCE 两种模式。对于大多数应用来说,选择 EXACTLY_ONCE 模式即可满足需求。例如:env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(1000);
// 高级选项:
// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 允许两个连续的 checkpoint 错误
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);     
// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
env.getCheckpointConfig().setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 开启实验性的 unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();
  1. 配置 Checkpoint 存储位置
    Checkpoint 的数据需要存储在某个位置以便在发生故障时进行恢复。可以通过 CheckpointConfig 来设置存储位置。例如,如果把 Checkpoint 存储在 HDFS 上,可以这样设置:
CheckpointConfig checkpointConfig = env.getCheckpointConfig();  
checkpointConfig.setCheckpointStorage("hdfs:///ip:port/dir");
  1. 其他参数配置
    除了上述两个关键参数外,Flink 还提供了一些其他参数来优化 Checkpointing 的性能:
  • checkpointTimeout:检查点超时时间(以毫秒为单位)。如果在这个时间内没有完成 Checkpoint,那么该 Checkpoint 将被丢弃并尝试进行下一次 Checkpoint。默认值是 60000 毫秒(即 1 分钟)。例如:checkpointConfig.setCheckpointTimeout(60000);
  • minPauseBetweenCheckpoints:两次 Checkpoint 之间的最小暂停时间(以毫秒为单位)。这个参数可以确保 Flink 不会在很短的时间内连续进行多次 Checkpoint,从而避免对系统性能造成过大的影响。例如:checkpointConfig.setMinPauseBetweenCheckpoints(500);
  • maxConcurrentCheckpoints:最大并发 Checkpoint 数。这个参数用于限制同时进行的 Checkpoint 数量,以避免对系统性能造成过大的影响。例如:checkpointConfig.setMaxConcurrentCheckpoints(1);

State Backends

State Backends是负责管理和存储Flink应用程序状态的组件。Flink提供了多种不同的State Backends,每种都有其特定的用途和优缺点。

1. MemoryStateBackend

  • 存储位置:状态数据保存在Java堆内存中。
  • 适用场景:本地调试或小规模状态数据的场景。
  • 限制:每个独立的状态默认限制大小为5MB,但可以通过构造函数增加容量;状态的大小不能超过Akka的framesize大小。
  • 配置:如果没有明确配置State Backend,Flink将默认使用MemoryStateBackend。
    全局配置:
state.backend: hashmap
state.checkpoint-storage: jobmanager

代码配置:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());

2. FsStateBackend

  • 存储位置:状态数据保存在TaskManager的内存中,并通过Checkpoint机制将状态快照写入配置好的文件系统或目录中。
  • 适用场景:状态数据较大,需要持久化存储的场景。
  • 配置:通过配置文件系统路径(如HDFS、本地文件系统等)来设置FsStateBackend。
  • 特点:FsStateBackend通过配置一个fileStateThreshold阈值,当状态大小超过该阈值时,将状态存储在文件系统中,否则仍然保存在内存中。
    全局配置:
state.backend: hashmap
state.checkpoints.dir: file:///checkpoint-dir/
state.checkpoint-storage: filesystem

代码配置:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));

3. RocksDBStateBackend

  • 存储位置:使用RocksDB作为状态存储后端,数据保存在磁盘上。
  • 适用场景:状态数据非常大、需要高可用性和持久性保证的场景。
  • 特点:RocksDB支持增量快照,这对于具有大量变化缓慢状态的应用程序非常有用。状态快照会持久化到分布式文件系统(如HDFS)中。
    全局配置:
state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/
state.checkpoint-storage: filesystem

代码配置:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));

4. 配置和使用

  • 配置方式
    • 可以通过 StreamExecutionEnvironment.setStateBackend(…) 方法来配置 State Backend。
    • 也可以在 Flink 的配置文件(如 flink-conf.yaml)中设置默认的 State Backend。
  • 选择建议
    -对于开发调试或状态量较小的情况,可以使用 MemoryStateBackend。
    • 对于生产环境或状态量较大的情况,推荐使用 FsStateBackend 或 RocksDBStateBackend(或 EmbeddedRocksDBStateBackend)。
    • 如果对性能有特别高的要求,且状态量非常大,可以考虑使用 RocksDBStateBackend 或 EmbeddedRocksDBStateBackend。

开箱即用的 state backends

最新版本Flink 内置了以下这些开箱即用的 state backends :

  • HashMapStateBackend
  • EmbeddedRocksDBStateBackend
    如果不设置,默认使用 HashMapStateBackend。

HashMapStateBackend

在 HashMapStateBackend 内部,数据以 Java 对象的形式存储在堆中。 Key/value 形式的状态和窗口算子会持有一个 hash table,其中存储着状态值、触发器。
适用场景:

  • 有较大 state,较长 window 和较大 key/value 状态的 Job。
  • 所有的高可用场景。

EmbeddedRocksDBStateBackend

EmbeddedRocksDBStateBackend 将正在运行中的状态数据保存在 RocksDB 数据库中,RocksDB 数据库默认将数据存储在 TaskManager 的数据目录。 不同于 HashMapStateBackend 中的 java 对象,数据被以序列化字节数组的方式存储,这种方式由序列化器决定,因此 key 之间的比较是以字节序的形式进行而不是使用 Java 的 hashCode 或 equals() 方法。
EmbeddedRocksDBStateBackend 会使用异步的方式生成 snapshots。

EmbeddedRocksDBStateBackend 的局限:

  • 由于 RocksDB 的 JNI API 构建在 byte[] 数据结构之上, 所以每个 key 和 value 最大支持 2^31 字节。 RocksDB 合并操作的状态(例如:ListState)累积数据量大小可以超过 2^31 字节,但是会在下一次获取数据时失败。这是当前 RocksDB JNI 的限制。

EmbeddedRocksDBStateBackend 的适用场景:

  • 状态非常大、窗口非常长、key/value 状态非常大的 Job。
  • 所有高可用的场景。

设置 State Backend

如果没有明确指定,将使用 jobmanager 做为默认的 state backend。能在 flink-conf.yaml 中为所有 Job 设置其他默认的 State Backend。 每一个 Job 的 state backend 配置会覆盖默认的 state backend 配置:
设置每个 Job 的 State Backend
对每个 Job 的 State Backend 进行设置,如下所示:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());

若想在 IDE 中使用 EmbeddedRocksDBStateBackend

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb</artifactId><version>1.18.1</version><scope>provided</scope>
</dependency>

注意: 由于 RocksDB 是 Flink 默认分发包的一部分,所以如果你没在代码中使用 RocksDB,则不需要添加此依赖。而且可以在 flink-conf.yaml 文件中通过 state.backend.type 配置 State Backend,以及更多的 checkpointing 和 RocksDB 特定的 参数。

设置默认的(全局的) State Backend
在 flink-conf.yaml 可以通过键 state.backend.type 设置默认的 State Backend。

# 用于存储 operator state 快照的 State Backend
state.backend: hashmap
# 存储快照的目录
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/848170.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python知识点11---高阶函数

提前说一点&#xff1a;如果你是专注于Python开发&#xff0c;那么本系列知识点只是带你入个门再详细的开发点就要去看其他资料了&#xff0c;而如果你和作者一样只是操作其他技术的Python API那就足够了。 本篇介绍一下Python的内置函数也叫高阶函数&#xff0c;就是Python自…

【Linux多线程】LWP和pthread_t

文章目录 LWPclone系统调用查看线程LWP理解LWP与TID pthread_id LWP LWP是Linux中线程的具体实现形式&#xff0c;在linux中&#xff0c;进程和线程本质上都是相同的&#xff0c;都是通过task_struct结构体来表示的。LWP是内核级线程&#xff0c;TID是其唯一标识符&#xff0c…

什么是PaaS平台?

随着信息化发展&#xff0c;数字技术与经济社会各个领域的融合逐渐深入&#xff0c;行业需求不断升级&#xff0c;逐渐呈现多样化、复杂性的态势。传统软件开发模式&#xff0c;耗时耗力&#xff0c;已经难以应对企业新形势下的业务需求。面对挑战&#xff0c;PaaS平台以其天然…

spark MLlib 中的分类模型

理解这些机器学习模型的数学原理需要一定的数学基础&#xff0c;下面我将简要介绍每个模型的数学原理&#xff0c;并附上相关的数学公式。 1. LinearSVC&#xff08;线性支持向量机&#xff09; 数学原理&#xff1a; 线性支持向量机的目标是找到一个超平面&#xff0c;最大化…

APP ID 和 APP Key

什么是 APP ID 和 APP Key&#xff1f; APP ID 和 APP Key 是用于识别和授权应用程序的凭证。它们常用于各种API服务和第三方集成中&#xff0c;以确保只有经过认证的应用程序可以访问受保护的资源。 APP ID APP ID 是一个唯一的标识符&#xff0c;用于标识特定的应用程序。…

工厂车间运用生产管理看板系统的多重优势

在当今竞争激烈的制造业领域&#xff0c;工厂车间不断寻求创新和优化的方法来提高生产效率、质量和管理水平。生产管理看板系统的运用成为了许多工厂的明智选择&#xff0c;它带来了多重显著优势。 一、生产管理看板系统极大地提升了生产过程的可视化程度。 通过生产管理看板系…

nginx代理vue项目路由跳转刷新

常规代理 在我们日常开发中&#xff0c;前端部署到服务器&#xff0c;需要用到nginx部署&#xff0c;简单代理如下&#xff1a; #user nobody; worker_processes 1;#error_log logs/error.log; #error_log logs/error.log notice; #error_log logs/error.log info;#pid…

SSTI注入漏洞

SSTI注入漏洞 1.SSTI注入概述2.SSTI检测工具3.SSTI利用方法Java基本FreeMarker (Java)ThymeleafSpring Framework (Java)Spring视图操作&#xff08;Java&#xff09;Smarty (PHP)Twig (PHP)Jade (NodeJS)NUNJUCKS (NodeJS)ERB (Ruby)Jinja2 (Python)Mako (Python)ASP 1.SSTI注…

怎么把mov格式的视频转换mp4?四种方法教会你mov转MP4!

怎么把mov格式的视频转换mp4&#xff1f;在这个数字化时代&#xff0c;视频已经跻身为生活的核心元素&#xff0c;然而&#xff0c;制作和分享视频时选择合适的格式变得至关重要&#xff0c;在庞大的视频格式库中&#xff0c;我们熟知的包括mov和MP4&#xff0c;它们各有特色&a…

(第26天)【leetcode题解】226、翻转二叉树 589、N叉树的前序遍历 590、N叉树的后序遍历

目录 226、翻转二叉树题目描述思路代码 589、N叉树的前序遍历题目描述思路代码 590、N叉树的后序遍历题目描述思路代码 思考总结 226、翻转二叉树 题目描述 给你一棵二叉树的根节点 root &#xff0c;翻转这棵二叉树&#xff0c;并返回其根节点。 示例&#xff1a; 输入&…

金融科技引领跨境支付新潮流:智慧、速度与安全的完美融合

一、引言 在全球经济日益紧密相连的今天,跨境支付作为连接各国贸易和金融活动的桥梁,正迎来金融科技带来的深刻变革。金融科技以其独特的智慧化、高效化和安全化特性,正逐步渗透到跨境支付的各个环节,为跨境支付领域带来前所未有的创新和发展。本文将探讨金融科技如何引领跨…

mysql高级刷题-01-求中位数

题目&#xff1a; 解题代码 select sum(num) / count(num) as median from (select num,row_number() over (order by num desc,id desc ) as desc_math,row_number() over (order by num ,id ) as asc_mathfrom number) as t1 where asc_math in (desc_math, desc…

java最新JDK参数设置中文版

官网地址&#xff1a;https://www.oracle.com/java/technologies/javase/vmoptions-jsp.html ​ java最新JDK参数设置 行为选项Garbage First&#xff08;G1&#xff09;垃圾收集选项性能选项调试选项 行为选项 选项默认值描述-XX:-AllowUserSignalHandlers未设置如果应用程序…

WPF中Ignorable

在WPF中&#xff0c;“Ignorable”这个概念主要与XAML解析和标记扩展有关。当WPF的XAML解析器遇到一个它不认识的元素或属性时&#xff0c;它会抛出一个异常&#xff0c;这是因为默认情况下&#xff0c;WPF要求所有XAML都是完全可识别和可解析的。然而&#xff0c;在某些情况下…

前端实现输入内容计算密码强度

提示:记录工作中遇到的需求及解决办法 文章目录 前言一、思路二、计算密码强度分数密码强度动画展示效果完善动画效果完整代码前言 平时我们在浏览各种网站和 APP 的时候,都接触过密码这个东西~ 密码设置的好不好,关乎到你的账号安全性,越复杂的密码越安全,所以密码强度…

微信公众号文章背景颜色改成白色

微信公众号文章背景颜色黑色&#xff0c;看不清字。 按F12 , 找到 rich_media_area_primary &#xff0c;把 background 改成 white .rich_media_area_primary {background: white; }

2024年社会发展与管理创新科学国际学术会议(ICSDMIS 2024)

2024年社会发展与管理创新科学国际学术会议&#xff08;ICSDMIS 2024&#xff09; 2024 International Conference on Social Development and Management Innovation Science&#xff08;ICSDMIS 2024&#xff09; 会议简介&#xff1a; 2024年社会发展与管理创新科学国际学术…

C语言中 printf函数格式化输出

一. 简介 本文来简单学习一下&#xff0c;C语言中printf函数格式化输出时&#xff0c;因为我们的粗心没有 将数据类型与格式化参数对应&#xff0c;而导致的一些问题。 二. C语言中printf函数的格式化输出 在C语言中&#xff0c;printf函数是用于格式化输出的函数&#xff0…

Redis 异常三连环

本文针对一种特殊情况下的Reids连环异常&#xff0c;分别是下面三种异常&#xff1a; NullPointerException: Cannot read the array length because “arg” is nullJedisDataException: ERR Protocol error: invalid bulk lengthJedisConnectionException: Unexpected end o…

NAT端口映射,实现外网访问内网服务器

目录 前言一、搭建网络拓扑1.1 配置server和pc1.1.1 配置server01.1.2 配置server11.1.3 配置pc0 1.2 配置客户路由器1.2.1 配置路由器IP1.2.2 配置静态路由 1.3 配置ISP路由器 二、配置端口映射2.1 在客户路由器配置端口映射2.2 测试公网计算机访问私网服务器2.2.1 PC0向serve…