Watcher触发
- 我们从实际操作时候的表现来看Watcher的触发,比如Zookeeper中NodeDataChanged时间的触发是“Watcher监听的对应数据节点的数据内容发生变更”,需要修改节点数据那么必然和数据节点存储的位置DataTree有关系,我们从这里去寻找修改后触发Watcher的答案。
- 我们从DataTree类中找到了修改节点的入口setData方法,我们从上篇内容中也知道了ServerCnxn存储到WatchManager中,并且以不同的维度存储了两份数据:
public Stat setData(String path, byte data[], int version, long zxid,long time) throws KeeperException.NoNodeException {Stat s = new Stat();DataNode n = nodes.get(path);if (n == null) {throw new KeeperException.NoNodeException();}byte lastdata[] = null;synchronized (n) {lastdata = n.data;n.data = data;n.stat.setMtime(time);n.stat.setMzxid(zxid);n.stat.setVersion(version);n.copyStat(s);}// now update if the path is in a quota subtree.String lastPrefix;if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {this.updateBytes(lastPrefix, (data == null ? 0 : data.length)- (lastdata == null ? 0 : lastdata.length));}dataWatches.triggerWatch(path, EventType.NodeDataChanged);return s;}
- 以上setData方法流程就两个步骤:
- 利用Path从存储节点的ConcurrentHashMap中获取节点信息,
- 修改节点信息
- 调用WatchManager 的triggerWatch方法
- 可以看到以上代码是通过调用WatchManager的triggerWatch方法来触发相关事件
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {WatchedEvent e = new WatchedEvent(type,KeeperState.SyncConnected, path);HashSet<Watcher> watchers;synchronized (this) {watchers = watchTable.remove(path);......for (Watcher w : watchers) {HashSet<String> paths = watch2Paths.get(w);if (paths != null) {paths.remove(path);}}}for (Watcher w : watchers) {if (supress != null && supress.contains(w)) {continue;}w.process(e);}return watchers;}
- 如上是triggerWatch源码中的触发逻辑有如下几个步骤:
- 封装WatchedEvent:首先从通知参数中获取到通知状态KeeperState,事件类型EventType,节点路径Path封装成一个WatchedEvent对象
- 查询Watcher对象:根据数据节点的节点路径从WatchTable中取出对应的Watcher,如果没有找到watcher,说明没有任何客户端在这个节点上注册过Watcher,直接退出。找到了这个Watcher将他取出来,同时直接从watchTable和watch2Path中删掉------这个步骤可以看出,watcher在服务端是一次性的,即触发一次就失效了。
- 调用process方法来触发Watcher: 在最后的for循环中依次取出每一个watcher来调用每一个的process,我们看一下他的实现类有N多个如下图,实际调用是哪一个呢,我们得从之前的注册代码中去找答案。之前Zookeeper服务器注册到WatchManager中的是watcher的实现类ServerCnxn,所有我们直接看ServerCnxn的process实现方法就可以
- 以下ServerCnxn对应的process是一个抽象方法,他的实现是NIOServerCnxn的实现:
public abstract void process(WatchedEvent event);
synchronized public void process(WatchedEvent event) {ReplyHeader h = new ReplyHeader(-1, -1L, 0);if (LOG.isTraceEnabled()) {ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,"Deliver event " + event + " to 0x"+ Long.toHexString(this.sessionId)+ " through " + this);}// Convert WatchedEvent to a type that can be sent over the wireWatcherEvent e = event.getWrapper();sendResponse(h, e, "notification");}
- 以上代码片段可以看出process方法逻辑比较简单以下几个步骤:
- 将请求头标记为-1,标识当前是一个通知
- 将watchedEvent包装成WatcherEvent,以方便进行网络传输序列化(之前篇解释过WatcherEvent用来网络传输用)
- 向客户端发送该通知
- 我们从上面步骤看其实他并没有处理客户端Watcher的逻辑,只是借用当前客户端连接的ServerCnxn对象来实现对客户端WatchedEvent的传递,真正的客户端Watcher回调与业务逻辑的执行肯定都在客户端这边
客户端回调Watcher
- 对于一个来自服务器的通知是通ServerCnxn中发送出来的,同样的,客户端这边的响应也是在一个类似的类中ClientCnxn中,ClientCnxn中通过SendThread来收事件通知
SendThread接收事件通知
- 我们来看下ClientCnxn的接收通知的源码处理:
class SendThread extends ZooKeeperThread {private long lastPingSentNs;private final ClientCnxnSocket clientCnxnSocket;private Random r = new Random(System.nanoTime()); private boolean isFirstConnect = true;void readResponse(ByteBuffer incomingBuffer) throws IOException {ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);ReplyHeader replyHdr = new ReplyHeader();replyHdr.deserialize(bbia, "header");......if (replyHdr.getXid() == -1) {// -1 means notificationif (LOG.isDebugEnabled()) {LOG.debug("Got notification sessionid:0x"+ Long.toHexString(sessionId));}WatcherEvent event = new WatcherEvent();event.deserialize(bbia, "response");// convert from a server path to a client pathif (chrootPath != null) {String serverPath = event.getPath();if(serverPath.compareTo(chrootPath)==0)event.setPath("/");else if (serverPath.length() > chrootPath.length())event.setPath(serverPath.substring(chrootPath.length()));else {LOG.warn("Got server path " + event.getPath()+ " which is too short for chroot path "+ chrootPath);}}WatchedEvent we = new WatchedEvent(event);if (LOG.isDebugEnabled()) {LOG.debug("Got " + we + " for sessionid 0x"+ Long.toHexString(sessionId));}eventThread.queueEvent( we );return;}......}
- 对于一个服务端的响应客户端由SendThread.readResponse(ByteBuffer incomingBuffer)方法来统一处理,如果响应头replyHdr中标识的XID为-1,标识这个是一个通知类型响应,对其的处理大体上分为4个步骤
- 反序列化:replyHdr.deserialize(bbia, “header”); 方法Zookeeper客户端接收到请求首先将字节流转换成WatcherEvent对象
- 处理Chrootpath:如果客户端设置了chrootPath属性,那么要对服务端传过来的完整的节点路径进行chrootPath处理,生成客户端的一个相对节点路径,例如客户端设置差rootPath为/appl,那么针对服务器端传来的响应节点路径为/appl/locks,经过chrootPath处理后,就变成相对路径/locks。
- 还原WatchedEvent:通过接收到的WatcherEvent得到WatchedEvent
- 回调Watcher,最后将WatchedEvent对象交给EventThread线程,在下一个轮询周期中进行Watcher回调。
EventThread处理事件通知
- 如上流程中,服务的的Watcher时间通知最终交给了EventThread线程来处理,EventThread是Zookeeper客户端中专门用来处理服务器端通知的事件线程,我们看下EventThread中是怎么处理的。由上面代码中queueEvent 方法入口:
public void queueEvent(WatchedEvent event) {if (event.getType() == EventType.None&& sessionState == event.getState()) {return;}sessionState = event.getState();// materialize the watchers based on the eventWatcherSetEventPair pair = new WatcherSetEventPair(watcher.materialize(event.getState(), event.getType(),event.getPath()),event);// queue the pair (watch set & event) for later processingwaitingEvents.add(pair);}
- 如上 QueueEvent方法首先根据该通知事件从ZKWatchManager中取出所有相关Watcher,materalize方法如下:
private final Map<String, Set<Watcher>> dataWatches =new HashMap<String, Set<Watcher>>();private final Map<String, Set<Watcher>> existWatches =new HashMap<String, Set<Watcher>>();private final Map<String, Set<Watcher>> childWatches =new HashMap<String, Set<Watcher>>();public Set<Watcher> materialize(Watcher.Event.KeeperState state,Watcher.Event.EventType type,String clientPath){Set<Watcher> result = new HashSet<Watcher>();switch (type) {case None:......case NodeDataChanged:case NodeCreated:synchronized (dataWatches) {addTo(dataWatches.remove(clientPath), result);}synchronized (existWatches) {addTo(existWatches.remove(clientPath), result);}break;......}}final private void addTo(Set<Watcher> from, Set<Watcher> to) {if (from != null) {to.addAll(from);}}
- 以上代码中客户端识别出EventType后,会从相应的Watcher存储(即上代码中dataWatches,existWatches,childWatches中一个或者多个,比如NodeCreated 事件类型从dataWatches ,和 existWatches中所有watcher)中去掉对应的Watcher,此处用的remove,标识客户端的Watcher机制统一也是一次性的,触发后该Watcher就失效了。
- 接着利用Watcher封装成一个WatcherSetEventPair,并且将这个对象加入一个阻塞队列LinkedBlockingQueue 中,并且我们在ClientCnxn类中能找到一个run方法,这个方法会不断的从阻塞队列中take出数据然后再发送对应的通知process进行串行同步处理,这里的Watcher才是真正的客户端注册的Watcher,调用这个Watcher的process方法就可以实现回调了,保证FIFO。
@Overridepublic void run() {try {isRunning = true;while (true) {Object event = waitingEvents.take();if (event == eventOfDeath) {wasKilled = true;} else {processEvent(event);}......}} catch (InterruptedException e) {
......}
......}
Watcher特性总结
- 通过我们上面的分析,了解了Watcher机制的相关接口定义以及Watcher的各类事件,我们以Zookeeper节点的数据内容过期接口为例,从Zookeeper客户端进行Watcher注册,服务的处理Watcher以及客户端回调watcher三方面阶段讲解了ZooKeeper的Watcher工作机制,发现Watcher有以下几个特点:
一次性
- 无论是客户端还是服务端,一个Watcher被触发,ZooKeeper都会将其从相应的存储中移除,因此,开发人员在Watcher的使用上要记住的一点是要反复注册,这样的设计有效的减轻了服务端的压力,如果注册一个Watcher后一直有效,针对那些非常频繁的节点,服务端会不断向客户端发送事件通知,无论对网络还是服务器性能都有非常大影响
客户端串行执行
- 客户端watcher回调的过程是一个串行同步的过程,这保证了顺序性,同事需要开发人员注意不要因为一个Watcher的处理逻辑影响了整个客户端Watcher回调
轻量级
- WatchedEvent是整个ZooKeeperWatcher通知机制的最小单元,这个数据结构只包含三部分内容:通知状态,事件类型,节点路径。也就是说,Watcher通知非常简单,只告诉客户端发生了事件,而不说明具体的内容。
- 例如针对NodeDataChanged时间ZooKeeper的Watcher只通知客户端指定数据节点的数据内容发生了变更,而对于原始数据以及变更后的数据都无法直接获取,需要主动获取,这也是Watcher机制的一个重要特性
- 另外客户端向服务端注册Watcher时候,并不会吧客户端真实的watcher对象传到服务端,仅仅是在客户端请求中使用boolean类型属性标记,同事服务端也只保存当前连接的ServerCnxn对象
- 轻量级设计使Watcher机制在网络开销和服务端内存开销上都非常廉价。
上一篇Zookeeper–Watcher机制源码剖析一
下一篇Zookeeper实践与应用- Canal