在本文中,我们继续执行一系列实现算法的系列,该算法在使用MapReduce进行数据密集型文本处理中找到,这一次讨论数据联接。 虽然我们将讨论在Hadoop中联接数据的技术并提供示例代码,但在大多数情况下,您可能不会自己编写代码来执行联接。 取而代之的是,使用可以在更高抽象级别工作的工具(例如Hive或Pig)可以更好地完成连接数据。 如果有可以帮助您处理数据的工具,为什么还要花时间学习如何联接数据呢? 可以说,联接数据是Hadoop的最大用途之一。 全面了解Hadoop如何执行联接对于确定使用哪个联接以及在出现问题时进行调试至关重要。 此外,一旦您完全了解了Hadoop中如何执行不同的联接,就可以更好地利用Hive和Pig等工具。 最后,在一种情况下,一种工具可能无法满足您的需求,因此您必须袖手旁观并自行编写代码。
加入的需要
在处理大型数据集时,如果不是必需的话,通过公用密钥连接数据的需求可能会非常有用。 通过加入数据,您可以进一步获得洞察力,例如加入时间戳以将事件与一天中的时间关联起来。 连接数据的需求多种多样。 我们将在3个单独的帖子中介绍3种类型的联接:Reduce-Side联接,Map-Side联接和Memory-Backed联接。 在这一期中,我们将考虑使用Reduce-Side联接。
减少侧面连接
在我们将要讨论的联接模式中,减少端联接是最容易实现的。 简化方联接的直接原因是Hadoop将相同的密钥发送到相同的reducer,因此默认情况下,数据是为我们组织的。 要执行联接,我们只需要缓存一个密钥并将其与传入密钥进行比较。 只要键匹配,我们就可以结合来自相应键的值。 由于所有数据在整个网络上都经过混洗,因此使用减少侧连接进行权衡是性能。 在减少侧连接中,我们将考虑两种不同的方案:一对一和一对多。 我们还将探索不需要跟踪传入密钥的选项; 给定键的所有值都将在简化器中分组在一起。
一对一加入
一对一联接的情况是数据集“ X”中的值与数据集“ Y”中的值共享一个公共密钥。 由于Hadoop保证将相等的键发送到同一reducer,因此在两个数据集上进行映射将为我们处理联接。 由于仅对键进行排序,因此值的顺序未知。 我们可以使用辅助排序轻松解决这种情况。 我们二级排序的实现方式是用“ 1”或“ 2”标记键,以确定值的顺序。 我们需要采取一些额外的步骤来实施我们的标记策略。
实现一个WritableComparable
首先,我们需要编写一个实现WritableComparable接口的类,该接口将用于包装密钥。
public class TaggedKey implements Writable, WritableComparable<TaggedKey> {private Text joinKey = new Text();private IntWritable tag = new IntWritable();@Overridepublic int compareTo(TaggedKey taggedKey) {int compareValue = this.joinKey.compareTo(taggedKey.getJoinKey());if(compareValue == 0 ){compareValue = this.tag.compareTo(taggedKey.getTag());}return compareValue;}//Details left out for clarity}
当我们对TaggedKey类进行排序时,具有相同joinKey
值的键将在tag
字段的值上进行次要排序,以确保我们想要的顺序。
编写自定义分区程序
接下来,我们需要编写一个自定义分区程序,该分区程序仅在确定复合键和数据发送到哪个减速器时才考虑连接键:
public class TaggedJoiningPartitioner extends Partitioner<TaggedKey,Text> {@Overridepublic int getPartition(TaggedKey taggedKey, Text text, int numPartitions) {return taggedKey.getJoinKey().hashCode() % numPartitions;}
}
至此,我们拥有了连接数据并确保值顺序的条件。 但是,当键进入reduce()
方法时,我们不想跟踪它们。 我们希望将所有价值观归为一体。 为此,我们将使用Comparator
,该Comparator
在决定如何对值进行分组时仅考虑联接键。
编写组比较器
用于分组的比较器如下所示:
public class TaggedJoiningGroupingComparator extends WritableComparator {public TaggedJoiningGroupingComparator() {super(TaggedKey.class,true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {TaggedKey taggedKey1 = (TaggedKey)a;TaggedKey taggedKey2 = (TaggedKey)b;return taggedKey1.getJoinKey().compareTo(taggedKey2.getJoinKey());}
}
数据结构
现在,我们需要确定将用于密钥的哪些数据。 对于我们的样本数据,我们将使用从Fakenames Generator生成的CSV文件。 第一列是GUID,它将用作我们的联接键。 我们的样本数据包含诸如姓名,地址,电子邮件,工作信息,信用卡和拥有的汽车之类的信息。 为了演示的目的,我们将使用GUID,名称和地址字段,并将它们放置在一个结构如下的文件中:
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Esther,Garner,4071 Haven Lane,Okemos,MI
81a43486-07e1-4b92-b92b-03d0caa87b5f,Timothy,Duncan,753 Stadium Drive,Taunton,MA
aef52cf1-f565-4124-bf18-47acdac47a0e,Brett,Ramsey,4985 Shinn Street,New York,NY
然后,我们将使用GUID,电子邮件地址,用户名,密码和信用卡号字段,然后将其放置在另一个文件中,该文件应类似于:
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,517-706-9565,EstherJGarner@teleworm.us,Waskepter38,noL2ieghie,MasterCard,
5305687295670850
81a43486-07e1-4b92-b92b-03d0caa87b5f,508-307-3433,TimothyDDuncan@einrot.com,Conerse,Gif4Edeiba,MasterCard,
5265896533330445
aef52cf1-f565-4124-bf18-47acdac47a0e,212-780-4015,BrettMRamsey@dayrep.com,Subjecall,AiKoiweihi6,MasterCard,524
现在,我们需要有一个Mapper,它将知道如何处理我们的数据以提取正确的联接键并设置正确的标签。
创建映射器
这是我们的Mapper代码:
public class JoiningMapper extends Mapper<LongWritable, Text, TaggedKey, Text> {private int keyIndex;private Splitter splitter;private Joiner joiner;private TaggedKey taggedKey = new TaggedKey();private Text data = new Text();private int joinOrder;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {keyIndex = Integer.parseInt(context.getConfiguration().get("keyIndex"));String separator = context.getConfiguration().get("separator");splitter = Splitter.on(separator).trimResults();joiner = Joiner.on(separator);FileSplit fileSplit = (FileSplit)context.getInputSplit();joinOrder = Integer.parseInt(context.getConfiguration().get(fileSplit.getPath().getName()));}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {List<String> values = Lists.newArrayList(splitter.split(value.toString()));String joinKey = values.remove(keyIndex);String valuesWithOutKey = joiner.join(values);taggedKey.set(joinKey, joinOrder);data.set(valuesWithOutKey);context.write(taggedKey, data);}}
让我们回顾一下setup()
方法中发生的事情。
- 首先,从启动作业时在“配置”中设置的值获取连接键的索引和文本中使用的分隔符。
- 然后,我们创建一个Guava拆分器,用于拆分从对
context.getConfiguration().get("separator")
的调用中检索到的分隔符上的数据。 我们还创建了一个Guava Joiner,用于在提取密钥后将数据重新放在一起。 - 接下来,我们获取此映射器将要处理的文件的名称。 我们使用文件名来提取此配置中存储的文件的连接顺序。
我们还应该讨论map()
方法中发生的事情:
- 分散数据并创建值列表
- 从列表中删除联接密钥
- 重新将数据重新合并为单个字符串
- 设置连接密钥,连接顺序和剩余数据
- 写出数据
因此,我们已经读入数据,提取了密钥,设置了连接顺序,然后将数据写回了。 让我们看一下如何结合数据。
联接数据
现在让我们看一下数据如何在化简器中联接:
public class JoiningReducer extends Reduce<TaggedKey, Text, NullWritable, Text> {private Text joinedText = new Text();private StringBuilder builder = new StringBuilder();private NullWritable nullKey = NullWritable.get();@Overrideprotected void reduce(TaggedKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {builder.append(key.getJoinKey()).append(",");for (Text value : values) {builder.append(value.toString()).append(",");}builder.setLength(builder.length()-1);joinedText.set(builder.toString());context.write(nullKey, joinedText);builder.setLength(0);}
}
因为带有“ 1”标签的密钥首先到达了还原器,所以我们知道名称和地址数据是第一个值,而电子邮件,用户名,密码和信用卡数据是第二个值。 因此,我们不需要跟踪任何键。 我们只需遍历值并将它们连接在一起。
一对一加入结果
这是运行我们的一对一MapReduce作业的结果:
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Esther,Garner,4071 Haven Lane,Okemos,MI,517-706-9565,EstherJGarner@teleworm.us,Waskepter38,noL2ieghie,MasterCard,
5305687295670850
81a43486-07e1-4b92-b92b-03d0caa87b5f,Timothy,Duncan,753 Stadium Drive,Taunton,MA,508-307-3433,TimothyDDuncan@einrot.com,Conerse,Gif4Edeiba,MasterCard,
5265896533330445
aef52cf1-f565-4124-bf18-47acdac47a0e,Brett,Ramsey,4985 Shinn Street,New York,NY,212-780-4015,BrettMRamsey@dayrep.com,Subjecall,AiKoiweihi6,MasterCard,
5243379373546690
正如我们可以看到的,以上示例数据中的两条记录已合并为一条记录。 我们已经成功地将GUID,名称,地址,电子邮件地址,用户名,密码和信用卡字段加入到一个文件中。
指定加入顺序
此时,我们可能会问如何为多个文件指定连接顺序? 答案就在我们的ReduceSideJoinDriver
类中,该类充当MapReduce程序的驱动程序。
public class ReduceSideJoinDriver {public static void main(String[] args) throws Exception {Splitter splitter = Splitter.on('/');StringBuilder filePaths = new StringBuilder();Configuration config = new Configuration();config.set("keyIndex", "0");config.set("separator", ",");for(int i = 0; i< args.length - 1; i++) {String fileName = Iterables.getLast(splitter.split(args[i]));config.set(fileName, Integer.toString(i+1));filePaths.append(args[i]).append(",");}filePaths.setLength(filePaths.length() - 1);Job job = Job.getInstance(config, "ReduceSideJoin");job.setJarByClass(ReduceSideJoinDriver.class);FileInputFormat.addInputPaths(job, filePaths.toString());FileOutputFormat.setOutputPath(job, new Path(args[args.length-1]));job.setMapperClass(JoiningMapper.class);job.setReducerClass(JoiningReducer.class);job.setPartitionerClass(TaggedJoiningPartitioner.class);job.setGroupingComparatorClass(TaggedJoiningGroupingComparator.class);job.setOutputKeyClass(TaggedKey.class);job.setOutputValueClass(Text.class);System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
- 首先,我们在第5行上创建一个番石榴分割器,该分割器将用“ /”分割字符串。
- 然后在第8-10行上,设置连接键的索引和文件中使用的分隔符。
- 在第12-17行中,我们为要连接的输入文件设置标签。 命令行上文件名的顺序决定了它们在联接中的位置。 当我们从命令行循环遍历文件名时,我们将拆分整个文件名并通过Guava
Iterables.getLast()
方法检索最后一个值(基本文件名)。 然后,我们使用文件名作为键调用config.set()
,并使用i + 1
作为值,这将设置标签或连接顺序。args
数组中的最后一个值在循环中被跳过,因为它用于第23行的MapReduce作业的输出路径。在循环的最后一行,我们将每个文件路径附加到StringBuilder中,稍后使用( 22)设置作业的输入路径。 - 我们只需要对所有文件使用一个映射器,即JoiningMapper,该映射器在第25行设置。
- 第27和28行分别设置了我们的自定义分区程序和组比较器,以确保键和值到达化简器的顺序,并使用正确的键正确地对值进行分组。
通过使用分区程序和分组比较器,我们知道第一个值属于第一个键,并且可以用于将Iterable
包含的所有其他值连接到给定键的reduce()
方法中。 现在是时候考虑一对多联接了。
一对多加入
好消息是到目前为止,我们已经完成了所有工作,实际上我们可以使用代码执行一对多连接。 对于一对多联接,我们可以考虑两种方法:1)一个包含单个记录的小文件,另一个包含具有相同键的多个记录的文件,以及2)同样具有单个记录的小文件,但是N每个文件包含与第一个文件匹配的记录的文件数。 主要区别在于,采用第一种方法时,除了前两个键的联接之外,值的顺序将是未知的。 但是,使用第二种方法,我们将“标记”每个联接文件,以便我们可以控制所有联接值的顺序。 对于我们的示例,第一个文件将保留为我们的GUID名称-地址文件,并且我们将拥有3个其他文件,其中将包含汽车,雇主和工作描述记录。 这可能不是最现实的情况,但将用于演示。 以下是在进行联接之前数据外观的示例:
//The single person records
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Esther,Garner,4071 Haven Lane,Okemos,MI
81a43486-07e1-4b92-b92b-03d0caa87b5f,Timothy,Duncan,753 Stadium Drive,Taunton,MA
aef52cf1-f565-4124-bf18-47acdac47a0e,Brett,Ramsey,4985 Shinn Street,New York,NY
//Automobile records
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,2003 Holden Cruze
81a43486-07e1-4b92-b92b-03d0caa87b5f,2012 Volkswagen T5
aef52cf1-f565-4124-bf18-47acdac47a0e,2009 Renault Trafic
//Employer records
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Creative Wealth
81a43486-07e1-4b92-b92b-03d0caa87b5f,Susie's Casuals
aef52cf1-f565-4124-bf18-47acdac47a0e,Super Saver Foods
//Job Description records
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Data entry clerk
81a43486-07e1-4b92-b92b-03d0caa87b5f,Precision instrument and equipment repairer
aef52cf1-f565-4124-bf18-47acdac47a0e,Gas and water service dispatcher
一对多加入结果
现在,让我们看一下一对多联接结果的示例(使用上面的相同值来辅助比较):
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Esther,Garner,4071 Haven Lane,Okemos,MI,2003 Holden Cruze,Creative Wealth,Data entry clerk
81a43486-07e1-4b92-b92b-03d0caa87b5f,Timothy,Duncan,753 Stadium Drive,Taunton,MA,2012 Volkswagen T5,Susie's Casuals,Precision instrument and equipment repairer
aef52cf1-f565-4124-bf18-47acdac47a0e,Brett,Ramsey,4985 Shinn Street,New York,NY,2009 Renault Trafic,Super Saver Foods,Gas and water service dispatcher
结果表明,我们已经能够成功地以指定顺序连接多个值。
结论
我们已经成功演示了如何在MapReduce中执行约简边连接。 即使该方法并不太复杂,我们也可以看到在Hadoop中执行联接可能涉及编写大量代码。 虽然学习联接的工作方式是一项有用的练习,但是在大多数情况下,使用Hive或Pig这样的工具联接数据要好得多。 谢谢你的时间。
资源资源
- Jimmy Lin和Chris Dyer 使用MapReduce进行的数据密集型处理
- Hadoop: Tom White 的权威指南
- 来自博客的源代码和测试
- 爱德华·卡普里奥洛(Edward Capriolo),迪恩·沃普勒(Dean Wampler)和杰森·卢瑟格伦(Jason Rutherglen)的编程蜂巢
- 通过Alan Gates对Pig进行编程
- Hadoop API
- MRUnit用于单元测试Apache Hadoop映射减少工作
翻译自: https://www.javacodegeeks.com/2013/07/mapreduce-algorithms-understanding-data-joins-part-1.html