MIT6.5840 Lab 1: MapReduce(6.824)

结果

介绍
在本实验中,您将构建一个MapReduce系统。您将实现一个调用应用程序Map和Reduce函数并处理文件读写的工作进程,以及一个将任务分发给工作进程并处理失败的工作进程的协调进程。您将构建类似于MapReduce论文的东西。(注意:本实验使用“coordinator”代替论文中的“master”。)

mrsequential.go的逻辑就是从写好的代码(例如mrapps/wc.go)编译成的动态库(wc.so)中提取出map和reduce两个函数,再利用map来处理数据得到中间结果,reduce拿中间结果进一步处理得到最终结果。

现在要在分布式的环境下执行这个过程,也就是通过协调进程去把任务分发到worker上,这个任务可能是map可能是reduce,

Your Job (moderate/hard)

实现一个分布式MapReduce,它由两个程序组成,协调器和工作器。只有一个协调进程和一个或多个并行执行的工作进程。在一个真实的系统中,工人会在一堆不同的机器上运行,但在这个实验中,你将在一台机器上运行它们。工作人员将通过RPC与协调器对话。每个工作进程将在一个循环中向协调器请求一个任务,从一个或多个文件中读取任务的输入,执行任务,将任务的输出写入一个或多个文件,然后再次向协调器请求一个新任务。协调器应该注意到,如果一个工人没有在合理的时间内完成任务(在本实验中,使用10秒),并将相同的任务交给另一个工人。协调器和工作器的“主”例程位于main/mrcoordinato.go 和 main/mrworker.go不要更改这些文件。您应该将您的实现放在 mr/coordinator.go, mr/worker.go, and mr/rpc.go

实验要求:

  1. nReduce对应的Reduce数及输出的文件数,也要作为MakeCoordinator()方法的参数;
  2. Reduce任务的输出文件的命名为mr-out-X,这个X就是来自nReduce;
  3. mr-out-X的输出有个格式要求,参照main/mrsequential.go,"%v %v" 格式;
  4. Map输出的中间值要放到当前目录的文件中,Reduce任务从这些文件来读取;
  5. 当Coordinator.go的Done()方法返回true,MapReduce的任务就完成了;
  6. 当一个任务完成,对应的worker就应该终止,这个终止的标志可以来自于call()方法,若它去给Master发送请求,得到终止的回应,那么对应的worker进程就可以结束了。

实验提示:

  1. 修改mr/worker.go的Worker(),发送RPC请求给coordinator要任务。然后修改Coordinator将还没有被Map执行的文件作为响应返回给worker。然后worker读取文件并执行Map方法函数,就如示例文件 mrsequential.go;
  2. Map和Reduce函数加载来自插件wc.go,如果改了这些东西需要使用命令重新编译生成新的.so文件,尽量不要动这些东西;
  3. 中间文件的命名方式推荐为mr-X-Y,X对应Map任务Id,Y对应的Reduce任务Id;
  4. 为顺利存储中间数据,采用json,以便读取;
  5. worker 的 map 部分可以使用ihash(key)函数(在worker.go 中)为给定的键选择 reduce 任务;
  6. Coordinator作为一个 RPC 服务器,将是并发的;不要忘记锁定共享数据;
  7. 在所有Map任务完成后,Reduce任务才会开始,所以对应的worker可能会需要等待,那么可以使用time.sleep()或其他方法;
  8. worker可能挂掉或其他原因崩了,Coordinator在这个实验中等待10s,超过时间将会分配给其他的worker;
  9. 您可以使用 ioutil.TempFile 创建一个临时文件,并使用 os.Rename 对其进行原子重命名;
  10. test-mr.sh 运行子目录 mr-tmp 中的所有进程,因此如果出现问题并且您想查看中间文件或输出文件,请查看那里。您可以修改 test-mr.sh 以在测试失败后退出,这样脚本就不会继续测试(并覆盖输出文件)。
RPC通信 

项目中需要使用rpc的地方是worker向coordinator索要任务或发送任务完成情况,先探究rpc是如何通信的,在mr/coordinato.go中,注册rpc的函数为

func (c *Coordinator) server() {rpc.Register(c)rpc.HandleHTTP()//l, e := net.Listen("tcp", ":1234")sockname := coordinatorSock()os.Remove(sockname)l, e := net.Listen("unix", sockname)if e != nil {log.Fatal("listen error:", e)}go http.Serve(l, nil)
}

这里使用的是unix套接字,它用于本地进程之间的通信,通常比网络套接字更高效,因为数据不需要通过网络协议栈,在同一台机器上的进程之间通信,Worker 进程可以通过套接字文件连接到 Coordinator 进行 RPC 调用,使用 HTTP 协议来组织和传递数据。

整体流程概括为:Worker 的 RPC 请求通过 HTTP 协议发送->请求通过 Unix 套接字传输到 Coordinator->Coordinator 的 HTTP 服务处理请求,并返回响应。

在worker中调用rpc的方法如下,传入rpc方法名,参数和返回值。

func call(rpcname string, args interface{}, reply interface{}) bool {// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")sockname := coordinatorSock()c, err := rpc.DialHTTP("unix", sockname)if err != nil {log.Fatal("dialing:", err)}defer c.Close()err = c.Call(rpcname, args, reply)if err == nil {return true}fmt.Println(err)return false
}
worker部分

work的工作就是处理map任务和reduce任务,并在处理完成后反馈结果,那么在mr/worker.go中有,其中executeMapTask和executeReduceTask分别用来处理map和reduce任务,处理完成后会调用notifyTaskComplete反馈任务结果,函数的实现可以参考mrsequential.go。

func Worker(mapf func(string, string) []KeyValue,reducef func(string, []string) string) {for {task := requestTask()switch task.TaskType {case MapTask:executeMapTask(task, mapf)case ReduceTask:executeReduceTask(task, reducef)case NoTask:log.Println("No task available, sleeping...")time.Sleep(1 * time.Second)}}}func executeMapTask(task *TaskRep, mapf func(string, string) []KeyValue) {filename := task.FileNamefile, err := os.Open(filename)if err != nil {log.Fatalf("cannot open %v", filename)}content, err := ioutil.ReadAll(file)if err != nil {log.Fatalf("cannot read %v", filename)}file.Close()kva := mapf(filename, string(content))intermediate := make(map[int][]KeyValue)for _, kv := range kva {reduceTaskNum := ihash(kv.Key) % task.ReduceCountintermediate[reduceTaskNum] = append(intermediate[reduceTaskNum], kv)}for reduceTaskNum, kvs := range intermediate {tempFile, _ := ioutil.TempFile("", "mr-temp-*")enc := json.NewEncoder(tempFile)for _, kv := range kvs {enc.Encode(&kv)}tempFile.Close()finalName := fmt.Sprintf("mr-%d-%d", task.TaskID, reduceTaskNum)os.Rename(tempFile.Name(), finalName)}notifyTaskComplete(task.TaskID, MapTask)
}func executeReduceTask(task *TaskRep, reducef func(string, []string) string) {intermediate := make(map[string][]string)// 遍历所有 MapTask 的任务 IDfor mapTaskID := 0; mapTaskID < task.MapTaskCount; mapTaskID++ {filename := fmt.Sprintf("mr-%d-%d", mapTaskID, task.TaskID)file, err := os.Open(filename)if err != nil {// 文件不存在可能是因为 MapTask 失败,忽略continue}// 解码中间文件内容dec := json.NewDecoder(file)for {var kv KeyValueif err := dec.Decode(&kv); err != nil {break}intermediate[kv.Key] = append(intermediate[kv.Key], kv.Value)}file.Close()}// 生成最终输出文件outputFile, _ := os.Create(fmt.Sprintf("mr-out-%d", task.TaskID))for key, values := range intermediate {result := reducef(key, values)fmt.Fprintf(outputFile, "%v %v\n", key, result)}outputFile.Close()notifyTaskComplete(task.TaskID, ReduceTask)
}func notifyTaskComplete(taskID int, taskType int) {req := TaskCompleteReq{TaskID: taskID, TaskType: taskType}reply := TaskCompleteRep{}call("Coordinator.TaskComplete", &req, &reply)
}
coordinator部分

对于coordinator.go,首先需要定义任务的种类,这里想到worker要知道是map还是reduce任务,要处理的文件名称,并且写入文件时需要有map和reduce的id,处理时间需要在10s内,那么定义如下Task结构体,任务的类型和状态都用枚举数,任务在coordinactor实例初始化的时候就塞到实例的...task字段内,这里要注意输入的file有多少个,就有多少个map任务,而reduce任务的数量和nReduce有关。任务超时的检查我是用轮询机制,每隔一秒轮询所有任务如果任务状态为正在运行并且时间超时那么把它状态初始化。 

结构体中的字段并不是一下就能全部想出来,也是需要在写处理函数的过程中看需要哪些字段才决定。

type Task struct {TaskType    intFileName    stringTaskID      intReduceCount intStatus      intStartTime   time.Time
}const (MapTask = iotaReduceTaskNoTask
)const (Pending   = iota //任务已准备好进行处理,并将由一个空闲的工作器接收Active           //任务正在被工作器处理Retry            //工作器无法处理任务,任务正在等待将来重试Completed        //任务已成功处理
)type Coordinator struct {mu          sync.MutexmapTasks    []Task // 所有 Map 任务reduceTasks []Task // 所有 Reduce 任务nReduce     int    // Reduce 任务数量
}func MakeCoordinator(files []string, nReduce int) *Coordinator {c := Coordinator{nReduce:     nReduce,mapTasks:    make([]Task, len(files)),reduceTasks: make([]Task, nReduce),}for i, file := range files {c.mapTasks[i] = Task{TaskType: MapTask, FileName: file, TaskID: i, Status: Pending}}for i := 0; i < nReduce; i++ {c.reduceTasks[i] = Task{TaskType: ReduceTask, TaskID: i, Status: Pending}}// Your code here.c.server()go c.monitorTimeouts()return &c
}func (c *Coordinator) monitorTimeouts() {for {time.Sleep(time.Second)c.mu.Lock()for i := range c.mapTasks {if c.mapTasks[i].Status == Active && time.Since(c.mapTasks[i].StartTime) > TaskTimeout {c.mapTasks[i].Status = Pending}}for i := range c.reduceTasks {if c.reduceTasks[i].Status == Active && time.Since(c.reduceTasks[i].StartTime) > TaskTimeout {c.reduceTasks[i].Status = Pending}}c.mu.Unlock()}
}

Done方法很简单,所有map任务和reduce任务都是已完成的状态就代表Done

func (c *Coordinator) Done() bool {ret := c.allMapTasksDone() && c.allReduceTasksDone()return ret
}func (c *Coordinator) allMapTasksDone() bool {for _, task := range c.mapTasks {if task.Status != Completed {return false}}return true
}func (c *Coordinator) allReduceTasksDone() bool {for _, task := range c.reduceTasks {if task.Status != Completed {return false}}return true
}

接下来是分发任务的逻辑和任务完成后的回调函数,分发任务注意map任务全部完成了才可以开始reduce任务

//分发任务
func (c *Coordinator) AssignTask(req *TaskReq, reply *TaskRep) error {c.mu.Lock()defer c.mu.Unlock()// 分配 Map 任务for i, task := range c.mapTasks {if task.Status == Pending {reply.TaskType = MapTaskreply.FileName = task.FileNamereply.TaskID = task.TaskIDreply.ReduceCount = c.nReducec.mapTasks[i].Status = Activec.mapTasks[i].StartTime = time.Now()return nil}}// 检查是否可以分配 Reduce 任务if c.allMapTasksDone() {for i, task := range c.reduceTasks {if task.Status == Pending {reply.TaskType = ReduceTaskreply.TaskID = task.TaskIDreply.ReduceCount = c.nReducereply.MapTaskCount = len(c.mapTasks)c.reduceTasks[i].Status = Activec.reduceTasks[i].StartTime = time.Now()return nil}}}// 没有任务可分配reply.TaskType = NoTaskreturn nil
}//worker完成任务后会回调这个函数
func (c *Coordinator) TaskComplete(req *TaskCompleteReq, reply *TaskCompleteRep) error {c.mu.Lock()defer c.mu.Unlock()if req.TaskType == MapTask {c.mapTasks[req.TaskID].Status = Completed} else if req.TaskType == ReduceTask {c.reduceTasks[req.TaskID].Status = Completed}reply.Success = truereturn nil
}
rpc部分

在rpc.go中,定义worker要调用的rpc方法(要任务,报告任务完成情况)的参数和返回值就行

type TaskReq struct {
}type TaskRep struct {TaskType     int    // 任务类型:Map、ReduceFileName     string // Map 任务的输入文件名TaskID       int    // 任务编号MapTaskCount int    //map任务数量ReduceCount  int    // 传入的reducer的数量,用于hashStatus       intStartTime    time.Time
}type TaskCompleteReq struct {TaskID   intTaskType int
}type TaskCompleteRep struct {Success bool
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/59869.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MYSQL- 展示事件信息 EVENTS 语句(十八)

13.7.5.18 SHOW EVENTS 语句 SHOW EVENTS[{FROM | IN} schema_name][LIKE pattern | WHERE expr]此语句显示有关事件管理器事件的信息&#xff0c;这些信息在第23.4节“使用事件调度器”中进行了讨论。它要求显示事件的数据库具有EVENT权限。 以最简单的形式&#xff0c;SHOW…

WPF Gif图谱 如果隐藏的话会存在BUG

<hc:GifImageWidth"110"Height"110"Margin"20,20,0,0"Stretch"None"Uri"./../WPF/Asset/Image/fire_on.gif"Visibility"{Binding FireVisibility}" />FireVisibility 初始状态必须为&#xff1a;Visibility…

nfs服务器--RHCE

一&#xff0c;简介 NFS&#xff08;Network File System&#xff0c;网络文件系统&#xff09;是FreeBSD支持的文件系统中的一种&#xff0c;它允许网络中的计 算机&#xff08;不同的计算机、不同的操作系统&#xff09;之间通过TCP/IP网络共享资源&#xff0c;主要在unix系…

Linux驱动开发——设备树随记

Linux驱动开发——设备树随记 前言 在嵌入式Linux这块&#xff0c;对设备树一直都没怎么去了解&#xff0c;一直是模模糊糊的。所以最近也是被老大赶鸭子上架&#xff0c;快速跟着正点原子的驱动开发的课程学了一下。感觉对设备树的认识也是更清晰了一点。同样借着此篇博客记…

Uni-APP+Vue3+鸿蒙 开发菜鸟流程

参考文档 文档中心 运行和发行 | uni-app官网 AppGallery Connect DCloud开发者中心 环境要求 Vue3jdk 17 Java Downloads | Oracle 中国 【鸿蒙开发工具内置jdk17&#xff0c;本地不使用17会报jdk版本不一致问题】 开发工具 HBuilderDevEco Studio【目前只下载这一个就…

[每日一氵] PySpark 的 log GC 部分是什么意思

2024-11-15T11:10:40.2920800: 2850.503: [GC (Allocation Failure) [PSYoungGen: 142705K->3472K(141312K)] 1403514K->1264289K(1543168K), 0.0170225 secs] [Times: user0.05 sys0.00, real0.01 secs] 这一行日志来自Java的垃圾收集器&#xff08;Garbage Collector, …

sslSocketFactory not supported on JDK 9+

clientBuilder.sslSocketFactory(SSLSocketFactory) not supported on JDK 9 at okhttp3.internal.platform.Jdk9Platform.trustManager(Jdk9Platform.kt:61) at okhttp3.OkHttpClient$Builder.sslSocketFactory(OkHttpClient.kt:751) at 1.升版本4.9.3以上 2、加个函数获取X…

「Mac玩转仓颉内测版8」入门篇8 - Cangjie函数与方法

本篇介绍Cangjie编程语言中的函数与方法&#xff0c;帮助理解如何通过函数封装重复操作&#xff0c;提升代码的复用性和可维护性。 关键词 Cangjie函数方法定义参数传递返回值模块化与复用性 一、什么是函数&#xff1f; 函数是一个代码块&#xff0c;用于接收参数、执行操作…

ubuntu 16.04 中 VS2019 跨平台开发环境配置

su 是 “switch user” 的缩写&#xff0c;表示从当前用户切换到另一个用户。 sudo 是 “superuser do” 的缩写&#xff0c;意为“以超级用户身份执行”。 apt 是 “Advanced Package Tool” 的缩写&#xff0c;Ubuntu中用于软件包管理的命令行工具。 1、为 root 用户设置密码…

git没有识别出大写字母改成小写重命名的文件目录

Git 默认不会跟踪大写字母和小写字母的区别&#xff0c;因为在大多数文件系统中&#xff0c;大写字母和小写字母被认为是相同的文件&#xff0c;只有在区分大小写的文件系统中&#xff08;如 macOS 的 HFS 或 Windows 的 NTFS&#xff09;&#xff0c;这才是一个问题。 如果重命…

Java集合ConcurrentHashMap——针对实习面试

目录 Java集合ConcurrentHashMapConcurrentHashMap的特性是什么&#xff1f;HashMap和ConcurrentHashMap的区别&#xff1f;说说ConcurrentHashMap的底层实现 Java集合ConcurrentHashMap ConcurrentHashMap的特性是什么&#xff1f; 线程安全性 多线程并发读写安全&#xff1a…

游戏引擎学习第16天

视频参考:https://www.bilibili.com/video/BV1mEUCY8EiC/ 这些字幕讨论了编译器警告的概念以及如何在编译过程中启用和处理警告。以下是字幕的内容摘要&#xff1a; 警告的定义&#xff1a;警告是编译器用来告诉你某些地方可能存在问题&#xff0c;尽管编译器不强制要求你修复…

数据库几道简答题

1。 什么是数据库? 答&#xff1a;数据库是长期存储在计算机内、有组织的、可共享的数据集合。数据库是按某种数据模型进行组织的、存放在外存储器上&#xff0c;且可被多个用户同时使用。因此,数据库具有较小的冗余度,较高的数据独立性和易扩展性. 2。 什么是数据库的数据独…

使用--log-file保存pytest的运行日志

前面使用了tee和重定向来保存pytest的运行日志&#xff0c;这次使用--log-file&#xff0c;因为它可以配置日志的级别、格式和每行日志的生成时间。 pytest -q -s -ra --count100 test_open_stream.py --alluredir./report/CXL --log-filepytest_log.txt 【pytest.ini】 使用…

【题目3】C++类的设计——07年复试笔试题

【题目】07年C复试笔试真题 定义一个处理日期的类TDate&#xff0c;它有3个私有数据成员&#xff1a;Month,Day,Year和若干共有成员函数&#xff0c;实现如下要求[附条件解读] ①构造函数重载→创建无参构造函数有参构造函数 ②成员函数设置缺省参数→与④一同可用set()在类中实…

【STL】set,multiset,map,multimap的介绍以及使用

关联式容器 在C的STL中包含序列式容器和关联式容器 1.关联式容器&#xff1a;它里面存储的是元素本身&#xff0c;其底层是线性序列的数据结构&#xff0c;比如&#xff1a;vector&#xff0c;list&#xff0c;deque&#xff0c;forward_list(C11)等 2.关联式容器里面储存的…

VUE+SPRINGBOOT实现邮箱注册、重置密码、登录功能

随着互联网的发展&#xff0c;网站用户的管理、触达、消息通知成为一个网站设计是否合理的重要标志。目前主流互联网公司都支持手机验证码注册、登录。但是手机短信作为服务端网站是需要付出运营商通信成本的&#xff0c;而邮箱的注册、登录、重置密码&#xff0c;无疑成为了这…

PyTorch——从入门到精通:PyTorch基础知识(张量)【PyTorch系统学习】

什么是张量&#xff08;Tensor&#xff09; ​ 张量在数学中是一个代数对象&#xff0c;描述了与矢量空间相关的代数对象集之间的多重线性映射。张量是向量和矩阵概念的推广&#xff0c;可以理解为多维数组。作为数学中的一个基本概念&#xff0c;张量有着多种类型&#xff0c;…

自动化生成边界测试和极端情况测试用例

在软件测试中&#xff0c;边界测试和极端情况测试是确保代码健壮性和容错能力的关键步骤。许多软件缺陷和错误往往发生在输入数据的边界值或极端情况下。手动生成这些测试用例不仅费时费力&#xff0c;而且容易遗漏。幸运的是&#xff0c;OpenAI的强大功能可以帮助软件测试工程…

ARM(安谋) China处理器

0 Preface/Foreword 0.1 参考博客 Cortex-M23/M33与STAR-MC1星辰处理器 ARM China&#xff0c;2018年4月established&#xff0c;独立运行。 1 处理器类型 1.1 周易AIPU 1.2 STAR-MC1&#xff08;星辰处理器&#xff09; STAT-MC1&#xff0c;主要为满足AIOT应用性能、功…