【Flink入门修炼】2-3 Flink Checkpoint 原理机制

如果让你来做一个有状态流式应用的故障恢复,你会如何来做呢?
单机和多机会遇到什么不同的问题?
Flink Checkpoint 是做什么用的?原理是什么?

一、什么是 Checkpoint?

Checkpoint 是对当前运行状态的完整记录。程序重启后能从 Checkpoint 中恢复出输入数据读取到哪了,各个算子原来的状态是什么,并继续运行程序。
即用于 Flink 的故障恢复。
这种机制保证了实时程序运行时,即使突然遇到异常也能够进行自我恢复。

二、如何实现 Checkpoint 功能?

如果让你来设计,对于流式应用如何做到故障恢复?
我们从最简单的单机单线程看起。

一)单机情况

同步执行,每次只处理一条数据

image.png

很简单,这种情况下,整个流程一次只处理一条数据。

  • 数据到 Write 阶段结束,各个算子记录一次各自状态信息(如读取的 offset、中间算子的状态)
  • 遇到故障需要恢复的时候,从上一次保存的状态开始执行
  • 当然为了降低记录带来的开销,可以攒一批之后再记录。
同时处理多条数据

每个计算节点还是只处理一条数据,但该节点空闲就可以处理下一条数据。
image.png

如果还按照一个数据 Write 阶段结束开始保存状态,就会出现问题:

  • 前面节点的状态,在处理下一个数据时被改过了
  • 从此时保存的记录恢复,前面的节点会出现重复处理的问题
  • 此时被称为 - 确保数据不丢(At Least Once)

一种解决方式:

  • 在输入数据中,定期插入一个 barrier
  • 各算子遇到 barrier 就开始做状态保留,并且不再接收新数据的计算。
  • 当前算子状态保留后,将 barrier 传递给下一个算子,并重复上面的步骤。
  • 当 barrier 传递到最后一个算子,并完成状态保留后,本次状态保留完成。

这样,各个节点保存的都是相同数据节点时的状态。
故障恢复时,能做到不重复处理数据,也就是精确一次(Exactly-once)。
image.png

但这里,你可能会发现一个问题:

  • 数据已经写出了怎么办?在两个保存点之间,已经把结果写到外部了,重启后不是又把部分数据再写了一次?

这里实际是**「程序内部精确一次」「端到端精确一次」**。
那么如何做到「端到端精确一次」?

  • 方案一:最后一个 sink 算子不直接向外部写出,等到 barrier 来了,才把这一批数据批量写出去
  • 方案二:两阶段提交。需要 sink 端支持(如 kafka)。
    • 方式类似于 MySQL 的事务。
    • sink 端正常向外部写出,不过输出端处于 pre-commit 状态,这些数据还不可读取
    • 当 sink 端等到 barrier 时,将输出端数据变为 committed,下游输出端的数据才正式可读

不过以上方法为了做到端到端精确一次,会带来数据延迟问题。(因为要等 Checkpoint 做完,数据才实际可读)。

解决数据延迟有一种方案:

  • 方案:幂等写入。同样一条数据,无论写入多少次对输出端看来都是一样的。(比如按照主键重复写这一条数据,并且数据本身没变化)

二)重要概念介绍

一致性级别

前面的例子中,我们提到了部分一致性级别,这里我们总结下。在流处理中,一致性可以分为 3 个级别:

  • at-most-once(最多一次): 这其实是没有正确性保障的委婉说法——故障发生之后,计数结果可能丢失。
  • at-least-once (至少一次): 这表示计数结果可能大于正确值,但绝不会小于正确值。也就是说,计数程序在发生故障后可能多算,但是绝不会少算。
  • exactly-once (精确一次): 这指的是系统保证在发生故障后得到的计数结果与正确值一致。恰好处理一次是最严格的保证,也是最难实现的。

按区间分:

  • 程序(Flink)内部精确一次
  • 端到端精确一次
Checkpoint 中保留的是什么信息?

🤔 如果是你来设计,checkpoint 都需要保留哪些信息,才能让程序恢复执行?
【这里说的就是 state
考虑一个开发需求:单词计数。
从 kafka 中读数据,处理逻辑是将输入数据拆分成单词,有一个 map 记录各个单词的数量,最后输出。

  • 从输入流中,拆分单词
  • 将统计的结果放到内存中一个 Map 集合,单词做为 key,对应的数量做为 value

想要恢复的时候还能接着上次的状态来,要么就需要几个信息:

  • 处理到哪条数据了
  • 中间状态是啥
  • 数据写出到哪条了

以及,上述信息应是针对同一条数据的。否则状态就乱了。
那么可以得到,保留的信息是:

source中间算子sink
已输入的数据(offset)[<hello, 5>, <world, 10>, …]写出到第几条了

三)多机多进程

随着业务的发展,单机已经不能满足需求了,开始并行分布式的处理。
读取、处理、写出,也不再是一个进程从头到尾干完,会拆分到多个机器上执行。也不再等待一条数据处理完,才处理下一条。
image.png

多机多线程,问题就开始变得复杂起来:

  • 如何确保状态拥有精确一次的容错保证?
  • 如何在分布式场景下,替多个拥有本地状态的算子产生一个全域一致的快照?
  • 对于流合并,合并节点会受到多个 barrier 如何处理?
  • 如何在不中断运算的前提下产生快照?

🤔 先思考下,如果还用单线程中 barrier 的方式来处理。会遇到什么问题,该如何解决?

处理流程

我们还是在数据流中插入 barrier。

  • 到达第一个 source 节点和之前的没区别,source 节点开始保存状态(offset)

image.png

  • 接下来,source 将 barrier 拆分为两个,分别发往下游的算子

image.png

  • 下游算子收到 barrier,开始记录状态

image.png

  • 关键是最后的 operator#2,它会收到多个 barrier
    • barrier 的初始目的是,收到 barrier 表示前面的数据都处理完了,要开始保存状态了
    • 两个绿色的节点(operator#1)分别发送 barrier,代表两个 barrier 之前处理过的数据,实际都是第一个蓝色节点(source)barrier 之前的数据。
    • 那么最后的橙色节点(operator#2),理应收到所有由绿色节点(operator#1)发送的 barrier,才代表数据已经收全了,可以开始保存状态。【叫做 barrier 对齐】

image.png

对于多分支合并的情况,在等待所有 barrier 到齐的过程中:

  • 先收到 barrier 的分支,还会有数据不断流入
  • 为了能做到精确一次(Exactly-once),就不能处理这些数据,需要先缓存起来,否则这个节点的状态就不对了
  • 上面一条反过来说,如果不等,直接处理,那么就是**至少一次(At Least Once)**的效果。(想想在故障恢复的时候,是不是就会重复计算了)

如何在不中断运算的前提下产生快照?
前面做快照,我们假设的是节点收到 barrier 后,就不再接收新数据,把当前节点状态保存后,再接收新数据,然后把 barrier 再向后传递。
那,是否必须这样串行来呢?

  • 卡住新数据,保存当前状态,这里必须串行,不串行状态就乱了
  • 但是,向后发送 barrier 可以同时做,不影响当前节点的保存

那,后面节点保存完了,前面节点还没保存完怎么办?

  • 没关系,一次 checkpoint 成功,需要等待所有节点都成功才行,保存的先后顺序无所谓

三、Flink Checkpoint 配置

程序中如何开启 checkpoint
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 开启 checkpoint,并设置间隔 ms
env.enableCheckpointing(1000);
// 模式 Exactly-Once、At-Least-Once
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 两个 checkpoint 之间最小间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 超时时间
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同时执行的 checkpoint 数量(比如上一个还没执行完,下一个已经触发开始了)
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 当用户取消了作业后,是否保留远程存储上的Checkpoint数据
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
checkpoint 存储

Flink 开箱即用地提供了两种 Checkpoint 存储类型:

  • JobManagerCheckpointStorage
    • 将 Checkpoint 快照存储在 JobManager 的堆内存中
  • FileSystemCheckpointStorage
    • 放到 HDFS 或本地磁盘中

四、小结

本节介绍了 Flink Checkpoint 故障恢复机制。从单机单线程,到多机多线程一步步分析如何实现状态保存和故障恢复。
同时对一致性级别进行了探讨,对程序内部和端到端一致性的实现方式给出了可行的方案。
后续会对 Checkpoint 程序内部实现原理进行剖析。


参考文章:
Flink Checkpoint 深入理解-CSDN博客
漫谈 Flink - Why Checkpoint - Ying
Flink之Checkpoint机制-阿里云开发者社区 (图不错)
Flink 状态一致性、端到端的精确一次(ecactly-once)保证 - 掘金
硬核!八张图搞懂 Flink 端到端精准一次处理语义 Exactly-once(深入原理,建议收藏)-腾讯云开发者社区-腾讯云

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/3458.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

elementui el-date-picker禁止选择今年、今天、之前、时间范围限制18个月

1、禁止选择今年之前的所有年份 <el-date-pickerv-if"tabsActive 0":clearable"false"v-model"yearValue"change"yearTimeChange"type"year"placeholder"选择年"value-format"yyyy":picker-options…

03 OLED显示屏实现

文章目录 前言一、软件模拟IIC协议1.开启IIC协议2.结束IIC协议3.传输数据 二、OLED的操作1.传输数据的准备2.写入命令3.写入数据4.初始化函数5.设置光标6.显示字符7.显示字符串8.清屏9.显示汉字10.显示图片11.显示动图 三、完整代码总结 前言 这一章主要是上一节没有讲完的项目…

前端项目中使用插件prettier/jscodeshift/json-stringify-pretty-compact格式化代码或json数据

同学们可以私信我加入学习群&#xff01; 正文开始 前言一、json代码格式化-选型二、json-stringify-pretty-compact简单试用三、prettier在前端使用四、查看prettier支持的语言和插件五、使用prettier格式化vue代码最终效果如图&#xff1a; ![在这里插入图片描述](https://im…

LLM应用实战:当KBQA集成LLM(二)

1. 背景 又两周过去了&#xff0c;本qiang~依然奋斗在上周提到的项目KBQA集成LLM&#xff0c;感兴趣的可通过传送门查阅先前的文章《LLM应用实战&#xff1a;当KBQA集成LLM》。 本次又有什么更新呢&#xff1f;主要是针对上次提到的缺点进行优化改进。主要包含如下方面&#…

多客圈子交友系统 uniapp+thinkphp6适配小程序/H5/app/api全开源,多款插件自选,支持个性定制!

网上交友的优点包括&#xff1a; 1. 方便&#xff1a;网上交友可以随时随地进行&#xff0c;不受时间和空间的限制&#xff0c;方便且高效。 2. 匿名性&#xff1a;网上交友可以实现匿名性&#xff0c;用户可以匿名地搜索、聊天或交换信息&#xff0c;保护个人隐私和安全。 3.…

COOIS 生产订单显示系统增强

需求说明&#xff1a;订单系统显示页面新增批量打印功能 增强点&#xff1a;CL_COIS_DISP_LIST_NAVIGATION -->TOOLBAR方法中新增隐式增强添加自定义打印按钮 增强点&#xff1a;BADI-->WORKORDER_INFOSYSTEM新增增强实施 实现位置&#xff1a;IF_EX_WORKORDER_INFOSYS…

制造型企业 如何实现便捷的机台文件统一管理?

机台文件统一管理&#xff0c;这是生产制造型企业都需要去做的&#xff0c;机台文件需要统一管理的原因主要包括以下几点&#xff1a; 1、提高效率&#xff1a;统一管理可以简化文件的访问和使用过程&#xff0c;提高工作效率&#xff0c;尤其是在需要频繁访问或更新机台文件的…

MySQL中什么情况下会出现索引失效?如何排查索引失效?

目录 1-引言&#xff1a;什么是MySQL的索引失效&#xff1f;(What、Why)1-1 索引失效定义1-2 为什么排查索引失效 2- 索引失效的原因及排查&#xff08;How&#xff09;2-1 索引失效的情况① 索引列参与计算② 对索引列进行函数操作③ 查询中使用了 OR 两边有范围查询 > 或 …

USB设备的音频类UAC

一、UAC简介 UAC&#xff08;USB Audio Class&#xff09;是USB设备的音频类&#xff0c;它定义了USB音频设备与主机计算机通信的方式。UAC标准是USB规范的一部分&#xff0c;并受到各种操作系统&#xff08;包括Windows、macOS和Linux&#xff09;的支持。 UAC是基于libusb,实…

图像在神经网络中的预处理与后处理的原理和作用(最详细版本)

1. 问题引出及内容介绍 相信大家在学习与图像任务相关的神经网络时&#xff0c;经常会见到这样一个预处理方式。 self.to_tensor_norm transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]) 具体原理及作用稍后解释&…

java8 Stream流常用方法(持续更新中...)

java8 Stream流常用方法 1.过滤数据中年龄大于等于十八的学生2.获取对象中其中的一个字段并添加到集合(以学生姓名&#xff08;name&#xff09;为例)3.获取对象中其中的一个字段并转为其他数据类型最后添加到集合(以学生性别&#xff08;sex&#xff09;为例&#xff0c;将Str…

Apache Doris 2.x 版本【保姆级】安装+使用教程

Doris简介 Apache Doris 是一个基于 MPP 架构的高性能、实时的分析型数据库&#xff0c;以极速易用的特点被人们所熟知&#xff0c;仅需亚秒级响应时间即可返回海量数据下的查询结果&#xff0c;不仅可以支持高并发的点查询场景&#xff0c;也能支持高吞吐的复杂分析场景。基于…

【论文速读】|大语言模型(LLM)智能体可以自主利用1-day漏洞

本次分享论文&#xff1a; LLM Agents can Autonomously Exploit One-day Vulnerabilities 基本信息 原文作者&#xff1a;Richard Fang, Rohan Bindu, Akul Gupta, Daniel Kang 作者单位&#xff1a;无详细信息提供 关键词&#xff1a;大语言模型, 网络安全, 1-day漏洞, …

Redisson分布式锁 --- 源码分析

1.获取一把锁 RLock lock redissonClient.getLock("订单lock"); 2.业务代码加锁 lock.lock(); 2.1 lock.tryAcquire Long ttl tryAcquire(leaseTime, unit, threadId); 2.2 lua脚本: tryLockInnerAsync方法 如果获取锁失败&#xff0c;返回的结果是这个key的剩…

MMSeg搭建模型的坑

Input type(torch.suda.FloatTensor) and weight type (torch.FloatTensor) should be same 自己搭建模型的时候&#xff0c;经常会遇到二者不匹配&#xff0c;以这种情况为例&#xff0c;是因为部分模型没有加载到CUDA上面造成的。 注意搭建模型的时候&#xff0c;所有层都应…

【氮化镓】液态Ga在GaN(0001)和(0001̅)表面上的三维有序排列随温度的变化

文章标题是《Temperature dependence of liquid-gallium ordering on the surface of epitaxially grown GaN》&#xff0c;作者是Takuo Sasaki等人&#xff0c;发表在《Applied Physics Express》上。文章主要研究了在分子束外延(MBE)条件下&#xff0c;液态镓(Ga)在GaN(0001)…

WCH RISC CH32V303RCT6 单片机的SDI Printf 虚拟串口功能 类似RTT打印功能 简单分析

参考&#xff1a; 有关于 SDI printf 更多的信息和资料吗&#xff1f; 关于 CH32 系列 MCU SDI 虚拟串口功能的使用 【CH32X035 评估板测评】 教你使用 SDI 接口重定向 printf 0.前言 有段时间没有看CH32V单片机的开发了&#xff0c;今天帮新来的同事调试时候看到debug.c里面有…

java-spring 06 图灵 getBean方法和 doGetBean方法

01.一般的流程是&#xff0c;这里是从上一章的preInstantiateSingleton方法顺序过来的。 getBean() -> doGetBean() -> createBean() -> doCreateBean() -> createBeanInstance() -> populateBean() -> initializeBean() 02.getBean方法&#xff0c;一般就…

项目十:学会python爬虫数据保存(小白圆满级)

前言 上篇我们学会的文本文件、csv文件和excel文件的相关基础知识和操作&#xff0c;这一次我们再来了解一下四个文件操作方式 存储方法 HTML文件 将数据保存为HTML格式&#xff0c;可以直接在浏览器中查看。 使用字符串拼接将数据保存为HTML格式。 代码案例 # 创建数据…

Cookie、Session以及Token的区别

Cookei、Session以及Token总的来说都是为了实现客户端访问服务器数据而利用的一种手段&#xff0c;可以把服务器数据看成是密码箱&#xff0c;而它们是三种不同的钥匙。 一、定义 1.Cookie 客户端第一次访问服务器时&#xff0c;服务器返回cookie给客户端A&#xff0c;客户端…