1.需求
统计每个手机号的消费总金额，按消费总金额降序排序，最终将联通、电信、移动的号码分别写入不同的文件。
号段划分：130、131、132（联通）；133（电信）；135、136、137、138、139（移动）。
手机号,消费记录
13512345678,50
13512345678,90
13122345678,10
13122345678,110
13212345678,10
13212345678,90
13912345378,10
13912345378,90
13612345678,50
13612345678,55
13312345378,65
13312345378,90
2. 将数据上传到 HDFS（代码中读取的路径为 /phtest/phone.txt）
3. IntelliJ IDEA 中的代码
MyPartition
package demo8;import demo5.DescIntWritable;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;public class MyPartition extends Partitioner<DescIntWritable, Text> {@Overridepublic int getPartition(DescIntWritable descIntWritable, Text text, int numPartitions) {String textStr = text.toString();boolean arr1 = textStr.startsWith("130") || textStr.startsWith("131") || textStr.startsWith("132");boolean arr2=textStr.startsWith("133");if(arr1){return 0;}else if(arr2){return 1;}return 2;}
}
PhoneBillJob
package demo8;import demo5.DescIntWritable;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import java.io.IOException;public class PhoneBillJob {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {Configuration conf = new Configuration();conf.set("fs.defaultFS","hdfs://hadoop10:8020");Job job = Job.getInstance(conf);job.setJarByClass(PhoneBillJob.class);job.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);TextInputFormat.addInputPath(job,new Path("/phtest/phone.txt"));TextOutputFormat.setOutputPath(job,new Path("/phtest/out"));job.setMapperClass(PhoneBillMapper.class);job.setReducerClass(PhoneBillReducer.class);//map输出的键与值类型job.setMapOutputKeyClass(DescIntWritable.class);job.setMapOutputValueClass(Text.class);//reducer输出的键与值类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(DescIntWritable.class);//设置reduceTask的个数job.setNumReduceTasks(3);//设置自定义分区job.setPartitionerClass(MyPartition.class);boolean b = job.waitForCompletion(true);System.out.println(b);}static class PhoneBillMapper extends Mapper<LongWritable, Text,DescIntWritable,Text> {@Overrideprotected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {String[] arr = value.toString().split(",");context.write(new DescIntWritable(Integer.parseInt(arr[1])),new Text(arr[0]));}}static class PhoneBillReducer extends Reducer<DescIntWritable,Text,Text,DescIntWritable> {@Overrideprotected void reduce(DescIntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {for (Text value : values) {context.write(value, key);}}}}
4. 在 HDFS 上查看结果（输出目录为 /phtest/out，共 3 个 reduce 输出文件，分别对应联通、电信、移动）
就一直往前走吧,别回头~