[大数据]Hudi编译集成

1. Hudi概述

1.1 Hudi简介 What is Apache Hudi

Apache Hudi is the next generation streaming data lake platform. Apache Hudi brings core warehouse and database functionality directly to a data lake. Hudi provides tables, transactions, efficient upserts/deletes, advanced indexes, streaming ingestion services, data clustering/compaction optimizations, and concurrency all while keeping your data in open source file formats.

Apache Hudi 是下一代流式数据湖平台。Apache Hudi 将核心的数据仓库和数据库功能直接引入数据湖。Hudi 提供了表、事务、高效的更新/删除、先进的索引、流式摄取服务、数据聚类/压缩优化以及并发处理,同时保持数据在开源文件格式中。

行式存储: .avro

列式存储: .parquet

字节跳动基于Apache Hudi构建EB级数据湖实践

Not only is Apache Hudi great for streaming workloads, but it also allows you to create efficient incremental batch pipelines. Read the docs for more use case descriptions and check out who's using Hudi, to see how some of the largest data lakes in the world including Uber, Amazon, ByteDance, Robinhood and more are transforming their production data lakes with Hudi.

Apache Hudi 不仅适用于流式工作负载,还允许您创建高效的增量批处理管道。请阅读文档以获取更多用例描述,并查看谁在使用 Hudi,了解一些全球最大的数 据湖如何利用 Hudi 转型他们的生产数据湖,包括 Uber、亚马逊、字节跳动、Robinhood 等公司。 

Apache Hudi can easily be used on any cloud storage platform. Hudi’s advanced performance optimizations, make analytical workloads faster with any of the popular query engines including, Apache Spark, Flink, Presto, Trino, Hive, etc.

Apache Hudi 可以轻松地在任何云存储平台上使用。Hudi 的高级性能优化可以加快与任何流行查询引擎(包括 Apache Spark、Flink、Presto、Trino、Hive 等)一起进行的分析工作负载。

2. 编译Hudi

2.1 编译环境准备

相关组件版本如下:

Hadoop3.3.1
Hive3.1.3
Flink1.13.6,scala-2.12
Spark3.3.1,scala-2.12

1)安装Maven

(1)上传apache-maven-3.6.1-bin.tar.gz到/opt/software目录,并解压更名

tar -zxvf apache-maven-3.6.1-bin.tar.gz -C /opt/module/

mv apache-maven-3.6.1 maven-3.6.1

(2)添加环境变量到/etc/profile中

sudo vim /etc/profile.d/my_env.sh

#MAVEN_HOME

export MAVEN_HOME=/opt/module/maven-3.6.1

export PATH=$PATH:$MAVEN_HOME/bin

(3)测试安装结果

source /etc/profile

mvn -v

2)修改为阿里镜像

(1)修改setting.xml,指定为阿里仓库地址

vim /opt/module/maven-3.6.1/conf/settings.xml

<!-- 添加阿里云镜像-->
<mirror><id>nexus-aliyun</id><mirrorOf>central</mirrorOf><name>Nexus aliyun</name><url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>

2.2 下载源码包

https://github.com/apache/hudi/blob/release-0.12.3/README.md

wget https://dlcdn.apache.org/hudi/0.12.3/hudi-0.12.3.src.tgz

1)修改pom.xml,添加以下仓库地址:

<repository><id>nexus-aliyun</id><name>nexus-aliyun</name><url>http://maven.aliyun.com/nexus/content/groups/public/</url><releases><enabled>true</enabled></releases><snapshots><enabled>false</enabled></snapshots>
</repository>

 2)修改依赖的组件版本

<hadoop.version>3.3.1</hadoop.version>
<hive.version>3.1.3</hive.version>

 

2.3 编译命令

mvn clean package -DskipTests -Dspark3.3 -Dflink1.13 -Dscala-2.12 -Dhadoop.version=3.3.1 -Pflink-bundle-shade-hive3

直接编译报错:

hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java

原因:hadoop2 和hadoop3兼容性问题

hadoop2 的API:

修改为hadoop3的API:

其他地方都不修改:

 

 

 

 

 2.4 解决Spark模块依赖冲突

java.lang.AbstractMethodError: org.apache.hudi.org.apache.jetty.server.AllowedResourceAliasChecker$AllowedResourceAliasCheckListener.lifeCycleFailure(Lorg/apache/hudi/org/apache/jetty/util/component/LifeCycle;Ljava/lang/Throwable;)V

 修改了Hive版本为3.1.3,其携带的jetty是0.9.3,hudi本身用的0.9.4,存在依赖冲突。
1)修改hudi-spark-bundle的pom文件,排除低版本jetty,添加hudi指定版本的jetty:
vim /opt/software/hudi-0.12.3/packaging/hudi-spark-bundle/pom.xml

    <!-- Hive -->
    <dependency>
      <groupId>${hive.groupid}</groupId>
      <artifactId>hive-service</artifactId>
      <version>${hive.version}</version>
      <scope>${spark.bundle.hive.scope}</scope>
      <exclusions>
        <exclusion>
          <artifactId>servlet-api</artifactId>
          <groupId>javax.servlet</groupId>
        </exclusion>
        <exclusion>
          <artifactId>guava</artifactId>
          <groupId>com.google.guava</groupId>
        </exclusion>
        <exclusion>
          <groupId>org.eclipse.jetty</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.pentaho</groupId>
          <artifactId>*</artifactId>
        </exclusion>

      </exclusions>
    </dependency>

    <dependency>
      <groupId>${hive.groupid}</groupId>
      <artifactId>hive-service-rpc</artifactId>
      <version>${hive.version}</version>
      <scope>${spark.bundle.hive.scope}</scope>
    </dependency>

    <dependency>
      <groupId>${hive.groupid}</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>${hive.version}</version>
      <scope>${spark.bundle.hive.scope}</scope>
      <exclusions>
        <exclusion>
          <groupId>javax.servlet</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>javax.servlet.jsp</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.eclipse.jetty</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>

    </dependency>

    <dependency>
      <groupId>${hive.groupid}</groupId>
      <artifactId>hive-metastore</artifactId>
      <version>${hive.version}</version>
      <scope>${spark.bundle.hive.scope}</scope>
      <exclusions>
        <exclusion>
          <groupId>javax.servlet</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.datanucleus</groupId>
          <artifactId>datanucleus-core</artifactId>
        </exclusion>
        <exclusion>
          <groupId>javax.servlet.jsp</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <artifactId>guava</artifactId>
          <groupId>com.google.guava</groupId>
        </exclusion>
      </exclusions>

    </dependency>

    <dependency>
      <groupId>${hive.groupid}</groupId>
      <artifactId>hive-common</artifactId>
      <version>${hive.version}</version>
      <scope>${spark.bundle.hive.scope}</scope>
      <exclusions>
        <exclusion>
          <groupId>org.eclipse.jetty.orbit</groupId>
          <artifactId>javax.servlet</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.eclipse.jetty</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>

    </dependency>

    <!-- 增加hudi配置版本的jetty -->
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-server</artifactId>
      <version>${jetty.version}</version>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-util</artifactId>
      <version>${jetty.version}</version>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-webapp</artifactId>
      <version>${jetty.version}</version>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-http</artifactId>
      <version>${jetty.version}</version>
    </dependency>

2)修改hudi-utilities-bundle的pom文件,排除低版本jetty,添加hudi指定版本的jetty:
vim /opt/software/hudi-0.12.3/packaging/hudi-utilities-bundle/pom.xml
修改如下(红色部分)

    <!-- Hive -->
    <dependency>
      <groupId>${hive.groupid}</groupId>
      <artifactId>hive-service</artifactId>
      <version>${hive.version}</version>
      <scope>${utilities.bundle.hive.scope}</scope>
      <exclusions>
        <exclusion>
          <groupId>org.apache.hbase</groupId>
          <artifactId>*</artifactId>
        </exclusion>
    <exclusion>
          <artifactId>servlet-api</artifactId>
          <groupId>javax.servlet</groupId>
        </exclusion>
        <exclusion>
          <artifactId>guava</artifactId>
          <groupId>com.google.guava</groupId>
        </exclusion>
        <exclusion>
          <groupId>org.eclipse.jetty</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.pentaho</groupId>
          <artifactId>*</artifactId>
        </exclusion>

      </exclusions>
    </dependency>

    <dependency>
      <groupId>${hive.groupid}</groupId>
      <artifactId>hive-service-rpc</artifactId>
      <version>${hive.version}</version>
      <scope>${utilities.bundle.hive.scope}</scope>
    </dependency>

    <dependency>
      <groupId>${hive.groupid}</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>${hive.version}</version>
      <scope>${utilities.bundle.hive.scope}</scope>
      <exclusions>
        <exclusion>
          <groupId>javax.servlet</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>javax.servlet.jsp</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.eclipse.jetty</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>

    </dependency>

    <dependency>
      <groupId>${hive.groupid}</groupId>
      <artifactId>hive-metastore</artifactId>
      <version>${hive.version}</version>
      <scope>${utilities.bundle.hive.scope}</scope>
      <exclusions>
        <exclusion>
          <groupId>org.apache.hbase</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>javax.servlet</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.datanucleus</groupId>
          <artifactId>datanucleus-core</artifactId>
        </exclusion>
        <exclusion>
          <groupId>javax.servlet.jsp</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <artifactId>guava</artifactId>
          <groupId>com.google.guava</groupId>
        </exclusion>
      </exclusions>

    </dependency>

    <dependency>
      <groupId>${hive.groupid}</groupId>
      <artifactId>hive-common</artifactId>
      <version>${hive.version}</version>
      <scope>${utilities.bundle.hive.scope}</scope>
      <exclusions>
        <exclusion>
          <groupId>org.eclipse.jetty.orbit</groupId>
          <artifactId>javax.servlet</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.eclipse.jetty</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>

    </dependency>

    <dependency>
      <groupId>org.apache.htrace</groupId>
      <artifactId>htrace-core</artifactId>
      <version>${htrace.version}</version>
      <scope>compile</scope>
    </dependency>

    <!-- 增加hudi配置版本的jetty -->
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-server</artifactId>
      <version>${jetty.version}</version>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-util</artifactId>
      <version>${jetty.version}</version>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-webapp</artifactId>
      <version>${jetty.version}</version>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-http</artifactId>
      <version>${jetty.version}</version>
    </dependency>

再次编译:

mvn clean package -DskipTests -Dspark3.3 -Dflink1.13 -Dscala-2.12 -Dhadoop.version=3.3.1 -Pflink-bundle-shade-hive3 

 

3. 核心概念

3.1 基本概念

Apache Hudi核心概念一网打尽

3.1.1 时间轴(TimeLine)

At its core, Hudi maintains a timeline of all actions performed on the table at different instants of time that helps provide instantaneous views of the table, while also efficiently supporting retrieval of data in the order of arrival. A Hudi instant consists of the following components

Hudi的核心是维护表上在不同的即时时间(instants)执行的所有操作的时间轴(timeline),这有助于提供表的即时视图,同时还有效地支持按到达顺序检索数据。一个instant由以下三个部分组成:

在其核心,Hudi维护了一个时间轴,记录了表在不同时间点上执行的所有操作,这有助于提供即时的表视图,同时还能高效地支持按到达顺序检索数据。Hudi的一个时间点包括以下组件。

instantaneous:即时的,瞬时的

instant

n. 瞬息;顷刻;刹那;(某一)时刻

adj. 立即的;即刻的;速溶的;即食的;速食的;紧急的;紧迫的;迫切的;本月的;正在考虑中的

retrieval:[n.] 检索;恢复;取回;找回;恢复

  • Instant action : Type of action performed on the table 
  • Instant time : Instant time is typically a timestamp (e.g: 20190117010349), which monotonically increases in the order of action's begin time.
  • state : current state of the instant

即时行动:在表上执行的某种操作;
即时时间:通常是一个时间戳(例如:20190117010349),它按照动作开始时间的顺序单调递增。
状态:instant当前的状态。

monotonically:adv.单音调地;音调不变地;单调地

Hudi guarantees that the actions performed on the timeline are atomic & timeline consistent based on the instant time.

Hudi保证在时间轴上执行的操作基于即时时间是原子且时间一致的。

Key actions performed include

  • COMMITS - A commit denotes an atomic write of a batch of records into a table.
  • CLEANS - Background activity that gets rid of older versions of files in the table, that are no longer needed.
  • DELTA_COMMIT - A delta commit refers to an atomic write of a batch of records into a MergeOnRead type table, where some/all of the data could be just written to delta logs.
  • COMPACTION - Background activity to reconcile differential data structures within Hudi e.g: moving updates from row based log files to columnar [kəˈlʌmnə] formats. Internally, compaction manifests as a special commit on the timeline
  • ROLLBACK - Indicates that a commit/delta commit was unsuccessful & rolled back, removing any partial files produced during such a write
  • SAVEPOINT - Marks certain file groups as "saved", such that cleaner will not delete them. It helps restore the table to a point on the timeline, in case of disaster/data recovery scenarios.

主要的操作(Actions)包括:

  • 提交(COMMITS)- 提交表示将一批记录原子性地写入表中。
  • 清理(CLEANS)- 用于清除表中不再需要的旧文件版本的后台活动。
  • 增量提交(DELTA_COMMIT)- 增量提交指的是将一批记录原子性地写入MergeOnRead类型的表中,其中部分或全部数据可能刚刚写入增量日志。
  • 压缩(COMPACTION)- 后台活动,用于协调Hudi中的差异数据结构,例如将更新操作从基于行的日志文件合并到列式存储的数据文件中。在内部,Compaction表现为时间轴上的特殊提交。
  • 回滚(ROLLBACK)- 表示提交/增量提交失败并回滚,删除在此类写入期间生成的任何部分文件。
  • 保存点(SAVEPOINT)- 将某些文件组标记为“已保存”,以便清理器不会删除它们。它有助于在发生灾难需要数据恢复场景中将表恢复到时间轴上的某个点。

denote:vt. 表示;表明;代表;象征;意思是

reconcile [ˈrekənˌsaɪl]:vt.使接受;使顺从;使甘心于;使和解;使和好;使一致;使和谐;使(遭亵渎的教堂等)再次圣化。vi.和解;和好

Any given instant can be in one of the following states

  • REQUESTED - Denotes an action has been scheduled, but has not initiated
  • INFLIGHT - Denotes that the action is currently being performed
  • COMPLETED - Denotes completion of an action on the timeline

任何instant都处于以下状态之一:

REQUESTED——表示某个action已经调度,但尚未启动;

INFLIGHT——表示动作目前正在执行中;

COMPLETED——表示时间轴上的动作已完成。

两个时间概念:区分两个重要的时间概念:
    Arrival time: 数据到达 Hudi 的时间,commit time。
    Event time: record 中记录的时间。


 

上图中采用时间(小时)作为分区字段,从 10:00 开始陆续产生各种 commits,10:20 来了一条 9:00 的数据,根据event time该数据仍然可以落到 9:00 对应的分区,通过 timeline 直接消费 10:00 (commit time)之后的增量更新(只消费有新 commits 的 group),那么这条延迟的数据仍然可以被消费到。 

Example above shows upserts happenings between 10:00 and 10:20 on a Hudi table, roughly every 5 mins, leaving commit metadata on the Hudi timeline, along with other background cleaning/compactions. One key observation to make is that the commit time indicates the arrival time of the data (10:20AM), while the actual data organization reflects the actual time or event time, the data was intended for (hourly buckets from 07:00). These are two key concepts when reasoning about tradeoffs between latency and completeness of data.

上述示例显示,在Hudi表上发生了UPSERT操作,时间范围是10:00到10:20,大约每5分钟一次,并在Hudi时间轴上留下了提交元数据,以及其他后台清理/压缩。一个关键的观察点是,提交时间指示了数据到达的时间(上午10:20),而实际的数据组织反映了数据意图所在的实际时间或事件时间(从07:00开始的小时桶)。这是关于数据延迟和完整性的权衡时需要考虑的两个关键概念。

When there is late arriving data (data intended for 9:00 arriving >1 hr late at 10:20), we can see the upsert producing new data into even older time buckets/folders. With the help of the timeline, an incremental query attempting to get all new data that was committed successfully since 10:00 hours, is able to very efficiently consume only the changed files without say scanning all the time buckets > 07:00.

当有延迟到达的数据(预期9点到达的数据在10点20晚到达>1小时)时,我们可以看到upsert将新数据生成到更早的时段/文件夹中。在时间轴的帮助下,一个增量查询可以高效地获取自10点以来成功提交的所有新数据,只需要消费发生变化的文件即可,而不需要扫描所有大于7点的时间桶。

3.1.2 文件布局(File Layout)

Hudi将一个表映射为如下文件结构:

The following describes the general file layout structure for Apache Hudi

  • Hudi organizes data tables into a directory structure under a base path on a distributed file system
  • Tables are broken up into partitions
  • Within each partition, files are organized into file groups, uniquely identified by a file ID
  • Each file group contains several file slices
  • Each slice contains a base file (.parquet) produced at a certain commit/compaction instant time, along with set of log files (.log.*) that contain inserts/updates to the base file since the base file was produced.

Hudi adopts Multiversion Concurrency Control (MVCC), where compaction action merges logs and base files to produce new file slices and cleaning action gets rid of unused/older file slices to reclaim space on the file system.

以下描述了 Apache Hudi 的一般文件布局结构:

  • Hudi 将数据表组织成分布式文件系统上一个基础路径下的目录结构。  
  • 表被划分为多个分区。  
  • 在每个分区内,文件被组织成文件组,每个文件组由一个唯一的文件 ID 标识。  
  • 每个文件组包含多个文件切片。  
  • 每个切片包含一个基文件(.parquet),该文件是在某个提交/压缩瞬间生成的,以及一组日志文件(.log.*),这些日志文件包含自基文件生成以来对基文件的插入/更新。  

Hudi 采用多版本并发控制(MVCC),其中压缩操作将日志和基文件合并以生成新的文件切片,而清理操作则移除未使用的/旧的文件切片,以回收文件系统上的空间。

3.1.3 索引(Index)

3.1.4 表类型(Table Types)

3.1.5 查询类型(Query Types)

3.2 数据写

3.2.1 写操作

3.2.2 写流程(UPSERT)

3.2.3 写流程(INSERT)

3.2.4 写流程(INSERT OVERWRITE)

3.2.5 Key生成策略

3.2.6 删除策略

3.2.7 总结

3.3 数据读

3.3.1 Snapshot读

3.3.2 Incremental读

3.3.3 Streaming读

3.4 Compaction

4. 集成Spark

4.1 环境准备

4.2 spark-shell 方式

1)启动命令

# Spark 3.3
spark-shell \
--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.3 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

# Spark 3.2
spark-shell \
  --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.12.3 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

 # Spark 3.1
spark-shell \
  --packages org.apache.hudi:hudi-spark3.1-bundle_2.12:0.12.3 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

# Spark 2.4
spark-shell \
  --packages org.apache.hudi:hudi-spark2.4-bundle_2.11:0.12.3 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' 

Please note the following

  • For Spark 3.2 and above, the additional spark_catalog config is required: --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
  • We have used hudi-spark-bundle built for scala 2.12 since the spark-avro module used can also depend on 2.12.

 2)设置表名,基本路径和数据生成器

// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecordval tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator

   存储选择对象存储:

// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecordval tableName = "hudi_trips_cow"
val basePath1 = "obs://bigdata-teach/tmp/hudi_trips_cow"
val dataGen = new DataGenerator

 The DataGenerator can generate sample inserts and updates based on the the sample trip schema here.

hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java

3)Create Table

// scala
// No separate create table command required in spark. First batch of write to a table will create the table if not exists.  

4)插入数据

新增数据,生成一些数据,将其加载到DataFrame中,然后将DataFrame写入Hudi表。 

// spark-shell
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").options(getQuickstartWriteConfigs).option(PRECOMBINE_FIELD_OPT_KEY, "ts").option(RECORDKEY_FIELD_OPT_KEY, "uuid").option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").option(TABLE_NAME, tableName).mode(Overwrite).save(basePath1)

Mode(overwrite)将覆盖重新创建表(如果已存在)。可以检查/tmp/hudi_trps_cow 路径下是否有数据生成。 

We provided a record key (uuid in schema), partition field (region/country/city) and combine logic (ts in schema) to ensure trip records are unique within each partition. For more info, refer to Modeling data stored in Hudi and for info on ways to ingest data into Hudi, refer to Writing Hudi Tables. Here we are using the default write operation : upsert. If you have a workload without updates, you can also issue insert or bulk_insert operations which could be faster. To know more, refer to Write operations .

我们提供了一个记录键(Schema中的uuid)、分区字段(地区/国家/城市)和组合逻辑(Schema中的ts),以确保每个分区内的行程记录是唯一的。有关更多信息,请参阅Hudi中存储的数据建模,以及有关将数据导入Hudi的方法,请参阅编写Hudi表。在这里,我们使用默认的写操作:upsert。如果您的工作负载没有更新操作,您还可以使用insert或bulk_insert操作,这可能更快。要了解更多信息,请参阅写操作。

4.3 spark-sql方式

# Spark 3.3
spark-sql --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.3 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'

create table hudi_cow_pt_tbl (
id bigint,
name string,
ts bigint,
dt string,
hh string
) using hudi
tblproperties (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts'
)
partitioned by (dt, hh)
location 'obs://bigdata-test1233/tmp/tmp/hudi/hudi_cow_pt_tbl'; 

spark-sql (default)> show create table hudi_cow_nonpcf_tbl;
createtab_stmt
CREATE TABLE default.hudi_cow_nonpcf_tbl (
  _hoodie_commit_time STRING,
  _hoodie_commit_seqno STRING,
  _hoodie_record_key STRING,
  _hoodie_partition_path STRING,
  _hoodie_file_name STRING,
  uuid INT,
  name STRING,
  price DOUBLE)
USING hudi
LOCATION 'obs://bigdata-test1233/tmp/tmp/hudi/hudi_cow_pt_tbl'
TBLPROPERTIES (
  'primaryKey' = 'uuid',
  'type' = 'cow')
 

insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20; 

 

 

4.4 IDEA集成

4.5 DeltaStreamer导入工具

4.6 并发控制

4.7 常规调优

5. 集成Flink

5.1 环境准备

5.2 sql-client方式

5.3 IDEA编码方式

5.4 类型映射

5.5 核心参数设置

5.6 内存优化

5.7 读取方式

5.8 限流

5.9 写入方式

5.10 写入模式

5.11 Bucket索引

5.12 Hudi Catalog

5.13 离线Compaction

5.14 离线Clustering

5.15 常见基础问题

5.16 核心原理分析

6. 集成Hive

6.1 集成步骤

6.2 Hive 同步

6.3 Flink使用HiveCatalog

6.4 创建Hive 外表

6.5 查询Hive外表

6.6 Hive sync tool

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/889186.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

windows下 mysql开启 binlog日志

一、查看是否开启 binlog -- 方式一 show binary logs;-- 方式二 show VARIABLES like log_bin 说明没有开启 方式一 &#xff1a;you are not using binary logging 方式二&#xff1a;log_bin off 二、编辑 my.ini 配置文件 默认安装地点位于&#xff1a;C:\ProgramDat…

本题要求采用选择法排序,将给定的n个整数从大到小排序后输出。

#include <stdio.h> #define MAXN 10 int main() { int i, index, k, n, temp; int a[MAXN]; scanf("%d", &n); for (i 0; i < n; i) { scanf("%d", &a[i]); } // 外层循环控制排序轮数&#xff0c;一共需要n-1轮 for (k 0; k < n…

Vue.js的生命周期

Vue.js 是一个构建用户界面的渐进式框架&#xff0c;它提供了一个响应式和组件化的方式来构建前端应用。了解 Vue 的生命周期对于开发者来说至关重要&#xff0c;因为它可以帮助我们更好地控制组件的状态和行为。本文将详细介绍 Vue 的生命周期&#xff0c;并提供相应的代码示例…

Java-22 深入浅出 MyBatis - 手写ORM框架3 手写SqlSession、Executor 工作原理

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; 大数据篇正在更新&#xff01;https://blog.csdn.net/w776341482/category_12713819.html 目前已经更新到了&#xff1a; MyBatis&#xff…

Android 逆向/反编译/Hook修改应用行为 基础实现

前言&#xff1a;本文通过一个简单的情景案例实现安卓逆向的基本操作 一、情景描述 本文通过一个简单的情景案例来实现安卓逆向的基本操作。在这个案例中所使用的项目程序是我自己的Demo程序&#xff0c;不会造成任何的财产侵害&#xff0c;本文仅作为日常记录及案例分享。实…

IDEA创建Spring Boot项目配置阿里云Spring Initializr Server URL【详细教程-轻松学会】

1.首先打开idea选择新建项目 2.选择Spring Boot框架(就是选择Spring Initializr这个) 3.点击中间界面Server URL后面的三个点更换为阿里云的Server URL Idea中默认的Server URL地址&#xff1a;https://start.spring.io/ 修改为阿里云Server URL地址&#xff1a;https://star…

基于MATLAB的信号处理工具:信号分析器

信号&#xff08;或时间序列&#xff09;是与特定时间相关的一系列数字或测量值&#xff0c;不同的行业和学科将这一与时间相关的数字序列称为信号或时间序列。生物医学或电气工程师会将其称为信号&#xff0c;而统计学家或金融定量分析师会使用时间序列这一术语。例如&#xf…

Plugin - 插件开发03_Spring Boot动态插件化与热加载

文章目录 Pre方案概览使用插件的好处流程CodePlugin 定义Plugin 实现Plugin 使用方动态加载插件类加载器注册与卸载插件配置文件启动类测试验证 小结 Pre 插件 - 通过SPI方式实现插件管理 插件 - 一份配置&#xff0c;离插件机制只有一步之遥 插件 - 插件机制触手可及 Plug…

ECharts柱状图-阶梯瀑布图,附视频讲解与代码下载

引言&#xff1a; 在数据可视化的世界里&#xff0c;ECharts凭借其丰富的图表类型和强大的配置能力&#xff0c;成为了众多开发者的首选。今天&#xff0c;我将带大家一起实现一个柱状图图表&#xff0c;通过该图表我们可以直观地展示和分析数据。此外&#xff0c;我还将提供…

【Hash Function and HashMap】

散列函数&#xff08;Hash Function&#xff09;是一种将任意大小的数据映射到固定大小值的函数。在 HashMap 中&#xff0c;它扮演着核心角色。让我详细解释&#xff1a; 散列函数基本原理 输入&#xff1a;任意类型的键&#xff08;key&#xff09;输出&#xff1a;固定大小…

【jvm】为什么要有GC

目录 1. 自动内存管理2. 提升程序稳定性3. 优化性能4. 跨平台能力5. 分代回收策略 1. 自动内存管理 1.JVM中的GC机制负责自动管理内存&#xff0c;这意味着开发人员不需要手动分配和释放内存。2.这一特性大大简化了Java程序的内存管理&#xff0c;降低了内存泄漏和内存溢出等问…

Python泛型编程:TypeVar和Generic详解 - 写给初学者的指南

Python泛型编程&#xff1a;TypeVar和Generic详解 - 写给初学者的指南 前言1. 为什么需要泛型&#xff1f;2. TypeVar&#xff1a;定义泛型类型变量3. Generic&#xff1a;创建泛型类4. 多个泛型类型变量5. 使用场景小结结语 前言 大家好&#xff01;今天我们来聊一聊Python中…

COUNT(*)、COUNT(1)、COUNT(某一列)的区别是什么?哪个性能更好

一些特殊情况&#xff1a; 有索引时&#xff1a;如果查询使用了索引&#xff0c;且查询的列在索引中&#xff0c;COUNT(某一列) 可能在某些情况下会比较快&#xff0c;因为数据库只需要扫描索引&#xff0c;而不需要扫描整个表。有 NULL 值时&#xff1a;COUNT(某一列) 可能会…

C/C++流星雨

系列文章 序号直达链接1C/C爱心代码2C/C跳动的爱心3C/C李峋同款跳动的爱心代码4C/C满屏飘字表白代码5C/C大雪纷飞代码6C/C烟花代码7C/C黑客帝国同款字母雨8C/C樱花树代码9C/C奥特曼代码10C/C精美圣诞树11C/C俄罗斯方块12C/C贪吃蛇13C/C孤单又灿烂的神-鬼怪14C/C闪烁的爱心15C/C…

【机器学习】——K均值聚类:揭开数据背后的隐藏结构

目录 引言&#xff1a;什么是聚类分析&#xff1f;K均值聚类的基本原理 2.1 聚类的概念2.2 K均值聚类简介 K均值算法的工作原理 3.1 初始化与选定K值3.2 计算距离与分配簇3.3 更新质心3.4 迭代与收敛 K均值聚类的优缺点 4.1 优点4.2 缺点与局限性 K均值聚类的常见应用 5.1 市场…

【WRF-Urban】SLUCM新增空间分布城市冠层参数及人为热排放AHF代码详解(下)

目录 详细解释更改文件内容4 运行模块(run):README.namelist5 输出模块(share):share/module_check_a_mundo.Fshare/output_wrf.F参考SLUCM新增空间分布城市冠层参数及人为热排放AHF代码详解的前两部分内容可参见-【WRF-Urban】SLUCM新增空间分布城市冠层参数及人为热排放A…

go 集成nacos注册中心、配置中心

使用限制 Go>v1.15 Nacos>2.x 安装 使用go get安装SDK&#xff1a; go get -u github.com/nacos-group/nacos-sdk-go/v2 快速使用 初始化客户端配置ClientConfig constant.ClientConfig{TimeoutMs uint64 // 请求Nacos服务端的超时时间&#xff0c;默…

ModelScope-Agent(1): 基于开源大语言模型的可定制Agent系统

目录 简介快速入门 简介 github地址 快速入门 看前两篇&#xff0c;调用千问API和天气API # 选用RolePlay 配置agent from modelscope_agent.agents.role_play import RolePlay # NOQArole_template 你扮演一个天气预报助手&#xff0c;你需要查询相应地区的天气&#x…

终端中运行 conda install 命令后一直显示“Solving environment: \ ”

初步接触深度学习&#xff0c;在配置环境方面出了点问题&#xff0c;运行 conda install 命令时&#xff0c;卡在 "Solving environment: \ "。 网上搜索发现&#xff0c; 一般可能的原因就是以下几种 环境解析耗时&#xff1a; Conda 在安装包时需要解析当前环境&…

Jenkins相关的Api接口调用详解

Jenkins API是Jenkins持续集成和持续部署(CI/CD)平台提供的一组接口,允许外部程序通过HTTP请求与Jenkins进行交互。以下是对Jenkins API使用的简介: 一、Jenkins API的主要功能 作业管理:通过API,可以创建、配置、删除以及查询作业(Job)。构建触发:可以远程触发新的构…