【MIT6.824】lab2C-persistence, lab2D-log compaction 实现笔记

引言

lab2C的实验要求如下

Complete the functions persist() and readPersist() in raft.go by adding code to save and restore persistent state. You will need to encode (or “serialize”) the state as an array of bytes in order to pass it to the Persister. Use the labgob encoder; see the comments in persist() and readPersist(). labgob is like Go’s gob encoder but prints error messages if you try to encode structures with lower-case field names. For now, pass nil as the second argument to persister.Save(). Insert calls to persist() at the points where your implementation changes persistent state. Once you’ve done this, and if the rest of your implementation is correct, you should pass all of the 2C tests.

lab2D的实验要求如下

Implement Snapshot() and the InstallSnapshot RPC, as well as the changes to Raft to support these (e.g, operation with a trimmed log). Your solution is complete when it passes the 2D tests (and all the previous Lab 2 tests).

总体而言, lab2C需要我们实现关键数据的持久化,lab2D需要我们通过快照实现日志的压缩。代码可以在https://github.com/slipegg/MIT6.824中得到。所有代码均通过了1千次的测试。

lab2C 实现

在实验时测试2C时,测试代码将会尝试将某些节点从网络中断开,然后一段时间后再依据这些断开的节点的持久化的信息重新生成一个新的节点并加入到网络中,测试代码将会检测加入这个节点后是否与预期相同。

在初始化节点的时候,会传入一个Persister对象,这个对象充当一个硬盘的角色,用于持久化数据,后续在测试重新生成节点时,就需要传入旧节点的Persister对象,以便新节点能够从硬盘中读取旧节点的数据进行复原。

参考raft论文,我们需要持久化的数据有:

  • currentTerm
  • votedFor
  • log entries

在raft.go中,我们需要实现persist和readPersist函数,用于持久化和读取数据。

// persist saves Raft's persistent state to stable storage,
func (rf *Raft) persist() {rf.persister.Save(rf.encodeState(), rf.persister.snapshot)
}func (rf *Raft) encodeState() []byte {w := new(bytes.Buffer)e := labgob.NewEncoder(w)e.Encode(rf.currentTerm)e.Encode(rf.votedFor)e.Encode(rf.logs)return w.Bytes()
}
// readPersist restores previously persisted state.
func (rf *Raft) readPersist(data []byte) {if data == nil || len(data) < 1 { // bootstrap without any state?return}var currentTerm intvar votedFor intvar logs []LogEntryr := bytes.NewBuffer(data)d := labgob.NewDecoder(r)if d.Decode(&currentTerm) != nil ||d.Decode(&votedFor) != nil ||d.Decode(&logs) != nil {Debug(dError, "S%v failed to read persist", rf.me)} else {Debug(dInfo, "S%v read persist successfully", rf.me)rf.currentTerm = currentTermrf.votedFor = votedForrf.logs = logsrf.lastApplied = rf.getFirstIndex()rf.commitIndex = rf.getFirstIndex()}
}

然后我们需要在每次修改了持久化数据的地方调用persist函数,然后在初始化节点时调用readPersist函数来读取持久化数据,整体难度不大。

lab2D 实现

在实验时测试2D时,测试代码在接收到apply的命令id为9结尾时,就会调用节点的Snapshot函数进行快照,将日志压缩。代码需要做到在压缩日志后,仍然能够准确地运行。

首先需要完成快照生成的函数,如下所示,每次会传入需要快照到的日志index,以及当这个节点为止的状态机的快照数据,系统保证传入的日志index一定是已经apply过的。由于已经将状态机的内容放入到了snapshot中,所以其实包括index在内的前面的所有日志都可以删除了,但是由于在同步日志信息时,需要上一个日志的term信息,所以我们会单独保留id为index的日志的id和term信息,放在logs的第一位。

func (rf *Raft) Snapshot(index int, snapshot []byte) {rf.mu.Lock()defer rf.mu.Unlock()if index <= rf.getFirstIndex() {Debug(dSnap, "S%v ignores the snapshot request with end index %v, because the index is not bigger than the first index %v", rf.me, index, rf.getFirstIndex())return}rf.logs = append([]LogEntry{{index, rf.logs[index-rf.getFirstIndex()].Term, nil}}, rf.logs[index-rf.getFirstIndex()+1:]...)rf.persister.Save(rf.encodeState(), snapshot)Debug(dSnap, "S%v applies the snapshot with end index %v, now the len(logs)=%v", rf.me, index, len(rf.logs))
}

由于快照的引入,现在logs中的第一个日志可能不再是0了,所以之前代码中所有从logs中依据日志index获取日志的代码都要修改为:rf.logs[index-rf.getFirstIndex()]

同时快照的引入还会导致在leader与follower进行日志同步时,需要的同步的日志可能已经没有了,所以这时候需要直接将整个日志发送给对方。

需要发送的快照请求如下:

func (rf *Raft) genInstallSnapshotRequest() *InstallSnapshotRequest {return &InstallSnapshotRequest{Term:             rf.currentTerm,LeaderId:         rf.me,LastIncludeIndex: rf.getFirstIndex(),LastIncludeTerm:  rf.logs[0].Term,Data:             rf.persister.ReadSnapshot(),}
}

follower接收到快照请求后,需要进行如下处理,主要就是检查这个快照有没有过期,是不是真的比自己当前commit的日志还要新,如果是的话,就将自己的日志全部删除,只保留快照中给的最后一个日志,作为logs中的第一个日志,然后再唤起applyCond进行快照的apply。

func (rf *Raft) InstallSnapshot(request *InstallSnapshotRequest, reply *InstallSnapshotReply) {rf.mu.Lock()Debug(dSnap, "S%v {term: %v, commitIndex: %v}, received from S%v with InstallSnapshotRequest {%v} ", rf.me, rf.currentTerm, rf.commitIndex, request.LeaderId, request)defer rf.mu.Unlock()reply.Term = rf.currentTermif request.Term < rf.currentTerm {return}if request.Term > rf.currentTerm {rf.currentTerm = request.Termrf.votedFor = -1rf.persist()}rf.changeState(Follower)if request.LastIncludeIndex <= rf.commitIndex {return}rf.persister.Save(rf.encodeState(), request.Data)rf.commitIndex = request.LastIncludeIndexrf.logs = []LogEntry{{request.LastIncludeIndex, request.LastIncludeTerm, nil}} //2D遇到的bug所在Debug(dSnap, "S%v installs snapshot from S%v, now the commitIndex is %v", rf.me, request.LeaderId, rf.commitIndex)rf.waitApplySnapshotRequest = *requestrf.applyCond.Signal()
}

如果leader接收到回复表示快照已经更新成功了,那么就更新这个节点的nextIndex和matchIndex。

func (rf *Raft) handleInstallSnapshotReply(peer int, request *InstallSnapshotRequest, reply *InstallSnapshotReply) {if reply.Term > rf.currentTerm {rf.changeState(Follower)rf.currentTerm = reply.Termrf.votedFor = -1rf.persist()Debug(dWarn, "S%v found higher term %v in InstallSnapshotReply %v from S%v, changes to follower", rf.me, reply.Term, reply, peer)} else {rf.nextIndex[peer] = request.LastIncludeIndex + 1rf.matchIndex[peer] = request.LastIncludeIndexDebug(dLog, "S%v has installed snapshot to S%v, now the S%v's nextIndex is %v", rf.me, peer, peer, rf.nextIndex[peer])rf.updateCommitIndexForLeader()}
}

注意为了能够有序地进行快照的apply,对原本的applier函数进行了修改,同时增加了waitApplySnapshotRequest来记录最新需要apply的快照请求。

其主要思想是每次唤起applyCond时,先检查是否有新的快照请求,即waitApplySnapshotRequest的Term是否为-1,如果不为-1,那么就进行快照的apply,快照apply了之后再把waitApplySnapshotRequest的Term设置为-1。如果没有新的快照请求,那么就进行日志的apply。

func (rf *Raft) applier() {for !rf.killed() {rf.mu.Lock()for rf.lastApplied >= rf.commitIndex {rf.applyCond.Wait()}if rf.waitApplySnapshotRequest.Term != -1 {if rf.lastApplied < rf.waitApplySnapshotRequest.LastIncludeIndex {rf.mu.Unlock()rf.applyCh <- ApplyMsg{ //Question: two applyCh update way, how to update orderly?SnapshotValid: true,Snapshot:      rf.waitApplySnapshotRequest.Data,SnapshotTerm:  rf.waitApplySnapshotRequest.LastIncludeTerm,SnapshotIndex: rf.waitApplySnapshotRequest.LastIncludeIndex,}rf.mu.Lock()rf.lastApplied = rf.waitApplySnapshotRequest.LastIncludeIndexDebug(dSnap, "S%v applies snapshot from S%v, now the lastApplied is %v", rf.me, rf.waitApplySnapshotRequest.LeaderId, rf.lastApplied)}rf.waitApplySnapshotRequest = InstallSnapshotRequest{Term: -1}rf.mu.Unlock()} else {commitIndex, lastApplied := rf.commitIndex, rf.lastAppliedif rf.getFirstIndex() != 0 && lastApplied+1-rf.getFirstIndex() <= 0 {Debug(dWarn, "S%v has no log to apply, because lastApplied %v < firstIndex %v", rf.me, lastApplied, rf.getFirstIndex())rf.mu.Unlock()continue}entries := make([]LogEntry, commitIndex-lastApplied)Debug(dInfo, "S%v pre to apply log entries. LastApplied: %v, FirstIndex: %v, commitIndex: %v)",rf.me, lastApplied, rf.getFirstIndex(), commitIndex)copy(entries, rf.logs[lastApplied+1-rf.getFirstIndex():commitIndex+1-rf.getFirstIndex()])rf.mu.Unlock()for _, entry := range entries {rf.applyCh <- ApplyMsg{CommandValid: true,Command:      entry.Command,CommandIndex: entry.Index,CommandTerm:  entry.Term,}}rf.mu.Lock()Debug(dInfo, "S%v finishes applying log entries(startId: %v, length: %v), now rf.lastApplied = %v",rf.me, lastApplied+1, len(entries), rf.lastApplied)rf.lastApplied = commitIndexrf.mu.Unlock()}}
}

问题记录

当时写的时候也感觉不是特别复杂,但是后面测试的时候发现这里还是有很多需要注意的点,容易导致错误。快照的引入导致的一个重要的问题是我们现在有两种方式来更新状态机的数据,一种是通过日志的apply,一种是通过快照的apply。

一开始的写法是在接收到快照请求进行InstallSnapshot的处理的时候新起了一个go协程来直接对快照进行apply,但是这会导致一系列的问题。

一开始我们对这两者的并发做什么限制,那么这就有可能出现下面这种情况:

  1. follower节点接受到快照同步请求,并且开启一个协程开始进行快照的apply
  2. 在快照的apply之前,follower节点接收到下一个日志的同步的请求,开始进行日志的apply

这两个apply的顺序其实是不确定的,很有可能就会出现先进行日志的apply,然后再进行快照的apply,这样就会导致状态机的数据不一致,所以需要控制在快照进行apply的时候,不允许进行日志的apply。

然后我采用的方法是控制节点的lastApplied值,即在开启协程进行快照的apply前将lastApplied值设置为-1,然后在快照的apply结束后再将lastApplied设置为快照的index值,然后在日志进行apply的时候,对lastApplied进行判断,如果lastApplied值为-1,那么就进行锁等待,直到lastApplied值不为-1,然后再进行日志的apply。但是这种方法在测试的时候会发现,进行1000次测试大约会有0~3次的可能出现错误,错误的原因是在进行日志的apply的时候,需要apply的日志已经在logs中没有了,导致了取值的错误,也就是并发控制没有成功,在进行了快照的apply后,日志的apply依旧在进行。

经过debug发现这是由于出现了如下这种情况:

  1. followe节点接收到日志同步的请求,开启一个协程进行日志的apply
  2. leader节点已经进行了快照,然后由于超时又给该follower节点发送了日志同步的请求
  3. follower节点接收到快照同步的请求,设置lastApplied为-1,然后开启一个协程进行快照的apply
  4. follower节点结束了日志的apply,将lastApplied设置为日志的index,然后follower节点继续检查,发现lastApplied不为-1,且lastApplied小于commitIndex,所以继续进行日志的apply,然后在logs中取日志时发现该日志已经没有了,导致错误。

所以通过lastApplied进行并发控制并不可行,最后采用的方法是添加了snapApplyCount变量,每次在进行快照的apply时,将snapApplyCount加1,快照的apply结束后将snapApplyCount减1,然后在进行日志的apply时,如果snapApplyCount不为0,那么就进入锁等待。

注意在完成快照的apply后,有可能节点已经接收到了leader同步来的其他日志,所以需要在结束后检查是否有新的日志需要apply,如果需要就唤起日志的apply。最后处理快照同步请求的代码如上述的InstallSnapshot所示,日志apply的代码如下:

func (rf *Raft) applier() {for !rf.killed() {rf.mu.Lock()for rf.snapApplyCount != 0 || rf.lastApplied >= rf.commitIndex {rf.applyCond.Wait()}commitIndex, lastApplied := rf.commitIndex, rf.lastAppliedif rf.getFirstIndex() != 0 && lastApplied+1-rf.getFirstIndex() <= 0 {rf.mu.Unlock()continue}entries := make([]LogEntry, commitIndex-lastApplied)Debug(dInfo, "S%v pre to apply log entries. LastApplied: %v, FirstIndex: %v, commitIndex: %v)",rf.me, lastApplied, rf.getFirstIndex(), commitIndex)copy(entries, rf.logs[lastApplied+1-rf.getFirstIndex():commitIndex+1-rf.getFirstIndex()])rf.mu.Unlock()for _, entry := range entries {rf.applyCh <- ApplyMsg{CommandValid: true,Command:      entry.Command,CommandIndex: entry.Index,CommandTerm:  entry.Term,}}rf.mu.Lock()Debug(dInfo, "S%v finishes applying log entries(startId: %v, length: %v), now rf.lastApplied = %v",rf.me, lastApplied+1, len(entries), rf.lastApplied)rf.lastApplied = commitIndexrf.mu.Unlock()}
}

但是上述方法后面经过测试发现也还是有少量的bug,bug的主要原因在于如下这种情况:

  1. follower节点接收到最后日志为x的快照同步请求,开启一个协程进行快照的apply
  2. follower节点又接收到最后日志为x+10的快照同步请求,开启一个协程进行快照的apply
  3. follower先完成了x+10的快照的apply,然后才完成了x的快照的apply,但是这时候它会将lastApplied设置为x,同时apply的顺序也出现了错误。

纵观上面的问题的一大根源在于我们出现了多个apply的协程,而没有对协程进行很好的并发控制,所以最后采取了上述的发型,将所有的apply都放在一个协程中进行,优先进行快照的apply,进测试可以准确地通过。

实验结果

最终对lab2中所有的测试进行了1000次的测试,全部通过。

请添加图片描述

总结

整个lab2中感觉难度最大的还是lab2B,因为需要实现的功能比较多,需要多多参考raft论文中的论文,最为印象深刻的就是lab2D中的并发问题了,这种问题确实在一开始实现的时候比较难想到,需要通过实验发现,而这种1000次测试才出现一两次错误的问题就更加难发现了,需要有全面的日志记录和多次重复实验的系统才行,后面有机会也分享一下有关日志记录和重复实验相关的内容。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/652.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

记录——FPGA的学习路线

文章目录 一、前言二、编程语言2.1 书籍2.2 刷题网站2.3 仿真工具 三、基础知识3.1 专业基础课3.2 fpga相关专业知识 四、开发工具五、动手实验 一、前言 也不是心血来潮想学习fpga了&#xff0c;而是祥哥还有我一个国科大的同学都在往fpga这个方向走 并且看过我之前文章的同…

合并有序表 (顺序存储 和 链式存储 方式实现)

代码详细解析: 合并有序表文章浏览阅读1.4k次&#xff0c;点赞6次&#xff0c;收藏7次。●假设有两个有序表 LA和LB , 将他们合并成一个有序表LC●要求不破坏原有的表 LA和 LB构思:把这两个表, 合成一个有序表 , 不是简简单单吗?就算是把他们先遍历不按顺序插入到表 C里面 , …

万物皆可计算|下一个风口:近内存计算-2

虽然PIM可以有缓解内存墙的问题&#xff0c;但是PIM设计面临着一系列技术和工程上的挑战&#xff0c;这些挑战直接影响着PIM技术的实用化和广泛应用&#xff1a; 地址翻译与操作映射&#xff1a; 在传统计算机体系结构中&#xff0c;地址空间由操作系统管理和调度&#xff0c;通…

Hotcoin 热门资产上新速报:以太坊互操作性基础设施Omni Network(OMNI)

Hotcoin持续为全球600万用户发掘优质潜力资产&#xff0c;热门币种交易上热币。一文快速了解今日上新资产:Omni Network&#xff08;OMNI&#xff09; 推荐指数 8.4 交易对 OMNI/USDT 交易时间 4月17日 GMT8 20&#xff1a;30 资产赛道 Layer1 项目简介 Omni 是以太坊…

【星瑞格】SinoDB国产数据库安装初体验及学习指南

今天和大家一起来看看一款来自福建的国产数据库——SinoDB。本人很早就听说过这款数据库&#xff0c;而且星瑞格公司就在同一栋办公楼。虽然以前就已经对这颗国产数据库有一定的了解&#xff0c;并没有真正的去使用一把。随着数据库国产化改造工作的推进&#xff0c;身边的客户…

vue+springboot实现聊天功能

前言 在我的项目中&#xff0c;突然有种想法&#xff0c;想实现聊天功能&#xff0c;历经一段时间终于做出来了&#xff1b;那么接下来会讲解如何实现&#xff0c;这篇文章只会实现最基础的逻辑&#xff0c;实时获取对方聊天记录&#xff0c;话不多说&#xff0c;我们就开始吧…

吹爆,一款实用的个人IT工具箱

作为一名开发人员&#xff0c;我们在日常工作和学习中常常需要使用一系列小工具&#xff0c;如JSON格式化、JSON转表格、当前时间戳、XML格式化、SQL格式化、密码生成以及UUID生成等。通常情况下&#xff0c;我们会在网上搜索各种在线工具来满足这些需求。 然而&#xff0c;这…

【简单介绍下单片机】

&#x1f308;个人主页: 程序员不想敲代码啊 &#x1f3c6;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f44d;点赞⭐评论⭐收藏 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共…

大学生简历大赛演讲稿(6篇)

大学生简历大赛演讲稿&#xff08;6篇&#xff09; 以下是六篇大学生简历大赛演讲稿的范文&#xff0c;供您参考&#xff1a; 范文一&#xff1a;展现真我&#xff0c;点亮未来 尊敬的评委、亲爱的同学们&#xff1a; 大家好&#xff01; 今天&#xff0c;我站在这里&#xf…

【C++】:C++关键字,命名空间,输入输出,缺省参数

目录 一&#xff0c;C关键字(C98)二&#xff0c;命名空间2.1 命名冲突2.2 关键字namespace2.2.1 命名空间中可以定义变量/函数/类型2.2.2 命名空间可以嵌套2.2.3 同一个工程中允许存在多个相同名称的命名空间,编译器最后会合成同一个命名空间中。 2.3 命名空间的使用2.3.1 指定…

剑指offer之牛客与力扣——前者分类题单中的题目在后者的链接

搜索 [4.12完成] JZ1 LCR 172. 统计目标成绩的出现次数 JZ3 153. 寻找旋转排序数组中的最小值 JZ4 LCR 014. 字符串的排列 JZ5 LCR 163. 找到第 k 位数字 400 动态规划 [4.15完成] JZ2 LCR 161. 连续天数的最高销售额 53 JZ3 LCR 127. 跳跃训练 70 JZ4 LCR 126. 斐波那契…

gemini国内怎么用

gemini国内怎么用 Google Gemini 作为一个尚处于研发阶段的大型语言模型&#xff0c;其具体功能和性能尚未公开&#xff0c;因此无法对其好用程度做出明确评价。 然而&#xff0c;基于 Google 在人工智能领域的领先地位和技术实力&#xff0c;我们可以对其潜力进行一些推测&a…

大型网站系统架构演化实例_4.数据库读写分离

1.数据库读写分离 网站在使用缓存后&#xff0c;使对大部分数据读操作访问都可以不通过数据库就能完成&#xff0c;但是仍有一部分操作&#xff08;缓存访问不命中、缓存过期&#xff09;和全部的写操作都需要访问数据库&#xff0c;在网站的用户达到一定规模后&#x…

通过实例学C#之ArrayList

介绍 ArrayList对象可以容纳若干个具有相同类型的对象&#xff0c;那有人说&#xff0c;这和数组有什么区别呢。其区别大概可以分为以下几点&#xff1a; 1.数组效率较高&#xff0c;但其容量固定&#xff0c;而且没办法动态改变。 2.ArrayList容量可以动态增长&#xff0c;但…

ros1中python3包调用自定义.py文件

ros中python包相互import不成功问题 问题解决办法 问题 在ros工程中&#xff0c;运行python文件难以直接import自己写的py文件&#xff0c;相互之间无法import&#xff0c;但是在python3虚拟环境python *.py文件就可以正常运行&#xff01; 注意这里还有个问题&#xff0c;我…

❤️‍FlyFlow工作流周更来咯~~

FlyFlow 借鉴了钉钉与飞书的界面设计理念&#xff0c;致力于打造一款用户友好、快速上手的工作流程工具。相较于传统的基于 BPMN.js 的工作流引擎&#xff0c;我们提供的解决方案显著简化了操作逻辑&#xff0c;使得用户能够在极短的时间内构建定制化的业务流程&#xff0c;即便…

记录汇川:五个ST案例

起保停&#xff1a; 简单数学教学&#xff1a; 数据查找&#xff1a; 按钮检测&#xff1a; 数据堆栈&#xff1a;

wiringpi库的应用 -- sg90 定时器 oled

sg 90舵机: 接线: VCC -- 红 GND -- 地 信号线 -- 黄 -- pwm 定时器: 先玩定时器: sg90 需要的pwm波需要定时器输出&#xff0c;so我们得先来玩一下定时器 分析&#xff1a;实现定时器&#xff0c;通过itimerval结构体以及函数setitimer产生的信号&#xff0c;系统…

快手本地生活服务商系统怎么操作?

当下&#xff0c;抖音和快手两大短视频巨头都已开始布局本地生活服务&#xff0c;想要在这一板块争得一席之地。而这也很多普通人看到了机遇&#xff0c;选择成为抖音和快手的本地生活服务商&#xff0c;通过将商家引进平台&#xff0c;并向其提供代运营服务&#xff0c;而成功…

深入探讨虚拟现实中的新型安全威胁:“盗梦攻击”及其防御策略

随着虚拟现实&#xff08;VR&#xff09;技术的飞速发展&#xff0c;用户体验达到了前所未有的沉浸水平&#xff0c;但也暴露在一系列新的安全威胁之下。本文着重介绍了近期出现的一种高度隐秘且影响深远的攻击手段——“盗梦攻击”。这一概念由芝加哥大学的研究人员提出&#…