Hadoop-MapReduce-源码跟读-客户端篇

一、源码下载

下面是hadoop官方源码下载地址,我下载的是hadoop-3.2.4,那就一起来看下吧

Index of /dist/hadoop/core

二、从WordCount进入源码

用idea将源码加载进来后,找到org.apache.hadoop.examples.WordCount类(快捷方法:双击Shift输入WordCount)

/*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package org.apache.hadoop.examples;import java.io.IOException;
import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;public class WordCount {public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}}}public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}public static void main(String[] args) throws Exception {//构建一个新的Configuration,static代码块中加载了core-default.xml、core-site.xml配置//如果core-site.xml将某个属性的final设置为true,那么用户将无法进行修改Configuration conf = new Configuration();//获取用户命令行中指定的选项,并进行配置String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length < 2) {System.err.println("Usage: wordcount <in> [<in>...] <out>");System.exit(2);}//根据配置和job名称创建一个新的Job,Job是提交者面对的视图//此时Cluster是空的,只有在需要时,才会根据conf参数创建ClusterJob job = Job.getInstance(conf, "word count");//通过查找示例类位置来设置作业的jar文件,此时Job状态被设置为DEFINEjob.setJarByClass(WordCount.class);//为作业设置Mapper,该类必须是Mapper的子类,那么设置mapreduce.job.map.class的value为该类job.setMapperClass(TokenizerMapper.class);//为作业设置combiner,该类必须是Reducer的子类,那么设置mapreduce.job.combine.class的value为该类job.setCombinerClass(IntSumReducer.class);//为作业设置Reducer,该类必须是Reducer的子类,那么设置mapreduce.job.reduce.class的value为该类job.setReducerClass(IntSumReducer.class);//设置作业输出的Key类型类,即mapreduce.job.output.key.classjob.setOutputKeyClass(Text.class);//设置作业输出的Value类型类,即mapreduce.job.output.value.classjob.setOutputValueClass(IntWritable.class);//设置输入数据的路径,设置mapreduce.input.fileinputformat.inputdir为以逗号为连接符的多个输入路径for (int i = 0; i < otherArgs.length - 1; ++i) {FileInputFormat.addInputPath(job, new Path(otherArgs[i]));}//设置输出数据的路径,即mapreduce.output.fileoutputformat.outputdirFileOutputFormat.setOutputPath(job,new Path(otherArgs[otherArgs.length - 1]));//将Job提交到集群并等待其完成。传参为true表示实时监控作业和打印状态System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

注释已加,下面我们从job.waitForCompletion(true) 进入源码学习

其中涉及的方法有很多,不可能一一来看,我们这里只看主线上的方法以及重要的方法

1、Job.waitForCompletion

/*** Submit the job to the cluster and wait for it to finish.* @param verbose print the progress to the user* @return true if the job succeeded* @throws IOException thrown if the communication with the *         <code>JobTracker</code> is lost*/public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException,ClassNotFoundException {//如果此时Job状态是否为DEFINE,就提交if (state == JobState.DEFINE) {//将作业提交到集群并立即返回。submit();}//如果传入的参数为true,就实时打印Job状态if (verbose) {//随着进度和任务的进行,实时监控作业和打印状态monitorAndPrintJob();} else {// get the completion poll interval from the client.//从客户端获取完成轮询间隔。可以通过mapreduce.client.completion.pollinterval设置,//默认5000ms,JobClient轮询MapReduce ApplicationMaster以获取有关作业状态的更新的间隔(以毫秒为单位)。//测试小数据量时可以设置间隔短些,生产上设置的间隔长一些可以减少客户端-服务器交互int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf());//检查作业是否已完成。这是一个非阻塞呼叫。while (!isComplete()) {try {Thread.sleep(completionPollIntervalMillis);} catch (InterruptedException ie) {}}}return isSuccessful();}

2、Job.submit

/*** Submit the job to the cluster and return immediately.* @throws IOException*/public void submit() throws IOException, InterruptedException, ClassNotFoundException {ensureState(JobState.DEFINE);//默认设置为新API,除非它们被显式设置,或者使用了旧的映射器或reduce属性。setUseNewAPI();//采用impersonation(doAs)机制,为符合身份和权限的用户构建Cluster//Cluster提供一种访问有关 map/reduce 群集的信息的方法。connect();//获取JobSubmitter 从字面上看时Job提交者 (参数为文件系统和客户端)//JobClient可以使用自有方法提交作业以供执行,并了解当前系统状态。final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {//用于向系统提交作业的内部方法。return submitter.submitJobInternal(Job.this, cluster);}});//更改作业状态为RUNNINGstate = JobState.RUNNING;//获取可以显示该作业进度信息的URL。LOG.info("The url to track the job: " + getTrackingURL());}

3、JobSubmitter.submitJobInternal

/**
* 用于向系统提交Job的内部方法。
* Job提交过程包括:
*    1、检查Job的输入和输出规格
*    2、计算Job的InputSplit
*    3、如有必要,请为Job的DistributedCache设置必要的记帐信息
*    4、将Job的jar和配置复制到分布式文件系统上的map-reduce系统目录中
*    5、将作业提交到ResourceManager,并可选择监视其状态。
* @param job the configuration to submit
* @param cluster the handle to the Cluster
* @throws ClassNotFoundException
* @throws InterruptedException
* @throws IOException
*/
JobStatus submitJobInternal(Job job, Cluster cluster) throws ClassNotFoundException, InterruptedException, IOException {//验证作业输出规格,如果输出目录存在为避免重写则抛出异常checkSpecs(job);//根据Job获取Configuration,刚刚是根据配置创建Job,可见他们可以互相得到Configuration conf = job.getConfiguration();//加载MapReduce框架存档路径到分布式缓存conf//如果设置了MapReduce框架存档的路径(此路径通常位于HDFS文件系统中的公共位置),框架存档将自动与作业一起分发addMRFrameworkToDistributedCache(conf);//初始化临时目录并返回路径Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);//configure the command line options correctly on the submitting dfs//在提交的dfs上正确配置命令行选项//返回本地主机的地址。通过从系统中检索主机的名称,然后将该名称解析为InetAddress//注意:解析后的地址可能会被缓存一小段时间//如果存在安全管理器并被阻挡,那么返回表示环回地址的InetAddress//会获取系统的所有网卡信息,但是返回的是第一个InetAddress ip = InetAddress.getLocalHost();if (ip != null) {//设置提交端的ip地址submitHostAddress = ip.getHostAddress();//设置提交端的hostnamesubmitHostName = ip.getHostName();//设置job相关配置mapreduce.job.submithostnameconf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);//设置job相关配置mapreduce.job.submithostaddressconf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);}//作业分配一个唯一的jobIdJobID jobId = submitClient.getNewJobID();//为job设置jobIdjob.setJobID(jobId);//Path submitJobDir = new Path(jobStagingArea, jobId.toString());JobStatus status = null;try {//设置mapreduce.job.user.nameconf.set(MRJobConfig.USER_NAME,UserGroupInformation.getCurrentUser().getShortUserName());//设置hadoop.http.filter.initializers,默认的过滤类是org.apache.hadoop.http.lib.StaticUserWebFilter//这里设置的是AmFilterInitializer//该配置是以逗号分隔的类名列表,必须是FilterInitializer子类//这些Filter将应用于所有面向用户的jsp和servlet网页conf.set("hadoop.http.filter.initializers", "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");//设置mapreduce.job.dirconf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());LOG.debug("Configuring job " + jobId + " with " + submitJobDir + " as the submit dir");// get delegation token for the dir//获取dir的委派令牌TokenCache.obtainTokensForNamenodes(job.getCredentials(),new Path[] { submitJobDir }, conf);//获取密钥和令牌并将其存储到TokenCache中populateTokenCache(conf, job.getCredentials());// generate a secret to authenticate shuffle transfers// 生成一个密钥以验证无序传输if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {KeyGenerator keyGen;try {keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);keyGen.init(SHUFFLE_KEY_LENGTH);} catch (NoSuchAlgorithmException e) {throw new IOException("Error generating shuffle secret key", e);}//设置MapReduce中Shuffle的密钥key,可见Shuffle的传输是有校验的,是有数据完整性保障的SecretKey shuffleKey = keyGen.generateKey();TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),job.getCredentials());}//判断是否加密中间MapReduce溢写文件,默认false(mapreduce.job.encrypted-intermediate-data)if (CryptoUtils.isEncryptedSpillEnabled(conf)) {//如果设置了加密,就把最大作业尝试次数设置为1,默认值是2//该参数是应用程序尝试的最大次数,如果失败ApplicationMaster会进行重试conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);LOG.warn("Max job attempts set to 1 since encrypted intermediate" +"data spill is enabled");}//上传和配置与传递job相关的文件、libjar、jobjar和归档文件。//如果启用了共享缓存,则此客户端将使用libjar、文件、归档和jobjar的共享缓存//1.对于已经成功共享的资源,我们将继续以共享的方式使用它们。//2.对于不在缓存中并且需要NM上传的资源,我们不会要求NM上传。copyAndConfigureFiles(job, submitJobDir);//获取job conf的文件路径Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);// Create the splits for the job//为job创建 splitsLOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));//重点看该方法,该方法为job计算分片int maps = writeSplits(job, submitJobDir);//设置map个数 mapreduce.job.maps 可见分片数=map个数conf.setInt(MRJobConfig.NUM_MAPS, maps);LOG.info("number of splits:" + maps);//获取最大map数 mapreduce.job.max.map 默认 -1 即无限制int maxMaps = conf.getInt(MRJobConfig.JOB_MAX_MAP,MRJobConfig.DEFAULT_JOB_MAX_MAP);if (maxMaps >= 0 && maxMaps < maps) {throw new IllegalArgumentException("The number of map tasks " + maps +" exceeded limit " + maxMaps);}// write "queue admins of the queue to which job is being submitted"// to job file.//将“作业提交到的队列的队列管理员”写入作业文件//获取队列名称 mapreduce.job.queuename 默认是defaultString queue = conf.get(MRJobConfig.QUEUE_NAME,JobConf.DEFAULT_QUEUE_NAME);//获取给定作业队列的管理员。此方法仅供hadoop内部使用。AccessControlList acl = submitClient.getQueueAdmins(queue);//设置mapred.queue.default.acl-administer-jobsconf.set(toFullPropertyName(queue,QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());// removing jobtoken referrals before copying the jobconf to HDFS// as the tasks don't need this setting, actually they may break// because of it if present as the referral will point to a// different job.//在将job conf复制到HDFS之前删除jobtoken引用,因为任务不需要此设置,//实际上它们可能会因此而中断,因为引用将指向不同的作业。TokenCache.cleanUpTokenReferral(conf);//判断配置中mapreduce.job.token.tracking.ids.enabled(跟踪作业使用的令牌的ID的配置,默认false)if (conf.getBoolean(MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {// Add HDFS tracking idsArrayList<String> trackingIds = new ArrayList<String>();for (Token<? extends TokenIdentifier> t :job.getCredentials().getAllTokens()) {trackingIds.add(t.decodeIdentifier().getTrackingId());}conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,trackingIds.toArray(new String[trackingIds.size()]));}// Set reservation info if it exists//设置预订信息(如果存在)mapreduce.job.reservation.idReservationId reservationId = job.getReservationId();if (reservationId != null) {conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());}// Write job file to submit dir//写入作业文件以提交目录(HDFS上)writeConf(conf, submitJobFile);//// Now, actually submit the job (using the submit name)// 现在,真正提交作业(使用提交名称)//这里调用了YARNRunner.submitJob() 下面我们看下这个方法printTokens(jobId, job.getCredentials());status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());if (status != null) {return status;} else {throw new IOException("Could not launch job");}} finally {if (status == null) {LOG.info("Cleaning up the staging area " + submitJobDir);if (jtFs != null && submitJobDir != null)jtFs.delete(submitJobDir, true);}}}

3.1 JobSubmitter.writeSplits

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,Path jobSubmitDir) throws IOException,InterruptedException, ClassNotFoundException {JobConf jConf = (JobConf)job.getConfiguration();int maps;//默认是false,在Job.submit是用setUseNewAPI()方法设置过trueif (jConf.getUseNewMapper()) {//重点看该方法maps = writeNewSplits(job, jobSubmitDir);} else {maps = writeOldSplits(jConf, jobSubmitDir);}return maps;}

3.2 JobSubmitter.writeNewSplits

private <T extends InputSplit>int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,InterruptedException, ClassNotFoundException {Configuration conf = job.getConfiguration();//获取输入格式化类,可以通过mapreduce.job.inputformat.class设置,//默认为TextInputFormat.classInputFormat<?, ?> input =ReflectionUtils.newInstance(job.getInputFormatClass(), conf);//重点看这个方法//按逻辑拆分作业的输入文件集。//每个InputSplit都被分配给一个单独的Mapper进行处理(分片数量=MapTask数量)//注意:InputSplit是逻辑上的分割(比如 <输入文件路径,开始,偏移量>),并没有改变文件对应的块//InputFormat还创建RecordReader以读取InputSplit。List<InputSplit> splits = input.getSplits(job);T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);// sort the splits into order based on size, so that the biggest// go first//根据大小将拆分部分按顺序排序,使最大的优先Arrays.sort(array, new SplitComparator());JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);return array.length;}

3.3 FileInputFormat.getSplits

/** * Generate the list of files and make them into FileSplits.* 生成文件列表,并将它们制作成FileSplits。* @param job the job context* @throws IOException*/public List<InputSplit> getSplits(JobContext job) throws IOException {StopWatch sw = new StopWatch().start();//getFormatMinSplitSize() 返回 1//getMinSplitSize(job)) 获取mapreduce.input.fileinputformat.split.minsize值,默认1//两者取最大值,因为两者默认值都是1,那么 minSize = 1long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));//获取mapreduce.input.fileinputformat.split.maxsize的值,默认值Long.MAX_VALUE(2的63次方-1 MAX_VALUE=0x7fffffffffffffffL)long maxSize = getMaxSplitSize(job);// generate splits// 声明分片列表List<InputSplit> splits = new ArrayList<InputSplit>();//列出输入目录,仅选择与正则表达式匹配的文件List<FileStatus> files = listStatus(job);//获取mapreduce.input.fileinputformat.input.dir.recursive的值 默认false//获取mapreduce.input.fileinputformat.input.dir.nonrecursive.ignore.subdirs的值 默认false//两者都为true,才把ignoreDirs 设置为trueboolean ignoreDirs = !getInputDirRecursive(job)&& job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);//循环输入的每个文件,计算全部的InputSplitfor (FileStatus file: files) {//忽略目录if (ignoreDirs && file.isDirectory()) {continue;}//FileStatus接口,表示文件的客户端信息//Path 为FileSystem中文件或目录的名称//通过FileStatus获取PathPath path = file.getPath();//获取此文件的长度,以字节为单位。long length = file.getLen();//如果文件长度不等于0if (length != 0) {//BlockLocation 表示块的网络位置、有关包含块副本的主机的信息以及其他块元数据//(例如,与块相关的文件偏移量、长度、是否已损坏等)。//如果文件是3个复本,则BlockLocation的偏移量和长度表示文件中的绝对值,而主机是保存副本的3个数据节点。以下是一个示例://BlockLocation(offset: 0, length: BLOCK_SIZE,hosts: {"host1:9866", "host2:9866, host3:9866"})//如果文件是擦除编码的,则每个BlockLocation表示一个逻辑块组。值偏移是文件中块组的偏移,值长度是块组的总长度。BlockLocation的主机是保存块组的所有数据块和奇偶校验块的数据节点。//假设我们有一个RS_3_2编码文件(3个数据单元和2个奇偶校验单元)。BlockLocation示例如下://BlockLocation(offset: 0, length: 3 * BLOCK_SIZE, hosts: {"host1:9866","host2:9866","host3:9866","host4:9866","host5:9866"})BlockLocation[] blkLocations;//判断文件是否是LocatedFileStatus的实例//获取文件的 block 位置列表    if (file instanceof LocatedFileStatus) {blkLocations = ((LocatedFileStatus) file).getBlockLocations();} else {FileSystem fs = path.getFileSystem(job.getConfiguration());blkLocations = fs.getFileBlockLocations(file, 0, length);}//判断文件是可拆分的吗?通常情况下,这是真的,但如果文件是流压缩的,则不会。if (isSplitable(job, path)) {//获取该文件的块大小,HDFS允许文件可以指定自己的块大小和副本数long blockSize = file.getBlockSize();//计算该文件的分片大小://Math.max(minSize, Math.min(maxSize, blockSize));//minSize 默认 = 1//maxSize 默认 = Long.MAX_VALUE//那么默认情况下该文件的分片大小=blockSize(该文件的块大小)long splitSize = computeSplitSize(blockSize, minSize, maxSize);//默认剩下的字节长度=文件总的字节长度long bytesRemaining = length;//文件剩下的字节长度 / 分片大小(默认该文件块大小) > 1.1//含义:如果文件剩下的字节长度还有 块大小的1.1倍就继续 //    如果一个文件只有一个块 那么就不走该循环了while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {//length-bytesRemaining 相当于对于该文件整体的偏移量//根据偏移量获取对应该文件的第几个块int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);//添加分片splits.add(makeSplit(path, length-bytesRemaining, splitSize,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));bytesRemaining -= splitSize;}//一般到最后一个分片会走这里,或者该文件特别小只有一个块会走这里if (bytesRemaining != 0) {int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));}} else { // not splitableif (LOG.isDebugEnabled()) {// Log only if the file is big enough to be splittedif (length > Math.min(file.getBlockSize(), minSize)) {LOG.debug("File is not splittable so no parallelization "+ "is possible: " + file.getPath());}}//制作分片,分片数量=文件数量,分片为该文件对应的副本中第一个副本所在位置(优先取在缓存中的副本)splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),blkLocations[0].getCachedHosts()));}} else { //Create empty hosts array for zero length files//如果输入文件的字节大小=0,创建空的分片splits.add(makeSplit(path, 0, length, new String[0]));}}// Save the number of input files for metrics/loadgen// 为 job 设置文件数 mapreduce.input.fileinputformat.numinputfilesjob.getConfiguration().setLong(NUM_INPUT_FILES, files.size());sw.stop();if (LOG.isDebugEnabled()) {LOG.debug("Total # of splits generated by getSplits: " + splits.size()+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));}return splits;}

4、YARNRunner.submitJob

public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)throws IOException, InterruptedException {//添加TokensaddHistoryToken(ts);//构建启动MapReduce ApplicationMaster所需的所有信息//    1、设置LocalResources(表示运行容器所需的本地资源,NodeManager负责在启动容器之前本地化资源)//    2、设置安全令牌//    3、为ApplicationMaster容器设置ContainerLaunchContext(表示NodeManager启动容器所需的所有信息包括:ContainerId、资源情况、分配给谁、安全令牌、环境变量、启动容器的命令、容器失败退出时的重试策略、运行容器所必需的,如二进制文件、jar、共享对象、辅助文件等、)//    4、设置ApplicationSubmissionContext(表示ResourceManager启动应用程序的ApplicationMaster所需的所有信息。包括:ApplicationId、用户、名称、优先级、执行ApplicationMaster的容器的ContainerLaunchContext、可尝试的最大次数、尝试间隔、NodeManager处理应用程序日志所需的所有信息)//    5、设置ApplicationMaster资源请求//    6、为AM容器请求设置标签(如果存在)//    7、为job容器设置标签ApplicationSubmissionContext appContext =createApplicationSubmissionContext(conf, jobSubmitDir, ts);// Submit to ResourceManager// 向ResourceManager提交try {//最终是用YarnClient来提交到YarnApplicationId applicationId =resMgrDelegate.submitApplication(appContext);ApplicationReport appMaster = resMgrDelegate.getApplicationReport(applicationId);String diagnostics =(appMaster == null ?"application report is null" : appMaster.getDiagnostics());if (appMaster == null|| appMaster.getYarnApplicationState() == YarnApplicationState.FAILED|| appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {throw new IOException("Failed to run job : " +diagnostics);}return clientCache.getClient(jobId).getJobStatus(jobId);} catch (YarnException e) {throw new IOException(e);}}

5、YarnClientImpl.submitApplication

/*** 向YARN提交新申请,这是一个阻塞调用-在提交的应用程序成功提交并被ResourceManager接受之前,* 它不会返回ApplicationId。* 用户在提交新应用程序时应提供ApplicationId作为参数ApplicationSubmissionContext的一部分* 这在内部调用ApplicationClientProtocol.submitApplication() 之后在内部调用 ApplicationClientProtocol.getApplicationReport()* */
public ApplicationIdsubmitApplication(ApplicationSubmissionContext appContext)throws YarnException, IOException {//获取applicationIdApplicationId applicationId = appContext.getApplicationId();if (applicationId == null) {throw new ApplicationIdNotProvidedException("ApplicationId is not provided in ApplicationSubmissionContext");}//构建SubmitApplicationRequest(向ResourceManager提交应用程序的请求信息)SubmitApplicationRequest request =Records.newRecord(SubmitApplicationRequest.class);request.setApplicationSubmissionContext(appContext);// Automatically add the timeline DT into the CLC// Only when the security and the timeline service are both enabled//仅当安全和时间线服务都启用时自动将时间线DT添加到CLC中if (isSecurityEnabled() && timelineV1ServiceEnabled) {addTimelineDelegationToken(appContext.getAMContainerSpec());}//TODO: YARN-1763:Handle RM failovers during the submitApplication call.//提交作业//客户端用于向ResourceManager提交新应用程序的接口//客户端需要通过SubmitApplicationRequest提供详细信息,如运行ApplicationMaster所需的队列、用于启动Application Master的等效队列等rmClient.submitApplication(request);int pollCount = 0;long startTime = System.currentTimeMillis();//Job等待状态设置EnumSet<YarnApplicationState> waitingStates = EnumSet.of(YarnApplicationState.NEW,YarnApplicationState.NEW_SAVING,YarnApplicationState.SUBMITTED);//Job失败状态设置EnumSet<YarnApplicationState> failToSubmitStates = EnumSet.of(YarnApplicationState.FAILED,YarnApplicationState.KILLED);		while (true) {try {//获取应用的报告,包括://    ApplicationId//    Applications user//    Application queue//    Application name//    允许ApplicationMaster的主机//    ApplicationMaster的RPC端口//    跟踪url//    ApplicationMaster的各种状态//    出现错误时的诊断信息//    应用的开始时间//    如果开启了安全性,应用的客户端令牌ApplicationReport appReport = getApplicationReport(applicationId);YarnApplicationState state = appReport.getYarnApplicationState();if (!waitingStates.contains(state)) {if(failToSubmitStates.contains(state)) {throw new YarnException("Failed to submit " + applicationId + " to YARN : " + appReport.getDiagnostics());}LOG.info("Submitted application " + applicationId);break;}long elapsedMillis = System.currentTimeMillis() - startTime;if (enforceAsyncAPITimeout() &&elapsedMillis >= asyncApiPollTimeoutMillis) {throw new YarnException("Timed out while waiting for application " +applicationId + " to be submitted successfully");}// Notify the client through the log every 10 poll, in case the client// is blocked here too long.//每10次轮询通过日志通知客户端,以防客户端在此处被阻止的时间过长。if (++pollCount % 10 == 0) {LOG.info("Application submission is not finished, " +"submitted application " + applicationId +" is still in " + state);}try {//通过yarn.client.app-submission.poll-interval 设置,默认值200msThread.sleep(submitPollIntervalMillis);} catch (InterruptedException ie) {String msg = "Interrupted while waiting for application "+ applicationId + " to be successfully submitted.";LOG.error(msg);throw new YarnException(msg, ie);}} catch (ApplicationNotFoundException ex) {// FailOver or RM restart happens before RMStateStore saves// ApplicationState//故障转移或RM重新启动发生在RMStateStore保存ApplicationState之前LOG.info("Re-submit application " + applicationId + "with the " +"same ApplicationSubmissionContext");rmClient.submitApplication(request);}}return applicationId;}

三、总结

1、构建Configuration,并加载hadoop默认的配置文件core-default.xml、core-site.xml
2、解析命令行参数,配置用户配置的环境变量
3、设置Job信息,比如:主类、Mapper类、Reduce类、Combiner类、输出格式、输入输出文件等
4、异步提交Job,实时监控作业并打印Job状态
5、根据用户身份和权限构建Cluster,并向集群提交Job
6、检查Job的输入和输出规格
7、计算Job的InputSplit(格式:<输入文件路径,开始,偏移量>,默认分片数量=所有输入文件对应的块的数量,且每个分片对应一个Mapper)
8、如有必要,请为Job的DistributedCache设置必要的记帐信息
9、将Job的jar和配置复制到分布式文件系统上的map-reduce系统目录中
10、将作业提交到ResourceManager,并可选择监视其状态

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/651937.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

支持下一代网络IpV6的串口服务器,IpV6串口485接口转网口

和IPv4比较&#xff0c;IPv6有两个极具吸引力的特点&#xff1a;一个是IPv6采用的128位地址格式&#xff0c;而IPv4采用32位的地址格式&#xff0c;因此IPv6使地址空间增大了296&#xff1b;另一个是IPv6物联网数据业务具有更强的支持能力&#xff0c;成为未来物联网的重要协议…

Kafka消息流转的挑战与对策:消息丢失与重复消费问题

消息丢失和重复消费时分布式系统重的常见问题&#xff0c;如果处理不好会对业务造成很大的影响。比如用户下单是通过消息队列处理的&#xff0c;对于用户的订单来说&#xff0c;消息丢失会造成用户下单丢失&#xff0c;影响售卖&#xff0c;如果重复消费&#xff0c;可能会生成…

Ps:创建基于饱和度的蒙版

能够区分图像上哪些区域的饱和度高&#xff0c;哪些区域的饱和度低&#xff0c;在调色过程中是相当有用的。 比如&#xff0c;使得饱和度高的区域更加饱和&#xff0c;可增加图像色彩反差&#xff0c;让画面更引人注目。 或者&#xff0c;使得饱和度区域趋于饱和&#xff0c;让…

技术书评和笔记【01】脑机接口-电路与系统 【2020版】

前言: 荷兰作者,Amir Zjajo博士,毕业于荷兰代尔夫特理工大学,方向 面向移动健康的低功耗混合型号电路与系统,以及,面向认知的神经形态电路。 ,脑机接口 - 电路与系统一书,系统介绍了,脑机接口电路与系统的实现技术,尤其,提到了量产和设计的问题,难能可贵,摘录如…

JVM篇----第九篇

系列文章目录 文章目录 系列文章目录前言一、分代收集算法二、新生代与复制算法三、老年代与标记复制算法前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男女通用,看懂了就去分享给你的码吧。 一、分代…

MySQL-删除重复数据

在实际应用中&#xff0c;遇到一个这样的问题&#xff0c;MySQL中存储的数据为资讯类数据&#xff0c;在页面展示时会出现多个平台的新闻报导相同的内容&#xff0c;导致页面会出现重复数据。因为数据是每天定期更新&#xff0c;所以最快捷有效的方式是在更新完数据后增加一个去…

3、创建特性(Creating Features)

使用Pandas转换特性以适合您的模型。 文章目录 1、简介2、数学变换3、计数4、构建和分解特征5、分组转换1、简介 一旦你确定了一组有潜力的特性,就可以开始开发它们了。在这节课中,你将学习如何在Pandas中进行一些常见的转换。如果你对Pandas不熟练, 请参考《从零开始的Pand…

YOLOv8融合改进 更换检测头同时改进C2f模块

一、Detect_DyHead检测头和C2f-EMSC,C2f-EMSCP模块 详细介绍和代码在往期的博客里: Detect_DyHead: (YOLOv8改进检测头Detect为Detect_Dyhead-CSDN博客) C2f-EMSC和C2f-EMSCP: (YOLOv8改进之多尺度转换模块C2f-EMSC和C2f-EMSCP-CSDN博客) 二、算法实现 1、将检测…

QT之 QDebug 调试(一)

在QT中&#xff0c;进行调试&#xff0c;则需要在头文件地方加上 #include <QDebug> 加上之后&#xff0c;在编译之后则其输出的信息则在应用程序输出那里显示信息。 其QDebug 信息调试则如&#xff1a; qDebug() << " 需要插入的信息 "…

RPC教程 7.服务发现与注册中心

0.前言 这一节的内容只能解决只有一个服务的情况。要是有多个服务(即是多个结构体&#xff09;这种就解决不了&#xff0c;也即是没有服务ip地址和服务实例的映射关系。 1.为什么需要注册中心 在上一节中&#xff0c;客户端想要找到服务实例的ip,需要硬编码把ip写到代码中。…

猿媛员的专属春联来咯

我们“因程序汇聚&#xff0c;因猿份相识”&#xff0c;今天来给辛苦了一年的“猿媛员”们送上几幅恶搞对联&#xff0c;为图一笑 &#x1f604; 闲言少叙&#xff0c;上对联 龙行多福 上联&#xff1a;龙龙龙龙龙龙龙 下联&#xff1a;福福福福福福福 形象版 上联&#…

centos 7安装MySQl

本文参考借鉴&#xff1a;https://cloud.tencent.com/developer/article/2353312&#xff0c;非常赞&#xff01; 为了避免权限不足的问题&#xff0c;建议切换至root用户进行安装 1.MySQL的清理与安装 查看是否存在MySQL服务 安装mysql之前&#xff0c;需要先看看要安装系…

【极数系列】Flink搭建入门项目Demo 秒懂Flink开发运行原理(05)

文章目录 引言1.创建mavenx项目2.包结构3.引入pom依赖4.增加log4j2.properties配置5.创建主启动类6.构建打jar包7.flinkUI页面部署 引言 gitee地址&#xff1a;https://gitee.com/shawsongyue/aurora.git 源码直接下载可运行&#xff0c;模块&#xff1a;aurora_flink Flink 版…

phar反序列化漏洞

基础&#xff1a; Phar是一种PHP文件归档格式&#xff0c;它类似于ZIP或JAR文件格式&#xff0c;可以将多个PHP文件打包成一个单独的文件&#xff08;即Phar文件&#xff09;。 打包后的Phar文件可以像普通的PHP文件一样执行&#xff0c;可以包含PHP代码、文本文件、图像等各…

剖析线程池ThreadPoolExecutor

文章目录 线程池一、线程池概述二、ThreadPoolExecutor类详解三、线程池参数配置与优化四、线程池监控与调优五、线程池与其他并发工具比较六、线程池在实际应用中的案例分析案例背景线程池的配置配置线程池参数。处理用户请求 监控与调优 七、线程池的扩展与自定义实现八、线程…

Python爬虫---Scrapy框架---CrawlSpider

CrawlSpider 1. CrawlSpider继承自scrapy.Spider 2. CrawlSpider可以定义规则&#xff0c;再解析html内容的时候&#xff0c;可以根据链接规则提取出指定的链接&#xff0c;然后再向这些链接发送请求&#xff0c;所以&#xff0c;如果有需要跟进链接的需求&#xff0c;意思就是…

Redis实现多种限流算法

一 常见限流算法 1 固定窗口限流 每一个时间段计数器&#xff0c;当计数器达到阈值后拒绝&#xff0c;每过完这个时间段&#xff0c;计数器重置0&#xff0c;重新计数。 优点&#xff1a;实现简单&#xff0c;性能高&#xff1b; 缺点&#xff1a;明显的临界问题&#xff0c…

有手就行!阿里云上3分钟搞定幻兽帕鲁联机服务器搭建

幻兽帕鲁最近在社区呈现了爆火的趋势&#xff0c;在线人数已突破百万级别&#xff0c;官方服务器也开始出现不稳定&#xff0c;卡人闪退的情况。对于有一定财力的小伙伴&#xff0c;搭建一个私人服务器是一个最稳定而舒服的解决方案。 本文萝卜哥将讲解一下如何快速搭建 palwo…

看图说话:Git图谱解读

很多新加入公司的同学在使用Git各类客户端管理代码的过程中对于Git图谱解读不太理解&#xff0c;我们常用的Git客户端是SourceTree&#xff0c;配合P4Merge进行冲突解决基本可以满足日常工作大部分需要。不同的Git客户端工具对图谱展示会有些许差异&#xff0c;以下是SourceTre…

查看 Avro 格式的 Kafka 消息(启用了 Confluent Schema Registry )

使用 Avro 格式传递 Kafka 消息要比 Json 更加高效,因为它是二进制格式,在启用了 Confluent Schema Registry 的情况下,会进一步地提升传输效率,因为 Avro 中的 Schema 信息将不再出现在消息中,消息体积会进一步压缩,同时,还可以利用到 Schema Registry 的其他好处,例如…