iceberg1.4.2 +minio通过spark创建表,插入数据

iceberg 是一种开放的表格式管理,解决大数据数据中结构化,非结构化和半结构化不统一的问题。主要是通过对表的管理实现增删改查,同时支持历史回滚(版本旅行)等操作。下层支持hadoop,s3,对象存储,上层支持hive,spark,flink 等应用。实现在中间把两部分隔离开来,实现一种对接和数据管理的标准。有这个标准,不管是谁建的表,都可以操作和访问。比如我用spark创建表,flink去读取的时候,可以读取到数据。不存在组件不同无法识别的情况。

在idea进行pom.xml配置

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.gbicc</groupId><artifactId>bigdata</artifactId><version>1.0-SNAPSHOT</version><inceptionYear>2008</inceptionYear><properties><scala.version>2.12.18</scala.version></properties><repositories><repository><id>scala-tools.org</id><name>Scala-Tools Maven2 Repository</name><url>http://scala-tools.org/repo-releases</url></repository></repositories><pluginRepositories><pluginRepository><id>scala-tools.org</id><name>Scala-Tools Maven2 Repository</name><url>http://scala-tools.org/repo-releases</url></pluginRepository></pluginRepositories><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.4</version><scope>test</scope></dependency><dependency><groupId>org.specs</groupId><artifactId>specs</artifactId><version>1.2.5</version><scope>test</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core --><!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core --><dependency><groupId>org.apache.iceberg</groupId><artifactId>iceberg-core</artifactId><version>1.4.2</version></dependency><dependency><groupId>io.minio</groupId><artifactId>minio</artifactId><version>8.5.7</version></dependency><!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-s3 --><dependency><groupId>com.amazonaws</groupId><artifactId>aws-java-sdk-s3</artifactId><version>1.12.620</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-aws</artifactId><version>3.2.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.2.2</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-data --><dependency><groupId>org.apache.iceberg</groupId><artifactId>iceberg-data</artifactId><version>1.4.2</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.4.2</version> <!-- 根据实际情况选择版本号 --></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.4.2</version> <!-- 根据实际情况选择版本号 --></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.4.2</version> <!-- 根据实际情况选择版本号 --></dependency><!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-spark --><dependency><groupId>org.apache.iceberg</groupId><artifactId>iceberg-spark</artifactId><version>1.4.2</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-spark-runtime-3.3 --><dependency><groupId>org.apache.iceberg</groupId><artifactId>iceberg-spark-runtime-3.4_2.12</artifactId><version>1.4.2</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.14.2</version></dependency><dependency><groupId>org.apache.iceberg</groupId><artifactId>iceberg-data</artifactId><version>1.4.2</version></dependency><dependency><groupId>com.amazonaws</groupId><artifactId>aws-java-sdk-s3</artifactId><version>1.12.620</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-aws</artifactId><version>3.2.2</version></dependency><dependency><groupId>org.apache.iceberg</groupId><artifactId>iceberg-aws</artifactId><version>1.4.2</version></dependency><dependency><groupId>com.amazonaws</groupId><artifactId>aws-java-sdk-bundle</artifactId><version>1.11.375</version></dependency><dependency><groupId>org.apache.iceberg</groupId><artifactId>iceberg-parquet</artifactId><version>1.4.2</version></dependency><dependency><groupId>io.delta</groupId><artifactId>delta-core_2.12</artifactId><version>2.4.0</version></dependency><dependency><groupId>io.delta</groupId><artifactId>delta-spark_2.12</artifactId><version>3.0.0</version></dependency></dependencies><reporting><plugins><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><configuration><scalaVersion>${scala.version}</scalaVersion></configuration></plugin></plugins></reporting>
</project>

下面进行代码编写

package org.icebergtestimport org.apache.iceberg.{PartitionSpec, Schema}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.spark.SparkSchemaUtil
import org.apache.iceberg.types.Types
import org.apache.spark.sql.types._
import org.apache.iceberg._
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.data.GenericRecord
import org.apache.iceberg.types.{Types => _, _}
object icebergspark {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local").appName("test")/* .config("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider").config("spark.hadoop.fs.s3a.access.key", "minioadmin").config("spark.hadoop.fs.s3a.secret.key", "minioadmin").config("spark.hadoop.fs.s3a.endpoint", "http://127.0.0.1:9000").config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false").config("spark.hadoop.fs.s3a.path.style.access", "true").config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem").config("spark.debug.maxToStringFields", "2048")*/.config("spark.hadoop.fs.s3a.access.key", "minioadmin").config("spark.hadoop.fs.s3a.secret.key", "minioadmin").config("spark.hadoop.spark.hadoop.fs.s3a.endpoint", "http://127.0.0.1:9000").config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false").config("spark.hadoop.fs.s3a.path.style.access", "true").config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem").config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider").config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")//指定hadoop catalog,catalog名称为hadoop_prod.config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog").config("spark.sql.catalog.hadoop_prod.type", "hadoop").config("spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.access.key", "minioadmin").config("spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.secret.key", "minioadmin").config("spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.endpoint", "http://127.0.0.1:9000").config("spark.sql.catalog.hadoop_prod.warehouse", "s3a://test1/").config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions").getOrCreate()import org.apache.iceberg.spark.SparkSessionCatalog// 将 Iceberg 的 SparkSessionCatalog 注册到 Spark 中// 将 Iceberg 的 SparkSessionCatalog 注册到 Spark 中// 将 Iceberg 的 SparkSessionCatalog 注册到 Spark 中//1.创建Iceberg表,并插入数据//spark.sql("create table hadoop_prod.mydb.mytest (id int,name string,age int) using iceberg".stripMargin)spark.sql("""|insert into hadoop_prod.mydb.mytest values (1,"zs",18),(2,"ls",19),(3,"ww",20)""".stripMargin)//1.SQL 方式读取Iceberg中的数据// spark.sql("select * from hadoop_prod.mydb.mytest").show()spark.sql("""|select * from hadoop_prod.mydb.mytest VERSION AS OF 4696493712637386339;""".stripMargin).show()/*** 2.使用Spark查询Iceberg中的表除了使用sql 方式之外,还可以使用DataFrame方式,建议使用SQL方式*///第一种方式使用DataFrame方式查询Iceberg表数据snapshots,history,manifests,filesval frame1: DataFrame = spark.table("hadoop_prod.mydb.mytest.snapshots")frame1.show()val frame2: DataFrame = spark.table("hadoop_prod.mydb.mytest.history")frame2.show()// spark.read.option("snapshot-id","4696493712637386339"). format("iceberg").load("3a://test/mydb/mytest")//第二种方式使用DataFrame加载 Iceberg表数据val frame3: DataFrame = spark.read.format("iceberg").load("hadoop_prod.mydb.mytest")frame3.show()}
}

通过上面的例子,直接复制执行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/579304.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

浅谈locust 性能压测使用

1. 基本介绍 Locust是一个开源的负载测试工具&#xff0c;用于模拟大量用户并发访问一个系统或服务&#xff0c;以评估其性能和稳定性。编写语言为Python&#xff0c;可通过Python来自定义构建性能压测场景脚本。Locust支持分 布式负载测试&#xff0c;可以通过多个机器协同工…

基于 Element UI 适用于 Vue 2 版本的虚拟列表选择器组件el-select

背景&#xff1a;在某些使用情况下&#xff0c;单个选择器可能最终加载数万行数据。 将这么多的数据渲染至 DOM 中可能会给浏览器带来负担&#xff0c;从而造成性能问题。 ——vue3element-plus有现成的轮子。而vue2element-ui没有。 以下 文章大部分 摘自 源组件中的README.md…

韩版传奇 2 源码分析与 Unity 重制(一)服务端 TCP 状态管理

专题介绍 该专题将会分析 LOMCN 基于韩版传奇 2,使用 .NET 重写的传奇源码(服务端 + 客户端),分析数据交互、状态管理和客户端渲染等技术,此外笔者还会分享将客户端部分移植到 Unity 和服务端用现代编程语言重写的全过程。 相关资料 官方论坛: https://www.lomcn.org/fo…

张江智荟毁约offer

毕业8年后&#xff0c;找工作被国企歧视学历&#xff01;已经收到了offer&#xff0c;在入职前一周被通知要撤回offer&#xff0c;拒绝录用&#xff0c;理由居然是他们只要本科211以上的人 这是我今天&#xff08;2023-12-26&#xff09;亲身经历的事&#xff0c;听说过面试前…

【树莓派4b的uboot编译移植】

树莓派4b的uboot编译移植 引言 0.1、什么是uboot OS跑起来前&#xff0c;需要的一段引导程序负责部署整个计算机系统&#xff0c;引导操作系统内核启动并给内核传参提供一个命令行界面供人操作是一个开源项目&#xff0c;uboot就是universal bootloader&#xff08;通用的启…

orangepi——蜂鸣器简单应用和首行缩进设置

1.修改缩进 命令&#xff1a;sudo vi /etc/vim/vimrc&#xff1a;用超级用户进入修改页面 修改&#xff1a; set tabstop4 set shifwidth4 set nu 再重新打开vi界面&#xff0c;可以看到有了行显示&#xff0c;缩进为4字符 2.蜂鸣器 进入到解压的wiringOP-next中&#xf…

词法语法语义分析程序设计及实现,包含出错提示和错误恢复

词法说明 (1)关键字 main, int, char, if, else, for, while, void (2)运算符 - * / < < > > ! (3)界符 ; ( ) { } (4)标识符 ID letter(letter|digit)* (5)整型常数 NUM digit digit* (6)空格 ‘ ‘ ‘\n’ ‘\r’ ‘\t’ 空格用来分隔ID,NUM,运算符,界…

Matlab实时读取串口数据并实时画图方法

** Matlab实时读取串口数据并实时画图方法 ** 按照数据串口协议如&#xff1a;$KT2,1.80,88.18,39.54,42.86,LO[0.72,-1.04,0.35]&#xff0c;举例。 s serialport("COM12",115200,"Timeout",5); poszeros(100000,3); j1; data1 read(s,1,"uint8&…

正则表达式:元字符

一、什么事元字符 正则是由一系列的元字符组成的&#xff0c;所谓元字符就是指那些在正则表达式中具有特殊意义的专用字符&#xff0c;元字符是构成正则表达式的基本元件。 二、元字符的分类 1.特殊单字符 表达式含义\d匹配任意一个数字\D匹配任意非数字\w匹配任意一个字母、…

面试经典150题(50-53)

leetcode 150道题 计划花两个月时候刷完&#xff0c;今天&#xff08;第二十二天&#xff09;完成了4道(50-53)150&#xff1a; 50.&#xff08;141. 环形链表&#xff09;题目描述&#xff1a; 给你一个链表的头节点 head &#xff0c;判断链表中是否有环。 如果链表中有某个…

算法练习Day20 (Leetcode/Python-回溯算法)

虽然看似进入了一个新章节&#xff0c;但其实还是前几天二叉树章节的延续。。 回溯算法 &#xff08;以下内容摘抄自代码随想录&#xff09;&#xff1a; 回溯法解决的问题都可以抽象为树形结构&#xff0c;是的&#xff0c;我指的是所有回溯法的问题都可以抽象为树形结构&…

云原生Kubernetes:K8S集群版本升级(v1.22.14 - v1.23.14)

目录 一、理论 1.K8S集群升级 2.环境 3.升级集群&#xff08;v1.23.14&#xff09; 4.验证集群&#xff08;v1.23.14&#xff09; 二、实验 1. 环境 2.升级集群&#xff08;v1.23.14&#xff09; 2.验证集群&#xff08;v1.23.14&#xff09; 一、理论 1.K8S集群升级 …

详解视频美颜SDK:算法优化与性能提升

众所周知&#xff0c;视频美颜SDK的算法优化和性能提升至关重要。下文小编将与大家深度探讨视频美颜SDK的算法原理&#xff0c;以及近期的性能优化措施。 一、常见用法 视频美颜SDK对人脸进行识别&#xff0c;并附加适当的美颜效果。例如&#xff1a; 1.识别、关键点 2.肤色…

modbus-tcp-rtu协议图表

MODBUS TCP 读寄存器 请求 序号 意义 所占字节 字节存放格式 1 事务处理标识 2个字节 高字节在前 2 协议标识 2个字节 高字节在前 3 长度 2个字节 高字节在前 4 单元标识 1个字节 0x00-0xff 5 功能码 1个字节 0x03 6 起始寄存器地址 2个字节 高字节…

flutter项目从创建到运行,以及一些常用的命令

# 创建项目 命令行 flutter create flutter_app &#xff08;这种vsCode软件可用&#xff09; 按下ctrlshiftp&#xff0c; 输入 Flutter: New Project 选择 Application 选择项目存放位置 输入项目名字 点击 enter 完成创建 # 运行项目 1、命令行中运行&#xff1a; cd flutte…

GLES学习笔记---OpenGL绘制到ImageReader

一、ImageReader简介 ImageReader 之前经常使用在camera应用里面&#xff0c;创建一个ImageReader&#xff0c;然后获取surface&#xff0c;配流和下发request时候将surface下发给framework&#xff0c;中间具体对ImageReader做了什么没有具体研究过&#xff0c;等到Hal层came…

MYSQL数据库的备份与恢复-数据库实验七

一、实验目的 1. 了解备份和恢复的基本概念。 2. 掌握使用MySQL命令进行数据库备份的操作方法。 3. 掌握使用MySQL命令进行数据库恢复的操作方法。 二、实验内容 1. 使用mysqldump命令备份数据库studentsdb的所有表&#xff0c;存于D:\下&#xff0c;文件名为all_tables.s…

C#与php自定义数据流传输

C#与php自定义数据流传输 介绍一、客户端与服务器数据传输流程图客户端发送数据给服务器&#xff1a;服务器返回数据给客户端&#xff1a; 二、自定义数据流C#版本数据流PHP版本数据流 三、数据传输测试1.在Unity中创建一个C#脚本NetWorkManager.cs2.服务器www目录创建StreamTe…

Java基础知识:单元测试和调试技巧

在Java编程中&#xff0c;单元测试和调试是提高代码质量和开发效率的重要环节。通过单元测试&#xff0c;我们可以验证代码的正确性&#xff0c;而调试则帮助我们找出并修复代码中的错误。本文将介绍Java中的单元测试和调试技巧&#xff0c;并提供相关示例代码&#xff0c;帮助…

华清远见嵌入式学习——ARM——作业4

作业要求&#xff1a; 代码运行效果图&#xff1a; 代码&#xff1a; do_irq.c: #include "key_it.h" extern void printf(const char *fmt, ...); unsigned int i 0;//延时函数 void delay(int ms) {int i,j;for(i0;i<ms;i){for(j0;j<2000;j);} }void do_i…