Hadoop-MapReduce-源码跟读-ReduceTask阶段篇

一、源码下载

下面是hadoop官方源码下载地址,我下载的是hadoop-3.2.4,那就一起来看下吧

Index of /dist/hadoop/core

二、Reducer类

我们先看下我们写的reduce所继承的Reducer类

public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {/*** 传递给Reducer实现的上下文*/public abstract class Context implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {}/*** 在任务开始时调用一次*/protected void setup(Context context) throws IOException, InterruptedException {// NOTHING}/*** 每个键调用一次此方法。大多数应用程序将通过重写此方法来定义其reduce类。默认实现是标识函数* */@SuppressWarnings("unchecked")protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException {for(VALUEIN value: values) {context.write((KEYOUT) key, (VALUEOUT) value);}}/*** 在任务结束时调用一次*/protected void cleanup(Context context) throws IOException, InterruptedException {// NOTHING}/*** 高级应用程序编写者可以使用 run(org.apache.hadop.mapreduce.Reporter.Context) 方法* 来控制reduce任务的工作方式*/public void run(Context context) throws IOException, InterruptedException {setup(context);try {while (context.nextKey()) {reduce(context.getCurrentKey(), context.getValues(), context);//如果使用了备份存储,请将其重置Iterator<VALUEIN> iter = context.getValues().iterator();if(iter instanceof ReduceContext.ValueIterator) {((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        }}} finally {cleanup(context);}}
}

经过该类的注释我们可以得到以下信息:

1、将map阶段输出的key相同的一组中间值缩减为一组较小的值

2、Reducer实现可以通过JobContext.getConfiguration()访问作业的配置

3、Reducer有三个主要阶段

        3.1、Shuffle:Reducer使用HTTP在网络上复制每个Mapper的排序输出

        3.2、Sort:框架合并多个Mapper的输出并按key进行排序(因为不同的Mapper可能输出相同的key)

        Shuffle 和 Sort 阶段同时发生,即当提取输出时,它们被合并

        3.3、SecondarySort:为了对迭代器返回的value进行二次排序,应用程序应该用二次关键字扩展关键字,并定义一个分组比较器。键将使用整个键进行排序,但将使用分组比较器进行分组,以决定在同一调用中发送哪些key和value到reduce。分组比较器是通过Job.setGroupingComparatorClass(Class)指定的。排序顺序由Job.setSortCompratorclass(Class)控制

4、Reduce阶段:为排序输入中的每<key,value集合>调用reduce()

5、ReduceTask的输出通常通过Context.write()写入RecordWriter

6、Reducer的输出未重新排序

三、ReduceTask是如何调起的

ReduceTask和MapTask一样都是由YarnChild启动的,详细可以看下上一篇博客<Hadoop-MapReduce-源码跟读-MapTask阶段篇>中的MapTask调起过程

四、ReduceTask运行细节(源码跟读)

这里我们就不从YarnChild开始捋了,而是从ReduceTask的run方法开始跟读

请注意:以下分析的是一个Job中一个ReduceTask的源码,一个Job可以有多个ReduceTask

1、ReduceTask

public void run(JobConf job, final TaskUmbilicalProtocol umbilical)throws IOException, InterruptedException, ClassNotFoundException {job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());if (isMapOrReduce()) {//Progress帮助生成进度报告的实用程序。是层次结构。子阶段的节点通过调用addPhase()创建//添加copy、sort、reduce到流程中copyPhase = getProgress().addPhase("copy");sortPhase  = getProgress().addPhase("sort");reducePhase = getProgress().addPhase("reduce");}//启动将处理与父级通信的线程(创建TaskReporter并启动通信线程)TaskReporter reporter = startReporter(umbilical);//是否是哟个新API,默认false,可以通过mapred.reducer.new-api设置boolean useNewApi = job.getUseNewReducer();//初始化://    1、构建job的上下文//    2、构建尝试任务的上下文//    3、更改任务的状态UNASSIGNED到RUNNING//    4、获取输出格式化类,默认是TextOutputFormat.class 可以通过mapreduce.job.outputformat.class设置//    5、获取输出提交器,框架依赖输出提交器做一下操作://        5.1、在初始化期间设置作业。例如,在作业初始化期间为作业创建临时输出目录//        5.2、作业完成后清理作业。例如,在作业完成后删除临时输出目录//        5.3、设置任务临时输出//        5.4、检查任务是否需要提交。这是为了在任务不需要提交的情况下避免提交过程//        5.5、任务输出的提交//        5.6、放弃任务提交//    6、获取输出目录,并将其设置为工作目录//    7、设置任务的输出(这是从将输出到HDFS的每个单独任务的进程中调用的,并且它只是为该任务调用的。对于同一任务,但对于不同的任务尝试,可以多次调用此函数)//    8、从Job配置中的类名创建根到指定进程的ResourceCalculatorProcessTree并对其进行配置。如果类名为null,此方法将尝试返回可用于此系统的进程树插件。//    9、更新进程树//    10、获取自创建进程树以来进程树中所有进程使用的CPU时间(以毫秒为单位)initialize(job, getJobID(), reporter, useNewApi);//检查任务类型:cleanupJobTask、jobSetupTask、taskCleanupTaskif (jobCleanup) {runJobCleanupTask(umbilical, reporter);return;}if (jobSetup) {runJobSetupTask(umbilical, reporter);return;}if (taskCleanup) {runTaskCleanupTask(umbilical, reporter);return;}//初始化编解码器//检查是否要压缩map输出数据codec = initCodec();//RawKeyValueIterator是一个迭代器,用于在对中间数据进行排序/合并期间对原始键和值进行迭代。RawKeyValueIterator rIter = null;ShuffleConsumerPlugin shuffleConsumerPlugin = null;//获取用户定义的组合器类,该类用于在将map输出发送到reduce之前组合映射输出。通常,组合器与作业的 Reducer 相同,即 getReducerClass(),可以通过mapred.combiner.class设置Class combinerClass = conf.getCombinerClass();//构建组合器的输出收集者//默认收集10000个<key,value>向框架汇报一次,可以通过mapreduce.task.combine.progress.records设置CombineOutputCollector combineCollector = (null != combinerClass) ? new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;//获取Shuffle类//默认是Shuffle.class,可以通过mapreduce.job.reduce.shuffle.consumer.plugin.class设置Class<? extends ShuffleConsumerPlugin> clazz =job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);//利用反射获取ShuffleConsumerPlugin//ShuffleConsumerPugin用于服务Reducers。它可以从内置的ShuffleHandler或第三方辅助服务中打乱MOF文件。shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);//构建Shuffle上下文(如果是本地模式,会设置localMapFiles,即:任务map输出都是在本地)ShuffleConsumerPlugin.Context shuffleContext = new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, super.lDirAlloc, reporter, codec, combinerClass, combineCollector, spilledRecordsCounter, reduceCombineInputCounter,shuffledMapsCounter,reduceShuffleBytes, failedShuffleCounter,mergedMapOutputsCounter,taskStatus, copyPhase, sortPhase, this,mapOutputFile, localMapFiles);//根据Shuffle上下文初始化Shuffle消费者插件,默认会调用Shuffle.init()//    1、获取MapTask的个数并设置剩余MapTask和完成MapTask数量//    2、设置失败限制:Math.max(30, totalMaps / 10) 既:最小值为30//    3、设置重新拉取时间、拉取失败个数(默认5)等//    4、创建合并管理器shuffleConsumerPlugin.init(shuffleContext);//Shuffle开始,我们看看具体实现(第2步)rIter = shuffleConsumerPlugin.run();//释放数据结构mapOutputFilesOnDisk.clear();//排序阶段完成sortPhase.complete();                         //开始REDUCE阶段setPhase(TaskStatus.Phase.REDUCE); //发生状态变更到task trackerstatusUpdate(umbilical);Class keyClass = job.getMapOutputKeyClass();Class valueClass = job.getMapOutputValueClass();//获取用户定义的WritableComparable比较器,用于对reduce的输入所有key进行分组。RawComparator comparator = job.getOutputValueGroupingComparator();if (useNewApi) {//我们看新APIrunNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);} else {runOldReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);}shuffleConsumerPlugin.close();done(umbilical, reporter);}private <INKEY,INVALUE,OUTKEY,OUTVALUE>void runNewReducer(JobConf job,final TaskUmbilicalProtocol umbilical,final TaskReporter reporter,RawKeyValueIterator rIter,RawComparator<INKEY> comparator,Class<INKEY> keyClass,Class<INVALUE> valueClass) throws IOException,InterruptedException, ClassNotFoundException {final RawKeyValueIterator rawIter = rIter;rIter = new RawKeyValueIterator() {public void close() throws IOException {rawIter.close();}public DataInputBuffer getKey() throws IOException {return rawIter.getKey();}public Progress getProgress() {return rawIter.getProgress();}public DataInputBuffer getValue() throws IOException {return rawIter.getValue();}public boolean next() throws IOException {boolean ret = rawIter.next();reporter.setProgress(rawIter.getProgress().getProgress());return ret;}};//制作一个任务上下文,以便我们可以获得类org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,getTaskID(), reporter);//制作用户自定义reducerorg.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =(org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)ReflectionUtils.newInstance(taskContext.getReducerClass(), job);//制作RecordWriterorg.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);job.setBoolean("mapred.skip.on", isSkipping());job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());//创建reduce的上下文,comparator是vlaue的比较器org.apache.hadoop.mapreduce.Reducer.Context reducerContext = createReduceContext(reducer, job, getTaskID(),rIter, reduceInputKeyCounter, reduceInputValueCounter, trackedRW,committer,reporter, comparator, keyClass,valueClass);try {//运行自定义的reduce(我们还是看WordCount的reduce)reducer.run(reducerContext);} finally {trackedRW.close(reducerContext);}}

2、Shuffle

Shuffe阶段会返回一个迭代器,用于在对中间数据进行排序/合并期间对原始键和值进行迭代

public RawKeyValueIterator run() throws IOException, InterruptedException {//缩放我们每次RPC调用获取的最大事件数,以缓解ApplicationMaster上的OOM问题// TODO: 在 HADOOP-8942 之后应该没有必要这样做//MIN_EVENTS_TO_FETCH = 100//MAX_RPC_OUTSTANDING_EVENTS = 3000000//我们为此作业配置的reduce任务数。默认为1//因此eventsPerReducer的取值范围是 [100,3000000]int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());//MAX_EVENTS_TO_FETCH = 10000//eventsPerReducer = [100,3000000]//因此 maxEventsToFetch 取值范围是 [100,10000]int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);//启动MapTask完成事件获取线程EventFetcher//EventFetcher 会在TaskTracker中查询给定事件ID中的一组MapTask完成事件
(TaskCompletionEvent.Status.SUCCEEDED)//处理任务完成事件://    1.将SUCCEEDED映射保存在knownOutput中以获取输出。//    2.将OBSOLETE/FAILED/KILLED状态的map保存在obsoleteOutput中,以停止从这些map中获取。//    3.从neededOutput中删除TIPFAILED状态的map,因为我们根本不需要它们的输出。final EventFetcher<K, V> eventFetcher =new EventFetcher<K, V>(reduceId, umbilical, scheduler, this,maxEventsToFetch);eventFetcher.start();//启动map输出提取器线程Fetcher//localMapFiles://为map和reduce的瞬态存储操作工作区域。//map和reduce任务使用此类来标识中间文件需要写入/读取的目录。//如果job为本地作业那么localMapFiles是不为空的,既isLocal=true 如果是分布式isLocal=falseboolean isLocal = localMapFiles != null;//在Shuffle阶段,reduce运行的默认并行传输数。默认5个,可以通过mapreduce.reduce.shuffle.parallelcopies设置final int numFetchers = isLocal ? 1 :jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);//生成Fetcher数组Fetcher<K, V>[] fetchers = new Fetcher[numFetchers];if (isLocal) {//如果是本地模式,就创建一个LocalFetcher线程即可//LocalJobRunner使用LocalFetcher执行本地文件系统获取fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,merger, reporter, metrics, this, reduceTask.getShuffleSecret(),localMapFiles);fetchers[0].start();} else {//分布式环境下需要创建多个Fetcher从不同节点拉取数据for (int i=0; i < numFetchers; ++i) {fetchers[i] = new Fetcher<K, V>(jobConf, reduceId, scheduler, merger,reporter, metrics, this, reduceTask.getShuffleSecret());fetchers[i].start();}}//等待Shuffle成功完成while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {reporter.progress();synchronized (this) {if (throwable != null) {throw new ShuffleError("error in shuffle in " + throwingThreadName,throwable);}}}//停止map完成事件获取线程,因为Shuffle阶段已经完了eventFetcher.shutDown();//停止map输出获取线程for (Fetcher<K, V> fetcher : fetchers) {fetcher.shutDown();}//停止调度scheduler.close();copyPhase.complete(); //复制已完成//设置此任务的当前阶段为SORTtaskStatus.setPhase(TaskStatus.Phase.SORT);//向 task tracker 发送状态更新reduceTask.statusUpdate(umbilical);//完成正在进行的合并RawKeyValueIterator kvIter = null;try {//kvIter 就是<key,value>的迭代器了,也就是传给reduce方法的值,那么SORT阶段就是在这里做了,下面我们详细看下merger.close()中的实现(第3步)kvIter = merger.close();} catch (Throwable e) {throw new ShuffleError("Error while doing final merge ", e);}// Sanity checksynchronized (this) {if (throwable != null) {throw new ShuffleError("error in shuffle in " + throwingThreadName,throwable);}}return kvIter;}

3、MergeManagerImpl

在这里会对从各个map拉取的数据做排序、合并处理

public RawKeyValueIterator close() throws Throwable {//等待正在进行的合并完成if (memToMemMerger != null) { memToMemMerger.close();}inMemoryMerger.close();onDiskMerger.close();//内存中的map输出数据List<InMemoryMapOutput<K, V>> memory = new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);inMemoryMergedMapOutputs.clear();memory.addAll(inMemoryMapOutputs);inMemoryMapOutputs.clear();//磁盘中的map输出数据List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);onDiskMapOutputs.clear();return finalMerge(jobConf, rfs, memory, disk);}private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,List<CompressAwarePath> onDiskMapOutputs) throws IOException {LOG.info("finalMerge called with " +inMemoryMapOutputs.size() + " in-memory map-outputs and " +onDiskMapOutputs.size() + " on-disk map-outputs");//获取在ReduceTask中可用的最大内存限制final long maxInMemReduce = getMaxInMemReduceLimit();// merge config paramsClass<K> keyClass = (Class<K>)job.getMapOutputKeyClass();Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();//是否应保留失败任务的临时文件?默认false,可以通过mapreduce.task.files.preserve.failedtasks设置boolean keepInputs = job.getKeepFailedTaskFiles();final Path tmpDir = new Path(reduceId.toString());//获取用于比较key的比较器,必须是RawComparator 的子类,默认为null,可以通过mapreduce.job.output.key.comparator.class设置//如果没有指定比较器,那么会获取WritableComparable(可序列化可比较)实现的比较器//RawComparator接口允许其实现直接比较数据流中的记录,无需先把数据流饭序列化为对象,这样便避免了新建对象的额外开销。final RawComparator<K> comparator =(RawComparator<K>)job.getOutputKeyComparator();//腾出内存所需的段List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>();long inMemToDiskBytes = 0;boolean mergePhaseFinished = false;if (inMemoryMapOutputs.size() > 0) {TaskID mapId = inMemoryMapOutputs.get(0).getMapId().getTaskID();//这里会和ReduceTask中可用的最大内存限制做比较,把大于内存限制的部分数据放到磁盘上,内存中保存尽可能多的数据inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs, memDiskSegments,maxInMemReduce);final int numMemDiskSegments = memDiskSegments.size();//ioSortFactor 是对文件进行排序时要同时合并的流的数量。这决定了打开的文件句柄的数量。//默认是10 可以通过mapreduce.task.io.sort.factor设置if (numMemDiskSegments > 0 &&ioSortFactor > onDiskMapOutputs.size()) {//如果我们到达这里,这意味着我们有少于io.sort.factor磁盘段,//并且这将增加1(内存段合并的结果)。由于这个总数仍然是<=io.sort.factor,//我们将不再进行任何中间合并,所有这些磁盘段的合并将直接提供给reduce方法mergePhaseFinished = true;//必须溢写到磁盘,但不能保留在mem中进行中间合并//创建一个本地reduce输入文件名,以.merged为后缀final Path outputPath = mapOutputFile.getInputFileForWrite(mapId,inMemToDiskBytes).suffix(Task.MERGED_OUTPUT_PREFIX);//开始合并,下面我们详细看下合并细节。(看第3.1步)final RawKeyValueIterator rIter = Merger.merge(job, fs,keyClass, valueClass, memDiskSegments, numMemDiskSegments,tmpDir, comparator, reporter, spilledRecordsCounter, null, mergePhase);//用CryptoOutputStream封装给定的FSDataOutputStream。//流所需的数据缓冲区大小由“mapreduce.job.encrypted-intermediate-data.buffer.kb”作业配置变量指定FSDataOutputStream out =IntermediateEncryptedStream.wrapIfNecessary(job,fs.create(outputPath), outputPath);Writer<K, V> writer = new Writer<K, V>(job, out, keyClass, valueClass,codec, null, true);try {//合并输出到文件,默认合并10000条记录就向MR ApplicationMaster发送进度通知Merger.writeFile(rIter, writer, reporter, job);writer.close();onDiskMapOutputs.add(new CompressAwarePath(outputPath,writer.getRawLength(), writer.getCompressedLength()));writer = null;//添加到最终磁盘输出的列表中} catch (IOException e) {if (null != outputPath) {try {fs.delete(outputPath, true);} catch (IOException ie) {// NOTHING}}throw e;} finally {if (null != writer) {writer.close();}}LOG.info("Merged " + numMemDiskSegments + " segments, " +inMemToDiskBytes + " bytes to disk to satisfy " +"reduce memory limit");inMemToDiskBytes = 0;memDiskSegments.clear();} else if (inMemToDiskBytes != 0) {LOG.info("Keeping " + numMemDiskSegments + " segments, " +inMemToDiskBytes + " bytes in memory for " +"intermediate, on-disk merge");}}//磁盘上的段处理List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();long onDiskBytes = inMemToDiskBytes;long rawBytes = inMemToDiskBytes;CompressAwarePath[] onDisk = onDiskMapOutputs.toArray(new CompressAwarePath[onDiskMapOutputs.size()]);for (CompressAwarePath file : onDisk) {long fileLength = fs.getFileStatus(file).getLen();onDiskBytes += fileLength;rawBytes += (file.getRawDataLength() > 0) ? file.getRawDataLength() : fileLength;LOG.debug("Disk file: " + file + " Length is " + fileLength);diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs,(file.toString().endsWith(Task.MERGED_OUTPUT_PREFIX) ?null : mergedMapOutputsCounter), file.getRawDataLength()));}LOG.info("Merging " + onDisk.length + " files, " +onDiskBytes + " bytes from disk");Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {public int compare(Segment<K, V> o1, Segment<K, V> o2) {if (o1.getLength() == o2.getLength()) {return 0;}return o1.getLength() < o2.getLength() ? -1 : 1;}});// build final list of segments from merged backed by disk + in-mem//从备份磁盘和内存构建最终的段数据列表List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>();long inMemBytes = createInMemorySegments(inMemoryMapOutputs, finalSegments, 0);LOG.info("Merging " + finalSegments.size() + " segments, " +inMemBytes + " bytes from memory into reduce");if (0 != onDiskBytes) {final int numInMemSegments = memDiskSegments.size();diskSegments.addAll(0, memDiskSegments);memDiskSegments.clear();//只有当将要进行中间合并时,才通过mergePhaseProgress thisPhase = (mergePhaseFinished) ? null : mergePhase; RawKeyValueIterator diskMerge = Merger.merge(job, fs, keyClass, valueClass, codec, diskSegments,ioSortFactor, numInMemSegments, tmpDir, comparator,reporter, false, spilledRecordsCounter, null, thisPhase);diskSegments.clear();if (0 == finalSegments.size()) {return diskMerge;}finalSegments.add(new Segment<K,V>(new RawKVIteratorReader(diskMerge, onDiskBytes), true, rawBytes));}//开始合并(合并细节看第3.1步)return Merger.merge(job, fs, keyClass, valueClass,finalSegments, finalSegments.size(), tmpDir,comparator, reporter, spilledRecordsCounter, null,null);}

3.1、Merger

Merger是Map和Reduce任务用于合并其内存和磁盘段的实用程序类

最终会调用Merger内部类MergeQueue中的merge()

RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,int factor, int inMem, Path tmpDir,Counters.Counter readsCounter,Counters.Counter writesCounter,Progress mergePhase)throws IOException {LOG.info("Merging " + segments.size() + " sorted segments");/** 如果内存中有段,则它们首先出现在段列表中,然后是已排序的磁盘段。* 否则(如果只有磁盘段),则如果段列表中有多个因子段,则它们是已排序的段。*/int numSegments = segments.size();int origFactor = factor;int passNo = 1;if (mergePhase != null) {mergeProgress = mergePhase;}//计算要合并的输入字节的预期大小,将用于计算合并进度。//这模拟了上面的merge(),并试图获得所有合并中要合并的字节数(假设合并时没有调用合并器)long totalBytes = computeBytesInMerges(factor, inMem);if (totalBytes != 0) {progPerByte = 1.0f / (float)totalBytes;}//从构造函数中创建的排序map中创建MergeStreams,并将最终输出转储到文件中do {//获取此合并过程的因子。我们假设内存中的段是段列表中的第一个条目,并且传递因子不适用于它们factor = getPassFactor(factor, passNo, numSegments - inMem);if (1 == passNo) {factor += inMem;}List<Segment<K, V>> segmentsToMerge =new ArrayList<Segment<K, V>>();int segmentsConsidered = 0;int numSegmentsToConsider = factor;long startBytes = 0; //此合并的段的起始字节while (true) {//提取最小的“factor”段数对空段调用清理(无 key/value 数据)List<Segment<K, V>> mStream = getSegmentDescriptors(numSegmentsToConsider);for (Segment<K, V> segment : mStream) {//在最后可能的时刻初始化段;这有助于确保我们在需要缓冲区之前不会使用它们segment.init(readsCounter);long startPos = segment.getReader().bytesRead;boolean hasNext = segment.nextRawKey();long endPos = segment.getReader().bytesRead;if (hasNext) {startBytes += endPos - startPos;segmentsToMerge.add(segment);segmentsConsidered++;}else {segment.close();numSegments--; //我们忽略该段进行合并}}//如果我们有所需数量的分段,或者查看所有可用的分段,我们就会中断if (segmentsConsidered == factor || segments.size() == 0) {break;}numSegmentsToConsider = factor - segmentsConsidered;}//将流馈送到优先级队列initialize(segmentsToMerge.size());clear();for (Segment<K, V> segment : segmentsToMerge) {put(segment);}//如果剩余的段数较少,则只返回迭代器,否则执行另一个单级合并if (numSegments <= factor) {if (!includeFinalMerge) { //用于reduce任务//重置totalBytesProcessed并重新计算剩余段的totalBytes,//以跟踪最终合并的进度。最终合并被视为reduce阶段的进展,即reduce任务的第三阶段。totalBytesProcessed = 0;totalBytes = 0;for (int i = 0; i < segmentsToMerge.size(); i++) {totalBytes += segmentsToMerge.get(i).getRawDataLength();}}if (totalBytes != 0) //偏执progPerByte = 1.0f / (float)totalBytes;totalBytesProcessed += startBytes;         if (totalBytes != 0)mergeProgress.set(Math.min(1.0f, totalBytesProcessed * progPerByte));elsemergeProgress.set(1.0f); //最后一次传输,没有剩余的分段-我们就完成了LOG.info("Down to the last merge-pass, with " + numSegments + " segments left of total size: " +(totalBytes - totalBytesProcessed) + " bytes");return this;} else {LOG.info("Merging " + segmentsToMerge.size() + " intermediate segments out of a total of " + (segments.size()+segmentsToMerge.size()));long bytesProcessedInPrevMerges = totalBytesProcessed;totalBytesProcessed += startBytes;//如果在空间限制下可用,我们希望在多个磁盘上分散创建临时文件long approxOutputSize = 0; for (Segment<K, V> s : segmentsToMerge) {approxOutputSize += s.getLength() + ChecksumFileSystem.getApproxChkSumLength(s.getLength());}Path tmpFilename = new Path(tmpDir, "intermediate").suffix("." + passNo);Path outputFile =  lDirAlloc.getLocalPathForWrite(tmpFilename.toString(),approxOutputSize, conf);FSDataOutputStream out = fs.create(outputFile);out = IntermediateEncryptedStream.wrapIfNecessary(conf, out,outputFile);Writer<K, V> writer = new Writer<K, V>(conf, out, keyClass, valueClass,codec, writesCounter, true);writeFile(this, writer, reporter, conf);writer.close();//我们完成了一次单级合并;现在清理优先级队列this.close();//将新创建的分段添加到要合并的分段列表中Segment<K, V> tempSegment = new Segment<K, V>(conf, fs, outputFile, codec, false);//在排序列表中插入新的合并段int pos = Collections.binarySearch(segments, tempSegment,segmentComparator);if (pos < 0) {//二进制搜索失败。所以要插入的位置是pos-1pos = -pos-1;}segments.add(pos, tempSegment);numSegments = segments.size();//从totalBytes中减去新段的预期大小和实际大小之间的差值(新段的预计大小为inputBytesOfThisMerge)。//若合并中未调用合并器,则预期大小和实际大小将匹配(几乎)long inputBytesOfThisMerge = totalBytesProcessed -bytesProcessedInPrevMerges;totalBytes -= inputBytesOfThisMerge - tempSegment.getRawDataLength();if (totalBytes != 0) {progPerByte = 1.0f / (float)totalBytes;}passNo++;}//我们只担心第一次通过合并因子。因此,将系数重置为原来的值factor = origFactor;} while(true);}

4、WordCount中的reduce

  public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {private IntWritable result = new IntWritable();//key为从map输出收集的去重后的key,value为按照key进行分组合并排序后的value迭代器//当处理当下key时,会对下一个key的value按照reduce上下文传入的比较器进行排序public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);//默认调用TextOutputFormat.write()进行输出context.write(key, result);}}

5、TextOutputFormat

    public synchronized void write(K key, V value)throws IOException {boolean nullKey = key == null || key instanceof NullWritable;boolean nullValue = value == null || value instanceof NullWritable;if (nullKey && nullValue) {return;}if (!nullKey) {writeObject(key);}if (!(nullKey || nullValue)) {out.write(keyValueSeparator);}if (!nullValue) {writeObject(value);}//换行符“\n”out.write(NEWLINE);}

五、总结

1、初始化:比如构建作业和尝试任务的上下文、更新任务状态,构建输出提交器等

2、Shuffle:根据本地模式和集群模式生成不同的线程(Fetcher)组来收集map端的输出

3、Sort:对Shuffle的结果进行排序合并

4、SecondarySort:对相同key的value进行二次排序

5、构建自定义reducer、RecordWriter、reduce的上下文

6、运行用户自定义的Reduce

7、无序输出结果,一个reduce输出一份结果

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/659576.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring Boot 中操作 Bean 的生命周期

1.InitializingBean和DisposableBean InitializingBean接口提供了afterPropertiesSet方法&#xff0c;用于在bean的属性设置好之后调用&#xff1b; DisposableBean接口提供了destroy方法&#xff0c;用于在bean销毁之后调用&#xff1b; public class TestComponent implem…

C语言-算法-搜索剪枝与记忆化搜索

Function 题目描述 对于一个递归函数 w ( a , b , c ) w(a,b,c) w(a,b,c) 如果 a ≤ 0 a \le 0 a≤0 或 b ≤ 0 b \le 0 b≤0 或 c ≤ 0 c \le 0 c≤0 就返回值$ 1$。如果 a > 20 a>20 a>20 或 b > 20 b>20 b>20 或 c > 20 c>20 c>20 就返…

用 CanvasKit 实现超级丝滑的原神地图(已开源)!!!

首先给大家送上预览地址&#xff1a; 官网地址&#xff1a;https://webstatic.mihoyo.com/ys/app/interactive-map/index.html canvaskit地址&#xff1a;http://106.55.55.247/ky-genshin-map/ 为什么 canvaskit 有如此高的性能&#xff1f; 第一个问题&#xff0c;官方网页…

算法训练营day19,二叉树8-2

type TreeNode struct { Val int Left *TreeNode Right *TreeNode } 450. 删除二叉搜索树中的节点 /*本题比较难&#xff0c;删除节点要分五种情况考虑 1.没有找到要删除的节点 2.找到要删除的节点是叶子节点 3.找到要删除的节点&#xff0c;左指针不为空&#xff0c;…

[嵌入式系统-7]:龙芯1B 开发学习套件 -4- LoongIDE 集成开发工具的使用-创建应用程序工程、编译、下载、调试

目录 前言&#xff1a; 步骤1&#xff1a;设置工作工作空间 步骤2&#xff1a;设置工具链 步骤3&#xff1a;创建裸机应用程序 步骤4&#xff1a;创建带实时操作系统的应用程序 步骤5&#xff1a;编译 步骤6&#xff1a;下载调试 前言&#xff1a; LoongIDE集成开发环境…

使用 axios 请求库,设置请求拦截

什么是 axios&#xff1f; 基于promise网络请求库&#xff0c;可以同构&#xff08;同一套代码可以运行在浏览器&#xff09;&#xff0c;在服务端&#xff0c;使用原生node.js的http模块&#xff0c;在客户端&#xff08;浏览器&#xff09;中&#xff0c;使用XMLHttpRequests…

什么是适配器模式?它的实现方式有哪些?

什么是适配器模式&#xff1f;它的实现方式有哪些&#xff1f; 适配器模式是一种结构型设计模式&#xff0c;用于将一个类的接口转换成客户端所期望的另一个接口&#xff0c;以解决由于接口不兼容而不能协同工作的问题。适配器模式可以使原本由于接口不兼容而不能一起工作的类…

知识价值1-github站点域名

github如果访问不上&#xff0c;有一个办法是hosts映射&#xff1a; github.com x.x.x.x github.global.ssl.fastly.net y.y.y.y assets-cdn.github.com z.z.z.z1 assets-cdn.github.com z.z.z.z2 assets-cdn.github.com z.z.z.z3 assets-cdn.github.com z.z.z.z3 那这几个域名…

vue3开发,axios发送请求是携带params参数的避坑

vue3开发,axios发送请求是携带params参数的避坑&#xff01;今天一直报错&#xff0c;点击新增购物车&#xff0c;报错&#xff0c; 【Uncaught (in promise) TypeError: target must be an object】。查询了网上的资料说的都不对。都没有解决。最终还是被我整明白了。 网上网…

指针的深入理解(三)

这一节主要使用复习回调函数&#xff0c; 利用冒泡模拟实现qsort函数。 qsort 排序使用冒泡排序&#xff0c;主要难点在于运用元素个数和字节数以及基地址控制元素的比较&#xff1a; if里面使用了一个判断函数&#xff0c;qsort可以排序任意的数据&#xff0c;原因就是因为可…

[工具探索]Safari 和 Google Chrome 浏览器内核差异

最近有些Vue3的项目&#xff0c;使用了safari进行测试环境搞开发&#xff0c;发现页面存在不同程序的页面乱码情况&#xff0c;反而google浏览器没问题&#xff0c;下面我们就对比下他们之间的差异点&#xff1a; 日常开发google chrome占多数&#xff1b;现在主流浏览器 Goog…

oracle 监听的主机名出现异常时候,排查放向

oracle创建监听有多种方式&#xff1a; 1、手动编写$ORACLE_HOME/network/admin/listener.ora配置文件 2、通过netmgr或者netca创建 3、通过netca静默创建 当前环境是&#xff1a; 1、/etc/hosts文件中没有对主机名进行解析 2、在oracle的.bash_profile中增加了环境变量export…

机器学习-3降低损失(Reducing Loss)

机器学习-3降低损失(Reducing Loss) 学习内容来自&#xff1a;谷歌ai学习 https://developers.google.cn/machine-learning/crash-course/framing/check-your-understanding?hlzh-cn 本文作为学习记录1.降低损失&#xff1a;迭代方法 迭代学习 下图展示了机器学习算法用于训…

Flink实战五_状态机制

接上文&#xff1a;Flink实战四_TableAPI&SQL 在学习Flink的状态机制之前&#xff0c;我们需要理解什么是状态。回顾我们之前介绍的很多流计算的计算过程&#xff0c;有些计算方法&#xff0c;比如说我们之前多次使用的将stock.txt中的一行文本数据转换成Stock股票对象的ma…

python笔记11

1、模块简介 在Python中&#xff0c;模块是一种组织代码的方式&#xff0c;允许你将相关的代码放在一个文件中&#xff0c;以便更好地组织和重用。模块可以包含变量、函数和类等。下面是关于Python模块的一些基本概念&#xff1a; 1. 创建模块 要创建一个模块&#xff0c;只…

【DB2 流浪之旅】 第一讲 Linux 环境安装 db2 数据库

DB2数据库是IBM开发的一种大型关系型数据库平台。它支持多用户或应用程序在同一条SQL 语句中查询不同database甚至不同DBMS中的数据。一般DB2是搭配IBM Power系列小机使用的&#xff0c;兼容性好、性能高。当然DB2也有Linux版本的&#xff0c;相对性能会差一些&#xff0c;主要…

【FAS Survey】《Deep learning for face anti-spoofing: A Survey》

PAMI-2022 最新成果&#xff1a;https://github.com/ZitongYu/DeepFAS 文章目录 1 Introduction & Background1.1 Face Spoofing Attacks1.2 Datasets for Face Anti-Spoofing1.3 Evaluation Metrics1.4 Evaluation Protocols 2 Deep FAS with Commercial RGB Camera2.1 H…

CF1918 D. Blocking Elements [二分+数据结构优化dp]

传送门:CF [前题提要]:二分数据结构优化dp,赛时想到了二分,想到了dp,想到了应该是某种双log的做法,但是硬是想不出正确的dp的定义,看了讲解感觉dp方程的定义还是很典的,dp题写的少是这样的… 题目要求我们输出满足所有去掉的数字和以及区间段和的最大值的最小值.不难想到使用二…

meson、ninja编译dpdk

解压目录meson编译dpdk meson buildmeson编译dpdk debug版 meson setup --buildtypedebug debugbuildmeson编译使用静态库&#xff0c;编译example meson .. --prefix/usr/local --buildtypedebugoptimized --default-librarystatic -Dexamplesallninja编译 ninjaninja安装…

springboot-前后端分离——第二篇

本篇主要介绍一个发送请求的工具—postman&#xff0c;然后对请求中的参数进行介绍&#xff0c;例如简单参数、实体参数、数组参数、集合参数、日期类型参数以及json类型参数&#xff0c;对这些参数接收进行总结。最后对响应数据进行介绍&#xff0c;使用统一响应结果返回浏览器…