Apache Flink 结合 Kafka 构建端到端的 Exactly-Once 处理

文章目录:

  1. Apache Flink 应用程序中的 Exactly-Once 语义
  2. Flink 应用程序端到端的 Exactly-Once 语义
  3. 示例 Flink 应用程序启动预提交阶段
  4. 在 Flink 中实现两阶段提交 Operator
  5. 总结

Apache Flink 自2017年12月发布的1.4.0版本开始,为流计算引入了一个重要的里程碑特性:TwoPhaseCommitSinkFunction(相关的Jira)。它提取了两阶段提交协议的通用逻辑,使得通过Flink来构建端到端的Exactly-Once程序成为可能。同时支持一些数据源(source)和输出端(sink),包括Apache Kafka 0.11及更高版本。它提供了一个抽象层,用户只需要实现少数方法就能实现端到端的Exactly-Once语义。

有关TwoPhaseCommitSinkFunction的使用详见文档: TwoPhaseCommitSinkFunction。或者可以直接阅读Kafka 0.11 sink的文档: kafka。

接下来会详细分析这个新功能以及Flink的实现逻辑,分为如下几点。

  • 描述Flink checkpoint机制是如何保证Flink程序结果的Exactly-Once的
  • 显示Flink如何通过两阶段提交协议与数据源和数据输出端交互,以提供端到端的Exactly-Once保证
  • 通过一个简单的示例,了解如何使用TwoPhaseCommitSinkFunction实现Exactly-Once的文件输出

一、Apache Flink应用程序中的Exactly-Once语义

当我们说『Exactly-Once』时,指的是每个输入的事件只影响最终结果一次。即使机器或软件出现故障,既没有重复数据,也不会丢数据。

Flink很久之前就提供了Exactly-Once语义。在过去几年中,我们对Flink的checkpoint机制有过深入的描述,这是Flink有能力提供Exactly-Once语义的核心。Flink文档还提供了该功能的全面概述。

在继续之前,先看下对checkpoint机制的简要介绍,这对理解后面的主题至关重要。

  • 次checkpoint是以下内容的一致性快照:
  • 应用程序的当前状态
  • 输入流的位置

Flink可以配置一个固定的时间点,定期产生checkpoint,将checkpoint的数据写入持久存储系统,例如S3或HDFS。将checkpoint数据写入持久存储是异步发生的,这意味着Flink应用程序在checkpoint过程中可以继续处理数据。

如果发生机器或软件故障,重新启动后,Flink应用程序将从最新的checkpoint点恢复处理; Flink会恢复应用程序状态,将输入流回滚到上次checkpoint保存的位置,然后重新开始运行。这意味着Flink可以像从未发生过故障一样计算结果。

在Flink 1.4.0之前,Exactly-Once语义仅限于Flink应用程序内部,并没有扩展到Flink数据处理完后发送的大多数外部系统。Flink应用程序与各种数据输出端进行交互,开发人员需要有能力自己维护组件的上下文来保证Exactly-Once语义。

为了提供端到端的Exactly-Once语义 – 也就是说,除了Flink应用程序内部,Flink写入的外部系统也需要能满足Exactly-Once语义 – 这些外部系统必须提供提交或回滚的方法,然后通过Flink的checkpoint机制来协调。

分布式系统中,协调提交和回滚的常用方法是两阶段提交协议。在下一节中,我们将讨论Flink的TwoPhaseCommitSinkFunction是如何利用两阶段提交协议来提供端到端的Exactly-Once语义。

二、Flink应用程序端到端的Exactly-Once语义

我们将介绍两阶段提交协议,以及它如何在一个读写Kafka的Flink程序中实现端到端的Exactly-Once语义。Kafka是一个流行的消息中间件,经常与Flink一起使用。Kafka在最近的0.11版本中添加了对事务的支持。这意味着现在通过Flink读写Kafaka,并提供端到端的Exactly-Once语义有了必要的支持。

Flink对端到端的Exactly-Once语义的支持不仅局限于Kafka,您可以将它与任何一个提供了必要的协调机制的源/输出端一起使用。例如Pravega,来自DELL/EMC的开源流媒体存储系统,通过Flink的TwoPhaseCommitSinkFunction也能支持端到端的Exactly-Once语义。

在今天讨论的这个示例程序中,我们有:

  • 从Kafka读取的数据源(Flink内置的KafkaConsumer)
  • 窗口聚合
  • 将数据写回Kafka的数据输出端(Flink内置的KafkaProducer)

要使数据输出端提供Exactly-Once保证,它必须将所有数据通过一个事务提交给Kafka。提交捆绑了两个checkpoint之间的所有要写入的数据。这可确保在发生故障时能回滚写入的数据。但是在分布式系统中,通常会有多个并发运行的写入任务的,简单的提交或回滚是不够的,因为所有组件必须在提交或回滚时“一致”才能确保一致的结果。Flink使用两阶段提交协议及预提交阶段来解决这个问题。

在checkpoint开始的时候,即两阶段提交协议的“预提交”阶段。当checkpoint开始时,Flink的JobManager会将checkpoint barrier(将数据流中的记录分为进入当前checkpoint与进入下一个checkpoint)注入数据流。

brarrier在operator之间传递。对于每一个operator,它触发operator的状态快照写入到state backend。

数据源保存了消费Kafka的偏移量(offset),之后将checkpoint barrier传递给下一个operator。

这种方式仅适用于operator具有『内部』状态。所谓内部状态,是指Flink state backend保存和管理的 -例如,第二个operator中window聚合算出来的sum值。当一个进程有它的内部状态的时候,除了在checkpoint之前需要将数据变更写入到state backend,不需要在预提交阶段执行任何其他操作。Flink负责在checkpoint成功的情况下正确提交这些写入,或者在出现故障时中止这些写入。

三、示例Flink应用程序启动预提交阶段

但是,当进程具有『外部』状态时,需要作些额外的处理。外部状态通常以写入外部系统(如Kafka)的形式出现。在这种情况下,为了提供Exactly-Once保证,外部系统必须支持事务,这样才能和两阶段提交协议集成。

在本文示例中的数据需要写入Kafka,因此数据输出端(Data Sink)有外部状态。在这种情况下,在预提交阶段,除了将其状态写入state backend之外,数据输出端还必须预先提交其外部事务。

当checkpoint barrier在所有operator都传递了一遍,并且触发的checkpoint回调成功完成时,预提交阶段就结束了。所有触发的状态快照都被视为该checkpoint的一部分。checkpoint是整个应用程序状态的快照,包括预先提交的外部状态。如果发生故障,我们可以回滚到上次成功完成快照的时间点。

下一步是通知所有operator,checkpoint已经成功了。这是两阶段提交协议的提交阶段,JobManager为应用程序中的每个operator发出checkpoint已完成的回调。

数据源和 widnow operator没有外部状态,因此在提交阶段,这些operator不必执行任何操作。但是,数据输出端(Data Sink)拥有外部状态,此时应该提交外部事务。

我们对上述知识点总结下:

  • 旦所有operator完成预提交,就提交一个commit。
  • 如果至少有一个预提交失败,则所有其他提交都将中止,我们将回滚到上一个成功完成的checkpoint。
  • 在预提交成功之后,提交的commit需要保证最终成功 – operator和外部系统都需要保障这点。如果commit失败(例如,由于间歇性网络问题),整个Flink应用程序将失败,应用程序将根据用户的重启策略重新启动,还会尝试再提交。这个过程至关重要,因为如果commit最终没有成功,将会导致数据丢失。

因此,我们可以确定所有operator都同意checkpoint的最终结果:所有operator都同意数据已提交,或提交被中止并回滚。

四、在Flink中实现两阶段提交Operator

完整的实现两阶段提交协议可能有点复杂,这就是为什么Flink将它的通用逻辑提取到抽象类TwoPhaseCommitSinkFunction中的原因。

接下来基于输出到文件的简单示例,说明如何使用TwoPhaseCommitSinkFunction。用户只需要实现四个函数,就能为数据输出端实现Exactly-Once语义:

  • beginTransaction – 在事务开始前,我们在目标文件系统的临时目录中创建一个临时文件。随后,我们可以在处理数据时将数据写入此文件。
  • preCommit – 在预提交阶段,我们刷新文件到存储,关闭文件,不再重新写入。我们还将为属于下一个checkpoint的任何后续文件写入启动一个新的事务。
  • commit – 在提交阶段,我们将预提交阶段的文件原子地移动到真正的目标目录。需要注意的是,这会增加输出数据可见性的延迟。
  • abort – 在中止阶段,我们删除临时文件。

我们知道,如果发生任何故障,Flink会将应用程序的状态恢复到最新的一次checkpoint点。一种极端的情况是,预提交成功了,但在这次commit的通知到达operator之前发生了故障。在这种情况下,Flink会将operator的状态恢复到已经预提交,但尚未真正提交的状态。

我们需要在预提交阶段保存足够多的信息到checkpoint状态中,以便在重启后能正确的中止或提交事务。在这个例子中,这些信息是临时文件和目标目录的路径。

TwoPhaseCommitSinkFunction已经把这种情况考虑在内了,并且在从checkpoint点恢复状态时,会优先发出一个commit。我们需要以幂等方式实现提交,一般来说,这并不难。在这个示例中,我们可以识别出这样的情况:临时文件不在临时目录中,但已经移动到目标目录了。

在TwoPhaseCommitSinkFunction中,还有一些其他边界情况也会考虑在内,请参考Flink文档了解更多信息。

总结

总结下本文涉及的一些要点:

  • Flink的checkpoint机制是支持两阶段提交协议并提供端到端的Exactly-Once语义的基础。
  • 这个方案的优点是: Flink不像其他一些系统那样,通过网络传输存储数据 – 不需要像大多数批处理程序那样将计算的每个阶段写入磁盘。
  • Flink的TwoPhaseCommitSinkFunction提取了两阶段提交协议的通用逻辑,基于此将Flink和支持事务的外部系统结合,构建端到端的Exactly-Once成为可能。
  • 从Flink 1.4.0开始,Pravega和Kafka 0.11 producer都提供了Exactly-Once语义;Kafka在0.11版本首次引入了事务,为在Flink程序中使用Kafka producer提供Exactly-Once语义提供了可能性。
  • Kafaka 0.11 producer的事务是在TwoPhaseCommitSinkFunction基础上实现的,和at-least-once producer相比只增加了非常低的开销。

这是个令人兴奋的功能,期待Flink TwoPhaseCommitSinkFunction在未来支持更多的数据接收端。


原文链接
本文为云栖社区原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/518619.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一文教你如何使用 MongoDB 和 HATEOAS 创建 REST Web 服务

作者 | Ion Pascari译者 | 天道酬勤 责编 | 徐威龙封图| CSDN 下载于视觉中国最近,作者在把HATEOAS实现到REST Web服务时遇到了一件有趣的事情,而且他也很幸运地尝试了一个名为MongoDB的NoSQL数据库,他发现该数据库在许多不需要管理实务的不同…

使用NGINX作为HTTPS正向代理服务器

NGINX主要设计作为反向代理服务器,但随着NGINX的发展,它同样能作为正向代理的选项之一。正向代理本身并不复杂,而如何代理加密的HTTPS流量是正向代理需要解决的主要问题。本文将介绍利用NGINX来正向代理HTTPS流量两种方案,及其使用…

IoT SaaS加速器——助力阿尔茨海默病人护理

场景介绍 阿尔茨海默病,是导致中老年人认知功能障碍的最常见疾病之一,是发生在老年期及老年前期的一种原发性退行性脑病。据估计,全世界痴呆症患者数量为4700万,到2030年将达到7500万人。痴呆症患者数量到2050年预计将是现在的近…

一个数据科学家需要哪些核心工具包?

作者 | Rebecca Vickery译者 | 天道酬勤 责编 | 徐威龙封图| CSDN 下载于视觉中国数据科学家的主要作用是将机器学习、统计方法和探索性分析应用到数据中,来提取见解并帮助制定决策。 编程和计算工具的使用对该角色来说必不可少。 实际上,许多人都用这句…

Java-静态方法、非静态方法

// 学生类 public class Student {// 静态方法 staticpublic static void say01(){System.out.println("学生01 静态方法说话了");}// 非静态方法public void say02(){System.out.println("学生02 非静态方法说话了");} }静态方法、非静态方法 public cla…

SpringBoot2.x Flowable 6.4.2 开源项目

文章目录一、项目服务端初始化1. 创建数据库2. 初始化表结构.3. 表结构补充4. 配置文件修改5. 下载依赖6. 异常解决7. 启动服务端二、前端初始化2.1. 安装Node(V12.x.x以上)和NPM(V6.x.x以上)2.2. 安装淘宝镜像2.2. 初始化前端项目2.3. 启动项目2.4. web登录页面2.5. 效果图三、…

MongoDB 4.2 新特性解读

云数据库 MongoDB 版 基于飞天分布式系统和高性能存储,提供三节点副本集的高可用架构,容灾切换,故障迁移完全透明化。并提供专业的数据库在线扩容、备份回滚、性能优化等解决方案。 了解更多 MongoDB World 2019 上发布新版本 MongoDB 4.2…

Java-类与对象的创建

// 学生类 public class Student {// 属性String name; // 默认 nullint age; // 默认 0// 方法public void study(){System.out.println(this.name " 在学习");} }public class Application {public static void main(String[] args) {// 实例化后会返回一个自己…

Spark3.0发布了,代码拉过来,打个包,跑起来!| 附源码编译

作者 | 敏叔V587责编 | 徐威龙封图| CSDN 下载于视觉中国Spark3.0已经发布有一阵子了,官方发布了预览版,带来了一大波更新,对于我们程序员来说,首先当然是代码拉过来,打个包,跑起来!&#xff01…

MySQL单表数据不要超过500万行:是经验数值,还是黄金铁律?

今天,探讨一个有趣的话题:MySQL 单表数据达到多少时才需要考虑分库分表?有人说 2000 万行,也有人说 500 万行。那么,你觉得这个数值多少才合适呢? 曾经在中国互联网技术圈广为流传着这么一个说法&#xff1…

Java-构造器

一个类即使什么都不写,它也会存在一个方法 package oop.demo02;/*** author blake.wang* date 2021-04-19 18:58*/ public class Person {// 一个类即使什么都不写,它也会存在一个方法,具体可以看一个空 类 的编译后的class文件// 显示的定…

IntelliJ IDEA 2020 基础设置

文章目录1. 字体设置2. 编码设置3. jdk设置4. 自动引入包和删除无用引入的包5. 打开文件左右联动定位1. 字体设置 菜单字体 编辑区 控制台 收缩自如 2. 编码设置 3. jdk设置 4. 自动引入包和删除无用引入的包 5. 打开文件左右联动定位

跟面试官侃半小时MySQL事务隔离性,从基本概念深入到实现

来源 | 阿丸笔记提到MySQL的事务,我相信对MySQL有了解的同学都能聊上几句,无论是面试求职,还是日常开发,MySQL的事务都跟我们息息相关。而事务的ACID(即原子性Atomicity、一致性Consistency、隔离性Isolation、持久性D…

Java-封装

// 类 public class Student {// 属性私有private String name; // 名字private int id; // 学号private char sex; // 性别private int age; // 年龄// 提供一些可以操作这个属性的方法// 提供一些 public 的 get \ set 方法// get 获得这个数据public String getName(){r…

一条数据的漫游奇遇记

数据库存储引擎是一个有历史的技术,经过数十年的发展,已经出现很多优秀成熟的产品。阿里巴巴 X-Engine 团队撰写的论文 "X-Engine: An Optimized Storage Engine for Large-scale E-Commerce Transaction Processing",详细讲述了团…

idea 编辑区设置

文章目录1. 显示行号2. tabs位置3. tabs排序4.tabs数量5.1. 显示行号 2. tabs位置 3. tabs排序 鼠标按住上下拖拽 按字母排序 4.tabs数量 5.

支付宝玉伯:从前端到体验,如何把格局做大

国内的前端行业,是一个群星璀璨,同时又有些纷纷扰扰的圈子。很多初出茅庐的年轻人怀着改变世界的梦想,谁也不服谁。不过,有一些为前端领域做出贡献的拓荒者几乎受到所有人的尊敬,玉伯就是这些拓荒者中的一员。 如今&am…

Java-继承

一个父类可以有多个子类,但是一个子类只能有一个父类 Person // 在java中,所有的类,都直接或者间接继承Object类 // person 人 父类 public class Person {// public > protected > default > private// public// protected --…

2019报告:AI程序员人才需求暴涨35倍!每10个公司就有6个人才缺口

就在最近,斯坦福大学AI研究所发布了《 2019 年 AI 指数报告》,其中一条数字让笔者震惊:58% 的受访大公司表示,2019 年至少在一个业务部门采用了 AI那就是说,近半数以上的公司都需要AI人才!但是现…

高效代码编辑功能

文章目录1. 行内跳转和选中2. 文件快速切换3. 查看最近查看过的文件4. 打开文件所在文件夹5.导航栏文件切换6. 查找和替换1. 行内跳转和选中 home跳转当前行首 end跳转当前行尾 按住ctrl左右快速按照单词跳转 按住ctrlshrift左右快速选中 2. 文件快速切换 alt左右 3. 查看最…