MapReduce运行机制

相关链接 
MapReduce中Shuffle机制详解——Map端Shuffle链接 
MapReduce中Shuffle机制详解——Reduce端Shuffle链接

MapReduce将作业job的整个运行过程分为两个阶段:Map阶段和Reduce阶段。按照时间顺序包括:输入分片(input split)、map阶段、combiner阶段、shuffle阶段和reduce阶段。系统执行排序、将map输出作为输入传给reducer的过程称为shuffle(shuffle是MapReducer的心脏)

Map阶段由一定数量的 Map Task组成 
1. 输入数据格式解析: InputFormat 
2. 输入数据处理: Mapper 
3. 本地合并: Combiner(local reduce) 
4. 数据分组: Partitioner

Reduce阶段由一定数量的 Reduce Task组成 
1. 数据远程拷贝 
2. 数据按照key排序 
3. 数据处理: Reducer 
4. 数据输出格式: OutputFormat

1、流程简介 
这里写图片描述 
各个map函数对所划分的数据并行处理,从不同的输入数据产生不同的中间结果输出 
各个reduce也各自并行计算,各自负责处理不同的中间结果数据集合进行reduce处理之前,必须等到所有的map函数做完 
在进入reduce前需要有一个同步障(barrier) 
这个阶段也负责对map的中间结果数据进行收集整理(aggregation & shuffle)处理,以便reduce更有效地计算最终结果, 最终汇总所有reduce的输出结果即可获得最终结果

第一步:假设一个文件有三行英文单词作为 MapReduce 的Input(输入)这里经过 Splitting 过程把文件分割为3块。分割后的3块数据就可以并行处理,每一块交给一个 map 线程处理。 
第二步:每个 map 线程中,以每个单词为key,以1作为词频数value,然后输出。 
第三步:每个 map 的输出要经过 shuffling(混洗),将相同的单词key放在一个桶里面,然后交给 reduce 处理。 
第四步:reduce 接受到 shuffling 后的数据, 会将相同的单词进行合并,得到每个单词的词频数,最后将统计好的每个单词的词频数作为输出结果。

上述就是 MapReduce 的大致流程,前两步可以看做 map 阶段,后两步可以看做 reduce 阶段。

2、输入分片与类型 
MapReduce将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分成逻辑上的多个split),然后每一个split分配一个mapTask并行实例处理

输入分片(split)与map对应,是每个map处理的唯一单位。每个分片包括多条记录,每个记录都有对应键值对。 
输入切片的接口:InputSplit接口(不需要开发人员直接处理,由InputFormat创建)

输入分片(input split):在进行map计算之前,mapreduce会根据输入文件计算输入分片(input split),每个输入分片(input split)针对一个map任务,输入分片(input split)存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组,输入分片(input split)往往和hdfs的block(块)关系很密切,存储位置供MapReduce使用以便将map任务尽量放在分片数据附近,而长度用来排序分片,以便优化处理最大的分片,从而最小化作业运行时间。

假如我们设定hdfs的块的大小是64mb,如果我们输入有三个文件,大小分别是3mb、65mb和127mb,那么mapreduce会把3mb文件分为一个输入分片(input split),65mb则是两个输入分片(input split)而127mb也是两个输入分片(input split),换句话说我们如果在map计算前做输入分片调整,例如合并小文件,那么就会有5个map任务将执行,而且每个map执行的数据大小不均,这个也是mapreduce优化计算的一个关键点。

输入类型

FileInputFormat类: 
FileInputFormat 是所有使用文件作为其数据源的 InputFormat 实现的基类。它提供了两个功能:一个定义哪些文件包含在一个作业的输入中;一个为输入文件生成分片的实现。把分片分割成记录的作业由其子类来完成。

TextlnputFormat类: 
TextInputFormat 是默认的 InputFormat。每条记录是一行输入。键是 LongWritable 类型,存储该行在整个文件中的字节偏移量。值是这行的内容,不包括任何行终止符(换行符和回车符),它是 Text 类型的。但是输入分片和HDFS块之间可能不能很好的匹配,出现跨块的情况

KeyValueTextlnputFormat类: 
TextInputFormat 的键,即每一行在文件中的字节偏移量,通常并不是特别有用。通常情况下,文件中的每一行是一个键/值对,使用某个分界符进行分隔,比如制表符。例如 以下由 Hadoop 默认 OutputFormat(即 TextOutputFormat)产生的输出。如果要正确处理这类 文件,KeyValueTextInputFormat 比较合适。可以通过 key.value.separator.in.input.line 属性来指定分隔符。它的默认值是一个制表符。

NLineInputFormat类: 
与TextInputFormat一样,键是文件中行的字节偏移量,值是行本身。主要是希望mapper收到固定行数的输入。

MultipleInputs多种输入: 
MultipleInputs类处理多种格式的输入,允许为每个输入路径指定InputFormat和Mapper。两个mapper的输出类型是一样的,所以reducer看到的是聚集后的map输出,并不知道输入是不同的mapper产生的。 
重载版本:addInputPath(),没有mapper参数,主要支持多种输入格式只有一个mapper。 
这里写图片描述

3、Map与Reduce的个数

Map任务的个数

读取数据产生多少个Mapper?? 
Mapper数据过大的话,会产生大量的小文件,过多的Mapper创建和初始化都会消耗大量的硬件资源 
Mapper数太小,并发度过小,Job执行时间过长,无法充分利用分布式硬件资源

Mapper数量由什么决定?? 
(1)输入文件数目(2)输入文件的大小(3)配置参数 这三个因素决定的。 
输入的目录中文件的数量决定多少个map会被运行起来,应用针对每一个分片运行一个map,一般而言,对于每一个输入的文件会有一个map split。如果输入文件太大,超过了hdfs块的大小(128M)那么对于同一个输入文件我们会有多余2个的map运行起来。

涉及参数:
mapreduce.input.fileinputformat.split.minsize //启动map最小的split size大小,默认0
mapreduce.input.fileinputformat.split.maxsize //启动map最大的split size大小,默认256M
dfs.block.size//block块大小,默认128M
计算公式:splitSize =  Math.max(minSize, Math.min(maxSize, blockSize))下面是FileInputFormat class 的getSplits()的伪代码: num_splits = 0for each input file f:remaining = f.lengthwhile remaining / split_size > split_slope:num_splits += 1remaining -= split_sizewhere:split_slope = 1.1 分割斜率split_size =~ dfs.blocksize 分割大小约等于hdfs块大小会有一个比例进行运算来进行切片,为了减少资源的浪费
例如一个文件大小为260M,在进行MapReduce运算时,会首先使用260M/128M,得出的结果和1.1进行比较
大于则切分出一个128M作为一个分片,剩余132M,再次除以128,得到结果为1.03,小于1.1
则将132作为一个切片,即最终260M被切分为两个切片进行处理,而非3个切片。  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

reduce任务的个数

Reduce任务是一个数据聚合的步骤,数量默认为1。而使用过多的Reduce任务则意味着复杂的shuffle,并使输出文件的数量激增。

一个job的ReduceTasks数量是通过mapreduce.job.reduces参数设置
也可以通过编程的方式,调用Job对象的setNumReduceTasks()方法来设置
一个节点Reduce任务数量上限由mapreduce.tasktracker.reduce.tasks.maximum设置(默认2)。可以采用以下探试法来决定Reduce任务的合理数量:
1.每个reducer都可以在Map任务完成后立即执行:0.95 * (节点数量 * mapreduce.tasktracker.reduce.tasks.maximum)
2.较快的节点在完成第一个Reduce任务后,马上执行第二个:1.75 * (节点数量 * mapreduce.tasktracker.reduce.tasks.maximum)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

4、小文件合并 
Mapper是基于文件自动产生的,如何自己控制Mapper的个数?需要通过参数的控制来调节Mapper的个数。减少Mapper的个数就要合并小文件,这种小文件有可能是直接来自于数据源的小文件,也可能是Reduce产生的小文件。

设置合并器:(set都是在hive脚本,也可以配置Hadoop)设置合并器本身:set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;set hive.merge.mapFiles=true;set hive.merge.mapredFiles=true;set hive.merge.size.per.task=256000000;//每个Mapper要处理的数据,就把上面的5M10M……合并成为一个一般还要配合一个参数:set mapred.max.split.size=256000000 // mapred切分的大小set mapred.min.split.size.per.node=128000000//低于128M就算小文件,数据在一个节点会合并,在多个不同的节点会把数据抓过来进行合并。Hadoop中的参数:可以通过控制文件的数量控制mapper数量mapreduce.input.fileinputformat.split.minsize(default:0),小于这个值会合并mapreduce.input.fileinputformat.split.maxsize 大于这个值会切分
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

5、Map阶段 
Map阶段是由一定数量的 Map Task组成。这些Map Task可以同时运行,每个Map Task又是由以下三个部分组成。 
   
1. InputFormat输入数据格式解析组件: 
因为不同的数据可能存储的数据格式不一样,这就需要有一个InputFormat组件来解析这些数据的存放格式,默认情况下,它提供了一个TextInputFormat文本文件输入格式来解释数据格式。 
它会将文件的每一行解释成(key,value),key代表每行偏移量,value代表每行数据内容,通常情况我们不需要自定义InputFormat,因为MapReduce提供了多种支持不同数据格式InputFormat的实现 
     
2. Mapper输入数据处理:这个Mapper是必须要实现的,因为根据不同的业务对数据有不同的处理 
   
3. Partitioner数据分组: 
Mapper数据处理之后输出之前,输出key会经过Partitioner分组或者分桶选择不同的reduce,默认的情况下Partitioner会对map输出的key进行hash取模。 
比如有6个ReduceTask,它就是模6,如果key的hash值为0,就选择第0个ReduceTask(为1,选Reduce Task1)。这样不同的map对相同单词key,它的hash值取模是一样的,所以会交给同一个reduce来处理。

这里写图片描述

6、Reduce阶段

  1. 数据运程拷贝 
    Reduce Task要远程拷贝每个map处理的结果,从每个map中读取一部分结果,每个Reduce Task拷贝哪些数据,是由上面Partitioner决定的。
  2. 数据按照key排序 
    Reduce Task读取完数据后,要按照key进行排序,相同的key被分到一组,交给同一个Reduce Task处理
  3. Reducer数据处理 
    以WordCount为例,相同的单词key分到一组,交个同一个Reducer处理,这样就实现了对每个单词的词频统计。
  4. OutputFormat数据输出格式 
    Reducer统计的结果将按照OutputFormat格式输出(默认情况下的输出格式为TextOutputFormat)

这里写图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/423684.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

WebService Software Factory 设计草图

以下是根据WSSF设计理念,按现公司的业务需求设计的接口项目拟稿,实现细分。仅供参考。(WSSF---ASMX版) 建议用1680*1050分辩浏览 转载于:https://www.cnblogs.com/RuiLei/archive/2008/10/05/1304160.html

Java面试题 20在面向对象编程里,经常使用is-a来说明对象之间的继承关系

Java面试题 20在面向对象编程里,经常使用is-a来说明对象之间的继承关系,下列对象中不具备继承关系的是?() A:手机与小米手机 B:企业家与雷军 C:编程语言与Java D:中国与北京 类之间存在以下…

MapReduce计数器

原文链接:http://itfish.net/article/61067.html 1、MapReduce计数器是什么?计数器是用来记录job的执行进度和状态的。它的作用可以理解为日志。我们可以在程序的某个位置插入计数器,记录数据或者进度的变化情况。 2、MapReduce计数器能做什么…

编写“线围棋”程序-2-可开局

棋盘有了,怎么支持在上面落子呢? 只要解决下面3个问题就可以了: 1.响应鼠标点击事件,获得“下棋子”的动作源。 2.修改和记录棋局状态。 3.在棋盘上显示棋局的状态。 为此,直接增加一个“棋局类“,也就是对…

Java面试题 21 下列说法正确的有()

下列说法正确的有() A 能被java.exe成功运行的java class文件必须有main()方法 B J2SDK就是Java API C:Appletviewer.exe可利用jar选项运行.jar文件 D能被Appletviewer成功运行的java class文件必须有main()方法 蒙蔽树上蒙蔽果,蒙蔽树下…

[翻译]SQL Server 未公开的两个存储过程sp_MSforeachtable 和 sp_MSforeachdb

SQL Server 未公开的两个存储过程sp_MSforeachtable 和 sp_MSforeachdb 您是否曾经写过代码来处理数据库中的所有表?处理一个 SQL Server实例中的所有数据库的代码又该如何写?然则,您是否知道有多种方法可以解决这问题?您可以创建…

Java面试题 22 牛客 Java是一门支持反射的语言,基于反射为Java提供了丰富的动态性支持

Java面试题 22 牛客 Java是一门支持反射的语言,基于反射为Java提供了丰富的动态性支持,下面关于Java反射的描述,哪些是错误的:( ) A Java反射主要涉及的类如Class, Method, Filed,等,他们都在java.lang.reflet包下 B 通…

java面试题24 关于Java中的数组,

java面试题24 关于Java中的数组,下面的一些描述,哪些描述是准确的:( ) A 数组是一个对象,不同类型的数组具有不同的类 B 数组长度是可以动态调整的 C 数组是一个连续的存储结构 D:一个固定长度的…

[开发技巧3]不显示报表直接打印

水晶报表9.2VB6 使用Application可以进行打印 在将数据赋给报表模板后,调用PrintOut方法 赋给报表数据objCRReport.Database.SetDataSource rst 此句打印,会出现打印提示框objCRReport.PrintOut 不提示,直接打印到默认打印机CallobjCRReport.…

SQL开发中容易忽视的一些小地方( 三)

目的:这篇文章我想说说我在工作中关于in和union all 的用法. 索引定义 : 微软的SQL SERVER提供了两种索引:聚集索引(clustered index,也称聚类索引、簇集索引)和非聚集索引(nonclustered index,也称非聚类索引、非簇集…

java面试题26 java语言的下面几种数组复制方法中,哪个效率最高?

java面试题26 java语言的下面几种数组复制方法中,哪个效率最高? A for 循环逐一复制 B System.arraycopy C Array.copyOf D 使用clone方法 效率:System.arraycopy > clone > Arrays.copyOf > for循环 1、System.arraycopy的用法…

pycharm使用笔记2-远程连接(转)

原文地址:https://blog.csdn.net/jinxiaonian11/article/details/70208920 随着科技的发展,远程办公已经是一种趋势,远程开发能力对于每一个程序员来说都是必不可少的。有时候就算在公司,在进行开发的时候有许多的数据都是储存在服务器上的&a…

java面试题27 java中下面哪些是Object类的方法()

java面试题27 java中下面哪些是Object类的方法() A notify() B notifyAll() C sleep() D wait() 蒙蔽树上蒙蔽果,蒙蔽树下你和我。遇到这种题,我默默的打开了编译工具 Object类中方法: protected Object clone()…

shiro学习(1):shiro简介

Apache Shiro是Java的一个安全框架。对比另一个安全框架Spring Sercurity,它更简单和灵活。 Shiro可以帮助我们完成:认证、授权、加密、会话管理、Web集成、缓存等。 Apache Shiro特性 Authentication:身份认证/登录,验证用户是…

微软启动了自爆程序,让我们一起帮它倒计时

……“公元2008年10月20日,注定成为人类信息技术史上不平凡的一天,因为在这一天,曾经创造了无数辉煌的计算机软件帝国微软公司,启动了自爆程序,剩下的,就是倒计时了……” ——《地球人类信息技术编年史》 …

python字典遍历的几种方法(转)

源地址:https://www.cnblogs.com/stuqx/p/7291948.html(1)遍历key值>>> a {a: 1, b: 2, c: 3} >>> for key in a:print(key:a[key])a:1 b:2 c:3 >>> for key in a.keys():print(key:a[key])a:1 b:2 c:3在使用上&a…

shiro学习(2):第一个shiro程序

工具idea 首先创建maven项目 配置文件 <?xml version"1.0" encoding"UTF-8"?><project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http…

shiro学习(3):用户权限

工具idea 首先创建maven项目 配置文件 <?xml version"1.0" encoding"UTF-8"?><project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http…

WPF学习笔记(三)

1.1 事件概括 第一节中我们给窗体添加了一个按钮&#xff0c;不过好像Button点个几下也只有些发光样式的变化&#xff0c;什么你还把系统皮肤去掉了&#xff1f;算了承认下确实够寒碜&#xff0c;那让我们再动动手。 1.1.1 路由事件简述 public HelloWorld() { Button button …

shiro学习(4):shiro认证流程

Shiro登录校验流程实现与分析 什么是Shiro Apache Shiro是一个强大且易用的Java安全框架,执行身份验证、授权、密码和会话管理。使用Shiro的易于理解的API,您可以快速、轻松地获得任何应用程序,从最小的移动应用程序到最大的网络和企业应用程序。 三个核心组件 Subject, Se…