一、配置
忽略损坏的文件、忽略丢失的文件、路径全局过滤器、递归文件查找和修改时间路径过滤器等选项/配置仅在使用基于文件的源(parquet,orc,avro,json,csv,txt)时才有效。
以下示例中使用的目录层次结构为:
dir/├── childDir/│ └── test.json└── test.avro└── test.orc└── test1.json
二、忽略损坏的文件
设置spark.sql.files.ignoreCorruptFiles
为true从文件读取数据时忽略损坏的文件。
读取数据文件时忽略损坏的文件例子:
spark.sql("set spark.sql.files.ignoreCorruptFiles=true");
// dir/test.avro 和 dir/test.avro不是json被忽略
Dataset<Row> jsonDF = spark.read().json("hdfs://master:9000/dir/","hdfs://master:9000/dir/chidDir/");
jsonDF.show();
三、忽略丢失的文件
设置spark.sql.files.ignoreMissingFiles
设置为true在文件读取数据时忽略不存在的文件。
spark.sql("set spark.sql.files.ignoreMissingFiles=true");Dataset<Row> jsonDF = spark.read().json("hdfs://master:9000/dir/","hdfs://master:9000/dir/chidDir/a.json");
jsonDF.show();
四、路径全局过滤器
pathGlobFilter
用于仅包含文件名与模式匹配的文件。语法实现为org.apache.hadoop.fs.GlobFilter类,
它不会更改分区发现的行为。
要加载具有与给定全局模式匹配的路径的文件,同时保持分区发现的行为,例子:
Dataset<Row> jsonDF = spark.read().format("json").option("pathGlobFilter", "*.json") // dir只读取json文件.load("hdfs://master:9000/dir");
jsonDF.show();
五、递归文件查找
recursiveFileLookup
用于递归加载文件,并且禁用分区推断。其默认值为false
。如果数据源显式指定partitionSpec
并且recursiveFileLookup
为true,则将引发异常。
递归加载所有文件例子:
Dataset<Row> jsonDF = spark.read().format("json").option("recursiveFileLookup", "true").load("hdfs://master:9000/dir");
jsonDF.show();
六、修改时间路径过滤器
modifiedBefore
和·modifiedAfter
是可以一起应用或单独应用的选项。(注意,结构化流文件源不支持这些选项。)
modifiedBefore
:可选的时间戳记,仅包含修改时间在指定时间之前发生的文件。提供的时间戳必须采用以下格式:YYYY-MM-DDTHH:mm:ss(例如:2021-03-31T20:10:00)modifiedAfter
:可选的时间戳记,仅包括修改时间在指定时间之后发生的文件。提供的时间戳必须采用以下格式:YYYY-MM-DDTHH:mm:ss(例如:2021-03-31T20:10:00)
如果未提供时区选项,则时间戳使用Spark会话时区(spark.sql.session.timeZone
)。
要加载路径与给定的修改时间范围匹配的文件,例子:
Dataset<Row> jsonDF = spark.read().format("json").option("modifiedBefore", "2021-03-21T11:34:00").option("modifiedAfter", "2021-03-05T11:34:00")// Interpret both times above relative to CST timezone.option("timeZone", "CST").load("hdfs://master:9000/dir");
jsonDF.show();