Flink集成Iceberg在同程艺龙的实践

简介: 本文由同城艺龙大数据开发工程师张军分享,主要介绍同城艺龙 Flink 集成 Iceberg 的生产实践。

本文由同城艺龙大数据开发工程师张军分享,主要介绍同城艺龙 Flink 集成 Iiceberg 的生产实践。内容包括:

  1. 背景及痛点
  2. Flink + Iceberg 的落地
  3. Iceberg 优化实践
  4. 后续工作
  5. 收益及总结

一、背景及痛点

业务背景

同程艺龙是一个提供机票、住宿、交通等服务的在线旅游服务平台,目前我所在的部门属于公司的研发部门,主要职责是为公司内其他业务部门提供一些基础服务,我们的大数据系统主要承接的业务是部门内的一些大数据相关的数据统计、分析工作等。数据来源有网关日志数据、服务器监控数据、K8s 容器的相关日志数据,App 的打点日志, MySQL 的 binlog 日志等。我们主要的大数据任务是基于上述日志构建实时报表,提供基于 Presto 的报表展示和即时查询服务,同时也会基于 Flink 开发一些实时、批处理任务,为业务方提供准确及时的数据支撑。

原架构方案

由于我们所有的原始数据都是存储在 Kafka 的,所以原来的技术架构就是首先是 Flink 任务消费 Kafka 的数据,经过 Flink SQL 或者 Flink jar 的各种处理之后实时写入 Hive,其中绝大部分任务都是 Flink SQL 任务,因为我认为 SQL 开发相对代码要简单的多,并且维护方便、好理解,所以能用 SQL 写的都尽量用 SQL 来写。
提交 Flink 的平台使用的是 Zeppelin,其中提交 Flink SQL 任务是 Zeppelin 自带的功能,提交 jar 包任务是我自己基于 Application 模式开发的 Zeppelin 插件。
对于落地到 Hive 的数据,使用开源的报表系统 metabase (底层使用 Presto) 提供实时报表展示、定时发送邮件报表,以及自定义 SQL 查询服务。由于业务对数据的实时性要求比较高,希望数据能尽快的展示出来,所以我们很多的 Flink 流式任务的 checkpoint 设置为 1 分钟,数据格式采用的是 orc 格式。

痛点

由于采用的是列式存储格式 ORC,无法像行式存储格式那样进行追加操作,所以不可避免的产生了一个大数据领域非常常见且非常棘手的问题,即 HDFS 小文件问题。

开始的时候我们的小文件解决方案是自己写的一个小文件压缩工具,定期去合并,我们的 Hive 分区一般都是天级别的,所以这个工具的原理就是每天凌晨启动一个定时任务去压缩昨天的数据,首先把昨天的数据写入一个临时文件夹,压缩完,和原来的数据进行记录数的比对检验,数据条数一致之后,用压缩后的数据覆盖原来的数据,但是由于无法保证事务,所以出现了很多问题:

  • 压缩的同时由于延迟数据的到来导致昨天的 Hive 分区又有数据写入了,检验就会失败,导致合并小文件失败。
  • 替换旧数据的操作是没有事务保证的,如果替换的过程中旧分区有新的数据写入,就会覆盖新写入的数据,造成数据丢失。
  • 没有事务的支持,无法实时合并当前分区的数据,只能合并压缩前一个分区的,最新的分区数据仍然有小文件的问题,导致最新数据查询性能提高不了。

二、Flink+Iceberg 的落地

Iceberg 技术调研

所以基于以上的 HDFS 小文件、查询慢等问题,结合我们的现状,我调研了目前市面上的数据湖技术:Delta、Apache Iceberg 和 Apache Hudi,考虑了目前数据湖框架支持的功能和以后的社区规划,最终我们是选择了 Iceberg,其中考虑的原因有以下几方面:

■ Iceberg 深度集成 Flink

前面讲到,我们的绝大部分任务都是 Flink 任务,包括批处理任务和流处理任务,目前这三个数据湖框架,Iceberg 是集成 Flink 做的最完善的,如果采用 Iceberg 替代 Hive 之后,迁移的成本非常小,对用户几乎是无感知的,
比如我们原来的 SQL 是这样的:

INSERT INTO hive_catalog.db.hive_table SELECT * FROM kafka_table

迁移到 Iceberg 以后,只需要修改 catalog 就行。

INSERT INTO iceberg_catalog.db.iIcebergceberg_table SELECT * FROM kafka_table

Presto 查询也是和这个类似,只需要修改 catalog 就行了。

■Iceberg 的设计架构使得查询更快

image.png

在 Iceberg 的设计架构中,manifest 文件存储了分区相关信息、data files 的相关统计信息(max/min)等,去查询一些大的分区的数据,就可以直接定位到所要的数据,而不是像 Hive 一样去 list 整个 HDFS 文件夹,时间复杂度从 O(n) 降到了 O(1),使得一些大的查询速度有了明显的提升,在 Iceberg PMC Chair Ryan Blue 的演讲中,我们看到命中 filter 的任务执行时间从 61.5 小时降到了 22 分钟。

■使用 Flink SQL 将 CDC 数据写入 Iceberg

Flink CDC 提供了直接读取 MySQL binlog 的方式,相对以前需要使用 canal 读取 binlog 写入 Iceberg,然后再去消费 Iceberg 数据。少了两个组件的维护,链路减少了,节省了维护的成本和出错的概率。并且可以实现导入全量数据和增量数据的完美对接,所以使用 Flink SQL 将 MySQL binlog 数据导入 Iceberg 来做 MySQL->Iceberg 的导入将会是一件非常有意义的事情。

此外对于我们最初的压缩小文件的需求,虽然 Iceberg 目前还无法实现自动压缩,但是它提供了一个批处理任务,已经能满足我们的需求。

■Hive 表迁移 Iceberg 表

迁移准备工作

目前我们的所有数据都是存储在 Hive 表的,在验证完 Iceberg 之后,我们决定将 Hive 的数据迁移到 Iceberg,所以我写了一个工具,可以使用 Hive 的数据,然后新建一个 Iceberg 表,为其建立相应的元数据,但是测试的时候发现,如果采用这种方式,需要把写入 Hive 的程序停止,因为如果 Iceberg 和 Hive 使用同一个数据文件,而压缩程序会不断地压缩 Iceberg 表的小文件,压缩完之后,不会马上删除旧数据,所以 Hive 表就会查到双份的数据,故我们采用双写的策略,原来写入 Hive 的程序不动,新启动一套程序写入 Iceberg,这样能对 Iceberg 表观察一段时间。还能和原来 Hive 中的数据进行比对,来验证程序的正确性。

经过一段时间观察,每天将近几十亿条数据、压缩后几个 T 大小的 Hive 表和 Iceberg 表,一条数据也不差。所以在最终对比数据没有问题之后,把 Hive 表停止写入,使用新的 Iceberg 表。

迁移工具

我将这个 Hive 表迁移 Iceberg 表的工具做成了一个基于 Flink batch job 的 Iceberg Action,提交了社区,不过目前还没合并:https://github.com/apache/iceberg/pull/2217。这个功能的思路是使用 Hive 原始的数据不动,然后新建一个 Iceberg table,再为这个新的 Iceberg table 生成对应的元数据,大家有需要的话可以先看看。

此外,Iceberg 社区,还有一个把现有的数据迁移到已存在的 Iceberg table 的工具,类似 Hive 的 LOAD DATA INPATH ... INTO TABLE ,是用 Spark 的存储过程做的,大家也可以关注下:https://github.com/apache/iceberg/pull/2210

三、Iceberg 优化实践

压缩小文件

目前压缩小文件是采用的一个额外批任务来进行的,Iceberg 提供了一个 Spark 版本的 action,我在做功能测试的时候发现了一些问题,此外我对 Spark 也不是非常熟悉,担心出了问题不好排查,所以参照 Spark 版本的自己实现了一个 Flink 版本,并修复了一些 bug,进行了一些功能的优化。

由于我们的 Iceberg 的元数据都是存储在 Hive 中的,也就是我们使用了 HiveCatalog,所以压缩程序的逻辑是把 Hive 中所有的 Iceberg 表全部都查出来,依次压缩。压缩没有过滤条件,不管是分区表还是非分区表,都进行全表的压缩,这样做是为了处理某些使用 eventtime 的 Flink 任务。如果有延迟的数据的到来,就会把数据写入以前的分区,如果不是全表压缩只压缩当天分区的话,新写入的其他天的数据就不会被压缩。

之所以没有开启定时任务来压缩,是因为比如定时五分钟压缩一个表,如果五分钟之内这个压缩任务没完成,没有提交新的 snapshot,下一个定时任务又开启了,就会把上一个没有完成的压缩任务中的数据重新压缩一次,所以每个表依次压缩的策略可以保证某一时刻一个表只有一个任务在压缩。

代码示例参考:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Actions.forTable(env, table) .rewriteDataFiles() //.maxParallelism(parallelism) //.filter(Expressions.equal("day", day)) //.targetSizeInBytes(targetSizeInBytes) .execute();

目前系统运行稳定,已经完成了几万次任务的压缩。

image.png

注意:
不过目前对于新发布的 Iceberg 0.11 来说,还有一个已知的 bug,即当压缩前的文件大小大于要压缩的大小(targetSizeInBytes)时,会造成数据丢失,其实这个问题我在最开始测试小文件压缩的时候就发现了,并且提了一个 pr,我的策略是大于目标文件的数据文件不参与压缩,不过这个 pr 没有合并到 0.11 版本中,后来社区另外一个兄弟也发现了相同的问题,提交了一个 pr( https://github.com/apache/iceberg/pull/2196 ) ,策略是将这个大文件拆分到目标文件大小,目前已经合并到 master,会在下一个 bug fix 版本 0.11.1 中发布。

查询优化

■ 批处理定时任务

目前对于定时调度中的批处理任务,Flink 的 SQL 客户端还没 Hive 那样做的很完善,比如执行 hive-f 来执行一个文件。而且不同的任务需要不同的资源,并行度等。

所以我自己封装了一个 Flink 程序,通过调用这个程序来进行处理,读取一个指定文件里面的 SQL,来提交批任务。在命令行控制任务的资源和并行度等。

/home/flink/bin/fFlinklinklink run -p 10 -m yarn-cluster /home/work/iceberg-scheduler.jar my.sql

■ 优化

批任务的查询这块,我做了一些优化工作,比如 limit 下推,filter 下推,查询并行度推断等,可以大大提高查询的速度,这些优化都已经推回给社区,并且在 Iceberg 0.11 版本中发布。

运维管理

■ 清理 orphan 文件

  1. 定时任务删除

在使用 Iceberg 的过程中,有时候会有这样的情况,我提交了一个 Flink 任务,由于各种原因,把它停了,这个时候 Iceberg 还没提交相应的快照。此外由于一些异常导致程序失败,会产生一些不在 Iceberg 元数据里面的孤立的数据文件,这些文件对 Iceberg 来说是不可达的,也是没用的。所以我们需要像 jvm 的垃圾回收一样来清理这些文件。

目前 Iceberg 提供了一个 Spark 版本的 action 来处理这些没用的文件,我们采取的策略和压缩小文件一样,获取 Hive 中的所有的 Iceberg 表。每隔一个小时执行一次定时任务来删除这些没用的文件。

SparkSession spark = ...... Actions.forTable(spark, table) .removeOrphanFiles() //.deleteWith(...) .execute();
  1. 踩坑

我们在程序运行过程中出现了正常的数据文件被删除的问题,经过调研,由于快照保留设置是一小时,这个清理程序清理时间也是设置一个小时,通过日志发现是这个清理程序删除了正常的数据。查了查代码,应该是设置了一样的时间,在清理孤立文件的时候,有其他程序正在读取要 expired 的 snapshot,导致删除了正常的数据。最后把这个清理程序的清理时间改成默认的三天,没有再出现删除数据文件的问题。
当然,为了保险起见,我们可以覆盖原来的删除文件的方法,改成将文件到一个备份文件夹,检查没有问题之后,手工删除。

■ 快照过期处理

我们的快照过期策略,是和压缩小文件的批处理任务写在一起的,压缩完小文件之后,进行表的快照过期处理,目前保留的时间是一个小时。这是因为对于有一些比较大的表,分区比较多,而且 checkpoint 比较短,如果保留的快照过长的话,还是会保留过多小文件,我们暂时没有查询历史快照的需求,所以我将快照的保留时间设置了一个小时。

long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);table.expireSnapshots()// .retainLast(20).expireOlderThan(olderThanTimestamp).commit();

■ 数据管理

写入了数据之后,当想查看相应的快照有多少数据文件时,直接查询 Spark 无法知道哪个是有用的,哪个是没用的。所以需要有对应的管理工具。目前 Flink 这块还不太成熟,我们可以使用 Spark3 提供的工具来查看。

  1. DDL

目前 create table 这些操作我们是通过 Flink SQL Client 来做的。其他相关的 DDL 的操作可以使用 Spark 来做:https://iceberg.apache.org/spark/#ddl-commands

  1. DML

一些相关的数据的操作,比如删除数据等可以通过 MySQL 来实现,Presto 目前只支持分区级别的删除功能。

  1. show partitions & show create table

在我们操作 Hive 的时候,有一些很常用的操作,比如 show partitions、 show create table 等,这些目前 Flink 还没有支持,所以在操作 Iceberg 的时候就很不方便,我们自己基于 Flink 1.12 做 了修改,不过目前还没有完全提交到社区,后续有时间会提交到 Flink 和 Iceberg 社区。

四、后续工作

  • Flink SQL 接入 CDC 数据到 Iceberg

目前在我们内部的版本中,我已经测试通过可以使用 Flink SQL 将 CDC 数据(比如 MySQL binlog)写入 Iceberg,社区的版本中实现该功能还需要做一些工作,我也提交了一些相关的 PR 来推进这个工作。

  • 使用 SQL 进行删除和更新

对于 copy-on-write 表,我们可以使用 Spark SQL 来进行行级的删除和更新。具体的支持的语法可以参考源码中的测试类:

org.apache.iceberg.spark.extensions.TestDelete & org.apache.iceberg.spark.extensions.TestUpdate,这些功能我在测试环境测试是可以的,但是还没有来得及更新到生产。

  • 使用 Flink SQL 进行 streaming read

在工作中会有一些这样的场景,由于数据比较大,Iceberg 的数据只存了较短的时间,如果很不幸因为程序写错了等原因,想从更早的时间来消费就无能为力了。
当引入了 Iceberg 的 streaming read 之后,这些问题就可以解决了,因为 Iceberg 存储了所有的数据,当然这里有一个前提就是对于数据没有要求特别精确,比如达到秒级别,因为目前 Flink 写入 Iceberg 的事务提交是基于 Flink Checkpoint 间隔的。

五、收益及总结

经过对 Iceberg 大概一个季度的调研,测试,优化和 bug 修复,我们将现有的 Hive 表都迁移到了 Iceberg,完美解决了原来的所有的痛点问题,目前系统稳定运行,而且相对 Hive 得到了很多的收益:

  • Flink 写入的资源减少

举一个例子,默认配置下,原来一个 flink 读取 kafka 写入 hive 的任务,需要60个并行度才不会让 Kafka 产生积压。改成写入 iceberg 之后,只需要20个并行度就够了。

  • 查询速度变快

前面我们讲到 Iceberg 查询的时候不会像 Hive 一样去 list 整个文件夹来获取分区数据,而是先从 manifest 文件中获取相关数据,查询的性能得到了显著的提升,一些大的报表的查询速度从 50 秒提高到 30 秒。

  • 并发读写

由于 Iceberg 的事务支持,我们可以实现对一个表进行并发读写,Flink 流式数据实时入湖,压缩程序同时压缩小文件,清理过期文件和快照的程序同时清理无用的文件,这样就能更及时的提供数据,做到分钟级的延迟,查询最新分区数据的速度大大加快了,并且由于 Iceberg 的 ACID 特性可以保证数据的准确性。

  • time travel

可以回溯查询以前某一时刻的数据。

总结一下,我们目前可以实现使用 Flink SQL 对 Iceberg 进行批、流的读写,并可以对小文件进行实时的压缩,使用 Spark SQL 做一些 delete 和 update 工作以及一些 DDL 操作,后续可以使用 Flink SQL 将 CDC 的数据写入 Iceberg。目前对 Iceberg 的所有的优化和 bug fix,我已经贡献给社区。由于笔者水平有限,有时候也难免有错误,还请大家不吝赐教。

作者介绍:
张军,同程艺龙大数据开发工程师

原文链接

本文为阿里云原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/513869.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

阿里巴巴开源容器镜像加速技术

简介: 近日阿里巴巴开源了其云原生容器镜像加速技术,其推出的overlaybd镜像格式,相比于传统的分层tar包文件格式,实现了基于网络的按需读取,从而使得容器可以快速启动。 近日阿里巴巴开源了其云原生容器镜像加速技术&…

Unity重写Inspector简化分组配置文件

Unity重写Inspector简化分组配置文件 重写Inspector创建分组管理配置文件创建修改参数参数对应类工程在我的资源中名为CreateConfig,免费下载 重写Inspector创建分组管理配置文件 创建 修改参数 参数对应类 using UnityEngine;public class GameConfig : Scriptab…

985大学的高材生只会写代码片段,丢人吗?

很多同学在学习编程的时候都会遇到各种各样的难题,比如:没有合适的资料、学习过于碎片化、资料的质量层次不齐、看了很多视频自己动手时却还是不会、接触不到完整项目、无法检测自己的编程水平是不是企业所认可的……最近,小郭和小解同学也遇…

快手基于RocketMQ的在线消息系统建设实践

简介: 快手需要建设一个主要面向在线业务的消息系统作为 Kafka 的补充,低延迟、高并发、高可用、高可靠的分布式消息中间件 RocketMQ 正是我们所需的。 作者:黄理 黄理,10多年软件开发和架构经验,热衷于代码和性能优…

基于 RocketMQ Prometheus Exporter 打造定制化 DevOps 平台

简介: 本文将对 RocketMQ-Exporter 的设计实现做一个简单的介绍,读者可通过本文了解到 RocketMQ-Exporter 的实现过程,以及通过 RocketMQ-Exporter 来搭建自己的 RocketMQ 监控系统。RocketMQ 在线可交互教程现已登录知行动手实验室&#xff…

c语言结构体函数平面向量加法公式,插值 拟合 符号变量与符号表达式 微积分 解方程 向量运算...

7.1.1 分段线性插值所谓分段线性插值就是通过插值点用折线段连接起来逼近原曲线,这也是计算机绘制图形的基本原理。实现分段线性插值不需编制函数程序,MATLAB自身提供了内部函数interp1其主要用法如下:interp1(x,y,xi) 一维插值◆ yiinterp1(…

Redis 很屌,不懂使用规范就糟蹋了

作者 | 码哥 来源 | 码哥字节❝这可能是最中肯的 Redis 使用规范了一网友昨天和我说,公司凌晨 12 点之后,网站用户量暴增,出现了一个技术故障,用户无法下单,当时老大火冒三丈!经过查找发现 Redis 报 C…

python统计字符在文件中出现的次数_一文搞定统计字符串中某字符出现的频次

下面是统计字符串中某字符出现的次数的方法 方法1: 这个方法相当简单,零基础自学编程,代码写成这样能满足需求,但它逐个逐个计数,比较笨拙。rlt {} for i in content: if i in rlt.keys(): rlt[i] 1 else: rlt[i] 1…

深度 | 数据仓库分层存储技术揭秘

简介: 作者: 沄浩、士远 一 、背景 据IDC发布的《数据时代2025》报告显示,全球每年产生的数据将从2018年的33ZB增长到2025年的175ZB,平均每天约产生491EB数据。随着数据量的不断增长,数据存储成本成为企业IT预算的重…

android tab 切换动画,Android之ViewPager+TabLayout组合实现导航条切换效果(微信和QQ底部多标签切换)...

前言之前在另外一篇中用Fragment和button实现了点击切换Fragment的效果,比较简陋。这次改用ViewPagerTabLayout 实现联动的效果。实现效果ViewPager 多个页面滑动TabLayout 和 ViewPager绑定,实现Fragment和标签绑定TabLayout的自定义标签以及选中颜色改…

5G 和云原生时代的技术下半场,视频化是最大最新的确定性

简介: 随着 5G/ 芯片 / 区块链等等新技术的不断成熟、云计算的普及和云原生时代带来的诸多便捷,开发者和架构师们眼前的挑战也不再只是 0-1 的建设问题,技术如何更多地带来业务价值成为了一个值得讨论的话题。阿里巴巴集团研究员,…

linux unzip命令不存在_15个常用基础命令Linux(很多人不知道!)

Linux 是码农最常用的的OS,很多操作都是命令行,所以很有必要熟练和理解其中一些重要的命令。这里会介绍一些。这里讲的所有都基于bash,mac也可以使用。!!这件事发生了几次? 输入并运行一条长命令后,您发现您忘记在开头添加sudo。 …

云安全的新战场上,要靠什么来抵御威胁

当谈及安全产业,你脑海里能够想到哪些事情?是红黑大战的攻防演练,还是PC上的各种安全软件?事实上,安全的范围远超我们的想象,安全产业也一直在背后,默默的保护在互联网生活的周围。 互联网的发…

函数计算助力高德地图平稳支撑亿级流量高峰

简介: 2020 年的“十一出行节”期间,高德地图创造了记录 ——截止 2020 年 10 月 1 日 13 时 27 分 27 秒,高德地图当日活跃用户突破 1 亿,比 2019 年 10 月 1 日提前 3 时 41 分达成此记录。 期间,Serverless 作为其中…

阿里云李克:阿里云边缘云计算的技术和实践

简介: 李克:边缘计算的核心目标是推动人、事、物的快速决策。 在4月7日下午举办的边缘计算论坛上,阿里云资深技术专家李克为我们带来了《阿里云边缘云计算的技术和实践》为题的精彩演讲。 备受关注的2021全球分布式云大会北京站于4月7日隆重…

数学在左,人生在右

在人们印象中,数学作为一门基础学科,由简单的数字和符号组成或简单或复杂的算式,融入我们的生活、学习、工作的方方面面,是理性、严谨的。 然而笔者在 2021 阿里巴巴全球数学竞赛颁奖典礼上看到数学的另一面:在数学的…

函数计算助力语雀构建稳定且安全的业务架构

简介: 语雀是一个专业的云端知识库,用于团队的文档协作。现在已是阿里员工进行文档编写和知识沉淀的标配,并于 2018 年开始对外提供服务。 客户介绍 语雀是一个专业的云端知识库,用于团队的文档协作。现在已是阿里员工进行文档编…

android menu自定义,Android提高之自定义Menu(TabMenu)实现方法

一般使用过UCWEB-Android版的人都应该对其特殊的menu有一定的印象,把menu做成Tab-Menu(支持分页的Menu),可以容纳比Android传统的menu更丰富的内容(Android的menu超过6项则缩略在[更多]里),本文参考网上的例子的基础上对例子进行简化以及封装…

一行指令造成 60 亿美元蒸发,更让 Facebook 遭遇史诗级故障!

作者 | 马超 责编 | 张红月出品 | CSDN弱小从来不是生存的障碍,傲慢才是。10月4日 FaceBook 发生了一次史诗级中断事故,故障期间 FaceBook 所有旗下APP全面对外服务中断,而且故障的时间长达7个小时之久。根据 Facebook 最新的声明来看&…

一不小心,它成为了 GitHub Alibaba Group 下 Star 最多的开源项目

简介: 随着微服务的流行,应用更加轻量和高效,但是带来的困境是线上问题排查越来越复杂困难。传统的 Java 排查问题,需要重启应用再进行调试,但是重启应用之后现场会丢失,问题难以复现。 来源 | 阿里巴巴云…