大纲
1.服务器的请求处理链
(1)Leader服务器的请求处理链
一.PrepRequestProcessor请求预处理器
二.ProposalRequestProcessor事务投票处理器
三.SyncRequestProcessor事务日志处理器
四.AckRequestProcessor投票反馈处理器
五.CommitProcessor事务提交处理器
六.ToBeAppliedRequestProcessor处理器
七.FinalRequestProcessor处理器
(2)Follower服务器的请求处理链
一.FollowerRequestProcessor请求转发处理器
二.SendAckRequestProcessor投票反馈处理器
2.服务端处理会话创建请求的流程
(1)请求接收
(2)会话创建
(3)请求预处理
(4)事务处理
(5)事务应用和响应
1.服务器的请求处理链
(1)Leader服务器的请求处理链
(2)Follower服务器的请求处理链
(1)Leader服务器的请求处理链
一.PrepRequestProcessor请求预处理器
二.ProposalRequestProcessor事务投票处理器
三.SyncRequestProcessor事务日志处理器
四.AckRequestProcessor投票反馈处理器
五.CommitProcessor事务提交处理器
六.ToBeAppliedRequestProcessor处理器
七.FinalRequestProcessor处理器
当客户端需要和zk服务端进行相互协调通信时,首先要通过Leader服务器建立该客户端与服务端的连接会话。当会话创建成功后,zk服务端就可以接收来自客户端的请求操作了。
Leader服务器是zk集群的核心,其主要工作是:
工作一:处理事务请求,保证集群事务处理的顺序性
工作二:集群内各服务器的调度者
zk服务端会使用责任链模式来处理每一个客户端的请求。在服务端启动时,会进行请求处理链的初始化。Leader服务器的请求处理链如下图示,主要有7个请求处理器。
一.PrepRequestProcessor请求预处理器
zk中的事务请求就是会改变服务器状态的请求。事务请求包括创建节点、更新节点、删除节点、创建会话等请求。
PrepRequestProcessor是Leader服务器的请求预处理器(Prepare),它能够识别出当前客户端请求是否是事务请求,它会对事务请求进行一系列的预处理操作。这些预处理包括:创建请求事务头事务体、会话检查、ACL检查等。
PrepRequestProcessor实现了RequestProcessor接口并继承了zk线程,而且还有一个RequestProcessor类型的nextProcessor属性字段,nextProcessor属性字段的作用是指向下一个请求处理器。
Leader服务器在开始处理请求时,会调用PrepRequestProcessor的processRequest()方法将请求添加到队列。请求预处理器的线程启动后会不断从submittedRequests队列取出请求,然后把请求交给PrepRequestProcessor的pRequest()方法进行预处理。在pRequest()方法中,会根据请求类型来判断请求是否是事务请求。如果是事务请求,就调用pRequest2Txn()方法对事务请求进行预处理。之后再将请求交给nextProcessor属性字段指向的处理器进行下一步处理。
PrepRequestProcessor处理器也是一个线程,它会先把请求添加到队列,然后由线程进行处理。
PrepRequestProcessor的nextProcessor属性指向的是ProposalRequestProcessor处理器。
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {...protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));}...
}public class LeaderZooKeeperServer extends QuorumZooKeeperServer {CommitProcessor commitProcessor;PrepRequestProcessor prepRequestProcessor;...//初始化请求处理链@Overrideprotected void setupRequestProcessors() {RequestProcessor finalProcessor = new FinalRequestProcessor(this);RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());commitProcessor.start();ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);proposalProcessor.initialize();prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);prepRequestProcessor.start();//启动请求预处理器线程firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);setupContainerManager();}...
}public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {RequestProcessor nextProcessor;LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();public void processRequest(Request request) {//将请求添加到队列submittedRequests.add(request);}...@Overridepublic void run() {while (true) {Request request = submittedRequests.take();...pRequest(request);}}protected void pRequest(Request request) throws RequestProcessorException {request.setHdr(null);request.setTxn(null);switch (request.type) {...case OpCode.create:CreateRequest createRequest = new CreateRequest();pRequest2Txn(request.type, zks.getNextZxid(), request,createRequest, true);break;case OpCode.delete:...}...request.zxid = zks.getZxid();//将请求交给下一个处理器来处理nextProcessor.processRequest(request);}//下面这个方法专门用来对事务请求进行预处理protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) {//设置请求的事务头事务体 request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type));...}...
}
有两个入口会触发调用PrepRequestProcessor的processRequest()方法。
第一是Leader服务器监听到Learner转发给Leader的事务请求。也就是在不断运行的LearnerHandler线程中发现Learner给Leader发送请求时,会调用LeaderZooKeeperServer.submitLearnerRequest方法来触发。
第二是zk服务端监听到的来自客户端的事务请求。此时会先调用ZooKeeperServer的processPacket()方法处理Socket的读请求,然后再调用ZooKeeperServer的submitRequest()方法提交读请求,最后就会调用ZooKeeperServer的firstProcessor的processRequest()方法。firstProcessor的processRequest()方法执行完便进入PrepRequestProcessor。
//第一个入口
public class Leader {...void lead() throws IOException, InterruptedException {...cnxAcceptor = new LearnerCnxAcceptor();cnxAcceptor.start();...}class LearnerCnxAcceptor extends ZooKeeperCriticalThread {...@Overridepublic void run() {while (!stop) {Socket s = ss.accept();s.setSoTimeout(self.tickTime * self.initLimit);s.setTcpNoDelay(nodelay);BufferedInputStream is = new BufferedInputStream(s.getInputStream());LearnerHandler fh = new LearnerHandler(s, is, Leader.this);fh.start();...}...}}...
}public class LearnerHandler extends ZooKeeperThread {...@Overridepublic void run() {...while (true) {...case Leader.REQUEST:...//调用LeaderZooKeeperServer的submitLearnerRequest方法leader.zk.submitLearnerRequest(si);...}...}...
}public class LeaderZooKeeperServer extends QuorumZooKeeperServer {PrepRequestProcessor prepRequestProcessor;...@Overrideprotected void setupRequestProcessors() {...prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);prepRequestProcessor.start();firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);...}public void submitLearnerRequest(Request request) {prepRequestProcessor.processRequest(request);}...
}//第二个入口
public class NIOServerCnxnFactory extends ServerCnxnFactory {...class SelectorThread extends AbstractSelectThread {@Overridepublic void run() {...while (!stopped) {select();...}...}private void select() {selector.select();Set<SelectionKey> selected = selector.selectedKeys();ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);Collections.shuffle(selectedList);Iterator<SelectionKey> selectedKeys = selectedList.iterator();while (!stopped && selectedKeys.hasNext()) {SelectionKey key = selectedKeys.next();selected.remove(key);...if (key.isReadable() || key.isWritable()) {//服务端从客户端读数据(读取请求) + 服务端向客户端写数据(发送响应)handleIO(key);}...}}private void handleIO(SelectionKey key) {IOWorkRequest workRequest = new IOWorkRequest(this, key);NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();cnxn.disableSelectable();key.interestOps(0);//激活连接:添加连接到连接过期队列touchCnxn(cnxn);//通过工作线程池来处理请求workerPool.schedule(workRequest);}...}private class IOWorkRequest extends WorkerService.WorkRequest {private final NIOServerCnxn cnxn;public void doWork() throws InterruptedException {...if (key.isReadable() || key.isWritable()) {cnxn.doIO(key);...}}...}
}public class WorkerService {...public void schedule(WorkRequest workRequest, long id) {ScheduledWorkRequest scheduledWorkRequest = new ScheduledWorkRequest(workRequest);int size = workers.size();if (size > 0) {int workerNum = ((int) (id % size) + size) % size;ExecutorService worker = workers.get(workerNum);worker.execute(scheduledWorkRequest);} else {scheduledWorkRequest.run();}}private class ScheduledWorkRequest implements Runnable {private final WorkRequest workRequest;ScheduledWorkRequest(WorkRequest workRequest) {this.workRequest = workRequest;}@Overridepublic void run() {...workRequest.doWork();}}...
}public class NIOServerCnxn extends ServerCnxn {private final ZooKeeperServer zkServer;void doIO(SelectionKey k) throws InterruptedException {...if (k.isReadable()) {...readPayload();}}private void readPayload() throws IOException, InterruptedException {...readRequest();}private void readRequest() throws IOException {//处理输入流zkServer.processPacket(this, incomingBuffer);}...
}public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {...public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {InputStream bais = new ByteBufferInputStream(incomingBuffer);BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);RequestHeader h = new RequestHeader();h.deserialize(bia, "header");incomingBuffer = incomingBuffer.slice();...Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());submitRequest(si);...}public void submitRequest(Request si) {...//激活会话touch(si.cnxn);//firstProcessor.processRequest方法执行完便进入PrepRequestProcessorfirstProcessor.processRequest(si);...}...
}
二.ProposalRequestProcessor事务投票处理器
ProposalRequestProcessor处理器是Leader服务器的事务投票处理器。它是PrepRequestProcessor请求预处理器的下一个处理器,它的主要作用是对事务请求进行处理,包括创建提议、发起投票。
对于非事务请求:它会将请求直接交给CommitProcessor处理器处理,不再做其他处理。
对于事务请求:除了将请求交给CommitProcessor处理器外,还会创建请求对应的Proposal提议,并将Proposal提议发送给所有Follower来发起一次集群内的事务投票,同时还会将事务请求交给SyncRequestProcessor处理器来记录事务日志。
提议是指:当处理一个事务请求时,zk会先在服务端发起一次投票流程。该投票的主要作用是通知zk服务端的各机器处理事务请求,从而避免因某个机器出现问题而造成事务不一致的问题。
ProposalRequestProcessor事务投票处理器的三个子流程分别是:Commit流程、Proposal流程、Sync流程。
流程一:Commit流程
完成Proposal流程后,zk服务器上的数据还没有进行任何改变。完成Proposal流程只是说明zk服务端可以执行事务请求操作了,真正执行具体数据的变更需要在Commit流程中实现。Commit流程的主要作用就是完成请求的执行。该流程是由CommitProcessor处理器来实现的。
流程二:Proposal流程
处理事务请求时,zk要取得集群中过半机器的投票才能修改数据。Proposal流程的主要工作就是投票和统计投票结果。
流程三:Sync流程
Sync流程是由SyncRequestProcessor处理器来实现的。
ProposalRequestProcessor处理器不是一个线程,它的nextProcessor就是CommitProcessor处理器,它会调用SyncRequestProcessor处理器的processRequest()方法;
public class LeaderZooKeeperServer extends QuorumZooKeeperServer {...@Overrideprotected void setupRequestProcessors() {RequestProcessor finalProcessor = new FinalRequestProcessor(this);RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());commitProcessor.start();//构建ProposalRequestProcessor处理器,下一个处理器为CommitProcessor处理器ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);proposalProcessor.initialize();//初始化ProposalRequestProcessor处理器prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);prepRequestProcessor.start();firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);setupContainerManager();}...
}//ProposalRequestProcessor的nextProcessor就是CommitProcessor
public class ProposalRequestProcessor implements RequestProcessor {LeaderZooKeeperServer zks;RequestProcessor nextProcessor;//nextProcessor其实就是CommitProcessor处理器SyncRequestProcessor syncProcessor;//事务日志处理器,它的下一个处理器是AckRequestProcessorpublic ProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) {this.zks = zks;this.nextProcessor = nextProcessor;AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());//创建事务日志处理器,它的下一个处理器是AckRequestProcessorsyncProcessor = new SyncRequestProcessor(zks, ackProcessor);}//初始化ProposalRequestProcessor处理器public void initialize() {syncProcessor.start();//启动事务日志处理器的线程}public void processRequest(Request request) throws RequestProcessorException {if (request instanceof LearnerSyncRequest) {//处理Learner的数据同步请求zks.getLeader().processSync((LearnerSyncRequest)request);} else {//Commit流程,nextProcessor其实就是CommitProcessor处理器nextProcessor.processRequest(request);if (request.getHdr() != null) {//Proposal流程zks.getLeader().propose(request);//Sync流程,将请求添加到队列,然后由事务日志处理器线程去处理syncProcessor.processRequest(request);}}}...
}public class Leader {final ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();...public Proposal propose(Request request) throws XidRolloverException {...byte[] data = SerializeUtils.serializeRequest(request);proposalStats.setLastBufferSize(data.length);QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);//生成Proposal提议Proposal p = new Proposal();p.packet = pp;p.request = request;synchronized(this) {p.addQuorumVerifier(self.getQuorumVerifier());if (request.getHdr().getType() == OpCode.reconfig) {self.setLastSeenQuorumVerifier(request.qv, true); }if (self.getQuorumVerifier().getVersion()<self.getLastSeenQuorumVerifier().getVersion()) {p.addQuorumVerifier(self.getLastSeenQuorumVerifier());}lastProposed = p.packet.getZxid();//将发送的Proposal提议放入outstandingProposals队列中outstandingProposals.put(lastProposed, p);//发送Proposal提议,其实就是把Proposal提议交给LearnerHandler处理sendPacket(pp);}return p;}void sendPacket(QuorumPacket qp) {synchronized (forwardingFollowers) {for (LearnerHandler f : forwardingFollowers) {//LearnerHandler会将提议放入其发送队列里f.queuePacket(qp);}}}...
}public class LearnerHandler extends ZooKeeperThread {final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<QuorumPacket>();...void queuePacket(QuorumPacket p) {queuedPackets.add(p);}@Overridepublic void run() {...//启动一个线程去发送Packet,比如Proposal提议startSendingPackets();...}protected void startSendingPackets() {if (!sendingThreadStarted) {// Start sending packetsnew Thread() {public void run() {Thread.currentThread().setName("Sender-" + sock.getRemoteSocketAddress());sendPackets();}}.start();sendingThreadStarted = true;} else {LOG.error("Attempting to start sending thread after it already started");}}private void sendPackets() throws InterruptedException {long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;while (true) {QuorumPacket p;p = queuedPackets.poll();if (p == null) {bufferedOutput.flush();p = queuedPackets.take();}if (p == proposalOfDeath) {break;}if (p.getType() == Leader.PING) {traceMask = ZooTrace.SERVER_PING_TRACE_MASK;}if (p.getType() == Leader.PROPOSAL) {syncLimitCheck.updateProposal(p.getZxid(), System.nanoTime());}if (LOG.isTraceEnabled()) {ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);}oa.writeRecord(p, "packet");}}...
}
三.SyncRequestProcessor事务日志处理器
SyncRequestProcessor处理器是事务日志处理器。它的作用是将事务请求记录到事务日志文件中,同时触发zk进行数据快照。
SyncRequestProcessor处理器也是一个线程,它会先把请求添加到队列,然后由线程处理,它的nextProcessor是AckRequestProcessor处理器。
//SyncRequestProcessor事务日志处理器,它的下一个处理器是AckRequestProcessor
public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {private final ZooKeeperServer zks;private final LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();private final RequestProcessor nextProcessor;//AckRequestProcessor处理器private Thread snapInProcess = null;volatile private boolean running;private final LinkedList<Request> toFlush = new LinkedList<Request>();private final Random r = new Random();private static int snapCount = ZooKeeperServer.getSnapCount();private final Request requestOfDeath = Request.requestOfDeath;public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {super("SyncThread:" + zks.getServerId(), zks.getZooKeeperServerListener());this.zks = zks;this.nextProcessor = nextProcessor;running = true;}public void processRequest(Request request) {//将请求添加到队列queuedRequests.add(request);}@Overridepublic void run() {try {int logCount = 0;int randRoll = r.nextInt(snapCount/2);while (true) {Request si = null;if (toFlush.isEmpty()) {si = queuedRequests.take();} else {si = queuedRequests.poll();if (si == null) {flush(toFlush);continue;}}if (si == requestOfDeath) {break;}if (si != null) {if (zks.getZKDatabase().append(si)) {logCount++;if (logCount > (snapCount / 2 + randRoll)) {randRoll = r.nextInt(snapCount/2);// roll the logzks.getZKDatabase().rollLog();// take a snapshotif (snapInProcess != null && snapInProcess.isAlive()) {LOG.warn("Too busy to snap, skipping");} else {snapInProcess = new ZooKeeperThread("Snapshot Thread") {public void run() {try {zks.takeSnapshot();} catch(Exception e) {LOG.warn("Unexpected exception", e);}}};snapInProcess.start();}logCount = 0;}} else if (toFlush.isEmpty()) {if (nextProcessor != null) {nextProcessor.processRequest(si);if (nextProcessor instanceof Flushable) {((Flushable)nextProcessor).flush();}}continue;}toFlush.add(si);if (toFlush.size() > 1000) {flush(toFlush);}}}} catch (Throwable t) {handleException(this.getName(), t);} finally{running = false;}}private void flush(LinkedList<Request> toFlush) throws IOException, RequestProcessorException {if (toFlush.isEmpty()) {return;}zks.getZKDatabase().commit();while (!toFlush.isEmpty()) {Request i = toFlush.remove();if (nextProcessor != null) {nextProcessor.processRequest(i);}}if (nextProcessor != null && nextProcessor instanceof Flushable) {((Flushable)nextProcessor).flush();}}public void shutdown() {queuedRequests.add(requestOfDeath);if (running) {this.join();}if (!toFlush.isEmpty()) {flush(toFlush);}if (nextProcessor != null) {nextProcessor.shutdown();}}
}
四.AckRequestProcessor投票反馈处理器
SyncRequestProcessor的nextProcessor就是AckRequestProcessor,AckRequestProcessor是Leader特有的处理器。
它负责在SyncRequestProcessor处理器完成事务日志记录后,通过Leader的processAck()方法向Proposal提议添加来自Leader的ACK响应。也就是将Leader的SID添加到Proposal提议的投票收集器里,然后调用Leader的tryToCommit()方法检查提议是否已有过半ACK并尝试提交。
同理,如果Leader收到Follower对该Proposal提议请求返回的ACK响应,也会通过Leader的processAck()方法向提议添加来自Follower的ACK响应,也就是将Follower的SID添加到Proposal提议的投票收集器里,然后调用Leader的tryToCommit()方法检查提议是否已有过半ACK来尝试提交。
AckRequestProcessor处理器不是一个线程,它没有nextProcessor属性字段。
//SyncRequestProcessor的nextProcessor就是AckRequestProcessor
class AckRequestProcessor implements RequestProcessor {Leader leader;AckRequestProcessor(Leader leader) {this.leader = leader;}//Forward the request as an ACK to the leaderpublic void processRequest(Request request) {QuorumPeer self = leader.self;if (self != null) {//Leader也作为参与Proposal投票的一份子进行ACK响应//将Leader的SID添加到Proposal提议的投票收集器里 + 检查Proposal提议的投票收集器是否有过半ACK才提交leader.processAck(self.getId(), request.zxid, null);} else {LOG.error("Null QuorumPeer");}}
}public class LearnerHandler extends ZooKeeperThread {...@Overridepublic void run() {...while (true) {...switch (qp.getType()) {case Leader.ACK:...//如果Leader收到Follower对某Proposal提议请求返回的ACK响应//那么就将Follower的SID添加到该Proposal提议的投票收集器里leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());break;...}...}...
}public class Leader {final ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();...public Proposal propose(Request request) throws XidRolloverException {...byte[] data = SerializeUtils.serializeRequest(request);proposalStats.setLastBufferSize(data.length);QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);//生成Proposal提议Proposal p = new Proposal();p.packet = pp;p.request = request;synchronized(this) {p.addQuorumVerifier(self.getQuorumVerifier());if (request.getHdr().getType() == OpCode.reconfig) {self.setLastSeenQuorumVerifier(request.qv, true); }if (self.getQuorumVerifier().getVersion()<self.getLastSeenQuorumVerifier().getVersion()) {p.addQuorumVerifier(self.getLastSeenQuorumVerifier());}lastProposed = p.packet.getZxid();//将发送的Proposal提议放入outstandingProposals队列中outstandingProposals.put(lastProposed, p);//发送Proposal提议,其实就是把Proposal提议交给LearnerHandler处理sendPacket(pp);}return p;}void sendPacket(QuorumPacket qp) {synchronized (forwardingFollowers) {for (LearnerHandler f : forwardingFollowers) {//LearnerHandler会将提议放入其发送队列里f.queuePacket(qp);}}}synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) { ...//检查请求的ZXID,需要比上次已提交的请求的ZXID也就是lastCommitted要大if (lastCommitted >= zxid) {if (LOG.isDebugEnabled()) {LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}", Long.toHexString(lastCommitted), Long.toHexString(zxid));}// The proposal has already been committedreturn;}Proposal p = outstandingProposals.get(zxid);//将Leader的SID添加到Proposal提议的投票收集器里p.addAck(sid);//尝试提交,即检查Proposal提议的投票收集器中是否有过半ACK响应boolean hasCommitted = tryToCommit(p, zxid, followerAddr);...}synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) { //如果提议队列中存在该提议的前一个提议,说明该提议的前一个提议还没提交,那么就返回false//zxid - 1是因为,只有事务请求才会生成zxid,那么前一个事务肯定就是zxid = 1if (outstandingProposals.containsKey(zxid - 1)) return false;//getting a quorum from all necessary configurations.//Proposal提议的投票收集器是否已过半if (!p.hasAllQuorums()) {return false; }...outstandingProposals.remove(zxid);if (p.request != null) {toBeApplied.add(p);}...//一旦提议通过,马上就要在Leader中标记lastCommitted为最新的提交ZXIDcommit(zxid);//给Follower广播commit消息inform(p);//给Observer发送commit消息...//调用CommitProcessor处理器的commit方法提交请求zk.commitProcessor.commit(p.request);//让Leader执行commit消息//下面处理的是Learner发起的同步请求if (pendingSyncs.containsKey(zxid)) {for (LearnerSyncRequest r: pendingSyncs.remove(zxid)) {sendSync(r);} } return true; }//广播commit消息public void commit(long zxid) {synchronized(this) {//标记lastCommitted为最新的提交ZXIDlastCommitted = zxid;}QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);sendPacket(qp);}void sendPacket(QuorumPacket qp) {synchronized (forwardingFollowers) {for (LearnerHandler f : forwardingFollowers) {//调用LearnerHandler的queuePacket方法添加Packet到发送队列f.queuePacket(qp);}}}public void inform(Proposal proposal) {QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid, proposal.packet.getData(), null);sendObserverPacket(qp);}...static public class Proposal extends SyncedLearnerTracker {public QuorumPacket packet;public Request request;...}
}public class SyncedLearnerTracker {protected ArrayList<QuorumVerifierAcksetPair> qvAcksetPairs = new ArrayList<QuorumVerifierAcksetPair>();...//添加到投票收集器public boolean addAck(Long sid) {boolean change = false;for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {if (qvAckset.getQuorumVerifier().getVotingMembers().containsKey(sid)) {qvAckset.getAckset().add(sid);change = true;}}return change;}//判断投票收集器是否过半public boolean hasAllQuorums() {for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {if (!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset()))return false;}return true;}...
}
五.CommitProcessor事务提交处理器
ProposalRequestProcessor的nextProcessor就是CommitProcessor处理器,CommitProcessor就是事务提交处理器。
对于非事务请求,CommitProcessor会将其转交给nextProcessor处理。对于事务请求,CommitProcessor会阻塞等待Proposal提议可以被提交。
CommitProcessor有个LinkedBlockingQueue队列queuedRequests。当调用CommitProcessor的processRequest()方法时,请求会被添加到该队列。CommitProcessor线程会从queuedRequests队列中取出请求进行处理。此外还通过nextPending和committedRequests队列保证请求的顺序处理。
CommitProcessor处理器也是一个线程,它会先把请求添加到队列,然后由线程处理,它的nextProcessor是ToBeAppliedRequestProcessor.
public class CommitProcessor extends ZooKeeperCriticalThread implements RequestProcessor {//请求队列protected final LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();protected final LinkedBlockingQueue<Request> committedRequests = new LinkedBlockingQueue<Request>();//下一个要提交的请求protected final AtomicReference<Request> nextPending = new AtomicReference<Request>();//当前正在处理的请求数protected AtomicInteger numRequestsProcessing = new AtomicInteger(0);...@Overridepublic void processRequest(Request request) {if (stopped) {return;}queuedRequests.add(request);if (!isWaitingForCommit()) {wakeup();//唤醒}}private boolean isProcessingRequest() {return numRequestsProcessing.get() != 0;}private boolean isWaitingForCommit() {return nextPending.get() != null;}private boolean isProcessingCommit() {return currentlyCommitting.get() != null;}synchronized private void wakeup() {notifyAll();//唤醒阻塞的线程}@Overridepublic void run() {Request request;while (!stopped) {synchronized(this) {while (!stopped && ((queuedRequests.isEmpty() || isWaitingForCommit() || isProcessingCommit()) && (committedRequests.isEmpty() || isProcessingRequest()))) {wait();//阻塞等待}}while (!stopped && !isWaitingForCommit() && !isProcessingCommit() && (request = queuedRequests.poll()) != null) {if (needCommit(request)) {//需要进行提交的事务请求nextPending.set(request);//设置下一个要提交的请求} else {//非事务请求转交给下一个处理器sendToNextProcessor(request);}}processCommitted();//处理提交}}protected void processCommitted() {Request request;if (!stopped && !isProcessingRequest() && (committedRequests.peek() != null)) {if ( !isWaitingForCommit() && !queuedRequests.isEmpty()) {return;}request = committedRequests.poll();Request pending = nextPending.get();if (pending != null && pending.sessionId == request.sessionId && pending.cxid == request.cxid) {pending.setHdr(request.getHdr());pending.setTxn(request.getTxn());pending.zxid = request.zxid;currentlyCommitting.set(pending);nextPending.set(null);sendToNextProcessor(pending);} else {currentlyCommitting.set(request);sendToNextProcessor(request);}} }public void commit(Request request) {committedRequests.add(request);if (!isProcessingCommit()) {//CommitProcessor处理器当前没有提交请求wakeup();//CommitProcessor唤醒线程}}private void sendToNextProcessor(Request request) {numRequestsProcessing.incrementAndGet();workerPool.schedule(new CommitWorkRequest(request), request.sessionId);}private class CommitWorkRequest extends WorkerService.WorkRequest {private final Request request;CommitWorkRequest(Request request) {this.request = request;}...public void doWork() throws RequestProcessorException {try {nextProcessor.processRequest(request);} finally {currentlyCommitting.compareAndSet(request, null);if (numRequestsProcessing.decrementAndGet() == 0) {if (!queuedRequests.isEmpty() || !committedRequests.isEmpty()) {wakeup();}}}}}...
}
如何理解保证事务请求的顺序处理:
顺序排队的事务请求在被ProposalRequestProcessor处理的过程中,首先会执行CommitProcessor的processRequest()方法将请求加入请求队列,所以请求队列queuedRequests里面的请求是按顺序排好的。然后会生成Proposal提议发送给Follower并收集ACK响应,最后当ACK响应过半时才调用CommitProcessor的commit()方法,此时可以进行提交的请求就会被添加到CommitProcessor的committedRequests队列中。
是否会因网络原因,导致CommitProcessor的committedRequests队列里的请求并不一定按顺序排好呢?
事务请求能保证顺序处理的根本原因是:
整个Proposal消息广播过程是基于FIFO特性的TCP协议来进行网络通信的,所以能够很容易保证消息广播过程中消息接收和发送的顺序性。也就是广播时是由一个主进程Leader去通过FIFO的TCP协议进行发送的,所以每个Follower接收到的Proposal和Commit请求都会按顺序进入队列。
客户端并发执行的事务请求到达Leader时一定会按顺序入队。然后Leader对事务请求进行广播时,也会按顺序进行广播。被单一Leader进行顺序广播的多个事务请求也会顺序到达某Follower。所以某Follower收到的多个Proposal提议也会按广播时的顺序进入队列,之后某Follower都会按广播时的顺序发送ACK响应给Leader。
所以Leader收到某Follower的ACK响应都是按广播时的顺序收到的。即使Leader先收到Follower2响应的事务2,后收到Follower1的响应事务1,但最终统计过半选票时,Leader会发现事务1首先过半从而优先保证事务1的顺序。
当然,Leader的processAck()方法会先确保要被提交的请求ZXID比上次大。此外,Leader的tryToCommit()方法也会首先确保前一个事务提交了才能处理。以及Follower在接收到Proposal和Commit请求就是按顺序响应,即若Follower要提交的事务ID不是pendingTxns的头部元素,那么就退出程序。最后结合CommitProcessor里的queuedRequests + committedRequests + nextPending,于是便能保证事务请求的顺序处理。
public class Leader {...synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) { ...//检查请求的ZXID,需要比上次已提交的请求的ZXID也就是lastCommitted要大if (lastCommitted >= zxid) {if (LOG.isDebugEnabled()) {LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}", Long.toHexString(lastCommitted), Long.toHexString(zxid));}// The proposal has already been committedreturn;}Proposal p = outstandingProposals.get(zxid);//将Leader的SID添加到Proposal提议的投票收集器里p.addAck(sid);//尝试提交,即检查Proposal提议的投票收集器中是否有过半ACK响应boolean hasCommitted = tryToCommit(p, zxid, followerAddr);...}synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) { //如果提议队列中存在该提议的前一个提议,说明该提议的前一个提议还没提交,那么就返回false//zxid - 1是因为,只有事务请求才会生成zxid,那么前一个事务肯定就是zxid = 1if (outstandingProposals.containsKey(zxid - 1)) return false;//getting a quorum from all necessary configurations.//Proposal提议的投票收集器是否已过半if (!p.hasAllQuorums()) {return false; }...zk.commitProcessor.commit(p.request);... }...
}public class Follower extends Learner{...void followLeader() throws InterruptedException {...while (this.isRunning()) {readPacket(qp);processPacket(qp);}...}protected void processPacket(QuorumPacket qp) throws Exception {switch (qp.getType()) {case Leader.PING: ping(qp); break;case Leader.PROPOSAL://处理Leader发起的Proposal提议投票请求TxnHeader hdr = new TxnHeader();Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);lastQueued = hdr.getZxid();...fzk.logRequest(hdr, txn);break;case Leader.COMMIT://处理Leader发送过来的对Proposal提议进行提交的请求fzk.commit(qp.getZxid());break;...}}
}public class FollowerZooKeeperServer extends LearnerZooKeeperServer {LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<Request>();...//将收到的投票请求放入队列pendingTxnspublic void logRequest(TxnHeader hdr, Record txn) {Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());if ((request.zxid & 0xffffffffL) != 0) {pendingTxns.add(request);}syncProcessor.processRequest(request);}//When a COMMIT message is received, eventually this method is called,//which matches up the zxid from the COMMIT with (hopefully) the head of //the pendingTxns queue and hands it to the commitProcessor to commit.//@param zxid - must correspond to the head of pendingTxns if it existspublic void commit(long zxid) {if (pendingTxns.size() == 0) {LOG.warn("Committing " + Long.toHexString(zxid) + " without seeing txn");return;}long firstElementZxid = pendingTxns.element().zxid;if (firstElementZxid != zxid) {//如果Follower需要提交的事务ID不是pendingTxns的头部元素,就退出程序LOG.error("Committing zxid 0x" + Long.toHexString(zxid) + " but next pending txn 0x" + Long.toHexString(firstElementZxid));System.exit(12);}Request request = pendingTxns.remove();commitProcessor.commit(request);}...
}
六.ToBeAppliedRequestProcessor处理器
Leader中有一个toBeApplied队列,专门存储那些可以被提交的Proposal提议。ToBeAppliedRequestProcessor会把已被CommitProcessor处理过的请求,转交给下一个处理器处理,并把请求从Leader的toBeApplied队列中移除。
ToBeAppliedRequestProcessor处理器不是一个线程,它的next是FinalRequestProcessor处理器。
public class Leader {private final ConcurrentLinkedQueue<Proposal> toBeApplied = new ConcurrentLinkedQueue<Proposal>();...static class ToBeAppliedRequestProcessor implements RequestProcessor {private final RequestProcessor next;private final Leader leader;ToBeAppliedRequestProcessor(RequestProcessor next, Leader leader) {this.leader = leader;this.next = next;}...public void processRequest(Request request) throws RequestProcessorException {next.processRequest(request);if (request.getHdr() != null) {long zxid = request.getHdr().getZxid();Iterator<Proposal> iter = leader.toBeApplied.iterator();if (iter.hasNext()) {Proposal p = iter.next();if (p.request != null && p.request.zxid == zxid) {iter.remove();return;}}}}...}...
}
七.FinalRequestProcessor处理器
FinalRequestProcessor处理器用来处理返回客户端响应前的收尾工作,包括创建客户端的响应、将事务请求应用到内存数据库中。
FinalRequestProcessor处理器不是一个线程,它也没有nextProcessor属性字段。
public class FinalRequestProcessor implements RequestProcessor {...public void processRequest(Request request) {...ProcessTxnResult rc = null;synchronized (zks.outstandingChanges) {// Need to process local session requestsrc = zks.processTxn(request);if (request.getHdr() != null) {TxnHeader hdr = request.getHdr();Record txn = request.getTxn();long zxid = hdr.getZxid();while (!zks.outstandingChanges.isEmpty() && zks.outstandingChanges.peek().zxid <= zxid) {ChangeRecord cr = zks.outstandingChanges.remove();if (zks.outstandingChangesForPath.get(cr.path) == cr) {zks.outstandingChangesForPath.remove(cr.path);}}}// do not add non quorum packets to the queue.if (request.isQuorum()) {zks.getZKDatabase().addCommittedProposal(request);}}...//创建响应并发送响应给客户端long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());zks.serverStats().updateLatency(request.createTime);cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp, request.createTime, Time.currentElapsedTime());cnxn.sendResponse(hdr, rsp, "response");if (request.type == OpCode.closeSession) {cnxn.sendCloseSession();}}...
}
总结:
PrepRequestProcessor的nextProcessor就是ProposalRequestProcessor;
ProposalRequestProcessor的nextProcessor就是CommitProcessor;
CommitProcessor的nextProcessor就是ToBeAppliedRequestProcessor;
ToBeAppliedRequestProcessor的next是FinalRequestProcessor;
FinalRequestProcessor没有nextProcessor属性字段;ProposalRequestProcessor会调用SyncRequestProcessor处理器的方法;
SyncRequestProcessor的nextProcessor就是AckRequestProcessor;
AckRequestProcessor没有nextProcessor属性字段;PrepRequestProcessor处理器是一个线程;
ProposalRequestProcessor处理器不是一个线程;
CommitProcessor处理器是一个线程;
ToBeAppliedRequestProcessor处理器不是一个线程;
FinalRequestProcessor处理器不是一个线程;SyncRequestProcessor处理器是一个线程;
AckRequestProcessor处理器不是一个线程;
(2)Follower服务器的请求处理链
一.FollowerRequestProcessor请求转发处理器
二.SendAckRequestProcessor投票反馈处理器
Follower服务器的主要工作是:
一.处理非事务请求 + 转发事务请求给Leader服务器
二.参与事务请求的Proposal提议的投票
三.参与Leader选举投票
Follower服务器的请求处理链如下图示:
Leader服务器的第一个处理器是LeaderRequestProcessor,Follower服务器的第一个处理器是FollowerRequestProcessor。由于不需要处理事务请求的投票,所以Follower服务器没有ProposalRequestProcessor处理器。
一.FollowerRequestProcessor请求转发处理器
FollowerRequestProcessor主要工作是识别当前请求是否是事务请求。如果是事务请求,那么Follower就会将该事务请求转发给Leader服务器。FollowerRequestProcessor处理器会通过调用Learner的request()方法实现请求转发。Learner的request方法会往输出流leaderOs中写入请求数据来发给Leader。输出流leaderOs在Follower和Leader建立好连接时就已经初始化好了的。
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {...protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));}...
}public class FollowerZooKeeperServer extends LearnerZooKeeperServer {...@Overrideprotected void setupRequestProcessors() {RequestProcessor finalProcessor = new FinalRequestProcessor(this);commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());commitProcessor.start();firstProcessor = new FollowerRequestProcessor(this, commitProcessor);((FollowerRequestProcessor) firstProcessor).start();syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor((Learner)getFollower()));syncProcessor.start();}public Follower getFollower(){return self.follower;}...
}public class FollowerRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {FollowerZooKeeperServer zks;RequestProcessor nextProcessor;LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();boolean finished = false;public FollowerRequestProcessor(FollowerZooKeeperServer zks, RequestProcessor nextProcessor) {super("FollowerRequestProcessor:" + zks.getServerId(), zks.getZooKeeperServerListener());this.zks = zks;this.nextProcessor = nextProcessor;}@Overridepublic void run() {while (!finished) {Request request = queuedRequests.take();if (request == Request.requestOfDeath) {break;}nextProcessor.processRequest(request);//如果是事务请求,则调用zks.getFollower().request(request)转发事务请求给Leaderswitch (request.type) {case OpCode.sync:zks.pendingSyncs.add(request);zks.getFollower().request(request);break;case OpCode.create:case OpCode.create2:case OpCode.createTTL:case OpCode.createContainer:case OpCode.delete:case OpCode.deleteContainer:case OpCode.setData:case OpCode.reconfig:case OpCode.setACL:case OpCode.multi:case OpCode.check:zks.getFollower().request(request);break;case OpCode.createSession:case OpCode.closeSession:// Don't forward local sessions to the leader.if (!request.isLocalSession()) {zks.getFollower().request(request);}break;}}}public void processRequest(Request request) {if (!finished) {Request upgradeRequest = null;upgradeRequest = zks.checkUpgradeSession(request);if (upgradeRequest != null) {queuedRequests.add(upgradeRequest);}queuedRequests.add(request);}}...
}public class Learner {protected BufferedOutputStream bufferedOutput;protected Socket sock;protected InputArchive leaderIs;protected OutputArchive leaderOs;...//send a request packet to the leader//发送一个请求给Leadervoid request(Request request) throws IOException {ByteArrayOutputStream baos = new ByteArrayOutputStream();DataOutputStream oa = new DataOutputStream(baos);oa.writeLong(request.sessionId);oa.writeInt(request.cxid);oa.writeInt(request.type);if (request.request != null) {request.request.rewind();int len = request.request.remaining();byte b[] = new byte[len];request.request.get(b);request.request.rewind();oa.write(b);}oa.close();QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);//发送请求,往输出流leaderOs写数据writePacket(qp, true);}void writePacket(QuorumPacket pp, boolean flush) throws IOException {synchronized (leaderOs) {if (pp != null) {leaderOs.writeRecord(pp, "packet");}if (flush) {bufferedOutput.flush();}}}//和Leader建立连接时就已经初始化好输出流leaderOs了protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, InterruptedException, X509Exception {this.sock = createSocket();int initLimitTime = self.tickTime * self.initLimit;int remainingInitLimitTime = initLimitTime;long startNanoTime = nanoTime();for (int tries = 0; tries < 5; tries++) {remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime));if (self.isSslQuorum()) {((SSLSocket) sock).startHandshake();}sock.setTcpNoDelay(nodelay);break;}Thread.sleep(1000);self.authLearner.authenticate(sock, hostname);//初始化好输入流leaderIsleaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));bufferedOutput = new BufferedOutputStream(sock.getOutputStream());//初始化好输出流leaderOsleaderOs = BinaryOutputArchive.getArchive(bufferedOutput);}//创建BIO的scoektprivate Socket createSocket() throws X509Exception, IOException {Socket sock;if (self.isSslQuorum()) {sock = self.getX509Util().createSSLSocket();} else {sock = new Socket();}sock.setSoTimeout(self.tickTime * self.initLimit);return sock;}...
}
二.SendAckRequestProcessor投票反馈处理器
Leader的请求处理链有个叫AckRequestProcessor的投票反馈处理器,主要负责在执行完SyncRequestProcessor处理器记录好事务日志后,向Proposal提议反馈来自Leader的ACK响应。
Follower的请求处理链也有个叫SendAckRequestProcessor的投票反馈处理器,主要负责在执行完SyncRequestProcessor处理器记录好事务日志后,通过发送消息给Leader来向Proposal提议反馈来自Follower的ACK响应。
Follower请求处理链的SyncRequestProcessor处理器会启动一个线程。SyncRequestProcessor处理器会先把请求添加到队列,然后由线程处理。SyncRequestProcessor的nextProcessor就是SendAckRequestProcessor请求处理器。SendAckRequestProcessor不是一个线程。
public class SendAckRequestProcessor implements RequestProcessor, Flushable {Learner learner;SendAckRequestProcessor(Learner peer) {this.learner = peer;}public void processRequest(Request si) {if (si.type != OpCode.sync) {QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null);learner.writePacket(qp, false);//向Leader发送ACK响应...}}...
}public class Follower extends Learner {...void followLeader() throws InterruptedException {...QuorumServer leaderServer = findLeader(); connectToLeader(leaderServer.addr, leaderServer.hostname);long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);syncWithLeader(newEpochZxid); QuorumPacket qp = new QuorumPacket();while (this.isRunning()) {readPacket(qp);//读取Leader发过来的请求的输入流leaderIsprocessPacket(qp);//处理Leader发过来的请求,其中就包括Proposal提议的投票请求}...}protected void processPacket(QuorumPacket qp) throws Exception{switch (qp.getType()) {case Leader.PING: ping(qp); break;//对Leader发起的Proposal提议投票进行响应case Leader.PROPOSAL: TxnHeader hdr = new TxnHeader();Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);...//对Leader发起的Proposal提议投票进行响应//此时请求便能进入SyncRequestProcessor处理器的队列里了//SyncRequestProcessor线程处理完该请求,就会由SendAckRequestProcessor来发送ACK响应fzk.logRequest(hdr, txn);break;...}}...
}public class FollowerZooKeeperServer extends LearnerZooKeeperServer {...@Overrideprotected void setupRequestProcessors() {RequestProcessor finalProcessor = new FinalRequestProcessor(this);commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());commitProcessor.start();firstProcessor = new FollowerRequestProcessor(this, commitProcessor);((FollowerRequestProcessor) firstProcessor).start();syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor((Learner)getFollower()));syncProcessor.start();}public void logRequest(TxnHeader hdr, Record txn) {Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());if ((request.zxid & 0xffffffffL) != 0) {pendingTxns.add(request);}//调用SyncRequestProcessor的processRequest方法处理Proposal提议的投票响应syncProcessor.processRequest(request);}...
}public class Learner {protected BufferedOutputStream bufferedOutput;protected Socket sock;protected InputArchive leaderIs;protected OutputArchive leaderOs;...void readPacket(QuorumPacket pp) throws IOException {synchronized (leaderIs) {//读取Leader发送过来的请求的输入流leaderIs.readRecord(pp, "packet");}}void writePacket(QuorumPacket pp, boolean flush) throws IOException {synchronized (leaderOs) {if (pp != null) {//将响应写入输出流,发送给LeaderleaderOs.writeRecord(pp, "packet");}if (flush) {bufferedOutput.flush();}}}//和Leader建立连接时就已经初始化好输出流leaderOs了protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, InterruptedException, X509Exception {this.sock = createSocket();int initLimitTime = self.tickTime * self.initLimit;int remainingInitLimitTime = initLimitTime;long startNanoTime = nanoTime();for (int tries = 0; tries < 5; tries++) {remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime));if (self.isSslQuorum()) {((SSLSocket) sock).startHandshake();}sock.setTcpNoDelay(nodelay);break;}Thread.sleep(1000);self.authLearner.authenticate(sock, hostname);//初始化好输入流leaderIsleaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));bufferedOutput = new BufferedOutputStream(sock.getOutputStream());//初始化好输出流leaderOsleaderOs = BinaryOutputArchive.getArchive(bufferedOutput);}...
}