本文Iceberg使用的为HiveCataLog,依赖HiveMemstore
1、首先获取要操作的表对象及SparkSession
import org.apache.iceberg.{CatalogProperties, Table}
import org.apache.iceberg.spark.actions.SparkActions
......
......
......
//获取表
val tabled: TableIdentifier = TableIdentifier.of(Namespace.of("ns"), "tb")
val table: Table = hiveCatalog.loadTable(tabled)
//获取SparkSession
val sparkConf = new SparkConf()
val sparkSession: SparkSession = SparkSession.builder().master("local").appName("iceberg").config(sparkConf).getOrCreate()
2、 合并datafiles
filter 可以指定需要操作的数据范围
option 指定合并的目标文件大小
SparkActions.get(sparkSession).rewriteDataFiles(table).filter(Expressions.lessThan("age", 1)).filter(Expressions.greaterThan("age", 10)).option("target-file-size-bytes", (128 * 1024 * 1024).toString) // 128 MB.execute()
3、合并manifest files
SparkActions.get(sparkSession).rewriteManifests(table).rewriteIf(file =>file.length() < 10 *1024*1024) // 10 MB.execute()
4、删除过期快照
- 方法一
val before: Long = System.currentTimeMillis() - (1000L * 60 * 60 * 24)
table.expireSnapshots().expireOlderThan(before).commit()
- 方法二
val before: Long = System.currentTimeMillis() - (1000L * 60 * 60 * 24)
SparkActions.get(sparkSession).expireSnapshots(table).expireOlderThan(before).execute();
5、删除孤立文件
为啥会产生孤立文件?
- 情况1:计算引擎执行任务失败,会产生不会metadata.json引用的datafile 和metadata file
- 情况2:标记快照为过期,需要删除没有被引用的datafile,但无法确定该datafile是否被快照引用,导致本该删除datafile却没有被删除
val before: Long = System.currentTimeMillis() - (1000L * 60 * 60 * 24)
SparkActions.get(sparkSession).deleteOrphanFiles(table).olderThan(before).execute()
孤立文件的删除会耗费很长的时间,所以不要频繁进行孤立文件的删除
6、删除旧版本的metadata file
iceberg 每次write都会产生一个新的snapshot,同时也会产生一个新的version。建议给表设置对应的参数:
write.metadata.delete-after-commit.enabled=true
write.metadata.previous-versions-max=5
这样每次对标产生改变的操作commit后,会自动删除老的metadata files,保留指定版本数量的metadata files