MIT 6.824 Lab 1 MapReduce

MapReduce

在这里插入图片描述

目标

根据论文所说明的,有MASTER和WORKER两类工作节点,以下实现大都按照论文所说的实现,但是在对MASTER的实现上有所改动:

  • MASTER向WORKER发送心跳检测,这里改为了对分配出去的任务进行超时监控。

MASTER:

  • 接收MapReduce任务(需要处理的文件),并生成对应的Map任务。
  • 接受WORKER的任务分配请求,按需给WORKER分配任务(Map or Reduce)。
  • 对分配给WORKER的任务(Map or Reduce)进行超时监控。
  • 当Map任务完成时,自动创建对应的Reduce任务。
  • 当Reduce任务完成时,结束所用工作,退出程序。

WORKER:

  • 向MASTER提交任务分配请求。
  • 根据MASTER的任务分配请求进行判定处理(Map、Reduce、Waite、Exit)。
  • 当任务(Map or Reduce)完成时,通知MASTER。

实现

ProcessStatus

用来表示Master当前的状态,有如下四种:

  • 接受完MapReduce文件,处于Maping状态。
  • 已经成功处理完Map任务,处于Reducing状态。
  • 所有的Map和Reduce任务都已经完成,处于Done状态。
  • 当所有的Map(or Reduce)任务都已经分配出去,但是还没有接受到所有的成功反馈,处于Waiting状态。
type MasterStatu intconst (Maping   MasterStatu = 0Reducing MasterStatu = 1Done     MasterStatu = 2Waiting  MasterStatu = 3
)

RPC定义

提交任务分配请求(GetOneJob)

Request

type GetOneJobRequest struct {
}

Response

  • 任务的类型,可以根据MasterStatu来判断,有四种:Map,Reduce,Waite,Done。

  • 如果是Map任务:

    需要进行Map的文件路径。

    Master给当前Worker命名的编号,为了给存储中间键值的文件命名。

    后续有多少个Reduce任务,为了给存储分散中间键值的文件命名。

  • 如果是Reduce任务:

    存储哪些Map任务的Worker成功了,为了寻找Reduce任务的文件。

    当前Worker分配的是第几个Reduce任务。

type GetOneJobResponce struct {JobType MasterStatuFilePath     stringWorkerNumber intNReduce      intPathList     []intReduceNumber int
}

提交任务完成记录(JobDone)

Request

  • 当前完成的是什么类型的任务,Map or Reduce。

  • 如果是Map任务:

    当前完成的Map任务的文件地址。

    当前Worker的编号。

  • 如果是Reduce任务:
    当前完成的Reduce任务的编号。

type JobDoneRequest struct {JobType MasterStatuFilePath stringWorkerNumber int
}

Response

type JobDoneResponse struct {
}

Master

Master的结构体描述,有当前状态、Map任务、Reduce任务等。

type Master struct {Mu           sync.MutexStatu        MasterStatuNReduce      intNMapJob      intWorkerNumber int // 进行reduce任务的时候,给worker编号,为了存放intermediate信息MapJob     map[string]bool // 没有开始做的MapJob,按照文件拆分MapJobDone map[string]int  // 已经做完的MapJob,存放(key = file, value = worknumber)MapJobList []int           // 存放所有的worknumberReduceJob     map[int]bool // 没有开始做的ReduceJob,按照 0 ~ nReduce 编号ReduceJobDone map[int]bool // 已经做完的ReduceJob
}

GetOneJob

根据Rpc的定义,应该有一个分配任务的函数,这里分配任务也应该是按照Master当前的状态去分配:

func (m *Master) GetOneJob(req *GetOneJobRequest, resp *GetOneJobResponce) error {m.Mu.Lock()defer m.Mu.Unlock()switch m.Statu {case Maping:JobType, FilePath, WorkerNumber := m.AssignMapJob()resp.JobType = JobTyperesp.FilePath = FilePathresp.WorkerNumber = WorkerNumberresp.NReduce = m.NReducecase Reducing:JobType, PathList, ReduceNumber := m.AssignReduceJob()resp.JobType = JobTyperesp.PathList = PathListresp.ReduceNumber = ReduceNumbercase Done:resp.JobType = Done}return nil
}
  • AssignMapJob

    如果没有MapJob说明,所有任务都在执行,并且还有任务没有完成,所以Worker应该是要进入Waite,否则一定有Map任务分配给Worker,任取一个MapJob出来即可,对应的要开一个协程来对超时的任务进行重加载。

    func (m *Master) AssignMapJob() (MasterStatu, string, int) {if len(m.MapJob) == 0 {return Waiting, "", 0}// 找到map任务JobType := MapingFilePath := ""for k := range m.MapJob {FilePath = kbreak}delete(m.MapJob, FilePath)go MapJobTLE(FilePath, m)return JobType, FilePath, m.GetWorkerNumber()
    }
    
  • AssignReduceJob

    具体逻辑同AssignMapJob。

    func (m *Master) AssignReduceJob() (MasterStatu, []int, int) {if len(m.ReduceJob) == 0 {return Waiting, []int{}, 0}// 一定找到一个任务ReduceNumber := 0for k := range m.ReduceJob {ReduceNumber = k}delete(m.ReduceJob, ReduceNumber)go ReduceJobTLE(ReduceNumber, m)return Reducing, m.MapJobList, ReduceNumber
    }
    

JobDone

这里通知的只有两种状态:Map or Reduce。由于是并发,所以只要记录第一个完成该任务的信息即可。

当所有Map任务都完成时,记得初始化Reduce任务,并切换进入Reducing状态。

当所有Reduce任务都完成时,同样也是切换进入Done状态。

func (m *Master) JobDone(req *JobDoneRequest, resp *JobDoneResponse) error {m.Mu.Lock()defer m.Mu.Unlock()switch req.JobType {case Maping:_, done := m.MapJobDone[req.FilePath]if !done {m.MapJobDone[req.FilePath] = req.WorkerNumberdelete(m.MapJob, req.FilePath)}if len(m.MapJobDone) == m.NMapJob {m.initReduceJob()}case Reducing:m.ReduceJobDone[req.WorkerNumber] = truedelete(m.ReduceJob, req.WorkerNumber)if len(m.ReduceJobDone) == m.NReduce {m.Statu = Done}}return nil
}

Worker

Worker

Worker的入口函数,具体作用就是不断地向Master发送Rpc请求,去获取任务。

DoMap和DoReduce的实现可以直接参照给定的参考程序,基本逻辑都是差不多的,一些区别:

  • Map的中间键值需要通过ihash函数将其分为nReduce份存储。
  • Reduce应该是从多份文件中读到中间键值再进行操作。
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {for {resp := &GetOneJobResponce{}ok := call("Master.GetOneJob", &GetOneJobRequest{}, resp)if !ok || resp.JobType == Done { // master已经关闭,或者任务执行完了return}// fmt.Printf("%v\n", resp)switch resp.JobType {case Maping:DoMap(mapf, resp.FilePath, resp.WorkerNumber, resp.NReduce)case Reducing:DoReduce(reducef, resp.PathList, resp.ReduceNumber)case Waiting:time.Sleep(5 * time.Second)}}
}

成果

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/313606.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

大家在寻找的高级程序员到底是什么样子的?

你好,我是Z哥。这篇文章主题很简单,就是一个很常见的话题“什么是高级程序员?”。文章稍微长了些,但是很容易阅读。我们的中国文化,对“面子”看的特别重,所以你会发现身边到处都是高级XXX,听着…

优秀的程序员是那种过单行线马路都要往两边看的人

最近一周帮我以前一个同事推荐工作,顺便了解下行情,我这个同事我感觉还行,技术不说有多好,但是往年绝对不至于简历筛选时被刷掉那种,最先开始推给了一个我比较信任的HR手里,她兼职猎头,推给这个…

【为自己相亲】单身小姐姐你在哪里,我是书豪,我在等你

笔者简介Introduction书豪:【人工智能爱好者社区】公众号负责人《R数据科学实战:工具详解与案例分析》书籍作者。 你没看错这是书豪在给自己寻觅良缘如果你有,或者身边的朋友有兴趣请与我联系基本信息 出生日期:1995年5月身高&am…

知道的越多,越感觉自己渺小

作者:猛哥,关注技术和人文发展的程序员,架构师社区合伙人芝诺说:“人的知识就像一个圆,圆圈外是未知的,圆圈内是已知的,你知道的越多,你的圆圈就会越大。圆的周长也就越大&#xff0…

.NET Core 3.0 System.Text.Json 和 Newtonsoft.Json 行为不一致问题及解决办法

行为不一致.NET Core 3.0 新出了个内置的 JSON 库, 全名叫做尼古拉斯 System.Text.Json - 性能更高占用内存更少这都不是事...对我来说, 很多或大或小的项目能少个第三方依赖项, 还能规避多个依赖项的依赖 Newtonsoft.Json 版本不一致的问题, 是件极美的事情.但是, 结果总不是不…

Java9 新特性

在介绍 java9 之前,我们先来看看java成立到现在的所有版本。 1990年初,最初被命名为Oak;1995年5月23日,Java语言诞生;1996年1月,第一个JDK-JDK1.0诞生;1996年4月,10个最主要的操作系…

深入探究Kubernetes - 初识容器

♥2019年8月28星期三第47篇原创引言最近Kubernetes比较火,新技术快速火起来,一定有它强大的优势,Hr反馈,招聘时会Kubernetes的很少,风口上的Kubernetes一起学学?扫盲贴,参考《Kubernetes进阶实践…

Java11 新特性

Java 11新特性的详细解释。JDK 11已经于 2018年9月25日正式发布,那么Java 11主要包含哪些新特性呢? JDK 11是Java SE 11平台版本11的开源参考实现,由JSR 384在Java Community Process中指定。 阿里巴巴是中国唯一的JCP委员会成员公司&#x…

福爆 | 博客升级 .NET Core 3.0 又踩一坑

点击上方蓝字关注“汪宇杰博客”导语昨天刚发了一篇《生产大爆炸发生问题的是已经被删除的博客文章,正常情况下,这些不存在的文章会直接显示自定义的404页面,但实际上产生了500异常。日志如下:2019-09-26 00:11:50.8405|RD00155DB…

Sping5——响应式编程

1、响应式编程基础 1.1、什么是响应式编程? 响应式编程是一种面向数据流和变化传播的编程范式。 使用它可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。我们可以使用声明的方式构建应用程序的能…

.NET Core使用NPOI导出复杂Word详解

最近使用NPOI做了个导出Word文档的功能,关于使用.NET Core 导出Word文档的方式有很多。最终我为什么选择了NPOI来实现了这个功能,首先是NPOI是一个开源,免费且容易上手的第三方框架(并且现在已支持.NET Core,GitHub源码…

Spring Boot 2.0新特性

Spring Boot依赖于Spring,而Spring Cloud又依赖于Spring Boot,因此Spring Boot2.0的发布正式整合了Spring5.0的很多特性,同样后面Spring Cloud最新版本的发布也需要整合最新的Spring Boot2.0内容。 一、新版本特性 1.1,基于 Jav…

为了不让代码“作恶”,能否将道德条款纳入开源许可证?

随着特朗普政府反移民政策的执行,成千上万的移民儿童与父母分离,美国移民和海关执法局(ICE)也因此成为众矢之的。所以,当开源开发者 Seth Vargo 发现前东家 —— Chef 公司最近与 ICE 签订了合同后,进行删库…

dump获取与分析

一、dump基本概念 在故障定位(尤其是out of memory)和性能分析的时候,经常会用到一些文件来帮助我们排除代码问题。这些文件记录了JVM运行期间的内存占用、线程执行等情况,这就是我们常说的dump文件。常用的有heap dump和thread dump(也叫ja…

dump分析死锁

1、制造死锁场景 一段死锁程序 public class DeadLock {public static Object lockA new Object();public static Object lockB new Object();public static void main(String[] args) {new ThreadA().start();new ThreadB().start();} }class ThreadA extends Thread {Ove…

用 C# 来守护 Python 进程

背景目前我主要负责的一个项目是一个 C/S 架构的客户端开发,前端主要是通过 WPF 相关技术来实现,后端是通过 Python 来实现,前后端的数据通信则是通过 MQ 的方式来进行处理。由于 Python 进程是需要依赖客户端进程来运行,为了保证…

针对媒体不实报道误导大众--抹黑C#工资垫底

近注意到一些媒体故意抹黑C# 工资垫底,参见 https://www.toutiao.com/i6741889572931633668/:通过搜索引擎搜索《编程语言薪酬排行:Python薪资最高,Java第二,C# 垫底》:早在2018年就出现这样的标题内容,还是CSDN公众号…

程序员你写的代码,被人挖出了黑产

事件经过看了微博上发表转发1000 、点赞1000次的吐槽陕西省的普通话成绩查询网站代码的微博,后来知乎上又有20万的阅读量这个话题的提问。最终结案这并不是真的陕西省普通话成绩查询网的网址,只不过是和官方查询一样的界面,李鬼”网站&#x…

Java垃圾回收日志解析

1.开启垃圾回收日志 在运行一个java程序时可以在命令行中加入相应的JVM垃圾回收参数,获取程序运行时详细的垃圾回收日志信息。以下是一些大概的参数: -XX:PrintGC与-verbose:gc 这两个命令效果都是一样,打印最基本的回收信息 -XX:PrintGCDe…

感谢有你们,架构师修行之路!

感谢有你们转眼马上就十月一了,听说今年的阵势非常强大,菜菜虽然身在北京,但是可能也目睹不了这个激动时刻了。自从2018年年底决定开始写公众号以来,几乎每个周末都在构思文章,撰写文章。关注公众号的老粉丝应该知道&a…