hadoop临时文件 jar包_hadoop之Mapper/reducer源码分析之二

若当前JobClient (0.22 hadoop) 运行在YARN.则job提交任务运行在YARNRunner

Hadoop Yarn 框架原理及运作机制

26ba9d71df83d83ed2c6e0e560d63518.png

主要步骤

  • 作业提交
  • 作业初始化
  • 资源申请与任务分配
  • 任务执行

具体步骤

在运行作业之前,Resource Manager和Node Manager都已经启动,所以在上图中,Resource Manager进程和Node Manager进程不需要启动

  • 1. 客户端进程通过runJob(实际中一般使用waitForCompletion提交作业)在客户端提交Map Reduce作业(在Yarn中,作业一般称为Application应用程序)
  • 2. 客户端向Resource Manager申请应用程序ID(application id),作为本次作业的唯一标识
  • 3. 客户端程序将作业相关的文件(通常是指作业本身的jar包以及这个jar包依赖的第三方的jar),保存到HDFS上。也就是说Yarn based MR通过HDFS共享程序的jar包,供Task进程读取
  • 4. 客户端通过runJob向ResourceManager提交应用程序
  • 5.a/5.b. Resource Manager收到来自客户端的提交作业请求后,将请求转发给作业调度组件(Scheduler),Scheduler分配一个Container,然后Resource Manager在这个Container中启动Application Master进程,并交由Node Manager对Application Master进程进行管理
  • 6. Application Master初始化作业(应用程序),初始化动作包括创建监听对象以监听作业的执行情况,包括监听任务汇报的任务执行进度以及是否完成(不同的计算框架为集成到YARN资源调度框架中,都要提供不同的ApplicationMaster,比如Spark、Storm框架为了运行在Yarn之上,它们都提供了ApplicationMaster)
  • 7. Application Master根据作业代码中指定的数据地址(数据源一般来自HDFS)进行数据分片,以确定Mapper任务数,具体每个Mapper任务发往哪个计算节点,Hadoop会考虑数据本地性,本地数据本地性、本机架数据本地性以及最后跨机架数据本地性)。同时还会计算Reduce任务数,Reduce任务数是在程序代码中指定的,通过job.setNumReduceTask显式指定的
  • 8.如下几点是Application Master向Resource Manager申请资源的细节
  • 8.1 Application Master根据数据分片确定的Mapper任务数以及Reducer任务数向Resource Manager申请计算资源(计算资源主要指的是内存和CPU,在Hadoop Yarn中,使用Container这个概念来描述计算单位,即计算资源是以Container为单位的,一个Container包含一定数量的内存和CPU内核数)。
  • 8.2 Application Master是通过向Resource Manager发送Heart Beat心跳包进行资源申请的,申请时,请求中还会携带任务的数据本地性等信息,使得Resource Manager在分配资源时,不同的Task能够分配到的计算资源尽可能满足数据本地性
  • 8.3 Application Master向Resource Manager资源申请时,还会携带内存数量信息,默认情况下,Map任务和Reduce任务都会分陪1G内存,这个值是可以通过参数mapreduce.map.memory.mb and mapreduce.reduce.memory.mb进行修改。

5. YARNRunner

@Override public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, InterruptedException {  addHistoryToken(ts);  // Construct necessary information to start the MR AM ApplicationSubmissionContext appContext = createApplicationSubmissionContext(conf, jobSubmitDir, ts); // Submit to ResourceManager try { ApplicationId applicationId = resMgrDelegate.submitApplication(appContext); ApplicationReport appMaster = resMgrDelegate .getApplicationReport(applicationId); String diagnostics = (appMaster == null ? "application report is null" : appMaster.getDiagnostics()); if (appMaster == null || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) { throw new IOException("Failed to run job : " + diagnostics); } return clientCache.getClient(jobId).getJobStatus(jobId); } catch (YarnException e) { throw new IOException(e); } }

调用YarnClient的submitApplication()方法,其实现如下: 

6. YarnClientImpl

@Override public ApplicationId submitApplication(ApplicationSubmissionContext appContext) throws YarnException, IOException { ApplicationId applicationId = appContext.getApplicationId(); if (applicationId == null) { throw new ApplicationIdNotProvidedException( "ApplicationId is not provided in ApplicationSubmissionContext"); } SubmitApplicationRequest request = Records.newRecord(SubmitApplicationRequest.class); request.setApplicationSubmissionContext(appContext); // Automatically add the timeline DT into the CLC // Only when the security and the timeline service are both enabled if (isSecurityEnabled() && timelineServiceEnabled) { addTimelineDelegationToken(appContext.getAMContainerSpec()); } //TODO: YARN-1763:Handle RM failovers during the submitApplication call. rmClient.submitApplication(request); int pollCount = 0; long startTime = System.currentTimeMillis(); EnumSet waitingStates =  EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, YarnApplicationState.SUBMITTED); EnumSet failToSubmitStates =  EnumSet.of(YarnApplicationState.FAILED, YarnApplicationState.KILLED);  while (true) { try { ApplicationReport appReport = getApplicationReport(applicationId); YarnApplicationState state = appReport.getYarnApplicationState(); if (!waitingStates.contains(state)) { if(failToSubmitStates.contains(state)) { throw new YarnException("Failed to submit " + applicationId +  " to YARN : " + appReport.getDiagnostics()); } LOG.info("Submitted application " + applicationId); break; } long elapsedMillis = System.currentTimeMillis() - startTime; if (enforceAsyncAPITimeout() && elapsedMillis >= asyncApiPollTimeoutMillis) { throw new YarnException("Timed out while waiting for application " + applicationId + " to be submitted successfully"); } // Notify the client through the log every 10 poll, in case the client // is blocked here too long. if (++pollCount % 10 == 0) { LOG.info("Application submission is not finished, " + "submitted application " + applicationId + " is still in " + state); } try { Thread.sleep(submitPollIntervalMillis); } catch (InterruptedException ie) { LOG.error("Interrupted while waiting for application " + applicationId + " to be successfully submitted."); } } catch (ApplicationNotFoundException ex) { // FailOver or RM restart happens before RMStateStore saves // ApplicationState LOG.info("Re-submit application " + applicationId + "with the " + "same ApplicationSubmissionContext"); rmClient.submitApplication(request); } } return applicationId; }

7. ClientRMService

ClientRMService是resource manager的客户端接口。这个模块处理从客户端到resource mananger的rpc接口。

@Override public SubmitApplicationResponse submitApplication( SubmitApplicationRequest request) throws YarnException { ApplicationSubmissionContext submissionContext = request .getApplicationSubmissionContext(); ApplicationId applicationId = submissionContext.getApplicationId(); // ApplicationSubmissionContext needs to be validated for safety - only // those fields that are independent of the RM's configuration will be // checked here, those that are dependent on RM configuration are validated // in RMAppManager. String user = null; try { // Safety user = UserGroupInformation.getCurrentUser().getShortUserName(); } catch (IOException ie) { LOG.warn("Unable to get the current user.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/538839.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ANDROID:SHOWASACTION="NEVER"是做什么用的?

原文地址:http://www.cnblogs.com/android-joker/p/4478491.html 点击阅读原文 --------------------------------------------------------- 安卓开发项目文件中有一个目录叫做menu,里面有tybmain.xml item选项里有一句 android:showAsAction "…

吴恩达ex3_Wu-Enda机器学习编程作业Python实现EX3,吴恩达,machinelearning,python,ex3nn

# -*- coding: utf-8 -*-"""Created on Wed Jul 1 20:28:57 2020author: cheetah023"""import numpy as npimport matplotlib.pyplot as pltimport scipy.io as sciimport random as ra#函数定义def sigmoid(X):return 1 /(1 np.exp(-X))def pr…

php数组验证用户名密码,求个php数组验证问题,在线等

现在有这么一个数组,是属性表Array([0] > Array([list_attr_id] > 30[list_attr_name] > 颜色[list_attr_attr] > 黑色|白色|金色[list_attr_price] > 0|10.1|20[list_attr_shop_id] > 28)[1] > Array([list_attr_id] > 31[list_attr_name] > 规格[…

基于python的视频监控系统_Python实现微信监控报警系统

概述: 本文主要分享一下博主在学习wxpy 的过程中开发的一个小程序。博主在最近有一个监控报警的需求需要完成,然后刚好在学习wxpy 这个东西,因此很巧妙的将工作和学习联系在一起。 博文中主要使用到的技术设计到Python,Redis,以及…

python 编码文件json.loads json.dumps

python 编码文件json.loads json.dumps import yaml d {name: 张三, age: 1} print d jd json.dumps(d, ensure_asciiFalse, encodingutf-8)) ud json.loads(jd, encodingutf-8) print ud ud yaml.safe_load(jd, encodingutf-8) print udposted on 2018-04-23 15:18 秦瑞It…

getActionBar()报空指针异常

调用 getActionBar()的Activity类 public class WlanListActivity extends AppCompatActivity 在使用getActionBar("标题内容")的时候报空指针。 原因是要用AppCompatActivity类里的getSupportActionBar()

python 类中定义列表_Python3中的自定义列表类,具有

我想用python3编写一个自定义列表类,就像在这个问题How would I create a custom list class in python?中一样,但与该问题不同,我想实现__get__和{}方法。虽然我的类与list类似,但是这些方法背后隐藏着一些神奇的操作。所以我想…

红黑树与平衡二叉树_百图详解红黑树,想不理解都难

之前在公司组内分享了红黑树的工作原理,今天把它整理下发出来,希望能对大家有所帮助,对自己也算是一个知识点的总结。这篇文章算是我写博客写公众号以来画图最多的一篇文章了,没有之一,我希望尽可能多地用图片来形象地…

linux 父子进程结束,Linux下让父进程结束后,子进程自动结束

在多进程编程的时候,经常会遇到这样的情况。父进程创建了一堆子进程,当遇到错误或者操作失误的时候把父进程关闭了,但是子进程还在跑,不得不一个一个地杀死子进程,或者使用ps,grep,awk,kill来配合批量杀死。之前在写 x…

android:showAsAction 无效

我想要的效果 但actionbar上的搜索菜单不显示 在androidstudio里,android:showAsAction"always"标红 根据提示,需要加入 xmlns:app"http://schemas.android.com/apk/res-auto" 加入后依然无效 正确的加入方式是:

Exchange_Server_2013在Windows_2008_R2部署

Exchange Server 2013可以部署在Windows Server 2012的平台,也可以部署在Windows Server 2008 R2的平台。如果部署在Windows Server 2008 R2平台要求操作系统版本为Windows Server 2008 R2 SP1的版本。如下拓扑图:在本架构中有两台服务器,都安…

建立副本名称冲突_包的建立(一)

这次的内容,涉及到 R 语言包的建立。事实上,CRAN 提供的官方参考指南,并不适合快速阅读,且内容繁杂。比较适合作为后期提高的 教材。而 http://r-pkgs.had.co.nz/ 上 的教程则更适合作为 R 包编写的帮助指南。这里,仅仅…

Android 多选列表

原文&#xff1a;http://blog.csdn.net/wljun739/article/details/37655209 点击阅读原文 ----------------------------------------------------------- 1、activity_main.xml[java] view plaincopy<LinearLayout xmlns:android"http://schemas.android.com/apk/res/…

python自带的编辑器怎么换行_Python3基础 print 自带换行功能

镇场诗&#xff1a; ———大梦谁觉&#xff0c;水月中建博客。百千磨难&#xff0c;才知世事无常。 ———今持佛语&#xff0c;技术无量愿学。愿尽所学&#xff0c;铸一良心博客。 —————————————————————————————————————————— 1 …

查看db2数据库名linux,【名说】DB2数据库备份与恢复(linux环境)

lslinux 下备份db2数据库1.SSH方式&#xff1a;登录db2数据库(因为是linux环境 &#xff0c; putty就不错)2.进入备份文件夹&#xff1a;cd /home/backup/db2 list application | grep 数据库名//(可能会有一些连接进程&#xff0c;有则全部杀掉)//杀进程&#xff1a;db2 "…

leetcode 回文数

2019独角兽企业重金招聘Python工程师标准>>> 判断一个整数是否是回文数。回文数是指正序&#xff08;从左向右&#xff09;和倒序&#xff08;从右向左&#xff09;读都是一样的整数。 示例 1: 输入: 121 输出: true 示例 2: 输入: -121 输出: false 解释: 从左向右…

安装ae显示安装程序无法初始化_adobe CC 2015/2017安装失败(adobe cc安装不了的解决办法)...

adobe CC 2015/2017安装失败(adobe cc安装不了的解决办法)书法字体2015.06.18Adobe Application ManagerAdobe Creative Cloud 2015/2017全系统软件已经可以从官网下载了&#xff0c;相信又将有一大波设计师会更新安装adobe CC 2015/2017软件。本着尝鲜的精神&#xff0c;本人也…

Hadoop控制输出文件命名

原文地址&#xff1a;http://blog.csdn.net/zuochanxiaoheshang/article/details/8769198 点击阅读原文 --------------------------------------------------- Hadoop 控制输出文件命名 在一般情况下&#xff0c;Hadoop 每一个 Reducer 产生一个输出文件&#xff0c;文件以 …

office高级应用与python综合案例教程_office高级应用与python综合案例实验指导--详细介绍...

随着社会经济的发展&#xff0c;现代信息技术逐渐改变着人们的工作和生活方式。为使学生掌握办公自动化软件高级应用的技能&#xff0c;了解Python程序基础知识&#xff0c;综合运用办公自动化软件分析和解决实际问题&#xff0c;编者编写了本书。 本书围绕高等学校培养应用型人…

linux系统的安全机制有哪些内容,系统安全机制

AG351.SELINUXSElinux 是一个强制访问控制系统,它为每个进程与文件都打上一个安全上下文标签,而 selinux 通过这个标签对系统访问控制进行管理。2.针对车载产品对于启动安全、平台运行安全、通信安全三个主要领域有着特 殊 很 高 的 要 求 , 为 此 Quectel 结 合 了 Qualcomm 给…