【MIT6.824】lab2C-persistence, lab2D-log compaction 实现笔记

引言

lab2C的实验要求如下

Complete the functions persist() and readPersist() in raft.go by adding code to save and restore persistent state. You will need to encode (or “serialize”) the state as an array of bytes in order to pass it to the Persister. Use the labgob encoder; see the comments in persist() and readPersist(). labgob is like Go’s gob encoder but prints error messages if you try to encode structures with lower-case field names. For now, pass nil as the second argument to persister.Save(). Insert calls to persist() at the points where your implementation changes persistent state. Once you’ve done this, and if the rest of your implementation is correct, you should pass all of the 2C tests.

lab2D的实验要求如下

Implement Snapshot() and the InstallSnapshot RPC, as well as the changes to Raft to support these (e.g, operation with a trimmed log). Your solution is complete when it passes the 2D tests (and all the previous Lab 2 tests).

总体而言, lab2C需要我们实现关键数据的持久化,lab2D需要我们通过快照实现日志的压缩。代码可以在https://github.com/slipegg/MIT6.824中得到。所有代码均通过了1千次的测试。

lab2C 实现

在实验时测试2C时,测试代码将会尝试将某些节点从网络中断开,然后一段时间后再依据这些断开的节点的持久化的信息重新生成一个新的节点并加入到网络中,测试代码将会检测加入这个节点后是否与预期相同。

在初始化节点的时候,会传入一个Persister对象,这个对象充当一个硬盘的角色,用于持久化数据,后续在测试重新生成节点时,就需要传入旧节点的Persister对象,以便新节点能够从硬盘中读取旧节点的数据进行复原。

参考raft论文,我们需要持久化的数据有:

  • currentTerm
  • votedFor
  • log entries

在raft.go中,我们需要实现persist和readPersist函数,用于持久化和读取数据。

// persist saves Raft's persistent state to stable storage,
func (rf *Raft) persist() {rf.persister.Save(rf.encodeState(), rf.persister.snapshot)
}func (rf *Raft) encodeState() []byte {w := new(bytes.Buffer)e := labgob.NewEncoder(w)e.Encode(rf.currentTerm)e.Encode(rf.votedFor)e.Encode(rf.logs)return w.Bytes()
}
// readPersist restores previously persisted state.
func (rf *Raft) readPersist(data []byte) {if data == nil || len(data) < 1 { // bootstrap without any state?return}var currentTerm intvar votedFor intvar logs []LogEntryr := bytes.NewBuffer(data)d := labgob.NewDecoder(r)if d.Decode(&currentTerm) != nil ||d.Decode(&votedFor) != nil ||d.Decode(&logs) != nil {Debug(dError, "S%v failed to read persist", rf.me)} else {Debug(dInfo, "S%v read persist successfully", rf.me)rf.currentTerm = currentTermrf.votedFor = votedForrf.logs = logsrf.lastApplied = rf.getFirstIndex()rf.commitIndex = rf.getFirstIndex()}
}

然后我们需要在每次修改了持久化数据的地方调用persist函数,然后在初始化节点时调用readPersist函数来读取持久化数据,整体难度不大。

lab2D 实现

在实验时测试2D时,测试代码在接收到apply的命令id为9结尾时,就会调用节点的Snapshot函数进行快照,将日志压缩。代码需要做到在压缩日志后,仍然能够准确地运行。

首先需要完成快照生成的函数,如下所示,每次会传入需要快照到的日志index,以及当这个节点为止的状态机的快照数据,系统保证传入的日志index一定是已经apply过的。由于已经将状态机的内容放入到了snapshot中,所以其实包括index在内的前面的所有日志都可以删除了,但是由于在同步日志信息时,需要上一个日志的term信息,所以我们会单独保留id为index的日志的id和term信息,放在logs的第一位。

func (rf *Raft) Snapshot(index int, snapshot []byte) {rf.mu.Lock()defer rf.mu.Unlock()if index <= rf.getFirstIndex() {Debug(dSnap, "S%v ignores the snapshot request with end index %v, because the index is not bigger than the first index %v", rf.me, index, rf.getFirstIndex())return}rf.logs = append([]LogEntry{{index, rf.logs[index-rf.getFirstIndex()].Term, nil}}, rf.logs[index-rf.getFirstIndex()+1:]...)rf.persister.Save(rf.encodeState(), snapshot)Debug(dSnap, "S%v applies the snapshot with end index %v, now the len(logs)=%v", rf.me, index, len(rf.logs))
}

由于快照的引入,现在logs中的第一个日志可能不再是0了,所以之前代码中所有从logs中依据日志index获取日志的代码都要修改为:rf.logs[index-rf.getFirstIndex()]

同时快照的引入还会导致在leader与follower进行日志同步时,需要的同步的日志可能已经没有了,所以这时候需要直接将整个日志发送给对方。

需要发送的快照请求如下:

func (rf *Raft) genInstallSnapshotRequest() *InstallSnapshotRequest {return &InstallSnapshotRequest{Term:             rf.currentTerm,LeaderId:         rf.me,LastIncludeIndex: rf.getFirstIndex(),LastIncludeTerm:  rf.logs[0].Term,Data:             rf.persister.ReadSnapshot(),}
}

follower接收到快照请求后,需要进行如下处理,主要就是检查这个快照有没有过期,是不是真的比自己当前commit的日志还要新,如果是的话,就将自己的日志全部删除,只保留快照中给的最后一个日志,作为logs中的第一个日志,然后再唤起applyCond进行快照的apply。

func (rf *Raft) InstallSnapshot(request *InstallSnapshotRequest, reply *InstallSnapshotReply) {rf.mu.Lock()Debug(dSnap, "S%v {term: %v, commitIndex: %v}, received from S%v with InstallSnapshotRequest {%v} ", rf.me, rf.currentTerm, rf.commitIndex, request.LeaderId, request)defer rf.mu.Unlock()reply.Term = rf.currentTermif request.Term < rf.currentTerm {return}if request.Term > rf.currentTerm {rf.currentTerm = request.Termrf.votedFor = -1rf.persist()}rf.changeState(Follower)if request.LastIncludeIndex <= rf.commitIndex {return}rf.persister.Save(rf.encodeState(), request.Data)rf.commitIndex = request.LastIncludeIndexrf.logs = []LogEntry{{request.LastIncludeIndex, request.LastIncludeTerm, nil}} //2D遇到的bug所在Debug(dSnap, "S%v installs snapshot from S%v, now the commitIndex is %v", rf.me, request.LeaderId, rf.commitIndex)rf.waitApplySnapshotRequest = *requestrf.applyCond.Signal()
}

如果leader接收到回复表示快照已经更新成功了,那么就更新这个节点的nextIndex和matchIndex。

func (rf *Raft) handleInstallSnapshotReply(peer int, request *InstallSnapshotRequest, reply *InstallSnapshotReply) {if reply.Term > rf.currentTerm {rf.changeState(Follower)rf.currentTerm = reply.Termrf.votedFor = -1rf.persist()Debug(dWarn, "S%v found higher term %v in InstallSnapshotReply %v from S%v, changes to follower", rf.me, reply.Term, reply, peer)} else {rf.nextIndex[peer] = request.LastIncludeIndex + 1rf.matchIndex[peer] = request.LastIncludeIndexDebug(dLog, "S%v has installed snapshot to S%v, now the S%v's nextIndex is %v", rf.me, peer, peer, rf.nextIndex[peer])rf.updateCommitIndexForLeader()}
}

注意为了能够有序地进行快照的apply,对原本的applier函数进行了修改,同时增加了waitApplySnapshotRequest来记录最新需要apply的快照请求。

其主要思想是每次唤起applyCond时,先检查是否有新的快照请求,即waitApplySnapshotRequest的Term是否为-1,如果不为-1,那么就进行快照的apply,快照apply了之后再把waitApplySnapshotRequest的Term设置为-1。如果没有新的快照请求,那么就进行日志的apply。

func (rf *Raft) applier() {for !rf.killed() {rf.mu.Lock()for rf.lastApplied >= rf.commitIndex {rf.applyCond.Wait()}if rf.waitApplySnapshotRequest.Term != -1 {if rf.lastApplied < rf.waitApplySnapshotRequest.LastIncludeIndex {rf.mu.Unlock()rf.applyCh <- ApplyMsg{ //Question: two applyCh update way, how to update orderly?SnapshotValid: true,Snapshot:      rf.waitApplySnapshotRequest.Data,SnapshotTerm:  rf.waitApplySnapshotRequest.LastIncludeTerm,SnapshotIndex: rf.waitApplySnapshotRequest.LastIncludeIndex,}rf.mu.Lock()rf.lastApplied = rf.waitApplySnapshotRequest.LastIncludeIndexDebug(dSnap, "S%v applies snapshot from S%v, now the lastApplied is %v", rf.me, rf.waitApplySnapshotRequest.LeaderId, rf.lastApplied)}rf.waitApplySnapshotRequest = InstallSnapshotRequest{Term: -1}rf.mu.Unlock()} else {commitIndex, lastApplied := rf.commitIndex, rf.lastAppliedif rf.getFirstIndex() != 0 && lastApplied+1-rf.getFirstIndex() <= 0 {Debug(dWarn, "S%v has no log to apply, because lastApplied %v < firstIndex %v", rf.me, lastApplied, rf.getFirstIndex())rf.mu.Unlock()continue}entries := make([]LogEntry, commitIndex-lastApplied)Debug(dInfo, "S%v pre to apply log entries. LastApplied: %v, FirstIndex: %v, commitIndex: %v)",rf.me, lastApplied, rf.getFirstIndex(), commitIndex)copy(entries, rf.logs[lastApplied+1-rf.getFirstIndex():commitIndex+1-rf.getFirstIndex()])rf.mu.Unlock()for _, entry := range entries {rf.applyCh <- ApplyMsg{CommandValid: true,Command:      entry.Command,CommandIndex: entry.Index,CommandTerm:  entry.Term,}}rf.mu.Lock()Debug(dInfo, "S%v finishes applying log entries(startId: %v, length: %v), now rf.lastApplied = %v",rf.me, lastApplied+1, len(entries), rf.lastApplied)rf.lastApplied = commitIndexrf.mu.Unlock()}}
}

问题记录

当时写的时候也感觉不是特别复杂,但是后面测试的时候发现这里还是有很多需要注意的点,容易导致错误。快照的引入导致的一个重要的问题是我们现在有两种方式来更新状态机的数据,一种是通过日志的apply,一种是通过快照的apply。

一开始的写法是在接收到快照请求进行InstallSnapshot的处理的时候新起了一个go协程来直接对快照进行apply,但是这会导致一系列的问题。

一开始我们对这两者的并发做什么限制,那么这就有可能出现下面这种情况:

  1. follower节点接受到快照同步请求,并且开启一个协程开始进行快照的apply
  2. 在快照的apply之前,follower节点接收到下一个日志的同步的请求,开始进行日志的apply

这两个apply的顺序其实是不确定的,很有可能就会出现先进行日志的apply,然后再进行快照的apply,这样就会导致状态机的数据不一致,所以需要控制在快照进行apply的时候,不允许进行日志的apply。

然后我采用的方法是控制节点的lastApplied值,即在开启协程进行快照的apply前将lastApplied值设置为-1,然后在快照的apply结束后再将lastApplied设置为快照的index值,然后在日志进行apply的时候,对lastApplied进行判断,如果lastApplied值为-1,那么就进行锁等待,直到lastApplied值不为-1,然后再进行日志的apply。但是这种方法在测试的时候会发现,进行1000次测试大约会有0~3次的可能出现错误,错误的原因是在进行日志的apply的时候,需要apply的日志已经在logs中没有了,导致了取值的错误,也就是并发控制没有成功,在进行了快照的apply后,日志的apply依旧在进行。

经过debug发现这是由于出现了如下这种情况:

  1. followe节点接收到日志同步的请求,开启一个协程进行日志的apply
  2. leader节点已经进行了快照,然后由于超时又给该follower节点发送了日志同步的请求
  3. follower节点接收到快照同步的请求,设置lastApplied为-1,然后开启一个协程进行快照的apply
  4. follower节点结束了日志的apply,将lastApplied设置为日志的index,然后follower节点继续检查,发现lastApplied不为-1,且lastApplied小于commitIndex,所以继续进行日志的apply,然后在logs中取日志时发现该日志已经没有了,导致错误。

所以通过lastApplied进行并发控制并不可行,最后采用的方法是添加了snapApplyCount变量,每次在进行快照的apply时,将snapApplyCount加1,快照的apply结束后将snapApplyCount减1,然后在进行日志的apply时,如果snapApplyCount不为0,那么就进入锁等待。

注意在完成快照的apply后,有可能节点已经接收到了leader同步来的其他日志,所以需要在结束后检查是否有新的日志需要apply,如果需要就唤起日志的apply。最后处理快照同步请求的代码如上述的InstallSnapshot所示,日志apply的代码如下:

func (rf *Raft) applier() {for !rf.killed() {rf.mu.Lock()for rf.snapApplyCount != 0 || rf.lastApplied >= rf.commitIndex {rf.applyCond.Wait()}commitIndex, lastApplied := rf.commitIndex, rf.lastAppliedif rf.getFirstIndex() != 0 && lastApplied+1-rf.getFirstIndex() <= 0 {rf.mu.Unlock()continue}entries := make([]LogEntry, commitIndex-lastApplied)Debug(dInfo, "S%v pre to apply log entries. LastApplied: %v, FirstIndex: %v, commitIndex: %v)",rf.me, lastApplied, rf.getFirstIndex(), commitIndex)copy(entries, rf.logs[lastApplied+1-rf.getFirstIndex():commitIndex+1-rf.getFirstIndex()])rf.mu.Unlock()for _, entry := range entries {rf.applyCh <- ApplyMsg{CommandValid: true,Command:      entry.Command,CommandIndex: entry.Index,CommandTerm:  entry.Term,}}rf.mu.Lock()Debug(dInfo, "S%v finishes applying log entries(startId: %v, length: %v), now rf.lastApplied = %v",rf.me, lastApplied+1, len(entries), rf.lastApplied)rf.lastApplied = commitIndexrf.mu.Unlock()}
}

但是上述方法后面经过测试发现也还是有少量的bug,bug的主要原因在于如下这种情况:

  1. follower节点接收到最后日志为x的快照同步请求,开启一个协程进行快照的apply
  2. follower节点又接收到最后日志为x+10的快照同步请求,开启一个协程进行快照的apply
  3. follower先完成了x+10的快照的apply,然后才完成了x的快照的apply,但是这时候它会将lastApplied设置为x,同时apply的顺序也出现了错误。

纵观上面的问题的一大根源在于我们出现了多个apply的协程,而没有对协程进行很好的并发控制,所以最后采取了上述的发型,将所有的apply都放在一个协程中进行,优先进行快照的apply,进测试可以准确地通过。

实验结果

最终对lab2中所有的测试进行了1000次的测试,全部通过。

请添加图片描述

总结

整个lab2中感觉难度最大的还是lab2B,因为需要实现的功能比较多,需要多多参考raft论文中的论文,最为印象深刻的就是lab2D中的并发问题了,这种问题确实在一开始实现的时候比较难想到,需要通过实验发现,而这种1000次测试才出现一两次错误的问题就更加难发现了,需要有全面的日志记录和多次重复实验的系统才行,后面有机会也分享一下有关日志记录和重复实验相关的内容。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/652.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C++风云录】开源金融与科技库探索:优化计算与高效分析

高效计算与智能分析&#xff1a;开源库在金融和科技领域的应用探究 前言 本文将探索几个关键的开源库&#xff0c;包括QuantLib、TA-Lib、Boost.Asio、Armadillo和FastFlow&#xff0c;这些库在金融领域和科技领域中发挥着重要作用。通过使用这些工具&#xff0c;开发人员能够…

记录——FPGA的学习路线

文章目录 一、前言二、编程语言2.1 书籍2.2 刷题网站2.3 仿真工具 三、基础知识3.1 专业基础课3.2 fpga相关专业知识 四、开发工具五、动手实验 一、前言 也不是心血来潮想学习fpga了&#xff0c;而是祥哥还有我一个国科大的同学都在往fpga这个方向走 并且看过我之前文章的同…

合并有序表 (顺序存储 和 链式存储 方式实现)

代码详细解析: 合并有序表文章浏览阅读1.4k次&#xff0c;点赞6次&#xff0c;收藏7次。●假设有两个有序表 LA和LB , 将他们合并成一个有序表LC●要求不破坏原有的表 LA和 LB构思:把这两个表, 合成一个有序表 , 不是简简单单吗?就算是把他们先遍历不按顺序插入到表 C里面 , …

万物皆可计算|下一个风口:近内存计算-2

虽然PIM可以有缓解内存墙的问题&#xff0c;但是PIM设计面临着一系列技术和工程上的挑战&#xff0c;这些挑战直接影响着PIM技术的实用化和广泛应用&#xff1a; 地址翻译与操作映射&#xff1a; 在传统计算机体系结构中&#xff0c;地址空间由操作系统管理和调度&#xff0c;通…

Hotcoin 热门资产上新速报:以太坊互操作性基础设施Omni Network(OMNI)

Hotcoin持续为全球600万用户发掘优质潜力资产&#xff0c;热门币种交易上热币。一文快速了解今日上新资产:Omni Network&#xff08;OMNI&#xff09; 推荐指数 8.4 交易对 OMNI/USDT 交易时间 4月17日 GMT8 20&#xff1a;30 资产赛道 Layer1 项目简介 Omni 是以太坊…

关于电脑蓝屏解决方法(ST-LINK/ J-Link)

问题背景&#xff1a; 电脑win11系统&#xff0c;使用到STM32CUBEIDE STM32MP157A在实际的烧录情况中&#xff0c;烧录器插到电脑上面&#xff0c;电脑立即蓝屏。(封面蓝屏图片不为本问题蓝屏图片) 其他类似情况也可参考。 问题分析&#xff1a; 可能是激发了电脑的保护&…

【星瑞格】SinoDB国产数据库安装初体验及学习指南

今天和大家一起来看看一款来自福建的国产数据库——SinoDB。本人很早就听说过这款数据库&#xff0c;而且星瑞格公司就在同一栋办公楼。虽然以前就已经对这颗国产数据库有一定的了解&#xff0c;并没有真正的去使用一把。随着数据库国产化改造工作的推进&#xff0c;身边的客户…

TCP机械臂测试

#include<myhead.h> #define SER_IP "192.168.125.242" #define SER_PORT 1234 #define CLI_IP "192.168.243.131" #define CLI_PORT 9999 int main(int argc, const char *argv[]) { //1、创建用于通信的套接字文件描述符 int cfd socket(…

vue+springboot实现聊天功能

前言 在我的项目中&#xff0c;突然有种想法&#xff0c;想实现聊天功能&#xff0c;历经一段时间终于做出来了&#xff1b;那么接下来会讲解如何实现&#xff0c;这篇文章只会实现最基础的逻辑&#xff0c;实时获取对方聊天记录&#xff0c;话不多说&#xff0c;我们就开始吧…

吹爆,一款实用的个人IT工具箱

作为一名开发人员&#xff0c;我们在日常工作和学习中常常需要使用一系列小工具&#xff0c;如JSON格式化、JSON转表格、当前时间戳、XML格式化、SQL格式化、密码生成以及UUID生成等。通常情况下&#xff0c;我们会在网上搜索各种在线工具来满足这些需求。 然而&#xff0c;这…

odoo17开发教程(21):数据文件详解

Odoo 以数据为导向&#xff0c;因此模块定义的一个重要部分就是其管理的各种记录的定义&#xff1a;用户界面&#xff08;菜单和视图&#xff09;、安全性&#xff08;访问权限和记录规则&#xff09;、报告和普通数据都是通过记录定义的。 结构 在 Odoo 中定义数据的主要方式…

【C++ 哈希应用】

文章目录 位图概念代码实现海量数据处理 布隆过滤器概念代码实现海量数据处理 哈希切割海量数据处理 位图 概念 一个值在给定的集合中有两种状态&#xff0c;在或不在&#xff0c;要表示这种状态&#xff0c;最少可以用一个比特位&#xff0c;比特位为1表示在&#xff0c;比特…

【简单介绍下单片机】

&#x1f308;个人主页: 程序员不想敲代码啊 &#x1f3c6;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f44d;点赞⭐评论⭐收藏 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共…

大学生简历大赛演讲稿(6篇)

大学生简历大赛演讲稿&#xff08;6篇&#xff09; 以下是六篇大学生简历大赛演讲稿的范文&#xff0c;供您参考&#xff1a; 范文一&#xff1a;展现真我&#xff0c;点亮未来 尊敬的评委、亲爱的同学们&#xff1a; 大家好&#xff01; 今天&#xff0c;我站在这里&#xf…

open-webui与ollama的部署最后完整之命令

docker run -d --networkhost -v open-webui:/app/backend/data -e HF_ENDPOINThttps://hf-mirror.com -e OLLAMA_BASE_URLhttp://127.0.0.1:11434 --name open-webui --restart always ghcr.io/open-webui/open-webui:main -e HF_ENDPOINThttps://hf-mirror.com 一定要加上&a…

【C++】:C++关键字,命名空间,输入输出,缺省参数

目录 一&#xff0c;C关键字(C98)二&#xff0c;命名空间2.1 命名冲突2.2 关键字namespace2.2.1 命名空间中可以定义变量/函数/类型2.2.2 命名空间可以嵌套2.2.3 同一个工程中允许存在多个相同名称的命名空间,编译器最后会合成同一个命名空间中。 2.3 命名空间的使用2.3.1 指定…

剑指offer之牛客与力扣——前者分类题单中的题目在后者的链接

搜索 [4.12完成] JZ1 LCR 172. 统计目标成绩的出现次数 JZ3 153. 寻找旋转排序数组中的最小值 JZ4 LCR 014. 字符串的排列 JZ5 LCR 163. 找到第 k 位数字 400 动态规划 [4.15完成] JZ2 LCR 161. 连续天数的最高销售额 53 JZ3 LCR 127. 跳跃训练 70 JZ4 LCR 126. 斐波那契…

笔记:编写函数,接收整数参数t,返回斐波那契数列中大于t的第1个数。

文章目录 前言一、斐波那契数列是什么&#xff1f;二、编写代码1.代码2.优化代码 总结 前言 题目&#xff1a;编写函数&#xff0c;接收整数参数t&#xff0c;返回斐波那契数列中大于t的第1个数。 在编写函数之前&#xff0c;我们首先需要了解一下斐波那契数列是什么。 一、斐…

gemini国内怎么用

gemini国内怎么用 Google Gemini 作为一个尚处于研发阶段的大型语言模型&#xff0c;其具体功能和性能尚未公开&#xff0c;因此无法对其好用程度做出明确评价。 然而&#xff0c;基于 Google 在人工智能领域的领先地位和技术实力&#xff0c;我们可以对其潜力进行一些推测&a…

【xgboost】使用xgboost训练一个简单模型

使用pandas读取特征数据&#xff0c;并处理数据中的双引号 使用xgboost训练一版模型 xgboost1.6.2 #!/usr/bin/env python # -*- coding:utf-8 -*- import pandas as pd import numpy as np from sklearn.model_selection import train_test_split import xgboost as xgb impor…