Delta Lake
基本用法可参见: https://docs.delta.io/2.3.0/quick-start.html#language-scala
# Launch spark-shell with the Delta Lake 2.3.0 package (Scala 2.12 build),
# registering Delta's SQL extension and making DeltaCatalog the session catalog.
bin/spark-shell \
  --packages io.delta:delta-core_2.12:2.3.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
Note: --packages 会将依赖缓存到本地 ~/.ivy2/cache 目录
$ ll ~/.ivy2/cache/io.delta/delta-core_2.12/jars/delta-core_2.12-2.3.0.jar
-rw-rw-r-- 1 hadoop hadoop 3986365 Apr 5 2023 /home/hadoop/.ivy2/cache/io.delta/delta-core_2.12/jars/delta-core_2.12-2.3.0.jar
// Location of the demo Delta table, shared by every step below.
val path = "/tmp/delta-table"

// Create: write ids 0-4 (range end is exclusive) as a new Delta table.
// No save mode is given, so Spark's default (ErrorIfExists) applies and this
// write fails if the path already holds a table.
val data = spark.range(0, 5)
data.write.format("delta").save(path)

// Read: load the table back and print it.
val df = spark.read.format("delta").load(path)
df.show()

// Overwrite: replace the table contents with ids 5-9, then print through the
// same `df` (expected to reflect the overwrite, i.e. only ids 5-9).
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save(path)
df.show()
// DeltaTable comes from io.delta.tables; expr/col build Column expressions.
import io.delta.tables._
import org.apache.spark.sql.functions._

// Handle to the existing Delta table at this path.
val deltaTable = DeltaTable.forPath("/tmp/delta-table")

// Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = Map("id" -> expr("id + 100")))

// Delete every even value.
// Note: adding 100 keeps an even id even, so this also removes the rows the
// update above just rewrote.
deltaTable.delete(condition = expr("id % 2 == 0"))

// Upsert (merge) new data: ids 0-19, matched against existing rows on id.
val newData = spark.range(0, 20).toDF

// Each line ends with '.', so the expression stays incomplete until execute().
// This lets the chain be pasted line by line into spark-shell.
deltaTable.as("oldData").
  merge(newData.as("newData"), "oldData.id = newData.id").
  whenMatched.
  update(Map("id" -> col("newData.id"))).
  whenNotMatched.
  insert(Map("id" -> col("newData.id"))).
  execute()

// Print the table state after the merge.
deltaTable.toDF.show()