Hadoop学习之MapReduce
目录
Hadoop学习之MapReduce
1 MapReduce简介
1.1 什么是MapReduce
1.2 MapReduce的作用
1.3 MapReduce的运行方式
2 MapReduce的运行机制
2.1 相关进程
2.2 MapReduce的编程套路
2.3 MapTask的并行度
2.4 切片及其源码解读
2.5 ReduceTask的并行度与分区
3 hadoop中的自定义类型
4 MapReduce中的排序
4 MapReduce中的分组
5 MapReduce中的Combiner
6 MapReduce中的Shuffle
7 MapReduce的join
7.1 reduce join
7.2 map join
1 MapReduce简介
1.1 什么是MapReduce
MapReduce是一个分布式运算程序的编程框架,它的核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的的分布式运算程序,并发运行在一个hadoop集群上。
1.2 MapReduce的作用
海量数据在单机上处理的话磁盘受限,内存受限,计算能力受限是无法胜任的,将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度,而MapReduce 框架,使得开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将分布式计算中的复杂性交由框架来处理。
1.3 MapReduce的运行方式
(1)打jar包运行:①将代码打成jar包上传到linux服务器;②使用hadoop命令提交到yarn集群运行;③处理的数据文件和结果文件位于hdfs文件系统
(2)本地运行(程序没有被提交到集群上):①windows 本地安装 hadoop,并且配置环境变量,hadoop 必须是 windows 平台上编译的对应版本,否则将安装目录的 lib 和 bin 目录替换成 windows 平台编译版本;②将hadoop.dll 放置在 c:/windows/system32 文件夹中;③winutils.exe 放置在$HADOOP_HOME/bin 目录中;④在 eclipse 配置 hadoop 安装目录, windows --> preferences -->Hadoop Map/Reduce --> Hadoop Installation Direcotry
(3)本地运行(将代码提交到集群上):需要修改配置 ,修改源码 ,不适用这里就不介绍了。
2 MapReduce的运行机制
2.1 相关进程
一个完整的 MapReduce 程序在分布式运行时有两类实例进程:
1、MRAppMaster(MapReduce Application Master):负责整个程序的过程调度及状态协调
2、Yarnchild:负责 map 阶段的整个数据处理流程(对应 MapTask阶段并发任务),与reduce 阶段的整个数据处理流程 (对应ReduceTask:阶段汇总任务)
注意: MapTask 和 ReduceTask 的进程都是 YarnChild,并不是说这 MapTask 和 ReduceTask 就跑在同一个 YarnChild 进行里,每个MapTask 和 ReduceTask都对应一个YarnChild进程。
2.2 MapReduce的编程套路
(1)根据客户指定的 InputFormat 来获取 RecordReader 读取数据,形成输入 KV 对;
(2)将输入 KV 对传递给客户定义的 map()方法,做逻辑运算,并将写出;
(3)将map()方法输出的 KV 进行shuffle处理,溢写,分区,排序分组等一系列操作;
(4)Reducetask 进程启动之后,从若干台 maptask 运行所在机器上获取到若干个 maptask 输出结果文件,并在本地重新归并排序, 然后按照相同 key 的 KV 为一个组,调用客户定义的 reduce()方法进行逻辑运算,并收集运算输出的结果 KV。
(5)调用客户指定的 OutputFormat 将结果数据输出到外部存储
2.3 MapTask的并行度
MapTask:运行Mapper端逻辑的任务
并行度:有多少个MapTask一起运行
MapTask并行度:Mapper端逻辑在进行运行的时候,需要拆分成多少个task(任务),这个task是job运行的最小单位,不可拆分,一个task只能在一个节点上运行。
maptask 的并行度决定 map 阶段的任务处理并发度,那么每一个任务对应一部分原始数据,那么这个数据因该多大呢?
hdfs的默认存储的块大小是128M,假设一个任务对应一个的数据量是1G(hdfs上8个块,可能分散存储到不同的节点),那么在获取数据的时候,就必然会面临跨节点传输问题,计算效率是非常低的。
假设一个任务对应的数据量是100M,要处理一个200M的文件(分为两个块block1和block2),那么task1处理的是block1块的0-100M,task2处理的是block1块的100-128M和block2块的129-200M,仍然会造成跨节点数据传输问题 降低计算的效率。
综上所述,一个任务对应的数据 最合理的应该就是和hdfs数据存储的块的大小一致 128M 。
注意:每一个任务只会被分配到每一个节点的一小部分资源,一个节点上可以执行多个任务(maptask|reducetask), 一个任务只能在一个节点执行。
一个 job 的 map 阶段并行度由客户端在提交 job 时决定,客户端对 map 阶段并行度的规划的基本逻辑为: 将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分成逻辑上的多 个 split),然后每一个 split 分配一个 mapTask 并行实例处理。
2.4 切片及其源码解读
决定maptask并行度的的逻辑切片规划描述文件,是由FileInputFormat实现类的getSplits()方法完成的。 该方法返回的是 List,InputSplit 封装了每一个逻辑切片的信息,包括长度和位置信息,而 getSplits()方法返回一组 InputSplit。
源码实现如下:
切片肯定是在map之前运行的,map之前的类是FileInputFormat文件加载,我们进入FileInputFormat类中,这个类中肯定会有切片相关的方法,getSplits方法就是进行逻辑切片的方法。首先看它的返回值:它的返回值是List集合,泛型是InputSplit,InputSplit对应的就是每一个逻辑切片对象,有一个逻辑切片就封装成这个对象,所以List集合就是封装所有切片对象的集合,所以返回值就是所有切片的集合。然后看它的参数JobContext job,是贯穿整个Job的上下文对象
因为返回值是一个list集合,所以这个方法里面肯定有把切片放到集合的过程。添加切片到集合的方法如下图,其中splitSize为核心参数,既然这边有用了这个参数,那么前面肯定有对这个参数的赋值,往前走找到最早对这个splitSize赋值的语句。
splitSize由computeSplitSize方法得来,这个方法有3个参数为blockSize, minSize, maxSize。blockSize是配置文件中块的大小,默认128M。往前找minSize
minSize通过调用Math包下的max方法得到getFormatMinSplitSize()方法的返回值和getMinSplitSize(job)方法返回值的较大值。
进入getFormatMinSplitSize()方法,得知返回值为1
进入getMinSplitSize(job)方法
SPLIT_MINSIZE属性如下
由于自己没有配置过这个属性,所以进入mapred-default.xml配置文件查找
从mapred-default.xml中可知mapreduce.input.fileinputformat.split.minsize默认为0,所以SPLIT_MINSIZE为0,那么job.getConfiguration().getLong(SPLIT_MINSIZE, 1L)方法返回0。那么Math.max(getFormatMinSplitSize(), getMinSplitSize(job))的返回值是1即minSize为1。
回到下图中,我们已经知道了minSize
接下来查看maxSize的大小,进入到getMaxSplitSize方法,分析方法如上。
在core-site.xml配置文件找不到这个属性,则返回Long的最大值,所以getMaxSplitSize的返回值为Long的最大值,即maxSize为Long的最大值
现在回到computeSplitSize方法,这三个参数都明确了blockSize为128M,minSize为0,maxSize为Long的最大值
进入到computeSplitSize方法的具体实现
Math.min(maxSize, blockSize)返回128M,所以Math.max(minSize, Math.min(maxSize, blockSize))返回128M。所以最终默认splitSize的大小等于blockSize的大小。
修改切片大小:
由上述可知:minSize对应mapreduce.input.fileinputformat.split.minsize
maxSize对应mapreduce.input.fileinputformat.split.maxsize
如果想要splitSize>blockSzie(128M),那就修改minSize
如果想要splitSize<blockSzie,那就修改maxSize
修改方式:①修改mapred-site.xml:不建议,这么改就写死了,对所有任务生效,注意:单位是M
②代码中修改,注意size是字节单位,提倡这个方法
知道了切片大小后,重新读取getSplits方法。
getSplits方法中创建存放inputSplit的结果集后获取输入路径下的所有文件,循环遍历这些文件,如果文件的长度不为0的话,判断是分布式文件系统还是本地文件系统,如果是分布式文件系统,获取当前文件的分块信息。然后再判断当前文件是否可切分,如果可以切分,获得逻辑切片大小和块大小,并定义一个当前文件剩余大小的属性,进行循环切分,条件是当前文件剩余大小除以逻辑切片大小大于1.1的话,所以一个文件的最后一个切片最大为128*1.1M,如果,最后一个切片大小为12M-128*1.1M的话会跨节点访问,但是这样还是比重新启动一个maptask的效率高。
2.5 ReduceTask的并行度与分区
2.5.1 ReduceTask的并行度
ReduceTask并行度:运行reducer逻辑的任务的并行运行的个数。
ReduceTask并行度的并发数与 Map Task 的并发数不同, Map Task 由切片数决定不同,Reducetask 数量的决定是直接手动设置的。如 job.setNumReduceTasks(3);默认值是 1见下图,手动设置为3,表示运行3 个 reduceTask,如果设置为 0,表示不运行 reduceTask 任务,就是没有 reducer 阶段,只有 mapper 阶段。
每一个reducetask对应一个 yarnchild。reducetas是 reduce 端运行任务的最小单位 ,一个reducetask只能运行在一个节点 一个节点上可以运行多个reducetask的
注意:①如果数据分布不均匀,就有可能在 reduce 阶段产生数据倾斜;②reducetask 数量并不是任意设置,需要跟进业务逻辑需求进行设计,有些情况下,需要计算全局汇总结果,就只能有 1 个 reducetask
2.5.2 分区
分区简单理解就是为了给reducetask做数据准备的 ,所以有几个reducetask 在shuffle中就会分几个分区
如果启动多个reducetask,是不会造成同一个组的数据分散到不同的reducetask中。因为对map输出的结果,在shuffle过程中进行数据分发的时候 有一个分发策略(分区算法),既可以按照reducetask的个数将数据分成对应的份数,又不会将map输出的相同的key 进行分发到不同的reducetask中。那么默认的分区算法是怎样的呢?
Partitioner的实现类如下:
当map端调用context.write(…)的时候,实际上是调用collector.collect(…)
方法中用到了numPartitions参数,说明前面已经对他赋值了,再往上找,找到如下代码:
当numPartitions的个数大于1时,partitioner分区器通过反射得到,进入getPartitionerClass方法
由这方法可知当用户写了分区的自定义方法,那么通过反射即可实例化自定义类,否则使用系统自带的类。即默认为HashPartitioner。
所以默认的分区算法如下:
所以这个分区算法为:mapkey .hash & Integer_max % 分区个数(reducetask的个数)。
经过分区之后 ,不同的分区的数据内部进行排序分组,最终这个数据给不同的reducetask进行处理。
2.5.3 自定义分区:
默认分区算法缺点: 没有办法进行制定对应的数据,到对应的分区中,所以如果要实现这个需求就需要自定义分区算法。
自定义分区步骤:
①自定义一个类继承,Partitioner
②重写getPartition()方法
③在job中 制定自定义分区类
job.setPartitionerClass(MyPartition.class)
④指定reducetask的个数,不指定默认值运行一个
job.setNumReduceTasks(3)
注意:①自定义分区中分区的个数和reducetask的个数要一致;②分区编号一定 要和reducetask的编号对应,reducetask的编号从0开始顺序递增;③虽然自定义分区中,分区编号是可以自己定义返回值的,不一定要顺序递增,但是出于性能考虑 分区编号最好是顺序递增的;④reducetask设置和分区个数相同,否则必然有reducetask在执行空跑。
import java.util.HashMap;
import java.util.Map;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/*** @Description 根据课程名称course进行分区* @author refuel* @version v1.0*/
public class CoursePartition extends Partitioner<CourseInfo, NullWritable> {Map<String,Integer> map = new HashMap<>();int number = 0; @Overridepublic int getPartition(CourseInfo key, NullWritable value, int numPartitions) { String course = key.getCourse();if(map.containsKey(course)) {return map.get(course);}else {map.put(course, number);return number++;} }
}
#驱动类中
job.setPartitionerClass(CoursePartition.class);
job.setNumReduceTasks(4);
3 hadoop中的自定义类型
在Hadoop中已经有一些内置的自定义类型如下:
BooleanWritable | 布尔型数值 |
ByteWritable | 单字节数值 |
DoubleWritable | 双字节数值 |
FloatWritable | 浮点数 |
IntWritable | 整型数 |
LongWritable | 长整型数 |
Text | 使用UTF8格式存储的文本 |
NullWritable | 当<key, value>中的key或value为空时使用 |
当这些内置的数据类型不能满足我们的需求时,就需要自定义类型。自定义的类型必须具备序列化和反序列的能力,当另个进程通讯时,这些数据都会以二进制序列的形式在网络上传送,如果我们需要将Java对象进行传输的时候,也应该先将对象进行序列化,而java中的Serializable序列化会将类结构也序列化,不便于高效的进行数据传输,而这些在hadoop中是可以不需要,所以hadoop实现了一个自己的序列化接口Writable接口。
hadoop中的自定义类步骤:
①实现Writable接口;
②重新write(对象----》二进制 序列化)和readFields( 二进制---》对象 反序列化)方法。
注意:一定给无参构造并重写toString
当自定义类型放在map输出的key,必须同时具备排序能力Comparable接口和具备序列化反序列能力Writable接口,因为map输出的key必须具备排序的能力。
hadoop中的自定义类作为map输出的key步骤:
①实现WritableComparable接口
②重新compareTo()方法用来指定排序规则。重新write(对象----》二进制 序列化)和readFields( 二进制---》对象 反序列化)方法。
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;
/*** @Description 不是作为map的key普通的自定义类型* @author refuel* @version v1.0*/
public class CourseInfo implements Writable {private String course;private int number;private double avg;public String getCourse() {return course;}public void setCourse(String course) {this.course = course;}public int getNumber() {return number;}public void setNumber(int number) {this.number = number;}public double getAvg() {return avg;}public void setAvg(double avg) {this.avg = avg;}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(course);out.writeInt(number);out.writeDouble(avg);}@Overridepublic void readFields(DataInput in) throws IOException {this.course=in.readUTF();this.number=in.readInt();this.avg=in.readDouble();}@Overridepublic String toString() {return course +"\t" + number + "\t" + avg;}}
4 MapReduce中的排序
MapReduce的排序发生在shuffle过程,默认安装map输出的key进行排序,如果map的key为Text类型则默认按照字典顺序升序排序,如果为数值类型默认按照值从小到大排序。
默认类型只能进行单一元素的全排序,因为默认类型的排序方法compareTo方法已经定义好了,如果无法满足需求就需要自定义类型进行排序,自定义类型必须实现WritableComparable接口(使其具备排序能力,序列化能力和反序列化能力)。
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;
/*** @Description 作为map输出的key的自定义类型* @author refuel* @version v1.0*/
public class Person implements WritableComparable<Person> {private String name;private String gender;private Integer age;private String dept;public Person() {super();}public Person(String name, String gender, Integer age, String dept) {super();this.name = name;this.gender = gender;this.age = age;this.dept = dept;}public void setPerson(String name, String gender, Integer age, String dept) {this.name = name;this.gender = gender;this.age = age;this.dept = dept;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getGender() {return gender;}public void setGender(String gender) {this.gender = gender;}public Integer getAge() {return age;}public void setAge(Integer age) {this.age = age;}public String getDept() {return dept;}public void setDept(String dept) {this.dept = dept;}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(name);out.writeUTF(gender);out.writeInt(age);out.writeUTF(dept);}@Overridepublic void readFields(DataInput in) throws IOException {this.name= in.readUTF();this.gender = in.readUTF();this.age = in.readInt();this.dept = in.readUTF();}@Overridepublic int compareTo(Person o) {int deptStatus = o.getDept().compareTo(this.getDept());if(deptStatus==0) {int genderStatus = o.getGender().compareTo(this.getGender());if(genderStatus==0) {return o.getAge()-this.getAge();}return genderStatus;}return deptStatus;}}
4 MapReduce中的分组
分组和分区类似,也是用来划分数据集的,只不过更加细粒度,根据Map<key,value>中的key进行分组。在同一个分区中,相同key的值记录是属于同一个分组的,相当于groupby key的功能。
默认情况下,当map的key使用hadoop中的默认类型的时候,将mapkey 相同的分到一组;当map的key使用的是自定义类型的时候 ,是按照排序的字段进行分组的。因为分组在本质上也是一个比较的过程,分组默认调用的类是WritableComparator,用于比较的方法为compare(),源码如下:
观察源码可知,底层调用map的key的comparaTo方法。
注意:排序关注的是大小,分组关注的是是否相等
所以默认类型中,map的key的comparaTo方法按照整个字符串或整个值进行比较大小,将整个串或值完全相同的分到一组。而自定义类型中,map的key的comparaTo方法返回0时才是同一组,即将map的key的所有的比较属性都相同的分到一组。
默认情况下分组字段和排序的字段完全一致,当排序和分组规则不一致,就不能使用默认分组了,必须自定义分组了。
自定义分组步骤:
①自定义一个类继承WritableComparator接口,注意要在无参构造器中调用父类的构造方法并传入自定义的map的key的类。
②重写compare分组方法
③job中指定
注意:
①分组发生在排序之后的,compare比较的时候只会比较相邻的,想要得到需要的分组,必须在排序阶段将需要分组的数据排到一起。
②虽然在排序阶段将需要分组的数据排到一起,是一定需要自定义分组,不然会将排序字段也作为分组的条件之一。如下例子中是将相同部门相同性别的分为同一组,按照年龄排序,如果仅在排序阶段将需要分组的数据排到一起,那么就变成了相同部门相同性别相同年龄才为一组
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;
/*** @Description 作为map输出的key的自定义类型* @author refuel* @version v1.0*/
public class Person implements WritableComparable<Person> {private String name;private String gender;private Integer age;private String dept;public Person() {super();}public Person(String name, String gender, Integer age, String dept) {super();this.name = name;this.gender = gender;this.age = age;this.dept = dept;}public void setPerson(String name, String gender, Integer age, String dept) {this.name = name;this.gender = gender;this.age = age;this.dept = dept;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getGender() {return gender;}public void setGender(String gender) {this.gender = gender;}public Integer getAge() {return age;}public void setAge(Integer age) {this.age = age;}public String getDept() {return dept;}public void setDept(String dept) {this.dept = dept;}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(name);out.writeUTF(gender);out.writeInt(age);out.writeUTF(dept);}@Overridepublic void readFields(DataInput in) throws IOException {this.name= in.readUTF();this.gender = in.readUTF();this.age = in.readInt();this.dept = in.readUTF();}@Overridepublic int compareTo(Person o) {int deptStatus = o.getDept().compareTo(this.getDept());if(deptStatus==0) {int genderStatus = o.getGender().compareTo(this.getGender());if(genderStatus==0) {return o.getAge()-this.getAge();}return genderStatus;}return deptStatus;}}
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;public class DeptGrouping extends WritableComparator {public DeptGrouping() {super(Person.class,true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {Person p1 = (Person)a;Person p2 = (Person)b;int deptStatus = p1.getDept().compareTo(p2.getDept());if(deptStatus==0) {return p1.getGender().compareTo(p2.getGender());}return deptStatus;}}
#驱动类中添加自定义分组类
job.setGroupingComparatorClass(DeptGrouping.class);
5 MapReduce中的Combiner
Combiner:局部聚合组件,是优化组件 ,接受数据来自maptask,输出数据给reducetask,默认不是shuffle的组件。
作用:减少shuffle过程的数据量,减少reduce端接受的数据量,提升性能
工作过程:对每一个maptask的输出结果做一个局部聚合,聚合操作逻辑取决于reduce端的操作逻辑
Combiner实现步骤:
①自定义一个类继承 Reducer <maptask输出,reducetask的输入>,注意前两个泛型== 后两个泛型
②重写 reduce 方法
③在job中指定
一般情况下,在实际开发过程中,当Reducer的代码中 前两个泛型== 后两个泛型, 可以使用Reducer的代替Combiner的代码 ,如果泛型不一致 不可以替代的。
注意:combiner之所以没有默认在shuffle中是因为使用场景受限
适用场景 | 不适用场景 |
max ,min, sum等操作 | avg直接求平均 |
虽然combiner不适合直接求平均值,但是可以在combiner中统计sum和count,在reduce端求平均值这样间接求得。
6 MapReduce中的Shuffle
MapReduce 中,mapper 阶段处理的数据如何传递给 reducer 阶段,的流程就叫 Shuffle(数据混洗),Shuffle是 MapReduce 框架中 最关键的一个流程。
Shuffle的核心机制包括:数据分区,排序,局部聚合,缓存,拉取,再合并,排序等。
MapReduce 执行流程图如下:
MapReduce 执行流程解读:(Shuffle流程为2-7步)
(1)每一个文件以block形式存储在HDFS上,默认128M存3份,运行时每个maptask处理一个split,默认情况下一个split的大小与block块的大小一样,有多少个 block 就有多少个 map 任务,所以不同大小的文件会有不同个map任务进行并行计算;
(2)每个 maptask处理完输入的 split 后会把结果写入到内存的一个环形缓冲区,环形缓冲区的默认大小为 100M,阈值为0.8,当缓冲区的大小使用超过阀值,一个后台的线程就会启动把缓冲区中的数据溢写 (spill)到本地磁盘中(mapred-site.xml:mapreduce.cluster.local.dir),将数据从内存中写出到磁盘的过程就是溢写,同 Mapper 继续时向环形缓冲区中写入数据;
(3)环形缓冲区的数据在溢写到磁盘前会先按照分区编号进行排序,每个分区中的都数据会有后台线程根据 map 任务的输出结果 key进行内排序(字典顺序、自然顺序或自定义顺序comparator),如果有combiner,它会在溢写到磁盘之前对排好序的输出数据上运行,最后在本地生成分好区且排好序的小文件; 如果预留20%写满了,但是80%的数据还没有溢写完成,整个数组处于阻塞状态,阻塞到80%的空间释放再次启动,到阀值后会向本地磁盘新建一个溢写文件;
(4)每个maptask完成之前,会把本地磁盘的所有溢写文件不断合并成得到一个结果文件,合并得到的结果文件会根据小溢写文件的分区而分区,每个分区的数据会再次根据 key进行排序,得到的结果文件是分好区且排好序的,可以合并成一个文件的溢写文件数量默认为 10(mapred-site.xml:mapreduce.task.io.sort.factor);这个结果文件的分区存在一个映射关系, 比如 0~1024 字节内容为 0 号分区内容,1025~2048 字节内容为 1 号分区内容等等;
(5)当有一个maptask完成就会启动reducetask,reduce 任务进入fetch(复制)阶段,reduce 任务通过 http 协议(hadoop内置了netty容器)把所有Mapper结果文件的对应的分区数据复制过来。Reducer可以并行复制Mapper的结果,默认线程数为 5个(mapred-site.xml:mapreduce.reduce.shuffle.parallelcopies)。Reducer 个数由 mapred-site.xml 的 mapreduce.job.reduces 配置决定, 或者初始化 job 时调用 Job.setNumReduceTasks(int);Reducer 中的一个线程定期向 MRAppMaster询问Mapper输出结果文件位置,mapper结束后会向MRAppMaster汇报信息; 从而 Reducer得知Mapper状态,得到map结果文件目录;
由于 Reducer可能会失败,所有 Reducer 复制完成 map 结果文件后,NodeManager 并没有在第一个 map结果文件复制完成后删除它,直到作业完成后 MRAppMaster 通知 NodeManager 进行删除;
(6)fetch(复制)阶段完成后,Reducer 进入 Merge 阶段,循环地合并 map 结果文件,并按map的key排序,合并因子默认为 10(mapred-site.xml:mapreduce.task.io.sort.factor),经过不断地 Merge 后 得到一个“最终文件”,可能存储在磁盘也可能存在内存中;
(7)在执行进入到reducer类中,会对合并并排序的最终文件进行分组;
(8)经过合并,排序,分组后的最终文件输入到 reduce 进行计算,计算结果输入到 HDFS。
环形缓冲区详解:
Mapper任务执行完后的数据会通过MapOutputBuffer提交到一个kvbuffer缓冲区中,这个缓冲区的数据是专门存储map端输出数据的,它是一个环形缓冲区,大小可通过配置mapreduce.task.io.sort.mb来配置这个缓冲区的大小,默认是100MB。kvbuffer本质上是一个byte数组,模拟的环形数据结构,环形缓冲区适用于写入和读取的内容保持在顺序的情况下,要不然就不能均匀的向前推进。
在Hadoop中数据要排序有个非常良好的策略,就是不移动数据本身,而是为每个数据建立一个元数据kvmeta,在排序的时候,直接对元数据进行排序,然后顺序读写元数据即可。淫威每条元数据的大小是固定的4*4byte,即4个整数,所以读取非常方便。
元数据:描述数据的数据,这的元数据描述的是原始数据在数组中存储的位置,其主要构成如下:
①分区编号partitioner,通过getPartition()得到
②原始数据的key的起始下标
③原始数据的value的起始下标
④原始数据的value的长度
一条元数据的长度是固定的4*4byte,即4个整数,所有的元数据合并在一起就是一个元数据块,相当于一个数组,可以通过KV对的元数据,再按照其元数据的指引就可找到这个KV对的K和V,还可以知道这个KV对属于哪个Partition。
读取配置文件,如下图
根据上图配置文件可知环形缓冲区的大小为100*1024*1024字节,将数据写出到磁盘的阈值默认0.8。
缓存区的数据(数组的数据)达到一定的阈值,默认0.8时,开始写入磁盘,将数据从内存中写出到磁盘的过程就是溢写。预留20%是为了在溢写过程中持续的接受maptask的输出数据,若这20%写满了,但是80%的数据还没有溢写完成,整个数组处于阻塞状态,阻塞到80%的空间释放。
注意:字节数组的整个写入过程是收尾相连的,这是按环形缓冲区使用的,所以往里写入内容时一旦超过终点就又“翻折”到缓冲区的起点,反之亦然,当80%的数据溢写完成,元数据会做一个位置的调整,和原始数据形成一个背对背的结构,新的equator就形成了。
7 MapReduce的join
7.1 reduce join
多表关联发生在reduce端,需要map端能够同时读取两个数据,两个表的关联键相同的数据必须分到一组,所以map端输出的key为相同的关联键。
案例如下:
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/*** @Description reduce join的map端实现* @author refuel* @version v1.0*/
public class ReduceJoinMap extends Mapper<LongWritable,Text,Text,Text> {Text k = new Text();Text v = new Text();String filename;int count =0;Map<String,Integer> map = new HashMap<>();@Overrideprotected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)throws IOException, InterruptedException {FileSplit fileSplit = (FileSplit)context.getInputSplit();filename = fileSplit.getPath().getName();}@Overrideprotected void map(LongWritable key,Text value,Mapper<LongWritable,Text,Text,Text>.Context context) throws IOException, InterruptedException {if("movies.dat".equals(filename)) {String[] mwords = value.toString().split("::");k.set(mwords[0]);v.set("M"+mwords[1]+"\t"+mwords[2]);context.write(k, v);}else {String[] rwords = value.toString().split("::");k.set(rwords[1]);v.set("R"+rwords[0]+"\t"+rwords[1]+"\t"+rwords[2]+"\t"+rwords[3]);context.write(k, v);}}
}import java.io.IOException;
import java.util.ArrayList;
import java.util.List;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/*** @Description reduce join的reduce端实现* @author refuel* @version v1.0*/
public class ReduceJoinReduce extends Reducer<Text,Text, Text, NullWritable> {List<String> mlist=new ArrayList<String>();List<String> rlist=new ArrayList<String>();Text k = new Text();@Overrideprotected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, NullWritable>.Context context)throws IOException, InterruptedException {mlist.clear();rlist.clear();for(Text value:values) {String line = value.toString();if(line.startsWith("M")) {mlist.add(line);}else {rlist.add(line);}}for(String m:mlist) {for(String r:rlist) {k.set(r+"\t"+m);context.write(k, NullWritable.get());}}}}package com.refuel.homework.mapreduce.day09_3;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/*** @Description reduce join的驱动类实现* @author refuel* @version v1.0*/
public class ReduceJoinDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(ReduceJoinDriver.class);job.setMapperClass(ReduceJoinMap.class);job.setReducerClass(ReduceJoinReduce.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);FileSystem fs = FileSystem.get(conf);Path path = new Path("E:\\test\\mapreduce\\out10");if(fs.exists(path)) {fs.delete(path, true);}FileInputFormat.setInputPaths(job, new Path("E:\\test\\mapreduce\\movies.dat"),new Path("E:\\test\\mapreduce\\ratings.dat"));FileOutputFormat.setOutputPath(job, path);boolean result = job.waitForCompletion(true);System.exit(result?0:1);}}
reduce join有可能缺陷问题:数据倾斜。MapReduce中数据倾斜的本质其实是分区中数据分配极大不均匀。
当mapreduce 程序中有reduce 有可能产生数据倾斜,combiner是一种的数据倾斜的处理方式,但是不能绝对避免,避免数据倾斜需要调整分区算法
7.2 map join
多表关联发生在map端。整个join过程在map端发生,只需要maptask不需要reducetask
优势: 有效避免join 数据倾斜;劣势:缓存中的表不可过大,适合处理大小表关联
实现思路:map() 一行调用一次,在map端只读取一个文件(大文件),另一个文件(小文件)加载在每一个maptask运行节点的内存中,每当map端进行读取一个文件的一行 就去内存中 找是否可以匹配另一个文件
实现步骤:
①将小文件加载到本地缓存(磁盘)中job.addCacheFile(uri); 将制定的路径的文件,加载到每一个maptask的运行节点上;
②setup方法中定义一个流,定义一个集合,流开始读取,放在集合中;
③map方法中读取大文件,每次读取一行,和内存集合中的数据做关联,关联完成写出hdfs 。
注意:注意:只有maptask 时候,需要将reducetask设置0,job.setNumReduceTasks(0),否则默认运行一个Reducer 。
案例如下:
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;/*** @Description map join的map端实现* @author refuel* @version v1.0*/
public class MapJoinMap extends Mapper<LongWritable,Text,Text,NullWritable> {Text k = new Text();Map<String,String> map;@Overrideprotected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)throws IOException, InterruptedException {Path[] localCacheFiles = context.getLocalCacheFiles();String cacheFile=localCacheFiles[0].toString();BufferedReader br = new BufferedReader(new FileReader(cacheFile));map = new HashMap<>();String line =null;while((line= br.readLine())!=null) {String[] word = line.split("::");map.put(word[0],word[1]+"\t"+word[2]);}}@Overrideprotected void map(LongWritable key,Text value,Mapper<LongWritable,Text,Text,NullWritable>.Context context) throws IOException, InterruptedException {String[] words = value.toString().split("::");if(map.containsKey(words[1])) {k.set(words[0]+"\t"+words[1]+"\t"+words[2]+"\t"+words[3]+"\t"+map.get(words[1]));context.write(k, NullWritable.get());}}
}import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/*** @Description map join的驱动类* @author refuel* @version v1.0*/
public class MapJoinDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(MapJoinDriver.class);job.setMapperClass(MapJoinMap.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);job.addCacheFile(new URI(args[0]));job.setNumReduceTasks(0); FileSystem fs = FileSystem.get(conf);Path path = new Path(args[2]);if(fs.exists(path)) {fs.delete(path, true);}FileInputFormat.addInputPath(job, new Path(args[1]));FileOutputFormat.setOutputPath(job, path);boolean result = job.waitForCompletion(true);System.exit(result?0:1);}}