Hadoop入门(六)Mapreduce

一、Mapreduce概述

MapReduce是一个编程模型,用以进行大数据量的计算

 

二、Hadoop MapReduce

(1)MapReduce是什么

Hadoop MapReduce是一个软件框架,基于该框架能够容易地编写应用程序,这些应用程序能够运行在由上千个商用机器组成的大集群上,并以一种可靠的,具有容错能力的方式并行地处理上TB级别的海量数据集

Mapreduce的特点:

  1. 软件框架
  2. 并行处理
  3. 可靠且容错
  4. 大规模集群
  5. 海量数据集
     

(2)MapReduce做什么

MapReduce的思想就是“分而治之”

1)Mapper负责“分”
把复杂的任务分解为若干个“简单的任务”来处理。“简单的任务”包含三层含义:

  1. 数据或计算的规模相对原任务要大大缩小
  2. 就近计算原则,任务会分配到存放着所需数据的节点上进行计算
  3. 这些小任务可以并行计算彼此间几乎没有依赖关系

2)Reducer负责对map阶段的结果进行汇总
至于需要多少个Reducer,可以根据具体问题,
通过在mapred-site.xml配置文件里设置参数mapred.reduce.tasks的值,缺省值为1。
 

三、MapReduce工作机制

作业执行涉及4个独立的实体

  1. 客户端,用来提交MapReduce作业
  2. JobTracker,用来协调作业的运行
  3. TaskTracker,用来处理作业划分后的任务
  4. HDFS,用来在其它实体间共享作业文件

mapreduce作业工作流程图

Mapreduce作业的4个对象

  1. 客户端(client):编写mapreduce程序,配置作业,提交作业,这就是程序员完成的工作;
  2. JobTracker:初始化作业,分配作业,与TaskTracker通信,协调整个作业的执行;
  3. TaskTracker:保持与JobTracker的通信,在分配的数据片段上执行Map或Reduce任务,TaskTracker和JobTracker的不同有个很重要的方面,就是在执行任务时候TaskTracker可以有n多个,JobTracker则只会有一个(JobTracker只能有一个就和hdfs里namenode一样存在单点故障,我会在后面的mapreduce的相关问题里讲到这个问题的)
  4. Hdfs:保存作业的数据、配置信息等等,最后的结果也是保存在hdfs上面

 

mapreduce运行步骤1

 首先是客户端要编写好mapreduce程序,配置好mapreduce的作业也就是job,
接下来就是提交job了,提交job是提交到JobTracker上的,这个时候JobTracker就会构建这个job,具体就是分配一个新的job任务的ID值

接下来它会做检查操作,这个检查就是确定输出目录是否存在,如果存在那么job就不能正常运行下去,JobTracker会抛出错误给客户端,接下来还要检查输入目录是否存在,如果不存在同样抛出错误,如果存在JobTracker会根据输入计算输入分片(Input Split),如果分片计算不出来也会抛出错误,至于输入分片我后面会做讲解的,这些都做好了JobTracker就会配置Job需要的资源了。

分配好资源后,JobTracker就会初始化作业,初始化主要做的是将Job放入一个内部的队列,让配置好的作业调度器能调度到这个作业,作业调度器会初始化这个job,初始化就是创建一个正在运行的job对象(封装任务和记录信息),以便JobTracker跟踪job的状态和进程。

mapreduce运行步骤2

初始化完毕后,作业调度器会获取输入分片信息(input split),每个分片创建一个map任务。

接下来就是任务分配了,这个时候tasktracker会运行一个简单的循环机制定期发送心跳给jobtracker,心跳间隔是5秒,程序员可以配置这个时间,心跳就是jobtracker和tasktracker沟通的桥梁,通过心跳,jobtracker可以监控tasktracker是否存活,也可以获取tasktracker处理的状态和问题,同时tasktracker也可以通过心跳里的返回值获取jobtracker给它的操作指令。

任务分配好后就是执行任务了。在任务执行时候jobtracker可以通过心跳机制监控tasktracker的状态和进度,同时也能计算出整个job的状态和进度,而tasktracker也可以本地监控自己的状态和进度。当jobtracker获得了最后一个完成指定任务的tasktracker操作成功的通知时候,jobtracker会把整个job状态置为成功,然后当客户端查询job运行状态时候(注意:这个是异步操作),客户端会查到job完成的通知的。如果job中途失败,mapreduce也会有相应机制处理,一般而言如果不是程序员程序本身有bug,mapreduce错误处理机制都能保证提交的job能正常完成。

 

四、mapreduce运行机制

在Hadoop中,一个MapReduce作业会把输入的数据集切分为若干独立的数据块,由Map任务以完全并行的方式处理。框架会对Map的输出先进行排序,然后把结果输入给Reduce任务。

作业的输入和输出都会被存储在文件系统中,整个框架负责任务的调度和监控,以及重新执行已经关闭的任务。MapReduce框架和分布式文件系统是运行在一组相同的节点,计算节点和存储节点都是在一起的

(1) MapReduce的输入输出

一个MapReduce作业的输入和输出类型: 会有三组<key,value>键值对类型的存在

 

(2)Mapreduce作业的处理流程

 

按照时间顺序包括:
输入分片(input split)
map阶段
shuffle阶段:map shuffle(partition、sort/group、combiner、partition  merge)和reduce shuffle(copy、merge(sort/group))
reduce阶段

1)输入分片(input split)

在进行map计算之前,mapreduce会根据输入文件计算输入分片(input split),每个输入分片(input split)针对一个map任务
输入分片(input split)存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组,输入分片(input split)往往和hdfs的block(块)关系很密切

    假如我们设定hdfs的块的大小是64mb,如果我们输入有三个文件,大小分别是3mb、65mb和127mb,那么mapreduce会把3mb文件分为一个输入分片(input split),65mb则是两个输入分片(input split)而127mb也是两个输入分片(input split)
    即我们如果在map计算前做输入分片调整,例如合并小文件,那么就会有5个map任务将执行,而且每个map执行的数据大小不均,这个也是mapreduce优化计算的一个关键点。

2)map阶段
程序员编写好的map函数了,因此map函数效率相对好控制,而且一般map操作都是本地化操作也就是在数据存储节点上进行;

3)combiner阶段

combiner阶段是程序员可以选择的,combiner其实也是一种reduce操作,因此我们看见WordCount类里是用reduce进行加载的。

Combiner是一个本地化的reduce操作,它是map运算的后续操作,主要是在map计算出中间文件前做一个简单的合并重复key值的操作,例如我们对文件里的单词频率做统计,map计算时候如果碰到一个hadoop的单词就会记录为1,但是这篇文章里hadoop可能会出现n多次,那么map输出文件冗余就会很多,因此在reduce计算前对相同的key做一个合并操作,那么文件会变小,这样就提高了宽带的传输效率,毕竟hadoop计算力宽带资源往往是计算的瓶颈也是最为宝贵的资源,但是combiner操作是有风险的,使用它的原则是combiner的输入不会影响到reduce计算的最终输入,
例如:

如果计算只是求总数,最大值,最小值可以使用combiner,但是做平均值计算使用combiner的话,最终的reduce计算结果就会出错。

4)shuffle阶段

将map的输出作为reduce的输入的过程就是shuffle了。

5)reduce阶段

和map函数一样也是程序员编写的,最终结果是存储在hdfs上的。

 

五、Mapreduce框架的相关问题

jobtracker的单点故障

jobtracker和hdfs的namenode一样也存在单点故障,单点故障一直是hadoop被人诟病的大问题,为什么hadoop的做的文件系统和mapreduce计算框架都是高容错的,但是最重要的管理节点的故障机制却如此不好,我认为主要是namenode和jobtracker在实际运行中都是在内存操作,而做到内存的容错就比较复杂了,只有当内存数据被持久化后容错才好做,namenode和jobtracker都可以备份自己持久化的文件,但是这个持久化都会有延迟,因此真的出故障,任然不能整体恢复,另外hadoop框架里包含zookeeper框架,zookeeper可以结合jobtracker,用几台机器同时部署jobtracker,保证一台出故障,有一台马上能补充上,不过这种方式也没法恢复正在跑的mapreduce任务。

 

六、Mapreduce的单词计数实例

public class WordCount {public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {public void map(Object key, Text value, Context context) throws IOException, InterruptedException {}}public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {}}public static void main(String[] args) throws Exception {//… }
}

(1)Map

 public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{  private final static IntWritable one = new IntWritable(1);private Text word = new Text();       public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}}}

map的方法

public void map(Object key, Text value, Context context) throws IOException, InterruptedException {…}
这里有三个参数,前面两个Object key, Text value就是输入的key和value,第三个参数Context context这是可以记录输入的key和value

例如:context.write(word, one);此外context还会记录map运算的状态。

(2)reduce

 public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}

reduce函数的方法

public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {…}

  reduce函数的输入也是一个key/value的形式,
不过它的value是一个迭代器的形式Iterable<IntWritable> values,

也就是说reduce的输入是一个key对应一组的值的value,reduce也有context和map的context作用一致。

(3)main函数

    public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in> <out>");System.exit(2);}Job job = new Job(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
Configuration conf = new Configuration();

运行mapreduce程序前都要初始化Configuration,该类主要是读取mapreduce系统配置信息,这些信息包括hdfs还有mapreduce,也就是安装hadoop时候的配置文件例如:core-site.xml、hdfs-site.xml和mapred-site.xml等等文件里的信息,有些童鞋不理解为啥要这么做,这个是没有深入思考mapreduce计算框架造成,我们程序员开发mapreduce时候只是在填空,在map函数和reduce函数里编写实际进行的业务逻辑,其它的工作都是交给mapreduce框架自己操作的,但是至少我们要告诉它怎么操作啊,比如hdfs在哪里啊,mapreduce的jobstracker在哪里啊,而这些信息就在conf包下的配置文件里。

    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in> <out>");System.exit(2);}

If的语句好理解,就是运行WordCount程序时候一定是两个参数,如果不是就会报错退出。至于第一句里的GenericOptionsParser类,它是用来解释常用hadoop命令,并根据需要为Configuration对象设置相应的值,其实平时开发里我们不太常用它,而是让类实现Tool接口,然后再main函数里使用ToolRunner运行程序,而ToolRunner内部会调用GenericOptionsParser。

    Job job = new Job(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);

第一行就是在构建一个job,在mapreduce框架里一个mapreduce任务也叫mapreduce作业也叫做一个mapreduce的job,而具体的map和reduce运算就是task了,这里我们构建一个job,构建时候有两个参数,一个是conf这个就不累述了,一个是这个job的名称。
  
第二行就是装载程序员编写好的计算程序,例如我们的程序类名就是WordCount了。这里我要做下纠正,虽然我们编写mapreduce程序只需要实现map函数和reduce函数,但是实际开发我们要实现三个类,第三个类是为了配置mapreduce如何运行map和reduce函数,准确的说就是构建一个mapreduce能执行的job了,例如WordCount类。
  
第三行和第五行就是装载map函数和reduce函数实现类了,这里多了个第四行,这个是装载Combiner类,这个我后面讲mapreduce运行机制时候会讲述,其实本例去掉第四行也没有关系,但是使用了第四行理论上运行效率会更好。

    job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);

这个是定义输出的key/value的类型,也就是最终存储在hdfs上结果文件的key/value的类型。 

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);

第一行就是构建输入的数据文件,
第二行是构建输出的数据文件,
最后一行如果job运行成功了,我们的程序就会正常退出。FileInputFormat和FileOutputFormat可以设置输入输出文件路径,
mapreduce计算时候,输入文件必须存在,要不直Mr任务直接退出。输出一般是一个文件夹,而且该文件夹不能存在。

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/323027.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ocelot API网关的实现剖析

在微软Tech Summit 2017 大会上和大家分享了一门课程《.NET Core 在腾讯财付通的企业级应用开发实践》&#xff0c;其中重点是基于ASP.NET Core打造可扩展的高性能企业级API网关&#xff0c;以开源的API网关Ocelot为基础结合自己的业务特性&#xff0c;当天课程只有40分钟&…

Hadoop入门(十二)Intellij IDEA远程向hadoop集群提交mapreduce作业

Intellij IDEA远程向hadoop集群提交mapreduce作业&#xff0c;需要依赖到hadoop的库&#xff0c;hadoop集群的配置信息&#xff0c;还有本地项目的jar包。 一、软件环境 &#xff08;1&#xff09;window本地安装hadoop软件 首先将集群上的hadoop环境下载到本地&#xff0c;…

Spring AOP知识点简介

文章目录1、什么是AOP1.1、AOP术语1.2、AOP框架2、动态代理2.1、JDK动态代理2.2、CGLIB动态代理3、基于代理类的AOP实现3.1、Spring的通知类型3.2、ProxyFactoryBean4、AspectJ开发4.1、基于XML的声明式AspectJ4.2、基于注解的声明式AspectJ1、什么是AOP 面向切面编程&#xf…

SQL2017 Azure SQL新功能:图形数据库

图形数据库是什么呢&#xff1f;如果从字面理解是进行图形处理的数据库&#xff0c;那么你就错了哈哈。 我们先来解释什么是图形数据库。 图形数据库是NoSQL数据库的一种类型&#xff0c;它应用图形理论存储实体之间的关系信息。最常见的例子&#xff0c;就是社会网络中人与人之…

Hadoop入门(十三)远程提交wordCout程序到hadoop集群

一、项目结构 用到的文件有WordCount.java、core-site.xml、mapreduce-site.xml、yarn-site.xml、log4j.properties、pom.xml 二、项目源码 &#xff08;1&#xff09;WordCount.java package com.mk.mapreduce;import org.apache.hadoop.conf.Configuration; import org.ap…

腾讯云短信服务使用记录与.NET Core C#代码分享

1、即使是相同的短信签名与短信正文模板&#xff0c;也需要针对“国内文本短信”与“海外文本短信”分别申请。开始不知道&#xff0c;以为只要申请一次&#xff0c;给国外手机发短信时给api传对应的国家码就行&#xff0c;后来才发现需要分别申请。 2、短信服务web api响应“手…

Hadoop入门(九)Mapreduce高级shuffle之Combiner

一、Combiner的出现 &#xff08;1&#xff09;为什么需要进行Map规约操 作 在上述过程中&#xff0c;我们看到至少两个性能瓶颈&#xff1a; &#xff08;1&#xff09;如果我们有10亿个数据&#xff0c;Mapper会生成10亿个键值对在网络间进行传输&#xff0c;但如果我们只…

欢乐纪中某A组赛【2019.1.19】

前言 因为BBB有一堆(两道)题都做过&#xff0c;于是就来做A组了。 成绩 RankRankRank是有算别人的 RankRankRankPersonPersonPersonScoreScoreScoreAAABBBCCC3332017myself2017myself2017myself2102102101001001001001001001010102222222017lrz2017lrz2017lrz1001001000001001…

使用Identity Server 4建立Authorization Server (2)

第一部分: 使用Identity Server 4建立Authorization Server (1) 第一部分主要是建立了一个简单的Identity Server. 接下来继续: 建立Web Api项目 如图可以在同一个解决方案下建立一个web api项目: (可选)然后修改webapi的launchSettings.json, 我习惯使用控制台, 所以把IISExpr…

建立Vue脚手架的必要性

首先所有文件都放到一个html&#xff0c;代码多了之后阅读体验非常差。 其次建立这样的文件夹后&#xff0c;发现竟然不能随时更新&#xff0c;有缓存的情况

【实验手册】使用Visual Studio Code 开发.NET Core应用程序

.NET Core with Visual Studio Code 目录 概述... 2 先决条件... 2 练习1&#xff1a; 安装和配置.NET Core以及Visual Studio Code 扩展... 2 任务1&#xff1a;安装Visual Studio Code和.NET Core. 2 任务2&#xff1a;安装插件... 4 练习2&#xff1a;使用命令行界面构建. N…

Hadoop入门(八)Mapreduce高级shuffle之Partitioner

一、Partitioner概述 Map阶段总共五个步骤&#xff0c;2就是一个分区操作 哪个key到哪个Reducer的分配过程&#xff0c;是由Partitioner规定的。 二、Hadoop内置Partitioner MapReduce的使用者通常会指定Reduce任务和Reduce任务输出文件的数量&#xff08;R&#xff09;。 用…

在ASP.NET Core中使用AOP来简化缓存操作

前言 关于缓存的使用&#xff0c;相信大家都是熟悉的不能再熟悉了&#xff0c;简单来说就是下面一句话。 优先从缓存中取数据&#xff0c;缓存中取不到再去数据库中取&#xff0c;取到了在扔进缓存中去。 然后我们就会看到项目中有类似这样的代码了。 public Product Get(int p…

Hadoop入门(七)Mapreduce高级Shuffle

一、Shuffle概述 Reduce阶段三个步骤&#xff0c;Shuffle就是一个随机、洗牌操作 Shuffle是什么 针对多个map任务的输出按照不同的分区&#xff08;Partition&#xff09;通过网络复制到不同的reduce任务节点上&#xff0c;这个过程就称作为Shuffle。 二、Shuffle过程 &#…

methods中axios里的数据无法渲染到页面

最近在研究axios聊天室室遇到一个问题 将axios获取到的数据传递给data&#xff0c;从而改变页面中的数值&#xff0c;但是结果令人失望 这是data里的数据 原想将data中的items数组换成axios里的response.data&#xff0c;后来发现items一直为空&#xff0c;就拿字符串做实验了…

.NET Core跨平台的奥秘[上篇]:历史的枷锁

微软推出的第一个版本的.NET Framework是一个面向Windows桌面和服务器的基础框架&#xff0c;在此之后&#xff0c;为此微软根据设备自身的需求对.NET Framework进行裁剪&#xff0c;不断推出了针对具体设备类型的.NET Framework版本以实现针对移动、平板和嵌入式设备提供支持。…

Hadoop入门(十)Mapreduce高级shuffle之Sort和Group

一、排序分组概述 MapReduce中排序和分组在哪里被执行 第3步中需要对不同分区中的数据进行排序和分组&#xff0c;默认情况按照key进行排序和分组 二、排序 在Hadoop默认的排序算法中&#xff0c;只会针对key值进行排序 任务&#xff1a; 数据文件中&#xff0c;如果按照第一…

使用Identity Server 4建立Authorization Server (3)

预备知识: 学习Identity Server 4的预备知识 第一部分: 使用Identity Server 4建立Authorization Server (1) 第二部分: 使用Identity Server 4建立Authorization Server (2) 上一部分简单的弄了个web api 并通过Client_Credentials和ResourceOwnerPassword两种方式获取token然…

php接口跨域问题

报错是因为接口跨域&#xff0c;不允许访问 只需在php头部加入此行代码就行了 header(Access-Control-Allow-Origin:*);

spring boot输出hello world几种方法

1、手动配置&#xff0c;三个文件 打开创建maven,创建这三个文件从上到下依次复制即可 配置文件&#xff08;重要&#xff09;&#xff08;否则后面会报错&#xff09; pom.xml <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w…