1.coalesce()
作用:缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。
需求:创建一个4个分区的RDD,对其缩减分区
#1.创建一个RDD
rdd1 = sc.parallelize(range(1,11),4)
#2.对RDD重新分区
rdd2 = rdd1.coalesce(2)
#3.打印
print("rdd1分区数", rdd1.getNumPartitions()) # 4
print("rdd2分区数", rdd2.getNumPartitions()) # 2list1 = rdd2.collect()
print(list1)
2.repartition()
作用:根据分区数,重新通过网络随机洗牌(shuffle)所有数据。
需求:创建一个4个分区的RDD,对其重新分区
from pyspark import SparkContext, SparkConfdef main():conf = SparkConf().setAppName("第一个Spark程序")sc = SparkContext(conf=conf)#1.创建一个RDDrdd1 = sc.parallelize(range(1,11),4)#2.重新分区rdd2 = rdd1.repartition(6)#3.打印print("rdd1分区数", rdd1.getNumPartitions()) #4print("rdd2分区数", rdd2.getNumPartitions()) #6list1 = rdd2.glom().collect()print(list1)if __name__ == '__main__':main()
3.总结:coalesce和repartition的区别
按住ctrl点击coalesce方法可以看到它的底层逻辑,我们可以看到,coalesce进行重新分区有两个参数,numPartitions和shuffle=False,通过传递参数可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。
repartition实际上是调用的coalesce()方法返回的是coalesce(numPartitions,shuffle=True),默认是进行shuffle过程的
注意:通常情况下我们使用coalesce或者repartitions的时候不传递shuffle参数,这种情况下我们使用coalesce用来减少分区的个数,使用repartitions来增加分区的个数,但是并不代表coalesce只能减少分区,repartitions只能增加分区:解释过程如下:
减少分区允许不进行shuffle过程,但是增大分区需要 (spark要求减少分区可以shuffle,也可以没有shuffle,增加分区必须shuffle)
所以coalesce可以在不进行shuffle的情况下减少分区,增大分区需要指定第二个参数为true
减少分区的应用场景:例如通过filter之后,有些分区数据量比较少,通过减少分区,防止数据倾斜
有一个RDD,数据量比较大,并且分区数也比较大。
经过filter过滤之后,数据量减少了很多,但是默认情况下分区数不变。
这个时候可以通过repartition算子改变分区。
经过filter、分组等操作之后,RDD各个分区的数量有倾斜,
通过repartition之后,可以保证各个分区的数量相同注意,在pyspark中,底层重分区是批量操作的,如果不需要,可以设置参数:sc._batchSize = 1
增大分区的应用场景:分区内数据量太大,通过增加分区提高并行度,减少输出的文件数量
拓展:
我们新建一个rdd,数据量相比较于真正的企业开发肯定是小的可怜,使用二十多个数据吧,然后设置分区数量是4,经过filter过滤保留大于10的元素,输出之后发现,第一个分区内的数据为0,其他三个分区内的元素分布还是较为均匀的,此时发生了数据倾斜,我们就需要解决这个问题,首先我想到的是通过充分区的方式来处理
此时有趣的一幕出现了,我们想通过重分区来解决数据倾斜的问题,输出之后发现,,,数据更倾斜了,第三四分区的数据都没了,,,,这显然没有的达到我们的预期,出现这种结果的原因到底是什么,分区过程这个元素的分发是如何计算的,把我的三四分区数据都搞没了
点开代码发现,里面有一个batchSize变量,这个值好像有点用处,我不会,问一下chatgpt吧,解释说是用于确定批处理大小,self.ctx._batchSize这个值是可以设置的,默认值batchSize是10,所以在数据分发处理的时候,会10个10个的分数据到不同的分区,而我们测试用的数据很少,只有一二十个,经过filter过滤之后更少了,所以只分到了第一二分区,第三四分区没有得到值,那我们可以通过设置这个值,来决定每次分发的数据的多少
我们设置这个值为1,结果发现输出的结果好像是比较平均的,但是也还是个数不同,导致这个结果出现的原因是因为,进过filter处理后的元素经过shuffle处理,会按照一个分区内的元素进行分发处理,假如原本有4个分区,第一个分区内有11个元素,第二个分区内有9个元素,第三四个,,,经过repartition分区还是四个分区,第一个分区内的11个元素分发,到重分区后的1 2 3 4,过程是挨个分到四个分区,第四个分区最后会有2个元素,123分区都是三个元素