目录
- 一、 MapReduce
- 1.1 MapReduce定义
- 1.2 MapReduce优缺点
- 1.2.1 优点
- 1.2.2 缺点
- 1.3 MapReduce核心思想
- 1.4 MapReduce进程
- 1.5 常用数据序列化类型
- 1.6 MapReduce编程规范
- 1.6.1Mapper阶段
- 1.6.2 Reduce阶段
- 1.6.3 Driver阶段
- 1.7 WordCount案例实操
- 1.7.1 本地测试
- 1.7.2 提交到集群测试
- 二、 Hadoop序列化
- 2.1 序列化概述
- 2.2 自定义bean对象实现序列化接口(Writable)
- 2.3 序列化案例实操
一、 MapReduce
1.1 MapReduce定义
MapReduce是一种用于处理大规模数据集的编程模型和分布式运算程序的编程框架。它最初由Google公司开发,在后来成为了Apache Hadoop项目的核心组件之一。MapReduce的核心思想是将一个大的计算任务分解为多个可以并行执行的小任务,并通过将数据并行处理来实现高效的大规模数据处理。它适用于分布式环境下的数据处理,可以在大规模集群上并行执行计算任务,从而提高处理速度和可扩展性。
1.2 MapReduce优缺点
1.2.1 优点
- 简化编程模型
MapReduce提供了简单的编程接口,开发人员只需实现map和reduce函数即可,无需关注底层的并行和分布式细节。只需简单的实现一些接口,就可以完成一个分布式程序。 - 高可扩展性
MapReduce允许将大规模数据集分布在集群中进行处理,从而实现了高度可扩展性。可以通过增加集群中的节点数量来增加处理能力。 - 高容错性
MapReduce具有容错机制,当某个节点发生故障时,可以自动重新分配任务到其他可用节点上,保证任务的完成。 - 适用于大规模数据集
MapReduce适用于处理大规模的数据集,可以在分布式环境下高效地处理海量数据。
1.2.2 缺点
- 高延迟
由于MapReduce是批处理模型,需要等待所有任务完成后才能得到结果,因此对于实时性要求较高的场景不太适用。 - 适用性限制
MapReduce适用于批处理任务,但不适用于需要实时响应和交互式查询的场景。它主要适用于离线数据分析、批量处理和一次性计算等应用。 - 数据倾斜
在某些情况下,由于数据分布的不均匀,输入的数据可能会导致Reduce任务之间的负载不均衡,一些任务可能会比其他任务运行更长时间,从而影响整体性能。 - 不擅长DAG(有向无环图)计算
多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。
1.3 MapReduce核心思想
MapReduce的核心思想是将一个大的计算任务分解为多个可以并行执行的小任务,并通过将数据并行处理来实现高效的大规模数据处理。
具体而言,MapReduce模型包含两个主要的阶段:Map阶段和Reduce阶段。
在Map阶段,原始数据集会被划分成多个拆分的数据块,每个数据块会被分配给一个Map任务进行处理。Map任务接收输入数据,并将其转化为一系列中间键值对(key-value pairs)。
在Reduce阶段,Map任务输出的中间键值对会按照键进行排序和分区,并将相同键的键值对发送到同一个Reduce任务。Reduce任务负责对属于自己的中间键值对进行聚合并生成最终结果。
通过这种方式,MapReduce利用数据的并行处理能力,实现了高效的大规模数据处理和分析。每个小任务(Map任务)可以独立地处理自己分配到的数据块,而Reduce任务可以并行地对各个Map任务的输出进行聚合,从而加速整个计算过程。
注意:
1)第一个阶段的MapTask并发实例,完全并行运行,互不相干。
2)第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出。
3)MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。
1.4 MapReduce进程
一个完整的MapReduce程序在分布式运行时有三类实例进程:
(1)MrAppMaster:负责整个程序的过程调度及状态协调。
(2)MapTask:负责Map阶段的整个数据处理流程。
(3)ReduceTask:负责Reduce阶段的整个数据处理流程。
1.5 常用数据序列化类型
Java类型 | Hadoop Writable类型 |
---|---|
Boolean | BooleanWritable |
Byte | ByteWritable |
Int | IntWritable |
Float | FloatWritable |
Long | LongWritable |
Double | DoubleWritable |
String | Text |
Map | MapWritable |
Array | ArrayWritable |
Null | NullWritable |
1.6 MapReduce编程规范
用户编写的程序分成三个部分:Mapper、Reducer和Driver。
1.6.1Mapper阶段
WordCount官方代码截图:
- 用户自定义的Mapper要继承自己的父类Mapper
- Mapper的输入输出是KV键值对的形式(KV的类型可自定义)
- Mapper中的业务逻辑是写在map()方法中
- Mapper的输出数据是KV键值对的形式(KV的类型可自定义)
- map()方法对每一个<K,V>调用一次
1.6.2 Reduce阶段
WordCount官方代码截图:
- 用户自定义的Reducer要继承自己的父类
- Reducer的输入数据类型对应Mapper的输出数据类型
- Reducer的业务逻辑写在reduce()方法中
- ReduceTask进程对每一组相K的<K,V>组调用一次reduce()方法
1.6.3 Driver阶段
WordCount官方代码截图:
相当于Yarn集群客户端,用于提交我们整个程序到Yarn集群,提交的是封装了MapReduce程序相关运行参数的job对象。
1.7 WordCount案例实操
1.7.1 本地测试
(1)创建maven工程,并在pom.xml文件中添加如下依赖
<dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.2.4</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.30</version></dependency>
</dependencies>
(2)在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
(3)编写Mapper类
package com.amxl.mapreduce.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{Text k = new Text();IntWritable v = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 1 获取一行String line = value.toString();// 2 切割String[] words = line.split(" ");// 3 输出for (String word : words) {k.set(word);context.write(k, v);}}
}
(2)编写Reducer类
package com.amxl.mapreduce.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{int sum;
IntWritable v = new IntWritable();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {// 1 累加求和sum = 0;for (IntWritable count : values) {sum += count.get();}// 2 输出v.set(sum);context.write(key,v);}
}
(3)编写Driver驱动类
package com.amxl.mapreduce.wordcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WordCountDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 1 获取配置信息以及获取job对象Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 2 关联本Driver程序的jarjob.setJarByClass(WordCountDriver.class);// 3 关联Mapper和Reducer的jarjob.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);// 4 设置Mapper输出的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);// 5 设置最终输出kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 6 设置输入和输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 7 提交jobboolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}
(4)本地测试
- 需要首先配置好HADOOP_HOME变量以及Windows运行依赖
- 在IDEA/Eclipse上运行程序
1.7.2 提交到集群测试
集群上测试
(1)用maven打jar包,需要添加的打包插件依赖
<build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.6.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>
(2)将程序打成jar包
(3)修改不带依赖的jar包名称为wc.jar,并拷贝该jar包到Hadoop集群的/opt/module/hadoop-3.2.4路径。
(4)启动Hadoop集群
[amo@hadoop102 hadoop-3.2.4]sbin/start-dfs.sh
[amo@hadoop103 hadoop-3.2.4]$ sbin/start-yarn.sh
(5)执行WordCount程序
[amo@hadoop102 hadoop-3.2.4]$ hadoop jar wc.jarcom.amxl.mapreduce.wordcount.WordCountDriver /user/amo/input /user/amo/output
二、 Hadoop序列化
2.1 序列化概述
1)什么是序列化
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。
2)为什么要序列化
一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。
3)为什么不用Java的序列化
Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)。
4)Hadoop序列化特点:
- 紧凑 :高效使用存储空间。
- 快速:读写数据的额外开销小。
- 互操作:支持多语言的交互
2.2 自定义bean对象实现序列化接口(Writable)
在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口。
具体实现bean对象序列化步骤如下7步。
(1)必须实现Writable接口
(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造
public FlowBean() {super();
}
(3)重写序列化方法
@Override
public void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow);
}
(4)重写反序列化方法
@Override
public void readFields(DataInput in) throws IOException {upFlow = in.readLong();downFlow = in.readLong();sumFlow = in.readLong();
}
(5)注意反序列化的顺序和序列化的顺序完全一致
(6)要想把结果显示在文件中,需要重写toString(),可用"\t"分开,方便后续用。
(7)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。详见后面排序案例。
@Override
public int compareTo(FlowBean o) {// 倒序排列,从大到小return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
2.3 序列化案例实操
1)编写MapReduce程序
(1)编写流量统计的Bean对象
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;//1 继承Writable接口
public class FlowBean implements Writable {private long upFlow; //上行流量private long downFlow; //下行流量private long sumFlow; //总流量//2 提供无参构造public FlowBean() {}//3 提供三个参数的getter和setter方法public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getDownFlow() {return downFlow;}public void setDownFlow(long downFlow) {this.downFlow = downFlow;}public long getSumFlow() {return sumFlow;}public void setSumFlow(long sumFlow) {this.sumFlow = sumFlow;}public void setSumFlow() {this.sumFlow = this.upFlow + this.downFlow;}//4 实现序列化和反序列化方法,注意顺序一定要保持一致@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeLong(upFlow);dataOutput.writeLong(downFlow);dataOutput.writeLong(sumFlow);}@Overridepublic void readFields(DataInput dataInput) throws IOException {this.upFlow = dataInput.readLong();this.downFlow = dataInput.readLong();this.sumFlow = dataInput.readLong();}//5 重写ToString@Overridepublic String toString() {return upFlow + "\t" + downFlow + "\t" + sumFlow;}
}
(2)编写Mapper类
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {private Text outK = new Text();private FlowBean outV = new FlowBean();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//1 获取一行数据,转成字符串String line = value.toString();//2 切割数据String[] split = line.split("\t");//3 抓取我们需要的数据:手机号,上行流量,下行流量String phone = split[1];String up = split[split.length - 3];String down = split[split.length - 2];//4 封装outK outVoutK.set(phone);outV.setUpFlow(Long.parseLong(up));outV.setDownFlow(Long.parseLong(down));outV.setSumFlow();//5 写出outK outVcontext.write(outK, outV);}
}
(3)编写Reducer类
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {private FlowBean outV = new FlowBean();@Overrideprotected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {long totalUp = 0;long totalDown = 0;//1 遍历values,将其中的上行流量,下行流量分别累加for (FlowBean flowBean : values) {totalUp += flowBean.getUpFlow();totalDown += flowBean.getDownFlow();}//2 封装outKVoutV.setUpFlow(totalUp);outV.setDownFlow(totalDown);outV.setSumFlow();//3 写出outK outVcontext.write(key,outV);}
}
(4)编写Driver驱动类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;public class FlowDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//1 获取job对象Configuration conf = new Configuration();Job job = Job.getInstance(conf);//2 关联本Driver类job.setJarByClass(FlowDriver.class);//3 关联Mapper和Reducerjob.setMapperClass(FlowMapper.class);job.setReducerClass(FlowReducer.class);//4 设置Map端输出KV类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);//5 设置程序最终输出的KV类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//6 设置程序的输入输出路径FileInputFormat.setInputPaths(job, new Path("D:\\inputflow"));FileOutputFormat.setOutputPath(job, new Path("D:\\flowoutput"));//7 提交Jobboolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}
}