shuffle是一个涉及到CPU(序列化反序列化)、网络IO(跨节点数据传输)以及磁盘IO(shuffle中间结果落盘)的操作。
优化思路:
减少shuffle的数据量,减少shuffle的次数。
具体方式:
- 能不shuffle的时候尽量不要shuffle数据,可以使用mapjoin(广播变量broadcast);
- 能用reduceByKey就不要用groupByKey,因为reducerByKey会在shuffle前进行本地聚合(map阶段进行预聚合combine),减少写出中间文件的个数,从而可以使在shuffle过程中减少磁盘IO;
- spark2.0后已经没有HashShuffleManager,只有SortShuffleManager,SortShuffleManager内部有3种shuffle操作,可适应小中大集群。
- 参数调节:如下
spark.reducer.maxSizeInFlight:reduce task的拉取缓存,默认48m
spark.shuffle.file.buffer:map task的写磁盘缓存,默认32k
spark.shuffle.io.maxRetries:拉取失败的最大重试次数,默认3次
spark.shuffle.io.retryWait:拉取失败的重试间隔,默认5s