用Java编写Hadoop MapReduce任务

尽管Hadoop框架本身是使用Java创建的,但MapReduce作业可以用许多不同的语言编写。 在本文中,我将展示如何像其他Java项目一样,基于Maven项目在Java中创建MapReduce作业。

    • 准备示例输入

让我们从一个虚构的商业案例开始。 在这种情况下,我们需要一个CSV文件,其中包含字典中的英语单词,并添加了其他语言的所有翻译,并以'|'分隔 符号。 我已经根据这篇文章给出了这个例子。 因此,这项工作将阅读不同语言的词典,并将每个英语单词与另一种语言的翻译匹配。 作业的输入字典是从此处获取的 。 我下载了几种不同语言的文件,并将它们放到一个文件中(Hadoop处理多个大文件比处理多个小文件更好)。 我的示例文件可以在这里找到。

    • 创建Java MapReduce项目

下一步是为MapReduce作业创建Java代码。 就像我在使用Maven项目之前所说的那样,所以我在自己的IDE IntelliJ中创建了一个新的空Maven项目。 我修改了默认pom以添加必要的插件和依赖项:
我添加的依赖项:

<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-core</artifactId><version>1.2.0</version><scope>provided</scope>
</dependency>

Hadoop依赖关系对于使用MapReduce作业中的Hadoop类是必需的。 由于我想在AWS EMR上运行作业,因此请确保我具有匹配的Hadoop版本。 此外,由于Hadoop框架将在Hadoop群集上可用,因此可以将范围设置为“已提供”。

除了依赖关系之外,我还在pom.xml中添加了以下两个插件:

<plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-jar-plugin</artifactId><configuration><archive><manifest><addClasspath>true</addClasspath><mainClass>net.pascalalma.hadoop.Dictionary</mainClass></manifest></archive></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.6</source><target>1.6</target></configuration></plugin>
</plugins>

第一个插件用于创建我们项目的可执行jar。 这使JAR在Hadoop集群上的运行更加容易,因为我们不必声明主类。

为了使创建的JAR与AWS EMR集群的实例兼容,第二个插件是必需的。 该AWS集群随附JDK 1.6。 如果您忽略此选项,则群集将失败(我收到类似“不支持的major.minor版本51.0”之类的消息)。 稍后,我将在另一篇文章中介绍如何设置此AWS EMR集群。

这是基本项目,就像常规的Java项目一样。 接下来让我们实现MapReduce作业。

    • 实现MapReduce类

我已经描述了我们要在第一步中执行的功能。 为此,我在Hadoop项目中创建了三个Java类。 第一类是“ Mapper ”:

package net.pascalalma.hadoop;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;
import java.util.StringTokenizer;/*** Created with IntelliJ IDEA.* User: pascal* Date: 16-07-13* Time: 12:07*/
public class WordMapper extends Mapper<Text,Text,Text,Text> {private Text word = new Text();public void map(Text key, Text value, Context context) throws IOException, InterruptedException{StringTokenizer itr = new StringTokenizer(value.toString(),",");while (itr.hasMoreTokens()){word.set(itr.nextToken());context.write(key, word);}}
}

这个课不是很复杂。 它只是从输入文件中接收一行,并为其创建一个Map,该映射中的每个键都有一个值(在此阶段允许多个键)。

下一类是“ Reducer ”,它将地图缩小为所需的输出:

package net.pascalalma.hadoop;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** Created with IntelliJ IDEA.* User: pascal* Date: 17-07-13* Time: 19:50*/
public class AllTranslationsReducer extends Reducer<Text, Text, Text, Text> {private Text result = new Text();@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {String translations = "";for (Text val : values) {translations += "|" + val.toString();}result.set(translations);context.write(key, result);}
}

减少步骤将收集给定键的所有值,并将它们彼此之间用“ |”分隔 符号。

剩下的最后一堂课是将所有内容放在一起以使其可运行的工作:

package net.pascalalma.hadoop;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;/*** Created with IntelliJ IDEA.* User: pascal* Date: 16-07-13* Time: 12:07*/
public class Dictionary {public static void main(String[] args) throws Exception{Configuration conf = new Configuration();Job job = new Job(conf, "dictionary");job.setJarByClass(Dictionary.class);job.setMapperClass(WordMapper.class);job.setReducerClass(AllTranslationsReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);job.setInputFormatClass(KeyValueTextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1]));boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}

在这种主要方法中,我们将一个Job放在一起并运行它。 请注意,我只是希望args [0]和args [1]是输入文件和输出目录的名称(不存在)。 我没有为此添加任何检查。 这是我在IntelliJ中的“运行配置”:

屏幕截图-2013-08-15-at-21-36-35

只需确保在运行类时输出目录不存在。 作业创建的日志记录输出如下所示:

2013-08-15 21:37:00.595 java[73982:1c03] Unable to load realm info from SCDynamicStore
aug 15, 2013 9:37:01 PM org.apache.hadoop.util.NativeCodeLoader <clinit>
WARNING: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
WARNING: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
WARNING: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
INFO: Total input paths to process : 1
aug 15, 2013 9:37:01 PM org.apache.hadoop.io.compress.snappy.LoadSnappy <clinit>
WARNING: Snappy native library not loaded
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: Running job: job_local_0001
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.Task initialize
INFO:  Using ResourceCalculatorPlugin : null
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: io.sort.mb = 100
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: data buffer = 79691776/99614720
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: record buffer = 262144/327680
aug 15, 2013 9:37:02 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
INFO: Starting flush of map output
aug 15, 2013 9:37:02 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
INFO: Finished spill 0
aug 15, 2013 9:37:02 PM org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
aug 15, 2013 9:37:02 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO:  map 0% reduce 0%
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO: 
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_m_000000_0' done.
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.Task initialize
INFO:  Using ResourceCalculatorPlugin : null
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO: 
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.Merger$MergeQueue merge
INFO: Merging 1 sorted segments
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.Merger$MergeQueue merge
INFO: Down to the last merge-pass, with 1 segments left of total size: 524410 bytes
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO: 
aug 15, 2013 9:37:05 PM org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
aug 15, 2013 9:37:05 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO: 
aug 15, 2013 9:37:05 PM org.apache.hadoop.mapred.Task commit
INFO: Task attempt_local_0001_r_000000_0 is allowed to commit now
aug 15, 2013 9:37:05 PM org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
INFO: Saved output of task 'attempt_local_0001_r_000000_0' to /Users/pascal/output
aug 15, 2013 9:37:05 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO:  map 100% reduce 0%
aug 15, 2013 9:37:07 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO: reduce > reduce
aug 15, 2013 9:37:07 PM org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_r_000000_0' done.
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO:  map 100% reduce 100%
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: Job complete: job_local_0001
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Counters: 17
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:   File Output Format Counters 
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Bytes Written=423039
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:   FileSystemCounters
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     FILE_BYTES_READ=1464626
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     FILE_BYTES_WRITTEN=1537251
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:   File Input Format Counters 
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Bytes Read=469941
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:   Map-Reduce Framework
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Reduce input groups=11820
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Map output materialized bytes=524414
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Combine output records=0
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Map input records=20487
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Reduce shuffle bytes=0
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Reduce output records=11820
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Spilled Records=43234
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Map output bytes=481174
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Total committed heap usage (bytes)=362676224
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Combine input records=0
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Map output records=21617
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     SPLIT_RAW_BYTES=108
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Reduce input records=21617Process finished with exit code 0

可以在提供的输出目录中找到此作业创建的输出文件,如以下屏幕截图所示:

屏幕截图-2013-08-15-at-21-42-49

如您所见,我们可以在IDE中(或从命令行)运行此main方法,但是我想在去之前在Mapper和Reducer上执行一些单元测试。 我将在另一篇文章中演示如何做到这一点。

参考:在The Pragmatic Integrator博客上,由我们的JCG合作伙伴 Pascal Alma 用 Java编写了Hadoop MapReduce任务 。

翻译自: https://www.javacodegeeks.com/2013/08/writing-a-hadoop-mapreduce-task-in-java.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/367631.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring集成:轻量级集成方法

当今的应用程序希望能够访问企业环境中的所有业务&#xff0c;而无需考虑与绝望的系统无缝集成的应用程序技术。 可以通过使用中间件技术对各种系统进行布线来实现这种集成。 集成平台使应用程序可以相互共享信息的环境&#xff0c;从而使体系结构具有高度的互操作性。 Spring…

接口IDisposable的用法

C#的每一个类型都代表一种资源&#xff0c;而资源又分为两类&#xff1a; 托管资源 由CLR管理分配和释放的资源&#xff0c;即从CLR里new出来的对象。非托管资源 不受CLR管理的对象&#xff0c;如Windows内核对象&#xff0c;或者文件、数据库连接、套接字、COM对象等。如果类…

图形处理:betweeness中心性– neo4j的密码与graphstream

上周&#xff0c; 我写了关于中间性中心性算法以及使用graphstream 理解它的尝试 &#xff0c;在阅读源代码时&#xff0c;我意识到我可以使用neo4j的所有最短路径算法将某些东西放在一起。 概括地说&#xff0c;中间性中心度算法用于确定图中节点的负载和重要性。 在与Jen讨…

小程序之Tab切换

小程序越来越火了&#xff0c;作为一名&#xff0c;额 有理想的攻城狮&#xff0c;当然要紧跟互联网时代的步伐啦&#xff0c;于是我赶紧抽时间学习了一下小程序的开发&#xff0c;顺便把经验分享给大家。 对于申请账号以及安装开发工具等&#xff0c;大家可以看官网&#xff…

6759: 异或序列

6759: 异或序列 时间限制: 1 Sec 内存限制: 128 MB题目描述 已知一个长度为n的整数数列a1,a2,…,an&#xff0c;给定查询参数l、r&#xff0c;问在al,al1,…,ar区间内&#xff0c;有多少子序列满足异或和等于k。也就是说&#xff0c;对于所有的x,y(l≤x≤y≤r)&#xff0c;满足…

java随机数排序算法_理解快速排序算法

快速排序在平均状况下&#xff0c;排序n个项目要Ο(n log n)次比较。在最坏状况下则需要Ο(n^2)次比较&#xff0c;但这种状况并不常见。事实上&#xff0c;快速排序通常明显比 其他Ο(n log n)算法更快&#xff0c;因为它的内部循环(inner loop)可以在大部分的架构上很有效率地…

开课吧视频内容汇总

1. 前端读取文件内容&#xff0c; FileReader对象 2. 用户联网状态 3. application/x-www-form-urlencoded 参数序列化 &#xff08;具体借鉴jquery的$.param方法&#xff09;&#xff0c;后端接收到的数据格式是 a[0][a] 1,并不会将其整理成对象或者数组 var nextStr ;funct…

Servlet和JSP中的文件上传示例

使用Servlet和JSP将文件上传到服务器是Java Web应用程序中的常见任务。 在对Servlet或JSP进行编码以处理文件上传请求之前&#xff0c;您需要了解一点有关HTML和HTTP协议中文件上传支持的知识。 如果要让用户从文件系统中选择文件并上传到服务器&#xff0c;则需要使用<inpu…

20165312-第4周-课上内容补做以及知识点总结

20165312-第4周-课上内容补做以及知识点总结 1、课上内容补做 教材代码完成情况测试p45这题很快就做完了&#xff0c;然后忘记提交了。。就开始做递归。想起来的时候已经过了时间。 public class Example3_7 {public static void main(String args[]) {int sum0,i,j;for(i1;i&l…

JavaScript实现表单的全选,反选,获取值

构思 通过for循环和for in循环来实现&#xff0c;界面效果如下 步骤 全选&#xff1a; 循环给所有的表单设置checked 反选&#xff1a; 循环内判断checked是否为true&#xff0c;如果为true则改为false否则改为true 获取值&#xff1a; 最开始用for取&#xff0c;但是只打印最后…

EJB钝化和激活示例

在本教程中&#xff0c;我们将了解状态Java企业会话Bean中激活和钝化的工作方式。 1.简介 有状态会话Bean通常保存有关特定客户端的信息&#xff0c;并在整个会话中保存该信息。 但是&#xff0c;事实是&#xff0c;客户端会话往往会在相当长的时间内保持活动状态&#xff0c;…

命令模式详解

原文链接:https://www.cnblogs.com/java-my-life/archive/2012/06/01/2526972.html 在阎宏博士的《JAVA与模式》一书中开头是这样描述命令&#xff08;Command&#xff09;模式的&#xff1a; 命令模式属于对象的行为模式。命令模式又称为行动(Action)模式或交易(Transaction)模…

c mysql5.7_CentOS7下MySQL5.7的三种安装方式详解

操作系统环境&#xff1a;CentOS 7.4最小化安装[rootnode3 src]# cat /etc/redhat-releaseCentOS Linux release 7.4.1708 (Core)[rootnode3 ~]# uname -r3.10.0-693.5.2.el7.x86_64[rootnode3 ~]#安装版本为&#xff1a;MySQL 5.7.20一、编译安装MySQL5.71、下载源码包[rootno…

Struts2 学习之小白开始

Struts2 基础知识学习总结 Struts2 概述&#xff1a;Struts2 是一个用来开发 MVC 应用程序的框架&#xff0c;他提供了 Web 应用程序开发过程中的一些常见问题的解决方案&#xff0c;比如对于用户输入信息合法性的验证&#xff0c;统一的布局&#xff0c;国际化等&#xff0c;既…

机器学习的数学基础 - 信息论

机器学习的数学基础 - 信息论 信息论 信息论本来是通信中的概念&#xff0c;但是其核心思想“熵”在机器学习中也得到了广泛的应用。比如决策树模型ID3&#xff0c;C4.5中是利用信息增益来划分特征而生成一颗决策树的&#xff0c;而信息增益就是基于这里所说的熵。所以它的重要…

了解ElasticSearch分析器

令人遗憾的是&#xff0c;许多早期的互联网啤酒配方不一定采用易于消化的格式。 也就是说&#xff0c;这些食谱是通常在电子邮件或论坛帖子中最初组成的非结构化的方向和成分混合列表。 因此&#xff0c;尽管很难轻松地将这些配方放入传统的数据存储中&#xff08;表面上看是为…

c++简单程序设计-2

1.验证性实验部分①函数声明和函数定义各自的作用及二者的区别&#xff1a;函数声明就是调用函数之前提示一下有这个函数函数定义就是写一个函数②什么是形参&#xff1f;什么是实参&#xff1f;函数参数和返回值在函数中起到什么作用&#xff1f;函数定义时写的参数叫做形参&a…

Linux虚机安装配置Tomcat

d第一步&#xff1a;下载Tomcat包&#xff0c;网址http://tomcat.apache.org/ 选择tar.gz包下载&#xff0c;并传到虚机中 第二步&#xff1a;解压下载好的Tomcat包 命令&#xff1a;tar -zxvf apache-tomcat-8.0.53.tar.gz 第三步&#xff1a;配置环境变量 进入到Tomcat下bin包…

Nginx安装及配置详解

nginx概述 nginx是一款自由的、开源的、高性能的HTTP服务器和反向代理服务器&#xff1b;同时也是一个IMAP、POP3、SMTP代理服务器&#xff1b;nginx可以作为一个HTTP服务器进行网站的发布处理&#xff0c;另外nginx可以作为反向代理进行负载均衡的实现。 这里主要通过三个方面…

servlet简单概括总结

最近在看java web的相关内容&#xff0c;不管是整体还是细节&#xff0c;要学习的知识有很多&#xff0c;所以有一个好的学习体系非常重要。在阅读学习一些博客和教程中关于servlet的内容后&#xff0c;现将知识体系和自己的总结体会进行梳理&#xff0c;希望在更深入理解的同时…