hudi数据湖万字全方位教程+应用示例

1时间轴(TimeLine

Hudi的核心是维护表上在不同的即时时间(instants执行的所有操作的时间轴(timeline,这有助于提供表的即时视图

一个instant由以下三个部分组成:

1Instant action:在表上执行的操作类型

COMMITS:一次commit表示将一批数据原子性地写入一个表。

CLEANS:清除表中不再需要的旧版本文件的后台活动。

DELTA_COMMIT:增量提交指的是将一批数据原子性地写入一个MergeOnRead类型的表,其中部分或所有数据可以写入增量日志。

COMPACTION:合并Hudi内部差异数据结构的后台活动,例如:将更新操作从基于行的log日志文件合并到列式存储的数据文件。在内部,COMPACTION体现为timeline上的特殊提交。

ROLLBACK:表示当commit/delta_commit不成功时进行回滚,其会删除在写入过程中产生的部分文件。

SAVEPOINT:将某些文件组标记为已保存,以便其不会被删除。在发生灾难需要恢复数据的情况下,它有助于将数据集还原到时间轴上的某个点。

2Instant time:时间

通常是一个时间戳(例如:20190117010349),它按照动作开始时间的顺序单调增加。

3State:状态

REQUESTED:表示某个action已经调度,但尚未执行。

INFLIGHT:表示action当前正在执行

        COMPLETED:表示timeline上的action已经完成

区分两个重要的时间概念:

Arrival time: 数据到达 Hudi 的时间,commit time。

Event time: record 中记录的时间。

10:20 来了一条 9:00 的数据,根据event time该数据仍然可以落到 9:00 对应的分区,通过 timeline 直接消费 10:00 (commit time)之后的增量更新(只消费有新 commits 的 group),那么这条延迟的数据仍然可以被消费到

2、文件布局

Hudi存储分为两个部分:

(1)元数据.hoodie目录对应着表的元数据信息,包括表的版本管理(Timeline)、归档目录(存放过时的instant也就是版本),一个instant记录了一次提交(commit)的行为、时间戳和状态,Hudi以时间轴的形式维护了在数据集上执行的所有操作的元数据;

(2)数据:和hive一样,以分区方式存放数据分区里面存放着Base File.parquet)和Log File.log.*

每个分区中,文件被组织成文件组,每个文件组包含几个文件片

每个文件片包含:

一个基本文件(.parquet):在某个commit/compaction即时时间(instant time)生成的(MOR可能没有)

多个日志文件(.log.*),这些日志文件包含自生成基本文件以来对基本文件的插入/更新(COW没有

索引(Index

给定的hoodie key(record key + partition path)与文件组id建立唯一映射。这种映射关系,数据第一次写入文件后保持不变

一个 FileGroup 包含了一批 record 的所有版本记录。Index 用于区分消息是 INSERT 还是 UPDATE。

索引选项

Bloom Index 默认配置,使用布隆过滤器来判断记录存在与否

Simple Index  性能比较差

HBase Index 把index存放在HBase里面,对于小批次的keys,查询效率高

Flink State-based

Index

Flink只有一种state based index(和bucket_index),其他index是Spark可选配置。

全局索引:全局索引在全表的所有分区范围下强制要求键的唯一性,但是随着表增大,update/delete 操作损失的性能越高,因此更适用于小表

非全局索引:默认的索引实现,只能保证数据在分区的唯一性。更适用于大表

HBase索引本质上是一个全局索引,bloom和simple index都有全局选项:

hoodie.index.type=GLOBAL_BLOOM

hoodie.index.type=GLOBAL_SIMPLE

对维度表的随机更删,使用简单索引对此场景更合适,如果额外的运维成本可以接受的话,也可以采用HBase索引

表类型

Copy On Write

在COW表中,只有数据文件/基本文件(.parquet,没有增量日志文件(.log.*

对每一个新批次写入都将创建相应数据文件的新版本(新的FileSlice),新版本文件包括旧版本文件的记录以及来自传入批次的记录(全量最新)。

data_file1 和 data_file2 都将创建更新的版本,data_file1 V2 data_file1 V1 的内容与data_file1 中传入批次匹配记录的记录合并。

由于在写入期间进行合并,COW 会产生一些写入延迟

Merge On Read

MOR表中,包含列存的基本文件(.parquet)和行存的增量日志文件(基于行的avro格式,.log.*

MOR表的合并成本在读取端。因此在写入期间我们不会合并或创建较新的数据文件版本。

每次的读取延迟都比较高

查询类型

1Snapshot Queries

快照查询,可以查询指定commit/delta commit即时操作后表的最新快照。

2Incremental Queries

增量查询,可以查询给定commit/delta commit即时操作以来新写入的数据。

3Read Optimized Queries

读优化查询,可查看给定的commit/compact即时操作的表的最新快照。

Read Optimized Queries是对Merge On Read表类型快照查询的优化。

写流程(INSERT

1Copy On Write

(1)先对 records 按照 record key 去重(可选)

(2)不会创建 Index

(3)如果有小的 base file 文件,merge base file,生成新的 FileSlice + base file,否则直接写新的 FileSlice + base file

2Merge On Read

(1)先对 records 按照 record key 去重(可选)

(2)不会创建 Index

(3)如果 log file 可索引,并且有小的 FileSlice,尝试追加或写最新的 log file;如果 log file 不可索引,写一个新的 FileSlice + base file

通过对写流程的梳理可以了解到 Apache Hudi 相对于其他数据湖方案的核心优势:

(1)写入过程充分优化了文件存储的小文件问题,Copy On Write 写会一直将一个 bucket (FileGroup)的 base 文件写到设定的阈值大小才会划分新的 bucket;Merge On Read 写在同一个 bucket 中,log file 也是一直 append 直到大小超过设定的阈值 roll over。

(2)对 UPDATE 和 DELETE 的支持非常高效,一条 record 的整个生命周期操作都发生在同一个 bucket,不仅减少小文件数量,也提升了数据读取的效率(不必要的 join 和 merge)。

Compaction

(1)没有 base file:走 copy on write insert 流程,直接 merge 所有的 log file 并写 base file

(2)有 base file:走 copy on write upsert 流程,先读 log file 建 index,再读 base file,最后读 log file 写新的 base file

Flink 和 Spark streaming 的 writer 都可以 apply 异步的 compaction 策略,按照间隔 commits 数或者时间来触发 compaction 任务,在独立的 pipeline 中执行。

flink操作hudi

插入数据

set sql-client.execution.result-mode=tableau;

-- 创建hudi

CREATE TABLE t1(

  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,

  name VARCHAR(10),

  age INT,

  ts TIMESTAMP(3),

  `partition` VARCHAR(20)

)

PARTITIONED BY (`partition`)

WITH (

  'connector' = 'hudi',

  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t1',

  'table.type' = 'MERGE_ON_READ' –- 默认是COW

);

-- 插入数据

INSERT INTO t1 VALUES

  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1');

查询数据

select * from t1;

更新数据

insert into t1 values

  ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');

编写代码

package com.atguigu.hudi.flink;

 

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;

import org.apache.flink.contrib.streaming.state.PredefinedOptions;

import org.apache.flink.streaming.api.CheckpointingMode;

import org.apache.flink.streaming.api.environment.CheckpointConfig;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

 

import java.util.concurrent.TimeUnit;

 

 

public class HudiDemo {

    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

 

        // 设置状态后端RocksDB

        EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);

        embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);

        env.setStateBackend(embeddedRocksDBStateBackend);

 

        // checkpoint配置

        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(30), CheckpointingMode.EXACTLY_ONCE);

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();

        checkpointConfig.setCheckpointStorage("hdfs://hadoop1:8020/ckps");

        checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(20));

        checkpointConfig.setTolerableCheckpointFailureNumber(5);

        checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));

                checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

 

        StreamTableEnvironment sTableEnv = StreamTableEnvironment.create(env);

 

        sTableEnv.executeSql("CREATE TABLE sourceT (\n" +

                "  uuid varchar(20),\n" +

                "  name varchar(10),\n" +

                "  age int,\n" +

                "  ts timestamp(3),\n" +

                "  `partition` varchar(20)\n" +

                ") WITH (\n" +

                "  'connector' = 'datagen',\n" +

                "  'rows-per-second' = '1'\n" +

                ")");

 

        sTableEnv.executeSql("create table t2(\n" +

                "  uuid varchar(20),\n" +

                "  name varchar(10),\n" +

                "  age int,\n" +

                "  ts timestamp(3),\n" +

                "  `partition` varchar(20)\n" +

                ")\n" +

                "with (\n" +

                "  'connector' = 'hudi',\n" +

                "  'path' = '/tmp/hudi_flink/t2',\n" +

                "  'table.type' = 'MERGE_ON_READ'\n" +

                ")");

 

        sTableEnv.executeSql("insert into t2 select * from sourceT");

 

    }

}

 

 

 

 

去重参数

通过如下语法设置主键:

-- 设置单个主键

create table hoodie_table (

  f0 int primary key not enforced,

  f1 varchar(20),

  ...

) with (

  'connector' = 'hudi',

  ...

)

 

-- 设置联合主键

create table hoodie_table (

  f0 int,

  f1 varchar(20),

  ...

  primary key(f0, f1) not enforced

) with (

  'connector' = 'hudi',

  ...

)

 

 

流读(Streaming Query

当前表默认是快照读取,即读取最新的全量快照数据并一次性返回。通过参数 read.streaming.enabled 参数开启流读模式,通过 read.start-commit 参数指定起始消费位置,支持指定 earliest 从最早消费。

CREATE TABLE t5(

  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,

  name VARCHAR(10),

  age INT,

  ts TIMESTAMP(3),

  `partition` VARCHAR(20)

) WITH (

  'connector' = 'hudi',

  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t5',

  'table.type' = 'MERGE_ON_READ',

  'read.streaming.enabled' = 'true',

  'read.streaming.check-interval' = '4'   -- 默认60s

);

 

 

insert into t5 select * from sourceT;

 

select * from t5;

增量读取(Incremental Query

0.10.0 开始支持。

如果有增量读取 batch 数据的需求,增量读取包含三种场景。

(1)Stream 增量消费,通过参数 read.start-commit 指定起始消费位置;

(2)Batch 增量消费,通过参数 read.start-commit 指定起始消费位置,通过参数 read.end-commit 指定结束消费位置,区间为闭区间,即包含起始、结束的 commit

(3)TimeTravelBatch 消费某个时间点的数据:通过参数 read.end-commit 指定结束消费位置即可(由于起始位置默认从最新,所以无需重复声明)

flink读取kafka数据并写入hudi数据湖

(1)创建kafka源表

create table stu3_binlog_source_kafka(

  id bigint not null,

  name string,

  school string,

  nickname string,

  age int not null,

  class_num int not null,

  phone bigint not null,

  email string,

  ip string

 ) with (

  'connector' = 'kafka',

  'topic' = 'cdc_mysql_stu3_sink',

  'properties.bootstrap.servers' = 'hadoop1:9092',

  'format' = 'json',

  'scan.startup.mode' = 'earliest-offset',

  'properties.group.id' = 'testGroup'

  );

(2)创建hudi目标表

 create table stu3_binlog_sink_hudi(

  id bigint not null,

  name string,

  `school` string,

  nickname string,

  age int not null,

  class_num int not null,

  phone bigint not null,

  email string,

  ip string,

  primary key (id) not enforced

)

 partitioned by (`school`)

 with (

  'connector' = 'hudi',

  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/stu3_binlog_sink_hudi',

  'table.type' = 'MERGE_ON_READ',

  'write.option' = 'insert',

  'write.precombine.field' = 'school'

  );

(3)将kafka数据写入到hudi中

insert into stu3_binlog_sink_hudi

select * from  stu3_binlog_source_kafka;

离线批量导入

如果存量数据来源于其他数据源,可以使用批量导入功能,快速将存量数据导成 Hoodie 表格式。

1)原理

(1)批量导入省去了 avro 的序列化以及数据的 merge 过程,后续不会再有去重操作,数据的唯一性需要自己来保证。

(2)bulk_insert 需要在 Batch Execuiton Mode 下执行更高效,Batch 模式默认会按照 partition path 排序输入消息再写入 Hoodie,避免 file handle 频繁切换导致性能下降。

SET execution.runtime-mode = batch;

SET execution.checkpointing.interval = 0;

(3)bulk_insert write task 的并发通过参数 write.tasks 指定,并发的数量会影响到小文件的数量,理论上,bulk_insert write task 的并发数就是划分的 bucket 数,当然每个 bucket 在写到文件大小上限(parquet 120 MB)的时候会 roll over 到新的文件句柄,所以最后:写文件数量 >= bulk_insert write task 数。

案例

Flink SQL client创建hudi表

 create table stu4_sink_hudi(

  id bigint not null,

  name string,

  `school` string,

  nickname string,

  age int not null,

 score decimal(4,2) not null,

  class_num int not null,

  phone bigint not null,

  email string,

  ip string,

  primary key (id) not enforced

)

 partitioned by (`school`)

 with (

  'connector' = 'hudi',

  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/stu4_sink_hudi',

  'table.type' = 'MERGE_ON_READ',

  'write.option' = 'bulk_insert',

  'write.precombine.field' = 'school'

  );

Flink SQL client执行mysql数据插入到hudi中

insert into stu4_sink_hudi select * from stu4;

全量接增量

如果已经有全量的离线 Hoodie 表,需要接上实时写入,并且保证数据不重复,可以开启 index bootstrap 功能。

如果觉得流程冗长,可以在写入全量数据的时候资源调大直接走流模式写,全量走完接新数据再将资源调小(或者开启限流功能)。

(1) CREATE TABLE 创建和 Hoodie 表对应的语句,注意 table type 要正确

(2)设置 index.bootstrap.enabled = true开启索引加载功能

(6)重启任务将 index.bootstrap.enabled 关闭,参数配置到合适的大小,如果RowDataToHoodieFunction 和 BootstrapFunction 并发不同,可以重启避免 shuffle

Changelog 模式

如果希望 Hoodie 保留消息的所有变更(I/-U/U/D),之后接上 Flink 引擎的有状态计算实现全链路近实时数仓生产(增量计算),Hoodie 的 MOR 表通过行存原生支持保留消息的所有变更(format 层面的集成),通过流读 MOR 表可以消费到所有的变更记录。

changelog.enabled

false

false

默认是关闭状态,即 UPSERT 语义,所有的消息仅保证最后一条合并消息中间的变更可能会被 merge 改成 true 支持消费所有变更

批(快照)读仍然会合并所有的中间结果,不管 format 是否已存储中间状态。

案例演示

(1)使用changelog

set sql-client.execution.result-mode=tableau;

 

CREATE TABLE t6(

  id int,

  ts int,

  primary key (id) not enforced

) WITH (

  'connector' = 'hudi',

  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t6',

  'table.type' = 'MERGE_ON_READ',

  'read.streaming.enabled' = 'true',

  'read.streaming.check-interval' = '4',

  'changelog.enabled' = 'true'

);

 

insert into t6 values (1,1);

insert into t6 values (1,2);

 

set table.dynamic-table-options.enabled=true;

select * from t6/*+ OPTIONS('read.start-commit'='earliest')*/;

select count(*) from t6/*+ OPTIONS('read.start-commit'='earliest')*/;

 

 

 

 

Hudi Catalog

从 0.12.0 开始支持,通过 catalog 可以管理 flink 创建的表避免每次使用都要重复建表操作,另外 hms 模式的 catalog 支持自动补全 hive 同步参数。

DFS 模式 Catalog SQL样例:

CREATE CATALOG hoodie_catalog

  WITH (

    'type'='hudi',

    'catalog.path' = '${catalog 的默认路径}',

    'mode'='dfs'

  );

Hms 模式 Catalog SQL 样例:

CREATE CATALOG hoodie_catalog

  WITH (

    'type'='hudi',

    'catalog.path' = '${catalog 的默认路径}',

    'hive.conf.dir' = '${hive-site.xml 所在的目录}',

    'mode'='hms' -- 支持 'dfs' 模式通过文件系统管理表属性

  );

 

离线 Compaction

MOR 表的 compaction 默认是自动打开的策略是 5 commits 执行一次压缩。 因为压缩操作比较耗费内存,和写流程放在同一个 pipeline,在数据量比较大的时候(10w+/s qps),容易干扰写流程,此时采用离线定时任务的方式执行 compaction 任务更稳定。

设置参数

compaction.async.enabled 为 false,关闭在线 compaction。

compaction.schedule.enabled 仍然保持开启,由写任务阶段性触发压缩 plan。

命令行的方式

./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://xxx:8020/table

离线 Clustering

异步的 clustering 相对于 online 的 async clustering 资源隔离,从而更加稳定。

5.14.1 设置参数

clustering.async.enabled 为 false,关闭在线 clustering。

clustering.schedule.enabled 仍然保持开启,由写任务阶段性触发 clustering plan。

命令行的方式

./bin/flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://xxx:8020/table

常见基础问题

5.15.1 存储一直看不到数据

如果是 streaming 写,请确保开启 checkpoint,Flink 的 writer 有 3 种刷数据到磁盘的策略:

当某个 bucket 在内存积攒到一定大小 (可配,默认 64MB)

当总的 buffer 大小积攒到一定大小(可配,默认 1GB)

当 checkpoint 触发,将内存里的数据全部 flush 出去

5.15.2 数据有重复

如果是 COW 写,需要开启参数 write.insert.drop.duplicates,COW 写每个 bucket 的第一个文件默认是不去重的,只有增量的数据会去重,全局去重需要开启该参数;MOR 写不需要开启任何参数,定义好 primary key 后默认全局去重。(注意:从 0.10 版本开始,该属性改名 write.precombine 并且默认为 true。)

如果需要多 partition 去重,需要开启参数: index.global.enabled 为 true。(注意:从 0.10 版本开始,该属性默认为 true。)

索引 index 是判断数据重复的核心数据结构,index.state.ttl 设置了索引保存的时间,默认为 1.5 天,对于长时间周期的更新,比如更新一个月前的数据,需要将 index.state.ttl 调大(单位天),设置小于 0 代表永久保存。(注意:从 0.10 版本开始,该属性默认为 0。)

5.15.3 Merge On Read 只有 log 文件

 Merge On Read 默认开启了异步的 compaction,策略是 5 commits 压缩一次,当条件满足参会触发压缩任务,另外,压缩本身因为耗费资源,所以不一定能跟上写入效率,可能会有滞后。

可以先观察 log,搜索 compaction 关键词,看是否有 compact 任务调度:

After filtering, Nothing to compact for 关键词说明本次 compaction strategy 是不做压缩。

集成 Hive

Hudi 源表对应一份 HDFS 数据,通过 Spark,Flink 组件或者 Hudi CLI,可以将 Hudi 表的数据映射为 Hive 外部表,基于该外部表, Hive可以方便的进行实时视图,读优化视图以及增量视图的查询。

6.1 集成步骤

以 hive3.1.2、hudi 0.12.0为例,其他版本类似。

1)拷贝编译好的jar

hudi-hadoop-mr-bundle-0.12.0.jar , hudi-hive-sync-bundle-0.12.0.jar 放到 hive 节点的lib目录下;

cp /opt/software/hudi-0.12.0/packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.12.0.jar /opt/module/hive/lib/

 

cp /opt/software/hudi-0.12.0/packaging/hudi-hive-sync-bundle/target/hudi-hive-sync-bundle-0.12.0.jar /opt/module/hive/lib/

2)配置完后重启 hive

// 按照需求选择合适的方式重启

nohup hive --service metastore &

nohup hive --service hiveserver2 &

 Hive 同步

6.2.1 Flink 同步Hive

1)使用方式

Flink hive sync 现在支持两种 hive sync mode, 分别是 hms 和 jdbc 模式。 其中 hms 只需要配置 metastore uris;而 jdbc 模式需要同时配置 jdbc 属性 和 metastore uris,具体配置模版如下:

## hms mode 配置

 

CREATE TABLE t1(

  uuid VARCHAR(20),

  name VARCHAR(10),

  age INT,

  ts TIMESTAMP(3),

  `partition` VARCHAR(20)

)

PARTITIONED BY (`partition`)

with(

  'connector'='hudi',

  'path' = 'hdfs://xxx.xxx.xxx.xxx:9000/t1',

  'table.type'='COPY_ON_WRITE',        -- MERGE_ON_READ方式在没生成 parquet 文件前,hive不会有输出

  'hive_sync.enable'='true',           -- required,开启hive同步功能

  'hive_sync.table'='${hive_table}',              -- required, hive 新建的表名

  'hive_sync.db'='${hive_db}',             -- required, hive 新建的数据库名

  'hive_sync.mode' = 'hms',            -- required, hive sync mode设置为hms, 默认jdbc

  'hive_sync.metastore.uris' = 'thrift://ip:9083' -- required, metastore的端口

);

Flink 使用 HiveCatalog

6.3.1 直接使用Hive Catalog

1)上传hive connectorflinklib

hive3.1.3的connector存在guava版本冲突,需要解决:官网下载connector后,用压缩软件打开jar包,删除/com/google文件夹。处理完后上传flink的lib中。

2)解决与hadoop的冲突   

避免与hadoop的冲突,拷贝hadoop-mapreduce-client-core-3.1.3.jar到flink的lib中(5.2.1已经做过)

3)创建catalog

CREATE CATALOG hive_catalog

  WITH (

    'type' = 'hive',

    'default-database' = 'default',

    'hive-conf-dir' = '/opt/module/hive/conf',

'hadoop-conf-dir'='/opt/module/hadoop-3.1.3/etc/hadoop'

  );

 

use catalog hive_catalog;

 

-- hive-connector内置了hive module,提供了hive自带的系统函数

load module hive with ('hive-version'='3.1.2');

show modules;

show functions;

 

-- 可以调用hivesplit函数

select split('a,b', ',');

查询 Hive 外表

6.5.1 设置参数

使用 Hive 查询 Hudi 表前,需要通过set命令设置 hive.input.format,否则会出现数据重复,查询异常等错误,如下面这个报错就是典型的没有设置 hive.input.format 导致的:

java.lang.IllegalArgumentException: HoodieRealtimeReader can oly work on RealTimeSplit and not with xxxxxxxxxx

除此之外对于增量查询,还需要 set 命令额外设置3个参数。

set hoodie.mytableName.consume.mode=INCREMENTAL;

set hoodie.mytableName.consume.max.commits=3;

set hoodie.mytableName.consume.start.timestamp=commitTime;

注意这3个参数是表级别参数。

COW 表查询

这里假设同步的 Hive 外表名为 hudi_cow。

1实时视图

设置 hive.input.format 为以下两个之一:

org.apache.hadoop.hive.ql.io.HiveInputFormat

org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat

像普通的hive表一样查询即可:

set hive.input.format= org.apache.hadoop.hive.ql.io.HiveInputFormat;

select count(*) from hudi_cow;

2增量视图

除了要设置 hive.input.format,还需要设置上述的3个增量查询参数,且增量查询语句中的必须添加 where 关键字并`_hoodie_commit_time > 'startCommitTime' 作为过滤条件(这地方主要是hudi的小文件合并会把新旧commit的数据合并成新数据,hive是没法直接从parquet文件知道哪些是新数据哪些是老数据)

set hive.input.format= org.apache.hadoop.hive.ql.io.HiveInputFormat;

set hoodie.hudicow.consume.mode= INCREMENTAL;

set hoodie.hudicow.consume.max.commits=3;

set hoodie.hudicow.consume.start.timestamp= xxxx;

select count(*) from hudicow where `_hoodie_commit_time`>'xxxx'

-- (这里注意`_hoodie_commit_time` 的引号是反引号(tab键上面那个)不是单引号, 'xxxx'是单引号)

6.5.3 MOR 表查询

这里假设 MOR 类型 Hudi 源表的表名为hudi_mor,映射为两张 Hive 外部表hudi_mor_ro(ro表)和 hudi_mor_rt(rt表)。

1实时视图

设置了 hive.input.format 之后,即可查询到Hudi源表的最新数据

set hive.input.format= org.apache.hadoop.hive.ql.io.HiveInputFormat;

select * from hudicow_rt;

2读优化视图

ro 表全称 read oprimized table,对于 MOR 表同步的 xxx_ro 只暴露压缩后的 parquet。其查询方式和COW表类似。设置完 hiveInputFormat 之后 和普通的 Hive 表一样查询即可

3)增量视图

这个增量查询针对的rt,不是ro表。同 COW 表的增量查询类似:

set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat; // 这地方指定为HoodieCombineHiveInputFormat

set hoodie.hudimor.consume.mode=INCREMENTAL;

set hoodie.hudimor.consume.max.commits=-1;

set hoodie.hudimor.consume.start.timestamp=xxxx;

select * from hudimor_rt where `_hoodie_commit_time`>'xxxx';// 这个表名要是rt

索引

说明:

set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;最好只用于 rt 表的增量查询 当然其他种类的查询也可以设置为这个,这个参数会影响到普通的hive表查询,因此在rt表增量查询完成后,应该设置 set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; 或者改为默认值set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; 用于其他表的查询。

set hoodie.mytableName.consume.mode=INCREMENTAL; 仅用于该表的增量查询模式,若要对该表切换为其他查询模式,应设置set hoodie.hudisourcetablename.consume.mode=SNAPSHOT;

 

湖仓一体的优势如下:

流数仓架构本质上有两个痛点实时/离线计算层不统一;实时/离线存储层不统一。

减少数据冗余:湖仓一体提单一的数据存储平台减少了数据的冗余和重复,避免维护多个存储系统的成本和时间。

成本效益:湖仓一体利用低成本对象存储实现高效益数据存储,降低了存储成本,并避免了维护多个数据存储系统的成本。

事务支持:湖仓体支持 ACID 事务,确保了多方同时读取或写入数据的一致性。

Schema的实施和治理:湖仓一体支持Schema的实施和演化,确保数据的完整性,并提供了强大的治理和审计机制。

开放性:湖仓一体采用开放和标准化的存储格式,如Parquet,可以让各种工具和引擎直接访问数据。

存算分离:湖仓一体将存储和计算解耦,可以横向扩展到更大规模和更多并发用户。

支持多种工作负载:湖仓一体支持数据科学、机器学习、SQL和数据分析等各种工作负载,减少了需要维护多个工具的成本。

端到端的流计算支持:湖仓一体支持流计算,实时/离线存储层统一,实现实时报告的需求,避免了使用单独系统来实时数据应用程序的需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/45207.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java高级重点知识点-25-Stream流、方法引用

文章目录 Stream流流式思想概述获取流常用方法 方法引用方法引用符通过对象名引用成员方法通过类名称引用静态方法通过super引用成员方法通过this引用成员方法类的构造器引用数组的构造器引用 Stream流 通过循环遍历来讲解流的优势; 要求:筛选所有姓张的…

实现Android夜间模式主题:从入门到精通

实现Android夜间模式主题:从入门到精通 随着用户对夜间模式的需求越来越高,Android开发者需要掌握如何在应用中实现夜间模式。本文将详细介绍在Android中实现夜间模式的步骤,包括配置、实现、以及一些最佳实践,帮助开发者创建更具吸引力和用户友好的应用。 夜间模式的优势…

Redis基础教程(二十):Java使用Redis

💝💝💝首先,欢迎各位来到我的博客,很高兴能够在这里和您见面!希望您在这里不仅可以有所收获,同时也能感受到一份轻松欢乐的氛围,祝你生活愉快! 💝&#x1f49…

华贝甄选干细胞科技,揭秘生命修复的奥秘

在探索生命奥秘的漫漫征途中,华贝甄选凭借干细胞科技的神奇力量,为您点亮健康与活力的希望之光。 我们深知,细胞是生命的基石,而干细胞则是这基石中蕴含的无限潜能。华贝甄选精心打造的干细胞疗法,如同神奇的魔法&…

2024SpringCloud学习笔记

远程调用Rest Template 服务注册与发现&分布式配置管理 Consul 下载安装 官网https:/ldeveloper.hashicorp.com/consul/downloads 开发者模式启动consul agennt -dev 浏览器访问本地端口:8500 服务注册与发现 Maven引入 <!--SpringCloud consul discovery -->…

【Python实战因果推断】31_双重差分2

目录 Canonical Difference-in-Differences Diff-in-Diff with Outcome Growth Canonical Difference-in-Differences 差分法的基本思想是&#xff0c;通过使用受治疗单位的基线&#xff0c;但应用对照单位的结果&#xff08;增长&#xff09;演变&#xff0c;来估算缺失的潜…

小阿轩yx-NoSQL 之 Redis 配置与优化

小阿轩yx-NoSQL 之 Redis 配置与优化 Redis 数据库介绍 是一个非关系型数据库 关系数据库与非关系型数据库 按照数据库结构划分的 关系型数据库 是一个结构化的数据库&#xff0c;创建在关系模型基础上&#xff0c;一般面向于记录借助集合代数等数学概念和方法处理数据库…

215.Mit6.S081-实验三-page tables

在本实验室中&#xff0c;您将探索页表并对其进行修改&#xff0c;以简化将数据从用户空间复制到内核空间的函数。 一、实验准备 开始编码之前&#xff0c;请阅读xv6手册的第3章和相关文件&#xff1a; kernel/memlayout.h&#xff0c;它捕获了内存的布局。kernel/vm.c&…

Python:Python基础知识(注释、命名、数据类型、运算符)

.注释 Python有两种注释方法&#xff1a;单行注释和多行注释。单行注释以#开头&#xff0c;多行注释以三个单引号 或三个双引号 """ 开头和结尾。 2.命名规则 命名规则: 大小写字母、数字、下划线和汉字等字符及组合&#xff1b; 注意事项: 大小写敏感、首…

Linux环境下Oracle 11g的离线安装与配置历程

在成功体验了 Windows 版本的Oracle 11g 后&#xff0c;这几天心血来潮&#xff0c;决定再挑战一下Linux 环境下的安装&#xff0c;特别是在考虑到部门内部虚拟机无法联网的情况下&#xff0c;我选择了在CentOS 7上进行离线安装。这次安装之旅&#xff0c;主要参考了下面大佬的…

【计算机科学】CCF-C特刊征稿合集,见刊快,期刊质量高,速投!

期刊推荐 期刊名称&#xff1a;ACTA INFORMATICA 主题包括以下项目的理论方面。 算法及其分析 自动机和形式语言 可计算性和复杂性 数据处理 离散数学 逻辑学&#xff08;计算机科学&#xff09; 人工智能的数学基础 编程语言理论 安全 系统理论 验证 中科院四区 …

STM32智能物流机器人系统教程

目录 引言环境准备智能物流机器人系统基础代码实现&#xff1a;实现智能物流机器人系统 4.1 数据采集模块 4.2 数据处理与导航算法 4.3 通信与网络系统实现 4.4 用户界面与数据可视化应用场景&#xff1a;物流机器人管理与优化问题解决方案与优化收尾与总结 1. 引言 智能物流…

mindspore打卡23天之微调本地MindNLP ChatGLM-6B StreamChat

MindNLP ChatGLM-6B StreamChat 本案例基于MindNLP和ChatGLM-6B实现一个聊天应用。 1 环境配置 %%capture captured_output # 实验环境已经预装了mindspore2.2.14&#xff0c;如需更换mindspore版本&#xff0c;可更改下面mindspore的版本号 !pip uninstall mindspore -y !p…

基于JavaSpringBoot+Vue+uniapp微信小程序校园宿舍管理系统设计与实现(7000字论文参考+源码+LW+部署讲解)

博主介绍&#xff1a;硕士研究生&#xff0c;专注于信息化技术领域开发与管理&#xff0c;会使用java、标准c/c等开发语言&#xff0c;以及毕业项目实战✌ 从事基于java BS架构、CS架构、c/c 编程工作近16年&#xff0c;拥有近12年的管理工作经验&#xff0c;拥有较丰富的技术架…

Linux:NFS共享存储

目录 一、NFS基本概述 二、NFS共享文件实验 2.1、安装nfs和rpcbind软件 2.2、修改配置文件设置共享 2.3、创建共享目录 ​编辑 2.4、开启服务 2.5、客户端验证共享目录可访问 三、tcpdump命令 3.1、概述 3.2、简单表达 3.3、过滤规则 ​编辑 3.4、tcpdump常见参数…

强化学习实战2:动手写迷宫环境

迷宫环境介绍与创建 迷宫环境图示如下&#xff1a; 如图所示&#xff0c;其为一个 三乘三 的网格世界&#xff0c;我们要让 agent 从 S0 采取策略出发&#xff0c;然后走到 S8&#xff0c;图中红线部分表示障碍不能逾越&#xff0c;其中 S1 和 S4 之间有一个障碍&#xff0c;S…

C语言有哪些特点?

C语言是一种结构化语言&#xff0c;它有着清晰的层次&#xff0c;可按照模块的方式对程序进行编写&#xff0c;十分有利于程序的调试&#xff0c;且c语言的处理和表现能力都非常的强大&#xff0c;依靠非常全面的运算符和多样的数据类型&#xff0c;可以轻易完成各种数据结构的…

Kotlin MultiPlatform(KMP)

Kotlin MultiPlatform 1.KMP 是什么 Kotlin Multiplatform 是一个工具&#xff0c;它让我们用同一种编程语言&#xff08;Kotlin&#xff09;写代码&#xff0c;这些代码可以同时在不同的设备上运行&#xff0c;比如手机、电脑和网页。这样做可以节省时间&#xff0c;因为你不…

1、项目目录设计

文章目录 前言一、项目目录设计 前言 本项目我们将会完成一个Go项目开发框架&#xff0c;该项目不会包含具体的CRUD业务代码&#xff0c;而是从头搭建一个工作中实用的开发框架。让开发者能够熟悉整个项目的搭建流程&#xff0c;能够独立完成项目从0到1的搭建&#xff0c;而且…

【RHCE】实验(HTTP,DNS,SELinux,firewalld的运用)

一、题目 二、主服务器配置 1.下载HTTP服务&#xff0c;DNS服务 [rootlocalhost ~]# yum install -y httpd bind 2.开启防火墙&#xff0c;放行服务 # 开启防火墙 [rootlocalhost ~]# systemctl start firewalld # 放行服务 [rootlocalhost ~]# firewall-cmd --add-service…