MR作业的提交监控、输入输出控制及特性使用

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

MR作业的提交监控、输入输出控制及特性使用 博客分类: hadoop

提交作业并监控

JobClient是用户作业与JobTracker交互的主要接口,它提供了提交作业,跟踪作业进度、访问任务报告及logs、以及获取MR集群状态信息等方法。

提交作业流程包括:

  • 检查作业的输入输出
  • 计算作业的输入分片(InputSplit)
  • 如果需要,为DistributedCache设置必须的账户信息
  • 将作业用到的jar包文件和配置信息拷贝至文件系统(一般为HDFS)上的MR系统路径中
  • 提交作业到JobTracker,并可监控作业状态

作业历史(Job History)文件会记录在hadoop.job.history.user.location指定的位置,默认在作业输出路径下的logs/history/路径下。因此历史日志默认在mapred.output.dir/logs/history下。用户可以将hadoop.job.history.user.location值设置为none来不记录作业历史。

使用命令来查看历史日志:

1
$hadoop job -history output-dir

上面命令会显示作业的详细信息、失败的被kill的任务(tip)的详细信息。使用下面命令可以查看作业更详细的信息:

1
$hadoop job -history all output-dir

可以使用OutputLogFilter从输出路径中过滤日志文件。

一般,我们创建应用,通过JobConf设置作业的各种属性,然后使用JobClient提交作业并监控进度。

作业控制

有时可能需要一个作业链完成复杂的任务。这点是可以轻松实现的,因为作业输出一般都在分布式文件系统上,作业输出可以当做下个作业的输入,这样就形成了链式作业。

这种作业成功是否依赖于客户端。客户端可以使用以下方式来控制作业的执行:

  • runJob(JobConf):提交作业并仅在作业完成时返回
  • submitJob(JobConf):提交作业后立即返回一个RunningJob的引用,使用它可以查询作业状态并处理调度逻辑。
  • JobConf.setJobEndNotificationURI(String):设置作业完成时通知

你也可以使用Oozie来实现复杂的作业链。

作业输入

下面讲作业输入的内容。
InputFormat 描述MR作业的输入信息。InputFormat有以下作用:

  1. 验证作业的输入信息
  2. 将输入文件拆分为逻辑上的输入分片(InputSplit),每个输入分片被分配到一个独立的Mapper
  3. 提供RecordReader实现类从输入分片中搜集输入记录供Mapper处理

基于文件的InputFormat实现类即FileInputFormal是通过计算输入文件的总大小(以字节为单位)来分裂成逻辑分片的。然而文件系统的块大小又作为输入分片大小的上限,下限可以通过mapred.min.split.size来设定。

基于输入大小进行逻辑分片机制在很多情况下是不适合的,还需要注意记录边界。在这些情况下,应用应该实现RecordReader来处理记录边界,为独立的mapper任务提供面向行的逻辑分片。

TextInputFormat是默认的输入格式。

如果作业输入格式为TextInputFormat,MR框架可以检测以.gz扩展名的输入文件并自动使用合适的压缩算法来解压文件。特别注意的是,.gz格式的压缩文件不能被分片,每个文件作为一个输入分片被一个mapper处理。

输入分片(InputSplit)

InputSplit的数据会被一个独立的mapper来处理。一般输入分片是对输入基于面向字节为单位的。可以使用RecordReader来处理提供面向行的输入分片。

FileSplit是默认的InputSplit,通过设置map.input.file设置输入文件路径。

RecordReader

RecordReader从InputSplit读入数据对。RecordReader将InputSplit提供得面向字节的的输入转换为面向行的分片给Mapper实现类来处理。RecordReader的责任就是处理行边界并以kv方式将数据传给mapper作业。

作业输出

OutFormat描述作业的输出信息。MR框架使用OutputFormat来处理:

  • 验证作业的输出信息。例如验证输出路径是否存在.
  • 提供RecordWriter实现来写作业的输出文件。输出文件存储在文件系统中。TextOutputFormat是默认的输出格式。

OutputCommitter

OutputCommitter是MR作业的输出提交对象。MR框架使用它来处理:

  • 初始化时准备作业。例如,作业初始化时创建临时输出路径。作业准备阶段通过一个独立的任务在作业的PREP状态时完成,然后初始化作业的所有任务。一旦准备阶段完成,作业状态切换到Running状态。
  • 作业完成后清理作业。例如,在作业完成后清理输出临时路径。作业清理阶段通过一个独立的任务在作业最后完成。作业在清理阶段完成后设置为成功/失败/中止。
  • 设置任务的临时输出。任务准备阶段在任务初始化时作为同一个任务的一部分。
  • 检查任务是否需要一个提交对象。目的是为了在任务不需要时避免一个提交过程。
  • 提交任务输出。一旦任务完成,如果需要的话,任务将提交其输出物。
  • 销毁任务的提交。如果任务失败或者被中止,输出将被清理。如果不能被清理(如异常的块),与该任务相同的任务id会启动并处理清理。

FileOutputCommitter是默认的OutputCommiter实现。作业准备及清理任务占用同一个TaskTracker上空闲的map或者reduce槽。作业清理任务、任务清理任务以及作业准备任务按照该顺序拥有最高的优先级。

任务副作用文件

在一些应用中,任务需要创建和/或写入文件,这些文件不同于实际的作业输出文件。

在这种情况下,相同的2个Mapper或者Reducer有可能同时运行(比如推测执行的任务),并在文件系统上尝试打开和或者写入相同文件或路径。所以你必须为每个任务的执行使用唯一名字(比如使用attapid,attempt2007092218120001m000000_0)。

为了避免这样的情况,当OutputCommiter为FileOutputCommiter时,MR框架通过${mapred.work.output.dir}为每个任务执行尝试维护一个特殊的路径:${mapred.output.dir}/temporary/${taskid},这个路径用来存储任务执行的输出。在任务执行成功完成时,${mapred.output.dir}/temporary/${taskid} 下的文件被移到${mapred.output.dir}下。当然框架丢弃不成功的任务执行的子路径。这个进程对用户应用透明。

MR应用开发者可以利用这个特性在执行过程中通过FileOutputFormat.getWorkOutputPath()在${mapred.work.output.dir} 下创建需要的site files,框架会像成功的任务试执行那样处理,这样就避免为每个试执行任务取唯一名字。

注意:特殊试执行任务执行过程中${mapred.work.output.dir} 值实际是${mapred.output.dir}/temporary/${taskid},该值被框架定制。所以直接在FileOutputFormat.getWorkOutputPath()返回的路径中创建site-files,来使用这个特性。

对于只有mapper的作业,side-files将直接进入hdfs。

RecordWriter

RecordWriter将输出的kv值写入输出文件。RecordWriter实现类将作业输出写入文件系统。

其他特性

提交作业到队列

用户提交的作业到队列中,队列是一个作业的集合,允许MR提供一些特定功能。队列使用ACL控制哪些用户提交作业。队列一般和Hadoop Scheduler调度器一起使用。

Hadoop 安装后默认配置了名称为default的队列,这个队列是必须的。队列名称可以在hadoop配置文件中mapred.queue.names属性里配置。一些作业调度器比如Capacity Scheduler支持多个队列。

作业可以通过mapred.job.queue.name或者通过setQueueName(Stirng)设置队列名称,这是可选的。如果作业名称没有设置队列名称,则提交到default的队列中。

计数器Counters

计数器用来描述MR中所有的计数,可以是MR框架定义也可以是应用提供。每个计数器可以是任何Enum类型。一组特定的Enum被聚合成一个组即为Counters.Group.

应用可以定义Enum类型的计数器,并可以通过Reporter.incrCounter(Enum,long或者Reporter.incrCounter(String,String,long在map和或者reduce中更新这个计数器的值。然后这些计数器统一被框架聚合。

DistributedCache

DistributedCache可以高效地将应用明细、大的只读文件发布。DistributedCache是MR框架提供的缓存文件(如文本、压缩包、jar包等)的高效工具。

应用需要在JobConf里使用hdfs://指定地址进行缓存。这些文件必须在文件系统中已经存在。

MR作业的任务在某个节点上执行前,MR框架会拷贝必需的文件到这个节点。特别说明的是,作业必需的所有文件只需要拷贝一次,支持压缩包,并在各个节点上解压,有助于提高MR执行效率。

DistributedCache会跟踪所有缓存文件的修改时间戳。作业执行过程中应用或者外部不应该修改被缓存的文件。

通常,DistributedCache被用来缓存简单、只读的数据或者文本文件以及像压缩包/jar包等复杂文件。压缩包比如zip\tar\tgz\tar.gz文件到节点上被解压。所有文件都有执行权限。

属性 mapred.cache.{files|archives}配置的文件和包等可以被发布,多个文件可以使用“,”来分隔。也可以调用 DistributedCache.addCacheFile(URI,conf)或者DistributedCache.addCacheArchive(URI,conf) ,DistributedCache.setCacheFiles(URIs,conf)/DistributedCache.setCacheArchives(URIs,conf) 设置,其中URI为hdfs://host:port/absolute-path#link-name形式。如果使用streaming方式,可以通过 -cacheFile/-cacheArchive来设置。

另外,用户也可以通过DistributedCache.createSymlink(Configuration) API 将分布式缓存的文件使用符号链接到当前工作路径。或者也可以通过设置mapred.create.symlink 为yes。这样分布式缓存将URI的最末部分作为符号链接。例如hdfs://namenode:port/lib.so.1#lib.so 的符号链接为lib.so.

分布式缓存也可以用作在map,reduce阶段中基本的软件分布式处理机制,可以用来分布式处理jar包或者本地库。DistributedCache.addArchiveToClassPath(Path, Configuration)或者DistributedCache.addFileToClassPath(Path, Configuration)用来将需要缓存的文件或者jar包加入到作业运行的子jvm的classpath中。也可以通过mapred.job.classpath.{files|archives}来设置。同样这些分布式缓存的文件或jar包也可以使用符号链接到当前工作路径,当然也支持本地库并可被作业在执行中加载。

工具类(Tool)

Tool接口支持常用的Hadoop命令行参数的处理操作。它是MR工具或者应用的标准工具。应用应该通过ToolRunner.run(Tool,Stirng[])将标准命令行的参数使用GenericOptionsParser来处理。只处理应用的自定义参数。

Hadoop中命令行里通用的参数有:

1
2
3
4
-conf
-D
-fs
-jt

IsolationRunner

IsolationRunner是MR应用的调试工具。

要使用它,需要先设置keep.failed.task.files = true,另外请注意属性keep.tasks.files.pattern。

下一步,到失败作业的某个失败节点上,cd到TT的本地路径并运行IsolationRunner。

1
2
$ cd /taskTracker/${taskid}/work
$ hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml

IsolationRunner 会再单独的jvm中对相同的输入上运行失败的任务并可以debug。

性能分析(Profiling)

Profiling可以用来对作业的map,reduce任务使用java内置的性能分析工具进行采样,得出具有代表性(2个或3个试样)的采样结果。

用户可以通过设置mapred.task.profile属性指定框架是否对作业的任务收集性能信息。也可以通过API JobConf.setProfileEnabled(boolean)来指定。如果值为true,任务性能分析就启用了。采样结果保存在用户日志目录中,默认性能采样不开启。

一旦需要性能分析,可以通过属性mapred.task.profile.{maps|reduces} 设置采样任务的范围。使用JobConf.setProfileTaskRange(boolean,String)一样可以设置。默认的range是0-2.

可以设置mapred.task.profile.params确定分析参数,api方式为 JobConf.setProfileParams(String)。如果参数中含有“%s”占位符,MR框架会使用采样结果输出文件名替代它。这些参数被传到子JVM.默认值为-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s.

调试工具(debugging)

MR框架支持用户自定义脚本调试工具。例如当MR任务失败时,用户可以运行debug脚本处理任务日志。脚本可以访问任务的输出和错误日志,系统日志以及作业配置信息。调试脚本输出和错误流作为作业输出一部分显示到控制台诊断信息中。

下面讲讨论如何为作业提交调试脚本。当然脚本需要分发并提交到Hadoop中MR中。

如何分发脚本文件

使用DistributedCache分发并且为脚本文件建立符号链接。

如何提交脚本

一种便捷的途径是通过设置mapred.map.task.debug.script和mapred.reduce.task.debug.script分别实现对map和reduce指定调试脚本。也可以通过API方式,分别是JobConf.setMapDebugScript(String)和JobConf.setReduceDebugScript(String)。如果是streaming方式,可以通过命令行里指定-mapdebug 和-reducedebug实现。

脚本的参数是任务的输出流、错误流、系统日志以及job配置文件。调试模式下,以下命令在任务失败的节点上运行:

1
$script $stdout $stderr $syslog $jobconf

Pipes程序在命令形式上以第五个参数传入。如:

1
$script $stdout $stderr $syslog $jobconf $program

默认行为

对pipes,使用默认的脚本以gdb方式处理核心转储(core dumps),打印栈轨迹,并给出正在运行线程的信息。

JobControl

JobControl可以对一组MR作业和他们的依赖环境进行集中管理。

数据压缩

Hadoop Mapreduce可以对中间结果(map输出)和作业最终输出以指定压缩方式压缩。通常都会启用中间结果的压缩,因为可以显著提高作业运行效率并且不需要对作业本身进行改变。只有MR shuffle阶段生成的临时数据文件被压缩(最终结果可能或者不被压缩)。

Hadoop支持zlib,gzip,snappy压缩算法。推荐Snappy,因为其压缩和解压相比其他算法要高效。

因为性能和java库不可用原因,Hadoop也提供本地的zlib和gzip实现。详情可以参考这个文档

中间输出物

应用可以通过JobConf.setCOmpressMapOutput(boolean)设置是否对中间结果进行压缩,使用JobConf.setMapOutputCompressorClass(Class)设置压缩算法。

作业输出

通过FileOutputFormat.setCompressOutput(JobConf,boolean)设置是否压缩最终产出物,通过FileOutputFormat.setOutputComressorClass(JobConf,Class)设置压缩方式。

如果作业输出以SequenceFileOutputFormat格式存储。则可以通过SequenceFileOutputFormat.setOutputCompressionType(Jobconf,SequenceFile.CompressionType)API 设定压缩方式.压缩方式有RECORD/BLOCK默认是RECORD。

跳过损坏的记录

Hadoop提供了一个选项,在MR处理map阶段时跳过被损坏的输入记录。应用可以通过SkipBadRecords类使用这个特性。

作业处理时可能对确定的输入集上map任务会失败。通常是map函数存在bug,这时需要fix这些bug。但有时却无法解决这种特殊情况。比如这个bug可能是第三方库导致的。这时这些任务在经过若干尝试后仍然无法成功完成,作业失败。这时跳过这些记录集,对作业最终结果影响不大,仍然可以接受(比如对非常大的数据进行统计时)。

默认该特性没有开启。可以通过SkipBadRecords.setMapperMaxSkipRecords(Configuration,long)和SkipBadRecords.setReducerMaxSkipGroups(Configuration,long)开启。

开启后,框架对一定数目的map失败使用skipping 模式。更多信息可以查看SkipBadRecords.setAttemptsToStartSkipping(Configuration, int)。skipping模式中,map任务维护一个正被处理的记录范围值。这个过程框架通过被处理的记录行数计数器实现。可以了解SkipBadRecords.COUNTERMAPPROCESSED_RECORDS和SkipBadRecords.COUNTERREDUCEPROCESSED_GROUPS。这个计数器让框架知道多少行记录被成功处理了,哪些范围的记录导致了map的失败,当然这些坏记录就被跳过了。

被跳过的记录数目依赖应用被处理的记录计数器统计频率。建议当每行记录被处理时就增加该计数器。但是这个在一些应用中无法实现。这时框架可能也跳过了坏记录附近的记录。用户可以通过SkipBadRecords.setMapperMaxSkipRecords(Configuration,long)和SkipBadRecords.setReducerMaxSkipGroups(Configuration,long)控制被跳过的记录数。框架会使用类似二分搜索的方式努力去减小被跳过的记录范围。被跳过的范围会分为2部分,并对其中的一部分执行。一旦失败了,框架可以知道这部分含有损坏的记录。任务将重新执行直到遇到可接受的数据或者所有尝试机会用完。如果要提高任务尝试次数,可以通过JobConf.setMaxMapAttampts(int)和JobConf.setMaxReduceAttempts(int)设置。

被跳过的记录随后会以sequence file格式写入hdfs以便于后面可能的分析。路径可以通过SkipBadRecords.setSkipOutputPath(JobConf,Path)设定。

 

http://www.importnew.com/4736.html

转载于:https://my.oschina.net/xiaominmin/blog/1597339

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/453201.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

http协议与web本质

当你在浏览器地址栏敲入“http://www.csdn.net/”,然后猛按回车,呈现在你面前的,将是csdn的首页了(这真是废话,你会认为这是理所当然的)。作为一个开发者,尤其是web开发人员,我想你有…

Docker storage driver 选择

2019独角兽企业重金招聘Python工程师标准>>> Docker storage driver 选择 博客分类: docker 本文的目的是说明,如何在生产环境中选择Docker 的storage driver。以及对应Linux发行版本下Docker storage driver的配置方法。主要参考&#xff0c…

手机网站制作html5,【怎么样制作手机网站】如何使用dreamweavercs6建立手机网站?织梦手机WAP浏览模块如何制作手机网站?如何制作html5手机页面?...

【怎么样制作手机网站】如何使用dreamweavercs6建立手机网站?织梦手机WAP浏览模块如何制作手机网站?如何制作html5手机页面?下面就和小编一起来看看吧!如何使用dreamweavercs6建立手机网站?制作步骤如下:1。打开DreamweaverCS6软件,可以在DreamweaverCS6软件的开…

如果在docker中部署tomcat,并且部署java应用程序

2019独角兽企业重金招聘Python工程师标准>>> 如果在docker中部署tomcat,并且部署java应用程序 博客分类: docker 1、先说如何在docker中部署tomcat 第一步:root用户登录在系统根目录下创建文件夹tomcat7,命令如:mkdir tomcat7&…

Spring Boot结合thymeleaf

之前在Eclipse里写了个Spring Boot响应jsp的小demo,后来发现打成jar包导出之后找不到jsp文件了。经过在网上查阅信息与资料,发现Spring Boot对于jsp的支持其实是不好的,而且在一些书中和官方都明确表示没有办法支持在jar包中打入jsp文件。虽然…

视觉测量简介

1.1 视觉测量技术 1.1.1 现代检测技术的发展趋势 检测技术是现代化工业的基础技术之一,是保证产品质量的关键。在现代化的大生产之中,涉及到各种各样的检测。随着工业制造技术和加工工艺的提高和改进,对检测手段、检测速度和精度提出了更…

高并发系统之降级特技

2019独角兽企业重金招聘Python工程师标准>>> 高并发系统之降级特技 博客分类: 架构 在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流。之前已经有一些文章介绍过缓存和限流了。本文将详细聊聊降级。当访问量剧增、服务出现问题&a…

freeradius 3.0 时间限制_创意营销3.0新模式下,易企秀要成为中国的Adobe

近几年,随着大数据和人工智能技术的发展,智能化、程序化营销在国内获得高速发展。从以创意内容、提升效率的工具到现在驱动企业数字化转型的智能营销,营销云在国内的热度与成熟度不断提升。营销云起源于“Enterprise Marketing Software Suit…

抢占式和非抢占式的进程调度

非抢占式(Nonpreemptive) 让进程运行直到结束或阻塞的调度方式 容易实现 适合专用系统,不适合通用系统 抢占式(Preemptive) 允许将逻辑上可继续运行的在运行过程暂停的调度方式 可防止单一进程长时间独占…

图形学基础知识

本篇主要给大家介绍图形学基础知识,了解Unity图像渲染机制,以及图像渲染管线流程。 主要是因为伴随着VR/AR的飞速发展,为了满足VR高清高帧率的极限渲染,着色器编程(Shader)也成为了Unity程序开发人员的必备…

调度队列模型

调度队列模型及准则 1 仅有进程调度的调度队列模型: 每个进程在执行时都可能出现以下三种情况: (1) 任务在给定的时间片内已经完成,该进程便在释放处理机后进入完成状态 (2) 任务在本次分得的时间片内尚未完成,OS便将该任务再放入…

数据库相关整理

一、MySQL 1、mysql如何做分页 mysql数据库做分页用limit关键字,它后面跟两个参数startIndex和pageSize 2、mysql引擎有哪些,各自的特点是什么? http://www.cnblogs.com/ctztake/p/8453990.html 3、数据库怎么建立索引 create index account_…

api接口怎么对接_系统对接项目管理方面怎么做?从一次项目接口对接说起

故事:最近业务方有一个新的业务合作模式,需要与第三方公司进行系统的对接,原本预期2周可以完成的项目,最后要用到3周时间才能完成,出现的现象其实还挺典型的,也不是没遇到过,因为自己这边的进度…

周转时间 平均周转时间 带权周转时间 平均带权周转时间

1.周转时间 2.平均周转时间 平均周转时间是对n个而言的 3.带权周转时间 真正的运行时间指的是进程占有处理机的时间 4.平均带权周转时间 即n个平均的带权周转时间

unity应用开发实战案例_「简历」STAR法则的实战应用,附手把手教学案例

关注应届生求职网,了解更多求职信息本文共2072字,预计阅读需3分钟本期分享导师-Anna_青云导师51Job职场导师知乎职场千赞答主多年猎头及多行业人力资源管理经历职业生涯规划师、职业生涯咨询师、心理咨询师终身学习践行者、斜杠青年、职场教练在简历优化…

SOA架构设计经验分享—架构、职责、数据一致性

1.背景介绍2.SOA的架构层次 2.1.应用服务(原子服务)2.2.组合服务2.3.业务服务(编排服务)3.SOA化的重构 3.1.保留服务空间,为了将来服务的组合4.运用DDDGRASP进行分析和设计(防止主观的判断导致错误的假设&a…

计算机设备没有音频,电脑没有音频设备怎么办

有些朋友的的电脑没有声音,任务栏右下角也没有小喇叭声音图标,进入到控制面板里面的声音选项里面去设置,在“音量”项目中显示“没有音频设备”,很多朋友遇到这种情况不知道应该怎么解决,今天笔者就电脑没有音频设备怎…

全志科技公司A83T Qt 支持双屏显示

目前A83T支持单屏显示,首屏为LCD或者首屏为hdmi,都使用无论使用SCREEN0还是SCREEN1都是使用FB0作为framebuffer,在android下可以实现LCD和HDMI同样屏幕显示,而我们需要LCD和HDMI分别显示。FrameBuffer采用的是linux下的framebuffe…

明日之后怎么跳过实名认证_明日之后宝箱达人活动怎么玩 明日之后宝箱达人可以开箱多少次...

《明日之后》宝箱达人是今天游戏中更新的活动,玩家们在部分的野外地图中可以找到宝箱。很多玩家都想知道这个宝箱获得的奖励是什么。接下来就让小编给大家带来明日之后宝箱达人活动奖励介绍,一起来看看吧。明日之后宝箱达人活动介绍 一、活动时间2020年9…