[MIT6.5840]MapReduce

MapReduce

Lab 地址

https://pdos.csail.mit.edu/6.824/labs/lab-mr.html

论文地址

https://static.googleusercontent.com/media/research.google.com/zh-CN//archive/mapreduce-osdi04.pdf

工作原理

简单来讲,MapReduce是一种分布式框架,可以用来处理大规模数据。该框架抽象了两个接口,分别是MapReduce函数:
在这里插入图片描述

凡是符合这个模式的算法都可以使用该框架来实现并行化,执行流程如下图所示。

在这里插入图片描述

整个框架分为Master和Worker,Master负责分配mapreduce任务,Worker负责向Master申请任务并执行。执行流程如下:

Map阶段:

  • 输入是大文件分割后的一组小文件,通常大小为16~64MB。
  • Worker向Master申请任务,假设得到map任务in0。
  • Worker开始执行map任务,将文件名和文件内容作为参数传入map函数中,得到kv list.
  • 最后Worker将kv list分割成reduceNum份(超参数),要求使得具有相同key的kv对在一份中。可以通过hash值%reduceNum实现分割,然后输出到文件中,下图的0-*

Reduce阶段:

  • 输入当前reduce的序号id,从map阶段的输出中选出*-id的文件,也就是将hash值%reduceNum值相同的kv对取出,这样可以保证具有相同key的kv对只用一次处理。
  • 将所有的kv对根据键值排序,使得相同key的kv对能够连续排列,方便合并。
  • 之后合并相同key的kv对,然后将每个key和其对应的value list输入reduce函数,得到合并的结果,再将其输出到文件中。

在这里插入图片描述

本文介绍了大致思想,详细内容请参考原论文。

代码详解

rpc.go

package mr//
// RPC definitions.
//
// remember to capitalize all names.
//import ("fmt""os""strconv"
)const (MAP    = "MAP"REDUCE = "REDUCE"DONE   = "DONE"
)//
// example to show how to declare the arguments
// and reply for an RPC.
//type ApplyArgs struct {WorkerID     intLastTaskType stringLastTaskID   int
}type ReplyArgs struct {TaskId    intTaskType  stringInputFile stringMapNum    intReduceNum int
}// Add your RPC definitions here.// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func coordinatorSock() string {s := "/var/tmp/5840-mr-"s += strconv.Itoa(os.Getuid())return s
}
// 构造文件名
func tmpMapResult(workerID int, taskID int, reduceId int) string {return fmt.Sprintf("tmp-worker-%d-%d-%d", workerID, taskID, reduceId)
}func finalMapResult(taskID int, reduceID int) string {return fmt.Sprintf("mr-%d-%d", taskID, reduceID)
}func tmpReduceResult(workerID int, reduceId int) string {return fmt.Sprintf("tmp-worker-%d-out-%d", workerID, reduceId)
}func finalReduceResult(reduceID int) string {return fmt.Sprintf("mr-out-%d", reduceID)
}

worker.go

package mrimport ("fmt""hash/fnv""io""log""net/rpc""os""sort""strings"
)// Map functions return a slice of KeyValue.
type KeyValue struct {Key   stringValue string
}// for sorting by key.
type ByKey []KeyValue// for sorting by key.
func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
func ihash(key string) int {h := fnv.New32a()h.Write([]byte(key))return int(h.Sum32() & 0x7fffffff)
}// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,reducef func(string, []string) string) {// Your worker implementation here.id := os.Getegid()// log.Printf("worker %d start working", id)lastTaskId := -1lastTaskType := ""loop:for {args := ApplyArgs{WorkerID:     id,LastTaskType: lastTaskType,LastTaskID:   lastTaskId,}reply := ReplyArgs{}ok := call("Coordinator.ApplyForTask", &args, &reply)if !ok {fmt.Printf("call failed!\n")continue}// log.Printf("reply: %v", reply)lastTaskId = reply.TaskIdlastTaskType = reply.TaskTypeswitch reply.TaskType {case "":// log.Println("finished")break loopcase MAP:// log.Printf("worker %d get map task %d", id, reply.TaskId)doMapTask(id, reply.TaskId, reply.InputFile, reply.ReduceNum, mapf)case REDUCE:// log.Printf("worker %d get reduce task %d", id, reply.TaskId)doReduceTask(id, reply.TaskId, reply.MapNum, reducef)}}// uncomment to send the Example RPC to the coordinator.// CallExample()}// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
func call(rpcname string, args interface{}, reply interface{}) bool {// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")sockname := coordinatorSock()c, err := rpc.DialHTTP("unix", sockname)if err != nil {log.Fatal("dialing:", err)}defer c.Close()err = c.Call(rpcname, args, reply)if err == nil {return true}fmt.Println(err)return false
}func doMapTask(id int, taskId int, filename string, reduceNum int, mapf func(string, string) []KeyValue) {file, err := os.Open(filename)if err != nil {log.Fatalf("%s 文件打开失败! ", filename)return}content, err := io.ReadAll(file)if err != nil {log.Fatalf("%s 文件内容读取失败! ", filename)}file.Close()kvList := mapf(filename, string(content)) // kv listhashedKvList := make(map[int]ByKey)for _, kv := range kvList {hashedKey := ihash(kv.Key) % reduceNumhashedKvList[hashedKey] = append(hashedKvList[hashedKey], kv)}for i := 0; i < reduceNum; i++ {outFile, err := os.Create(tmpMapResult(id, taskId, i))if err != nil {log.Fatalf("can not create output file: %e", err)return}for _, kv := range hashedKvList[i] {fmt.Fprintf(outFile, "%v\t%v\n", kv.Key, kv.Value)}outFile.Close()}// log.Printf("worker %d finished map task\n", id)
}func doReduceTask(id int, taskId int, mapNum int, reducef func(string, []string) string) {var kvList ByKeyvar lines []stringfor i := 0; i < mapNum; i++ {mapOutFile := finalMapResult(i, taskId)file, err := os.Open(mapOutFile)if err != nil {log.Fatalf("can not open output file %s: %e", mapOutFile, err)return}content, err := io.ReadAll(file)if err != nil {log.Fatalf("file read failed %s: %e", mapOutFile, err)return}lines = append(lines, strings.Split(string(content), "\n")...)}for _, line := range lines {if strings.TrimSpace(line) == "" {continue}split := strings.Split(line, "\t")kvList = append(kvList, KeyValue{Key: split[0], Value: split[1]})}sort.Sort(kvList)outputFile := tmpReduceResult(id, taskId)file, err := os.Create(outputFile)if err != nil {log.Fatalf("can not create output file: %e", err)return}for i := 0; i < len(kvList); {j := i + 1key := kvList[i].Keyvar values []stringfor j < len(kvList) && kvList[j].Key == key {j++}for k := i; k < j; k++ {values = append(values, kvList[k].Value)}res := reducef(key, values)fmt.Fprintf(file, "%v %v\n", key, res)i = j}file.Close()// log.Printf("worker %d finished reduce task", id)
}

coordinator.go

package mrimport ("fmt""log""math""net""net/http""net/rpc""os""sync""time"
)type Task struct {id        intinputFile stringworker    inttaskType  stringdeadLine  time.Time
}type Coordinator struct {// Your definitions here.mtx        sync.MutexinputFile  []stringreduceNum  intmapNum     inttaskStates map[string]TasktodoList   chan Taskstage      string
}// Your code here -- RPC handlers for the worker to call.// an example RPC handler.
//
// the RPC argument and reply types are defined in rpc.go.
func (c *Coordinator) ApplyForTask(args *ApplyArgs, reply *ReplyArgs) error {// process the last taskif args.LastTaskID != -1 {taskId := createTaskId(args.LastTaskID, args.LastTaskType)c.mtx.Lock()if task, ok := c.taskStates[taskId]; ok && task.worker != -1 { // 排除过期任务// log.Printf("worker %d finish task %d", args.WorkerID, task.id)if args.LastTaskType == MAP {for i := 0; i < c.reduceNum; i++ {err := os.Rename(tmpMapResult(task.worker, task.id, i), finalMapResult(task.id, i))if err != nil {log.Fatalf("can not rename %s: %e", tmpMapResult(task.worker, task.id, i), err)}}} else if args.LastTaskType == REDUCE {err := os.Rename(tmpReduceResult(task.worker, task.id), finalReduceResult(task.id))if err != nil {log.Fatalf("can not rename %s: %e", tmpReduceResult(task.worker, task.id), err)}}delete(c.taskStates, taskId)if len(c.taskStates) == 0 {c.shift()}}c.mtx.Unlock()}// assign the new tasktask, ok := <-c.todoListif !ok {return nil}reply.InputFile = task.inputFilereply.MapNum = c.mapNumreply.ReduceNum = c.reduceNumreply.TaskId = task.idreply.TaskType = task.taskTypetask.worker = args.WorkerIDtask.deadLine = time.Now().Add(10 * time.Second)// log.Printf("assign %s task %d to worker %d", task.taskType, task.id, args.WorkerID)c.mtx.Lock()c.taskStates[createTaskId(task.id, task.taskType)] = taskc.mtx.Unlock()return nil
}// start a thread that listens for RPCs from worker.go
func (c *Coordinator) server() {rpc.Register(c)rpc.HandleHTTP()//l, e := net.Listen("tcp", ":1234")sockname := coordinatorSock()os.Remove(sockname)l, e := net.Listen("unix", sockname)if e != nil {log.Fatal("listen error:", e)}go http.Serve(l, nil)
}
// 改变当前的状态
func (c *Coordinator) shift() {// 加锁状态if c.stage == MAP {// log.Printf("Map Task finished")c.stage = REDUCE// 分配reduce taskfor i := 0; i < c.reduceNum; i++ {task := Task{id:       i,worker:   -1,taskType: REDUCE,}c.todoList <- taskc.taskStates[createTaskId(i, REDUCE)] = task}} else if c.stage == REDUCE {close(c.todoList)c.stage = DONE}
}// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
func (c *Coordinator) Done() bool {// Your code here.c.mtx.Lock()defer c.mtx.Unlock()return c.stage == DONE
}// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeCoordinator(files []string, nReduce int) *Coordinator {c := Coordinator{mtx:        sync.Mutex{},inputFile:  files,reduceNum:  nReduce,mapNum:     len(files),taskStates: make(map[string]Task),todoList:   make(chan Task, int(math.Max(float64(nReduce), float64(len(files))))),stage:      MAP,}for i, file := range files {task := Task{id:        i,inputFile: file,worker:    -1,taskType:  MAP,}c.todoList <- taskc.taskStates[createTaskId(i, MAP)] = task}// 回收任务go c.collectTask()c.server()return &c
}func createTaskId(id int, taskType string) string {return fmt.Sprintf("%d-%s", id, taskType)
}
// worker执行过期后回收任务
func (c *Coordinator) collectTask() {for {time.Sleep(500 * time.Millisecond)c.mtx.Lock()if c.stage == DONE {c.mtx.Unlock()return}for _, task := range c.taskStates {if task.worker != -1 && time.Now().After(task.deadLine) {// task is expiredtask.worker = -1// log.Printf("task %d is expired", task.id)c.todoList <- task}}c.mtx.Unlock()}
}

运行说明

mrcoordinator

cd src/main/
go build -buildmode=plugin ../mrapps/wc.go
rm mr-out*
go run mrcoordinator.go pg-*.txt

mrworker

cd src/main/
go run mrworker.go wc.so

测试结果

bash test-mr.sh

在这里插入图片描述

MIT6.5840 课程Lab完整项目

https://github.com/Joker0x00/MIT-6.5840-Lab/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/876795.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

windows 安装docker桌面版

下载 下载两个&#xff1a; git桌面版 docker desktop 启动docker 执行安装文件&#xff0c;启动 更新wsl2 假如报错&#xff0c;会提示失败原因。 win10会提示跳转到&#xff1a; https://learn.microsoft.com/zh-cn/windows/wsl/install-manual#step-4—download-the-l…

从0到1,AI我来了- (4)AI图片识别的理论知识-II

上篇文章&#xff0c;我们理解了我们程序的神经网络设计&#xff0c;这篇我们继续&#xff0c;把训练迭代过程分析一下&#xff0c;完成这两篇文章&#xff0c;下面问题&#xff0c;应该能回答了。 一张图片&#xff0c;如何被计算机读懂&#xff1f;pytorch 封装的网络&#…

DP 整数拆分不同的二叉搜索树 DAY21

整数拆分&#xff1f; 给定一个正整数 n &#xff0c;将其拆分为 k 个 正整数 的和&#xff08; k > 2 &#xff09;&#xff0c;并使这些整数的乘积最大化。 返回 你可以获得的最大乘积。 示例 1: 输入: n 2 输出: 1 解释: 2 1 1, 1 1 1。示例 2: 输入: n 10 输…

全国区块链职业技能大赛样题第9套前端源码

后端源码地址:https://blog.csdn.net/Qhx20040819/article/details/140746050 前端源码地址:https://blog.csdn.net/Qhx20040819/article/details/140746216 智能合约+数据库表设计:https://blog.csdn.net/Qhx20040819/article/details/140746646 登录 ​ 用户管理

又一成就,Pencils Protocol单链 TVL 突破 3 亿美元

Pencils Protocol 是 Scroll 生态的原生项目&#xff0c;该项目以一站式收益聚合器和拍卖平台作为主要定位&#xff0c;在功能上&#xff0c;其集 Launchpad、资产统一聚合和分发、杠杆收益等功能于一体&#xff0c;旨在最大化用户的资产利用率。近日&#xff0c;Pencils Proto…

利用python自动化运维i脚本实现远程连接服务器并实现相应命令

目录 前言&#xff1a; 一.调用的python库介绍 二.在主机上安装好相应的库 2.1激活虚拟环境 三.代码实现以及解析 四.效果的实现 五.致谢 前言&#xff1a; 在当今快速发展的技术环境中&#xff0c;自动化运维已成为 IT 基础设施管理的关键组成部分。它不仅可以显著提…

Lua编程

文章目录 概述lua数据类型元表注意 闭包表现 实现 lua/c 接口编程skynet中调用层次虚拟栈C闭包注册表userdatalightuserdata 小结 概述 这次是skynet&#xff0c;需要一些lua/c相关的。写一篇博客&#xff0c;记录下。希望有所收获。 lua数据类型 boolean , number , string…

大模型算法面试题(十五)

本系列收纳各种大模型面试题及答案。 1、大模型LLM进行SFT如何对样本进行优化 大模型LLM&#xff08;Language Model&#xff0c;语言模型&#xff09;进行SFT&#xff08;Structured Fine-Tuning&#xff0c;结构化微调&#xff09;时&#xff0c;对样本的优化是提升模型性能…

Linux源码阅读笔记16-文件系统关联及字符设备操作

文件系统关联 设备文件都是由标准函数处理&#xff0c;类似普通文件。设备文件也是通过虚拟文件系统来管理的&#xff0c;和普通文件都是通过完全相同的接口访问的。 inode中设备文件的成员数据 虚拟文件系统每个文件都关联到一个inode&#xff0c;用于管理文件的属性。源码如…

【Go - context 速览,场景与用法】

作用 context字面意思上下文&#xff0c;用于关联管理上下文&#xff0c;具体有如下几个作用 取消信号传递&#xff1a;可以用来传递取消信号&#xff0c;让一个正在执行的函数知道它应该提前终止。超时控制&#xff1a;可以设定一个超时时间&#xff0c;自动取消超过执行时间…

Swift学习入门,新手小白看过来

&#x1f604;作者简介&#xff1a; 小曾同学.com,一个致力于测试开发的博主⛽️&#xff0c;主要职责&#xff1a;测试开发、CI/CD 如果文章知识点有错误的地方&#xff0c;还请大家指正&#xff0c;让我们一起学习&#xff0c;一起进步。 &#x1f60a; 座右铭&#xff1a;不…

(十三)Spring教程——依赖注入之工厂方法注入

1.工厂方法注入 工厂方法是在应用中被经常使用的设计模式&#xff0c;它也是控制反转和单例设计思想的主要实现方法。由于Spring IoC容器以框架的方式提供工厂方法的功能&#xff0c;并以透明的方式开放给开发者&#xff0c;所以很少需要手工编写基于工厂方法的类。正是因为工厂…

如何从网站获取表格数据

1.手动复制粘贴 最简单的方法是直接在网页上手动选择表格内容&#xff0c;然后复制粘贴到Excel或其他表格处理软件中。这种方法适用于表格较小且不经常更新的情况。 2.使用浏览器插件 有许多浏览器插件可以帮助从网页中提取表格数据&#xff0c;例如&#xff1a; -TableCapt…

SSRF过滤攻击

SSRF绕过&#xff1a; 靶场地址&#xff1a;重庆橙子科技SSRF靶场 这个是毫无过滤的直接读取&#xff0c;但是一般网站会设置有对SSRF的过滤&#xff0c;比如将IP地址过滤。 下面是常用的绕过方式&#xff1a; 1.环回地址绕过 http://127.0.0.1/flag.php http://017700…

相机怎么选(不推荐,只分析)

title: 相机怎么选 tags: [相机, 单反相机] categories: [其他, 相机] 最近准备购买&#xff0c;相机怎么选&#xff0c;我去搜索了许多文章&#xff0c;整理了一篇小白挑选技术篇&#xff0c;供大家参考。 分类 胶片相机 需要装入胶卷才能使用的相机&#xff0c;拍照后可直…

永磁同步电机无速度算法--非线性磁链观测器

非线性磁链观测器顾名思义观测器的状态变量为磁链值&#xff0c;观测的磁链值收敛于电机实际磁链值&#xff0c;观测器收敛。非线性是由于观测器存在sin和cos项&#xff0c;所以是非线性观测器 一、原理介绍 表贴式永磁同步电机αβ轴电压方程: 将公式变换 定义状态变量X: 定…

Milvus 向量数据库进阶系列丨部署形态选型

本系列文章介绍 在和社区小伙伴们交流的过程中&#xff0c;我们发现大家最关心的问题从来不是某个具体的功能如何使用&#xff0c;而是面对一个具体的实战场景时&#xff0c;如何选择合适的向量数据库解决方案或最优的功能组合。在 “Milvus 向量数据库进阶” 这个系列文章中&…

Java实现打印功能

JPG图片文件格式打印实现 打印JPG图片格式的文件&#xff0c;本次采用的Java原生的打印方式。 public static void main(String[] argv) throws Exception {File file new File("E:\\a.jpg");String printerName "HP MFP M436 PCL6";//打印机名包含字串…

vite+vue3项目,开发时候正常,打包后router-view不渲染

这是个很奇怪的问题&#xff0c;但是基本上命名或者引入文件的方式导致的。要么文件名与系统的某些标签名一样&#xff0c;要么就是routes写成了routers。还有一种就是导入方式错误的 错误截图&#xff1a; 正确引入截图&#xff1a;

Flink大状态作业调优——DataStream篇

一、Flink 状态&#xff08;State&#xff09;简介 在流式计算中有些操作一次处理一个独立的事件(比如解析一个事件), 有些操作却需要记住多个事件的信息(比如窗口操作)。那些需要记住多个事件信息的操作就是有状态的。流式计算分为无状态计算和有状态计算两种情况。状态可以理…