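Iceberg is an open table format. It addresses the lack of a uniform way to manage data in big-data systems, where structured, semi-structured, and unstructured data are otherwise handled inconsistently. It works by managing data at the table level, which provides inserts, deletes, updates, and queries, plus rollback to earlier versions (time travel). Below it, Iceberg supports storage such as Hadoop, S3, and other object stores; above it, it supports engines such as Hive, Spark, and Flink. Sitting in the middle, it isolates the two layers from each other and defines a standard for connecting them and managing the data. With that standard in place, a table can be operated on and read no matter which engine created it. For example, a table I create with Spark can be read by Flink, so there is no case where data cannot be recognized because the components differ.
Configure pom.xml in IDEA: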
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.gbicc</groupId>
  <artifactId>bigdata</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.12.18</scala.version>
  </properties>
  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>
  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>
  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.4</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs</groupId>
      <artifactId>specs</artifactId>
      <version>1.2.5</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-core</artifactId>
      <version>1.4.2</version>
    </dependency>
    <dependency>
      <groupId>io.minio</groupId>
      <artifactId>minio</artifactId>
      <version>8.5.7</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-s3 -->
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-java-sdk-s3</artifactId>
      <version>1.12.620</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-aws</artifactId>
      <version>3.2.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>3.2.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-data -->
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-data</artifactId>
      <version>1.4.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>3.4.2</version> <!-- choose the version that matches your environment -->
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>3.4.2</version> <!-- choose the version that matches your environment -->
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>3.4.2</version> <!-- choose the version that matches your environment -->
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-spark -->
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-spark</artifactId>
      <version>1.4.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-spark-runtime-3.4_2.12</artifactId>
      <version>1.4.2</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.14.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-aws</artifactId>
      <version>1.4.2</version>
    </dependency>
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-java-sdk-bundle</artifactId>
      <version>1.11.375</version>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-parquet</artifactId>
      <version>1.4.2</version>
    </dependency>
    <dependency>
      <groupId>io.delta</groupId>
      <artifactId>delta-core_2.12</artifactId>
      <version>2.4.0</version>
    </dependency>
    <dependency>
      <groupId>io.delta</groupId>
      <artifactId>delta-spark_2.12</artifactId>
      <version>3.0.0</version>
    </dependency>
  </dependencies>
  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>
Next, write the code:
package org.icebergtest

import org.apache.spark.sql.{DataFrame, SparkSession}

object icebergspark {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local")
      .appName("test")
      // S3A settings for the local MinIO instance
      .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
      .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
      .config("spark.hadoop.fs.s3a.endpoint", "http://127.0.0.1:9000")
      .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
      .config("spark.hadoop.fs.s3a.path.style.access", "true")
      .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
      .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
      // Use a Hadoop catalog; the catalog name is hadoop_prod
      .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
      .config("spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.access.key", "minioadmin")
      .config("spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.secret.key", "minioadmin")
      .config("spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.endpoint", "http://127.0.0.1:9000")
      .config("spark.sql.catalog.hadoop_prod.warehouse", "s3a://test1/")
      .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .getOrCreate()

    // 1. Create the Iceberg table (only if it does not exist yet) and insert data
    spark.sql(
      """
        |create table if not exists hadoop_prod.mydb.mytest (id int, name string, age int) using iceberg
      """.stripMargin)
    spark.sql(
      """
        |insert into hadoop_prod.mydb.mytest values (1,'zs',18),(2,'ls',19),(3,'ww',20)
      """.stripMargin)

    // Read the Iceberg data with SQL
    // spark.sql("select * from hadoop_prod.mydb.mytest").show()
    // Time travel: this snapshot ID comes from the author's table; replace it
    // with a snapshot_id listed in the snapshots metadata table queried below.
    spark.sql(
      """
        |select * from hadoop_prod.mydb.mytest VERSION AS OF 4696493712637386339
      """.stripMargin).show()

    /**
     * 2. Besides SQL, Iceberg tables can also be queried through the DataFrame API.
     * SQL is the recommended way.
     */
    // Option 1: query the table's metadata tables (snapshots, history, manifests, files) as DataFrames
    val frame1: DataFrame = spark.table("hadoop_prod.mydb.mytest.snapshots")
    frame1.show()
    val frame2: DataFrame = spark.table("hadoop_prod.mydb.mytest.history")
    frame2.show()
    // spark.read.option("snapshot-id", "4696493712637386339").format("iceberg").load("s3a://test1/mydb/mytest")

    // Option 2: load the Iceberg table data with the DataFrame reader
    val frame3: DataFrame = spark.read.format("iceberg").load("hadoop_prod.mydb.mytest")
    frame3.show()
  }
}
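The session builder in icebergspark repeats the same S3A values under two prefixes, and a prefix is easy to mistype. As a minimal sketch, the settings could be generated in one place. The helper `IcebergS3Conf` and its parameter names are hypothetical and not part of the original program; copying all five S3A options into the per-catalog settings and deriving the SSL flag from the endpoint scheme are assumptions made here for illustration.

```scala
// Hypothetical helper (not from the original program): builds the Spark
// settings for an Iceberg Hadoop catalog on an S3-compatible store.
object IcebergS3Conf {
  def settings(
      catalog: String,
      warehouse: String,
      endpoint: String,
      accessKey: String,
      secretKey: String
  ): Map[String, String] = {
    // S3A connection options shared by the session and the catalog.
    // Assumption: SSL is enabled only when the endpoint uses https.
    val s3a = Map(
      "fs.s3a.endpoint" -> endpoint,
      "fs.s3a.access.key" -> accessKey,
      "fs.s3a.secret.key" -> secretKey,
      "fs.s3a.path.style.access" -> "true",
      "fs.s3a.connection.ssl.enabled" -> endpoint.startsWith("https").toString
    )
    val prefix = s"spark.sql.catalog.$catalog"
    // Session-wide Hadoop options carry the "spark.hadoop." prefix.
    val session = s3a.map { case (k, v) => s"spark.hadoop.$k" -> v }
    // Per-catalog Hadoop overrides carry the "<catalog prefix>.hadoop." prefix.
    val perCatalog = s3a.map { case (k, v) => s"$prefix.hadoop.$k" -> v }
    session ++ perCatalog ++ Map(
      prefix -> "org.apache.iceberg.spark.SparkCatalog",
      s"$prefix.type" -> "hadoop",
      s"$prefix.warehouse" -> warehouse
    )
  }
}
```

The resulting map could then be applied with something like conf.foldLeft(SparkSession.builder().master("local"))((b, kv) => b.config(kv._1, kv._2)), leaving the remaining settings (fs.s3a.impl, the credentials provider, spark.sql.extensions) as in the program above.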
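The icebergspark program above can be copied and run directly, provided that MinIO is reachable at http://127.0.0.1:9000, the bucket test1 exists, and the snapshot ID in the VERSION AS OF query is replaced with one from your own table.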
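Because the snapshot ID is specific to each table, a hard-coded value will not work on a freshly created table. The sketch below looks the ID up from the snapshots metadata table and then runs the same VERSION AS OF query. The object `SnapshotTimeTravel` and its method names are hypothetical; it assumes a SparkSession configured like the one in the program and relies on the `snapshot_id` and `committed_at` columns of Iceberg's snapshots metadata table.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper (not from the original program).
object SnapshotTimeTravel {
  // Returns the table's snapshot IDs, oldest first, read from the
  // "snapshots" metadata table.
  def snapshotIds(spark: SparkSession, table: String): Seq[Long] =
    spark.table(s"$table.snapshots")
      .orderBy("committed_at")
      .select("snapshot_id")
      .collect()
      .map(_.getLong(0))
      .toSeq

  // Builds the time-travel query for one snapshot.
  def versionAsOf(table: String, snapshotId: Long): String =
    s"select * from $table VERSION AS OF $snapshotId"

  // Reads the table as of its oldest snapshot, or None if it has no snapshots.
  def readOldest(spark: SparkSession, table: String): Option[DataFrame] =
    snapshotIds(spark, table).headOption
      .map(id => spark.sql(versionAsOf(table, id)))
}
```

With the session from the program, SnapshotTimeTravel.readOldest(spark, "hadoop_prod.mydb.mytest").foreach(_.show()) would print the rows as they were after the first commit.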