mapreduce 算法_MapReduce算法–了解数据联接第1部分

mapreduce 算法

在本文中,我们继续执行一系列实现算法的系列,该算法在使用MapReduce进行数据密集型文本处理中找到,这一次讨论数据联接。 虽然我们将讨论在Hadoop中联接数据的技术并提供示例代码,但在大多数情况下,您可能不会自己编写代码来执行联接。 相反,使用诸如Hive或Pig的更高抽象级别的工具可以更好地完成连接数据。 如果有可以帮助您处理数据的工具,为什么还要花时间学习如何联接数据? 可以说,联接数据是Hadoop的最大用途之一。 全面了解Hadoop如何执行联接对于确定使用哪个联接以及在出现问题时进行调试至关重要。 另外,一旦您完全了解了Hadoop中如何执行不同的联接,就可以更好地利用Hive和Pig等工具。 最后,可能出现一种情况,即某个工具无法满足您的需求,您必须袖手旁观并自行编写代码。

加入的需要

在处理大型数据集时,如果不是必需的话,通过公用密钥连接数据的需求可能会非常有用。 通过加入数据,您可以进一步获得洞察力,例如加入时间戳以将事件与一天中的时间关联起来。 连接数据的需求多种多样。 我们将在3个单独的帖子中介绍3种类型的联接:Reduce-Side联接,Map-Side联接和Memory-Backed联接。 在这一期中,我们将考虑使用Reduce-Side联接。

减少侧面连接

在我们将要讨论的联接模式中,减少端联接是最容易实现的。 简化方联接的直接原因是Hadoop将相同的密钥发送到相同的reducer,因此默认情况下,数据是为我们组织的。 要执行连接,我们只需要缓存一个密钥并将其与传入密钥进行比较。 只要键匹配,我们就可以结合来自相应键的值。 由于所有数据在整个网络上都经过混洗,因此使用减少侧连接进行权衡是性能。 在减少侧连接中,我们将考虑两种不同的方案:一对一和一对多。 我们还将探索不需要跟踪传入密钥的选项; 给定键的所有值都将在简化器中分组在一起。

一对一加入

一对一联接的情况是数据集“ X”中的值与数据集“ Y”中的值共享一个公共密钥。 由于Hadoop保证将相同的键发送到同一reducer,因此在两个数据集上进行映射将为我们处理联接。 由于仅对键进行排序,因此值的顺序未知。 我们可以使用辅助排序轻松解决这种情况。 我们二级排序的实现方式是用“ 1”或“ 2”标记键,以确定值的顺序。 我们需要采取一些额外的步骤来实施我们的标记策略。

实现一个WritableComparable

首先,我们需要编写一个实现WritableComparable接口的类,该接口将用于包装密钥。

public class TaggedKey implements Writable, WritableComparable<TaggedKey> {private Text joinKey = new Text();private IntWritable tag = new IntWritable();@Overridepublic int compareTo(TaggedKey taggedKey) {int compareValue = this.joinKey.compareTo(taggedKey.getJoinKey());if(compareValue == 0 ){compareValue = this.tag.compareTo(taggedKey.getTag());}return compareValue;}//Details left out for clarity}

当我们对TaggedKey类进行排序时,具有相同joinKey值的键将在tag字段的值上进行次要排序,以确保我们想要的顺序。

编写自定义分区程序

接下来,我们需要编写一个自定义分区程序,该分区程序仅在确定复合键和数据发送到哪个减速器时才考虑连接键:

public class TaggedJoiningPartitioner extends Partitioner<TaggedKey,Text> {@Overridepublic int getPartition(TaggedKey taggedKey, Text text, int numPartitions) {return taggedKey.getJoinKey().hashCode() % numPartitions;}
}

至此,我们拥有了连接数据并确保值顺序的条件。 但是,当它们进入reduce()方法时,我们不想跟踪它们。 我们希望将所有价值观归为一体。 为此,我们将使用Comparator ,该Comparator在决定如何对值进行分组时仅考虑联接键。

编写组比较器

用于分组的比较器如下所示:

public class TaggedJoiningGroupingComparator extends WritableComparator {public TaggedJoiningGroupingComparator() {super(TaggedKey.class,true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {TaggedKey taggedKey1 = (TaggedKey)a;TaggedKey taggedKey2 = (TaggedKey)b;return taggedKey1.getJoinKey().compareTo(taggedKey2.getJoinKey());}
}

数据结构

现在,我们需要确定用于键联接数据的内容。 对于我们的样本数据,我们将使用从Fakenames Generator生成的CSV文件。 第一列是GUID,它将用作我们的连接键。 我们的样本数据包含诸如姓名,地址,电子邮件,工作信息,信用卡和拥有的汽车之类的信息。 为了演示的目的,我们将使用GUID,名称和地址字段,并将它们放在一个结构如下的文件中:

cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Esther,Garner,4071 Haven Lane,Okemos,MI
81a43486-07e1-4b92-b92b-03d0caa87b5f,Timothy,Duncan,753 Stadium Drive,Taunton,MA
aef52cf1-f565-4124-bf18-47acdac47a0e,Brett,Ramsey,4985 Shinn Street,New York,NY

然后,我们将使用GUID,电子邮件地址,用户名,密码和信用卡号字段,然后将其放置在另一个类似于以下文件中:

cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,517-706-9565,EstherJGarner@teleworm.us,Waskepter38,noL2ieghie,MasterCard,
5305687295670850
81a43486-07e1-4b92-b92b-03d0caa87b5f,508-307-3433,TimothyDDuncan@einrot.com,Conerse,Gif4Edeiba,MasterCard,
5265896533330445
aef52cf1-f565-4124-bf18-47acdac47a0e,212-780-4015,BrettMRamsey@dayrep.com,Subjecall,AiKoiweihi6,MasterCard,524

现在,我们需要有一个Mapper,它将知道如何处理我们的数据以提取正确的联接键并设置正确的标签。

创建映射器

这是我们的Mapper代码:

public class JoiningMapper extends Mapper<LongWritable, Text, TaggedKey, Text> {private int keyIndex;private Splitter splitter;private Joiner joiner;private TaggedKey taggedKey = new TaggedKey();private Text data = new Text();private int joinOrder;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {keyIndex = Integer.parseInt(context.getConfiguration().get("keyIndex"));String separator = context.getConfiguration().get("separator");splitter = Splitter.on(separator).trimResults();joiner = Joiner.on(separator);FileSplit fileSplit = (FileSplit)context.getInputSplit();joinOrder = Integer.parseInt(context.getConfiguration().get(fileSplit.getPath().getName()));}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {List<String> values = Lists.newArrayList(splitter.split(value.toString()));String joinKey = values.remove(keyIndex);String valuesWithOutKey = joiner.join(values);taggedKey.set(joinKey, joinOrder);data.set(valuesWithOutKey);context.write(taggedKey, data);}}

让我们回顾一下setup()方法中发生的事情。

  1. 首先,从启动作业时在配置中设置的值中获取连接键的索引和文本中使用的分隔符。
  2. 然后,我们创建一个Guava拆分器,用于拆分从对context.getConfiguration().get("separator")的调用中检索到的分隔符上的数据。 我们还创建了一个Guava Joiner,用于在提取密钥后将数据重新放在一起。
  3. 接下来,我们获取此映射器将要处理的文件的名称。 我们使用文件名提取存储在配置中的该文件的连接顺序。

我们还应该讨论map()方法中发生的事情:

  1. 分散数据并创建值列表
  2. 从列表中删除联接密钥
  3. 重新将数据重新合并为单个字符串
  4. 设置连接密钥,连接顺序和剩余数据
  5. 写出数据

因此,我们已经读入数据,提取了密钥,设置了连接顺序,然后将数据写回了。 让我们看一下如何结合数据。

联接数据

现在让我们看一下数据如何在化简器中联接:

public class JoiningReducer extends Reduce<TaggedKey, Text, NullWritable, Text> {private Text joinedText = new Text();private StringBuilder builder = new StringBuilder();private NullWritable nullKey = NullWritable.get();@Overrideprotected void reduce(TaggedKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {builder.append(key.getJoinKey()).append(",");for (Text value : values) {builder.append(value.toString()).append(",");}builder.setLength(builder.length()-1);joinedText.set(builder.toString());context.write(nullKey, joinedText);builder.setLength(0);}
}

因为带有标签“ 1”的密钥首先到达了还原器,所以我们知道名称和地址数据是第一个值,而电子邮件,用户名,密码和信用卡数据是第二个值。 因此,我们不需要跟踪任何键。 我们只需遍历这些值并将它们连接在一起。

一对一加入结果

这是运行我们的一对一MapReduce作业的结果:

cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Esther,Garner,4071 Haven Lane,Okemos,MI,517-706-9565,EstherJGarner@teleworm.us,Waskepter38,noL2ieghie,MasterCard,
5305687295670850
81a43486-07e1-4b92-b92b-03d0caa87b5f,Timothy,Duncan,753 Stadium Drive,Taunton,MA,508-307-3433,TimothyDDuncan@einrot.com,Conerse,Gif4Edeiba,MasterCard,
5265896533330445
aef52cf1-f565-4124-bf18-47acdac47a0e,Brett,Ramsey,4985 Shinn Street,New York,NY,212-780-4015,BrettMRamsey@dayrep.com,Subjecall,AiKoiweihi6,MasterCard,
5243379373546690

如我们所见,上述示例数据中的两条记录已合并为一条记录。 我们已经成功地将GUID,名称,地址,电子邮件地址,用户名,密码和信用卡字段加入到一个文件中。

指定加入顺序

此时,我们可能会问我们如何指定多个文件的连接顺序? 答案就在我们的ReduceSideJoinDriver类中,该类充当MapReduce程序的驱动程序。

public class ReduceSideJoinDriver {public static void main(String[] args) throws Exception {Splitter splitter = Splitter.on('/');StringBuilder filePaths = new StringBuilder();Configuration config = new Configuration();config.set("keyIndex", "0");config.set("separator", ",");for(int i = 0; i< args.length - 1; i++) {String fileName = Iterables.getLast(splitter.split(args[i]));config.set(fileName, Integer.toString(i+1));filePaths.append(args[i]).append(",");}filePaths.setLength(filePaths.length() - 1);Job job = Job.getInstance(config, "ReduceSideJoin");job.setJarByClass(ReduceSideJoinDriver.class);FileInputFormat.addInputPaths(job, filePaths.toString());FileOutputFormat.setOutputPath(job, new Path(args[args.length-1]));job.setMapperClass(JoiningMapper.class);job.setReducerClass(JoiningReducer.class);job.setPartitionerClass(TaggedJoiningPartitioner.class);job.setGroupingComparatorClass(TaggedJoiningGroupingComparator.class);job.setOutputKeyClass(TaggedKey.class);job.setOutputValueClass(Text.class);System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
    1. 首先,我们在第5行上创建一个番石榴分割器,该分割器将用“ /”分割字符串。
    2. 然后在第8-10行上,设置连接键的索引和文件中使用的分隔符。
    3. 在第12-17行中,我们为要连接的输入文件设置标签。 命令行上文件名的顺序决定了它们在联接中的位置。 从命令行循环遍历文件名时,我们将拆分整个文件名,并通过Guava Iterables.getLast()方法检索最后一个值(基本文件名)。 然后,我们使用文件名作为键调用config.set() ,并使用i + 1作为值,这将设置标签或连接顺序。 args数组中的最后一个值在循环中被跳过,因为它用于第23行的MapReduce作业的输出路径。在循环的最后一行,我们将每个文件路径附加到StringBuilder中,稍后使用( 22)设置作业的输入路径。
    4. 我们只需要对所有文件使用一个映射器,即JoiningMapper,该映射器在第25行设置。
    5. 第27和28行分别设置了我们的自定义分区和组比较器,以确保键和值到达化简器的顺序,并使用正确的键正确地对值进行分组。

通过使用分区程序和分组比较器,我们知道第一个值属于第一个键,并且可以用于将Iterable包含的所有其他值连接到给定键的reduce()方法中。 现在是时候考虑一​​对多联接了。

一对多加入

好消息是到目前为止,我们已经完成了所有工作,实际上我们可以使用代码执行一对多连接。 对于一对多联接,我们可以考虑以下两种方法:1)一个包含单个记录的小文件,另一个包含具有相同键的多个记录的文件,以及2)同样具有单个记录的小文件,但是N每个文件包含与第一个文件匹配的记录的文件数。 主要区别在于,采用第一种方法时,超出前两个键的联接的值的顺序将是未知的。 但是,使用第二种方法,我们将“标记”每个联接文件,以便我们可以控制所有联接值的顺序。 对于我们的示例,第一个文件将保留为我们的GUID名称-地址文件,并且我们将有3个其他文件,其中将包含汽车,雇主和工作描述记录。 这可能不是最现实的情况,但将用于演示。 以下是在进行联接之前数据的外观示例:

//The single person records
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Esther,Garner,4071 Haven Lane,Okemos,MI
81a43486-07e1-4b92-b92b-03d0caa87b5f,Timothy,Duncan,753 Stadium Drive,Taunton,MA
aef52cf1-f565-4124-bf18-47acdac47a0e,Brett,Ramsey,4985 Shinn Street,New York,NY
//Automobile records
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,2003 Holden Cruze
81a43486-07e1-4b92-b92b-03d0caa87b5f,2012 Volkswagen T5
aef52cf1-f565-4124-bf18-47acdac47a0e,2009 Renault Trafic
//Employer records
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Creative Wealth
81a43486-07e1-4b92-b92b-03d0caa87b5f,Susie's Casuals
aef52cf1-f565-4124-bf18-47acdac47a0e,Super Saver Foods
//Job Description records
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Data entry clerk
81a43486-07e1-4b92-b92b-03d0caa87b5f,Precision instrument and equipment repairer
aef52cf1-f565-4124-bf18-47acdac47a0e,Gas and water service dispatcher

一对多加入结果

现在,让我们看一下一对多联接结果的示例(使用上面的相同值来辅助比较):

cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Esther,Garner,4071 Haven Lane,Okemos,MI,2003 Holden Cruze,Creative Wealth,Data entry clerk
81a43486-07e1-4b92-b92b-03d0caa87b5f,Timothy,Duncan,753 Stadium Drive,Taunton,MA,2012 Volkswagen T5,Susie's Casuals,Precision instrument and equipment repairer
aef52cf1-f565-4124-bf18-47acdac47a0e,Brett,Ramsey,4985 Shinn Street,New York,NY,2009 Renault Trafic,Super Saver Foods,Gas and water service dispatcher

如结果所示,我们已经能够成功地以指定顺序连接多个值。

结论

我们已经成功演示了如何在MapReduce中执行约简边连接。 即使该方法并不太复杂,我们也可以看到在Hadoop中执行联接可能涉及编写大量代码。 虽然学习联接的工作方式是一项有用的练习,但是在大多数情况下,使用Hive或Pig这样的工具联接数据要好得多。 谢谢你的时间。

资源资源

    • Jimmy Lin和Chris Dyer 使用MapReduce进行的数据密集型处理
    • Hadoop: Tom White 的权威指南
    • 来自博客的源代码和测试
    • 编程蜂巢爱德华卡普里奥罗,院长Wampler和Jason拉瑟格伦
    • 通过Alan Gates 编程Pig
    • Hadoop API
    • MRUnit用于单元测试Apache Hadoop映射减少工作

    参考: MapReduce算法–了解数据 ,是我们的JCG合作伙伴 Bill Bejeck在“ 随机思考编码”博客上的第1部分 。

    翻译自: https://www.javacodegeeks.com/2013/07/mapreduce-algorithms-understanding-data-joins-part-1.html

    mapreduce 算法

    本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/348191.shtml

    如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

    相关文章

    几个有用的word小技巧,保准提升效率~

    这几天改报告改到头秃。年底了&#xff0c;实验室各种项目在结项&#xff0c;作为一名研一新生&#xff0c;理所应当地承担起了体力活的工作。主要负责项目报告的研究背景调研、报告汇总、格式调整等。 格式调整&#xff0c;看起来非常简单的一项工作&#xff0c;却是最费时费…

    Java 8:长期支持的堡垒

    斯蒂芬科尔本 &#xff08; Stephen Colebourne &#xff09;的文章“ Java 9可以使用六个星期 ”开始&#xff0c;“ Java 9仅仅六个星期 就已过时。” Colebourne参考了Mark Reinhold博客文章“ Moving Java Forwarding Faster ”&#xff0c;并写道&#xff1a;“新的Java发…

    我的2020(年终总结)

    我的2020 2020对每个人来说可能都是极不平凡的一年&#xff0c;对我尤其是。这一年我整个的人心境发生了极大的变化。总结来说&#xff0c;有这么几件大事&#xff1a;复试、毕设、大学毕业、研究生开学、分手、再脱单、疫情在家。今天坐在实验室也不知道该干点什么&#xff0…

    Java学习 第三章 数组(二)多维数组

    多维数组的使用 由数组构成的数组 二维数组&#xff1a; ① 二维数组的声明和初始化 ② 如何调用数组的指定位置的元素 ③ 如何获取数组的长度 ④ 如何遍历数组 ⑤ 数组元素的默认初始化值 &#xff1a;见ArrayTest1.java 数组元素是整形&#xff1a;0 数组元素是浮点型&…

    Spring Boot 2中的功能切换

    无论您是否喜欢&#xff0c;软件开发都是一项协作活动。 整合工作一直被妖魔化&#xff0c;并被视为必不可少的邪恶。 有几种方法可以解决有效集成的挑战。 功能切换开关属于该组。 在本文中&#xff0c;您将在实践中看到如何在Spring Boot应用程序中使用功能切换&#xff08;也…

    Java学习 第三章 数组(三)排序算法

    ** Java学习 第三章 数组&#xff08;三&#xff09;排序算法 ** 主要内容&#xff1a;排序算法、排序算法横向比较、Arrays工具类的使用、数组常见异常 1.数组中涉及到的常见算法&#xff1a;排序算法 1.1 排序算法分类&#xff1a;内部排序和外部排序 1.2 十大内部排序算…

    【强化学习】一些网站整理

    莫烦教程 https://mofanpy.com/tutorials/machine-learning/reinforcement-learning/ 博客园&#xff1a;刘建平Pinard https://www.cnblogs.com/pinard/category/1254674.html Deep-Q-Network 学习笔记 https://www.cnblogs.com/cjnmy36723/p/7017549.html 强化学习到深度强…

    xmx java_为什么我的Java进程比Xmx消耗更多的内存?

    xmx java你们有些人去过那里。 您已经在启动脚本中添加了-Xmx选项&#xff0c;并放松了下来&#xff0c;因为您知道Java进程将不会消耗比经过微调的选项所允许的更多的内存。 然后&#xff0c;您感到非常讨厌。 要么自己检查开发/测试框中的过程表&#xff0c;要么事情真的变坏…

    卫星通信系统概述

    卫星通信系统指通过在轨人造卫星作为中继站对无线电信号进行转发&#xff0c;实现地面及空间等用户之间信息传输的系统。卫星通信系统组成包括空间段及地面段&#xff0c;系统组成如图所示。其中空间段主要指在轨卫星、对在轨卫星进行操控的地面站&#xff0c;这些地面站主要实…

    ElasticSearch初学者教程

    1.简介 在此示例中&#xff0c;我们将演示如何使用Elasticsearch &#xff0c; Elasticsearch是一个基于Apache Lucene的分布式自由文本搜索和分析数据库引擎&#xff0c;具有一个基于maven的简单Java客户端。 在撰写本文时&#xff0c;我们将使用最新版本的Elasticsearch&…

    博弈论与纳什均衡

    三十分钟理解博弈论“纳什均衡” – Nash Equilibrium https://blog.csdn.net/xbinworld/article/details/50932559 纳什均衡(Nash equilibrium)及经典案例 https://blog.csdn.net/u010420283/article/details/83927742 论文&#xff1a; [1] 刘帅军. 卫星通信系统中动态资源…

    Opnet入门

    一、opnet快速入门 1.系统界面&文件菜单说明 2.常用文件名后缀及描述 3.Opnet建模层次 用户只有一种节点域模型 三、 OPNET Modeler网络仿真机制 1.事件的属性 每次点击next会出现以下界面&#xff1a; 2.事件的执行 调度型&#xff1a;按照正常程序调度事件 强制性&…

    JSON的JUnit Hamcrest Matcher

    这篇文章展示了如何编写JUnit测试来检查对象是否与JSON字符串匹配。 如果您要实现REST服务并想测试您的服务是否产生了预期的JSON响应&#xff0c;那么这一点很重要。 JSONassert是比较JSON对象的有用库。 首先&#xff0c;您必须将Java对象转换为JSON字符串&#xff08;例如&…

    Python列表推导式

    列表推导式 是Python构建列表&#xff08;list&#xff09;的一种快捷方式,可以使用简洁的代码就创建出一个列表&#xff0c;即循环创建列表. for可以用来创建列表&#xff0c;列表推导式就相当于是for循环的简化版 1. 最简单的情况 values [10, 21, 5, 7, 12] squares [] …

    一些python函数及其用法

    1.np.ravel&#xff08;&#xff09;方法 ravel是将数组维度拉成一维数组&#xff0c;也就是将矩阵向量化 x np.array{ [ [ 1 , 2 , 3 ] , [ 4 , 5 , 6 ] ] } print(np.ravel(x))输出 [ 1 2 3 4 5 6 ]2.b a[np.newaxis,:] import numpy as np a np.arange(0, 10) print(a…

    使用Spring WebFlux进行操作

    Spring Boot 2.0最近在GA上线了&#xff0c;所以我决定在相当长一段时间内写我的第一篇有关Spring的文章。 自发布以来&#xff0c;我已经看到越来越多的提到Spring WebFlux以​​及有关如何使用它的教程。 但是&#xff0c;在阅读完它们并尝试使它们自己工作之后&#xff0c;我…

    【强化学习】Policy Gradient原理

    1.Policy Gradient和DQN系列强化算法最大的区别在于&#xff1a; DQN系列基于Value&#xff0c;也就是说执行完所有的动作并保存所得到的价值&#xff0c;根据这些价值计算出最优价值函数&#xff0c;并以此选择动作&#xff0c;最终获得一个特定的策略。 Policy Gradient基于策…

    ajax的url怎么将后缀补上_蜂蜜杏仁怎么做?杏仁和蜂蜜腌制方法

    蜂蜜杏仁怎么做?杏仁和蜂蜜腌制方法蜂蜜杏仁是一种非常好吃的小零食&#xff0c;很多小可爱都喜欢吃蜂蜜杏仁。不过有些时候忘记补货&#xff0c;就会断粮&#xff0c;于是大家都想要在家里自制蜂蜜杏仁。不过你知道蜂蜜杏仁应该怎么做吗?蜂蜜杏仁的做法其实并不难&#xff0…

    【强化学习】Policy Gradients代码注释版本

    import gym # import tensorflow as tf import numpy as np# Hyper Parameters GAMMA 0.95 # discount factor 折扣因子 LEARNING_RATE 0.01 # 学习率class Policy_Gradient():# 咱们来搞一下大头&#xff01;def __init__(self, env): # 初始化# 先初始化一些参量sel…

    c语言构建栈_选择技术栈构建通用平台

    c语言构建栈Java社区中有许多关于Spring vs Java EE的话题。 一群人会争辩说您应该使用一个而不是其他。等等。当我看到这一点时&#xff0c;我不禁要想为什么我们不能同时使用它们呢&#xff1f; 实际上&#xff0c;我认为有效地使用它们将为建立基础架构创建一个伟大的技术堆…