MIT6.5840-2023-Lab1: MapReduce

前置知识

MapReduce:Master 将一个 Map 任务或 Reduce 任务分配给一个空闲的 worker。
Map阶段:被分配了 map 任务的 worker 程序读取相关的输入数据片段,生成并输出中间 k/v 对,并缓存在内存中。
Reduce阶段:所有 map 任务结束,reduce 程序使用 RPC 从 map worker 所在主机的磁盘上读取缓存数据,通过对 key 进行排序后使得具有相同 key 值的数据聚合在一起,reduce 进行操作后输出为文件。
image.png

实验内容

实现一个分布式 MapReduce,由两个程序(coordinator 和 worker)组成。只有一个 coordinator 和一个或多个并行执行的 worker 。在真实系统中, worker 会运行在多台不同的机器上,但在本 lab 中将在一台机器上运行所有 worker 。 worker 将通过 RPC 与 coordinator 通话。每个 worker 进程都会向 coordinator 请求 task,从一个或多个文件中读取 task 输入,执行 task,并将 task 输出写入一个或多个文件。 coordinator 应该注意到,如果某个 worker 在合理的时间内(本 lab 使用 10 秒)没有完成任务,就会将相同的 task 交给另一个 worker。
rpc举例:https://pdos.csail.mit.edu/6.824/notes/kv.go
lab内容:https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
Impl:mr/coordinator.go、mr/worker.go、mr/rpc.go
总结一下
对于 Coordiantor:

  • map 任务初始化;
  • rpc handler:回应worker分配任务请求、回应worker任务完成通知;
  • 自身状态控制,处理 map/reduce 阶段,还是已经全部完成;
  • 任务超时重新分配;

对于 Worker:

  • 给 coordinator 发送 rpc 请求分配任务;
  • 给 coordinator 发送 rpc 通知任务完成;
  • 自身状态控制,准确来说是 coordinator 不需要 worker 工作时,通知 worker 结束运行;

具体code见:https://github.com/BeGifted/MIT6.5840-2023

实验环境

OS:WSL-Ubuntu-18.04
golang:go1.17.6 linux/amd64

概要设计

所设计的Coordinator、Task、rpc消息格式:

type WorkerArgs struct {WorkerId    intWorkerState int // init\done\failTask        *Task
}type WorkerReply struct {WorkerId    intWorkerState int // init\done\failTask        *Task
}type Coordinator struct {// Your definitions here.MapTaskChan    chan *TaskReduceTaskChan chan *TaskNumReduce      int // reduce numNumMap         int // map numNumDoneReduce  int // reduce done numNumDoneMap     int // map done numState          int // map\reduce\donemu             sync.MutexTimeout        time.DurationMapTasks       map[int]*TaskReduceTasks    map[int]*Task
}type Task struct {TaskId    intTaskType  int // map\reduceTaskState int // int\run\doneNReduce   int // nReduceStartTime time.TimeInput []string
}const (StateMap    = 0StateReduce = 1StateDone   = 2
)const (TaskStateInit = 0TaskStateRun  = 1TaskStateDone = 2
)const (TaskTypeMap    = 0TaskTypeReduce = 1
)const (WorkerStateInit = 0WorkerStateDone = 1WorkerStateFail = 2
)

(TODO)WorkerState 出现 fail 的原因主要是在文件无法打开或读取上,如果是在处理 map 任务时出现 fail,那只有可能是原文件丢失了;如果是 reduce 任务时出现 fail,表示中间文件丢失,需要运行某个特定的 map 任务重新生成,然后再重新开始该 reduce 任务。当然,不实现这个也不会影响 test。

主要流程

创建 Coordinator

创建 Coordinator 并且初始化,将需要处理的数据片段放入 MapTaskChan 信道。这里将单个文件视作一个数据片段进行处理,也就是说有 len(files) 个 map 任务。

func MakeCoordinator(files []string, nReduce int) *Coordinator {c := Coordinator{}// Your code here.c.NumMap = len(files)c.NumReduce = nReducec.MapTaskChan = make(chan *Task, len(files))c.ReduceTaskChan = make(chan *Task, nReduce)c.MapTasks = make(map[int]*Task)c.ReduceTasks = make(map[int]*Task)c.NumDoneMap = 0c.NumDoneReduce = 0c.State = StateMapc.Timeout = time.Duration(time.Second * 10)for i, file := range files {input := []string{file}task := Task{TaskId:    i,TaskType:  TaskTypeMap,TaskState: TaskStateInit,Input:     input,NReduce:   nReduce,StartTime: time.Now(),}c.MapTaskChan <- &taskc.MapTasks[i] = &task}c.server()return &c
}

运行 Worker

Worker 主要处理两类任务:map 和 reduce。这两类任务通过 rpc 与 Coordinator 通信获取。
map 任务处理:

if task.TaskType == TaskTypeMap {filename := task.Input[0]intermediate := []KeyValue{}file, err := os.Open(filename)if err != nil {log.Fatalf("cannot open %v", filename)continue}content, err := ioutil.ReadAll(file)if err != nil {log.Fatalf("cannot read %v", filename)continue}file.Close()// log.Println("mapf")// log.Println(task.TaskId)kva := mapf(filename, string(content))intermediate = append(intermediate, kva...)// sort.Sort(ByKey(intermediate))ReduceSplit := make(map[int][]KeyValue)for _, kv := range intermediate {ReduceSplit[ihash(kv.Key)%task.NReduce] = append(ReduceSplit[ihash(kv.Key)%task.NReduce], kv)}for i := 0; i < task.NReduce; i++ {oname := fmt.Sprintf("mr-%d-%d.tmp", task.TaskId, i)ofile, _ := os.Create(oname)enc := json.NewEncoder(ofile)for _, kv := range ReduceSplit[i] {err := enc.Encode(&kv)if err != nil {log.Fatalf("cannot encode %v", kv)break}}ofile.Close()}// Task Doneargs.Task = taskTaskDone(&args)
}

reduce 任务处理:

if task.TaskType == TaskTypeReduce {var kva ByKeyfor _, filename := range task.Input {file, err := os.Open(filename)if err != nil {log.Fatalf("cannot open %v", filename)file.Close()continue}dec := json.NewDecoder(file)for {var kv KeyValueif err := dec.Decode(&kv); err != nil {break}kva = append(kva, kv)}file.Close()}sort.Sort(kva)i := 0oname := fmt.Sprintf("mr-out-%d", task.TaskId)ofile, _ := os.Create(oname)for i < len(kva) {j := i + 1for j < len(kva) && kva[j].Key == kva[i].Key {j++}values := []string{}for k := i; k < j; k++ {values = append(values, kva[k].Value)}output := reducef(kva[i].Key, values)fmt.Fprintf(ofile, "%v %v\n", kva[i].Key, output)i = j}// Task Doneargs.Task = taskTaskDone(&args)
}

WorkerHandler

给 worker 分配 map/reduce 任务,取决于阶段任务是否全部完成,当阶段任务全部完成,coordinator 的状态也需要更新。这个过程全局加锁。
处理 map 阶段:

if c.State == StateMap {select {case reply.Task = <-c.MapTaskChan:reply.Task.StartTime = time.Now()reply.Task.TaskState = TaskStateRundefault:for _, mapTask := range c.MapTasks {if mapTask.TaskState == TaskStateRun && time.Since(mapTask.StartTime) > c.Timeout {mapTask.StartTime = time.Now()reply.Task = mapTaskreturn nil}}}
}

处理 reduce 阶段:

if c.State == StateReduce {select {case reply.Task = <-c.ReduceTaskChan:reply.Task.StartTime = time.Now()reply.Task.TaskState = TaskStateRundefault:for _, reduceTask := range c.ReduceTasks {if reduceTask.TaskState == TaskStateRun && time.Since(reduceTask.StartTime) > c.Timeout {reduceTask.StartTime = time.Now()reply.Task = reduceTaskreturn nil}}}
}

需要注意的是,除了这两个阶段外还有 StateDone 阶段,即 reduce 任务都执行完毕了,coordinator 还没完全回收,此时 worker 还在请求分配任务,这时候就应该通知 worker 停止。

DoneHandler

coordinator 处理任务完成的通知。全程加锁。在这里更新 task/coordinator 状态。

func (c *Coordinator) DoneHandler(args *WorkerArgs, reply *WorkerReply) error {c.mu.Lock()defer c.mu.Unlock()task := args.Taskif task.TaskType == TaskTypeMap {if task.TaskState == TaskStateRun {task.TaskState = TaskStateDonec.MapTasks[task.TaskId].TaskState = TaskStateDonec.NumDoneMap++}} else if task.TaskType == TaskTypeReduce {if task.TaskState == TaskStateRun {task.TaskState = TaskStateDonec.ReduceTasks[task.TaskId].TaskState = TaskStateDonec.NumDoneReduce++}}if c.State == StateMap {if c.NumDoneMap == c.NumMap {c.State = StateReducefor i := 0; i < c.NumReduce; i++ {input := []string{}for j := 0; j < c.NumMap; j++ {input = append(input, fmt.Sprintf("mr-%d-%d.tmp", j, i))}task := Task{TaskId:    i,TaskType:  TaskTypeReduce,TaskState: TaskStateInit,NReduce:   c.NumReduce,StartTime: time.Now(),Input:     input,}c.ReduceTaskChan <- &taskc.ReduceTasks[i] = &task}}} else if c.State == StateReduce {if c.NumDoneReduce == c.NumReduce {c.State = StateDone}}return nil
}

实验结果

bash test-mr-many.sh 10

image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/202353.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

QT 中基于 TCP 的网络通信 (备查)

基础 基于 TCP 的套接字通信需要用到两个类&#xff1a; 1&#xff09;QTcpServer&#xff1a;服务器类&#xff0c;用于监听客户端连接以及和客户端建立连接。 2&#xff09;QTcpSocket&#xff1a;通信的套接字类&#xff0c;客户端、服务器端都需要使用。 这两个套接字通信类…

位运算在查找重复元素中的妙用

关卡名 用4KB内存寻找重复元素 我会了✔️ 内容 1.理解如何用4KB内存寻找重复元素 ✔️ 本关所有题目的重点都是理解如何解决就好&#xff0c;面试问的时候能够将问题描述清楚&#xff0c;不用写代码。 在海量数据中&#xff0c;此时普通的数组、链表、Hash、树等等结构有…

《当代家庭教育》期刊论文投稿发表简介

《当代家庭教育》杂志是家庭的参谋和助手&#xff0c;社会的桥梁和纽带&#xff0c;人生的伴侣和知音&#xff0c;事业的良师益友。 国家新闻出版总署批准的正规省级教育类G4期刊&#xff0c;知网、维普期刊网收录。安排基础教育相关稿件&#xff0c;适用于评职称时的论文发表…

mybatisplus自动填充属性值

MetaObjectHandler: 是mybatisplus提供的一个接口&#xff0c;&#xff0c;&#xff0c;这个接口定义了在执行插入和更新操作的时候的回调方法&#xff0c;&#xff0c;&#xff0c;允许你自定义实体对象的一些属性值&#xff0c;&#xff0c;比如: createTime,createBy,update…

java中static关键字的作用是什么?

在Java中&#xff0c;static 是一个关键字&#xff0c;它可以用来修饰类的成员变量、成员方法和内部类。static 的作用主要有以下几个方面&#xff1a; 静态变量&#xff08;Static Variables&#xff09;&#xff1a; 当变量被声明为static时&#xff0c;它变成了类的静态变量…

今天面试招了个25K的测试员,从腾讯出来的果然都有两把刷子···

公司前段时间缺人&#xff0c;也面了不少测试&#xff0c;前面一开始瞄准的就是中级的水准&#xff0c;也没指望来大牛&#xff0c;提供的薪资在15-25k&#xff0c;面试的人很多&#xff0c;但平均水平很让人失望。看简历很多都是4年工作经验&#xff0c;但面试中&#xff0c;不…

OkGo导入失败解决办法

jcenter()maven { url "https://jitpack.io" }再同步就可以了

罗马数字转整数算法(leetcode第13题)

题目描述&#xff1a; 罗马数字包含以下七种字符: I&#xff0c; V&#xff0c; X&#xff0c; L&#xff0c;C&#xff0c;D 和 M。 字符 数值 I 1 V 5 X 10 L 50 C 100 D 500 M …

微服务主要特点Java微服务开发的关键技术

【点我-这里送书】 本人详解 作者:王文峰,参加过 CSDN 2020年度博客之星,《Java王大师王天师》 公众号:JAVA开发王大师,专注于天道酬勤的 Java 开发问题中国国学、传统文化和代码爱好者的程序人生,期待你的关注和支持!本人外号:神秘小峯 山峯 转载说明:务必注明来源(…

c++ atmoic acquire/release

由于多核cpu缓存的存在&#xff0c;以及gcc编译优化&#xff0c;cpu指令层面的优化&#xff0c;导致程序的执行顺序可能跟你写的顺序不完全一致&#xff08;reorder&#xff09;。 但是在多线程编程中如何确保各个线程能正确的读取到各个变量呢&#xff08;而不是cache中老旧的…

【Vue3从入门到项目实现】RuoYi-Vue3若依框架前端学习——登录页面

若依官方的前后端分离版中&#xff0c;前端用的Vue2&#xff0c;这个有人改了Vue3的前端出来。刚好用来学习&#xff1a; https://gitee.com/weifengze/RuoYi-Vue3 运行前后端项目 首先运行项目 启动前端&#xff0c;npm install、npm run dev 启动后端&#xff0c;按教程配置…

Spring第四课,MVC终章,应用分层的好处,总结

Spring MVC其实也就是Spring Web 软件的设计原则&#xff1a;高内聚&#xff0c;低耦合 高内聚:一个模块各个元素之间联系的紧密程度&#xff0c;如果各个元素&#xff08;语句&#xff0c;程序段&#xff09;之间的联系程度越高&#xff0c;即内聚性越高 低耦合&#xff1a;软…

自定义登录页面模板(移动端)

login/index <script setup lang"ts"> </script><template><div class"login-page">//组件 由于配置了自动注册&#xff0c;所以无需引入<cp-nav-barright-text"注册"click-right"$router.push(/register)&quo…

Codeforces Round 913 (Div. 3)补题

Rook 题目大意&#xff1a;我们给定一个棋盘(如下图)&#xff0c;棋盘上有一个车&#xff0c;问车可以到的位置&#xff0c;任意顺序输出即可。 思路&#xff1a;输出车的行列中非它本身的点即可。 #include<bits/stdc.h> using namespace std; int main() {int t;scanf…

缓存类型及优缺点:Ehcache、Caffeine、Memcached和Redis的比较

文章目录 一、缓存类型二、常见内存缓存三、常见分布式缓存三、Ehcache、Caffeine、Memcached和Redis优缺点以及适用场景1、Ehcache2、Caffeine3、Memcached4、Redis 四、小结五、Ehcache、Caffeine、Memcached、Redis分别支持的数据类型&#xff1f; 一、缓存类型 在Java中&a…

Fabric:链码的部署及执行

Hyperledger Fabric:V2.5.4 写在最前 使用Fabric搭建自定义网络参考&#xff1a;https://blog.csdn.net/yeshang_lady/article/details/134113296 使用Fabric创建应用通道参考&#xff1a;https://blog.csdn.net/yeshang_lady/article/details/134668458 接下来将介绍如何在自…

ELK的日志解决方案

1. 安装和配置ELK 确保你已经安装了Elasticsearch、Logstash和Kibana。你可以按照官方文档或使用包管理工具进行安装。 Elasticsearch官方配置文档 Kibana官方配置文档 2. Logstash配置 拉取logstash 创建容器 docker run -it \ --name logstash \ --privileged \ -p 5044:5…

Ubuntu-Sim2Real环境配置(下)

cd ICRA-RM-Sim2Real/docker_client/ ./exec_client.sh cd ~ roslaunch rtab_navigation rtab_navigation.launch 执行上面代码的时候后台一直刷新 cd ICRA-RM-Sim2Real/docker_client/ ./exec_client.sh cd ~ roslaunch carto_navigation navigation.launch 1.Usage 执行该…

【微服务】spring循环依赖深度解析

目录 一、循环依赖概述 1.2 spring中的循环依赖 二、循环依赖问题模拟 2.1 循环依赖代码演示 2.2 问题分析与解决 2.2.1 使用反射中间容器 三、spring循环依赖问题解析 3.1 spring中的依赖注入 3.1.1 field属性注入 3.1.2 setter方法注入 3.1.3 构造器注入 3.2 spri…