MIT6.5840-2023-Lab1: MapReduce

前置知识

MapReduce:Master 将一个 Map 任务或 Reduce 任务分配给一个空闲的 worker。
Map阶段:被分配了 map 任务的 worker 程序读取相关的输入数据片段,生成并输出中间 k/v 对,并缓存在内存中。
Reduce阶段:所有 map 任务结束,reduce 程序使用 RPC 从 map worker 所在主机的磁盘上读取缓存数据,通过对 key 进行排序后使得具有相同 key 值的数据聚合在一起,reduce 进行操作后输出为文件。
image.png

实验内容

实现一个分布式 MapReduce,由两个程序(coordinator 和 worker)组成。只有一个 coordinator 和一个或多个并行执行的 worker 。在真实系统中, worker 会运行在多台不同的机器上,但在本 lab 中将在一台机器上运行所有 worker 。 worker 将通过 RPC 与 coordinator 通话。每个 worker 进程都会向 coordinator 请求 task,从一个或多个文件中读取 task 输入,执行 task,并将 task 输出写入一个或多个文件。 coordinator 应该注意到,如果某个 worker 在合理的时间内(本 lab 使用 10 秒)没有完成任务,就会将相同的 task 交给另一个 worker。
rpc举例:https://pdos.csail.mit.edu/6.824/notes/kv.go
lab内容:https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
Impl:mr/coordinator.go、mr/worker.go、mr/rpc.go
总结一下
对于 Coordiantor:

  • map 任务初始化;
  • rpc handler:回应worker分配任务请求、回应worker任务完成通知;
  • 自身状态控制,处理 map/reduce 阶段,还是已经全部完成;
  • 任务超时重新分配;

对于 Worker:

  • 给 coordinator 发送 rpc 请求分配任务;
  • 给 coordinator 发送 rpc 通知任务完成;
  • 自身状态控制,准确来说是 coordinator 不需要 worker 工作时,通知 worker 结束运行;

具体code见:https://github.com/BeGifted/MIT6.5840-2023

实验环境

OS:WSL-Ubuntu-18.04
golang:go1.17.6 linux/amd64

概要设计

所设计的Coordinator、Task、rpc消息格式:

type WorkerArgs struct {WorkerId    intWorkerState int // init\done\failTask        *Task
}type WorkerReply struct {WorkerId    intWorkerState int // init\done\failTask        *Task
}type Coordinator struct {// Your definitions here.MapTaskChan    chan *TaskReduceTaskChan chan *TaskNumReduce      int // reduce numNumMap         int // map numNumDoneReduce  int // reduce done numNumDoneMap     int // map done numState          int // map\reduce\donemu             sync.MutexTimeout        time.DurationMapTasks       map[int]*TaskReduceTasks    map[int]*Task
}type Task struct {TaskId    intTaskType  int // map\reduceTaskState int // int\run\doneNReduce   int // nReduceStartTime time.TimeInput []string
}const (StateMap    = 0StateReduce = 1StateDone   = 2
)const (TaskStateInit = 0TaskStateRun  = 1TaskStateDone = 2
)const (TaskTypeMap    = 0TaskTypeReduce = 1
)const (WorkerStateInit = 0WorkerStateDone = 1WorkerStateFail = 2
)

(TODO)WorkerState 出现 fail 的原因主要是在文件无法打开或读取上,如果是在处理 map 任务时出现 fail,那只有可能是原文件丢失了;如果是 reduce 任务时出现 fail,表示中间文件丢失,需要运行某个特定的 map 任务重新生成,然后再重新开始该 reduce 任务。当然,不实现这个也不会影响 test。

主要流程

创建 Coordinator

创建 Coordinator 并且初始化,将需要处理的数据片段放入 MapTaskChan 信道。这里将单个文件视作一个数据片段进行处理,也就是说有 len(files) 个 map 任务。

func MakeCoordinator(files []string, nReduce int) *Coordinator {c := Coordinator{}// Your code here.c.NumMap = len(files)c.NumReduce = nReducec.MapTaskChan = make(chan *Task, len(files))c.ReduceTaskChan = make(chan *Task, nReduce)c.MapTasks = make(map[int]*Task)c.ReduceTasks = make(map[int]*Task)c.NumDoneMap = 0c.NumDoneReduce = 0c.State = StateMapc.Timeout = time.Duration(time.Second * 10)for i, file := range files {input := []string{file}task := Task{TaskId:    i,TaskType:  TaskTypeMap,TaskState: TaskStateInit,Input:     input,NReduce:   nReduce,StartTime: time.Now(),}c.MapTaskChan <- &taskc.MapTasks[i] = &task}c.server()return &c
}

运行 Worker

Worker 主要处理两类任务:map 和 reduce。这两类任务通过 rpc 与 Coordinator 通信获取。
map 任务处理:

if task.TaskType == TaskTypeMap {filename := task.Input[0]intermediate := []KeyValue{}file, err := os.Open(filename)if err != nil {log.Fatalf("cannot open %v", filename)continue}content, err := ioutil.ReadAll(file)if err != nil {log.Fatalf("cannot read %v", filename)continue}file.Close()// log.Println("mapf")// log.Println(task.TaskId)kva := mapf(filename, string(content))intermediate = append(intermediate, kva...)// sort.Sort(ByKey(intermediate))ReduceSplit := make(map[int][]KeyValue)for _, kv := range intermediate {ReduceSplit[ihash(kv.Key)%task.NReduce] = append(ReduceSplit[ihash(kv.Key)%task.NReduce], kv)}for i := 0; i < task.NReduce; i++ {oname := fmt.Sprintf("mr-%d-%d.tmp", task.TaskId, i)ofile, _ := os.Create(oname)enc := json.NewEncoder(ofile)for _, kv := range ReduceSplit[i] {err := enc.Encode(&kv)if err != nil {log.Fatalf("cannot encode %v", kv)break}}ofile.Close()}// Task Doneargs.Task = taskTaskDone(&args)
}

reduce 任务处理:

if task.TaskType == TaskTypeReduce {var kva ByKeyfor _, filename := range task.Input {file, err := os.Open(filename)if err != nil {log.Fatalf("cannot open %v", filename)file.Close()continue}dec := json.NewDecoder(file)for {var kv KeyValueif err := dec.Decode(&kv); err != nil {break}kva = append(kva, kv)}file.Close()}sort.Sort(kva)i := 0oname := fmt.Sprintf("mr-out-%d", task.TaskId)ofile, _ := os.Create(oname)for i < len(kva) {j := i + 1for j < len(kva) && kva[j].Key == kva[i].Key {j++}values := []string{}for k := i; k < j; k++ {values = append(values, kva[k].Value)}output := reducef(kva[i].Key, values)fmt.Fprintf(ofile, "%v %v\n", kva[i].Key, output)i = j}// Task Doneargs.Task = taskTaskDone(&args)
}

WorkerHandler

给 worker 分配 map/reduce 任务,取决于阶段任务是否全部完成,当阶段任务全部完成,coordinator 的状态也需要更新。这个过程全局加锁。
处理 map 阶段:

if c.State == StateMap {select {case reply.Task = <-c.MapTaskChan:reply.Task.StartTime = time.Now()reply.Task.TaskState = TaskStateRundefault:for _, mapTask := range c.MapTasks {if mapTask.TaskState == TaskStateRun && time.Since(mapTask.StartTime) > c.Timeout {mapTask.StartTime = time.Now()reply.Task = mapTaskreturn nil}}}
}

处理 reduce 阶段:

if c.State == StateReduce {select {case reply.Task = <-c.ReduceTaskChan:reply.Task.StartTime = time.Now()reply.Task.TaskState = TaskStateRundefault:for _, reduceTask := range c.ReduceTasks {if reduceTask.TaskState == TaskStateRun && time.Since(reduceTask.StartTime) > c.Timeout {reduceTask.StartTime = time.Now()reply.Task = reduceTaskreturn nil}}}
}

需要注意的是,除了这两个阶段外还有 StateDone 阶段,即 reduce 任务都执行完毕了,coordinator 还没完全回收,此时 worker 还在请求分配任务,这时候就应该通知 worker 停止。

DoneHandler

coordinator 处理任务完成的通知。全程加锁。在这里更新 task/coordinator 状态。

func (c *Coordinator) DoneHandler(args *WorkerArgs, reply *WorkerReply) error {c.mu.Lock()defer c.mu.Unlock()task := args.Taskif task.TaskType == TaskTypeMap {if task.TaskState == TaskStateRun {task.TaskState = TaskStateDonec.MapTasks[task.TaskId].TaskState = TaskStateDonec.NumDoneMap++}} else if task.TaskType == TaskTypeReduce {if task.TaskState == TaskStateRun {task.TaskState = TaskStateDonec.ReduceTasks[task.TaskId].TaskState = TaskStateDonec.NumDoneReduce++}}if c.State == StateMap {if c.NumDoneMap == c.NumMap {c.State = StateReducefor i := 0; i < c.NumReduce; i++ {input := []string{}for j := 0; j < c.NumMap; j++ {input = append(input, fmt.Sprintf("mr-%d-%d.tmp", j, i))}task := Task{TaskId:    i,TaskType:  TaskTypeReduce,TaskState: TaskStateInit,NReduce:   c.NumReduce,StartTime: time.Now(),Input:     input,}c.ReduceTaskChan <- &taskc.ReduceTasks[i] = &task}}} else if c.State == StateReduce {if c.NumDoneReduce == c.NumReduce {c.State = StateDone}}return nil
}

实验结果

bash test-mr-many.sh 10

image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/202353.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

QT 中基于 TCP 的网络通信 (备查)

基础 基于 TCP 的套接字通信需要用到两个类&#xff1a; 1&#xff09;QTcpServer&#xff1a;服务器类&#xff0c;用于监听客户端连接以及和客户端建立连接。 2&#xff09;QTcpSocket&#xff1a;通信的套接字类&#xff0c;客户端、服务器端都需要使用。 这两个套接字通信类…

《当代家庭教育》期刊论文投稿发表简介

《当代家庭教育》杂志是家庭的参谋和助手&#xff0c;社会的桥梁和纽带&#xff0c;人生的伴侣和知音&#xff0c;事业的良师益友。 国家新闻出版总署批准的正规省级教育类G4期刊&#xff0c;知网、维普期刊网收录。安排基础教育相关稿件&#xff0c;适用于评职称时的论文发表…

今天面试招了个25K的测试员,从腾讯出来的果然都有两把刷子···

公司前段时间缺人&#xff0c;也面了不少测试&#xff0c;前面一开始瞄准的就是中级的水准&#xff0c;也没指望来大牛&#xff0c;提供的薪资在15-25k&#xff0c;面试的人很多&#xff0c;但平均水平很让人失望。看简历很多都是4年工作经验&#xff0c;但面试中&#xff0c;不…

OkGo导入失败解决办法

jcenter()maven { url "https://jitpack.io" }再同步就可以了

c++ atmoic acquire/release

由于多核cpu缓存的存在&#xff0c;以及gcc编译优化&#xff0c;cpu指令层面的优化&#xff0c;导致程序的执行顺序可能跟你写的顺序不完全一致&#xff08;reorder&#xff09;。 但是在多线程编程中如何确保各个线程能正确的读取到各个变量呢&#xff08;而不是cache中老旧的…

【Vue3从入门到项目实现】RuoYi-Vue3若依框架前端学习——登录页面

若依官方的前后端分离版中&#xff0c;前端用的Vue2&#xff0c;这个有人改了Vue3的前端出来。刚好用来学习&#xff1a; https://gitee.com/weifengze/RuoYi-Vue3 运行前后端项目 首先运行项目 启动前端&#xff0c;npm install、npm run dev 启动后端&#xff0c;按教程配置…

自定义登录页面模板(移动端)

login/index <script setup lang"ts"> </script><template><div class"login-page">//组件 由于配置了自动注册&#xff0c;所以无需引入<cp-nav-barright-text"注册"click-right"$router.push(/register)&quo…

Codeforces Round 913 (Div. 3)补题

Rook 题目大意&#xff1a;我们给定一个棋盘(如下图)&#xff0c;棋盘上有一个车&#xff0c;问车可以到的位置&#xff0c;任意顺序输出即可。 思路&#xff1a;输出车的行列中非它本身的点即可。 #include<bits/stdc.h> using namespace std; int main() {int t;scanf…

Fabric:链码的部署及执行

Hyperledger Fabric:V2.5.4 写在最前 使用Fabric搭建自定义网络参考&#xff1a;https://blog.csdn.net/yeshang_lady/article/details/134113296 使用Fabric创建应用通道参考&#xff1a;https://blog.csdn.net/yeshang_lady/article/details/134668458 接下来将介绍如何在自…

ELK的日志解决方案

1. 安装和配置ELK 确保你已经安装了Elasticsearch、Logstash和Kibana。你可以按照官方文档或使用包管理工具进行安装。 Elasticsearch官方配置文档 Kibana官方配置文档 2. Logstash配置 拉取logstash 创建容器 docker run -it \ --name logstash \ --privileged \ -p 5044:5…

Ubuntu-Sim2Real环境配置(下)

cd ICRA-RM-Sim2Real/docker_client/ ./exec_client.sh cd ~ roslaunch rtab_navigation rtab_navigation.launch 执行上面代码的时候后台一直刷新 cd ICRA-RM-Sim2Real/docker_client/ ./exec_client.sh cd ~ roslaunch carto_navigation navigation.launch 1.Usage 执行该…

【微服务】spring循环依赖深度解析

目录 一、循环依赖概述 1.2 spring中的循环依赖 二、循环依赖问题模拟 2.1 循环依赖代码演示 2.2 问题分析与解决 2.2.1 使用反射中间容器 三、spring循环依赖问题解析 3.1 spring中的依赖注入 3.1.1 field属性注入 3.1.2 setter方法注入 3.1.3 构造器注入 3.2 spri…

树莓派学习:wiringPi+硬件pwm+舵机

目录 目的 代码 只有io口1是支持pwm&#xff0c;其他要软件pwm 编译 运行 目的 黄色pwm&#xff0c;红色正极&#xff0c;棕色负极 让sg90舵机转动&#xff0c;就需要一个20ms的周期pwm&#xff0c;其中高电平在0.5-2.5ms之间 转动角度 高电平在一个周期内的时间0 …

ElasticSearch篇---第四篇

系列文章目录 文章目录 系列文章目录前言一、elasticsearch 是如何实现 master 选举的?二、elasticsearch 索引数据多了怎么办,如何调优,部署?三、说说你们公司 es 的集群架构,索引数据大小,分片有多少?前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽…

集成开发环境 PyCharm 的安装【侯小啾python基础领航计划 系列(二)】

集成开发环境PyCharm的安装【侯小啾python基础领航计划 系列(二)】 大家好,我是博主侯小啾, 🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔…

LeetCode Hot100 207.课程表

题目&#xff1a; 你这个学期必须选修 numCourses 门课程&#xff0c;记为 0 到 numCourses - 1 。 在选修某些课程之前需要一些先修课程。 先修课程按数组 prerequisites 给出&#xff0c;其中 prerequisites[i] [ai, bi] &#xff0c;表示如果要学习课程 ai 则 必须 先学习…

掌握大型语言模型(LLM)技术:推理优化

原文链接&#xff1a;Mastering LLM Techniques: Inference Optimization | NVIDIA Technical Blog 大模型相关技术文章已整理到Github仓库&#xff0c;欢迎start! 堆叠Transformer层以创建大型模型可以获得更好的准确性、few-shot学习能力&#xff0c;甚至在各种语言任务中具有…

Kafka 的特点和优势

Apache Kafka 作为一款分布式流处理平台&#xff0c;以其独特的特点和卓越的优势成为实时数据处理领域的瑰宝。本文将深入研究 Kafka 的各项特点和优势&#xff0c;并通过详实的示例代码展示其在不同场景下的强大应用。 高吞吐量和水平扩展 Kafka 的设计注重高吞吐量和水平扩…

Python-炸弹人【附完整源码】

炸弹人 炸弹人是童年的一款经典电子游戏&#xff0c;玩家控制一个类似"炸弹人"的角色&#xff0c;这个角色可以放置炸弹&#xff0c;并在指定的时间内引爆它们消灭敌人以达到目标&#xff0c;此游戏共设有两节关卡&#xff0c;代码如下&#xff1a; 运行效果&#x…