37 手游基于 Flink CDC + Hudi 湖仓一体方案实践

简介: 介绍了 37 手游为何选择 Flink 作为计算引擎,并如何基于 Flink CDC + Hudi 构建新的湖仓一体方案。

本文作者是 37 手游大数据开发徐润柏,介绍了 37 手游为何选择 Flink 作为计算引擎,并如何基于 Flink CDC + Hudi 构建新的湖仓一体方案,主要内容包括:

  1. Flink CDC 基本知识介绍
  2. Hudi 基本知识介绍
  3. 37 手游的业务痛点和技术方案选型
  4. 37 手游湖仓一体介绍
  5. Flink CDC + Hudi 实践
  6. 总结

GitHub 地址
GitHub - apache/flink: Apache Flink
欢迎大家给 Flink 点赞送 star~

一、Flink-CDC 2.0

Flink CDC Connectors 是 Apache Flink 的一个 source 端的连接器,目前 2.0 版本支持从 MySQL 以及 Postgres 两种数据源中获取数据,2.1 版本社区确定会支持 Oracle,MongoDB 数据源。

Fink CDC 2.0 的核心 feature,主要表现为实现了以下三个非常重要的功能:

  • 全程无锁,不会对数据库产生需要加锁所带来的风险;
  • 多并行度,全量数据的读取阶段支持水平扩展,使亿级别的大表可以通过加大并行度来加快读取速度;
  • 断点续传,全量阶段支持 checkpoint,即使任务因某种原因退出了,也可通过保存的 checkpoint 对任务进行恢复实现数据的断点续传。
Flink CDC 2.0 详解核心改进

二、Hudi

Apache Hudi 目前被业内描述为围绕数据库内核构建的流式数据湖平台 (Streaming Data Lake Platform)。

由于 Hudi 拥有良好的 Upsert 能力,并且 0.10 Master 对 Flink 版本支持至 1.13.x,因此我们选择通过 Flink + Hudi 的方式为 37 手游的业务场景提供分钟级 Upsert 数据的分析查询能力。

三、37 手游的业务痛点和技术方案选型

img

1. 旧架构与业务痛点

1.1 数据实时性不够

  • 日志类数据通过 sqoop 每 30min 同步前 60min 数据到 Hive;
  • 数据库类数据通过 sqoop 每 60min 同步当天全量数据到 Hive;
  • 数据库类数据通过 sqoop 每天同步前 60 天数据到 Hive。

1.2 业务代码逻辑复杂且难维护

  • 目前 37 手游还有很多的业务开发沿用 MySQL + PHP 的开发模式,代码逻辑复杂且很难维护;
  • 相同的代码逻辑,往往流处理需要开发一份代码,批处理则需要另开发一份代码,不能复用。

1.3 频繁重刷历史数据

  • 频繁地重刷历史数据来保证数据一致。

1.4 Schema 变更频繁

  • 由于业务需求,经常需要添加表字段。

1.5 Hive 版本低

  • 目前 Hive 使用版本为 1.x 版本,并且升级版本比较困难;
  • 不支持 Upsert;
  • 不支持行级别的 delete。

由于 37 手游的业务场景,数据 upsert、delete 是个很常见的需求。所以基于 Hive 数仓的架构对业务需求的满足度不够。

2. 技术选型

在同步工具的选型上考虑过 Canal 和 Maxwell。但 Canal 只适合增量数据的同步并且需要部署,维护起来相对较重。而 Maxwell 虽然比较轻量,但与 Canal 一样需要配合 Kafka 等消息队列使用。对比之下,Flink CDC 可以通过配置 Flink connector 的方式基于 Flink-SQL 进行使用,十分轻巧,并且完美契合基于 Flink-SQL 的流批一体架构。

在存储引擎的选型上,目前最热门的数据湖产品当属:Apache Hudi,Apache Iceberg 和 DeltaLake,这些在我们的场景下各有优劣。最终,基于 Hudi 对上下游生态的开放、对全局索引的支持、对 Flink 1.13 版本的支持,以及对 Hive 版本的兼容性 (Iceberg 不支持 Hive1.x 的版本) 等原因,选择了 Hudi 作为湖仓一体和流批一体的存储引擎。

针对上述存在的业务痛点以及选型对比,我们的最终方案为:以 Flink1.13.2 作为计算引擎,依靠 Flink 提供的流批统一的 API,基于 Flink-SQL 实现流批一体,Flink-CDC 2.0 作为 ODS 层的数据同步工具以及 Hudi-0.10 Master 作为存储引擎的湖仓一体,解决维护两套代码的业务痛点。

四、新架构与湖仓一体

37 手游的湖仓一体方案,是 37 手游流批一体架构的一部分。通过湖仓一体、流批一体,准实时场景下做到了:数据同源、同计算引擎、同存储、同计算口径。数据的时效性可以到分钟级,能很好的满足业务准实时数仓的需求。下面是架构图:

img

MySQL 数据通过 Flink CDC 进入到 Kafka。之所以数据先入 Kafka 而不是直接入 Hudi,是为了实现多个实时任务复用 MySQL 过来的数据,避免多个任务通过 Flink CDC 接 MySQL 表以及 Binlog,对 MySQL 库的性能造成影响。

通过 CDC 进入到 Kafka 的数据除了落一份到离线数据仓库的 ODS 层之外,会同时按照实时数据仓库的链路,从 ODS->DWD->DWS->OLAP 数据库,最后供报表等数据服务使用。实时数仓的每一层结果数据会准实时的落一份到离线数仓,通过这种方式做到程序一次开发、指标口径统一,数据统一。

从架构图上,可以看到有一步数据修正 (重跑历史数据) 的动作,之所以有这一步是考虑到:有可能存在由于口径调整或者前一天的实时任务计算结果错误,导致重跑历史数据的情况。

而存储在 Kafka 的数据有失效时间,不会存太久的历史数据,重跑很久的历史数据无法从 Kafka 中获取历史源数据。再者,如果把大量的历史数据再一次推到 Kafka,走实时计算的链路来修正历史数据,可能会影响当天的实时作业。所以针对重跑历史数据,会通过数据修正这一步来处理。

总体上说,37 手游的数据仓库属于 Lambda 和 Kappa 混搭的架构。流批一体数据仓库的各个数据链路有数据质量校验的流程。第二天对前一天的数据进行对账,如果前一天实时计算的数据无异常,则不需要修正数据,Kappa 架构已经足够。

五、Flink CDC 2.0 + Kafka + Hudi 0.10 实践

1. 环境准备

  • Flink 1.13.2
  • .../lib/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar (修改 Master 分支的 Hudi Flink 版本为 1.13.2 然后构建)
  • .../lib/hadoop-mapreduce-client-core-2.7.3.jar (解决 Hudi ClassNotFoundException)
  • ../lib/flink-sql-connector-mysql-cdc-2.0.0.jar
  • ../lib/flink-format-changelog-json-2.0.0.jar
  • ../lib/flink-sql-connector-kafka_2.11-1.13.2.jar

source 端 MySQL-CDC 表定义:

create table sy_payment_cdc (ID BIGINT,...PRIMARY KEY(ID) NOT ENFORCED
) with('connector' = 'mysql-cdc','hostname' = '','port' = '','username' = '','password' = '','database-name' = '','table-name' = '','connect.timeout' = '60s','scan.incremental.snapshot.chunk.size' = '100000','server-id'='5401-5416'
);

值得注意的是:scan.incremental.snapshot.chunk.size 参数需要根据实际情况来配置,如果表数据量不大,使用默认值即可。

Sink 端 Kafka+Hudi COW 表定义:

create table sy_payment_cdc2kafka (ID BIGINT,...PRIMARY KEY(ID) NOT ENFORCED
) with ('connector' = 'kafka','topic' = '','scan.startup.mode' = 'latest-offset','properties.bootstrap.servers' = '','properties.group.id' = '','key.format' = '','key.fields' = '','format' = 'changelog-json'
);create table sy_payment2Hudi (ID BIGINT,...PRIMARY KEY(ID) NOT ENFORCED
)
PARTITIONED BY (YMD)
WITH ('connector' = 'Hudi','path' = 'hdfs:///data/Hudi/m37_mpay_tj/sy_payment','table.type' = 'COPY_ON_WRITE','partition.default_name' = 'YMD','write.insert.drop.duplicates' = 'true','write.bulk_insert.shuffle_by_partition' = 'false','write.bulk_insert.sort_by_partition' = 'false','write.precombine.field' = 'MTIME','write.tasks' = '16','write.bucket_assign.tasks' = '16','write.task.max.size' = '','write.merge.max_memory' = ''
);

针对历史数据入 Hudi,可以选择离线 bulk_insert 的方式入湖,再通过 Load Index Bootstrap 加载数据后接回增量数据。bulk_insert 方式入湖数据的唯一性依靠源端的数据本身,在接回增量数据时也需要做到保证数据不丢失。

这里我们选择更为简单的调整任务资源的方式将历史数据入湖。依靠 Flink 的 checkpoint 机制,不管是 CDC 2.0 入 Kafka 期间还是 Kafka 入 Hudi 期间,都可以通过指定 checkpoint 的方式对任务进行重启并且数据不会丢失。

我们可以在配置 CDC 2.0 入 Kafka,Kafka 入 Hudi 任务时调大内存并配置多个并行度,加快历史数据入湖,等到所有历史数据入湖后,再相应的调小入湖任务的内存配置并且将 CDC 入 Kafka 的并行度设置为 1,因为增量阶段 CDC 是单并行度,然后指定 checkpoint 重启任务。

按照上面表定义的参数配置,配置 16 个并行度,Flink TaskManager 内存大小为 50G 的情况下,单表 15 亿历史数据入至 Hudi COW 表实际用时 10 小时,单表 9 亿数据入至 Hudi COW 表实际用时 6 小时。当然这个耗时很大一部分是 COW 写放大的特性,在大数据量的 upsert 模式下耗时较多。

目前我们的集群由 200 多台机器组成,在线的流计算任务总数有 200 多,总数据量接近 2PB。

如果集群资源很有限的情况下,可以根据实际情况调整 Hudi 表以及 Flink 任务的内存配置,还可以通过配置 Hudi 的限流参数 write.rate.limit 让历史数据缓慢入湖。

img

之前 Flink CDC 1.x 版本由于全量 snapshot 阶段单并行度读取的原因,当时亿级以上的表在全量 snapshot 读取阶段就需要耗费很长时间,并且 checkpoint 会失败无法保证数据的断点续传。

所以当时入 Hudi 是采用先启动一个 CDC 1.x 的程序将此刻开始的增量数据写入 Kafka,之后再启动另外一个 sqoop 程序拉取当前的所有数据至 Hive 后,通过 Flink 读取 Hive 的数据写 Hudi,最后再把 Kafka 的增量数据从头消费接回 Hudi。由于 Kafka 与 Hive 的数据存在交集,因此数据不会丢失,加上 Hudi 的 upsert 能力保证了数据唯一。

但是,这种方式的链路太长操作困难,如今通过 CDC 2.0 在全量 snapshot 阶段支持多并行度以及 checkpoint 的能力,确实大大降低了架构的复杂度。

2. 数据比对

  • 由于生产环境用的是 Hive1.x,Hudi 对于 1.x 还不支持数据同步,所以通过创建 Hive 外部表的方式进行查询,如果是 Hive2.x 以上版本,可参考 Hive 同步章节;
  • 创建 Hive 外部表 + 预创建分区;
  • auxlib 文件夹添加 Hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar。
CREATE EXTERNAL TABLE m37_mpay_tj.`ods_sy_payment_f_d_b_ext`(`_hoodie_commit_time` string,`_hoodie_commit_seqno` string,`_hoodie_record_key` string,`_hoodie_partition_path` string,`_hoodie_file_name` string,`ID` bigint,...)
PARTITIONED BY (`dt` string)
ROW FORMAT SERDE'org.apache.hadoop.Hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT'org.apache.Hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT'org.apache.hadoop.Hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION'hdfs:///data/Hudi/m37_mpay_tj/sy_payment'

最终查询 Hudi 数据 (Hive 外部表的形式) 与原来 sqoop 同步的 Hive 数据做比对得到:

  1. 总数一致;
  2. 按天分组统计数量一致;
  3. 按天分组统计金额一致。

六、总结

湖仓一体以及流批一体架构对比传统数仓架构主要有以下几点好处:

  • Hudi 提供了 Upsert 能力,解决频繁 Upsert/Delete 的痛点;
  • 提供分钟级的数据,比传统数仓有更高的时效性;
  • 基于 Flink-SQL 实现了流批一体,代码维护成本低;
  • 数据同源、同计算引擎、同存储、同计算口径;
  • 选用 Flink CDC 作为数据同步工具,省掉 sqoop 的维护成本。

最后针对频繁增加表字段的痛点需求,并且希望后续同步下游系统的时候能够自动加入这个字段,目前还没有完美的解决方案,希望 Flink CDC 社区能在后续的版本提供 Schema Evolution 的支持。

Reference

[1] MySQL CDC 文档: MySQL CDC Connector — Flink CDC 2.0.0 documentation

[2] Hudi Flink 答疑解惑:HUDI FLINK 答疑解惑 · 语雀

[3] Hudi 的一些设计:Hudi 的一些设计 · 语雀

原文链接
本文为阿里云原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/512347.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

手把手搭建一个容器化+代理网关+可视化管理环境

作者 | togettoyou来源 | SuperGopher前言本文主要分享个人服务器的应用部署方案现状,容器化代理网关可视化管理。准备阶段我购买的是腾讯云服务器(2 核 4GB 3Mbps)域名也是在腾讯云备案过的,提前准备域名解析配置环境安装 Docker…

漫画 | 一口气搞懂 Serverless !

简介: 第二届云原生编程挑战赛为热爱技术的年轻人提供一个挑战世界级技术问题的舞台,希望用技术为全社会创造更大价值。 作者 | 刘欣 呃,我可能是别人眼中所说的不用奋斗的一代。 大家喜欢听的什么多姿多彩的生活,我都经历过一…

OpenKruise v0.10.0 新特性 WorkloadSpread 解读

简介: 针对需求,OpenKruise 在 v0.10.0 版本中新增了 WorkloadSpread 特性。目前它支持配合 Deployment、ReplicaSet、CloneSet 这些 workload,来管理它们下属 Pod 的分区部署与弹性伸缩。下文会深入介绍 WorkloadSpread 的应用场景和实现原理…

CSS 状态管理,玩出花样了!

作者 | 零一来源 | 前端印象CSS用于交互的方式无非就那么几种:伪类::hover、:link、:active ...动画:animation过渡动画:transition这些交互方式组合起来,真的可以玩出一些花样,例如我们本文的主题&#xf…

告别Kafka Stream,让轻量级流处理更加简单

简介: 还在花精力去选型Kafka组件去做清洗转化?来试试Kafka ETL任务功能! 一说到数据孤岛,所有技术人都不陌生。在 IT 发展过程中,企业不可避免地搭建了各种业务系统,这些系统独立运行且所产生的数据彼此独…

元宇宙“性骚扰”现象频出,Meta推出“个人结界”能保护好女玩家吗?

作者 | 小码君来源 | 抓码青年元宇宙,可以说是最近最炙手可热的概念了。各大厂纷纷入局元宇宙,Faceebook甚至将总公司名字都改名为Meta。不过盯上元宇宙的可不止资本大鳄,还有一些不怀好意的色狼。据外媒报道称,在Meta的Oculus设备…

【CDS技术揭秘系列 01】阿里云CDS-OSS容灾大揭秘

简介: 本文主要阐述 CDS 产品中 OSS 服务在容灾方面的部署形态以及实现的其本原理。 容灾功能可以保证用户一份数据在多个地方存在冗余备份,当某个机房出现极端异常(比如物理损毁)情况下,数据也不会出现丢失&#xff1…

如何促合作共赢?技术人的一点经验分享

简介: 本文作者将通过与詹韦团队一起合作的《树懒平台》,分享在工作过程中,我们什么情况下会有合作诉求?有了合作诉求之后,如何寻找对的人?锁定候选人之后,如何打动对方促成合作?合作…

不记得 Git 命令? 懒人版 Git 值得拥有!

作者 | Eason来源 | 程序员巴士Git的强大是所有开发者都心知肚明的事情,但是其多样的命令令人很是难受。不过在Github上有着这么一个开源项目lazygit[1]。Lazygit是由Jesseduffield开发并维护的项目,其代码托管在Github。通过使用Lazygit,我们…

Dubbo3.0|阿里巴巴服务框架三位一体的选择与实践

简介: 服务框架就像铁路的铁轨一样,是互通的基础,只有解决了服务框架的互通,才有可能完成更高层的业务互通,所以用相同的标准统一,合二为一并共建新一代的服务框架是必然趋势。Dubbo3.0 是 Dubbo2.0 与 HSF…

全球首款乘云而来的存储产品CDS诞生!

9月22日,阿里云发布全球首款“云定义存储”(Cloud Defined Storage,CDS)产品。作为一款本地部署的分布式存储产品,阿里云CDS拥有与公共云存储相同的技术架构,让企业客户在本地也能部署和使用与公共云一致的…

性能提升3倍、时延降低70%,阿里云企业级存储ESSD云盘再升级!

9月22日,阿里云存储年度新品发布会上,阿里云基础产品资深产品总监陈起鲲发布了其全球领先的旗舰级块存储产品ESSD的两款新规格(ESSD Auto PL、ESSD PL-X),并宣布了新增的多项企业级能力。 据了解,ESSD是阿…

一图看懂,什么是“云定义存储”

世界的诞生是从盘古开天辟地开始 而数据的存储 则由在龟甲上刻下的第一个字开始 经过数千年发展 数据存储也从最初的 龟壳、竹简等材料逐步进化到磁带、硬盘甚至云上 在之前几千年里 人们对于数据存储的需求并没那么高 仅仅通过纸张就能将所有数据记录下来 随着第一台计算机面…

Android 13 第一个开发者版本来了,网友直呼:Android 12 还没玩透!

整理 | 苏宓出品 | CSDN(ID:CSDNnews)2 月 10 日,Google 宣布 Android 13 首个预览版面向开发者开放,此版本重点聚焦隐私和安全、提供开发者生产力、应用兼容性,并保持与 OpenJDK 11 更新一致、提供主题图标…

「技术人生」第6篇:技术同学应该如何理解业务?

简介: 本文以大量理论论述解析业务,并提供多种基于不同场景的实操方法,帮助技术同学以科学、合理的方式开展日常工作、指导团队开展业务建设,保障顶层设计的落地执行。 一. 背景 目前已经发布《技术一号位的方法论》系列文章其实…

参数设置_变频器基本参数设置

工业设备的使用如何达到最大效能以及最佳效果,需要使用人员充分了解设备性能以及工艺要求,所以变频器参数设置或者优化是非常重要的环节,古人云失之毫厘差之千里就是这个道理。一、变频器基本参数设置参数设置可以是手持编程器操作&#xff0…

Morphling:云原生部署 AI , 如何把降本做到极致?

简介: Morphling 本意是游戏 Dota 中的英雄“水人”,他可以根据环境要求,通过灵活改变自身形态,优化战斗表现。我们希望通过 Morphling 项目,实现针对机器学习推理作业的灵活、智能的部署配置改变,优化服务…

datax参数设置_DataX Web数据增量同步配置说明

一、根据日期进行增量数据抽取1.页面任务配置打开菜单任务管理页面,选择添加任务按下图中5个步骤进行配置1.任务类型选DataX任务2.辅助参数选择时间自增3.增量开始时间选择,即sql中查询时间的开始时间,用户使用此选项方便第一次的全量同步。第…

Node18 即将支持 import HTTP资源!

作者 | 零一来源 | 前端印象最近看到Node官方提交了一条commit ,并且已经合入 master分支 ,如下图所示:node master commit由此可见,Node18可能会支持一个非常 nice 的功能,那就是 支持 import 远程HTTPS资源和本地的H…

MYSQL深潜 - 剖析Performance Schema内存管理

简介: 本文主要是通过对PFS引擎的内存管理的源码的阅读,解读PFS内存分配及释放原理,深入剖析其中存在的一些问题,以及一些改进思路。本文源代码分析基于Mysql-8.0.24版本。 作者 | 之枢 来源 | 阿里技术公众号 一 引言 MYSQL Pe…