大数据之MapReduce详解(MR的运行机制及配合WordCount实例来说明运行机制)

  • 目录
    • 前言:
    • 1、MapReduce原理
    • 2、mapreduce实践(WordCount实例)

目录

今天先总体说下MapReduce的相关知识,后续将会详细说明对应的shuffle、mr与yarn的联系、以及mr的join操作的等知识。以下内容全是个人学习后的见解,如有遗漏或不足请大家多多指教。

前言:

为什么要MAPREDUCE
(1)海量数据在单机上处理因为硬件资源限制,无法胜任
(2)而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度
(3)引入mapreduce框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将分布式计算中的复杂性交由框架来处理。

设想一个海量数据场景下的wordcount需求:
单机版:内存受限,磁盘受限,运算能力受限分布式:
1、文件分布式存储(HDFS)
2、运算逻辑需要至少分成2个阶段(一个阶段独立并发,一个阶段汇聚)
3、运算程序如何分发
4、程序如何分配运算任务(切片)
5、两阶段的程序如何启动?如何协调?
6、整个程序运行过程中的监控?容错?重试?

可见在程序由单机版扩成分布式时,会引入大量的复杂工作。为了提高开发效率,可以将分布式程序中的公共功能封装成框架,让开发人员可以将精力集中于业务逻辑。

而mapreduce就是这样一个分布式程序的通用框架,其应对以上问题的整体结构如下:
1、MRAppMaster(mapreduce application master)
2、MapTask
3、ReduceTask

1、MapReduce原理

Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架;
Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上;

Mapreduce框架结构及核心运行机制
1.1、结构
一个完整的mapreduce程序在分布式运行时有三类实例进程 :
1、MRAppMaster:负责整个程序的过程调度及状态协调
2、mapTask:负责map阶段的整个数据处理流程
3、ReduceTask:负责reduce阶段的整个数据处理流程
1.2、mapreduce框架的设计思想
这里写图片描述
这里面有两个任务的分配过程:1、总的任务切割分配给各个mapTask,不同的mapTask再将得到的hashmap按照首字母划分,分配给各个reduceTask。

1.3、mapreduce程序运行的整体流程(wordcount运行过程的解析)
这里写图片描述
流程解析
(job.split:负责任务的切分,形成一个任务切片规划文件。
wc.jar:要运行的jar包,包含mapper、reducer、Driver等java类。
job.xml:job的其他配置信息:如指定map是哪个类,reduce是那个类,以及输入数据的路径在哪,输出数据的路径在哪等配置信息。)
前提:客户端提交任务给yarn后(提交前会进行任务的规划),yarn利用ResouceManager去找到mrAppmaster.
1、 一个mr程序启动的时候,最先启动的是MRAppMaster,MRAppMaster启动后根据本次job的描述信息,计算出需要的maptask实例数量,然后向集群申请机器启动相应数量的maptask进程

2、 maptask进程启动之后,根据给定的数据切片范围进行数据处理,主体流程为:
a) 利用客户指定的inputformat来获取RecordReader读取数据,形成输入KV对(框架干的事)
b) 将输入KV对传递给客户定义的map()方法,做逻辑运算,并将map()方法输出的KV对收集到缓存
c) 将缓存中的KV对按照K分区排序后不断溢写到磁盘文件

3、 MRAppMaster监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据范围(数据分区)

4、 Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储(对应的就是context.write方法)

2、mapreduce实践(WordCount实例)

编程规范:

(1)用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端)
(2)Mapper的输入数据是KV对的形式(KV的类型可自定义)
(3)Mapper的输出数据是KV对的形式(KV的类型可自定义)
(4)Mapper中的业务逻辑写在map()方法中
(5)map()方法(maptask进程)对每一个<K,V>调用一次
(6)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
(7)Reducer的业务逻辑写在reduce()方法中
(8)Reducetask进程对每一组相同k的<k,v>组调用一次reduce()方法
(9)用户自定义的Mapper和Reducer都要继承各自的父类
(10)整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必要信息的job对象

WordCount程序
mapper类

package bigdata.mr.wcdemo;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
//map方法的生命周期:  框架每传一行数据就被调用一次* KEYIN: 默认情况下,是mr框架所读到的一行文本的起始偏移量,Long,* 但是在hadoop中有自己的更精简的序列化接口,所以不直接用Long,而用LongWritable* * VALUEIN:默认情况下,是mr框架所读到的一行文本的内容,String,同上,用Text* * KEYOUT:是用户自定义逻辑处理完成之后输出数据中的key,在此处是单词,String,同上,用Text* VALUEOUT:是用户自定义逻辑处理完成之后输出数据中的value,在此处是单词次数,Integer,同上,用IntWritable*/
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{/*** map阶段的业务逻辑就写在自定义的map()方法中* maptask会对每一行输入数据调用一次我们自定义的map()方法*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {        //将maptask传给我们的文本内容先转换成StringString line = value.toString();//根据空格将这一行切分成单词String[] words = line.split(" ");       //将单词输出为<单词,1>for(String word:words){//将单词作为key,将次数1作为value,以便于后续的数据分发,可以根据单词分发,以便于相同单词会到相同的reduce taskcontext.write(new Text(word), new IntWritable(1));}}
}

reducer类

package mr_test;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
//生命周期:框架每传递进来一个k相同的value 组,reduce方法就被调用一次* KEYIN, VALUEIN 对应  mapper输出的KEYOUT,VALUEOUT类型对应* KEYOUT, VALUEOUT 是自定义reduce逻辑处理结果的输出数据类型* KEYOUT是单词* VLAUEOUT是总次数*/
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {   /*** <angelababy,1><angelababy,1><angelababy,1><angelababy,1><angelababy,1>* <hello,1><hello,1><hello,1><hello,1><hello,1><hello,1>* <banana,1><banana,1><banana,1><banana,1><banana,1><banana,1>* 入参key,是一组相同单词kv对的key*/@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int count=0;for(IntWritable value:values){count+=value.get();     }context.write(key, new IntWritable(count));}
}

Driver类 用来描述job并提交job

package mr_test;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/*** 相当于一个yarn集群的客户端* 需要在此封装我们的mr程序的相关运行参数,指定jar包* 最后提交给yarn*/
public class WordcountDriver {public static void main(String[] args) throws IOException, Exception, InterruptedException {Configuration cf = new Configuration();
//  把这个程序打包成一个Job来运行Job job = Job.getInstance();        //指定本程序的jar包所在的本地路径job.setJarByClass(WordcountDriver.class);       //指定本业务job要使用的mapper/Reducer业务类job.setMapperClass(WorldcountMapper.class);job.setReducerClass(WordcountReducer.class);        //指定mapper输出数据的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);  //指定最终输出的数据的kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class); //指定job的输入原始文件所在目录FileInputFormat.setInputPaths(job, new Path(args[0]));  //指定job的输出结果所在目录FileOutputFormat.setOutputPath(job, new Path(args[1]));     //将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行boolean res = job.waitForCompletion(true);System.exit(res?0:1);   }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/456754.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

20155222 2016-2017-2 《Java程序设计》第8周学习总结

20155222 2016-2017-2 《Java程序设计》第8周学习总结 教材学习内容总结 Java NIO(New IO)是一个可以替代标准Java IO API的IO API&#xff08;从Java 1.4开始)&#xff0c;Java NIO提供了与标准IO不同的IO工作方式。 Java NIO: Channels and Buffers&#xff08;通道和缓冲区&…

BAT经典面试题精简版(基础知识附答案)

文章目录目录J2SE基础JVM操作系统TCP/IP数据结构与算法目录 J2SE基础 九种基本数据类型的大小&#xff0c;以及他们的封装类。 原始类型封装类 booleanBoolean charCharacter byteByte shortShort intInteger longLong floatFloat doubleDouble Switch能否用string做参数&…

使用2to3.py 转换 python2.x 代码 到python3

1.使用Windows 命令提示符&#xff08;cmd&#xff09;cd到2to3.py 脚本所在位置&#xff0c;如下图&#xff1a; 找不到的2 to 3.py的去 pycharm中双击shift搜索一下 2.紧接着运行 2to3.py 脚本&#xff08;可省略&#xff09; 3.执行你想要转换的文件 python 2to3.py -w H:…

iis6.0与asp.net的运行原理

这几天上网翻阅了不少前辈们的关于iis和asp.net运行原理的博客&#xff0c;学的有点零零散散&#xff0c;花了好长时间做了一个小结&#xff08;虽然文字不多&#xff0c;但也花了不少时间呢&#xff09;&#xff0c;鄙人不才&#xff0c;难免有理解不道的地方&#xff0c;还望…

Android学习笔记进阶十之Matrix错切变换

刚开始我也不懂啥叫错切变换&#xff0c;一看效果图你就恍然大悟。 对图像的错切变换做个总结&#xff1a; x x0 b*y0; y d*x0 y0; 与之对应的方法是&#xff1a; [java] view plaincopyMatrix matrix new Matrix(); matrix.setSkew(0.0f, 0.5f);

Django中的核心思想ORM---元类实现ORM

1. ORM是什么 ORM 是 python编程语言后端web框架 Django的核心思想&#xff0c;“Object Relational Mapping”&#xff0c;即对象-关系映射&#xff0c;简称ORM。 一个句话理解就是&#xff1a;创建一个实例对象&#xff0c;用创建它的类名当做数据表名&#xff0c;用创建它…

移动互联网广告 - 第十更 - 广告投放运营 DashBoard - 2016/12/10

广告投放运营 DashBoard设计 移动互联网互联网广告投放&#xff0c;数据监控DashBoard&#xff0c;基础样例示意&#xff0c;下图仅供参考&#xff08;来自于互联网&#xff09;。 转载于:https://www.cnblogs.com/pythonMLer/p/6154700.html

微信小程序中使用emoji表情相关说明

2019独角兽企业重金招聘Python工程师标准>>> 本帖将聚合一些跟emoji表情有关的知识&#xff1b;相关文章&#xff1a;“i爱记账” 小程序后端开发小结 第7条经验前端传过来的昵称和备注信息一定要经过严格的正则表达式过滤&#xff0c;放置出现XSS等攻击&#xff0c…

WSGI直观形象的了解一下

1. 浏览器请求动态页面过程 2. WSGI 怎么在你刚建立的Web服务器上运行一个Django应用和Flask应用&#xff0c;如何不做任何改变而适应不同的web架构呢&#xff1f; 在以前&#xff0c;选择 Python web 架构会受制于可用的web服务器&#xff0c;反之亦然。如果架构和服务器可以…

安装Hbase(分布式)遇到一些问题及解决方法

问题一&#xff1a;安装完成后在Hbase shell 命令行执行list命令时&#xff0c;爆出如下错误&#xff1a; hbase(main):001:0> list TABLE …

PyCharm光标变粗的解决办法

pycharm中光标变粗&#xff0c;如下&#xff1a; 此时变成了改写模式&#xff0c;只需要按下键盘的insert键即可 转载于:https://www.cnblogs.com/uglyliu/p/6159839.html

SparkRDD常用算子实践(附运行效果图)

目录1、简单算子说明2、复杂算子说明 目录 SparkRDD算子分为两类&#xff1a;Transformation与Action. Transformation&#xff1a;即延迟加载数据&#xff0c;Transformation会记录元数据信息&#xff0c;当计算任务触发Action时&#xff0c;才会真正开始计算。 Action&am…

Kali-linux使用Nessus

Nessus号称是世界上最流行的漏洞扫描程序&#xff0c;全世界有超过75000个组织在使用它。该工具提供完整的电脑漏洞扫描服务&#xff0c;并随时更新其漏洞数据库。Nessus不同于传统的漏洞扫描软件&#xff0c;Nessus可同时在本机或远端上遥控&#xff0c;进行系统的漏洞分析扫描…

HDFS读写数据的原理

目录1 概述2 HDFS写数据流程3 HDFS读数据流程 目录 最近由于要准备面试&#xff0c;就把之前学过的东西好好整理下&#xff0c;权当是复习。 下面说下HDFS读写数据的原理。 1 概述 HDFS集群分为两大角色&#xff1a;NameNode、DataNode NameNode负责管理整个文件系统的元数…

理解列存储索引

版权声明&#xff1a;原创作品&#xff0c;谢绝转载&#xff01;否则将追究法律责任。 优点和使用场景 SQL Server 内存中列存储索引通过使用基于列的数据存储和基于列的查询处理来存储和管理数据。 列存储索引适合于主要执行大容量加载和只读查询的数据仓库工作负荷…

大数据开发初学者学习路线

目录前言导读&#xff1a;第一章&#xff1a;初识Hadoop第二章&#xff1a;更高效的WordCount第三章&#xff1a;把别处的数据搞到Hadoop上第四章&#xff1a;把Hadoop上的数据搞到别处去第五章&#xff1a;快一点吧&#xff0c;我的SQL第六章&#xff1a;一夫多妻制第七章&…

安卓屏幕适配问题

屏幕适配是根据屏幕密度&#xff0c;dpi为单位的&#xff0c;而不是分辨率。 手机会根据不同手机的密度&#xff0c;自己去不同资源目录下去找对应的资源 比如:   每个图片目录下的图片资源都是一样的&#xff0c;只是大小不一样   比如drawable-sw800dp-mdpi目录&#xff…

MapReduce原理全剖析

MapReduce剖析图 如上图所示是MR的运行详细过程 首先mapTask读文件是通过InputFormat&#xff08;内部是调RecordReader()–&#xff1e;read()&#xff09;来一次读一行&#xff0c;返回K,V值。&#xff08;默认是TextInputFormat&#xff0c;还可以输入其他的类型如:音视频&…

利用selenium webdriver点击alert提示框

在进行元素定位时常常遇到这样的alert框&#xff1a; 那么该如何定位并点击确定或取消按钮呢&#xff1f;stackoverflow上找到了这个问题的答案。 OK&#xff0c; Show you the code&#xff1a; 1 driver.findElement(By.id("updateButton")).click(); 2 //pop up w…

Django的核心思想ORM

元类实现ORM 1. ORM是什么 ORM 是 python编程语言后端web框架 Django的核心思想&#xff0c;“Object Relational Mapping”&#xff0c;即对象-关系映射&#xff0c;简称ORM。 一个句话理解就是&#xff1a;创建一个实例对象&#xff0c;用创建它的类名当做数据表名&#x…