通过hadoop权威指南学习hadoop,对shuffle过程一直很疑惑,经过查看网上多个帖子,最终 完成此篇问答总结。

1.什么叫shuffle

map任务输出到reducer任务输入之间的过程就叫做shuffle

 

2.每个map任务都有对应的缓存吗?默认是多少,怎么配置这个值的大小?

每个map任务都有一个缓存支持输出,默认大小是100m,可以通过属性io.sort.mb配置

 

3.什么时候触发缓存的数据写入磁盘

当缓存的容量达到缓存一定比例时触发,这个比例由属性Io.sort.spill.percent配置,默认是0.8

 

4.为什么需要设置写入比例

达到一定比例后,由于写缓存和读缓存是可以同时并行执行的,这会降低把缓存数据腾空的时间,从而提高效率

 

5.怎么理解缓存叫做环形缓存

缓存有一个阀值比例配置,当达到整个缓存的这个比例时,会触发spill操作;触发时,map输出还会接着往剩下的空间写入,但是写满的空间会被锁定,数据溢出写入磁盘。

当这部分溢出的数据写完后,空出的内存空间可以接着被使用,形成像环一样的被循环使用的效果。如图:

图一表示刚好达到溢出比例的结构:


 

图二表示有数据开始spill到磁盘,并且新的数据继续往空的空间写入


 

图三表示溢出的数据都被写入磁盘后缓存的状态


 

图四表示溢出前剩余的空间被写满后继续从头(以前被溢出的数据所占空间)开始写入


 

以上四个图展示的过程为,尾部写满后从头部接着写,形成类似环状的形态

 

6.缓存的结构是什么样的?

如图:


数据从右到左开始写入,关于此keyvalue的元数据(partition,keystart,valuestart)写入左边的索引区

 

 

 

7.怎么理解partition的过程

分做两步:

1.标记key value所属与的分区

    map输出的时候,写入缓存之前,会调用partition函数,计算出数据所属的分区,并且把这个元 数据存储起来

2.把属与同一分区的数据合并在一起

    当数据达到溢出的条件时(即达到溢出比例,启动线程准备写入文件前),读取缓存中的数据和分区元数据,然后把属与同一分区的数据合并到一起

 

8.map任务端数据输出排序过程是什么样的?

当达到溢出条件后,比如默认的是0.8,则会读出80M的数据,根据之前的分区元数据,按照分区号进行排序,这样就可实现同一分区的数据都在一起,然后再根据map输出的key进行排序。最后实现溢出的文件内是分区的,且分区内是有序的

 

9.map任务数据输出后所做的combinemerge有什么区别?

1combine主要是把形如aa:1,aa:2这样的key值相同的数据进行计算,计算规则与reduce一致,比如:当前计算是求key对应的值求和,则combine操作后得到aa:3这样的结果。

       map输出数据根据分区排序完成后,在写入文件之前会执行一次combine操作(前提是设客户端设置了这个操作);如果map输出比较大,溢出文件个数大于3(此值可以通过属性min.num.spills.for.combine配置)时,在merge的过程(多个spill文件合并为一个大文件)中还会执行combine操作

注意事项:不是每种作业都可以做combine操作的,只有满足以下条件才可以:

 a)reduce的输入输出类型都一样,因为combine本质上就是用的reduce

 b)计算逻辑上,combine操作后不会影响计算结果,像求和就不会影响

 

2)merge操作是对形如a:1 a:2这样的数据最后形成{"a":[1,2]}这样的数据,作为reduce任务的输入

    map输出的时候,只有在多个溢出文件合并为一个大文件时才会执行merge操作

 

无论是combine还是merge都是为了增加数据的密度,减少数据的传输和存储,提高系统的效率

 

10.怎么标记溢出文件中不同分区的数据

每次溢出的数据写入文件时,都按照分区的数值从小到大排序,内部存储是以tag的方式区分不同分区的数据;同时生成一个索引文件,这个索引文件记录分区的描述信息,包括:起始位置、长度、以及压缩长度,这些信息存储在IndexRecord结构里面。一个spill文件中的多个段的索引数据被组织成SpillRecord结构,SpillRecord又被加入进indexCacheList中。

 

11.怎样把所有的spill文件合并进入唯一一个文件

      map输出数据比较多的时候,会生成多个溢出文件,任务完成的最后一件事情就是把这些文件合并为一个大文件。合并的过程中一定会做merge操作,可能会做combine操作。

1)如果生成的文件太多,可能会执行多次合并,每次最多能合并的文件数默认为10,可以通过属性min.num.spills.for.combine配置

2)多个溢出文件合并是,同一个分区内部也必须再做一次排序,排序算法是多路归并排序

3)是否还需要做combine操作,一是看是否设置了combine,二是看溢出的文件数是否大于等于3,请看第9点的介绍

4)最终生成的文件格式与单个溢出文件一致,也是按分区顺序存储,并且有一个对应的索引文件,记录每个分区数据的起始位置,长度以及压缩长度。这个索引文件名叫做file.out.index

 

12.reducer怎么知道去哪儿读取map输出呢?

当任务执行完成后,tasktracker会通知jobtracker;当reducer所在的reducer通过心跳请求任务时,jobtracker会告诉reducer去哪儿拷贝数据

 

13.reducer怎么知道自己应该读取那个分区呢?

这个问题,我一直没有搞明白,目前猜测是按照顺序,比如第一个分配的reudcer任务对应的分区号是0;还有一种可能是,执行map任务的tasktracker把分区索引告诉了jobtracker,然后jobtracker明确告诉reducer去哪儿读取输出,读取的是那个分区的数据。

 

14.reduce端的过程是什么样的?


 

reduce的运行是分成三个阶段的。分别为copy->sort->reduce。由于job的每一个map都会根据reduce(n)数将数据分成map 输出结果分成n个partition,

所以map的中间结果中是有可能包含每一个reduce需要处理的部分数据的。所以,为了优化reduce的执行时间,hadoop中是等job的第一个map结束后,

所有的reduce就开始尝试从完成的map中下载该reduce对应的partition部分数据。这个过程就是通常所说的shuffle,也就是copy过程。

 

Reduce task在做shuffle时,实际上就是从不同的已经完成的map上去下载属于自己这个reduce的部分数据,由于map通常有许多个,

所以对一个reduce来说,下载也可以是并行的从多个map下载,这个并行度是可以调整的,调整参数为:mapred.reduce.parallel.copies(default 5)。

默认情况下,每个只会有5个并行的下载线程在从map下数据,如果一个时间段内job完成的map有100个或者更多,那么reduce也最多只能同时下载5个map的数据,

所以这个参数比较适合map很多并且完成的比较快的job的情况下调大,有利于reduce更快的获取属于自己部分的数据。

 

reduce的每一个下载线程在下载某个map数据的时候,有可能因为那个map中间结果所在机器发生错误,或者中间结果的文件丢失,或者网络瞬断等等情况,

这样reduce的下载就有可能失败,所以reduce的下载线程并不会无休止的等待下去,当一定时间后下载仍然失败,那么下载线程就会放弃这次下载,

并在随后尝试从另外的地方下载(因为这段时间map可能重跑)。所以reduce下载线程的这个最大的下载时间段是可以调整的,

调整参数为:mapred.reduce.copy.backoff(default 300秒)。如果集群环境的网络本身是瓶颈,那么用户可以通过调大这个参数来避免reduce下载线程被误判为失败的情况。不过在网络环境比较好的情况下,没有必要调整。通常来说专业的集群网络不应该有太大问题,所以这个参数需要调整的情况不多。

 

Reduce将map结果下载到本地时,同样也是需要进行merge的,所以io.sort.factor的配置选项同样会影响reduce进行merge时的行为,该参数的详细介绍上文已经提到,

当发现reduce在shuffle阶段iowait非常的高的时候,就有可能通过调大这个参数来加大一次merge时的并发吞吐,优化reduce效率。

 

Reduce在shuffle阶段对下载来的map数据,并不是立刻就写入磁盘的,而是会先缓存在内存中,然后当使用内存达到一定量的时候才刷入磁盘。

这个内存大小的控制就不像map一样可以通过io.sort.mb来设定了,而是通过另外一个参数来设置:mapred.job.shuffle.input.buffer.percent(default 0.7),

这个参数其实是一个百分比,意思是说,shuffile在reduce内存中的数据最多使用内存量为:0.7 × maxHeap of reduce task。也就是说,

如果该reduce task的最大heap使用量(通常通过mapred.child.java.opts来设置,比如设置为-Xmx1024m)的一定比例用来缓存数据。默认情况下,

reduce会使用其heapsize的70%来在内存中缓存数据。如果reduce的heap由于业务原因调整的比较大,相应的缓存大小也会变大,这也是为什么reduce

用来做缓存的参数是一个百分比,而不是一个固定的值了。

 

假设mapred.job.shuffle.input.buffer.percent为0.7,reduce task的max heapsize为1G,那么用来做下载数据缓存的内存就为大概700MB左右,

这700M的内存,跟map端一样,也不是要等到全部写满才会往磁盘刷的,而是当这700M中被使用到了一定的限度(通常是一个百分比),就会开始往磁盘刷。

这个限度阈值也是可以通过job参数来设定的,设定参数为:mapred.job.shuffle.merge.percent(default 0.66)。如果下载速度很快,

很容易就把内存缓存撑大,那么调整一下这个参数有可能会对reduce的性能有所帮助。

 

当reduce将所有的map上对应自己partition的数据下载完成后,就会开始真正的reduce计算阶段(中间有个sort阶段通常时间非常短,几秒钟就完成了,

因为整个下载阶段就已经是边下载边sort,然后边merge的)。当reduce task真正进入reduce函数的计算阶段的时候,有一个参数也是可以调整reduce的计算行为。

也就是:mapred.job.reduce.input.buffer.percent(default 0.0)。由于reduce计算时肯定也是需要消耗内存的,而在读取reduce需要的数据时,

同样是需要内存作为buffer,这个参数是控制,需要多少的内存百分比来作为reduce读已经sort好的数据的buffer百分比。默认情况下为0,也就是说,

默认情况下,reduce是全部从磁盘开始读处理数据。如果这个参数大于0,那么就会有一定量的数据被缓存在内存并输送给reduce,当reduce计算逻辑消耗内存很小时,

可以分一部分内存用来缓存数据,反正reduce的内存闲着也是闲着。

 

参考资料

1.http://www.linuxidc.com/Linux/2011-11/47053.htm

2.http://blog.csdn.net/mrtitan/article/details/8711366

3.http://www.alidata.org/archives/1470

4.http://blog.sina.com.cn/s/blog_4a1f59bf0100ssap.html

5.http://blog.csdn.net/HEYUTAO007/article/details/5725379