如何基于 Apache SeaTunnel 同步数据到 Iceberg

概述

Apache SeaTunnel

Apache SeaTunnel 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的开源数据集成平台, 支持spark、flink 及自研 Zeta 引擎,有庞大的用户社群.

Apache Iceberg

Apache Iceberg 是一个开源的表格格式,它旨在改善大数据生态系统中复杂的数据湖管理。作为Apache软件基金会的一部分,Iceberg专为提供更强大、更灵活的数据湖表格管理功能而设计,它通过提供一种更加高效和可靠的方式来处理大规模数据集,从而解决了传统数据湖在数据可靠性、性能和可维护性方面的挑战。

主要特点
  1. 模式演变和兼容性:Iceberg支持模式的演变,同时保证了向前和向后的兼容性。这使得在不破坏已有数据的情况下添加、删除、更新字段成为可能。
  2. 隐藏分区:分区信息作为表模式的一部分进行存储,这消除了需要手动管理分区目录的复杂性。分区对查询透明,即可进行常规查询而无需指定具体的分区。
  3. 多计算引擎支持:Iceberg可以与现代计算引擎无缝集成,包括Apache Spark、Apache Flink、PrestoDB 和 Trino等。同一数据集可以被多个引擎并发访问且保持一致性。
  4. 存储引擎支持: HDFS / S3
  5. 原子操作:Iceberg支持原子性写入操作。这意味着表更新要么全部成功,要么全部失败,确保了数据的一致性。
  6. 快照管理:支持表的快照功能,允许用户回滚到历史版本,以及进行增量读取操作。这对于数据恢复和审计尤为重要。
  7. 高效读写:通过提供文件层面的元数据,使得读写操作可以更高效地进行。该功能减少了需要扫描的数据量,改善了查询性能。
使用场景
  • 数据湖构建和管理:对于需要构建和管理大型数据湖的企业和组织,Apache Iceberg提供了一个高效、可扩展且易于管理的解决方案。
  • 多计算引擎环境:在使用多个计算引擎进行数据处理的环境中,Iceberg能够提供一致的数据视图和并发控制。
  • 数据科学和分析:提供了更强大且灵活的数据组织方式,使得进行复杂分析和数据科学项目更加容易。

SeaTunnel Iceberg sink

介绍

Apache SeaTunnel connector-Iceberg 是专门为Iceberg引擎开发的数据同步组件, 主要为了方便SeaTunnel 用户能更加友好的使用Iceberg来构建企业级数据湖仓

Iceberg sink 特性

  • 支持数据批量数据写入
  • cdc模式下的数据同步
  • 支持配置自动建表
  • 支持schema evolution
  • 支持指定分区键
  • 支持数据提交到指定的branch

Sink参数配置

NameTypeRequiredDefaultDescription
catalog_namestringyesdefault用户指定的目录名称。默认为 default。
namespacestringyesdefaultIceberg 数据库名称。默认为 default
tablestringyes-Iceberg 表名称。
iceberg.catalog.configmapno-指定用于初始化 Iceberg 目录的属性,具体配置参考:Iceberg Catalog Properties
hadoop.configmapno-指定 Hadoop 配置的属性,具体配置参考: Hadoop Configuration
iceberg.hadoop-conf-pathstringno-指定加载 'core-site.xml'、'hdfs-site.xml'、'hive-site.xml' 文件的路径。
case_sensitivebooleannofalse控制是否以区分大小写的方式匹配 schema。
iceberg.table.write-propsmapno-传递给 Iceberg 写入器初始化的属性,这些属性具有优先权,可以在 Iceberg Write Properties 找到具体参数。
iceberg.table.auto-create-propsmapno-Iceberg 在自动创建表时指定的配置, 具体参照: Table Behavior Properties
iceberg.table.schema-evolution-enabledbooleannofalse将其设置为 true 可以使 Iceberg 表在同步过程中支持模式演变。目前仅支持添加字段 和 部分类型变更
iceberg.table.primary-keysstringno-表的主键配置,多个主键用","分割 ,与 "iceberg.table.upsert-mode-enabled" 一起使用,用于同主键数据的增量更新
iceberg.table.upsert-mode-enabledbooleannofalse将其设置为 true 以启用 upsert 模式,默认为 false, 用于 Iceberg 中数据的增量更新
iceberg.table.partition-keysstringno-创建表时指定的分区字段,多个分区字段使用","分隔。
iceberg.table.commit-branchstringno-指定数据提交的分支

同步模式

批处理

  • 批模式数据导入, append模式 ,不进行数据的增量更新
  • 支持 flink , spark ,zeta 引擎
env {parallelism = 1job.mode = "BATCH"# You can set spark configuration herespark.app.name = "SeaTunnel"spark.executor.instances = 2spark.executor.cores = 1spark.executor.memory = "1g"spark.master = local
}source {FakeSource {row.num = 100schema = {fields {c_map = "map<string, string>"c_array = "array<int>"c_string = stringc_boolean = booleanc_tinyint = tinyintc_smallint = smallintc_int = intc_bigint = bigintc_float = floatc_double = doublec_decimal = "decimal(30, 8)"c_bytes = bytesc_date = datec_timestamp = timestamp}}result_table_name = "fake"}
}transform {
}sink {Iceberg {catalog_name="seatunnel_test"iceberg.catalog.config={"type"="hadoop""warehouse"="file:///tmp/seatunnel/iceberg/hadoop-sink/"}namespace="seatunnel_namespace"table="iceberg_sink_table"iceberg.table.write-props={write.format.default="parquet"write.target-file-size-bytes=10}iceberg.table.partition-keys="c_timestamp"case_sensitive=true}
}

流写入(CDC)

  • 配置mysql cdc 进行数据的增量采集
  • Sink 指定 iceberg.table.primary-keys 和 iceberg.table.upsert-mode-enabled=true 进行数据增量写入
  • 配置 iceberg.table.schema-evolution-enabled=true 支持 schema 的演进(当前仅支持增加字段和部分类型变更)
  • 支持 flink / zeta 引擎的数据同步,不支持 spark
    env {
    parallelism = 1
    job.mode = "STREAMING"
    checkpoint.interval = 5000
    }
    

source { MySQL-CDC { result_table_name="customer_result_table" catalog { factory = Mysql } debezium = { # include ddl "include.schema.changes" = true } database-names=["mysql_cdc"] table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"] format=DEFAULT username = "st_user" password = "seatunnel" base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc" } }

transform { }

sink { Iceberg { catalog_name="seatunnel_test" iceberg.catalog.config={ "type"="hadoop" "warehouse"="file:///tmp/seatunnel/iceberg/hadoop-cdc-sink/" } namespace="seatunnel_namespace" table="iceberg_sink_table" iceberg.table.write-props={ write.format.default="parquet" write.target-file-size-bytes=10 } iceberg.table.primary-keys="id" iceberg.table.partition-keys="f_datetime" iceberg.table.upsert-mode-enabled=true iceberg.table.schema-evolution-enabled=true case_sensitive=true } }

```

总结

基于Apache SeaTunnel来构建数据湖项目, 我们可以直接引用 SeaTunnel 强大的组件生态,不用独立构造新的项目来实现业务需求,同时Apache SeaTunnel的标准的架构设计也为熟悉开源的朋友提供了快速独立扩展的机会,可以在此基础上快速扩展自己的需求,做出符合自己业务需要的组件, 也欢迎大家试用 Iceberg-connect , 希望能帮大家真正解决实际生产场景中遇到的问题,

也希望大家能积极反馈使用中的问题,并贡献场景,大家共同来解决,并促进 Iceberg-connect 组件的完善, 一起共创数据开发的新场景.

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/47717.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL 架构中的三层服务是什么?

MySQL的架构可以分为客户端层、服务器层、存储引擎层三次服务&#xff0c;这些服务协同工作以提供高效的数据库管理服务。 1. 客户端层&#xff08;Client Layer&#xff09; 功能&#xff1a;客户端层是与用户交互的部分&#xff0c;用户通过客户端向MySQL服务器发送请求、接…

模板方法设计模式

模板方法设计模式: 模板方法设计模式&#xff1a;解决方法中存在重复代码的问题。 模板方法设计模式的写法&#xff1a; 1、定义一个抽象类 2、在里面定义2个方法 ​ 一个是模板方法&#xff1a;把相同代码放里面去 ​ 一个是抽象方法&#xff1a;具体实现交给子类完成 建议使用…

微软成为PostgreSQL主要贡献者

微软对PostgreSQL贡献的很多新功能都来自于客户在使用微软Azure上的PostgreSQL管理实例数据库&#xff0c;所以这些新功能都来自于真实的客户需求 微软贡献的这些新功能都是比较实用的功能 在这里&#xff0c;【真实的客户需求】要突出一下&#xff0c;因为现在很多社区贡献者…

4. docker镜像、Dockerfile

docker镜像、Dockerfile 一、docker镜像1、镜像介绍2、镜像核心技术 二、Dockerfile定制镜像1、Dockerfile使用流程1.1 编写Dockerfile1.2、构建镜像1.3 创建容器测试镜像定制操作 2、Dockerfile常用指令 一、docker镜像 1、镜像介绍 分层的文件系统 优势&#xff1a;节省空间…

基于JAVA+SpringBoot+uniapp的心理小程序(小程序版本)

✌全网粉丝20W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 技术范围&#xff1a;SpringBoot、Vue、SSM、HLMT、Jsp、SpringCloud、Layui、Echarts图表、Nodejs、爬…

使用 JavaScript 实现图片预览功能

在本文中&#xff0c;我们将学习如何使用 JavaScript 实现一个简单的图片预览功能。我们将使用 HTML、CSS 和 JavaScript 来创建一个用户界面&#xff0c;用户可以输入图片 URL 并实时预览图片。 创建 HTML 页面结构 首先&#xff0c;我们需要创建一个包含用于输入图片 URL 和…

docker默认存储地址 var/lib/docker 满了,换个存储地址操作流程

1. 查看docker 存储地址 docker info如下 var/lib/docker2、查看内存大小 按需执行 df -h 找超过100M的大文件 find / -type f -size 100M -exec ls -lh {} \; df -Th /var/lib/docker 查找这个文件的容量 df -h 查找所有挂载点 du -hs /home/syy_temp/*1、df -h 2、sud…

2-38 基于matlab的蚁群算法优化无人机uav巡检

基于matlab的蚁群算法优化无人机uav巡检&#xff0c;巡检位置坐标可根据需求设置&#xff0c;从基地出发&#xff0c;返回基地&#xff0c;使得路径最小。可设置蚁群数量&#xff0c;信息素系数。输出最佳路线长度。程序已调通&#xff0c;可直接运行。 2-38 蚁群算法优化无人…

冥想太极八段锦八部金刚功易筋经,更清晰地认识自己。

接下来&#xff0c;我将开始进行我的太极生涯&#xff0c;我的爱好实际上就是对于我自己的修行&#xff0c;以后对于抖音视频的文案&#xff0c;自己就不再使自己进行一个长篇大论了&#xff0c;自己在csdn记录下自己不断地进行学习和提升的反思。对自己真实就是最好的成长&…

科普文:多线程如何使用CPU缓存?

一、前言 计算机的基础知识聊的比较少&#xff0c;但想要更好的理解多线程以及为后续多线程的介绍做铺垫&#xff0c;所以有必要单独开一篇来聊一下 CPU cache。 二、CPU 前面有一篇文章关于 CPU是如何进行计算 感兴趣的同学&#xff0c;可以先移步了解一下&#xff0c;不了…

2、PostgreSQL之基本的SQL语言

PostgreSQL之基本的SQL语言 在上一篇文章中&#xff0c;我们已经安装好了PostgreSQL&#xff0c;并且能够通过psql访问数据库&#xff0c;以及远程访问数据库。下面就来介绍一些PostgreSQL的基本操作。 1、创建一个新表 在psql中输入以下命令&#xff1a; CREATE TABLE wea…

[003-02-10].第10节:Docker环境下搭建Redis主从复制架构

我的博客大纲 我的后端学习大纲 我的Redis学习大纲 1.cluster&#xff08;集群&#xff09;模式-docker版 哈希槽分区进行亿级数据存储 1.1.面试题&#xff1a;1~2亿条数据需要缓存&#xff0c;请问如何设计这个存储案例 1.回答&#xff1a;单机单台100%不可能&#xff0c;肯…

细说MCU用定时器控制ADC采样频率的实现方法并通过Simulink查看串口输出波形

目录 一、硬件工程 二、建立Simulink模型 1.安装MATLAB和Simulink 2.建立Simulink模型 三、代码修改 1.修改回调函数 2.产看结果 3.完整的main.c 本文作者的文章 细说MCU用定时器控制ADC采样频率的实现方法-CSDN博客 https://wenchm.blog.csdn.net/article/details/…

Zabbix的安装部署及使用流程

Zabbix的安装部署及使用流程可以分为以下几个主要步骤&#xff1a; 一、准备工作 确定监控目标&#xff1a; 确定需要监控的设备或应用程序&#xff0c;如服务器、网络设备、应用程序等。 准备环境&#xff1a; 准备至少两台机器&#xff0c;一台作为Zabbix服务器&#xff08…

270-VC709E 基于FMC接口的Virtex7 XC7VX690T PCIeX8 接口卡

一、板卡概述 本板卡基于Xilinx公司的FPGA XC7VX690T-FFG1761 芯片&#xff0c;支持PCIeX8、两组 64bit DDR3容量8GByte&#xff0c;HPC的FMC连接器&#xff0c;板卡支持各种FMC子卡扩展。软件支持windows&#xff0c;Linux操作系统。 二、功能和技术指标&#xff1a; 板卡功…

Getx学习笔记之中间件鉴权

目录 前言 一、实现步骤 1.添加依赖 2.创建鉴权中间件 3.定义路由 4.设置初始路由 5.模拟登陆状态 二、Getx鉴权步骤总结 三、本文demo示例 四、参考文章 前言 在 Flutter 中&#xff0c;使用 GetX 可以很方便地实现中间件鉴权&#xff08;Authentication&#xff09…

MySQL生产环境迁移至YashanDB数据库深度体验

前言 首届YashanDB「迁移体验官」开放后&#xff0c;陆续收到「体验官」们的投稿&#xff0c;小崖在此把优秀的投稿文章分享给大家~今天分享的用户文章是《MySQL生产环境迁移至YashanDB数据库深度体验》&#xff08;作者&#xff1a;呆呆的私房菜&#xff09;&#xff0c;满满…

Python简化命令行界面库之fire使用详解

概要 在开发命令行工具时,开发者通常需要编写大量代码来解析命令行参数,这既耗时又容易出错。Python Fire 是 Google 开源的一个库,旨在简化命令行界面的开发。它可以将任何 Python 对象自动生成一个命令行界面,从而大大减少了开发时间和代码复杂度。本文将详细介绍 Pytho…

mysql-造数据/列转行

-- MySQL 列转行 set global group_concat_max_len102400; set group_concat_max_len102400; SELECT global.group_concat_max_len; SELECT group_concat_max_len; select table_name,concat(group_concat(COLUMN_NAME order by ORDINAL_POSITION separator ,)) as all_columns…

网络安全-网络安全及其防护措施10

46.软件定义网络&#xff08;SDN&#xff09; 软件定义网络&#xff08;SDN&#xff09;的概念和特点 软件定义网络&#xff08;SDN&#xff09;是一种新兴的网络架构&#xff0c;通过将网络的控制平面&#xff08;Control Plane&#xff09;和数据转发平面&#xff08;Data …