mapreduce 算法
自从我上一次发布以来已经有一段时间了,就像我上一次大休息一样,我正在Coursera上一些课程。 这次是Scala中的函数式编程 原理和React式编程原理 。 我发现它们都是不错的课程,如果有时间的话,建议您选一门。 在这篇文章中,我们将继续介绍如何使用MapReduce实现数据密集型文本处理中的算法的系列,这次涵盖了地图端连接。 从名称可以猜出,映射侧联接只在映射阶段连接数据,而完全跳过简化阶段。 在上一篇有关数据联接的文章中,我们介绍了减少侧联接 。 减少端连接很容易实现,但缺点是所有数据都通过网络发送到减少器。 由于我们避免了跨网络发送数据的开销,因此地图端连接可显着提高性能。 但是,与减少侧联接不同,映射侧联接需要满足非常特定的条件。 今天,我们将讨论地图端连接的要求以及如何实现它们。
地图端加入条件
要利用地图侧联接,我们的数据必须满足以下条件之一:
- 要加入的数据集已经按相同的键排序,并且具有相同的分区数
- 在要连接的两个数据集中,一个足够小以适合内存
我们将考虑第一种情况,其中有两个(或更多)数据集需要连接,但是太大而无法容纳到内存中。 我们将假设最坏的情况是,文件没有按相同的顺序排序或分区。
资料格式
在开始之前,让我们看一下正在使用的数据。 我们将有两个数据集:
- 第一个数据集由GUID,名字,姓氏,地址,城市和州组成
- 第二个数据集包含GUID和雇主信息
两个数据集均以逗号分隔,并且联接键(GUID)位于第一位置。 加入后,我们希望将数据集2中的雇主信息附加到数据集1的末尾。 此外,我们希望将GUID保持在数据集1的第一个位置,但要从数据集2删除GUID。
数据集1:
aef9422c-d08c-4457-9760-f2d564d673bc,Linda,Narvaez,3253 Davis Street,Atlanta,GA08db7c55-22ae-4199-8826-c67a5689f838,John,Gregory,258 Khale Street,Florence,SCde68186a-1004-4211-a866-736f414eac61,Charles,Arnold,1764 Public Works Drive,Johnson City,TN6df1882d-4c81-4155-9d8b-0c35b2d34284,John,Schofield,65 Summit Park Avenue,Detroit,MI
数据集2:
de68186a-1004-4211-a866-736f414eac61,Jacobs6df1882d-4c81-4155-9d8b-0c35b2d34284,Chief Auto Partsaef9422c-d08c-4457-9760-f2d564d673bc,Earthworks Yard Maintenance08db7c55-22ae-4199-8826-c67a5689f838,Ellman's Catalog Showrooms
合并结果:
08db7c55-22ae-4199-8826-c67a5689f838,John,Gregory,258 Khale Street,Florence,SC,Ellman's Catalog Showrooms
6df1882d-4c81-4155-9d8b-0c35b2d34284,John,Schofield,65 Summit Park Avenue,Detroit,MI,Chief Auto Parts
aef9422c-d08c-4457-9760-f2d564d673bc,Linda,Narvaez,3253 Davis Street,Atlanta,GA,Earthworks Yard Maintenance
de68186a-1004-4211-a866-736f414eac61,Charles,Arnold,1764 Public Works Drive,Johnson City,TN,Jacobs
现在,我们继续介绍如何连接两个数据集。
Map-Side连接具有大数据集
为了能够执行地图端连接,我们需要将数据按相同的键排序并具有相同数量的分区,这意味着任何记录的所有键都在同一分区中。 尽管这似乎是一个艰巨的要求,但很容易解决。 Hadoop对所有键进行排序,并保证将具有相同值的键发送到相同的reducer。 因此,只需运行一个MapReduce作业,该作业只不过要通过您要连接的键输出数据,并为所有数据集指定完全相同数量的化简器,我们将以正确的形式获取数据。 考虑到能够进行地图侧连接所带来的效率提高,可能值得花费额外的MapReduce作业。 在这一点上需要重复,至关重要的是,在“准备”阶段,将对数据进行排序和分区时,所有数据集都必须指定完全相同数量的化简。 在本文中,我们将获取两个数据集,并在两个数据集上运行初始MapReduce作业以进行排序和分区,然后运行最终作业以执行地图端联接。 首先,让我们介绍一下MapReduce作业,以相同的方式对数据进行排序和分区。
第一步:排序和分区
首先,我们需要创建一个Mapper
,该Mapper
将简单地选择要根据给定索引进行排序的键:
public class SortByKeyMapper extends Mapper<LongWritable, Text, Text, Text> {private int keyIndex;private Splitter splitter;private Joiner joiner;private Text joinKey = new Text();@Overrideprotected void setup(Context context) throws IOException, InterruptedException {String separator = context.getConfiguration().get("separator");keyIndex = Integer.parseInt(context.getConfiguration().get("keyIndex"));splitter = Splitter.on(separator);joiner = Joiner.on(separator);}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {Iterable<String> values = splitter.split(value.toString());joinKey.set(Iterables.get(values,keyIndex));if(keyIndex != 0){value.set(reorderValue(values,keyIndex));}context.write(joinKey,value);}private String reorderValue(Iterable<String> value, int index){List<String> temp = Lists.newArrayList(value);String originalFirst = temp.get(0);String newFirst = temp.get(index);temp.set(0,newFirst);temp.set(index,originalFirst);return joiner.join(temp);}
}
SortByKeyMapper
只需通过从在配置参数keyIndex
给定位置找到的给定文本行中提取值来简单地设置joinKey
的值。 同样,如果keyIndex
不等于零,我们交换在第一个位置和keyIndex
位置中找到的值的顺序。 尽管这是一个有问题的功能,但是我们稍后将讨论为什么要这样做。 接下来,我们需要一个Reducer
:
public class SortByKeyReducer extends Reducer<Text,Text,NullWritable,Text> {private static final NullWritable nullKey = NullWritable.get();@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {for (Text value : values) {context.write(nullKey,value);}}
}
SortByKeyReducer
写出给定键的所有值,但是会NullWritable
键并写一个NullWritable
。 在下一节中,我们将解释为什么不使用密钥。
第二步:Map-Side联接
在执行地图侧连接时,记录在到达映射器之前会被合并。 为此,我们使用CompositeInputFormat 。 我们还需要设置一些配置属性。 让我们看一下如何配置地图侧连接:
private static Configuration getMapJoinConfiguration(String separator, String... paths) {Configuration config = new Configuration();config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", separator);String joinExpression = CompositeInputFormat.compose("inner", KeyValueTextInputFormat.class, paths);config.set("mapred.join.expr", joinExpression);config.set("separator", separator);return config;}
首先,我们通过设置mapreduce.input.keyvaluelinerecordreader.key.value.separator
属性来指定用于分隔键和值的字符。 接下来,我们使用CompositeInputFormat.compose
方法创建一个“联接表达式”,通过使用单词“ inner”指定内部联接 ,然后指定要使用的输入格式, KeyValueTextInput类以及最后一个String varargs,它们表示文件的路径。 join(运行map-reduce作业以对数据进行排序和分区的输出路径)。 KeyValueTextInputFormat
类将使用分隔符将第一个值设置为键,其余的将用作该值。
映射器的加入
连接源文件中的值后,将Mapper.map
方法,该方法将接收键的Text
对象(连接记录中的键相同)和一个TupleWritable
,该TupleWritable
由输入文件中的连接值组成对于给定的密钥。 请记住,我们希望最终输出的第一个位置具有join-key,然后在一个定界的String
中包含所有连接的值。 为此,我们有一个自定义的映射器,将我们的数据以正确的格式放置:
public class CombineValuesMapper extends Mapper<Text, TupleWritable, NullWritable, Text> {private static final NullWritable nullKey = NullWritable.get();private Text outValue = new Text();private StringBuilder valueBuilder = new StringBuilder();private String separator;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {separator = context.getConfiguration().get("separator");}@Overrideprotected void map(Text key, TupleWritable value, Context context) throws IOException, InterruptedException {valueBuilder.append(key).append(separator);for (Writable writable : value) {valueBuilder.append(writable.toString()).append(separator);}valueBuilder.setLength(valueBuilder.length() - 1);outValue.set(valueBuilder.toString());context.write(nullKey, outValue);valueBuilder.setLength(0);}
}
在CombineValuesMapper
我们将键和所有联接的值附加到一个定界的String
。 在这里,我们终于可以看到为什么在以前的MapReduce作业中丢弃了join键的原因。 由于键是要连接的所有数据集的值中的第一个位置,因此我们的映射器自然会从连接的数据集中消除重复的键。 我们需要做的就是将给定的键插入StringBuilder
,然后附加包含在TupleWritable
的值。
放在一起
现在,我们拥有所有代码,可以在大型数据集上运行地图端联接。 让我们看一下我们将如何一起运行所有作业。 如前所述,我们假设我们的数据未按相同的顺序进行排序和分区,因此我们将需要运行N(在本例中为2)MapReduce作业,以获取正确格式的数据。 在运行初始排序/分区作业之后,将执行执行实际联接的最终作业。
public class MapSideJoinDriver {public static void main(String[] args) throws Exception {String separator = ",";String keyIndex = "0";int numReducers = 10;String jobOneInputPath = args[0];String jobTwoInputPath = args[1];String joinJobOutPath = args[2];String jobOneSortedPath = jobOneInputPath + "_sorted";String jobTwoSortedPath = jobTwoInputPath + "_sorted";Job firstSort = Job.getInstance(getConfiguration(keyIndex, separator));configureJob(firstSort, "firstSort", numReducers, jobOneInputPath, jobOneSortedPath, SortByKeyMapper.class, SortByKeyReducer.class);Job secondSort = Job.getInstance(getConfiguration(keyIndex, separator));configureJob(secondSort, "secondSort", numReducers, jobTwoInputPath, jobTwoSortedPath, SortByKeyMapper.class, SortByKeyReducer.class);Job mapJoin = Job.getInstance(getMapJoinConfiguration(separator, jobOneSortedPath, jobTwoSortedPath));configureJob(mapJoin, "mapJoin", 0, jobOneSortedPath + "," + jobTwoSortedPath, joinJobOutPath, CombineValuesMapper.class, Reducer.class);mapJoin.setInputFormatClass(CompositeInputFormat.class);List<Job> jobs = Lists.newArrayList(firstSort, secondSort, mapJoin);int exitStatus = 0;for (Job job : jobs) {boolean jobSuccessful = job.waitForCompletion(true);if (!jobSuccessful) {System.out.println("Error with job " + job.getJobName() + " " + job.getStatus().getFailureInfo());exitStatus = 1;break;}}System.exit(exitStatus);}
MapSideJoinDriver
对运行MapReduce作业进行基本配置。 有趣的一点是,排序/分区作业每个都指定10个化简器,而最后一个作业明确将化简器的数量设置为0,因为我们是在地图端加入的,不需要化简阶段。 由于我们没有任何复杂的依赖关系,因此将作业放入ArrayList并以线性顺序运行作业(第24-33行)。
结果
最初,我们有2个文件; 第一个文件中的姓名和地址信息,第二个文件中的就业信息。 这两个文件在第一列中都有唯一的ID。
文件一:
....
08db7c55-22ae-4199-8826-c67a5689f838,John,Gregory,258 Khale Street,Florence,SC
...
文件二:
....
08db7c55-22ae-4199-8826-c67a5689f838,Ellman's Catalog Showrooms
....
结果:
08db7c55-22ae-4199-8826-c67a5689f838,John,Gregory,258 Khale Street,Florence,SC,Ellman's Catalog Showrooms
如我们在这里看到的,我们已经成功地将记录合并在一起,并保持了文件格式,而结果中没有重复的键。
结论
在本文中,我们演示了当两个数据集都很大且无法容纳到内存中时如何执行地图端连接。 如果您觉得这需要大量工作才能完成,那么您是正确的。 尽管在大多数情况下,我们希望使用诸如Pig或Hive之类的高级工具,但了解对大型数据集执行地图侧联接的机制很有帮助。 当您需要从头开始编写解决方案时,尤其如此。 谢谢你的时间。
资源资源
- Jimmy Lin和Chris Dyer 使用MapReduce进行的数据密集型处理
- Hadoop: Tom White 的权威指南
- 来自博客的源代码和测试
- 编程蜂巢爱德华卡普里奥罗,院长Wampler和Jason拉瑟格伦
- 通过Alan Gates 编程Pig
- Hadoop API
- MRUnit用于单元测试Apache Hadoop映射减少工作
翻译自: https://www.javacodegeeks.com/2014/02/mapreduce-algorithms-understanding-data-joins-part-ii.html
mapreduce 算法