文章目录
- MapReduce简介
- MapTask
- ReduceTask
- Mapper阶段解读
- Reducer阶段解读
- MapReduce适用的问题
- MapReduce的特点
- MapReduce基本思想
- 大数据处理思想:分而治之
- 构建抽象模型:Map 函数和 Reduce 函数
- 上升到架构:并行自动化并隐藏底层细节
- MapReduce计算架构提供的主要功能
- MapReduce框架中的名词解释
- MapReduce与YARN
- MapReduce的原理
- MapReduce进程
- 常用数据序列化类型
- MapReduce实际处理流程
- FileInputFormat切片机制
- Mapreduce的shuffle机制
- MapReduce案例(wordcount)
MapReduce简介
MapReduce是一种可用于数据处理的编程框架。MapReduce采用"分而治之"的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果。简单地说,MapReduce就是"任务的分解与结果的汇总"。
在分布式计算中,MapReduce框架负责处理了并行编程中分布式存储、工作调度、负载均衡、容错均衡、容错处理以及网络通信等复杂问题,把处理过程高度抽象为两个函数:map和reduce,map负责把任务分解成多个任务,reduce负责把分解后多任务处理的结果汇总起来。
Map/Reduce是一个用于大规模数据处理的分布式计算编程模型。
MapReduce程序的工作分两个阶段进行:
Map阶段(映射)
这个函数单独地应用在每个单元格上的操作就属于映射(Map)。
由一个或者多个MapTask组成。每个MapTask处理输入数据集合中的一片数据(InputSplit),并将产生的若干个数据片段(一个数据文件)写到本地磁盘上。
Reduce阶段
由一个或者多个ReduceTask组成。ReduceTask则从每个MapTask上远程拷贝相应的数据片段,经分组聚集和归约后,将结果写到HDFS上作为最终结果。
使用需要定义map函数和reduce函数
map函数用来处理原始数据(初始键值对)以生成一批中间的key/value对
reduce函数将 所有这些中间的有着相同key的values合并起来。
输入到每一个阶段均是键 - 值对。
MapTask
执行过程概述:
首先,通过用户提供的InputFormat将对应的InputSplit解析成一系列key/value,并依次交给用户编写的map()函数处理,接着按照指定的Partition对数据分片,以确定每个key/value将交给哪个ReduceTask处理,之后将数据交给用户定义的Combiner进行一次本地合并(没有则直接跳过),最后即将处理结果保存到本地磁盘上。
具体步骤:
(1)Read阶段:MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。
(2)Map阶段:该阶段只要是将解析出的key/value交给用户编写的map()函数处理,并产生一系列新的key/value。
(3)Collect阶段:在用户编写的map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的ley/value分片(通过调用Partition),并写入一个环形内存缓冲区中。
(4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并操作。
(5)Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
ReduceTask
执行过程概述:
ReduceTask的输入数据来自各个MapTask,因此首先通过HTTP请求从各个已经运行完成的MapTask所在TaskTracker机器上拷贝相应的数据分片,待所有数据拷贝完成后,再以key为关键字对所有数据进行排序(sort),通过排序,key相同的记录聚集到一起形成若干分组,然后将分组数据交给用户编写的reduce()函数处理,并将数据结果直接写到HDFS上作为最终输出结果。
具体步骤:
(1)Shuffle阶段:也称为Copy阶段。ReduceTask从各个MapTask所在的TaskTracker上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
(2)Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上的文件过多,并且可以为后面整体的归并排序减负,提升排序效率。
(3)Sort阶段:按照MapReduce的语义,用户编写的reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚集在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现了自己的处理结果进行了局部排序,因此,ReduceTask只需要对所有数据进行一次归并排序即可。
(4)Reduce阶段:在该阶段中,ReduceTask将每组数据依次交给用户编写的reduce()函数处理。
(5)Write阶段:reduce()函数将计算结果写到HDFS上。
Mapper阶段解读
Mapper的输入文件位于HDFS上,先对输入数据切分,每一个split分块对应一个Mapper任务,通过RecordReader对象从输入分块中读取并生成键值对,然后执行Map函数,输出的中间键值对被partion()函数区分并写入缓冲区,同时调用sort()进行排序。
Reducer阶段解读
Reducer主要有三个阶段:Shuffle、Sort、Reduce
1 . Shuffle阶段:
Reducer的输入就是Mapper阶段已经排好序的输出。在这个阶段,框架为每个Reducer任务获得所有Mapper输出中与之相关的分块,把Map端的输出结果传送到Reduce端,大量操作是数据复制(因此也称数据复制阶段)。
2 . Sort阶段:
框架按照key对Reducer的输入进行分组(Mapper阶段时每一个Map任务对于它本身的输出结果会有一个排序分组,而不同Map任务的输出中可能会有相同的key,因此要再一次分组)。Shuffle和Sort是同时进行的,Map的输出也是一边被取回一边被合并。排序是基于内存和磁盘的混合模式进行,经过多次Merge才能完成排序。(PS:如果两次排序分组规则需要不同,可以指定一个Comparator比较器来控制分组规则)。
3 . Reduce阶段:
通过Shuffle和Sort操作后得到的<key, (list of values)>被送到Reducer的reduce()函数中执行,针对每一个<key, (list of values)>会调用一次reduce()函数。
MapReduce适用的问题
用MapReduce来处理的数据集(或任务)必须具备这样的特点:待处理的数据集可以分解成许多小的数据集,而且每一个小数据集都可以完全并行地进行处理。
MapReduce的特点
1)MapReduce 易于编程 。它简单的实现一些接口,就可以完成一个分布式程序
2)良好的 扩展性 。当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。
3)高容错性 。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上面上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop 内部完成的。
4)适合 PB 级以上海量数据的离线处理 。比如像毫秒级别的返回一个结果,MapReduce 很难做到。MapReduce 虽然具有很多的优势,但是它也有不擅长的地方。这里的不擅长不代表它不能做,而是在有些场景下实现的效果差,并不适合 MapReduce 来处理,主要表现在以下几个方面。
1.实时计算。
2.流式计算。流式计算的输入数据时动态的,而 MapReduce 的输入数据集是静态的,不能动态变化。这是因为 MapReduce 自身的设计特点决定了数据源必须是静态的。
3.DAG(有向图)计算。多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce 并不是不能做,而是使用后,每个MapReduce 作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。
MapReduce基本思想
大数据处理思想:分而治之
并行计算的第一个重要问题是如何划分计算任务或者计算数据以便对划分的子任务或数据块同时进行计算。但是,一些计算问题的前后数据项之间存在很强的依赖关系,无法进行划分,只能串行计算。
对于不可拆分的计算任务或相互间有依赖关系的数据无法进行并行计算。一个大数据若可以分为具有同样计算过程的数据块,并且这些数据块之间不存在数据依赖关系,则提高处理速度的最好办法就是并行计算。
构建抽象模型:Map 函数和 Reduce 函数
Map 函数和 Reduce 函数都是以 <key,value>作为输入的,按一定的映射规则转换成另一个或一批 <key,value> 进行输出。
1) Map:<k1,v1>List(<K2,V2>)
输入:键值对<k1,v1>表示的数据。
处理:数据记录将以“键值对”形式传入 Map 函数;Map 函数将处理这些键值对,并以另一种键值对形式输出中间结果 List(<K2,V2>)。
输出:键值对List(<K2,V2>)示的一组中间数据。
2) Reduce:<K2,List(V2)>→List(<K3,V3>)
输入:由 Map 输出的一组键值对 List(<K2,V2>)将被进行合并处理,同样主键下的不同数值会合并到一个列表List(V2)中,故 Reduce 的输入为<K2,List(V2)>。
处理:对传入的中间结果列表数据进行某种整理或进一步的处理,并产生最终的输出结果List(<K3,V3>)。
输出:最终输出结果List(<K3,V3>)。
基于 MapReduce 的并行计算模型如图 3 所示。各个 Map 函数对所划分的数据并行处理,从不同的输入数据产生不同的中间结果。
各个 Reduce 函数也各自并行计算,负责处理不同的中间结果。进行 Reduce 函数处理之前,必须等到所有的 Map 函数完成。
因此,在进入 Reduce 函数前需要有一个同步屏障;这个阶段也负责对 Map 函数的中间结果数据进行收集整理处理,以便 Reduce 函数能更有效地计算最终结果,最终汇总所有 Reduce 函数的输出结果即可获得最终结果。
基于MapReduce的并行计算模型
Map 函数的输入数据来自于 HDFS的文件块,这些文件块的格式是任意类型的,可以是文档,可以是数字,也可以是二进制。文件块是一系列元素组成的集合,这些元素也可以是任意类型的。
Map 函数首先将输入的数据块转换成 <key,Value> 形式的键值对,键和值的类型也是任意的。
Map 函数的作用就是把每一个输入的键值对映射成一个或一批新的键值对。输出键值对里的键与输入键值对里的键可以是不同的。
需要注意的是,Map 函数的输出格式与 Reduce 函数的输入格式并不相同,前者是 List(<K2,V2>) 格式,后者是<K2,List(V2)> 的格式。所以,Map 函数的输出并不能直接作为 Reduce 函数的输入。
MapReduce 框架会把 Map 函数的输出按照键进行归类,把具有相同键的键值对进行合并,合并成 <K2,List(V2)>
的格式,其中,List(V2) 是一批属于同一个 K2 的 value。
Reduce 函数的任务是将输入的一系列具有相同键的值以某种方式组合起来,然后输出处理后的键值对,输出结果一般会合并成一个文件。
为了提高 Reduce 的处理效率,用户也可以指定 Reduce 任务的个数,也就是说,可以有多个 Reduce 并发来完成规约操作。
MapReduce 框架会根据设定的规则把每个键值对输入到相应的 Reduce 任务进行处理。这种情况下,MapReduce将会输出多个文件。
一般情况下,并不需要把这些输出文件进行合并,因为这些文件也许会作为下一个 MapRedue 任务的输入。
上升到架构:并行自动化并隐藏底层细节
MapReduce 提供了一个统一的计算框架,来完成计算任务的划分和调度,数据的分布存储和划分,处理数据与计算任务的同步,结果数据的收集整理,系统通信、负载平衡、计算性能优化、系统结点出错检测和失效恢复处理等。
MapReduce 通过抽象模型和计算框架把需要做什么与具体怎么做分开了,为程序员提供了一个抽象和高层的编程接口和框架,程序员仅需要关心其应用层的具体计算问题,仅需编写少量的处理应用本身计算问题的程序代码。
与具体完成并行计算任务相关的诸多系统层细节被隐藏起来,交给计算框架去处理:从分布代码的执行,到大到数千个,小到单个的结点集群的自动调度使用。
MapReduce计算架构提供的主要功能
1)任务调度
提交的一个计算作业(Job)将被划分为很多个计算任务(Tasks)。
任务调度功能主要负责为这些划分后的计算任务分配和调度计算结点(Map 结点或 Reduce 结点),同时负责监控这些结点的执行状态,以及 Map 结点执行的同步控制,也负责进行一些计算性能优化处理。例如,对最慢的计算任务采用多备份执行,选最快完成者作为结果。
2)数据/程序互定位
为了减少数据通信量,一个基本原则是本地化数据处理,即一个计算结点尽可能处理其本地磁盘上分布存储的数据,这实现了代码向数据的迁移。
当无法进行这种本地化数据处理时,再寻找其他可用结点并将数据从网络上传送给该结点(数据向代码迁移),但将尽可能从数据所在的本地机架上寻找可用结点以减少通信延迟。
3)出错处理
在以低端商用服务器构成的大规模 MapReduce 计算集群中,结点硬件(主机、兹盘、内存等)出错和软件有缺陷是常态。因此,MapReduce 架构需要能检测并隔离出错结点,并调度分配新的结点接管出错结点的计算任务。
4)分布式数据存储与文件管理
海量数据处理需要一个良好的分布数据存储和文件管理系统作为支撑,该系统能够把海量数据分布存储在各个结点的本地磁盘上,但保持整个数据在逻辑上成为一个完整的数据文件。
为了提供数据存储容错机制,该系统还要提供数据块的多备份存储管理能力。
5)Combiner 和 Partitioner
为了减少数据通信开销,中间结果数据进入 Reduce 结点前需要进行合并(Combine)处理,即把具有同样主键的数据合并到一起避免重复传送。
一个 Reduce 结点所处理的数据可能会来自多个 Map 结点,因此,Map 结点输出的中间结果需使用一定的策略进行适当的划分(Partition)处理,保证相关数据发送到同一个 Reduce 结点上。
MapReduce框架中的名词解释
split:
分片是指MapReduce框架将数据源根据一定的规则将源数据分成若干个小数据的过程;其中,一个小数据集,也被称为一个分片。
Map:
Map有两层含义:
其一、是指MapReduce框架中的Map过程,即将一个分片根据用户定义的Map逻辑处理后,经由MapReduce框架处理,形成输出结果,供后续Reduce过程使用;
其二,是指用户定义Java程序实现Mapper类的map接口的用户自定义逻辑,此时通常被称为mapper。
Reduce:
Reduce也有两层含义:
其一,是指MapReduce框架中的Reduce过程,即将Map的结果作为输入,根据用户定义的Reduce逻辑,将结果处理并汇总,输出最后的结果;
其二,是指用户定义Java程序实现Reducer类的reduce接口的用户自定义逻辑,此时通常被称为reducer。
Combine:
Combine是一个可由用户自定的过程,类似于Map和Reduce,MapReduce框架会在Map和Reduce过程中间调用Combine逻辑(会在下面章节中仔细讲解),通常Combine和reduce的用户代码是一样的(也可被称为本地的reduce过程),但是请注意并不是所有用MapReduce框架实现的算法都适合增加Combine过程(比如求平均值)。
Partition:
在MapReduce框架中一个split对应一个map,一个partiton对应一个reduce(无partition指定时,由用户配置项指定,默认为1个)。 reduce的个数决定了输出文件的个数。比如,在需求中,数据是从对每个省汇总而成,要求计算结果按照省来存放,则需要根据源数据中的表明省的字段分区,用户自定义partition类,进行分区。
MapReduce与YARN
YARN概述
Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而mapreduce等运算程序则相当于运行于操作系统之上的应用程序。
YARN中的重要概念
1) yarn并不清楚用户提交的程序的运行机制
2) yarn只提供运算资源的调度(用户程序向yarn申请资源,yarn就负责分配资源)
3) yarn中的主管角色叫ResourceManager
4) yarn中具体提供运算资源的角色叫NodeManager
5) 这样一来,yarn其实就与运行的用户程序完全解耦,就意味着yarn上可以运行各种类型的分布式运算程序(mapreduce只是其中的一种),比如mapreduce、storm程序,spark程序,tez等等 。
6) 所以,spark、storm等运算框架都可以整合在yarn上运行,只要他们各自的框架中有符合yarn规范的资源请求机制即可
7) Yarn就成为一个通用的资源调度平台,从此,企业中以前存在的各种运算集群都可以整合在一个物理集群上,提高资源利用率,方便数据共享
MapReduce的原理
map以(key, value)的形式输入数据并根据编写的map()处理数据,输出为(key,
value)的形式,map的输出经过中间阶段(叫做shuffle)的处理,再以(key,value)的形式传入reduce()内进行处理,最后以(key, value)的形式输出最终结果。
一个MapReduce作业(Job)是客户端要执行的一个工作单元:它包括输入数据,MapReduce程序与配置信息.Hadoop将作业分成若干个任务来执行,它包括两类任务:map任务与reduce任务.这些任务分布在集群的不同节点上,由YARN负责调度.如果一个任务失败,它将在另一个不同的节点上重新调度运行.
Hadoop将MapReduce的输入数据划分成等长的小数据块,称为输入分片(input split)或简称"分片".Hadoop为每个分片创建一个map任务,并由该任务来运行用户自己定义的map函数从而处理分片中的每条记录.
map任务与reduce任务之间存在一个shuffle,这是MapReduce中最为消耗时间的过程,因为它对数据进行了多次处理,其中包括排序,分区,溢写,combiner等过程.combiner就是一个map端的reduce,可以让数据更加紧凑,所以一般都指定为reduce()所在的类(注意,有些任务中不适用combiner).这一切的处理都是为了减少map任务与reduce任务之间的网络传输,毕竟集群中最为稀缺的资源就是网络带宽,应该想尽办法节省。
分布式的运算程序往往需要分成至少2个阶段:
第一个阶段的MapTask并发实例,完全并行运行,互不相干。
第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出。
MapReduce编程模型只能包含 一个Map阶段 和 一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能 多个MapReduce程序,串行运行。
MapReduce进程
一个完整的MapReduce程序在分布式运行时有三类实例进程:
(1)MrAppMaster:负责整个程序的 过程调度 及 状态协调。
(2)MapTask:负责 Map阶段的 整个数据处理流程。
(3)ReduceTask:负责 Reduce阶段的 整个数据处理流程。
常用数据序列化类型
MapReduce实际处理流程
mapreduce 其实是分治算法的一种现,所谓分治算法就是“就是分而治之 ,将大的问题分解为相同类型的子问题(最好具有相同的规模),对子问题进行求解,然后合并成大问题的解。
mapreduce就是分治法的一种,将输入进行分片,然后交给不同的task进行处理,然后合并成最终的解。
mapreduce实际的处理过程可以理解为Input->Map->Sort->Combine->Partition->Reduce->Output。
1)Input阶段
数据以一定的格式传递给Mapper,有TextInputFormat,DBInputFormat,SequenceFileFormat等可以使用,在Job.setInputFormat可以设置,也可以自定义分片函数。
2)map阶段
对输入的(key,value)进行处理,即map(k1,v1)->list(k2,v2),使用Job.setMapperClass进行设置。
3)Sort阶段
对于Mapper的输出进行排序,使用Job.setOutputKeyComparatorClass进行设置,然后定义排序规则。
4)Combine阶段
这个阶段对于Sort之后又相同key的结果进行合并,使用Job.setCombinerClass进行设置,也可以自定义Combine Class类。
5)Partition阶段
将Mapper的中间结果按照key的范围划分为R份(Reduce作业的个数),默认使用HashPartioner(key.hashCode()&Integer.MAX_VALUE%numPartitions),也可以自定义划分的函数。
使用Job.setPartitionClass设置。
6)Reduce阶段
对于Mapper阶段的结果进行进一步处理,Job.setReducerClass进行设置自定义的Reduce类。
7)Output阶段
Reducer输出数据的格式。
FileInputFormat切片机制
1)FileInputFormat切片机制切片定义在InputFormat类中的getSplit()方法
2)FileInputFormat中默认的切片机制:
简单地按照文件的内容长度进行切片
切片大小,默认等于block大小
切片时不考虑数据集整体,而是逐个针对每一个文件单独切片 。比如待处理数据有两个文件:
file1.txt 320M
file2.txt 10M
经过FileInputFormat的切片机制运算后,形成的切片信息如下:
file1.txt.split1-- 0~128
file1.txt.split2-- 128~256
file1.txt.split3-- 256~320
file2.txt.split1-- 0~10M
3)FileInputFormat中切片的大小的参数配置
通过分析源码,在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize)); 切片主要由这几个值来运算决定
minsize:默认值:1
配置参数: mapreduce.input.fileinputformat.split.minsize
maxsize:默认值:Long.MAXValue
配置参数:mapreduce.input.fileinputformat.split.maxsize
blocksize
因此,默认情况下,切片大小=blocksize
maxsize(切片最大值):
参数如果调得比blocksize小,则会让切片变小,而且就等于配置的这个参数的值
minsize (切片最小值):
参数调的比blockSize大,则可以让切片变得比blocksize还大
选择并发数的影响因素:
运算节点的硬件配置
运算任务的类型:CPU密集型还是IO密集型
运算任务的数据量
Mapreduce的shuffle机制
MapReduce计算模型主要由三个阶段构成:Map、Shuffle、Reduce。
(1)Map是映射,负责数据的过滤分类,将原始数据转化为键值对;
(2)Reduce是合并,将具有相同key值的value进行处理后再输出新的键值对作为最终结果;
(3)为了让Reduce可以并行处理Map的结果,必须对Map的输出进行一定的排序与分割,然后再交给对应的Reduce,这个过程就是Shuffle。Shuffle过程包含Map Shuffle和Reduce Shuffle。
1)概述
mapreduce中,map阶段处理的数据如何传递给reduce阶段,是mapreduce框架中最关键的一个流程,这个流程就叫shuffle。
shuffle: 洗牌、发牌——(核心机制:数据分区,排序,缓存)。
具体来说:就是将maptask输出的处理结果数据,分发给reducetask,并在分发的过程中,对数据按key进行了分区和排序。
分区partition(确定哪个数据进入哪个reduce)
Sort根据key排序
Combiner进行局部value的合并
2)详细流程
1、 maptask收集我们的map()方法输出的kv对,放到内存缓冲区中
2、 从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
3、 多个溢出文件会被合并成大的溢出文件
4、 在溢出过程中,及合并的过程中,都要调用partitoner进行分组和针对key进行排序
5、 reducetask根据自己的分区号,去各个maptask机器上取相应的结果分区数据
6、 reducetask会取到同一个分区的来自不同maptask的结果文件,reducetask会将这些文件再进行合并(归并排序)
7、 合并成大文件后,shuffle的过程也就结束了,后面进入reducetask的逻辑运算过程(从文件中取出一个一个的键值对group,调用用户自定义的reduce()方法)
Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。
缓冲区的大小可以通过参数调整, 参数:io.sort.mb 默认100M
MapReduce案例(wordcount)
wordcount是最简单也是最能体现MapReduce思想的程序之一,可以称为MapReduce版"Hello World",单词计数主要完成功能是:统计一系列文本文件中每个单词出现的次数,即简单如下图所示:
WordcountMapper.java
package wordcount;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*KEYIN, VALUEIN, KEYOUT, VALUEOUT
* 四个泛型解释:
* KEYIN:K1的类型
* VALUEIN:V1的类型
*
* KEYOUT:K2的类型
* VALUEOUT:V2的类型
* */
public class WordCountMapper extends Mapper<LongWritable,Text,Text,LongWritable> {//map方法就是将K1和V1转化为K2和V2/*参数:key: K1行偏移量value:V1 每一行的文本数据context:表示上下文对象*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {Text text = new Text();LongWritable longWritable = new LongWritable();//将一行的文本数据进行拆分String[] split = value.toString().split(",");//遍历数组,进行组装K2和v2for (String word:split){//将K2和V2写入上文text.set(word);longWritable.set(1);context.write(text,longWritable);}// 将K2和v2写入上下文中}
}
dataReduce.java
package wordcount;import java.io.IOException;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;/*
* 四个泛型解释:
* KEYIN:K2类型
* VALUEIN:V2类型
* KEYOUT:K3类型
* VALUEOUT:V3类型
*
* */
public class dataReduce extends Reducer<Text, LongWritable,Text,LongWritable> {//把新的K2和V2转为K3和V3 将K3和V3写入上下文中/*参数:key:新K2values:集合 新V2context:表示上下文对象** K2 v2* hello <1,1>*K3 v3hello 2*** */@Overrideprotected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {long count=0;//1.遍历结合,将集合中数字相加,得到v3for (LongWritable longWritable:values){count+=longWritable.get();}//2.将K3和V3写入上下文中context.write(key,new LongWritable(count));}
}
TaskMain.java
package wordcount;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;public class TaskMain extends Configured implements Tool {//该方法用于指定一个Job任务public int run(String[] strings) throws Exception {//创建一个Job任务对象Job job = Job.getInstance(super.getConf(), "wordcount");job.setJarByClass(TaskMain.class);//2.获得job对象(八个步骤)//第一步:指定文件的读取方式和读取路径job.setInputFormatClass(TextInputFormat.class);
// TextInputFormat.addInputPath(job,new Path("file:///D:\\mapreduce_data"));TextInputFormat.addInputPath(job,new Path("hdfs://Master:9000/wordcount"));//第二步:指定map阶段的处理方式和数据类型job.setMapperClass(WordCountMapper.class);//设置map阶段K2的类型job.setMapOutputKeyClass(Text.class);//设置map阶段V2的类型job.setMapOutputValueClass(LongWritable.class);//第三、四、五、六 采用默认方式//第七步:指定Reduc阶段的处理方式和数据类型job.setReducerClass(dataReduce.class);//设置K3的类型 v3的类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);//第八步:设置输出类型,设置输出的路径job.setOutputFormatClass(TextOutputFormat.class);
// TextOutputFormat.setOutputPath(job,new Path("file:///D:\\test\\output2"));TextOutputFormat.setOutputPath(job,new Path("hdfs://Master:9000/wordcount_results2"));//等待任务结束boolean flag = job.waitForCompletion(true);return flag?0:1;}public static void main(String[] args) throws Exception {Configuration configuration=new Configuration();int run=ToolRunner.run(configuration,new TaskMain(),args); //启动JOb任务System.exit(run);}
}
提示:
Mapper类的对象 一行一行地读取 原始数据的中内容
每读一行 就调用一次map方法 切割第一行的单词 生成键值对
K是单词 V是单词的数量
然后把所有键值对写入到临时文件
最后对临时文件排序 类似sql的groupby
随后shuffle,随后Reducer读数据 每次读一组
输入拆分:
输入到MapReduce工作被划分成固定大小的块叫做 input splits ,输入折分是由单个映射消费输入块。
映射 - Mapping
这是在 map-reduce 程序执行的第一个阶段。在这个阶段中的每个分割的数据被传递给映射函数来产生输出值。在我们的例子中,映射阶段的任务是计算输入分割出现每个单词的数量(更多详细信息有关输入分割在下面给出)并编制以某一形式列表<单词,出现频率>
重排
这个阶段消耗映射阶段的输出。它的任务是合并映射阶段输出的相关记录。在我们的例子,同样的词汇以及它们各自出现频率。
Reducing
在这一阶段,从重排阶段输出值汇总。这个阶段结合来自重排阶段值,并返回一个输出值。总之,这一阶段汇总了完整的数据集。
在我们的例子中,这个阶段汇总来自重排阶段的值,计算每个单词出现次数的总和。
wordcount项目在MapReduce计算框架下的处理流程:
首先,通过job.waitForCompletion(true)开启了WordCount这个MapReduce作业,后续通过InputFormat的实现类FileInputFormat将输入数据,即输入文件,分片从而得到Map方法,即Map用户定义的方法的输入,即图中所示,FileInputFormat将文件按照行分割,并组织成为的形式,成为用户Map方法的输入,其中Key是字符的偏移量,value即一行的内容。
数据被输入到用户定义的map方法中,map方法以文件中的每行数据作为输入,将每行按照空格分词,并将每个词组织为K-V对,输出;Map的输出交予了MapReduce框架来进行处理,简单来说MapReduce框架将这些K-V对依照key的字典顺序由小到大排列,并对相同的key的value进行合并为数组list,输出给combine过程;
将map方法的输出结果根据Key排序完成之后,如果有combine过程被定义这时候MapReduce框架就会调用Combine过程。Combine过程是由用户指定的,必须的过程,一般Combine过程在逻辑上就是Reduce过程,map的输出结果需要通过网络传递给reduce,其作用是减少Map的输出的结果集的大小,从而降低网络的开销。
用户通过job.setCombinerClass(IntSumReducer.class)指定Combine的实现类;Combine其实就是在Map端先执行一次用户的reduce方法,先在中间进行一次计算,从而将结果集减少;但是需要注意的是,并不是所有的算法都适用进行多次reduce计算,请谨慎选择;
然后,多个map的结果,汇集到reduce,由于WordCount就开启了一个reduce,故只有一个reduce接收所有map端的输出;在输入到用户定义的reduce方法之前,MapReduce框架还会进行一步排序操作,这步操作类似于在map端进行的排序,将相同key的value合并为list,不同的是排序的输入,是来自于多个Map的输出,是根据key排序的K-V对数据;
经过排序后的K-ValueList对,被输入到的Reduce方法,在WordCount的reduce方法中,它对每个key对应的value的list进行求和,从而获得每个单词的总的出现次数。