CDP集成Hudi实战-spark shell

[〇]关于本文

本文主要解释spark shell操作Hudi表的案例

软件版本
Hudi1.0.0
Hadoop Version3.1.1.7.3.1.0-197
Hive Version3.1.3000.7.3.1.0-197
Spark Version3.4.1.7.3.1.0-197
CDP7.3.1

[一]使用Spark-shell

1-配置hudi Jar包

[root@cdp73-1 ~]# for i in $(seq 1 6); do scp /opt/software/hudi-1.0.0/packaging/hudi-spark-bundle/target/hudi-spark3.4-bundle_2.12-1.0.0.jar   cdp73-$i:/opt/cloudera/parcels/CDH/lib/spark3/jars/; done
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 418.2MB/s   00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 304.8MB/s   00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 365.0MB/s   00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 406.1MB/s   00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 472.7MB/s   00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 447.1MB/s   00:00
[root@cdp73-1 ~]#

2-进入Spark-shell

spark-shell --packages org.apache.hudi:hudi-spark3.4-bundle_2.12:1.0.0 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'

3-初始化项目

// spark-shell
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import spark.implicits._val tableName = "trips_table"
val basePath = "hdfs:///tmp/trips_table"

4-创建表

首次提交将自动初始化表,如果指定的基本路径中尚不存在该表。

5-导入数据

// spark-shell
val columns = Seq("ts","uuid","rider","driver","fare","city")
val data =Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),(1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70 ,"san_francisco"),(1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90 ,"san_francisco"),(1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"    ),(1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"));var inserts = spark.createDataFrame(data).toDF(columns:_*)
inserts.write.format("hudi").option("hoodie.datasource.write.partitionpath.field", "city").option("hoodie.embed.timeline.server", "false").option("hoodie.table.name", tableName).mode(Overwrite).save(basePath)

【映射到Hudi写操作】​​​​​​​Hudi提供了多种写操作——包括批量和增量写操作——以将数据写入Hudi表,这些操作具有不同的语义和性能。当未配置记录键(请参见下面的键)时,将选择bulk_insert作为写操作,这与Spark的Parquet数据源的非默认行为相匹配。

6-查询数据

// spark-shell
val tripsDF = spark.read.format("hudi").load(basePath)
tripsDF.createOrReplaceTempView("trips_table")spark.sql("SELECT uuid, fare, ts, rider, driver, city FROM  trips_table WHERE fare > 20.0").show()
spark.sql("SELECT _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare FROM  trips_table").show()

7-更新数据

// Lets read data from target Hudi table, modify fare column for rider-D and update it. 
val updatesDf = spark.read.format("hudi").load(basePath).filter($"rider" === "rider-D").withColumn("fare", col("fare") * 10)updatesDf.write.format("hudi").option("hoodie.datasource.write.operation", "upsert").option("hoodie.embed.timeline.server", "false").option("hoodie.datasource.write.partitionpath.field", "city").option("hoodie.table.name", tableName).mode(Append).save(basePath)

8-合并数据

// spark-shell
val adjustedFareDF = spark.read.format("hudi").load(basePath).limit(2).withColumn("fare", col("fare") * 10)
adjustedFareDF.write.format("hudi").
option("hoodie.embed.timeline.server", "false").
mode(Append).
save(basePath)
// Notice Fare column has been updated but all other columns remain intact.
spark.read.format("hudi").load(basePath).show()

9-删除数据

/ spark-shell
// Lets  delete rider: rider-D
val deletesDF = spark.read.format("hudi").load(basePath).filter($"rider" === "rider-F")deletesDF.write.format("hudi").option("hoodie.datasource.write.operation", "delete").option("hoodie.datasource.write.partitionpath.field", "city").option("hoodie.table.name", tableName).option("hoodie.embed.timeline.server", "false").mode(Append).save(basePath)

​​​​​​​

10-数据索引

import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import spark.implicits._val tableName = "trips_table_index"
val basePath = "hdfs:///tmp/hudi_indexed_table"val columns = Seq("ts","uuid","rider","driver","fare","city")
val data =Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),(1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70 ,"san_francisco"),(1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90 ,"san_francisco"),(1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"    ),(1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"));var inserts = spark.createDataFrame(data).toDF(columns:_*)
inserts.write.format("hudi").option("hoodie.datasource.write.partitionpath.field", "city").option("hoodie.table.name", tableName).option("hoodie.write.record.merge.mode", "COMMIT_TIME_ORDERING").option(RECORDKEY_FIELD_OPT_KEY, "uuid").option("hoodie.embed.timeline.server", "false").mode(Overwrite).save(basePath)// Create record index and secondary index for the table
spark.sql(s"CREATE TABLE hudi_indexed_table USING hudi LOCATION '$basePath'")
// Create bloom filter expression index on driver column
spark.sql(s"CREATE INDEX idx_bloom_driver ON hudi_indexed_table USING bloom_filters(driver) OPTIONS(expr='identity')");
// It would show bloom filter expression index 
spark.sql(s"SHOW INDEXES FROM hudi_indexed_table");
// Query on driver column would prune the data using the idx_bloom_driver index
spark.sql(s"SELECT uuid, rider FROM hudi_indexed_table WHERE driver = 'driver-S'");// Create column stat expression index on ts column
spark.sql(s"CREATE INDEX idx_column_ts ON hudi_indexed_table USING column_stats(ts) OPTIONS(expr='from_unixtime', format = 'yyyy-MM-dd')");
// Shows both expression indexes 
spark.sql(s"SHOW INDEXES FROM hudi_indexed_table");
// Query on ts column would prune the data using the idx_column_ts index
spark.sql(s"SELECT * FROM hudi_indexed_table WHERE from_unixtime(ts, 'yyyy-MM-dd') = '2023-09-24'");// To create secondary index, first create the record index
spark.sql(s"SET hoodie.metadata.record.index.enable=true");
spark.sql(s"CREATE INDEX record_index ON hudi_indexed_table (uuid)");
// Create secondary index on rider column
spark.sql(s"CREATE INDEX idx_rider ON hudi_indexed_table (rider)");// Expression index and secondary index should show up
spark.sql(s"SHOW INDEXES FROM hudi_indexed_table");
// Query on rider column would leverage the secondary index idx_rider
spark.sql(s"SELECT * FROM hudi_indexed_table WHERE rider = 'rider-E'");// Update a record and query the table based on indexed columns
spark.sql(s"UPDATE hudi_indexed_table SET rider = 'rider-B', driver = 'driver-N', ts = '1697516137' WHERE rider = 'rider-A'");
// Data skipping would be performed using column stat expression index
spark.sql(s"SELECT uuid, rider FROM hudi_indexed_table WHERE from_unixtime(ts, 'yyyy-MM-dd') = '2023-10-17'");
// Data skipping would be performed using bloom filter expression index
spark.sql(s"SELECT * FROM hudi_indexed_table WHERE driver = 'driver-N'");
// Data skipping would be performed using secondary index
spark.sql(s"SELECT * FROM hudi_indexed_table WHERE rider = 'rider-B'");// Drop all the indexes
spark.sql(s"DROP INDEX secondary_index_idx_rider on hudi_indexed_table");
spark.sql(s"DROP INDEX record_index on hudi_indexed_table");
spark.sql(s"DROP INDEX expr_index_idx_bloom_driver on hudi_indexed_table");
spark.sql(s"DROP INDEX expr_index_idx_column_ts on hudi_indexed_table");
// No indexes should show up for the table
spark.sql(s"SHOW INDEXES FROM hudi_indexed_table");spark.sql(s"SET hoodie.metadata.record.index.enable=false");

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/66371.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python爬虫基础——百度新闻页面结构剖析

经过上一篇文章文章[Python爬虫基础——认识网页结构(各种标签的使用)]的介绍,我们对网页结构已经有了初步的认识,本篇文章针对百度新闻界界面结构进行剖析。 在浏览器地址栏中输入https://news.baidu.com/,然后按住F12打开发这工具在“Eleme…

【老白学 Java】保存 / 恢复对象状态

保存、恢复对象状态 文章来源:《Head First Java》修炼感悟。 上两篇文章分别讨论了对象序列化和反序列化,主要是针对数据文件进行读、写操作的介绍。 本篇文章通过一个完整的例子,复习一下对象保存与恢复的操作步骤,在文章最后做…

进程间通信——网络通信——UDP

进程间通信(分类):网络通信、无名管道、有名管道、信号、消息队列、共享内存、信号量集 OSI七层模型:(理论模型) 应用层 : 要传输的数据信息,如文件传输,电子邮件等 表示层 : 数…

3272 小蓝的漆房

将devc设置支持编译就能用新的遍历方式 for(auto &x : s)//遍历容器s,变量为x /* 多循环的嵌套: 计数是否需要重置为0; 是否因为ans定义成全局变量导致ans在比较多时候会出现错误*/ /* 1.对于一个标准色,对目标数组遍历, 如…

海外云服务器能用来做什么?

海外云服务器不仅服务种类繁多,而且能满足多行业的需求,方便了越来越多的企业与个人。本文将探讨海外云服务器的核心服务及其适用领域,帮助企业更好地了解这一技术资源。 云存储:安全高效的数据管理 海外云服务器为用户提供了稳定…

导出中心设计

业务背景 应用业务经常需要导出数据,但是并发的导出以及不合理的导出参数常常导致应用服务的内存溢出、其他依赖应用的崩溃、导出失败;因此才有导出中心的设计 设计思想 将导出应用所需的内存转移至导出中心,将导出的条数加以限制&#xf…

Re77 读论文:LoRA: Low-Rank Adaptation of Large Language Models

诸神缄默不语-个人CSDN博文目录 诸神缄默不语的论文阅读笔记和分类 论文全名:LoRA: Low-Rank Adaptation of Large Language Models ArXiv网址:https://arxiv.org/abs/2106.09685 官方GitHub网站(包含在RoBERTa、DeBERTa、GPT-2上用Lora微调…

Redis 数据库源码分析

Redis 数据库源码分析 我们都知道Redis是一个 <key,value> 的键值数据库&#xff0c;其实也就是一个 Map。如果让我来实现这样一个 Map&#xff0c;我肯定是用数组&#xff0c;当一个 key 来的时候&#xff0c;首先进行 hash 运算&#xff0c;接着对数据的 length 取余&…

我的nvim的init.lua配置

nvim的配置文件路径在&#xff5e;/.config/nvim路径下&#xff1a; 一、目录如下&#xff1a; coc-settings.json文件是配置代码片段路径的文件init.lua配置文件的启动脚本lua/config.lua 全局配置文件lua/keymaps.lua 快捷键映射键文件lua/plugins.lua 插件的安装和配置文件…

权限掩码umask

1 、 设置新建文件或目录的默认权限 在 Linux 系统中&#xff0c;当用户创建一个新的文件或目录时&#xff0c;系统都会为新建的文件或目录分配默认的权限&#xff0c;该默认权限与umask 值有关&#xff0c;其具体关系是&#xff1a; 新建文件的默认权限 0666-umask 值 新建…

宝安湾区之光附近的钓鱼点

工作日的午休我经常在公司附近骑行&#xff0c;有时候也会骑行到宝安的湾区之光。但是我最感兴趣的除了湾区之光摩天轮&#xff0c;还有雷打不动的快乐钓鱼佬。 上图红框区域的河岸每天都会出现零零散散的快乐钓鱼佬&#xff0c;他们好像都有自己的钓鱼窝点。我发现来这里钓鱼也…

音视频-----RTSP协议 音视频编解码

流媒体协议详解&#xff1a;RTSP、RTP、RTCP、SIP、SDP、RTMP、WebRTC、WebSocket-CSDN博客 上文讲解比较清楚 多媒体编解码基础知识 一文详解WebRTC、RTSP、RTMP、SRT-腾讯云开发者社区-腾讯云 流媒体协议简介 &#xff08;整理&总结&#xff09;-CSDN博客 RTP :(Real-…

家教老师预约平台小程序系统开发方案

家教老师预约平台小程序系统将连接学生/家长与家教老师&#xff0c;提供一站式的家教服务预约体验。 一、用户需求分析1、家教老师&#xff1a;希望获得更多的学生资源&#xff0c;通过平台展示自己的教学特长和经验&#xff0c;管理个人日程&#xff0c;接收并确认预约请求&a…

Linux 系统安装 NCBI Blast + A Quick Guide

前言 NCBI BLAST&#xff08;Basic Local Alignment Search Tool&#xff09;是由美国国家生物技术信息中心&#xff08;NCBI&#xff09;开发的一个深受生物信息学研究者青睐的基因序列比对工具。作为生物序列信息比对的行业标准&#xff0c;BLAST可用于分析核酸&#xff08;…

嵌入式科普(26)为什么heap通常8字节对齐

目录 一、概述 二、newlibc heap 2.1 stm32cubeide .ld heap 2.2 e2studio .ld heap 三、glibc源码 3.1 Ubuntu c heap 四、总结 一、概述 结论&#xff1a;在嵌入式c语言中&#xff0c;heap通常8字节对齐 本文主要分析这个问题的分析过程 二、newlibc heap newlibc…

nginx学习之路-nginx配置https服务器

文章目录 1. 生成证书2. 配置证书1. 拷贝证书文件2. 修改conf/nginx.conf文件内容 3. 查看效果1. 重载配置2. 访问 1. 生成证书 在linux系统下执行&#xff0c;使用openssl命令。&#xff08;windows环境也可以使用cmder&#xff09; # 1. 生成私钥 server2025.key(无密码保护…

VulnHub—potato-suncs

使用命令扫描靶机ip arp-scan -l 尝试访问一下ip 发现一个大土豆没什么用 尝试扫描一下子域名 没有发现什么有用的信息 尝试扫描端口 namp -A 192.168.19.137 -p- 尝试访问一下端口,发现都访问不进去 查看源代码发现了网页的标题 potato&#xff0c;就想着爆破一下密码 hydr…

【AI部署】腾讯云每月1w小时免费GPU获取

一、如何进入活动页面 进入腾讯云官网&#xff0c;点击控制台&#xff1a; https://curl.qcloud.com/zl1rLuMf 点击工具&#xff0c;进入CloudStudio&#xff1a; 找到高性能工作空间&#xff0c;每月会有1w分钟的免费时长&#xff1a; 二、创建AI模版 点击直接创建 选择…

开源平台Kubernetes的优势是什么?

Kubernetes 是一个可移植、可扩展的开源平台&#xff0c;用于管理容器化的工作负载和服务&#xff0c;方便进行声明式配置和自动化。Kubernetes 拥有一个庞大且快速增长的生态系统&#xff0c;其服务、支持和工具的使用范围广泛。 Kubernetes 这个名字源于希腊语&#xff0c;意…

INT305 Machine Learning

W1 Introduction Nearest Neighbor Preliminaries and Nearest Neighbor Methods • Suppose we’re given a novel input vector &#x1d465; we’d like to classify. • The idea: find the nearest input vector to &#x1d465; in the training set and copy …