Flink实战(11)-Exactly-Once语义之两阶段提交

0 大纲

[Apache Flink]2017年12月发布的1.4.0版本开始,为流计算引入里程碑特性:TwoPhaseCommitSinkFunction。它提取了两阶段提交协议的通用逻辑,使得通过Flink来构建端到端的Exactly-Once程序成为可能。同时支持:

  • 数据源(source)
  • 和输出端(sink)

包括Apache Kafka 0.11及更高版本。它提供抽象层,用户只需实现少数方法就能实现端到端Exactly-Once语义。

新功能及Flink实现逻辑:

  • 描述Flink checkpoint机制如何保证Flink程序结果的Exactly-Once的
  • 显示Flink如何通过两阶段提交协议与数据源和数据输出端交互,以提供端到端的Exactly-Once保证
  • 通过一个简单的示例,了解如何使用TwoPhaseCommitSinkFunction实现Exactly-Once的文件输出

1 Flink应用中的Exactly-Once语义

Exactly-Once,指每个输入的事件只影响最终结果一次。即使机器或软件故障,既没有重复数据,也不会丢数据。

Flink很久就提供Exactly-Once,checkpoint机制是Flink有能力提供Exactly-Once语义的核心。

一次checkpoint是以下内容的一致性快照:

  • 应用程序的当前状态
  • 输入流的位置

Flink可配置一个固定时间点,定期产生checkpoint,将checkpoint的数据写入持久存储系统,如S3或HDFS。将checkpoint数据写入持久存储是异步,即Flink应用程序在checkpoint过程中可以继续处理数据。

如果发生机器或软件故障,重新启动后,Flink应用程序将从最新的checkpoint点恢复处理; Flink会恢复应用程序状态,将输入流回滚到上次checkpoint保存的位置,然后重新开始运行。这意味着Flink可以像从未发生过故障一样计算结果。

Flink 1.4.0前,Exactly-Once语义仅限Flink应用程序内部,没有扩展到Flink数据处理完后发送的大多数外部系统。Flink应用程序与各种数据输出端进行交互,开发人员自己维护组件上下文保证Exactly-Once语义。

为提供端到端的Exactly-Once语义 – 即除了Flink应用程序内部,Flink写入的外部系统也需要能满足Exactly-Once语义 – 这些外部系统必须提供提交或回滚的方法,然后通过Flink的checkpoint机制协调。

分布式系统中,协调提交和回滚的常用方法是2pc协议。讨论Flink的TwoPhaseCommitSinkFunction如何利用2pc提供端到端的Exactly-Once语义。

2 Flink应用程序端到端的Exactly-Once语义

Kafka经常与Flink使用。Kafka 0.11版本添加事务支持。这意味着现在通过Flink读写Kafaka,并提供端到端的Exactly-Once语义有了必要支持。

Flink对端到端的Exactly-Once语义的支持不仅局限Kafka,可将它与任何一个提供必要的协调机制的源/输出端一起使用。如Pravega,来自DELL/EMC的开源流媒体存储系统,通过Flink的TwoPhaseCommitSinkFunction也能支持端到端的Exactly-Once语义。

image-20231124142310942

示例程序有:

  • 从Kafka读取的数据源(Flink内置的KafkaConsumer)
  • 窗口聚合
  • 将数据写回Kafka的数据输出端(Flink内置的KafkaProducer)

要使数据输出端提供Exactly-Once保证,须将所有数据通过一个事务提交给Kafka。提交捆绑了两个checkpoint之间的所有要写数据。这确保在故障时,能回滚写入的数据。但分布式系统中,通常有多个并发运行的写入任务,所有组件须在提交或回滚时“一致”才能确保一致结果。Flink使用2PC及预提交阶段解决这问题。

pre-commit

checkpoint开始时,即2PC的“预提交”阶段。当checkpoint开始时,Flink的JobManager会将checkpoint barrier(将数据流中的记录分为进入当前checkpoint与进入下一个checkpoint)注入数据流。

brarrier在operator之间传递。对每个operator,它触发operator的状态快照写入state backend。

数据源保存了消费Kafka的偏移量(offset),之后将checkpoint barrier传递给下一operator。

这种方式仅适用于operator具有『内部』状态。

内部状态

指Flink state backend保存和管理的。如第二个operator中window聚合算出来的sum值。当一个进程有它的内部状态时,除了在checkpoint前需将数据变更写入state backend,无需在pre-commit阶段执行其他操作。

Flink负责在checkpoint成功时正确提交这些写入或故障时中止这些写入。

image-20231124150402626

3 Flink应用启动pre-commit阶段

当进程具有『外部』状态,需额外处理。外部状态通常以写入外部系统(如Kafka)的形式出现。此时,为提供Exactly-Once保证,外部系统须【支持事务】,才能和两阶段提交协议集成。

示例数据需写入Kafka,因此数据输出端(Data Sink)有外部状态。此时,在预提交阶段:

  • 除了将其状态写入state backend
  • 数据输出端还必须预先提交其外部事务

当checkpoint barrier在所有operator都传递了一遍,并且触发的checkpoint回调成功完成时,预提交阶段结束。所有触发的状态快照都被视为该checkpoint的一部分。checkpoint是整个应用程序状态的快照,包括预先提交的外部状态。若故障,可回滚到上次成功完成快照的时间点。

下一步是通知所有operator,checkpoint已经成功了。这是2PC的提交阶段,JobManager为应用程序中的每个operator发出checkpoint已完成的回调。

数据源和 widnow operator没有外部状态,因此在提交阶段,这些operator不必执行任何操作。但是,数据输出端(Data Sink)拥有外部状态,此时应该提交外部事务。

总结

  • 一旦所有operator完成预提交,就提交一个commit。
  • 如果至少有一个预提交失败,则所有其他提交都将中止,我们将回滚到上一个成功完成的checkpoint。
  • 在预提交成功之后,提交的commit需要保证最终成功 – operator和外部系统都需要保障这点。如果commit失败(例如,由于间歇性网络问题),整个Flink应用程序将失败,应用程序将根据用户的重启策略重新启动,还会尝试再提交。这个过程至关重要,因为如果commit最终没有成功,将会导致数据丢失。

因此,我们可以确定所有operator都同意checkpoint的最终结果:所有operator都同意数据已提交,或提交被中止并回滚。

4 在Flink中实现两阶段提交Operator

完整的实现两阶段提交协议可能有点复杂,这就是为什么Flink将它的通用逻辑提取到抽象类TwoPhaseCommitSinkFunction中的原因。

接下来基于输出到文件的简单示例,说明如何使用TwoPhaseCommitSinkFunction。用户只需要实现四个函数,就能为数据输出端实现Exactly-Once语义:

  • beginTransaction – 在事务开始前,我们在目标文件系统的临时目录中创建一个临时文件。随后,我们可以在处理数据时将数据写入此文件。
  • preCommit – 在预提交阶段,我们刷新文件到存储,关闭文件,不再重新写入。我们还将为属于下一个checkpoint的任何后续文件写入启动一个新的事务。
  • commit – 在提交阶段,我们将预提交阶段的文件原子地移动到真正的目标目录。需要注意的是,这会增加输出数据可见性的延迟。
  • abort – 在中止阶段,我们删除临时文件。

我们知道,如果发生任何故障,Flink会将应用程序的状态恢复到最新的一次checkpoint点。一种极端的情况是,预提交成功了,但在这次commit的通知到达operator之前发生了故障。在这种情况下,Flink会将operator的状态恢复到已经预提交,但尚未真正提交的状态。

我们需要在预提交阶段保存足够多的信息到checkpoint状态中,以便在重启后能正确的中止或提交事务。在这个例子中,这些信息是临时文件和目标目录的路径。

TwoPhaseCommitSinkFunction已经把这种情况考虑在内了,并且在从checkpoint点恢复状态时,会优先发出一个commit。我们需要以幂等方式实现提交,一般来说,这并不难。在这个示例中,我们可以识别出这样的情况:临时文件不在临时目录中,但已经移动到目标目录了。

在TwoPhaseCommitSinkFunction中,还有一些其他边界情况也会考虑在内,请参考Flink文档了解更多信息。

FAQ

flink sink在如果过来一个checkpoint barrier,会去存储state,这个动作会和普通的write并行吗?还是串行?

在Flink的checkpoint机制中,当一个Checkpoint Barrier过来时,sink会触发对状态的snapshot,这个snapshot动作默认是和普通的write操作并行进行的。

具体来说:

  • Flink的checkpoint机制是通过在datastream中注入Checkpoint Barrier来实现的。

  • 当source接收到Checkpoint Barrier时,会将其传递给下游的transformation和sink。

  • 当sink接收到Checkpoint Barrier时,会启动一个新的线程来执行state snapshot(状态保存)。

  • 这个状态snapshot线程会从状态后端Snapshot State,并存储检查点。

  • 而sink的主线程在接收到Checkpoint Barrier时,会继续处理正常的write。

  • 这样,状态snapshot和正常的write操作就是并行进行的。

但是也可以通过Sink的配置来设置snapshot和write的执行策略,主要有两种模式:

  1. 并行模式(默认):snapshot和write同时进行

  2. 串行模式:snapshot完成后再进行write

综上,Flink sink在默认的并行checkpoint模式下,状态snapshot和普通的write操作是并行执行的。可以通过配置来改变其行为。这样可以根据实际需要进行平衡。

总结

  • Flink的checkpoint机制是支持两阶段提交协议并提供端到端的Exactly-Once语义的基础。
  • 这个方案的优点是: Flink不像其他一些系统那样,通过网络传输存储数据 – 不需要像大多数批处理程序那样将计算的每个阶段写入磁盘。
  • Flink的TwoPhaseCommitSinkFunction提取了两阶段提交协议的通用逻辑,基于此将Flink和支持事务的外部系统结合,构建端到端的Exactly-Once成为可能。
  • 从Flink 1.4.0开始,Pravega和Kafka 0.11 producer都提供了Exactly-Once语义;Kafka在0.11版本首次引入了事务,为在Flink程序中使用Kafka producer提供Exactly-Once语义提供了可能性。
  • Kafaka 0.11 producer的事务是在TwoPhaseCommitSinkFunction基础上实现的,和at-least-once producer相比只增加了非常低的开销。

    本文由博客一文多发平台 OpenWrite 发布!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/168283.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Redis】前言--介绍redis的全局系统观

一.前言 学习是要形成自己的网状知识以及知识架构图,要不最终都还是碎片化的知识,不能达到提升的目的,只有掌握了全貌的知识才是全解,要不只是一知半解。这章会介绍redis的系统架构图,帮助认识redis的设计是什么样的&a…

解决几乎任何机器学习问题 -- 学习笔记(组织机器学习项目)

书籍名:Approaching (Almost) Any Machine Learning Problem-解决几乎任何机器学习问题 此专栏记录学习过程,内容包含对这本书的翻译和理解过程 我们首先来看看文件的结构。对于你正在做的任何项目,都要创建一个新文件夹。在本例中,我 将项目命名为 “p…

笔记:内网渗透流程之信息收集

信息收集 首先,收集目标内网的信息,包括子网结构、域名信息、IP地址范围、开放的端口和服务等。这包括通过主动扫描和渗透测试工具收集信息,以及利用公开的信息源进行信息搜集。 本机信息收集 查看系统配置信息 查看系统详细信息&#xf…

电子桌牌如何赋能数字化会务?以深圳程序员节为例。

10月24日,由深圳市人民政府指导,深圳市工业和信息化局、龙华区人民政府、国家工业信息安全发展研究中心、中国软件行业协会联合主办的2023深圳中国1024程序员节开幕式暨主论坛活动在深圳龙华区启幕。以“领航鹏城发展,码动程序世界”为主题&a…

模拟退火算法应用——求解函数的最小值

仅作自己学习使用 一、问题 需求: 计算函数 的极小值,其中个体x的维数n10,即x(x1,x2,…,x10),其中每一个分量xi均需在[-20,20]内。因此可以知道,这个函数只有一个极小值点x (0,0,…,0),且其极小值是0&…

医保线上购药系统:引领医疗新潮流

在科技的驱动下,医疗健康服务正经历一场数字化的革新。医保线上购药系统,不仅是一种医疗服务的新选择,更是技术代码为我们的健康管理带来的全新可能。本文将通过一些简单的技术代码示例,深入解析医保线上购药系统的工作原理和优势…

MySQL数据库主从集群搭建

快捷查看指令 ctrlf 进行搜索会直接定位到需要的知识点和命令讲解(如有不正确的地方欢迎各位小伙伴在评论区提意见,博主会及时修改) MySQL数据库主从集群搭建 主从复制,是用来建立一个和主数据库完全一样的数据库环境&#xff0c…

短视频获客系统成功分享,与其开发流程与涉及到的技术

先来看实操成果,↑↑需要的同学可看我名字↖↖↖↖↖,或评论888无偿分享 一、短视频获客系统的开发流程 1. 需求分析:首先需要对目标用户进行深入了解,明确系统的功能和目标,制定详细的需求文档。 2. 系统设计&#…

关于vs code Debug调试时候出现“找不到任务C/C++: g++.exe build active file” 解决方法

vs code Debug调试时候出现“找不到任务C/C: g.exe build active file” ,出现报错,Debug失败 后来经过摸索和上网查找资料解决问题 方法如下 在Vs code的操作页面左侧有几个配置文件 红框里的是需要将要修改的文件 查看tasks.json和launch.json框选&…

Android Frameworks 开发总结之七

1.修改android 系统/system/下面文件时权限不够问题 下面提到的方式目前在Bobcat的userdebug image上测试可行,还没有在user上测试过. 修改前: leifleif:~$ adb root restarting adbd as root leifleif:~$ adb disable-verity verity is already disabled using …

Find My鼠标|苹果Find My技术与鼠标结合,智能防丢,全球定位

随着折叠屏、多屏幕、OLED 等新兴技术在个人计算机上的应用,产品更新换代大大加速,进一步推动了个人计算机需求的增长。根据 IDC 统计,2021 年全球 PC 市场出货量达到 3.49 亿台,同比增长 14.80%,随着个人计算机市场发…

亚马逊云科技re:Invent大会:云计算与生成式AI共筑科技新局面,携手构建未来

随着科技的飞速发展,云计算和生成式 AI 已经成为了推动科技进步的重要力量。这两者相互结合,正在为我们创造一个全新的科技局面。 亚马逊云科技的re:Invent大会再次证明了云计算和生成式AI的强大结合正在塑造科技的新未来。这次大会聚焦了云计算的前沿技…

为什么要隐藏id地址?使用IP代理技术可以实现吗?

随着网络技术的不断发展,越来越多的人开始意识到保护个人隐私的重要性。其中,隐藏自己的IP地址已经成为了一种常见的保护措施。那么,为什么要隐藏IP地址?使用IP代理技术可以实现吗?下面就一起来探讨这些问题。 首先&am…

Qt 软件调试(二)使用dump捕获崩溃信息

Qt应用程序异常崩溃该怎么办&#xff0c;生成dump文件再回溯分析&#xff0c;可以快速且准确的帮助我们定位到崩溃的点。那么&#xff0c;本章我们分享下如何在Qt中生成dump文件。 一、使用minudump捕获崩溃信息 #include <QCoreApplication> #include <QDir> #i…

【洛谷 P1636】Einstein学画画 题解(图论+欧拉通路)

Einstein学画画 题目描述 Einstein 学起了画画。 此人比较懒~~&#xff0c;他希望用最少的笔画画出一张画…… 给定一个无向图&#xff0c;包含 n n n 个顶点&#xff08;编号 1 ∼ n 1 \sim n 1∼n&#xff09;&#xff0c; m m m 条边&#xff0c;求最少用多少笔可以画…

nodejs微信小程序+python+PHP-书吧租阅管理系统的设计与实现-安卓-计算机毕业设计

目 录 摘 要 I ABSTRACT II 目 录 II 第1章 绪论 1 1.1背景及意义 1 1.2 国内外研究概况 1 1.3 研究的内容 1 第2章 相关技术 3 2.1 nodejs简介 4 2.2 express框架介绍 6 2.4 MySQL数据库 4 第3章 系统分析 5 3.1 需求分析 5 3.2 系统可行性分析 5 3.2.1技术可行性&#xff1a;…

深度学习+不良身体姿势检测+警报系统+代码+部署(姿态识别矫正系统)

正确的身体姿势是一个人整体健康的关键。然而&#xff0c;保持正确的身体姿势可能很困难&#xff0c;因为我们经常忘记这一点。这篇博文将引导您完成为此构建解决方案所需的步骤。最近&#xff0c;我们在使用 POSE 进行身体姿势检测方面玩得很开心。它就像一个魅力&#xff01;…

Ubuntu20安装ssh服务

Ubuntu20上执行如下命令查看是否存在ssh服务 #ps -e | grep ssh 只有ssh-agent&#xff0c;没有sshd; 因此要安装openssh-server. 搜索openssh-server,得到下载链接&#xff1a; openssh-server 复制这个Binary Package链接即可下载&#xff0c;然后使用如下命令安装 sudo…

PyQt6库和工具库QTDesigner安装与配置

锋哥原创的PyQt6视频教程&#xff1a; 2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~共计12条视频&#xff0c;包括&#xff1a;2024版 PyQt6 Python桌面开发 视频教程(无废话版…

【knife4j-spring-boot】Springboot + knife4j-spring-boot 整合swagger脚手架

swagger-boostrap-ui从1.x版本到如今2.x&#xff0c;同时也更改名字Knife4j 在此记录下 knife4j-spring-boot-starter 的整合。 只需要引入knife4j-spring-boot-starter&#xff0c;无需引入其他的swagger包&#xff0c;knife4j-spring-boot-starter已经包含。 官方版本说明…