Spark如何用repartition来提升执行效率
- 什么是`repartition`
- 如何使用
- 什么时候需要使用`repartition`
什么是repartition
repartition
是 Spark 中的一个转换操作,它可以用来增加或减少分区的数量。这个操作会产生一个新的 RDD,DataFrame 或 Dataset,并将数据重新分布到新的分区中。
如何使用
在Spark中,repartition方法可以按照两种方式进行:
- 按照指定的分区数量进行重新分区。例如:
val df = sparkSession.read.json("path/to/json") // 读取JSON文件创建DataFrame
val repartitionedDF = df.repartition(10) // 重新分区为10个分区
- 按照某个或某些列的值进行重新分区。这种方式常用于DataFrame或Dataset。例如:
val df = sparkSession.read.json("path/to/json") // 读取JSON文件创建DataFrame
val repartitionedDF = df.repartition(col("columnName")) // 按照"columnName"列的值进行重新分区
如果不指定列repartition按照什么进行分区?
果在调用 repartition 方法时没有指定列,那么Spark会将数据均匀地分布到指定数量的分区中,但具体的分区策略是不确定的。
在这种情况下,Spark会尽量保证每个分区中的数据量大致相等,但并不能保证每个分区中的数据在业务逻辑上的均匀分布。也就是说,如果你的数据在某个特定的列上有特定的分布模式,那么在重新分区后,这种分布模式可能会被打乱。
因此,如果你的数据处理逻辑需要按照某个特定的列进行分区,那么你应该在调用 repartition 方法时指定这个列。
什么时候需要使用repartition
- executor内存不足
由于数据量过大,分到单个executor的数据量大小超过了内存会导致OOM。可以通过reparation调大执行分区数来减小单executor的数据量。需要观察内存不足的stage的现有分区数和调整后的分区数来进行优化。 - 数据倾斜
观察失败的stage(通常是内存不足)的每个executor的数据量,是否有数据不均匀。如果各个executor的数据量差距较大,说明默认的分区不合理,数据集中在个别分区。此时需要进行repartition:根据某个唯一键或者根据数据的特征来分配数据。
❗️需要注意的是,repartition 操作会引发全量数据的洗牌,可能会消耗大量的计算和I/O资源,因此在使用时需要谨慎考虑。