c001.txt
------------------------------
filetype|commid|commname|addressid
comm|1|罗湖小区1|1
comm|2|罗湖小区2|1
comm|3|宝安小区1|4
comm|4|南山小区1|3
comm|5|南山小区2|3
comm|6|福田小区1|2
comm|7|福田小区2|2
comm|8|宝安2|4
comm|9|南山3|3
c002.txt
----------------------------
filetype|commid|commname|addressid
comm|10|罗湖小区7|1
comm|11|罗湖小区8|1
comm|12|宝安小区5|4
comm|13|南山小区6|3
comm|14|南山小区7|3
comm|15|福田小区6|2
comm|16|福田小区8|2
a001.txt
-------------------------
filetype|addressid|address
addr|1|罗湖
addr|2|福田
addr|3|南山
addr|4|宝安
输出结果:
-----------------------
commid commname addr
15 福田小区6 福田
16 福田小区8 福田
6 福田小区1 福田
7 福田小区2 福田
13 南山小区6 南山
14 南山小区7 南山
4 南山小区1 南山
5 南山小区2 南山
9 南山3 南山
3 宝安小区1 宝安
8 宝安2 宝安
12 宝安小区5 宝安
----------------------------
代码:
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.fs.Path;

/**
 * Reduce-side join of community records ("comm|commid|commname|addressid")
 * with address records ("addr|addressid|address"), keyed on addressid.
 * The mapper tags each record with its type; the shuffle groups both record
 * types of the same addressid into one reduce call, where they are joined.
 */
public class TestUnion {

    /**
     * Retained for source/binary compatibility with the original version; no
     * longer used. The old code used this static counter inside reduce() to
     * emit a header row, which silently dropped the first key group's records
     * (and is unreliable across multiple reducer JVMs anyway).
     */
    public static int count = 0;

    /** Tags each input line and re-keys it by addressid. */
    public static class TestUnionMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        public void map(Object key, Text values, Context context)
                throws IOException, InterruptedException {
            // Skip each file's header line ("filetype|...").
            if (values.toString().indexOf("filetype") >= 0) {
                return;
            }
            StringTokenizer itr = new StringTokenizer(values.toString(), "|");
            while (itr.hasMoreTokens()) {
                String fileType = itr.nextToken();
                if (fileType.compareToIgnoreCase("addr") == 0) {
                    // addr|addressid|address  ->  key: addressid, value: "2|address"
                    String addressId = itr.nextToken();
                    String addressName = itr.nextToken();
                    // "2" marks an address record
                    context.write(new Text(addressId), new Text("2|" + addressName));
                } else if (fileType.compareToIgnoreCase("comm") == 0) {
                    // comm|commid|commname|addressid  ->  key: addressid, value: "1|commid|commname"
                    String commId = itr.nextToken();
                    String commName = itr.nextToken();
                    String addressId = itr.nextToken();
                    // "1" marks a community record
                    context.write(new Text(addressId), new Text("1|" + commId + "|" + commName));
                }
            }
        }
    }

    /** Joins the comm and addr records that share one addressid key. */
    public static class TestUnionReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Emit the header once per reducer task. The original wrote it from
            // inside reduce() guarded by a static counter and then returned,
            // which discarded every record of the first key group (that is why
            // the sample output was missing all addressid=1 rows).
            context.write(new Text("commid"), new Text("commname addr"));
        }

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            List<String> addrs = new ArrayList<String>();
            List<String> comms = new ArrayList<String>();
            for (Text val : values) {
                // '|' is a regex metacharacter and must be escaped for split().
                String[] astr = val.toString().trim().split("\\|");
                String fileTypeId = astr[0];
                if (fileTypeId.compareToIgnoreCase("1") == 0) { // comm record
                    String commId = astr[1];
                    String commName = astr[2];
                    comms.add(commId + " " + commName);
                } else if (fileTypeId.compareToIgnoreCase("2") == 0) { // addr record
                    addrs.add(astr[1]);
                }
            }
            // Cross join; in this data set there is exactly one addr per key.
            if (comms.size() > 0 && addrs.size() > 0) {
                for (int m = 0; m < comms.size(); m++) {
                    for (int n = 0; n < addrs.size(); n++) {
                        context.write(new Text(comms.get(m)), new Text(addrs.get(n)));
                    }
                }
            }
        }
    }

    /**
     * Job driver.
     *
     * @param args args[0] = input directory, args[1] = output directory
     */
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("please input two args: <in> <out>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        // Job.getInstance replaces the deprecated new Job(conf, name) constructor.
        Job job = Job.getInstance(conf, "union data");
        job.setJarByClass(TestUnion.class);
        job.setMapperClass(TestUnionMapper.class);
        job.setReducerClass(TestUnionReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
主要利用了reduce函数相同的KEY值聚合在一起的规则。