一、源码下载
下面是hadoop官方源码下载地址,我下载的是hadoop-3.2.4,那就一起来看下吧
Index of /dist/hadoop/core
二、从WordCount进入源码
用idea将源码加载进来后,找到org.apache.hadoop.examples.WordCount类(快捷方法:双击Shift输入WordCount)
/*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package org.apache.hadoop.examples;import java.io.IOException;
import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;public class WordCount {public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}}}public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}public static void main(String[] args) throws Exception {//构建一个新的Configuration,static代码块中加载了core-default.xml、core-site.xml配置//如果core-site.xml将某个属性的final设置为true,那么用户将无法进行修改Configuration conf = new Configuration();//获取用户命令行中指定的选项,并进行配置String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length < 2) {System.err.println("Usage: wordcount <in> [<in>...] <out>");System.exit(2);}//根据配置和job名称创建一个新的Job,Job是提交者面对的视图//此时Cluster是空的,只有在需要时,才会根据conf参数创建ClusterJob job = Job.getInstance(conf, "word count");//通过查找示例类位置来设置作业的jar文件,此时Job状态被设置为DEFINEjob.setJarByClass(WordCount.class);//为作业设置Mapper,该类必须是Mapper的子类,那么设置mapreduce.job.map.class的value为该类job.setMapperClass(TokenizerMapper.class);//为作业设置combiner,该类必须是Reducer的子类,那么设置mapreduce.job.combine.class的value为该类job.setCombinerClass(IntSumReducer.class);//为作业设置Reducer,该类必须是Reducer的子类,那么设置mapreduce.job.reduce.class的value为该类job.setReducerClass(IntSumReducer.class);//设置作业输出的Key类型类,即mapreduce.job.output.key.classjob.setOutputKeyClass(Text.class);//设置作业输出的Value类型类,即mapreduce.job.output.value.classjob.setOutputValueClass(IntWritable.class);//设置输入数据的路径,设置mapreduce.input.fileinputformat.inputdir为以逗号为连接符的多个输入路径for (int i = 0; i < otherArgs.length - 1; ++i) {FileInputFormat.addInputPath(job, new Path(otherArgs[i]));}//设置输出数据的路径,即mapreduce.output.fileoutputformat.outputdirFileOutputFormat.setOutputPath(job,new Path(otherArgs[otherArgs.length - 1]));//将Job提交到集群并等待其完成。传参为true表示实时监控作业和打印状态System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
注释已加,下面我们从job.waitForCompletion(true) 进入源码学习
其中涉及的方法有很多,不可能一一来看,我们这里只看主线上的方法以及重要的方法
1、Job.waitForCompletion
/*** Submit the job to the cluster and wait for it to finish.* @param verbose print the progress to the user* @return true if the job succeeded* @throws IOException thrown if the communication with the * <code>JobTracker</code> is lost*/public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException,ClassNotFoundException {//如果此时Job状态是否为DEFINE,就提交if (state == JobState.DEFINE) {//将作业提交到集群并立即返回。submit();}//如果传入的参数为true,就实时打印Job状态if (verbose) {//随着进度和任务的进行,实时监控作业和打印状态monitorAndPrintJob();} else {// get the completion poll interval from the client.//从客户端获取完成轮询间隔。可以通过mapreduce.client.completion.pollinterval设置,//默认5000ms,JobClient轮询MapReduce ApplicationMaster以获取有关作业状态的更新的间隔(以毫秒为单位)。//测试小数据量时可以设置间隔短些,生产上设置的间隔长一些可以减少客户端-服务器交互int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf());//检查作业是否已完成。这是一个非阻塞呼叫。while (!isComplete()) {try {Thread.sleep(completionPollIntervalMillis);} catch (InterruptedException ie) {}}}return isSuccessful();}
2、Job.submit
/*** Submit the job to the cluster and return immediately.* @throws IOException*/public void submit() throws IOException, InterruptedException, ClassNotFoundException {ensureState(JobState.DEFINE);//默认设置为新API,除非它们被显式设置,或者使用了旧的映射器或reduce属性。setUseNewAPI();//采用impersonation(doAs)机制,为符合身份和权限的用户构建Cluster//Cluster提供一种访问有关 map/reduce 群集的信息的方法。connect();//获取JobSubmitter 从字面上看时Job提交者 (参数为文件系统和客户端)//JobClient可以使用自有方法提交作业以供执行,并了解当前系统状态。final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {//用于向系统提交作业的内部方法。return submitter.submitJobInternal(Job.this, cluster);}});//更改作业状态为RUNNINGstate = JobState.RUNNING;//获取可以显示该作业进度信息的URL。LOG.info("The url to track the job: " + getTrackingURL());}
3、JobSubmitter.submitJobInternal
/**
* 用于向系统提交Job的内部方法。
* Job提交过程包括:
* 1、检查Job的输入和输出规格
* 2、计算Job的InputSplit
* 3、如有必要,请为Job的DistributedCache设置必要的记帐信息
* 4、将Job的jar和配置复制到分布式文件系统上的map-reduce系统目录中
* 5、将作业提交到ResourceManager,并可选择监视其状态。
* @param job the configuration to submit
* @param cluster the handle to the Cluster
* @throws ClassNotFoundException
* @throws InterruptedException
* @throws IOException
*/
JobStatus submitJobInternal(Job job, Cluster cluster) throws ClassNotFoundException, InterruptedException, IOException {//验证作业输出规格,如果输出目录存在为避免重写则抛出异常checkSpecs(job);//根据Job获取Configuration,刚刚是根据配置创建Job,可见他们可以互相得到Configuration conf = job.getConfiguration();//加载MapReduce框架存档路径到分布式缓存conf//如果设置了MapReduce框架存档的路径(此路径通常位于HDFS文件系统中的公共位置),框架存档将自动与作业一起分发addMRFrameworkToDistributedCache(conf);//初始化临时目录并返回路径Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);//configure the command line options correctly on the submitting dfs//在提交的dfs上正确配置命令行选项//返回本地主机的地址。通过从系统中检索主机的名称,然后将该名称解析为InetAddress//注意:解析后的地址可能会被缓存一小段时间//如果存在安全管理器并被阻挡,那么返回表示环回地址的InetAddress//会获取系统的所有网卡信息,但是返回的是第一个InetAddress ip = InetAddress.getLocalHost();if (ip != null) {//设置提交端的ip地址submitHostAddress = ip.getHostAddress();//设置提交端的hostnamesubmitHostName = ip.getHostName();//设置job相关配置mapreduce.job.submithostnameconf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);//设置job相关配置mapreduce.job.submithostaddressconf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);}//作业分配一个唯一的jobIdJobID jobId = submitClient.getNewJobID();//为job设置jobIdjob.setJobID(jobId);//Path submitJobDir = new Path(jobStagingArea, jobId.toString());JobStatus status = null;try {//设置mapreduce.job.user.nameconf.set(MRJobConfig.USER_NAME,UserGroupInformation.getCurrentUser().getShortUserName());//设置hadoop.http.filter.initializers,默认的过滤类是org.apache.hadoop.http.lib.StaticUserWebFilter//这里设置的是AmFilterInitializer//该配置是以逗号分隔的类名列表,必须是FilterInitializer子类//这些Filter将应用于所有面向用户的jsp和servlet网页conf.set("hadoop.http.filter.initializers", "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");//设置mapreduce.job.dirconf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());LOG.debug("Configuring job " + jobId + " with " + submitJobDir + " as the submit dir");// get delegation token for the dir//获取dir的委派令牌TokenCache.obtainTokensForNamenodes(job.getCredentials(),new Path[] { submitJobDir }, conf);//获取密钥和令牌并将其存储到TokenCache中populateTokenCache(conf, job.getCredentials());// generate a secret to authenticate shuffle transfers// 生成一个密钥以验证无序传输if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {KeyGenerator keyGen;try {keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);keyGen.init(SHUFFLE_KEY_LENGTH);} catch (NoSuchAlgorithmException e) {throw new IOException("Error generating shuffle secret key", e);}//设置MapReduce中Shuffle的密钥key,可见Shuffle的传输是有校验的,是有数据完整性保障的SecretKey shuffleKey = keyGen.generateKey();TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),job.getCredentials());}//判断是否加密中间MapReduce溢写文件,默认false(mapreduce.job.encrypted-intermediate-data)if (CryptoUtils.isEncryptedSpillEnabled(conf)) {//如果设置了加密,就把最大作业尝试次数设置为1,默认值是2//该参数是应用程序尝试的最大次数,如果失败ApplicationMaster会进行重试conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);LOG.warn("Max job attempts set to 1 since encrypted intermediate" +"data spill is enabled");}//上传和配置与传递job相关的文件、libjar、jobjar和归档文件。//如果启用了共享缓存,则此客户端将使用libjar、文件、归档和jobjar的共享缓存//1.对于已经成功共享的资源,我们将继续以共享的方式使用它们。//2.对于不在缓存中并且需要NM上传的资源,我们不会要求NM上传。copyAndConfigureFiles(job, submitJobDir);//获取job conf的文件路径Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);// Create the splits for the job//为job创建 splitsLOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));//重点看该方法,该方法为job计算分片int maps = writeSplits(job, submitJobDir);//设置map个数 mapreduce.job.maps 可见分片数=map个数conf.setInt(MRJobConfig.NUM_MAPS, maps);LOG.info("number of splits:" + maps);//获取最大map数 mapreduce.job.max.map 默认 -1 即无限制int maxMaps = conf.getInt(MRJobConfig.JOB_MAX_MAP,MRJobConfig.DEFAULT_JOB_MAX_MAP);if (maxMaps >= 0 && maxMaps < maps) {throw new IllegalArgumentException("The number of map tasks " + maps +" exceeded limit " + maxMaps);}// write "queue admins of the queue to which job is being submitted"// to job file.//将“作业提交到的队列的队列管理员”写入作业文件//获取队列名称 mapreduce.job.queuename 默认是defaultString queue = conf.get(MRJobConfig.QUEUE_NAME,JobConf.DEFAULT_QUEUE_NAME);//获取给定作业队列的管理员。此方法仅供hadoop内部使用。AccessControlList acl = submitClient.getQueueAdmins(queue);//设置mapred.queue.default.acl-administer-jobsconf.set(toFullPropertyName(queue,QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());// removing jobtoken referrals before copying the jobconf to HDFS// as the tasks don't need this setting, actually they may break// because of it if present as the referral will point to a// different job.//在将job conf复制到HDFS之前删除jobtoken引用,因为任务不需要此设置,//实际上它们可能会因此而中断,因为引用将指向不同的作业。TokenCache.cleanUpTokenReferral(conf);//判断配置中mapreduce.job.token.tracking.ids.enabled(跟踪作业使用的令牌的ID的配置,默认false)if (conf.getBoolean(MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {// Add HDFS tracking idsArrayList<String> trackingIds = new ArrayList<String>();for (Token<? extends TokenIdentifier> t :job.getCredentials().getAllTokens()) {trackingIds.add(t.decodeIdentifier().getTrackingId());}conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,trackingIds.toArray(new String[trackingIds.size()]));}// Set reservation info if it exists//设置预订信息(如果存在)mapreduce.job.reservation.idReservationId reservationId = job.getReservationId();if (reservationId != null) {conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());}// Write job file to submit dir//写入作业文件以提交目录(HDFS上)writeConf(conf, submitJobFile);//// Now, actually submit the job (using the submit name)// 现在,真正提交作业(使用提交名称)//这里调用了YARNRunner.submitJob() 下面我们看下这个方法printTokens(jobId, job.getCredentials());status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());if (status != null) {return status;} else {throw new IOException("Could not launch job");}} finally {if (status == null) {LOG.info("Cleaning up the staging area " + submitJobDir);if (jtFs != null && submitJobDir != null)jtFs.delete(submitJobDir, true);}}}
3.1 JobSubmitter.writeSplits
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,Path jobSubmitDir) throws IOException,InterruptedException, ClassNotFoundException {JobConf jConf = (JobConf)job.getConfiguration();int maps;//默认是false,在Job.submit是用setUseNewAPI()方法设置过trueif (jConf.getUseNewMapper()) {//重点看该方法maps = writeNewSplits(job, jobSubmitDir);} else {maps = writeOldSplits(jConf, jobSubmitDir);}return maps;}
3.2 JobSubmitter.writeNewSplits
private <T extends InputSplit>int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,InterruptedException, ClassNotFoundException {Configuration conf = job.getConfiguration();//获取输入格式化类,可以通过mapreduce.job.inputformat.class设置,//默认为TextInputFormat.classInputFormat<?, ?> input =ReflectionUtils.newInstance(job.getInputFormatClass(), conf);//重点看这个方法//按逻辑拆分作业的输入文件集。//每个InputSplit都被分配给一个单独的Mapper进行处理(分片数量=MapTask数量)//注意:InputSplit是逻辑上的分割(比如 <输入文件路径,开始,偏移量>),并没有改变文件对应的块//InputFormat还创建RecordReader以读取InputSplit。List<InputSplit> splits = input.getSplits(job);T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);// sort the splits into order based on size, so that the biggest// go first//根据大小将拆分部分按顺序排序,使最大的优先Arrays.sort(array, new SplitComparator());JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);return array.length;}
3.3 FileInputFormat.getSplits
/** * Generate the list of files and make them into FileSplits.* 生成文件列表,并将它们制作成FileSplits。* @param job the job context* @throws IOException*/public List<InputSplit> getSplits(JobContext job) throws IOException {StopWatch sw = new StopWatch().start();//getFormatMinSplitSize() 返回 1//getMinSplitSize(job)) 获取mapreduce.input.fileinputformat.split.minsize值,默认1//两者取最大值,因为两者默认值都是1,那么 minSize = 1long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));//获取mapreduce.input.fileinputformat.split.maxsize的值,默认值Long.MAX_VALUE(2的63次方-1 MAX_VALUE=0x7fffffffffffffffL)long maxSize = getMaxSplitSize(job);// generate splits// 声明分片列表List<InputSplit> splits = new ArrayList<InputSplit>();//列出输入目录,仅选择与正则表达式匹配的文件List<FileStatus> files = listStatus(job);//获取mapreduce.input.fileinputformat.input.dir.recursive的值 默认false//获取mapreduce.input.fileinputformat.input.dir.nonrecursive.ignore.subdirs的值 默认false//两者都为true,才把ignoreDirs 设置为trueboolean ignoreDirs = !getInputDirRecursive(job)&& job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);//循环输入的每个文件,计算全部的InputSplitfor (FileStatus file: files) {//忽略目录if (ignoreDirs && file.isDirectory()) {continue;}//FileStatus接口,表示文件的客户端信息//Path 为FileSystem中文件或目录的名称//通过FileStatus获取PathPath path = file.getPath();//获取此文件的长度,以字节为单位。long length = file.getLen();//如果文件长度不等于0if (length != 0) {//BlockLocation 表示块的网络位置、有关包含块副本的主机的信息以及其他块元数据//(例如,与块相关的文件偏移量、长度、是否已损坏等)。//如果文件是3个复本,则BlockLocation的偏移量和长度表示文件中的绝对值,而主机是保存副本的3个数据节点。以下是一个示例://BlockLocation(offset: 0, length: BLOCK_SIZE,hosts: {"host1:9866", "host2:9866, host3:9866"})//如果文件是擦除编码的,则每个BlockLocation表示一个逻辑块组。值偏移是文件中块组的偏移,值长度是块组的总长度。BlockLocation的主机是保存块组的所有数据块和奇偶校验块的数据节点。//假设我们有一个RS_3_2编码文件(3个数据单元和2个奇偶校验单元)。BlockLocation示例如下://BlockLocation(offset: 0, length: 3 * BLOCK_SIZE, hosts: {"host1:9866","host2:9866","host3:9866","host4:9866","host5:9866"})BlockLocation[] blkLocations;//判断文件是否是LocatedFileStatus的实例//获取文件的 block 位置列表 if (file instanceof LocatedFileStatus) {blkLocations = ((LocatedFileStatus) file).getBlockLocations();} else {FileSystem fs = path.getFileSystem(job.getConfiguration());blkLocations = fs.getFileBlockLocations(file, 0, length);}//判断文件是可拆分的吗?通常情况下,这是真的,但如果文件是流压缩的,则不会。if (isSplitable(job, path)) {//获取该文件的块大小,HDFS允许文件可以指定自己的块大小和副本数long blockSize = file.getBlockSize();//计算该文件的分片大小://Math.max(minSize, Math.min(maxSize, blockSize));//minSize 默认 = 1//maxSize 默认 = Long.MAX_VALUE//那么默认情况下该文件的分片大小=blockSize(该文件的块大小)long splitSize = computeSplitSize(blockSize, minSize, maxSize);//默认剩下的字节长度=文件总的字节长度long bytesRemaining = length;//文件剩下的字节长度 / 分片大小(默认该文件块大小) > 1.1//含义:如果文件剩下的字节长度还有 块大小的1.1倍就继续 // 如果一个文件只有一个块 那么就不走该循环了while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {//length-bytesRemaining 相当于对于该文件整体的偏移量//根据偏移量获取对应该文件的第几个块int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);//添加分片splits.add(makeSplit(path, length-bytesRemaining, splitSize,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));bytesRemaining -= splitSize;}//一般到最后一个分片会走这里,或者该文件特别小只有一个块会走这里if (bytesRemaining != 0) {int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));}} else { // not splitableif (LOG.isDebugEnabled()) {// Log only if the file is big enough to be splittedif (length > Math.min(file.getBlockSize(), minSize)) {LOG.debug("File is not splittable so no parallelization "+ "is possible: " + file.getPath());}}//制作分片,分片数量=文件数量,分片为该文件对应的副本中第一个副本所在位置(优先取在缓存中的副本)splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),blkLocations[0].getCachedHosts()));}} else { //Create empty hosts array for zero length files//如果输入文件的字节大小=0,创建空的分片splits.add(makeSplit(path, 0, length, new String[0]));}}// Save the number of input files for metrics/loadgen// 为 job 设置文件数 mapreduce.input.fileinputformat.numinputfilesjob.getConfiguration().setLong(NUM_INPUT_FILES, files.size());sw.stop();if (LOG.isDebugEnabled()) {LOG.debug("Total # of splits generated by getSplits: " + splits.size()+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));}return splits;}
4、YARNRunner.submitJob
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)throws IOException, InterruptedException {//添加TokensaddHistoryToken(ts);//构建启动MapReduce ApplicationMaster所需的所有信息// 1、设置LocalResources(表示运行容器所需的本地资源,NodeManager负责在启动容器之前本地化资源)// 2、设置安全令牌// 3、为ApplicationMaster容器设置ContainerLaunchContext(表示NodeManager启动容器所需的所有信息包括:ContainerId、资源情况、分配给谁、安全令牌、环境变量、启动容器的命令、容器失败退出时的重试策略、运行容器所必需的,如二进制文件、jar、共享对象、辅助文件等、)// 4、设置ApplicationSubmissionContext(表示ResourceManager启动应用程序的ApplicationMaster所需的所有信息。包括:ApplicationId、用户、名称、优先级、执行ApplicationMaster的容器的ContainerLaunchContext、可尝试的最大次数、尝试间隔、NodeManager处理应用程序日志所需的所有信息)// 5、设置ApplicationMaster资源请求// 6、为AM容器请求设置标签(如果存在)// 7、为job容器设置标签ApplicationSubmissionContext appContext =createApplicationSubmissionContext(conf, jobSubmitDir, ts);// Submit to ResourceManager// 向ResourceManager提交try {//最终是用YarnClient来提交到YarnApplicationId applicationId =resMgrDelegate.submitApplication(appContext);ApplicationReport appMaster = resMgrDelegate.getApplicationReport(applicationId);String diagnostics =(appMaster == null ?"application report is null" : appMaster.getDiagnostics());if (appMaster == null|| appMaster.getYarnApplicationState() == YarnApplicationState.FAILED|| appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {throw new IOException("Failed to run job : " +diagnostics);}return clientCache.getClient(jobId).getJobStatus(jobId);} catch (YarnException e) {throw new IOException(e);}}
5、YarnClientImpl.submitApplication
/*** 向YARN提交新申请,这是一个阻塞调用-在提交的应用程序成功提交并被ResourceManager接受之前,* 它不会返回ApplicationId。* 用户在提交新应用程序时应提供ApplicationId作为参数ApplicationSubmissionContext的一部分* 这在内部调用ApplicationClientProtocol.submitApplication() 之后在内部调用 ApplicationClientProtocol.getApplicationReport()* */
public ApplicationIdsubmitApplication(ApplicationSubmissionContext appContext)throws YarnException, IOException {//获取applicationIdApplicationId applicationId = appContext.getApplicationId();if (applicationId == null) {throw new ApplicationIdNotProvidedException("ApplicationId is not provided in ApplicationSubmissionContext");}//构建SubmitApplicationRequest(向ResourceManager提交应用程序的请求信息)SubmitApplicationRequest request =Records.newRecord(SubmitApplicationRequest.class);request.setApplicationSubmissionContext(appContext);// Automatically add the timeline DT into the CLC// Only when the security and the timeline service are both enabled//仅当安全和时间线服务都启用时自动将时间线DT添加到CLC中if (isSecurityEnabled() && timelineV1ServiceEnabled) {addTimelineDelegationToken(appContext.getAMContainerSpec());}//TODO: YARN-1763:Handle RM failovers during the submitApplication call.//提交作业//客户端用于向ResourceManager提交新应用程序的接口//客户端需要通过SubmitApplicationRequest提供详细信息,如运行ApplicationMaster所需的队列、用于启动Application Master的等效队列等rmClient.submitApplication(request);int pollCount = 0;long startTime = System.currentTimeMillis();//Job等待状态设置EnumSet<YarnApplicationState> waitingStates = EnumSet.of(YarnApplicationState.NEW,YarnApplicationState.NEW_SAVING,YarnApplicationState.SUBMITTED);//Job失败状态设置EnumSet<YarnApplicationState> failToSubmitStates = EnumSet.of(YarnApplicationState.FAILED,YarnApplicationState.KILLED); while (true) {try {//获取应用的报告,包括:// ApplicationId// Applications user// Application queue// Application name// 允许ApplicationMaster的主机// ApplicationMaster的RPC端口// 跟踪url// ApplicationMaster的各种状态// 出现错误时的诊断信息// 应用的开始时间// 如果开启了安全性,应用的客户端令牌ApplicationReport appReport = getApplicationReport(applicationId);YarnApplicationState state = appReport.getYarnApplicationState();if (!waitingStates.contains(state)) {if(failToSubmitStates.contains(state)) {throw new YarnException("Failed to submit " + applicationId + " to YARN : " + appReport.getDiagnostics());}LOG.info("Submitted application " + applicationId);break;}long elapsedMillis = System.currentTimeMillis() - startTime;if (enforceAsyncAPITimeout() &&elapsedMillis >= asyncApiPollTimeoutMillis) {throw new YarnException("Timed out while waiting for application " +applicationId + " to be submitted successfully");}// Notify the client through the log every 10 poll, in case the client// is blocked here too long.//每10次轮询通过日志通知客户端,以防客户端在此处被阻止的时间过长。if (++pollCount % 10 == 0) {LOG.info("Application submission is not finished, " +"submitted application " + applicationId +" is still in " + state);}try {//通过yarn.client.app-submission.poll-interval 设置,默认值200msThread.sleep(submitPollIntervalMillis);} catch (InterruptedException ie) {String msg = "Interrupted while waiting for application "+ applicationId + " to be successfully submitted.";LOG.error(msg);throw new YarnException(msg, ie);}} catch (ApplicationNotFoundException ex) {// FailOver or RM restart happens before RMStateStore saves// ApplicationState//故障转移或RM重新启动发生在RMStateStore保存ApplicationState之前LOG.info("Re-submit application " + applicationId + "with the " +"same ApplicationSubmissionContext");rmClient.submitApplication(request);}}return applicationId;}
三、总结
1、构建Configuration,并加载hadoop默认的配置文件core-default.xml、core-site.xml
2、解析命令行参数,配置用户配置的环境变量
3、设置Job信息,比如:主类、Mapper类、Reduce类、Combiner类、输出格式、输入输出文件等
4、异步提交Job,实时监控作业并打印Job状态
5、根据用户身份和权限构建Cluster,并向集群提交Job
6、检查Job的输入和输出规格
7、计算Job的InputSplit(格式:<输入文件路径,开始,偏移量>,默认分片数量=所有输入文件对应的块的数量,且每个分片对应一个Mapper)
8、如有必要,请为Job的DistributedCache设置必要的记帐信息
9、将Job的jar和配置复制到分布式文件系统上的map-reduce系统目录中
10、将作业提交到ResourceManager,并可选择监视其状态