Paimon数据湖详解(第49天)

系列文章目录

一. Paimon数据湖增删改查
二. 查询优化
三. 系统表
四. Lookup Joins


文章目录

      • 系列文章目录
      • 前言
      • Paimon数据湖的使用
      • 1、创建Table
        • 1.1 创建catalog管理的表
        • 1.2 分区表
        • 1.3 Create Table As(了解)
        • 1.4 Create Table Like
        • 1.5 表属性
        • 1.6 创建外部表
        • 1.7 创建临时表
      • 2、修改Table
        • 2.1 修改或添加Table Properties
        • 2.2 移除Table Properties
        • 2.3 表重命名
        • 2.4 添加新列
        • 2.5 删除列
        • 2.6 修改列的名称
        • 2.7 更改列的空性
        • 2.8 更改列的备注
        • 2.9 更改列的类型
        • 2.10 修改watermark
      • 3、写入Table
        • 3.1 语法
        • 3.2 将可空字段写入非空字段
        • 3.3 修改数据
        • 3.4 清除数据
        • 3.5 更新数据
        • 3.6 删除数据
      • 4、查询Table
        • 4.1 批量查询
          • 4.1.1 批处理时间旅行
          • 4.1.2 批处理增量
        • 4.2 流式查询
          • 4.2.1 流处理时间旅行
          • 4.2.2 Consumer ID(了解)
          • 4.2.3 Read Overwrite
        • 4.3 查询优化
      • 5、系统表(了解)
      • 6、Lookup Joins


前言

本文主要详解了Paimon数据湖的使用。


Paimon数据湖的使用

1、创建Table

1.1 创建catalog管理的表

在Paimon catalog中创建的表由catalog管理。当表从catalog中删除时,它的表文件也将被删除。类似Hive中的内部表。

示例(在Filesystem Metastore中创建):

在这里插入图片描述

CREATE TABLE mypaimon.test.MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);解释: NOT ENFORCED叫做非强制。因为Paimon主要是进行海量数据分析,如果在海量数据上保证数据的主键,那么是非常消耗性能。我们不能完全依赖Paimon对数据进行主键的管理,我们需要在代码层面去确保数据的主键。
1.2 分区表

示例如下:

-- 正常创建分区表
CREATE TABLE mypaimon.test.MyPartitionTable1 (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);-- 错误创建分区表:分区字段不在PRIMARY KEY中
CREATE TABLE mypaimon.test.MyPartitionTable2 (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (item_id);-- 字段指定默认值
CREATE TABLE mypaimon.test.MyPartitionTable3 (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh)
with('fields.item_id.deafult-value'='999'
);
注意: 在阿里云Flink的1.15版本及以下,如果定义了主键,那么分区字段必须是主键的子集

可能遇到的错误:

在这里插入图片描述

原因: 在阿里云Flink的15版本及以下,如果定义了主键,分区字段必须是主键的子集
1.3 Create Table As(了解)

在开源版本中,Paimon支持 create table as 语法,表可以由查询的结果创建和填充。当使用CREATE TABLE作为SELECT时,我们可以指定主键或分区,语法请参考下面的sql。

但是在阿里云flink中,虽然也支持CREATE TABLE AS写法,但是目前不支持paimon表作为源表。所以以下SQL了解即可。

/* 可以修改设置 */
CREATE TABLE MyTableAll (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED 
) PARTITIONED BY (dt, hh);CREATE TABLE MyTableAllAs WITH ('primary-key' = 'dt,hh', 'partition' = 'dt') AS SELECT * FROM MyTableAll;
1.4 Create Table Like

要创建与另一个表具有相同模式、分区和表属性的表,可以使用create table LIKE。

CREATE TABLE IF NOT EXISTS mypaimon.test.MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED);CREATE TABLE mypaimon.test.MyTableLike LIKE mypaimon.test.MyTable;
1- create table as既会复制数据,也会复制表结构
2- create table like只会复制表结构
1.5 表属性

用户可以指定表属性来启用特性或提高Paimon的性能。表属性详细请参考:https://help.aliyun.com/zh/flink/developer-reference/apache-paimon-connector?spm=a2c4g.11174283.0.i3

示例:

CREATE TABLE mypaimon.test.MyTableProperties (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh) WITH ('bucket' = '2','bucket-key' = 'user_id'
);
1.6 创建外部表

外部表由catalog记录,但是不由catalog管理。如果删除了paimon的外部表,那么只会删除元数据信息不会删除对应的数据。类似Hive的外部表。

Flink SQL支持读写外部表。外部Paimon表是通过指定连接器和路径属性创建的。下面的SQL创建了一个名为MyExternalTable的外部表,它有2列,其中表文件的路径是 oss://gz18-paimon-oss/test.db/word_count。

-- 不能使用paimon的catalog来创建
CREATE TABLE MyExternalTable (word STRING PRIMARY KEY NOT ENFORCED,cnt BIGINT
) WITH ('connector' = 'paimon','path' = 'oss://gz18-paimon-oss/test.db/MyExternalTable','auto-create' = 'true' -- 如果表路径不存在,此table属性将为空表创建表文件,目前仅由Flink支持
);注意: path中的gz18-paimon-oss要改成你自己的oss的bucket名称

并且可以对这个表进行操作

-- 部署到线上运行
insert into MyExternalTable values('hello',1000);
-- 批模式查询
SELECT sum(cnt) AS total_cnt FROM MyExternalTable;

注意:需要将insert 语句部署到线上运行,才能将数据真正插入到表中。

通过paimon外部表的方式,可以对paimon的数据进行查询和修改。

可能遇到的错误:

在这里插入图片描述

原因: 不能在Paimon的Catalog中使用'connector' = 'paimon'
1.7 创建临时表

临时表仅有Flink支持,与外部表一样,临时表只是记录,而不是由当前的FlinkSQL会话管理。如果临时表被删除,它的资源不会被删除。当前FlinkSQL会话关闭的时候,会删除临时表。

如果希望将Paimon catalog与其他表一起使用,但又不希望将它们存储在 catalog 中,则可以创建一个临时表。下面的Flink SQL创建了一个Paimon catalog和一个临时表,并演示了如何同时使用这两个表。

准备工作:

创建temp_table.csv,其中数据为:

1,beijing
2,shanghai

编辑好后上传到OSS的gz18-bucket 目录。

-- 使用创建过的mypaimon catalog-- 【流式作业】创建my_table表并导入数据
CREATE TABLE mypaimon.test.my_table (k INT,v STRING
);-- 【批模式】【部署】运行
INSERT INTO mypaimon.test.my_table values (1, 'zs'),(2, 'ls');-- 以下在批作业中一起【调试】运行即可
-- 【批模式】创建temp_table表
CREATE TEMPORARY TABLE mypaimon.test.temp_table (k INT,v STRING
) WITH ('connector' = 'filesystem','path' = 'oss://gz18-bucket/temp_table.csv','format' = 'csv'
);-- 【批模式】查询
SELECT my_table.k, my_table.v, temp_table.v 
FROM mypaimon.test.my_table 
JOIN mypaimon.test.temp_table ON my_table.k = temp_table.k;注意: path中的gz18-bucket一定要改成你自己的bucket名称

在这里插入图片描述

查询结果如下:
在这里插入图片描述

如果删除 temp_table后, temp_table.csv依然存在。

-- 在阿里云flink中不需要运行,temporary表只会在运行时生效
drop temporary table mypaimon.test.temp_table;

2、修改Table

2.1 修改或添加Table Properties
ALTER TABLE mypaimon.test.my_table SET (
'write-buffer-size' = '256 MB'
);

在这里插入图片描述

2.2 移除Table Properties
ALTER TABLE mypaimon.test.my_table RESET ('write-buffer-size');
2.3 表重命名
ALTER TABLE mypaimon.test.my_table RENAME TO my_table_new;

注意:如果使用对象存储,如S3或OSS,请谨慎使用此语法,因为对象存储的重命名不是原子性的,在失败的情况下可能只会移动部分文件。

2.4 添加新列

注意:使用的阿里云Flink版本需要是1.17版本,否则不支持添加和修改字段信息。

ALTER TABLE mypaimon.test.my_table ADD (c1 INT, c2 STRING);desc mypaimon.test.my_table;

在这里插入图片描述

要添加具有指定位置的新列,请使用FIRST或AFTER col_name。

ALTER TABLE mypaimon.test.my_table ADD c INT FIRST;
ALTER TABLE mypaimon.test.my_table ADD b INT AFTER c;

注意:要将一个存在的列修改到一个新的位置,使用FIRST或AFTER col_name。

ALTER TABLE mypaimon.test.my_table MODIFY c1 DOUBLE FIRST;
ALTER TABLE mypaimon.test.my_table MODIFY c2 DOUBLE AFTER c1;
2.5 删除列
ALTER TABLE mypaimon.test.my_table DROP (c1, c2);
2.6 修改列的名称
ALTER TABLE mypaimon.test.my_table RENAME c TO cc;
2.7 更改列的空性
CREATE TABLE mypaimon.test.null_table (id INT PRIMARY KEY NOT ENFORCED, coupon_info FLOAT NOT NULL);
-- 将列' coupon_info '从NOT NULL更改为可空
ALTER TABLE mypaimon.test.null_table MODIFY coupon_info FLOAT;
-- 将列' coupon_info '从可空改为NOT NULL
-- 如果已经有NULL值,设置如下表选项,在修改表之前静默删除这些记录。
SET 'table.exec.sink.not-null-enforcer' = 'DROP';
ALTER TABLE mypaimon.test.null_table MODIFY coupon_info FLOAT NOT NULL;
2.8 更改列的备注
ALTER TABLE mypaimon.test.null_table MODIFY coupon_info BIGINT COMMENT '优惠券信息';
2.9 更改列的类型
ALTER TABLE mypaimon.test.null_table MODIFY coupon_info DOUBLE;

对于是否可以转化,可以参考下面链接:

https://paimon.apache.org/docs/0.7/how-to/altering-tables/

在这里插入图片描述

2.10 修改watermark

注意:watermark中涉及的字段数据类型必须是timestamp或者timestamp_ltz

只在flink中生效,修改的语法如下:

CREATE TABLE mypaimon.test.my_table (k INT,v STRING
);desc mypaimon.test.my_table
image-20240303091437682

添加

ALTER TABLE mypaimon.test.my_table ADD (log_ts varchar(20));
ALTER TABLE mypaimon.test.my_table ADD (ts AS TO_TIMESTAMP(log_ts) AFTER log_ts,WATERMARK FOR ts AS ts - INTERVAL '1' HOUR);

在这里插入图片描述

修改

ALTER TABLE mypaimon.test.my_table MODIFY WATERMARK FOR ts AS ts - INTERVAL '2' HOUR;

在这里插入图片描述

删除

ALTER TABLE mypaimon.test.my_table DROP WATERMARK;

在这里插入图片描述

3、写入Table

可以使用INSERT语句向表中插入新行或覆盖表中的现有数据。插入的行可以由值表达式指定,也可以由查询结果指定。

3.1 语法
INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] 
{ value_expr | query }

(1) part_spec

一个可选参数,用于指定分区的键和值对的逗号分隔列表。请注意,可以在分区规范中使用类型化文字(例如,date’ 2019-01-02 ')。

语法: PARTITION ( partition_col_name = partition_col_val [ , … ] )

(2) column_list

一个可选参数,用于指定属于table_identifier表的以逗号分隔的列列表。

注意:所有指定的列都应该存在于表中,需包括除静态分区列之外的所有列,并且不能重复。

语法: (col_name1 [, column_name2, …])

(3) value_expr

指定要插入的值。可以插入显式指定的值或NULL。必须用逗号分隔子句中的每个值。可以指定多个值集来插入多行。

语法: VALUES ( { value | NULL } [ , … ] ) [ , ( … ) ]

注意:目前,Flink不支持直接使用NULL,因此应该使用 CAST (NULL AS data_type) 将NULL转换为实际数据类型。

3.2 将可空字段写入非空字段

不能在一个表的非空列中插入另一个表的可空列。假设,我们在表a中有一个列key1,它是主键,主键不能为空。在表B中有一个列key2,它是可空的。如果我们运行这样的sql语句:

INSERT INTO A key1 SELECT key2 FROM B

Flink 和 Spark中都会报错,处理方式是:使用”NVL”或”COALESCE”,将一个可空列转成非空列。

INSERT INTO A key1 SELECT COALESCE(key2, <non-null expression>) FROM B
3.3 修改数据

(1)使用insert将记录/更改应用到表

Paimon支持在sink阶段按分区和桶对数据进行shuffle。

INSERT INTO MyTable SELECT ...

(2)覆盖语义

  • 对于未分区的表,Paimon支持覆盖整个表
INSERT OVERWRITE MyTable SELECT ...
  • 对于分区表,Paimon支持覆盖指定分区
INSERT OVERWRITE MyTable PARTITION (key1 = value1, key2 = value2, ...) SELECT ...
  • 动态覆盖

Flink的默认覆盖模式是动态分区覆盖(这意味着Paimon只删除出现在被覆盖数据中的分区)。可以设置dynamic-partition-overwrite 为 false 将其更改为静态覆盖。

dynamic-partition-overwrite配置参数的默认值是true。

参考链接:https://paimon.apache.org/docs/0.7/maintenance/configurations/

-- 假如MyTable是一个分区表-- 动态覆盖
INSERT OVERWRITE MyTable SELECT ...-- 静态覆盖 (覆盖整个表)
INSERT OVERWRITE MyTable /*+ OPTIONS('dynamic-partition-overwrite' = 'false') */ SELECT ...
3.4 清除数据

(1)清除表:可以使用INSERT OVERWRITE通过插入空值来清除表。

INSERT OVERWRITE MyTable /*+ OPTIONS('dynamic-partition-overwrite'='false') */ SELECT * FROM MyTable WHERE false

(2)清除分区:

目前Paimon支持两种方式去清除分区:

  • 与清除表一样,可以使用INSERT OVERWRITE 向分区插入空值来清除分区的数据。
-- 语法
INSERT OVERWRITE MyTable /*+ OPTIONS('dynamic-partition-overwrite'='false') */ PARTITION (key1 = value1, key2 = value2, ...) 
SELECT selectSpec FROM MyTable WHERE false
-- 案例:
CREATE TABLE `new-mypaimon`.test.OverwriteTable (k0 INT,k1 INT,v STRING) PARTITIONED BY (k0, k1);
-- 在批作业中部署运行
insert into `new-mypaimon`.test.OverwriteTable values(1,1,'a'),(2,2,'b'),(3,3,'c');
-- 在批作业中调试运行
select * from `new-mypaimon`.test.OverwriteTable;
-- 方法一 
-- 在批作业中部署运行
INSERT OVERWRITE `new-mypaimon`.test.OverwriteTable /*+ OPTIONS('dynamic-partition-overwrite'='false') */ PARTITION (k0 = 1) 
SELECT k1, v FROM `new-mypaimon`.test.OverwriteTable WHERE false;
-- 在批作业中调试运行
select * from `new-mypaimon`.test.OverwriteTable;

在这里插入图片描述

  • 上面的方法不支持删除多个分区。如果需要删除多个分区,可以通过flink run提交drop-partition作业。
3.5 更新数据

目前,Paimon支持在Flink 1.17及以后的版本中使用UPDATE更新记录。可以在Flink的批处理模式下执行UPDATE。目前阿里云Flink中暂不支持。

需要表具备以下两个特点:

  • 必须是主键表;
  • 需要对MergeEngine进行重复数据删除或部分更新。
-- 语法
UPDATE table_identifier SET column1 = value1, column2 = value2, ... WHERE condition;-- 开源版本案例:
CREATE TABLE UpdateTable(a STRING,b INT,c INT,PRIMARY KEY (a) NOT ENFORCED
)WITH('write-mode' = 'change-log','merge-engine' = 'deduplicate');-- 插入
INSERT INTO UpdateTable values('myTable', 1,1);
select * from UpdateTable;
-- 更新
UPDATE UpdateTable SET b = 1, c = 2 WHERE a = 'myTable';
UPDATE UpdateTable SET b = 1, c = 3 WHERE a = 'myTable';
3.6 删除数据

在Flink 1.16和以前的版本中,Paimon只支持通过Flink run提交“delete”作业来删除记录。

在Flink 1.17及更高的版本,可以直接使用删除语句(只在批模式下支持)。但是需要设置两个条件:

  • 设置write-mode为change-log
  • 如果表有主键, MergeEngine需要重复数据删除

目前阿里云Flink中暂不支持。

-- 语法
DELETE FROM table_identifier WHERE conditions;-- 开源版本案例:
CREATE TABLE DeleteTable (id BIGINT NOT NULL,currency STRING,rate BIGINT,dt String,PRIMARY KEY (id, dt) NOT ENFORCED) PARTITIONED BY (dt) WITH ( 'write-mode' = 'change-log','merge-engine' = 'deduplicate' );-- 插入语句
INSERT INTO DeleteTable values(1, 'UNKNOWN', 1, '2023-08-22');
select * from DeleteTable;

在这里插入图片描述

-- 删除语句
DELETE FROM DeleteTable WHERE currency = 'UNKNOWN';
select * from DeleteTable;

在这里插入图片描述

4、查询Table

4.1 批量查询

Paimon的批处理读取返回表快照中的所有数据。默认情况下,批处理读取返回最新的快照。

-- Flink SQL
SET 'execution.runtime-mode' = 'batch';
4.1.1 批处理时间旅行

带时间旅行的Paimon批读可以指定一个快照或一个标签,并读取相应的数据。

-- 查看快照信息
SELECT * FROM t$snapshots;-- 可以指定snapshot id
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;-- 可以指定timestamp (unix milliseconds)
SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;-- 可以指定tag,如 'my-tag'
SELECT * FROM t /*+ OPTIONS('scan.tag-name' = 'my-tag') */;注意: snapshot的编号从1开始。

案例:

(1)建表并插入数据

CREATE TABLE mypaimon.test.travelTable (k0 INT,k1 INT,v STRING) PARTITIONED BY (k0, k1);
-- 在批作业中部署运行
insert into mypaimon.test.travelTable values(1,1,'a'),(2,2,'b'),(3,3,'c');
-- 在批作业中部署运行
insert into mypaimon.test.travelTable values(4,4,'d');
-- 在批作业中部署运行
INSERT OVERWRITE mypaimon.test.travelTable /*+ OPTIONS('dynamic-partition-overwrite'='false') */ PARTITION (k0 = 1) 
SELECT k1, v FROM mypaimon.test.travelTable WHERE false;-- 在批作业中调试运行
select * from mypaimon.test.travelTable;

(2)

SELECT * FROM mypaimon.test.travelTable/*+ OPTIONS('scan.snapshot-id' = '1') */;

在这里插入图片描述

(3)

SELECT * FROM mypaimon.test.travelTable/*+ OPTIONS('scan.snapshot-id' = '2') */;

在这里插入图片描述

(4)

SELECT * FROM mypaimon.test.travelTable/*+ OPTIONS('scan.snapshot-id' = '3') */;

在这里插入图片描述

可能遇到的错误:

在这里插入图片描述

原因: 进行时间旅行的时候,如果访问了不存在的snapshot快照,那么会报错
4.1.2 批处理增量

读取开始快照(不包含)和结束快照之间的增量变化。

举例:

  • ‘5,10’ 意味着 snapshot 5 和 snapshot 10 之间的
  • ‘TAG1,TAG3’ 意味着 TAG1 和 TAG3 之间的
SELECT * FROM t /*+ OPTIONS('incremental-between' = '12,20') */;

在批处理SQL中,不允许返回DELETE记录,因此将删除-D的记录。如果你想看到DELETE记录,你可以使用audit_log表:

SELECT * FROM t$audit_log /*+ OPTIONS('incremental-between' = '12,20') */;

案例:

SELECT * FROM mypaimon.test.travelTable /*+ OPTIONS('incremental-between' = '1,3') */;

在这里插入图片描述

总结:在批量查询时,可以指定快照id,也可以指定时间戳,还可以指定tag进行查询。这样就可以实现查询历史状态,也就是时间旅行。

4.2 流式查询

默认情况下,流式读取在第一次启动时生成表上的最新快照,并继续读取最新的更改。

默认情况下,Paimon确保任务的启动被正确处理,并包含全部数据。

-- 开源版本
SET 'execution.runtime-mode' = 'streaming';
-- 阿里云Flink
在流作业草稿中运行

也可以不使用快照数据,使用最新浏览的模式。

-- 连续读取最新更改,而不在开始时生成快照
SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */
4.2.1 流处理时间旅行

如果你只想处理今天及以后的数据,你可以使用分区过滤器:

SELECT * FROM t WHERE dt > '2023-06-26';

如果它不是一个分区表,或者不能按分区进行过滤,可以使用Time travel的流读取。

-- 可以指定snapshot id
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;-- 可以指定timestamp (unix milliseconds)
SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;-- 在第一次启动时读取快照id 1L,并继续读取更改
SELECT * FROM t /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '1') */;

案例:

-- 在流作业草稿中调试运行
SELECT * FROM mypaimon.test.travelTable /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '2') */;

在这里插入图片描述

如果持续向mypaimon.test.travelTable表插入数据,则会打印出新增的数据。

4.2.2 Consumer ID(了解)

在流式读取表时指定consumer-id,这是一个实验性功能。

SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid') */;

当流读取Paimon表时,下一个快照id将被记录到文件系统中。这有几个优点:

  • 当之前的作业停止后,新启动的作业可以继续消耗之前的进度,而不需要从状态恢复。新的读取将从消费者文件中找到的下一个快照 ID 开始读取。
  • 在判断一个快照是否过期时,Paimon会查看文件系统中该表的所有消费者,如果还有消费者依赖这个快照,那么这个快照就不会因为过期而被删除。
  • 当没有水印定义时,Paimon表会将快照中的水印传递到下游Paimon表,这意味着可以跟踪整个管道的水印进度。

注意:消费者将防止快照过期。可以指定“consumer.expiration-time”来管理消费者的生命周期。

示例:

(1)指定consumer-id开始流式查询:

SELECT * FROM t /*+ OPTIONS(‘consumer-id’ = ‘test’) */;

(2)停掉原先的流式查询,插入数据:

insert into t values(1,2,3);

(3)再次指定consumer-id流式查询:

SELECT * FROM t /*+ OPTIONS(‘consumer-id’ = ‘test’) */;

4.2.3 Read Overwrite

默认情况下,流读取将忽略INSERT OVERWRITE生成的提交。如果想要读取OVERWRITE的提交,可以配置 streaming-read-overwrite 为true。

总结:流式查询会不断读取最新表的变化,然后在最开始时是否读取全量数据,可以通过scan-mode来指定,使用-full的类型即可。

可以像kafka一样,使用consumer id去管理消费者的消费点位

4.3 查询优化

(1)强烈建议与查询一起指定分区和主键过滤器,这将加快查询的数据跳过。

可以加速数据跳转的过滤函数有:

  • =
  • <
  • <=
  • >
  • >=
  • IN (…)
  • LIKE ‘abc%’
  • IS NULL

(2)Paimon将按主键对数据进行排序,这加快了点查询和范围查询的速度。当使用复合主键时,查询过滤器最好在主键的最左边形成一个前缀,以获得良好的加速。

示例:

-- 复合主键的表
CREATE TABLE orders (catalog_id BIGINT,order_id BIGINT,.....,PRIMARY KEY (catalog_id, order_id) NOT ENFORCED 
)

通过为主键的最左边的前缀指定一个范围过滤器,查询可以获得很好的加速。

SELECT * FROM orders WHERE catalog_id=1025;SELECT * FROM orders WHERE catalog_id=1025 AND order_id=29495;SELECT * FROM ordersWHERE catalog_id=1025AND order_id>2035 AND order_id<6000;

但是,下面的过滤器不能很好地加速查询。

SELECT * FROM orders WHERE order_id=29495;SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;

5、系统表(了解)

在开源版本中,支持查询系统表,系统表中记录了表的一些元数据信息,详见https://paimon.apache.org/docs/0.5/how-to/system-tables/。

6、Lookup Joins

Lookup Joins 是流查询中的一种join。它用从Paimon查询的数据填充表。

在Flink中,Paimon支持对带有主键的表和仅追加表进行lookup join。下面的示例说明了这个特性。

以下案例是使用客户表做维表,与订单表进行关联,然后查看关联的效果。

(1)创建paimon表

-- 使用创建好的mypaimon catalog
-- 在paimon catalog中建一个表
CREATE TABLE mypaimon.test.customers (id INT PRIMARY KEY NOT ENFORCED,name STRING,country STRING,zip STRING
);

(2)在Mysql中建表并插入数据

CREATE DATABASE IF NOT EXISTS test;
CREATE TABLE test.customers (id INT PRIMARY KEY,name varchar(20),country varchar(20),zip varchar(20)
);
INSERT INTO test.customers VALUES (1, '张三', '中国', '081212'),(2, '李四', '中国', '872211');

(3)在Flink中创建Mysql映射表

CREATE TABLE if not exists mysql_cdc_customers (id int,name string,country string,zip string,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector'= 'mysql','hostname'= 'rm-cn-lr53rh4wc00028.rwlb.rds.aliyuncs.com','port'= '3306','username'= 'itcast','password'='Itcast123','server-time-zone'= 'Asia/Shanghai','database-name'= 'test','table-name'= 'customers'
);注意: hostname、username、password都需要改成自己的。

(4)将mysql数据插入到paimon中

-- 流作业草稿中部署运行 
INSERT INTO mypaimon.test.customers
select * from mysql_cdc_customers;

(5)登录ECS控制台

点击实例,点击远程连接,选择通过Workbench远程连接

输入密码,点击确定

(6)创建topic

-- 启动kafka(前提启动zookeeper)
/export/server/zookeeper/bin/zkServer.sh start
cd /export/server/kafka/
bin/kafka-server-start.sh -daemon config/server.properties-- 创建topic(ip为ECS的内网ip,可以通过ifconfig查询)
bin/kafka-topics.sh --create --topic orders_info --bootstrap-server node1:9092

(7)在Flink创建kafka映射表

--创建一个临时左表,从kafka读数
CREATE TABLE Orders (order_id INT,total INT,customer_id INT,proc_time AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'orders_info','properties.bootstrap.servers' = '172.28.237.195:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','format' = 'csv'
);注意: properties.bootstrap.servers中的IP地址改成你自己的ECS服务器内网(私网)的IP地址

(8)向orders_info中插入一些数据:

bin/kafka-console-producer.sh --broker-list node1:9092 --topic orders_info
1,100,1
2,20,1

(10)现在可以在lookup join中使用customers表

-- 流式作业草稿中调试运行
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN mypaimon.test.customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;

结果如下:

在这里插入图片描述

再向kafka中输入 3,50,2

结果变化如下:

在这里插入图片描述

再向kafka中输入 4,66,3

结果没有变化,因为id为3的维表信息关联不到。

此时向mysql中插入一条数据:

insert into test.customers values (3, '王五', '中国', '111111'); 

结果不会更新。

等待片刻(插入到paimon的任务的checkpoint完成)。

再向kafka中输入 5,77,3,此时可以看到能关联到结果。

在这里插入图片描述

此时再改变维表值,如在mysql中进行更新

update test.customers set zip='test' where id = 3;

此时,已经产生的结果也不再发生变化。新来的结果会关联新数据。

再向kafka中输入 6,88,3 ,结果如下

在这里插入图片描述

lookup join 操作将在本地维护一个RocksDB缓存,并实时提取表的最新更新。lookup join 操作将只提取必要的数据,因此筛选条件对性能非常重要。

此特性仅适用于最多包含数千万条记录的表,以避免过度使用本地磁盘。

注意:如果Orders(主表)的记录Join缺失,因为customers(lookup表)对应的数据还没有准备好。可以考虑使用Flink的Delayed Retry Strategy For Lookup。

总结:

(1)如果事实表(Kafka数据源)先于维表(MySQL数据源)到达了,这时事实表就不会关联到维表,即使后续维表数据来了,也不会更新之前的关联结果了

(2)paimon表一旦发生变化,会实时更新维表(Flink的内存)中

(3)如果关联结果已经生成了,然后维表发生了变化,这时之前的关联结果也不会发生变化了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/51445.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

无心剑中译莎士比亚《爱如星辰引迷舟》

莎士比亚十四行诗第116首 Sonnet 116 爱如星辰引迷舟 Let me not to the marriage of true minds Admit impediments. Love is not love Which alters when it alteration finds, Or bends with the remover to remove: O, no! it is an ever-fixed mark That looks on tempe…

C++(week14): C++提高:(一)面向对象设计:设计原则、设计模式

文章目录 一、面向对象设计的概念4.统一建模语言&#xff1a;UML语言StartUML 二、类与类之间的关系0.总结(1)类与类的五种关系(2)区别(3)面向对象 vs 基于对象 1.继承 (泛化耦合)2.组合 (Composition)3.聚合 (Aggregation)4.关联(1)双向关联(2)单向关联 5.依赖 (Dependency) 三…

简单几步,把浏览器书签转换成导航网页

废话不多说直奔主题上干货 Step 1 下载浏览器书签 1&#xff0c;电脑浏览器点击下载Pintree Pintree 是一个开源项目&#xff0c;旨在将浏览器书签导出成导航网站。通过简单的几步操作&#xff0c;就可以将你的书签转换成一个美观且易用的导航页面。 2. 安装 Pintree B…

【保姆级讲解下QT6.3】

&#x1f3a5;博主&#xff1a;程序员不想YY啊 &#x1f4ab;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f917;点赞&#x1f388;收藏⭐再看&#x1f4ab;养成习惯 ✨希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出…

【人工智能 | 机器学习 | 理论篇】线性模型

文章目录 1. 基本形式2. 线性回归3. 对数几率回归4. 线性判别分析5. 多分类学习6. 类别不平衡问题 1. 基本形式 设有 d 个属性描述的示例 x ( x 1 , x 2 , x 3 , . . . , x d ) x ({x_1, x_2, x_3, ..., x_d}) x(x1​,x2​,x3​,...,xd​) 线性模型&#xff08;linear mode…

每天一个设计模式之命令模式(第二天)

交互模式中的命令模式&#xff0c;在开始记录之前&#xff0c;要讲些自己的感受&#xff0c;真真切切的感受到了悟性的瓶颈&#xff01;一共十页书&#xff0c;需要自己细细琢磨品味&#xff0c;至少三四遍才大概了解了他们间的逻辑&#xff0c;我需要调整下自己的学习思路&…

快速写一个Makefile

本文主要展示Makefile的基本要素和示例&#xff0c;让读者可以快速写出一个实用的Makefile。 简要说明 Makefile&#xff0c;GNU make命令工具。 书写格式 <target> : <prerequisites> [tab] <commands> <target> 文件名或某操作的名字&#xff0…

uniapp开发精选短视频视频小程序实战笔记20240725,实现顶部轮播图和热门短剧

创建项目 创建项目,叫video_app。 在pages.json里面修改一下标题: 新建search搜索页面和me我的页面。 此时界面预览效果如下: 引入静态资源 主要是static里面的内容,全部复制过来。 配置底部导航栏 pages.json,放到顶层,和全部样式同级: "tabBar&quo…

详细分析 Sql Server查询卡顿的排查方向

目录 前言1. 问题所示2. 原理分析2.1 缺乏索引2.2 表碎片2.3 查询计划缓存2.4 锁和阻塞 3. 总结 前言 本篇为理论知识的分析以及对症下药&#xff0c;前阵子发生过Bug&#xff0c;后通过迁移服务器以及数据库最终才解决问题&#xff0c;但是细想当时可能是因为碎片或者缓存的概…

WEBKIT 通过JavaScript 调用本地,硬件未来之窗OS硬件APP

以酒店为例我们需要调用shen份证读取&#xff0c;采集人脸&#xff0c;门锁写房卡&#xff0c;如何通过浏览器调用 1.通过本地http服务 2.通过webkit模式 这里说政务单位模式的集成 由于篇幅问题&#xff0c;怎么集成webkit就不说了 一、webkkit加载交互本地代码 browser.…

23、Python之面向对象:实例属性、类属性,傻傻分不清楚

引言 在上一篇文章中&#xff0c;我们初步介绍了Python面向对象中类定义的语法&#xff0c;顺带介绍了关于面向对象的系统工程中&#xff0c;所涉及的OOA与OOD。 其实&#xff0c;简单来说&#xff0c;类的定义其实就是面向对象的“封装”特性的体现。我们将分析、设计得到的…

BLE自适应跳频算法详解

前言 &#xff08;1&#xff09;自适应跳频算法是相当的简单&#xff0c;小学生都能够看懂&#xff0c;而且网上已经有相当多的关于自适应跳频算法的介绍。既然如此&#xff0c;为什么我还要写这样一篇博客呢&#xff1f; &#xff08;2&#xff09;原因很简单&#xff0c;我发…

内网横向——利用WMI进行内网横向

文章目录 一、WMI介绍二、常规利用方法三、常见利用工具3.1 wmiexec3.2 Invoke-WmiCommand 四、WMI事件订阅的利用4.1 手动实现4.2 Sharp-WMIEvent 网络拓扑&#xff1a; 攻击机kali IP&#xff1a;192.168.111.0 跳板机win7 IP&#xff1a;192.168.111.128&#xff0c;192.168…

业务记录:处理动态表头的CSV/EXCEL文件

业务描述&#xff1a;传入一个动态表头的CSV文件&#xff0c;解析CSV&#xff0c;并保存入库。 CSV文件的表头是不确定的&#xff0c;即顺序和字段个数不确定&#xff0c;以及表头是中文字段。 例如&#xff1a; 为了顺利解析CSV文件&#xff0c;前端需要传入对应的字段名和顺…

axure制作切换栏--动态面板的应用

先看下效果&#xff1a;点击上面的切换栏 切换到西游记栏目&#xff1a; 切换到水浒传栏目&#xff1a; 上述两个图片比对可以发现&#xff0c;在点击切换栏的时候&#xff0c;里面的内容以及切换栏的下面蓝色横线也会发生对应的变化。这里涉及到两个地方的变化&#xff0c;就…

Golang 知识结构图

总结Go的入门知识结构&#xff0c;如下图所示&#xff1a;

图形/视图结构的三个坐标系

图形/视图结构的三个坐标系分别为视图结构系物理结构系&#xff0c;场景坐标系&#xff0c;图形项坐标系。 本文记录实践三个坐标系及视图与场景坐标转换&#xff0c;通过事件槽来显示出来的过程。 自定义1个View视图组件&#xff0c;其中扩展了鼠标点击、鼠标移动的事件&…

Golang | Leetcode Golang题解之第297题二叉树的序列化与反序列化

题目&#xff1a; 题解&#xff1a; type Codec struct{}func Constructor() (_ Codec) {return }func (c Codec) serialize(root *TreeNode) string {if root nil {return "X"}left : "(" c.serialize(root.Left) ")"right : "("…

前端渲染模式

渲染的概念 在Web开发中&#xff0c;渲染&#xff08;Rendering&#xff09;是一个核心概念&#xff0c;指的是将应用程序的数据&#xff08;data&#xff09;与模板&#xff08;template&#xff09;结合&#xff0c;生成最终的HTML页面&#xff0c;这个页面随后会被浏览器解析…

主宰生物进化的 “魔咒” —— 制约生物在特殊环境下进化方式的线索

一些神秘的法则制约着生物在特殊环境下的进化方式。它们还为动物将如何适应气候变暖提供了线索。 一些奇特的进化现象 一艘装满大象和老鼠的 “诺亚方舟” 搁浅在一座偏远的小岛上。动物们都幸存下来并繁衍后代。但是&#xff0c;随着世代相传&#xff0c;奇怪的事情发生了&a…