若当前JobClient (0.22 hadoop) 运行在YARN.则job提交任务运行在YARNRunner
Hadoop Yarn 框架原理及运作机制
主要步骤
- 作业提交
- 作业初始化
- 资源申请与任务分配
- 任务执行
具体步骤
在运行作业之前,Resource Manager和Node Manager都已经启动,所以在上图中,Resource Manager进程和Node Manager进程不需要启动
- 1. 客户端进程通过runJob(实际中一般使用waitForCompletion提交作业)在客户端提交Map Reduce作业(在Yarn中,作业一般称为Application应用程序)
- 2. 客户端向Resource Manager申请应用程序ID(application id),作为本次作业的唯一标识
- 3. 客户端程序将作业相关的文件(通常是指作业本身的jar包以及这个jar包依赖的第三方的jar),保存到HDFS上。也就是说Yarn based MR通过HDFS共享程序的jar包,供Task进程读取
- 4. 客户端通过runJob向ResourceManager提交应用程序
- 5.a/5.b. Resource Manager收到来自客户端的提交作业请求后,将请求转发给作业调度组件(Scheduler),Scheduler分配一个Container,然后Resource Manager在这个Container中启动Application Master进程,并交由Node Manager对Application Master进程进行管理
- 6. Application Master初始化作业(应用程序),初始化动作包括创建监听对象以监听作业的执行情况,包括监听任务汇报的任务执行进度以及是否完成(不同的计算框架为集成到YARN资源调度框架中,都要提供不同的ApplicationMaster,比如Spark、Storm框架为了运行在Yarn之上,它们都提供了ApplicationMaster)
- 7. Application Master根据作业代码中指定的数据地址(数据源一般来自HDFS)进行数据分片,以确定Mapper任务数,具体每个Mapper任务发往哪个计算节点,Hadoop会考虑数据本地性,本地数据本地性、本机架数据本地性以及最后跨机架数据本地性)。同时还会计算Reduce任务数,Reduce任务数是在程序代码中指定的,通过job.setNumReduceTask显式指定的
- 8.如下几点是Application Master向Resource Manager申请资源的细节
- 8.1 Application Master根据数据分片确定的Mapper任务数以及Reducer任务数向Resource Manager申请计算资源(计算资源主要指的是内存和CPU,在Hadoop Yarn中,使用Container这个概念来描述计算单位,即计算资源是以Container为单位的,一个Container包含一定数量的内存和CPU内核数)。
- 8.2 Application Master是通过向Resource Manager发送Heart Beat心跳包进行资源申请的,申请时,请求中还会携带任务的数据本地性等信息,使得Resource Manager在分配资源时,不同的Task能够分配到的计算资源尽可能满足数据本地性
- 8.3 Application Master向Resource Manager资源申请时,还会携带内存数量信息,默认情况下,Map任务和Reduce任务都会分陪1G内存,这个值是可以通过参数mapreduce.map.memory.mb and mapreduce.reduce.memory.mb进行修改。
5. YARNRunner
@Override public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, InterruptedException { addHistoryToken(ts); // Construct necessary information to start the MR AM ApplicationSubmissionContext appContext = createApplicationSubmissionContext(conf, jobSubmitDir, ts); // Submit to ResourceManager try { ApplicationId applicationId = resMgrDelegate.submitApplication(appContext); ApplicationReport appMaster = resMgrDelegate .getApplicationReport(applicationId); String diagnostics = (appMaster == null ? "application report is null" : appMaster.getDiagnostics()); if (appMaster == null || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) { throw new IOException("Failed to run job : " + diagnostics); } return clientCache.getClient(jobId).getJobStatus(jobId); } catch (YarnException e) { throw new IOException(e); } }
调用YarnClient的submitApplication()方法,其实现如下:
6. YarnClientImpl
@Override public ApplicationId submitApplication(ApplicationSubmissionContext appContext) throws YarnException, IOException { ApplicationId applicationId = appContext.getApplicationId(); if (applicationId == null) { throw new ApplicationIdNotProvidedException( "ApplicationId is not provided in ApplicationSubmissionContext"); } SubmitApplicationRequest request = Records.newRecord(SubmitApplicationRequest.class); request.setApplicationSubmissionContext(appContext); // Automatically add the timeline DT into the CLC // Only when the security and the timeline service are both enabled if (isSecurityEnabled() && timelineServiceEnabled) { addTimelineDelegationToken(appContext.getAMContainerSpec()); } //TODO: YARN-1763:Handle RM failovers during the submitApplication call. rmClient.submitApplication(request); int pollCount = 0; long startTime = System.currentTimeMillis(); EnumSet waitingStates = EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, YarnApplicationState.SUBMITTED); EnumSet failToSubmitStates = EnumSet.of(YarnApplicationState.FAILED, YarnApplicationState.KILLED); while (true) { try { ApplicationReport appReport = getApplicationReport(applicationId); YarnApplicationState state = appReport.getYarnApplicationState(); if (!waitingStates.contains(state)) { if(failToSubmitStates.contains(state)) { throw new YarnException("Failed to submit " + applicationId + " to YARN : " + appReport.getDiagnostics()); } LOG.info("Submitted application " + applicationId); break; } long elapsedMillis = System.currentTimeMillis() - startTime; if (enforceAsyncAPITimeout() && elapsedMillis >= asyncApiPollTimeoutMillis) { throw new YarnException("Timed out while waiting for application " + applicationId + " to be submitted successfully"); } // Notify the client through the log every 10 poll, in case the client // is blocked here too long. if (++pollCount % 10 == 0) { LOG.info("Application submission is not finished, " + "submitted application " + applicationId + " is still in " + state); } try { Thread.sleep(submitPollIntervalMillis); } catch (InterruptedException ie) { LOG.error("Interrupted while waiting for application " + applicationId + " to be successfully submitted."); } } catch (ApplicationNotFoundException ex) { // FailOver or RM restart happens before RMStateStore saves // ApplicationState LOG.info("Re-submit application " + applicationId + "with the " + "same ApplicationSubmissionContext"); rmClient.submitApplication(request); } } return applicationId; }
7. ClientRMService
ClientRMService是resource manager的客户端接口。这个模块处理从客户端到resource mananger的rpc接口。
@Override public SubmitApplicationResponse submitApplication( SubmitApplicationRequest request) throws YarnException { ApplicationSubmissionContext submissionContext = request .getApplicationSubmissionContext(); ApplicationId applicationId = submissionContext.getApplicationId(); // ApplicationSubmissionContext needs to be validated for safety - only // those fields that are independent of the RM's configuration will be // checked here, those that are dependent on RM configuration are validated // in RMAppManager. String user = null; try { // Safety user = UserGroupInformation.getCurrentUser().getShortUserName(); } catch (IOException ie) { LOG.warn("Unable to get the current user.