Apache Paimon 使用之 Querying Tables

Querying Tables
1.Batch Query

Paimon的批量读取返回表快照中的所有数据。默认情况下,批处理读取返回最新的快照。

-- Flink SQL
SET 'execution.runtime-mode' = 'batch';
2.Batch Time Travel

Paimon批量读取指定快照或标签的数据。

Flink 动态配置

-- read the snapshot with id 1L
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;-- read the snapshot from specified timestamp in unix milliseconds
SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;-- read tag 'my-tag'
SELECT * FROM t /*+ OPTIONS('scan.tag-name' = 'my-tag') */;

Flink 1.18+

-- read the snapshot from specified timestamp
SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00';-- you can also use some simple expressions (see flink document to get supported functions)
SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00' + INTERVAL '1' DAY

Spark3

Spark 3.3+,可以在查询中使用VERSION AS OFTIMESTAMP AS OF进行时间旅行:

-- read the snapshot with id 1L (use snapshot id as version)
SELECT * FROM t VERSION AS OF 1;-- read the snapshot from specified timestamp 
SELECT * FROM t TIMESTAMP AS OF '2023-06-01 00:00:00.123';-- read the snapshot from specified timestamp in unix seconds
SELECT * FROM t TIMESTAMP AS OF 1678883047;-- read tag 'my-tag'
SELECT * FROM t VERSION AS OF 'my-tag';

如果标签的名称是一个数字,并且等于快照ID,则VERSION AS OF语法将首先考虑标签。

例如,标签叫1但基于快照2,语句 SELECT * FROM t VERSION AS OF ‘1’ 实际上查询快照2(即标签1)而不是快照1。

Spark3-DF

// read the snapshot from specified timestamp in unix seconds
spark.read.option("scan.timestamp-millis", "1678883047000").format("paimon").load("path/to/table")// read the snapshot with id 1L (use snapshot id as version)
spark.read.option("scan.snapshot-id", 1).format("paimon").load("path/to/table")// read tag 'my-tag'
spark.read.option("scan.tag-name", "my-tag").format("paimon").load("path/to/table")

Hive 引擎

Hive需要将以下配置参数添加到hive-site.xml文件中:

<!--This parameter is used to configure the whitelist of permissible configuration items allowed for use in SQL standard authorization mode.-->
<property><name>hive.security.authorization.sqlstd.confwhitelist</name><value>mapred.*|hive.*|mapreduce.*|spark.*</value>
</property><!--This parameter is an additional configuration for hive.security.authorization.sqlstd.confwhitelist. It allows you to add other permissible configuration items to the existing whitelist.-->
<property><name>hive.security.authorization.sqlstd.confwhitelist.append</name><value>mapred.*|hive.*|mapreduce.*|spark.*</value>
</property>
-- read the snapshot with id 1L (use snapshot id as version)
SET paimon.scan.snapshot-id=1
SELECT * FROM t;
SET paimon.scan.snapshot-id=null;-- read the snapshot from specified timestamp in unix seconds
SET paimon.scan.timestamp-millis=1679486589444;
SELECT * FROM t;
SET paimon.scan.timestamp-millis=null;-- read tag 'my-tag'
set paimon.scan.tag-name=my-tag;
SELECT * FROM t;
set paimon.scan.tag-name=null;
3.批次读取新增数据

在开始的snapshot和结束的snapshot之间读取增量的变化数据。

例如

  • “5,10”是指快照5和快照10之间的变化。
  • “TAG1,TAG3”是指TAG1和TAG3之间的更改。

Flink 引擎

-- incremental between snapshot ids
SELECT * FROM t /*+ OPTIONS('incremental-between' = '12,20') */;-- incremental between snapshot time mills
SELECT * FROM t /*+ OPTIONS('incremental-between-timestamp' = '1692169000000,1692169900000') */;

Spark3引擎

需要Spark 3.2+。

Paimon支持使用Spark SQL执行Spark Table Valued Function实现的增量查询。要启用此功能,需要以下配置:

--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions

可以在查询中使用paimon_incremental_query来提取增量数据:

-- read the incremental data between snapshot id 12 and snapshot id 20.
SELECT * FROM paimon_incremental_query('tableName', 12, 20);

Spark-DF

// incremental between snapshot ids
spark.read().format("paimon").option("incremental-between", "12,20").load("path/to/table")// incremental between snapshot time mills
spark.read().format("paimon").option("incremental-between-timestamp", "1692169000000,1692169900000").load("path/to/table")

Hive

-- incremental between snapshot ids
SET paimon.incremental-between='12,20';
SELECT * FROM t;
SET paimon.incremental-between=null;-- incremental between snapshot time mills
SET paimon.incremental-between-timestamp='1692169000000,1692169900000';
SELECT * FROM t;
SET paimon.incremental-between-timestamp=null;

在批处理SQL中,不允许返回DELETE记录,因此-D的记录将被删除。如果想查看DELETE记录,可以使用audit_log表:

SELECT * FROM t$audit_log /*+ OPTIONS('incremental-between' = '12,20') */;
4.流式查询

默认情况下,流式查询在首次启动时会在表上生成最新得快照,并继续读取最新的更改。

-- Flink SQL
SET 'execution.runtime-mode' = 'streaming';

可以在没有快照数据的情况下进行流式查询,可以使用latest scan模式:

-- Continuously reads latest changes without producing a snapshot at the beginning.
SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */;
4.Streaming Time Travel

如果只想处理今天及以后的数据,可以使用分区进行过滤:

SELECT * FROM t WHERE dt > '2023-06-26';

如果不是分区表,或者无法按分区过滤,可以使用时间旅行的流式读取。

Flink 动态配置

-- read changes from snapshot id 1L 
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;-- read changes from snapshot specified timestamp
SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;-- read snapshot id 1L upon first startup, and continue to read the changes
SELECT * FROM t /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '1') */;

Flink 1.18+

-- read the snapshot from specified timestamp
SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00';-- you can also use some simple expressions (see flink document to get supported functions)
SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00' + INTERVAL '1' DAY

时间旅行的流式读取依赖于快照,但默认情况下,快照仅保留1小时内的数据,会影响读取较旧的增量数据。

因此,Paimon还提供了另一种流式读取模式,scan.file-creation-time-millis,该模式保留timeMillis之后生成的文件。

SELECT * FROM t /*+ OPTIONS('scan.file-creation-time-millis' = '1678883047356') */;
5.Consumer ID

可以在流式读取表时指定consumer-id

SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid') */;

当流式读取Paimon表时,下一个快照ID将记录到文件系统中。优点如下:

  • 当上一个作业停止时,新开始的作业可以上一个进度开始,而无需从状态恢复。新的读取将从消费者文件中找到的下一个快照ID开始读取。如果不希望这种行为,可以将“consumer.ignore-progress”设置为True。
  • 在决定快照是否已过期时,Paimon会查看文件系统中表的所有消费者,如果有消费者仍然依赖此快照,则此快照不会在过期前删除。
  • 当没有水印定义时,Paimon表会将快照中的水印传递给下游的Paimon表,这意味着可以跟踪整个管道的水印进度。

注意:消费者将防止快照过期,可以指定“consumer.expiration-time”来管理消费者的生命周期。

默认情况下,消费者使用exactly-once模式来记录消费进度,这严格确保消费者中记录的是所有reader精确消费的快照ID + 1。

可以将consumer.mode设置为at-least-once以允许reader以不同的速率消耗快照,并将所有reader中最慢的快照ID记录到消费者中。这种模式可以提供更多功能,例如水印对齐。

注意

  • 当没有水印定义时,at-least-once模式的消费者无法提供将快照中的水印传递给下游的能力。
  • 由于exactly-once模式和at-least-once模式的实现完全不同,因此flink的状态是不兼容的,在切换模式时无法从状态恢复。

可以使用给定的消费者ID和下一个快照ID重置消费者,并删除具有给定消费者ID的消费者。

首先,需要使用此消费者ID停止流式传输任务,然后执行重置消费者操作作业。

Flink 引擎

<FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-0.7.0-incubating.jar \reset-consumer \--warehouse <warehouse-path> \--database <database-name> \ --table <table-name> \--consumer_id <consumer-id> \[--next_snapshot <next-snapshot-id>] \[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]

如果想删除消费者,请不要指定-next_snapshot参数。

6.Read Overwrite

默认情况下,流式读取将忽略INSERT OVERWRITE生成的提交。如果想读取OVERWRITE的提交,可以配置streaming-read-overwrite

a) 并行读取

Flink 引擎

默认情况下,批处理读取的并行度与拆分数相同,而流读取的并行度与桶数相同,但不大于scan.infer-parallelism.max

禁用scan.infer-parallelism,将使用全局并行度配置,还可以从scan.parallelism手动指定并行性。

KeyDefaultTypeDescription
scan.infer-parallelismtrueBooleanIf it is false, parallelism of source are set by global parallelism. Otherwise, source parallelism is inferred from splits number (batch mode) or bucket number(streaming mode).
scan.infer-parallelism.max1024IntegerIf scan.infer-parallelism is true, limit the parallelism of source through this option.
scan.parallelism(none)IntegerDefine a custom parallelism for the scan source. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration. If user enable the scan.infer-parallelism, the planner will derive the parallelism by inferred parallelism.
7.查询优化

强烈建议在查询的同时指定分区和主键进行过滤,这将加快查询数据的速度。

可以加速数据查询效率的:

  • =
  • <
  • <=
  • >
  • >=
  • IN (...)
  • LIKE 'abc%'
  • IS NULL

Paimon将按主键对数据进行排序,可以加快点查询和范围查询的速度,使用复合主键时,查询过滤器最好匹配主键的最左前缀,以便加速。

假设表如下

CREATE TABLE orders (catalog_id BIGINT,order_id BIGINT,.....,PRIMARY KEY (catalog_id, order_id) NOT ENFORCED -- composite primary key
);

查询通过为主键最左前缀指定范围过滤器来获得良好的加速。

SELECT * FROM orders WHERE catalog_id=1025;SELECT * FROM orders WHERE catalog_id=1025 AND order_id=29495;SELECT * FROM ordersWHERE catalog_id=1025AND order_id>2035 AND order_id<6000;

以下过滤器无法加速查询。

SELECT * FROM orders WHERE order_id=29495;SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/732897.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

鸿蒙开发(二)-项目结构

鸿蒙开发(二)-项目结构 上篇文章我们讲了如何配置鸿蒙开发的基础环境&#xff0c;以及创建了第一个鸿蒙程序。 这篇我们讲述了鸿蒙应用的项目目录结构。 如图所示&#xff1a;我们切换项目project可以看到。 另一种则是Ohos模式: AppScope->app.json5 应用的全局配置 {&q…

300分钟吃透分布式缓存-26讲:如何大幅成倍提升Redis处理性能?

主线程 Redis 自问世以来&#xff0c;广受好评&#xff0c;应用广泛。但相比&#xff0c; Memcached 单实例压测 TPS 可以高达百万&#xff0c;线上可以稳定跑 20~40 万而言&#xff0c;Redis 的单实例压测 TPS 不过 10~12 万&#xff0c;线上一般最高也就 2~4 万&#xff0c;…

【算法沉淀】最长回文子串

&#x1f389;&#x1f389;欢迎光临&#x1f389;&#x1f389; &#x1f3c5;我是苏泽&#xff0c;一位对技术充满热情的探索者和分享者。&#x1f680;&#x1f680; &#x1f31f;特别推荐给大家我的最新专栏《数据结构与算法&#xff1a;初学者入门指南》&#x1f4d8;&am…

element-ui 中 upload组件 如何传递额外的参数 ?

参考elementui 文档 如何通过data 进行额外传递参数&#xff1f;(:data"uploadData") <el-uploadref"fileUploadBtn1"class"upload-demo"accept".xls,.xlsx" :limit"1" :action"uploadFileUrl" :on-success&q…

MongoDB聚合运算符;$dateToParts

$dateToParts聚合运算符将日期表达式拆分成多个字段放在一个文档返回&#xff0c;属性有year、month、day、hour、minute、second和millisecond。如果iso8601属性设置为true&#xff0c;返回的各部分用ISO周日期返回&#xff0c;属性分别是&#xff1a;isoWeekYear、isoWeek、i…

jvm堆概述

《java虚拟机规范》中对java堆的描述是&#xff1a;所有的对象实例以及数组都应当在运行时分配在堆上。 一个JVM实例只存在一个堆内存(就是new 出来一个对象)&#xff0c;java内存管理的核心区域 java堆区在jvm启动的时候就被创建&#xff0c;空间大小确定。是jvm管理的最大一…

通过Step Back提示增强LLM的推理能力

原文地址&#xff1a;enhancing-llms-reasoning-with-step-back-prompting 论文地址&#xff1a;https://arxiv.org/pdf/2310.06117.pdf 2023 年 11 月 6 日 Introduction 在大型语言模型不断发展的领域中&#xff0c;一个持续的挑战是它们处理复杂任务的能力&#xff0c;这…

图形库实战丨C语言扫雷小游戏(超2w字,附图片素材)

目录 效果展示 游玩链接&#xff08;无需安装图形库及VS&#xff09; 开发环境及准备 1.VS2022版本 2.图形库 游戏初始化 1.头文件 2.创建窗口 3.主函数框架 开始界面函数 1.初始化 1-1.设置背景颜色及字体 1-2.处理背景音乐及图片素材 1-3.处理背景图位置 2.选…

【操作系统学习笔记】文件管理2.1

【操作系统学习笔记】文件管理2.1 参考书籍: 王道考研 视频地址: Bilibili 文件系统的层次结构 用户/应用程序用户接口: 文件系统需要向上层的用户提供一些简单易用的功能接口。这层就是用于处理用户发出的系统调用请求文件目录系统: 用户是通过文件路径来访问文件的&#x…

在职场上,如何提升影响力

在职场上提升影响力是一个持续且多方面的过程&#xff0c;涉及个人技能、人际关系、领导能力以及个人品牌的建立等多个方面。以下是一些建议&#xff0c;帮助你提升在职场上的影响力&#xff1a; 提升专业技能&#xff1a; 深入学习和掌握所在领域的专业知识&#xff0c;保持对…

Linux服务器安装jdk

背景: 安装JDK是我们java程序在服务器运行的必要条件,下面描述几个简单的命令就可再服务器上成功安装jdk 命令总览: yum update -y yum list | grep jdk yum -y install java-1.8.0-openjdk java -version 1.查看可安装版本 yum list | grep jdk 2.如果查不到可先进行 yum upd…

leetcode 热题 100_缺失的第一个正数

题解一&#xff1a; 正负模拟哈希&#xff1a;偏技巧类的题目&#xff0c;在无法使用额外空间的情况下&#xff0c;只能在原数组中做出类似哈希表的模拟。除去数值&#xff0c;我们还可以用正负来表示下标值的出现情况。首先&#xff0c;数组中存在正负数和0&#xff0c;而负数…

猫头虎分享已解决Bug || 数据中心断电:PowerLoss, DataCenterBlackout

博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试题大全》 — 面试准备的宝典&#xff01;《IDEA开发秘籍》 — 提升你的IDEA技能&#xff01;《100天精通鸿蒙》 …

通过Annotation将用户操作记录到数据库表功能实现

一、背景 在用户对我们所开发的系统访问的时候&#xff0c;需要我们的系统具有强大的健壮性&#xff0c;使得给与用户的体验感十足。在业务开发的过程中&#xff0c;我们通过将几个相关的操作绑定成一个事件&#xff0c;使得安全性以及数据的前后一致性得到提高。但是在溯源方面…

未来城市:探索数字孪生在智慧城市中的实际应用与价值

目录 一、引言 二、数字孪生与智慧城市的融合 三、数字孪生在智慧城市中的实际应用 1、智慧交通管理 2、智慧能源管理 3、智慧建筑管理 4、智慧城市管理 四、数字孪生在智慧城市中的价值 五、挑战与展望 六、结论 一、引言 随着科技的飞速发展&#xff0c;智慧城市已…

RLAIF在提升大型语言模型训练中的应用

RLAIF在提升大型语言模型训练中的应用 大型语言模型&#xff08;LLMs&#xff09;在理解和生成自然语言方面展示了巨大能力&#xff0c;但仍面临输出不可靠、推理能力有限、缺乏一致性个性或价值观对齐等挑战。为解决这些问题&#xff0c;研究者开发了一种名为“来自AI反馈的强…

C++模板基础知识

文章目录 模板模板的声明与定义函数模板非类型模板参数类模板类的成员函数定义构造函数的定义类的静态成员的定义类模板的实例化使用模板类型中的类型成员 默认模板参数指定显示模板实参(函数模板显示实参)引用折叠和右值引用参数可变参数模板对参数包的扩展对参数包的转发可变…

linux 日志轮转

前言: 在Linux系统中&#xff0c;日志轮转是一种重要的管理机制&#xff0c;它可以帮助管理日志文件的大小、数量以及保持系统的性能稳定。通过日志轮转&#xff0c;可以定期对日志文件进行归档、压缩或清理&#xff0c;确保系统的日志记录不会无限增长而占用过多的磁盘空间…

动态SLAM论文阅读笔记

近期阅读了许多动态SLAM相关的论文&#xff0c;它们基本都是基于ORB-SLAM算法&#xff0c;下面简单记录一下它们的主要特点&#xff1a; 1.DynaSLAM 采用CNN网络进行分割多视图几何辅助的方式来判断动态点&#xff0c;并进行了背景修复工作。 2.Detect-SLAM 实时性问题&…

TQTT X310 软件无线电设备的FLASH固件更新方法--WIN和UBUNTU环境

TQTT X310 除了PCIE口全部兼容USRP 官方的X310&#xff0c;并配备两块UBX160射频子板以及GPSDO。TQTT X310可以直接使用官方的固件&#xff0c;但是不支持官方的固件升级命令。这篇BLOG提供烧写刷新FLASH的方法。 这里分别给出WIN下和UBUNTU下升级的软件和方法 WIN环境下烧写…