1、时间轴(TimeLine)
Hudi的核心是维护表上在不同的即时时间(instants)执行的所有操作的时间轴(timeline),这有助于提供表的即时视图
一个instant由以下三个部分组成:
1)Instant action:在表上执行的操作类型
COMMITS:一次commit表示将一批数据原子性地写入一个表。
CLEANS:清除表中不再需要的旧版本文件的后台活动。
DELTA_COMMIT:增量提交指的是将一批数据原子性地写入一个MergeOnRead类型的表,其中部分或所有数据可以写入增量日志。
COMPACTION:合并Hudi内部差异数据结构的后台活动,例如:将更新操作从基于行的log日志文件合并到列式存储的数据文件。在内部,COMPACTION体现为timeline上的特殊提交。
ROLLBACK:表示当commit/delta_commit不成功时进行回滚,其会删除在写入过程中产生的部分文件。
SAVEPOINT:将某些文件组标记为已保存,以便其不会被删除。在发生灾难需要恢复数据的情况下,它有助于将数据集还原到时间轴上的某个点。
2)Instant time:时间
通常是一个时间戳(例如:20190117010349),它按照动作开始时间的顺序单调增加。
3)State:状态
REQUESTED:表示某个action已经调度,但尚未执行。
INFLIGHT:表示action当前正在执行。
COMPLETED:表示timeline上的action已经完成。
区分两个重要的时间概念:
Arrival time: 数据到达 Hudi 的时间,commit time。
Event time: record 中记录的时间。
10:20 来了一条 9:00 的数据,根据event time该数据仍然可以落到 9:00 对应的分区,通过 timeline 直接消费 10:00 (commit time)之后的增量更新(只消费有新 commits 的 group),那么这条延迟的数据仍然可以被消费到。
2、文件布局
Hudi存储分为两个部分:
(1)元数据:.hoodie目录对应着表的元数据信息,包括表的版本管理(Timeline)、归档目录(存放过时的instant也就是版本),一个instant记录了一次提交(commit)的行为、时间戳和状态,Hudi以时间轴的形式维护了在数据集上执行的所有操作的元数据;
(2)数据:和hive一样,以分区方式存放数据;分区里面存放着Base File(.parquet)和Log File(.log.*);
每个分区中,文件被组织成文件组,每个文件组包含几个文件片
每个文件片包含:
一个基本文件(.parquet):在某个commit/compaction即时时间(instant time)生成的(MOR可能没有)
多个日志文件(.log.*),这些日志文件包含自生成基本文件以来对基本文件的插入/更新(COW没有)
索引(Index)
给定的hoodie key(record key + partition path)与文件组id建立唯一映射。这种映射关系,数据第一次写入文件后保持不变
一个 FileGroup 包含了一批 record 的所有版本记录。Index 用于区分消息是 INSERT 还是 UPDATE。
索引选项
Bloom Index 默认配置,使用布隆过滤器来判断记录存在与否
Simple Index 性能比较差
HBase Index 把index存放在HBase里面,对于小批次的keys,查询效率高
Flink State-based
Index
Flink只有一种state based index(和bucket_index),其他index是Spark可选配置。
全局索引:全局索引在全表的所有分区范围下强制要求键的唯一性,但是随着表增大,update/delete 操作损失的性能越高,因此更适用于小表。
非全局索引:默认的索引实现,只能保证数据在分区的唯一性。更适用于大表。
HBase索引本质上是一个全局索引,bloom和simple index都有全局选项:
hoodie.index.type=GLOBAL_BLOOM
hoodie.index.type=GLOBAL_SIMPLE
对维度表的随机更删,使用简单索引对此场景更合适,如果额外的运维成本可以接受的话,也可以采用HBase索引
表类型
Copy On Write
在COW表中,只有数据文件/基本文件(.parquet),没有增量日志文件(.log.*)。
对每一个新批次写入都将创建相应数据文件的新版本(新的FileSlice),新版本文件包括旧版本文件的记录以及来自传入批次的记录(全量最新)。
data_file1 和 data_file2 都将创建更新的版本,data_file1 V2 是data_file1 V1 的内容与data_file1 中传入批次匹配记录的记录合并。
由于在写入期间进行合并,COW 会产生一些写入延迟。
Merge On Read
MOR表中,包含列存的基本文件(.parquet)和行存的增量日志文件(基于行的avro格式,.log.*)
MOR表的合并成本在读取端。因此在写入期间我们不会合并或创建较新的数据文件版本。
每次的读取延迟都比较高
查询类型
1)Snapshot Queries
快照查询,可以查询指定commit/delta commit即时操作后表的最新快照。
2)Incremental Queries
增量查询,可以查询给定commit/delta commit即时操作以来新写入的数据。
3)Read Optimized Queries
读优化查询,可查看给定的commit/compact即时操作的表的最新快照。
Read Optimized Queries是对Merge On Read表类型快照查询的优化。
写流程(INSERT)
1)Copy On Write
(1)先对 records 按照 record key 去重(可选)
(2)不会创建 Index
(3)如果有小的 base file 文件,merge base file,生成新的 FileSlice + base file,否则直接写新的 FileSlice + base file
2)Merge On Read
(1)先对 records 按照 record key 去重(可选)
(2)不会创建 Index
(3)如果 log file 可索引,并且有小的 FileSlice,尝试追加或写最新的 log file;如果 log file 不可索引,写一个新的 FileSlice + base file
通过对写流程的梳理可以了解到 Apache Hudi 相对于其他数据湖方案的核心优势:
(1)写入过程充分优化了文件存储的小文件问题,Copy On Write 写会一直将一个 bucket (FileGroup)的 base 文件写到设定的阈值大小才会划分新的 bucket;Merge On Read 写在同一个 bucket 中,log file 也是一直 append 直到大小超过设定的阈值 roll over。
(2)对 UPDATE 和 DELETE 的支持非常高效,一条 record 的整个生命周期操作都发生在同一个 bucket,不仅减少小文件数量,也提升了数据读取的效率(不必要的 join 和 merge)。
Compaction
(1)没有 base file:走 copy on write insert 流程,直接 merge 所有的 log file 并写 base file
(2)有 base file:走 copy on write upsert 流程,先读 log file 建 index,再读 base file,最后读 log file 写新的 base file
Flink 和 Spark streaming 的 writer 都可以 apply 异步的 compaction 策略,按照间隔 commits 数或者时间来触发 compaction 任务,在独立的 pipeline 中执行。
flink操作hudi
插入数据
set sql-client.execution.result-mode=tableau;
-- 创建hudi表
CREATE TABLE t1(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t1',
'table.type' = 'MERGE_ON_READ' –- 默认是COW
);
-- 插入数据
INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1');
查询数据
select * from t1;
更新数据
insert into t1 values
('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');
编写代码
package com.atguigu.hudi.flink;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.util.concurrent.TimeUnit;
public class HudiDemo {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置状态后端RocksDB
EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
env.setStateBackend(embeddedRocksDBStateBackend);
// checkpoint配置
env.enableCheckpointing(TimeUnit.SECONDS.toMillis(30), CheckpointingMode.EXACTLY_ONCE);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointStorage("hdfs://hadoop1:8020/ckps");
checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(20));
checkpointConfig.setTolerableCheckpointFailureNumber(5);
checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
StreamTableEnvironment sTableEnv = StreamTableEnvironment.create(env);
sTableEnv.executeSql("CREATE TABLE sourceT (\n" +
" uuid varchar(20),\n" +
" name varchar(10),\n" +
" age int,\n" +
" ts timestamp(3),\n" +
" `partition` varchar(20)\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second' = '1'\n" +
")");
sTableEnv.executeSql("create table t2(\n" +
" uuid varchar(20),\n" +
" name varchar(10),\n" +
" age int,\n" +
" ts timestamp(3),\n" +
" `partition` varchar(20)\n" +
")\n" +
"with (\n" +
" 'connector' = 'hudi',\n" +
" 'path' = '/tmp/hudi_flink/t2',\n" +
" 'table.type' = 'MERGE_ON_READ'\n" +
")");
sTableEnv.executeSql("insert into t2 select * from sourceT");
}
}
去重参数
通过如下语法设置主键:
-- 设置单个主键
create table hoodie_table (
f0 int primary key not enforced,
f1 varchar(20),
...
) with (
'connector' = 'hudi',
...
)
-- 设置联合主键
create table hoodie_table (
f0 int,
f1 varchar(20),
...
primary key(f0, f1) not enforced
) with (
'connector' = 'hudi',
...
)
流读(Streaming Query)
当前表默认是快照读取,即读取最新的全量快照数据并一次性返回。通过参数 read.streaming.enabled 参数开启流读模式,通过 read.start-commit 参数指定起始消费位置,支持指定 earliest 从最早消费。
CREATE TABLE t5(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
) WITH (
'connector' = 'hudi',
'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t5',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true',
'read.streaming.check-interval' = '4' -- 默认60s
);
insert into t5 select * from sourceT;
select * from t5;
增量读取(Incremental Query)
从 0.10.0 开始支持。
如果有增量读取 batch 数据的需求,增量读取包含三种场景。
(1)Stream 增量消费,通过参数 read.start-commit 指定起始消费位置;
(2)Batch 增量消费,通过参数 read.start-commit 指定起始消费位置,通过参数 read.end-commit 指定结束消费位置,区间为闭区间,即包含起始、结束的 commit
(3)TimeTravel:Batch 消费某个时间点的数据:通过参数 read.end-commit 指定结束消费位置即可(由于起始位置默认从最新,所以无需重复声明)
flink读取kafka数据并写入hudi数据湖
(1)创建kafka源表
create table stu3_binlog_source_kafka(
id bigint not null,
name string,
school string,
nickname string,
age int not null,
class_num int not null,
phone bigint not null,
email string,
ip string
) with (
'connector' = 'kafka',
'topic' = 'cdc_mysql_stu3_sink',
'properties.bootstrap.servers' = 'hadoop1:9092',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset',
'properties.group.id' = 'testGroup'
);
(2)创建hudi目标表
create table stu3_binlog_sink_hudi(
id bigint not null,
name string,
`school` string,
nickname string,
age int not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
primary key (id) not enforced
)
partitioned by (`school`)
with (
'connector' = 'hudi',
'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/stu3_binlog_sink_hudi',
'table.type' = 'MERGE_ON_READ',
'write.option' = 'insert',
'write.precombine.field' = 'school'
);
(3)将kafka数据写入到hudi中
insert into stu3_binlog_sink_hudi
select * from stu3_binlog_source_kafka;
离线批量导入
如果存量数据来源于其他数据源,可以使用批量导入功能,快速将存量数据导成 Hoodie 表格式。
1)原理
(1)批量导入省去了 avro 的序列化以及数据的 merge 过程,后续不会再有去重操作,数据的唯一性需要自己来保证。
(2)bulk_insert 需要在 Batch Execuiton Mode 下执行更高效,Batch 模式默认会按照 partition path 排序输入消息再写入 Hoodie,避免 file handle 频繁切换导致性能下降。
SET execution.runtime-mode = batch;
SET execution.checkpointing.interval = 0;
(3)bulk_insert write task 的并发通过参数 write.tasks 指定,并发的数量会影响到小文件的数量,理论上,bulk_insert write task 的并发数就是划分的 bucket 数,当然每个 bucket 在写到文件大小上限(parquet 120 MB)的时候会 roll over 到新的文件句柄,所以最后:写文件数量 >= bulk_insert write task 数。
案例
Flink SQL client创建hudi表
create table stu4_sink_hudi(
id bigint not null,
name string,
`school` string,
nickname string,
age int not null,
score decimal(4,2) not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
primary key (id) not enforced
)
partitioned by (`school`)
with (
'connector' = 'hudi',
'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/stu4_sink_hudi',
'table.type' = 'MERGE_ON_READ',
'write.option' = 'bulk_insert',
'write.precombine.field' = 'school'
);
Flink SQL client执行mysql数据插入到hudi中
insert into stu4_sink_hudi select * from stu4;
全量接增量
如果已经有全量的离线 Hoodie 表,需要接上实时写入,并且保证数据不重复,可以开启 index bootstrap 功能。
如果觉得流程冗长,可以在写入全量数据的时候资源调大直接走流模式写,全量走完接新数据再将资源调小(或者开启限流功能)。
(1) CREATE TABLE 创建和 Hoodie 表对应的语句,注意 table type 要正确
(2)设置 index.bootstrap.enabled = true开启索引加载功能
(6)重启任务将 index.bootstrap.enabled 关闭,参数配置到合适的大小,如果RowDataToHoodieFunction 和 BootstrapFunction 并发不同,可以重启避免 shuffle
Changelog 模式
如果希望 Hoodie 保留消息的所有变更(I/-U/U/D),之后接上 Flink 引擎的有状态计算实现全链路近实时数仓生产(增量计算),Hoodie 的 MOR 表通过行存原生支持保留消息的所有变更(format 层面的集成),通过流读 MOR 表可以消费到所有的变更记录。
changelog.enabled | false | false | 默认是关闭状态,即 UPSERT 语义,所有的消息仅保证最后一条合并消息,中间的变更可能会被 merge 掉;改成 true 支持消费所有变更。 |
批(快照)读仍然会合并所有的中间结果,不管 format 是否已存储中间状态。
案例演示
(1)使用changelog
set sql-client.execution.result-mode=tableau;
CREATE TABLE t6(
id int,
ts int,
primary key (id) not enforced
) WITH (
'connector' = 'hudi',
'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t6',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true',
'read.streaming.check-interval' = '4',
'changelog.enabled' = 'true'
);
insert into t6 values (1,1);
insert into t6 values (1,2);
set table.dynamic-table-options.enabled=true;
select * from t6/*+ OPTIONS('read.start-commit'='earliest')*/;
select count(*) from t6/*+ OPTIONS('read.start-commit'='earliest')*/;
Hudi Catalog
从 0.12.0 开始支持,通过 catalog 可以管理 flink 创建的表,避免每次使用都要重复建表操作,另外 hms 模式的 catalog 支持自动补全 hive 同步参数。
DFS 模式 Catalog SQL样例:
CREATE CATALOG hoodie_catalog
WITH (
'type'='hudi',
'catalog.path' = '${catalog 的默认路径}',
'mode'='dfs'
);
Hms 模式 Catalog SQL 样例:
CREATE CATALOG hoodie_catalog
WITH (
'type'='hudi',
'catalog.path' = '${catalog 的默认路径}',
'hive.conf.dir' = '${hive-site.xml 所在的目录}',
'mode'='hms' -- 支持 'dfs' 模式通过文件系统管理表属性
);
离线 Compaction
MOR 表的 compaction 默认是自动打开的,策略是 5 个 commits 执行一次压缩。 因为压缩操作比较耗费内存,和写流程放在同一个 pipeline,在数据量比较大的时候(10w+/s qps),容易干扰写流程,此时采用离线定时任务的方式执行 compaction 任务更稳定。
设置参数
compaction.async.enabled 为 false,关闭在线 compaction。
compaction.schedule.enabled 仍然保持开启,由写任务阶段性触发压缩 plan。
命令行的方式
./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://xxx:8020/table
离线 Clustering
异步的 clustering 相对于 online 的 async clustering 资源隔离,从而更加稳定。
5.14.1 设置参数
clustering.async.enabled 为 false,关闭在线 clustering。
clustering.schedule.enabled 仍然保持开启,由写任务阶段性触发 clustering plan。
命令行的方式
./bin/flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://xxx:8020/table
常见基础问题
5.15.1 存储一直看不到数据
如果是 streaming 写,请确保开启 checkpoint,Flink 的 writer 有 3 种刷数据到磁盘的策略:
当某个 bucket 在内存积攒到一定大小 (可配,默认 64MB)
当总的 buffer 大小积攒到一定大小(可配,默认 1GB)
当 checkpoint 触发,将内存里的数据全部 flush 出去
5.15.2 数据有重复
如果是 COW 写,需要开启参数 write.insert.drop.duplicates,COW 写每个 bucket 的第一个文件默认是不去重的,只有增量的数据会去重,全局去重需要开启该参数;MOR 写不需要开启任何参数,定义好 primary key 后默认全局去重。(注意:从 0.10 版本开始,该属性改名 write.precombine 并且默认为 true。)
如果需要多 partition 去重,需要开启参数: index.global.enabled 为 true。(注意:从 0.10 版本开始,该属性默认为 true。)
索引 index 是判断数据重复的核心数据结构,index.state.ttl 设置了索引保存的时间,默认为 1.5 天,对于长时间周期的更新,比如更新一个月前的数据,需要将 index.state.ttl 调大(单位天),设置小于 0 代表永久保存。(注意:从 0.10 版本开始,该属性默认为 0。)
5.15.3 Merge On Read 写只有 log 文件
Merge On Read 默认开启了异步的 compaction,策略是 5 个 commits 压缩一次,当条件满足参会触发压缩任务,另外,压缩本身因为耗费资源,所以不一定能跟上写入效率,可能会有滞后。
可以先观察 log,搜索 compaction 关键词,看是否有 compact 任务调度:
After filtering, Nothing to compact for 关键词说明本次 compaction strategy 是不做压缩。
集成 Hive
Hudi 源表对应一份 HDFS 数据,通过 Spark,Flink 组件或者 Hudi CLI,可以将 Hudi 表的数据映射为 Hive 外部表,基于该外部表, Hive可以方便的进行实时视图,读优化视图以及增量视图的查询。
6.1 集成步骤
以 hive3.1.2、hudi 0.12.0为例,其他版本类似。
1)拷贝编译好的jar包
将 hudi-hadoop-mr-bundle-0.12.0.jar , hudi-hive-sync-bundle-0.12.0.jar 放到 hive 节点的lib目录下;
cp /opt/software/hudi-0.12.0/packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.12.0.jar /opt/module/hive/lib/
cp /opt/software/hudi-0.12.0/packaging/hudi-hive-sync-bundle/target/hudi-hive-sync-bundle-0.12.0.jar /opt/module/hive/lib/
2)配置完后重启 hive
// 按照需求选择合适的方式重启
nohup hive --service metastore &
nohup hive --service hiveserver2 &
Hive 同步
6.2.1 Flink 同步Hive
1)使用方式
Flink hive sync 现在支持两种 hive sync mode, 分别是 hms 和 jdbc 模式。 其中 hms 只需要配置 metastore uris;而 jdbc 模式需要同时配置 jdbc 属性 和 metastore uris,具体配置模版如下:
## hms mode 配置
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
with(
'connector'='hudi',
'path' = 'hdfs://xxx.xxx.xxx.xxx:9000/t1',
'table.type'='COPY_ON_WRITE', -- MERGE_ON_READ方式在没生成 parquet 文件前,hive不会有输出
'hive_sync.enable'='true', -- required,开启hive同步功能
'hive_sync.table'='${hive_table}', -- required, hive 新建的表名
'hive_sync.db'='${hive_db}', -- required, hive 新建的数据库名
'hive_sync.mode' = 'hms', -- required, 将hive sync mode设置为hms, 默认jdbc
'hive_sync.metastore.uris' = 'thrift://ip:9083' -- required, metastore的端口
);
Flink 使用 HiveCatalog
6.3.1 直接使用Hive Catalog
1)上传hive connector到flink的lib中
hive3.1.3的connector存在guava版本冲突,需要解决:官网下载connector后,用压缩软件打开jar包,删除/com/google文件夹。处理完后上传flink的lib中。
2)解决与hadoop的冲突
避免与hadoop的冲突,拷贝hadoop-mapreduce-client-core-3.1.3.jar到flink的lib中(5.2.1已经做过)
3)创建catalog
CREATE CATALOG hive_catalog
WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/opt/module/hive/conf',
'hadoop-conf-dir'='/opt/module/hadoop-3.1.3/etc/hadoop'
);
use catalog hive_catalog;
-- hive-connector内置了hive module,提供了hive自带的系统函数
load module hive with ('hive-version'='3.1.2');
show modules;
show functions;
-- 可以调用hive的split函数
select split('a,b', ',');
查询 Hive 外表
6.5.1 设置参数
使用 Hive 查询 Hudi 表前,需要通过set命令设置 hive.input.format,否则会出现数据重复,查询异常等错误,如下面这个报错就是典型的没有设置 hive.input.format 导致的:
java.lang.IllegalArgumentException: HoodieRealtimeReader can oly work on RealTimeSplit and not with xxxxxxxxxx
除此之外对于增量查询,还需要 set 命令额外设置3个参数。
set hoodie.mytableName.consume.mode=INCREMENTAL;
set hoodie.mytableName.consume.max.commits=3;
set hoodie.mytableName.consume.start.timestamp=commitTime;
注意这3个参数是表级别参数。
COW 表查询
这里假设同步的 Hive 外表名为 hudi_cow。
1)实时视图
设置 hive.input.format 为以下两个之一:
org.apache.hadoop.hive.ql.io.HiveInputFormat
org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat
像普通的hive表一样查询即可:
set hive.input.format= org.apache.hadoop.hive.ql.io.HiveInputFormat;
select count(*) from hudi_cow;
2)增量视图
除了要设置 hive.input.format,还需要设置上述的3个增量查询参数,且增量查询语句中的必须添加 where 关键字并将 `_hoodie_commit_time > 'startCommitTime' 作为过滤条件(这地方主要是hudi的小文件合并会把新旧commit的数据合并成新数据,hive是没法直接从parquet文件知道哪些是新数据哪些是老数据)
set hive.input.format= org.apache.hadoop.hive.ql.io.HiveInputFormat;
set hoodie.hudicow.consume.mode= INCREMENTAL;
set hoodie.hudicow.consume.max.commits=3;
set hoodie.hudicow.consume.start.timestamp= xxxx;
select count(*) from hudicow where `_hoodie_commit_time`>'xxxx'
-- (这里注意`_hoodie_commit_time` 的引号是反引号(tab键上面那个)不是单引号, 'xxxx'是单引号)
6.5.3 MOR 表查询
这里假设 MOR 类型 Hudi 源表的表名为hudi_mor,映射为两张 Hive 外部表hudi_mor_ro(ro表)和 hudi_mor_rt(rt表)。
1)实时视图
设置了 hive.input.format 之后,即可查询到Hudi源表的最新数据
set hive.input.format= org.apache.hadoop.hive.ql.io.HiveInputFormat;
select * from hudicow_rt;
2)读优化视图
ro 表全称 read oprimized table,对于 MOR 表同步的 xxx_ro 表,只暴露压缩后的 parquet。其查询方式和COW表类似。设置完 hiveInputFormat 之后 和普通的 Hive 表一样查询即可。
3)增量视图
这个增量查询针对的rt表,不是ro表。同 COW 表的增量查询类似:
set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat; // 这地方指定为HoodieCombineHiveInputFormat
set hoodie.hudimor.consume.mode=INCREMENTAL;
set hoodie.hudimor.consume.max.commits=-1;
set hoodie.hudimor.consume.start.timestamp=xxxx;
select * from hudimor_rt where `_hoodie_commit_time`>'xxxx';// 这个表名要是rt表
索引
说明:
set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;最好只用于 rt 表的增量查询 当然其他种类的查询也可以设置为这个,这个参数会影响到普通的hive表查询,因此在rt表增量查询完成后,应该设置 set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; 或者改为默认值set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; 用于其他表的查询。
set hoodie.mytableName.consume.mode=INCREMENTAL; 仅用于该表的增量查询模式,若要对该表切换为其他查询模式,应设置set hoodie.hudisourcetablename.consume.mode=SNAPSHOT;
湖仓一体的优势如下:
流数仓架构本质上有两个痛点:实时/离线计算层不统一;实时/离线存储层不统一。
减少数据冗余:湖仓一体提单一的数据存储平台,减少了数据的冗余和重复,避免了维护多个存储系统的成本和时间。
成本效益:湖仓一体利用低成本的对象存储实现高效益的数据存储,降低了存储成本,并避免了维护多个数据存储系统的成本。
事务支持:湖仓体支持 ACID 事务,确保了多方同时读取或写入数据的一致性。
Schema的实施和治理:湖仓一体支持Schema的实施和演化,确保数据的完整性,并提供了强大的治理和审计机制。
开放性:湖仓一体采用开放和标准化的存储格式,如Parquet,可以让各种工具和引擎直接访问数据。
存算分离:湖仓一体将存储和计算解耦,可以横向扩展到更大规模和更多并发用户。
支持多种工作负载:湖仓一体支持数据科学、机器学习、SQL和数据分析等各种工作负载,减少了需要维护多个工具的成本。
端到端的流计算支持:湖仓一体支持流计算,实时/离线存储层统一,实现实时报告的需求,避免了使用单独系统来实时数据应用程序的需求。