多表关联和单表关联类似,它也是通过对原始数据进行一定的处理,从其中挖掘出关心的信息
1 实例描述
输入是两个文件,一个代表工厂表,包含工厂名列和地址编号列;另一个代表地址表,包含地址名列和地址编号列。要求从输入数据中找出工厂名和地址名的对应关系,输出"工厂名——地址名"表
样例输入如下所示:
1)factory.txt
factoryname addressed
Beijing Red Star 1
Shenzhen Thunder 3
Guangzhou Honda 2
Beijing Rising 1
Guangzhou Development Bank 2
Tencent 3
Back of Beijing 1
2)address.txt
addressID addressname
1 Beijing
2 Guangzhou
3 Shenzhen
4 Xian
期望输出:
factoryname addressname
Back of Beijing Beijing
Beijing Red Star Beijing
Beijing Rising Beijing
Guangzhou Development Bank Guangzhou
Guangzhou Honda Guangzhou
Shenzhen Thunder Shenzhen
Tencent Shenzhen
2 问题分析
多表关联和单表关联相似,都类似于数据库中的自然连接。相比单表关联,多表关联的左右表和连接列更加清楚。所以可以采用和单表关联的相同的处理方式,map识别出输入的行属于哪个表之后,对其进行分割,将连接的列值保存在key中,另一列和左右表标识保存在value中,然后输出。reduce拿到连接结果之后,解析value内容,根据标志将左右表内容分开存放,然后求笛卡尔积,最后直接输出。
3.关键代码
package com.mk.mapreduce;import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.util.LinkedList;
import java.util.List;public class JoinOther {public static class JoinOtherMapper extends Mapper<LongWritable, Text, Text, TableInfo> {@Overrideprotected void setup(Context context) throws IOException, InterruptedException {}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {if (StringUtils.isBlank(value.toString())) {System.out.println("空白行");return;}String[] values = value.toString().split("\\s{2,}");if (values.length < 2 || values[0].equals("factoryname") || values[0].equals("addressID")) {System.out.println("长度不够的行:" + value.toString());return;}FileSplit fileInputSplit = (FileSplit) context.getInputSplit();if(fileInputSplit.getPath().toString().endsWith("/factory.txt")) {context.write(new Text(values[1]), new TableInfo(TableInfo.FACTORY, values[0]));}else {context.write(new Text(values[0]), new TableInfo(TableInfo.ADDRESS, values[1]));}}}public static class JoinOtherReducer extends Reducer<Text, TableInfo, Text, Text> {@Overrideprotected void setup(Context context) throws IOException, InterruptedException {context.write(new Text("factoryname"), new Text("addressname"));}@Overrideprotected void reduce(Text key, Iterable<TableInfo> values, Context context) throws IOException, InterruptedException {List<String> addresses = new LinkedList<>();List<String> factories = new LinkedList<>();for (TableInfo v : values) {if (v.getTable() == TableInfo.ADDRESS) {addresses.add(v.value.toString());} else {factories.add(v.value.toString());}}if (!addresses.isEmpty() && !factories.isEmpty()) {for (String factory :factories)for (String address : addresses)context.write(new Text(factory), new Text(address));}}}public static class TableInfo implements WritableComparable<TableInfo> {public static final int FACTORY = 1;public static final int ADDRESS = 2;private int table;private Text value;public TableInfo() {}public TableInfo(int table, String value) {this.table = table;this.value = new Text(value);}public int getTable() {return table;}public void setTable(int table) {this.table = table;}public void setValue(Text value) {this.value = value;}@Overridepublic int compareTo(TableInfo o) {int c = table - o.table;if (c != 0)return c;return value.compareTo(o.value);}@Overridepublic void write(DataOutput out) throws IOException {out.writeInt(table);this.value.write(out);}@Overridepublic void readFields(DataInput in) throws IOException {this.table = in.readInt();if (this.value == null)this.value = new Text();this.value.readFields(in);}@Overridepublic String toString() {return "TableInfo{" +"table=\'" + table +"\', value=\'" + value +"\'}";}}public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {String uri = "hdfs://192.168.150.128:9000";String input = "/joinOther/input";String output = "/joinOther/output";Configuration conf = new Configuration();if (System.getProperty("os.name").toLowerCase().contains("win"))conf.set("mapreduce.app-submission.cross-platform", "true");FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);Path path = new Path(output);fileSystem.delete(path, true);Job job = new Job(conf, "JoinOther");job.setJar("./out/artifacts/hadoop_test_jar/hadoop-test.jar");job.setJarByClass(JoinOther.class);job.setMapperClass(JoinOtherMapper.class);job.setReducerClass(JoinOtherReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(TableInfo.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.addInputPaths(job, uri + input);FileOutputFormat.setOutputPath(job, new Path(uri + output));boolean ret = job.waitForCompletion(true);System.out.println(job.getJobName() + "-----" + ret);}
}