大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(1)
- Apache Paimon 是一项
流式数据湖存储技术
,可以为用户提供高吞吐、低延迟的数据摄入、流式订阅以及实时查询能力。- 读/写:Paimon 支持多种读/写数据和执行 OLAP 查询的方式。
- 对于读取,支持如下三种方式消费数据
- 历史快照(批处理模式)
- 最新的偏移量(流模式)
- 混合模式下读取增量快照
- 对于写入,它支持
来自数据库变更日志(CDC)的流式同步
或来自离线数据的批量插入/覆盖
。
- 对于读取,支持如下三种方式消费数据
- 生态系统:除了Apache Flink之外,Paimon还支持Apache Hive、Apache Spark、Trino等其他计算引擎的读取。
- 底层存储:Paimon 将列式文件存储在文件系统/对象存储上,并使用 LSM 树结构来支持大量数据更新和高性能查询。
- Paimon 提供抽象概念的表:
- 在批处理执行模式下,它就像一个Hive表,支持Batch SQL的各种操作。 查询它以查看最新的快照。
- 在流执行模式下,它的作用就像一个消息队列。 查询它的行为就像从
历史数据永不过期的消息队列中
查询stream changelog。
- 读/写:Paimon 支持多种读/写数据和执行 OLAP 查询的方式。
- 今天,我们快速了解下最近比较火的Apache Paimon:
- 官方文档:https://paimon.apache.org/docs/1.0/
- 推荐阅读:当流计算邂逅数据湖:Paimon 的前生今世
1 Apache Paimon的下载及安装
1.1 下载及安装
# flink 1.16下载地址
https://archive.apache.org/dist/flink/flink-1.16.0/# 相关配置
[root@centos01 ~]# cat /opt/apps/flink-1.16.0/conf/flink-conf.yaml
......
# 解决中文乱码,1.17之前参数是env.java.opts,后面版本是env.java.opts.all
env.java.opts: -Dfile.encoding=UTF-8
classloader.check-leaked-classloader: falseexecution.checkpointing.interval: 10s
state.backend: rocksdb
state.checkpoints.dir: hdfs://centos01:8020/flink_ck/ckps
state.backend.incremental: true# 环境变量配置(需要FLINK_HOME以及HADOOP_CLASSPATH)
[root@centos01 ~]# cat /etc/profile
export JAVA_HOME=/opt/apps/jdk1.8.0_141
export HADOOP_HOME=/opt/apps/hadoop-3.1.1
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HIVE_HOME=/opt/apps/hive-3.1.2
export SPARK_HOME=/opt/apps/spark-3.3.0
export FLINK_HOME=/opt/apps/flink-1.16.0
export SEATUNNEL_HOME=/opt/apps/seatunnel-2.3.8
export ZOOKEEPER_HOME=/opt/apps/zookeeper-3.5.8
export HBASE_HOME=/opt/apps/hbase-2.2.5
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/sbin:$HADOOP_HOME/bin:$FLINK_HOME/bin:$HIVE_HOME/bin:$SEATUNNEL_HOME/bin:$ZOOKEEPER_HOME/bin:$HBASE_HOME/bin::$SPARK_HOME/bin:$SPARK_HOME/sbin
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`# paimon与flink集成的jar包下载地址(我这里使用flink 1.16)
https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.16/1.0.0/
# action的包下载地址
https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-action/1.0.0/# 上传paimon的jar包及action包到flink的lib目录
[root@centos01 apps]# mv ./paimon-flink-1.16-1.0.0.jar /opt/apps/flink-1.16.0/lib/
[root@centos01 apps]# mv ./paimon-flink-action-1.0.0.jar /opt/apps/flink-1.16.0/lib/# 这里又添加了几个jar包用于后续使用
[root@centos01 ~]# ll /opt/apps/flink-1.16.0/lib/
# 手动添加的jar包,为了使用flink-sql客户端、整合hive、读取kafka数据、以及解决冲突等
-rw-r--r--. 1 root root 53820 Feb 14 14:34 commons-cli-1.4.jar
-rw-r--r--. 1 root root 3534428 Feb 18 11:25 kafka-clients-2.6.2.jar
-rw-r--r--. 1 root root 396461 Feb 18 11:07 flink-connector-kafka-1.16.0.jar
-rw-r--r--. 1 root root 59604787 Feb 14 14:34 flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
-rw-r--r--. 1 root root 8645789 Feb 14 14:58 flink-connector-hive_2.12-1.16.0.jar
-rw-r--r--. 1 root root 50975967 Feb 14 15:49 flink-sql-connector-hive-3.1.2_2.12-1.16.0.jar
-rw-r--r--. 1 root root 1654887 Feb 14 14:46 hadoop-mapreduce-client-core-3.1.1.jar# paimon相关jar包
-rw-r--r--. 1 root root 48288345 Feb 14 14:40 paimon-flink-1.16-1.0.0.jar
-rw-r--r--. 1 root root 11469 Feb 17 11:33 paimon-flink-action-1.0.0.jar# flink原始jar包
-rw-r--r--. 1 502 games 198855 Oct 20 2022 flink-cep-1.16.0.jar
-rw-r--r--. 1 502 games 515825 Oct 20 2022 flink-connector-files-1.16.0.jar
-rw-r--r--. 1 502 games 102470 Oct 20 2022 flink-csv-1.16.0.jar
-rw-r--r--. 1 502 games 117102633 Oct 20 2022 flink-dist-1.16.0.jar
-rw-r--r--. 1 502 games 180250 Oct 20 2022 flink-json-1.16.0.jar
-rw-r--r--. 1 502 games 21052633 Oct 20 2022 flink-scala_2.12-1.16.0.jar
-rw-r--r--. 1 502 games 10737871 Sep 20 2022 flink-shaded-zookeeper-3.5.9.jar
-rw-r--r--. 1 502 games 15367434 Oct 20 2022 flink-table-api-java-uber-1.16.0.jar
-rw-r--r--. 1 502 games 36237974 Oct 20 2022 flink-table-planner-loader-1.16.0.jar
-rw-r--r--. 1 502 games 3133682 Oct 20 2022 flink-table-runtime-1.16.0.jar
-rw-r--r--. 1 502 games 208006 Sep 20 2022 log4j-1.2-api-2.17.1.jar
-rw-r--r--. 1 502 games 301872 Sep 20 2022 log4j-api-2.17.1.jar
-rw-r--r--. 1 502 games 1790452 Sep 20 2022 log4j-core-2.17.1.jar
-rw-r--r--. 1 502 games 24279 Sep 20 2022 log4j-slf4j-impl-2.17.1.jar# 在hive中也能查询已有的paimon表
# 下载地址
https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-hive-connector-3.1/1.0.0/
# 在Hive中,auxlib目录是一个特殊目录,用于存放Hive自动加载的第三方JAR包
[root@centos01 hive-3.1.2]# mkdir ./auxlib
[root@centos01 hive-3.1.2]# cp /opt/apps/paimon-hive-connector-3.1-1.0.0.jar ./auxlib/# 启动hadoop和yarn
[root@centos01 ~]# start-all.sh
# 启动Hive
[root@centos01 ~]# nohup hive --service metastore 2>&1 &
[root@centos01 ~]# nohup hive --service hiveserver2 2>&1 & # yarn-session模式
# 中http://centos01:8088/cluster可以看到Flink session cluster的job ID
[root@centos01 ~]# /opt/apps/flink-1.16.0/bin/yarn-session.sh -d# 启动Flink的sql-client
[root@centos01 ~]# /opt/apps/flink-1.16.0/bin/sql-client.sh -s yarn-session# paimon支持hdfs、Hive以及JDBC的catalog
# 我们这里使用hive元数据(注意:下面的sql语句每次进入flink-sql客户端都要执行)
Flink SQL> CREATE CATALOG paimon_hive_catalog WITH ('type' = 'paimon','metastore' = 'hive','uri' = 'thrift://centos01:9083'
);Flink SQL> USE CATALOG paimon_hive_catalog;
Flink SQL> create database if not exists paimon_db;
Flink SQL> use paimon_db;# 设置显示模式
Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';# 设置流模式
Flink SQL> SET 'execution.runtime-mode' = 'streaming';
1.2 文件布局简介
- 如下图所示,一张表的所有文件都存储在一个基本目录下,Paimon 文件以分层方式组织。
- 核心概念参考官网:https://paimon.apache.org/docs/1.0/concepts/overview/
-
上图说明了 Paimon 的文件布局, 从snapshot文件开始,Paimon reader可以递归地访问表中的所有记录。
-
Snapshot Files:所有snapshot文件都存储在snapshot目录中。
snapshot文件是一个 JSON 文件
,包含有关此snapshot的信息,包括:- 正在使用的Schema文件
- 包含此snapshot的所有更改的清单列表(manifest list)
-
Manifest Files:
- 所有清单(manifest)列表和清单文件都存储在清单目录中。
- 清单列表(manifest list)是清单文件名的列表。
- 清单文件是包含有关 LSM 数据文件和changelog文件的更改的文件。 例如对应快照中创建了哪个LSM数据文件、删除了哪个文件。
-
Data Files:
- 数据文件按分区和桶(Bucket)分组。每个Bucket目录都包含一个 LSM 树及其changelog文件。
- 目前,Paimon 支持使用 orc(默认)、parquet 和 avro 作为数据文件格式。
-
Paimon 采用 LSM 树(日志结构合并树)作为文件存储的数据结构
。-
LSM 树将文件组织成多个 sorted runs(如下图所示)。 sorted runs由一个或多个数据文件组成,并且每个数据文件恰好属于一个 sorted runs。
-
数据文件中的记录按其主键排序。 在 sorted runs中,数据文件的主键范围永远不会重叠。
-
如图所示,不同的 sorted runs可能具有重叠的主键范围,甚至可能包含相同的主键。查询LSM树时,必须合并所有 sorted runs,并且必须根据用户指定的
合并引擎和每条记录的时间戳
来合并具有相同主键的所有记录。 -
写入LSM树的新记录将首先缓存在内存中。当内存缓冲区满时,内存中的所有记录将被顺序并刷新到磁盘,并创建一个新的 sorted runs。
-
-
-
Compaction
- 当越来越多的记录写入LSM树时,sorted runs的数量将会增加。由于查询LSM树需要将所有 sorted runs合并起来,太多 sorted runs将导致查询性能较差,甚至内存不足。
- 为了限制 sorted runs的数量,我们必须偶尔将多个 sorted runs合并为一个大的 sorted runs。 这个过程称为压缩。
- 然而,压缩是一个资源密集型过程,会消耗一定的CPU时间和磁盘IO,因此过于频繁的压缩可能会导致写入速度变慢。 这是查询和写入性能之间的权衡。 Paimon 目前采用了类似于 Rocksdb 通用压缩的压缩策略。
- 默认情况下,当Paimon将记录追加到LSM树时,它也会根据需要执行压缩。 用户还可以选择在专用压缩作业中执行所有压缩。
-
2 主键表引擎详解
- 当Paimon sink收到两条或更多具有相同主键的记录时,它会将它们合并为一条记录以保持主键唯一。
- 通过指定paimon表的merge-engine属性,用户可以选择如何将记录合并在一起。
2.1 默认的合并引擎deduplicate
- deduplicate合并引擎是默认的合并引擎。
- 设置deduplicate合并引擎,paimon表只会保留最新的记录,并丢弃其他具有相同主键的记录。
- 如果最新的记录是DELETE记录,则所有具有相同主键的记录都将被删除。
Flink SQL> drop table if exists orders_dedup;
-- orders_dedup文件目录:hdfs://centos01:8020/user/hive/warehouse/paimon_db.db/orders_dedup
Flink SQL> CREATE TABLE orders_dedup (order_id INT PRIMARY KEY NOT ENFORCED, customer_id INT,product_id INT,quantity INT,price DECIMAL(10, 2),update_time TIMESTAMP(3)
) WITH ('connector' = 'paimon','merge-engine' = 'deduplicate'
);-- 插入数据(有兴趣的可以看hdfs上目录的变化)
Flink SQL> INSERT INTO orders_dedup VALUES (1, 101, 201, 2, 99.99, CURRENT_TIMESTAMP), (2, 102, 202, 1, 49.99, CURRENT_TIMESTAMP);-- 我们可以批读
Flink SQL> SET 'execution.runtime-mode' = 'batch';
Flink SQL> select * from orders_dedup;
+----------+-------------+------------+----------+-------+-------------------------+
| order_id | customer_id | product_id | quantity | price | update_time |
+----------+-------------+------------+----------+-------+-------------------------+
| 1 | 101 | 201 | 2 | 99.99 | 2025-02-18 21:47:20.832 |
| 2 | 102 | 202 | 1 | 49.99 | 2025-02-18 21:47:20.835 |
+----------+-------------+------------+----------+-------+-------------------------+
2 rows in set-- 我们也可以流读
Flink SQL> SET 'execution.runtime-mode' = 'streaming';
Flink SQL> select * from orders_dedup;
+----+-------------+-------------+-------------+-------------+--------------+-------------------------+
| op | order_id | customer_id | product_id | quantity | price | update_time |
+----+-------------+-------------+-------------+-------------+--------------+-------------------------+
| +I | 1 | 101 | 201 | 2 | 99.99 | 2025-02-18 21:47:20.832 |
| +I | 2 | 102 | 202 | 1 | 49.99 | 2025-02-18 21:47:20.835 |-- 我们在另一个flink-sql窗口,插入重复数据
INSERT INTO orders_dedup VALUES (1, 101, 201, 3, 99.99, CURRENT_TIMESTAMP); -- 更新数量-- 在刚才orders_dedup流读窗口,可以发现出现了-U和+U
+----+-------------+-------------+-------------+-------------+--------------+-------------------------+
| op | order_id | customer_id | product_id | quantity | price | update_time |
+----+-------------+-------------+-------------+-------------+--------------+-------------------------+
| +I | 1 | 101 | 201 | 2 | 99.99 | 2025-02-18 21:47:20.832 |
| +I | 2 | 102 | 202 | 1 | 49.99 | 2025-02-18 21:47:20.835 |
| -U | 1 | 101 | 201 | 2 | 99.99 | 2025-02-18 21:47:20.832 |
| +U | 1 | 101 | 201 | 3 | 99.99 | 2025-02-18 21:54:34.921 |-- 此时我们进行批量读取
Flink SQL> SET 'execution.runtime-mode' = 'batch';
Flink SQL> select * from orders_dedup;
-- 可以发现,order_id为1行发生了变化
+----------+-------------+------------+----------+-------+-------------------------+
| order_id | customer_id | product_id | quantity | price | update_time |
+----------+-------------+------------+----------+-------+-------------------------+
| 1 | 101 | 201 | 3 | 99.99 | 2025-02-18 21:54:34.921 |
| 2 | 102 | 202 | 1 | 49.99 | 2025-02-18 21:47:20.835 |
+----------+-------------+------------+----------+-------+-------------------------+
2 rows in set-- 注意:在hive中也能读取此表(原始数据只有一份)
-- 其实,还能够在hive中建立paimon表,并插入数据,这里不再演示
0: jdbc:hive2://192.168.42.101:10000> use paimon_db;
No rows affected (0.718 seconds)
0: jdbc:hive2://192.168.42.101:10000> select * from orders_dedup;
+------------------------+---------------------------+--------------------------+------------------------+---------------------+---------------------------+
| orders_dedup.order_id | orders_dedup.customer_id | orders_dedup.product_id | orders_dedup.quantity | orders_dedup.price | orders_dedup.update_time |
+------------------------+---------------------------+--------------------------+------------------------+---------------------+---------------------------+
| 1 | 101 | 201 | 3 | 99.99 | 2025-02-18 21:54:34.921 |
| 2 | 102 | 202 | 1 | 49.99 | 2025-02-18 21:47:20.835 |
2.2 partial-update
-
通过指定
'merge-engine' = 'partial-update'
,用户可以通过多次更新来更新记录的列,直到记录完成。 -
这是通过使用同一主键下的最新数据逐一更新值字段来实现的,
注意:在此过程中不会覆盖空值
。 -
如下所示:
- <1, 23.0, 10, NULL>-
- <1, NULL, NULL, ‘This is a book’>
- <1, 25.2, NULL, NULL>
假设第一列是主键key,那么最后的结果是 <1, 25.2, 10, ‘This is a book’>
Flink SQL> SET 'execution.runtime-mode' = 'batch';
Flink SQL> CREATE TABLE orders_partial_update (order_id INT PRIMARY KEY NOT ENFORCED,customer_id INT,product_id INT,quantity INT,price DECIMAL(10, 2),update_time TIMESTAMP(3)
) WITH ('connector' = 'paimon','merge-engine' = 'partial-update', -- 指定部分更新'partial-update.remove-record-on-delete' = 'true'
);-- 插入数据
Flink SQL> INSERT INTO orders_partial_update VALUES (1, 101, 201, 2, 99.99, CURRENT_TIMESTAMP)
, (2, 102, 202, 1, 49.99, CURRENT_TIMESTAMP);Flink SQL> select * from orders_partial_update;
+----------+-------------+------------+----------+-------+-------------------------+
| order_id | customer_id | product_id | quantity | price | update_time |
+----------+-------------+------------+----------+-------+-------------------------+
| 1 | 101 | 201 | 2 | 99.99 | 2025-02-14 17:22:57.942 |
| 2 | 102 | 202 | 1 | 49.99 | 2025-02-14 17:22:57.942 |
+----------+-------------+------------+----------+-------+-------------------------+-- 部分更新
Flink SQL> INSERT INTO orders_partial_update(order_id, customer_id, product_id, quantity, price, update_time)
VALUES (1, cast(NULL as INT), cast(NULL as INT), 3, cast(NULL as DECIMAL(10, 2)), CURRENT_TIMESTAMP); Flink SQL> select * from orders_partial_update;
-- 可以看到quantity和update_time发生了变化,其他字段不变
+----------+-------------+------------+----------+-------+-------------------------+
| order_id | customer_id | product_id | quantity | price | update_time |
+----------+-------------+------------+----------+-------+-------------------------+
| 1 | 101 | 201 | 3 | 99.99 | 2025-02-14 17:27:49.498 |
| 2 | 102 | 202 | 1 | 49.99 | 2025-02-14 17:22:57.942 |
+----------+-------------+------------+----------+-------+-------------------------+-- 时光旅行
Flink SQL> SELECT * FROM orders_partial_update /*+ OPTIONS('scan.snapshot-id' = '1') */;
+----------+-------------+------------+----------+-------+-------------------------+
| order_id | customer_id | product_id | quantity | price | update_time |
+----------+-------------+------------+----------+-------+-------------------------+
| 1 | 101 | 201 | 2 | 99.99 | 2025-02-14 17:22:57.942 |
| 2 | 102 | 202 | 1 | 49.99 | 2025-02-14 17:22:57.942 |
+----------+-------------+------------+----------+-------+-------------------------+
2 rows in setFlink SQL> SELECT * FROM orders_partial_update /*+ OPTIONS('scan.snapshot-id' = '2') */;
+----------+-------------+------------+----------+-------+-------------------------+
| order_id | customer_id | product_id | quantity | price | update_time |
+----------+-------------+------------+----------+-------+-------------------------+
| 1 | 101 | 201 | 3 | 99.99 | 2025-02-14 17:27:49.498 |
| 2 | 102 | 202 | 1 | 49.99 | 2025-02-14 17:22:57.942 |
+----------+-------------+------------+----------+-------+-------------------------+
那么,"partial-update"有何作用呢?
2.2.1 利用"partial-update"实现双流join
- 传统方式:Flink 双流 Join 需要将两个流的所有数据都存储在 Flink 状态中,以便进行匹配。随着数据量的增长,状态会变得非常庞大,可能导致内存和存储压力。
- Paimon 方式:Paimon 表作为外部存储,将 Join 结果直接写入表中,Flink 不需要维护庞大的状态。这显著减少了 Flink 的状态管理压力。
-- 创建paimon结果表
drop table if exists order_payments;
CREATE TABLE order_payments (order_id STRING,order_time TIMESTAMP comment '订单时间',customer_id STRING,total_amount DECIMAL(10, 2),payment_status STRING,payment_time TIMESTAMP comment '支付时间',PRIMARY KEY (order_id) NOT ENFORCED
) WITH ('connector' = 'paimon','merge-engine' = 'partial-update','changelog-producer' = 'lookup','partial-update.remove-record-on-delete' = 'true'
);-- 订单信息表(临时表)
CREATE TEMPORARY TABLE orders (order_id STRING,order_time TIMESTAMP,customer_id STRING,total_amount DECIMAL(10, 2)
) WITH ('connector' = 'kafka','topic' = 'orders','properties.bootstrap.servers' = 'localhost:9092','format' = 'json','properties.group.id' = 'testordersGroup','json.fail-on-missing-field' = 'false','scan.startup.mode' = 'earliest-offset','json.ignore-parse-errors' = 'true'
);-- 创建topic(临时表)
/opt/apps/kafka_2.12-2.6.2/bin/kafka-topics.sh --create --topic orders --replication-factor 1 --partitions 1 --bootstrap-server centos01:9092
-- 启动命令行生产者,生产数据
/opt/apps/kafka_2.12-2.6.2/bin/kafka-console-producer.sh --topic orders --bootstrap-server centos01:9092
{"order_id": "order_1", "order_time": "2023-10-01 10:00:00", "customer_id": "customer_1", "total_amount": 100.00}
{"order_id": "order_2", "order_time": "2023-10-01 11:00:00", "customer_id": "customer_2", "total_amount": 200.00}-- 支付信息表
CREATE TEMPORARY TABLE payments (payment_id STRING,order_id STRING,payment_time TIMESTAMP,amount DECIMAL(10, 2)
) WITH ('connector' = 'kafka','topic' = 'payments','properties.bootstrap.servers' = 'localhost:9092','format' = 'json','properties.group.id' = 'testpaymentsGroup','json.fail-on-missing-field' = 'false','scan.startup.mode' = 'earliest-offset','json.ignore-parse-errors' = 'true'
);-- 创建topic
/opt/apps/kafka_2.12-2.6.2/bin/kafka-topics.sh --create --topic payments --replication-factor 1 --partitions 1 --bootstrap-server centos01:9092 Flink SQL> SET 'table.exec.sink.upsert-materialize' = 'NONE';-- 插入到paimon表中
Flink SQL> insert into order_payments
selectcoalesce(o.order_id, p.order_id) as order_id,o.order_time,o.customer_id,o.total_amount,casewhen p.payment_id is not null then 'paid'else 'unpaid'end as payment_status,p.payment_time
fromorders o
left joinpayments p
ono.order_id = p.order_id;Flink SQL> SET 'execution.runtime-mode' = 'streaming';
Flink SQL> select * from order_payments;
+----+--------------------------------+----------------------------+--------------------------------+--------------+--------------------------------+----------------------------+
| op | order_id | order_time | customer_id | total_amount | payment_status | payment_time |
+----+--------------------------------+----------------------------+--------------------------------+--------------+--------------------------------+----------------------------+
| +I | order_1 | 2023-10-01 10:00:00.000000 | customer_1 | 100.00 | unpaid | <NULL> |
| +I | order_2 | 2023-10-01 11:00:00.000000 | customer_2 | 200.00 | unpaid | <NULL> |-- 然后,启动命令行生产者,生产数据
/opt/apps/kafka_2.12-2.6.2/bin/kafka-console-producer.sh --topic payments --bootstrap-server centos01:9092
{"payment_id": "payment_1", "order_id": "order_1", "payment_time": "2023-10-01 10:30:00", "amount": 100.00}
{"payment_id": "payment_2", "order_id": "order_2", "payment_time": "2023-10-01 11:30:00", "amount": 200.00}-- 可以发现当支付信息相关字段发生了更新
+----+--------------------------------+----------------------------+--------------------------------+--------------+--------------------------------+----------------------------+
| op | order_id | order_time | customer_id | total_amount | payment_status | payment_time |
+----+--------------------------------+----------------------------+--------------------------------+--------------+--------------------------------+----------------------------+
| -U | order_1 | 2023-10-01 10:00:00.000000 | customer_1 | 100.00 | unpaid | <NULL> |
| +U | order_1 | 2023-10-01 10:00:00.000000 | customer_1 | 100.00 | paid | 2023-10-01 10:30:00.000000 |
| -U | order_2 | 2023-10-01 11:00:00.000000 | customer_2 | 200.00 | unpaid | <NULL> |
| +U | order_2 | 2023-10-01 11:00:00.000000 | customer_2 | 200.00 | paid | 2023-10-01 11:30:00.000000 |
2.2.2 Sequence Field(序列字段)的使用
- 在分布式计算中,会出现导致数据无序的情况,这可能会导致错误
- 具体可以参考如下案例:
drop table if exists my_table_demo_1;
CREATE TABLE my_table_demo_1 (pk BIGINT PRIMARY KEY NOT ENFORCED,v1 DOUBLE, v2 BIGINT, update_time TIMESTAMP(3)
) WITH ('connector' = 'paimon'
);-- 在分布式计算中,会出现导致数据无序的情况,如下所示:
INSERT INTO my_table_demo_1 VALUES
(1, 100.0, 10, TIMESTAMP '2025-02-18 15:01:00'),
(1, 200.0, 20, TIMESTAMP '2025-02-18 15:02:00'),
(1, 200.0, 20, TIMESTAMP '2025-02-18 15:00:00'), -- 乱序数据
(2, 300.0, 30, TIMESTAMP '2025-02-18 15:30:00');-- 可以看到最后保留的是'2025-02-18 15:00:00'的数据
Flink SQL> SELECT * FROM my_table_demo_1;
+----+---------------+---------------+----------------+-------------------------+
| op | pk | v1 | v2 | update_time |
+----+---------------+---------------+----------------+-------------------------+
| +I | 1 | 200.0 | 20 | 2025-02-18 15:00:00.000 |
| +I | 2 | 300.0 | 30 | 2025-02-18 15:30:00.000 |
- 利用Sequence Field(序列字段)可以解决上述问题,会根据设定的序列字段保留最终的数据。如果序列字段的值相同,则根据输入顺序决定哪一个最后合并。
- 可以为sequence.field定义多个字段,例如 ‘update_time,flag’,在这种情况下,会按照顺序比较多个字段。
- 用户自定义的序列字段与诸如first_row和first_value等特性冲突,这可能导致意外的结果。
drop table if exists my_table_demo_2;
CREATE TABLE my_table_demo_2 (pk BIGINT PRIMARY KEY NOT ENFORCED,v1 DOUBLE, v2 BIGINT, update_time TIMESTAMP(3)
) WITH ('connector' = 'paimon', 'sequence.field' = 'update_time' -- 设置sequence.field为update_time
);-- 乱序数据
INSERT INTO my_table_demo_2 VALUES
(1, 100.0, 10, TIMESTAMP '2025-02-18 15:01:00'),
(1, 200.0, 20, TIMESTAMP '2025-02-18 15:02:00'), -- 这条记录将因为update_time较晚而被最后合并
(1, 200.0, 20, TIMESTAMP '2025-02-18 15:00:00'),
(2, 300.0, 30, TIMESTAMP '2025-02-18 15:30:00');Flink SQL> SELECT * FROM my_table_demo_2;
+----+------------+---------------+------------+-------------------------+
| op | pk | v1 | v2 | update_time |
+----+------------+---------------+------------+-------------------------+
| +I | 1 | 200.0 | 20 | 2025-02-18 15:02:00.000 |
| +I | 2 | 300.0 | 30 | 2025-02-18 15:30:00.000 |
2.2.3 利用Sequence Group解决多流更新出现的混乱
- 指定
Sequence Field
并不能解决多流更新的部分更新表的乱序问题,因为多流更新时 Sequence(序列) 字段可能会被另一个流的最新数据覆盖。 - 因此paimon引入了部分更新表的序列组(Sequence Group)机制:
- 每个流定义其自己的序列组,用来解决多流更新时出现混乱问题;
- 真正的部分更新,而不仅仅是非空更新。
drop table if exists T;
CREATE TABLE T (k INT,a INT,b INT,g_1 INT,c INT,d INT,g_2 INT,PRIMARY KEY (k) NOT ENFORCED
) WITH ('merge-engine'='partial-update','fields.g_1.sequence-group'='a,b', 'fields.g_2.sequence-group'='c,d'
);INSERT INTO T VALUES (1, 1, 1, 1, 1, 1, 1);Flink SQL> select * from T;
+---+---+---+-----+---+---+-----+
| k | a | b | g_1 | c | d | g_2 |
+---+---+---+-----+---+---+-----+
| 1 | 1 | 1 | 1 | 1 | 1 | 1 |
+---+---+---+-----+---+---+-----+
1 row in set-- g_2为null, 因此c, d字段不会被更新
INSERT INTO T VALUES (1, 2, 2, 2, 2, 2, CAST(NULL AS INT));Flink SQL> select * from T;
+---+---+---+-----+---+---+-----+
| k | a | b | g_1 | c | d | g_2 |
+---+---+---+-----+---+---+-----+
| 1 | 2 | 2 | 2 | 1 | 1 | 1 |
+---+---+---+-----+---+---+-----+
1 row in set-- g_1 is smaller, 因此a, b字段不会被更新
-- g_2不为空,因此c, d字段会被更新
INSERT INTO T VALUES (1, 3, 3, 1, 3, 3, 3);Flink SQL> select * from T;
+---+---+---+-----+---+---+-----+
| k | a | b | g_1 | c | d | g_2 |
+---+---+---+-----+---+---+-----+
| 1 | 2 | 2 | 2 | 3 | 3 | 3 |
+---+---+---+-----+---+---+-----+
2.3 aggregation
- 有时用户只关心聚合结果,"aggregation"引擎根据聚合函数将同一主键下的各个值字段与最新数据一一聚合。
- 每个不属于主键的字段都可以被赋予一个聚合函数,由
fields.<field-name>.aggregate-function
表属性指定,否则它将使用 last_non_null_value 聚合作为默认值。 - 当前支持的聚合函数和数据类型有:
- sum:支持 DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT 和 DOUBLE。
- min/max:支持 CHAR、VARCHAR、DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP 和 TIMESTAMP_LTZ。
- last_value / last_non_null_value:支持所有数据类型。
- listagg:支持STRING数据类型。
- bool_and / bool_or:支持BOOLEAN数据类型。
- first_value/first_not_null_value:支持所有数据类型。
- 只有 sum 支持撤回(UPDATE_BEFORE 和 DELETE),其他聚合函数不支持撤回。 如果允许某些函数忽略撤回消息,可以配置:
'fields.${field_name}.ignore-retract'='true'
drop table if exists my_agg;
CREATE TABLE my_agg (product_id BIGINT,price DOUBLE,sales BIGINT,PRIMARY KEY (product_id) NOT ENFORCED
) WITH ('merge-engine' = 'aggregation','changelog-producer' = 'lookup','fields.price.aggregate-function' = 'max', -- price字段将通过max函数聚合'fields.sales.aggregate-function' = 'sum' -- sales字段将通过sum函数聚合
);insert into my_agg values(1, 23.0, 15);Flink SQL> SET 'execution.runtime-mode' = 'streaming';
Flink SQL> select * from my_agg;
+----+----------------------+--------------------------------+----------------------+
| op | product_id | price | sales |
+----+----------------------+--------------------------------+----------------------+
| +I | 1 | 23.0 | 15 |
-- 在另一个窗口,再次插入数据
Flink SQL> insert into my_agg values(1, 30.2, 20);
-- 之前的流读窗口
+----+----------------------+--------------------------------+----------------------+
| op | product_id | price | sales |
+----+----------------------+--------------------------------+----------------------+
| +I | 1 | 23.0 | 15 |
| -U | 1 | 23.0 | 15 |
| +U | 1 | 30.2 | 35 |
2.4 first-row
- 通过指定
'merge-engine' = 'first-row'
,用户可以保留同一主键的第一行。 - 它与Deduplicate合并引擎不同,在First Row合并引擎中,它将生成仅insert changelog。
CREATE TABLE user_actions (user_id INT,action_time TIMESTAMP,action STRING,PRIMARY KEY (user_id) NOT ENFORCED
) WITH ('merge-engine' = 'first-row','changelog-producer' = 'lookup', -- 必须与lookup changelog producer一起使用'ignore-delete' = 'true' -- 忽略DELETE和UPDATE_BEFORE消息
);-- 插入数据到 user_actions 表
INSERT INTO user_actions (user_id, action_time, action)
VALUES(1, TIMESTAMP '2023-10-01 10:00:00', 'login'),(1, TIMESTAMP '2023-10-01 10:05:00', 'logout'),(2, TIMESTAMP '2023-10-01 10:10:00', 'login'),(1, TIMESTAMP '2023-10-01 10:15:00', 'login'),(2, TIMESTAMP '2023-10-01 10:20:00', 'logout');Flink SQL> select * from user_actions;
+----+-------------+----------------------------+--------------------------------+
| op | user_id | action_time | action |
+----+-------------+----------------------------+--------------------------------+
| +I | 1 | 2023-10-01 10:00:00.000000 | login |
| +I | 2 | 2023-10-01 10:10:00.000000 | login |
3 changelog-producer简介
-
官网介绍:Changelog Producer | Apache Paimon
-
源码级讲解:https://xie.infoq.cn/article/f08b0912ef6565dd1cc78e988
-
flink中checkpoint和snapshot的联系
- 批写(手动执行sql脚本)每一个sql会立即生成一次checkpoint效果
- 流式写入根据checkpoint间隔,定期进行checkpoint
- 一次checkpoint会产生1-2个snapshot文件,具体是要看这次checkpoint是否触发compaction:
- 触发了就是2个data文件(一个是合并后的数据,一个本次checkpoint写入数据,默认产生5个snapshot就会触发compaction)
- 否则只有一个(本次checkpoint写入数据)
- 一次snapshot会产生一个data文件
3.1 none
'changelog-producer' = 'none'
的情况下,一次操作产生一个data文件的,不会产生changelog文件;'changelog-producer' = 'none'
的情况下,流式读取结果是正确的(含有-U、+U的op),虽然不像input模式有changelog文件产生,但是paimon会记录每次操作产生的快照,根据不同版本的快照数据,经过汇总能够推断出changelog;
-- 'changelog-producer' = 'none'的应用场景:
-- 用于只进行批处理,不进行流式处理的表
CREATE TABLE paimon_none_mode (a INT,b INT,c STRING,PRIMARY KEY (a) NOT ENFORCED
) WITH ('connector' = 'paimon','changelog-producer' = 'none'
);-- 间隔一段时间插入
INSERT INTO paimon_none_mode VALUES (1, 1, '1');
INSERT INTO paimon_none_mode VALUES (1, 1, '2');
INSERT INTO paimon_none_mode VALUES (1, 1, '3');
INSERT INTO paimon_none_mode VALUES (1, 1, '4');
INSERT INTO paimon_none_mode VALUES (1, 1, '5'); -- 此时会有两个snapshot,即:snapshot-5和snapshot-6INSERT INTO paimon_none_mode VALUES (1, 1, '6');
INSERT INTO paimon_none_mode VALUES (1, 1, '7');
INSERT INTO paimon_none_mode VALUES (1, 1, '8');
INSERT INTO paimon_none_mode VALUES (1, 1, '9'); -- 此时会有两个snapshot,即:snapshot-10和snapshot-11
INSERT INTO paimon_none_mode VALUES (1, 1, '10');-- paimon会记录每次操作产生的快照,根据不同版本的快照数据,经过汇总能够推断出changelog
-- hdfs目录中无changelog文件
Flink SQL> select * from paimon_none_mode /*+ OPTIONS('scan.snapshot-id' = '1') */;
+----+-------------+-------------+--------------+
| op | a | b | c |
+----+-------------+-------------+--------------+
| +I | 1 | 1 | 1 |
| -U | 1 | 1 | 1 |
| +U | 1 | 1 | 2 |
| -U | 1 | 1 | 2 |
| +U | 1 | 1 | 3 |
| -U | 1 | 1 | 3 |
| +U | 1 | 1 | 4 |
| -U | 1 | 1 | 4 |
| +U | 1 | 1 | 5 |
| -U | 1 | 1 | 5 |
| +U | 1 | 1 | 6 |
| -U | 1 | 1 | 6 |
| +U | 1 | 1 | 7 |
| -U | 1 | 1 | 7 |
| +U | 1 | 1 | 8 |
| -U | 1 | 1 | 8 |
| +U | 1 | 1 | 9 |
| -U | 1 | 1 | 9 |
| +U | 1 | 1 | 10 |
不过当快照被删除之后,none模式就无法自动推算changelog了
。因此这个none模式的changelog实际上是不够稳定的,而且比较耗费运算资源;- 由于批处理模式不会使用changelog,因此批处理模式和none模式是比较搭配的。
drop table paimon_none_mode_expire;
CREATE TABLE paimon_none_mode_expire (a INT,b INT,c STRING,PRIMARY KEY (a) NOT ENFORCED
) WITH ('connector' = 'paimon','changelog-producer' = 'none','snapshot.num-retained.min' = '10', 'snapshot.num-retained.max' = '10' -- 设置最多保存10个快照
);-- 间隔一段时间插入
INSERT INTO paimon_none_mode_expire VALUES (1, 1, '1');
INSERT INTO paimon_none_mode_expire VALUES (1, 1, '2');
INSERT INTO paimon_none_mode_expire VALUES (1, 1, '3');
INSERT INTO paimon_none_mode_expire VALUES (1, 1, '4');
INSERT INTO paimon_none_mode_expire VALUES (1, 1, '5');
INSERT INTO paimon_none_mode_expire VALUES (1, 1, '6');
INSERT INTO paimon_none_mode_expire VALUES (1, 1, '7');
INSERT INTO paimon_none_mode_expire VALUES (1, 1, '8');
INSERT INTO paimon_none_mode_expire VALUES (1, 1, '9');
INSERT INTO paimon_none_mode_expire VALUES (1, 1, '10');
INSERT INTO paimon_none_mode_expire VALUES (1, 1, '11');-- 可以发现之前快照已经删除
[root@centos01 ~]# hdfs dfs -ls /user/hive/warehouse/paimon_db.db/paimon_none_mode_expire/snapshot/*
-rw-r--r-- 1 root supergroup 1 2025-02-18 18:27 /user/hive/warehouse/paimon_db.db/paimon_none_mode_expire/snapshot/EARLIEST
-rw-r--r-- 1 root supergroup 2 2025-02-18 18:27 /user/hive/warehouse/paimon_db.db/paimon_none_mode_expire/snapshot/LATEST
-rw-r--r-- 1 root supergroup 622 2025-02-18 18:26 /user/hive/warehouse/paimon_db.db/paimon_none_mode_expire/snapshot/snapshot-10
-rw-r--r-- 1 root supergroup 624 2025-02-18 18:26 /user/hive/warehouse/paimon_db.db/paimon_none_mode_expire/snapshot/snapshot-11
-rw-r--r-- 1 root supergroup 622 2025-02-18 18:27 /user/hive/warehouse/paimon_db.db/paimon_none_mode_expire/snapshot/snapshot-12
-rw-r--r-- 1 root supergroup 622 2025-02-18 18:27 /user/hive/warehouse/paimon_db.db/paimon_none_mode_expire/snapshot/snapshot-13
-rw-r--r-- 1 root supergroup 621 2025-02-18 18:24 /user/hive/warehouse/paimon_db.db/paimon_none_mode_expire/snapshot/snapshot-4
-rw-r--r-- 1 root supergroup 621 2025-02-18 18:25 /user/hive/warehouse/paimon_db.db/paimon_none_mode_expire/snapshot/snapshot-5
-rw-r--r-- 1 root supergroup 623 2025-02-18 18:25 /user/hive/warehouse/paimon_db.db/paimon_none_mode_expire/snapshot/snapshot-6
-rw-r--r-- 1 root supergroup 621 2025-02-18 18:25 /user/hive/warehouse/paimon_db.db/paimon_none_mode_expire/snapshot/snapshot-7
-rw-r--r-- 1 root supergroup 621 2025-02-18 18:25 /user/hive/warehouse/paimon_db.db/paimon_none_mode_expire/snapshot/snapshot-8
-rw-r--r-- 1 root supergroup 621 2025-02-18 18:25 /user/hive/warehouse/paimon_db.db/paimon_none_mode_expire/snapshot/snapshot-9Flink SQL> select * from paimon_none_mode_expire /*+ OPTIONS('scan.snapshot-id' = '1')*/;
+----+-------------+-------------+----------------+
| op | a | b | c |
+----+-------------+-------------+----------------+
| +I | 1 | 1 | 4 |
| -U | 1 | 1 | 4 |
| +U | 1 | 1 | 5 |
| -U | 1 | 1 | 5 |
| +U | 1 | 1 | 6 |
| -U | 1 | 1 | 6 |
| +U | 1 | 1 | 8 |
| -U | 1 | 1 | 8 |
| +U | 1 | 1 | 7 |
| -U | 1 | 1 | 7 |
| +U | 1 | 1 | 9 |
| -U | 1 | 1 | 9 |
| +U | 1 | 1 | 10 |
| -U | 1 | 1 | 10 |
| +U | 1 | 1 | 11 |
3.2 input
'changelog-producer' = 'input'
的情况下,一次checkpoint产生一个data文件的同时,也会产生一个changelog文件,其中changelog文件内容和data文件内容完全一致;'changelog-producer' = 'input'
的情况下,如果你的操作不完整,那么流式读取的结果也是不对的。- 例如insert2次相同主键,按照主键表的逻辑,应该是会出现-D +I 或者 -U +U 的场景,但是由于input模式,不会额外的处理changelog,insert两次,changelog就写两次insert
- 要想对,那就需要先insert,然后delete,再insert,或者执行update操作
'changelog-producer' = 'input'
就是为流处理设计的,但是数据处理操作必须要标准,即必须要有-U +U -D +I的操作,而CDC任务接到的数据就是标准的,因此一般用于cdc采集且后期要进行流式处理的表
-- 应用场景:
-- 一般用于cdc采集且后期要进行流式处理的表
CREATE TABLE paimon_input_mode (a INT,b INT,c STRING,PRIMARY KEY (a) NOT ENFORCED
) WITH ('connector' = 'paimon','changelog-producer' = 'input'
);-- 插入重复的主键数据
-- 第一次插入
INSERT INTO paimon_input_mode VALUES (1, 1, '1');-- 延迟一段时间后,在另一个窗口第二次插入
INSERT INTO paimon_input_mode VALUES (1, 1, '2');Flink SQL> SET 'execution.runtime-mode' = 'streaming';
[INFO] Session property has been set.-- 可以发现没有-U和+U
Flink SQL> select * from paimon_input_mode;
+----+-------------+-------------+--------------+
| op | a | b | c |
+----+-------------+-------------+--------------+
| +I | 1 | 1 | 1 |
| +I | 1 | 1 | 2 |
3.3 lookup
-
'changelog-producer' = 'lookup'
会查询变化前的数据,并对比变化后数据,自动生成一份chagelog。相比input模式而言,lookup会自己产生正确的changelog,而不管输入数据是否符合规范; -
lookup模式针对于不是cdc采集的表,而且要用于流式处理的表;
-
Lookup 会将数据缓存在内存和本地磁盘上,可以使用以下选项来调整性能:
Option Default Type Description lookup.cache-file-retention 1 h Duration The cached files retention time for lookup. After the file expires, if there is a need for access, it will be re-read from the DFS to build an index on the local disk. lookup.cache-max-disk-size unlimited MemorySize Max disk size for lookup cache, you can use this option to limit the use of local disks. lookup.cache-max-memory-size 256 mb MemorySize Max memory size for lookup cache. -
应用场景:
- 非cdc采集的表且更在乎数据延迟
- 使用了聚合引擎或者写入过程有计算逻辑
- 流处理表使用了first-row合并引擎
3.4 full-compaction
-
'changelog-producer' = 'full-compaction'
模式相比于lookup模式区别:- lookup模式:一次checkpoint产生一次changelog,数据延迟小,资源消耗大
- full-compaction模式:n次checkpoint产生一次changelog,数据延迟大,资源消耗小
-
通过指定
full-compaction.delta-commits
表属性,在增量提交(检查点 checkpoint)后将不断触发 full compaction。 默认情况下设置为 1,因此每个检查点都会进行完全压缩并生成change log。 -
应用场景:
非cdc采集的表且
不在乎数据延迟的场景