数据倾斜,顾名思义,就是在计算过程中数据分散度不够,导致某个节点数据过于集中,从而导致任务执行效率大大降低。参照对比下MR的整体流程和ODPS,整体结合理解数据倾斜发生的几个生命周期的节点,如下图:可以分为Map、Reduce、Join三个阶段理解。
Map端长尾
Map端是MR任务的开始,在Map端读取数据的时候,由于读入的数据文件大小分布不均匀,会导致一些Map instance的数据量很大,一些instance很小,造成了Map端长尾现象。一般情况下,Map端长尾现象较少,主要由以下两种原因导致:
Map端读入的文件大小不均匀,且小文件很多,导致当前Map端读取数据不均匀
Map端做聚合操作由于Map instance读取文件某个值较多引起长尾,主要指count(distinct)操作
方案
针对小文件和文件大小不均匀的情况,暂时没有特别好的方法,可以通过合并文件、文件切片大小等参数进行调优,即:set odps.sql.mapper.merge.limit.size = 64,set odps.sql.mapper.split.size = 256
针对第二种情况,采取distribute by rand()来打乱数据,使数据尽可能的分布均匀,如下:通过distribute by rand() 会让Map端分发后的数据重新随机进行一次分发。这种方式虽然平衡了instance处理的数据量,但是带来了Reduce端的资源紧张。具体可以增大Reduce端的instance个数,具体如下:set odps.sql.reducer.insatnce = 100;
select
t1.*
from
(
select
id,
name
from ods.cn_log_app_test01_pre_1d
where dt = ‘ d t ′ d i s t r i b u t e b y r a n d ( ) ) t 1 l e f t j o i n ( s e l e c t ∗ f r o m o d s . c n l o g a p p t e s t 0 2 p r e 1 d w h e r e d t = ′ {dt}' distribute by rand() )t1 left join ( select * from ods.cn_log_app_test02_pre_1d where dt = ' dt′distributebyrand())t1leftjoin(select∗fromods.cnlogapptest02pre1dwheredt=′{dt}’
) t2
on t1.id = t2.id
Reduce端长尾
Reduce端主要负责的是对Map端输出的K-V形式的数据进行处理,比如常见的聚合操作,即count、sum、avg等,得到最终的聚合结果。Reduce端产生长尾现象的主要原因是key的分布不均匀导致。比如:一些instance处理的数据量很大,可能40min这个reduce task还没结束,有的instance处理的数据量很小,主要场景如下:(从上至下优先级为常见-> 少见)
在Join阶段经常发生的:空值很多,造成这么多的Null值被分发到同一个instance上
在Map端聚合操作的时候出现key值分布不均匀,从而导致Reduce端长尾
多个distinct出现在同一段SQL代码中,数据会被分发多次,不仅会造成数据膨胀,还会把长尾现象放大多倍
动态分区数过多可能造成小文件过多,从而引起Reduce端长尾
第一种场景方案
当存在很多Null值的时候,可以选择把空值数据单独拎出来,然后别的数据去做Join,最后再把空值数据union all上去即可。生产中,有些空值是具备业务含义的,比如:剩余还款字段,用户某个时间点已还清贷款,显然去掉空值数据是肯定不对的,所以需要具体情况具体分析
加随机数,打散,实际用的不多,第一种方式用的更高频
比如数据如下:
concat(name + ‘_’ + random)
乌克兰 -> 乌克兰_1
乌克兰 -> 乌克兰_2
乌克兰 -> 乌克兰_3
最终结果数据 res.split(‘_’)[0]再取回原数据即可
第二种场景方案
查看数据key的分布情况,对一些热点key进行单独计算,然后再union all回来,简单来说,就是两个任务单独处理,例如:select / + skewjoin (a(c0,c1)) /* from T0 a join T1 b on a.c0 = b.c0 and a.c1 = b.c1 and a.c2 = b.c2
第三种场景方案
ODPS对动态分区的处理是引入一个额外的一级reduce task,相同的目标分区交由一个(或者少量几个)reduce instance写入,避免小文件过多
第四种场景方案
遇到该情况,最好的办法就是通过其他方案例如:临时表、现有模型去计算去重的指标,避免使用distinct造成数据膨胀
Join长尾优化
Join是指将相同的数据分发到同一个instance上处理,如果某个key的value量级很大,处理时间很长,即称之为长尾。主要以下几个场景:
大表join小表
空值和热点值在reduce长尾场景中已经说过,见上文
场景一方案
t1表数据量级:9700w+ t2表量级:1000w+
SELECT /+mapjoin(t2)/
t1.*
,t2.f_guaname
,t2.f_guades
,t2.f_own
FROM (
SELECT *
FROM ods.ods_zdf_itf_company_mort
WHERE dt = ‘ b i z d a t e ′ A N D n v l ( f d e l e t e d , ′ 0 ′ ) = ′ 0 ′ ) t 1 L E F T J O I N ( S E L E C T f g u a n a m e , f g u a d e s , f o w n F R O M o d s . o d s z d f i t f c o m p a n y m o r t g u a W H E R E d t = ′ {bizdate}' AND nvl(f_deleted, '0') = '0' ) t1 LEFT JOIN ( SELECT f_guaname ,f_guades ,f_own FROM ods.ods_zdf_itf_company_mort_gua WHERE dt = ' bizdate′ANDnvl(fdeleted,′0′)=′0′)t1LEFTJOIN(SELECTfguaname,fguades,fownFROMods.odszdfitfcompanymortguaWHEREdt=′{bizdate}’
AND nvl(f_deleted, ‘0’) = ‘0’
) t2
ON t1.f_morreg_id = t2.f_morreg_id
ODPS高频的长尾参数配置
set odps.sql.groupby.skewindata=true/false
row_number()优化【不常用】
分组过程中可能某个key的数据量太大,会造成倾斜的,那么自己搞个随机列,根据key + random分组,那么这个热点key的数据其实就打散了,那么再求出topN,其实这批次topN就已经是一定条件下的好马了,内层循环已经避免了数据倾斜,外层直接根据key全局分组,相当于这批好马里再挑好马。
select cate_id
,property_id
,value_id
from
(select cate_id
,property_id
,value_id
,row_number() over(partition by cate_id order by property_id asc,value_id asc) as rn
from
(select cate_id
,property_id
,value_id
from
(select cate_id
,property_id
,value_id
,row_number() over(partition by cate_id,sec_part order by property_id asc,value_id asc) as rn
from
(select cate_id
,property_id
,value_id
,ceil(M*rand())%P as sec_part
from one_plat.ads_tb_sycm_cate_xx
where ds = ‘${bizdate}’
) s
) p
where rn <= N
) part
) a
where rn <= N
常见资源配置参数
设置Map Task每个instance的cpu数,默认在[50-800]之间,一般不会调
set odps.sql.mapper.cpu=100;
设定Map Task每个Instance的Memory大小,单位M,默认1024M
set odps.sql.mapper.memory=1024;
设定一个Map的最大数据输入量
set odps.sql.mapper.split.size=256
[最好不要降低这个阈值,因为每个map task的instance时间是减少了,但是小文件多了,合并时间翻倍,得不偿失]
设定Join Task的Instance数量,默认为-1,在[0,2000]之间调整
set odps.sql.joiner.instances=-1;
场景:每个Join Instance处理的数据量比较大,耗时较长,没有发生长尾,可以考虑增大使用这个参数。
扩大string字符串的大小,这个场景是之前字符串中存储多条json的时候,多的存储几十万条,超出string默认大小,只能修改string大小,需要注意的是要不断调节,有OOM的风险
set odps.sql.cfile2.field.maxsize=16384;
总结
尽量不要通过设置参数的方式进行优化,首先要看能否从业务上或者算法上减少数据和其他方式进行优化
设置参数的话,要首先确定是Map、Reduce、Join哪个阶段的任务时间长,从而设置对应的参数
在没有出现数据倾斜的情况下,如果通过设置Cpu参数(含Memory参数)和设置Instance个数两种方式都能调优的话,最好是先设置Instance个数。因为如果Cpu/Memory参数设置不合理,执行任务的机器满足不了参数的要求,要重新找机器的,这样反而会影响效率
Instance的个数设置,有个简单的方法是二分法。先设置个很大的,如果不能满足要求,那么就继续增大,如果满足了要求,就折半降低参数大小,最终找到一个合适的值
如果面试中问到hivesql的优化,可以侧重数据倾斜做不同场景的优化分析,尽量避免按照网上的优化套路回答,往往效果不佳,毕竟面试官能搜到的答案,大概率不是他想看到的答案,突出有自己的思考且真正的有所应用才是最重要的,不然就会出现自己觉得答的很不错最后却没了结果。