22、Flink 背压下的 Checkpoint处理

1.概述

通常,对齐 Checkpoint 的时长主要受 Checkpointing 过程中的同步和异步两个部分的影响;但当 Flink 作业正运行在严重的背压下时,Checkpoint 端到端延迟的主要影响因子将会是传递 Checkpoint Barrier 到 所有的算子/子任务的时间;可以通过高 alignment time and start delay metrics 观察到,解决方案如下:

  • 解决背压问题。优化 Flink 作业,调整 Flink 或 JVM 参数,通过扩容。
  • 减少 Flink 作业中缓冲在 In-flight 数据的数据量。
  • 启用非对齐 Checkpoints。

上述选项并不是互斥的,可以组合使用。

2.缓冲区 Debloating

Flink 1.14 引入了缓冲区 Debloating 机制,用于自动控制在 Flink 算子/子任务之间缓冲的 In-flight 数据的数据量;可以通过将属性taskmanager.network.memory.buffer-debloat.enabled设置为true来启用。

此特性对对齐和非对齐 Checkpoint 都生效,并且在这两种情况下都能缩短 Checkpointing 的时间,不过 Debloating 的效果对于 对齐 Checkpoint 更明显;当在非对齐 Checkpoint 情况下使用缓冲区 Debloating 时,好处是 Checkpoint 大小会更小,并且恢复时间更快 (需要保存 和恢复的 In-flight 数据更少)。

3.非对齐 Checkpoints
a)概述

从 Flink 1.11开始,Checkpoint 可以是非对齐的;Unaligned checkpoints 包含 In-flight 数据(例如,存储在缓冲区中的数据)作为 Checkpoint State的一部分,允许 Checkpoint Barrier 跨越这些缓冲区,使 Checkpoint 时长变得与当前吞吐量无关,因为 Checkpoint Barrier 实际上已经不再嵌入到数据流当中。

如果 Checkpointing 由于背压导致周期非常的长,应该使用非对齐 Checkpoint,这样 Checkpointing 时间基本上就与 端到端延迟无关;但是非对齐 Checkpointing 会增加状态存储的 I/O,因此当状态存储的 I/O 是 整个 Checkpointing 过程当中真正的瓶颈时,不应当使用非对齐 Checkpointing。

启用非对齐 Checkpoint

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 启用非对齐 Checkpoint
env.getCheckpointConfig().enableUnalignedCheckpoints();

或者在 flink-conf.yml 配置文件中增加配置

execution.checkpointing.unaligned: true
b)对齐 Checkpoint 超时

在启用非对齐 Checkpoint 后,依然可以通过编程的方式指定对齐 Checkpoint 的超时时间

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofSeconds(30));

或是在 flink-conf.yml 配置文件中配置:

execution.checkpointing.aligned-checkpoint-timeout: 30 s

在启动时,每个 Checkpoint 仍然是 aligned checkpoint,但是当全局 Checkpoint 持续时间超过 aligned-checkpoint-timeout 时, 如果 aligned checkpoint 还没完成,那么 Checkpoint 将会转换为 Unaligned Checkpoint。

c)限制

并发 Checkpoint

Flink 当前并不支持并发的非对齐 Checkpoint;Savepoint 也不能与非对齐 Checkpoint 同时发生,因此它们将会花费更长的时间。

与 Watermark 的相互影响

非对齐 Checkpoint 在恢复的过程中改变了关于 Watermark 的一个隐式保证;目前,Flink 确保 Watermark 作为恢复的第一步, 而不是将最近的 Watermark 存放在 Operator 中,以方便扩缩容。

在非对齐 Checkpoint 中,当恢复时,Flink 会在恢复 In-flight 数据后再生成 Watermark,如果 Pipeline 中使用了对每条记录都应用最新的 Watermark 的算子将会相对于使用对齐 Checkpoint 产生不同的结果;如果 Operator 依赖于最新的 Watermark 始终可用,解决办法是将 Watermark 存放在 OperatorState 中;此时 Watermark 应该使用单键 group 存放在 UnionState 以方便扩缩容。

与长时间运行的记录处理交互

尽管有未对齐的检查点,但 barrier 能够越过队列中的其它记录;如果当前记录需要大量时间进行处理,则此 barrier 的处理仍可能延迟,例如在窗口操作中同时触发多个定时器时。

当系统处理单个输入记录时被阻止,在等待多个网络缓冲区可用时,可能会出现 Flink 不能中断对单个输入记录的处理,未对齐的检查点必须等待当前处理的记录被完全处理;

原因要么是由于不适合单个网络缓冲区的大型记录的串行化,要么是在 flatMap 操作中,为一个输入记录生成许多输出记录;此时背压可以阻止未对齐的检查点,直到处理单个输入记录所需的所有网络缓冲区可用;当单个记录的处理需要一段时间时,也可能发生在其它情况下,导致检查点的时间可能高于预期。

某些数据分布模式不是检查点式的

有一部分包含属性的的连接无法与 Channel 中的数据一样保存在 Checkpoint 中;为了保留这些特性并且确保没有状态冲突或 非预期的行为,非对齐 Checkpoint 对于这些类型的连接是禁用的,所有其他的交换仍然执行非对齐 Checkpoint。

点对点连接

目前没有任何对于点对点连接中有关数据有序性的强保证;由于数据已经被前置的 Source 或是 KeyBy 相同的方式隐式组织,一些用户会依靠这种特性在提供的有序性保证的同时将计算敏感型的任务划分为更小的块。

只要并行度不变,非对齐 Checkpoint(UC) 将会保留这些特性,但是如果加上 UC 的伸缩容,这些特性将会被改变。

针对如下任务

image-20240509104253843

如果想将并行度从 p=2 扩容到 p=3,那么需要根据 KeyGroup 将 KeyBy 的 Channel 中的数据划分到3个 Channel 中去;通过使用 Operator 的 KeyGroup 范围和确定记录属于某个 Key(group) 的方法可以实现;但对于 Forward 的 Channel,没有 KeyContext,Forward Channel 里也没有任何记录被分配了任何 KeyGroup;也无法计算它,因为无法保证 Key 仍然存在。

广播 Connections

广播 Connection 无法保证所有 Channel 中的记录都以相同的速率被消费,可能导致某些 Task 已经应用了与特定广播事件对应的状态变更,而其他任务则没有。

image-20240509104545856

广播分区通常用于实现广播状态,它应该跨所有 Operator 都相同;Flink 实现广播状态,通过仅 Checkpointing 有状态算子的 SubTask 0 中状态的单份副本。

在恢复时,将该份副本发往所有的 Operator,可能导致某个算子将很快从它的 Checkpointed Channel 消费数据并将修改应用于记录来获得状态。

4.故障排除

in-flight 中的数据损坏。

注意:以下描述的操作是最后采取的手段,因为它们将会导致数据的丢失。

为了防止 In-flight 数据损坏,或者由于其他原因导致作业应该在没有 In-flight 数据的情况下恢复,可以使用 recover-without-channel-state.checkpoint-id 属性。

该属性需要指定一个 Checkpoint Id,对它来说 In-flight 中的数据将会被忽略;除非已经持久化的 In-flight 数据内部的损坏导致无法恢复的情况,否则不要设置该属性。

只有在重新部署作业后该属性才会生效,只有启用 externalized checkpoint 时,此操作才有意义。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/9342.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

线程与进程

进程 进程是程序的一次执行过程,系统程序的基本单位。有自己的main方法,并且主要由主方法运行起来的基本上就是进程。 线程 线程与进程相似,但线程是一个比进程更小的执行单位。一个进程在其执行的过程中可以产生多个线程。与进程不同的是…

局域网语音对讲系统_IP广播对讲系统停车场解决方案

局域网语音对讲系统_IP广播对讲系统停车场解决方案 需求分析: 随着国民经济和社会的发展, 选择坐车出行的民众越来越多。在保护交通安全的同时,也给停车场服务部门提出了更高的要求。人们对停车场系统提出了更高的要求与挑战, 需要…

部分设计模式概述

单例模式 工厂模式 适配器模式 模板方法模式 策略模式 责任链 观察者模式(又叫发布订阅模式)

容器化Jenkins远程发布java应用(方式一:pipline+ssh)

1.创建pipline工程 2.准备工程Jenkinsfile文件(java目录) 1.文件脚本内容 env.fileName "planetflix-app.jar" env.configName "planetflix_prod" env.remoteDirectory "/data/project/java" env.sourceFile "/…

Leetcode - 周赛396

目录 一,3136. 有效单词 二,3137. K 周期字符串需要的最少操作次数 三,3138. 同位字符串连接的最小长度 四,3139. 使数组中所有元素相等的最小开销 一,3136. 有效单词 本题就是一道阅读理解题: 字符串长…

NSSCTF | [SWPUCTF 2021 新生赛]easy_sql

打开题目,提示输入一些东西,很显眼的可以看到网站标题为“参数是wllm” 首先单引号判断闭合方式 ?wllm1 报错了,可以判断为单引号闭合。 然后判断字节数(注意‘--’后面的空格) ?wllm1 order by 3-- 接着输入4就…

vue多选功能

废话不多说&#xff0c;直接上代码&#xff01;&#xff01;&#xff01; <template><div class"duo-xuan-page"><liv-for"(item, index) in list":key"index"click"toggleSelection(item)":class"{ active: sel…

ue引擎游戏开发笔记(36)——为射击落点添加特效

1.需求分析&#xff1a; 在debug测试中能看到子弹落点后&#xff0c;需要给子弹添加击中特效&#xff0c;更真实也更具反馈感。 2.操作实现&#xff1a; 1.思路&#xff1a;很简单&#xff0c;类似开枪特效一样&#xff0c;只要在头文件声明特效变量&#xff0c;在fire函数中…

Leetcode—232. 用栈实现队列【简单】

2024每日刷题&#xff08;131&#xff09; Leetcode—232. 用栈实现队列 实现代码 class MyQueue { public:MyQueue() {}void push(int x) {st.push(x);}int pop() {if(show.empty()) {if(empty()) {return -1;} else {int ans show.top();show.pop();return ans;}} else {i…

Burp Suite 抓包,浏览器提示有软件正在阻止Firefox安全地连接到此网站

问题现象 有软件正在阻止Firefox安全地连接到此网站 解决办法 没有安装证书&#xff0c;在浏览器里面安装bp的证书就可以了 参考&#xff1a;教程合集 《H01-启动和激活Burp.docx》——第5步

金蝶BI应收分析报表:关于应收,这样分析

这是一张出自奥威-金蝶BI方案的BI应收分析报表&#xff0c;是一张综合运用了筛选、内存计算等智能分析功能以及数据可视化图表打造而成的BI数据可视化分析报表&#xff0c;可以让企业运用决策层快速知道应收账款有多少&#xff1f;账龄如何&#xff1f;周转情况如何&#xff1f…

ARTS Week 27

Algorithm 本周的算法题为 58. 最后一个单词的长度 给你一个字符串 s&#xff0c;由若干单词组成&#xff0c;单词前后用一些空格字符隔开。返回字符串中 最后一个 单词的长度。 单词 是指仅由字母组成、不包含任何空格字符的最大子字符串。 示例 1&#xff1a;输入&#xff1a…

JAVA基础面试题(第十一篇)上! JVM

Hello好久不见&#xff01;&#xff0c;最近我们讲更新JVM部分的面试题。 JVM 这块比较难理解&#xff0c;而且也是不擅长的点。所以今天我更新一下JVM希望小伙伴们能在面试中取得好成绩&#xff01; JVM 1. 什么是JVM内存结构&#xff1f; jvm将虚拟机分为5大区域&#xff0…

上市公司-库存周转率、供应链效率明细数据集(2000-2022年)

01、数据介绍 库存周转率是衡量企业库存管理效率的关键指标之一&#xff0c;它反映了企业库存的流转速度。而供应链效率则体现了企业在整个供应链管理中的表现&#xff0c;包括采购、生产、物流等环节的协同和优化。 提高库存周转率和供应链效率是上市公司优化企业运营和管理…

ICode国际青少年编程竞赛- Python-2级训练场-识别循环规律2

ICode国际青少年编程竞赛- Python-2级训练场-识别循环规律2 1、 for i in range(3):Dev.step(3)Dev.turnRight()Dev.step(4)Dev.turnLeft()2、 for i in range(3):Spaceship.step(3)Spaceship.turnRight()Spaceship.step(1)3、 Dev.turnLeft() Dev.step(Dev.x - Item[1].…

编译和链接(超详细)

✅博客主页:爆打维c-CSDN博客​​​​​​ &#x1f43e; &#x1f539;分享c语言知识及代码 一、编译和链接实例 假设我们有一个名为main.c的C语言源文件&#xff0c;它包含了一个简单的Hello World程序。我们可以使用gcc编译器对该源文件进行编译&#xff0c;生成一个可执行…

AI大模型探索之路-训练篇18:大语言模型预训练-微调技术之Prompt Tuning

系列篇章&#x1f4a5; AI大模型探索之路-训练篇1&#xff1a;大语言模型微调基础认知 AI大模型探索之路-训练篇2&#xff1a;大语言模型预训练基础认知 AI大模型探索之路-训练篇3&#xff1a;大语言模型全景解读 AI大模型探索之路-训练篇4&#xff1a;大语言模型训练数据集概…

亚马逊是如何铺设多个IP账号实现销量大卖的?

一、针对亚马逊平台机制&#xff0c;如何转变思路&#xff1f; 众所周知&#xff0c;一个亚马逊卖家只能够开一个账号&#xff0c;一家店铺&#xff0c;这是亚马逊平台明确规定的。平台如此严格限定&#xff0c;为的就是保护卖家&#xff0c;防止卖家重复铺货销售相同的产品&a…

【读论文】Gaussian Grouping: Segment and Edit Anything in 3D Scenes

Gaussian Grouping: Segment and Edit Anything in 3D Scenes 文章目录 Gaussian Grouping: Segment and Edit Anything in 3D Scenes1. What2. Why3. How3.1 Anything Mask Input and Consistency3.2 3D Gaussian Rendering and Grouping3.3 Downstream: Local Gaussian Editi…

10000 字详细讲解 Spring 中常用注解及其使用

如下图京东购物页面&#xff0c;当我们选择点击访问某一类商品时&#xff0c;就会向后端发起 HTTP 请求&#xff0c;当后端收到请求时&#xff0c;就会找到对应的代码逻辑对请求进行处理&#xff0c;那么&#xff0c;后端是如何来查找处理请求相对应的代码呢&#xff1f;答案就…