一、数据加载
(1)默认数据源(parquet)
最简单加载数据的方式,所有操作都使用默认数据源(parquet
)。如果指定默认数据源需要配置 spark.sql.sources.default参数。
Dataset<Row> manDF = spark.read().load("hdfs://master:9000/test.parquet");
manDF.select("name", "desc").write().save("hdfs://master:9000/test1.parquet");
(2)手动指定选项
可以手动指定将要使用的数据源以及要传递给数据源的任何其他选项。数据源通过其全名指定(即org.apache.spark.sql.parquet
),但内置的来源。
也可以使用自己的短名称(json
,parquet
,jdbc
,orc
,libsvm
,csv
,text
)。从任何数据源类型加载的DataFrame都可以使用此语法转换为其他类型。
请参阅API文档以获取内置源的可用选项,例如 org.apache.spark.sql.DataFrameReader
和org.apache.spark.sql.DataFrameWriter
。此处记录的选项也应通过非Scala Spark API(例如PySpark)应用。
Dataset<Row> manDF = spark.read().format("json").load("hdfs://master:9000/test.json");
manDF.select("name", "desc").write().format("parquet").save("hdfs://master:9000/test1.parquet");
(3)加载CSV文件
Dataset<Row> manDF = spark.read().format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("hdfs://master:9000/test.csv");
(4)写操作期使用额外的option
控制ORC数据源的Bloom过滤器和字典编码,以下ORC示例将创建Bloom过滤器,并仅将字典编码用于age
。对于parquet
,也存在parquet.enable.dictionary
。要查找有关其他ORC / Parquet选项的更多详细信息,请访问Apache ORC / Parquet官方网站。
manDF.write.format("orc").option("orc.bloom.filter.columns", "age").option("orc.dictionary.key.threshold", "1.0").option("orc.column.encoding.direct", "name").save("hdfs://master:9000/man.orc")
二、保存模式
保存操作可以选择带SaveMode
,指定如何处理现有数据(如果存在)。重要的是要认识到这些保存模式不使用任何锁定,也不是原子的。另外,执行时Overwrite
,将在写出新数据之前删除数据。
Scala / Java | 任何语言 | 意义 |
---|---|---|
SaveMode.ErrorIfExists (默认) | "error" or "errorifexists" (默认) | 将DataFrame保存到数据源时,如果已经存在数据,则将引发异常。 |
SaveMode.Append | "append" | 将DataFrame保存到数据源时,如果已经存在数据/表,则应该将DataFrame的内容附加到现有数据中。 |
SaveMode.Overwrite | "overwrite" | 覆盖模式意味着将DataFrame保存到数据源时,如果已经存在数据/表,则预期现有数据将被DataFrame的内容覆盖。 |
SaveMode.Ignore | "ignore" | 忽略模式意味着在将DataFrame保存到数据源时,如果已经存在数据,则期望保存操作不保存DataFrame的内容并且不更改现有数据。这类似于CREATE TABLE IF NOT EXISTS SQL中的。 |
保存到永久表
DataFrames
也可以使用以下saveAsTable
命令作为持久性表保存到Hive Metastore中。请注意,使用此功能不需要现有的Hive部署。Spark将为您创建一个默认的本地Hive Metastore(使用Derby)。与createOrReplaceTempView
命令不同, saveAsTable
它将具体化DataFrame的内容并在Hive元存储中创建一个指向数据的指针。即使您重新启动Spark程序,持久表仍将存在,只要您保持与同一metastore的连接即可。可以通过使用表名称table
在上调用方法来创建持久表的DataFrame SparkSession
。
对于基于文件的数据源,例如文本,镶木地板,json等,您可以通过path
选项指定自定义表路径 ,例如df.write.option("path", "/some/path").saveAsTable("t")
。删除表后,自定义表路径将不会删除,并且表数据仍然存在。如果未指定自定义表路径,Spark会将数据写入仓库目录下的默认表路径。删除表时,默认表路径也将被删除。
从Spark 2.1开始,持久数据源表在Hive元存储中存储了按分区的元数据。这带来了几个好处:
- 由于元存储只能返回查询的必要分区,因此不再需要在第一个查询中将所有分区发现到表中。
- Hive DDL(例如,
ALTER TABLE PARTITION ... SET LOCATION
现在可用于使用Datasource API创建的表)。
请注意,在创建外部数据源表(带有path
选项的表)时,默认情况下不会收集分区信息。要同步元存储中的分区信息,可以调用MSCK REPAIR TABLE
。
三、分组,分类和分区
对于基于文件的数据源,也可以对输出进行存储和分类或分区。
(1)桶和排序
存储桶和排序仅适用于持久表
cityDF.write().bucketBy(10, "city").sortBy("area").saveAsTable("city_buckets");
(2)分区
而分区可以既使用save
和saveAsTable
使用DataSet API时
manDF.write().partitionBy("age").format("json").save("hdfs://master:9000/man.json");
(3)分区和桶
对表使用分区和存储桶
manDF.write().partitionBy("age").bucketBy(18, "name").saveAsTable("man_partition_buckets")
partitionBy创建一个分区结构描述“分区发现”部分,它对具有高聚集数的列适用性有限。相反, bucketBy将数据分布在固定数量的存储桶中,在唯一值的数量不受限制时可以使用。