实验目的:
1.掌握MapReduce的基本编程流程;
2.掌握MapReduce序列化的使用;
实验内容:
一、在本地创建名为MapReduceTest的Maven工程,在pom.xml中引入相关依赖包,配置log4j.properties文件,搭建windwos开发环境。 编程实现以下内容:
(1)创建com.nefu.(xingming).maxcount包,编写wordcountMapper、Reducer、Driver三个类,实现统计每个学号的最高消费。
输入数据data.txt格式如下:
序号 \t 学号 \t 日期 \t 消费总额
输出数据格式要求如下:
学号 \t 最高消费
ZnMapper.java
package com.nefu.zhangna.maxcount;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class ZnMapper extends Mapper<LongWritable, Text,Text, IntWritable> {private Text outk=new Text();private IntWritable outv=new IntWritable();@Overridepublic void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line=value.toString();String[] content=line.split("\t");String schoolnumber=content[1];String totalFee=content[3];outk.set(schoolnumber);outv.set(Integer.parseInt(totalFee));context.write(outk,outv);}
}
ZnReducer.java
package com.nefu.zhangna.maxcount;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class ZnReducer extends Reducer<Text,IntWritable,Text, IntWritable> {private IntWritable outv=new IntWritable();@Overrideprotected void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {int total=0;for (IntWritable value:values){if(value.get()>total)total=value.get();}outv.set(total);context.write(key,outv);}
}
ZnDriver.java
package com.nefu.zhangna.maxcount;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;public class ZnDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {Configuration configuration=new Configuration();Job job=Job.getInstance(configuration);//FileSystem fs=FileSystem.get(new URI("hdfs://hadoop101:8020"),configuration,"hadoop");//fs.copyFromLocalFile(new Path("D://mapreducetest//data.txt"),new Path("/zn/data.txt"));job.setJarByClass(ZnDriver.class);job.setMapperClass(ZnMapper.class);job.setReducerClass(ZnReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);//job.setOutputKeyClass(Text.class);//job.setOutputValueClass(StudentBean.class);// job.setInputFormatClass(CombineTextInputFormat.class); //否则默认是TextInputFormat.class//CombineTextInputFormat.setMaxInputSplitSize(job,4194304); //设4MFileInputFormat.setInputPaths(job,new Path("D:\\mapreducetest\\data.txt"));FileOutputFormat.setOutputPath(job,new Path("D:\\cluster\\shiyan3-1"));boolean result=job.waitForCompletion(true);System.exit(result?0:1);}
}
(2)测试上述程序,查看运行结果
原数据
mapreduce之后
(3)查看日志,共有几个切片,几个MapTask(截图)
Number of split表示有一个切片,Starting task: attempt_local649325949_0001_m_000000_0表示有一个Map Tast任务
(4)添加文件data1.txt,重新运行程序,共有几个切片,几个MapTask(截图)
可见我输入了两个文件,切片的数目为2,也就有两个Map Text任务
(5)使用CombinTextInputFormat,让data.txt,data1.txt两个文件在一个切片中
在驱动类中CombinTextInputFormat,可见只有一个切片
(6)将data.txt上传至HDFS
(7)使用maven将程序打成jar包并上传至hadoop集群运行,观察是否能正确运行。
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
将程序打成jar包
二、创建com.nefu.(xingming).serialize包,编写ScoreBean、Mapper、Reducer、Driver三个类,实现统计每个学号的平均成绩。并将结果按照年级分别写到三个文件中。
输入数据mydata.txt文件格式:
学号 \t 姓名 \t 成绩
输出数据格式(共3个文件):
学号 \t 姓名 \t 平均成绩