raft实现
距离上一篇文章一个月,因为6.824的课程看不懂,基础知识薄弱。现在了解一点Raft算法(自己动手实现一遍)还需要其他分布式相关的基础知识(实现一个分布式对象存储系统),然后再去继续学习。总结一下,如果直接就去学习6.824的课程收效甚微,一定要有一定基础知识储备才可以!
这块代码是网上开源的代码,阅读之后还是发现很多问题。但是值得借鉴学习
Raft算法简单实现。
package mainimport ("fmt""log""math/rand""net/http""net/rpc""sync""time"
)const raftCount = 5//声明leader对象
type Leader struct {//任期Term int//领导编号LeaderId int
}//创建存储leader的对象
//最初任期为0,-1代表没编号
var leader = Leader{0, -1}//声明raft节点类型
type Raft struct {//锁mu sync.Mutex//节点编号me int//当前任期currentTerm int//为哪个节点投票votedFor int//当前节点状态//0 follower 1 candidate 2 leaderstate int//发送最后一条消息的时间lastMessageTime int64//当前节点的领导currentLeader int//消息通道message chan bool//选举通道electCh chan bool//心跳信号heartBeat chan bool//返回心跳信号hearbeatRe chan bool//超时时间timeout int
}func main() {for i := 0; i < raftCount; i++ {//定义Make() 创建节点Make(i)}//对raft结构体实现rpc注册rpc.Register(new(Raft))rpc.HandleHTTP()err := http.ListenAndServe(":8080", nil)if err != nil {log.Fatal(err)}for {;}
}//创建节点
func Make(me int) *Raft {rf := &Raft{}//编号rf.me = me//给0 1 2三个节点投票,给谁都不投rf.votedFor = -1//0 followerrf.state = 0rf.timeout = 0//最初没有领导rf.currentLeader = -1//设置任期rf.setTerm(0)//通道rf.electCh = make(chan bool)rf.message = make(chan bool)rf.heartBeat = make(chan bool)rf.hearbeatRe = make(chan bool)rf.lastMessageTime = 0rf.currentTerm = 0//随机种子rand.Seed(time.Now().UnixNano())//选举的逻辑实现go rf.election()//心跳检查go rf.sendLeaderHeartBeat()return rf
}func (rf *Raft) setTerm(term int) {rf.currentTerm = term
}func (rf *Raft) election() {var result boolfor {timeout := randRange(150, 300)rf.lastMessageTime = millisecond()select {case <-time.After(time.Duration(timeout) * time.Millisecond):fmt.Println("当前节点状态为:", rf.state)}result = falsefor !result {result = rf.election_one_rand(&leader)}}
}func randRange(min, max int64) int64 {//用于心跳信号的时间等return rand.Int63n(max-min) + min
}func millisecond() int64 {return time.Now().UnixNano() / int64(time.Millisecond)
}//选leader
func (rf *Raft) election_one_rand(leader *Leader) bool {var timeout int64timeout = 100var vote intvar triggerHeartbeat boollast := millisecond()success := false//首先,要成为candidate状态rf.mu.Lock()rf.becomeCandidate()rf.mu.Unlock()//开始选fmt.Println("start electing leader")for {for i := 0; i < raftCount; i++ {if i != rf.me {go func() {if leader.LeaderId < 0 {rf.electCh <- true}}()}}vote = 0triggerHeartbeat = falsefor i := 0; i < raftCount; i++ {select {case ok := <-rf.electCh:if ok {vote ++success = vote > raftCount/2if success && !triggerHeartbeat {triggerHeartbeat = truerf.mu.Lock()//真正的成为leaderrf.becomeLeader()rf.mu.Unlock()rf.heartBeat <- truefmt.Println(rf.me, "号节点成为了leader") //, "任期:", leader.Term)fmt.Println("leader发送心跳信号")}}}}//间隔时间小于100毫秒左右//若不超时,且票数大于一半,且当前有领导if (timeout+last < millisecond() || (vote >= raftCount/2 || rf.currentLeader > -1)) {break} else {select {case <-time.After(time.Duration(10) * time.Millisecond):}}}return success
}//修改节点为candidate状态
func (rf *Raft) becomeCandidate() {//将节点状态变为1rf.state = 1//节点任期加1rf.setTerm(rf.currentTerm + 1)//设置为哪个节点投票rf.votedFor = rf.me//当前没有领导rf.currentLeader = -1
}// 允许节点进行成为leader的竞争,哪一个先获得多数选票,哪一个就是leader
func (rf *Raft) becomeLeader() {//节点状态变为2,代表leaderrf.state = 2rf.currentLeader = rf.me
}func (rf *Raft) sendLeaderHeartBeat() {for {select {case <-rf.heartBeat:rf.sendAppendEntriesImpl()}}
}//返回给leader的确认信号
func (rf *Raft) sendAppendEntriesImpl() {if rf.currentLeader == rf.me {var success_count = 0for i := 0; i < raftCount; i++ {if i != rf.me {go func() {rp, err := rpc.DialHTTP("tcp", "127.0.0.1:8080")if err != nil {log.Fatal(err)}var ok = falseer := rp.Call("Raft.Communication", Param{"hello"}, &ok)if er != nil {log.Fatal(err)}if ok {rf.hearbeatRe <- true}}()}}//计算返回确认信号的子节点,若子节点个数>raftCount/2,则校验成功for i := 0; i < raftCount; i++ {select {case ok := <-rf.hearbeatRe:if ok {success_count++fmt.Println(rf.me, "获得了", success_count, "选票")if success_count > raftCount/2 {fmt.Println("投票选举成功,校验心跳信号成功")defer func() {fmt.Println("Leader is:", rf.me)}()panic("The End")}}}}}
}type Param struct {Msg string
}func (r *Raft) Communication(p Param, a *bool) error {fmt.Println(p.Msg)*a = truereturn nil
}
Result
运行结果1
当前节点状态为: 0
start electing leader
3 号节点成为了leader
leader发送心跳信号
hello
hello
hello
hello
3 获得了 1 选票
3 获得了 2 选票
3 获得了 3 选票
投票选举成功,校验心跳信号成功
Leader is: 3
panic: The End
运行结果2
当前节点状态为: 0
start electing leader
当前领导是: -1
当前领导是: -1
当前领导是: -1
当前领导是x: 4
4 号节点成为了leader
leader发送心跳信号
当前领导是: 4
当前领导是: 4
hello
hello
hello
hello
4 获得了 1 选票
4 获得了 2 选票
4 获得了 3 选票
投票选举成功,校验心跳信号成功, Leader是: 4
2020/04/22 14:21:26 The end
运行结果3
当前节点状态为: 0
start electing leader
当前节点状态为: 0
start electing leader
当前节点状态为: 0
start electing leader
当前领导是: -1
当前领导是: -1
当前领导是: -1
当前领导是x: 3
3 号节点成为了leader
leader发送心跳信号
当前领导是: 3
当前领导是: 3
当前领导是: -1
当前领导是: -1
当前领导是: -1
当前领导是x: 0
0 号节点成为了leader
leader发送心跳信号
当前领导是: 0
当前领导是: 0
当前领导是: -1
当前领导是: -1
当前领导是: -1
当前领导是x: 1
1 号节点成为了leader
leader发送心跳信号
当前领导是: 1
当前领导是: 1
hello
hello
hello
hello
3 获得了 1 选票
0 获得了 1 选票
hello
3 获得了 2 选票
hello
1 获得了 1 选票
3 获得了 3 选票
投票选举成功,校验心跳信号成功, Leader是: 3
hello
hello
0 获得了 2 选票
hello
hello
1 获得了 2 选票
0 获得了 3 选票
投票选举成功,校验心跳信号成功, Leader是: 0
hello
1 获得了 3 选票
投票选举成功,校验心跳信号成功, Leader是: 1
hello
结果4
leader3 发送了4个hello投票包,这个时候选举已经成功了。但是node4这个时候,运行到了
//选举的逻辑实现
go rf.election()
开始了新的选举。也选举成功了。
当前节点: 3 状态为: 0
start electing leader
当前领导是: -1
当前领导是: -1
当前领导是: -1
当前领导是x: 3
3 号节点成为了leader
leader发送心跳信号
当前领导是: 3
当前领导是: 3
hello3
hello3
hello3
hello3
3 获得了 1 选票
3 获得了 2 选票
3 获得了 3 选票
投票选举成功,校验心跳信号成功, Leader是: 3
当前节点: 4 状态为: 0
start electing leader
当前领导是: -1
当前领导是: -1
当前领导是: -1
当前领导是x: 4
4 号节点成为了leader
leader发送心跳信号
当前领导是: 4
当前领导是: 4
hello4
hello4
hello4
4 获得了 1 选票
4 获得了 2 选票
hello4
4 获得了 3 选票
投票选举成功,校验心跳信号成功, Leader是: 4
2020/04/22 14:36:44 The end
结果5
如果每次随机的时间不一样,那么每个都有选举的机会,就是每个都选举一次。这是错的
正确应该是一个进行选举,其他收到心跳包后就停止leader选举进程
随机的时间,如果有一样的值,那么就会同时进行leader选举 0 231
随机的时间,如果有一样的值,那么就会同时进行leader选举 2 193
随机的时间,如果有一样的值,那么就会同时进行leader选举 3 157
随机的时间,如果有一样的值,那么就会同时进行leader选举 1 251
随机的时间,如果有一样的值,那么就会同时进行leader选举 4 288
当前节点: 3 状态为: 0
start electing leader
当前节点: 3 领导是: -1
当前节点: 3 领导是: -1
当前节点: 3 领导是: -1
当前领导是x: 3
3 号节点成为了leader 任期: 0
leader发送心跳信号
当前节点: 3 领导是: 3
当前节点: 3 领导是: 3
hello3
hello3
hello3
3 获得了 1 选票
3 获得了 2 选票
hello3
3 获得了 3 选票
node是: 3 投票选举成功,校验心跳信号成功, Leader是: 3
当前节点: 2 状态为: 0
start electing leader
当前节点: 2 领导是: -1
当前节点: 2 领导是: -1
当前节点: 2 领导是: -1
当前领导是x: 2
2 号节点成为了leader 任期: 0
leader发送心跳信号
当前节点: 2 领导是: 2
当前节点: 2 领导是: 2
hello2
hello2
2 获得了 1 选票
hello2
hello2
2 获得了 2 选票
2 获得了 3 选票
node是: 2 投票选举成功,校验心跳信号成功, Leader是: 2
当前节点: 0 状态为: 0
start electing leader
当前节点: 0 领导是: -1
当前节点: 0 领导是: -1
当前节点: 0 领导是: -1
当前领导是x: 0
0 号节点成为了leader 任期: 0
leader发送心跳信号
当前节点: 0 领导是: 0
当前节点: 0 领导是: 0
hello0
hello0
hello0
0 获得了 1 选票
0 获得了 2 选票
hello0
0 获得了 3 选票
node是: 0 投票选举成功,校验心跳信号成功, Leader是: 0
当前节点: 1 状态为: 0
start electing leader
当前节点: 1 领导是: -1
当前节点: 1 领导是: -1
当前节点: 1 领导是: -1
当前领导是x: 1
1 号节点成为了leader 任期: 0
leader发送心跳信号
当前节点: 1 领导是: 1
当前节点: 1 领导是: 1
hello1
hello1
hello1
1 获得了 1 选票
1 获得了 2 选票
1 获得了 3 选票
node是: 1 投票选举成功,校验心跳信号成功, Leader是: 1
hello1
当前节点: 4 状态为: 0
start electing leader
当前节点: 4 领导是: -1
当前节点: 4 领导是: -1
当前节点: 4 领导是: -1
当前领导是x: 4
4 号节点成为了leader 任期: 0
leader发送心跳信号
当前节点: 4 领导是: 4
当前节点: 4 领导是: 4
hello4
4 获得了 1 选票
hello4
hello4
4 获得了 2 选票
4 获得了 3 选票
node是: 4 投票选举成功,校验心跳信号成功, Leader是: 4
hello4
2020/04/22 15:58:03 The end
总结
上面代码存在的问题:
- 在确定leader后,其它node或者(candidate)应该停止Leader选举。
- leader的全局变量没有用。