MapReduce二次排序

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

 默认情况下,Map输出的结果会对Key进行默认的排序,但是有时候需要对Key排序的同时还需要对Value进行排序,这时候就要用到二次排序了。下面我们来说说二次排序

1、二次排序原理

  我们把二次排序分为以下几个阶段

  Map起始阶段

    在Map阶段,使用job.setInputFormatClass()定义的InputFormat,将输入的数据集分割成小数据块split,同时InputFormat提供一个RecordReader的实现。在这里我们使用的是TextInputFormat,它提供的RecordReader会将文本的行号作为Key,这一行的文本作为Value。这就是自定 Mapper的输入是<LongWritable,Text> 的原因。然后调用自定义Mapper的map方法,将一个个<LongWritable,Text>键值对输入给Mapper的map方法

  Map最后阶段

    在Map阶段的最后,会先调用job.setPartitionerClass()对这个Mapper的输出结果进行分区,每个分区映射到一个Reducer。每个分区内又调用job.setSortComparatorClass()设置的Key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setSortComparatorClass()设置 Key比较函数类,则使用Key实现的compareTo()方法

  Reduce阶段

    在Reduce阶段,reduce()方法接受所有映射到这个Reduce的map输出后,也会调用job.setSortComparatorClass()方法设置的Key比较函数类,对所有数据进行排序。然后开始构造一个Key对应的Value迭代器。这时就要用到分组,使用 job.setGroupingComparatorClass()方法设置分组函数类。只要这个比较器比较的两个Key相同,它们就属于同一组,它们的 Value放在一个Value迭代器,而这个迭代器的Key使用属于同一个组的所有Key的第一个Key。最后就是进入Reducer的 reduce()方法,reduce()方法的输入是所有的Key和它的Value迭代器,同样注意输入与输出的类型必须与自定义的Reducer中声明的一致

 

  接下来我们通过示例,可以很直观的了解二次排序的原理

  输入文件 sort.txt 内容为

    40 20

    40 10

    40 30

    40 5

    30 30

    30 20

    30 10

    30 40

    50 20

    50 50

    50 10

    50 60

  输出文件的内容(从小到大排序)如下

    30 10

    30 20

    30 30

    30 40

    --------

    40 5

    40 10

    40 20

    40 30

    --------

    50 10

    50 20

    50 50

    50 60

  从输出的结果可以看出Key实现了从小到大的排序,同时相同Key的Value也实现了从小到大的排序,这就是二次排序的结果

2、二次排序的具体流程

  在本例中要比较两次。先按照第一字段排序,然后再对第一字段相同的按照第二字段排序。根据这一点,我们可以构造一个复合类IntPair ,它有两个字段,先利用分区对第一字段排序,再利用分区内的比较对第二字段排序。二次排序的流程分为以下几步。

  1、自定义 key

    所有自定义的key应该实现接口WritableComparable,因为它是可序列化的并且可比较的。WritableComparable 的内部方法如下所示

 

// 反序列化,从流中的二进制转换成IntPair
public void readFields(DataInput in) throws IOException// 序列化,将IntPair转化成使用流传送的二进制
public void write(DataOutput out)//  key的比较
public int compareTo(IntPair o)//  默认的分区类 HashPartitioner,使用此方法
public int hashCode()//  默认实现
public boolean equals(Object right)

 

  2、自定义分区

    自定义分区函数类FirstPartitioner,是key的第一次比较,完成对所有key的排序。

public static class FirstPartitioner extends Partitioner< IntPair,IntWritable>

    在job中使用setPartitionerClasss()方法设置Partitioner

job.setPartitionerClasss(FirstPartitioner.Class);

  3、Key的比较类

    这是Key的第二次比较,对所有的Key进行排序,即同时完成IntPair中的first和second排序。该类是一个比较器,可以通过两种方式实现。

    1) 继承WritableComparator。

public static class KeyComparator extends WritableComparator

      必须有一个构造函数,并且重载以下方法。

public int compare(WritableComparable w1, WritableComparable w2)

    2) 实现接口 RawComparator。

      上面两种实现方式,在Job中,可以通过setSortComparatorClass()方法来设置Key的比较类。

job.setSortComparatorClass(KeyComparator.Class);

      注意:如果没有使用自定义的SortComparator类,则默认使用Key中compareTo()方法对Key排序。

  4、定义分组类函数

    在Reduce阶段,构造一个与 Key 相对应的 Value 迭代器的时候,只要first相同就属于同一个组,放在一个Value迭代器。定义这个比较器,可以有两种方式。

    1) 继承 WritableComparator。

public static class GroupingComparator extends WritableComparator

      必须有一个构造函数,并且重载以下方法。

public int compare(WritableComparable w1, WritableComparable w2)

    2) 实现接口 RawComparator。

      上面两种实现方式,在 Job 中,可以通过 setGroupingComparatorClass()方法来设置分组类。

job.setGroupingComparatorClass(GroupingComparator.Class);

      另外注意的是,如果reduce的输入与输出不是同一种类型,则 Combiner和Reducer 不能共用 Reducer 类,因为 Combiner 的输出是 reduce 的输入。除非重新定义一个Combiner。

3、代码实现

  Hadoop的example包中自带了一个MapReduce的二次排序算法,下面对 example包中的二次排序进行改进

 

package com.buaa;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;/** 
* @ProjectName SecondarySort
* @PackageName com.buaa
* @ClassName IntPair
* @Description 将示例数据中的key/value封装成一个整体作为Key,同时实现 WritableComparable接口并重写其方法
* @Author 刘吉超
* @Date 2016-06-07 22:31:53
*/
public class IntPair implements WritableComparable<IntPair>{private int first;private int second;public IntPair(){}public IntPair(int left, int right){set(left, right);}public void set(int left, int right){first = left;second = right;}@Overridepublic void readFields(DataInput in) throws IOException{first = in.readInt();second = in.readInt();}@Overridepublic void write(DataOutput out) throws IOException{out.writeInt(first);out.writeInt(second);}@Overridepublic int compareTo(IntPair o){if (first != o.first){return first < o.first ? -1 : 1;}else if (second != o.second){return second < o.second ? -1 : 1;}else{return 0;}}@Overridepublic int hashCode(){return first * 157 + second;}@Overridepublic boolean equals(Object right){if (right == null)return false;if (this == right)return true;if (right instanceof IntPair){IntPair r = (IntPair) right;return r.first == first && r.second == second;}else{return false;}}public int getFirst(){return first;}public int getSecond(){return second;}
}

 

package com.buaa;import java.io.IOException;
import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;/** 
* @ProjectName SecondarySort
* @PackageName com.buaa
* @ClassName SecondarySort
* @Description TODO
* @Author 刘吉超
* @Date 2016-06-07 22:40:37
*/
@SuppressWarnings("deprecation")
public class SecondarySort {public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable> {public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();StringTokenizer tokenizer = new StringTokenizer(line);int left = 0;int right = 0;if (tokenizer.hasMoreTokens()) {left = Integer.parseInt(tokenizer.nextToken());if (tokenizer.hasMoreTokens())right = Integer.parseInt(tokenizer.nextToken());context.write(new IntPair(left, right), new IntWritable(right));}}}/** 自定义分区函数类FirstPartitioner,根据 IntPair中的first实现分区*/public static class FirstPartitioner extends Partitioner<IntPair, IntWritable>{@Overridepublic int getPartition(IntPair key, IntWritable value,int numPartitions){return Math.abs(key.getFirst() * 127) % numPartitions;}}/** 自定义GroupingComparator类,实现分区内的数据分组*/@SuppressWarnings("rawtypes")public static class GroupingComparator extends WritableComparator{protected GroupingComparator(){super(IntPair.class, true);}@Overridepublic int compare(WritableComparable w1, WritableComparable w2){IntPair ip1 = (IntPair) w1;IntPair ip2 = (IntPair) w2;int l = ip1.getFirst();int r = ip2.getFirst();return l == r ? 0 : (l < r ? -1 : 1);}}public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable> {public void reduce(IntPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {for (IntWritable val : values) {context.write(new Text(Integer.toString(key.getFirst())), val);}}}public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 读取配置文件Configuration conf = new Configuration();// 判断路径是否存在,如果存在,则删除    Path mypath = new Path(args[1]);  FileSystem hdfs = mypath.getFileSystem(conf);  if (hdfs.isDirectory(mypath)) {  hdfs.delete(mypath, true);  } Job job = new Job(conf, "secondarysort");// 设置主类job.setJarByClass(SecondarySort.class);// 输入路径FileInputFormat.setInputPaths(job, new Path(args[0]));// 输出路径FileOutputFormat.setOutputPath(job, new Path(args[1]));// Mapperjob.setMapperClass(Map.class);// Reducerjob.setReducerClass(Reduce.class);// 分区函数job.setPartitionerClass(FirstPartitioner.class);// 本示例并没有自定义SortComparator,而是使用IntPair中compareTo方法进行排序 job.setSortComparatorClass();// 分组函数job.setGroupingComparatorClass(GroupingComparator.class);// map输出key类型job.setMapOutputKeyClass(IntPair.class);// map输出value类型job.setMapOutputValueClass(IntWritable.class);// reduce输出key类型job.setOutputKeyClass(Text.class);// reduce输出value类型job.setOutputValueClass(IntWritable.class);// 输入格式job.setInputFormatClass(TextInputFormat.class);// 输出格式job.setOutputFormatClass(TextOutputFormat.class);System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

 

转载于:https://my.oschina.net/xiaoluobutou/blog/807362

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/542048.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据有序_详解数据库插入性能优化:合并+事务+有序数据进行INSERT操作

概述对于一些数据量较大的系统&#xff0c;数据库面临的问题除了查询效率低下&#xff0c;还有就是数据入库时间长。特别像报表系统&#xff0c;每天花费在数据导入上的时间可能会长达几个小时或十几个小时之久。因此&#xff0c;优化数据库插入性能是很有意义的。其实最有效的…

容器内应用日志收集方案

容器化应用日志收集挑战 应用日志的收集、分析和监控是日常运维工作重要的部分&#xff0c;妥善地处理应用日志收集往往是应用容器化重要的一个课题。 Docker处理日志的方法是通过docker engine捕捉每一个容器进程的STDOUT和STDERR&#xff0c;通过为contrainer制定不同log dri…

python统计行号_利用Python进行数据分析(第三篇上)

上一篇文章我记录了自己在入门 Python 学习的一些基础内容以及实际操作代码时所碰到的一些问题。这篇我将会记录我在学习和运用 Python 进行数据分析的过程&#xff1a;介绍 Numpy 和 Pandas 两个包运用 Numpy 和 Pandas 分析一维、二维数据数据分析的基本过程实战项目【用 Pyt…

lnmp架构搭建—源码编译(nginx、mysql、php)

含义及理解&#xff1a; LNMP LinuxNginxMysqlPHP&#xff1a;LNMP是指一组通常一起使用来运行动态网站或者服务器的自由软件名称首字母缩写。L指Linux&#xff0c;N指Nginx&#xff0c;M一般指MySQL&#xff0c;也可以指MariaDB&#xff0c;P一般指PHP&#xff0c;也可以指P…

解析xml_Mybatis中mapper的xml解析详解

上一篇文章分析了mapper注解关键类MapperAnnotationBuilder&#xff0c;今天来看mapper的项目了解析关键类XMLMapperBuilder。基础介绍回顾下之前是在分析configuration的初始化过程&#xff0c;已经进行到了最后一步mapperElement(root.evalNode("mappers"))&#x…

lnmp—MemCache的作用

含义及理解&#xff1a; 1 . memcache是一个高性能的分布式的内存对象缓存系统&#xff0c;用于动态web应用以减轻数据库负担。通过在内存里维护一个统一的巨大的hash表&#xff0c;来存储经常被读写的一些数组与文件&#xff0c;从而极大的提高网站的运行效率。 memcache是一…

openresty—实现缓存前移

含义及理解&#xff1a; OpenResty(又称&#xff1a;ngx_openresty) 是一个基于 NGINX 的可伸缩的 Web 平台&#xff0c;由中国人章亦春发起&#xff0c;提供了很多高质量的第三方模块。 其目标是让Web服务直接跑在Nginx服务内部&#xff0c;充分利用Nginx的非阻塞I/O模型&am…

Nginx+Keepalived+Tomcat之动静分离的web集群

NginxKeepalivedTomcat之动静分离的web集群 博客分类&#xff1a; webserverNginxKeepalivedTomcat之动静分离的web集群为小公司提供大概一天持续在100万/日之间访问的高性能、高可用、高并发访问及动静分离的web集群方案NginxKeepalived 高可用、反向代理NginxPHP …

安装完成后的配置_cent os7 默认安装后的一般配置

在安装cent os7后&#xff0c;进入系统会出现一些命令无法执行。这是因为最小化没有安装包含的软件包。这时候先要配置一下基本的IP参数&#xff0c;(包括动态&#xff0c;静态&#xff0c;或者是双网卡绑定)。我们在虚拟机中模拟操作一下&#xff0c;配置文件在/etc/sysconfig…

lnmp构架——对tomcat详解

tomcat的安装部署 安装jdk和tomcat tar zxf jdk-7u79-linux-x64.tar.gz -C /usr/local/ tar zxf apache-tomcat-7.0.37.tar.gz -C /usr/local/做好软连接便于访问 cd /usr/local ln -s jdk1.7.0_79/ java ln -s apache-tomcat-7.0.37/ tomcat配置环境变量 vim /etc/profile…

zabbix监控部署 与添加主机

zabbix介绍&#xff1a; zabbix&#xff08;[zbiks]&#xff09;是一个基于WEB界面的提供分布式系统监视以及网络监视功能的企业级的开源解决方案。zabbix能监视各种网络参数&#xff0c;保证服务器系统的安全运营&#xff1b;并提供灵活的通知机制以让系统管理员快速定位/解决…

打开是什么样子的图片_情侣头像 | 无论是什么样子的你 我都好喜欢

点击【情侣图片大全】- 右上角找到【…】立刻设我为星标/置顶 - 不迷路哦情侣图片大全“时光真疯狂&#xff0c;我一路执迷与匆忙”情侣/闺蜜/动漫/闺蜜网名长按图片保存 点击图片放大图片高清&#xff0c;建议在 W i f i 下浏览这个世界是多么神奇我竟然遇见了你无论是什么…

zabbix监控平台添加服务(http,nginx,mysql)

1 . 监控httpd服务&#xff1a; 首先确保已经搭建zabbix监控平台&#xff0c;并且将需要监控的主机已经添加。 对主机server2 上的http服务进行监控&#xff0c;首先确保server2主机安装了http服务。 使用http在zabbix中自带监控模版 点击配置->主机->server2->模…

zabbix使用JMX监控tomcat

JMX 全称是Java Management Extensions,即Java管理扩展。Java程序会开放一些端口&#xff0c;用来获取运行状况。 从Zabbix2.0开始&#xff0c;内置了监控JMX的功能,叫做"Zabbix Java Gateway ",在Zabbix Serve上会启动名为"Zabbix Java Gateway "的进程&…

自定义控件添加属性_|AutoCAD LT 2019 Mac自定义功能区的方法

AutoCAD LT是一款非常好用的CAD三维设计绘图软件&#xff0c;最新版本2019拥有改进的桌面、新应用实现跨设备工作流&#xff0c;以及DWG比较等新功能&#xff0c;并且AutoCAD LT 2019 Mac可以根据你的需要和工作习惯来自定义功能区&#xff0c;下面为大家带来自定义功能区的详细…

Zabbix监控——proxy 分布式监控配置

proxy分布式监控 Zabbix proxy是在大规模分布式监控场景中&#xff0c;采用的一种用以分担server端压力的分层结构&#xff0c; proxy可以代替zabbix server检索客户端的数据&#xff0c;然后把数据汇报给zabbix server&#xff0c;极大的减轻了server的负载压力&#xff0c;使…

AutoCAD_acadiso.dwt卡死

2019独角兽企业重金招聘Python工程师标准>>> 问题描述&#xff1a;每次执行到打开acadiso.dwt就卡死&#xff0c;且电脑显示有网&#xff0c;确打不开网页 可能原因&#xff1a;可能是因为AotuCAD是盗版的 解决办法&#xff1a; 1.在任务管理器中把WSCommCntr.exe进…

docker简介与搭建

1 . 对docker的理解&#xff1a; Docker 是一个开源的应用容器引擎&#xff0c;让开发者可以打包他们的应用以及依赖包到一个可移植的镜像中&#xff0c;然后发布到任何流行的 Linux或Windows 机器上&#xff0c;也可以实现虚拟化。容器是完全使用沙箱机制&#xff0c;相互之间…

007_Web to lead

转载于:https://www.cnblogs.com/bandariFang/p/6229491.html

设置header_Nginx的这些安全设置,你都知道吗?

Nginx 是最流行的 Web 服务器&#xff0c;可以只占用 2.5 MB 的内存&#xff0c;却可以轻松处理 1w 的 http 请求。做为网站的入口&#xff0c;Nginx 的安全设置重要性不言而喻。下面带你一起去认识一下这些安全配置吧&#xff01;nginx.conf是 Nginx 最主要的配置文件&#xf…