FastLeaderElection 是 ZooKeeper 默认的选举算法. 当 peer 处于 ServerState.LOOKING 状态时, 会调用 FastLeaderElection.lookForLeader 进行选主.
重要数据结构:
1.HashMap<Long, Vote> recvset: 本轮选举(即与本地逻辑时钟 logicalclock 相同的轮次)中收到的选票, 以发送方 sid 为键. 选票主要来自处于 LOOKING 状态的 peer; 处于 FOLLOWING/LEADING 状态的 peer 发来的通知若属于同一轮, 也会被记入. 用于判断选举是否结束.
2.HashMap<Long, Vote> outofelection: 来自已经不在选举中的 peer 的选票, 即处于 FOLLOWING 和 LEADING 状态的 peer 发来的通知, 以发送方 sid 为键. 收到这类通知, 说明发送方已经认定了 Leader. 该集合用于判断是否已有多数 peer 跟随同一个 Leader, 从而结束选举.
重要函数:
totalOrderPredicate: 判断新选票是否优于当前选票. 依次比较 epoch -> zxid -> serverId(服务器 id), 值越大越优.
termPredicate: 判断 recvset 中支持某张选票(调用时传入的是自己当前的提议)的 peer 是否超过集群投票成员的半数, 以确定是否可以结束选举.
ooePredicate: 根据传入的选票集合(recvset 或 outofelection)以及 outofelection, 判断通知 n 中的 Leader 是否已获得多数支持, 以确定选举是否结束.
选主主要逻辑如下:
1. 将逻辑时钟 logicalclock 加 1, 把"选自己为 Leader"设为当前提议, 并向其他 peer 发送该提议.
2.循环处理来自其他Peer的通知:
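1) LOOKING 状态的通知:
- 若通知的 electionEpoch 大于本地逻辑时钟: 更新逻辑时钟, 清空 recvset, 在通知中的选票与自己的初始选票之间取更优者作为新提议, 并发送出去.
- 若小于本地逻辑时钟: 直接忽略该通知.
- 若相等, 且通知推荐的 Leader 优于当前提议: 更新提议并重新发送.

之后把该选票记入 recvset, 再通过 termPredicate 判断自己当前的提议(proposedLeader, 而不是 notification.leader)是否已获得集群半数以上投票成员的支持. 若是, 再最多等待 finalizeWait 毫秒: 期间若没有收到更优的选票, 则选举结束; 否则把更优的通知放回队列, 继续选举.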
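2) LEADING 或 FOLLOWING 状态的通知: 说明发送方已经认定了 Leader.
- 若通知的 electionEpoch 与本地逻辑时钟相同, 先将其记入 recvset, 并用 ooePredicate(recvset, outofelection, n) 判断选举是否结束.
- 若上一步未结束(包括逻辑时钟不同的情况), 再将其记入 outofelection, 并用 ooePredicate(outofelection, outofelection, n) 判断是否已有多数 peer 跟随同一个 Leader. 例如新 peer 加入已有集群时, 就靠这一步加入现有 Leader.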
/**
 * Runs the fast leader election loop while this peer is in the LOOKING state.
 *
 * <p>The peer bumps its logical clock, proposes itself, broadcasts the proposal and then
 * processes notifications from members of the voting view until a leader is decided.
 *
 * @return the vote for the elected leader, or {@code null} if the loop exits because the
 *     peer is no longer LOOKING or {@code stop} was set before a leader was decided
 * @throws InterruptedException if interrupted while polling or re-queueing notifications
 */
public Vote lookForLeader() throws InterruptedException {
    // NOTE(review): "......" marks code elided from this excerpt; the snippet does not
    // compile as shown.
    ......
    try {
        // Votes received in the current round (same logical clock), keyed by sender sid.
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

        // Votes from peers that are already FOLLOWING or LEADING, keyed by sender sid.
        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

        // Poll timeout; doubled after every empty poll, capped at maxNotificationInterval.
        int notTimeout = finalizeWait;

        // Start a new round: bump the logical clock and propose ourselves.
        synchronized(this){
            logicalclock++;
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id = " + self.getId() +
                ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        sendNotifications();

        /*
         * Loop in which we exchange notifications until we find a leader
         */
        while ((self.getPeerState() == ServerState.LOOKING) &&
                (!stop)){
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            Notification n = recvqueue.poll(notTimeout,
                    TimeUnit.MILLISECONDS);

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if(n == null){
                if(manager.haveDelivered()){
                    sendNotifications();
                } else {
                    manager.connectAll();
                }

                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout*2;
                notTimeout = (tmpTimeOut < maxNotificationInterval?
                        tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            }
            else if(self.getVotingView().containsKey(n.sid)) {
                /*
                 * Only proceed if the vote comes from a replica in the
                 * voting view.
                 */
                // Handle the notification according to the sender's state.
                switch (n.state) {
                case LOOKING:
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock) {
                        // Sender is in a newer round: adopt its clock, drop votes from the old
                        // round, and propose the better of its vote and our own initial vote.
                        logicalclock = n.electionEpoch;
                        recvset.clear();
                        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(),
                                    getInitLastLoggedZxid(),
                                    getPeerEpoch());
                        }
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock) {
                        // Stale round: ignore this notification entirely (it is not recorded).
                        if(LOG.isDebugEnabled()){
                            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                    + Long.toHexString(n.electionEpoch)
                                    + ", logicalclock=0x" + Long.toHexString(logicalclock));
                        }
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        // Same round and the sender's candidate beats ours: switch and re-broadcast.
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    if(LOG.isDebugEnabled()){
                        LOG.debug("Adding vote: from=" + n.sid +
                                ", proposed leader=" + n.leader +
                                ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }

                    // Update recvset with the sender's vote for this round.
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    // Termination check is done for OUR current proposal, not for n.leader.
                    if (termPredicate(recvset,
                            new Vote(proposedLeader, proposedZxid,
                                    logicalclock, proposedEpoch))) {

                        // Verify if there is any change in the proposed leader
                        while((n = recvqueue.poll(finalizeWait,
                                TimeUnit.MILLISECONDS)) != null){
                            // A late challenger: a notification better than our proposal
                            // arrived during the finalize wait. Put it back so the main loop
                            // processes it, and stop finalizing.
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)){
                                recvqueue.put(n);
                                break;
                            }
                        }
                        // If n is non-null here, a peer better suited to lead than our
                        // current proposal has appeared, so the election continues.

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {
                            self.setPeerState((proposedLeader == self.getId()) ?
                                    ServerState.LEADING: learningState());

                            Vote endVote = new Vote(proposedLeader,
                                    proposedZxid,
                                    logicalclock,
                                    proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    LOG.debug("Notification from observer: " + n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    // Same logical clock means the same election round: add the vote to
                    // recvset and check whether the election can end.
                    if(n.electionEpoch == logicalclock){
                        recvset.put(n.sid, new Vote(n.leader,
                                n.zxid,
                                n.electionEpoch,
                                n.peerEpoch));

                        if(ooePredicate(recvset, outofelection, n)) {
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());

                            Vote endVote = new Vote(n.leader,
                                    n.zxid,
                                    n.electionEpoch,
                                    n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    // Reached whether or not the epochs matched. When a new peer joins an
                    // established ensemble, it must check whether a majority of peers already
                    // follow this leader. recvset may not be able to decide that
                    // (NOTE(review): the original note claims recvset is always empty here;
                    // not established by this code). So the vote is recorded in outofelection,
                    // and outofelection alone is used to decide.
                    /*
                     * Before joining an established ensemble, verify
                     * a majority is following the same leader.
                     */
                    outofelection.put(n.sid, new Vote(n.version,
                            n.leader,
                            n.zxid,
                            n.electionEpoch,
                            n.peerEpoch,
                            n.state));

                    if(ooePredicate(outofelection, outofelection, n)) {
                        // Adopt the established round's clock together with the new state.
                        synchronized(this){
                            logicalclock = n.electionEpoch;
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());
                        }
                        Vote endVote = new Vote(n.leader,
                                n.zxid,
                                n.electionEpoch,
                                n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
                            n.state, n.sid);
                    break;
                }
            } else {
                LOG.warn("Ignoring notification from non-cluster member " + n.sid);
            }
        }
        return null;
    } finally {
        // Always unregister the leader-election JMX bean, whether or not a leader was found.
        try {
            if(self.jmxLeaderElectionBean != null){
                MBeanRegistry.getInstance().unregister(
                        self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
    }
}