相信很多开发都知道这个问题,看文章,看博客都有了解过。但是如果让你自己讲,能不能从头到尾讲明白原理和对应的解决方案呢?
这个小文件是怎么产生的?就一句话,spark处理完数据输出时,一个分区一个文件写到了hdfs上。
那怎么说这个小文件呢?每个文件的存储小于128M,实际上大部分是几kb。
这就很浪费呀,一个块128M,你给我只存几k?然后Namenode开始骂骂咧咧~~~~
言归正传,下面咱们开始唠唠:
1、在Spark处理数据时,如果未对中间结果或最终输出进行合并处理,每个RDD分区的数据将被写入单独的文件中,导致HDFS上可能生成大量小文件。
2. 大量小文件会降低HDFS存储效率和NameNode内存利用率,增加元数据管理负担,以及影响后续读取操作的I/O性能。
3. 为解决这一问题,Spark提供了诸如`coalesce()`或`repartition()`等API来调整分区数,并结合`DataFrameWriter`的`bucketBy`、`sortBy`及`saveAsTable`等方法,实现数据聚合与大文件输出,从而有效减少小文件的数量。
那如果控制小文件的产生呢?
在Spark中,减少shuffle阶段之前小文件产生的主要策略包括:
1. **合并分区(Repartitioning)**:
在进行shuffle操作前,可以使用`repartition()`或`coalesce()`方法来调整RDD的分区数。通过将数据重新分布到更少但更大的分区中,可以在shuffle过程中减少最终写入HDFS的小文件数量。
- `repartition()`会重新创建指定数量的分区,并可能触发一个完整的数据 Shuffle。
- `coalesce()`则可以在减少分区数时尽量避免全量 Shuffle,但如果需要增加分区数,则同样会触发 Shuffle。
2. **批量写入与压缩**:
使用批量写入机制,在聚合或者reduce操作完成后,一次性写出较大的结果文件而不是逐条写出。同时启用数据压缩选项,如Snappy、Gzip等,以减小单个文件的实际大小,即使分区较多也能有效控制物理文件的数量。
3. **自定义Partitioner**:
设计合理的自定义Partitioner,确保数据在Shuffle时能均匀分布并尽可能地聚集在一起,从而在写入磁盘时生成较少的大文件。
4. **增大块大小**:
如果是在Hadoop HDFS层面,可以考虑适当增大HDFS的块大小设置,使得每个数据块能够存储更多的记录,间接减少因为分区而产生的小文件数量。
5. **提前合并处理**:
对于源头就包含大量小文件的数据源,可以预先执行合并操作,例如在加载数据到Spark之前,先利用MapReduce或Spark作业将小文件合并为大文件。
6. **采用数据湖技术**:
使用支持小文件自动合并特性的数据湖解决方案,如Delta Lake或Apache Hudi,它们提供了事务性和管理小文件的功能。
在Apache Spark中,`coalesce()`算子也可以用来减少RDD或DataFrame的分区数,但它与`repartition()`有所不同:
1. **减小分区数**:`coalesce()`可以将数据集的分区数量缩小到一个较小的值,而不需要进行全量shuffle。这意味着它会尽量重用现有的分区数据,尽可能地保持数据本地性,从而减少网络传输开销。
2. **保持分区顺序(部分情况)**:如果只是少量减少分区数,`coalesce()`可能会保留分区内部的数据顺序(但这不是绝对保证的,具体取决于Spark实现和执行计划)。
3. **增加分区数受限**:但是,当你需要增加分区数时,`coalesce()`就不是一个合适的选择,因为它不支持通过增加分区数来触发shuffle。
因此,在考虑是否手动触发shuffle时:
- 如果你想要减少分区数,并且希望操作相对轻量级、避免全量shuffle,可以选择使用`coalesce()`。
- 如果你需要重新分布数据以达到特定的分区策略(比如均衡数据大小),或者需要增加分区数以便更好地并行处理数据,则应该使用`repartition()`,因为它会强制进行shuffle操作。
最最最后:在Spark作业即将写入HDFS时,减少小文件的策略可以包括以下几种:
- 使用
repartition()
或coalesce()
方法重新组织数据分布。通过增大目标分区数,使得每个分区包含更多的数据量,从而减少最终输出的小文件数量。但要注意,增加分区会导致额外的shuffle操作和资源消耗。 - 在写入HDFS之前,设置合理的批次大小(例如使用
DataFrameWriter
的option("maxRecordsPerFile", num_records)
),确保每次写入的数据量足够大。 - 启用数据压缩,这可以在减小文件大小的同时减少物理文件的数量。
df.write.mode("overwrite").format("parquet").option("compression", "gzip") # 或其他压缩算法.save(path_to_hdfs)
Spark SQL中减少小文件生成的方法
- 使用
coalesce()
或repartition()
函数来重新调整DataFrame的分区数,确保每个分区包含足够的数据量。repartition()
会触发一个全shuffle操作,而coalesce()
则可能不会增加分区数而是合并现有分区,因此需根据实际场景选择合适的函数。 -
-- 在SQL中可通过创建视图的方式实现 CREATE OR REPLACE TEMPORARY VIEW repartitioned_data AS SELECT * FROM original_table REPARTITION(num_partitions);
待补充~~~~~~~~~~~~