Apache Paimon Flink引擎解析

Paimon 支持 Flink 1.17, 1.16, 1.15 和 1.14,当前 Paimon 提供了两类 Jar 包,一类支持数据读写,另一类支持其它操作(compaction)

Version	      Type	        Jar
Flink 1.18	  Bundled Jar	  paimon-flink-1.18-0.7.0-incubating.jar
Flink 1.17	  Bundled Jar	  paimon-flink-1.17-0.7.0-incubating.jar
Flink 1.16	  Bundled Jar	  paimon-flink-1.16-0.7.0-incubating.jar
Flink 1.15	  Bundled Jar	  paimon-flink-1.15-0.7.0-incubating.jar
Flink 1.14	  Bundled Jar	  paimon-flink-1.14-0.7.0-incubating.jar
Flink Action	Action Jar	  paimon-flink-action-0.7.0-incubating.jar
1.环境准备

下载 Flink 后解压

tar -xzf flink-*.tgz

拷贝 Paimon bundled jar 包到 Flink 的 lib 目录下

cp paimon-flink-*.jar <FLINK_HOME>/lib/

拷贝 Hadoop Bundled Jar 包到 Flink 的 lib 目录下

cp flink-shaded-hadoop-2-uber-*.jar <FLINK_HOME>/lib/

为同时运行多个Flink作业,修改/conf/flink-conf.yaml中的集群配置

taskmanager.numberOfTaskSlots: 2

本地启动 Flink 集群

<FLINK_HOME>/bin/start-cluster.sh

验证 Web UI 查看集群是否已启动并运行

localhost:8081

启动Flink SQL客户端来执行SQL脚本

<FLINK_HOME>/bin/sql-client.sh
2.创建 Paimon Catalog 和 Table

创建 Catalog 和 Table

-- if you're trying out Paimon in a distributed environment,
-- the warehouse path should be set to a shared file system, such as HDFS or OSS
CREATE CATALOG my_catalog WITH ('type'='paimon','warehouse'='file:/tmp/paimon'
);USE CATALOG my_catalog;-- create a word count table
CREATE TABLE word_count (word STRING PRIMARY KEY NOT ENFORCED,cnt BIGINT
);
3.使用Flink通用的Catalog创建Table

使用Flink通用的Catalog,需要使用Hive metastore,然后可以使用Paimon、Hive和Flink通用表(Kafka和其他表)中的所有表。

在此模式下,应该使用"connector"选项来创建tables。

Paimon将在hive-site.xml中使用hive.metastore.warehouse.dir,需要使用带有scheme的path,例如,hdfs://....否则,Paimon将使用本地路径。

CREATE CATALOG my_catalog WITH ('type'='paimon-generic','hive-conf-dir'='...','hadoop-conf-dir'='...'
);USE CATALOG my_catalog;-- create a word count table
CREATE TABLE word_count (word STRING PRIMARY KEY NOT ENFORCED,cnt BIGINT
) WITH ('connector'='paimon'
);
4.创建 Source 数据表
-- create a word data generator table
CREATE TEMPORARY TABLE word_table (word STRING
) WITH ('connector' = 'datagen','fields.word.length' = '1'
);-- paimon requires checkpoint interval in streaming mode
SET 'execution.checkpointing.interval' = '10 s';-- write streaming data to dynamic table
INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word;
5.数据查询案例

OLAP查询

-- use tableau result mode
SET 'sql-client.execution.result-mode' = 'tableau';-- switch to batch mode
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';-- olap query the table
SELECT * FROM word_count;

流式查询

-- switch to streaming mode
SET 'execution.runtime-mode' = 'streaming';-- track the changes of table and calculate the count interval statistics
SELECT `interval`, COUNT(*) AS interval_cnt FROM(SELECT cnt / 10000 AS `interval` FROM word_count) GROUP BY `interval`;
6.退出Flink SQL客户端,停止Flink集群

退出Flink SQL客户端

-- uncomment the following line if you want to drop the dynamic table and clear the files
-- DROP TABLE word_count;-- exit sql-client
EXIT;

停止Flink集群

./bin/stop-cluster.sh
7.触发 Savepoint 和 recover

由于Paimon有自己的snapshot管理,可能与Flink的checkpoint管理相冲突,在从savepoint恢复时会导致异常(不会导致存储损坏)。

建议使用以下方法开启savepoint

使用Stop with savepoint。
使用 Tag with savepoint,并在从savepoint恢复之前rollback-to-tag。

8.使用Action Jar

Flink本地集群启动后,使用以下命令执行 action jar

<FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-0.7.0-incubating.jar \<action><args>

compact 一张 table

<FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-0.7.0-incubating.jar \compact \--path <TABLE_PATH>
9.支持的Flink数据类型

支持所有Flink数据类型,除了

  • MULTISET不受支持。
  • MAP不支持作为主键。
10.使用Flink Managed Memory

Paimon 任务可以创建 memory pools,基于Flink executor管理的 executor memory , 像Flink任务管理的 managed memory。

通过 executor 管理的多个任务的 writer buffers 可以提升 sinks 的稳定性和性能。

使用 Flink managed memory 的配置如下:

OptionDefaultDescription
sink.use-managed-memory-allocatorfalseIf true, flink sink will use managed memory for merge tree; otherwise, it will create an independent memory allocator, which means each task allocates and manages its own memory pool (heap memory), if there are too many tasks in one Executor, it may cause performance issues and even OOM.
sink.managed.writer-buffer-memory256MWeight of writer buffer in managed memory, Flink will compute the memory size, for writer according to the weight, the actual memory used depends on the running environment. Now the memory size defined in this property are equals to the exact memory allocated to write buffer in runtime.

在SQL中为Flink Managed Memory设置内存权重,然后Flink sink operator将获得memory pool大小,并为Paimon writer创建allocator。

INSERT INTO paimon_table /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='256M') */
SELECT * FROM ....;
11.Setting dynamic options

与Paimon表交互时,可以在不更改catalog options的情况下调整table options。

Paimon将获取job-level的dynamic options,并在current session中生效,dynamic options的格式是:

paimon.${catalogName}.${dbName}.${tableName}.${config_key}catalogName/dbName/tableName 可以是 *

例如:

-- set scan.timestamp-millis=1697018249000 for the table mycatalog.default.T
SET 'paimon.mycatalog.default.T.scan.timestamp-millis' = '1697018249000';
SELECT * FROM T;-- set scan.timestamp-millis=1697018249000 for the table default.T in any catalog
SET 'paimon.*.default.T.scan.timestamp-millis' = '1697018249000';
SELECT * FROM T;
12.Procedures

Flink 1.18及以上版本支持Call Statements,通过编写SQL来操作Paimon表的数据和元数据。

注意:当 call 一个 procedure 时,必须按顺序传递参数,如果不想传递某些参数,必须使用 ‘’ 作为占位符。

例如,用并行度为4的任务压缩表default.t,但不想指定分区和排序策略,调用语句应该是
CALL sys.compact('default.t', '', '', '', 'sink.parallelism=4')

指定分区:使用字符串来表示partition filter,“,“表示"AND”,”;"表示“OR”。

例如,指定两个分区date=01或date=02,需要写’date=01;date=02’;如果指定一个带有date=01和day=01的分区,需要写’date=01,day=01’。

table options 语法:使用字符串来表示table options,格式是’key1=value1,key2=value2…'。

Procedure NameUsageExplainationExample
compactCALL [catalog.]sys.compact(‘identifier’) CALL [catalog.]sys.compact(‘identifier’, ‘partitions’) CALL [catalog.]sys.compact(‘identifier’, ‘partitions’, ‘order_strategy’, ‘order_columns’, ‘table_options’)TO compact a table. Arguments:identifier: the target table identifier. Cannot be empty.partitions: partition filter.order_strategy: ‘order’ or ‘zorder’ or ‘none’. Left empty for ‘none’.order_columns: the columns need to be sort. Left empty if ‘order_strategy’ is ‘none’.table_options: additional dynamic options of the table.CALL sys.compact(‘default.T’, ‘p=0’, ‘zorder’, ‘a,b’, ‘sink.parallelism=4’)
compact_databaseCALL [catalog.]sys.compact_database() CALL [catalog.]sys.compact_database(‘includingDatabases’) CALL [catalog.]sys.compact_database(‘includingDatabases’, ‘mode’) CALL [catalog.]sys.compact_database(‘includingDatabases’, ‘mode’, ‘includingTables’) CALL [catalog.]sys.compact_database(‘includingDatabases’, ‘mode’, ‘includingTables’, ‘excludingTables’) CALL [catalog.]sys.compact_database(‘includingDatabases’, ‘mode’, ‘includingTables’, ‘excludingTables’, ‘tableOptions’)To compact databases. Arguments:includingDatabases: to specify databases. You can use regular expression.mode: compact mode. “divided”: start a sink for each table, detecting the new table requires restarting the job; “combined” (default): start a single combined sink for all tables, the new table will be automatically detected.includingTables: to specify tables. You can use regular expression.excludingTables: to specify tables that are not compacted. You can use regular expression.tableOptions: additional dynamic options of the table.CALL sys.compact_database(‘db1|db2’, ‘combined’, ‘table_.*’, ‘ignore’, ‘sink.parallelism=4’)
create_tagCALL [catalog.]sys.create_tag(‘identifier’, ‘tagName’, snapshotId)To create a tag based on given snapshot. Arguments:identifier: the target table identifier. Cannot be empty.tagName: name of the new tag.snapshotId (Long): id of the snapshot which the new tag is based on.CALL sys.create_tag(‘default.T’, ‘my_tag’, 10)
delete_tagCALL [catalog.]sys.delete_tag(‘identifier’, ‘tagName’)To delete a tag. Arguments:identifier: the target table identifier. Cannot be empty.tagName: name of the tag to be deleted.CALL sys.delete_tag(‘default.T’, ‘my_tag’)
merge_into– when matched then upsert CALL [catalog.]sys.merge_into(‘identifier’,‘targetAlias’, ‘sourceSqls’,‘sourceTable’,‘mergeCondition’, ‘matchedUpsertCondition’,‘matchedUpsertSetting’) – when matched then upsert; when not matched then insert CALL [catalog.]sys.merge_into(‘identifier’,‘targetAlias’, ‘sourceSqls’,‘sourceTable’,‘mergeCondition’, ‘matchedUpsertCondition’,‘matchedUpsertSetting’, ‘notMatchedInsertCondition’,‘notMatchedInsertValues’) – when matched then delete CALL [catalog].sys.merge_into(‘identifier’,‘targetAlias’, ‘sourceSqls’,‘sourceTable’,‘mergeCondition’, ‘matchedDeleteCondition’) – when matched then upsert + delete; – when not matched then insert CALL [catalog].sys.merge_into(‘identifier’,‘targetAlias’, ‘sourceSqls’,‘sourceTable’,‘mergeCondition’, ‘matchedUpsertCondition’,‘matchedUpsertSetting’, ‘notMatchedInsertCondition’,‘notMatchedInsertValues’, ‘matchedDeleteCondition’)To perform “MERGE INTO” syntax. See merge_into action for details of arguments.– for matched order rows, – increase the price, – and if there is no match, – insert the order from – the source table CALL sys.merge_into(‘default.T’, ‘’, ‘’, ‘default.S’, ‘T.id=S.order_id’, ‘’, ‘price=T.price+20’, ‘’, ‘*’)
remove_orphan_filesCALL [catalog.]sys.remove_orphan_files(‘identifier’) CALL [catalog.]sys.remove_orphan_files(‘identifier’, ‘olderThan’)To remove the orphan data files and metadata files. Arguments:identifier: the target table identifier. Cannot be empty.olderThan: to avoid deleting newly written files, this procedure only deletes orphan files older than 1 day by default. This argument can modify the interval.CALL remove_orphan_files(‘default.T’, ‘2023-10-31 12:00:00’)
reset_consumer– reset the new next snapshot id in the consumer CALL [catalog.]sys.reset_consumer(‘identifier’, ‘consumerId’, nextSnapshotId) – delete consumer CALL [catalog.]sys.reset_consumer(‘identifier’, ‘consumerId’)To reset or delete consumer. Arguments:identifier: the target table identifier. Cannot be empty.consumerId: consumer to be reset or deleted.nextSnapshotId (Long): the new next snapshot id of the consumer.CALL sys.reset_consumer(‘default.T’, ‘myid’, 10)
rollback_to– rollback to a snapshot CALL sys.rollback_to(‘identifier’, snapshotId) – rollback to a tag CALL sys.rollback_to(‘identifier’, ‘tagName’)To rollback to a specific version of target table. Argument:identifier: the target table identifier. Cannot be empty.snapshotId (Long): id of the snapshot that will roll back to.tagName: name of the tag that will roll back to.CALL sys.rollback_to(‘default.T’, 10)
expire_snapshots– expires snapshot CALL sys.expire_snapshots(‘identifier’, retainMax)To expire snapshots. Argument:identifier: the target table identifier. Cannot be empty.retainMax: the maximum number of completed snapshots to retain.CALL sys.expire_snapshots(‘default.T’, 2)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/717127.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SentenceTransformer简单使用

SentenceTransformer简单使用 1 SentenceTransformer介绍 SentenceTransformer主要用于对句子、文本和图像进行嵌入。可用于文本和图像的相似度对比查找等 # SentenceTransformer官网地址 https://www.sbert.net/# 安装SentenceTransformer pip install -U sentence-transfo…

求数字的每一位之和

求数字的每一位之和 题目描述&#xff1a;解法思路&#xff1a;解法代码&#xff1a;运行结果&#xff1a; 题目描述&#xff1a; 输入一个整数m&#xff0c;求这个整数m的每⼀位之和&#xff0c;并打印。 测试1&#xff1a; 输⼊&#xff1a;1234 输出&#xff1a;10 测试2&…

土壤侵蚀量化评估

根据之前的文章,已经算出了R、K、LS、C、P 现在计算土壤侵蚀,将几个前期制作好的因子的TIFF文件,用栅格计算器相乘 发现局部地区存在轻度侵蚀,大部分区域是微度侵蚀 然后对比了一下范围 其中的几个因子都在文献范围内,说明计算结果并未出错,可能就是研究区正常范围和结…

6020一拖二快充线:手机充电的革命性创新

在快节奏的现代生活中&#xff0c;手机已不仅仅是一个通讯工具&#xff0c;更是我们工作、学习和娱乐的得力助手。然而&#xff0c;手机的电量问题一直是困扰着我们的难题。为了解决这个问题&#xff0c;市场上出现了一种名为“一拖二快充线”的充电设备&#xff0c;它不仅具备…

etcd入门-(1)安装篇

一、etcd安装 https://github.com/etcd-io/etcd/releases 根据需要下载安装etcd, 确保添加到环境变量 执行 etcd -v 查看安装版本 二、etcd运行 本地运行集群 1.首先安装goreman go install github.com/mattn/goremanlatest2.准备Procfile 将脚本下载到本地&#xff0c;或者复…

八. 实战:CUDA-BEVFusion部署分析-分析BEVFusion中各个ONNX

目录 前言0. 简述1. camera.backbone.onnx(fp16)2. camera.backbone.onnx(int8)3. camera.vtransform.onnx(fp16)4. fuser.onnx(fp16)5. fuser.onnx(int8)6. lidar.backbone.xyz.onnx7. head.bbox.onnx(fp16)总结下载链接参考 前言 自动驾驶之心推出的《CUDA与TensorRT部署实战…

每日一类:Qt中的万能容器

在Qt框架中&#xff0c;QVariant类扮演着一个非常重要的角色。它是一个万能容器类&#xff0c;可以存储Qt中的任何基本类型数据&#xff0c;包括自定义类型。这种灵活性使得QVariant成为Qt编程中不可或缺的工具&#xff0c;特别是在需要处理不同类型数据或进行对象间通信时。 …

Unity UGUI之Scrollbar基本了解

Unity的Scrollbar组件是用于在UI中创建滚动条的组件之一。滚动条通常与其他可滚动的UI元素&#xff08;如滚动视图或列表&#xff09;一起使用&#xff0c;以便用户可以在内容超出可见区域时滚动内容。 以下是Scrollbar的基本信息和用法: 1、创建 在Unity的Hierarchy视图中右…

柯西矩阵介绍

经典定义 柯西矩阵&#xff08;Cauchy Matrix&#xff09;&#xff0c;是一种特殊类型的矩阵&#xff0c;它在数学中的多个领域&#xff0c;包括线性代数、数值分析和插值理论中都有重要应用。柯西矩阵以19世纪法国数学家奥古斯丁-路易柯西的名字命名。 柯西矩阵是一个方阵&am…

Krylov matrix

Krylov矩阵是一种在数值线性代数中使用的矩阵&#xff0c;尤其是在迭代解法中用于求解线性方程组、特征值问题和其他线性代数问题。它是由俄国数学家阿列克谢尼古拉耶维奇克雷洛夫&#xff08;Alexei Nikolaevich Krylov&#xff09;的名字命名的。 Krylov子空间由以下形式的矩…

jetson nano——编译安装opencv==4.4

目录 1.下载源码&#xff0c;我提供的链接如下&#xff1a;1.1文件上传的路径位置&#xff0c;注意ymck是我自己的用户名&#xff08;你们自己换成你们自己相对应的就行&#xff09; 2.解压文件3.安装依赖4.增加swap交换内存4.1临时增加交换内存swap4.2永久增加swap 5.安装open…

2024-03-03 作业

作业要求&#xff1a; 1.使用fwrite、fread将一张随意的bmp图片&#xff0c;修改成德国的国旗 2.使用提供的getch函数&#xff0c;编写一个专门用来输入密码的函数&#xff0c;要求输入密码的时候&#xff0c;显示 * 号&#xff0c;输入回车的时候&#xff0c;密码输入结束 作业…

学习Android的第十九天

目录 Android ExpandableListView 分组列表 ExpandableListView 属性 ExpandableListView 事件 ExpandableListView 的 Adapter 范例 参考文档 Android ViewFlipper 翻转视图 ViewFlipper 属性 ViewFlipper 方法 为 ViewFlipper 加入 View 例子&#xff1a;全屏幕可…

【MySQL】索引(重点)-- 详解

一、索引 没有索引&#xff0c;可能会有什么问题&#xff1f; 索引 &#xff1a;提高数据库的性能&#xff0c;索引是物美价廉的东西了。不用加内存&#xff0c;不用改程序&#xff0c;不用调 sql &#xff0c;只要执行正确的 create index &#xff0c;查询速度就可能提高成…

加密与安全_探索数字证书

文章目录 Pre概述使用keytool生成证书使用Openssl生成证书 &#xff08;推荐&#xff09;证书的吊销小结 Pre PKI - 借助Nginx 实现Https 服务端单向认证、服务端客户端双向认证 PKI - 04 证书授权颁发机构&#xff08;CA&#xff09; & 数字证书 PKI - 数字签名与数字证…

java面试题(spring框架篇)(黑马 )

树形图&#xff1a; 一、Spring框架种的单例bean是线程安全吗&#xff1f; Service Scope("singleton") public class UserServiceImpl implements UserService{ } singleton:bean在每个Spring IOC容器中只有一个实例 protype&#xff1a;一个bean的定义可以有多个…

CPU iowait是什么意思

在linux系统&#xff0c;使用top命令时&#xff0c;可以看到cpu使用统计情况&#xff0c;有时我们会注意到iowait这一项非常高。我们直到&#xff0c;在cpu运行进程、线程时&#xff0c;遇到IO操作&#xff0c;因为IO读写通常比较慢&#xff0c;CPU通常可以阻塞线程&#xff0c…

【Web安全靶场】xss-labs-master 1-20

xss-labs-master 其他靶场见专栏 文章目录 xss-labs-masterlevel-1level-2level-3level-4level-5level-6level-7level-8level-9level-10level-11level-12level-13level-14level-15level-16level-17level-18level-19level-20 level-1 第一关没有进行任何限制&#xff0c;get请求…

pytorch_神经网络构建6

文章目录 强化学习概念实现qLearning基于这个思路,那么解决这个问题的代码如下 强化学习概念 强化学习有一个非常直观的表现&#xff0c;就是从出发点到目标之间存在着一个连续的状态转换&#xff0c;比如说从状态一到状态456&#xff0c;而每一个状态都有多种的行为&#xff…

全国青少年软件编程(Python)等级考试试卷(一级) 测试卷2021年12月

第 1 题 【 单选题 】 下面程序的运行结果是什么&#xff1f;&#xff08; &#xff09; a10 b5 ca*b print(c) A :10 B :15 C :50 D :5 正确答案:C 试题解析: 第 2 题 【 单选题 】 与a>b and b>c等价的是&#xff1f;&#xff08; &#xff09; A…