MapReduce二次排序

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

 默认情况下,Map输出的结果会对Key进行默认的排序,但是有时候需要对Key排序的同时还需要对Value进行排序,这时候就要用到二次排序了。下面我们来说说二次排序

1、二次排序原理

  我们把二次排序分为以下几个阶段

  Map起始阶段

    在Map阶段,使用job.setInputFormatClass()定义的InputFormat,将输入的数据集分割成小数据块split,同时InputFormat提供一个RecordReader的实现。在这里我们使用的是TextInputFormat,它提供的RecordReader会将文本的行号作为Key,这一行的文本作为Value。这就是自定 Mapper的输入是<LongWritable,Text> 的原因。然后调用自定义Mapper的map方法,将一个个<LongWritable,Text>键值对输入给Mapper的map方法

  Map最后阶段

    在Map阶段的最后,会先调用job.setPartitionerClass()对这个Mapper的输出结果进行分区,每个分区映射到一个Reducer。每个分区内又调用job.setSortComparatorClass()设置的Key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setSortComparatorClass()设置 Key比较函数类,则使用Key实现的compareTo()方法

  Reduce阶段

    在Reduce阶段,reduce()方法接受所有映射到这个Reduce的map输出后,也会调用job.setSortComparatorClass()方法设置的Key比较函数类,对所有数据进行排序。然后开始构造一个Key对应的Value迭代器。这时就要用到分组,使用 job.setGroupingComparatorClass()方法设置分组函数类。只要这个比较器比较的两个Key相同,它们就属于同一组,它们的 Value放在一个Value迭代器,而这个迭代器的Key使用属于同一个组的所有Key的第一个Key。最后就是进入Reducer的 reduce()方法,reduce()方法的输入是所有的Key和它的Value迭代器,同样注意输入与输出的类型必须与自定义的Reducer中声明的一致

 

  接下来我们通过示例,可以很直观的了解二次排序的原理

  输入文件 sort.txt 内容为

    40 20

    40 10

    40 30

    40 5

    30 30

    30 20

    30 10

    30 40

    50 20

    50 50

    50 10

    50 60

  输出文件的内容(从小到大排序)如下

    30 10

    30 20

    30 30

    30 40

    --------

    40 5

    40 10

    40 20

    40 30

    --------

    50 10

    50 20

    50 50

    50 60

  从输出的结果可以看出Key实现了从小到大的排序,同时相同Key的Value也实现了从小到大的排序,这就是二次排序的结果

2、二次排序的具体流程

  在本例中要比较两次。先按照第一字段排序,然后再对第一字段相同的按照第二字段排序。根据这一点,我们可以构造一个复合类IntPair ,它有两个字段,先利用分区对第一字段排序,再利用分区内的比较对第二字段排序。二次排序的流程分为以下几步。

  1、自定义 key

    所有自定义的key应该实现接口WritableComparable,因为它是可序列化的并且可比较的。WritableComparable 的内部方法如下所示

 

// 反序列化,从流中的二进制转换成IntPair
public void readFields(DataInput in) throws IOException// 序列化,将IntPair转化成使用流传送的二进制
public void write(DataOutput out)//  key的比较
public int compareTo(IntPair o)//  默认的分区类 HashPartitioner,使用此方法
public int hashCode()//  默认实现
public boolean equals(Object right)

 

  2、自定义分区

    自定义分区函数类FirstPartitioner,是key的第一次比较,完成对所有key的排序。

public static class FirstPartitioner extends Partitioner< IntPair,IntWritable>

    在job中使用setPartitionerClasss()方法设置Partitioner

job.setPartitionerClasss(FirstPartitioner.Class);

  3、Key的比较类

    这是Key的第二次比较,对所有的Key进行排序,即同时完成IntPair中的first和second排序。该类是一个比较器,可以通过两种方式实现。

    1) 继承WritableComparator。

public static class KeyComparator extends WritableComparator

      必须有一个构造函数,并且重载以下方法。

public int compare(WritableComparable w1, WritableComparable w2)

    2) 实现接口 RawComparator。

      上面两种实现方式,在Job中,可以通过setSortComparatorClass()方法来设置Key的比较类。

job.setSortComparatorClass(KeyComparator.Class);

      注意:如果没有使用自定义的SortComparator类,则默认使用Key中compareTo()方法对Key排序。

  4、定义分组类函数

    在Reduce阶段,构造一个与 Key 相对应的 Value 迭代器的时候,只要first相同就属于同一个组,放在一个Value迭代器。定义这个比较器,可以有两种方式。

    1) 继承 WritableComparator。

public static class GroupingComparator extends WritableComparator

      必须有一个构造函数,并且重载以下方法。

public int compare(WritableComparable w1, WritableComparable w2)

    2) 实现接口 RawComparator。

      上面两种实现方式,在 Job 中,可以通过 setGroupingComparatorClass()方法来设置分组类。

job.setGroupingComparatorClass(GroupingComparator.Class);

      另外注意的是,如果reduce的输入与输出不是同一种类型,则 Combiner和Reducer 不能共用 Reducer 类,因为 Combiner 的输出是 reduce 的输入。除非重新定义一个Combiner。

3、代码实现

  Hadoop的example包中自带了一个MapReduce的二次排序算法,下面对 example包中的二次排序进行改进

 

package com.buaa;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;/** 
* @ProjectName SecondarySort
* @PackageName com.buaa
* @ClassName IntPair
* @Description 将示例数据中的key/value封装成一个整体作为Key,同时实现 WritableComparable接口并重写其方法
* @Author 刘吉超
* @Date 2016-06-07 22:31:53
*/
public class IntPair implements WritableComparable<IntPair>{private int first;private int second;public IntPair(){}public IntPair(int left, int right){set(left, right);}public void set(int left, int right){first = left;second = right;}@Overridepublic void readFields(DataInput in) throws IOException{first = in.readInt();second = in.readInt();}@Overridepublic void write(DataOutput out) throws IOException{out.writeInt(first);out.writeInt(second);}@Overridepublic int compareTo(IntPair o){if (first != o.first){return first < o.first ? -1 : 1;}else if (second != o.second){return second < o.second ? -1 : 1;}else{return 0;}}@Overridepublic int hashCode(){return first * 157 + second;}@Overridepublic boolean equals(Object right){if (right == null)return false;if (this == right)return true;if (right instanceof IntPair){IntPair r = (IntPair) right;return r.first == first && r.second == second;}else{return false;}}public int getFirst(){return first;}public int getSecond(){return second;}
}

 

package com.buaa;import java.io.IOException;
import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;/** 
* @ProjectName SecondarySort
* @PackageName com.buaa
* @ClassName SecondarySort
* @Description TODO
* @Author 刘吉超
* @Date 2016-06-07 22:40:37
*/
@SuppressWarnings("deprecation")
public class SecondarySort {public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable> {public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();StringTokenizer tokenizer = new StringTokenizer(line);int left = 0;int right = 0;if (tokenizer.hasMoreTokens()) {left = Integer.parseInt(tokenizer.nextToken());if (tokenizer.hasMoreTokens())right = Integer.parseInt(tokenizer.nextToken());context.write(new IntPair(left, right), new IntWritable(right));}}}/** 自定义分区函数类FirstPartitioner,根据 IntPair中的first实现分区*/public static class FirstPartitioner extends Partitioner<IntPair, IntWritable>{@Overridepublic int getPartition(IntPair key, IntWritable value,int numPartitions){return Math.abs(key.getFirst() * 127) % numPartitions;}}/** 自定义GroupingComparator类,实现分区内的数据分组*/@SuppressWarnings("rawtypes")public static class GroupingComparator extends WritableComparator{protected GroupingComparator(){super(IntPair.class, true);}@Overridepublic int compare(WritableComparable w1, WritableComparable w2){IntPair ip1 = (IntPair) w1;IntPair ip2 = (IntPair) w2;int l = ip1.getFirst();int r = ip2.getFirst();return l == r ? 0 : (l < r ? -1 : 1);}}public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable> {public void reduce(IntPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {for (IntWritable val : values) {context.write(new Text(Integer.toString(key.getFirst())), val);}}}public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 读取配置文件Configuration conf = new Configuration();// 判断路径是否存在,如果存在,则删除    Path mypath = new Path(args[1]);  FileSystem hdfs = mypath.getFileSystem(conf);  if (hdfs.isDirectory(mypath)) {  hdfs.delete(mypath, true);  } Job job = new Job(conf, "secondarysort");// 设置主类job.setJarByClass(SecondarySort.class);// 输入路径FileInputFormat.setInputPaths(job, new Path(args[0]));// 输出路径FileOutputFormat.setOutputPath(job, new Path(args[1]));// Mapperjob.setMapperClass(Map.class);// Reducerjob.setReducerClass(Reduce.class);// 分区函数job.setPartitionerClass(FirstPartitioner.class);// 本示例并没有自定义SortComparator,而是使用IntPair中compareTo方法进行排序 job.setSortComparatorClass();// 分组函数job.setGroupingComparatorClass(GroupingComparator.class);// map输出key类型job.setMapOutputKeyClass(IntPair.class);// map输出value类型job.setMapOutputValueClass(IntWritable.class);// reduce输出key类型job.setOutputKeyClass(Text.class);// reduce输出value类型job.setOutputValueClass(IntWritable.class);// 输入格式job.setInputFormatClass(TextInputFormat.class);// 输出格式job.setOutputFormatClass(TextOutputFormat.class);System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

 

转载于:https://my.oschina.net/xiaoluobutou/blog/807362

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/542048.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据有序_详解数据库插入性能优化:合并+事务+有序数据进行INSERT操作

概述对于一些数据量较大的系统&#xff0c;数据库面临的问题除了查询效率低下&#xff0c;还有就是数据入库时间长。特别像报表系统&#xff0c;每天花费在数据导入上的时间可能会长达几个小时或十几个小时之久。因此&#xff0c;优化数据库插入性能是很有意义的。其实最有效的…

Java ProcessBuilder environment()方法与示例

ProcessBuilder类的environment()方法 (ProcessBuilder Class environment() method) environment() method is available in java.lang package. environment()方法在java.lang包中可用。 environment() method is used to return Map interfaces of this process builder env…

容器内应用日志收集方案

容器化应用日志收集挑战 应用日志的收集、分析和监控是日常运维工作重要的部分&#xff0c;妥善地处理应用日志收集往往是应用容器化重要的一个课题。 Docker处理日志的方法是通过docker engine捕捉每一个容器进程的STDOUT和STDERR&#xff0c;通过为contrainer制定不同log dri…

python统计行号_利用Python进行数据分析(第三篇上)

上一篇文章我记录了自己在入门 Python 学习的一些基础内容以及实际操作代码时所碰到的一些问题。这篇我将会记录我在学习和运用 Python 进行数据分析的过程&#xff1a;介绍 Numpy 和 Pandas 两个包运用 Numpy 和 Pandas 分析一维、二维数据数据分析的基本过程实战项目【用 Pyt…

lnmp架构搭建—源码编译(nginx、mysql、php)

含义及理解&#xff1a; LNMP LinuxNginxMysqlPHP&#xff1a;LNMP是指一组通常一起使用来运行动态网站或者服务器的自由软件名称首字母缩写。L指Linux&#xff0c;N指Nginx&#xff0c;M一般指MySQL&#xff0c;也可以指MariaDB&#xff0c;P一般指PHP&#xff0c;也可以指P…

Java PipedInputStream available()方法与示例

PipedInputStream类的available()方法 (PipedInputStream Class available() method) available() method is available in java.io package. available()方法在java.io包中可用。 available() method is used to return the number of available bytes left that can be read …

解析xml_Mybatis中mapper的xml解析详解

上一篇文章分析了mapper注解关键类MapperAnnotationBuilder&#xff0c;今天来看mapper的项目了解析关键类XMLMapperBuilder。基础介绍回顾下之前是在分析configuration的初始化过程&#xff0c;已经进行到了最后一步mapperElement(root.evalNode("mappers"))&#x…

lnmp—MemCache的作用

含义及理解&#xff1a; 1 . memcache是一个高性能的分布式的内存对象缓存系统&#xff0c;用于动态web应用以减轻数据库负担。通过在内存里维护一个统一的巨大的hash表&#xff0c;来存储经常被读写的一些数组与文件&#xff0c;从而极大的提高网站的运行效率。 memcache是一…

Java ListResourceBundle getKeys()方法与示例

ListResourceBundle类的getContents()方法 (ListResourceBundle Class getContents() method) getContents() method is available in java.util package. getContents()方法在java.util包中可用。 getContents() method is used to return an enumeration of all the keys tha…

orale用户密码过期处理

使用具有管理权限的用户登录1、查看用户的proifle是哪个&#xff0c;一般是default&#xff1a;SELECT username,PROFILE FROM dba_users;2、查看指定概要文件&#xff08;如default&#xff09;的密码有效期设置&#xff1a;sql>SELECT * FROM dba_profiles s WHERE s.prof…

python字典怎么设置_在python中设置字典中的属性

在python中设置字典中的属性是否可以在python中从字典创建一个对象&#xff0c;使每个键都是该对象的属性&#xff1f;像这样的东西&#xff1a;d { name: Oscar, lastName: Reyes, age:32 }e Employee(d)print e.name # Oscarprint e.age 10 # 42我认为这几乎与这个问题相反…

Java ObjectInputStream readByte()方法与示例

ObjectInputStream类readByte()方法 (ObjectInputStream Class readByte() method) readByte() method is available in java.io package. readByte()方法在java.io包中可用。 readByte() method is used to read a byte (i.e. 8 bit) of data from this ObjectInputStream. re…

openresty—实现缓存前移

含义及理解&#xff1a; OpenResty(又称&#xff1a;ngx_openresty) 是一个基于 NGINX 的可伸缩的 Web 平台&#xff0c;由中国人章亦春发起&#xff0c;提供了很多高质量的第三方模块。 其目标是让Web服务直接跑在Nginx服务内部&#xff0c;充分利用Nginx的非阻塞I/O模型&am…

Nginx+Keepalived+Tomcat之动静分离的web集群

NginxKeepalivedTomcat之动静分离的web集群 博客分类&#xff1a; webserverNginxKeepalivedTomcat之动静分离的web集群为小公司提供大概一天持续在100万/日之间访问的高性能、高可用、高并发访问及动静分离的web集群方案NginxKeepalived 高可用、反向代理NginxPHP …

安装完成后的配置_cent os7 默认安装后的一般配置

在安装cent os7后&#xff0c;进入系统会出现一些命令无法执行。这是因为最小化没有安装包含的软件包。这时候先要配置一下基本的IP参数&#xff0c;(包括动态&#xff0c;静态&#xff0c;或者是双网卡绑定)。我们在虚拟机中模拟操作一下&#xff0c;配置文件在/etc/sysconfig…

Java Integer类lowerOneBit()方法与示例

整数类lowerOneBit()方法 (Integer class lowestOneBit() method) lowestOneBit() method is available in java.lang package. minimumOneBit()方法在java.lang包中可用。 lowestOneBit() method is used to find at most only single 1’s bit from the rightmost side one b…

lnmp构架——对tomcat详解

tomcat的安装部署 安装jdk和tomcat tar zxf jdk-7u79-linux-x64.tar.gz -C /usr/local/ tar zxf apache-tomcat-7.0.37.tar.gz -C /usr/local/做好软连接便于访问 cd /usr/local ln -s jdk1.7.0_79/ java ln -s apache-tomcat-7.0.37/ tomcat配置环境变量 vim /etc/profile…

Linux 查找文件

find 查找目录 -name "文件名"find / -name "php.ini"locate 文件名locate php.ini 一&#xff1a;locate命令 locate命令用于查找文件&#xff0c;它比find命令的搜索速度快&#xff0c;它需要一个数据库&#xff0c;这个数据库由每天的例行工作&#xff…

Java GregorianCalendar hashCode()方法与示例

GregorianCalendar类的hashCode()方法 (GregorianCalendar Class hashCode() method) hashCode() method is available in java.util package. hashCode()方法在java.util包中可用。 hashCode() method is used to returns the hash code for this GregorianCalendar. hashCode…

python元组为什么不可变_为什么python字符串和元组是不可变的?

我不知道为什么字符串和元组是不可变的&#xff1b;使它们不可变的优点和缺点是什么&#xff1f;除了Python解释器的内部实现&#xff0c;这种设计在编写程序上是否有很好的意义&#xff1f;(例如&#xff0c;如果元组和字符串是可变的&#xff0c;会更容易吗&#xff1f;)如果…