一、Partitioner概述
Map阶段总共五个步骤,2就是一个分区操作
哪个key到哪个Reducer的分配过程,是由Partitioner规定的。
二、Hadoop内置Partitioner
MapReduce的使用者通常会指定Reduce任务和Reduce任务输出文件的数量(R)。
用户在中间key上使用分区函数来对数据进行分区,之后在输入到后续任务执行进程。一个默认的分区函数式使用hash方法(比如常见的:hash(key) mod R)进行分区。hash方法能够产生非常平衡的分区。
Hadoop中自带了一个默认的分区类HashPartitioner,
它继承了Partitioner类,提供了一个getPartition的方法
/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner<K, V> extends Partitioner<K, V> {/** Use {@link Object#hashCode()} to partition. */public int getPartition(K key, V value,int numReduceTasks) {return
(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;}}
将key均匀布在Reduce Tasks上
(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
如果Key为Text的话,Text的hashcode方法跟String的基本一致,都是采用的Horner公式计算,得到一个int整数。但是,如果string太大的话这个int整数值可能会溢出变成负数,所以和整数的上限值Integer.MAX_VALUE(即0111111111111111)进行与运算,然后再对reduce任务个数取余,这样就可以让key均匀分布在reduce上
三、自定制Partitioner
一般我们都会使用默认的分区函数HashPartitioner
自定义数据类型处理手机上网日志: 在第二列上并不是所有的数据都是手机号(84138413并不是一个手机号),任务就是在统计手机流量时,将手机号码和非手机号输出到不同的文件中
自定义MKPartitioner
public static class MKPartitioner extends Partitioner<Text, KpiWritable> {@Overridepublic int getPartition(Text key, KpiWritable value, int numPartitions) {// 实现不同的长度不同的号码分配到不同的reduce task中int numLength = key.toString().length();if (numLength == 11) return 0;else return 1; }}
设置为打包运行,设置Partitioner为MKPartitioner设置ReducerTask的个数为2
注意:分区的例子必须要设置为打成jar包运行!
public int run(String[] args) throws Exception {// 定义一个作业Job job = new Job(getConf(), "MyJob");// 分区需要设置为打包运行job.setJarByClass(MyJob.class);// 设置输入目录FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));// 设置自定义Mapper类job.setMapperClass(MyMapper.class);// 指定<k2,v2>的类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(KpiWritable.class);// 设置Partitionerjob.setPartitionerClass(NKPartitioner.class);job.setNumReduceTasks(2);// 设置自定义Reducer类job.setReducerClass(MyReducer.class);// 指定<k3,v3>的类型job.setOutputKeyClass(Text.class);job.setOutputKeyClass(KpiWritable.class);// 设置输出目录FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));// 提交作业System.exit(job.waitForCompletion(true) ? 0 : 1);return 0;}
打成jar包并在Hadoop中运行
- 通过Idea导出jar包
- 通过FTP上传到Linux中,可以使用各种FTP工具
- 通过Hadoop Shell执行jar包中的程序
通过Web接口验证Partitioner的运行:
- 通过访问http://hadoop01:50030 查看 是否有2个Reduce任务?
- Reduce输出结果是否一致?
小结:
- 分区Partitioner主要作用在于以下两点 根据业务需要,产生多个输出文件
- 多个reduce任务并发运行,提高整体job的运行效率