我们将继续进行有关实现MapReduce算法的系列文章,该系列可在使用MapReduce进行数据密集型文本处理中找到。 本系列的其他文章:
- 使用MapReduce进行数据密集型文本处理
- 使用MapReduce进行数据密集型文本处理-本地聚合第二部分
- 使用Hadoop计算共现矩阵
- MapReduce算法–顺序反转
这篇文章介绍了在使用MapReduce进行数据密集型文本处理的第3章中找到的二级排序模式。 虽然Hadoop在将映射器发出的数据自动排序后再发送给reducer,但是如果您还想按值排序怎么办? 您当然会使用二级排序。 通过稍加操作键对象的格式,二级排序使我们能够在排序阶段将值考虑在内。 这里有两种可能的方法。
第一种方法涉及让减速器缓冲给定键的所有值,并对这些值进行归约器排序。 由于减速器将接收给定键的所有值,因此此方法可能会导致减速器内存不足。
第二种方法涉及通过向自然键添加部分或整个值来创建组合键,以实现您的排序目标。 这两种方法之间的权衡是对reducer中的值进行显式排序,这很可能会更快(存在内存不足的风险),但实现“值到键”转换方法会减轻MapReduce框架的排序工作,这是Hadoop / MapReduce设计要做的核心。 出于本文的目的,我们将考虑“关键价值”方法。 我们将需要编写一个自定义分区程序,以确保所有具有相同键(自然键不包含带有值的复合键)的数据都发送到相同的reducer和自定义比较器,以便一旦数据被自然键分组到达减速机。
值到密钥转换
直接创建复合键。 我们需要做的是分析在排序过程中要考虑值的哪一部分,并将适当的部分添加到自然键中。 然后,我们需要在键类或比较器类中使用compareTo方法,以确保对组合键进行了说明。 我们将重新访问天气数据集,并将温度作为自然键的一部分(自然键是年和月连接在一起)的一部分。 结果将是给定月份和年份中最冷的一天的列表。 该示例的灵感来自Hadoop,《权威指南》一书中的二级排序示例。 尽管可能有更好的方法可以实现此目标,但它足以说明次级排序的工作原理。
映射器代码
我们的映射器代码已经将年和月连接在一起,但是我们还将把温度作为键的一部分。 由于我们将值包含在键本身中,因此映射器将发出NullWritable,在其他情况下,我们将发出温度。
public class SecondarySortingTemperatureMapper extends Mapper<LongWritable, Text, TemperaturePair, NullWritable> {private TemperaturePair temperaturePair = new TemperaturePair();private NullWritable nullValue = NullWritable.get();private static final int MISSING = 9999;
@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String yearMonth = line.substring(15, 21);int tempStartPosition = 87;if (line.charAt(tempStartPosition) == '+') {tempStartPosition += 1;}int temp = Integer.parseInt(line.substring(tempStartPosition, 92));if (temp != MISSING) {temperaturePair.setYearMonth(yearMonth);temperaturePair.setTemperature(temp);context.write(temperaturePair, nullValue);}}
}
现在,我们已将温度添加到密钥中,我们为启用辅助排序做好了准备。 剩下要做的就是在必要时编写考虑温度的代码。 在这里,我们有两个选择,编写一个Comparator或在TemperaturePair类上调整compareTo方法(TemperaturePair实现WritableComparable)。 在大多数情况下,我建议您编写一个单独的Comparator,但是TemperaturePair类是专门为演示二次排序而编写的,因此我们将修改TemperaturePair类的compareTo方法。
@Overridepublic int compareTo(TemperaturePair temperaturePair) {int compareValue = this.yearMonth.compareTo(temperaturePair.getYearMonth());if (compareValue == 0) {compareValue = temperature.compareTo(temperaturePair.getTemperature());}return compareValue;}
如果我们想按降序排序,我们可以简单地将温度比较的结果乘以-1。
现在,我们已经完成了排序所必需的部分,我们需要编写一个自定义分区程序。
合伙人代码
为了确保在确定将哪个缩减程序发送数据时仅考虑自然键,我们需要编写一个自定义分区程序。 该代码简单明了,在计算将数据发送到的减速器时,仅考虑TemperaturePair类的yearMonth值。
public class TemperaturePartitioner extends Partitioner<TemperaturePair, NullWritable>{@Overridepublic int getPartition(TemperaturePair temperaturePair, NullWritable nullWritable, int numPartitions) {return temperaturePair.getYearMonth().hashCode() % numPartitions;}
}
尽管自定义分区程序保证了年和月的所有数据都到达同一精简程序,但我们仍然需要考虑精简程序将按键对记录进行分组的事实。
分组比较器
数据到达精简器后,所有数据均按键分组。 由于我们有一个复合键,因此我们需要确保记录仅按自然键分组。 这是通过编写自定义GroupPartitioner完成的。 为了将记录分组在一起,我们只考虑了TemperaturePair类的yearMonth字段的Comparator对象。
public class YearMonthGroupingComparator extends WritableComparator {public YearMonthGroupingComparator() {super(TemperaturePair.class, true);}@Overridepublic int compare(WritableComparable tp1, WritableComparable tp2) {TemperaturePair temperaturePair = (TemperaturePair) tp1;TemperaturePair temperaturePair2 = (TemperaturePair) tp2;return temperaturePair.getYearMonth().compareTo(temperaturePair2.getYearMonth());}
}
结果
这是运行我们的二级排序作业的结果:
new-host-2:sbin bbejeck$ hdfs dfs -cat secondary-sort/part-r-00000
190101 -206
190102 -333
190103 -272
190104 -61
190105 -33
190106 44
190107 72
190108 44
190109 17
190110 -33
190111 -217
190112 -300
结论
虽然按值对数据进行排序可能不是普遍的需求,但是在需要时,这是个不错的工具。 此外,通过使用自定义分区程序和组分区程序,我们能够更深入地了解Hadoop的内部工作原理。 感谢您的时间。
资源资源
- Jimmy Lin和Chris Dyer 使用MapReduce进行的数据密集型处理
- Hadoop: Tom White 的权威指南
- 来自博客的源代码和测试
- Hadoop API
- MRUnit用于单元测试Apache Hadoop映射减少工作
参考: MapReduce算法–来自我们的JCG合作伙伴 Bill Bejeck的“ 二级排序”,来自“ 随机编码思考”博客。
翻译自: https://www.javacodegeeks.com/2013/01/mapreduce-algorithms-secondary-sorting.html