Apache Paimon 文件操作

本文旨在澄清不同文件操作对文件的影响。

本页面提供具体示例和实用技巧,以有效地管理这些操作。此外,通过对提交(commit)和压实(compact)等操作的深入探讨,我们旨在提供有关文件创建和更新的见解。

前提

对以下几篇有了解:
1、Apache Paimon 介绍
2、Apache Paimon 基础概念
3、Apache Paimon 文件布局设计
4、知道如何在 Flink 中使用 Paimon

创建 catalog

在 Flink lib 中放入 paimon-flink 依赖包,执行  ./sql-client.sh 启动 Flink SQL Client,然后执行下面的命令去创建 Paimon catlog:

CREATE CATALOG paimon WITH (
'type' = 'paimon',
'warehouse' = 'file:///tmp/paimon'
);USE CATALOG paimon;

执行完后会在 file:///tmp/paimon 路径下创建目录 default.db

创建 Table

执行下面的命令会创建一个带有 3 个属性的 Paimon 表:

CREATE TABLE T (id BIGINT,a INT,b STRING,dt STRING COMMENT 'timestamp string in format yyyyMMdd',PRIMARY KEY(id, dt) NOT ENFORCED
) PARTITIONED BY (dt);

执行后,Paimon 表 T 会在 /tmp/paimon/default.db/T 目录下生成目录,它的 schema 会存放在目录 /tmp/paimon/default.db/T/schema/schema-0 下。

c37214dd4edc3b3e73604a12c1d91d38.png

写入数据到 Table

INSERT INTO T VALUES (1, 10001, 'varchar00001', '20230501');
5ee958a74e2c7719d687bf90f4242668.png

用户可以通过执行查询 SELECT * FROM T 来验证这些记录的可见性,该查询将返回一行结果。提交过程会创建一个位于路径 /tmp/paimon/default.db/T/snapshot/snapshot-1 的快照。快照-1 的文件布局如下所述:

一旦任务运行完成变成 finished 时,提交成功后数据就写入到 Paimon 表中。用户可以通过执行查询 SELECT * FROM T 来验证数据的可见性,该查询将返回一行结果。

另外可以发现目录的结构发生如下变化,新增 dt=20230501manifestsnapshot三个目录。三个的目录结构如下:

10beb60936705a7907e03ffa1169a724.png

查询目录下的数据如下所示:

4ac9fbff8bda08899c8c86c09a8ca923.png

这是因为提交过程中会创建一个位于 /tmp/paimon/default.db/T/snapshot/snapshot-1 的快照。snapshot-1 的文件布局如下所述:

ac5165245eb8114a939e11e1b51262b7.png
snapshot-1的内容包含了这个 snapshot 的元数据,比如 manifest list  和 schema id:

{"version" : 3,"id" : 1,"schemaId" : 0,"baseManifestList" : "manifest-list-989f0744-4419-48dd-b0fd-955343e6e803-0","deltaManifestList" : "manifest-list-989f0744-4419-48dd-b0fd-955343e6e803-1","changelogManifestList" : null,"commitUser" : "5132ef16-41ec-4172-8bf4-01a304507b36","commitIdentifier" : 9223372036854775807,"commitKind" : "APPEND","timeMillis" : 1697080282120,"logOffsets" : { },"totalRecordCount" : 1,"deltaRecordCount" : 1,"changelogRecordCount" : 0,"watermark" : -9223372036854775808
}

需要提醒的是,manifest list 包含了 snapshot 的所有更改,baseManifestList 是应用在 deltaManifestList 中的更改所基于的基本文件。第一次提交将导致生成 1 个清单文件,并创建了 2 个清单列表(文件名可能与你的实验中的不同):

➜  T 
manifest-232271e3-f294-4556-8947-ee87483c3bfd-0
manifest-list-989f0744-4419-48dd-b0fd-955343e6e803-0
manifest-list-989f0744-4419-48dd-b0fd-955343e6e803-1
6636815cd4bff6ef359c6e0e46705656.png
image.png

manifest-232271e3-f294-4556-8947-ee87483c3bfd-0: 如前面文件布局图中的 manifest-1-0,存储了关于 snapshot 中数据文件信息的 manifest。

cf9bbbbb14fdab1b0ffdb68af60e2b09.png

manifest-list-989f0744-4419-48dd-b0fd-955343e6e803-0: 是基础的 baseManifestList,如前面文件布局图中的 manifest-list-1-base,实际上是空的。

00210ba45c64d5d1a01f54ce88389212.png

manifest-list-989f0744-4419-48dd-b0fd-955343e6e803-1:是deltaManifestList,如前面文件布局图中的 manifest-list-1-delta,其中包含一系列对数据文件执行操作的清单条目,而在这种情况下,清单条目为 manifest-1-0

5254d85be145f7145c1a4d0176519df7.png

在不同分区中插入一批记录,在 Flink SQL 中执行以下语句:

INSERT INTO T VALUES 
(2, 10002, 'varchar00002', '20230502'),
(3, 10003, 'varchar00003', '20230503'),
(4, 10004, 'varchar00004', '20230504'),
(5, 10005, 'varchar00005', '20230505'),
(6, 10006, 'varchar00006', '20230506'),
(7, 10007, 'varchar00007', '20230507'),
(8, 10008, 'varchar00008', '20230508'),
(9, 10009, 'varchar00009', '20230509'),
(10, 10010, 'varchar00010', '20230510');
78c1c0510ee7690053425a27d2f2c475.png

等任务执行完成并提交快照后,执行 SELECT * FROM T 将返回 10 行数据。

53935a35ecbf59ebf67b1e59f904d4ac.png

创建了一个新的快照,即 snapshot-2,并给出了以下物理文件布局:

ac71d4fd4c53aa07e60a8f4e0b80d2f8.png

dt=20230501
dt=20230502
dt=20230503
dt=20230504
dt=20230505
dt=20230506
dt=20230507
dt=20230508
dt=20230509
dt=20230510
manifest
schema
snapshot➜  T ll snapshot
EARLIEST
LATEST
snapshot-1
snapshot-2➜  T ll manifest
manifest-232271e3-f294-4556-8947-ee87483c3bfd-0 //snapshot-1 manifest file
manifest-list-989f0744-4419-48dd-b0fd-955343e6e803-0 //snapshot-1 baseManifestList
manifest-list-989f0744-4419-48dd-b0fd-955343e6e803-1 //snapshot-1 deltaManifestListmanifest-ed94ba0c-e71f-4a01-863c-479b34af2551-0 //snapshot-2 manifest file
manifest-list-03bad670-05ae-48b2-b88c-de631ad14333-0 //snapshot-2 baseManifestList
manifest-list-03bad670-05ae-48b2-b88c-de631ad14333-1 snapshot-2 baseManifestList

新的快照文件布局图如下:

d83d7ca3ede10fcff688245a1649159a.png

删除数据

接下来删除满足条件 dt>=20230503 的数据。在 Flink SQL Client 中,执行以下语句:

DELETE FROM T WHERE dt >= '20230503';

注意⚠️:

1、需使用 Flink 1.17 及以上版本,否则不支持该语法

Flink SQL> DELETE FROM T WHERE dt >= '20230503';
>
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Unsupported query: DELETE FROM T WHERE dt >= '20230503';
3f6e34d0e4fa8108ee248f35d9e333e0.png

2、使用 batch 模式,否则报错不支持

Flink SQL> DELETE FROM T WHERE dt >= '20230503';
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: DELETE statement is not supported for streaming mode now.
8cba985e4a6077c4c8d03e0fcd7ccae1.png

需要设置 SET 'execution.runtime-mode' = 'batch';

f6ae65f4d88ef90106d7f7a91be18d20.png

cfe389a547190d230108158d2d4bdc7f.png

第三次提交完成,并生成了快照 snapshot-3。现在,表下的目录,会发现没有分区被删除。相反,为分区 20230503 到 20230510 创建了一个新的数据文件:

71d27104f729d5dafd9f49a3bc265894.png

➜  T ll dt=20230510/bucket-0data-0531fa1e-6ff1-47ed-aeea-3a01896c9698-0.orc # newer data file created by the delete statement 
data-0c8a6a16-13c5-4049-b4ed-e968d34e20ae-0.orc # older data file created by the insert statement

这是有道理的,因为我们在第二次提交中插入了一条数据(为 +I[10, 10010, 'varchar00010', '20230510'] ),然后在第三次提交中删除了数据。现在再执行一下 SELECT * FROM T只能查询到两条数据。

+I[1, 10001, 'varchar00001', '20230501']
+I[2, 10002, 'varchar00002', '20230502']
9e2bfece666a555f0c787c08db76be83.png

f54388d8ebd37539035300e9f3b61db4.png

snapshot-3后新的文件布局如下图:

cf60933c659066f1353336aa7d6d9654.png

9625cbe157d4995a7c90f8214bbc9a69.png

4dfa9936a847f42d9e550daadae4c426.png

请注意,manifest-3-0 包含了 8 个 ADD 操作类型的清单条目,对应着 8 个新写入的数据文件。

ae60db9dd79c4b573cedd100e82d1320.png

Compact Table

你可能已经注意到的,随着连续 snapshot 的增加,小文件的数量会增加,这可能会导致读取性能下降。因此,需要进行全量压缩以减少小文件的数量。

通过 flink run 运行一个专用的合并小文件任务:

<FLINK_HOME>/bin/flink run \-D execution.runtime-mode=batch \./paimon-flink-action-0.6-SNAPSHOT.jar \compact \--warehouse <warehouse-path> \--database <database-name> \ --table <table-name> \[--partition <partition-name>] \[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \[--table-conf <paimon-table-dynamic-conf> [--table-conf <paimon-table-dynamic-conf>] ...]

例如(提前下载好压缩任务 jar 在 Flink 客户端下):

./bin/flink run \-D execution.runtime-mode=batch \./paimon-flink-action-0.6-20231012.001913-36.jar \compact \--path file:///tmp/paimon/default.db/T
e5563a9678ae637da9037c652ed4d9fc.png

d3e949f396eadd31eb1e8bc113d5b85d.png

压缩任务结束后,再来看下 snapshot-4 文件结构:

{"version" : 3,"id" : 4,"schemaId" : 0,"baseManifestList" : "manifest-list-00af18ef-486d-4046-be60-25533e078333-0","deltaManifestList" : "manifest-list-00af18ef-486d-4046-be60-25533e078333-1","changelogManifestList" : null,"commitUser" : "ab094a14-17e0-4215-8e79-cd0651436dee","commitIdentifier" : 9223372036854775807,"commitKind" : "COMPACT","timeMillis" : 1697102418177,"logOffsets" : { },"totalRecordCount" : 2,"deltaRecordCount" : -16,"changelogRecordCount" : 0,"watermark" : -9223372036854775808
}
dae13a84c0deafcf77d7b3459948e9b7.png

16d2ce5a4f4b9c41ea7bbb10b07c9f4e.png

manifest-4-0 包含 20 个清单条目(18 个 DELETE 操作和 2 个 ADD 操作):

  • 对于分区 20230503 到 20230510,有两个删除操作,对应两个数据文件

  • 对于分区 20230501 到 20230502,有一个删除操作和一个添加操作,对应同一个数据文件。

Alter Table

执行以下语句来配置全量压缩:

ALTER TABLE T SET ('full-compaction.delta-commits' = '1');

这将为 Paimon 表创建一个新的 schema,即 schema-1,但在下一次提交之前,不会有任何 snapshot 使用此模式。

3e105c1f595d8d263e76ea29b28e59ff.png

过期 snapshot

请注意,标记为删除的数据文件直到 snapshot 过期且没有任何消费者依赖于该 snapshot 时才会真正被删除。参考 Manage Snapshots 可以查阅更多信息。

在 snapshot 过期过程中,首先确定 snapshot 的范围,然后标记这些 snapshot 内的数据文件以进行删除。只有当存在引用特定数据文件的 DELETE 类型的清单条目时,才会标记该数据文件进行删除。这种标记确保文件不会被后续的 snapshot 使用,并且可以安全地删除。

假设上图中的所有 4 个 snapshot 即将过期。过期过程如下:

1、首先删除所有标记为删除的数据文件,并记录任何更改的 bucket。
2、然后删除所有的 changelog 文件和关联的 manifests。
3、最后,删除快照本身并写入最早的提示文件。

如果删除过程后留下的空目录,也将被删除。

假设创建了另一个快照 snapshot-5,并触发了快照过期。将删除 snapshot-1 到 snapshot-4。为简单起见,我们只关注以前快照的文件,快照过期后的最终布局如下:

20b502dfeebe97fd3c1763b3935a4233.png
因此,分区 20230503 到 20230510 的数据被物理删除了。

Flink 流式写入

我们通过利用 CDC 数据的示例来测试 Flink 流式写入,将介绍将变更数据捕获并写入 Paimon 的过程,以及异步压缩、快照提交和过期的机制背后的原理。帮我们更详细地了解 CDC 数据摄取的工作流程以及每个参与组件所扮演的独特角色。

d0a88a3857ccb937c6ce88ebb9d27e83.png

1、MySQL CDC Source 统一读取快照数据和增量数据,其中 SnapshotReader 读取快照数据,而 BinlogReader 读取增量数据。
2、Paimon Sink 将数据按 Bucket 级别写入 Paimon 表中。其中的 CompactManager 将异步触发压缩操作。
3、Committer Operator 是一个单例,负责提交和过期快照。

接下来,我们将逐步介绍端到端的数据流程:

0c81aca22d6de1f13cd71dd5e312c7de.png

MySQL CDC Source 首先读取快照数据和增量数据,然后对它们进行规范化处理,并将其发送到下游。

f1c5797ae530bdb31755ef8f5d4d07e6.png

Paimon Sink 首先将新记录缓存在基于堆的 LSM 树中,并在内存缓冲区满时将其 flush 到磁盘上。请注意,每个写入的数据文件都是一个 sorted run。在这个阶段,还没有创建 manifest 文件和 snapshot。在 Flink 执行 Checkpoint 之前,Paimon Sink 将 flush 所有缓冲的记录并发送可提交的消息到下游,下游会在 Checkpoint 期间由 Committer Operator 读取并提交。

b439f35f502c32b49c12c9d4e318a091.png

在 Checkpoint 期间,Committer Operator 将创建一个新的 snapshot,并将其与 manifest lists 关联,以便snapshot 包含表中所有数据文件的信息。

a91a006ef76561aab533def80691a5c8.png

稍等一会后,可能会进行异步压缩,CompactManager 生成的可提交消息包含有关先前文件和合并文件的信息,以便 Committer Operator 可以构建相应的 manifest entries。在这种情况下,Committer Operator 在Flink Checkpoint 期间可能会生成两个 snapshot,一个用于写入的数据(类型为 Append 的快照),另一个用于压缩(类型为 Compact 的快照)。如果在 Checkpoint 间隔期间没有写入数据文件,则只会创建类型为Compact 的快照。Committer Operator 将检查 snapshot 的过期情况,并对标记为删除的数据文件执行物理删除操作。

b2863a1c2486c6419bf27704d9ec5ff4.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/676670.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

STM32 定时器

目录 TIM 定时器定时中断 定时器外部时钟 PWM驱动LED呼吸灯&#xff08;OC&#xff09; PWM控制舵机 PWMA驱动直流电机 输入捕获模式测频率&#xff08;IC&#xff09; 输入捕获模式测占空比 编码器接口测速(编码器接口) TIM 通用定时器 高级定时器 定时器定时中断 Ti…

springboot项目热部署实现(Spring Boot DevTools方式)

文章目录 Spring Boot DevTools简介Spring Boot DevTools原理spring Boot Devtools优缺点Spring Boot DevTools集成步骤第一步&#xff1a;添加maven依赖第二步&#xff1a;IDEA热部署配置 Spring Boot DevTools简介 Spring Boot DevTools是Spring Boot提供的一个开发工具&…

Vue事件中如何使用 event 对象

在Vue中&#xff0c;事件处理函数常常需要获取事件触发时的相关信息&#xff0c;比如鼠标位置、按键信息等。而要获取这些信息&#xff0c;就需要使用event对象。那么在Vue的事件中如何正确使用event对象呢&#xff1f;接下来就来详细介绍一下。 首先&#xff0c;在Vue的事件中…

JSP编程

JSP编程 您需要理解在JSP API的类和接口中定义的用于创建JSP应用程序的各种方法的用法。此外,还要了解各种JSP组件,如在前一部分中学习的JSP动作、JSP指令及JSP脚本。JSP API中定义的类提供了可借助隐式对象通过JSP页面访问的方法。 1. JSP API的类 JSP API是一个可用于创建…

专业145+总分400+合肥工业大学833信号分析与处理综合考研经验电子信息通信,真题,大纲,参考书

今年专业课145总分400&#xff0c;我总结一下自己的专业课合肥工业大学833信号分析与处理和其他几门的复习经验。希望对大家复习有帮助。 我所用的教材是郑君里的《信号与系统》&#xff08;第三版&#xff09;和高西全、丁玉美的《数字信号处理》&#xff08;第四版&#xff…

堆排序----C语言数据结构

目录 引言 堆排序的实现**堆的向下调整算法** 对排序的时间复杂度建堆的时间复杂度&#xff1a;排序过程的时间复杂度&#xff1a;总体时间复杂度&#xff1a; 引言 堆排序&#xff08;Heap Sort&#xff09;是一种基于比较的排序算法&#xff0c;利用堆的数据结构来实现。它的…

备战蓝桥杯---动态规划之背包问题引入

先看一个背包问题的简单版&#xff1a; 如果我们暴力枚举可能会超时。 但我们想一想&#xff0c;我们其实不关心怎么放&#xff0c;我们关心的是放后剩下的体积。 用可行性描述即可。 于是我们令f[i][j]表示前i个物品能否放满体积为j的背包。 f[i][j]f[i-1][j]||f[i-1][j-v…

C++ 内存管理(newdelete)

目录 本节目标 1. C/C内存分布 2. C语言中动态内存管理方式&#xff1a;malloc/calloc/realloc/free 3. C内存管理方式 3.1 new/delete操作内置类型 3.2 new和delete操作自定义类型 4. operator new与operator delete函数 5. new和delete的实现原理 6. 定位new表达式(placem…

【5G NR】【一文读懂系列】移动通讯中使用的信道编解码技术-卷积码原理

目录 一、引言 二、卷积编码的发展历史 2.1 卷积码的起源 2.2 主要发展阶段 2.3 重要里程碑 三、卷积编码的基本概念 3.1 基本定义 3.2 编码器框图 3.3 编码多项式 3.4 网格图(Trellis)描述 四、MATLAB示例 一、引言 卷积编码&#xff0c;作为数字通信领域中的一项…

文心一言 VS 讯飞星火 VS chatgpt (197)-- 算法导论14.3 5题

五、用go语言&#xff0c;对区间树 T 和一个区间 i &#xff0c;请修改有关区间树的过程来支持新的操作 INTERVALSEARCH-EXACTLY(T&#xff0c;i) &#xff0c;它返回一个指向 T 中结点 x 的指针&#xff0c;使得 x.int. lowi.low 且 x.int.high i.high ;或者&#xff0c;如果…

爱奇艺图片格式演进

01 背景 图片是爱奇艺APP页面的主要视觉元素&#xff0c;对整体用户体验有着至关重要的影响。同时&#xff0c;由大量启动带来的图片CDN峰值带宽成本也有待降低。因此&#xff0c;在努力提升用户体验的同时&#xff0c;优化图片CDN峰值带宽成本已成为一项关键任务。而决定图片显…

研究多态恶意软件,探讨网络安全与AI

前言 近期ChatGPT火遍全球&#xff0c;AI技术被应用到了全球各行各业当中&#xff0c;国内外各大厂商也开始推出自己的ChatGPT&#xff0c;笔者所在公司在前段时间也推出了自研的安全GPT&#xff0c;AI技术在网络安全行业得到了很多的应用&#xff0c;不管是网络安全研究人员、…

ElasticSearch之倒排索引

写在前面 本文看下es的倒排索引相关内容。 1&#xff1a;正排索引和倒排索引 正排索引就是通过文档id找文档内容&#xff0c;而倒排索引就是通过文档内容找文档id&#xff0c;如下图&#xff1a; 2&#xff1a;倒排索引原理 假定我们有如下的数据&#xff1a; 为了建立倒…

?你咋知道我的电脑密码的?---> Mimikatz!

还记得昨天在内网中提到了mimikatz这个工具&#xff0c;那么今天就来和大家讲一下这一款牛逼的工具 但是在这里先祝自己和各位看官新年快乐&#xff0c;万事顺遂 &#x1f409;&#x1f432;&#x1f432;&#x1f432;&#x1f432; 1.Mimikatz的介绍 传说呢&#xff0c;是…

学习通考试怎么搜题找答案? #学习方法#微信#其他

大学生必备的做题、搜题神器&#xff0c;收录上万本教材辅助书籍&#xff0c;像什么高数、物理、计算机、外语等都有&#xff0c;资源十分丰富。 1.菜鸟教程 菜鸟教程是一个完全免费的编程学习软件。 它免费提供了HTML / CSS 、JavaScript 、服务端、移动端、XML 教程、http…

开发JSP应用程序

开发JSP应用程序 问题陈述 TecknoSoft Pvt Ltd.公司的首席技术官(CTO)John Barrett将创建一个应用程序的任务委托给了开发团队,该应用程序应在客户访问其账户详细信息前验证其客户ID和密码。客户ID应是数字形式。John希望如果所输入的客户ID或密码不正确,应向客户显示错误…

Stable Diffusion 模型下载:Disney Pixar Cartoon Type A(迪士尼皮克斯动画片A类)

文章目录 模型介绍生成案例案例一案例二案例三案例四案例五案例六案例七案例八案例九案例十 下载地址 模型介绍 目前还没有一个好的皮克斯迪士尼风格的卡通模型&#xff0c;所以我决定自己制作一个。这是将皮克斯风格模型与我自己的Loras合并在一起&#xff0c;创建一个通用的…

endnotesX9 如何批量导入 .enw文件

文章是用schoolar搜出来 点击下载引用之后&#xff0c;endnotesX9只能一个一个从.enw文件导入&#xff0c;麻烦 —————————————— 可以在schoolar保存到个人图书馆 类似于上面这种&#xff0c;我用的是保存&#xff0c;保存很多的论文之后点我的个人图书馆&#x…

【GO语言卵细胞级别教程】03.条件与循环语句

注意&#xff1a;以下演示所用的项目&#xff0c;在第一章节已经介绍了&#xff0c;这里不做赘述 目录&#xff1a; 【GO语言卵细胞级别教程】03.条件与循环语句1.条件语句1.1 if语句1.1.1 单层if语句1.1.2 if-else语句1.1.3 if-else-if 语句1.1.4 if 嵌套 1.2 switch 语句1.1…

使用CICFlowMeter 实现对pcap文件的特征提取【教程】

使用CICFlowMeter 实现对pcap文件的特征提取【教程】 针对现有的关于CICFlowMeter 的使用教程不够全面&#xff0c;一些细节没有展示&#xff0c;我将结合网络上的相关资料和实际的经历&#xff0c;提供一些经验和建议。 configuration information --------------- Windows…