Apache Paimon 使用之 Creating Table

1.创建 Catalog 管理的 Tables

在Paimon Catalog中创建的Tables由Catalog管理,当Tables从Catalog中删除时,其table files也将被删除。

当使用Paimon Catalog,创建一个名为MyTable的managed table,在Catalog的default数据库中有五列,其中dthhuser_id是primary keys。

Flink 引擎

CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);

Spark3引擎

CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING
) TBLPROPERTIES ('primary-key' = 'dt,hh,user_id'
);

Hive引擎

SET hive.metastore.warehouse.dir=warehouse_path;CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING
)
STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
TBLPROPERTIES ('primary-key' = 'dt,hh,user_id'
);

注意:在删除表之前,应停止在表上的插入Job,否则无法完全删除表文件。

I)分区表

创建一个名为MyTable的表,其中dthh是分区列,dthhuser_id是主键。

Flink引擎

CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);

Spark3引擎

CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES ('primary-key' = 'dt,hh,user_id'
);

Hive引擎

SET hive.metastore.warehouse.dir=warehouse_path;CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING
) PARTITIONED BY ( dt STRING,hh STRING
)
STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
TBLPROPERTIES ('primary-key' = 'dt,hh,user_id'
);

注意:通过配置partition.expiration-time,可以自动删除过期的分区。

ii)选择分区字段

以下三种类型的字段可以定义为数仓中的分区字段:

  • Creation Time(推荐):创建时间通常是不可变的,因此可以将其视为分区字段并将其添加到主键中。
  • Event Time:事件时间是原始表中的一个字段,对于CDC数据,例如从MySQL CDC同步的表或Paimon生成的Changelogs,都是完整的CDC数据,包括UPDATE_BEFORE Records,即使声明了包含主键的分区字段,也可以实现唯一性(需要'changelog-producer'='input'
  • CDC op_ts:它不能被定义为分区字段,无法知道以前的record timestamp。
iii)指定统计模式

Paimon将自动收集数据文件的统计数据,以加快查询过程。支持四种模式:

  • full:收集完整的指标:null_count, min, max
  • truncate(length):长度可以是任何正数,默认模式是truncate(16)这意味着收集空数,最小/最大值,截断长度为16。主要是为了避免过大的列会放大清单文件。
  • counts:只收集空计数。
  • none:禁用元数据统计信息收集。

统计收集器模式可以通过'metadata.stats-mode'配置,默认为'truncate(16)',可以通过设置'fields.{field_name}.stats-mode'来配置字段级别。

iiii)字段默认值

Paimon表目前支持为表属性中的字段设置默认值,请注意,无法指定分区字段和主键字段。

Flink引擎

CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh)
with('fields.item_id.default-value'='0'
);

Spark3引擎

CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES ('primary-key' = 'dt,hh,user_id','fields.item_id.default-value'='0'
);

Hive引擎

SET hive.metastore.warehouse.dir=warehouse_path;CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING
)
STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
TBLPROPERTIES ('primary-key' = 'dt,hh,user_id','partition'='dt,hh','fields.item_id.default-value'='0'
);
2.Create Table As

表可以通过查询结果创建,例如CREATE TABLE table_b AS SELECT id, name FORM table_a,生成的表 table_b 将等同于创建表并插入带有以下语句的数据:CREATE TABLE table_b (id INT, name STRING); INSERT INTO table_b SELECT id, name FROM table_a;

当使用CREATE TABLE AS SELECT,可以指定主键或分区。

Flink 引擎

/* For streaming mode, you need to enable the checkpoint. */CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT
);
CREATE TABLE MyTableAs AS SELECT * FROM MyTable;/* partitioned table */
CREATE TABLE MyTablePartition (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING
) PARTITIONED BY (dt, hh);
CREATE TABLE MyTablePartitionAs WITH ('partition' = 'dt') AS SELECT * FROM MyTablePartition;/* change options */
CREATE TABLE MyTableOptions (user_id BIGINT,item_id BIGINT
) WITH ('file.format' = 'orc');
CREATE TABLE MyTableOptionsAs WITH ('file.format' = 'parquet') AS SELECT * FROM MyTableOptions;/* primary key */
CREATE TABLE MyTablePk (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);
CREATE TABLE MyTablePkAs WITH ('primary-key' = 'dt,hh') AS SELECT * FROM MyTablePk;/* primary key + partition */
CREATE TABLE MyTableAll (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED 
) PARTITIONED BY (dt, hh);
CREATE TABLE MyTableAllAs WITH ('primary-key' = 'dt,hh', 'partition' = 'dt') AS SELECT * FROM MyTableAll;

Spark3引擎

CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT
);
CREATE TABLE MyTableAs AS SELECT * FROM MyTable;/* partitioned table*/
CREATE TABLE MyTablePartition (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING
) PARTITIONED BY (dt, hh);
CREATE TABLE MyTablePartitionAs PARTITIONED BY (dt) AS SELECT * FROM MyTablePartition;/* change TBLPROPERTIES */
CREATE TABLE MyTableOptions (user_id BIGINT,item_id BIGINT
) TBLPROPERTIES ('file.format' = 'orc');
CREATE TABLE MyTableOptionsAs TBLPROPERTIES ('file.format' = 'parquet') AS SELECT * FROM MyTableOptions;/* primary key */
CREATE TABLE MyTablePk (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING
) TBLPROPERTIES ('primary-key' = 'dt,hh,user_id'
);
CREATE TABLE MyTablePkAs TBLPROPERTIES ('primary-key' = 'dt') AS SELECT * FROM MyTablePk;/* primary key + partition */
CREATE TABLE MyTableAll (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES ('primary-key' = 'dt,hh,user_id'
);
CREATE TABLE MyTableAllAs PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM MyTableAll;
i)Create Table Like

要创建与另一个表相同的schema、分区和表属性的表,请使用CREATE TABLE LIKE。

CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);CREATE TABLE MyTableLike LIKE MyTable;-- Create Paimon Table like other connector table
CREATE TABLE MyTableLike WITH ('connector' = 'paimon') LIKE MyTable;
ii)Table Properties

可以指定表属性来启用功能或提高Paimon的性能。

以下SQL创建一个名为MyTable的表,其五列由dthh分区,其中dthhuser_id是主键,此表有两个属性:'bucket' = '2''bucket-key' = 'user_id'

Flink引擎

CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh) WITH ('bucket' = '2','bucket-key' = 'user_id'
);

Spark引擎

CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES ('primary-key' = 'dt,hh,user_id','bucket' = '2','bucket-key' = 'user_id'
);

Hive引擎

CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING
)
STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
TBLPROPERTIES ('primary-key' = 'dt,hh,user_id','partition'='dt,hh','bucket' = '2','bucket-key' = 'user_id'
);
3.Creating External Tables

外部表被记录,但不由catalog管理,如果外部表被删除,其表文件不会被删除。

Paimon外部表可以在任何catalog中使用,如果不想创建Paimon Catalog,只想读/写Table,可以考虑创建外部表。

注意:如果表已经存在,options 不会像dynamic options一样更新到表的metadata中。

Flink引擎-已弃用,建议使用Paimon Catalog

Flink SQL支持读取和写入外部表,外部Paimon表是通过指定connectorpath表属性创建的,以下SQL创建了一个名为MyTable的外部表,有五列,其中表文件的基本路径是hdfs:///path/to/table

CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) WITH ('connector' = 'paimon','path' = 'hdfs:///path/to/table','auto-create' = 'true' -- this table property creates table files for an empty table if table path does not exist-- currently only supported by Flink
);

注意:Flink SQL必须声明所有字段。

Spark引擎

Spark3仅支持通过Scala API创建外部表,以下Scala代码将位于hdfs:///path/to/table的表加载到DataSet

val dataset = spark.read.format("paimon").load("hdfs:///path/to/table")

Hive引擎

要访问现有的paimon表,可以在Hive中将它们注册为外部表,以下SQL创建了一个名为my_table的外部表,其中表文件的基本路径是hdfs:///path/to/table,由于schema存储在表文件中,所以用户无需定义列。

CREATE EXTERNAL TABLE my_table
STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
LOCATION 'hdfs:///path/to/table';
4.Creating Temporary Tables

临时表仅支持Flink引擎,与外部表一样,临时表只是记录,但不由当前的Flink SQL session管理。

如果临时表被删除,其resources将不会被删除,当Flink SQL session关闭时,临时表会被丢弃。

如果想将Paimon Catalog与其它表一起使用,但不想将它们存储在其它Catalog中,可以创建一个临时表。

以下Flink SQL创建了一个Paimon Catalog和一个临时表:

CREATE CATALOG my_catalog WITH ('type' = 'paimon','warehouse' = 'hdfs:///path/to/warehouse'
);USE CATALOG my_catalog;-- Assume that there is already a table named my_table in my_catalogCREATE TEMPORARY TABLE temp_table (k INT,v STRING
) WITH ('connector' = 'filesystem','path' = 'hdfs:///path/to/temp_table.csv','format' = 'csv'
);SELECT my_table.k, my_table.v, temp_table.v FROM my_table JOIN temp_table ON my_table.k = temp_table.k;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/725603.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

px2rem实现vue项目响应式布局

第一步 首先需要在项目中安装px2rem插件 npm install postcss-px2rem px2rem-loader --save 第二步 在项目src目录下新建util文件夹,在util文件夹下新建rem.js文件,内容如下: // rem等比适配配置文件 // 基准大小 const baseSize 14 //…

【机器学习300问】27、什么是决策树?

〇、两个预测任务 (1)任务一:银行预测偿还能力 当前,某银行正致力于发掘潜在的放贷用户。他们掌握了每位用户的三个关键特征:房产状况、婚姻状况以及年收入。此外,银行还拥有过往这些用户的债务偿还能力的…

刚工作菜鸟的小总结2

刚工作菜鸟的小总结2 1. using 关键字 using关键字可以用来定义一个类型的别名。例如 using SI_Error = int ,它的作用是将 SI_Error 这个名称与 int 类型进行关联,也就是说在后续代码中,可以使用 SI_Error 来代替 int 类型。如果程序中出现了 SI_Error ,我们就能清晰知道…

c/c++ 指针

参考链接:https://blog.csdn.net/soonfly/article/details/51131141 指针是一个特殊的变量,它里面存储的数值被解释成为内存里的一个地址。 一、指针定义 复杂指针定义涉及较多运算符,分析指针功能从变量名处起,根据运算符优先级结合,一步一步分析。首…

Linux系统——LVS-DR群集部署及拓展

目录 引言 1.LVS的工作模式及其工作过程 2.列举出LVS调度算法 3.LVS调度常见算法(均衡策略) 3.1固定调度算法:rr,wrr,dh,sh 3.2动态调度算法:wlc,lc,lblc 4.LVS三种工作模式区别 一、I…

前端每日一练:vue3 为什么要用 proxy 替换 Object.defineproperty ?为什么只对对象劫持,而要对数组进行方法重写?

vue3 为什么要用 proxy 替换 Object.defineproperty ? Vue 3 在设计上选择使用 Proxy 替代 Object.defineProperty 主要是为了提供更好的响应性和性能。​Object.defineProperty 是在 ES5 中引入的属性定义方法,用于对对象的属性进行劫持和拦截。Vue 2.…

更快更强,Claude 3全面超越GPT4,能归纳15万单词

ChatGPT4和Gemini Ultra被Claude 3 AI模型超越了? 3月4日周一,人工智能公司Anthropic推出了Claude 3系列AI模型和新型聊天机器人,其中包括Opus、Sonnet和Haiku三种模型,该公司声称,这是迄今为止它们开发的最快速、最强…

.NET Core日志内容详解,详解不同日志级别的区别和有关日志记录的实用工具和第三方库详解与示例

在本文中,我们将详细介绍.NET Core日志内容,包括不同日志级别的区别,以及一些常用的日志记录实用工具和第三方库。同时,我们还将通过示例来展示如何使用这些工具和库。 一、.NET Core日志级别 .NET Core日志系统提供了五种日志级…

Linux系统——SElinux

目录 前言 一、SELinux 的作用及权限管理机制 1.SELinux 的作用 1.1DAC 1.2MAC 1.3DAC 和 MAC 的对比 2.SELinux 基本概念 2.1主体(Subject) 2.2对象(Object) 2.3政策和规则(Policy & Rule)…

Nginx+Tomcat实现负载均衡动静分离

目录 一、背景与环境 1. 背景 2. 环境图示 3. 目标 二、操作过程 1. 第一层 2. 第二层 一、背景与环境 1. 背景 在一个Web应用程序中,通常会将动态内容(由Tomcat处理)与静态内容(如图片、CSS、JavaScript文件等&#xff…

什么是5G边缘计算网关?

随着5G技术的飞速发展和普及,边缘计算作为5G时代的关键技术之一,正日益受到业界的关注。而5G边缘计算网关,作为连接5G网络和边缘计算节点的桥梁,扮演着至关重要的角色。HiWoo Box,作为一款卓越的5G边缘计算网关&#x…

springcloud:3.5测试慢调用熔断降级

服务提供者【test-provider8001】 Openfeign远程调用服务提供者搭建 文章地址http://t.csdnimg.cn/06iz8 相关接口 测试远程调用:http://localhost:8001/payment/index 服务消费者【test-consumer-resilience4j8004】 Openfeign远程调用消费者搭建 文章地址http://t…

Unity3D Netty网络框架的使用详解与原理详解

前言 Unity3D是一款强大的跨平台游戏开发引擎,而Netty是一个高性能、异步事件驱动的网络应用框架。结合使用Unity3D和Netty可以实现网络游戏的开发,实现客户端和服务器端之间的通信。本文将详细介绍Unity3D和Netty的使用方法和原理,以及如何…

学习大数据,所必需的java基础(8)

文章目录 字符缓冲流字符缓冲输出流 _Buffered和Writer字符缓冲输入流字符缓冲流练习 转换流字符编码字符集转换流转换流_OutputStreamWriter序列流和反序列流的介绍序列化流_ObjectOutputStream反序列化_ObjectInputStream不想被序列化操作反序列化时出现的问题以及分析和解决…

解决:hive数据库初始失败

1、背景 采用Mysql数据库保存hive元数据时,我们一般是这样操作: 启动mysql服务;用mysql新建一个库(比如库名为"hive“,这个“hive”与hive-site.xml中的local:3306/hive的“hive”对应,是用来保存hiv…

滴滴基于 Clickhouse 构建新一代日志存储系统

ClickHouse 是2016年开源的用于实时数据分析的一款高性能列式分布式数据库,支持向量化计算引擎、多核并行计算、高压缩比等功能,在分析型数据库中单表查询速度是最快的。2020年开始在滴滴内部大规模地推广和应用,服务网约车和日志检索等核心平…

SkyWalking 本地启动以及闪退问题

1. 下载包 Downloads | Apache SkyWalking SkyWalking APM包含OAP和UI Java Agent 就是Java 的探针 2. 运行 UI 默认端口是 8080, OAP 默认端口是 11800(grpc)12800(http) 如果占用可以修改配置文件 UI 项目的配…

MySQL和Redis Common Command

Ubuntu 配置 MySQL 下载对应版本的deb包并解压,用不到test可删除,wegt或者直接去镜像网站下载均可 tar xvf mysql-server_5.7.37-1ubuntu18.04_amd64.deb-bundle.tar -C /mnt/d/opt/mysql-debrm -f mysql-community-test_5.7.37-1ubuntu18.04_amd64.deb…

Verdi VC Apps Batch mode 使用

Verdi VC Apps除了能在Verdi gui中启动之外,其实还可以使用batch mode. 下面我简单介绍一下如何使用: $VERDI_HOME/share/VIA/Apps/Bin/ 目录下有各个Apps对应的perl脚本,我们使用该脚本来启动batch mode. 以listRegisters.pl 为例&#xf…