paimon,基础查询语句测试

基础设置

-- 创建catalog/加载catalog,如果这个catalog已经存在就不会创建,自动加载元数据信息CREATE CATALOG fs_paimon_catalog WITH (
'type' = 'paimon',
'warehouse' = 'hdfs://wsl01:8020/paimon/catalog'
);
-- 使用catalog
use catalog fs_paimon_catalog;-- sqlClinet使用
-- 设置为批处理模式
SET 'execution.runtime-mode' = 'batch';
-- 设置为流处理模式
SET 'execution.runtime-mode' = 'streaming';
-- 设置查询结果显示方式,sql-clinet 特有
SET 'sql-client.execution.result-mode' = 'tableau';
-- 设置checkpoint,如果使用流模式,必须设置
SET 'execution.checkpointing.interval' = '10 s';root@wsl01:~/soft/paimon/flink-1.17.0# cat fs_catalog.sql
CREATE CATALOG fs_catalog WITH ('type'='paimon','warehouse'='file:/mnt/d/wsl/soft/paimon/catalog'-- 'warehouse' = 'hdfs://wsl01:8020/paimon/catalog'
);
use catalog fs_catalog ;
SET 'sql-client.execution.result-mode' = 'tableau';
-- 默认批处理
SET 'execution.runtime-mode' = 'batch';-- 指定默认启动catalog
bin/sql-client.sh -i fs_catalog.sql

DDL

创建普通表

-- 普通表,没有主键CREATE TABLE t_sample (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING
);

创建主键表

CREATE TABLE t_pk (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);

创建分区表

-- 分区表的分区字段必须是主键的子集CREATE TABLE t_partition (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);

创建临时表

-- 如果进入paimon创建的catalog后,无法创建非paimon类型的表,如果需要借助第三方的表,需要创建临时表来使用CREATE TEMPORARY TABLE t_temporary (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) WITH ('connector' = 'filesystem','path' = 'file:///mnt/d/wsl/soft/paimon/temp_table.csv','format' = 'csv'
);

复制表-AS,create table as


-- create as 创建主键表,CREATE TABLE t_create_as_pk AS SELECT * FROM t_pk;
show create table t__pk;
show create table t_create_as_pk;-- create as 创建分区表
show create table t_partition ;
CREATE TABLE t_create_as_partition AS SELECT * FROM t_partition;
show create table t_create_as_partition ;

上述执行结果告诉我们,create as 的表,只保留原表的字段,不保留其他属性信息


-- 通过with 重新指定,关于with的用法,参考flink
CREATE TABLE t_create_as_with with ('primary-key' = 'dt,hh','partition' = 'dt') AS SELECT * FROM t_pk  ;
show create table t_create_as_with;

上述执行结果告诉我们,create as 的表可以通过with 重新指定属性信息

复制表-LIKE,create table like

CREATE TABLE t_create_like like t_pk;
show create table t_pk;
show create table t_create_like;

上述执行结果告诉我们,create like 的表,保留全部信息

DML

常用管理语句

desc #{name}
show create table #{name}
show catalogs;
show databases;
show tables;

新增-普通表

insert into t_sample(user_id,item_id,behavior,dt,hh) values(100,100,'sing-sample','1','2');insert into t_sample  values(101,101,'jump-sample','1','2');insert into t_sample  select 102,102,'rap-sample','1','2';Flink SQL> select * from t_sample;
+---------+---------+-------------+----+----+
| user_id | item_id |    behavior | dt | hh |
+---------+---------+-------------+----+----+
|     100 |     100 | sing-sample |  1 |  2 |
|     101 |     101 | jump-sample |  1 |  2 |
|     102 |     102 |  rap-sample |  1 |  2 |
+---------+---------+-------------+----+----+
3 rows in set

新增-主键表

insert into t_pk values(1,1,'sing','1','2');
insert into t_pk values(2,2,'jump','1','2');
insert into t_pk values(3,3,'rap','1','2');
Flink SQL> select * from t_pk;
+---------+---------+----------+----+----+
| user_id | item_id | behavior | dt | hh |
+---------+---------+----------+----+----+
|       1 |       1 |     sing |  1 |  2 |
|       2 |       2 |     jump |  1 |  2 |
|       3 |       3 |      rap |  1 |  2 |
+---------+---------+----------+----+----+
3 rows in set
insert into t_pk values(3,3,'basketball','1','2');
Flink SQL>  select * from t_pk;
+---------+---------+------------+----+----+
| user_id | item_id |   behavior | dt | hh |
+---------+---------+------------+----+----+
|       1 |       1 |       sing |  1 |  2 |
|       2 |       2 |       jump |  1 |  2 |
|       3 |       3 | basketball |  1 |  2 |
+---------+---------+------------+----+----+
3 rows in set-- 我们发现,主键表,写入两条相同主键的数据,后者会覆盖前者
-- 主键表有一个默认引擎,默认是就是 'merge-engine' = 'deduplicate',因此才有这个效果

新增-分区表

insert into t_partition partition(dt='2024-10-08',hh='15')(user_id,item_id,behavior) values(1,1,'sing');
insert into t_partition partition(dt='2024-10-08',hh='15')(user_id,item_id,behavior) values(2,2,'jump');
insert into t_partition(user_id,item_id,behavior,dt,hh) values(3,3,'rap','2024-10-08','16');
insert into t_partition values(4,4,'basketball','2024-10-08','16');Flink SQL> select * from t_partition;
+---------+---------+------------+------------+----+
| user_id | item_id |   behavior |         dt | hh |
+---------+---------+------------+------------+----+
|       1 |       1 |       sing | 2024-10-08 | 15 |
|       2 |       2 |       jump | 2024-10-08 | 15 |
|       3 |       3 |        rap | 2024-10-08 | 16 |
|       4 |       4 | basketball | 2024-10-08 | 16 |
+---------+---------+------------+------------+----+
4 rows in setinsert into t_partition as select * from t_sample;insert into t_partition partition(dt='2099-10-08',hh='15')(user_id,item_id,behavior) select user_id,item_id,behavior from t_sample;Flink SQL> select * from t_partition;
+---------+---------+-------------+------------+----+
| user_id | item_id |    behavior |         dt | hh |
+---------+---------+-------------+------------+----+
|       1 |       1 |        sing | 2024-10-08 | 15 |
|       2 |       2 |        jump | 2024-10-08 | 15 |
|     100 |     100 | sing-sample | 2099-10-08 | 15 |
|     101 |     101 | jump-sample | 2099-10-08 | 15 |
|     102 |     102 |  rap-sample | 2099-10-08 | 15 |
|       3 |       3 |         rap | 2024-10-08 | 16 |
|       4 |       4 |  basketball | 2024-10-08 | 16 |
|     100 |     100 | sing-sample |          1 |  2 |
|     101 |     101 | jump-sample |          1 |  2 |
|     102 |     102 |  rap-sample |          1 |  2 |
+---------+---------+-------------+------------+----+
10 rows in setFlink SQL> insert overwrite t_partition(user_id,item_id,behavior) values(5,5,'non-partition');
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Column 'dt' has no default value and does not allow NULLs

以上多种写入方式都支持,把分区字段当成普通字段用就行,但是分区字段不能为空

新增-覆盖写入

Flink SQL> select * from t_pk;
+---------+---------+------------+----+----+
| user_id | item_id |   behavior | dt | hh |
+---------+---------+------------+----+----+
|       1 |       1 |       sing |  1 |  2 |
|       2 |       2 |       jump |  1 |  2 |
|       3 |       3 | basketball |  1 |  2 |
+---------+---------+------------+----+----+insert into t_pk values(10,10,'overwrite','1','2');Flink SQL> select * from t_pk;
+---------+---------+-----------+----+----+
| user_id | item_id |  behavior | dt | hh |
+---------+---------+-----------+----+----+
|      10 |      10 | overwrite |  1 |  2 |
+---------+---------+-----------+----+----+
1 row in set

overwrite 会直接清空表,不会考虑主键


insert into t_partition partition(dt='2024-10-08',hh='15')(user_id,item_id,behavior) values(1,1,'sing');
insert into t_partition partition(dt='2024-10-08',hh='15')(user_id,item_id,behavior) values(2,2,'jump');
insert into t_partition partition(dt='2024-10-09',hh='15')(user_id,item_id,behavior) values(3,3,'rap');
Flink SQL> select * from t_partition;
+---------+---------+----------+------------+----+
| user_id | item_id | behavior |         dt | hh |
+---------+---------+----------+------------+----+
|       3 |       3 |      rap | 2024-10-09 | 15 |
|       1 |       1 |     sing | 2024-10-08 | 15 |
|       2 |       2 |     jump | 2024-10-08 | 15 |
+---------+---------+----------+------------+----+
3 rows in setinsert overwrite t_partition partition(dt='2024-10-08',hh='15')(user_id,item_id,behavior) values(4,4,'basketball');Flink SQL> select * from t_partition;
+---------+---------+------------+------------+----+
| user_id | item_id |   behavior |         dt | hh |
+---------+---------+------------+------------+----+
|       3 |       3 |        rap | 2024-10-09 | 15 |
|       4 |       4 | basketball | 2024-10-08 | 15 |
+---------+---------+------------+------------+----+
2 rows in set

分区表只会overwrite 当前要写入分区


Flink SQL> select * from t_partition;
+---------+---------+----------+------------+----+
| user_id | item_id | behavior |         dt | hh |
+---------+---------+----------+------------+----+
|       3 |       3 |      rap | 2024-10-09 | 15 |
|       1 |       1 |     sing | 2024-10-08 | 15 |
|       2 |       2 |     jump | 2024-10-08 | 15 |
+---------+---------+----------+------------+----+
3 rows in set
-- 对指定分区写入空记录,没有效果INSERT OVERWRITE t_partition  PARTITION (`dt` = '2024-10-08', `hh` = '15') SELECT user_id,item_id,behavior FROM t_partition WHERE false;Flink SQL> select * from t_partition;
+---------+---------+----------+------------+----+
| user_id | item_id | behavior |         dt | hh |
+---------+---------+----------+------------+----+
|       3 |       3 |      rap | 2024-10-09 | 15 |
|       1 |       1 |     sing | 2024-10-08 | 15 |
|       2 |       2 |     jump | 2024-10-08 | 15 |
+---------+---------+----------+------------+----+
3 rows in set-- 对指定分区写入空记录,指定 /*+ OPTIONS('dynamic-partition-overwrite'='false') */,会清空指定的分区INSERT OVERWRITE t_partition  /*+ OPTIONS('dynamic-partition-overwrite'='false') */  PARTITION (`dt` = '2024-10-08', `hh` = '15') SELECT user_id,item_id,behavior FROM t_partition WHERE false;
Flink SQL> select * from t_partition;
+---------+---------+----------+------------+----+
| user_id | item_id | behavior |         dt | hh |
+---------+---------+----------+------------+----+
|       3 |       3 |      rap | 2024-10-09 | 15 |
+---------+---------+----------+------------+----+
1 row in set-- 不指定分区写入空记录,指定 /*+ OPTIONS('dynamic-partition-overwrite'='false') */,会清空所有分区,truncate
INSERT OVERWRITE t_partition /*+ OPTIONS('dynamic-partition-overwrite'='false') */ SELECT * FROM t_partition WHERE false;Flink SQL> select * from t_partition;
Empty set

/*+ OPTIONS('dynamic-partition-overwrite'='false') */ Flink默认的覆盖模式是动态分区覆盖 (即Paimon只删除覆盖数据中出现的分区)。可以配置动态分区覆盖将其更改为静态覆盖。

paimon 没有truncate,因此我们可以借助overwite+静态覆盖,这个实现truncate

查询


修改

Important table properties setting:

  • Only primary key table supports this feature. 表必须有主键
  • MergeEngine needs to be deduplicate or partial-update to support this feature. 合并引擎必须为deduplicate,以后会支持partial-update
  • Do not support updating primary keys. 不能修改主键
  • Flink 版本1.17 及以上版本才支持
  • 必须是批处理模式
UPDATE table_identifier SET column1 = value1, column2 = value2, ... WHERE condition;Flink SQL> select * from t_partition;
+---------+---------+-------------+----+----+
| user_id | item_id |    behavior | dt | hh |
+---------+---------+-------------+----+----+
|     100 |     100 | sing-sample |  1 |  2 |
|     101 |     101 | jump-sample |  1 |  2 |
|     102 |     102 |  rap-sample |  1 |  2 |
+---------+---------+-------------+----+----+
3 rows in setupdate t_partition set behavior = 'baskteball-sample' where user_id =100;Flink SQL> select * from t_partition;
+---------+---------+-------------------+----+----+
| user_id | item_id |          behavior | dt | hh |
+---------+---------+-------------------+----+----+
|     100 |     100 | baskteball-sample |  1 |  2 |
|     101 |     101 |       jump-sample |  1 |  2 |
|     102 |     102 |        rap-sample |  1 |  2 |
+---------+---------+-------------------+----+----+
3 rows in set

删除


常用属性

CREATE TABLE my_table (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);

统计数优化

paimon 会默认为每一列添加3个统计属性:最大值、最小值、null值数量

有四种配置来约束统计属性

  • full:为所有数据添加最大值、最小值、null值数量统计
  • truncate(length):截断length长度后,为所有数据添加最大值、最小值、null值数量统计,这个是默认值:默认length 16,为了避免长文本字段的统计
  • counts:只对null值数量统计
  • none:不统计

如果需要修改某个字段的统计属性

  • fields.{field_name}.stats-mode,with ( ‘fields.behavior.stats-mode’ = ‘full’ )

官网原文:https://paimon.apache.org/docs/master/flink/sql-ddl/#specify-statistics-mode
在这里插入图片描述

字段默认值

paimon表可以设置字段默认值,但是 不能 对主键字段设置默认值

如果需要修改某个字段的统计属性

  • fields.{field_name}.default-value
  • with ( ‘fields.behavior.default-value’ = ‘sing’ )

官网原文:https://paimon.apache.org/docs/master/flink/sql-ddl/#field-default-value
在这里插入图片描述

聚类写入

在批作业中配置该参数可以启用聚类写入功能,使数据在特定列上按大小范围聚集分布,从而提升该列的查询速度。只能在批处理或者append table(流处理)中使用。

多个列名请使用英文逗号(,)进行分隔,例如’col1,col2’。

  • sink.clustering.by-columns
  • with ( ‘sink.clustering.by-columns’ = ‘user_id,item_id’ )

也可以使用Hints

  • INSERT INTO my_table /*+ OPTIONS(‘sink.clustering.by-columns’ = ‘a,b’) */ SELECT * FROM source;

动态覆盖

Flink overwrite模式,在分区表中默认是动态分区覆盖,也就是说在使用overwrite时,只覆盖当前写入分区的数据,写入数据为空时,不进行覆盖,我们可以设置为静态覆盖,当写入数据为空时,也会覆盖。如果写入的分区为空则覆盖所有分区!

Hints,跟在表的后边,也就是声明本次sql的执行策略,dynamic-partition-overwrite=false > 静态覆盖,truncate的替代语法(TRUNCATE TABLE my_table 需要flink 1.18 之后才支持)
INSERT OVERWRITE my_table /*+ OPTIONS(‘dynamic-partition-overwrite’=‘false’) */

合并引擎

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/881507.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java虚拟机(JVM)

目录 内存区域划分堆(Heap)方法区(Method Area)程序计数器(Program Counter Register)虚拟机栈(VM Stack)本地方法栈(Native Method Stack) 类加载的过程类加…

[C++]使用纯opencv部署yolov11-seg实例分割onnx模型

【算法介绍】 在C中使用纯OpenCV部署YOLOv11-seg进行实例分割是一项具有挑战性的任务,因为YOLOv11通常是用PyTorch等深度学习框架实现的,而OpenCV本身并不直接支持加载和运行PyTorch模型。然而,可以通过一些间接的方法来实现这一目标&#x…

运维工具之ansible

Ansible 1.什么是ansible? ​ ansible是基于ssh架构的自动化运维工具,由python语言实现,通过ansible可以远程批量部署等。 2.部署前提 ​ 控制端需要安装ansible,被控制端要开启ssh服务,并允许远程登录,被管理主机需要安装py…

卸载PLSQL及标准卸载流程

目录 1. 卸载PLSQL2. 删除注册表3. 删除数据信息 1. 卸载PLSQL 等待进度条走完 2. 删除注册表 regedit 右击删除 3. 删除数据信息 由于AppData是隐藏文件,需要勾选隐藏的项目。 重启电脑,PLSQL就卸载成功了。

浏览器和客户端结合的erp系统,java控制浏览器操作自动登录,socket客户端通信进行表单赋值

java做一个toB的客户端操作系统,客户端和web的结合; 主要是使用java编写客户端代码,采用selenium控制浏览器,主要是用到selenium自动化测试的功能; javaEE 项目调用 selenium使用谷歌控件chromedriver.exe控制浏览器…

使用Java调用OpenAI API并解析响应:详细教程

使用Java调用OpenAI API并解析响应:详细教程 在现代应用程序中,API调用是一个非常常见的任务。本文将通过一个完整的示例,讲解如何使用Java调用OpenAI的ChatGPT API,并通过ObjectMapper处理JSON响应。本文的示例不仅适用于OpenAI…

网络参考模型

OSI七层网络参考模型 OSI模型仅作为参考,现实中并不用,OSI模型的目的是为了解决主机之间的网络通讯。 1. 物理层: 物理层将由比特(0和1)组成的数据用不同的媒介(电、光或其他形式的电磁波)传输…

黑马软件测试第一篇_测试理论

概念 使用技术手段验证软件功能是否符合需求 测试种类 功能测试 自动化测试 接口测试 性能测试 按测试阶段划分 单元测试:针对程序源码进行测试 集成测试:又称接口测试,针对模块之间访问地址进行测试 系统测试:对整个系统进行…

京东零售数据湖应用与实践

作者:陈洪健:京东零售大数据架构师,深耕大数据 10 年,2019 年加入京东,主要负责 OLAP 优化、大数据传输工具生态、流批一体、SRE 建设。 当前企业数据处理广泛采用 Lambda 架构。Lambda 架构的优点是保证了数据的完整性…

YOLO的相关改进机制

我的面包多平台有多种关于YOLO的改进,大家尽早关注,不迷路

【宽字节注入】

字符编码 url 编码 GBK编码 utf8 编码 宽字节注入 php中的转译函数 宽字节注入介绍 练习 正常输入没有回显: 没有回显 usernameadmin&passwordadmin 闭合单引号,依旧没有回显 usernameadmin and 11%23&passwordadmin利用宽字节尝试闭合,依旧…

查看SQL Server授权序列号通过SQL查询查看安装日志文件使用PowerShell查询

本人详解 作者:王文峰,参加过 CSDN 2020年度博客之星,《Java王大师王天师》 公众号:JAVA开发王大师,专注于天道酬勤的 Java 开发问题中国国学、传统文化和代码爱好者的程序人生,期待你的关注和支持!本人外号:神秘小峯 山峯 转载说明:务必注明来源(注明:作者:王文峰…

在Stable Diffusion WebUI中安装SadTalker插件时几种错误提示的处理方法

SD中的插件一般安装比较简单,但也有一些插件安装会比较难。比如我在安装SadTalker时,就遇到很多问题,一度放弃了,后来查了一些网上攻略,自己也反复查看日志,终于解决,不吐不快。 一、在Stable …

闪迪U盘误删的数据该怎么恢复呢?3个方法轻松解决

闪迪是一家全球知名的美国公司,也是全球最大的闪存数据存储卡产品供应商,其中,闪迪U盘作为其主要产品之一,因其便携性、大容量和高速传输能力而深受用户喜爱。然而,在平时存储重要数据的时候,会因为我们一系…

ElasticSearch备考 -- Update by query Reindex

一、题目 有个索引task,里面的文档长这样 现在需要添加一个字段all,这个字段的值是以下 a、b、c、d字段的值连在一起 二、思考 需要把四个字段拼接到一起,组成一个新的字段,这个就需要脚本, 这里有两种方案&#xff…

CSRF | GET 型 CSRF 漏洞攻击

关注这个漏洞的其他相关笔记:CSRF 漏洞 - 学习手册-CSDN博客 0x01:GET 型 CSRF 漏洞攻击 —— 理论篇 GET 型 CSRF 漏洞是指攻击者通过构造恶意的 HTTP GET 请求,利用用户的登录状态,在用户不知情的情况下,诱使浏览器…

Elasticsearch(二)集成Spring Boot 基本的API操作

目录 一、集成Spring Boot 1、创建项目 2、pom文件 查看springboot集成的依赖 3、增加es的config类 二、索引相关API 1、创建索引 2、获取索引,判断其是否存在 3、删除索引 三、文档相关API 1、添加文档 2、获取文档,判断是否存在 3、获取文档…

【D3.js in Action 3 精译_029】3.5 给 D3 条形图加注图表标签(上)

当前内容所在位置(可进入专栏查看其他译好的章节内容) 第一部分 D3.js 基础知识 第一章 D3.js 简介(已完结) 1.1 何为 D3.js?1.2 D3 生态系统——入门须知1.3 数据可视化最佳实践(上)1.3 数据可…

深度学习:基于MindSpore实现ResNet50中药分拣

ResNet基本介绍 ResNet(Residual Network)是一种深度神经网络架构,由微软研究院的Kaiming He等人在2015年提出,并且在ILSVRC 2015竞赛中取得了很好的成绩。ResNet主要解决了随着网络深度增加而出现的退化问题,即当网络…

vulnhub-digitalworld.local DEVELOPMENT靶机

vulnhub:digitalworld.local: DEVELOPMENT ~ VulnHub 导入靶机,放在kali同网段,扫描 靶机在192.168.114.129,扫描端口 开了几个端口,8080端口有网页,访问 说是让访问html_pages 似乎把页面都写出来了&…