引入
前面我们已经深入了HDFS的设计与实现,对于分布式系统也有了不错的理解。
但HDFS仅仅解决了海量数据存储和读写的问题。要想让数据产生价值,一定是需要从数据中挖掘出价值才行,这就需要我们拥有海量数据的计算处理能力。
下面我们还是老样子,来数据一下要实现海量计算处理能力,有些什么核心痛点
大数据计算核心痛点
量级大
在稍微大一点的互联网企业,需要计算处理的数据量都开始以PB计了。而传统的计算处理模型中,一个程序所能调度的网络带宽通常在数百MB、内存容量通常就几十GB 、磁盘大小通常也就数TB,根本解决不了这么大量级的数据计算需求。(什么?我打宿傩?)
易用差
虽然在04年已经有了分布式计算,但是那个时候的分布式计算都是专用的系统,只能专门处理某一类计算,比如进行大规模数据的排序。这样的系统没办法复用到其他的大数据计算场景,每一种应用都需要开发与维护专门的系统。很难让让没有分布式系统知识和经验的人,可以快速简便地去利用分布式计算处理海量数据。
门槛高
而且因为分布式系统中遇到故障和失败,是一个很常见的问题,传统的分布式程序设计(如MPI)非常复杂,用户需要关注的细节非常多,比如数据分片、数据传输、节点间通信等,因而设计分布式程序的门槛非常高。
容错差
在分布式环境下,随着集群规模的增加,集群中的故障率会显著增加,进而导致任务失败和数据丢失的可能性增加。
这里的“故障”主要指磁盘损坏、机器宕机、节点间通信失败等硬件故障和坏数据,以及用户程序bug产生的软件故障等。
Hadoop MapReduce设计
针对这些痛点,MapReduce的核心设计目标:在保障扩展性和容错性的前提下,提升海量数据计算处理的易用性!
而它的实现的核心思路也很简答,就是通过开发统一通用的编程模型,并构建一个抽象和高层的编程接口和框架,屏蔽分布式领域的复杂问题,让开发者能够专注于分析程序的业务逻辑。
模型本质就是对现实世界中某种事物或现象的一种概括、抽象的表示。 比如函数是输入和输出之间关系的抽象;数学公式是对物理与数学规律的抽象;软件架构图是软件工程师对软件系统的抽象。
通过前面深入HDFS的篇章,我们知道,从本质上来看,HDFS就是通过抽象、封装的思想,把成百上千台服务器、成千上万块硬盘的硬件做了一个封装,屏蔽了底层复杂实现,让使用者可以把它当成一块硬盘来使用,这极大的降低了它的使用门槛。
其实无论是什么领域,学会去抽象总结,才能把握事物的内在规律,而不是被纷繁复杂的事物表象所迷惑,才能更进一步深刻地认识这个世界!
为了让大家先对MapReduce这个通用模型有个初步的概念,我举个通俗的例子——假设我们有一大堆杂乱无序的相同品牌的扑克牌,要快速把它们梳理出有多少副完整的扑克牌,就可以分几个组,一组人分别梳理一堆牌,把相同的牌放到一个位置,然后下一组人基于这些牌去计数不同的牌有多少张,最后汇总起来,就能知道可以组成多少副完整的扑克牌了。
下面我们看看Hadoop MapReduce的设计与落地的总体思路。
参考思想:Unix设计哲学&Unix下的Bash和管道
MapReduce的设计哲学和Unix是一样的,叫做 “Do one thing, and do it well”,也就是每个模块只做一件事情,但是把这件事情彻底做好。
而MapReduce的计算流程设计思想,也是参考了Unix系统中,利用一个个命令,通过管道把数据处理流程串接起来处理的模式。
参考设计:Lisp类函数式编程
其实MapReduce编程模型并不是Hadoop原创,甚至也不是Google原创,而是借鉴了Lisp(python、scala)这类函数式编程语言的思想。
熟悉Java Stream API的都对这种编程模式并不陌生,它实际上就是map、groupingBy、reduce之类的操作,这种编程模型分离了程序的业务逻辑和控制逻辑,使得程序在大规模的分布式环境下运行成为了可能。
另外,尽管MapReduce编程模型非常简单,现实中的大多数任务却都可以用这种编程模型来表达,这在函数式编程语言中已经得到了证明,它为MapReduce后来广泛地流行奠定了基础。
参考论文:MapReduce: Simplified data processing on large cluster
Google在2004年发布的这篇论文也是大数据的三驾马车之一!
该论文主要包含下面内容:
-
MapReduce的计算模型和应用场景;
-
MapReduce实际是如何实现的,使得开发者无需关心分布式的存在;
-
如何逐步迭代优化MapReduce的性能。
这里就不单独开一篇文章介绍了,下面我们把论文里面的核心内容梳理一下。
需求场景
第一种,是对所有的数据,都只需要单条数据就能完成处理。
比如,有很多网页的内容,要从里面提取出来每一个网页的标题。这样的计算可以完全并行化。
第二种,是需要汇总多条数据才能完成计算。
比如,要统计日志里面某个URL被访问了多少次,只需要简单累加就可以了。
比如统计某个URL下面的唯一用户数,就需要将所有相同URL的数据,搬运到同一个计算节点上进行处理。不过,在搬运之后,不同的URL还是可以放到不同的节点进行处理的。
第三种,自然是一、二两种情况的组合了。
比如,先从网页数据里面,提取出网页的URL和标题,然后根据标题里面的关键字,统计特定关键字出现在多少个不同的URL里面,这就需要同时采用一二这两种情况的操作。
当然还有更复杂的数据操作,但是这些动作也都可以抽象成前面的两个动作的组合。因为无非,要处理的数据要么是完全独立的,要么需要多条数据之间的依赖。
计算模型
前面需求场景的
第一种动作,就是 MapReduce 里面的 Map
Map 函数,顾名思义就是一个映射函数,它会接受一个 key-value 对,然后把这个 key-value 对转换成0到多个新的 key-value 对并输出出去。
第二种动作,就是 MapReduce 里面的 Reduce
Reduce 函数,则是一个化简函数,它接受一个 Key,以及这个 Key 下的一组 Value,然后化简成一组新的值 Value 输出出去。
Map 函数的输出结果,会被整个 MapReduce 程序接手,进行 shuffle 操作。也就是数据搬运的过程。
shuffle 会把 Map 函数输出的所有相同的 Key 的 Value 整合到一个列表中,给到 Reduce 函数。并且给到 Reduce 函数的 Key,在每个 Reduce 里,都是按照 Key 排好序的。
它们就构建了 MapReduce 的计算模型
注意:
shuffle 过程的排序操作,并不是 MapReduce 框架本身的核心需求,而是为了技术上实现方便。因为我们要把相同 Key 的数据放到一起处理,而通过一个 HashMap 把所有的数据放在内存里,又不一定放得下。那么利用硬盘进行外部排序是一个最简单的,没有内存大小依赖的,对数据根据 Key 进行分组的解决办法。
MapReduce 计算模型的设计,其实就是典型的模版方法模式(Template Method Pattern)。
与其说它是一个分布式数据处理系统,不如说是分布式数据处理框架。
因为 MapReduce 框架已经设定好了整个数据处理的流程,用户只需要实现 Map 和 Reduce 这两个接口函数,就能完成海量的数据处理。
应用场景
论文里列了以下六个应用场景:
-
分布式 grep ;
-
统计 URL 的访问频次;
-
反转网页-链接图;
-
分域名的词向量;
-
生成倒排索引;
-
分布式排序。
实现挑战
要想让写 Map 和 Reduce 函数的人不需要关心“分布式”的存在,那么 MapReduce 框架本身就需要解决好三个很重要的问题:
-
第一个,自然是如何做好各个服务器节点之间的“协同”,以及解决出现各种软硬件问题后的“容错”这两部分的设计。
-
第二个,性能问题。MapReduce 框架非常容易遇到网络性能瓶颈。尽量充分利用 MapReduce 集群的计算能力,并让整个集群的性能可以随硬件的增加接近于线性增长,可以说是非常大的一个挑战。
-
最后一个,易用性问题。Map 函数和 Reduce 函数最终还是运行在多个不同的机器上的,并且在 Map 和 Reduce 函数中还会遇到各种千奇百怪的数据。当我们的程序在遭遇到奇怪的数据出错的时候,我们需要有办法来进行 debug。
MapReduce 的协同
MapReduce的集群,通常就是分布式存储系统GFS的集群。
在这个集群里,本身会有一个调度系统(Scheduler)。
当我们要运行一个MapReduce任务的时候,其实就是把整个MapReduce的任务提交给这个调度系统,让这个调度系统来分配和安排 Map 函数和 Reduce 函数,以及后面会提到的 master 在不同的硬件上运行。
在MapReduce任务提交了之后,整个MapReduce任务就会按照这样的顺序来执行:
-
第一步,由于写好的MapReduce程序,已经指定了输入路径。所以MapReduce会先找到GFS 上的对应路径,然后把对应路径下的所有数据进行分片(Split)。每个分片的大小通常是 64MB,这个尺寸也是GFS里面一个块(Block)的大小。接着,MapReduce 会在整个集群上,启动很多个MapReduce程序的复刻(fork)进程。
-
第二步,在这些进程中,有一个和其他不同的特殊进程,就是一个master进程,剩下的都是worker进程。然后,会有M个map的任务以及R个 reduce 的任务,分配给这些worker进程去进行处理。这里的master进程,是负责找到空闲的(idle)worker进程,然后再把map任务或者reduce任务,分配给worker进程去处理。
这里需要注意一点,并不是每一个map和reduce任务,都会单独建立一个新的worker 进程来执行。而是master进程会把map和reduce任务分配给有限的worker,因为一个worker通常可以顺序地执行多个map 和reduce 的任务。
-
第三步,被分配到map任务的worker会读取某一个分片,分片里的数据会变成一个个key-value对喂给map任务,然后等Map函数计算完后,会生成的新的key-value对缓冲在内存里。
-
第四步,这些缓冲了的key-value对,会定期地写到map任务所在机器的本地硬盘上。
并且按照一个分区函数(partitioning function),把输出的数据分成R个不同的区域。
而这些本地文件的位置,会被worker传回给到master节点,再由master节点将这些地址转发给reduce任务所在的worker 那里。
-
第五步,运行reduce任务的worker,在收到master的通知之后,会通过RPC(远程过程调用)来从map任务所在机器的本地磁盘上,抓取数据。当reduce任务的worker 获取到所有的中间文件之后,它就会将中间文件根据Key进行排序。这样,所有相同Key的Value 的数据会被放到一起,也就是完成了混洗(Shuffle)的过程。
-
第六步,reduce会对排序后的数据执行实际的Reduce函数,并把reduce的结果输出到当前这个reduce分片的最终输出文件里。
-
第七步,当所有的map任务和reduce任务执行完成之后,master会唤醒启动MapReduce任务的用户程序,然后回到用户程序里,往下执行MapReduce任务提交之后的代码逻辑。
整个MapReduce的执行过程,也是一个典型的 Master-Slave 的分布式系统。map和 reduce所在的worker之间并不会直接通信,它们都只和master通信。另外,像是map 的输出数据在哪里这样的信息,也是告诉master,让master转达给reduce 所在的 worker。reduce从map里获取数据,也是直接拿到数据所在的地址去抓取,而不是让reduce通过RPC,调用map所在的worker去获取数据。
MapReduce 的容错(Fault Tolerance)
MapReduce的容错机制非常简单,可以简单地用两个关键词来描述,就是重新运行和写Checkpoints。
worker 节点的失效(Master Failure)
对于Worker 节点的失效,MapReduce框架解决问题的方式非常简单。就是换一台服务器重新运行这个Worker节点被分配到的所有任务。master节点会定时地去ping每一个worker 节点,一旦worker节点没有响应,就会认为这个节点失效了。于是,master会重新在另一台服务器上,启动一个worker进程,并且在新的worker进程所在的节点上,重新运行所有失效节点上被分配到的任务。而无论失效节点上,之前的map和 reduce任务是否执行成功,这些任务都会重新运行。因为在节点ping不通的情况下,很难保障它的本地硬盘还能正常访问。
master 节点的失效(Worker Failure)
对于 master节点的失效,直接就任由master节点失败了,也就是整个MapReduce任务失败了。而对于开发者来说,解决这个问题的办法也很简单,就是再次提交一下任务去重试。
因为master进程在整个任务中只有一个,它会失效的可能性很小。而MapReduce的任务也是一个用户离线数据处理的任务,并不是一个实时在线的服务,失败重来通常也没有什么影响,只是晚一点拿到数据结果罢了。
虽然在论文发表的时候,谷歌并没有实现对于master的失效自动恢复机制,但他们也给出了一个很简单的解决方案,那就是让master定时把它里面存放的信息,作为一个个的Checkpoint写入到硬盘中去。
针对这个其实可以把这个Checkpoint直接写到GFS里,然后让调度系统监控master。这样一旦master失效,就可以启动一个新的master,来读取Checkpoints 数据,然后就可以恢复任务,并继续执行了,而不需要重新运行整个任务。
对错误数据视而不见
worker 和 master 的节点失效,以及对应的恢复机制,通常都是来自于硬件问题。但是在海量数据处理的情况下,比如在TB乃至PB级别的数据下,还会经常遇到“脏数据”的问题。
这些数据,可能是日志采集的时候就出错了,也可能是一个非常罕见的边界情况(edge-case),我们的Map和Reduce 函数正好处理不了。甚至有可能,只是简单的硬盘硬件的问题带来的错误数据。
那么,对于这些异常数据,我们固然可以不断debug,一一修正。但是这么做,大多数时候都是划不来的,因为很可能为了一条数据记录,由于Map函数处理不了,你就要重新扫描几TB的数据。
所以,MapReduce不仅为节点故障提供了容错机制,对于这些极少数的数据异常带来的问题,也提供了一个容错机制。MapReduce会记录Map或者Reduce函数,运行出错的具体数据的行号,如果同样行号的数据执行重试还是出错,它就会跳过这一行的数据。如果这样的数据行数在总体数据中的比例很小,那么整个MapReduce程序会忽视这些错误,仍然执行完成。毕竟,一个URL被访问了1万次还是9999次,对于搜素引擎的排序结果不会有什么影响。
MapReduce 的性能优化
MapReduce集群里的硬件配置方面的最大瓶颈,自然和 GFS 也一样——网络带宽。
把程序搬到数据那儿去
既然网络带宽是瓶颈,那么优化的办法自然就是尽可能减少需要通过网络传输的数据。在MapReduce这个框架下,就是在分配map任务的时候,根据需要读取的数据在哪里进行分配。由于GFS是知道每一个Block 的数据是在哪台服务器上的。而MapReduce,会找到同样服务器上的worker,来分配对应的map 任务。如果那台服务器上没有,那么它就会找离这台服务器最近的、有worker 的服务器,来分配对应的任务。
除此之外,由于MapReduce程序的代码往往很小,可能只有几百KB或者几MB,但是每个map需要读取的一个分片的数据是64MB大小。这样,我们通过把要执行的MapReduce程序,复制到数据所在的服务器上,就不用多花那10倍乃至100倍的网络传输量了。
通过Combiner减少网络数据传输
除了Map函数需要读取输入的分片数据之外,Reduce所在的worker去抓取中间数据,一样也需要通过网络。那么要在这里减少网络传输,最简单的办法,就是尽可能让中间数据的数据量小一些。
在MapReduce的框架里,MapReduce允许开发者自己定义一个Combiner 函数。这个Combiner函数,会对在同一个服务器上所有map 输出的结果运行一次,然后进行数据合并。实际上,不仅是同一个Map函数的输出可以合并,同一台服务器上多个Map的输出,我们都可以合并。反正它们都在一台机器上,合并只需要本地的硬盘读写和CPU,并不需要我们最紧缺的网络资源。以域名的访问次数为例,它的数据分布一定有很强的头部效应,少量20%的域名可能占了80%的访问记录。这样一合并,我们要传输的数据至少可以减少60%。如果考虑一台 16 核的服务器,有16个map的worker运行,应该还能再减少80%以上。这样,通过一个中间的Combiner,我们要传输的数据一下子就下降了两个数量级,大大缓解了网络传输的压力。
注意:不是所有场景都能预聚合处理的,比如求中位数。
MapReduce 的 debug 信息
虽然我们一直说,我们希望MapReduce让开发者意识不到分布式的存在。但是归根到底,map和reduce的任务都是在分布式集群上运行的,这个就给我们对程序debug 带来了很大的挑战。无论是通过debugger做单步调试,还是打印出日志来看程序执行的情况,都不太可行。
所以,MapReduce也为开发者贴心地提供了三个办法来解决这个问题:
-
第一个,是提供一个单机运行的MapReduce的库,这个库在接收到MapReduce任务之后,会在本地执行完成map和reduce的任务。这样,你就可以通过拿一点小数据,在本地调试你的MapReduce任务了,无论是debugger还是打日志,都行得通。
-
第二个,是在master 里面内嵌了一个HTTP服务器,然后把master的各种状态展示出来给开发者看到。这样一来,你就可以看到有多少个任务执行完了,有多少任务还在执行过程中,它处理了多少输入数据,有多少中间数据,有多少输出的结果数据,以及任务完成的百分比等等。同样的,里面还有每一个任务的日志信息。
另外通过这个HTTP 服务器,你还可以看到具体是哪一个worker里的任务失败了,对应的错误日志是什么。这样,你就可以快速在线上定位你的程序出了什么错,是在哪台服务器上。
-
最后一个,是MapReduce框架里提供了一个计数器(counter)的机制。作为开发者,你可以自己定义几个计数器,然后在Map 和Reduce的函数里去调用这个计数器进行自增。所有 map 和reduce的计数器都会汇总到master节点上,通过上面的HTTP服务器里展现出来。
比如,你就可以利用这个计数器,去统计有多少输入日志的格式和预期的不一样。如果比例太高,那么多半你的程序就有Bug,没有兼容所有合法的日志。
遗憾与缺陷
尽管MapReduce框架已经作出了很多努力,但是今天来看,整个计算框架的缺陷还是不少的。
主要的缺陷有两个:
-
第一个是还没有100%做到让用户意识不到“分布式”的存在,无论是Combiner 还是Partitioner,都是让开发者意识到,它面对的还是分布式的数据和分布式的程序。
-
第二个是性能仍然不太理想,这体现在两个方面:
-
一个是每个任务都有比较大的overhead,都需要预先把程序复制到各个 worker 节点,然后启动进程;
-
另一个是所有的中间数据都要读写多次硬盘。map 的输出结果要写到硬盘上,reduce抓取数据排序合并之后,也要先写到本地硬盘上再进行读取,所以快不起来。
-
Hadoop MapReduce核心设计
Hadoop MapReduce 参考了上面的相关内容,其设计和落地在企业落地后,也是有不断优化迭代的。
和MapReduce的论文不太一样。在Hadoop1.0实现里,每一个MapReduce的任务并没有一个独立的master进程,而是直接让调度系统承担了所有的worker 的master 的角色,这就是Hadoop1.0里的 JobTracker。在Hadoop1.0里,MapReduce论文里面的worker就是TaskTracker,用来执行map 和 reduce的任务。而分配任务,以及和TaskTracker沟通任务的执行情况,都由单一的JobTracker 来负责。
这个设计,也导致了只要服务器数量一多,JobTracker的负载就会很重。所以早年间,单个Hadoop 集群能够承载的服务器上限,被卡在了4000台。而且JobTracker也成为了整个Hadoop 系统很脆弱的“单点”。
在Hadoop 2.0,Hadoop社区把JobTracker的角色,拆分成了进行任务调度的Resource Mananger,以及监控单个MapReduce任务执行的Application Master,回到了和MapReduce论文相同的架构。
MRv1
第一代MapReduce计算框架,由两部分组成:编程模型(programming model)和运行时环境(runtime environment)。
基本编程模型是将问题抽象成Map和Reduce两个阶段。
-
Map阶段将输入数据解析成 key/value,迭代调用 map() 函数处理后,再以 key/value 的形式输出到本地目录;
-
Reduce 阶段则将 key 相同的 value 进行 reduce 处理,并将最终结果写到 HDFS 上。
运行时环境由两类服务组成:JobTracker 和 TaskTracker。
-
JobTracker 负责资源管理和所有作业的控制
-
TaskTracker 负责接收来自JobTracker的命令并执行它
YARN/MRv2
针对MRv1中的MapReduce在扩展性和多框架支持方面的不足,提出了全新的资源管理框架YARN(Yet Another Resource Negotiator)。
将JobTracker中的资源管理和作业控制功能分开,分别由两个不同进程ResourceManager和ApplicationMaster实现。
-
ResourceManager负责所有应用程序的资源分配
-
ApplicationMaster仅负责管理一个应用程序
总结
今天我们梳理了MapReduce的设计与实现的思路,后面我们深入源码去看看MapReduce有哪些有意思的东西。