- 目录
- 前言:
- 1、MapReduce原理
- 2、mapreduce实践(WordCount实例)
目录
今天先总体说下MapReduce的相关知识,后续将会详细说明对应的shuffle、mr与yarn的联系、以及mr的join操作的等知识。以下内容全是个人学习后的见解,如有遗漏或不足请大家多多指教。
前言:
为什么要MAPREDUCE
(1)海量数据在单机上处理因为硬件资源限制,无法胜任
(2)而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度
(3)引入mapreduce框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将分布式计算中的复杂性交由框架来处理。
设想一个海量数据场景下的wordcount需求:
单机版:内存受限,磁盘受限,运算能力受限分布式:
1、文件分布式存储(HDFS)
2、运算逻辑需要至少分成2个阶段(一个阶段独立并发,一个阶段汇聚)
3、运算程序如何分发
4、程序如何分配运算任务(切片)
5、两阶段的程序如何启动?如何协调?
6、整个程序运行过程中的监控?容错?重试?
可见在程序由单机版扩成分布式时,会引入大量的复杂工作。为了提高开发效率,可以将分布式程序中的公共功能封装成框架,让开发人员可以将精力集中于业务逻辑。
而mapreduce就是这样一个分布式程序的通用框架,其应对以上问题的整体结构如下:
1、MRAppMaster(mapreduce application master)
2、MapTask
3、ReduceTask
1、MapReduce原理
Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架;
Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上;
Mapreduce框架结构及核心运行机制
1.1、结构
一个完整的mapreduce程序在分布式运行时有三类实例进程 :
1、MRAppMaster:负责整个程序的过程调度及状态协调
2、mapTask:负责map阶段的整个数据处理流程
3、ReduceTask:负责reduce阶段的整个数据处理流程
1.2、mapreduce框架的设计思想
这里面有两个任务的分配过程:1、总的任务切割分配给各个mapTask,不同的mapTask再将得到的hashmap按照首字母划分,分配给各个reduceTask。
1.3、mapreduce程序运行的整体流程(wordcount运行过程的解析)
流程解析
(job.split:负责任务的切分,形成一个任务切片规划文件。
wc.jar:要运行的jar包,包含mapper、reducer、Driver等java类。
job.xml:job的其他配置信息:如指定map是哪个类,reduce是那个类,以及输入数据的路径在哪,输出数据的路径在哪等配置信息。)
前提:客户端提交任务给yarn后(提交前会进行任务的规划),yarn利用ResouceManager去找到mrAppmaster.
1、 一个mr程序启动的时候,最先启动的是MRAppMaster,MRAppMaster启动后根据本次job的描述信息,计算出需要的maptask实例数量,然后向集群申请机器启动相应数量的maptask进程
2、 maptask进程启动之后,根据给定的数据切片范围进行数据处理,主体流程为:
a) 利用客户指定的inputformat来获取RecordReader读取数据,形成输入KV对(框架干的事)
b) 将输入KV对传递给客户定义的map()方法,做逻辑运算,并将map()方法输出的KV对收集到缓存
c) 将缓存中的KV对按照K分区排序后不断溢写到磁盘文件
3、 MRAppMaster监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据范围(数据分区)
4、 Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储(对应的就是context.write方法)
2、mapreduce实践(WordCount实例)
编程规范:
(1)用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端)
(2)Mapper的输入数据是KV对的形式(KV的类型可自定义)
(3)Mapper的输出数据是KV对的形式(KV的类型可自定义)
(4)Mapper中的业务逻辑写在map()方法中
(5)map()方法(maptask进程)对每一个<K,V>调用一次
(6)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
(7)Reducer的业务逻辑写在reduce()方法中
(8)Reducetask进程对每一组相同k的<k,v>组调用一次reduce()方法
(9)用户自定义的Mapper和Reducer都要继承各自的父类
(10)整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必要信息的job对象
WordCount程序
mapper类
package bigdata.mr.wcdemo;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
//map方法的生命周期: 框架每传一行数据就被调用一次* KEYIN: 默认情况下,是mr框架所读到的一行文本的起始偏移量,Long,* 但是在hadoop中有自己的更精简的序列化接口,所以不直接用Long,而用LongWritable* * VALUEIN:默认情况下,是mr框架所读到的一行文本的内容,String,同上,用Text* * KEYOUT:是用户自定义逻辑处理完成之后输出数据中的key,在此处是单词,String,同上,用Text* VALUEOUT:是用户自定义逻辑处理完成之后输出数据中的value,在此处是单词次数,Integer,同上,用IntWritable*/
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{/*** map阶段的业务逻辑就写在自定义的map()方法中* maptask会对每一行输入数据调用一次我们自定义的map()方法*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //将maptask传给我们的文本内容先转换成StringString line = value.toString();//根据空格将这一行切分成单词String[] words = line.split(" "); //将单词输出为<单词,1>for(String word:words){//将单词作为key,将次数1作为value,以便于后续的数据分发,可以根据单词分发,以便于相同单词会到相同的reduce taskcontext.write(new Text(word), new IntWritable(1));}}
}
reducer类
package mr_test;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
//生命周期:框架每传递进来一个k相同的value 组,reduce方法就被调用一次* KEYIN, VALUEIN 对应 mapper输出的KEYOUT,VALUEOUT类型对应* KEYOUT, VALUEOUT 是自定义reduce逻辑处理结果的输出数据类型* KEYOUT是单词* VLAUEOUT是总次数*/
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { /*** <angelababy,1><angelababy,1><angelababy,1><angelababy,1><angelababy,1>* <hello,1><hello,1><hello,1><hello,1><hello,1><hello,1>* <banana,1><banana,1><banana,1><banana,1><banana,1><banana,1>* 入参key,是一组相同单词kv对的key*/@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int count=0;for(IntWritable value:values){count+=value.get(); }context.write(key, new IntWritable(count));}
}
Driver类 用来描述job并提交job
package mr_test;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/*** 相当于一个yarn集群的客户端* 需要在此封装我们的mr程序的相关运行参数,指定jar包* 最后提交给yarn*/
public class WordcountDriver {public static void main(String[] args) throws IOException, Exception, InterruptedException {Configuration cf = new Configuration();
// 把这个程序打包成一个Job来运行Job job = Job.getInstance(); //指定本程序的jar包所在的本地路径job.setJarByClass(WordcountDriver.class); //指定本业务job要使用的mapper/Reducer业务类job.setMapperClass(WorldcountMapper.class);job.setReducerClass(WordcountReducer.class); //指定mapper输出数据的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class); //指定最终输出的数据的kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class); //指定job的输入原始文件所在目录FileInputFormat.setInputPaths(job, new Path(args[0])); //指定job的输出结果所在目录FileOutputFormat.setOutputPath(job, new Path(args[1])); //将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行boolean res = job.waitForCompletion(true);System.exit(res?0:1); }
}