MIT6.5840 Lab 1: MapReduce(6.824)

结果

介绍
在本实验中,您将构建一个MapReduce系统。您将实现一个调用应用程序Map和Reduce函数并处理文件读写的工作进程,以及一个将任务分发给工作进程并处理失败的工作进程的协调进程。您将构建类似于MapReduce论文的东西。(注意:本实验使用“coordinator”代替论文中的“master”。)

mrsequential.go的逻辑就是从写好的代码(例如mrapps/wc.go)编译成的动态库(wc.so)中提取出map和reduce两个函数,再利用map来处理数据得到中间结果,reduce拿中间结果进一步处理得到最终结果。

现在要在分布式的环境下执行这个过程,也就是通过协调进程去把任务分发到worker上,这个任务可能是map可能是reduce,

Your Job (moderate/hard)

实现一个分布式MapReduce,它由两个程序组成,协调器和工作器。只有一个协调进程和一个或多个并行执行的工作进程。在一个真实的系统中,工人会在一堆不同的机器上运行,但在这个实验中,你将在一台机器上运行它们。工作人员将通过RPC与协调器对话。每个工作进程将在一个循环中向协调器请求一个任务,从一个或多个文件中读取任务的输入,执行任务,将任务的输出写入一个或多个文件,然后再次向协调器请求一个新任务。协调器应该注意到,如果一个工人没有在合理的时间内完成任务(在本实验中,使用10秒),并将相同的任务交给另一个工人。协调器和工作器的“主”例程位于main/mrcoordinato.go 和 main/mrworker.go不要更改这些文件。您应该将您的实现放在 mr/coordinator.go, mr/worker.go, and mr/rpc.go

实验要求:

  1. nReduce对应的Reduce数及输出的文件数,也要作为MakeCoordinator()方法的参数;
  2. Reduce任务的输出文件的命名为mr-out-X,这个X就是来自nReduce;
  3. mr-out-X的输出有个格式要求,参照main/mrsequential.go,"%v %v" 格式;
  4. Map输出的中间值要放到当前目录的文件中,Reduce任务从这些文件来读取;
  5. 当Coordinator.go的Done()方法返回true,MapReduce的任务就完成了;
  6. 当一个任务完成,对应的worker就应该终止,这个终止的标志可以来自于call()方法,若它去给Master发送请求,得到终止的回应,那么对应的worker进程就可以结束了。

实验提示:

  1. 修改mr/worker.go的Worker(),发送RPC请求给coordinator要任务。然后修改Coordinator将还没有被Map执行的文件作为响应返回给worker。然后worker读取文件并执行Map方法函数,就如示例文件 mrsequential.go;
  2. Map和Reduce函数加载来自插件wc.go,如果改了这些东西需要使用命令重新编译生成新的.so文件,尽量不要动这些东西;
  3. 中间文件的命名方式推荐为mr-X-Y,X对应Map任务Id,Y对应的Reduce任务Id;
  4. 为顺利存储中间数据,采用json,以便读取;
  5. worker 的 map 部分可以使用ihash(key)函数(在worker.go 中)为给定的键选择 reduce 任务;
  6. Coordinator作为一个 RPC 服务器,将是并发的;不要忘记锁定共享数据;
  7. 在所有Map任务完成后,Reduce任务才会开始,所以对应的worker可能会需要等待,那么可以使用time.sleep()或其他方法;
  8. worker可能挂掉或其他原因崩了,Coordinator在这个实验中等待10s,超过时间将会分配给其他的worker;
  9. 您可以使用 ioutil.TempFile 创建一个临时文件,并使用 os.Rename 对其进行原子重命名;
  10. test-mr.sh 运行子目录 mr-tmp 中的所有进程,因此如果出现问题并且您想查看中间文件或输出文件,请查看那里。您可以修改 test-mr.sh 以在测试失败后退出,这样脚本就不会继续测试(并覆盖输出文件)。
RPC通信 

项目中需要使用rpc的地方是worker向coordinator索要任务或发送任务完成情况,先探究rpc是如何通信的,在mr/coordinato.go中,注册rpc的函数为

func (c *Coordinator) server() {rpc.Register(c)rpc.HandleHTTP()//l, e := net.Listen("tcp", ":1234")sockname := coordinatorSock()os.Remove(sockname)l, e := net.Listen("unix", sockname)if e != nil {log.Fatal("listen error:", e)}go http.Serve(l, nil)
}

这里使用的是unix套接字,它用于本地进程之间的通信,通常比网络套接字更高效,因为数据不需要通过网络协议栈,在同一台机器上的进程之间通信,Worker 进程可以通过套接字文件连接到 Coordinator 进行 RPC 调用,使用 HTTP 协议来组织和传递数据。

整体流程概括为:Worker 的 RPC 请求通过 HTTP 协议发送->请求通过 Unix 套接字传输到 Coordinator->Coordinator 的 HTTP 服务处理请求,并返回响应。

在worker中调用rpc的方法如下,传入rpc方法名,参数和返回值。

func call(rpcname string, args interface{}, reply interface{}) bool {// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")sockname := coordinatorSock()c, err := rpc.DialHTTP("unix", sockname)if err != nil {log.Fatal("dialing:", err)}defer c.Close()err = c.Call(rpcname, args, reply)if err == nil {return true}fmt.Println(err)return false
}
worker部分

work的工作就是处理map任务和reduce任务,并在处理完成后反馈结果,那么在mr/worker.go中有,其中executeMapTask和executeReduceTask分别用来处理map和reduce任务,处理完成后会调用notifyTaskComplete反馈任务结果,函数的实现可以参考mrsequential.go。

func Worker(mapf func(string, string) []KeyValue,reducef func(string, []string) string) {for {task := requestTask()switch task.TaskType {case MapTask:executeMapTask(task, mapf)case ReduceTask:executeReduceTask(task, reducef)case NoTask:log.Println("No task available, sleeping...")time.Sleep(1 * time.Second)}}}func executeMapTask(task *TaskRep, mapf func(string, string) []KeyValue) {filename := task.FileNamefile, err := os.Open(filename)if err != nil {log.Fatalf("cannot open %v", filename)}content, err := ioutil.ReadAll(file)if err != nil {log.Fatalf("cannot read %v", filename)}file.Close()kva := mapf(filename, string(content))intermediate := make(map[int][]KeyValue)for _, kv := range kva {reduceTaskNum := ihash(kv.Key) % task.ReduceCountintermediate[reduceTaskNum] = append(intermediate[reduceTaskNum], kv)}for reduceTaskNum, kvs := range intermediate {tempFile, _ := ioutil.TempFile("", "mr-temp-*")enc := json.NewEncoder(tempFile)for _, kv := range kvs {enc.Encode(&kv)}tempFile.Close()finalName := fmt.Sprintf("mr-%d-%d", task.TaskID, reduceTaskNum)os.Rename(tempFile.Name(), finalName)}notifyTaskComplete(task.TaskID, MapTask)
}func executeReduceTask(task *TaskRep, reducef func(string, []string) string) {intermediate := make(map[string][]string)// 遍历所有 MapTask 的任务 IDfor mapTaskID := 0; mapTaskID < task.MapTaskCount; mapTaskID++ {filename := fmt.Sprintf("mr-%d-%d", mapTaskID, task.TaskID)file, err := os.Open(filename)if err != nil {// 文件不存在可能是因为 MapTask 失败,忽略continue}// 解码中间文件内容dec := json.NewDecoder(file)for {var kv KeyValueif err := dec.Decode(&kv); err != nil {break}intermediate[kv.Key] = append(intermediate[kv.Key], kv.Value)}file.Close()}// 生成最终输出文件outputFile, _ := os.Create(fmt.Sprintf("mr-out-%d", task.TaskID))for key, values := range intermediate {result := reducef(key, values)fmt.Fprintf(outputFile, "%v %v\n", key, result)}outputFile.Close()notifyTaskComplete(task.TaskID, ReduceTask)
}func notifyTaskComplete(taskID int, taskType int) {req := TaskCompleteReq{TaskID: taskID, TaskType: taskType}reply := TaskCompleteRep{}call("Coordinator.TaskComplete", &req, &reply)
}
coordinator部分

对于coordinator.go,首先需要定义任务的种类,这里想到worker要知道是map还是reduce任务,要处理的文件名称,并且写入文件时需要有map和reduce的id,处理时间需要在10s内,那么定义如下Task结构体,任务的类型和状态都用枚举数,任务在coordinactor实例初始化的时候就塞到实例的...task字段内,这里要注意输入的file有多少个,就有多少个map任务,而reduce任务的数量和nReduce有关。任务超时的检查我是用轮询机制,每隔一秒轮询所有任务如果任务状态为正在运行并且时间超时那么把它状态初始化。 

结构体中的字段并不是一下就能全部想出来,也是需要在写处理函数的过程中看需要哪些字段才决定。

type Task struct {TaskType    intFileName    stringTaskID      intReduceCount intStatus      intStartTime   time.Time
}const (MapTask = iotaReduceTaskNoTask
)const (Pending   = iota //任务已准备好进行处理,并将由一个空闲的工作器接收Active           //任务正在被工作器处理Retry            //工作器无法处理任务,任务正在等待将来重试Completed        //任务已成功处理
)type Coordinator struct {mu          sync.MutexmapTasks    []Task // 所有 Map 任务reduceTasks []Task // 所有 Reduce 任务nReduce     int    // Reduce 任务数量
}func MakeCoordinator(files []string, nReduce int) *Coordinator {c := Coordinator{nReduce:     nReduce,mapTasks:    make([]Task, len(files)),reduceTasks: make([]Task, nReduce),}for i, file := range files {c.mapTasks[i] = Task{TaskType: MapTask, FileName: file, TaskID: i, Status: Pending}}for i := 0; i < nReduce; i++ {c.reduceTasks[i] = Task{TaskType: ReduceTask, TaskID: i, Status: Pending}}// Your code here.c.server()go c.monitorTimeouts()return &c
}func (c *Coordinator) monitorTimeouts() {for {time.Sleep(time.Second)c.mu.Lock()for i := range c.mapTasks {if c.mapTasks[i].Status == Active && time.Since(c.mapTasks[i].StartTime) > TaskTimeout {c.mapTasks[i].Status = Pending}}for i := range c.reduceTasks {if c.reduceTasks[i].Status == Active && time.Since(c.reduceTasks[i].StartTime) > TaskTimeout {c.reduceTasks[i].Status = Pending}}c.mu.Unlock()}
}

Done方法很简单,所有map任务和reduce任务都是已完成的状态就代表Done

func (c *Coordinator) Done() bool {ret := c.allMapTasksDone() && c.allReduceTasksDone()return ret
}func (c *Coordinator) allMapTasksDone() bool {for _, task := range c.mapTasks {if task.Status != Completed {return false}}return true
}func (c *Coordinator) allReduceTasksDone() bool {for _, task := range c.reduceTasks {if task.Status != Completed {return false}}return true
}

接下来是分发任务的逻辑和任务完成后的回调函数,分发任务注意map任务全部完成了才可以开始reduce任务

//分发任务
func (c *Coordinator) AssignTask(req *TaskReq, reply *TaskRep) error {c.mu.Lock()defer c.mu.Unlock()// 分配 Map 任务for i, task := range c.mapTasks {if task.Status == Pending {reply.TaskType = MapTaskreply.FileName = task.FileNamereply.TaskID = task.TaskIDreply.ReduceCount = c.nReducec.mapTasks[i].Status = Activec.mapTasks[i].StartTime = time.Now()return nil}}// 检查是否可以分配 Reduce 任务if c.allMapTasksDone() {for i, task := range c.reduceTasks {if task.Status == Pending {reply.TaskType = ReduceTaskreply.TaskID = task.TaskIDreply.ReduceCount = c.nReducereply.MapTaskCount = len(c.mapTasks)c.reduceTasks[i].Status = Activec.reduceTasks[i].StartTime = time.Now()return nil}}}// 没有任务可分配reply.TaskType = NoTaskreturn nil
}//worker完成任务后会回调这个函数
func (c *Coordinator) TaskComplete(req *TaskCompleteReq, reply *TaskCompleteRep) error {c.mu.Lock()defer c.mu.Unlock()if req.TaskType == MapTask {c.mapTasks[req.TaskID].Status = Completed} else if req.TaskType == ReduceTask {c.reduceTasks[req.TaskID].Status = Completed}reply.Success = truereturn nil
}
rpc部分

在rpc.go中,定义worker要调用的rpc方法(要任务,报告任务完成情况)的参数和返回值就行

type TaskReq struct {
}type TaskRep struct {TaskType     int    // 任务类型:Map、ReduceFileName     string // Map 任务的输入文件名TaskID       int    // 任务编号MapTaskCount int    //map任务数量ReduceCount  int    // 传入的reducer的数量,用于hashStatus       intStartTime    time.Time
}type TaskCompleteReq struct {TaskID   intTaskType int
}type TaskCompleteRep struct {Success bool
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/59869.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

nfs服务器--RHCE

一&#xff0c;简介 NFS&#xff08;Network File System&#xff0c;网络文件系统&#xff09;是FreeBSD支持的文件系统中的一种&#xff0c;它允许网络中的计 算机&#xff08;不同的计算机、不同的操作系统&#xff09;之间通过TCP/IP网络共享资源&#xff0c;主要在unix系…

Uni-APP+Vue3+鸿蒙 开发菜鸟流程

参考文档 文档中心 运行和发行 | uni-app官网 AppGallery Connect DCloud开发者中心 环境要求 Vue3jdk 17 Java Downloads | Oracle 中国 【鸿蒙开发工具内置jdk17&#xff0c;本地不使用17会报jdk版本不一致问题】 开发工具 HBuilderDevEco Studio【目前只下载这一个就…

ubuntu 16.04 中 VS2019 跨平台开发环境配置

su 是 “switch user” 的缩写&#xff0c;表示从当前用户切换到另一个用户。 sudo 是 “superuser do” 的缩写&#xff0c;意为“以超级用户身份执行”。 apt 是 “Advanced Package Tool” 的缩写&#xff0c;Ubuntu中用于软件包管理的命令行工具。 1、为 root 用户设置密码…

Java集合ConcurrentHashMap——针对实习面试

目录 Java集合ConcurrentHashMapConcurrentHashMap的特性是什么&#xff1f;HashMap和ConcurrentHashMap的区别&#xff1f;说说ConcurrentHashMap的底层实现 Java集合ConcurrentHashMap ConcurrentHashMap的特性是什么&#xff1f; 线程安全性 多线程并发读写安全&#xff1a…

游戏引擎学习第16天

视频参考:https://www.bilibili.com/video/BV1mEUCY8EiC/ 这些字幕讨论了编译器警告的概念以及如何在编译过程中启用和处理警告。以下是字幕的内容摘要&#xff1a; 警告的定义&#xff1a;警告是编译器用来告诉你某些地方可能存在问题&#xff0c;尽管编译器不强制要求你修复…

【题目3】C++类的设计——07年复试笔试题

【题目】07年C复试笔试真题 定义一个处理日期的类TDate&#xff0c;它有3个私有数据成员&#xff1a;Month,Day,Year和若干共有成员函数&#xff0c;实现如下要求[附条件解读] ①构造函数重载→创建无参构造函数有参构造函数 ②成员函数设置缺省参数→与④一同可用set()在类中实…

【STL】set,multiset,map,multimap的介绍以及使用

关联式容器 在C的STL中包含序列式容器和关联式容器 1.关联式容器&#xff1a;它里面存储的是元素本身&#xff0c;其底层是线性序列的数据结构&#xff0c;比如&#xff1a;vector&#xff0c;list&#xff0c;deque&#xff0c;forward_list(C11)等 2.关联式容器里面储存的…

VUE+SPRINGBOOT实现邮箱注册、重置密码、登录功能

随着互联网的发展&#xff0c;网站用户的管理、触达、消息通知成为一个网站设计是否合理的重要标志。目前主流互联网公司都支持手机验证码注册、登录。但是手机短信作为服务端网站是需要付出运营商通信成本的&#xff0c;而邮箱的注册、登录、重置密码&#xff0c;无疑成为了这…

ARM(安谋) China处理器

0 Preface/Foreword 0.1 参考博客 Cortex-M23/M33与STAR-MC1星辰处理器 ARM China&#xff0c;2018年4月established&#xff0c;独立运行。 1 处理器类型 1.1 周易AIPU 1.2 STAR-MC1&#xff08;星辰处理器&#xff09; STAT-MC1&#xff0c;主要为满足AIOT应用性能、功…

拉取docker镜像应急方法

发现许多docker hub镜像网址速度也慢得发指啦&#xff0c;如果想速度快点&#xff0c;可以考虑买个按量计费的公有云服务器&#xff0c;用他们的内网镜像&#xff0c;然后再导出&#xff0c;然后传到本地。 开通服务器 可以考虑个开通最低配的&#xff0c;这里我用的是腾讯的…

论文解读《Personalized LoRA for Human-Centered Text Understanding》

引言&#xff1a;感觉这篇蛮不错的&#xff0c;读一读。学一学如何在 LLMs&#xff08;文中说的是PLMs&#xff0c;不过我觉得可以理解为 LLMs&#xff09; 的结构上做改进 ✅ NLP 研 2 选手的学习笔记 笔者简介&#xff1a;Wang Linyong&#xff0c;NPU&#xff0c;2023级&…

SpringBoot+React养老院管理系统 附带详细运行指导视频

文章目录 一、项目演示二、项目介绍三、运行截图四、主要代码1.入住合同文件上传2.添加和修改套餐的代码3.查看入住记录代码 一、项目演示 项目演示地址&#xff1a; 视频地址 二、项目介绍 项目描述&#xff1a;这是一个基于SpringBootReact框架开发的养老院管理系统。首先…

【C++】红黑树封装map—set

1 .关联式容器 C中的map是标准模板库&#xff08;STL&#xff09;中的一种关联容器&#xff0c;它存储的是键值对&#xff08;key-value pairs&#xff09;&#xff0c;其中每个键都是唯一的。 键值对&#xff1a; 用来表示具有一一对应关系的一种结构&#xff0c;该结构中一…

药房智链:中药实验管理的供应链优化

摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施在技术上已逐步成熟。本文介绍了中药实验管理系统的开发全过程。通过分析中药实验管理系统管理的不足&#xff0c;创建了一个计算机管理中药实验管理系统的方案。文章介绍了中药实验管理系统的系…

Unity学习---IL2CPP打包时可能遇到的问题

写这篇主要是怕自己之后打包的时候出问题不知道怎么搞&#xff0c;所以记录一下。 问题一&#xff1a;类型裁剪 IL2CPP打包后会自动对Unity工程的dll进行裁剪&#xff0c;将代码中没有引用到的类型裁剪掉。特别是通过反射等方式调用一些类的时候&#xff0c;很容易出问题。 …

商城小程序的流程渠道拓展

传统印象里&#xff0c;小程序的开发制作似乎很难&#xff0c;尤其是商城类型且功能体系完善的&#xff0c;事实也确实如此&#xff0c;没有较高的技术和成本投入或团队各个流程的专业人员合作&#xff0c;很难开发出来成品&#xff0c;或者质量较低。 当然对于大公司来说&…

Linux网络:守护进程

Linux网络&#xff1a;守护进程 会话进程组会话终端 守护进程setsiddaemon 在创建一个网络服务后&#xff0c;往往这个服务进程是一直运行的。但是对于大部分进程来说&#xff0c;如果退出终端&#xff0c;这个终端上创建的所有进程都会退出&#xff0c;这就导致进程的生命周期…

基于gradio+networkx库对图结构进行可视化展示

前言 在gradio框架下对蛋白质-蛋白质相互作用网络&#xff08;PPI网络&#xff09;进行可视化&#xff0c;并将其在网页前端进行展示。 方法 其实很简单 可以直接使用networkx画图后保存图片&#xff0c;然后使用Gradio框架的image组件进行展示即可。 但实际上gradio还配置…

【大数据学习 | HBASE高级】hive操作hbase

一般在查询hbase的数据的时候我们可以直接使用hbase的命令行或者是api进行查询就行了&#xff0c;但是在日常的计算过程中我们一般都不是为了查询&#xff0c;都是在查询的基础上进行二次计算&#xff0c;所以使用hbase的命令是没有办法进行数据计算的&#xff0c;并且对于hbas…

Python小游戏28——水果忍者

首先&#xff0c;你需要安装Pygame库。如果你还没有安装&#xff0c;可以使用以下命令进行安装&#xff1a; 【bash】 pip install pygame 《水果忍者》游戏代码&#xff1a; 【python】 import pygame import random import sys # 初始化Pygame pygame.init() # 设置屏幕尺寸 …