I. Source Code Download
Below is the official download location for the Hadoop source. I downloaded hadoop-3.2.4, so let's walk through it together.
Index of /dist/hadoop/core
II. The Reducer Class
Let's first look at the Reducer class that our own reduce implementation extends.
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

  /** The Context that is passed on to the Reducer implementations. */
  public abstract class Context
    implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  }

  /** Called once at the start of the task. */
  protected void setup(Context context) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * This method is called once for each key. Most applications will define
   * their reduce class by overriding this method. The default implementation
   * is an identity function.
   */
  @SuppressWarnings("unchecked")
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }

  /** Called once at the end of the task. */
  protected void cleanup(Context context) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Advanced application writers can use the
   * run(org.apache.hadoop.mapreduce.Reducer.Context) method to control how
   * the reduce task works.
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKey()) {
        reduce(context.getCurrentKey(), context.getValues(), context);
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if(iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
        }
      }
    } finally {
      cleanup(context);
    }
  }
}
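Before summarizing the Javadoc, here is a minimal sketch of how this template is usually filled in by user code: setup() runs once before the first key, reduce() once per distinct key, cleanup() once at the end. MaxValueReducer, its counter group and the field names are illustrative choices, not anything from the Hadoop source.

// A minimal sketch of a user-defined Reducer using the setup/reduce/cleanup hooks shown above.
// MaxValueReducer and its fields are hypothetical names.
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxValueReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

  private final LongWritable max = new LongWritable();
  private long valuesSeen;                        // simple per-task statistic

  @Override
  protected void setup(Context context) {
    valuesSeen = 0;                               // called once, before the first key
  }

  @Override
  protected void reduce(Text key, Iterable<LongWritable> values, Context context)
      throws IOException, InterruptedException {
    long best = Long.MIN_VALUE;
    for (LongWritable v : values) {               // one reduce() call per distinct key
      best = Math.max(best, v.get());
      valuesSeen++;
    }
    max.set(best);
    context.write(key, max);
  }

  @Override
  protected void cleanup(Context context) {
    // called once after the last key; a good place to flush per-task state
    context.getCounter("stats", "valuesSeen").increment(valuesSeen);
  }
}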
From the class's Javadoc we can take away the following:
1. It reduces the set of intermediate values that share a key (produced by the map phase) to a smaller set of values.
2. Reducer implementations can access the job's configuration via JobContext.getConfiguration().
3. A Reducer has three primary phases:
3.1 Shuffle: the Reducer copies the sorted output of every Mapper across the network over HTTP.
3.2 Sort: the framework merge-sorts the Mapper outputs by key (different Mappers may have emitted the same key).
The shuffle and sort phases happen simultaneously: map outputs are merged while they are being fetched.
3.3 SecondarySort: to get a secondary sort on the values returned by the iterator, the application should extend the key with the secondary field and define a grouping comparator. Keys are sorted using the entire (composite) key but grouped with the grouping comparator, which decides which keys and values are sent to reduce() in the same call. The grouping comparator is specified via Job.setGroupingComparatorClass(Class); the sort order is controlled by Job.setSortComparatorClass(Class). (See the driver sketch after this list.)
4. Reduce phase: reduce() is called once for every <key, (collection of values)> in the sorted input.
5. The output of the reduce task is typically written to a RecordWriter via Context.write().
6. The output of the Reducer is not re-sorted.
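As a quick illustration of point 3.3, this is roughly how a secondary sort is wired up on the driver side. CompositeKeySortComparator and NaturalKeyGroupingComparator are hypothetical user-defined comparator classes (as is the composite key they compare); only the two Job setters are real API.

// Driver-side sketch of a secondary sort. The composite key carries (naturalKey, secondaryKey);
// CompositeKeySortComparator and NaturalKeyGroupingComparator are hypothetical user classes.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SecondarySortDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "secondary sort demo");

    // full ordering: natural key first, then the secondary field,
    // so the values seen by reduce() arrive already ordered within each group
    job.setSortComparatorClass(CompositeKeySortComparator.class);

    // grouping: only the natural key decides which records share one reduce() call
    job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
  }
}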
III. How the ReduceTask Is Launched
Like the MapTask, the ReduceTask is launched by YarnChild. For the details, see the MapTask launch process in the previous post <Hadoop-MapReduce-源码跟读-MapTask阶段篇>.
IV. ReduceTask Execution Details (Source Walkthrough)
We won't start from YarnChild this time; instead we pick up the trail at ReduceTask.run().
Note: the following analyzes the source of a single ReduceTask within one Job; a Job may have several ReduceTasks.
1. ReduceTask
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)throws IOException, InterruptedException, ClassNotFoundException {job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());if (isMapOrReduce()) {//Progress帮助生成进度报告的实用程序。是层次结构。子阶段的节点通过调用addPhase()创建//添加copy、sort、reduce到流程中copyPhase = getProgress().addPhase("copy");sortPhase = getProgress().addPhase("sort");reducePhase = getProgress().addPhase("reduce");}//启动将处理与父级通信的线程(创建TaskReporter并启动通信线程)TaskReporter reporter = startReporter(umbilical);//是否是哟个新API,默认false,可以通过mapred.reducer.new-api设置boolean useNewApi = job.getUseNewReducer();//初始化:// 1、构建job的上下文// 2、构建尝试任务的上下文// 3、更改任务的状态UNASSIGNED到RUNNING// 4、获取输出格式化类,默认是TextOutputFormat.class 可以通过mapreduce.job.outputformat.class设置// 5、获取输出提交器,框架依赖输出提交器做一下操作:// 5.1、在初始化期间设置作业。例如,在作业初始化期间为作业创建临时输出目录// 5.2、作业完成后清理作业。例如,在作业完成后删除临时输出目录// 5.3、设置任务临时输出// 5.4、检查任务是否需要提交。这是为了在任务不需要提交的情况下避免提交过程// 5.5、任务输出的提交// 5.6、放弃任务提交// 6、获取输出目录,并将其设置为工作目录// 7、设置任务的输出(这是从将输出到HDFS的每个单独任务的进程中调用的,并且它只是为该任务调用的。对于同一任务,但对于不同的任务尝试,可以多次调用此函数)// 8、从Job配置中的类名创建根到指定进程的ResourceCalculatorProcessTree并对其进行配置。如果类名为null,此方法将尝试返回可用于此系统的进程树插件。// 9、更新进程树// 10、获取自创建进程树以来进程树中所有进程使用的CPU时间(以毫秒为单位)initialize(job, getJobID(), reporter, useNewApi);//检查任务类型:cleanupJobTask、jobSetupTask、taskCleanupTaskif (jobCleanup) {runJobCleanupTask(umbilical, reporter);return;}if (jobSetup) {runJobSetupTask(umbilical, reporter);return;}if (taskCleanup) {runTaskCleanupTask(umbilical, reporter);return;}//初始化编解码器//检查是否要压缩map输出数据codec = initCodec();//RawKeyValueIterator是一个迭代器,用于在对中间数据进行排序/合并期间对原始键和值进行迭代。RawKeyValueIterator rIter = null;ShuffleConsumerPlugin shuffleConsumerPlugin = null;//获取用户定义的组合器类,该类用于在将map输出发送到reduce之前组合映射输出。通常,组合器与作业的 Reducer 相同,即 getReducerClass(),可以通过mapred.combiner.class设置Class combinerClass = conf.getCombinerClass();//构建组合器的输出收集者//默认收集10000个<key,value>向框架汇报一次,可以通过mapreduce.task.combine.progress.records设置CombineOutputCollector combineCollector = (null != combinerClass) ? new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;//获取Shuffle类//默认是Shuffle.class,可以通过mapreduce.job.reduce.shuffle.consumer.plugin.class设置Class<? 
extends ShuffleConsumerPlugin> clazz =job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);//利用反射获取ShuffleConsumerPlugin//ShuffleConsumerPugin用于服务Reducers。它可以从内置的ShuffleHandler或第三方辅助服务中打乱MOF文件。shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);//构建Shuffle上下文(如果是本地模式,会设置localMapFiles,即:任务map输出都是在本地)ShuffleConsumerPlugin.Context shuffleContext = new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, super.lDirAlloc, reporter, codec, combinerClass, combineCollector, spilledRecordsCounter, reduceCombineInputCounter,shuffledMapsCounter,reduceShuffleBytes, failedShuffleCounter,mergedMapOutputsCounter,taskStatus, copyPhase, sortPhase, this,mapOutputFile, localMapFiles);//根据Shuffle上下文初始化Shuffle消费者插件,默认会调用Shuffle.init()// 1、获取MapTask的个数并设置剩余MapTask和完成MapTask数量// 2、设置失败限制:Math.max(30, totalMaps / 10) 既:最小值为30// 3、设置重新拉取时间、拉取失败个数(默认5)等// 4、创建合并管理器shuffleConsumerPlugin.init(shuffleContext);//Shuffle开始,我们看看具体实现(第2步)rIter = shuffleConsumerPlugin.run();//释放数据结构mapOutputFilesOnDisk.clear();//排序阶段完成sortPhase.complete(); //开始REDUCE阶段setPhase(TaskStatus.Phase.REDUCE); //发生状态变更到task trackerstatusUpdate(umbilical);Class keyClass = job.getMapOutputKeyClass();Class valueClass = job.getMapOutputValueClass();//获取用户定义的WritableComparable比较器,用于对reduce的输入所有key进行分组。RawComparator comparator = job.getOutputValueGroupingComparator();if (useNewApi) {//我们看新APIrunNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);} else {runOldReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);}shuffleConsumerPlugin.close();done(umbilical, reporter);}private <INKEY,INVALUE,OUTKEY,OUTVALUE>void runNewReducer(JobConf job,final TaskUmbilicalProtocol umbilical,final TaskReporter reporter,RawKeyValueIterator rIter,RawComparator<INKEY> comparator,Class<INKEY> keyClass,Class<INVALUE> valueClass) throws IOException,InterruptedException, ClassNotFoundException {final RawKeyValueIterator rawIter = rIter;rIter = new RawKeyValueIterator() {public void close() throws IOException {rawIter.close();}public DataInputBuffer getKey() throws IOException {return rawIter.getKey();}public Progress getProgress() {return rawIter.getProgress();}public DataInputBuffer getValue() throws IOException {return rawIter.getValue();}public boolean next() throws IOException {boolean ret = rawIter.next();reporter.setProgress(rawIter.getProgress().getProgress());return ret;}};//制作一个任务上下文,以便我们可以获得类org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,getTaskID(), reporter);//制作用户自定义reducerorg.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =(org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)ReflectionUtils.newInstance(taskContext.getReducerClass(), job);//制作RecordWriterorg.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);job.setBoolean("mapred.skip.on", isSkipping());job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());//创建reduce的上下文,comparator是vlaue的比较器org.apache.hadoop.mapreduce.Reducer.Context reducerContext = createReduceContext(reducer, job, getTaskID(),rIter, reduceInputKeyCounter, reduceInputValueCounter, trackedRW,committer,reporter, comparator, keyClass,valueClass);try {//运行自定义的reduce(我们还是看WordCount的reduce)reducer.run(reducerContext);} finally {trackedRW.close(reducerContext);}}
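One detail in runNewReducer() worth pausing on is that rIter is wrapped purely so that every next() call also reports progress. Below is a simplified, generic re-creation of that decorator idea in plain Java; it is not the Hadoop implementation, and all names in it are mine.

// Simplified illustration of the wrapper used in runNewReducer(): the real iterator is
// decorated so that every next() also pushes a progress update to a reporter callback.
import java.util.Iterator;
import java.util.List;
import java.util.function.DoubleConsumer;

final class ProgressReportingIterator<T> implements Iterator<T> {
  private final Iterator<T> delegate;
  private final DoubleConsumer reporter;   // stands in for TaskReporter.setProgress()
  private final long total;
  private long seen;

  ProgressReportingIterator(Iterator<T> delegate, long total, DoubleConsumer reporter) {
    this.delegate = delegate;
    this.total = total;
    this.reporter = reporter;
  }

  @Override public boolean hasNext() { return delegate.hasNext(); }

  @Override public T next() {
    T value = delegate.next();             // the delegate does the real work
    seen++;
    reporter.accept(total == 0 ? 1.0 : (double) seen / total);  // side effect: progress update
    return value;
  }

  public static void main(String[] args) {
    List<String> data = List.of("a", "b", "c");
    Iterator<String> it =
        new ProgressReportingIterator<>(data.iterator(), data.size(),
                                        p -> System.out.printf("progress %.2f%n", p));
    it.forEachRemaining(System.out::println);
  }
}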
2. Shuffle
The shuffle phase returns an iterator over the raw keys and values produced while the intermediate data is sorted/merged.
public RawKeyValueIterator run() throws IOException, InterruptedException {//缩放我们每次RPC调用获取的最大事件数,以缓解ApplicationMaster上的OOM问题// TODO: 在 HADOOP-8942 之后应该没有必要这样做//MIN_EVENTS_TO_FETCH = 100//MAX_RPC_OUTSTANDING_EVENTS = 3000000//我们为此作业配置的reduce任务数。默认为1//因此eventsPerReducer的取值范围是 [100,3000000]int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());//MAX_EVENTS_TO_FETCH = 10000//eventsPerReducer = [100,3000000]//因此 maxEventsToFetch 取值范围是 [100,10000]int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);//启动MapTask完成事件获取线程EventFetcher//EventFetcher 会在TaskTracker中查询给定事件ID中的一组MapTask完成事件
(TaskCompletionEvent.Status.SUCCEEDED)//处理任务完成事件:// 1.将SUCCEEDED映射保存在knownOutput中以获取输出。// 2.将OBSOLETE/FAILED/KILLED状态的map保存在obsoleteOutput中,以停止从这些map中获取。// 3.从neededOutput中删除TIPFAILED状态的map,因为我们根本不需要它们的输出。final EventFetcher<K, V> eventFetcher =new EventFetcher<K, V>(reduceId, umbilical, scheduler, this,maxEventsToFetch);eventFetcher.start();//启动map输出提取器线程Fetcher//localMapFiles://为map和reduce的瞬态存储操作工作区域。//map和reduce任务使用此类来标识中间文件需要写入/读取的目录。//如果job为本地作业那么localMapFiles是不为空的,既isLocal=true 如果是分布式isLocal=falseboolean isLocal = localMapFiles != null;//在Shuffle阶段,reduce运行的默认并行传输数。默认5个,可以通过mapreduce.reduce.shuffle.parallelcopies设置final int numFetchers = isLocal ? 1 :jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);//生成Fetcher数组Fetcher<K, V>[] fetchers = new Fetcher[numFetchers];if (isLocal) {//如果是本地模式,就创建一个LocalFetcher线程即可//LocalJobRunner使用LocalFetcher执行本地文件系统获取fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,merger, reporter, metrics, this, reduceTask.getShuffleSecret(),localMapFiles);fetchers[0].start();} else {//分布式环境下需要创建多个Fetcher从不同节点拉取数据for (int i=0; i < numFetchers; ++i) {fetchers[i] = new Fetcher<K, V>(jobConf, reduceId, scheduler, merger,reporter, metrics, this, reduceTask.getShuffleSecret());fetchers[i].start();}}//等待Shuffle成功完成while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {reporter.progress();synchronized (this) {if (throwable != null) {throw new ShuffleError("error in shuffle in " + throwingThreadName,throwable);}}}//停止map完成事件获取线程,因为Shuffle阶段已经完了eventFetcher.shutDown();//停止map输出获取线程for (Fetcher<K, V> fetcher : fetchers) {fetcher.shutDown();}//停止调度scheduler.close();copyPhase.complete(); //复制已完成//设置此任务的当前阶段为SORTtaskStatus.setPhase(TaskStatus.Phase.SORT);//向 task tracker 发送状态更新reduceTask.statusUpdate(umbilical);//完成正在进行的合并RawKeyValueIterator kvIter = null;try {//kvIter 就是<key,value>的迭代器了,也就是传给reduce方法的值,那么SORT阶段就是在这里做了,下面我们详细看下merger.close()中的实现(第3步)kvIter = merger.close();} catch (Throwable e) {throw new ShuffleError("Error while doing final merge ", e);}// Sanity checksynchronized (this) {if (throwable != null) {throw new ShuffleError("error in shuffle in " + throwingThreadName,throwable);}}return kvIter;}
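The clamping arithmetic at the top of Shuffle.run() is easy to misread, so here is a tiny stand-alone version of the same calculation. The constants are copied from the comments above; shuffleEventBatch() is just an illustrative name.

// Re-creation of the clamping done at the start of Shuffle.run().
// The three constants come from the comments above; the method name is mine.
public final class ShuffleMath {

  static final int MIN_EVENTS_TO_FETCH = 100;
  static final int MAX_EVENTS_TO_FETCH = 10000;
  static final int MAX_RPC_OUTSTANDING_EVENTS = 3_000_000;

  static int shuffleEventBatch(int numReduceTasks) {
    // per-reducer share of the outstanding-event budget, never below 100
    int eventsPerReducer =
        Math.max(MIN_EVENTS_TO_FETCH, MAX_RPC_OUTSTANDING_EVENTS / numReduceTasks);
    // and never above the 10000 hard cap per RPC call
    return Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
  }

  public static void main(String[] args) {
    System.out.println(shuffleEventBatch(1));      // 10000 (3,000,000 capped at 10,000)
    System.out.println(shuffleEventBatch(500));    // 6000
    System.out.println(shuffleEventBatch(50000));  // 100   (60 raised to the 100 floor)
  }
}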
3. MergeManagerImpl
This is where the data pulled from the individual maps is sorted and merged.
public RawKeyValueIterator close() throws Throwable {//等待正在进行的合并完成if (memToMemMerger != null) { memToMemMerger.close();}inMemoryMerger.close();onDiskMerger.close();//内存中的map输出数据List<InMemoryMapOutput<K, V>> memory = new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);inMemoryMergedMapOutputs.clear();memory.addAll(inMemoryMapOutputs);inMemoryMapOutputs.clear();//磁盘中的map输出数据List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);onDiskMapOutputs.clear();return finalMerge(jobConf, rfs, memory, disk);}private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,List<CompressAwarePath> onDiskMapOutputs) throws IOException {LOG.info("finalMerge called with " +inMemoryMapOutputs.size() + " in-memory map-outputs and " +onDiskMapOutputs.size() + " on-disk map-outputs");//获取在ReduceTask中可用的最大内存限制final long maxInMemReduce = getMaxInMemReduceLimit();// merge config paramsClass<K> keyClass = (Class<K>)job.getMapOutputKeyClass();Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();//是否应保留失败任务的临时文件?默认false,可以通过mapreduce.task.files.preserve.failedtasks设置boolean keepInputs = job.getKeepFailedTaskFiles();final Path tmpDir = new Path(reduceId.toString());//获取用于比较key的比较器,必须是RawComparator 的子类,默认为null,可以通过mapreduce.job.output.key.comparator.class设置//如果没有指定比较器,那么会获取WritableComparable(可序列化可比较)实现的比较器//RawComparator接口允许其实现直接比较数据流中的记录,无需先把数据流饭序列化为对象,这样便避免了新建对象的额外开销。final RawComparator<K> comparator =(RawComparator<K>)job.getOutputKeyComparator();//腾出内存所需的段List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>();long inMemToDiskBytes = 0;boolean mergePhaseFinished = false;if (inMemoryMapOutputs.size() > 0) {TaskID mapId = inMemoryMapOutputs.get(0).getMapId().getTaskID();//这里会和ReduceTask中可用的最大内存限制做比较,把大于内存限制的部分数据放到磁盘上,内存中保存尽可能多的数据inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs, memDiskSegments,maxInMemReduce);final int numMemDiskSegments = memDiskSegments.size();//ioSortFactor 是对文件进行排序时要同时合并的流的数量。这决定了打开的文件句柄的数量。//默认是10 可以通过mapreduce.task.io.sort.factor设置if (numMemDiskSegments > 0 &&ioSortFactor > onDiskMapOutputs.size()) {//如果我们到达这里,这意味着我们有少于io.sort.factor磁盘段,//并且这将增加1(内存段合并的结果)。由于这个总数仍然是<=io.sort.factor,//我们将不再进行任何中间合并,所有这些磁盘段的合并将直接提供给reduce方法mergePhaseFinished = true;//必须溢写到磁盘,但不能保留在mem中进行中间合并//创建一个本地reduce输入文件名,以.merged为后缀final Path outputPath = mapOutputFile.getInputFileForWrite(mapId,inMemToDiskBytes).suffix(Task.MERGED_OUTPUT_PREFIX);//开始合并,下面我们详细看下合并细节。(看第3.1步)final RawKeyValueIterator rIter = Merger.merge(job, fs,keyClass, valueClass, memDiskSegments, numMemDiskSegments,tmpDir, comparator, reporter, spilledRecordsCounter, null, mergePhase);//用CryptoOutputStream封装给定的FSDataOutputStream。//流所需的数据缓冲区大小由“mapreduce.job.encrypted-intermediate-data.buffer.kb”作业配置变量指定FSDataOutputStream out =IntermediateEncryptedStream.wrapIfNecessary(job,fs.create(outputPath), outputPath);Writer<K, V> writer = new Writer<K, V>(job, out, keyClass, valueClass,codec, null, true);try {//合并输出到文件,默认合并10000条记录就向MR ApplicationMaster发送进度通知Merger.writeFile(rIter, writer, reporter, job);writer.close();onDiskMapOutputs.add(new CompressAwarePath(outputPath,writer.getRawLength(), writer.getCompressedLength()));writer = null;//添加到最终磁盘输出的列表中} catch (IOException e) {if (null != outputPath) {try {fs.delete(outputPath, true);} catch (IOException ie) {// NOTHING}}throw e;} finally {if (null != writer) {writer.close();}}LOG.info("Merged " + numMemDiskSegments + " segments, " +inMemToDiskBytes + " bytes to disk to satisfy " +"reduce 
memory limit");inMemToDiskBytes = 0;memDiskSegments.clear();} else if (inMemToDiskBytes != 0) {LOG.info("Keeping " + numMemDiskSegments + " segments, " +inMemToDiskBytes + " bytes in memory for " +"intermediate, on-disk merge");}}//磁盘上的段处理List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();long onDiskBytes = inMemToDiskBytes;long rawBytes = inMemToDiskBytes;CompressAwarePath[] onDisk = onDiskMapOutputs.toArray(new CompressAwarePath[onDiskMapOutputs.size()]);for (CompressAwarePath file : onDisk) {long fileLength = fs.getFileStatus(file).getLen();onDiskBytes += fileLength;rawBytes += (file.getRawDataLength() > 0) ? file.getRawDataLength() : fileLength;LOG.debug("Disk file: " + file + " Length is " + fileLength);diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs,(file.toString().endsWith(Task.MERGED_OUTPUT_PREFIX) ?null : mergedMapOutputsCounter), file.getRawDataLength()));}LOG.info("Merging " + onDisk.length + " files, " +onDiskBytes + " bytes from disk");Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {public int compare(Segment<K, V> o1, Segment<K, V> o2) {if (o1.getLength() == o2.getLength()) {return 0;}return o1.getLength() < o2.getLength() ? -1 : 1;}});// build final list of segments from merged backed by disk + in-mem//从备份磁盘和内存构建最终的段数据列表List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>();long inMemBytes = createInMemorySegments(inMemoryMapOutputs, finalSegments, 0);LOG.info("Merging " + finalSegments.size() + " segments, " +inMemBytes + " bytes from memory into reduce");if (0 != onDiskBytes) {final int numInMemSegments = memDiskSegments.size();diskSegments.addAll(0, memDiskSegments);memDiskSegments.clear();//只有当将要进行中间合并时,才通过mergePhaseProgress thisPhase = (mergePhaseFinished) ? null : mergePhase; RawKeyValueIterator diskMerge = Merger.merge(job, fs, keyClass, valueClass, codec, diskSegments,ioSortFactor, numInMemSegments, tmpDir, comparator,reporter, false, spilledRecordsCounter, null, thisPhase);diskSegments.clear();if (0 == finalSegments.size()) {return diskMerge;}finalSegments.add(new Segment<K,V>(new RawKVIteratorReader(diskMerge, onDiskBytes), true, rawBytes));}//开始合并(合并细节看第3.1步)return Merger.merge(job, fs, keyClass, valueClass,finalSegments, finalSegments.size(), tmpDir,comparator, reporter, spilledRecordsCounter, null,null);}
3.1 Merger
Merger is the utility class that map and reduce tasks use to merge their in-memory and on-disk segments.
It ultimately calls merge() on Merger's inner class MergeQueue.
RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,int factor, int inMem, Path tmpDir,Counters.Counter readsCounter,Counters.Counter writesCounter,Progress mergePhase)throws IOException {LOG.info("Merging " + segments.size() + " sorted segments");/** 如果内存中有段,则它们首先出现在段列表中,然后是已排序的磁盘段。* 否则(如果只有磁盘段),则如果段列表中有多个因子段,则它们是已排序的段。*/int numSegments = segments.size();int origFactor = factor;int passNo = 1;if (mergePhase != null) {mergeProgress = mergePhase;}//计算要合并的输入字节的预期大小,将用于计算合并进度。//这模拟了上面的merge(),并试图获得所有合并中要合并的字节数(假设合并时没有调用合并器)long totalBytes = computeBytesInMerges(factor, inMem);if (totalBytes != 0) {progPerByte = 1.0f / (float)totalBytes;}//从构造函数中创建的排序map中创建MergeStreams,并将最终输出转储到文件中do {//获取此合并过程的因子。我们假设内存中的段是段列表中的第一个条目,并且传递因子不适用于它们factor = getPassFactor(factor, passNo, numSegments - inMem);if (1 == passNo) {factor += inMem;}List<Segment<K, V>> segmentsToMerge =new ArrayList<Segment<K, V>>();int segmentsConsidered = 0;int numSegmentsToConsider = factor;long startBytes = 0; //此合并的段的起始字节while (true) {//提取最小的“factor”段数对空段调用清理(无 key/value 数据)List<Segment<K, V>> mStream = getSegmentDescriptors(numSegmentsToConsider);for (Segment<K, V> segment : mStream) {//在最后可能的时刻初始化段;这有助于确保我们在需要缓冲区之前不会使用它们segment.init(readsCounter);long startPos = segment.getReader().bytesRead;boolean hasNext = segment.nextRawKey();long endPos = segment.getReader().bytesRead;if (hasNext) {startBytes += endPos - startPos;segmentsToMerge.add(segment);segmentsConsidered++;}else {segment.close();numSegments--; //我们忽略该段进行合并}}//如果我们有所需数量的分段,或者查看所有可用的分段,我们就会中断if (segmentsConsidered == factor || segments.size() == 0) {break;}numSegmentsToConsider = factor - segmentsConsidered;}//将流馈送到优先级队列initialize(segmentsToMerge.size());clear();for (Segment<K, V> segment : segmentsToMerge) {put(segment);}//如果剩余的段数较少,则只返回迭代器,否则执行另一个单级合并if (numSegments <= factor) {if (!includeFinalMerge) { //用于reduce任务//重置totalBytesProcessed并重新计算剩余段的totalBytes,//以跟踪最终合并的进度。最终合并被视为reduce阶段的进展,即reduce任务的第三阶段。totalBytesProcessed = 0;totalBytes = 0;for (int i = 0; i < segmentsToMerge.size(); i++) {totalBytes += segmentsToMerge.get(i).getRawDataLength();}}if (totalBytes != 0) //偏执progPerByte = 1.0f / (float)totalBytes;totalBytesProcessed += startBytes; if (totalBytes != 0)mergeProgress.set(Math.min(1.0f, totalBytesProcessed * progPerByte));elsemergeProgress.set(1.0f); //最后一次传输,没有剩余的分段-我们就完成了LOG.info("Down to the last merge-pass, with " + numSegments + " segments left of total size: " +(totalBytes - totalBytesProcessed) + " bytes");return this;} else {LOG.info("Merging " + segmentsToMerge.size() + " intermediate segments out of a total of " + (segments.size()+segmentsToMerge.size()));long bytesProcessedInPrevMerges = totalBytesProcessed;totalBytesProcessed += startBytes;//如果在空间限制下可用,我们希望在多个磁盘上分散创建临时文件long approxOutputSize = 0; for (Segment<K, V> s : segmentsToMerge) {approxOutputSize += s.getLength() + ChecksumFileSystem.getApproxChkSumLength(s.getLength());}Path tmpFilename = new Path(tmpDir, "intermediate").suffix("." 
+ passNo);Path outputFile = lDirAlloc.getLocalPathForWrite(tmpFilename.toString(),approxOutputSize, conf);FSDataOutputStream out = fs.create(outputFile);out = IntermediateEncryptedStream.wrapIfNecessary(conf, out,outputFile);Writer<K, V> writer = new Writer<K, V>(conf, out, keyClass, valueClass,codec, writesCounter, true);writeFile(this, writer, reporter, conf);writer.close();//我们完成了一次单级合并;现在清理优先级队列this.close();//将新创建的分段添加到要合并的分段列表中Segment<K, V> tempSegment = new Segment<K, V>(conf, fs, outputFile, codec, false);//在排序列表中插入新的合并段int pos = Collections.binarySearch(segments, tempSegment,segmentComparator);if (pos < 0) {//二进制搜索失败。所以要插入的位置是pos-1pos = -pos-1;}segments.add(pos, tempSegment);numSegments = segments.size();//从totalBytes中减去新段的预期大小和实际大小之间的差值(新段的预计大小为inputBytesOfThisMerge)。//若合并中未调用合并器,则预期大小和实际大小将匹配(几乎)long inputBytesOfThisMerge = totalBytesProcessed -bytesProcessedInPrevMerges;totalBytes -= inputBytesOfThisMerge - tempSegment.getRawDataLength();if (totalBytes != 0) {progPerByte = 1.0f / (float)totalBytes;}passNo++;}//我们只担心第一次通过合并因子。因此,将系数重置为原来的值factor = origFactor;} while(true);}
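The pass-factor logic above decides how many segments the first pass should merge so that every later pass, including the final one, merges exactly io.sort.factor streams. The snippet below is a paraphrase of that calculation with a worked example; treat it as an illustration rather than a verbatim copy of getPassFactor().

// The classic external-merge-sort trick behind getPassFactor(): make the FIRST pass merge
// just enough segments so that every later pass merges exactly `factor` streams.
public final class PassFactorSketch {

  static int firstPassFactor(int factor, int numSegments) {
    if (numSegments <= factor || factor == 1) {
      return factor;                         // everything fits in a single merge pass
    }
    int mod = (numSegments - 1) % (factor - 1);
    return mod == 0 ? factor : mod + 1;      // merge only this many segments in pass 1
  }

  public static void main(String[] args) {
    // 13 sorted segments, io.sort.factor = 10:
    // the first pass merges 4 segments -> 13 - 4 + 1 = 10 segments remain,
    // and the final pass merges exactly 10 streams.
    System.out.println(firstPassFactor(10, 13));  // 4
    System.out.println(firstPassFactor(10, 10));  // 10 (single pass)
    System.out.println(firstPassFactor(10, 19));  // 10 ((19 - 1) % 9 == 0)
  }
}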
4. The reduce in WordCount
public static class IntSumReducer
    extends Reducer<Text,IntWritable,Text,IntWritable> {
  private IntWritable result = new IntWritable();

  // key: one distinct key collected from the map outputs; values: an iterator over the values
  // grouped under that key after the merge-sort, with grouping decided by the comparator
  // passed in through the reduce context
  public void reduce(Text key, Iterable<IntWritable> values,
                     Context context) throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }
    result.set(sum);
    // by default this ends up in TextOutputFormat's RecordWriter.write()
    context.write(key, result);
  }
}
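For completeness, this is roughly how the standard WordCount driver wires this reducer in. The input/output paths are assumptions here, and the mapper line is only indicated as a comment; the point to notice is that IntSumReducer is registered both as the combiner and as the reducer, matching the earlier remark that the combiner is usually the job's own Reducer.

// Sketch of a WordCount driver, following the standard Hadoop example.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCountDriver.class);
    // job.setMapperClass(TokenizerMapper.class);  // mapper from the WordCount example (not shown)
    job.setCombinerClass(IntSumReducer.class);     // map-side pre-aggregation
    job.setReducerClass(IntSumReducer.class);      // the reducer shown above
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}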
5. TextOutputFormat
public synchronized void write(K key, V value)
    throws IOException {

  boolean nullKey = key == null || key instanceof NullWritable;
  boolean nullValue = value == null || value instanceof NullWritable;
  if (nullKey && nullValue) {
    return;
  }
  if (!nullKey) {
    writeObject(key);
  }
  if (!(nullKey || nullValue)) {
    out.write(keyValueSeparator);
  }
  if (!nullValue) {
    writeObject(value);
  }
  out.write(NEWLINE);   // the newline character "\n"
}
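Two practical consequences of write(): passing a NullWritable key (or value) suppresses that side together with the separator, and the key/value separator itself is configurable. A small hedged sketch follows; the config key is believed to be mapreduce.output.textoutputformat.separator in Hadoop 3.x, so verify it against your version.

// Sketch: changing TextOutputFormat's key/value separator from TAB to a comma, and using
// NullWritable keys to emit value-only lines.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;

public class SeparatorDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("mapreduce.output.textoutputformat.separator", ",");  // default is "\t"
    Job job = Job.getInstance(conf, "separator demo");
    // ... mapper, reducer, input/output setup omitted ...
  }

  // a reducer that writes value-only lines: the null key suppresses key + separator in write()
  public static class ValuesOnlyReducer extends Reducer<Text, Text, NullWritable, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
        throws java.io.IOException, InterruptedException {
      for (Text v : values) {
        context.write(NullWritable.get(), v);   // only the value and a newline are written
      }
    }
  }
}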
V. Summary
1. Initialization: build the job and task-attempt contexts, update the task status, create the output committer, and so on.
2. Shuffle: start a group of fetcher threads (a single LocalFetcher in local mode, multiple Fetchers in cluster mode) to collect the map-side output.
3. Sort: merge-sort the shuffled data.
4. SecondarySort: optionally apply a secondary ordering to the values of the same key.
5. Build the user-defined reducer, the RecordWriter, and the reduce context.
6. Run the user-defined reduce.
7. Write the output without any further sorting; each reduce task produces its own output file.