这里相较于 wordcount,新的知识点在于学生实体类的编写以及使用
数据信息:
1. Student 实体类
import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class Student implements WritableComparable<Student> {
// Objectprivate long stuid;private String stuName;private int score;public Student(long stuid, String stuName, int score) {this.stuid = stuid;this.stuName = stuName;this.score = score;}@Overridepublic String toString() {return "Student{" +"stuid=" + stuid +", stuName='" + stuName + '\'' +", score=" + score +'}';}public Student() {}public long getStuid() {return stuid;}public void setStuid(long stuid) {this.stuid = stuid;}public String getStuName() {return stuName;}public void setStuName(String stuName) {this.stuName = stuName;}public int getScore() {return score;}public void setScore(int score) {this.score = score;}// 自动整理文件格式 ctrl + shift + f 英文输放状态@Overridepublic int compareTo(Student o) {return this.score > o.score ? 1 : 0;}@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeLong(stuid);dataOutput.writeUTF(stuName);dataOutput.writeInt(score);}@Overridepublic void readFields(DataInput dataInput) throws IOException {this.stuid = dataInput.readLong();this.stuName = dataInput.readUTF();this.score = dataInput.readInt();}
}
2. mapper 阶段,StudentMapper 类
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** 输出 key:学生id value:Student对象*/
public class StudentMapper extends Mapper<LongWritable, Text,LongWritable,Student> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] split = value.toString().split(",");LongWritable stuidKey = new LongWritable(Long.parseLong(split[0]));Student stuValue = new Student(Long.parseLong(split[0]),split[1],Integer.parseInt(split[2]));context.write(stuidKey,stuValue);}
}
3. reduce 阶段,StudentReducer 类
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class StudentReducer extends Reducer<LongWritable,Student,Student, NullWritable> {@Overrideprotected void reduce(LongWritable key, Iterable<Student> values, Context context) throws IOException,InterruptedException {Student stuOut = new Student();int sumScore = 0;String stuName = "";for (Student stu :values) {sumScore+=stu.getScore();stuName = stu.getStuName();}stuOut.setScore(sumScore);stuOut.setStuid(key.get());stuOut.setStuName(stuName);System.out.println(stuOut.toString());context.write(stuOut, NullWritable.get());}
}
4. 驱动类,StudentDriver 类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class StudentDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(StudentDriver.class);//配置 job中map阶段处理类和map阶段的输出类型job.setMapperClass(StudentMapper.class);job.setMapOutputKeyClass(LongWritable.class);job.setMapOutputValueClass(Student.class);//配置 job中deduce阶段处理类和reduce阶段的输出类型job.setReducerClass(StudentReducer.class);job.setOutputKeyClass(Student.class);job.setOutputValueClass(NullWritable.class);// 输入路径配置 "hdfs://kb131:9000/kb23/hadoopstu/stuscore.csv"Path inpath = new Path(args[0]); // 外界获取文件输入路径FileInputFormat.setInputPaths(job, inpath);// 输出路径配置 "hdfs://kb131:9000/kb23/hadoopstu/out2"Path path = new Path(args[1]); //FileSystem fs = FileSystem.get(path.toUri(), conf);if (fs.exists(path))fs.delete(path,true);FileOutputFormat.setOutputPath(job,path);job.waitForCompletion(true);}
}