Map-Reduce入门

1、Map-Reduce的逻辑过程

假设我们需要处理一批有关天气的数据,其格式如下:

  • 按照ASCII码存储,每行一条记录
  • 每一行字符从0开始计数,第15个到第18个字符为年
  • 第25个到第29个字符为温度,其中第25位是符号+/-

0067011990999991950051507+0000+

0043011990999991950051512+0022+

0043011990999991950051518-0011+

0043012650999991949032412+0111+

0043012650999991949032418+0078+

0067011990999991937051507+0001+

0043011990999991937051512-0002+

0043011990999991945051518+0001+

0043012650999991945032412+0002+

0043012650999991945032418+0078+

现在需要统计出每年的最高温度。

Map-Reduce主要包括两个步骤:Map和Reduce

每一步都有key-value对作为输入和输出:

  • map阶段的key-value对的格式是由输入的格式所决定的,如果是默认的TextInputFormat,则每行作为一个记录进程处理,其中key为此行的开头相对于文件的起始位置,value就是此行的字符文本
  • map阶段的输出的key-value对的格式必须同reduce阶段的输入key-value对的格式相对应

对于上面的例子,在map过程,输入的key-value对如下:

(0, 0067011990999991950051507+0000+)

(33, 0043011990999991950051512+0022+)

(66, 0043011990999991950051518-0011+)

(99, 0043012650999991949032412+0111+)

(132, 0043012650999991949032418+0078+)

(165, 0067011990999991937051507+0001+)

(198, 0043011990999991937051512-0002+)

(231, 0043011990999991945051518+0001+)

(264, 0043012650999991945032412+0002+)

(297, 0043012650999991945032418+0078+)

在map过程中,通过对每一行字符串的解析,得到年-温度的key-value对作为输出:

(1950, 0)

(1950, 22)

(1950, -11)

(1949, 111)

(1949, 78)

(1937, 1)

(1937, -2)

(1945, 1)

(1945, 2)

(1945, 78)

在reduce过程,将map过程中的输出,按照相同的key将value放到同一个列表中作为reduce的输入

(1950, [0, 22, –11])

(1949, [111, 78])

(1937, [1, -2])

(1945, [1, 2, 78])

在reduce过程中,在列表中选择出最大的温度,将年-最大温度的key-value作为输出:

(1950, 22)

(1949, 111)

(1937, 1)

(1945, 78)

其逻辑过程可用如下图表示:

image

2、编写Map-Reduce程序

编写Map-Reduce程序,一般需要实现两个函数:mapper中的map函数和reducer中的reduce函数。

一般遵循以下格式:

  • map: (K1, V1)  ->  list(K2, V2)

public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {

  void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)

  throws IOException;

}

  • reduce: (K2, list(V))  ->  list(K3, V3) 

public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {

  void reduce(K2 key, Iterator<V2> values,

              OutputCollector<K3, V3> output, Reporter reporter)

    throws IOException;

}

 

对于上面的例子,则实现的mapper如下:

public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

    @Override

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {

        String line = value.toString();

        String year = line.substring(15, 19);

        int airTemperature;

        if (line.charAt(25) == '+') {

            airTemperature = Integer.parseInt(line.substring(26, 30));

        } else {

            airTemperature = Integer.parseInt(line.substring(25, 30));

        }

        output.collect(new Text(year), new IntWritable(airTemperature));

    }

}

实现的reducer如下:

public class MaxTemperatureReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {

        int maxValue = Integer.MIN_VALUE;

        while (values.hasNext()) {

            maxValue = Math.max(maxValue, values.next().get());

        }

        output.collect(key, new IntWritable(maxValue));

    }

}

 

欲运行上面实现的Mapper和Reduce,则需要生成一个Map-Reduce得任务(Job),其基本包括以下三部分:

  • 输入的数据,也即需要处理的数据
  • Map-Reduce程序,也即上面实现的Mapper和Reducer
  • 此任务的配置项JobConf

欲配置JobConf,需要大致了解Hadoop运行job的基本原理:

  • Hadoop将Job分成task进行处理,共两种task:map task和reduce task
  • Hadoop有两类的节点控制job的运行:JobTracker和TaskTracker
    • JobTracker协调整个job的运行,将task分配到不同的TaskTracker上
    • TaskTracker负责运行task,并将结果返回给JobTracker
  • Hadoop将输入数据分成固定大小的块,我们称之input split
  • Hadoop为每一个input split创建一个task,在此task中依次处理此split中的一个个记录(record)
  • Hadoop会尽量让输入数据块所在的DataNode和task所执行的DataNode(每个DataNode上都有一个TaskTracker)为同一个,可以提高运行效率,所以input split的大小也一般是HDFS的block的大小。
  • Reduce task的输入一般为Map Task的输出,Reduce Task的输出为整个job的输出,保存在HDFS上。
  • 在reduce中,相同key的所有的记录一定会到同一个TaskTracker上面运行,然而不同的key可以在不同的TaskTracker上面运行,我们称之为partition
    • partition的规则为:(K2, V2) –> Integer, 也即根据K2,生成一个partition的id,具有相同id的K2则进入同一个partition,被同一个TaskTracker上被同一个Reducer进行处理。

public interface Partitioner<K2, V2> extends JobConfigurable {

  int getPartition(K2 key, V2 value, int numPartitions);

}

下图大概描述了Map-Reduce的Job运行的基本原理:

image

 

下面我们讨论JobConf,其有很多的项可以进行配置:

  • setInputFormat:设置map的输入格式,默认为TextInputFormat,key为LongWritable, value为Text
  • setNumMapTasks:设置map任务的个数,此设置通常不起作用,map任务的个数取决于输入的数据所能分成的input split的个数
  • setMapperClass:设置Mapper,默认为IdentityMapper
  • setMapRunnerClass:设置MapRunner, map task是由MapRunner运行的,默认为MapRunnable,其功能为读取input split的一个个record,依次调用Mapper的map函数
  • setMapOutputKeyClass和setMapOutputValueClass:设置Mapper的输出的key-value对的格式
  • setOutputKeyClass和setOutputValueClass:设置Reducer的输出的key-value对的格式
  • setPartitionerClass和setNumReduceTasks:设置Partitioner,默认为HashPartitioner,其根据key的hash值来决定进入哪个partition,每个partition被一个reduce task处理,所以partition的个数等于reduce task的个数
  • setReducerClass:设置Reducer,默认为IdentityReducer
  • setOutputFormat:设置任务的输出格式,默认为TextOutputFormat
  • FileInputFormat.addInputPath:设置输入文件的路径,可以使一个文件,一个路径,一个通配符。可以被调用多次添加多个路径
  • FileOutputFormat.setOutputPath:设置输出文件的路径,在job运行前此路径不应该存在

当然不用所有的都设置,由上面的例子,可以编写Map-Reduce程序如下:

public class MaxTemperature {

    public static void main(String[] args) throws IOException {

        if (args.length != 2) {

            System.err.println("Usage: MaxTemperature <input path> <output path>");

            System.exit(-1);

        }

        JobConf conf = new JobConf(MaxTemperature.class);

        conf.setJobName("Max temperature");

        FileInputFormat.addInputPath(conf, new Path(args[0]));

        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setMapperClass(MaxTemperatureMapper.class);

        conf.setReducerClass(MaxTemperatureReducer.class);

        conf.setOutputKeyClass(Text.class);

        conf.setOutputValueClass(IntWritable.class);

        JobClient.runJob(conf);

    }

}

3、Map-Reduce数据流(data flow)

Map-Reduce的处理过程主要涉及以下四个部分:

  • 客户端Client:用于提交Map-reduce任务job
  • JobTracker:协调整个job的运行,其为一个Java进程,其main class为JobTracker
  • TaskTracker:运行此job的task,处理input split,其为一个Java进程,其main class为TaskTracker
  • HDFS:hadoop分布式文件系统,用于在各个进程间共享Job相关的文件

image

3.1、任务提交

JobClient.runJob()创建一个新的JobClient实例,调用其submitJob()函数。

  • 向JobTracker请求一个新的job ID
  • 检测此job的output配置
  • 计算此job的input splits
  • 将Job运行所需的资源拷贝到JobTracker的文件系统中的文件夹中,包括job jar文件,job.xml配置文件,input splits
  • 通知JobTracker此Job已经可以运行了

提交任务后,runJob每隔一秒钟轮询一次job的进度,将进度返回到命令行,直到任务运行完毕。

 

3.2、任务初始化

 

当JobTracker收到submitJob调用的时候,将此任务放到一个队列中,job调度器将从队列中获取任务并初始化任务。

初始化首先创建一个对象来封装job运行的tasks, status以及progress。

在创建task之前,job调度器首先从共享文件系统中获得JobClient计算出的input splits。

其为每个input split创建一个map task。

每个task被分配一个ID。

 

3.3、任务分配

 

TaskTracker周期性的向JobTracker发送heartbeat。

在heartbeat中,TaskTracker告知JobTracker其已经准备运行一个新的task,JobTracker将分配给其一个task。

在JobTracker为TaskTracker选择一个task之前,JobTracker必须首先按照优先级选择一个Job,在最高优先级的Job中选择一个task。

TaskTracker有固定数量的位置来运行map task或者reduce task。

默认的调度器对待map task优先于reduce task

当选择reduce task的时候,JobTracker并不在多个task之间进行选择,而是直接取下一个,因为reduce task没有数据本地化的概念。

 

3.4、任务执行

 

TaskTracker被分配了一个task,下面便要运行此task。

首先,TaskTracker将此job的jar从共享文件系统中拷贝到TaskTracker的文件系统中。

TaskTracker从distributed cache中将job运行所需要的文件拷贝到本地磁盘。

其次,其为每个task创建一个本地的工作目录,将jar解压缩到文件目录中。

其三,其创建一个TaskRunner来运行task。

TaskRunner创建一个新的JVM来运行task。

被创建的child JVM和TaskTracker通信来报告运行进度。

 

3.4.1、Map的过程

MapRunnable从input split中读取一个个的record,然后依次调用Mapper的map函数,将结果输出。

map的输出并不是直接写入硬盘,而是将其写入缓存memory buffer。

当buffer中数据的到达一定的大小,一个背景线程将数据开始写入硬盘。

在写入硬盘之前,内存中的数据通过partitioner分成多个partition。

在同一个partition中,背景线程会将数据按照key在内存中排序。

每次从内存向硬盘flush数据,都生成一个新的spill文件。

当此task结束之前,所有的spill文件被合并为一个整的被partition的而且排好序的文件。

reducer可以通过http协议请求map的输出文件,tracker.http.threads可以设置http服务线程数。

3.4.2、Reduce的过程

当map task结束后,其通知TaskTracker,TaskTracker通知JobTracker。

对于一个job,JobTracker知道TaskTracer和map输出的对应关系。

reducer中一个线程周期性的向JobTracker请求map输出的位置,直到其取得了所有的map输出。

reduce task需要其对应的partition的所有的map输出。

reduce task中的copy过程即当每个map task结束的时候就开始拷贝输出,因为不同的map task完成时间不同。

reduce task中有多个copy线程,可以并行拷贝map输出。

当很多map输出拷贝到reduce task后,一个背景线程将其合并为一个大的排好序的文件。

当所有的map输出都拷贝到reduce task后,进入sort过程,将所有的map输出合并为大的排好序的文件。

最后进入reduce过程,调用reducer的reduce函数,处理排好序的输出的每个key,最后的结果写入HDFS。

 

image

 

3.5、任务结束

 

当JobTracker获得最后一个task的运行成功的报告后,将job得状态改为成功。

当JobClient从JobTracker轮询的时候,发现此job已经成功结束,则向用户打印消息,从runJob函数中返回。

转载于:https://www.cnblogs.com/JohnLiang/archive/2011/11/09/2243448.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/276945.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

clickhouse大数据分析技术与实战_从销售到经营——大客户销售策略与实战技术...

对于首席客户代表而言&#xff0c;要走出困局&#xff0c;所需要大客户销售策略性的训练&#xff0c;而不是像基层客户经理的销售技巧训练一样&#xff1b;新业务的学习固然重要&#xff0c;但更重要的是转化成实战绩效。从组织变革角度&#xff0c;每次成功的业务转型背后都意…

Acer 4750 安装黑苹果_黑苹果系统安装通用教程图文版

在开始之前&#xff0c;不管你要安装的是台式组装机&#xff0c;台式品牌机&#xff0c;一体机&#xff0c;还是笔记本&#xff0c;都要大概了解一下硬件信息。因为黑苹果的安装确实比安装Windows的系统要复杂的多。不管是前期准备工作&#xff0c;安装&#xff0c;还是安装之后…

mysql 快速生成百万条测试数据

转自&#xff1a;http://www.cnblogs.com/jiangxiaobo/p/6101072.html 1、生成思路 利用mysql内存表插入速度快的特点&#xff0c;先利用函数和存储过程在内存表中生成数据&#xff0c;然后再从内存表插入普通表中2、创建内存表及普通表 CREATE TABLE vote_record_memory (id I…

java JVM

每一个Java虚拟机都由一个类加载器子系统&#xff08;class loader subsystem&#xff09;&#xff0c;负责加载程序中的类型&#xff08;类和接口&#xff09;&#xff0c;并赋予唯一的名字。每一个Java虚拟机都有一个执行引擎&#xff08;execution engine&#xff09;负责执…

给Domino系统管理员的十二项建议

Domino系统管理员的日常工作就是维护Domino系统的正常运行。以下简要说明了管理员所必做的一些工作。对于系统管理员&#xff0c;特别是新建系统的管理员来说&#xff0c;这些建议能帮助他们完成基本的维护工作。 根据许多资深的Domino管理员和咨询人员的经验&#xff0c;我们对…

技术管理—管理书籍推荐

技术出身&#xff0c;考虑接触下管理方面的知识。也许管理真的适合你&#xff0c;角色认知角色实践角色胜任&#xff01;最后爱上它&#xff01; 我最喜欢的一本书--高效能人士的七个习惯 作者&#xff1a;史蒂芬柯维&#xff08;Stephen Richards Covey&#xff09; 该…

网络虚拟化有几种实现方式_停车场管理系统的防砸车功能有几种方式?如何实现?...

原标题&#xff1a;停车场管理系统的防砸车功能有几种方式&#xff1f;如何实现&#xff1f;前言0101正文一、压力波防砸装置也叫遇阻防砸&#xff0c;主要是安装遇阻返回装置&#xff0c;当道闸杆下落过程中接触到车辆或者行人(接触力度是可以调节的)&#xff0c;装置道闸杆底…

aspnet中gridview文本只显示开始几个文本_软网推荐:三个小软件 轻松解决文本操作难题...

TXT文本操作在Windows操作中算是比较容易的事了&#xff0c;但简单的文本操作也会遇到难题。例如&#xff0c;对于我们反复需要使用的多个信息&#xff0c;如果仅靠CtrlC和CtrlV来回复制、粘贴&#xff0c;效率会极低&#xff1b;再如&#xff0c;对于一些软件组件中显示的文本…

刚被IBM收购的红帽,它的下一站是中国

前不久IBM斥资340亿美元收购红帽的新闻震惊了所有人&#xff0c;这个金额是互联网上第三大交易&#xff0c;也是开源史上最大交易。这个收购背后到底有哪些目的&#xff1f;红帽接下来会做什么&#xff1f;11月6日红帽在北京举办红帽论坛&#xff0c;向外界介绍了红帽的想法。 …

为什么onenote一直在加载_OneNote:科研笔记独一无二的无敌利器

每个人都梦想着自己有超乎常人的记忆力&#xff0c;拥有者过目不忘的技能&#xff0c;从此走向人生巅峰……然而我们都不是那样的人&#xff0c;在这个高速发展的数字新信息时代&#xff0c;进行有效的记忆&#xff0c;保存我们随时到来的灵感等&#xff0c;这就需要我们进行笔…

WPF 实现 DataGrid/ListView 分页控件

原文:WPF 实现 DataGrid/ListView 分页控件在WPF中&#xff0c;通常会选用DataGrid/ListView进行数据展示&#xff0c;如果数据量不多&#xff0c;可以直接一个页面显示出来。如果数据量很大&#xff0c;2000条数据&#xff0c;一次性显示在一个页面中&#xff0c;不仅消耗资源…

Sql Server 中汉字处理排序规则,全角半角

--1. 为数据库指定排序规则CREATEDATABASEdb COLLATE Chinese_PRC_CI_ASGOALTERDATABASEdb COLLATE Chinese_PRC_BINGO/**//**/--2. 为表中的列指定排序规则CREATETABLEtb(col1 varchar(10),col2 varchar(10) COLLATE Chinese_PRC_CI_AS)GOALTERTABLEtb ADDcol3 varchar(10) CO…

解决局域网设置固定IP后无法上网?

1.cmd中输入ipconfig /all查看ip和dns的状态 2.查看自动获取的dns是什么,然后手动设置ip和dns时,和自动获取的保持一样即可 注解&#xff1a;设置后还是无法上网后主要检查ip与dns是否设置错误. 转载于:https://www.cnblogs.com/yanans/p/11301061.html

鼠标输入

一、隐藏并捕捉光标 偏航角和俯仰角是通过鼠标移动获得的&#xff0c;水平的移动影响偏航角&#xff0c;竖直的移动影响俯仰角。 原理是&#xff0c;存储上一帧鼠标的位置&#xff0c;在当前帧中计算鼠标位置与上一帧的位置相差多少。如果水平/竖直差别越大&#xff0c;那么俯仰…

c#用canny算子做边缘提取_机器视觉学习(三)边缘检测

一、边缘检测二、边缘检测流程三、Canny边缘检测前言边缘检测是图像处理和计算机视觉中&#xff0c;尤其是特征提取中的一个研究领域。有许多方法用于边缘检测&#xff0c;它们的绝大部分可以划分为两类&#xff1a;基于一阶导数首先计算边缘强度&#xff0c; 通常用一阶导数表…

bindresult必须在哪个位置_手机视频剪辑工具哪个好?清爽视频编辑APP有人推荐吗?...

作为一个非常喜欢旅游的人&#xff0c;每次出门在外都喜欢发各种照片&#xff0c;以前发照片觉得就能够表达自己的状态和心情&#xff0c;但是随着时间的变化发现&#xff0c;身边的人都开始喜欢发视频了。此前在飞机上拍摄了一段觉得不错的天空视频&#xff0c;想要制作成短片…

无法访问你试图使用的功能所在的网络位置_[steam实用工具]解决无法访问商店/社区/好友列表的问题...

[steam实用工具]解决无法访问商店/社区/好友列表的问题在我们使用steam的过程中&#xff0c;由于某些原因&#xff0c;在访问商店/社区/好友列表时会被受到限制。针对这种情况&#xff0c;国内的大神些开发出了以下工具来解决我们访问的难题。本文章中的软件由“羽翼诚"大…

(转)用Java获得当前性能信息

(转&#xff09;用Java获得当前性能信息 http://www.blogjava.net/amigoxie/archive/2008/04/30/197564.html在Java中&#xff0c;可以获得总的物理内存、剩余的物理内存、已使用的物理内存等信息&#xff0c;本例讲解如何取得这些信息&#xff0c;并且获得在Windows下的内存使…

docker wsl2启动不了_Docker学习笔记

在笔记本上主要还是想以轻量、方便为主&#xff0c;所以采用的是在WSL2中使用docker的这么一个方案。WSL2我笔记本原来是预装的是WIN10家庭版&#xff0c;需要先升级为专业版&#xff0c;并加入windows预览体验计划。更新完之后&#xff0c;安装WSL&#xff0c;我选择的是Ubunt…

网易马进:DDB从分布式数据库到结构化数据中心的架构变迁

导语&#xff1a; 本文根据马进老师在2018年5月10日【第九届中国数据库技术大会(DTCC)】现场演讲内容整理而成。马进 网易 DDB项目负责人来自网易杭研大数据平台组&#xff0c;入职以来先后参与了分布式数据库DDB&#xff0c;缓存NKV&#xff0c;网易数据运河NDC等项目&#xf…