侯亚南
数据技术处
支宸啸
数据技术处
在大数据计算中,我们可能会遇到一个很棘手的问题——数据倾斜,此时spark任务的性能会比预期要差很多:绝大多数task都很快执行完成,但个别task执行极慢或者报OOM(内存溢出)。数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的性能。
●
●
●
01
# 原 理 #
数据倾斜只会发生在shuffle过程中,在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合(groupByKey、reduceByKey、aggregateByKey)或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜,导致个别task执行极慢,整个Spark作业的运行进度是由运行时间最长的那个task决定的。
02
# 定位问题 #
当某个task运行过慢时,需要定位数据倾斜发生在第几个stage中。如果是用yarn-client模式提交,那么本地是直接可以看到log的,可以在log中找到当前运行到了第几个stage;如果是用yarn-cluster模式提交,则可以通过Spark Web UI来查看当前运行到了第几个stage。此外,无论是使用yarn-client模式还是yarn-cluster模式,我们都可以在Spark Web UI上深入看一下当前这个stage各个task分配的数据量,从而进一步确定是不是task分配的数据不均匀导致了数据倾斜。
03
# 解决方案 #
3.1 使用hive ETL进行预处理
如果hive表中的数据本身很不均匀,而且业务场景需要频繁使用Spark对hive表执行某个分析操作,此时可以尝试通过hive来进行预处理(即通过hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行join),然后在Spark作业中针对的数据源是预处理后的hive表,这样在Spark作业中就不需要使用原先的shuffle类算子执行这类操作了。但这种方式属于治标不治本,只是把数据倾斜的发生提前到了hive ETL中,避免Spark程序发生数据倾斜而已。
3.2 过滤少数导致倾斜的key
如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不大的话,那么可以过滤掉这些key。比如,在Spark SQL中可以使用where子句过滤掉这些key,或者在Spark Core中对RDD使用filter算子过滤掉这些key。如果需要通过动态判定哪些key的数据量最多来进行过滤,那么可以使用sample算子对RDD进行采样,然后计算出每个key的数量,取数据量最多的key过滤掉即可。该方案实现简单,而且效果也很好,可以完全规避掉数据倾斜。但大多数情况下,导致数据倾斜的key还是很多的,并不是只有少数几个。
3.3 提高shuffle操作的并行度
此方案是一种对数据倾斜迎难而上的方案,通过增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据,可以通过修改spark.sql.shuffle.partitions的值来增加shuffle read task的并行度。此方案可以有效缓解数据倾斜,但是没有彻底解决问题,如果出现极端情况,比如某个key对应的数据量巨大,那么无论task数量增加到多少,这个key对应的数据还是可能会分配到一个task中去处理。
3.4 局部聚合+全局聚合
该方案适用于对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合。通过将原本相同的key附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题,最后再去除掉随机前缀,进行全局聚合,就可以得到最终的结果。但如果是join类的shuffle操作,还得用其他的解决方案。
3.5 将reduce join转为map join
此方案适用于大表join小表的情况,通过不使用join算子进行连接操作,而使用Broadcast变量与map类算子来实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。通过将较小RDD中的数据直接用collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照key进行比对。如果key相同的话,那么就将两个RDD的数据按照需要的方式连接起来。此方案不适用于两个大表join的情况。
3.6 采样倾斜key并分拆join操作
如果两个RDD/hive表进行join的时候,数据量都比较大,其中某一个RDD/hive表中的少数几个key的数据量过大,而另一个RDD/hive表中的所有key都分布比较均匀,可以将数据量大的几个key分拆成独立RDD,并附加随机前缀打散成n份去进行join。此时这几个key对应的数据就不会集中在少数几个task上,而是分散到多个task上去join了,最后将结果使用union算子合并起来即可。但如果导致倾斜的key特别多的话,不适合该方案。
3.7 使用随机前缀和扩容RDD进行join
如果在进行join操作的时候,RDD中有大量的key导致数据倾斜,可以将该RDD的每条数据都打上一个n以内的随机前缀,同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀,最后将两个处理后的RDD进行join即可。该方案与上一种方案的不同之处在于,上种方案是尽量只对少数倾斜key对应的数据进行特殊处理(扩容RDD),对内存的占用并不大;而该方案是针对有大量倾斜key的情况,没法将部分key拆分出来进行单独处理,只能对整个RDD进行数据扩容,对内存资源要求很高。
实际项目中,应该综合分析数据的特征、需要进行的操作等来合理选取方案,可以多种方案组合使用。
推荐阅读 |
大数据技术初探之sparkstreaming与flink技术对比 |
技术分享|大数据技术初探之流计算框架 |