Iceberg 在基于 Flink 的流式数据入库场景中的应用

本文以流式数据入库的场景为基础,介绍引入 Iceberg 作为落地格式和嵌入 Flink sink 的收益,并分析了当前可实现的框架及要点。

应用场景

流式数据入库,是大数据和数据湖的典型应用场景。上游的流式数据,如日志,或增量修改,通过数据总线,经过必要的处理后,汇聚并存储于数据湖,供下游的应用(如报表或者商业智能分析)使用。

640 1.jpg

上述的应用场景通常有如下的痛点,需要整个流程不断的优化:

  • 支持流式数据写入,并保证端到端的不重不丢(即 exactly-once);
  • 尽量减少中间环节,能支持更实时(甚至是 T+0)的读取或导出,给下游提供更实时更准确的基础数据;
  • 支持 ACID,避免脏读等错误发生;
  • 支持修改已落地的数据,虽然大数据和数据湖长于处理静态的或者缓慢变化的数据,即读多写少的场景,但方便的修改功能可以提升用户体验,避免用户因为极少的修改,手动更换整个数据文件,甚至是重新导出;
  • 支持修改表结构,如增加或者变更列;而且变更不要引起数据的重新组织。

引入 Iceberg 作为 Flink sink

为了解决上述痛点,我们引入了 Iceberg 作为数据落地的格式。Iceberg 支持 ACID 事务、修改和删除、独立于计算引擎、支持表结构和分区方式动态变更等特性,很好的满足我们的需求。

同时,为了支持流式数据的写入,我们引入 Flink 作为流式处理框架,并将 Iceberg 作为 Flink sink。

下文主要介绍 Flink Iceberg sink 的实现框架和要点。但在这之前,需要先介绍一些实现中用到的 Flink 基本概念。

Flink 基本概念

从 Flink 的角度如何理解"流"和"批"

640 2.png

Flink 使用 DataFrame API 来统一的处理流和批数据。

Stream, Transformation 和 Operator

一个 Flink 程序由 stream 和 transformation 组成:

  • Stream: Transformation 之间的中间结果数据;
  • Transformation:对(一个或多个)输入 stream 进行操作,输出(一个或多个)结果 stream。

当 Flink 程序执行时,其被映射成 Streaming Dataflow,由如下的部分组成:

  • Source (operator):接收外部输入给 Flink;
  • Transformation (operator):中间对 stream 做的任何操作;
  • Sink (operator):Flink 输出给外部。

下图为 Flink 官网的示例,展示了一个以 Kafka 作为输入 Source,经过中间两个 transformation,最终通过 sink 输出到 Flink 之外的过程。

640 3.jpg

State, Checkpoint and Snapshot

Flink 依靠 checkpoint 和基于 snapshot 的恢复机制,保证程序 state 的一致性,实现容错。

Checkpoint 是对分布式的数据流,以及所有 operator 的 state,打 snapshot 的过程。

■ State

一个 operator 的 state,即它包含的所有用于恢复当前状态的信息,可分为两类:

  • 系统 state:如 operator 中对数据的缓存。
  • 用户自定义 state:和用户逻辑相关,可以利用 Flink 提供的 managed state,如 ValueState、ListState,来存储。

State 的存储位置,可以分为:

  • Local:内存,或者本地磁盘
  • State backend:远端的持久化存储,如 HDFS。

如下图所示:

640 4.jpg

■ Checkpoint

Flink 做 checkpoint 的过程如下:

  1. Checkpoint coordinator 首先发送 barrier 给 source。
  2. Source 做 snapshot,完成后向 coordinator 确认。
  3. Source 向下游发送 barrier。
  4. 下游 operator 收到所有上游的 barrier 后,做 snapshot,完成后向 coordinator 确认。
  5. 继续往下游发送 barrier,直到 sink。
  6. Sink 通知 coordinator 自己完成 checkpoint。
  7. Coordinator 确认本周期 snapshot 做完。

如下图所示:

640 5.jpg

■ Barrier

Barrier 是 Flink 做分布式 snapshot 的重要概念。它作为一个系统标记,被插入到数据流中,随真实数据一起,按照数据流的方向,从上游向下游传递。

由于每个 barrier 唯一对应 checkpoint id,所以数据流中的 record 实际被 barrier 分组,如下图所示,barrier n 和 barrier n-1 之间的 record,属于 checkpoint n。

640 6.jpg

Barrier 的作用是在分布式的数据流中,将 operator 的多个输入流按照 checkpoint对齐(align),如下图所示:

640 7.jpg

Flink Iceberg sink

了解了上述 Flink 的基本概念,这些概念又是如何被应用和映射到 Flink Iceberg sink 当中的呢?

总体框架

640 8.jpg

如图,Flink Iceberg sink 有两个主要模块和两个辅助模块组成:

640 9.png

实现要点

■ Writer

  1. 在当前的实现中,Java 的 Map 作为每条记录,输入给 writer。内部逻辑先将其转化为作为中间格式的 Avro IndexedRecord,而后通过 Iceberg 里的 Parquet 相关 API,累积的写入 DataFile。
  2. 使用 Avro 作为中间格式是一个临时方案,为简化适配,并最大限度的利用现有逻辑。但长期来看,使用中间格式会影响处理效率,社区也在试图通过 ISSUE-870 来去掉 Avro,进而使用 Iceberg 内建的数据类型作为输入,同时也需要加入一个到 Flink 内建数据类型的转换器。
  3. 在做 checkpoint 的过程中,发送 writer 自己的 barrier 到下游的 committer 之前,关闭单个 Parquet 文件,构建 DataFile,并发送 DataFile 的信息给下游。

■ Committer

  1. 全局唯一的 Committer 在收到上游所有 writer 的 barrier 以后,将收到的 DataFile 的信息填入 manifest file,并使用 ListState 把 manifest file 作为用户自定义的 state,保存于 snapshot 中。
  2. 当 checkpoint 完成以后,通过 merge append 将 manifest file 提交给 Iceberg。Iceberg 内部通过后续的一系列操作完成 commit。最终让新加入的数据对其他的读任务可见。

试用 Flink Iceberg sink

社区上 https://github.com/apache/incubator-iceberg/pull/856 提供了可以试用的原型代码。下载该 patch 放入 master 分支,编译并构建即可。如下的程序展示了如何将该 sink 嵌入到 Flink 数据流中:

// Configurate catalog
org.apache.hadoop.conf.Configuration hadoopConf =new org.apache.hadoop.conf.Configuration();
hadoopConf.set(org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS.varname,META_STORE_URIS);
hadoopConf.set(org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,META_STORE_WAREHOUSE);Catalog icebergCatalog = new HiveCatalog(hadoopConf);// Create Iceberg table
Schema schema = new Schema(...
);
PartitionSpec partitionSpec = builderFor(schema)...
TableIdentifier tableIdentifier =TableIdentifier.of(DATABASE_NAME, TABLE_NAME);
// If needed, check the existence of table by loadTable() and drop it
// before creating it
icebergCatalog.createTable(tableIdentifier, schema, partitionSpec);// Obtain an execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// Enable checkpointing
env.enableCheckpointing(...);// Add Source
DataStream<Map<String, Object>> dataStream =env.addSource(source, typeInformation);// Configure Ieberg sink
Configuration conf = new Configuration();
conf.setString(org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,META_STORE_URIS);
conf.setString(IcebergConnectorConstant.DATABASE, DATABASE_NAME);
conf.setString(IcebergConnectorConstant.TABLE, TABLE_NAME);// Append Iceberg sink to data stream
IcebergSinkAppender<Map<String, Object>> appender =new IcebergSinkAppender<Map<String, Object>>(conf, "test").withSerializer(MapAvroSerializer.getInstance()).withWriterParallelism(1);
appender.append(dataStream);// Trigger the execution
env.execute("Sink Test");

后续规划

Flink Iceberg sink 有很多需要完善的地方,例如:上文中提到的去掉 Avro 作为中间格式;以及在各种失败的情况下是否仍能保证端到端的 exactly-once;按固定时长做 checkpoint,在高低峰时生成不同大小的 DataFile,是否对后续读不友好等。这些问题都在我们的后续规划中,也会全数贡献给社区。

参考资料:

[1] Iceberg 官网:
https://iceberg.apache.org/
[2] Flink 1.10文 档:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/
[3] Neflix 提供的 Flink Iceberg connector 原型:
https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg
[4] Flink Iceberg sink 设计文档:
https://docs.google.com/document/d/19M-sP6FlTVm7BV7MM4Om1n_MVo1xCy7GyDl_9ZAjVNQ/edit?usp=sharing
[5] Flink 容错机制(checkpoint) :
https://www.cnblogs.com/starzy/p/11439988.html

 

 

# 社区活动推荐 #

普惠全球开发者,这一次,格外与众不同!首个 Apache 顶级项目在线会议 Flink Forward 全球直播中文精华版来啦,聚焦 Alibaba、Google、AWS、Uber、Netflix、新浪微博等海内外一线厂商,经典 Flink 应用场景,最新功能、未来规划一览无余。点击下方链接可了解更多大会详情:https://developer.aliyun.com/live/2594?spm=a2c6h.14242504.J_6074706160.2.3fca361f4cYyQx

原文链接
本文为云栖社区原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/516229.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

uniapp H5页面打包发布

文章目录1. 打开 HBuilder2. 打开项目3. 点击发行4. 等待项目编译5. 查看路径6. 安装7. 启动1. 打开 HBuilder 2. 打开项目 打开当前要打包的项目 3. 点击发行 &#xff0c;找到【网站 - PC web 或手机 H5 】 确认【网站标题】&#xff0c;无需【网站域名】&#xff0c;直…

Python 让我再次在女同学面前长脸了!(真实案例)

事情是经过这样的&#xff1a;晚上在家王者的时候&#xff0c;微信突然弹出了一条好友添加提醒&#xff0c;一看昵称&#xff0c;居然是我们大学的班花&#xff01;&#xff01;&#xff01;这真是奇怪了&#xff0c;我之前连班花的微信都没加上&#xff0c;这次却突然主动加我…

【CDN】最近,你的APP崩了吗?

过去几个月里&#xff0c;#xxx崩了#这个话题频繁出现在热搜榜上&#xff0c;让不少程序员小哥哥瑟瑟发抖。 从疫情宅家时期著名的视频APP“三连崩”&#xff0c;到全面复工开课后的在线教育平台与办公软件频繁宕机&#xff0c;再到报复性消费引发的点餐系统接连“爆炸”&#x…

1024程序员节重要议程曝光,开源技术英雄会聊开源“真心话”

10 月 23-25 日&#xff0c;由 CSDN 等多家单位精心筹划的“长沙 中国 1024 程序员节”将盛大举行。程序员节活动以开源为主议题&#xff0c;包括&#xff1a;2场岳麓尖峰对话&#xff1b;2020 开源技术英雄大会&#xff1b;10 场热门技术分论坛/峰会&#xff1b;创意集市&…

绿网天下:上云解决系统安全和安全合规

公司简介 绿网天下&#xff08;福建&#xff09;网络科技股份有限公司&#xff08;以下简称绿网天下&#xff09;成立于2006年&#xff0c;总部设于中国厦门软件园生产基地。绿网天下是基于网络安全为基础的 K12 在线教育服务提供商&#xff0c;累计活跃用户数超千万。公司专注…

一键快速生成 Vue 的 HTML页面结构代码

目录 1. 创建配置文件2. 使用3. 配置说明4. 注意VS Code除了使用 !+Tab 在html文件中快速创建html结构代码之外,还可以自己定义代码段,这里分享如下在 Vue 环境下快速通过成 html 结构。 1. 创建配置文件 第一步 文件 ==> 首选项 ==>用户代码片段 第二步 搜索框中输…

蓝色巨人IBM全力奔赴的混合云之旅能顺利吗?

整理 | 郑丽媛头图 | CSDN下载自东方IC10 月 8 日&#xff0c;IBM 官方宣布&#xff0c;为了加速启动混合云发展战略&#xff0c;关注混合云的增长&#xff0c;目前正在计划把IT基础设施服务部门作为一家独立的上市公司分出来&#xff0c;形成两家上市公司。新公司暂定名为 New…

冠赢网络:游戏盾彻底解决DDoS/CC攻击

公司简介 厦门冠赢网络科技有限公司是一家高速发展的新型网络科技公司。2019年3月&#xff0c;冠赢网络荣任厦门市动漫游戏产业协会副会长单位&#xff1b;2019年5月&#xff0c;冠赢网络荣获“VR百强企业”称号。公司集手游、网游、VR游戏、直播平台的开发、推广、销售及运营…

腾讯位置服务地图选点这个怎么在小程序里面更新呀?

修改version https://developers.weixin.qq.com/miniprogram/dev/framework/plugin/using.html

完美日记:保障电商大促活动平稳运行

公司简介 PerfectDiary完美日记是广州逸仙电子商务有限公司旗下美妆品牌&#xff0c;创立于2016年。2016年&#xff0c;来自哈佛大学的品牌创始人和英国时尚设计师在伦敦相遇&#xff0c;希望有机会把欧美彩妆风尚带回亚洲&#xff0c;在视觉形象上有所突破。完美日记从T台获取…

立根融资租赁:内部系统平台上云

公司介绍 立根融资租赁&#xff08;上海&#xff09;有限公司成立于2015年8月&#xff0c;注册资本11.75亿元&#xff0c;是广州金融控股集团成员企业&#xff0c;实际控制人为广州市国资委。公司深耕公用建设、旅游、医疗、教育、新能源和消费金融等多个专业领域的融资租赁业…

全球权威MLPerf基准测试再发榜,浪潮AI服务器创18项AI性能纪录

美国东部时间10月21日&#xff0c;全球备受瞩目的权威AI基准测试MLPerf公布今年的推理测试榜单&#xff0c;浪潮AI服务器NF5488A5一举创造18项性能纪录&#xff0c;在数据中心AI推理性能上遥遥领先其他厂商产品。 MLPerf是当前全球最具影响力的AI计算基准评测组织&#xff0c;…

上学帮:阿里云助力教育资讯平台防爬虫

公司简介 广州市藏星网络科技有限公司的主要产品是移动互联网应用“上学帮”&#xff0c;产品在各大应用市场以及微信公众号中均有上架。“上学帮”是国内领先的本地教育信息服务及交易平台&#xff0c;公司业务全面覆盖0~18岁中国孩子的升学择校、校外培训机构、家庭教育等领…

Flink 消息聚合处理方案

微博机器学习平台使用 Flink 实时处理用户行为日志和生成标签&#xff0c;并且在生成标签后写入存储系统。为了降低存储系统的 IO 负载&#xff0c;有批量写入的需求&#xff0c;同时对数据延迟也需要进行一定的控制&#xff0c;因此需要一种有效的消息聚合处理方案。 在本篇文…

微信“支付”页全国多地上线“出行服务”,已覆盖108城

近期&#xff0c;微信“支付”页面新增“出行服务”入口&#xff0c;作为一站式的出行服务平台&#xff0c;“出行服务”提供覆盖车主服务、公交出行、打车租车等多场景出行服务&#xff0c;目前已经在北京、广东、重庆、黑龙江、山西、福建、湖北、陕西等全国108个地级市上线。…

Flink 新场景:OLAP 引擎性能优化及应用案例

摘要&#xff1a;本文由阿里巴巴技术专家贺小令&#xff08;晓令&#xff09;分享&#xff0c;主要介绍 Apache Flink 新场景 OLAP 引擎&#xff0c;内容分为以下四部分&#xff1a; 背景介绍Flink OLAP 引擎案例介绍未来计划一、背景介绍 1.OLAP 及其分类 OLAP 是一种让用户可…

如何评估一项技术是否值得长期投入

“每个人的时间都是有限的&#xff0c;在有限的时间里选择一项值得投入的技术会变得尤为重要。” 笔者从 2008 年开始工作到现在也有 12 个年头了&#xff0c;一路走来都在和数据打交道&#xff0c;做过很多大数据底层框架内核的开发&#xff08;Hadoop&#xff0c;Pig&#x…

绝了,项目内部源码资源被爆出!网友:请收下我的膝盖!

你好&#xff0c;程序员。多少个清晨&#xff0c;你让阳光肆无忌惮地穿透你精心搭配的格子衬衫&#xff1b;多少个白天&#xff0c;你在疯狂体会需求和 bug &#xff1b;多少个午夜&#xff0c;你任凭无法止步的代码&#xff0c;收割着你的头发在忙碌焦虑中自我否定变成了常态&…

dubbo-go 中如何实现路由策略功能

可在控制面对服务的路由进行精细控制&#xff0c;是一个成熟 RPC 系统必备的能力之一。作为一个逐步走向成熟的 RPC 系统&#xff0c;Apache/dubbo-go&#xff08;以下简称 dubbo-go &#xff09;的最新版本 v1.4 中已经实现了 Condition Router 和 Health Instance First Rout…

独家对话谢宝友:做一款类似于 Linux 的国产操作系统 | 人物志

作者 | 郑丽媛来源 | CSDN&#xff08;ID&#xff1a;CSDNnews&#xff09;从国外操作系统的长期垄断到中国自主研发操作系统数十年的起落浮沉&#xff0c;技术自主创新独立已成为国产基础软件的主要突破口。近几年间&#xff0c;随着物联网时代的到来&#xff0c;以 Linux 为主…