MapReduce运行机制

相关链接 
MapReduce中Shuffle机制详解——Map端Shuffle链接 
MapReduce中Shuffle机制详解——Reduce端Shuffle链接

MapReduce将作业job的整个运行过程分为两个阶段:Map阶段和Reduce阶段。按照时间顺序包括:输入分片(input split)、map阶段、combiner阶段、shuffle阶段和reduce阶段。系统执行排序、将map输出作为输入传给reducer的过程称为shuffle(shuffle是MapReducer的心脏)

Map阶段由一定数量的 Map Task组成 
1. 输入数据格式解析: InputFormat 
2. 输入数据处理: Mapper 
3. 本地合并: Combiner(local reduce) 
4. 数据分组: Partitioner

Reduce阶段由一定数量的 Reduce Task组成 
1. 数据远程拷贝 
2. 数据按照key排序 
3. 数据处理: Reducer 
4. 数据输出格式: OutputFormat

1、流程简介 
这里写图片描述 
各个map函数对所划分的数据并行处理,从不同的输入数据产生不同的中间结果输出 
各个reduce也各自并行计算,各自负责处理不同的中间结果数据集合进行reduce处理之前,必须等到所有的map函数做完 
在进入reduce前需要有一个同步障(barrier) 
这个阶段也负责对map的中间结果数据进行收集整理(aggregation & shuffle)处理,以便reduce更有效地计算最终结果, 最终汇总所有reduce的输出结果即可获得最终结果

第一步:假设一个文件有三行英文单词作为 MapReduce 的Input(输入)这里经过 Splitting 过程把文件分割为3块。分割后的3块数据就可以并行处理,每一块交给一个 map 线程处理。 
第二步:每个 map 线程中,以每个单词为key,以1作为词频数value,然后输出。 
第三步:每个 map 的输出要经过 shuffling(混洗),将相同的单词key放在一个桶里面,然后交给 reduce 处理。 
第四步:reduce 接受到 shuffling 后的数据, 会将相同的单词进行合并,得到每个单词的词频数,最后将统计好的每个单词的词频数作为输出结果。

上述就是 MapReduce 的大致流程,前两步可以看做 map 阶段,后两步可以看做 reduce 阶段。

2、输入分片与类型 
MapReduce将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分成逻辑上的多个split),然后每一个split分配一个mapTask并行实例处理

输入分片(split)与map对应,是每个map处理的唯一单位。每个分片包括多条记录,每个记录都有对应键值对。 
输入切片的接口:InputSplit接口(不需要开发人员直接处理,由InputFormat创建)

输入分片(input split):在进行map计算之前,mapreduce会根据输入文件计算输入分片(input split),每个输入分片(input split)针对一个map任务,输入分片(input split)存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组,输入分片(input split)往往和hdfs的block(块)关系很密切,存储位置供MapReduce使用以便将map任务尽量放在分片数据附近,而长度用来排序分片,以便优化处理最大的分片,从而最小化作业运行时间。

假如我们设定hdfs的块的大小是64mb,如果我们输入有三个文件,大小分别是3mb、65mb和127mb,那么mapreduce会把3mb文件分为一个输入分片(input split),65mb则是两个输入分片(input split)而127mb也是两个输入分片(input split),换句话说我们如果在map计算前做输入分片调整,例如合并小文件,那么就会有5个map任务将执行,而且每个map执行的数据大小不均,这个也是mapreduce优化计算的一个关键点。

输入类型

FileInputFormat类: 
FileInputFormat 是所有使用文件作为其数据源的 InputFormat 实现的基类。它提供了两个功能:一个定义哪些文件包含在一个作业的输入中;一个为输入文件生成分片的实现。把分片分割成记录的作业由其子类来完成。

TextlnputFormat类: 
TextInputFormat 是默认的 InputFormat。每条记录是一行输入。键是 LongWritable 类型,存储该行在整个文件中的字节偏移量。值是这行的内容,不包括任何行终止符(换行符和回车符),它是 Text 类型的。但是输入分片和HDFS块之间可能不能很好的匹配,出现跨块的情况

KeyValueTextlnputFormat类: 
TextInputFormat 的键,即每一行在文件中的字节偏移量,通常并不是特别有用。通常情况下,文件中的每一行是一个键/值对,使用某个分界符进行分隔,比如制表符。例如 以下由 Hadoop 默认 OutputFormat(即 TextOutputFormat)产生的输出。如果要正确处理这类 文件,KeyValueTextInputFormat 比较合适。可以通过 key.value.separator.in.input.line 属性来指定分隔符。它的默认值是一个制表符。

NLineInputFormat类: 
与TextInputFormat一样,键是文件中行的字节偏移量,值是行本身。主要是希望mapper收到固定行数的输入。

MultipleInputs多种输入: 
MultipleInputs类处理多种格式的输入,允许为每个输入路径指定InputFormat和Mapper。两个mapper的输出类型是一样的,所以reducer看到的是聚集后的map输出,并不知道输入是不同的mapper产生的。 
重载版本:addInputPath(),没有mapper参数,主要支持多种输入格式只有一个mapper。 
这里写图片描述

3、Map与Reduce的个数

Map任务的个数

读取数据产生多少个Mapper?? 
Mapper数据过大的话,会产生大量的小文件,过多的Mapper创建和初始化都会消耗大量的硬件资源 
Mapper数太小,并发度过小,Job执行时间过长,无法充分利用分布式硬件资源

Mapper数量由什么决定?? 
(1)输入文件数目(2)输入文件的大小(3)配置参数 这三个因素决定的。 
输入的目录中文件的数量决定多少个map会被运行起来,应用针对每一个分片运行一个map,一般而言,对于每一个输入的文件会有一个map split。如果输入文件太大,超过了hdfs块的大小(128M)那么对于同一个输入文件我们会有多余2个的map运行起来。

涉及参数:
mapreduce.input.fileinputformat.split.minsize //启动map最小的split size大小,默认0
mapreduce.input.fileinputformat.split.maxsize //启动map最大的split size大小,默认256M
dfs.block.size//block块大小,默认128M
计算公式:splitSize =  Math.max(minSize, Math.min(maxSize, blockSize))下面是FileInputFormat class 的getSplits()的伪代码: num_splits = 0for each input file f:remaining = f.lengthwhile remaining / split_size > split_slope:num_splits += 1remaining -= split_sizewhere:split_slope = 1.1 分割斜率split_size =~ dfs.blocksize 分割大小约等于hdfs块大小会有一个比例进行运算来进行切片,为了减少资源的浪费
例如一个文件大小为260M,在进行MapReduce运算时,会首先使用260M/128M,得出的结果和1.1进行比较
大于则切分出一个128M作为一个分片,剩余132M,再次除以128,得到结果为1.03,小于1.1
则将132作为一个切片,即最终260M被切分为两个切片进行处理,而非3个切片。  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

reduce任务的个数

Reduce任务是一个数据聚合的步骤,数量默认为1。而使用过多的Reduce任务则意味着复杂的shuffle,并使输出文件的数量激增。

一个job的ReduceTasks数量是通过mapreduce.job.reduces参数设置
也可以通过编程的方式,调用Job对象的setNumReduceTasks()方法来设置
一个节点Reduce任务数量上限由mapreduce.tasktracker.reduce.tasks.maximum设置(默认2)。可以采用以下探试法来决定Reduce任务的合理数量:
1.每个reducer都可以在Map任务完成后立即执行:0.95 * (节点数量 * mapreduce.tasktracker.reduce.tasks.maximum)
2.较快的节点在完成第一个Reduce任务后,马上执行第二个:1.75 * (节点数量 * mapreduce.tasktracker.reduce.tasks.maximum)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

4、小文件合并 
Mapper是基于文件自动产生的,如何自己控制Mapper的个数?需要通过参数的控制来调节Mapper的个数。减少Mapper的个数就要合并小文件,这种小文件有可能是直接来自于数据源的小文件,也可能是Reduce产生的小文件。

设置合并器:(set都是在hive脚本,也可以配置Hadoop)设置合并器本身:set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;set hive.merge.mapFiles=true;set hive.merge.mapredFiles=true;set hive.merge.size.per.task=256000000;//每个Mapper要处理的数据,就把上面的5M10M……合并成为一个一般还要配合一个参数:set mapred.max.split.size=256000000 // mapred切分的大小set mapred.min.split.size.per.node=128000000//低于128M就算小文件,数据在一个节点会合并,在多个不同的节点会把数据抓过来进行合并。Hadoop中的参数:可以通过控制文件的数量控制mapper数量mapreduce.input.fileinputformat.split.minsize(default:0),小于这个值会合并mapreduce.input.fileinputformat.split.maxsize 大于这个值会切分
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

5、Map阶段 
Map阶段是由一定数量的 Map Task组成。这些Map Task可以同时运行,每个Map Task又是由以下三个部分组成。 
   
1. InputFormat输入数据格式解析组件: 
因为不同的数据可能存储的数据格式不一样,这就需要有一个InputFormat组件来解析这些数据的存放格式,默认情况下,它提供了一个TextInputFormat文本文件输入格式来解释数据格式。 
它会将文件的每一行解释成(key,value),key代表每行偏移量,value代表每行数据内容,通常情况我们不需要自定义InputFormat,因为MapReduce提供了多种支持不同数据格式InputFormat的实现 
     
2. Mapper输入数据处理:这个Mapper是必须要实现的,因为根据不同的业务对数据有不同的处理 
   
3. Partitioner数据分组: 
Mapper数据处理之后输出之前,输出key会经过Partitioner分组或者分桶选择不同的reduce,默认的情况下Partitioner会对map输出的key进行hash取模。 
比如有6个ReduceTask,它就是模6,如果key的hash值为0,就选择第0个ReduceTask(为1,选Reduce Task1)。这样不同的map对相同单词key,它的hash值取模是一样的,所以会交给同一个reduce来处理。

这里写图片描述

6、Reduce阶段

  1. 数据运程拷贝 
    Reduce Task要远程拷贝每个map处理的结果,从每个map中读取一部分结果,每个Reduce Task拷贝哪些数据,是由上面Partitioner决定的。
  2. 数据按照key排序 
    Reduce Task读取完数据后,要按照key进行排序,相同的key被分到一组,交给同一个Reduce Task处理
  3. Reducer数据处理 
    以WordCount为例,相同的单词key分到一组,交个同一个Reducer处理,这样就实现了对每个单词的词频统计。
  4. OutputFormat数据输出格式 
    Reducer统计的结果将按照OutputFormat格式输出(默认情况下的输出格式为TextOutputFormat)

这里写图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/423684.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

sql优化【转】

转:http://blog.chinaunix.net/uid-540802-id-3419311.html explain显示了mysql如何使用索引来处理select语句以及连接表。可以帮助选择更好的索引和写出更优化的查询语句。 使用方法,在select语句前加上explain就可以了: 如: exp…

WebService Software Factory 设计草图

以下是根据WSSF设计理念,按现公司的业务需求设计的接口项目拟稿,实现细分。仅供参考。(WSSF---ASMX版) 建议用1680*1050分辩浏览 转载于:https://www.cnblogs.com/RuiLei/archive/2008/10/05/1304160.html

Java面试题 20在面向对象编程里,经常使用is-a来说明对象之间的继承关系

Java面试题 20在面向对象编程里,经常使用is-a来说明对象之间的继承关系,下列对象中不具备继承关系的是?() A:手机与小米手机 B:企业家与雷军 C:编程语言与Java D:中国与北京 类之间存在以下…

MapReduce计数器

原文链接:http://itfish.net/article/61067.html 1、MapReduce计数器是什么?计数器是用来记录job的执行进度和状态的。它的作用可以理解为日志。我们可以在程序的某个位置插入计数器,记录数据或者进度的变化情况。 2、MapReduce计数器能做什么…

编写“线围棋”程序-2-可开局

棋盘有了,怎么支持在上面落子呢? 只要解决下面3个问题就可以了: 1.响应鼠标点击事件,获得“下棋子”的动作源。 2.修改和记录棋局状态。 3.在棋盘上显示棋局的状态。 为此,直接增加一个“棋局类“,也就是对…

matlab绘制三维图形

原文地址:matlab绘制三维图形 作者:hotinko1 三维曲线 plot3函数与plot函数用法十分相似,其调用格式为: plot3(x1,y1,z1,选项1,x2,y2,z2,选项2,…,xn,yn,zn,选项n) 其中每一组x,y,z组成一组曲线的坐标参数,选项的定义和…

Java面试题 21 下列说法正确的有()

下列说法正确的有() A 能被java.exe成功运行的java class文件必须有main()方法 B J2SDK就是Java API C:Appletviewer.exe可利用jar选项运行.jar文件 D能被Appletviewer成功运行的java class文件必须有main()方法 蒙蔽树上蒙蔽果,蒙蔽树下…

not enough arguments for format string

如果想要在格式化中显示百分号%, 需要写成%%,因此正确的写法是:item_add (%s test%%) % i

IIS 崩溃的后的感受

我的IIS 崩溃后,从控制面板中 重新安装也没有办法,系统在安装时仍然不能正确安装在控制面板中卸载掉,也是停在那里,无法删除。郁闷啊。找到下面的脚本,算是帮了自己的大忙。IIS.txt [Components] iis_commonon iis_ine…

[翻译]SQL Server 未公开的两个存储过程sp_MSforeachtable 和 sp_MSforeachdb

SQL Server 未公开的两个存储过程sp_MSforeachtable 和 sp_MSforeachdb 您是否曾经写过代码来处理数据库中的所有表?处理一个 SQL Server实例中的所有数据库的代码又该如何写?然则,您是否知道有多种方法可以解决这问题?您可以创建…

Java面试题 22 牛客 Java是一门支持反射的语言,基于反射为Java提供了丰富的动态性支持

Java面试题 22 牛客 Java是一门支持反射的语言,基于反射为Java提供了丰富的动态性支持,下面关于Java反射的描述,哪些是错误的:( ) A Java反射主要涉及的类如Class, Method, Filed,等,他们都在java.lang.reflet包下 B 通…

sqlserver关键字

ROWCOUNT Transact-SQL 语句可以通过下列方式设置 ROWCOUNT 的值: 将 ROWCOUNT 设置为受影响或被读取的行的数目。可以将行发送到客户端,也可以不发送。 保留前一个语句执行中的 ROWCOUNT。将 ROWCOUNT 重置为 0 但不将该值返回到客户端。 执行简单分配的…

linux 执行sh 文件是遇到找不到cd '目录'

在linux中将多个php命令写到同一个sh文件中执行 #!/bin/bash/ cd /www/sf/ /usr/local/bin/php xxx.php /usr/local/bin/php xxx1.php /usr/local/bin/php xxx2.php 然后执行命令 sh xxx.sh 有的时候会出现 找不到/www/sf/目录 为什么呢,找了下资料,是…

Asp.Net Session 丢失的奇怪问题,求救!

Asp.Net Session 丢失的奇怪问题,求救我遇到一个很奇怪的 asp.net 问题,我有三个页面:login.aspx :实现输入帐号密码,将帐号(yh :用户)及权限 ( js_id :角色ID) 保存为 s…

java面试题23 牛客ArrayLists和LinkedList的区别,下述说法正确的有?

java面试题23 牛客ArrayLists和LinkedList的区别,下述说法正确的有? A ArrayList是实现了基于动态数组的数据结构,LinkedList基于链表的数据结构。 B 对于随机访问get和set,ArrayList觉得优于LinkedList,因为LinkedL…

制作特殊字的脚本

<html> <head> <title>特殊文字的制作</title> <meta http-equiv"Content-Type" content"text/html; charsetgb2312"> </head> <body text#00ff00 bgColorblack οnlοade(d.q)> <center>特殊文字的制作…

cursor.execute(sql) 执行结果集是有记录的 但是num=cursor.rownumber 返回值为0

开始cursor.execute(.join(str(sql).strip())) #count cursor.rowcount; numcursor.rownumber修改后&#xff1a;cursor.execute(.join(str(sql).strip())) cursor.fetchall(); #count cursor.rowcount; numcursor.rownumber print(---------------) print (cursor.rownumber) …

Java Web 应用概述

1、java Web 应用是建立在java语言基础上的企业web应用系统&#xff0c;oracle公司根据行业发展和便于开发制定了一套规范&#xff1a;Java EE规范&#xff0c;截至到当前&#xff08;2016.3.11&#xff09;是java EE7规范&#xff0c;其中包括大家常见的Java Servlet 、JavaSe…

java面试题24 关于Java中的数组,

java面试题24 关于Java中的数组&#xff0c;下面的一些描述&#xff0c;哪些描述是准确的&#xff1a;&#xff08; &#xff09; A 数组是一个对象&#xff0c;不同类型的数组具有不同的类 B 数组长度是可以动态调整的 C 数组是一个连续的存储结构 D:一个固定长度的…

[开发技巧3]不显示报表直接打印

水晶报表9.2VB6 使用Application可以进行打印 在将数据赋给报表模板后&#xff0c;调用PrintOut方法 赋给报表数据objCRReport.Database.SetDataSource rst 此句打印&#xff0c;会出现打印提示框objCRReport.PrintOut 不提示&#xff0c;直接打印到默认打印机CallobjCRReport.…