大数据之MapReduce详解(MR的运行机制及配合WordCount实例来说明运行机制)

  • 目录
    • 前言:
    • 1、MapReduce原理
    • 2、mapreduce实践(WordCount实例)

目录

今天先总体说下MapReduce的相关知识,后续将会详细说明对应的shuffle、mr与yarn的联系、以及mr的join操作的等知识。以下内容全是个人学习后的见解,如有遗漏或不足请大家多多指教。

前言:

为什么要MAPREDUCE
(1)海量数据在单机上处理因为硬件资源限制,无法胜任
(2)而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度
(3)引入mapreduce框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将分布式计算中的复杂性交由框架来处理。

设想一个海量数据场景下的wordcount需求:
单机版:内存受限,磁盘受限,运算能力受限分布式:
1、文件分布式存储(HDFS)
2、运算逻辑需要至少分成2个阶段(一个阶段独立并发,一个阶段汇聚)
3、运算程序如何分发
4、程序如何分配运算任务(切片)
5、两阶段的程序如何启动?如何协调?
6、整个程序运行过程中的监控?容错?重试?

可见在程序由单机版扩成分布式时,会引入大量的复杂工作。为了提高开发效率,可以将分布式程序中的公共功能封装成框架,让开发人员可以将精力集中于业务逻辑。

而mapreduce就是这样一个分布式程序的通用框架,其应对以上问题的整体结构如下:
1、MRAppMaster(mapreduce application master)
2、MapTask
3、ReduceTask

1、MapReduce原理

Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架;
Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上;

Mapreduce框架结构及核心运行机制
1.1、结构
一个完整的mapreduce程序在分布式运行时有三类实例进程 :
1、MRAppMaster:负责整个程序的过程调度及状态协调
2、mapTask:负责map阶段的整个数据处理流程
3、ReduceTask:负责reduce阶段的整个数据处理流程
1.2、mapreduce框架的设计思想
这里写图片描述
这里面有两个任务的分配过程:1、总的任务切割分配给各个mapTask,不同的mapTask再将得到的hashmap按照首字母划分,分配给各个reduceTask。

1.3、mapreduce程序运行的整体流程(wordcount运行过程的解析)
这里写图片描述
流程解析
(job.split:负责任务的切分,形成一个任务切片规划文件。
wc.jar:要运行的jar包,包含mapper、reducer、Driver等java类。
job.xml:job的其他配置信息:如指定map是哪个类,reduce是那个类,以及输入数据的路径在哪,输出数据的路径在哪等配置信息。)
前提:客户端提交任务给yarn后(提交前会进行任务的规划),yarn利用ResouceManager去找到mrAppmaster.
1、 一个mr程序启动的时候,最先启动的是MRAppMaster,MRAppMaster启动后根据本次job的描述信息,计算出需要的maptask实例数量,然后向集群申请机器启动相应数量的maptask进程

2、 maptask进程启动之后,根据给定的数据切片范围进行数据处理,主体流程为:
a) 利用客户指定的inputformat来获取RecordReader读取数据,形成输入KV对(框架干的事)
b) 将输入KV对传递给客户定义的map()方法,做逻辑运算,并将map()方法输出的KV对收集到缓存
c) 将缓存中的KV对按照K分区排序后不断溢写到磁盘文件

3、 MRAppMaster监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据范围(数据分区)

4、 Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储(对应的就是context.write方法)

2、mapreduce实践(WordCount实例)

编程规范:

(1)用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端)
(2)Mapper的输入数据是KV对的形式(KV的类型可自定义)
(3)Mapper的输出数据是KV对的形式(KV的类型可自定义)
(4)Mapper中的业务逻辑写在map()方法中
(5)map()方法(maptask进程)对每一个<K,V>调用一次
(6)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
(7)Reducer的业务逻辑写在reduce()方法中
(8)Reducetask进程对每一组相同k的<k,v>组调用一次reduce()方法
(9)用户自定义的Mapper和Reducer都要继承各自的父类
(10)整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必要信息的job对象

WordCount程序
mapper类

package bigdata.mr.wcdemo;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
//map方法的生命周期:  框架每传一行数据就被调用一次* KEYIN: 默认情况下,是mr框架所读到的一行文本的起始偏移量,Long,* 但是在hadoop中有自己的更精简的序列化接口,所以不直接用Long,而用LongWritable* * VALUEIN:默认情况下,是mr框架所读到的一行文本的内容,String,同上,用Text* * KEYOUT:是用户自定义逻辑处理完成之后输出数据中的key,在此处是单词,String,同上,用Text* VALUEOUT:是用户自定义逻辑处理完成之后输出数据中的value,在此处是单词次数,Integer,同上,用IntWritable*/
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{/*** map阶段的业务逻辑就写在自定义的map()方法中* maptask会对每一行输入数据调用一次我们自定义的map()方法*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {        //将maptask传给我们的文本内容先转换成StringString line = value.toString();//根据空格将这一行切分成单词String[] words = line.split(" ");       //将单词输出为<单词,1>for(String word:words){//将单词作为key,将次数1作为value,以便于后续的数据分发,可以根据单词分发,以便于相同单词会到相同的reduce taskcontext.write(new Text(word), new IntWritable(1));}}
}

reducer类

package mr_test;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
//生命周期:框架每传递进来一个k相同的value 组,reduce方法就被调用一次* KEYIN, VALUEIN 对应  mapper输出的KEYOUT,VALUEOUT类型对应* KEYOUT, VALUEOUT 是自定义reduce逻辑处理结果的输出数据类型* KEYOUT是单词* VLAUEOUT是总次数*/
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {   /*** <angelababy,1><angelababy,1><angelababy,1><angelababy,1><angelababy,1>* <hello,1><hello,1><hello,1><hello,1><hello,1><hello,1>* <banana,1><banana,1><banana,1><banana,1><banana,1><banana,1>* 入参key,是一组相同单词kv对的key*/@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int count=0;for(IntWritable value:values){count+=value.get();     }context.write(key, new IntWritable(count));}
}

Driver类 用来描述job并提交job

package mr_test;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/*** 相当于一个yarn集群的客户端* 需要在此封装我们的mr程序的相关运行参数,指定jar包* 最后提交给yarn*/
public class WordcountDriver {public static void main(String[] args) throws IOException, Exception, InterruptedException {Configuration cf = new Configuration();
//  把这个程序打包成一个Job来运行Job job = Job.getInstance();        //指定本程序的jar包所在的本地路径job.setJarByClass(WordcountDriver.class);       //指定本业务job要使用的mapper/Reducer业务类job.setMapperClass(WorldcountMapper.class);job.setReducerClass(WordcountReducer.class);        //指定mapper输出数据的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);  //指定最终输出的数据的kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class); //指定job的输入原始文件所在目录FileInputFormat.setInputPaths(job, new Path(args[0]));  //指定job的输出结果所在目录FileOutputFormat.setOutputPath(job, new Path(args[1]));     //将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行boolean res = job.waitForCompletion(true);System.exit(res?0:1);   }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/456754.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

生动形象的理解什么是装饰器!

装饰器 装饰器是程序开发中经常会用到的一个功能&#xff0c;用好了装饰器&#xff0c;开发效率如虎添翼&#xff0c;所以这也是Python面试中必问的问题&#xff0c;但对于好多初次接触这个知识的人来讲&#xff0c;这个功能有点绕&#xff0c;自学时直接绕过去了&#xff0c;…

PLSQL 申明和游标

--从键盘输入一个数 accept b prompt 请输入一个大于零的数字; declareanum number : &b; beginwhile anum>0loopdbms_output.put_line(anum);anum:anum-1;end loop; end;declarev_num number; begin -- 从stsu表中选出id最大的值&#xff0c;并根据该值打印次数select …

20155222 2016-2017-2 《Java程序设计》第8周学习总结

20155222 2016-2017-2 《Java程序设计》第8周学习总结 教材学习内容总结 Java NIO(New IO)是一个可以替代标准Java IO API的IO API&#xff08;从Java 1.4开始)&#xff0c;Java NIO提供了与标准IO不同的IO工作方式。 Java NIO: Channels and Buffers&#xff08;通道和缓冲区&…

BAT经典面试题精简版(基础知识附答案)

文章目录目录J2SE基础JVM操作系统TCP/IP数据结构与算法目录 J2SE基础 九种基本数据类型的大小&#xff0c;以及他们的封装类。 原始类型封装类 booleanBoolean charCharacter byteByte shortShort intInteger longLong floatFloat doubleDouble Switch能否用string做参数&…

使用2to3.py 转换 python2.x 代码 到python3

1.使用Windows 命令提示符&#xff08;cmd&#xff09;cd到2to3.py 脚本所在位置&#xff0c;如下图&#xff1a; 找不到的2 to 3.py的去 pycharm中双击shift搜索一下 2.紧接着运行 2to3.py 脚本&#xff08;可省略&#xff09; 3.执行你想要转换的文件 python 2to3.py -w H:…

iis6.0与asp.net的运行原理

这几天上网翻阅了不少前辈们的关于iis和asp.net运行原理的博客&#xff0c;学的有点零零散散&#xff0c;花了好长时间做了一个小结&#xff08;虽然文字不多&#xff0c;但也花了不少时间呢&#xff09;&#xff0c;鄙人不才&#xff0c;难免有理解不道的地方&#xff0c;还望…

元类--用不上的先了解

元类 1. 类也是对象 在大多数编程语言中&#xff0c;类就是一组用来描述如何生成一个对象的代码段。在Python中这一点仍然成立&#xff1a; >>> class ObjectCreator(object): … pass … >>> my_object ObjectCreator() >>> print(my_ob…

Android学习笔记进阶十之Matrix错切变换

刚开始我也不懂啥叫错切变换&#xff0c;一看效果图你就恍然大悟。 对图像的错切变换做个总结&#xff1a; x x0 b*y0; y d*x0 y0; 与之对应的方法是&#xff1a; [java] view plaincopyMatrix matrix new Matrix(); matrix.setSkew(0.0f, 0.5f);

SQL数据库实战(含建表数据和查询案例)

Oracle数据库安装的时候会自带一个练习用数据库&#xff08;其中包含employee表&#xff0c;后来版本中此表改名为emp&#xff09;&#xff1b; 首先在安装过程中应该有个选项“是否安装实例表”&#xff08;完全安装模式下默认是选择的&#xff09;&#xff0c;需要选择才有此…

Django中的核心思想ORM---元类实现ORM

1. ORM是什么 ORM 是 python编程语言后端web框架 Django的核心思想&#xff0c;“Object Relational Mapping”&#xff0c;即对象-关系映射&#xff0c;简称ORM。 一个句话理解就是&#xff1a;创建一个实例对象&#xff0c;用创建它的类名当做数据表名&#xff0c;用创建它…

移动互联网广告 - 第十更 - 广告投放运营 DashBoard - 2016/12/10

广告投放运营 DashBoard设计 移动互联网互联网广告投放&#xff0c;数据监控DashBoard&#xff0c;基础样例示意&#xff0c;下图仅供参考&#xff08;来自于互联网&#xff09;。 转载于:https://www.cnblogs.com/pythonMLer/p/6154700.html

微信小程序中使用emoji表情相关说明

2019独角兽企业重金招聘Python工程师标准>>> 本帖将聚合一些跟emoji表情有关的知识&#xff1b;相关文章&#xff1a;“i爱记账” 小程序后端开发小结 第7条经验前端传过来的昵称和备注信息一定要经过严格的正则表达式过滤&#xff0c;放置出现XSS等攻击&#xff0c…

java.lang.IllegalArgumentException: Does not contain a valid host:port authority: ignorethis

执行Hive语句运行MapReduce程序时突然出现这样的异常&#xff1a; Total MapReduce jobs 1 Launching Job 1 out of 1 Number of reduce tasks not specified. Estimated from input data size: 1 In order to change the average load for a reducer (in bytes):set hive.ex…

Oracle基础语句

1、创建表create table IT_EMPLOYEES(ENPLOYEES_ID NUMBER(6) NOT NULL UNIQUE,FIRST_NAME VARCHAR2(20),LAST_NAME VARCHAR2(25) NOT NULL,EMAIL VARCHAR2(25),PHONE_NUMBER VARCHAR2(20),JOB_ID VARCHAR2(10),SALARY NUMBER(8,2),MANAGER_ID NUMBER(6));2、--创建索引&#x…

Linux三剑客之grep 与 egrep

grep&#xff1a;Linux上文本处理三剑客1 grep&#xff1a;文本过滤(模式&#xff1a;pattern)工具; *&#xff08;grep, egrep, fgrep&#xff09; 2 sed&#xff1a;stream editor&#xff0c;文本编辑工具&#xff1b; 3 awk&#xff1a;Linux上的实现gawk&#xff0c;文本报…

WSGI直观形象的了解一下

1. 浏览器请求动态页面过程 2. WSGI 怎么在你刚建立的Web服务器上运行一个Django应用和Flask应用&#xff0c;如何不做任何改变而适应不同的web架构呢&#xff1f; 在以前&#xff0c;选择 Python web 架构会受制于可用的web服务器&#xff0c;反之亦然。如果架构和服务器可以…

安装Hbase(分布式)遇到一些问题及解决方法

问题一&#xff1a;安装完成后在Hbase shell 命令行执行list命令时&#xff0c;爆出如下错误&#xff1a; hbase(main):001:0> list TABLE …

安装MySql卡在Start Service的问题

我的情况&#xff1a;之前在windows下安装过5.6版本&#xff0c;卸载后&#xff0c;现在安装5.7版本&#xff0c;然后卡在Start Service这里&#xff0c;log日志没报任何错误&#xff0c;后来经过不断的尝试各种网上的办法终于把问题解决了。 问题的原因就是当初卸载5.6版本时…

学习进度条11

第十三周 日期 星期一 星期二 星期三 星期四 星期五 星期六 所花时间&#xff08;包括上课&#xff09; 19:10-22:20 (编程河北省科技信息通用调查系统) 8:00-10:00 (上课) 18:30-21:00 (Oracle实验) 14:00-16:30 (编程河北省科技信息通用调查系统) 18:20-22:30 (编…

Python面向切面编程是什么

简而言之就是装饰器 https://blog.csdn.net/qq_41856814/article/details/90146293