"Without accumulating single steps, one cannot travel a thousand li."
Background
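- Curator has been chosen as the ZooKeeper client. The question: after a network outage (which raises a SUSPENDED or LOST event), does every reconnection call back
org.apache.curator.framework.state.ConnectionStateListener#stateChanged
with the event type org.apache.curator.framework.state.ConnectionState#RECONNECTED?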
- The deployed ZooKeeper version is 3.8.3 (the latest stable release at the time of writing), and the curator-recipes dependencies are version 5.5.0.
Source code analysis
-
First, build a CuratorFramework object, use it to create a LeaderSelector that implements leader election, and start the LeaderSelector:
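```java
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ThreadUtils;

// The class name is illustrative; the original snippet showed only the members
public class ReconnectTest {

    public static final String leaderSelectorPath = "/source-code-analyse/reconnect";

    public static void main(String[] args) throws InterruptedException {
        CuratorFramework curatorFramework = newCuratorFramework();
        LeaderSelectorListener leaderSelectorListener = new LeaderSelectorListener() {
            @Override
            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                // Called on every connection state change (CONNECTED, SUSPENDED, LOST, RECONNECTED, ...)
                System.out.println("Thread " + Thread.currentThread().getName() + " Connection state changed : " + connectionState);
            }

            @Override
            public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                System.out.println("Thread " + Thread.currentThread().getName() + " get the leader.");
                // Leadership is held until this method returns
                TimeUnit.SECONDS.sleep(20);
            }
        };
        LeaderSelector leaderSelector = new LeaderSelector(curatorFramework, leaderSelectorPath, leaderSelectorListener);
        leaderSelector.start();
        // Keeps the process alive long enough to cut and restore the network during the test
        TimeUnit.SECONDS.sleep(100);
        leaderSelector.close();
        curatorFramework.close();
        System.out.println("Test completed.");
    }

    public static CuratorFramework newCuratorFramework() {
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                .connectString("192.168.0.104:2181")
                .sessionTimeoutMs(30000)
                .connectionTimeoutMs(6000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .threadFactory(ThreadUtils.newThreadFactory("ReconnectionTestThread"))
                .build();
        curatorFramework.start();
        return curatorFramework;
    }
}
```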
-
Because LeaderSelector is built on top of CuratorFramework, start with CuratorFramework's start method, looking directly at the implementation class CuratorFrameworkImpl:
```java
@Override
public void start() {
    log.info("Starting");
    if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) ) {
        throw new IllegalStateException("Cannot be started more than once");
    }

    try {
        connectionStateManager.start();
        // ... code omitted
```
-
It turns out that CuratorFrameworkImpl maintains an internal connection state manager (ConnectionStateManager), which its start method starts.
-
In ConnectionStateManager's start method, a task that calls processEvents is submitted to a thread pool:
```java
public void start() {
    Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

    service.submit(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            processEvents();
            return null;
        }
    });
}
```
-
The heart of processEvents is that it repeatedly calls poll on a blocking queue named eventQueue to fetch ConnectionState objects. Because this runs inside a while loop, it keeps polling for as long as the ConnectionStateManager is in the STARTED state; each poll waits at most pollMaxMs, which is derived from the session timeout:
```java
private void processEvents() {
    while ( state.get() == State.STARTED ) {
        try {
            int useSessionTimeoutMs = getUseSessionTimeoutMs();
            long elapsedMs = startOfSuspendedEpoch == 0 ? useSessionTimeoutMs / 2 : System.currentTimeMillis() - startOfSuspendedEpoch;
            long pollMaxMs = useSessionTimeoutMs - elapsedMs;

            final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
            if ( newState != null ) {
                if ( listeners.isEmpty() ) {
                    log.warn("There are no ConnectionStateListeners registered.");
                }

                listeners.forEach(listener -> listener.stateChanged(client, newState));
            } else if ( sessionExpirationPercent > 0 ) {
                synchronized(this) {
                    checkSessionExpiration();
                }
            }

            synchronized(this) {
                if ( (currentConnectionState == ConnectionState.LOST) && client.getZookeeperClient().isConnected() ) {
                    // CURATOR-525 - there is a race whereby LOST is sometimes set after the connection has been repaired
                    // this "hack" fixes it by forcing the state to RECONNECTED
                    log.warn("ConnectionState is LOST but isConnected() is true. Forcing RECONNECTED.");
                    addStateChange(ConnectionState.RECONNECTED);
                }
            }
        } catch ( InterruptedException e ) {
            // swallow the interrupt as it's only possible from either a background
            // operation and, thus, doesn't apply to this loop or the instance
            // is being closed in which case the while test will get it
        }
    }
}
```
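To make the consumer side of this loop concrete, here is a minimal, self-contained sketch of the same pattern using only the JDK: a single background thread polls a blocking queue with a timeout and hands each event to every registered listener. EventDispatcher, DispatcherDemo and the 100 ms poll timeout are hypothetical names and values chosen for illustration; this is not Curator's ConnectionStateManager, and it leaves out the session-expiration check and the CURATOR-525 workaround.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical, simplified model of the processEvents loop (not Curator code)
class EventDispatcher {
    private final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final ExecutorService service = Executors.newSingleThreadExecutor();

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    void post(String state) {
        eventQueue.offer(state);
    }

    void start() {
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
        service.submit(() -> {
            while (started.get()) {
                try {
                    // Bounded wait: the loop wakes up periodically even if nothing is posted
                    String newState = eventQueue.poll(100, TimeUnit.MILLISECONDS);
                    if (newState != null) {
                        listeners.forEach(listener -> listener.accept(newState));
                    }
                } catch (InterruptedException e) {
                    // close() interrupts the thread; the while condition then ends the loop
                }
            }
            return null;
        });
    }

    void close() throws InterruptedException {
        started.set(false);
        service.shutdownNow();
        service.awaitTermination(1, TimeUnit.SECONDS);
    }
}

public class DispatcherDemo {
    public static List<String> run(List<String> states) {
        EventDispatcher dispatcher = new EventDispatcher();
        List<String> received = new CopyOnWriteArrayList<>();
        dispatcher.addListener(received::add);
        dispatcher.start();
        states.forEach(dispatcher::post);
        try {
            // Give the background thread up to 2 s to deliver everything
            long deadline = System.currentTimeMillis() + 2000;
            while (received.size() < states.size() && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
            dispatcher.close();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("CONNECTED", "SUSPENDED", "RECONNECTED")));
    }
}
```

In this sketch every listener runs on the one dispatcher thread, so a slow listener delays all later state notifications, which is worth keeping in mind when writing a stateChanged implementation.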
-
Next, the stateChanged method of every registered ConnectionStateListener is called. LeaderSelector has a static inner class named WrappedListener that implements LeaderSelectorListener, so WrappedListener's stateChanged method is the one called back:
```java
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
    try {
        listener.stateChanged(client, newState);
    } catch ( CancelLeadershipException dummy ) {
        // If we cancel only leadership but not whole election, then we could hand over
        // dated leadership to client with no further cancellation. Dated leadership is
        // possible due to separated steps in leadership acquire: server data(e.g. election sequence)
        // change and client flag(e.g. hasLeadership) set.
        leaderSelector.cancelElection();
    }
}
```
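WrappedListener is a small decorator: it forwards the state change to the user's listener and turns a cancellation signal into a call to cancelElection(). The sketch below reproduces that shape with hypothetical stand-in types (StateListener, CancelLeadershipSignal, WrappedStateListener, WrappedListenerDemo) rather than Curator's real classes. The user listener throws on SUSPENDED and LOST, which mirrors a common way of giving up leadership when the connection is in trouble.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Curator types; names are illustrative only
interface StateListener {
    void stateChanged(String newState);
}

class CancelLeadershipSignal extends RuntimeException {
}

// Decorator in the style of LeaderSelector's WrappedListener
class WrappedStateListener implements StateListener {
    private final StateListener delegate;
    private final Runnable cancelElection;

    WrappedStateListener(StateListener delegate, Runnable cancelElection) {
        this.delegate = delegate;
        this.cancelElection = cancelElection;
    }

    @Override
    public void stateChanged(String newState) {
        try {
            delegate.stateChanged(newState);
        } catch (CancelLeadershipSignal signal) {
            // The user's listener asked to give up: cancel the whole election
            cancelElection.run();
        }
    }
}

public class WrappedListenerDemo {
    public static List<String> run(List<String> states) {
        List<String> log = new ArrayList<>();
        StateListener user = state -> {
            log.add("user saw " + state);
            if (state.equals("SUSPENDED") || state.equals("LOST")) {
                throw new CancelLeadershipSignal();
            }
        };
        StateListener wrapped = new WrappedStateListener(user, () -> log.add("election cancelled"));
        states.forEach(wrapped::stateChanged);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("CONNECTED", "SUSPENDED", "RECONNECTED")));
    }
}
```

Because the exception is caught inside the wrapper, the dispatcher loop that invoked it never sees it and keeps delivering later states such as RECONNECTED.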
-
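The listener variable in listener.stateChanged(client, newState) above is the third constructor argument passed when creating the LeaderSelector, a LeaderSelectorListener, that is, the LeaderSelectorListener we implemented ourselves. So the call finally arrives at our custom LeaderSelectorListener#stateChanged() method.
-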
What remains is to find out where elements are put into ConnectionStateManager's eventQueue.
-
Tracing the method calls shows that elements are offered into eventQueue in ConnectionStateManager#postState:
```java
private void postState(ConnectionState state) {
    log.info("State change: " + state);

    notifyAll();

    // If the queue is full, offer fails: poll (drop) the oldest event first, then offer again
    while ( !eventQueue.offer(state) ) {
        eventQueue.poll();
        log.warn("ConnectionStateManager queue full - dropping events to make room");
    }
}
```
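The offer/poll loop implements a drop-oldest policy on a bounded queue: when the queue is full, the oldest pending state is discarded so that the newest one always gets in. A minimal JDK-only sketch, assuming an ArrayBlockingQueue whose capacity (2 in the demo) is an arbitrary value chosen to make the effect visible; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of the drop-oldest strategy used by postState (not Curator code)
public class DropOldestQueue {
    public static <T> void postState(BlockingQueue<T> queue, T state) {
        // offer() returns false instead of blocking when the queue is full
        while (!queue.offer(state)) {
            queue.poll(); // discard the oldest pending event to make room
        }
    }

    public static List<String> run(int capacity, List<String> states) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        states.forEach(s -> postState(queue, s));
        // Iteration order of ArrayBlockingQueue is FIFO (oldest first)
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        System.out.println(run(2, List.of("SUSPENDED", "LOST", "RECONNECTED"))); // prints [LOST, RECONNECTED]
    }
}
```

Under heavy churn a listener may therefore miss old transitions, but it never misses the latest state, which is what matters for reacting to a reconnection.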
-
Going further up leads to org.apache.curator.framework.state.ConnectionStateManager#addStateChange:
```java
public synchronized boolean addStateChange(ConnectionState newConnectionState) {
    // If the ConnectionStateManager has not been started, return false immediately
    if ( state.get() != State.STARTED ) {
        return false;
    }

    ConnectionState previousState = currentConnectionState;
    // If the new state equals the previous one, the connection state has not changed: no event, return
    if ( previousState == newConnectionState ) {
        return false;
    }
    currentConnectionState = newConnectionState;

    ConnectionState localState = newConnectionState;
    boolean isNegativeMessage = ((newConnectionState == ConnectionState.LOST) || (newConnectionState == ConnectionState.SUSPENDED) || (newConnectionState == ConnectionState.READ_ONLY));
    // On the very first (positive) connection, report CONNECTED instead
    if ( !isNegativeMessage && initialConnectMessageSent.compareAndSet(false, true) ) {
        localState = ConnectionState.CONNECTED;
    }

    postState(localState);

    return true;
}
```
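Two rules in addStateChange decide what a listener actually sees: a repeated state is swallowed, and the first non-negative state is reported as CONNECTED. The single-threaded model below (a hypothetical class StateChangeModel with a local enum, not Curator code, and without the STARTED check) is fed the sequence validateConnection would produce for connect, disconnect, a duplicate disconnect, and reconnect.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of ConnectionStateManager#addStateChange (not Curator code)
public class StateChangeModel {
    enum State { CONNECTED, SUSPENDED, RECONNECTED, LOST, READ_ONLY }

    private State currentConnectionState = null;
    private boolean initialConnectMessageSent = false;
    final List<State> posted = new ArrayList<>();

    boolean addStateChange(State newState) {
        // Same state as before: nothing changed, so nothing is posted
        if (currentConnectionState == newState) {
            return false;
        }
        currentConnectionState = newState;
        State localState = newState;
        boolean isNegative = newState == State.LOST || newState == State.SUSPENDED || newState == State.READ_ONLY;
        // The first positive state is reported as CONNECTED; later ones stay RECONNECTED
        if (!isNegative && !initialConnectMessageSent) {
            initialConnectMessageSent = true;
            localState = State.CONNECTED;
        }
        posted.add(localState);
        return true;
    }

    public static List<State> run(List<State> inputs) {
        StateChangeModel model = new StateChangeModel();
        inputs.forEach(model::addStateChange);
        return model.posted;
    }

    public static void main(String[] args) {
        // validateConnection maps every SyncConnected to RECONNECTED, including the first one
        System.out.println(run(List.of(State.RECONNECTED, State.SUSPENDED, State.SUSPENDED, State.RECONNECTED)));
        // prints [CONNECTED, SUSPENDED, RECONNECTED]
    }
}
```

This is why the very first connection surfaces as CONNECTED, while every later SyncConnected that follows a SUSPENDED or LOST surfaces as RECONNECTED.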
-
addStateChange is in turn called by org.apache.curator.framework.imps.CuratorFrameworkImpl#validateConnection:
```java
void validateConnection(Watcher.Event.KeeperState state) {
    // Disconnected produces a SUSPENDED event
    if ( state == Watcher.Event.KeeperState.Disconnected ) {
        suspendConnection();
    }
    // Expired produces a LOST event
    else if ( state == Watcher.Event.KeeperState.Expired ) {
        connectionStateManager.addStateChange(ConnectionState.LOST);
    }
    // SyncConnected produces a RECONNECTED event
    else if ( state == Watcher.Event.KeeperState.SyncConnected ) {
        connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
    }
    // ConnectedReadOnly produces a READ_ONLY event
    else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly ) {
        connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
    }
}
```
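The branches in validateConnection amount to a lookup table from ZooKeeper's KeeperState to Curator's ConnectionState. A sketch with local stand-in enums: KeeperStateMapping, KeeperState and CuratorState here are hypothetical, and AuthFailed is included only as an example of a state that this method does not translate.

```java
// Sketch of the KeeperState -> ConnectionState mapping in validateConnection.
// Both enums are local stand-ins, not the real ZooKeeper/Curator types.
public class KeeperStateMapping {
    enum KeeperState { Disconnected, Expired, SyncConnected, ConnectedReadOnly, AuthFailed }

    enum CuratorState { SUSPENDED, LOST, RECONNECTED, READ_ONLY }

    // Returns null for states validateConnection leaves alone
    static CuratorState map(KeeperState state) {
        switch (state) {
            case Disconnected:      return CuratorState.SUSPENDED;   // via suspendConnection()
            case Expired:           return CuratorState.LOST;
            case SyncConnected:     return CuratorState.RECONNECTED; // also on the very first connect
            case ConnectedReadOnly: return CuratorState.READ_ONLY;
            default:                return null;
        }
    }

    public static void main(String[] args) {
        for (KeeperState s : KeeperState.values()) {
            System.out.println(s + " -> " + map(s));
        }
    }
}
```

Whether SyncConnected finally reaches listeners as CONNECTED or RECONNECTED is decided one step later, in addStateChange.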
-
Tracing further, validateConnection() is called by org.apache.curator.framework.imps.CuratorFrameworkImpl#processEvent:
```java
private void processEvent(final CuratorEvent curatorEvent) {
    // validateConnection is only called for events of type WATCHED; connection state changes arrive as WATCHED events
    if ( curatorEvent.getType() == CuratorEventType.WATCHED ) {
        validateConnection(curatorEvent.getWatchedEvent().getState());
    }
    // ... code omitted
}
```
-
When the connection state changes, this processEvent method is called by an anonymous inner Watcher class that CuratorFrameworkImpl passes into its CuratorZookeeperClient:
```java
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder) {
    // ZookeeperFactory is the factory Curator uses to create ZooKeeper instances
    ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory(), builder.getZkClientConfig());
    this.client = new CuratorZookeeperClient(
        localZookeeperFactory,
        builder.getEnsembleProvider(),
        builder.getSessionTimeoutMs(),
        builder.getConnectionTimeoutMs(),
        builder.getWaitForShutdownTimeoutMs(),
        new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
                processEvent(event);
            }
        },
        builder.getRetryPolicy(),
        builder.canBeReadOnly()
    );
    // ... code omitted
}
```
-
In the CuratorZookeeperClient constructor, a ConnectionState object is created to manage the connection events between the client and ZooKeeper (this is the class org.apache.curator.ConnectionState, not the org.apache.curator.framework.state.ConnectionState enum seen earlier). The Watcher above is passed to it as a constructor argument and stored in a queue named parentWatchers:
```java
public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
                              int sessionTimeoutMs, int connectionTimeoutMs, int waitForShutdownTimeoutMs, Watcher watcher,
                              RetryPolicy retryPolicy, boolean canBeReadOnly) {
    if ( sessionTimeoutMs < connectionTimeoutMs ) {
        log.warn(String.format("session timeout [%d] is less than connection timeout [%d]", sessionTimeoutMs, connectionTimeoutMs));
    }

    retryPolicy = Preconditions.checkNotNull(retryPolicy, "retryPolicy cannot be null");
    ensembleProvider = Preconditions.checkNotNull(ensembleProvider, "ensembleProvider cannot be null");

    this.connectionTimeoutMs = connectionTimeoutMs;
    this.waitForShutdownTimeoutMs = waitForShutdownTimeoutMs;
    // Create a ConnectionState object that manages the client's connection to ZooKeeper
    state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, watcher, tracer, canBeReadOnly);
    setRetryPolicy(retryPolicy);
}
```
```java
ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs,
                Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly) {
    this.ensembleProvider = ensembleProvider;
    this.tracer = tracer;
    if ( parentWatcher != null ) {
        // Store the anonymous Watcher passed in from CuratorFrameworkImpl in parentWatchers
        parentWatchers.offer(parentWatcher);
    }

    handleHolder = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
}
```
-
Next, look at where ConnectionState uses parentWatchers. It turns out to be the process() method:
```java
@Override
public void process(WatchedEvent event) {
    if ( LOG_EVENTS ) {
        log.debug("ConnectState watcher: " + event);
    }

    if ( event.getType() == Watcher.Event.EventType.None ) {
        boolean wasConnected = isConnected.get();
        boolean newIsConnected = checkState(event.getState(), wasConnected);
        if ( newIsConnected != wasConnected ) {
            isConnected.set(newIsConnected);
            connectionStartMs = System.currentTimeMillis();
            if ( newIsConnected ) {
                lastNegotiatedSessionTimeoutMs.set(handleHolder.getNegotiatedSessionTimeoutMs());
                log.debug("Negotiated session timeout: " + lastNegotiatedSessionTimeoutMs.get());
            }
        }
    }

    for ( Watcher parentWatcher : parentWatchers ) {
        OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());
        // Call process on each parent Watcher. So far we know of one: the anonymous Watcher created in the
        // CuratorFrameworkImpl constructor, which eventually leads to our custom ConnectionStateListener
        parentWatcher.process(event);
        trace.commit();
    }
}
```
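The parentWatchers loop is a simple fan-out: ConnectionState first updates its own connected flag, then passes the very same event on to each parent watcher. A JDK-only sketch with hypothetical types (SimpleWatcher, ForwardingWatcher, ParentWatcherDemo) and String events in place of WatchedEvent; the connected check is deliberately cruder than Curator's checkState.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for ZooKeeper's Watcher interface
interface SimpleWatcher {
    void process(String event);
}

// Sketch of the parentWatchers fan-out in Curator's ConnectionState#process
class ForwardingWatcher implements SimpleWatcher {
    private final Queue<SimpleWatcher> parentWatchers = new ConcurrentLinkedQueue<>();
    private volatile boolean connected = false;

    ForwardingWatcher(SimpleWatcher parentWatcher) {
        if (parentWatcher != null) {
            parentWatchers.offer(parentWatcher);
        }
    }

    boolean isConnected() {
        return connected;
    }

    @Override
    public void process(String event) {
        // 1. Update this object's own view of the connection...
        connected = event.equals("SyncConnected") || event.equals("ConnectedReadOnly");
        // 2. ...then forward the same event to every parent watcher
        for (SimpleWatcher parent : parentWatchers) {
            parent.process(event);
        }
    }
}

public class ParentWatcherDemo {
    public static List<String> run(List<String> events) {
        List<String> seenByParent = new ArrayList<>();
        ForwardingWatcher watcher = new ForwardingWatcher(seenByParent::add);
        events.forEach(watcher::process);
        return seenByParent;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("SyncConnected", "Disconnected", "SyncConnected")));
    }
}
```

Updating the local flag before forwarding means that, by the time a parent watcher runs, a query like isConnected() already reflects the new event.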
-
So where is ConnectionState#process called? This one is buried quite deep; stepping through with breakpoints eventually showed that it is called from org.apache.zookeeper.ClientCnxn.EventThread#processEvent:
```java
private void processEvent(Object event) {
    try {
        if (event instanceof WatcherSetEventPair) {
            // each watcher will process the event
            WatcherSetEventPair pair = (WatcherSetEventPair) event;
            for (Watcher watcher : pair.watchers) {
                try {
                    watcher.process(pair.event);
                } catch (Throwable t) {
                    LOG.error("Error while calling watcher ", t);
                }
            }
        // ... code omitted
```
-
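ClientCnxn is no longer Curator code. It belongs to the native ZooKeeper API and is the lowest-level component managing the connection between the client and ZooKeeper; it is initialized when a ZooKeeper object is created with new ZooKeeper(...). As mentioned earlier, Curator wraps this ZooKeeper instance inside ConnectionState: ConnectionState passes itself (it implements Watcher) to HandleHolder, which uses it as the default watcher when the ZooKeeper instance is created. That is how connection events reach ConnectionState#process:
```java
ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs,
                Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly) {
    this.ensembleProvider = ensembleProvider;
    this.tracer = tracer;
    if ( parentWatcher != null ) {
        parentWatchers.offer(parentWatcher);
    }

    // zookeeperFactory encapsulates how the ZooKeeper instance is obtained;
    // "this" (the ConnectionState) is handed over as its Watcher
    handleHolder = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
}
```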
-
org.apache.zookeeper.ClientCnxn.EventThread#processEvent is in turn called from org.apache.zookeeper.ClientCnxn.EventThread#run. Because the inner class EventThread extends Thread, it is started via start() when the ZooKeeper object is created, together with SendThread:
```java
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException {
    // ... code omitted
    cnxn = createConnection(
        connectStringParser.getChrootPath(),
        hostProvider,
        sessionTimeout,
        this.clientConfig,
        watcher,
        getClientCnxnSocket(),
        canBeReadOnly);
    cnxn.start();
}
```
```java
// ClientCnxn#start
public void start() {
    sendThread.start();
    eventThread.start();
}
```
-
Following the EventThread source, its run method also uses a while loop to keep taking events from a blocking queue named waitingEvents:
```java
private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>();

@Override
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void run() {
    try {
        isRunning = true;
        while (true) {
            Object event = waitingEvents.take();
            // Unless it is the eventOfDeath sentinel (a plain new Object()), hand it to processEvent
            if (event == eventOfDeath) {
                wasKilled = true;
            } else {
                processEvent(event);
            }
            // ... unrelated code omitted
```
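EventThread's loop is the classic poison-pill consumer: block on take(), process each event, and stop when a unique sentinel object arrives. The sentinel is compared by identity, so no ordinary event, not even another new Object(), can be mistaken for it. A minimal sketch with hypothetical names (PoisonPillDemo, EVENT_OF_DEATH); unlike the real EventThread, which keeps draining events still queued after eventOfDeath, this version simply stops.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of a take()-until-sentinel consumer thread (not ZooKeeper code)
public class PoisonPillDemo {
    private static final Object EVENT_OF_DEATH = new Object();

    public static List<Object> run(List<?> events) {
        LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
        List<Object> processed = new ArrayList<>();
        Thread eventThread = new Thread(() -> {
            try {
                while (true) {
                    Object event = waitingEvents.take(); // blocks until an event arrives
                    if (event == EVENT_OF_DEATH) {
                        break; // identity check: only the sentinel ends the loop
                    }
                    processed.add(event);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "demo-event-thread");
        eventThread.start();
        waitingEvents.addAll(events);
        waitingEvents.add(EVENT_OF_DEATH);
        try {
            eventThread.join(); // join() also makes "processed" safely visible here
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("SyncConnected", "Disconnected")));
    }
}
```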
-
So the key question is: where are elements added to waitingEvents?
-
Searching ClientCnxn for this variable shows two places that add to it: the queueEvent method and the queuePacket method. Judging by the names, the second one adds the actual data exchanged with ZooKeeper (setting breakpoints later confirmed this), while queueEvent() is the one that adds event data:
```java
public void queueEvent(WatchedEvent event) {
    queueEvent(event, null);
}

private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
    if (event.getType() == EventType.None && sessionState == event.getState()) {
        return;
    }
    sessionState = event.getState();
    final Set<Watcher> watchers;
    if (materializedWatchers == null) {
        // materialize the watchers based on the event
        watchers = watchManager.materialize(event.getState(), event.getType(), event.getPath());
    } else {
        watchers = new HashSet<>(materializedWatchers);
    }
    WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
    // queue the pair (watch set & event) for later processing
    waitingEvents.add(pair);
}
```
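The early return at the top of queueEvent keeps a repeated connection state from being delivered twice: an event of type None whose state equals the last recorded sessionState is discarded. A sketch with string-typed events; QueueEventDedup is hypothetical, and starting from Disconnected is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the duplicate-state guard in ClientCnxn.EventThread#queueEvent (not ZooKeeper code)
public class QueueEventDedup {
    private String sessionState = "Disconnected"; // assumed initial state for this sketch
    final List<String> waitingEvents = new ArrayList<>();

    void queueEvent(String type, String state) {
        // A connection event (type None) repeating the current state carries no news
        if (type.equals("None") && state.equals(sessionState)) {
            return;
        }
        sessionState = state;
        waitingEvents.add(type + ":" + state);
    }

    // Each argument is "type:state", e.g. "None:SyncConnected"
    public static List<String> run(String... events) {
        QueueEventDedup q = new QueueEventDedup();
        for (String e : events) {
            String[] parts = e.split(":");
            q.queueEvent(parts[0], parts[1]);
        }
        return q.waitingEvents;
    }

    public static void main(String[] args) {
        System.out.println(run("None:SyncConnected", "None:SyncConnected", "NodeDataChanged:SyncConnected",
                "None:Disconnected", "None:Disconnected", "None:SyncConnected"));
    }
}
```

Non-connection events (type other than None) always pass, and each genuine transition, such as Disconnected followed by SyncConnected, is queued exactly once.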
-
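In other words, whenever the connection state between the client and the ZooKeeper server changes (for example on reconnection), queueEvent must be called somewhere to put the state change onto the blocking queue, where it waits to be consumed.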
-
This part of the code is fairly complex; if you are interested, read the source of org.apache.zookeeper.ClientCnxn.SendThread yourself.
-
Put simply, the flow is as follows: when a ZooKeeper object is created, it creates a ClientCnxn, which starts two threads, eventThread and sendThread.
-
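SendThread's main job is to communicate with ZooKeeper. It also runs a heartbeat mechanism, periodically pinging ZooKeeper to confirm that the connection is healthy.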
-
SendThread's run method contains a while loop: while the client is disconnected, it keeps trying to re-establish the connection through ClientCnxnSocket, and repeats the attempt whenever it fails:
```java
// If not connected, keep trying to establish a connection (see startConnect if interested). A failure is
// caught by the outer catch block, and the while loop comes around to try again.
if (!clientCnxnSocket.isConnected()) {
    // don't re-establish connection if we are closing
    if (closing) {
        break;
    }
    if (rwServerAddress != null) {
        serverAddress = rwServerAddress;
        rwServerAddress = null;
    } else {
        serverAddress = hostProvider.next(1000);
    }
    onConnecting(serverAddress);
    // This method eventually connects to ZooKeeper via clientCnxnSocket.connect(addr)
    startConnect(serverAddress);
    // Update now to start the connection timer right after we make a connection attempt
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
}
```
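Stripped of addresses and timers, this connect logic is a retry loop that treats any failed attempt as "still disconnected" and tries again. A sketch under that simplification: ReconnectLoop and connectWithRetry are hypothetical, tryConnect stands in for startConnect, maxAttempts is a safety cap for the demo (the real loop only stops when the client is closing), and the real code's pacing via hostProvider.next(1000) is omitted.

```java
import java.util.function.IntPredicate;

// Sketch of a "while not connected, try again" loop (not ZooKeeper code)
public class ReconnectLoop {
    /**
     * @param tryConnect  returns true when the attempt (numbered from 1) succeeds; may throw on failure
     * @param maxAttempts safety cap for the demo
     * @return the attempt number that succeeded, or -1 if none did
     */
    public static int connectWithRetry(IntPredicate tryConnect, int maxAttempts) {
        boolean connected = false;
        int attempt = 0;
        while (!connected && attempt < maxAttempts) {
            attempt++;
            try {
                connected = tryConnect.test(attempt);
            } catch (RuntimeException e) {
                // Like SendThread, treat a failed attempt as "still disconnected" and loop again
                connected = false;
            }
        }
        return connected ? attempt : -1;
    }

    public static void main(String[] args) {
        // The network "comes back" on the 4th attempt
        int ok = connectWithRetry(n -> {
            if (n < 4) {
                throw new RuntimeException("network unreachable");
            }
            return true;
        }, 10);
        System.out.println("connected on attempt " + ok); // prints "connected on attempt 4"
    }
}
```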
-
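Once the connection is re-established, org.apache.zookeeper.ClientCnxn.SendThread#run calls clientCnxnSocket.doTransport and starts sending and receiving packets with ZooKeeper:
```java
// pendingQueue holds packets that have been sent and are waiting for a response
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
```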
-
doTransport contains NIO code (in the default ClientCnxnSocketNIO implementation); feel free to study it on your own.
-
Eventually, org.apache.zookeeper.ClientCnxnSocket#readConnectResult reads the response packet from ZooKeeper and calls org.apache.zookeeper.ClientCnxn.SendThread#onConnected, which puts an event into the waitingEvents blocking queue:
```java
void readConnectResult() throws IOException {
    // ... unrelated code omitted
    sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
}
```
Because the connection we establish with ZooKeeper is not read-only, the event's KeeperState is SyncConnected (its EventType is None):
```java
void onConnected(int _negotiatedSessionTimeout, long _sessionId, byte[] _sessionPasswd, boolean isRO) throws IOException {
    // ... unrelated code omitted
    KeeperState eventState = (isRO) ? KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
    eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, eventState, null));
}
```
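As the code above showed, when validateConnection sees KeeperState.SyncConnected it calls addStateChange(ConnectionState.RECONNECTED). Apart from the very first connection, which addStateChange reports as CONNECTED, this produces a RECONNECTED event that finally calls back into our custom ConnectionStateListener#stateChanged method.
-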
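If you are interested, you can verify this by following the same path with breakpoints, but several steps are asynchronous, so pay attention to when each breakpoint is hit.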
Verification result
- When CuratorFramework is used as the ZooKeeper client, a network disconnection followed by a reconnection does produce a RECONNECTED event in the custom ConnectionStateListener's stateChanged method.