系列文章目录
一. Paimon数据湖增删改查
二. 查询优化
三. 系统表
四. Lookup Joins
文章目录
- 系列文章目录
- 前言
- Paimon数据湖的使用
- 1、创建Table
- 1.1 创建catalog管理的表
- 1.2 分区表
- 1.3 Create Table As(了解)
- 1.4 Create Table Like
- 1.5 表属性
- 1.6 创建外部表
- 1.7 创建临时表
- 2、修改Table
- 2.1 修改或添加Table Properties
- 2.2 移除Table Properties
- 2.3 表重命名
- 2.4 添加新列
- 2.5 删除列
- 2.6 修改列的名称
- 2.7 更改列的空性
- 2.8 更改列的备注
- 2.9 更改列的类型
- 2.10 修改watermark
- 3、写入Table
- 3.1 语法
- 3.2 将可空字段写入非空字段
- 3.3 修改数据
- 3.4 清除数据
- 3.5 更新数据
- 3.6 删除数据
- 4、查询Table
- 4.1 批量查询
- 4.1.1 批处理时间旅行
- 4.1.2 批处理增量
- 4.2 流式查询
- 4.2.1 流处理时间旅行
- 4.2.2 Consumer ID(了解)
- 4.2.3 Read Overwrite
- 4.3 查询优化
- 5、系统表(了解)
- 6、Lookup Joins
前言
本文主要详解了Paimon数据湖的使用。
Paimon数据湖的使用
1、创建Table
1.1 创建catalog管理的表
在Paimon catalog中创建的表由catalog管理。当表从catalog中删除时,它的表文件也将被删除。类似Hive中的内部表。
示例(在Filesystem Metastore中创建):
CREATE TABLE mypaimon.test.MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);解释: NOT ENFORCED叫做非强制。因为Paimon主要是进行海量数据分析,如果在海量数据上保证数据的主键,那么是非常消耗性能。我们不能完全依赖Paimon对数据进行主键的管理,我们需要在代码层面去确保数据的主键。
1.2 分区表
示例如下:
-- 正常创建分区表
CREATE TABLE mypaimon.test.MyPartitionTable1 (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);-- 错误创建分区表:分区字段不在PRIMARY KEY中
CREATE TABLE mypaimon.test.MyPartitionTable2 (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (item_id);-- 字段指定默认值
CREATE TABLE mypaimon.test.MyPartitionTable3 (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh)
with('fields.item_id.deafult-value'='999'
);
注意: 在阿里云Flink的1.15版本及以下,如果定义了主键,那么分区字段必须是主键的子集
可能遇到的错误:
原因: 在阿里云Flink的15版本及以下,如果定义了主键,分区字段必须是主键的子集
1.3 Create Table As(了解)
在开源版本中,Paimon支持 create table as 语法,表可以由查询的结果创建和填充。当使用CREATE TABLE作为SELECT时,我们可以指定主键或分区,语法请参考下面的sql。
但是在阿里云flink中,虽然也支持CREATE TABLE AS写法,但是目前不支持paimon表作为源表。所以以下SQL了解即可。
/* 可以修改设置 */
CREATE TABLE MyTableAll (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);CREATE TABLE MyTableAllAs WITH ('primary-key' = 'dt,hh', 'partition' = 'dt') AS SELECT * FROM MyTableAll;
1.4 Create Table Like
要创建与另一个表具有相同模式、分区和表属性的表,可以使用create table LIKE。
CREATE TABLE IF NOT EXISTS mypaimon.test.MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED);CREATE TABLE mypaimon.test.MyTableLike LIKE mypaimon.test.MyTable;
1- create table as既会复制数据,也会复制表结构
2- create table like只会复制表结构
1.5 表属性
用户可以指定表属性来启用特性或提高Paimon的性能。表属性详细请参考:https://help.aliyun.com/zh/flink/developer-reference/apache-paimon-connector?spm=a2c4g.11174283.0.i3
示例:
CREATE TABLE mypaimon.test.MyTableProperties (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh) WITH ('bucket' = '2','bucket-key' = 'user_id'
);
1.6 创建外部表
外部表由catalog记录,但是不由catalog管理。如果删除了paimon的外部表,那么只会删除元数据信息不会删除对应的数据。类似Hive的外部表。
Flink SQL支持读写外部表。外部Paimon表是通过指定连接器和路径属性创建的。下面的SQL创建了一个名为MyExternalTable的外部表,它有2列,其中表文件的路径是 oss://gz18-paimon-oss/test.db/word_count。
-- 不能使用paimon的catalog来创建
CREATE TABLE MyExternalTable (word STRING PRIMARY KEY NOT ENFORCED,cnt BIGINT
) WITH ('connector' = 'paimon','path' = 'oss://gz18-paimon-oss/test.db/MyExternalTable','auto-create' = 'true' -- 如果表路径不存在,此table属性将为空表创建表文件,目前仅由Flink支持
);注意: path中的gz18-paimon-oss要改成你自己的oss的bucket名称
并且可以对这个表进行操作
-- 部署到线上运行
insert into MyExternalTable values('hello',1000);
-- 批模式查询
SELECT sum(cnt) AS total_cnt FROM MyExternalTable;
注意:需要将insert 语句部署到线上运行,才能将数据真正插入到表中。
通过paimon外部表的方式,可以对paimon的数据进行查询和修改。
可能遇到的错误:
原因: 不能在Paimon的Catalog中使用'connector' = 'paimon'
1.7 创建临时表
临时表仅有Flink支持,与外部表一样,临时表只是记录,而不是由当前的FlinkSQL会话管理。如果临时表被删除,它的资源不会被删除。当前FlinkSQL会话关闭的时候,会删除临时表。
如果希望将Paimon catalog与其他表一起使用,但又不希望将它们存储在 catalog 中,则可以创建一个临时表。下面的Flink SQL创建了一个Paimon catalog和一个临时表,并演示了如何同时使用这两个表。
准备工作:
创建temp_table.csv,其中数据为:
1,beijing
2,shanghai
编辑好后上传到OSS的gz18-bucket 目录。
-- 使用创建过的mypaimon catalog-- 【流式作业】创建my_table表并导入数据
CREATE TABLE mypaimon.test.my_table (k INT,v STRING
);-- 【批模式】【部署】运行
INSERT INTO mypaimon.test.my_table values (1, 'zs'),(2, 'ls');-- 以下在批作业中一起【调试】运行即可
-- 【批模式】创建temp_table表
CREATE TEMPORARY TABLE mypaimon.test.temp_table (k INT,v STRING
) WITH ('connector' = 'filesystem','path' = 'oss://gz18-bucket/temp_table.csv','format' = 'csv'
);-- 【批模式】查询
SELECT my_table.k, my_table.v, temp_table.v
FROM mypaimon.test.my_table
JOIN mypaimon.test.temp_table ON my_table.k = temp_table.k;注意: path中的gz18-bucket一定要改成你自己的bucket名称
查询结果如下:
如果删除 temp_table后, temp_table.csv依然存在。
-- 在阿里云flink中不需要运行,temporary表只会在运行时生效
drop temporary table mypaimon.test.temp_table;
2、修改Table
2.1 修改或添加Table Properties
ALTER TABLE mypaimon.test.my_table SET (
'write-buffer-size' = '256 MB'
);
2.2 移除Table Properties
ALTER TABLE mypaimon.test.my_table RESET ('write-buffer-size');
2.3 表重命名
ALTER TABLE mypaimon.test.my_table RENAME TO my_table_new;
注意:如果使用对象存储,如S3或OSS,请谨慎使用此语法,因为对象存储的重命名不是原子性的,在失败的情况下可能只会移动部分文件。
2.4 添加新列
注意:使用的阿里云Flink版本需要是1.17版本,否则不支持添加和修改字段信息。
ALTER TABLE mypaimon.test.my_table ADD (c1 INT, c2 STRING);desc mypaimon.test.my_table;
要添加具有指定位置的新列,请使用FIRST或AFTER col_name。
ALTER TABLE mypaimon.test.my_table ADD c INT FIRST;
ALTER TABLE mypaimon.test.my_table ADD b INT AFTER c;
注意:要将一个存在的列修改到一个新的位置,使用FIRST或AFTER col_name。
ALTER TABLE mypaimon.test.my_table MODIFY c1 DOUBLE FIRST;
ALTER TABLE mypaimon.test.my_table MODIFY c2 DOUBLE AFTER c1;
2.5 删除列
ALTER TABLE mypaimon.test.my_table DROP (c1, c2);
2.6 修改列的名称
ALTER TABLE mypaimon.test.my_table RENAME c TO cc;
2.7 更改列的空性
CREATE TABLE mypaimon.test.null_table (id INT PRIMARY KEY NOT ENFORCED, coupon_info FLOAT NOT NULL);
-- 将列' coupon_info '从NOT NULL更改为可空
ALTER TABLE mypaimon.test.null_table MODIFY coupon_info FLOAT;
-- 将列' coupon_info '从可空改为NOT NULL
-- 如果已经有NULL值,设置如下表选项,在修改表之前静默删除这些记录。
SET 'table.exec.sink.not-null-enforcer' = 'DROP';
ALTER TABLE mypaimon.test.null_table MODIFY coupon_info FLOAT NOT NULL;
2.8 更改列的备注
ALTER TABLE mypaimon.test.null_table MODIFY coupon_info BIGINT COMMENT '优惠券信息';
2.9 更改列的类型
ALTER TABLE mypaimon.test.null_table MODIFY coupon_info DOUBLE;
对于是否可以转化,可以参考下面链接:
https://paimon.apache.org/docs/0.7/how-to/altering-tables/
2.10 修改watermark
注意:watermark中涉及的字段数据类型必须是timestamp或者timestamp_ltz
只在flink中生效,修改的语法如下:
CREATE TABLE mypaimon.test.my_table (k INT,v STRING
);desc mypaimon.test.my_table
添加
ALTER TABLE mypaimon.test.my_table ADD (log_ts varchar(20));
ALTER TABLE mypaimon.test.my_table ADD (ts AS TO_TIMESTAMP(log_ts) AFTER log_ts,WATERMARK FOR ts AS ts - INTERVAL '1' HOUR);
修改
ALTER TABLE mypaimon.test.my_table MODIFY WATERMARK FOR ts AS ts - INTERVAL '2' HOUR;
删除
ALTER TABLE mypaimon.test.my_table DROP WATERMARK;
3、写入Table
可以使用INSERT语句向表中插入新行或覆盖表中的现有数据。插入的行可以由值表达式指定,也可以由查询结果指定。
3.1 语法
INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ]
{ value_expr | query }
(1) part_spec
一个可选参数,用于指定分区的键和值对的逗号分隔列表。请注意,可以在分区规范中使用类型化文字(例如,date’ 2019-01-02 ')。
语法: PARTITION ( partition_col_name = partition_col_val [ , … ] )
(2) column_list
一个可选参数,用于指定属于table_identifier表的以逗号分隔的列列表。
注意:所有指定的列都应该存在于表中,需包括除静态分区列之外的所有列,并且不能重复。
语法: (col_name1 [, column_name2, …])
(3) value_expr
指定要插入的值。可以插入显式指定的值或NULL。必须用逗号分隔子句中的每个值。可以指定多个值集来插入多行。
语法: VALUES ( { value | NULL } [ , … ] ) [ , ( … ) ]
注意:目前,Flink不支持直接使用NULL,因此应该使用 CAST (NULL AS data_type)
将NULL转换为实际数据类型。
3.2 将可空字段写入非空字段
不能在一个表的非空列中插入另一个表的可空列。假设,我们在表a中有一个列key1,它是主键,主键不能为空。在表B中有一个列key2,它是可空的。如果我们运行这样的sql语句:
INSERT INTO A key1 SELECT key2 FROM B
Flink 和 Spark中都会报错,处理方式是:使用”NVL”或”COALESCE”,将一个可空列转成非空列。
INSERT INTO A key1 SELECT COALESCE(key2, <non-null expression>) FROM B
3.3 修改数据
(1)使用insert将记录/更改应用到表
Paimon支持在sink阶段按分区和桶对数据进行shuffle。
INSERT INTO MyTable SELECT ...
(2)覆盖语义
- 对于未分区的表,Paimon支持覆盖整个表
INSERT OVERWRITE MyTable SELECT ...
- 对于分区表,Paimon支持覆盖指定分区
INSERT OVERWRITE MyTable PARTITION (key1 = value1, key2 = value2, ...) SELECT ...
- 动态覆盖
Flink的默认覆盖模式是动态分区覆盖(这意味着Paimon只删除出现在被覆盖数据中的分区)。可以设置dynamic-partition-overwrite 为 false 将其更改为静态覆盖。
dynamic-partition-overwrite配置参数的默认值是true。
参考链接:https://paimon.apache.org/docs/0.7/maintenance/configurations/
-- 假如MyTable是一个分区表-- 动态覆盖
INSERT OVERWRITE MyTable SELECT ...-- 静态覆盖 (覆盖整个表)
INSERT OVERWRITE MyTable /*+ OPTIONS('dynamic-partition-overwrite' = 'false') */ SELECT ...
3.4 清除数据
(1)清除表:可以使用INSERT OVERWRITE通过插入空值来清除表。
INSERT OVERWRITE MyTable /*+ OPTIONS('dynamic-partition-overwrite'='false') */ SELECT * FROM MyTable WHERE false
(2)清除分区:
目前Paimon支持两种方式去清除分区:
- 与清除表一样,可以使用INSERT OVERWRITE 向分区插入空值来清除分区的数据。
-- 语法
INSERT OVERWRITE MyTable /*+ OPTIONS('dynamic-partition-overwrite'='false') */ PARTITION (key1 = value1, key2 = value2, ...)
SELECT selectSpec FROM MyTable WHERE false
-- 案例:
CREATE TABLE `new-mypaimon`.test.OverwriteTable (k0 INT,k1 INT,v STRING) PARTITIONED BY (k0, k1);
-- 在批作业中部署运行
insert into `new-mypaimon`.test.OverwriteTable values(1,1,'a'),(2,2,'b'),(3,3,'c');
-- 在批作业中调试运行
select * from `new-mypaimon`.test.OverwriteTable;
-- 方法一
-- 在批作业中部署运行
INSERT OVERWRITE `new-mypaimon`.test.OverwriteTable /*+ OPTIONS('dynamic-partition-overwrite'='false') */ PARTITION (k0 = 1)
SELECT k1, v FROM `new-mypaimon`.test.OverwriteTable WHERE false;
-- 在批作业中调试运行
select * from `new-mypaimon`.test.OverwriteTable;
- 上面的方法不支持删除多个分区。如果需要删除多个分区,可以通过flink run提交drop-partition作业。
3.5 更新数据
目前,Paimon支持在Flink 1.17及以后的版本中使用UPDATE更新记录。可以在Flink的批处理模式下执行UPDATE。目前阿里云Flink中暂不支持。
需要表具备以下两个特点:
- 必须是主键表;
- 需要对MergeEngine进行重复数据删除或部分更新。
-- 语法
UPDATE table_identifier SET column1 = value1, column2 = value2, ... WHERE condition;-- 开源版本案例:
CREATE TABLE UpdateTable(a STRING,b INT,c INT,PRIMARY KEY (a) NOT ENFORCED
)WITH('write-mode' = 'change-log','merge-engine' = 'deduplicate');-- 插入
INSERT INTO UpdateTable values('myTable', 1,1);
select * from UpdateTable;
-- 更新
UPDATE UpdateTable SET b = 1, c = 2 WHERE a = 'myTable';
UPDATE UpdateTable SET b = 1, c = 3 WHERE a = 'myTable';
3.6 删除数据
在Flink 1.16和以前的版本中,Paimon只支持通过Flink run提交“delete”作业来删除记录。
在Flink 1.17及更高的版本,可以直接使用删除语句(只在批模式下支持)。但是需要设置两个条件:
- 设置write-mode为change-log
- 如果表有主键, MergeEngine需要重复数据删除
目前阿里云Flink中暂不支持。
-- 语法
DELETE FROM table_identifier WHERE conditions;-- 开源版本案例:
CREATE TABLE DeleteTable (id BIGINT NOT NULL,currency STRING,rate BIGINT,dt String,PRIMARY KEY (id, dt) NOT ENFORCED) PARTITIONED BY (dt) WITH ( 'write-mode' = 'change-log','merge-engine' = 'deduplicate' );-- 插入语句
INSERT INTO DeleteTable values(1, 'UNKNOWN', 1, '2023-08-22');
select * from DeleteTable;
-- 删除语句
DELETE FROM DeleteTable WHERE currency = 'UNKNOWN';
select * from DeleteTable;
4、查询Table
4.1 批量查询
Paimon的批处理读取返回表快照中的所有数据。默认情况下,批处理读取返回最新的快照。
-- Flink SQL
SET 'execution.runtime-mode' = 'batch';
4.1.1 批处理时间旅行
带时间旅行的Paimon批读可以指定一个快照或一个标签,并读取相应的数据。
-- 查看快照信息
SELECT * FROM t$snapshots;-- 可以指定snapshot id
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;-- 可以指定timestamp (unix milliseconds)
SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;-- 可以指定tag,如 'my-tag'
SELECT * FROM t /*+ OPTIONS('scan.tag-name' = 'my-tag') */;注意: snapshot的编号从1开始。
案例:
(1)建表并插入数据
CREATE TABLE mypaimon.test.travelTable (k0 INT,k1 INT,v STRING) PARTITIONED BY (k0, k1);
-- 在批作业中部署运行
insert into mypaimon.test.travelTable values(1,1,'a'),(2,2,'b'),(3,3,'c');
-- 在批作业中部署运行
insert into mypaimon.test.travelTable values(4,4,'d');
-- 在批作业中部署运行
INSERT OVERWRITE mypaimon.test.travelTable /*+ OPTIONS('dynamic-partition-overwrite'='false') */ PARTITION (k0 = 1)
SELECT k1, v FROM mypaimon.test.travelTable WHERE false;-- 在批作业中调试运行
select * from mypaimon.test.travelTable;
(2)
SELECT * FROM mypaimon.test.travelTable/*+ OPTIONS('scan.snapshot-id' = '1') */;
(3)
SELECT * FROM mypaimon.test.travelTable/*+ OPTIONS('scan.snapshot-id' = '2') */;
(4)
SELECT * FROM mypaimon.test.travelTable/*+ OPTIONS('scan.snapshot-id' = '3') */;
可能遇到的错误:
原因: 进行时间旅行的时候,如果访问了不存在的snapshot快照,那么会报错
4.1.2 批处理增量
读取开始快照(不包含)和结束快照之间的增量变化。
举例:
- ‘5,10’ 意味着 snapshot 5 和 snapshot 10 之间的
- ‘TAG1,TAG3’ 意味着 TAG1 和 TAG3 之间的
SELECT * FROM t /*+ OPTIONS('incremental-between' = '12,20') */;
在批处理SQL中,不允许返回DELETE记录,因此将删除-D的记录。如果你想看到DELETE记录,你可以使用audit_log表:
SELECT * FROM t$audit_log /*+ OPTIONS('incremental-between' = '12,20') */;
案例:
SELECT * FROM mypaimon.test.travelTable /*+ OPTIONS('incremental-between' = '1,3') */;
总结:在批量查询时,可以指定快照id,也可以指定时间戳,还可以指定tag进行查询。这样就可以实现查询历史状态,也就是时间旅行。
4.2 流式查询
默认情况下,流式读取在第一次启动时生成表上的最新快照,并继续读取最新的更改。
默认情况下,Paimon确保任务的启动被正确处理,并包含全部数据。
-- 开源版本
SET 'execution.runtime-mode' = 'streaming';
-- 阿里云Flink
在流作业草稿中运行
也可以不使用快照数据,使用最新浏览的模式。
-- 连续读取最新更改,而不在开始时生成快照
SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */
4.2.1 流处理时间旅行
如果你只想处理今天及以后的数据,你可以使用分区过滤器:
SELECT * FROM t WHERE dt > '2023-06-26';
如果它不是一个分区表,或者不能按分区进行过滤,可以使用Time travel的流读取。
-- 可以指定snapshot id
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;-- 可以指定timestamp (unix milliseconds)
SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;-- 在第一次启动时读取快照id 1L,并继续读取更改
SELECT * FROM t /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '1') */;
案例:
-- 在流作业草稿中调试运行
SELECT * FROM mypaimon.test.travelTable /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '2') */;
如果持续向mypaimon.test.travelTable表插入数据,则会打印出新增的数据。
4.2.2 Consumer ID(了解)
在流式读取表时指定consumer-id,这是一个实验性功能。
SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid') */;
当流读取Paimon表时,下一个快照id将被记录到文件系统中。这有几个优点:
- 当之前的作业停止后,新启动的作业可以继续消耗之前的进度,而不需要从状态恢复。新的读取将从消费者文件中找到的下一个快照 ID 开始读取。
- 在判断一个快照是否过期时,Paimon会查看文件系统中该表的所有消费者,如果还有消费者依赖这个快照,那么这个快照就不会因为过期而被删除。
- 当没有水印定义时,Paimon表会将快照中的水印传递到下游Paimon表,这意味着可以跟踪整个管道的水印进度。
注意:消费者将防止快照过期。可以指定“consumer.expiration-time”来管理消费者的生命周期。
示例:
(1)指定consumer-id开始流式查询:
SELECT * FROM t /*+ OPTIONS(‘consumer-id’ = ‘test’) */;
(2)停掉原先的流式查询,插入数据:
insert into t values(1,2,3);
(3)再次指定consumer-id流式查询:
SELECT * FROM t /*+ OPTIONS(‘consumer-id’ = ‘test’) */;
4.2.3 Read Overwrite
默认情况下,流读取将忽略INSERT OVERWRITE生成的提交。如果想要读取OVERWRITE的提交,可以配置 streaming-read-overwrite 为true。
总结:流式查询会不断读取最新表的变化,然后在最开始时是否读取全量数据,可以通过scan-mode来指定,使用-full的类型即可。
可以像kafka一样,使用consumer id去管理消费者的消费点位
4.3 查询优化
(1)强烈建议与查询一起指定分区和主键过滤器,这将加快查询的数据跳过。
可以加速数据跳转的过滤函数有:
- =
- <
- <=
- >
- >=
- IN (…)
- LIKE ‘abc%’
- IS NULL
(2)Paimon将按主键对数据进行排序,这加快了点查询和范围查询的速度。当使用复合主键时,查询过滤器最好在主键的最左边形成一个前缀,以获得良好的加速。
示例:
-- 复合主键的表
CREATE TABLE orders (catalog_id BIGINT,order_id BIGINT,.....,PRIMARY KEY (catalog_id, order_id) NOT ENFORCED
)
通过为主键的最左边的前缀指定一个范围过滤器,查询可以获得很好的加速。
SELECT * FROM orders WHERE catalog_id=1025;SELECT * FROM orders WHERE catalog_id=1025 AND order_id=29495;SELECT * FROM ordersWHERE catalog_id=1025AND order_id>2035 AND order_id<6000;
但是,下面的过滤器不能很好地加速查询。
SELECT * FROM orders WHERE order_id=29495;SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;
5、系统表(了解)
在开源版本中,支持查询系统表,系统表中记录了表的一些元数据信息,详见https://paimon.apache.org/docs/0.5/how-to/system-tables/。
6、Lookup Joins
Lookup Joins 是流查询中的一种join。它用从Paimon查询的数据填充表。
在Flink中,Paimon支持对带有主键的表和仅追加表进行lookup join。下面的示例说明了这个特性。
以下案例是使用客户表做维表,与订单表进行关联,然后查看关联的效果。
(1)创建paimon表
-- 使用创建好的mypaimon catalog
-- 在paimon catalog中建一个表
CREATE TABLE mypaimon.test.customers (id INT PRIMARY KEY NOT ENFORCED,name STRING,country STRING,zip STRING
);
(2)在Mysql中建表并插入数据
CREATE DATABASE IF NOT EXISTS test;
CREATE TABLE test.customers (id INT PRIMARY KEY,name varchar(20),country varchar(20),zip varchar(20)
);
INSERT INTO test.customers VALUES (1, '张三', '中国', '081212'),(2, '李四', '中国', '872211');
(3)在Flink中创建Mysql映射表
CREATE TABLE if not exists mysql_cdc_customers (id int,name string,country string,zip string,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector'= 'mysql','hostname'= 'rm-cn-lr53rh4wc00028.rwlb.rds.aliyuncs.com','port'= '3306','username'= 'itcast','password'='Itcast123','server-time-zone'= 'Asia/Shanghai','database-name'= 'test','table-name'= 'customers'
);注意: hostname、username、password都需要改成自己的。
(4)将mysql数据插入到paimon中
-- 流作业草稿中部署运行
INSERT INTO mypaimon.test.customers
select * from mysql_cdc_customers;
(5)登录ECS控制台
点击实例,点击远程连接,选择通过Workbench远程连接
输入密码,点击确定
(6)创建topic
-- 启动kafka(前提启动zookeeper)
/export/server/zookeeper/bin/zkServer.sh start
cd /export/server/kafka/
bin/kafka-server-start.sh -daemon config/server.properties-- 创建topic(ip为ECS的内网ip,可以通过ifconfig查询)
bin/kafka-topics.sh --create --topic orders_info --bootstrap-server node1:9092
(7)在Flink创建kafka映射表
--创建一个临时左表,从kafka读数
CREATE TABLE Orders (order_id INT,total INT,customer_id INT,proc_time AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'orders_info','properties.bootstrap.servers' = '172.28.237.195:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','format' = 'csv'
);注意: properties.bootstrap.servers中的IP地址改成你自己的ECS服务器内网(私网)的IP地址
(8)向orders_info中插入一些数据:
bin/kafka-console-producer.sh --broker-list node1:9092 --topic orders_info
1,100,1
2,20,1
(10)现在可以在lookup join中使用customers表
-- 流式作业草稿中调试运行
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN mypaimon.test.customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
结果如下:
再向kafka中输入 3,50,2
结果变化如下:
再向kafka中输入 4,66,3
结果没有变化,因为id为3的维表信息关联不到。
此时向mysql中插入一条数据:
insert into test.customers values (3, '王五', '中国', '111111');
结果不会更新。
等待片刻(插入到paimon的任务的checkpoint完成)。
再向kafka中输入 5,77,3,此时可以看到能关联到结果。
此时再改变维表值,如在mysql中进行更新
update test.customers set zip='test' where id = 3;
此时,已经产生的结果也不再发生变化。新来的结果会关联新数据。
再向kafka中输入 6,88,3 ,结果如下
lookup join 操作将在本地维护一个RocksDB缓存,并实时提取表的最新更新。lookup join 操作将只提取必要的数据,因此筛选条件对性能非常重要。
此特性仅适用于最多包含数千万条记录的表,以避免过度使用本地磁盘。
注意:如果Orders(主表)的记录Join缺失,因为customers(lookup表)对应的数据还没有准备好。可以考虑使用Flink的Delayed Retry Strategy For Lookup。
总结:
(1)如果事实表(Kafka数据源)先于维表(MySQL数据源)到达了,这时事实表就不会关联到维表,即使后续维表数据来了,也不会更新之前的关联结果了
(2)paimon表一旦发生变化,会实时更新维表(Flink的内存)中
(3)如果关联结果已经生成了,然后维表发生了变化,这时之前的关联结果也不会发生变化了