1、概念
2、Hadoop默认分组机制--所有的Key分到一个组,一个Reduce任务处理
3、代码示例
FlowBean
package com.ares.hadoop.mr.flowgroup;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Hadoop Writable bean carrying per-phone-number traffic counters:
 * uplink bytes, downlink bytes and their sum. Used as the map output value
 * and the reduce output key; orders by {@code sumFlow} descending.
 */
public class FlowBean implements WritableComparable<FlowBean> {

    private String phoneNum; // subscriber phone number (record key)
    private long upFlow;     // uplink byte count
    private long downFlow;   // downlink byte count
    private long sumFlow;    // upFlow + downFlow, populated by the reducer

    /** No-arg constructor required by Hadoop's reflective deserialization. */
    public FlowBean() {
    }

    public String getPhoneNum() {
        return phoneNum;
    }

    public void setPhoneNum(String phoneNum) {
        this.phoneNum = phoneNum;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /**
     * Deserializes fields in exactly the order {@link #write(DataOutput)}
     * emits them — the two methods must stay in lockstep.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNum = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    /** Serializes fields; order must mirror {@link #readFields(DataInput)}. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNum);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /** Tab-separated line: phoneNum, upFlow, downFlow, sumFlow. */
    @Override
    public String toString() {
        return "" + phoneNum + "\t" + upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    /**
     * Orders beans by {@code sumFlow} descending.
     *
     * <p>BUG FIX: the original {@code sumFlow>o.getSumFlow()?-1:1} never
     * returned 0, so two beans with equal sums each reported the other as
     * "greater" — violating the compareTo contract (sgn anti-symmetry) and
     * risking unstable behavior in Hadoop's sort/merge. {@code Long.compare}
     * handles ties correctly and cannot overflow.
     */
    @Override
    public int compareTo(FlowBean o) {
        return Long.compare(o.getSumFlow(), this.sumFlow);
    }
}
FlowGroup
package com.ares.hadoop.mr.flowgroup;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

import com.ares.hadoop.mr.exception.LineException;
import com.ares.hadoop.mr.flowgroup.FlowBean;;

/**
 * MapReduce driver that sums up/down traffic per phone number and uses a
 * custom partitioner ({@code FlowGroupPartition}) to route output into 5
 * reduce partitions keyed by phone-number prefix.
 *
 * Usage (3 arguments expected; args[1] = input path(s), args[2] = output
 * path; args[0] is never read here — presumably consumed by the launch
 * script or reserved — TODO confirm against the caller).
 */
public class FlowGroup extends Configured implements Tool {

    private static final Logger LOGGER = Logger.getLogger(FlowGroup.class);

    /** Custom counter: number of malformed input lines skipped. */
    enum Counter {
        LINESKIP
    }

    /**
     * Mapper: parses a tab-separated log line into (phoneNum, FlowBean).
     * Lines that do not have exactly 11 fields, or whose flow fields are
     * not numeric, are counted under LINESKIP and dropped.
     */
    public static class FlowGroupMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        private String line;
        private int length;
        private final static char separator = '\t';
        private String phoneNum;
        private long upFlow;
        private long downFlow;
        //private long sumFlow;
        // Reused across map() calls to avoid per-record allocation; safe
        // because context.write serializes the values immediately.
        private Text text = new Text();
        private FlowBean flowBean = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, FlowBean>.Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            //super.map(key, value, context);
            String errMsg;
            try {
                line = value.toString();
                String[] fields = StringUtils.split(line, separator);
                length = fields.length;
                // Expected record layout has exactly 11 tab-separated fields.
                if (length != 11) {
                    throw new LineException(key.get() + ", " + line + " LENGTH INVALID, IGNORE...");
                }
                // Field 1 is the phone number; the flow counters sit in the
                // 3rd- and 2nd-to-last columns.
                phoneNum = fields[1];
                upFlow = Long.parseLong(fields[length-3]);
                downFlow = Long.parseLong(fields[length-2]);
                //sumFlow = upFlow + downFlow;
                text.set(phoneNum);
                flowBean.setPhoneNum(phoneNum);
                flowBean.setUpFlow(upFlow);
                flowBean.setDownFlow(downFlow);
                //flowBean.setSumFlow(sumFlow);
                // sumFlow is intentionally left for the reducer to compute.
                context.write(text, flowBean);
            } catch (LineException e) {
                // Malformed line length: log, count, skip the record.
                LOGGER.error(e);
                System.out.println(e);
                context.getCounter(Counter.LINESKIP).increment(1);
                return;
            } catch (NumberFormatException e) {
                // Non-numeric flow fields: log, count, skip the record.
                errMsg = key.get() + ", " + line + " FLOW DATA INVALID, IGNORE...";
                LOGGER.error(errMsg);
                System.out.println(errMsg);
                context.getCounter(Counter.LINESKIP).increment(1);
                return;
            } catch (Exception e) {
                // Catch-all: any other per-record failure is skipped rather
                // than failing the task.
                LOGGER.error(e);
                System.out.println(e);
                context.getCounter(Counter.LINESKIP).increment(1);
                return;
            }
        }
    }

    /**
     * Reducer: sums upFlow/downFlow for each phone number and emits one
     * FlowBean (with sumFlow filled in) as the output key, NullWritable
     * as the value — so the bean's toString() becomes the output line.
     */
    public static class FlowGroupReducer extends Reducer<Text, FlowBean, FlowBean, NullWritable> {
        // Reused output bean; overwritten for every key.
        private FlowBean flowBean = new FlowBean();

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values,
                Reducer<Text, FlowBean, FlowBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            //super.reduce(arg0, arg1, arg2);
            long upFlowCounter = 0;
            long downFlowCounter = 0;
            for (FlowBean flowBean : values) {
                upFlowCounter += flowBean.getUpFlow();
                downFlowCounter += flowBean.getDownFlow();
            }
            flowBean.setPhoneNum(key.toString());
            flowBean.setUpFlow(upFlowCounter);
            flowBean.setDownFlow(downFlowCounter);
            flowBean.setSumFlow(upFlowCounter + downFlowCounter);
            context.write(flowBean, NullWritable.get());
        }
    }

    /**
     * Configures and submits the job. Returns 0 on success, 1 on failure.
     */
    @Override
    public int run(String[] args) throws Exception {
        // TODO Auto-generated method stub
        String errMsg = "FlowGroup: TEST STARTED...";
        LOGGER.debug(errMsg);
        System.out.println(errMsg);

        Configuration conf = new Configuration();
        //FOR Eclipse JVM Debug
        //conf.set("mapreduce.job.jar", "flowsum.jar");
        Job job = Job.getInstance(conf);

        // JOB NAME
        job.setJobName("FlowGroup");

        // JOB MAPPER & REDUCER
        job.setJarByClass(FlowGroup.class);
        job.setMapperClass(FlowGroupMapper.class);
        job.setReducerClass(FlowGroupReducer.class);

        // JOB PARTITION
        job.setPartitionerClass(FlowGroupPartition.class);

        // JOB REDUCE TASK NUMBER — must stay at 5 to match the 5 partitions
        // (prefixes 135-138 plus the default bucket) in FlowGroupPartition.
        job.setNumReduceTasks(5);

        // MAP & REDUCE output types
        job.setOutputKeyClass(FlowBean.class);
        job.setOutputValueClass(NullWritable.class);
        // MAP output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // JOB INPUT & OUTPUT PATH — note args[1]/args[2], not args[0]/args[1].
        //FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.setInputPaths(job, args[1]);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        // VERBOSE OUTPUT
        if (job.waitForCompletion(true)) {
            errMsg = "FlowGroup: TEST SUCCESSFULLY...";
            LOGGER.debug(errMsg);
            System.out.println(errMsg);
            return 0;
        } else {
            errMsg = "FlowGroup: TEST FAILED...";
            LOGGER.debug(errMsg);
            System.out.println(errMsg);
            return 1;
        }
    }

    /** CLI entry point; requires exactly 3 arguments (see class Javadoc). */
    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            String errMsg = "FlowGroup: ARGUMENTS ERROR";
            LOGGER.error(errMsg);
            System.out.println(errMsg);
            System.exit(-1);
        }

        int result = ToolRunner.run(new Configuration(), new FlowGroup(), args);
        System.exit(result);
    }
}
FlowGroupPartition
package com.ares.hadoop.mr.flowgroup;

import java.util.HashMap;

import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Routes records to reduce partitions by the first three digits of the
 * key's string form (the phone-number prefix). Prefixes 135-138 map to
 * partitions 0-3; any other prefix falls into partition 4, so the job is
 * expected to run with 5 reduce tasks.
 *
 * NOTE(review): keys shorter than 3 characters would make substring(0, 3)
 * throw — assumes keys are always full phone numbers; confirm upstream.
 */
public class FlowGroupPartition<KEY, VALUE> extends Partitioner<KEY, VALUE> {

    /** Prefix -> partition table, populated once at class-load time. */
    private static final HashMap<String, Integer> groupMap = new HashMap<String, Integer>();

    /** Partition for prefixes not present in {@link #groupMap}. */
    private static final int DEFAULT_PARTITION = 4;

    static {
        groupMap.put("135", 0);
        groupMap.put("136", 1);
        groupMap.put("137", 2);
        groupMap.put("138", 3);
    }

    @Override
    public int getPartition(KEY key, VALUE value, int numPartitions) {
        // Single map lookup (the original queried the map twice per record
        // inside a null-check ternary) with an explicit named default.
        Integer partition = groupMap.get(key.toString().substring(0, 3));
        return (partition == null) ? DEFAULT_PARTITION : partition;
    }
}