MapReduce编程实践

一、MapReduce编程思想

学些MapRedcue主要是学习它的编程思想,在MR的编程模型中,主要思想是把对数据的运算流程分成map和reduce两个阶段:

Map阶段:读取原始数据,形成key-value数据(map方法)。即,负责数据的过滤分发

Reduce阶段:把map阶段的key-value数据按照相同的key进行分组聚合(reduce方法)。即,数据的计算归并

它其实是一种数据逻辑运算模型,对于这样的运算模型,有一些成熟的具体软件实现,比如hadoop中的mapreduce框架、spark等,例如在hadoop的mr框架中,对map阶段的具体实现是map task,对reduce阶段的实现是reduce task。这些框架已经为我们提供了一些通用功能的实现,让我们专注于数据处理的逻辑,而不考虑分布式的具体实现,比如读取文件、写文件、数据分发等。我们要做的工作就是在这些编程框架下,来实现我们的具体需求。

下面我们先介绍一些map task和reduce task中的一些具体实现:

二、MapTask和ReduceTask

2.1 Map Task

读数据:利用InputFormat组件完成数据的读取。

    InputFormat-->TextInputFormat 读取文本文件的具体实现

            -->SequenceFileInputFormat 读取Sequence文件

            -->DBInputFormat 读数据库

处理数据:这一阶段将读取到的数据按照规则进行处理,生成key-value形式的结果。maptask通过调用用Mapper类的map方法实现对数据的处理。

分区:这一阶段主要是把map阶段产生的key-value数据进行分区,以分发给不同的reduce task来处理,使用的是Partitioner类。maptask通过调用Partitioner类的getPartition()方法来决定如何划分数据给不同的reduce task。

排序:这一阶段,对key-value数据做排序。maptask会按照key对数据进行排序,排序时调用key.compareTo()方法来实现对key-value数据排序。

2.2 Reduce Task

读数据:这一阶段通过http方式从maptask产生的数据文件中下载属于自己的“区”的数据。由于一个区的数据可能来自多个maptask,所以reduce还要把这些分散的数据进行合并(归并排序)

处理数据:一个reduce task中,处理刚才下载到自己本地的数据。通过调用GroupingComparator的compare()方法来判断文件中的哪些key-value属于同一组。然后将这一组数传给Reducer类的reduce()方法聚合一次。

输出结果:调用OutputFormat组件将结果key-value数据写出去。

    Outputformat --> TextOutputFormat 写文本文件(会把一个key-value对写一行,分隔符为制表符\t

          --> SequenceFileOutputFormat 写Sequence文件(直接将key-value对象序列化到文件中)

          --> DBOutputFormat 

下面介绍下利用MapReduce框架下的一般编程过程。我们要做的 工作就是把我们对数据的处理逻辑加入到框架的业务逻辑中。我们编写的MapReduce的job客户端主要包括三个部分,Mapper 、 Reducer和JobSubmitter,三个部分分别完成MR程序的map逻辑、reduce逻辑以及将我们编写的job程序提交给集群。下面分别介绍这三个部分如何实现。

三、Hadoop中MapReduce框架下的一般编程步骤

Mapper:创建类,该类要实现Mapper父类,复写read()方法,在方法内实现当前工程中的map逻辑。

Reducer:创建类,继承Reducer父类,复写reduce()方法,方法内实现当前工程中的reduce逻辑。

jobSubmitter:这是job在集群上实际运行的类,主要是通过main方法,封装job相关参数,并把job提交。jobsubmitter内一般包括以下操作

step1:创建Configuration对象,并通过创建的对象对集群进行配置,同时支持用户自定义一些变量并配置。这一步有些像我们集群搭建的时候对$haoop_home/etc/hadoop/*下的一些文件进行的配置。

step2:获得job对象,并通过job对象对我们job运行进行一些配置。例如,设置集群运行的jar文件、设置实际执行map和reduce的类等,下面列出一些必要设置和可选设置。

        Configuration conf = new Configuration(); //创建集群配置对象。Job job = Job.getInstance(conf);//根据配置对象获取一个job客户端实例。job.setJarByClass(JobSubmitter.class);//设置集群上job执行的类job.setMapperClass(FlowCountMapper.class);//设置job执行时使用的Mapper类job.setReducerClass(FlowCountReducer.class);//设置job执行时使用的Reducer类job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);FileInputFormat.setInputPaths(job, new Path("I:\\hadooptest\\input"));FileOutputFormat.setOutputPath(job, new Path("I:\\hadooptest\\output_pri"));//设置maptask做数据分发时使用的分发逻辑类,如果不指定,默认使用hashparjob.setPartitionerClass(ProvincePartitioner.class);job.setNumReduceTasks(4);//自定义的分发逻辑下,可能产生n个分区,所以reducetask的数量需要是nboolean res = job.waitForCompletion(true);System.exit(res ? 0:-1);

 一般实践中,可以定义一个类,其中添加main方法对job进行提交,并在其中定义静态内部类maper和reduce类。

四、MapReduce框架中的可自定义项

<不小心删除以后就没有再补充了,挺重要的。。。。补上吧。。。。>

总结,你要把bean写到文本吗?重写toString方法

要传输吗?实现Writable接口

要排序吗?实现writablecompareble接口

 

遇到一些复杂的需求,需要我们自定义实现一些组件

2.1 自定义序列化数据类型

MapReduce框架为我们提供了基本数据类型的序列化类型,如String的Text类型,int的IntWritalbe类型,null的NullWritable类型等。但是有时候会有一些我们自定义的类型需要我们在map和reduce之间进行传输或者需要写到hdfs上。hadoop提供了自己的序列化机制,实现自定义类型的序列化和反序列化将自定义的类实现hadoop提供的Writable接口。

自定义类实现Writable接口,实现readFields(in)write(out)方法。

同时,重写toString()方法,可以自定义在写到文件系统时候写入的字段内容。

     * hadoop系统在序列化该类的对象时要调用的方法*/@Overridepublic void write(DataOutput out) throws IOException {out.writeInt(upFlow);out.writeUTF(phone);out.writeInt(dFlow);out.writeInt(amountFlow);}/*** hadoop系统在反序列化该类的对象时要调用的方法*/@Overridepublic void readFields(DataInput in) throws IOException {this.upFlow = in.readInt();this.phone = in.readUTF();this.dFlow = in.readInt();this.amountFlow = in.readInt();}@Overridepublic String toString() {return this.phone + ","+this.upFlow +","+ this.dFlow +"," + this.amountFlow;}
View Code

2.2 自定义排序规则

MapReduce中提供了一个排序机制,map worker 和reduce worker ,都会对数据按照key的大小来排序,所以map和reduce阶段输出的记录都是经过排序的(按照key排序)。我们在实践中有时候需要对计算出来的结果进行排序,比如一个这样的需求:计算每个页面访问次数,并按照访问量倒序输出。我们可以在统计了每个页面访问次数之后进行排序,但是我们还可以直接应用MR自身的排序特性,在MR处理的时候按照我们的需求进行排序。这时候就需要我们自定义排序规则。

自定义类,实现WritableComparable接口,实现其中的compareTo()方法,在其中自定义排序的规则。同时一般还要实现readFields(in) 和write(out)和toString()方法。

public class PageCount implements WritableComparable<PageCount>{private String page;private int count;public void set(String page, int count) {this.page = page;this.count = count;}public String getPage() {return page;}public void setPage(String page) {this.page = page;}public int getCount() {return count;}public void setCount(int count) {this.count = count;}@Overridepublic int compareTo(PageCount o) {return o.getCount()-this.count==0?this.page.compareTo(o.getPage()):o.getCount()-this.count;}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(this.page);out.writeInt(this.count);}@Overridepublic void readFields(DataInput in) throws IOException {this.page= in.readUTF();this.count = in.readInt();}@Overridepublic String toString() {return this.page + "," + this.count;}}
View Code

总结:

实现Writable接口,是为了bean能够传输,能够写到文件系统中。

实现WritableComparable还为了bean能够按照你定义的规则进行排序。

2.2 自定义分区规则

我们知道,map计算出来的结果会分发给不同的reduce任务去进一步处理。MR中提供了一个默认的数据分发规则,会按照map的输出中的key的hashcode,然后模除reduce task的数量,模除的结果就是数据的分区。我们可以通过自定义map数据分发给reduce的规则,实现把数据按照自己的需求记录到不同的数据中。比如实现这样的需求,有一个通话记录的文件,按照归属地分别存储数据。

 自定义类,继承Partitioner父类(类的泛型为MapTask的输出的key,value的类型),重写 getPartition(<>key, <>value, int numPartitions) 方法,在其中自定义分区的规则,方法返回计算出来的分区数。MapTask每处理一行数据都会调用getPartition方法。因此最好不要在方法中创建可以给很多数据行共同使用的对象。在jobsubmitter中,设置maptask在做数据分区时使用的分区逻辑类, job.setPartitonerClass(your.class) ,同时注意设置reduceTask的任务数量为我们在分区逻辑中定义的规则下回产生的分区数量, job.setNumReduceTasks(numOfPartition); 

/*** 本类是提供给MapTask用的* MapTask通过这个类的getPartition方法,来计算它所产生的每一对kv数据该分发给哪一个reduce task* @author ThinkPad**/
public class ProvincePartitioner extends Partitioner<Text, FlowBean>{static HashMap<String,Integer> codeMap = new HashMap<>();static{codeMap.put("135", 0);codeMap.put("136", 1);codeMap.put("137", 2);codeMap.put("138", 3);codeMap.put("139", 4);}@Overridepublic int getPartition(Text key, FlowBean value, int numPartitions) {Integer code = codeMap.get(key.toString().substring(0, 3));return code==null?5:code;}}
Partitioner
public class JobSubmitter {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(JobSubmitter.class);job.setMapperClass(FlowCountMapper.class);job.setReducerClass(FlowCountReducer.class);// 设置参数:maptask在做数据分区时,用哪个分区逻辑类  (如果不指定,它会用默认的HashPartitioner)job.setPartitionerClass(ProvincePartitioner.class);// 由于我们的ProvincePartitioner可能会产生6种分区号,所以,需要有6个reduce task来接收job.setNumReduceTasks(6);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);FileInputFormat.setInputPaths(job, new Path("F:\\mrdata\\flow\\input"));FileOutputFormat.setOutputPath(job, new Path("F:\\mrdata\\flow\\province-output"));job.waitForCompletion(true);}}
JobSubmitter

2.3 自定义分组规则

MapTask每调用一次map就会产生一个k-v,多次调用后,生成多个k-v,具有相同key的的记录称为一组,会存入一个partition中,注意一个patition可以包含多个组。

 

一个ReduceTask处理一个partition,在处理的时候 ,按照key的顺序进行。调用一次reduce会聚合一组数据,就是reduce方法中传入的一个Itetor。为了确认一个分区中的两条记录是不是同一个组,会调用一个工具类GroupingCompatator的compare(01,02)方法,用来判断两个key是否相同,如果两个key相等,则为同一组。利用这样的机制,我们可以自定义一个分组规则。

自定义类,实现 WritableComparator 类实现 compare 方法,在其中告知MapTask如何判断两个 记录是不是属于同一个组。调用父类构造函数,指定比较的类。

public class OrderIdGroupingComparator extends WritableComparator {pbulic OrderIdGroupingComparator(){//通过构造函数指定要比较的类super(OrderBean.class, true);//
     }@Overridepublic int compare(WritableComparable a, WritableComparable b) {//参数中将来会传入我们自定义的继承了WritableComparable的bean,把a、b向下转型为我们自定义类型的bean,才能比较a和bOrderBean o1 = (OrderBean)a;OrderBean o2 = (OrderBean)b;return o1.getOrderId().compareTo(o2.getOrderID);//id相同就是同一组
    }
}
View Code

在jobSubmiter中指定分组规则,

job.setGroupingComparatorClass(OrderIdGroupingComparator.class);

注意:关于区分分区和分组:

分区比分组的范围更加大。分区是指,在map task结束之后,中间结果数据会被分给哪些reduce task,而分组是指,同一个分区中(即一个reduce task处理的数据中)数据的分组。在默认的计算分区的方法中,不同key的hash code对reduce task取模计算出来的结果可能相同,这样的数据会被分到同一个分区;这一个分区中的key的haashcode不同,这样就在一个区中分了不同组。

那么什么时候使用分区,什么时候使用分组呢?

再如在计算每个订单中总金额最大的3笔中的案例中,可以考虑进行倒序排序,然后取前三;按照id进行倒序排序吗?不现实,因为订单id太多,不可能启动那么多的reduce task。那么就要把多个订单的数据存储到第一个分区中,同时保证同一个订单的数据全部在一个分区中,这时候,就需要自定义分区规则(保证同一订单中的数据在同一个分区),但是又要分组排序,所以这时候就需要自定义分组规则(保证该分区中同一订单在一组,不同订单在不同组)

2.3自定义MapTask的局部聚合规则

默认情况下,map计算的结果逐条保存到磁盘中,传输给reduce之后也是分条的记录,这样可能造成一个问题就是如果某个分区下的数据较多,而有的分区下数据较少,就导致出现reduce task之间任务量差距较大,即出现数据倾斜的情况。一个解决办法是在形成map结果文件的时候进行一次局部聚合。

使用Combiner组件可以实现在每个MapTask中对数据进行一次局部聚合。这个局部聚合的逻辑其实和Reducer的逻辑是一样的,都是对map计算出的kv数据进行聚合,只不过如果是maptask来调用我们定义的Reducer实现类,则聚合的是当前这个maptask运行的结果,如果是reducetask来调用我们定义的Reducer实现类,则聚合的是全部maptask的运行结果。

定义类局部聚合类XXCombationer,继承Rducer复写reduce方法,在方法中实现具体的聚合逻辑;在jobSubmitter的job中设置mapTask端的局部聚合类为我们定义的类 job.setCombinerClass(XXCombiner.class) 。

 

2.4 控制输入输出格式。。。

 

 

五、MR程序的调试、执行方式

 

5.1 提交到linux运行

 

5.2 Win本地执行

 

转载于:https://www.cnblogs.com/Jing-Wang/p/10886890.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/248921.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

webpack基础+webpack配置文件常用配置项介绍+webpack-dev-server - QxQstar - 博客园

一.webpack基础 1.在项目中生成package.json&#xff1a;在项目根目录中输入npm init&#xff0c;根据提示输入相应信息。&#xff08;也可以不生成package.json文件&#xff0c;但是package.json是很有用的&#xff0c;所有建议生成&#xff09; 2.安装webpaack a.在全局中安装…

编译原理--NFA/DFA

现成的, 讲义: https://www.cnblogs.com/AndyEvans/p/10240790.html https://www.cnblogs.com/AndyEvans/p/10241031.html 一个例子, 写得非常好. 一下子就全明白了, 尤其是像我这种没有听过编译原理课程的人. https://blog.csdn.net/tyler_download/article/details/53139240 …

OpenLayers3关于Map Export的Canvas跨域

一 Canvas跨域现象 地图导出是地图中常用的功能&#xff0c;并且OpenLayers3中也提供了两个地图导出的例子:http://openlayers.org/en/latest/examples/export-map.html http://openlayers.org/en/latest/examples/export-pdf.html。 看到这两个例子我们都很兴奋&#xff0c;直…

typescript-koa-postgresql 实现一个简单的rest风格服务器 —— 连接 postgresql 数据库...

接上一篇&#xff0c;这里使用 sequelize 来连接 postgresql 数据库 1、安装 sequelize&#xff0c;数据库驱动 pg yarn add sequelize sequelize-typescript pg reflect-metadata 2、新建配置文件夹 conf 及 配置文件 db.conf.ts /*** name: 数据库配置* param : undefined* r…

SmartGit使用教程

说明 官网的客户端是命令行形式的&#xff0c;有兴趣可以去了解下。这里针对图形界面的smartgit做一个使用说明。 软件下载和安装 下载地址[2016.12.16测试可以] 按需选择,如果不知道自己电脑是什么系统的&#xff0c;那我没话说了https://www.syntevo.com/smartgit/ 安装 …

jquery 下拉框 select2 运用 笔记

1,添加select2 样式 参考&#xff08;https://select2.org/ &#xff09; 2,Html: <select id"txtType" name"Type" class"form-control select2" multiple"multiple"> </select> 3,jquery section scripts{ $(documen…

获取浏览器屏幕高度(js,jq) - 进击的小牛牛 - 博客园

javascript IE中&#xff1a; document.body.clientWidth > BODY对象宽度 document.body.clientHeight > BODY对象高度 document.documentElement.clientWidth > 可见区域宽度 document.documentElement.clientHeight > 可见区域高度 FireFox中&#xff1a; docum…

第一个爬虫和测试

Python测试函数的方法之一是用&#xff1a;try……except def gameover(a,b):if a>10 and b>10 and abs(a-b)2:return Trueif (a>11 and b<11) or (a<11 and b>11):return Truereturn False try:agameover(10,11)print(a) except:print("Error") g…

JS组件系列——Bootstrap 树控件使用经验分享 - 懒得安分 - 博客园

前言&#xff1a;很多时候我们在项目中需要用到树&#xff0c;有些树仅仅是展示层级关系&#xff0c;有些树是为了展示和编辑层级关系&#xff0c;还有些树是为了选中项然后其他地方调用选中项。不管怎么样&#xff0c;树控件都是很多项目里面不可或缺的组件之一。今天&#xf…

蓝桥杯 历届试题 九宫重排 (bfs+康托展开去重优化)

Description 如下面第一个图的九宫格中&#xff0c;放着 1~8 的数字卡片&#xff0c;还有一个格子空着。与空格子相邻的格子中的卡片可以移动到空格中。经过若干次移动&#xff0c;可以形成第二个图所示的局面。我们把第一个图的局面记为&#xff1a;12345678.把第二个图的局面…

DIV或者DIV里面的图片水平与垂直居中的方法 - 站住,别跑 - 博客园

DIV或者DIV里面的图片水平与垂直居中的方法 <div class“box”><img /> </div> 水平居中的常用方式&#xff1a; text-align:center ——这可以实现子元素字体&#xff0c;图片的水平居中。 margin:0 auto —— 这是针对块元素的水平居中方法 垂直居中的常…

设置图片元素上下垂直居中的7种css样式_赵一鸣博客

设置图片元素上下垂直居中的7种css样式 阅读(9548) 2018-07-15 14:13:34 图片、文字左右居中很简单&#xff0c;只需要以下代码&#xff1a; 1 text-align:center; 文字上下居中也很简单&#xff0c;假设外部div元素的高度是100px&#xff0c;那么&#xff1a; 1 line-heig…

day36 Pyhton 网络编程03

一.内容回顾 socket通常也称作"套接字"&#xff0c;用于描述IP地址和端口&#xff0c;是一个通信链的句柄&#xff0c;应用程序通常通过"套接字"向网络发出请求或者应答网络请求。 socket起源于Unix&#xff0c;而Unix/Linux基本哲学之一就是“一切皆文件”…

推荐21个顶级的Vue UI库! – TalkingData‘s Blog

推荐21个顶级的Vue UI库&#xff01; 最近&#xff0c;随着“星球大战”&#xff08;指 GitHub 的 Star 数量大比拼&#xff09;的爆发&#xff0c;Vue.js 在 GitHub 上的 Star 数超过了 React。虽然 NPM 的下载量仍然落后于 React&#xff0c;但 Vue.js 的受欢迎程度似乎在持续…

2019河北省大学生程序设计竞赛(重现赛)B 题 -Icebound and Sequence ( 等比数列求和的快速幂取模)...

题目链接&#xff1a;https://ac.nowcoder.com/acm/contest/903/B 题意&#xff1a; 给你 q,n,p,求 q1q2...qn 的和 模 p。 思路&#xff1a;一开始不会做&#xff0c;后面查了下发现有个等比数列求和的快速幂公式&#xff0c;附上链接https://www.cnblogs.com/yuiffy/p/380917…

nodejs服务后台持续运行

forever.jpeg 我用本地mac连接阿里云服务器&#xff0c;启动nodejs服务&#xff0c;客户端掉线&#xff0c;服务也会终止。如何在客户端掉线的情况下&#xff0c;node服务正常运行&#xff1f; forever介绍 forever是一个nodejs守护进程&#xff0c;完全由命令行操控。forev…

Node.js+Koa开发微信公众号个人笔记(一)准备工作 - ZhangCui - 博客园

本人也是在学习过程中&#xff0c;所以文章只作为学习笔记&#xff0c;如果能帮到你&#xff0c;那就更好啦~当然也难免会有错误&#xff0c;请不吝指出~ 一、准备工作 1、本人学习教程&#xff1a;慕课网Scott老师的《Node.js七天搞定微信公众号》 &#xff0c;但是有点小贵…

【vue-router①】router-link跳转页面传递参数 - 进击的前端之路(偶尔爬坑java小路) - SegmentFault 思否

在vue项目中&#xff0c;往往会遇到这样的情况&#xff0c;就是要实现在一个循环列表中&#xff0c;点击其中一条跳转到下个页面&#xff0c;然后将这一条的相关数据带到下个页面中显示&#xff0c;这是个循环列表&#xff0c;无论点哪一条都是跳到相同的页面&#xff0c;只是填…

Jmeter-【JSON Extractor】-响应结果中三级key取值

一、请求返回样式 二、取第三个option 三、查看结果 转载于:https://www.cnblogs.com/Nancy-Lee/p/10938758.html

手摸手,带你用vue撸后台 系列一(基础篇) - 掘金

完整项目地址&#xff1a;vue-element-admin 系列文章&#xff1a; 手摸手&#xff0c;带你用 vue 撸后台 系列一&#xff08;基础篇&#xff09;手摸手&#xff0c;带你用 vue 撸后台 系列二(登录权限篇)手摸手&#xff0c;带你用 vue 撸后台 系列三 (实战篇)手摸手&#xf…