5 Paimon数据湖之表数据查询详解

更多Paimon数据湖内容请关注:https://edu.51cto.com/course/35051.html

虽然前面我们已经讲过如何查询Paimon表中的数据了,但是有一些细节的东西还需要详细分析一下。

  • 首先是针对Paimon中系统表的查询,例如snapshots\schemas\options等等这些系统表。
    其实简单理解就是我们可以通过sql的形式查询系统表来查看实体表的快照、schema等信息,这些信息我们也可以直接到hdfs中查看,只是不太方便。

  • 在查询数据的时候,可以细分为批量读取和流式读取,因为Paimon可以同时支持批处理和流处理。

  • 在查询数据的时候,如果想要从之前的某一个时间点开始查询数据,也就说任务启动的时候想要查询一些历史数据,则需要用到时间旅行这个特性,可以在SQL查询语句中通过动态表选项指定scan.mode参数来控制具体查询哪些历史数据。

Scan Mode的值可以有多种,不同的值代表不同的含义,下面我们来具体分析一下:
在这里插入图片描述

注意:在分析的时候,我们需要针对批处理和流处理这两种情况分别进行分析。

  • (1)default:如果我们在执行查询的时候,没有指定scan.mode参数,则默认是default。但是此时需要注意,如果我们也没有同时指定其他参数,例如:timestamp-millis\snapshot-id等scan相关的参数,那么默认会执行latest-full策略。
    所以说,我们在执行查询的时候,如果没有指定任何scan相关的参数,那么默认执行的策略就是latest-full。

  • (2)latest-full:和full是一样的效果,不过full这个参数已经被标记为过期了。针对批处理,表示只读取最新快照中的所有数据,读取完成以后任务就执行结束了。针对流处理,表示第一次启动时读取最新快照中的所有数据,然后继续读取后续新增的变更数据,这个任务会一直运行。

  • (3)latest:针对批处理,他的执行效果和latest-full是一样的,只会读取最新快照中的所有数据。但是针对流处理就不一样了,此时表示只读取最新的变更数据,也就是说任务启动之后,只读取新增的数据,之前的历史快照中的数据不读取。类似于kafka中消费者里面的latest消费策略。

  • (4)from-snapshot:使用此策略的时候,需要同时指定snapshot-id参数。针对批处理,表示只读取指定id的快照中的所有数据。针对流处理,表示从指定id的快照开始读取变更数据(注意:此时不是读取这个快照中的所有数据,而是读取此快照中的变更数据,也可以理解为这个快照和上一个快照相比新增的数据),当然,后续新增的变更数据也是可以读取到的,因为这个是流处理,他会一直执行读取操作。

  • (5)from-snapshot-full:使用此策略的时候,也需要同时指定snapshot-id参数。针对批处理,他的执行效果和from-snapshot是一样的。针对流处理,表示第一次启动时读取指定id的快照中的所有数据,然后继续读取后续新增的变更数据,此时任务会一直执行。

  • (6)from-timestamp:使用此策略的时候,需要同时指定timestamp-millis参数。针对批处理,表示只读取指定时间戳的快照中的所有数据。针对流处理,表示从指定时间戳的快照开始读取变更数据,(注意,这里也是读取这个快照中的变更数据,不是所有数据。),然后读取后续新增的变更数据。

  • (7)incremental:表示是增量查询,这个主要是针对批处理的,通过这种策略可以读取开始和结束快照之间的增量变化。开始和结束快照可以通过快照id或者是时间戳进行指定。
    如果是使用快照id,则需要通过incremental-between参数指定。
    如果是使用时间戳,则需要通过incremental-between-timestamp参数指定。

  • (8)compacted-full:想要使用这个参数有一个前提,Paimon表需要开启完全压缩(full compaction)。此时针对批处理,表示只读取最新完全压缩(full compaction)的快照中的所有数据。针对流处理,表示第一次启动时读取最新完全压缩(full compaction)的快照中的所有数据,然后继续读取后续新增的变更数据。

针对这里面的latest、latest-full、compacted-full这几种策略放在一起可能容易混淆,下面我们来通过一个图重新梳理一下:
在这里插入图片描述

首先看中间这条线,表示是数据的时间轴,左边是历史数据,右边是最新产生的数据。

中间这条线上面是批处理,下面是流处理。

我们首先来看批处理:
如果我们指定了scan.modelatest-full或者是latest,则会读取最新的快照中的所有数据,也就是Last Snapshot中的数据。
如果我们指定了scan.modecompacted-full,则会读取最新的完全压缩(full compaction)的快照中的数据,也就是Last Compact Snapshot中的数据。

接下来看一下流处理:
如果我们指定了scan.modelatest-full,则会在任务第一次启动时读取最新快照中的所有数据,然后继续读取后续新增的变更数据。也就是第一次启动时先读取Last Snapshot中的所有数据,接着读取后续新产生的数据。
如果我们指定了scan.modelatest,则此时只读取最新的变更数据,不读取LastSnapshot快照中的数据。
如果我们指定了scan.modecompacted-full,则第一次启动时会读取最新完全压缩(full compaction)的快照中的所有数据,也就是Last Compact Snapshot中的数据,接着读取后续新产生的数据。

这就是这些策略在批处理和流处理中的执行流程。

(1)查询系统表

下面我们来通过具体的案例来演示一下前面提到的查询数据相关的用法。

首先创建一个向Paimon表中模拟写入数据的类,便于一会测试使用
创建package:tech.xuwei.paimon.query

创建object:FlinkSQLWriteToPaimon

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 通过FlinkSQL 向Paimon中模拟写入数据* Created by xuwei*/
object FlinkSQLWriteToPaimon {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//创建Paimon类型的表tEnv.executeSql("""|CREATE TABLE IF NOT EXISTS `query_table`(|    name STRING,|    age INT,|    PRIMARY KEY (name) NOT ENFORCED|)|""".stripMargin)//写入数据tEnv.executeSql("INSERT INTO query_table(name,age) VALUES('jack',18)")tEnv.executeSql("INSERT INTO query_table(name,age) VALUES('tom',19)")tEnv.executeSql("INSERT INTO query_table(name,age) VALUES('mick',20)")}}

在idea中运行这个代码。

接下来创建一个类来查询一下Paimon中的系统表。

创建object:FlinkPaimonSystemTable

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 查询Paimon中的系统表* Created by xuwei*/
object FlinkPaimonSystemTable {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//snapshot信息表,对应的其实就是hdfs中表的snapshot目录下的snapshot-*文件信息println("====================snapshot信息表===========================")tEnv.executeSql("SELECT * FROM query_table$snapshots").print()//schema信息表,对应的其实就是hdfs中表的schema目录下的schema-*文件信息println("====================schema信息表===========================")tEnv.executeSql("SELECT * FROM query_table$schemas").print()//manifest信息表,对应的其实就是hdfs中表的manifest目录下的manifest-*文件信息println("====================manifest信息表===========================")tEnv.executeSql("SELECT * FROM query_table$manifests").print()//file信息表,对应的其实就是hdfs中表的bucket-*目录下的data-*文件信息println("====================file信息表===========================")tEnv.executeSql("SELECT * FROM query_table$files").print()//option信息表,对应的就是建表语句中with里面指定的参数信息,在表的schema-*文件中也能看到option信息println("====================option信息表===========================")tEnv.executeSql("SELECT * FROM query_table$options").print()//consumer信息表,在查询数据的sql语句中指定了consumer-id之后才能看到println("====================consumer信息表===========================")tEnv.executeSql("SELECT * FROM query_table$consumers").print()//audit log信息表,相当于是表的审核日志,可以看到表中每条数据的rowkind,也就是+I\-U\+U\-Dprintln("====================audit log信息表===========================")tEnv.executeSql("SELECT * FROM query_table$audit_log").print()}
}

运行代码。
注意:在本地执行flink sql中的print,会看到下面错误:

java.lang.IllegalStateException: MiniCluster is not yet running or has already been shut down.at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)at org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1026)at org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:899)at org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:823)at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:91)at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.isJobTerminated(CollectResultFetcher.java:210)at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:118)at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:219)at org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:120)at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:153)at tech.xuwei.paimon.query.FlinkPaimonSystemTable$.main(FlinkPaimonSystemTable.scala:35)at tech.xuwei.paimon.query.FlinkPaimonSystemTable.main(FlinkPaimonSystemTable.scala)

这个异常不影响程序执行,实际工作中我们不会写这种代码,一般都是在sql中写insert into select语句了,在这主要是为了方便测试,忽略这个异常即可。

如果感觉看起来比较乱,可以修改一下log4j.properties日志中的告警级别,改为error级别即可。

log4j.rootLogger=error,stdoutlog4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

重新运行代码,可以看到如下结果:

====================snapshot信息表===========================
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+
| op |          snapshot_id |            schema_id |                    commit_user |    commit_identifier |                    commit_kind |             commit_time |   total_record_count |   delta_record_count | changelog_record_count |            watermark |
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+
| +I |                    1 |                    0 | 8f74d97b-bf6b-4ac7-bb47-3bb... |  9223372036854775807 |                         APPEND | 2023-07-28 17:35:22.859 |                    1 |                    1 |                      0 | -9223372036854775808 |
| +I |                    2 |                    0 | 49412497-1749-4566-8bf8-1c5... |  9223372036854775807 |                         APPEND | 2023-07-28 17:35:24.802 |                    2 |                    1 |                      0 | -9223372036854775808 |
| +I |                    3 |                    0 | e55e756d-e528-4b7c-97f0-a01... |  9223372036854775807 |                         APPEND | 2023-07-28 17:35:26.409 |                    3 |                    1 |                      0 | -9223372036854775808 |
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+
3 rows in set
====================schema信息表===========================
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| op |            schema_id |                         fields |                 partition_keys |                   primary_keys |                        options |                        comment |
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| +I |                    0 | [{"id":0,"name":"name","typ... |                             [] |                       ["name"] |                             {} |                                |
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
1 row in set
====================manifest信息表===========================
+----+--------------------------------+----------------------+----------------------+----------------------+----------------------+
| op |                      file_name |            file_size |      num_added_files |    num_deleted_files |            schema_id |
+----+--------------------------------+----------------------+----------------------+----------------------+----------------------+
| +I | manifest-800ac729-22d3-494b... |                 1665 |                    1 |                    0 |                    0 |
| +I | manifest-61d14e4e-d2a0-42ac... |                 1675 |                    1 |                    0 |                    0 |
| +I | manifest-fd8e45b0-d456-467a... |                 1673 |                    1 |                    0 |                    0 |
+----+--------------------------------+----------------------+----------------------+----------------------+----------------------+
3 rows in set
====================file信息表===========================
+----+--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+-------------+----------------------+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+
| op |                      partition |      bucket |                      file_path |                    file_format |            schema_id |       level |         record_count |   file_size_in_bytes |                        min_key |                        max_key |              null_value_counts |                min_value_stats |                max_value_stats |           creation_time |
+----+--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+-------------+----------------------+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+
| +I |                             [] |           0 | data-6b23bcaf-3dbe-46c0-a67... |                            orc |                    0 |           0 |                    1 |                  566 |                         [jack] |                         [jack] |                {age=0, name=0} |            {age=18, name=jack} |            {age=18, name=jack} | 2023-07-28 17:35:22.453 |
| +I |                             [] |           0 | data-ce40f0df-aa2a-4682-8b6... |                            orc |                    0 |           0 |                    1 |                  581 |                         [mick] |                         [mick] |                {age=0, name=0} |            {age=20, name=mick} |            {age=20, name=mick} | 2023-07-28 17:35:26.257 |
| +I |                             [] |           0 | data-ac9bd895-2b8e-4efe-969... |                            orc |                    0 |           0 |                    1 |                  572 |                          [tom] |                          [tom] |                {age=0, name=0} |             {age=19, name=tom} |             {age=19, name=tom} | 2023-07-28 17:35:24.603 |
+----+--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+-------------+----------------------+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+
3 rows in set
====================option信息表===========================
Empty set
====================tag信息表===========================
Empty set
====================consumer信息表===========================
Empty set
====================audit log信息表===========================
+----+--------------------------------+--------------------------------+-------------+
| op |                        rowkind |                           name |         age |
+----+--------------------------------+--------------------------------+-------------+
| +I |                             +I |                           jack |          18 |
| +I |                             +I |                           mick |          20 |
| +I |                             +I |                            tom |          19 |
+----+--------------------------------+--------------------------------+-------------+
3 rows in set
(2)批量读取

下面演示一下如何在批量读取中使用时间旅行功能。

创建object:tech.xuwei.paimon.query.FlinkPaimonBatchQuery

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 批量读取* Created by xuwei*/
object FlinkPaimonBatchQuery {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//SET 'execution.runtime-mode' = 'batch';env.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//批量查询数据tEnv.executeSql("""|SELECT * FROM query_table|-- /*+ OPTIONS('scan.mode'='latest-full') */ -- 默认策略,可以省略不写,只读取最新快照中的所有数据|-- /*+ OPTIONS('scan.mode'='latest') */ -- 在批处理模式下和latest-full的效果一致|-- /*+ OPTIONS('scan.mode'='from-snapshot','scan.snapshot-id' = '2') */ -- 只读取指定id的快照中的所有数据|-- /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '2') */ -- 在批处理模式下和from-snapshot的效果一致|-- /*+ OPTIONS('scan.mode'='from-timestamp','scan.timestamp-millis' = '1690536924802') */ -- 只读取指定时间戳的快照中的所有数据|-- /*+ OPTIONS('scan.mode'='incremental','incremental-between' = '1,3') */ -- 指定两个快照id,查询这两个快照之间的增量变化|-- /*+ OPTIONS('scan.mode'='incremental','incremental-between-timestamp' = '1690536922859,1690536926409') */ -- 指定两个时间戳,查询这两个快照之间的增量变化|""".stripMargin).print()}
}

运行代码,查看每一种策略的数据结果。

注意:在演示compacted-full这种策略的时候需要给表开启full-compaction

所以重新创建一个新的表。

创建object:FlinkSQLWriteToPaimonForCompact

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 通过FlinkSQL 向Paimon中模拟写入数据* Created by xuwei*/
object FlinkSQLWriteToPaimonForCompact {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//创建Paimon类型的表tEnv.executeSql("""|CREATE TABLE IF NOT EXISTS `query_table_compact`(|    name STRING,|    age INT,|    PRIMARY KEY (name) NOT ENFORCED|)WITH(|    'changelog-producer' = 'full-compaction',|    'full-compaction.delta-commits' = '1'|)|""".stripMargin)//写入数据tEnv.executeSql("INSERT INTO query_table_compact(name,age) VALUES('jack',18)")tEnv.executeSql("INSERT INTO query_table_compact(name,age) VALUES('tom',19)")tEnv.executeSql("INSERT INTO query_table_compact(name,age) VALUES('mick',20)")}}

运行代码。

再创建一个新的读取数据的类:

创建object:FlinkPaimonBatchQueryForCompact

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 批量读取* Created by xuwei*/
object FlinkPaimonBatchQueryForCompact {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//SET 'execution.runtime-mode' = 'batch';env.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//批量查询数据tEnv.executeSql("""|SELECT * FROM query_table_compact|/*+ OPTIONS('scan.mode' = 'compacted-full') */ --表需要开启full-compaction,设置changelog-producer和full-compaction.delta-commits|""".stripMargin).print()}
}

运行代码,可以看到如下结果:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           jack |          18 |
| +I |                           mick |          20 |
| +I |                            tom |          19 |
+----+--------------------------------+-------------+

由于目前每一次提交数据都会触发完全压缩,所以我们查询最新的完全压缩快照中的数据是可以获取到所有数据的。

此时可以通过系统表查看一下这个表的snapshot信息:
创建object:FlinkPaimonSystemTableForCompact

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 查询Paimon中的系统表* Created by xuwei*/
object FlinkPaimonSystemTableForCompact {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//snapshot信息表,对应的其实就是hdfs中表的snapshot目录下的snapshot-*文件信息println("====================snapshot信息表===========================")tEnv.executeSql("SELECT * FROM query_table_compact$snapshots").print()}
}

执行代码,可以看到如下结果

====================snapshot信息表===========================
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+
| op |          snapshot_id |            schema_id |                    commit_user |    commit_identifier |                    commit_kind |             commit_time |   total_record_count |   delta_record_count | changelog_record_count |            watermark |
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+
| +I |                    1 |                    0 | 38d8f3a4-aeb3-4072-90cf-421... |  9223372036854775807 |                         APPEND | 2023-07-28 17:57:07.293 |                    1 |                    1 |                      0 | -9223372036854775808 |
| +I |                    2 |                    0 | 38d8f3a4-aeb3-4072-90cf-421... |  9223372036854775807 |                        COMPACT | 2023-07-28 17:57:08.211 |                    3 |                    2 |                      1 | -9223372036854775808 |
| +I |                    3 |                    0 | 84203720-0e42-40a6-8202-642... |  9223372036854775807 |                         APPEND | 2023-07-28 17:57:09.423 |                    4 |                    1 |                      0 | -9223372036854775808 |
| +I |                    4 |                    0 | 84203720-0e42-40a6-8202-642... |  9223372036854775807 |                        COMPACT | 2023-07-28 17:57:09.641 |                    8 |                    4 |                      1 | -9223372036854775808 |
| +I |                    5 |                    0 | 25d0f600-076a-407f-a07a-caf... |  9223372036854775807 |                         APPEND | 2023-07-28 17:57:11.500 |                    9 |                    1 |                      0 | -9223372036854775808 |
| +I |                    6 |                    0 | 25d0f600-076a-407f-a07a-caf... |  9223372036854775807 |                        COMPACT | 2023-07-28 17:57:12.130 |                   15 |                    6 |                      1 | -9223372036854775808 |
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+

此时可以看到在commit_kind这一列中显示的有APPENDCOMPACT,表示这个快照是追加产生的还是完全压缩产生的。

由于我们配置的每一次提交数据都会触发完全压缩,所以对应的有3个完全压缩产生的快照。

为了便于验证,我们可以把最新的那个完全压缩的快照删除掉,再执行查询,看看结果是什么样的:

删除最新的完全压缩的快照:

[root@bigdata04 ~]# hdfs dfs -rm -r /paimon/default.db/query_table_compact/snapshot/snapshot-6

注意:这个删除操作建议大家在命令行执行,不要在web页面执行,在web页面删除可能会直接把这个表的目录删除掉!!!!!

然后再执行FlinkPaimonBatchQueryForCompact,结果如下:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           jack |          18 |
| +I |                            tom |          19 |
+----+--------------------------------+-------------+
2 rows in set

注意:此时最新的完全压缩的快照就是snapshot-4了,这个快照中只有2条数据。

这就是批量读取中时间旅行参数的使用。

(3)流式读取

下面演示一下如何在流式读取中使用时间旅行功能。

创建object:FlinkPaimonStreamingQuery

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 流式读取* Created by xuwei*/
object FlinkPaimonStreamingQuery {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//SET 'execution.runtime-mode' = 'streaming';env.setRuntimeMode(RuntimeExecutionMode.STREAMING)//使用流处理模式val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//流式查询数据tEnv.executeSql("""|SELECT * FROM query_table|-- /*+ OPTIONS('scan.mode'='latest-full') */ -- 默认策略,可以省略不写,第一次启动时读取最新快照中的所有数据,然后继续读取后续新增的变更数据|-- /*+ OPTIONS('scan.mode'='latest') */ -- 只读取最新的变更数据|-- /*+ OPTIONS('scan.mode'='from-snapshot','scan.snapshot-id' = '2') */ -- 从指定id的快照开始读取变更数据(包含后续新增)|-- /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '2') */ -- 第一次启动时读取指定id的快照中的所有数据,然后继续读取后续新增的变更数据|-- /*+ OPTIONS('scan.mode'='from-timestamp','scan.timestamp-millis' = '1690536924802') */ -- 从指定时间戳的快照开始读取变更数据(包含后续新增)|""".stripMargin).print()}
}
(4)Consumer ID

最后我们在流式读取这里扩展一个知识点:Consumer ID,这个功能是针对流式读取设计的。

相当于我们在kafka消费者中指定一个groupid,这样可以通过groupid维护消费数据的偏移量信息,便于任务停止以后重启的时候继续基于之前的进度进行查询。

在这里Consumer ID的主要作用是为了方便记录每次查询到的数据快照的位置,他会把下一个还未读取的快照id记录到hdfs文件中。
当之前的任务停止以后,新启动的任务可以基于之前任务记录的快照id继续查询数据,不需要从状态中恢复位置信息。

这个特性目前属于实验特性,还没有经过大量生产环境的验证,大家可以先提前了解一下。

下面来结合一个案例演示一下:
具体的思路是这样的:

  • 1:首先使用Consumer ID查询一次query_table表中的数据。
  • 2:然后停止之前的查询任务,向query_table表中模拟产生1条数据。
  • 3:重新启动第1步骤中的任务,验证一下是否只读取到了新增的那1条数据

创建object:FlinkPaimonStreamingQueryForConsumerid

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 流式读取* Created by xuwei*/
object FlinkPaimonStreamingQueryForConsumerid {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//SET 'execution.runtime-mode' = 'streaming';env.setRuntimeMode(RuntimeExecutionMode.STREAMING)//使用流处理模式val tEnv = StreamTableEnvironment.create(env)//注意:在流处理模式中,操作Paimon表时需要开启Checkpoint。env.enableCheckpointing(5000)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//流式查询数据tEnv.executeSql("""|SELECT * FROM query_table|/*+ OPTIONS('consumer-id'='con-1') */ -- 指定消费者id|""".stripMargin).print()}
}

注意:在这需要开启checkpoint,否则Consumer ID的功能无法正常触发。

第一次执行此代码,可以看到如下结果:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           jack |          18 |
| +I |                           mick |          20 |
| +I |                            tom |          19 |

停止此代码。

此时其实可以到hdfs中查看一下维护的Consumer ID信息:

[root@bigdata04 ~]# hdfs dfs -cat /paimon/default.db/query_table/consumer/consumer-con-1
{"nextSnapshot" : 4
}

这里面记录的是下一次需要读取的快照id,数值为4,此时最新的快照id是3,因为快照id为3的快照已经读取过了,下一个快照id就是4了。

其实直接查询consumer系统表也是可以看到这些信息的。

创建object:FlinkPaimonSystemTableForConsumerid

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 查询Paimon中的系统表* Created by xuwei*/
object FlinkPaimonSystemTableForConsumerid {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//consumer信息表,在查询数据的sql语句中指定了consumer-id之后才能看到println("====================consumer信息表===========================")tEnv.executeSql("SELECT * FROM query_table$consumers").print()}
}

执行代码,可以看到如下结果:

====================consumer信息表===========================
+----+--------------------------------+----------------------+
| op |                    consumer_id |     next_snapshot_id |
+----+--------------------------------+----------------------+
| +I |                          con-1 |                    4 |
+----+--------------------------------+----------------------+
1 row in set

从这可以看出来,next_snapshot_id4,查出来的结果是一样的。

接下来我们向query_table中新增一条数据。

创建object:FlinkSQLWriteToPaimonForConsumerid

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 通过FlinkSQL 向Paimon中模拟写入数据* Created by xuwei*/
object FlinkSQLWriteToPaimonForConsumerid {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//创建Paimon类型的表tEnv.executeSql("""|CREATE TABLE IF NOT EXISTS `query_table`(|    name STRING,|    age INT,|    PRIMARY KEY (name) NOT ENFORCED|)|""".stripMargin)//写入数据tEnv.executeSql("INSERT INTO query_table(name,age) VALUES('jessic',30)")}}

执行代码。

最后,我们再重新启动FlinkPaimonStreamingQueryForConsumerid,可以看到如下结果:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                         jessic |          30 |

能看到这个结果,说明这个consumer id生效了,当我们第二次使用相同的consumer id读取这个表的时候,是可以基于之前的进度继续读取的。

停止此任务。

此时再执行FlinkPaimonSystemTableForConsumerid,查看最新的next_snapshot_id

====================consumer信息表===========================
+----+--------------------------------+----------------------+
| op |                    consumer_id |     next_snapshot_id |
+----+--------------------------------+----------------------+
| +I |                          con-1 |                    5 |
+----+--------------------------------+----------------------+
1 row in set

此时next_snapshot_id变成了5,这是正确的。

更多Paimon数据湖内容请关注:https://edu.51cto.com/course/35051.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/138436.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring Cloud之多级缓存

目录 传统缓存 多级缓存 JVM进程缓存 Caffeine 缓存驱逐策略 实现进程缓存 常用Lua语法 数据类型 变量声明 循环使用 定义函数 条件控制 安装OpenResty 实现Nginx业务逻辑编写 请求参数解析 实现lua访问tomcat JSON的序列化和反序列化 Tomcat的集群负载均衡 …

stm32超声波测距不准的解决方法(STM32 delay_us()产生1us)

首先要说明一下原理:使用stm32无法准确产生1us的时间,但是超声波测距一定要依赖时间,时间不准,距离一定不准,这是要肯定的,但是在不准确的情况下,要测量一个比较准确的时间,那么只能…

python中的异常与模块

异常 为了能够让代码可以正常的运行下去,不会因为某个语句而让程序崩溃,所以我们就需要使用异常,异常的语法格式如下: try:可能出现异常的语句 except:出现异常之后的处理同时python也是支持捕获指定异常的 try:可能出现异常的…

说说对Fiber架构的理解?解决了什么问题?

一、问题 JavaScript引擎和页面渲染引擎两个线程是互斥的,当其中一个线程执行时,另一个线程只能挂起等待 如果 JavaScript 线程长时间地占用了主线程,那么渲染层面的更新就不得不长时间地等待,界面长时间不更新,会导…

NIO 笔记(二)Netty框架专题

【笔记来自:it白马】 Netty框架 前面我们学习了Java为我们提供的NIO框架,提供使用NIO提供的三大组件,我们就可以编写更加高性能的客户端/服务端网络程序了,甚至还可以自行规定一种通信协议进行通信。 NIO框架存在的问题 但是之…

Centos7安装PostgreSQL 14

环境: Centos7安装PostgreSQL_14版本数据库; 打开官方网站:PostgreSQL: Linux downloads (Red Hat family) 一、 版本选择 复制、粘贴并运行如下脚本: 二、安装步骤 这些命令是在 CentOS 7.x 系统上安装和配置 PostgreSQL 14 的步…

最新大麦订单生成器 大麦订单图一键生成

1、8.6全新版 本次更新了四种订单模板生成 多模板自由切换 2、在软件中输入生成的信息,这里输入的是商品信息,选择生成的商品图片,最后生成即可 新版大麦订单生成 四种模板图样式展示 这个样式图就是在大麦生成完的一个订单截图&#xff…

Hadoop架构、Hive相关知识点及Hive执行流程

Hadoop架构 Hadoop由三大部分组成:HDFS、MapReduce、yarn HDFS:负责数据的存储 其中包括: namenode:主节点,用来分配任务给从节点 secondarynamenode:副节点,辅助主节点 datanode:从节点&#x…

出口美国操作要点汇总│走美国海运拼箱的注意事项│箱讯科技

01服务标准 美国的货物需要细致的服务,货物到港后的服务也是非常重要的。如果在货物到港15天内,如果没有报关行进行(PROCEED),货物就会进入了G.O.仓库,G.O.仓库的收费标准是非常高的。 02代理资格审核 美国航线除了各家船公司&a…

阿里云付费用户破100万 用户规模亚洲最大

导读阿里巴巴集团公布2018财年第一季度财报,阿里云达到一个重要里程碑,云计算付费用户数量首次超过100万,成为亚洲首家达到百万级用户规模的云计算公司。同时,企业级市场被云计算人工智能等新技术全面激活,推动该季度营…

企业年会/年终活动如何邀请媒体记者报道?

​媒体邀约是企业或组织进行宣传的重要手段之一。通过邀请媒体参加活动,可以增加活动的曝光度和知名度,吸引更多的关注和参与。同时,媒体报道还可以提高企业或组织的权威性和可信度,从而让公众更容易接受其传达的信息。 企业年会或…

MFC-TCP网络编程服务端-Socket

目录 1、通过Socket建立服务端: 2、UI设计: 3、代码的实现: (1)、CListenSocket类 (2)、CConnectSocket类 (3)、CTcpServerDlg类 1、通过Socket建立服务端&#xff…

大数据治理——为业务提供持续的、可度量的价值(一)

目录 大数据治理——为业务提供持续的、可度量的价值... 1 概述... 2 大数据治理系列... 2 第一部分:大数据治理统一流程模型概述和明确元数据管理策略... 2 第二部分:元数据集成体系结构... 15 第三部分:实施元数据管理... 25 第四部…

顺序图——画法详解

百度百科的定义: 顺序图是将交互关系表示为一个二维图。纵向是时间轴,时间沿竖线向下延伸。横向轴代表了在协作中各独立对象的类元角色。类元角色用生命线表示。当对象存在时,角色用一条虚线表示,当对象的过程处于激活状态时&…

AI机器人小奥,学习不再填鸭

在这个充满科技魅力的时代,一款专为孩子学习量身打造的AI机器人“小奥”正式与大家见面! 它是一款集全球领先的人工智能、语音识别、语义理解、情感陪伴为一体的高科技教育产品,旨在帮助孩子提高学习兴趣、拓宽知识面,以科技创新助…

人工智能基础——Python:Pillow与图像处理

人工智能的学习之路非常漫长,不少人因为学习路线不对或者学习内容不够专业而举步难行。不过别担心,我为大家整理了一份600多G的学习资源,基本上涵盖了人工智能学习的所有内容。点击下方链接,0元进群领取学习资源,让你的学习之路更加顺畅!记得…

AMD64内存属性详解

本文参考文档为AMD64 Architecture Programmer’s Manual Volume 2: System Programming,版本号3.41,这不是对原英文文档的翻译,但是所有内容的排版都是根据原手册的排版来的,如有与官方文档冲突的内容,以官方文档为准…

Jmeter_逻辑控制器

逻辑控制器 控制取样器执行顺序的组件实现(分支 循环) 分类 1、如果(if) 控制器 分支实现 2、forEach控制器 循环往复实现 3、循环控制器 循环往复实现 如果(if) 控制器 需求1:测试计划中定义一个 http 请求访问百度,但是该请求不是无条件执行的,…

基于ssm的校园快递物流管理系统(java+jsp+ssm+javabean+mysql+tomcat)

博主24h在线,想要源码文档部署视频直接私聊,9.9拿走! 基于javawebmysql的ssm校园快递物流管理系统(javajspssmjavabeanmysqltomcat) 运行环境: Java≥8、MySQL≥5.7、Tomcat≥8 开发工具: eclipse/idea/myeclipse/s…

2023年云计算发展趋势浅析

​​​​​​​ 云计算的概念 云计算是一种通过互联网提供计算资源和服务的模式。它允许用户通过网络访问和使用共享的计算资源,而无需拥有或管理这些资源的物理设备。云计算的核心理念是将计算能力、存储资源和应用程序提供给用户,以便随时随地根据需要…