实训笔记7.22
- 7.22
- 一、MapReduce中的Shuffle机制
- 1.1 第一块内容:MapTask的输出的分区问题
- 1.1.1 计算分区的机制
- 1.1.2 分区数和NumReduceTask的关系
- 1.2 第二块内容:MapTask的输出的环形缓冲区的问题
- 1.3 第三块内容:MapTask的输出的溢写排序的问题
- 1.4 第四块内容(可选操作-MR优化策略):MapTask输出数据时的Combiner局部聚合问题
- 1.5 第五块内容:ReduceTask拉取数据的分组排序的问题
- 二、MpReduce的工作流程:详细的工作流程
- 2.1 第一步、提交MR作业资源
- 2.2 第二步:运行MapTask任务
- 2.3 第三步:运行ReduceTask任务
- 2.4 第四步:输出计算结果
- 三、MapReduce中Job提交作业机制
- 四、MapReduce中MapTask作业机制
- 五、MapReduce中的ReduceTask机制
- 5.1 第一知识点:ReduceTask任务数的设置
- 5.2 第二个知识点(非常重要):数据倾斜问题:Map阶段和Reduce阶段都存在
- 六、MapReduce中的OutputFormat机制
- 6.1 OutputFormat常见的实现类
- 6.1.1 TextOutputFormat:是OutputFormat的默认实现类
- 6.1.2 SequenceFileOutputFormat
- 6.2 自定义OutputFormat
- 七、代码示例
7.22
一、MapReduce中的Shuffle机制
Shuffle译为重新洗牌,在大数据中,shuffle指的是将大数据计算所需的相关数据重新“打乱”,需要跨计算节点,跨网络传输数据,如果Shuffle过程中,需要传输的数据量过大,那么分布式计算的效率的偏低。在MapReduce中,Shuffle机制需要涉及到大量的磁盘IO(数据从内存写入到磁盘当中)—磁盘IO也是影响计算性能的核心因素 【MapReduce的优化操作】提升MR程序的计算效率,优化基本上都是对shuffle阶段进行优化
1.1 第一块内容:MapTask的输出的分区问题
MapTask输出数据时,先计算kv数据的分区,计算出分区编号以后,然后将kv以及分区编号借助collector收集器将数据写出到环形缓冲区中
1.1.1 计算分区的机制
- 分区数(底层就是NumReduceTask)如果等于1的时候:底层会借助一个Partitioner的匿名内部类的形式去计算分区编号,计算逻辑直接返回的就是0
partitioner = new org.apache.hadoop.mapreduce.Partitioner <K,v>() {@Overridepublic int getPartition(K key, V value, int numPartitions){ return partitions - 1;
}
};
- 分区数如果大于1的时候:底层会使用我们在Driver驱动程序设置的分区类,如果分区类没有设置,那么默认使用HashPartitioner类进行分区
if(partitions>1){partitioner=(org.apache.hadoop.mapreduce.Partitioner<K,V>)ReflectionUtils.newInstance(jobContext.getPartitionerClass(),job);
}else{
1.1.2 分区数和NumReduceTask的关系
-
一个ReduceTask只能处理一个分区的数据,因此原则上ReduceTask的数量和分区数必须是一致的
-
如果分区数和ReduceTask的数量如果不一致出现以下三种情况
- 自定义的分区数大于1,但是ReduceTask的数量等于1,此时程序正常运行,返回一个底层的匿名内部类的分区器进行分区,所有的数据都到0号分区了,自定义的分区类没有任何的用户
- 自定义的分区数大于1,ReduceTask的数量大于1 但是小于分区数,程序运行会报错
- 自定义的分区数据大于1,reduceTask的数量大于1 而且大于自定义的分区数,程序会正常运行,按照我们自己定义的分区机制运行,只不过多余的ReduceTask会空运行。
1.2 第二块内容:MapTask的输出的环形缓冲区的问题
-
当我们计算完成kv数据的分区之后,MR程序会借助collector收集器的collect方法将kv数据以及分区编号向环形缓冲区写入,环形缓冲区是一个内存中概念,在底层源码当中就是一个字节数组byte[] kvbuffer。
-
环形缓冲区默认只有100M,而且环形缓冲区还有一个阈值,阈值默认是80%,如果缓冲区写入的数据超过了阈值,缓冲区的已经写入的数据会溢写到磁盘文件中spliiN.out文件,同时溢写的过程中,会在环形缓冲区剩余的20%的空间反向继续写入后续的MapTask计算完成的数据。
-
环形缓冲区大小和阈值可以自己设置的:
mapreduce.task.io.sort.mb 100M
设置环形缓冲区的大小
mapreduce.map.sort.spill.percent 0.8
设置环形缓冲区的溢写因子
【优化机制】:如果想让MR程序执行的更加快速,在缓冲区这块我们可以减少溢写磁盘的次数,因此一般况下对于不同的计算程度可以设置缓冲区的大小和阈值,减少溢写次数
1.3 第三块内容:MapTask的输出的溢写排序的问题
-
环形缓冲区在进行溢写的时候,会先对环形缓冲区的数据按照不同的分区,按照分区的key值的比较器进行排序,排序的目的保证溢写文件分区有序。因此在MR程序,要求MapTask输出的key值必须实现WritableComparable接口,并且重写序列化和反序列化机制以及比较器方法,同时在比较器方法中重写比较规则。MapReduce溢写文件的时候,是一次性全部写入的,全部溢写完成以后清空环形缓冲区的溢写数据。
-
溢写磁盘的时候,每一次溢写都会进行一次排序,溢写的排序底层默认使用的是快速排序算法实现的。
-
MapTask运行过程中,可能产生多个溢写文件,最后多个溢写文件合并成一个大的溢写文件,合并大的溢写文件的时候,还得需要进行一次排序操作,排序采用的归并排序算法。
【问题】:重写比较器的比较方法时,一定要注意,比较器返回的值只能是正整数或者负整数,但千万不能是0,因为一旦是0,那么两个相等的数据只会保留一个。
1.4 第四块内容(可选操作-MR优化策略):MapTask输出数据时的Combiner局部聚合问题
-
Combiner是MapReduce的可选组件,可以添加也可以不加,如果我们添加了Combiner,Combiner是在map输出之后,reduce输入之前执行的,Combiner在这个过程中,执行几次,执行时机都是不确定的。和MR程序的计算负载,资源是有很大关系的,有可能Combiner设置了,一次也不执行。
-
Combiner可以理解为Map端的局部聚合,Combiner的存在,可以减少Map端的溢写文件的数据量以及Map向Reduce传输的数据量
-
并不是所有的MR程序都可以添加Combiner,Combiner使用的前提是不能影响MR程序的执行逻辑。如果使用MapReduce程序计算平均值等操作,Combiner一定不能存在。
-
如果我们要自定义Combiner,Combiner的输入和输出的kv类型必须和Map阶段的输出类型保持一致。Combiner其实就是一个Reducer。所以在有些情况下,如果Combiner和Reducer的逻辑是一样的,同时Reducer的输入和输出满足Combiner的要求,那么可以使用Reducer充当Combiner使用。
1.5 第五块内容:ReduceTask拉取数据的分组排序的问题
-
ReduceTask是和分区一一对应的,一个ReduceTask用来处理一个分区的数据,ReduceTask处理的分区数据可能是来自多个MapTask,因此ReduceTask在进行计算之前,需要先进行一个copy阶段,copy阶段主要是将每一个MapTask上该分区的数据拉去到ReduceTask所在的节点上。默认拉去到ReduceTask内存中,如果内存放不下Spill溢写操作
-
因为ReduceTask拉取得数据量可能很大,拉取 得过程中也会对数据进行merge合并操作
-
ReduceTask把数据拉取合并完成之后,需要进行分组以及排序,排序merge合并完成之后,需要对整体的拉取的数据再进行一次归并排序,分组将该分区的数据按照key值划分不同的数据组,然后一组相同的key值调用一次Reduce方法进行处理。
-
排序默认使用的是key值的比较器进行排序的,分组默认基于key值的判断相等(hashCode、equals)策略进行key值相等判断以外,还会借助比较器进行key值的相等判断,如果hashCode和equals判断两个key值相等,但是比较器比较出来两个对象大小不一致,那么此时MR程序也会认为两个key值不等,划分不同的组中。 MR程序中判断相等比较器是主力。
-
默认情况下,reduce聚合key值的时候,需要对key值进行分组,但是key值分组的时候默认使用的是map阶段输出key值的比较器进行相等判断。但是在有些情况下,我们reduce聚合key值并不是按照map阶段的key值的比较器进行分组,因此我们就需要在Reduce阶段在单独定义分组排序,分组排序的目的是为了告诉reduce你应该如何进行分组
-
reduce端的分组排序如果我们要自定义,只需要继承一个类即可WritableComparator
二、MpReduce的工作流程:详细的工作流程
2.1 第一步、提交MR作业资源
- InputFormat生成切片规划文件job.split文件
- 将整个MR程序的相关配置项全部封装到一个job.xml配置文件
- 借助jobSummitter提交切片规划文件以及配置文件到指定的目录
2.2 第二步:运行MapTask任务
- 通过InputFormat的createRecordReader读取对应切片的kv数据
- 通过mapTask的map方法进行kv数据的处理
- 调用context.write方法将map处理完成的kv数据写出,先计算kv数据的分区编号
- 调用collector收集器将kv数据以及分区写出到环形缓冲区,
- 环形缓冲区到达一定的阈值之后,先对环形缓冲区数据进行排序,排好序之后将数据一次性溢写到文件中,清空溢写的数据缓冲区,溢写可能会发生多次,也就可能会产生多个溢写文件,当map任务运行完成,多个溢写文件会合并成一个大的溢写文件spill.out,同时合并大文件需要进行排序。
- 溢写的过程中如果设置了Combiner,那么溢写的过程中会进行Combiner操作,Combiner到底什么时机执行,不一定,Combiner作用是为了减少了map溢写的数据量以及map向reduce传输的数据量
2.3 第三步:运行ReduceTask任务
- copy阶段:先从不同的MapTask上拷贝指定分区的数据到达ReduceTask的节点内存,内存放不下,溢写磁盘文件中
- merge阶段:拷贝数据到ReduceTask中,溢写数据的时候会进行合并操作,减少溢写文件的产生
- Sort阶段:安装指定的分组规则对数据进行聚合,同时对merge合并完成的数据进行一次排序
- 执行Reduce方法,一组相同key调用一次reduce方法
2.2 第三步~第六步 – > 2.3 第一步~第三步:mapreduce中shuffle机制
2.4 第四步:输出计算结果
reduce计算完成,调用context.write方法写出key value数据,MR底层会调用OutputFormat的实现类实现数据到文件的写出
三、MapReduce中Job提交作业机制
核心重点:不同的InputFormat实现类的切片机制
job提交作业的源码解读
四、MapReduce中MapTask作业机制
- 不同的InputFormat实现类的读取切片的kv机制
- MapTask的任务数的决定机制–切片
- MapTask运行过程中内存相关配置
- MapTask的运行节点和负责的切片节点之间的关系:移动数据不如移动计算的机制
五、MapReduce中的ReduceTask机制
ReduceTask把数据分组好以后,一组相同的key调用一次Reduce方法,Reduce方法就可以去聚合数据,进行逻辑计算。
5.1 第一知识点:ReduceTask任务数的设置
-
MR程序当中,MapTask的数量我们是基于切片数量自动确定的,我们人为无法手动设置mapTask的任务数,如果想修改MapTask的个数,我们无法直接修改,只能通过修改切片机制间接的修改MapTask的任务个数。
-
MR程序当中,ReduceTask的数量机制和MapTask机制不太一样的,ReduceTask的任务个数是可以手动指定的。因此ReduceTask的数量给多少合适?默认要求ReduceTask的数量必须和分区数保持一致,因为一个Reduce任务处理一个分区的数据。
-
【注意】在MR程序中,有一个比较特殊的机制,Reduce的数量可以设置为0,那么一旦ReduceTask的数量设置为0,那么MR程序只有Mapper阶段,没有reduce阶段,map阶段的输出就是整个MR程序的最终输出了。 一般写MR程序的时候,要求如果操作不涉及到对数据集整体的聚合操作(计算的结果需要从数据集整体中获得),我们都不建议大家增加Reduce阶段,因为增加了Reduce,MR执行效率会非常的低。
5.2 第二个知识点(非常重要):数据倾斜问题:Map阶段和Reduce阶段都存在
- 数据倾斜不是MR程序运行原理,是MR程序在运行过程中可能会出现一种影响MR程序运行效率的情况。所谓的数据倾斜指的是多个ReduceTask处理的分区数据量差距过大,这样的话就会导致一个问题,有些ReduceTask会快速的运行完成,而有些ReduceTask运行时间非常久。
- 如果我们发现所有reduceTask,有大部分运行很块结束了,而少部分Task运行时间过长,这一般都是因为数据倾斜问题的导致。解决方案很简单
- 自定义分区机制,尽可能让各个分区的数据分布均匀一点
- 消除热点key的数据,分区之所以不均匀,还有很大的可能性是因为确实有部分的key值出现的次数太了。消除机制在处理数据时候,在热点key值的后面增加一些随机数。
- 抽样分析数据–数学算法和思想
六、MapReduce中的OutputFormat机制
OutputFormat是MR程序中输出格式化类,抽象类中定义了一个方法getRecordWriter,方法是OutputFormat的核心作用,方法作用就是定义了我们的Reduce或者Map阶段输出最终的结果数据时,kv数据如何写入到文件中。写入的规则是一个什么规则。
6.1 OutputFormat常见的实现类
6.1.1 TextOutputFormat:是OutputFormat的默认实现类
-
输出数据的规则是将reduce输出的每一个key-value数据以\t分割,然后一组kv数据单独占据一行
-
通过这个类输出数据时,会数据放到指定的文件,文件默认的命名规则
part-m/r-xxxxx
6.1.2 SequenceFileOutputFormat
-
是Hadoop中一种特殊的输出文件格式,输出文件格式SequenceFile文件
-
SequenceFile文件
- SequenceFile文件是Hadoop提供的一种特殊的二进制文件,二进制文件支持对数据进行压缩以后再写出到文件内部,这种文件在某种程度上可以极大的提高分布式计算的效率。SequenceFile文件存储数据的格式都是kv格式,只不过kv数据都是二进制压缩过的数据。
- Sequencefile文件支持对数据压缩,也可以不压缩,整体上文件的压缩方式一共有三种
- none:对数据不压缩
- record:只对kv数据中value数据进行压缩
- block:将多个kv数据都进行压缩
6.2 自定义OutputFormat
七、代码示例
package com.sxuek.group;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;/*** MR程序辅助排序(分组排序)的案例:* 辅助排序是reduce拉取完数据之后执行,通过辅助排序,reduce可以判断哪些key值为相同的key,如果没有辅助排序,那么MR程序会使用map阶段输出的key的排序规则当作key值判断相等的条件** 现在有一个订单文件,格式如下:* 订单id 商品id 成交金额* 0000001 Pdt_01 222.8* 0000001 Pdt_05 25.8* 0000002 Pdt_03 522.8* 0000002 Pdt_04 122.4* 0000002 Pdt_05 722.4* 0000003 Pdt_01 222.8* 0000003 Pdt_02 33.8* 这个文件三列,每一列之间都是以\t分割的。现在我们需要基于上述的文件求每一个订单中成交金额最大的商品。结果如下:* 0000001 pdt_01 222.8* 0000002 Pdt_05 722.4* 0000003 pdt_01 222.8** 案例分析:* 如果我们只是想把订单数据按照订单编号从低到高排序,同时如果订单编号一致,那么按照成交金额从高到底排序。* 到时候只需要按照订单id分组,取第一条数据,第一条数据就是我们某一个订单中成交金额最大的商品信息** 如果我们要获取每一个订单的成交金额最大的信息,逻辑只需要在刚刚的代码基础之上,reduce在进行汇总数据的时候,重新指定一下分组规则即可。* 分组条件应该是只要订单id一致即可。如果订单id一致,多个订单数据只有第一条数据才会进入reduce。*/
public class OrderDriver {public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();configuration.set("fs.defaultFS","hdfs://192.168.68.101:9000");Job job = Job.getInstance(configuration);job.setJarByClass(OrderDriver.class);FileInputFormat.setInputPaths(job,new Path("/order.txt"));job.setMapperClass(OrderMapper.class);job.setMapOutputKeyClass(OrderBean.class);job.setMapOutputValueClass(NullWritable.class);job.setGroupingComparatorClass(OrderGroupComparator.class);job.setReducerClass(OrderReducer.class);job.setOutputKeyClass(NullWritable.class);job.setOutputValueClass(OrderReducer.class);job.setNumReduceTasks(1);//默认就是只有一个reduce任务Path path = new Path("/orderOutput");FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.68.101:9000"), configuration, "root");if (fileSystem.exists(path)){fileSystem.delete(path,true);}FileOutputFormat.setOutputPath(job,path);boolean b = job.waitForCompletion(true);System.exit(b?0:1);}
}/*** map阶段的逻辑就是把每一行的订单数据读取进来以后,按照\t分割,将每一行的数据字段以orderBean对象封装,* 封装好以后以orderBean为key 以null值为value输出即可* 那么MR程序在计算过程中会自动根据OrderBean定义的排序规则对数据进行排序*/
class OrderMapper extends Mapper<LongWritable, Text,OrderBean, NullWritable>{@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, OrderBean, NullWritable>.Context context) throws IOException, InterruptedException {String line = value.toString();String[] array = line.split("\t");String orderId = array[0];String pId = array[1];double amount = Double.parseDouble(array[2]);OrderBean orderBean = new OrderBean(orderId,pId,amount);context.write(orderBean,NullWritable.get());}
}/*** Reducer阶段,reduce阶段只需要将读取进来的key value数据输出即可,因为排序规则在map到reduce中间的shuffle阶段已经全自动化完成了* 因此如果只是排序规则,到了reduce阶段只需要将处理好的数据原模原样的输出即可*/
class OrderReducer extends Reducer<OrderBean,NullWritable,NullWritable,OrderBean>{@Overrideprotected void reduce(OrderBean key, Iterable<NullWritable> values, Reducer<OrderBean, NullWritable, NullWritable, OrderBean>.Context context) throws IOException, InterruptedException {context.write(NullWritable.get(),key);}
}/*** 定义辅助排序,重新定义reduce的key值分组逻辑:* 如果orderId一致 认为两条数据是同一个可以 reduce聚合的时候使用第一条数据当作key值进行计算*/
class OrderGroupComparator extends WritableComparator{public OrderGroupComparator(){super(OrderBean.class,true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {OrderBean o1 = (OrderBean) a;OrderBean o2 = (OrderBean) b;return o1.getOrderId().compareTo(o2.getOrderId());}
}/*** 因为你要对数据进行排序,排序的规则还涉及到多个不同的字段,MR程序中只有map阶段输出的key才具备排序的能力* 因此也就意味着多个排序字段都要当作map输出key来传递,但是key值只能传递一个,多个字段封装为一个JavaBean* 1、定义一个封装原始数据的JavaBean类*/
class OrderBean implements WritableComparable<OrderBean>{private String orderId;private String pId;private Double amount;public OrderBean() {}public OrderBean(String orderId, String pId, Double amount) {this.orderId = orderId;this.pId = pId;this.amount = amount;}public String getOrderId() {return orderId;}public void setOrderId(String orderId) {this.orderId = orderId;}public String getpId() {return pId;}public void setpId(String pId) {this.pId = pId;}public Double getAmount() {return amount;}public void setAmount(Double amount) {this.amount = amount;}@Overridepublic String toString() {return orderId+"\t"+pId+"\t"+amount;}/*** 比较器:想按照订单id升序排序,如果订单id一致 按照成交金额降序排序* @param o the object to be compared.* @return*/@Overridepublic int compareTo(OrderBean o) {if(this.orderId.compareTo(o.orderId) == 0){//判断成交金额if (this.amount > o.amount){return -1;}else if(this.amount < o.amount){return 1;}else{return 0;}}else{return this.orderId.compareTo(o.orderId);}}/*** 序列化写的方法* @param out <code>DataOuput</code> to serialize this object into.* @throws IOException*/@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(orderId);out.writeUTF(pId);out.writeDouble(amount);}@Overridepublic void readFields(DataInput in) throws IOException {orderId = in.readUTF();pId = in.readUTF();amount = in.readDouble();}
}
package com.sxuek.noreduce;import com.sxuek.group.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.net.URI;/*** 现在有一个文件,文件中有很多行数据,每一行数据都是以空格分割的多个单词组成的,* 现在要求通过MR程序实现将文件中所有以大写字母开头的英语单词过滤掉,最终输出结果文件,结果文件* 中只有以小写字母开头的英语单词。*/
public class DemoDriver {public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();configuration.set("fs.defaultFS","hdfs://192.168.68.101:9000");Job job = Job.getInstance(configuration);job.setJarByClass(DemoDriver.class);FileInputFormat.setInputPaths(job,new Path("/wordcount.txt"));/*** 当前MR程序只有map阶段 没有reduce阶段,* 也就意味着map阶段的输出就是最终的结果数据*/job.setMapperClass(DemoMapper.class);job.setOutputKeyClass(NullWritable.class);job.setOutputValueClass(Text.class);/*** 如果mr程序中没有reduce阶段,一定一定要把reduce的任务数设置为0,* 如果没有设置为0,同时MR程序中没有指定reducer类,那么MR程序会默认自动给你添加Reduce类,并且启动* 一个reduceTask,自动生成的reduce类很简单,reduce类就是map输出的是什么结果 reducer原模原样输出*/job.setNumReduceTasks(0);Path path = new Path("/demoOutput");FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.68.101:9000"), configuration, "root");if (fileSystem.exists(path)){fileSystem.delete(path,true);}FileOutputFormat.setOutputPath(job,path);boolean b = job.waitForCompletion(true);System.exit(b?0:1);}
}
class DemoMapper extends Mapper<LongWritable, Text, NullWritable,Text>{@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {String line = value.toString();System.out.println(line);String[] words = line.split(" ");for (String word : words) {System.out.println(word);char c = word.charAt(0);if (c >=65 && c <=90){continue;//过滤操作 只要map方法执行完成,没有调用context.write方法将数据写出 那么就代表当前数据需要舍弃}else{context.write(NullWritable.get(),new Text(word));}}}
}