【Go语言实战】(25) 分布式算法 MapReduce

MapReduce

写在前面

身为大数据专业的学生,其实大学我也多多少少接触过mapreduce,但是当时觉得这玩意太老了,觉得这和php一样会被时代淘汰。只能说当时确实太年轻了,没有好好珍惜那时候的学习资源…

现在回过头来看mapreduce,发现技术这东西和语言不一样,技术万变不离其中,而语言只是实现技术的一种方法而已,用什么语言其实并不重要。

原论文地址:MapReduce: Simplified Data Processing on Large Clusters

总览

这次 lab1 的 mapreduce,其实是在 搜索引擎tangseng 的时候,需要用来构建倒排索引。所以会和课程上所要求的不太一样,这里也没有使用rpc调用,而是为了与项目统一,便改用了grpc进行调用。

mapreduce工作原理

这里需要注意几点

  • 不同的Map任务之间不会进行通信
  • 不同的Reduce任务之间也不会发生任何信息交换
  • 所有的数据交换都是通过MapReduce框架自身去实现的

那么如何对 map tasks 和 reduce tasks 进行合理的协调呢?这里我们就要引入两个角色,master 和 worker,在原论文中,对这两者的并没有非常明确的定义,但我们可以摘录并提炼原论文对这两个角色的描述:

master :

  • The master picks idle workers and assigns each one a map task or a reduce task.

worker :

  • The map worker who is assigned a map task reads the contents of the corresponding input split.

  • The reduce worker iterates over the sorted intermediate data and for each unique intermediate key encountered, it passes the key and the corresponding set of intermediate values to the user’s Reduce function

这里我们先说一下几个状态枚举值:

  • idle :空闲状态
  • in-progress :进行状态
  • completed :完成状态

这三个枚举值代表着每一个 map task 和 reduce task 的状态,标识着这些 task 是未开始,进行中,还是已完成。

那么 master 其实就是选择空闲的 worker 节点,为每一个空闲的 worker 节点分配 map task 或者 reduce task。而 worker 看似分成了 map worker 和 reduce worker,但其实这两个 worker 都是一样,只是看 master 分配的是 map task 还是 reduce task。这样我们的 map 和 reduce 的数据传送就非常清晰了。

MapReduce整体工作流

接下来,我们来详细讲解一下这几个重要的角色

Worker

首先我们先定义一个 MapReduce 的任务,也就是我们 worker 需要用到参数

type MapReduceTask struct {Input         string   `json:"input"`         // 输入的文件TaskState     State    `json:"task_state"`    // 状态NReducer      int      `json:"n_reducer"`     // reducer 数量TaskNumber    int      `json:"task_number"`   // 任务数量Intermediates []string `json:"intermediates"` // map 之后的文件存储地址Output        string   `json:"output"`        // output的输出地址
}

接着再定义 State 枚举值

type MasterTaskStatus intconst (Idle       MasterTaskStatus = iota + 1 // 未开始InProgress                             // 进行中Completed                              // 已完成
)

接下来我们的 Worker 函数就很简单了

func Worker(ctx context.Context, mapf func(string, string) []*types.KeyValue, reducef func(string, []string) *roaring.Bitmap) {// 启动workerfor {task, err := getTask(ctx) // worker从master获取任务if err != nil {log.LogrusObj.Error("Worker-getTask", err)return}// 拿到task之后,根据task的state,map task交给mapper, reduce task交给reducer// 额外加两个state,让 worker 等待 或者 直接退出switch task.TaskState {case int64(types.Map):mapper(ctx, task, mapf)case int64(types.Reduce):reducer(ctx, task, reducef)case int64(types.Wait):time.Sleep(5 * time.Second)case int64(types.Exit):returndefault:return}}
}

至于 mapper 和 reducer 如何实现的,先桥豆麻袋一下,下文在 map 和 reduce 中会给出答案,如何从 master 中拿到 task 呢?这就涉及到 worker 和 master 的通信。本来打算用 RPC 通信的,但为了项目的整体统一,还是用了 gRPC 。

创建一个proto文件

syntax="proto3";
option go_package = "/index_platform;";message MapReduceTask{// @inject_tag:form:"input" uri:"input"string input = 1;// @inject_tag:form:"task_state" uri:"task_state"int64 task_state = 2;// @inject_tag:form:"n_reducer" uri:"n_reducer"int64 n_reducer = 3;// @inject_tag:form:"task_number" uri:"task_number"int64 task_number = 4;// @inject_tag:form:"intermediates" uri:"intermediates"repeated string intermediates = 5;// @inject_tag:form:"output" uri:"output"string output = 6;
}message MasterTaskCompletedResp {// @inject_tag:form:"code" uri:"code"int64 code=1;// @inject_tag:form:"message" uri:"message"string message=2;
}service MapReduceService {rpc MasterAssignTask(MapReduceTask) returns (MapReduceTask);rpc MasterTaskCompleted(MapReduceTask) returns (MasterTaskCompletedResp);
}

定义两个 RPC 函数,MasterAssignTask 用来接受 master 分配的 task MasterTaskCompleted 完成 task 之后,对这个 task 进行标识,意味着该任务结束。

所以我们 worker 接受任务的通信如下

func getTask(ctx context.Context) (resp *mapreduce.MapReduceTask, err error) {// worker从master获取任务taskReq := &mapreduce.MapReduceTask{}resp, err = rpc.MapReduceClient.MasterAssignTask(ctx, taskReq)return
}

当完成任务时,通过gRPC发送给master

func TaskCompleted(ctx context.Context, task *mapreduce.MapReduceTask) (reply *mapreduce.MasterTaskCompletedResp, err error) {// 通过RPC,把task信息发给masterreply, err = rpc.MapReduceClient.MasterTaskCompleted(ctx, task)return
}

那么 master 是如何分配任务的?接下来我们来介绍一下 master 节点。

Master

我们定义这么一个 Master 服务的结构体

type MasterSrv struct {TaskQueue     chan *types.MapReduceTask // 等待执行的taskTaskMeta      map[int]*types.MasterTask // 当前所有task的信息MasterPhase   types.State               // Master的阶段NReduce       int                       // Reduce的数量InputFiles    []string                  // 输入的文件Intermediates [][]string                // Map任务产生的R个中间文件的信息mapreduce.UnimplementedMapReduceServiceServer // gRPC服务实现接口
}

那么当我们 New 一个 Master 服务的时候,顺便创建 map tasks 任务

func NewMaster(files []string, nReduce int) *MasterSrv {m := &MasterSrv{TaskQueue:     make(chan *types.MapReduceTask, int(math.Max(float64(nReduce), float64(len(files))))),TaskMeta:      map[int]*types.MasterTask{},MasterPhase:   types.Map,NReduce:       nReduce,InputFiles:    files,Intermediates: make([][]string, nReduce),}m.createMapTask()return m
}

创建 map task 任务

func (m *MasterSrv) createMapTask() {// 把输入的files都形成一个task元数据塞到queue中for idx, filename := range m.InputFiles { taskMeta := types.MapReduceTask{Input:      filename,TaskState:  types.Map, // map节点NReducer:   m.NReduce,TaskNumber: idx,}m.TaskQueue <- &taskMetam.TaskMeta[idx] = &types.MasterTask{TaskStatus:    types.Idle, // 状态为 idle ,等待worker节点来领取 taskTaskReference: &taskMeta,}}
}

创建 reduce task 任务

func (m *MasterSrv) createReduceTask() {m.TaskMeta = map[int]*types.MasterTask{}for idx, files := range m.Intermediates {taskMeta := types.MapReduceTask{TaskState:     types.Reduce, // reduce 阶段NReducer:      m.NReduce,TaskNumber:    idx,Intermediates: files,}m.TaskQueue <- &taskMetam.TaskMeta[idx] = &types.MasterTask{TaskStatus:    types.Idle, // 找到空闲的 workerTaskReference: &taskMeta,}}
}

MasterAssignTask 等待 worker 来领取 task

func (m *MasterSrv) MasterAssignTask(ctx context.Context, req *mapreduce.MapReduceTask) (reply *mapreduce.MapReduceTask, err error) {mu.Lock()defer mu.Unlock()task := &types.MapReduceTask{Input:         req.Input,TaskState:     types.State(req.TaskState),NReducer:      int(req.NReducer),TaskNumber:    int(req.TaskNumber),Intermediates: req.Intermediates,Output:        req.Output,}if len(m.TaskQueue) > 0 {// 如果queue中还有任务的话就发出去*task = *<-m.TaskQueuem.TaskMeta[task.TaskNumber].TaskStatus = types.InProgress // 修改worker的状态为进行中m.TaskMeta[task.TaskNumber].StartTime = time.Now() // 记录task的启动时间} else if m.MasterPhase == types.Exit {*task = types.MapReduceTask{TaskState: types.Exit,}} else {// 没有task就让worker等待*task = types.MapReduceTask{TaskState: types.Wait}}// 返回该任务的状态,因为发出去就是给task了,这个状态已经改变了,worker可以工作了reply = &mapreduce.MapReduceTask{Input:         task.Input,TaskState:     int64(task.TaskState),NReducer:      int64(task.NReducer),TaskNumber:    int64(task.TaskNumber),Intermediates: task.Intermediates,Output:        task.Output,}return
}

那么如果 task 把任务都做完了,master 应该怎么回应呢?

func (m *MasterSrv) MasterTaskCompleted(ctx context.Context, req *mapreduce.MapReduceTask) (resp *mapreduce.MasterTaskCompletedResp, err error) {resp = new(mapreduce.MasterTaskCompletedResp)resp.Code = e.ERRORresp.Message = "map finish successfully"// 更新task状态if req.TaskState != int64(m.MasterPhase) || m.TaskMeta[int(req.TaskNumber)].TaskStatus == types.Completed {// 因为worker写在同一个文件这次盘上对于重复的结果要丢弃return}m.TaskMeta[int(req.TaskNumber)].TaskStatus = types.Completederr = m.processTaskResult(req) // always success haha and hope u so :)if err != nil {resp.Code = e.ERRORresp.Message = "map finish failed"return}return
}

处理任务的结果,如果是 map 完成后就变成 reduce 阶段,reduce 之后就是 all done. 😃

// processTaskResult 处理任务结果
func (m *MasterSrv) processTaskResult(task *mapreduce.MapReduceTask) (err error) {switch task.TaskState {case int64(types.Map):// 收集intermediate信息for reduceTaskId, filePath := range task.Intermediates {m.Intermediates[reduceTaskId] = append(m.Intermediates[reduceTaskId], filePath)}if m.allTaskDone() {// 获取所有的map task后,进入reduce阶段m.createReduceTask()m.MasterPhase = types.Reduce}case int64(types.Reduce):if m.allTaskDone() {// 获得所有的reduce task后,进去exit阶段m.MasterPhase = types.Exit}}return
}

介绍完master之后,我们具体来看一下map的具体行为。

Map

在 map 中,我们抽离出一个 mapper,具体的map函数可根据实际情况进行修改,然后将map function传入mapper中进行实际的map动作,我们读取每一个文件,然后把输出的结果都放到 intermediates 中,并且根据 task 所设定的 NReducer 也就是 reducer 数 进行hash ,将结果均匀分到每个中间文件中。

func mapper(ctx context.Context, task *mapreduce.MapReduceTask, mapf func(string, string) []*types.KeyValue) {// 从文件名读取contentcontent, err := os.ReadFile(task.Input)if err != nil {log.LogrusObj.Error("mapper", err)return}// 将content交给mapf,缓存结果intermediates := mapf(task.Input, string(content))// 缓存后的结果会写到本地磁盘,并切成R份// 切分方式是根据key做hashbuffer := make([][]*types.KeyValue, task.NReducer)for _, intermediate := range intermediates {slot := ihash(intermediate.Key) % task.NReducerbuffer[slot] = append(buffer[slot], intermediate)}mapOutput := make([]string, 0)for i := 0; i < int(task.NReducer); i++ {mapOutput = append(mapOutput, writeToLocalFile(int(task.TaskNumber), i, &buffer[i]))}// R个文件的位置发送给mastertask.Intermediates = mapOutput_, err = TaskCompleted(ctx, task) // 完成后,给master发送消息,map阶段结束if err != nil {fmt.Println("mapper-TaskCompleted", err)}return
}

具体的 Map方法,由于是用于搜索引擎,所以这里是建立倒排索引

func Map(filename string, contents string) (res []*types.KeyValue) {res = make([]*types.KeyValue, 0)lines := strings.Split(contents, "\r\n") // 分行var inputData *model.InputDatafor _, line := range lines[1:] {docStruct, _ := doc2Struct(line) // 字符串转 doc structtokens, err := analyzer.GseCutForBuildIndex(docStruct.DocId, docStruct.Body)if err != nil {return}for _, v := range tokens {res = append(res, &types.KeyValue{Key: v.Token, Value: cast.ToString(v.DocId)}) // token:docId 倒排索引}}return
}

至此map就已经完成了,是不是很简单,其实具体的map和reduce并不难,难的是如何平衡调度,接下来我们来看看reduce是如何怎么的。

Reduce

和map一样,我们抽离出一个reducer,然后把具体的 reduce 传进去,当然还有一个shuffle过程,这里进行排序会减少后面的reduce计算。可以少计算几次。

func reducer(ctx context.Context, task *mapreduce.MapReduceTask, reducef func(string, []string) *roaring.Bitmap) {// 先从filepath读取intermediate的KeyValueintermediate := *readFromLocalFile(task.Intermediates)// 根据kv排序 shuffle 过程sort.Sort(types.ByKey(intermediate))dir, _ := os.Getwd()outName := fmt.Sprintf("%s/mr-tmp-%d.%s",dir, task.TaskNumber, consts.InvertedBucket)invertedDB := storage.NewInvertedDB(outName)output := roaring.NewBitmap()var outByte []bytei := 0for i < len(intermediate) {// 将相同的key放在一起分组合并j := i + 1for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {j++}var values []stringfor k := i; k < j; k++ {values = append(values, intermediate[k].Value)}// 交给reducef,拿到结果output = reducef(intermediate[i].Key, values)// 落倒排索引库outByte, _ = output.MarshalBinary()_ = invertedDB.StoragePostings(intermediate[i].Key, outByte)i = j}task.Output = outName_, err := TaskCompleted(ctx, task) // 完成后,给master发送消息,reduce阶段结束if err != nil {fmt.Println("reducer-TaskCompleted", err)return}
}

具体的Reduce,其实就是把相同的key的value聚合在一起。比如

after map:

{"apple":1}
{"apple:"2}
{"poizon":3}

after reduce:

{"apple":{1,2}}
{"poizon":{3}}

具体实现如下所示:

func Reduce(key string, values []string) *roaring.Bitmap {docIds := roaring.New()for _, v := range values {docIds.AddInt(cast.ToInt(v))}return docIds
}

最终 output 输出

output

以上就是我对6.824这个课程的lab1的所有理解了,并且运用到了 tangseng 搜索引擎中。

具体代码实现地址在 https://github.com/CocaineCong/tangseng/app/mapreduce 中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/95986.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

想做好接口测试,先把这些概念搞清楚了

接口一般来说有两种&#xff0c;一种是程序内部的接口&#xff0c;一种是系统对外的接口。 系统对外的接口 比如你要从别的网站或服务器上获取资源或信息&#xff0c;别人肯定不会把数据库共享给你&#xff0c;他只能给你提供一个他们写好的方法来获取数据&#xff0c;你引用…

【BBC新闻文章分类】使用 TF 2.0和 LSTM 的文本分类

一、说明 NLP上的许多创新是如何将上下文添加到词向量中。常见的方法之一是使用递归神经网络

数据结构之带头双向循环链表

目录 链表的分类 带头双向循环链表的实现 带头双向循环链表的结构 带头双向循环链表的结构示意图 空链表结构示意图 单结点链表结构示意图 多结点链表结构示意图 链表创建结点 双向链表初始化 销毁双向链表 打印双向链表 双向链表尾插 尾插函数测试 双向链表头插 …

如何选择合适的自动化测试工具?

自动化测试是高质量软件交付领域中最重要的实践之一。在今天的敏捷开发方法中&#xff0c;几乎任一软件开发过程都需要在开发阶段的某个时候进行自动化测试&#xff0c;以加速回归测试的工作。自动化测试工具可以帮助测试人员以及整个团队专注于自动化工具无法处理的各自任务&a…

【数据结构---排序】很详细的哦

本篇文章介绍数据结构中的几种排序哦~ 文章目录 前言一、排序是什么&#xff1f;二、排序的分类 1.直接插入排序2.希尔排序3.选择排序4.冒泡排序5.快速排序6.归并排序总结 前言 排序在我们的生活当中无处不在&#xff0c;当然&#xff0c;它在计算机程序当中也是一种很重要的操…

关掉在vscode使用copilot时的提示音

1. 按照图示的操作File --> Preferences --> Settings 2. 搜索框输入关键字Sound&#xff0c;因为是要关掉声音&#xff0c;所以找有关声音的设置 3. 找到如下图所示的选项 Audio Cues:Line Has Inline Suggetion,将其设置为Off 这样&#xff0c;就可以关掉suggest code时…

Elasticsearch:什么时候应该考虑在 Elasticsearch 中添加协调节点?

仅协调节点&#xff08;coordinating only nodes&#xff09;充当智能负载均衡器。 仅协调节点的这种特殊角色通过减轻数据和主节点的协调责任&#xff0c;为广泛的集群提供了优势。 加入集群后&#xff0c;这些节点与任何其他节点类似&#xff0c;都会获取完整的集群状态&…

毕业设计选题之Android基于移动端的线上订餐app外卖点餐安卓系统源码 调试 开题 lw

&#x1f495;&#x1f495;作者&#xff1a;计算机源码社 &#x1f495;&#x1f495;个人简介&#xff1a;本人七年开发经验&#xff0c;擅长Java、Python、PHP、.NET、微信小程序、爬虫、大数据等&#xff0c;大家有这一块的问题可以一起交流&#xff01; &#x1f495;&…

C# - Opencv应用(1) 之VS下环境配置详解

C# - Opencv应用&#xff08;1&#xff09; 之VS下环境配置详解 有时候&#xff0c;单纯c#做前端时会联合C实现的dll来落地某些功能由于有时候会用C - Opencv实现算法后封装成dll&#xff0c;但是有时候会感觉麻烦&#xff0c;不如直接通过C#直接调用Opencv在此慢慢总结下C# -…

SpringBoot vue云办公系统

SpringBoot vue云办公系统 系统功能 云办公系统 登录 员工资料管理: 搜索员工 添加编辑删除员工 导入导出excel 薪资管理: 工资账套管理 添加编辑删除工资账套 员工账套设置 系统管理: 基础信息设置 部门管理 职位管理 职称管理 权限组管理 操作员管理 开发环境和技术 开发语…

选择适合户外篷房企业的企业云盘解决方案

“户外篷房企业用什么企业云盘好&#xff1f;Zoho WorkDrive企业网盘可以帮助户外篷房企业实现文档统一管理、提高工作效率、加强团队协作&#xff0c;并且支持各种文件类型的预览和编辑。” S公司是一家注重管理规范的大型户外篷房企业&#xff0c;已经有10余年的经验。作为设…

string和const char*参数类型选择的合理性对比

在编程中&#xff0c;我们经常需要处理字符串类型的参数。在C中&#xff0c;有两种常见的表示字符串的参数类型&#xff0c;即string和const char*。本文将对比这两种参数类型的特点&#xff0c;分析其在不同情况下的合理性&#xff0c;以便程序员能够根据实际需求做出正确的选…

Docker安装ActiveMQ

ActiveMQ简介 官网地址&#xff1a;https://activemq.apache.org/ 简介&#xff1a; ActiveMQ 是Apache出品&#xff0c;最流行的&#xff0c;能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,…

次方计数的拆贡献法(考虑组合意义)+限定类问题善用值域与位置进行ds:1006T3

对于多次方的计数问题可以考虑拆贡献。 题目问 ∣ S ∣ 3 |S|^3 ∣S∣3&#xff0c; ∣ S ∣ |S| ∣S∣ 表示选的点数。相当于在 ∣ S ∣ |S| ∣S∣ 中选了3次&#xff0c;也就是选了3个可相同的点。 先考虑3个不相同点的贡献&#xff0c;对应任意3个点&#xff0c;必然会对…

【小工具-生成合并文件】使用python实现2个excel文件根据主键合并生成csv文件

1 小工具说明 1.1 功能说明 一般来说&#xff0c;我们会先有一个老的文件&#xff0c;这个文件内容是定制好相关列的表格&#xff0c;作为每天的报告。 当下一天来的时候&#xff0c;需要根据新的报表文件和昨天的报表文件做一个合并&#xff0c;合并的时候就会出现有些事新增…

【BI看板】Superset2.0+图表二次开发初探

Superset图表功能也很丰富了&#xff0c;但一些个性化的定制需求就需要二次开发了。网上二开的superset版本大多是0.xxx版本的或1.5xxx版本&#xff0c;本次用的是2.xxx。 源码相关说明 源码目录 superset-2.0\superset-frontend\plugins\plugin-chart-echarts 插件相关资料 官…

【重拾C语言】六、批量数据组织(二)线性表——分类与检索(主元排序、冒泡排序、插入排序、顺序检索、对半检索)

目录 前言 六、批量数据组织——数组 6.4 线性表——分类与检索 6.4.1 主元排序 6.4.2 冒泡排序 6.4.3 插入排序 6.4.4 顺序检索&#xff08;线性搜索&#xff09; 6.4.5 对半检索&#xff08;二分查找&#xff09; 算法比较 前言 线性表是一种常见的数据结构&#xf…

在linux下预览markdown的方法,转换成html和pdf

背景 markdown是一种便于编写和版本控制的格式&#xff0c;但却不便于预览——特别是包含表格等复杂内容时&#xff0c;单纯的语法高亮是远远不够的——这样就不能边预览边调整内容&#xff0c;需要找到一种预览方法。 思路 linux下有个工具&#xff0c;叫pandoc&#xff0c…

Go Gin Gorm Casbin权限管理实现 - 2. 使用Gorm存储Casbin权限配置以及`增删改查`

文章目录 0. 背景1. 准备工作2. 权限配置以及增删改查2.1 策略和组使用规范2.2 用户以及组关系的增删改查2.2.1 获取所有用户以及关联的角色2.2.2 角色组中添加用户2.2.3 角色组中删除用户 2.3 角色组权限的增删改查2.3.1 获取所有角色组权限2.3.2 创建角色组权限2.3.3 修改角色…

Spring MVC工作原理

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…