一、排序分组概述
MapReduce中排序和分组在哪里被执行
第3步中需要对不同分区中的数据进行排序和分组,默认情况按照key进行排序和分组
二、排序
在Hadoop默认的排序算法中,只会针对key值进行排序
任务:
数据文件中,如果按照第一列升序排列,
当第一列相同时,第二列升序排列
如果当第一列相同时,求出第二列的最小值
自定义排序
1.封装一个自定义类型作为key的新类型:将第一列与第二列都作为key
WritableComparable接口
定义:
public interface WritableComparable<T> extends Writable, Comparable<T> {
}
自定义类型MyNewKey实现了WritableComparable的接口,该接口中有一个compareTo()方法,当对key进行比较时会调用该方法,而我们将其改为了我们自己定义的比较规则,从而实现我们想要的效果
private static class MyNewKey implements WritableComparable<MyNewKey> {long firstNum;long secondNum;public MyNewKey() {}public MyNewKey(long first, long second) {firstNum = first;secondNum = second;}@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(firstNum);out.writeLong(secondNum);}@Overridepublic void readFields(DataInput in) throws IOException {firstNum = in.readLong();secondNum = in.readLong();}/** 当key进行排序时会调用以下这个compreTo方法*/@Overridepublic int compareTo(MyNewKey anotherKey) {long min = firstNum - anotherKey.firstNum;if (min != 0) {// 说明第一列不相等,则返回两数之间小的数return (int) min;} else {return (int) (secondNum - anotherKey.secondNum);}}}
2.改写最初的MapReduce方法函数
public static class MyMapper extendsMapper<LongWritable, Text, MyNewKey, LongWritable> {protected void map(LongWritable key,Text value,Mapper<LongWritable, Text, MyNewKey, LongWritable>.Context context)throws java.io.IOException, InterruptedException {String[] spilted = value.toString().split("\t");long firstNum = Long.parseLong(spilted[0]);long secondNum = Long.parseLong(spilted[1]);// 使用新的类型作为key参与排序MyNewKey newKey = new MyNewKey(firstNum, secondNum);context.write(newKey, new LongWritable(secondNum));};}
public static class MyReducer extendsReducer<MyNewKey, LongWritable, LongWritable, LongWritable> {protected void reduce(MyNewKey key,java.lang.Iterable<LongWritable> values,Reducer<MyNewKey, LongWritable, LongWritable, LongWritable>.Context context)throws java.io.IOException, InterruptedException {context.write(new LongWritable(key.firstNum), new LongWritable(key.secondNum));};}
三、分组
在Hadoop中的默认分组规则中,也是基于Key进行的,会将相同key的value放到一个集合中去
目的:求出第一列相同时第二列的最小值
上面的例子看分组,因为我们自定义了一个新的key,它是以两列数据作为key的,因此这6行数据中每个key都不相同产生6组
它们是:1 1,2 1,2 2,3 1,3 2,3 3。
而实际上只可以分为3组,分别是1,2,3。现在首先改写一下reduce函数代码
public static class MyReducer extendsReducer<MyNewKey, LongWritable, LongWritable, LongWritable> {protected void reduce(MyNewKey key,java.lang.Iterable<LongWritable> values,Reducer<MyNewKey, LongWritable, LongWritable, LongWritable>.Context context)throws java.io.IOException, InterruptedException {long min = Long.MAX_VALUE;for (LongWritable number : values) {long temp = number.get();if (temp < min) {min = temp;}}context.write(new LongWritable(key.firstNum), new LongWritable(min));};}
自定义分组
为了针对新的key类型作分组,我们也需要自定义一下分组规则:
private static class MyGroupingComparator implementsRawComparator<MyNewKey> {/** 基本分组规则:按第一列firstNum进行分组*/@Overridepublic int compare(MyNewKey key1, MyNewKey key2) {return (int) (key1.firstNum - key2.firstNum);}/** @param b1 表示第一个参与比较的字节数组* * @param s1 表示第一个参与比较的字节数组的起始位置* * @param l1 表示第一个参与比较的字节数组的偏移量* * @param b2 表示第二个参与比较的字节数组* * @param s2 表示第二个参与比较的字节数组的起始位置* * @param l2 表示第二个参与比较的字节数组的偏移量*/@Overridepublic int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);}}
自定义了一个分组比较器MyGroupingComparator,该类实现了RawComparator接口,而RawComparator接口又实现了Comparator接口,这两个接口的定义:
public interface RawComparator<T> extends Comparator<T> {public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
public interface Comparator<T> {int compare(T o1, T o2);boolean equals(Object obj);
}
分组实现步骤:
1.MyGroupingComparator实现这两个接口
RawComparator中的compare()方法是基于字节的比较,
Comparator中的compare()方法是基于对象的比较
由于在MyNewKey中有两个long类型,每个long类型又占8个字节。这里因为比较的是第一列数字,所以读取的偏移量为8字节。
2.添加对分组规则的设置:
// 设置自定义分组规则
job.setGroupingComparatorClass(MyGroupingComparator.class);
3. 运行结果: