6.584-Lab1:MapReduce

前置知识/概念

Raft

是一个基于“Leader”的协议,能够保证分布式网路的一致性。

RPC(Remote Producer Call)

参考链接1
参考链接2
Go中RPC的简单实现

Golang中regexp正则表达式的用法

https://gukaifeng.cn/posts/golang-zheng-ze-biao-da-shi-regexp-de-ji-ben-yong-fa/index.html

Golang中自定义类型Sort

在提供的排序方法sort.Ints、sort.Floats、sort.Strings的底层都分别实现了三个函数Len()Less()Swap(),所以在我们实现自定义类型排序的时候要实现上述三个函数。

实现

参考链接

概览

在这里插入图片描述
如图,Lab1要实现两个部分分别是Map(映射)和Reduce(规约),Master负责调度,分配任务在代码中是Coordinator,而Worker则负责具体的map task和reduce task。
Map Task具体是统计文件中的单词生成对应的键值对[key, value],通过一个哈希函数Ihash将单词映射为key,存在中间文件,例如File1中有单词a、b对应键值对为[a, 1],[b, 1],分别存放在mr-out-1-ihash(a/b)%NReduce的中间文件中,其中NReduce是Reduce Task的个数,代码中为10。
Reduce Task具体是将中间文件mr-out-*-taskid中的键值对放入最终的文件mr-out-taskid中。

代码

rpc.go

Coordinator和Worker通过RPC进行通信,在文件rpc.go中需要设计相应的数据结构让其进行通信。分析可能的具体行为:Worker需要向Coordinator申请任务、Coordinator需要向Worker分配任务(map task & reduce task)、Worker向Coordinator回复任务的执行情况(成功、失败)、没有闲置任务分配时Coordinator告诉Worker等待(Wait)、所有任务完成告诉Worker结束(Shutdown)。
根据上述分析设计如下数据类型:

// 用不同数字表示不同信息的类别
type MsgType intconst (AskForTask    MsgType = iota // 表示worker向coordinator申请任务MapSucceed                   // 表示worker向coordinator传递Map Task完成MapFailed                    // 表示worker向coordinator传递Map Task失败ReduceSucceed                // 表示worker向coordinator传递Reduce Task完成ReduceFailed                 // 表示worker向coordinator传递Reduce Task失败MapAlloc                     // 表示coordinator向worker分配Map TaskReduceAlloc                  // 表示coordinator向worker分配Reduce TaskWait                         // 表示coordinator让worker休眠Shutdown                     // 表示coordinator让worker终止
)

从Worker视角出发,设计发送给Coordinator的信息结构体MsgSend,需要包含任务id以及任务执行情况:

type MsgSend struct {MsgType MsgTypeTaskId  int
}

从Coordinator视角出发,设计发送给Worker的信息的结构体MsgSend,需要包含任务的id即要处理的第几个文件用于中间文件的命名、任务类型、要处理的filename、NRduce:

type MsgReply struct {MsgType  MsgTypeNReduce  intTaskId   int    // 当worker发送MsgSend申请任务、coordinator回复任务的IDTaskName string //
}

worker.go

上面分析可知Worker的行为有:申请任务、执行任务、汇报任务执行情况。

// worker的任务就是不断请求任务、执行任务、报告执行状态
// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,reducef func(string, []string) string) {// Your worker implementation here.for {// 不断请求replyMsg := CallForTask()switch replyMsg.MsgType {case MapAlloc: // coordinator分配了map taskerr := HandleMapTask(replyMsg, mapf)if err != nil { // Map Task任务完成_ = CallForReportStatus(MapFailed, replyMsg.TaskId)} else { // Map Task 任务失败_ = CallForReportStatus(MapSucceed, replyMsg.TaskId)}case ReduceAlloc:err := HandleReduceTask(replyMsg, reducef)if err != nil { // Map Task任务完成_ = CallForReportStatus(ReduceFailed, replyMsg.TaskId)} else { // Map Task 任务失败_ = CallForReportStatus(ReduceSucceed, replyMsg.TaskId)}case Wait:time.Sleep(time.Second * 10)case Shutdown:os.Exit(0)}time.Sleep(time.Second)}
}

其中申请任务、回复任务执行情况的函数均利用了rpc中的Call来实现向Coordinator进行通信。
在这里插入图片描述
在这里插入图片描述

其中执行Map Task 的执行函数HandleMapTask

func HandleMapTask(reply *MessageReply, mapf func(string, string) []KeyValue) error {file, err := os.Open(reply.TaskName)if err != nil {return err}defer file.Close()content, err := io.ReadAll(file)if err != nil {return err}kva := mapf(reply.TaskName, string(content))sort.Sort(ByKey(kva))tempFiles := make([]*os.File, reply.NReduce)encoders := make([]*json.Encoder, reply.NReduce)for _, kv := range kva {redId := ihash(kv.Key) % reply.NReduceif encoders[redId] == nil {tempFile, err := ioutil.TempFile("", fmt.Sprintf("mr-map-tmp-%d", redId))if err != nil {return err}defer tempFile.Close()tempFiles[redId] = tempFileencoders[redId] = json.NewEncoder(tempFile)}err := encoders[redId].Encode(&kv)if err != nil {return err}}for i, file := range tempFiles {if file != nil {fileName := file.Name()file.Close()newName := fmt.Sprintf("mr-out-%d-%d", reply.TaskID, i)if err := os.Rename(fileName, newName); err != nil {return err}}}return nil
}

在这里插入图片描述
由函数TemFile的描述可知,该函数在创建时会按照pattern+随机字符串作为临时文件名字,即是不同程序同时调用该函数会创建不同的临时文件所以不存在资源竞争是并发安全的。
先将Map Task映射的键值对存入临时文件中,等全部放入临时文件后再将临时文件重命名为需要的中间文件的名字,因为重命名操作时原子性的。
如果不采用临时文件直接存入目标中间文件的话,会出现存入中间文件之前中间文件中含有其他数据即脏数据,可能是上个Worker执行到一半因为某些原因而退出之前存入的,所以存入之前需要将中间文件清空一下,这样就比较浪费时间。

执行Reduce Task 的执行函数HandleReduceTask也是同样的问题,虽然从中间件读取的时候没有写入操作但写入最终文件时也同样需要像上面一样保证原子性,这里贴出没有使用临时文件的代码:

// 处理分配的 Reduce 任务,处理每个MapTask产生的mr-out-*-key_id
func HandleReduceTask(reply *MsgReply, reducef func(string, []string) string) error {key_id := reply.TaskId// todo:这里的key_id要 % NReduce吗 --答:传入的TaskId一定是小于NReduce的,规约的数量就是NRduce也即Reduce Task的数量files, err := ReadSpecificFile(key_id, "./")if err != nil {return err}// 从所有匹配的文件中读出Json格式的键值对[k1-value]/[k2-value],key哈希的值可以不一样但这里ihash(k1) % NReduce  = ihash(k2) % NReducek_vs := map[string][]string{}for _, file := range files {dec := json.NewDecoder(file)for { // 循环读JSON数据流var kv KeyValue // 将读出的JSON数据解码放入kvif err := dec.Decode(&kv); err != nil {break}k_vs[kv.Key] = append(k_vs[kv.Key], kv.Value)}file.Close()}keys := []string{} // 将map中的keys拿出排序,按照keys的字典序依次写入文件for k, _ := range k_vs {keys = append(keys, k)}sort.Strings(keys)oname := "mr-out-" + strconv.Itoa(reply.TaskId) // 将Reduce后的结果放入 mr-out-TaskIdofile, err := os.Create(oname)if err != nil {return err}defer ofile.Close()for _, key := range keys {output := reducef(key, k_vs[key])_, err := fmt.Fprintf(ofile, "%s %s\n", key, output) // 格式化写入文件if err != nil {return err}}CleanFileByReduceId(reply.TaskId, "./")return nil
}

利用临时文件保证原子性参考上面HandleMapTask函数的实现。其中需要注意的是最终写入文件的时候Key要按照字典升序排列,而map存储的Key不是有序的,所以把Key拿出来排序再放入文件。

coordinator.go

coordinator中要实现worker申请任务的函数AskForTask以及对worker报告任务执行情况后对相应任务状态的更新函数NoticeResult
任务的状态有闲置idle、成功finished、失败failed、超时、正在运行running。每次worker申请任务时都轮询一下所有任务,委派闲置、失败、超时的任务,而超时状态的判断则通过为每个任务打上运行开始的时间戳,若轮询到running的任务时判断当前时间与开始的时间戳比较若大于10s则可以判定为超时,可以再次委派给worker。

TaskInfo结构
type TaskStatue int// 定义类别
const (idle     TaskStatue = iota // 闲置finished                   // 完成running                    // 运行failed                     // 失败
)type MapTaskInfo struct {statue    TaskStatue // 任务状态TaskId    int        // 任务编号startTime int64      // 分配时间,当前时间-分配时间>10s表示超时
}
type ReduceTaskInfo struct {statue TaskStatue//TaskId reduce task的编号用数组下标表示startTime int64
}
type Coordinator struct {NReduce     int // reduce tasks可用的数量MapTasks    map[string]*MapTaskInfomu          sync.Mutex // 互斥锁ReduceTasks []*ReduceTaskInfo
}

初始化函数:

// 初始化函数
func (c *Coordinator) Init(files []string) {for idx, filename := range files {c.MapTasks[filename] = &MapTaskInfo{statue: idle, // 初始为闲置TaskId: idx,}}for idx := 0; idx < c.NReduce; idx++ {c.ReduceTasks[idx] = &ReduceTaskInfo{statue: idle,}}
}// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeCoordinator(files []string, nReduce int) *Coordinator {c := Coordinator{NReduce:     nReduce,MapTasks:    make(map[string]*MapTaskInfo),ReduceTasks: make([]*ReduceTaskInfo, nReduce),}c.Init(files)c.server()return &c
}
rpc相应函数AskForTask的实现

AskForTask:实现相对复杂,大致流程为:
1.每个任务初始化为闲置。
2.每当有worker申请任务的话就轮询所有任务。
3.若存在有idle、failed、超时的任务就可以分配。
4.若没有可分配的任务,判断完成任务的个数是否等于所有任务个数:
4.1 若相等,则表明所有任务完成,告知workershutdown
4.2 若不相等,则表明有任务还在进行且没有可分配的任务,告知workerWait

期间应保证共享资源的互斥,每个worker请求任务的同时要上锁。

func (c *Coordinator) AskForTask(req *MsgSend, reply *MsgReply) error {if req.MsgType != AskForTask { // 传入的不是“申请任务”类型的信息return NoMathMsgType}// 加锁,保证每个worker申请任务时互斥c.mu.Lock()defer c.mu.Unlock()// 选择一个失败or闲置or超时的任务分配给workerMapSuccessNum := 0 // Map task 完成个数for filename, maptaskinfo := range c.MapTasks {alloc := falseif maptaskinfo.statue == idle || maptaskinfo.statue == failed { // 该任务闲置或失败则可以分配alloc = true} else if maptaskinfo.statue == running { // 判断该任务是否超时,若超时则再分配if time.Now().Unix()-maptaskinfo.startTime > 10 {maptaskinfo.startTime = time.Now().Unix() // 再分配更新开始时间alloc = true}} else { // 该任务是已完成任务MapSuccessNum++}// 当前任务可以分配if alloc {reply.TaskId = maptaskinfo.TaskIdreply.TaskName = filenamereply.NReduce = c.NReducereply.MsgType = MapAllocmaptaskinfo.statue = runningmaptaskinfo.startTime = time.Now().Unix()return nil}}// 没有任务可以分配但所有任务没有完成if MapSuccessNum < len(c.MapTasks) {reply.MsgType = Waitreturn nil}// 运行到这里表明所有的Map任务都已经完成ReduceSuccessNum := 0for idex, reducetaskinfo := range c.ReduceTasks {alloc := falseif reducetaskinfo.statue == idle || reducetaskinfo.statue == failed {alloc = true} else if reducetaskinfo.statue == running {if time.Now().Unix()-reducetaskinfo.startTime > 10 {reducetaskinfo.startTime = time.Now().Unix()alloc = true}} else {ReduceSuccessNum++}if alloc {reply.TaskId = idexreply.NReduce = c.NReducereply.MsgType = ReduceAllocreducetaskinfo.statue = runningreducetaskinfo.startTime = time.Now().Unix()return nil}}if ReduceSuccessNum < len(c.ReduceTasks) {reply.MsgType = Waitreturn nil}// 运行到这里表明所有的任务都已完成reply.MsgType = Shutdownreturn nil
}
rpc相应函数NoticeResult的实现

只需要将worker传递过来的任务完成状态更新到coordinator的TaskInfo即可。

func (c *Coordinator) NoticeResult(req *MsgSend, reply *MsgReply) error {c.mu.Lock()defer c.mu.Unlock()if req.MsgType == MapSucceed {for _, taskinfo := range c.MapTasks {if taskinfo.TaskId == req.TaskId {taskinfo.statue = finished}}} else if req.MsgType == ReduceSucceed {c.ReduceTasks[req.TaskId].statue = finished} else if req.MsgType == MapFailed {for _, taskinfo := range c.MapTasks {if taskinfo.TaskId == req.TaskId {taskinfo.statue = failed}}} else if req.MsgType == ReduceFailed {c.ReduceTasks[req.TaskId].statue = failed}return nil
}
所有任务是否完成-Done函数

只需要轮询一遍任务数组,判断是否所有任务均完成。

// if the entire job has finished.
func (c *Coordinator) Done() bool {// Your code here.// 遍历所有任务,全部完成则返回true,否则返回falsefor _, taskinfo := range c.MapTasks {if taskinfo.statue != finished {return false}}for _, taskinfo := range c.ReduceTasks {if taskinfo.statue != finished {return false}}return true
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/60981.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

抽象java入门1.5.3.1——类的进阶

前言&#xff1a;在研究神技代码Hello word的时候&#xff0c;发现了一个重大公式bug&#xff0c;在代码溯源中&#xff0c;我发现了一个奇怪的东西&#xff0c;就是OUT不是类中类&#xff08;不是常规类的写法&#xff09; 内容总结&#xff1a; 代码运行的顺序复习 正片开始…

人力资源招聘系统的革新之路:从传统到智能的转变

在全球化与数字化交织的今天&#xff0c;企业间的竞争日益激烈&#xff0c;而人才作为企业发展的核心驱动力&#xff0c;其重要性不言而喻。传统的人力资源招聘方式&#xff0c;如依赖纸质简历、人工筛选、面对面面试等&#xff0c;不仅效率低下&#xff0c;且难以精准匹配企业…

AXI DMA IP BUG踩坑记录

1. 问题描述 在突发的过程中总是一旦使用XAxiDma_SimpleTransfer函数就会出现AXI STREAM信号的READY信号先拉高4个数据(32位)的时钟后会迅速拉低,换句话说就是一旦PS端发起了XAxiDma_SimpleTransfer,AXI总线的READY信号就会拉高四个节拍,这样就会导致传输的数据出现问题。…

Vue2教程001:初识Vue

文章目录 1、初识Vue1.1、Vue2前言1.2、创建Vue实例1.3、插值表达式1.4 Vue响应式特性 1、初识Vue 1.1、Vue2前言 Vue是什么&#xff1f; 概念&#xff1a;Vue是一个用于构建用户界面的渐进式框架。 Vue的两种使用方式&#xff1a; Vue核心包开发 场景&#xff1a;局部模块…

vscode vite+vue3项目启动调试

1、经常我们在普通的项目中&#xff0c;如果算法并不复杂&#xff0c;那么基本上console.log就可以搞定&#xff0c;当然也可以直接alert&#xff0c;打包的时候如果不去掉&#xff0c;还会在发版中上接弹出&#xff0c;给你个惊喜。 2、碰到了有些算法过程比较复杂的情况下&a…

Jdbc学习笔记(三)--PreparedStatement对象、sql攻击(安全问题)

目录 &#xff08;一&#xff09;使用PreparedStatement对象的原因&#xff1a; 使用Statement对象编写sql语句会遇到的问题 ​编辑 &#xff08;二&#xff09;sql攻击 1.什么是sql攻击 2.演示sql攻击 &#xff08;三&#xff09;防止SQL攻击 1.PreparedStatement是什么 …

后端分层解耦

引入 在上篇所举的例子中&#xff0c;我们将所有的代码均放在HelloControl方法之中&#xff0c;这样会导致代码的复用性、可读性较差&#xff0c;难以维护。因此我们需 三层架构 在之前的代码中&#xff0c;代码大体可以分为三部分&#xff1a;数据访问、数据逻辑处理、响应数…

97.【C语言】数据结构之栈

目录 栈 1.基本概念 2.提炼要点 3.概念选择题 4.栈的实现 栈初始化函数 入栈函数 出栈函数和栈顶函数 栈顶函数 栈销毁函数 栈 基本概念参见王爽老师的《汇编语言 第四版》第56和57页 节选一部分 1.基本概念 注意:这里提到的数据结构中的栈有别于操作系统的栈,后者是…

初识算法 · 模拟(1)

目录 前言&#xff1a; 替换所有的问号 题目解析 算法原理 算法编写 提莫攻击 题目解析 算法原理 算法编写 外观数列 题目解析 算法原理 算法编写 前言&#xff1a; ​本文的主题是模拟&#xff0c;通过三道题目讲解&#xff0c;一道是提莫攻击&#xff0c;一道是…

【数值分析】高斯-赛德尔方法、规范化幂法、原点移位法

【数值分析】高斯-赛德尔方法、规范化幂法、原点移位法 题目 要求 代码实现过程不能调用任何库函数自带的“线性 方程组求解、特征值求解库函数” 利用高斯-赛德尔方法求解上述线性方程组 使用Python编程求解矩阵A与列向量b import numpy as np import sympy as spdef crea…

【CUDA】了解GPU架构

目录 一、初步认识 二、Fermi架构 三、Kepler 架构 3.1 动态并行 3.2 Hyper-Q 一、初步认识 SM&#xff08;Streaming Multiprocessors&#xff09;是GPU架构中非常重要的部分&#xff0c;GPU硬件的并行性就是由SM决定的。以Fermi架构为例&#xff0c;其包含以下主要组成…

64位程序调用32位dll解决方案

最近在做64位代码移植&#xff0c;发现很多老代码使用到了第三方的32位dll;而且这些第三方32位dll库已经年代久远&#xff0c;原开发商已不再了&#xff1b;所以急切的需要在64位主程序 中使用老的32位dll;查询很多解决方案 发现目前只有使用com 进程外组件的方法可以解决此问题…

【HOT100第五天】搜索二维矩阵 II,相交链表,反转链表,回文链表

240.搜索二维矩阵 II 编写一个高效的算法来搜索 m x n 矩阵 matrix 中的一个目标值 target 。该矩阵具有以下特性&#xff1a; 每行的元素从左到右升序排列。每列的元素从上到下升序排列。 先动手写写最简单方法&#xff0c;二重循环。 class Solution { public:bool searchMa…

模板元函数应用:输出字符串。

看下面三个字符串&#xff0c;s1,s2,s3 &#xff1a; string s1 "逆天邪神";wstring s2 _t("焚星妖莲");_string s3 "焚绝尘"; 在控制台输出字符串&#xff0c;可能的一个方案是&#xff1a; void print_test(const wstring& s) {std::…

pytest | 框架的简单使用

这里写目录标题 单个文件测试方法执行测试套件的子集测试名称的子字符串根据应用的标记进行选择 其他常见的测试命令 pytest框架的使用示例 pytest将运行当前目录及其子目录中test_*.py或 *_test.py 形式的所有 文件 文件内的函数名称可以test* 或者test_* 开头 单个文件测试…

【C++】类和对象-深度剖析默认成员函数-上

> &#x1f343; 本系列为初阶C的内容&#xff0c;如果感兴趣&#xff0c;欢迎订阅&#x1f6a9; > &#x1f38a;个人主页:[小编的个人主页])小编的个人主页 > &#x1f380; &#x1f389;欢迎大家点赞&#x1f44d;收藏⭐文章 > ✌️ &#x1f91e; &#x1…

Web性能优化:从基础到高级

&#x1f493; 博客主页&#xff1a;瑕疵的CSDN主页 &#x1f4dd; Gitee主页&#xff1a;瑕疵的gitee主页 ⏩ 文章专栏&#xff1a;《热点资讯》 Web性能优化&#xff1a;从基础到高级 Web性能优化&#xff1a;从基础到高级 Web性能优化&#xff1a;从基础到高级 引言 基础优…

当 docker-compose.yaml 文件部署时,Dify 线上版本升级过程

如果线上 Dify 是通过 docker-compose.yaml 文件部署的&#xff0c;那么当 Dify 版本升级时该如何操作呢&#xff1f;官方已经给出了 Docker compose 和 Source Code 两种方式。相对而言&#xff0c;前者更简单些&#xff0c;至少不需要安装依赖包和迁移数据库文件。为了更加具…

如何让手机ip变成动态

在数字化浪潮中&#xff0c;手机已成为我们日常生活中不可或缺的一部分。无论是浏览网页、使用社交媒体还是进行在线购物&#xff0c;手机都扮演着举足轻重的角色。然而&#xff0c;在享受网络带来的便利时&#xff0c;我们也需要关注网络安全和隐私保护。静态IP地址可能让手机…

vue3 如何调用第三方npm包内部的 pinia 状态管理库方法

抛砖引玉: 如果在开发vue3项目是, 引用了npm第三方包 ,而且这个包内使用了Pinia 状态管理库,那我们如何去调用 npm内部的 Pinia 状态管理库呢? 实际遇到的问题: 今天在制作npm包时遇到的问题,之前Vue2版本的时候状态管理库用的Vuex ,当时调用npm包内的状态管理库很简单,直接引…