1 实例描述
对数据文件中的数据进行去重。数据文件中的每行都是一个数据
样例输入如下所示:
1)file1
2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c
2)file2
2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d
2012-3-5 a
2012-3-6 c
2012-3-7 d
2012-3-3 c
期望输出:
2012-3-1 a
2012-3-1 b
2012-3-2 a
2012-3-2 b
2012-3-3 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-6 c
2012-3-7 c
2012-3-7 d
2 问题分析
数据去重的最终目标是让原始数据中出现次数超过一次的数据在输出文件中只出现一次。
分析:
根据reduce的过程特性,会自动根据key来计算输入的value集合
把数据作为key输出给reduce,无论这个数据出现多少次,reduce最终结果中key只能输出一次。
3.实现步骤
实例中每个数据代表输入文件中的一行内容,map阶段采用Hadoop默认的作业输入方式。
将value设置为key,并直接输出。 map输出数据的key为数据,将value设置成空值
2. 在MapReduce流程中,map的输出<key,value>经过shuffle过程聚集成<key,value-list>后会交给reduce
3. reduce阶段不管每个key有多少个value,它直接将输入的key复制为输出的key,并输出(输出中的value被设置成空)。
4.关键代码
package com.mk.mapreduce;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.net.URI;public class Distinct {public static class DistinctMapper extends Mapper<LongWritable, Text, Text, NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {value.set(value.toString().trim());context.write(value, NullWritable.get());}}public static class DistinctReducer extends Reducer<Text,NullWritable, Text, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {context.write(key, NullWritable.get());}}public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {String uri = "hdfs://192.168.150.128:9000";String input = "/distinct/input";String output = "/distinct/output";Configuration conf = new Configuration();if(System.getProperty("os.name").toLowerCase().contains("win"))conf.set("mapreduce.app-submission.cross-platform","true");FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);Path path = new Path(output);fileSystem.delete(path,true);Job job = new Job(conf,"Distinct");job.setJar("./out/artifacts/hadoop_test_jar/hadoop-test.jar");job.setJarByClass(Distinct.class);job.setMapperClass(DistinctMapper.class);job.setCombinerClass(DistinctReducer.class);job.setReducerClass(DistinctReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.addInputPaths(job, uri + input);FileOutputFormat.setOutputPath(job, new Path(uri + output));boolean ret = job.waitForCompletion(true);System.out.println(job.getJobName() + "-----" +ret);}
}