汽车之家:基于 Flink + Iceberg 的湖仓一体架构实践

简介: 由汽车之家实时计算平台负责人邸星星在 4 月 17 日上海站 Meetup 分享的,基于 Flink + Iceberg 的湖仓一体架构实践。

内容简要:

一、数据仓库架构升级的背景

二、基于 Iceberg 的湖仓一体架构实践

三、总结与收益

四、后续规划

一、数据仓库架构升级的背景

1. 基于 Hive 的数据仓库的痛点

原有的数据仓库完全基于 Hive 建造而成,主要存在三大痛点:

痛点一:不支持 ACID

1)不支持 Upsert 场景;

2)不支持 Row-level delete,数据修正成本高。

痛点二:时效性难以提升

1)数据难以做到准实时可见;

2)无法增量读取,无法实现存储层面的流批统一;

3)无法支持分钟级延迟的数据分析场景。

痛点三:Table Evolution

1)写入型 Schema,对 Schema 变更支持不好;

2)Partition Spec 变更支持不友好。

2. Iceberg 关键特性

Iceberg 主要有四大关键特性:支持 ACID 语义、增量快照机制、开放的表格式和流批接口支持。

  • 支持 ACID 语义

    • 不会读到不完整的 Commit;
    • 基于乐观锁支持并发 Commit;
    • Row-level delete,支持 Upsert。
  • 增量快照机制

    • Commit 后数据即可见(分钟级);
    • 可回溯历史快照。
  • 开放的表格式

    • 数据格式:parquet、orc、avro
    • 计算引擎:Spark、Flink、Hive、Trino/Presto
  • 流批接口支持

    • 支持流、批写入;
    • 支持流、批读取。

二、基于 Iceberg 的湖仓一体架构实践

湖仓一体的意义就是说我不需要看见湖和仓,数据有着打通的元数据的格式,它可以自由的流动,也可以对接上层多样化的计算生态。

——贾扬清(阿里云计算平台高级研究员)

1. Append 流入湖的链路

img

上图为日志类数据入湖的链路,日志类数据包含客户端日志、用户端日志以及服务端日志。这些日志数据会实时录入到 Kafka,然后通过 Flink 任务写到 Iceberg 里面,最终存储到 HDFS。

2. Flink SQL 入湖链路打通

我们的 Flink SQL 入湖链路打通是基于 “Flink 1.11 + Iceberg 0.11” 完成的,对接 Iceberg Catalog 我们主要做了以下内容:

1)Meta Server 增加对 Iceberg Catalog 的支持;

2)SQL SDK 增加 Iceberg Catalog 支持。

然后在这基础上,平台开放 Iceberg 表的管理功能,使得用户可以自己在平台上建 SQL 的表。

3. 入湖 - 支持代理用户

第二步是内部的实践,对接现有预算体系、权限体系。

因为之前平台做实时作业的时候,平台都是默认为 Flink 用户去运行的,之前存储不涉及 HDFS 存储,因此可能没有什么问题,也就没有思考预算划分方面的问题。

但是现在写 Iceberg 的话,可能就会涉及一些问题。比如数仓团队有自己的集市,数据就应该写到他们的目录下面,预算也是划到他们的预算下,同时权限和离线团队账号的体系打通。

img

如上所示,这块主要是在平台上做了代理用户的功能,用户可以去指定用哪个账号去把这个数据写到 Iceberg 里面,实现过程主要有以下三个。

  • 增加 Table 级别配置:'iceberg.user.proxy' = 'targetUser’

    1)启用 Superuser

    2)团队账号鉴权

    img

  • 访问 HDFS 时启用代理用户:

    img

  • 访问 Hive Metastore 时指定代理用户

    1)参考 Spark 的相关实现:

    org.apache.spark.deploy.security.HiveDelegationTokenProvider

    2)动态代理 HiveMetaStoreClient,使用代理用户访问 Hive metastore

4. Flink SQL 入湖示例

DDL + DML

img

5. CDC 数据入湖链路

img

如上所示,我们有一个 AutoDTS 平台,负责业务库数据的实时接入。我们会把这些业务库的数据接入到 Kafka 里面,同时它还支持在平台上配置分发任务,相当于把进 Kafka 的数据分发到不同的存储引擎里,在这个场景下是分发到 Iceberg 里。

6. Flink SQL CDC 入湖链路打通

下面是我们基于 “Flink1.11 + Iceberg 0.11” 支持 CDC 入湖所做的改动:

  • 改进 Iceberg Sink:

    Flink 1.11 版本为 AppendStreamTableSink,无法处理 CDC 流,修改并适配。

  • 表管理

    1)支持 Primary key(PR1978)

    2)开启 V2 版本:'iceberg.format.version' = '2'

7. CDC 数据入湖

1. 支持 Bucket

Upsert 场景下,需要确保同一条数据写入到同一 Bucket 下,这又如何实现?

目前 Flink SQL 语法不支持声明 bucket 分区,通过配置的方式声明 Bucket:

'partition.bucket.source'='id', // 指定 bucket 字段

'partition.bucket.num'='10', // 指定 bucket 数量

2. Copy-on-write sink

做 Copy-on-Write 的原因是原本社区的 Merge-on-Read 不支持合并小文件,所以我们临时去做了 Copy-on-write sink 的实现。目前业务一直在测试使用,效果良好。

img

上方为 Copy-on-Write 的实现,其实跟原来的 Merge-on-Read 比较类似,也是有 StreamWriter 多并行度写入和 FileCommitter 单并行度顺序提交

在 Copy-on-Write 里面,需要根据表的数据量合理设置 Bucket 数,无需额外做小文件合并。

  • StreamWriter 在 snapshotState 阶段多并行度写入

    1)增加 Buffer;

    2)写入前需要判断上次 checkpoint 已经 commit 成功;

    3)按 bucket 分组、合并,逐个 Bucket 写入。

  • FileCommitter 单并行度顺序提交

    1)table.newOverwrite()

    2)Flink.last.committed.checkpoint.id

    img

8. 示例 - CDC 数据配置入湖

img

如上图所示,在实际使用中,业务方可以在 DTS 平台上创建或配置分发任务即可。

实例类型选择 Iceberg 表,然后选择目标库,表明要把哪个表的数据同步到 Iceberg 里,然后可以选原表和目标表的字段的映射关系是什么样的,配置之后就可以启动分发任务。启动之后,会在实时计算平台 Flink 里面提交一个实时任务,接着用 Copy-on-write sink 去实时地把数据写到 Iceberg 表里面。

img

9. 入湖其他实践

实践一:减少 empty commit

  • 问题描述:

    在上游 Kafka 长期没有数据的情况下,每次 Checkpoint 依旧会生成新的 Snapshot,导致大量的空文件和不必要的 Snapshot。

  • 解决方案(PR - 2042):

    增加配置 Flink.max-continuousempty-commits,在连续指定次数 Checkpoint 都没有数据后才真正触发 Commit,生成 Snapshot。

实践二:记录 watermark

  • 问题描述:

    目前 Iceberg 表本身无法直接反映数据写入的进度,离线调度难以精准触发下游任务。

  • 解决方案( PR - 2109 ):

    在 Commit 阶段将 Flink 的 Watermark 记录到 Iceberg 表的 Properties 中,可直观的反映端到端的延迟情况,同时可以用来判断分区数据完整性,用于调度触发下游任务。

实践三:删表优化

  • 问题描述:

    删除 Iceberg 可能会很慢,导致平台接口相应超时。因为 Iceberg 是面向对象存储来抽象 IO 层的,没有快速清除目录的方法。

  • 解决方案:

    扩展 FileIO,增加 deleteDir 方法,在 HDFS 上快速删除表数据。

10. 小文件合并及数据清理

定期为每个表执行批处理任务(spark 3),分为以下三个步骤:

1. 定期合并新增分区的小文件:

​ rewriteDataFilesAction.execute(); 仅合并小文件,不会删除旧文件。

2. 删除过期的 snapshot,清理元数据及数据文件:

​ table.expireSnapshots().expireOld erThan(timestamp).commit();

3. 清理 orphan 文件,默认清理 3 天前,且无法触及的文件:

​ removeOrphanFilesAction.older Than(timestamp).execute();

11. 计算引擎 – Flink

Flink 是实时平台的核心计算引擎,目前主要支持数据入湖场景,主要有以下几个方面的特点。

  • 数据准实时入湖:

    Flink 和 Iceberg 在数据入湖方面集成度最高,Flink 社区主动拥抱数据湖技术。

  • 平台集成:

    AutoStream 引入 IcebergCatalog,支持通过 SQL 建表、入湖 AutoDTS 支持将 MySQL、SQLServer、TiDB 表配置入湖。

  • 流批一体:

    在流批一体的理念下,Flink 的优势会逐渐体现出来。

12. 计算引擎 – Hive

Hive 在 SQL 批处理层面 Iceberg 和 Spark 3 集成度更高,主要提供以下三个方面的功能。

  • 定期小文件合并及 meta 信息查询:

    SELECT * FROM prod.db.table.history 还可查看 snapshots, files, manifests。

  • 离线数据写入:

    1)Insert into 2)Insert overwrite 3)Merge into

  • 分析查询:

    主要支持日常的准实时分析查询场景。

13. 计算引擎 – Trino/Presto

AutoBI 已经和 Presto 集成,用于报表、分析型查询场景。

  • Trino

    1)直接将 Iceberg 作为报表数据源

    2)需要增加元数据缓存机制:https://github.com/trinodb/trino/issues/7551

  • Presto

    社区集成中:https://github.com/prestodb/presto/pull/15836

14. 踩过的坑

1. 访问 Hive Metastore 异常

问题描述:HiveConf 的构造方法的误用,导致 Hive 客户端中声明的配置被覆盖,导致访问 Hive metastore 时异常

解决方案(PR-2075):修复 HiveConf 的构造,显示调用 addResource 方法,确保配置不会被覆盖:hiveConf.addResource(conf);

2.Hive metastore 锁未释放

问题描述:“CommitFailedException: Timed out after 181138 ms waiting for lock xxx.” 原因是 hiveMetastoreClient.lock 方法,在未获得锁的情况下,也需要显示 unlock,否则会导致上面异常。

解决方案(PR-2263):优化 HiveTableOperations#acquireLock 方法,在获取锁失败的情况下显示调用 unlock 来释放锁。

3. 元数据文件丢失

问题描述:Iceberg 表无法访问,报 “NotFoundException Failed to open input stream for file : xxx.metadata.json”

解决方案(PR-2328):当调用 Hive metastore 更新 iceberg 表的 metadata_location 超时后,增加检查机制,确认元数据未保存成功后再删除元数据文件。

三、收益与总结

1. 总结

​ 通过对湖仓一体、流批融合的探索,我们分别做了总结。

  • 湖仓一体

    1)Iceberg 支持 Hive Metastore;

    2)总体使用上与 Hive 表类似:相同数据格式、相同的计算引擎。

  • 流批融合

    准实时场景下实现流批统一:同源、同计算、同存储。

2. 业务收益

  • 数据时效性提升:

    入仓延迟从 2 小时以上降低到 10 分钟以内;算法核心任务 SLA 提前 2 小时完成。

  • 准实时的分析查询:

    结合 Spark 3 和 Trino,支持准实时的多维分析查询。

  • 特征工程提效:

    提供准实时的样本数据,提高模型训练时效性。

  • CDC 数据准实时入仓:

    可以在数仓针对业务表做准实时分析查询。

3. 架构收益 - 准实时数仓

img

上方也提到了,我们支持准实时的入仓和分析,相当于是为后续的准实时数仓建设提供了基础的架构验证。准实时数仓的优势是一次开发、口径统一、统一存储,是真正的批流一体。劣势是实时性较差,原来可能是秒级、毫秒级的延迟,现在是分钟级的数据可见性。

但是在架构层面上,这个意义还是很大的,后续我们能看到一些希望,可以把整个原来 “T + 1” 的数仓,做成准实时的数仓,提升数仓整体的数据时效性,然后更好地支持上下游的业务。

四、后续规划

1. 跟进 Iceberg 版本

全面开放 V2 格式,支持 CDC 数据的 MOR 入湖。

2. 建设准实时数仓

基于 Flink 通过 Data pipeline 模式对数仓各层表全面提速。

3. 流批一体

随着 upsert 功能的逐步完善,持续探索存储层面流批一体。

4. 多维分析

基于 Presto/Spark3 输出准实时多维分析。

原文链接
本文为阿里云原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/513279.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于 Scheduled SQL 对 VPC FlowLog 实现细粒度时间窗口分析

简介: 针对VPC FlowLog的五元组和捕获窗口信息,在分析时使用不同时间窗口精度,可能得到不一样的流量特征,本文介绍一种方法将原始采集日志的时间窗口做拆分,之后重新聚合为新的日志做分析,达到更细粒度的分…

实力登场!移动云技术内核2.0 四大全新升级!

“中国数字经济占GDP比重持续增长,5G网络建设已进入规模化部署阶段。随着5G网络的发展,企业的数字化改造需求越来越旺盛。企业日益增长的数字化改造需求对云基础设施提出了新的挑战:需要支持多种类型网络接入、支持公有云、混合云、专属云等多…

obsidian使用分享

ob对比其他软件 上文提到obsidian,这里对obsidian做一个简要的总结 优点:对比notion,语雀这些软件,内容存储在应用商的服务器上。它是存在本地的。 对比思源笔记。说一下思源笔记的不足。思源是块来控制的,回车就是一…

苹果xr如何截屏_苹果手机自带的三种截屏技巧,你知道几个?现在知道还不迟...

今年苹果手机发布的新机自发布以来就受到了热烈的追捧,销量一直都处于只增不减的趋势。苹果手机为何如此之火?除了本身自带的IOS系统之外,手机自带很多小技巧,你知道不?今天就来为大家介绍苹果手机中的三种截屏小技巧&…

Scheduled SQL: SLS 大规模日志上的全局分析与调度

简介: 本文总结了大规模日志全局分析的需求,讨论SLS上现有的典型分析方案,并延伸到 SLS 原生数据处理方案,介绍 Schedueld SQL 功能与最佳实践。 大规模日志全局分析的需求 数据大规模与时效性 基于时间的数据(日志…

matlab制作以太网数据接收上位机_3D激光扫描仪设计及数据处理

本文内容转载自《电子技术应用》2019年第10期,版权归《电子技术应用》编辑部所有。段清明,王凡,徐琳琳,全文俊吉林大学仪器科学与电气工程学院摘要:利用2D激光雷达配合云台装置,设计了一种3D激光扫描仪作为…

跨平台(windows+linux)的线程辅助程序,跨平台(Windows+Linux)的Socket通讯程序(二)—结构...

上一篇"跨平台(WindowsLinux)的Socket通讯程序"给出了Socket通讯底层的一些函数的包装方法/类,同时屏蔽了操作系统(Windows/Linux)的不同。上一篇只是对通讯底层方法的封装,并没用涉及应用,这一篇将基于上一篇,并结合&q…

数据的“敏捷制造”,DataWorks一站式数据开发治理范式演进

简介: 企业大数据技术发展至今,历经了两次蜕变。第一次蜕变从最初的“小作坊”解决大数据问题,到后来企业用各类大数据技术搭建起属于自己的“大平台”,通过平台化的能力完成数据生产力的升级。 第二次蜕变让大数据从“大平台”向…

全新的 Fragment 通信方式

作者 | tech-bus.丹卿来源 | 程序员巴士前言就在前段时间,Google 推出了 Fragment Result API 和 Activity Results API,用来取代之前的 Activity 和 Fragment 之间通信方式的不足,大家可以前往看看都有哪些更新:https://medium.c…

数据传输完整性_电缆监测数据传输系统分析与设计

电缆线路是重要的输电方式,对电缆线路进行监测是保证电缆线路正常工作的重要的条件,研究人员利用嵌入式系统设计了电缆监测数据传输系统。该系统以CAN通信和嵌入式以太网络技术为核心,实现了对电缆及其沟道的实时监测、状态显示及预报警功能&…

大型企业多账号管理“安全心法”

简介: 云上多账号环境下的网络统一管理,是大型分支型企业网络安全防护的必经之路。无论是外企入华、国内企业出海,还是本土集团型企业规模化成长,云上统一网络安全管控与整体安全态势感知,都可以拉齐企业账号间安全水位…

苹果将于 2025 年推出的 Apple Car 长什么样?

整理 | 孙胜出品 | CSDN(ID:CSDNnews)据国外媒体报道,苹果公司预计将于2025年推出一款全新的自动驾驶汽车,旨在实现真正意义上的无人驾驶。报道称,基于自动驾驶的理念,苹果理想的汽车没有方向盘…

阿里云中间件首席架构师李小平:云原生实践助力企业高效创新

简介: 通过云原生技术,真正为企业带来更多的业务价值,助力企业整体的业务创新。 作者:李小平 前天我参加了信通院的云原生产业大会,在会场上非常感慨,参加会议的企业非常多,并且来自于各行各业…

cv曲线面积的意义_几何直觉的魅力:sinx曲线下的面积原理是如此的美妙

用“曲线下的面积”来描述积分,就像用一串单词来描述一本书。正弦函数的积分是其曲线下的面积。几何直觉就是:“正弦的积分是沿圆周路径的水平距离。”这句话第一次听说感觉比较抽象,当你理解了就会觉得它非常的美妙一般的思维模式求正弦函数的积分就是&…

OpenInfra 十一年:OpenStack 部署规模超 2500 万计算核心

后疫情时代下,产生海量在线需求,越来越多金融、政府、教育、通信和医疗保健等上云业务需依赖现代云基础设施来正常运行。其中开源提供了一种更具成本效益的开发方式,据最新《2021 年度 Octoverse 报告》显示,2021 年 GitHub 开发者…

集群镜像:实现高效的分布式应用交付

简介: Docker 解决了单个容器的镜像化问题,而 sealer 通过把整个集群打包,实现了分布式软件的 Build Share Run。 作者 | fanux.中弈 什么是集群镜像 顾名思义,和操作系统 .iso 镜像或 Docker 镜像类似,集群镜像是用一…

比开源快30倍的自研SQL Parser设计与实践

简介: SQL作为一种领域语言,最早用于关系型数据库,方便管理结构化数据;SQL由多种不同的类型的语言组成,包括数据定义语言,数据控制语言、数据操作语言;各数据库产品都有不同的声明和实现&#x…

SLS控制台内嵌操作指南

简介: SLS控制台内嵌操作指南 一、机制 详见:https://help.aliyun.com/document_detail/74971.html 二、操作 2.1 子账号操作(主账号身份操作) 登陆ram控制台,创建子账号。给子账号授予AliyunSTSAssumeRoleAccess权…

装linux服务器进去配置界面,在CentOS 8 Linux上安装和配置SuiteCRM的步骤

本文介绍在CentOS 8 Linux服务器上安装和配置SuiteCRM的详细步骤:更新系统、安装PHP、安装MariaDB和Nginx Web服务器、安装SuiteCRM、配置SuiteCRM Web访问界面。SuiteCRM是由SalesAgility团队开发和维护的开源企业级CRM应用程序,该产品最初是SugarCRM社…

Dev Lake 0.4.0 版本:开源、开放的研发效能数据平台

建设研发工具链后,效能提升如何更进一步? 工程师们反馈流程体验确实有所提升,和业务同事的沟通似乎也愉快了一些——但研发团队依然需要量化数据作为抓手,一方面佐证先前实践优化的有效性,另一方面为持续的效能提升寻找…