数据湖有新解!Apache Hudi 与 Apache Flink 集成

简介: 纵观大数据领域成熟、活跃、有生命力的框架,无一不是设计优雅,能与其他框架相互融合,彼此借力,各专所长。

作者:王祥虎(Apache Hudi 社区)

Apache Hudi 是由 Uber 开发并开源的数据湖框架,它于 2019 年 1 月进入 Apache 孵化器孵化,次年 5 月份顺利毕业晋升为 Apache 顶级项目。是当前最为热门的数据湖框架之一。

1. 为何要解耦

Hudi 自诞生至今一直使用 Spark 作为其数据处理引擎。如果用户想使用 Hudi 作为其数据湖框架,就必须在其平台技术栈中引入 Spark。放在几年前,使用 Spark 作为大数据处理引擎可以说是很平常甚至是理所当然的事。因为 Spark 既可以进行批处理也可以使用微批模拟流,流批一体,一套引擎解决流、批问题。然而,近年来,随着大数据技术的发展,同为大数据处理引擎的 Flink 逐渐进入人们的视野,并在计算引擎领域获占据了一定的市场,大数据处理引擎不再是一家独大。在大数据技术社区、论坛等领地,Hudi 是否支持使用 Flink 计算引擎的的声音开始逐渐出现,并日渐频繁。所以使 Hudi 支持 Flink 引擎是个有价值的事情,而集成 Flink 引擎的前提是 Hudi 与 Spark 解耦。

同时,纵观大数据领域成熟、活跃、有生命力的框架,无一不是设计优雅,能与其他框架相互融合,彼此借力,各专所长。因此将 Hudi 与 Spark 解耦,将其变成一个引擎无关的数据湖框架,无疑是给 Hudi 与其他组件的融合创造了更多的可能,使得 Hudi 能更好的融入大数据生态圈。

2. 解耦难点

Hudi 内部使用 Spark API 像我们平时开发使用 List 一样稀松平常。自从数据源读取数据,到最终写出数据到表,无处不是使用 Spark RDD 作为主要数据结构,甚至连普通的工具类,都使用 Spark API 实现,可以说 Hudi 就是用 Spark 实现的一个通用数据湖框架,它与 Spark 的绑定可谓是深入骨髓。

此外,此次解耦后集成的首要引擎是 Flink。而 Flink 与 Spark 在核心抽象上差异很大。Spark 认为数据是有界的,其核心抽象是一个有限的数据集合。而 Flink 则认为数据的本质是流,其核心抽象 DataStream 中包含的是各种对数据的操作。同时,Hudi 内部还存在多处同时操作多个 RDD,以及将一个 RDD 的处理结果与另一个 RDD 联合处理的情况,这种抽象上的区别以及实现时对于中间结果的复用,使得 Hudi 在解耦抽象上难以使用统一的 API 同时操作 RDD 和 DataStream。

3. 解耦思路

理论上,Hudi 使用 Spark 作为其计算引擎无非是为了使用 Spark 的分布式计算能力以及 RDD 丰富的算子能力。抛开分布式计算能力外,Hudi 更多是把 RDD 作为一个数据结构抽象,而 RDD 本质上又是一个有界数据集,因此,把 RDD 换成 List,在理论上完全可行(当然,可能会牺牲些性能)。为了尽可能保证 Hudi Spark 版本的性能和稳定性。我们可以保留将有界数据集作为基本操作单位的设定,Hudi 主要操作 API 不变,将 RDD 抽取为一个泛型,Spark 引擎实现仍旧使用 RDD,其他引擎则根据实际情况使用 List 或者其他有界数据集。

解耦原则:

1)统一泛型。Spark API 用到的 JavaRDD,JavaRDD,JavaRDD 统一使用泛型 I,K,O 代替;

2)去 Spark 化。抽象层所有 API 必须与 Spark 无关。涉及到具体操作难以在抽象层实现的,改写为抽象方法,引入 Spark 子类实现。

例如:Hudi 内部多处使用到了 JavaSparkContext#map() 方法,去 Spark 化,则需要将 JavaSparkContext 隐藏,针对该问题我们引入了 HoodieEngineContext#map() 方法,该方法会屏蔽 map 的具体实现细节,从而在抽象成实现去 Spark 化。

3)抽象层尽量减少改动,保证 Hudi 原版功能和性能;

4)使用 HoodieEngineContext 抽象类替换 JavaSparkContext,提供运行环境上下文。

4.Flink 集成设计

Hudi 的写操作在本质上是批处理,DeltaStreamer 的连续模式是通过循环进行批处理实现的。为使用统一 API,Hudi 集成 Flink 时选择攒一批数据后再进行处理,最后统一进行提交(这里 Flink 我们使用 List 来攒批数据)。

攒批操作最容易想到的是通过使用时间窗口来实现,然而,使用窗口,在某个窗口没有数据流入时,将没有输出数据,Sink 端难以判断同一批数据是否已经处理完。因此我们使用 Flink 的检查点机制来攒批,每两个 Barrier 之间的数据为一个批次,当某个子任务中没有数据时,mock 结果数据凑数。这样在 Sink 端,当每个子任务都有结果数据下发时即可认为一批数据已经处理完成,可以执行 commit。

DAG 如下:

1.jpg

  • source 接收 Kafka 数据,转换成 List;
  • InstantGeneratorOperator 生成全局唯一的 instant.当上一个 instant 未完成或者当前批次无数据时,不创建新的 instant;
  • KeyBy partitionPath 根据 partitionPath 分区,避免多个子任务写同一个分区;
  • WriteProcessOperator 执行写操作,当当前分区无数据时,向下游发送空的结果数据凑数;
  • CommitSink 接收上游任务的计算结果,当收到 parallelism 个结果时,认为上游子任务全部执行完成,执行 commit.

注:InstantGeneratorOperator 和 WriteProcessOperator 均为自定义的 Flink 算子,InstantGeneratorOperator 会在其内部阻塞检查上一个 instant 的状态,保证全局只有一个 inflight(或 requested)状态的 instant.WriteProcessOperator 是实际执行写操作的地方,其写操作在 checkpoint 时触发。

5. 实现示例

1) HoodieTable

/*** Abstract implementation of a HoodieTable.** @param <T> Sub type of HoodieRecordPayload* @param <I> Type of inputs* @param <K> Type of keys* @param <O> Type of outputs*/
public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implements Serializable {protected final HoodieWriteConfig config;protected final HoodieTableMetaClient metaClient;protected final HoodieIndex<T, I, K, O> index;public abstract HoodieWriteMetadata<O> upsert(HoodieEngineContext context, String instantTime,I records);public abstract HoodieWriteMetadata<O> insert(HoodieEngineContext context, String instantTime,I records);public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, String instantTime,I records, Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);......
}

HoodieTable 是 Hudi 的核心抽象之一,其中定义了表支持的 insert,upsert,bulkInsert 等操作。以 upsert 为例,输入数据由原先的 JavaRDD inputRdds 换成了 I records, 运行时 JavaSparkContext jsc 换成了 HoodieEngineContext context.

从类注释可以看到 T,I,K,O 分别代表了 Hudi 操作的负载数据类型、输入数据类型、主键类型以及输出数据类型。这些泛型将贯穿整个抽象层。

2) HoodieEngineContext

/*** Base class contains the context information needed by the engine at runtime. It will be extended by different* engine implementation if needed.*/
public abstract class HoodieEngineContext {public abstract <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism);public abstract <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism);public abstract <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism);......
}

HoodieEngineContext 扮演了 JavaSparkContext 的角色,它不仅能提供所有 JavaSparkContext 能提供的信息,还封装了 map,flatMap,foreach 等诸多方法,隐藏了 JavaSparkContext#map(),JavaSparkContext#flatMap(),JavaSparkContext#foreach() 等方法的具体实现。

以 map 方法为例,在 Spark 的实现类 HoodieSparkEngineContext 中,map 方法如下:

@Overridepublic <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {return javaSparkContext.parallelize(data, parallelism).map(func::apply).collect();}

在操作 List 的引擎中其实现可以为(不同方法需注意线程安全问题,慎用 parallel()):

@Overridepublic <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {return data.stream().parallel().map(func::apply).collect(Collectors.toList());}

注:map 函数中抛出的异常,可以通过包装 SerializableFunction func 解决.

这里简要介绍下 SerializableFunction:

@FunctionalInterface
public interface SerializableFunction<I, O> extends Serializable {O apply(I v1) throws Exception;
}

该方法实际上是 java.util.function.Function 的变种,与java.util.function.Function 不同的是 SerializableFunction 可以序列化,可以抛异常。引入该函数是因为 JavaSparkContext#map() 函数能接收的入参必须可序列,同时在hudi的逻辑中,有多处需要抛异常,而在 Lambda 表达式中进行 try catch 代码会略显臃肿,不太优雅。

6.现状和后续计划

6.1 工作时间轴

2020 年 4 月,T3 出行(杨华@vinoyang,王祥虎@wangxianghu)和阿里巴巴的同学(李少锋@leesf)以及若干其他小伙伴一起设计、敲定了该解耦方案;

2020 年 4 月,T3 出行(王祥虎@wangxianghu)在内部完成了编码实现,并进行了初步验证,得出方案可行的结论;

2020 年 7 月,T3 出行(王祥虎@wangxianghu)将该设计实现和基于新抽象实现的 Spark 版本推向社区(HUDI-1089);

2020 年 9 月 26 日,顺丰科技基于 T3 内部分支修改完善的版本在 Apache Flink Meetup(深圳站)公开 PR, 使其成为业界第一个在线上使用 Flink 将数据写 Hudi 的企业。

2020 年 10 月 2 日,HUDI-1089 合并入 Hudi 主分支,标志着 Hudi-Spark 解耦完成。

6.2 后续计划

1)推进 Hudi 和 Flink 集成

将 Flink 与 Hudi 的集成尽快推向社区,初期该特性可能只支持 Kafka 数据源。

2)性能优化

为保证 Hudi-Spark 版本的稳定性和性能,此次解耦没有太多考虑 Flink 版本可能存在的性能问题。

3)类 flink-connector-hudi 第三方包开发

将 Hudi-Flink 的绑定做成第三方包,用户可以在 Flink 应用中以编码方式读取任意数据源,通过这个第三方包写入 Hudi。

 

原文链接
本文为阿里云原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/514892.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

显微镜下的大明内容_平凡故事展现炮火下人性光辉,李少红《解放·终局营救》创作全解...

【巨匠】至心至情&#xff0c;匠心独运尝试过大量的题材与类型后&#xff0c;在建国70周年的历史性时刻&#xff0c;李少红老师终于执导了自己的第一部战争电影《解放终局营救》。 有人说&#xff0c;这只是李少红题材创新的一个新方向&#xff1b;有人说&#xff0c;李少红是想…

MQTT在游戏运营发行中的实践

前言 在游戏生态中&#xff0c;主要包含游戏的研发方以及运营发行方。一款游戏的运行&#xff0c;分为研发和运营两个阶段。研发的主体有个人、独立工作室、游戏研发公司等&#xff1b; 游戏的研发主体专注于游戏内容的研发&#xff0c;对游戏的发行及运营往往在人力、财力上…

2021 火爆技术人朋友圈的实时音视频 RTC 你 Pick 了嘛?

5月27日20点&#xff0c;第 13 期「大咖来了」&#xff01; CSDN 副总裁于邦旭、融云 CTO 任杰、即构科技副总裁刘莉&#xff0c;多方视角探讨 RTC 超级风口与机遇&#xff0c;还有众多精美礼品等你拿&#xff01; 立即戳&#xff1a;https://live.csdn.net/room/csdnnews/cn…

SAE 的极致应用部署效率

简介&#xff1a; SAE 在应用创建、部署、重启过程中的效率优化。 作者 | 文俊 阿里巴巴云原生团队 本文整理自《Serverless 技术公开课》 作为 Serverless 平台&#xff0c;SAE 提供了应用全托管的服务&#xff0c;充分利用了云原生的技术红利&#xff0c;以容器作为应用载体…

独家下载!《Java工程师成神之路(基础篇)》

简介&#xff1a; 初学Java的你还在烦恼不知道怎么去学&#xff0c;学习什么内容吗&#xff1f;那么多的技术书籍是否已经让你无从下手&#xff1f;别急&#xff0c;来看这一份完整的Java学习路径。 复制该链接到浏览器完成下载或分享&#xff1a;https://developer.aliyun.com…

Gartner:2021全球安全和风险支出将超1500亿美元

编辑 | 宋 慧 作者 | Gartner投稿 头图 | 付费下载于东方IC 全球信息技术研究和顾问公司Gartner预测&#xff0c;2021年全球信息安全和风险管理技术与服务支出预计将增长12.4%&#xff0c;达到1504亿美元。2020年安全和风险管理支出增长率为6.4%。 Gartner分析师认为&#xf…

四年,如何从前端小白蜕变为前端技术专家?

作者简介&#xff1a;珑晴——淘系技术部前端技术专家&#xff0c;16 年校招实习转正进入的阿里&#xff0c;当时是在聚划算前端团队&#xff0c;随着业务变化一路从聚划算到天猫至今加入淘系技术部&#xff0c;负责日常活动营销的同时&#xff0c;也多次参与大促会场&互动…

Tomcat 一键启停脚本 linux

文章目录一、脚本开发1. 编写脚本2. 修改脚本3. 赋予权限二、脚本执行2.1. 启动tomact2.2. 查看tomact状态2.3. 停止tomact一、脚本开发 1. 编写脚本 vim start-tomcat.sh添加以下内容&#xff1a; #!/bin/bash# description: Tomcat start/stop/status script#Location of …

从 Storm 迁移到 Flink,美团外卖实时数仓建设实践

简介&#xff1a; 本文主要介绍一种通用的实时数仓构建的方法与实践。实时数仓以端到端低延迟、SQL 标准化、快速响应变化、数据统一为目标。 作者&#xff1a;朱良 本文主要介绍一种通用的实时数仓构建的方法与实践。实时数仓以端到端低延迟、SQL 标准化、快速响应变化、数据…

Arm发布移动端v9体系新架构,CPU、GPU、IP全囊括了!

2021年5月25日晚&#xff0c;Arm发布了针对移动端的Armv9体系新架构&#xff0c;除了公布首款全面计算&#xff08;Total Compute&#xff09;解决方案&#xff0c;Arm还发布了首批基于Armv9 架构的Cortex-A CPU&#xff0c;为消费电子视觉体验而设计的Mali-G GPU系列&#xff…

阿里 双11 同款,流量防卫兵 Sentinel go 源码解读

简介&#xff1a; 本文主要分析阿里巴巴集团开源的流量控制中间件 Sentinel&#xff0c;其原生支持了 Java/Go/C 等多种语言&#xff0c;本文仅仅分析其 Go 语言实现。下文如无特殊说明&#xff0c;sentinel 指代 Sentinel-Go。 作者 | 于雨 apache/dubbo-go 项目负责人 本文…

工业发展 安全护航 2021年工业互联网安全发展峰会成功召开

在数字化创新日益深入的背景下&#xff0c;工业互联网已经成为制造企业构建敏捷、弹性的基础架构的重要转型方向。但与此同时&#xff0c;安全风险与威胁向OT环境渗透&#xff0c;产生了额外的复杂性&#xff0c;对于关键业务与数据带来了严重威胁&#xff0c;构建工业互联网安…

基于 Flink + ClickHouse 打造轻量级点击流实时数仓

作者&#xff1a;LittleMagic Flink 和 ClickHouse 分别是实时计算和&#xff08;近实时&#xff09;OLAP 领域的翘楚&#xff0c;也是近些年非常火爆的开源框架&#xff0c;很多大厂都在将两者结合使用来构建各种用途的实时平台&#xff0c;效果很好。关于两者的优点就不再赘…

Spring boot 2.3优雅下线,距离生产还有多远?

简介&#xff1a; 对于任何一个线上应用&#xff0c;如何在服务更新部署过程中保证业务无感知是开发者必须要解决的问题&#xff0c;即从应用停止到重启恢复服务这个阶段不能影响正常的业务请求&#xff0c;这使得无损下线成为应用生命周期中必不可少的一个环节。 前言 在生产…

发布 128 核 Altra Max,自研内核,明年推出 5nm 处理器,“性能怪兽”Ampere 搞大事?

2015 年&#xff0c;在英特尔就职 28 年的总裁 Renee James 辞职&#xff0c;正在大众纷纷猜测她将如何开启下一段旅程时&#xff0c;她有了创业的想法&#xff0c;2017 年带领新团队创立了专注于为云和边缘打造微处理器的 Ampere 公司。 在云原生浪潮下&#xff0c;底层硬件需…

2020亚太内容分发大会 阿里云荣获“边缘计算领航企业”奖

10月21日&#xff0c;第八届亚太内容分发大会在北京隆重召开。凭借在边缘计算领域的先发优势、技术实力与丰富实践&#xff0c;阿里云荣获“边缘计算领航企业”称号。 伴随着中国5G商用进程提速&#xff0c;大带宽、大连接、低时延的应用场景爆发&#xff0c;将催生产业变革&a…

最佳途径 | 容器规模化落地如何四步走?

随着云原生时代的发展&#xff0c;传统 IT 基础设施加速云化&#xff0c;云原生化成为云上的必然趋势。作为云原生代表技术之一&#xff0c;容器技术可帮助企业提升 IT 架构的敏捷性&#xff0c;加速应用创新&#xff0c;帮助企业更加灵活地应对商业发展中的不确定性。疫情期间…

elasticsearch 嵌入式_Elasticsearch 开箱指南

内容概要ES 基础介绍&#xff0c;重点是其中的核心概念。基础 API 实践操作。1. 基础介绍Elasticsearch (ES) 是一个数据库&#xff0c;提供了分布式的、准实时搜索和分析。基于 Apache Lucene&#xff0c;可以操作结构化数据、非结构化数据、数字类型数据、地理空间数据。数据…

最良心的 chrome 插件可以良心到什么程度?

CSDN下起了红包雨399 元智能音箱199 元天猫精灵300元现金红包/会员100元红包/会员更有千万流量曝光100%有奖......作为日常总发现 " 宝藏 " 的你总体验过一些 " 王炸 " 级别的chrome插件让你想 “ 真诚 ” 安利所以&#xff0c;CSDN开启了彩虹屁chrome插件…

一文教会你如何写复杂业务代码

简介&#xff1a; 这两天在看零售通商品域的代码。面对零售通如此复杂的业务场景&#xff0c;如何在架构和代码层面进行应对&#xff0c;是一个新课题。针对该命题&#xff0c;我进行了比较细致的思考和研究。结合实际的业务场景&#xff0c;我沉淀了一套“如何写复杂业务代码”…