顺丰科技 Hudi on Flink 实时数仓实践

简介: 介绍了顺丰科技数仓的架构,趟过的一些问题、使用 Hudi 来优化整个 job 状态的实践细节,以及未来的一些规划。

本文作者为刘杰,介绍了顺丰科技数仓的架构,趟过的一些问题、使用 Hudi 来优化整个 job 状态的实践细节,以及未来的一些规划。主要内容为:

  1. 数仓架构
  2. Hudi 代码躺过的坑
  3. 状态优化
  4. 未来规划

顺丰科技早在 2019 年引入 Hudi ,当时是基于 Spark 批处理,2020 年对数据的实时性要求更高公司对架构进行了升级,在社区 Hudi on Flink 的半成品上持续优化实现 Binlog 数据 CDC 入湖。在 Hudi 社区飞速发展的同时公司今年对数仓也提出了新的要求,最终采用 Flink + Hudi 的方式来宽表的实时化。过程中遇到了很多问题主要有两点:

  1. Hudi Master 代码当时存在一些漏洞;
  2. 宽表涉及到多个 Join,Top One 等操作使得状态很大。

庆幸的是社区的修复速度很给力加上 Hudi 强大 upsert 能力使这两个问题得到以有效的解决。

一、数仓架构

感兴趣的同学可以参考之前顺丰分享的 Hudi on Flink 在顺丰的实践应用。

二、Hudi 代码趟过的坑

在去年我们是基于 Hudi 0.6 左右进行的 Hudi on Flink 的实践,代码较老。为了拥抱社区我们使用最新 master 代码进行实践,在大数据量写入场景中,发现了一个比较隐秘的丢数问题,这个问题花了将近两周的时间才定位到。

1. Hudi StreamWriteFunction 算子核心流程梳理

StreamWriteFunction

StreamWriteFunction 算子收数据的时候会先把数据按照 fileld 分组缓存好,数据的持续流会使得缓存数据越来越大,当达到一定阈值时便会执行 flush。阈值由 2 个核心参数控制: write.batch.size 默认 64M ,write.task.max.size 默认 1G 。当单个分组数据达到 64M 或者总缓存数据达到 800M ~ 1G 就会触发 flush 。

flush 会调用 client 的 api 去创建一个 WriteHandle,然后把 WriteHandle 放入 Map 进行缓存,一个 handle 可以理解为对应一个文件的 cow。

如果一个 fileld 在同一 checkpoint 期间被多次写入,则后一次是基于前一次的 cow, 它的 handle 是一个 FlinkMergeAndReplaceHandle,判断一个 fileld 是否之前被写入过就是根据上面 Map 缓存得来的。

StreamWriteFunction 执行 snapshotState 时会把内存的所有分组数据一次进行 flush, 之后对 client 的 handle 进行清空。

2. 场景还原

Hudi 本身是具备 upsert 能力的,所以我们开始认为 Hudi Sink 在 At Least Once 模式下是没问题的,并且 At Least Once 模式下 Flink 算子不需要等待 Barrier 对齐,能够处理先到的数据使得处理速度更快,于是我们在 Copy On Write 场景中对 Flink CheckpointingMode 设置了 AT_LEAST_ONCE。

writeFunction 的上游是文件 BucketAssignFunction fileld 分配算子,假如有一批 insert 数据 A、B、C、D 属于同一个分区并且分配到同一个 BucketAssignFunction 的 subtask ,但是 A、B 和 C、D 是相邻两个不同的 checkpoint。

当 A 进入 BucketAssignFunction 时如果发现没有新的小文件可以使用,就会创建一个新的 fileld f0,当 B 流入时也会给他分配到 f0 上。同时因为是 AT_LEAST_ONCE 模式,C、D 数据都有可能被处理到也被分配到了 f0 上。也就是说 在 AT_LEAST_ONCE 模式下由于 C、D 数据被提前处理,导致 A、B、C、D 4 条属于两个 checkpoint 的 insert 数据被分配到了同一个 fileld。

writeFunction 有可能当接收到 A、B、C 后这个算子的 barrier 就对齐了,会把 A、B、C 进行 flush,而 D 将被遗留到下一个 checkpoint 才处理。A、B、C 是 insert 数据所以就会直接创建一个文件写入,D 属于下一个 checkpoint ,A、B、C 写入时创建的 handle 已被清理了,等到下一个 checkpoint 执行 flush。因为 D 也是 insert 数据所以也会直接创建一个文件写数据,但是 A、B、C、D 的 fileld 是一样的,导致最终 D 创建的文件覆盖了 A、B、C 写入的文件最终导致 A、B、C 数据丢失。

3. 问题定位

这个问题之所以难定位是因为具有一定随机性,每次丢失的数据都不太一样,而且小数据量不易出现。最终通过开启 Flink 的 Queryable State 进行查询, 查找丢失数据的定位到 fileld, 发现 ABCD state 的 instant 都是 I,然后解析对应 fileld 的所有版本进行跟踪还原。

三、状态优化

我们对线上最大的离线宽边进行了实时化的,宽表字段较多,涉及到多个表对主表的 left join 还包括一些 Top One 的计算,这些算子都会占用 state. 而我们的数据周期较长需要保存 180 天数据。估算下来状态大小将会达到上百 T,这无疑会对状态的持久化带来很大的压力。但是这些操作放入 Hudi 来做就显得轻而易举。

1. Top One 下沉 Hudi

在 Hudi 中有一个 write.precombine.field 配置项用来指定使用某个字段对 flush 的数据去重,当出现多条数据需要去重时就会按照整个字段进行比较,保留最大的那条记录,这其实和 Top One 很像。

我们在 SQL 上将 Top One 的排序逻辑组合成了一个字段设置为 Hudi 的 write.precombine.field,同时把这个字段写入 state,同一 key 的数据多次进来时都会和 state 的 write.precombine.field 进行比较更新。

Flink Top One 的 state 默认是保存整记录的所有字段,但是我们只保存了一个字段,大大节省了 state 的大小。

2. 多表 Left Join 下沉 Hudi

2.1 Flink SQL join

我们把这个场景简化成如下一个案例,假如有宽表 t_p 由三张表组成

insert into t_p 
select t0.id,t0.name,t1.age,           t2.sex 
from t0 left join t1 on t0.id = t1.id left join t2 on t0.id = t2.id

在 Flink SQL join 算子内部会维护一个左表和右表的 state,这都是每个 table 的全字段,且多一次 join 就会多出一个 state. 最终导致 state 大小膨胀,如果 join 算子上游是一个 append 流,state 大小膨胀的效果更明显。

2.2 把 Join 改写成 Union All

对于上面案例每次 left join 只是补充了几个字段,我们想到用 union all 的方式进行 SQL 改写,union all 需要补齐所有字段,缺的字段用 null 补。我们认为 null 补充的字段不是有效字段。 改成从 union all 之后要求 Hudi 具备局部更新的能力才能达到 join 的效果。

  • 当收到的数据是来自 t0 的时候就只更新 id 和 name 字段;
  • 同理 ,数据是来自 t1 的时候就只更新 age 字段;
  • t2 只更新 sex 字段。

不幸的是 Hudi 的默认实现是全字段覆盖,也就是说当收到 t0 的数据时会把 age sex 覆盖成 null, 收到 t1 数据时会把 name sex 覆盖成 null。这显然是不可接受的。这就要求我们对 Hudi sink 进行改造。

2.3 Hudi Union All 实现

Hudi 在 cow 模式每条记录的更新写入都是对旧数据进行 copy 覆盖写入,似乎只要知道这条记录来自哪个表,哪几个字段是有效的字段就选择性的对 copy 出来的字段进行覆盖即可。但是在分区变更的场景中就不是那么好使了。在分区变更的场景中,数据从一个分区变到另一个分区的逻辑是把旧分区数据删掉,往新分区新增数据。这可能会把一些之前局部更新的字段信息丢失掉。细聊下来 Hudi on Flink 涉及到由几个核心算子组成 pipeline。

  • RowDataToHoodieFunction:这是对收入的数据进行转化成一个 HudiRecord,收到数据是包含全字段的,我们在转化 HudiRecord 的时候只选择了有效字段进行转化。
  • BoostrapFunction:在任务恢复的时候会读取文件加载索引数据,当任务恢复后次算子不做数据转化处理。
  • BucketAssignFunction:这个算子用来对记录分配 location,loaction 包含两部分信息。一是分区目录,另一个是 fileld。fileld 用来标识记录将写入哪个文件,一旦记录被确定写入哪个文件,就会发记录按照 fileld 分组发送到 StreamWriteFunction,StreamWriteFunction 再按文件进行批量写入。

原生的 BucketAssignFunction 的算子逻辑如下图,当收到一条记录时会先从 state 里面进行查找是否之前有写过这条记录,如果有就会找对应的 location。如果分区没有发生变更,就把当前这条记录也分配给这个location,如果在 state 中没有找到 location 就会新创建一个 location,把这个新的location 分配给当前记录,并更新到 state。

总之这个 state 存储的 location 就是告诉当前记录应该从哪个文件进行更新或者写入。遇到分区变更的场景会复杂一点。假如一条记录从 2020 分区变更成了 2021,就会创建一条删除的记录,它的 loaction 是 state 中的 location。这条记录让下游进行实际的删除操作,然后再创建一个新的 location (分区是 2021) 发送到下游进行 insert。

image-20210823162638603

为了在 Hudi 中实现 top one,我们对 state 信息进行了扩展,用来做 Top One 时间字段。

对于 StreamWriteFunction 在 Insert 场景中,假如收到了如下 3 条数据 {id:1,name:zs},{id:1,age:20},{id:1,sex:man},在执行 flush 时会创建一个全字段的空记录 {id:null,name:null,age:null,sex:null},然后依次和 3 条记录进行合并。注意,这个合并过程只会选择有效字段的合并。如下图:

在 Update 场景中的更新逻辑类似 insert 场景,假如老数据是 {id:1,name:zs,age:20,sex:man} ,新收到了{id:1,name:ls},{id:1,age:30} 这 2 条数据,就会先从文件中把老的数据读出来,然后依次和新收到的数据进行合并,合并步骤同 insert。如下图:

image-20210823170931150

这样通过 union all 的方式达到了 left join 的效果,大大节省了 state 的大小。

四、未来规划

parquet 元数据信息收集,parquet 文件可以从 footer 里面得到每个行列的最大最小等信息,我们计划在写入文件的后把这些信息收集起来,并且基于上一次的 commit 的元数据信息进行合并,生成一个包含所有文件的元数据文件,这样可以在读取数据时进行谓词下推进行文件的过滤。

公司致力于打造基于 Hudi 作为底层存储,Flink 作为流批一体化的 SQL 计算引擎,Flink 的批处理 Hudi 这块还涉足不深,未来可能会计划用 Flink 对 Hudi 实现 clustering 等功能,在 Flink 引擎上完善 Hudi 的批处理功能。

原文链接
本文为阿里云原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/512260.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Paillier半同态加密:原理、高效实现方法和应用

简介: 《数据安全法》已于9月1日起正式实施,两个月后《个人信息保护法》也将开始施行,意味着数据安全和隐私保护方面的监管将会在年内陆续到位。在合规收紧大背景下,“数据孤岛”现象日渐明显。如何实现安全的数据流通&#xff0c…

django给mysql配主从_django中的mysql主从读写分离:一、配置mysql主从分离

一、配置mysql主从同步的步骤:(1) 在主服务器上,必须开启二进制日志机制和配置一个独立的ID(2) 在每一个从服务器上,配置一个唯一的ID,创建一个用来专门复制主服务器数据的账号(3) 在开始复制进程前,在主服务器上记录二…

从 40% 跌至 4%,“糊”了的 Firefox 还能重回巅峰吗?

作者 | 丁广辉 责编 | 张红月出品 | CSDN(ID:CSDNnews)Mozilla Firefox,通常简称为Firefox,中文名叫做火狐,是由Mozilla基金会及其子公司Mozilla公司开发的一款自由、开源的网页浏览器。2004年&#x…

如何发现 Kubernetes 中服务和工作负载的异常

简介: 本次分享为Kubernetes 监控公开课的第二节内容:如何发现 Kubernetes 中服务和工作负载的异常。 分享由三个部分组成: 一、Kubernetes 异常定位存在痛点; 二、针对这些痛点,Kubernetes 监控如何更快、更准、更全的…

内含干货PPT下载|一站式数据管理DMS及最新解决方案发布

简介: 今天主要给大家介绍一站式数据管理平台DMS以及解决方案的发布。议题包含企业数据管理当前的一些痛,DMS一站式数据管理平台以及其核心技术,实时数仓解决方案以及相应的应用实践。 “数聚云端智驭未来”——阿里云数据库创新上云峰会暨第…

java 数组数据类型_java基本数据类型和数组

第一类:逻辑型boolean第二类:文本型char第三类:整数型(byte、short、int、long)char类型占2个字节short从-32768到32767int从-2147483648,到2147483647共10位long从-9223372036854775808到9223372036854775807共19位第四类:浮点型(float、double)在数学中0到1有无数个浮点数&am…

kube-scheduler 磁盘调度源码分析

作者 | leadersnowy来源 | CSDN博客kube-scheduler介绍首先我们知道,kube-scheduler的根本工作任务是根据各种调度算法将Pod调度到最合适的工作节点上一、整个调度流程分为两个阶段:1、预选(Predicates):输入是所有节点…

开放搜索查询分析服务架构解读

简介: 搜索行为在后端都会有大量的数据计算和处理才会召回符合用户需求的搜索结果,本次分享结合自建搜索业务中查询分析服务常见的问题及难点,介绍阿里云开放搜索查询分析具备的能力及解决方案,并深度解读阿里巴巴查询分析服务架构…

多任务多目标CTR预估技术

简介: 多目标(Multi Objective Learning)是MTL中的一种。在业务场景中,经常面临既要又要的多目标问题。而多个目标常常会有冲突。如何使多个目标同时得到提升,是多任务多目标在真实业务场景中存在的意义。 作者 | 志阳…

Veeam 发布 2022 年数据保护趋势报告,开发者需关注哪些点?

如今数据作为重要的生产要素,成为数字经济高速发展的关键驱动力之一。越来越多开发者和企业认识到数据保护的重要性,关注数据保护发展趋势,以通过相关的技术解决方案来制定应对策略。 为帮助企业捋请思路,加快数字化转型步伐&…

blazeds调用java_Flex使用Blazeds与Java交互及自定义对象转换详解(转)

一、建立Flex与Java交互的工程。本文中讲到的交互是利用Blazeds的,因为这个是免费的,呵呵,我是穷人。首先就是去下载Blazeds的压缩包,这个可以从官网或者CSDN、JavaEye上下到。解压缩这个包,将里面的Blazeds.war解压&a…

从行业应用到智慧城市,升哲科技Alpha协议如何保障物理世界的数据传输

随着国家《“十四五”信息通信行业发展规划》和《物联网新型基础设施建设三年行动计划(2021-2023年)》的政策出台,物联网的产业发展迎来了新一波浪潮。在农业、制造业、生态环境、智慧消防等场景下,以数字化转型、智能化升级为动力…

Serverless 工程实践 | 零基础上手 Knative 应用

简介: Knative 是一款基于 Kubernetes 的 Serverless 框架。其目标是制定云原生、跨平台的 Serverless 编排标准。 Knative 介绍 Knative 通过整合容器构建(或者函数)、工作负载管理(动态扩缩)以及事件模型这三者实现…

DataWorks功能实践速览 05——循环与遍历

简介: DataWorks功能实践系列,帮助您解析业务实现过程中的痛点,提高业务功能使用效率!通过往期的介绍,您已经了解到在DataWorks上进行任务运行的最关键的几个知识点,其中上期参数透传中为您介绍了可以将上游…

阿里安全开源顶尖技术“猎豹” 计算更快数据更安全

两家公司想开展合作,发挥各自优势联合开发一款产品,如何以“隐私计算”的形式,在保护隐私的情况下,高效地实现两方联合计算,便成为解决这一问题的关键。 最近,阿里安全最新研发的Cheetah(猎豹&…

PaddlePaddle:在 Serverless 架构上十几行代码实现 OCR 能力

简介: 飞桨深度学习框架采用基于编程逻辑的组网范式,对于普通开发者而言更容易上手,同时支持声明式和命令式编程,兼具开发的灵活性和高性能。 飞桨 (PaddlePaddle) 以百度多年的深度学习技术研究和业务应用为基础,是中…

云原生体系下 Serverless 弹性探索与实践

简介: SAE 通过对弹性组件和应用全生命周期的不断优化以达到秒级弹性,并在弹性能力,场景丰富度,稳定性上具备核心竞争力,是传统应用 0 改造上 Serverless 的最佳选择。 作者:竞霄 Serverless 时代的来临 …

java jndi使用_Java项目中使用JNDI连接数据库

因为写的大作业经常用到数据库连接 所以自己写了个数据库连接的类 package DB_Link_info;/* * 数据库链接信息 */public class DB_link_Info {public static final String driverName "com.microsoft.sqlserver.jdbc.SQLServerDriver";public static开发环境为Java,…

Joint Consensus两阶段成员变更的单步实现

简介: Raft提出的两阶段成员变更Joint Consensus是业界主流的成员变更方法,极大的推动了成员变更的工程应用。但Joint Consensus成员变更采用两阶段,一次变更需要提议两条日志, 在一些系统中直接使用时有些不便。那么Joint Consen…

真香!8 行代码搞定最大子数组和问题

作者 | 码农的荒岛求生来源 | 码农的荒岛求生今天给大家带来一道极其经典的题目,叫做最大和子数组,给定一个数组,找到其中的一个连续子数组,其和最大。示例:输入: nums [-2,1,-3,4,-1,2,1,-5,4] 输出: 6 解释: 子数组…