MapReduce算法–了解数据连接第二部分

自从我上一次发布以来已经有一段时间了,就像我上一次大休息一样,我正在Coursera上一些课程。 这次是Scala中的函数式编程 原理和反应式编程原理 。 我发现它们都是不错的课程,如果有时间的话,建议您选一门。 在本文中,我们将继续介绍如何使用MapReduce实现数据密集型文本处理中的算法的系列文章,这次涵盖了地图端连接。 从名称可以猜出,映射侧联接仅在映射阶段连接数据,而完全跳过简化阶段。 在上一篇有关数据连接的文章中,我们介绍了减少侧连接 。 减少端连接很容易实现,但缺点是所有数据都通过网络发送到减少器。 由于我们避免了跨网络发送数据的开销,因此地图端连接可显着提高性能。 但是,与减少侧联接不同,映射侧联接需要满足非常特定的条件。 今天,我们将讨论地图端连接的要求以及如何实现它们。

地图端加入条件

要利用地图侧联接,我们的数据必须满足以下条件之一:

  1. 要连接的数据集已经按相同的键排序,并且具有相同的分区数
  2. 在要连接的两个数据集中,一个足够小以适合内存

我们将考虑第一种情况,其中有两个(或更多)数据集需要连接,但是太大而无法容纳到内存中。 我们将假设最坏的情况是,文件没有按相同的顺序排序或分区。

资料格式

在开始之前,让我们看一下正在使用的数据。 我们将有两个数据集:

  1. 第一个数据集包括GUID,名字,姓氏,地址,城市和州
  2. 第二个数据集由一个GUID和雇主信息组成

两个数据集均以逗号分隔,并且联接键(GUID)位于第一位置。 加入后,我们希望将数据集2中的雇主信息附加到数据集1的末尾。 此外,我们希望将GUID保持在数据集1的第一个位置,但要从数据集2删除GUID。
资料集1:

aef9422c-d08c-4457-9760-f2d564d673bc,Linda,Narvaez,3253 Davis Street,Atlanta,GA08db7c55-22ae-4199-8826-c67a5689f838,John,Gregory,258 Khale Street,Florence,SCde68186a-1004-4211-a866-736f414eac61,Charles,Arnold,1764 Public Works Drive,Johnson City,TN6df1882d-4c81-4155-9d8b-0c35b2d34284,John,Schofield,65 Summit Park Avenue,Detroit,MI

数据集2:

de68186a-1004-4211-a866-736f414eac61,Jacobs6df1882d-4c81-4155-9d8b-0c35b2d34284,Chief Auto Partsaef9422c-d08c-4457-9760-f2d564d673bc,Earthworks Yard Maintenance08db7c55-22ae-4199-8826-c67a5689f838,Ellman's Catalog Showrooms

合并结果:

08db7c55-22ae-4199-8826-c67a5689f838,John,Gregory,258 Khale Street,Florence,SC,Ellman's Catalog Showrooms
6df1882d-4c81-4155-9d8b-0c35b2d34284,John,Schofield,65 Summit Park Avenue,Detroit,MI,Chief Auto Parts
aef9422c-d08c-4457-9760-f2d564d673bc,Linda,Narvaez,3253 Davis Street,Atlanta,GA,Earthworks Yard Maintenance
de68186a-1004-4211-a866-736f414eac61,Charles,Arnold,1764 Public Works Drive,Johnson City,TN,Jacobs

现在,我们继续介绍如何连接两个数据集。

Map-Side连接具有大数据集

为了能够执行映射端连接,我们需要按相同的键对数据进行排序并具有相同数量的分区,这意味着任何记录的所有键都在同一分区中。 尽管这似乎是一个艰巨的要求,但很容易解决。 Hadoop会对所有键进行排序,并保证将具有相同值的键发送到相同的reducer。 因此,只需运行一个MapReduce作业,该作业除了通过您要连接的键输出数据,并为所有数据集指定完全相同数量的化简器,我们将以正确的形式获取数据。 考虑到能够进行地图侧连接所带来的效率提高,可能值得花费额外的MapReduce作业。 在这一点上,有必要重复一遍,至关重要的是,在“准备”阶段对数据进行排序和分区时,所有数据集都必须指定完全相同数量的化简。 在本文中,我们将获取两个数据集,并在两个数据集上运行初始MapReduce作业以进行排序和分区,然后运行最终作业以执行地图端联接。 首先,让我们介绍一下MapReduce作业,以相同的方式对数据进行排序和分区。

第一步:排序和分区

首先,我们需要创建一个Mapper ,该Mapper将简单地选择要根据给定索引进行排序的键:

public class SortByKeyMapper extends Mapper<LongWritable, Text, Text, Text> {private int keyIndex;private Splitter splitter;private Joiner joiner;private Text joinKey = new Text();@Overrideprotected void setup(Context context) throws IOException, InterruptedException {String separator =  context.getConfiguration().get("separator");keyIndex = Integer.parseInt(context.getConfiguration().get("keyIndex"));splitter = Splitter.on(separator);joiner = Joiner.on(separator);}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {Iterable<String> values = splitter.split(value.toString());joinKey.set(Iterables.get(values,keyIndex));if(keyIndex != 0){value.set(reorderValue(values,keyIndex));}context.write(joinKey,value);}private String reorderValue(Iterable<String> value, int index){List<String> temp = Lists.newArrayList(value);String originalFirst = temp.get(0);String newFirst = temp.get(index);temp.set(0,newFirst);temp.set(index,originalFirst);return joiner.join(temp);}
}

SortByKeyMapper只需通过从在配置参数keyIndex给定位置找到的给定文本行中提取值来简单地设置joinKey的值。 同样,如果keyIndex不等于零,我们交换在第一个位置和keyIndex位置中找到的值的顺序。 尽管这是一个有问题的功能,但我们稍后将讨论为什么要这样做。 接下来,我们需要一个Reducer

public class SortByKeyReducer extends Reducer<Text,Text,NullWritable,Text> {private static final NullWritable nullKey = NullWritable.get();@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {for (Text value : values) {context.write(nullKey,value);}}
}

SortByKeyReducer写出给定键的所有值,但是会NullWritable键并写一个NullWritable 。 在下一节中,我们将解释为什么不使用密钥。

第二步:Map-Side联接

在执行地图侧连接时,记录到达映射器之前会被合并。 为此,我们使用CompositeInputFormat 。 我们还需要设置一些配置属性。 让我们看一下如何配置地图侧连接:

private static Configuration getMapJoinConfiguration(String separator, String... paths) {Configuration config = new Configuration();config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", separator);String joinExpression = CompositeInputFormat.compose("inner", KeyValueTextInputFormat.class, paths);config.set("mapred.join.expr", joinExpression);config.set("separator", separator);return config;}

首先,我们通过设置mapreduce.input.keyvaluelinerecordreader.key.value.separator属性来指定用于分隔键和值的字符。 接下来,我们使用CompositeInputFormat.compose方法创建一个“联接表达式”,通过使用单词“ inner”指定内部联接 ,然后指定要使用的输入格式, KeyValueTextInput类以及最后一个String varargs,它们表示文件的路径。 join(运行map-reduce作业以对数据进行排序和分区的输出路径)。 KeyValueTextInputFormat类将使用分隔符将第一个值设置为键,其余的将用作该值。

映射器的加入

连接源文件中的值后,将Mapper.map方法,该方法将接收该键的Text对象(连接记录中的相同键)和一个TupleWritable ,该TupleWritable由输入文件中的连接值组成对于给定的密钥。 请记住,我们希望最终输出的第一个位置具有join-key,然后在一个定界的String中包含所有连接的值。 为此,我们有一个自定义的映射器,将我们的数据以正确的格式放置:

public class CombineValuesMapper extends Mapper<Text, TupleWritable, NullWritable, Text> {private static final NullWritable nullKey = NullWritable.get();private Text outValue = new Text();private StringBuilder valueBuilder = new StringBuilder();private String separator;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {separator = context.getConfiguration().get("separator");}@Overrideprotected void map(Text key, TupleWritable value, Context context) throws IOException, InterruptedException {valueBuilder.append(key).append(separator);for (Writable writable : value) {valueBuilder.append(writable.toString()).append(separator);}valueBuilder.setLength(valueBuilder.length() - 1);outValue.set(valueBuilder.toString());context.write(nullKey, outValue);valueBuilder.setLength(0);}
}

CombineValuesMapper我们将键和所有联接的值附加到一个定界的String 。 在这里,我们终于可以看到在以前的MapReduce作业中放弃加入键的原因。 由于键是要连接的所有数据集的值中的第一个位置,因此我们的映射器自然会从连接的数据集中消除重复的键。 我们需要做的就是将给定的键插入StringBuilder ,然后附加TupleWritable包含的值。

全部放在一起

现在,我们拥有所有代码,可以在大型数据集上运行地图端联接。 让我们看一下我们将如何一起运行所有作业。 如前所述,我们假设我们的数据未按相同的顺序进行排序和分区,因此我们将需要运行N(在本例中为2)MapReduce作业,以获取正确格式的数据。 在运行初始排序/分区作业之后,将执行执行实际联接的最终作业。

public class MapSideJoinDriver {public static void main(String[] args) throws Exception {String separator = ",";String keyIndex = "0";int numReducers = 10;String jobOneInputPath = args[0];String jobTwoInputPath = args[1];String joinJobOutPath = args[2];String jobOneSortedPath = jobOneInputPath + "_sorted";String jobTwoSortedPath = jobTwoInputPath + "_sorted";Job firstSort = Job.getInstance(getConfiguration(keyIndex, separator));configureJob(firstSort, "firstSort", numReducers, jobOneInputPath, jobOneSortedPath, SortByKeyMapper.class, SortByKeyReducer.class);Job secondSort = Job.getInstance(getConfiguration(keyIndex, separator));configureJob(secondSort, "secondSort", numReducers, jobTwoInputPath, jobTwoSortedPath, SortByKeyMapper.class, SortByKeyReducer.class);Job mapJoin = Job.getInstance(getMapJoinConfiguration(separator, jobOneSortedPath, jobTwoSortedPath));configureJob(mapJoin, "mapJoin", 0, jobOneSortedPath + "," + jobTwoSortedPath, joinJobOutPath, CombineValuesMapper.class, Reducer.class);mapJoin.setInputFormatClass(CompositeInputFormat.class);List<Job> jobs = Lists.newArrayList(firstSort, secondSort, mapJoin);int exitStatus = 0;for (Job job : jobs) {boolean jobSuccessful = job.waitForCompletion(true);if (!jobSuccessful) {System.out.println("Error with job " + job.getJobName() + "  " + job.getStatus().getFailureInfo());exitStatus = 1;break;}}System.exit(exitStatus);}

MapSideJoinDriver对运行MapReduce作业进行基本配置。 有趣的一点是,排序/分区作业每个都指定10个化简器,而最后一个作业将归化器的数量显式设置为0,因为我们是在地图端加入的,不需要化简阶段。 由于我们没有任何复杂的依赖关系,因此我们将作业放入ArrayList并以线性顺序运行作业(第24-33行)。

结果

最初,我们有2个文件; 第一个文件中的姓名和地址信息,第二个文件中的就业信息。 这两个文件在第一列中都有唯一的ID。
文件一:

....
08db7c55-22ae-4199-8826-c67a5689f838,John,Gregory,258 Khale Street,Florence,SC
...

文件二:

....
08db7c55-22ae-4199-8826-c67a5689f838,Ellman's Catalog Showrooms
....

结果:

08db7c55-22ae-4199-8826-c67a5689f838,John,Gregory,258 Khale Street,Florence,SC,Ellman's Catalog Showrooms

正如我们在这里看到的那样,我们已经成功地将记录合并在一起,并保持了文件格式,而结果中没有重复的键。

结论

在本文中,我们演示了当两个数据集都很大且无法容纳到内存中时如何执行地图端连接。 如果您觉得这需要大量工作才能完成,那么您是正确的。 尽管在大多数情况下,我们希望使用像Pig或Hive这样的高级工具,但了解对大型数据集执行地图侧联接的机制很有帮助。 当您需要从头开始编写解决方案时,尤其如此。 谢谢你的时间。

资源资源

  • Jimmy Lin和Chris Dyer 使用MapReduce进行的数据密集型处理
  • Hadoop: Tom White 的权威指南
  • 来自博客的源代码和测试
  • 爱德华·卡普里奥洛(Edward Capriolo),迪恩·沃普勒(Dean Wampler)和杰森·卢瑟格伦(Jason Rutherglen)的编程蜂巢
  • 通过Alan Gates对Pig进行编程
  • Hadoop API
  • MRUnit用于单元测试Apache Hadoop映射减少工作

参考: MapReduce算法–了解数据 ,是我们的JCG合作伙伴 Bill Bejeck在《 随机编码想法》博客上发表的第二部分。

翻译自: https://www.javacodegeeks.com/2014/02/mapreduce-algorithms-understanding-data-joins-part-ii.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/364831.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C: City----逆向并查集

C: City 时间限制: 1 s 内存限制: 128 MB 题目描述 如果城市A和城市B互通&#xff0c;城市B和城市C互通&#xff0c;那么城市A和城市C也互通&#xff0c;A、B、C三个城市算一个聚集点。先已知有n个城市和m条道路&#xff0c;想求的是有几个聚集点&#xff1f;但小S觉…

java menu字体_Java开发网 - 请问如何让菜单字体变宋体?

Posted by:scottdingPosted on:2003-01-23 12:44贴出了大部分&#xff0c;你看看想改什么吧。Font font new Font("宋体",Font.PLAIN,14);UIManager.put("Button.font",font);UIManager.put("ToggleButton.font",font);UIManager.put("Rad…

Java8中 Parallel Streams 的陷阱 [译]

转载自https://www.cnblogs.com/imyijie/p/4478074.html Java8 提供了三个我们渴望的重要的功能:Lambdas 、 Stream API、以及接口的默认方法。不过我们很容易滥用它们甚至破坏自己的代码。 今天我们来看看Stream api&#xff0c;尤其是 parallel streams。这篇文章概述了其中的…

自定义消息提示框

使用原生JavaScript简单封装的一个消息提示模态框&#xff0c;如果谁有更好的方式可以分享&#xff0c;谢谢&#xff01; <!DOCTYPE html> <html lang"en"><head><title></title><meta charset"UTF-8"><meta name&…

ObjectStreamClass:监视Java对象的序列化

ObjectStreamClass可以是有用的类&#xff0c;用于分析JVM中加载的序列化类的序列化特征。 这篇文章介绍了此类提供的有关已加载序列化类的一些信息。 ObjectStreamClass提供了两个用于查找类的静态方法&#xff1a; lookup&#xff08;class&#xff09;和lookupAny&#xff…

@Transcational特性

捕获RuntimeException捕获Error并不捕获Checked Exception在方法中使用Transcational注解时候&#xff0c;通过throw new Exception&#xff08;&#xff09;&#xff0c;在发生异常的时候不会进行回滚&#xff0c;可以使用throw new RuntimeException&#xff08;&#xff09;…

SpringBoot集成Thymeleaf前端模版

1、在application.properties配置文件中添加 thymeleaf 的配置信息 spring.datasource.driverClassNamecom.mysql.jdbc.Driver spring.datasource.urljdbc:mysql://localhost:3306/test spring.datasource.usernameroot spring.datasource.passwordrootspring.thymeleaf.modeHT…

00005在java结果输出_Java-005-运算符详解

计算机的最基本用途之一就是执行数学运算,作为一门计算机语言Java也提供了套丰富的运算符来操纵变量, 可以把运算符分成以下几组算术运算符、关系运算符、位运算符、逻辑运算符、赋值运算符、其他运算符。①算术运算符用在数学表达式中它们的作用和在数学中的作用一样 表格中的…

spring data jpa 分页查询

法一&#xff08;本地sql查询,注意表名啥的都用数据库中的名称&#xff0c;适用于特定数据库的查询&#xff09; public interface UserRepository extends JpaRepository<User, Long> {Query(value "SELECT * FROM USERS WHERE LASTNAME ?1",countQuery &…

Python之递归

递归的意思是函数自己调用自己。递归次数&#xff1a;递归如果是死循环&#xff0c;最多执行999次。count0 def say():global countcount1print(say)print(count)say()say() #结果&#xff1a; # say # 1 # ... # say # 997 # say # RecursionError: maximum recursion depth e…

使用CDI的InjectionPoint注入配置值

依赖注入是用于组织类依赖的一项出色技术。 当前类中需要的所有类实例都是在运行时从DI容器提供的。 但是您的配置呢&#xff1f; 当然&#xff0c;您可以创建一个“ Configuration”类&#xff0c;并在需要它的任何地方注入该类&#xff0c;并从中获取必要的值。 但是CDI允许…

LOJ.6435.[PKUSC2018]星际穿越(倍增)

LOJBZOJ 参考这儿qwq。 首先询问都是求&#xff0c;向左走的最短路。\(f[i][j]\)表示从\(i\)走到\(j\)最少需要多少步。表示这样只会\(O(n^2\log n)\)的 但是感觉能卡过\(70\)分。 注意到从\(i\)出发&#xff0c;走\(j\)步能到达的点都是一段一段的。所以不妨令\(f[i][j]\)表示…

java setsession_Java Session.setServerAliveInterval方法代码示例

import com.jcraft.jsch.Session; //导入方法依赖的package包/类private Session startNewSession(boolean acquireChannel) throws JSchException, InterruptedException {Session newSession null;final AtomicBoolean cancelled new AtomicBoolean(false);ConnectingProgr…

鼠标拖动改变DIV等网页元素的大小的最佳实践

1.初次实现 1.1 html代码 <html xmlns"http://www.w3.org/1999/xhtml" xml:lang"en" lang"en"><head><meta http-equiv"Content-Type" content"text/html; charsetutf-8" /><title>div change wid…

[WC2006]水管局长

水管局长 题目链接&#xff1a;https://www.luogu.org/problemnew/show/P4172#sub LCT 显然两个点的路径上的边最大要最小在该图最小生成树上 正删倒加&#xff0c;倒着做变成加边操作 加边时判断一下是否能形成更优的生成树&#xff0c;用LCT删除和连接操作即可 1 #include<…

JDBC 4.0鲜为人知的Clob.free()和Blob.free()方法

在会议上谈论jOOQ时&#xff0c;我总是展示此幻灯片&#xff0c;其中包含许多人们经常犯的非常常见的JDBC错误&#xff1a; 此图中的六个常见的JDBC错误 您可以找到错误吗&#xff1f; 其中一些是显而易见的&#xff0c;例如&#xff1a; 第4行&#xff1a;由于第3行的连接…

反沙箱——SetErrorMode

目录 1.前言 2.原理讲解 3.代码实现 4.参考 1.前言 利用SetErrorMode进行反沙箱的技术&#xff0c;在2010年就有被提出&#xff0c;但是之前搜了很久都没有相关内容&#xff0c;这里简单的说一下这个反沙箱的实现。反沙箱参考GandCrab5.2。 2.原理讲解 首先讲一下SetErrorMode这…

bat java 启动脚本_从bat脚本运行的Java应用程序上的Windows关闭挂钩

小编典典在极少数情况下&#xff0c;虚拟机可能会中止&#xff0c;即在不完全关闭的情况下停止运行。当虚拟机在外部终止时会发生这种情况&#xff0c;例如在Unix上使用SIGKILL信号或在MicrosoftWindows上使用TerminateProcess调用。因此&#xff0c;不幸的是&#xff0c;我认为…

C89和C99区别--简单总结

&#xff08;1&#xff09;对数组的增强 可变长数组  C99中,程序员声明数组时,数组的维数可以由任一有效的整型表达式确定,包括只在运行时才能确定其值的表达式,这类数组就叫做可变长数组,但是只有局部数组才可以是变长的.可变长数组的维数在数组生存期内是不变的,也就是说,可…

四、spring boot 1.5.4 日志管理

spring boot日志默认采用logback进行输出&#xff0c;你可以对logback进行定制化&#xff0c;方法如下&#xff1a; 在resources文件夹下建立logback.xml配置文件 <?xml version"1.0" encoding"UTF-8"?> <configuration><!-- base.xml i…