Although the Hadoop framework itself is written in Java, MapReduce jobs can be written in many different languages. In this article I will show how to create a MapReduce job in Java, based on a Maven project, just like any other Java project.
- Preparing the sample input
Let's start with a fictitious business case. In this case we want a CSV file containing the English words of a dictionary, each with all its translations in other languages added to it, separated by a '|' symbol. I based this example on this post. So the job will read dictionaries of different languages and match each English word with a translation in another language. The input dictionaries for the job are taken from here. I downloaded the files for several different languages and combined them into one file (Hadoop deals better with a few large files than with many small ones). My example file can be found here.
- Creating the Java MapReduce project
The next step is to create the Java code for the MapReduce job. As I said before, I use a Maven project for this, so I created a new empty Maven project in my IDE, IntelliJ. I modified the default pom to add the necessary plugins and dependencies:
The dependency I added:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.2.0</version>
    <scope>provided</scope>
</dependency>
The Hadoop dependency is necessary to make use of the Hadoop classes in the MapReduce job. Since I want to run the job on AWS EMR, I made sure I have a matching Hadoop version. Furthermore, since the Hadoop framework will be available on the Hadoop cluster itself, the scope can be set to 'provided'.
Besides the dependency, I added the following two plugins to the pom.xml:
<plugins>
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <configuration>
            <archive>
                <manifest>
                    <addClasspath>true</addClasspath>
                    <mainClass>net.pascalalma.hadoop.Dictionary</mainClass>
                </manifest>
            </archive>
        </configuration>
    </plugin>
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
            <source>1.6</source>
            <target>1.6</target>
        </configuration>
    </plugin>
</plugins>
The first plugin is used to create an executable jar of our project. This makes running the jar on the Hadoop cluster easier, since we don't have to declare the main class.
The second plugin is necessary to make the created jar compatible with the instances of the AWS EMR cluster. That AWS cluster comes with JDK 1.6. If you omit this plugin, the cluster will fail (I got a message like 'Unsupported major.minor version 51.0'). I will show in a later post how to set up this AWS EMR cluster.
That is the basic project, just like a regular Java project. Let's implement the MapReduce job next.
- Implementing the MapReduce classes
I have described the functionality we want to perform in the first step. To achieve this, I created three Java classes in my Hadoop project. The first class is the 'Mapper':
package net.pascalalma.hadoop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * Created with IntelliJ IDEA.
 * User: pascal
 * Date: 16-07-13
 * Time: 12:07
 */
public class WordMapper extends Mapper<Text, Text, Text, Text> {

    private Text word = new Text();

    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString(), ",");
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(key, word);
        }
    }
}
This class is not very complicated. It just receives a row from the input file and creates a map of it in which each key has one value (and multiple keys are allowed at this stage).
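The core of the mapper is simply splitting the comma-separated value string on ','. That logic can be illustrated outside of Hadoop with plain Java; the class and method below are a hypothetical standalone sketch, not part of the original project:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

public class MapperLogicDemo {

    // Mirrors the tokenizing loop in WordMapper: one token per comma-separated translation,
    // each of which WordMapper would emit as a (key, token) pair
    static List<String> splitTranslations(String value) {
        List<String> tokens = new ArrayList<String>();
        StringTokenizer itr = new StringTokenizer(value, ",");
        while (itr.hasMoreTokens()) {
            tokens.add(itr.nextToken());
        }
        return tokens;
    }

    public static void main(String[] args) {
        // For an input line like "dog<TAB>hond,hund", KeyValueTextInputFormat
        // would pass "hond,hund" as the value
        System.out.println(splitTranslations("hond,hund"));
    }
}
```

Each token in the returned list corresponds to one `context.write(key, word)` call in the real mapper.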
The next class is the 'Reducer', which reduces the map to the wanted output:
package net.pascalalma.hadoop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created with IntelliJ IDEA.
 * User: pascal
 * Date: 17-07-13
 * Time: 19:50
 */
public class AllTranslationsReducer extends Reducer<Text, Text, Text, Text> {

    private Text result = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        String translations = "";
        for (Text val : values) {
            translations += "|" + val.toString();
        }
        result.set(translations);
        context.write(key, result);
    }
}
The reduce step collects all values for a given key and puts them after each other, separated by a '|' symbol.
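The concatenation performed by the reducer can be sketched in isolation like this (a hypothetical plain-Java illustration of the loop above, not code from the project):

```java
import java.util.Arrays;
import java.util.List;

public class ReducerLogicDemo {

    // Mirrors AllTranslationsReducer: prefix each value with '|' and concatenate
    static String joinTranslations(List<String> values) {
        String translations = "";
        for (String val : values) {
            translations += "|" + val;
        }
        return translations;
    }

    public static void main(String[] args) {
        // All translations collected for one English key
        System.out.println(joinTranslations(Arrays.asList("hond", "hund", "chien")));
        // prints "|hond|hund|chien"
    }
}
```

Note that the result starts with a '|' before the first translation, which matches the output produced by the real reducer.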
The last class that remains is the one that puts it all together and makes it a runnable job:
package net.pascalalma.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Created with IntelliJ IDEA.
 * User: pascal
 * Date: 16-07-13
 * Time: 12:07
 */
public class Dictionary {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "dictionary");
        job.setJarByClass(Dictionary.class);
        job.setMapperClass(WordMapper.class);
        job.setReducerClass(AllTranslationsReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
In this main method we put together a Job and run it. Note that I simply expect args[0] and args[1] to be the name of the input file and the output directory (which must not exist). I didn't add any checks for this. Here is my 'Run Configuration' in IntelliJ:
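Since the main method doesn't validate its arguments, a minimal guard could be added before building the Job. The helper class below is a hypothetical sketch of such a check, not part of the original code:

```java
public class ArgsCheck {

    // Hypothetical helper: verify that exactly an input path and an
    // output directory name were supplied before starting the job
    static void validate(String[] args) {
        if (args == null || args.length != 2) {
            throw new IllegalArgumentException(
                "Usage: Dictionary <input file> <output dir (must not exist)>");
        }
    }

    public static void main(String[] args) {
        validate(new String[]{"input.txt", "output"}); // passes silently
        System.out.println("arguments look valid");
    }
}
```

Failing fast like this gives a clearer error than the NullPointerException or ArrayIndexOutOfBoundsException you would otherwise get from `new Path(args[0])`.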
Just make sure the output directory does not exist at the moment you run the class. The logging output created by the job looks like this:
2013-08-15 21:37:00.595 java[73982:1c03] Unable to load realm info from SCDynamicStore
aug 15, 2013 9:37:01 PM org.apache.hadoop.util.NativeCodeLoader <clinit>
WARNING: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
WARNING: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
WARNING: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
INFO: Total input paths to process : 1
aug 15, 2013 9:37:01 PM org.apache.hadoop.io.compress.snappy.LoadSnappy <clinit>
WARNING: Snappy native library not loaded
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: Running job: job_local_0001
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.Task initialize
INFO: Using ResourceCalculatorPlugin : null
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: io.sort.mb = 100
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: data buffer = 79691776/99614720
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: record buffer = 262144/327680
aug 15, 2013 9:37:02 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
INFO: Starting flush of map output
aug 15, 2013 9:37:02 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
INFO: Finished spill 0
aug 15, 2013 9:37:02 PM org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
aug 15, 2013 9:37:02 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: map 0% reduce 0%
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_m_000000_0' done.
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.Task initialize
INFO: Using ResourceCalculatorPlugin : null
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.Merger$MergeQueue merge
INFO: Merging 1 sorted segments
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.Merger$MergeQueue merge
INFO: Down to the last merge-pass, with 1 segments left of total size: 524410 bytes
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
aug 15, 2013 9:37:05 PM org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
aug 15, 2013 9:37:05 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
aug 15, 2013 9:37:05 PM org.apache.hadoop.mapred.Task commit
INFO: Task attempt_local_0001_r_000000_0 is allowed to commit now
aug 15, 2013 9:37:05 PM org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
INFO: Saved output of task 'attempt_local_0001_r_000000_0' to /Users/pascal/output
aug 15, 2013 9:37:05 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: map 100% reduce 0%
aug 15, 2013 9:37:07 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO: reduce > reduce
aug 15, 2013 9:37:07 PM org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_r_000000_0' done.
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: map 100% reduce 100%
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: Job complete: job_local_0001
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Counters: 17
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: File Output Format Counters
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Bytes Written=423039
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: FileSystemCounters
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: FILE_BYTES_READ=1464626
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: FILE_BYTES_WRITTEN=1537251
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: File Input Format Counters
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Bytes Read=469941
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Map-Reduce Framework
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Reduce input groups=11820
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Map output materialized bytes=524414
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Combine output records=0
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Map input records=20487
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Reduce shuffle bytes=0
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Reduce output records=11820
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Spilled Records=43234
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Map output bytes=481174
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Total committed heap usage (bytes)=362676224
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Combine input records=0
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Map output records=21617
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: SPLIT_RAW_BYTES=108
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Reduce input records=21617

Process finished with exit code 0
The output files created by this job can be found in the supplied output directory, as you can see in the following screenshot:
As you can see, we can run this main method in an IDE (or from the command line), but I would like to have some unit tests in place for the Mapper and Reducer before going there. I will show how to do that in another post.
Translated from: https://www.javacodegeeks.com/2013/08/writing-a-hadoop-mapreduce-task-in-java.html