本地聚集
在很高的级别上,当Mappers发出数据时,中间结果将写入磁盘,然后通过网络发送到Reducers以进行最终处理。 在处理MapReduce作业中,写入磁盘然后通过网络传输数据的延迟是一项昂贵的操作。 因此,有理由认为,只要有可能,减少从映射器发送的数据量将提高MapReduce作业的速度。 本地聚合是一种用于减少数据量并提高MapReduce工作效率的技术。 本地聚合不能代替reducers,因为我们需要一种方法来使用来自不同映射器的相同键来收集结果。 我们将考虑实现本地聚合的3种方法:
- 使用Hadoop Combiner函数。
- 文本处理和MapReduce书中介绍了两种“映射器”组合方法。
当然,任何优化都需要权衡取舍,我们也会对此进行讨论。
为了演示本地聚合,我们将使用hadoop-0.20.2-cdh3u3在MacBookPro上安装的伪分布式群集上,在Charles Dickens的纯文本版本的A Christmas Carol (从Project Gutenberg下载)上运行无处不在的字数统计作业。从Cloudera发行。 我计划在以后的文章中在具有更实际大小的数据的EC2集群上运行相同的实验。
合路器
组合器函数是扩展Reducer类的对象。 实际上,对于此处的示例,我们将重复使用单词计数作业中使用的相同的reduce。 设置MapReduce作业时会指定组合器功能,如下所示:
job.setReducerClass(TokenCountReducer.class);
这是化简器代码:
public class TokenCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int count = 0;for (IntWritable value : values) {count+= value.get();}context.write(key,new IntWritable(count));}
}
组合器的工作就是按照名称中的含义进行操作,聚合数据的最终结果是网络上的数据减少,从而使效率得到提高。 如前所述,请记住,仍然需要简化器将结果与来自不同映射器的相同键组合在一起。 由于组合器功能是一种优化,因此Hadoop框架无法保证组合器将被调用多少次(如果有的话)。
在Mapper组合选项1中
使用Combiners的第一种选择(图3.2,第41页)非常简单,对我们原始的字数映射器进行了一些修改:
public class PerDocumentMapper extends Mapper<LongWritable, Text, Text, IntWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {IntWritable writableCount = new IntWritable();Text text = new Text();Map<String,Integer> tokenMap = new HashMap<String, Integer>();StringTokenizer tokenizer = new StringTokenizer(value.toString());while(tokenizer.hasMoreElements()){String token = tokenizer.nextToken();Integer count = tokenMap.get(token);if(count == null) count = new Integer(0);count+=1;tokenMap.put(token,count);}Set<String> keys = tokenMap.keySet();for (String s : keys) {text.set(s);writableCount.set(tokenMap.get(s));context.write(text,writableCount);}}
}
正如我们在这里看到的,对于遇到的每个单词,我们不会发出计数为1的单词,而是使用映射来跟踪已处理的每个单词。 然后,在处理完所有标记后,我们遍历该映射并发出该行中遇到的每个单词的总数。
在Mapper组合选项2中
映射器合并的第二个选项(图3.3,第41页)与上述示例非常相似,但有两个区别-创建哈希映射时和发出映射中包含的结果时。 在上面的示例中,创建了一个映射,并在每次调用map方法时将其内容通过电线转储。 在此示例中,我们将使地图成为实例变量,并将地图的实例化移动到我们的映射器中的setUp方法。 同样,在完成对mapper的所有调用并调用cleanUp方法之前,不会将映射的内容发送到reducers。
public class AllDocumentMapper extends Mapper<LongWritable,Text,Text,IntWritable> {private Map<String,Integer> tokenMap;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {tokenMap = new HashMap<String, Integer>();}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer tokenizer = new StringTokenizer(value.toString());while(tokenizer.hasMoreElements()){String token = tokenizer.nextToken();Integer count = tokenMap.get(token);if(count == null) count = new Integer(0);count+=1;tokenMap.put(token,count);}}@Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {IntWritable writableCount = new IntWritable();Text text = new Text();Set<String> keys = tokenMap.keySet();for (String s : keys) {text.set(s);writableCount.set(tokenMap.get(s));context.write(text,writableCount);}}
}
从上面的代码示例中可以看到,在对map方法的所有调用中,映射器都跟踪唯一字数。 通过跟踪唯一令牌及其计数,应该大大减少发送给reducer的记录数量,这反过来又可以改善MapReduce作业的运行时间。 这样可以达到与使用MapReduce框架提供的Combiner Function选项相同的效果,但是在这种情况下,可以确保将调用组合代码。 但是这种方法也有一些警告。 在地图调用之间保持状态可能会出现问题,并且绝对违反“地图”功能的功能精神。 同样,通过保持所有映射器的状态,取决于作业中使用的数据,内存可能是另一个要解决的问题。 最终,必须权衡所有权衡以确定最佳方法。
结果
现在,让我们看一下不同映射器的一些结果。 由于作业是在伪分布式模式下运行的,因此实际的运行时间是无关紧要的,但是我们仍然可以推断出使用本地聚合会如何影响在实际集群上运行的MapReduce作业的效率。
每个令牌映射器:
12/09/13 21:25:32 INFO mapred.JobClient: Reduce shuffle bytes=366010
12/09/13 21:25:32 INFO mapred.JobClient: Reduce output records=7657
12/09/13 21:25:32 INFO mapred.JobClient: Spilled Records=63118
12/09/13 21:25:32 INFO mapred.JobClient: Map output bytes=302886
在Mapper精简选项1中:
12/09/13 21:28:15 INFO mapred.JobClient: Reduce shuffle bytes=354112
12/09/13 21:28:15 INFO mapred.JobClient: Reduce output records=7657
12/09/13 21:28:15 INFO mapred.JobClient: Spilled Records=60704
12/09/13 21:28:15 INFO mapred.JobClient: Map output bytes=293402
在Mapper精简选项2中:
12/09/13 21:30:49 INFO mapred.JobClient: Reduce shuffle bytes=105885
12/09/13 21:30:49 INFO mapred.JobClient: Reduce output records=7657
12/09/13 21:30:49 INFO mapred.JobClient: Spilled Records=15314
12/09/13 21:30:49 INFO mapred.JobClient: Map output bytes=90565
组合器选项:
12/09/13 21:22:18 INFO mapred.JobClient: Reduce shuffle bytes=105885
12/09/13 21:22:18 INFO mapred.JobClient: Reduce output records=7657
12/09/13 21:22:18 INFO mapred.JobClient: Spilled Records=15314
12/09/13 21:22:18 INFO mapred.JobClient: Map output bytes=302886
12/09/13 21:22:18 INFO mapred.JobClient: Combine input records=31559
12/09/13 21:22:18 INFO mapred.JobClient: Combine output records=7657
不出所料,没有合并的Mapper的结果最差,紧随其后的是第一个映射器内的合并选项(尽管如果在运行字数统计之前将数据清理干净,这些结果本来可以更好)。 第二个映射器内合并选项和合并器功能实际上具有相同的结果。 重要的事实是,作为前两个选项,两者产生的结果都减少了2/3,减少了混洗字节。 将通过网络发送到缩减程序的字节数量减少该数量一定会对MapReduce作业的效率产生积极影响。 这里要牢记一点,那就是合并器/映射器合并不能仅在所有MapReduce作业中使用,在这种情况下,字数计数非常适合于这种增强,但这可能并不总是正确的。
结论
正如您所看到的,在寻求提高MapReduce作业的性能时,需要认真考虑使用映射器内合并或Hadoop合并器功能的好处。 至于哪种方法,则要权衡每种方法的权衡。
相关链接
- Jimmy Lin和Chris Dyer 使用MapReduce进行的数据密集型处理
- Hadoop: Tom White 的权威指南
- 来自博客的源代码
- MRUnit用于单元测试Apache Hadoop映射减少工作
- Gutenberg项目提供了大量纯文本格式的书籍,非常适合在本地测试Hadoop作业。
祝您编程愉快,别忘了分享!
参考: 《 随机编码》博客上的JCG合作伙伴 Bill Bejeck 提供的MapReduce数据密集型文本处理功能 。
翻译自: https://www.javacodegeeks.com/2012/09/mapreduce-working-through-data.html