Watcher-- 数据变更通知
- 我们知道Zookeeper提供来分布式数据的订阅/发布功能,一个典型的发布/订阅模型系统定义了一种一对多的订阅关系,能让多个订阅者同时监听某个主题对象,当这个被监听对象自身状态发生变化时候,会通知所有订阅者,Zookeeper中引入了Watcher机制来实现这种分布式通知功能,Zookeeper允许客户端向服务器节点注册一个Watcher监听,当服务器端节点发生指定触发的事件就会触发这个Watcher,之后服务端会向指定客户端发送一个事件通知,这样来实现一个分布式通知的功能,如下图所示的一个流程:
- 上图中流程,Zookeeper的Watcher机制主要包括客户端线程,客户端WatcherManager,和Zookeeper服务器,流程上简单的说:
- 客户端向Zookeeper服务器注册成功Watcher同时,将Watcher对象存储在客户端的WatcherManager
- 当Zookeeper服务器触发Watcher事件后,向客户端发送通知
- 客户端线程从WatcherManager中捞出对应的Watcher对象来执行回调逻辑
Watcher接口
- 在Zookeeper中,接口Watcher表示一个标准的事件处理器,订阅来通知相关的逻辑,我们可以看他的源码:
- EventType:事件类型
- KeeperState:通知状态
- Process(WatchedEvent event):会调方法
- 其中事件类型和通知状态是有对应关系,如下表中所示
KeeperState | EventType | 触发条件解释 | 说明 |
---|---|---|---|
SyncConnected | None | 客户端与服务器成功建立连接 | 客户端和服务器处于连接状态 |
SyncConnected | NodeCreated | Watcher 监听的对应数据节点成功创建 | 客户端和服务器处于连接状态 |
SyncConnected | NodeDeleted | Watcher监听的数据节点成功删除 | 客户端和服务器处于连接状态 |
SyncConnected | NodeDataChanged | Watcher监听的数据节点内容变更 | 客户端和服务器处于连接状态 |
SyncConnected | NodeChildrenChanged | Watcher监听的对应数据节点列表发生变更 | 客户端和服务器处于连接状态 |
Disconnected | None | 客户端与Zookeeper服务器断开连接 | 客户端和服务器断开了连接 |
Expired | None | 会话超时 | 客户端回话失效,通常同时也会收到SessionExpiredException异常 |
AuthFailed | None | 两种情况:使用错误scheme进行权限检查, SASL权限检查失败 | 通常同时收到AuthFailedException异常 |
Unknown | 3.1.0后废弃 | ||
NoSYncConnected | 3.1.0后废弃 |
-
如上列举了Zookeeper中常见的几个通知状态和事件类型,其中针对NodeDateChange事件说明的节点的变更并不一定是内容变化,可能版本号DataVersion变化也是一样会触发。
-
回调方法process 是Watcher接口中的一个回调方法,当Zookeeper服务器端向客户端发送一个Watcher事件通知的时候,客户端会对相应的Process方法进行回调,从而实现对事件的处理,Process方法定义如下
void process(WatchedEvent event);
- 如上参数WatcherEvent包含了一个事件的基本属性:
public class WatchedEvent {private final KeeperState keeperState; //通知状态private final EventType eventType; // 事件类型private String path; // 节点路径/*** Create a WatchedEvent with specified type, state and path*/public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {this.keeperState = keeperState;this.eventType = eventType;this.path = path;}
......
}
- Zookeeper服务端生成WatchedEvent事件后会调用getWrapper方法将字节包装成一个可序列化的WatcherEvent,其实这是一个事务,都是对服务端事件的一个封装,不同的是WatchedEvent是我们逻辑事件中的一个对象,主要用来我们程序内部的事件容器,而WatcherEvent因为实现了序列化的接口,因此可以用于网络传输
- 在服务端得到WatcherEvent后,通过网络传到客户端,还原成一个WatchedEvent,并传递给process,然后process方法根据入参就可以解析完整的服务端事件了。
工作机制
- Zookeeper的Watcher机制可以有如下三个过程:
- 客户端注册Watcher
- 服务端处理watcher
- 客户端回调Watcher
- 以下类图说明各组件之间的关系:
客户端注册Watcher
- 我们通过如下部分源码来分析Watcher的客户端注册,我们创建一个Zookeeper的客户端对象实例时,可以向构造方法中传入一个默认的Watcher:
//我们调用的方法
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException {this(connectString, sessionTimeout, watcher, false);
}
//实际上初始化的方法
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException {LOG.info("Initiating client connection, connectString={} sessionTimeout={} watcher={}", new Object[]{connectString, Integer.valueOf(sessionTimeout), watcher});if(clientConfig == null) {clientConfig = new ZKClientConfig();}this.clientConfig = clientConfig;this.watchManager = this.defaultWatchManager();this.watchManager.defaultWatcher = watcher;ConnectStringParser connectStringParser = new ConnectStringParser(connectString);this.hostProvider = aHostProvider;this.cnxn = this.createConnection(connectStringParser.getChrootPath(), this.hostProvider, sessionTimeout, this, this.watchManager, this.getClientCnxnSocket(), canBeReadOnly);this.cnxn.start();}
- 如上源码中我们给定的Watcher对象实际上被保存在客户端ZKWatcherManager的defaultWatcher中,另外Zookeeper客户端也可以通过getData,getChildren,exist三个接口来向Zookeeper服务器注册Watcher,无论哪一种都一样,我们用getData方法的源码来分析:
public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException {PathUtils.validatePath(path); //校验Path格式正确性ZooKeeper.DataWatchRegistration wcb = null;if(watcher != null) {wcb = new ZooKeeper.DataWatchRegistration(watcher, path);//封装DataWatchRegistration}String serverPath = this.prependChroot(path);RequestHeader h = new RequestHeader();h.setType(4);GetDataRequest request = new GetDataRequest();request.setPath(serverPath);request.setWatch(watcher != null);GetDataResponse response = new GetDataResponse();ReplyHeader r = this.cnxn.submitRequest(h, request, response, wcb);if(r.getErr() != 0) {throw KeeperException.create(Code.get(r.getErr()), path);} else {if(stat != null) {DataTree.copyStat(response.getStat(), stat);}return response.getData();}}
- 如上源码中参数Path, Watcher对象,getData接口注册Watcher后,做了两件事情
- 先用这两个参数封装来一个DataWatchRegistration,其实就是初始化来Zookeeper服务器中的WatchRegistration里面的 watcher,clientPath,这部分用来暂时存储注册信息保存节点和Watcher的对应关系
- 接着会向客户端请求request进行标记,将其设置为“使用watcher监听”。
- 接着继续往下SubmitRequest方法:
ReplyHeader r = this.cnxn.submitRequest(h, request, response, wcb);
public ReplyHeader submitRequest(RequestHeader h,Record request,Record response,WatchRegistration watchRegistration,WatchDeregistration watchDeregistration) throws InterruptedException {ReplyHeader r = new ReplyHeader();Packet packet = queuePacket(h,r,request,response,null, null,null, null, watchRegistration, watchDeregistration);......return r;}
- 这个步骤中又一次将ClientCnxn中的WatchRegistration封装到Packet中,Zookeeper中,Packet可以被看作是一个最小通信协议单元,用于进行客户端与服务器之间的网络传输,任何需要传输的对象都需要包装成一个Packet对象,接着他被放入发送队列,如下queuePacket代码:
public Packet queuePacket(RequestHeader h,ReplyHeader r,Record request,Record response,AsyncCallback cb,String clientPath,String serverPath,Object ctx,WatchRegistration watchRegistration,WatchDeregistration watchDeregistration) {Packet packet = null;// Note that we do not generate the Xid for the packet yet. It is// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),// where the packet is actually sent.packet = new Packet(h, r, request, response, watchRegistration);packet.cb = cb;packet.ctx = ctx;packet.clientPath = clientPath;packet.serverPath = serverPath;packet.watchDeregistration = watchDeregistration;// The synchronized block here is for two purpose:// 1. synchronize with the final cleanup() in SendThread.run() to avoid race// 2. synchronized against each packet. So if a closeSession packet is added,// later packet will be notified.synchronized (state) {......outgoingQueue.add(packet);....}
- 我们继续追这个outgoingQueue 队列,可以看到随后Zookeeper客户端会向服务器端发送这个请求,同时等待请求的返回,王朝请求发送后,会由客户端的SendThread线程的readResponse方法负责接受来自服务端的响应,finishPacket方法会从Packet中取出对于的Watcher并注册到ZKWatcherManager中去。
protected void finishPacket(Packet p) {int err = p.replyHeader.getErr();if (p.watchRegistration != null) {p.watchRegistration.register(err);}......
- 如Packet中的Watchregistration就是我们刚才第一步getData中保存的节点对应的Watcher的注册信息。现在他又从这部分中取出来封装的Watcher,如下具体的register方法:
public void register(int rc) {if (shouldAddWatch(rc)) {Map<String, Set<Watcher>> watches = getWatches(rc);synchronized (watches) {Set<Watcher> watchers = watches.get(clientPath);if (watchers == null) {watchers = new HashSet<Watcher>();watches.put(clientPath, watchers);}watchers.add(watcher);}}}//getWatchesprotected final ZKWatchManager watchManager;protected Map<String, Set<Watcher>> getWatches(int rc) {return watchManager.dataWatches;}
-
如上register方法中客户端将之前暂时保存的Watcher取出来之后,放入到getWatcher获取到的一个Map对象中,这个Mp对象就是ZkWatcherManager中的一个dataWatches,我们将刚才存入WatchRegistration中的临时信息取出用来初始化ZKWatchManager.dataWatches,用于将数据节点的路径和watcher对象进行一一映射,这样就完成来客户端Watcher的注册,整个Watcher流程如下
-
如上流程中我们每次调用getData都会注册一个Watcher,如果这些Watcher都随着请求发送到服务器的话肯定会内存紧张,现实是这样的码,我们可以看之前代码中负责传输数据的对象Packet中,我们将WatchRegistration封装进去,如下Packet中的序列化方法createBB:
public void createBB() {try {ByteArrayOutputStream baos = new ByteArrayOutputStream();BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);boa.writeInt(-1, "len"); // We'll fill this in laterif (requestHeader != null) {requestHeader.serialize(boa, "header");//封装requestHeader}if (request instanceof ConnectRequest) {//封装requestrequest.serialize(boa, "connect");// append "am-I-allowed-to-be-readonly" flagboa.writeBool(readOnly, "readOnly");} else if (request != null) {request.serialize(boa, "request");}baos.close();this.bb = ByteBuffer.wrap(baos.toByteArray());this.bb.putInt(this.bb.capacity() - 4);this.bb.rewind();} catch (IOException e) {LOG.warn("Unexpected exception", e);}}
- 如上源码中可以看到并没有整个对象完全序列化进去,zookeeper只是将requestHeader和request两个属性进行序列化,WatchRegistration并没有被序列化到底层字节数组中,所以不会进行网络传输
服务端处理Watcher
- 上面讲解了客户端注册Watcher的过程,并且已经了解了最终客户端不会将Watcher对象真正床底到服务器,那么,服务端是怎么样完成客户端的Watcher注册,一下我们对这部分文件进行解析。
ServerCnxn存储
- 我们先看下服务器接收Watcher并将其存储起来的过程,如下Zookeeper服务端处理Watcher序列图:
- 我们先从源头分析客户端给了服务器那些信息,如下Zookeeper类中getData方法的源码:
......
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getData);
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
GetDataResponse response = new GetDataResponse();
......
- 如上RequestHeader中type类型设置的 4 ,request中给定了节点path路径,以及一个boolean类型的watcher标识是否天剑监听。服务端收到来自客户端的请求后,在FinalRequestProcessor.processRequest()中会判断当前请求的类型type来做一个策略来决定处理不同类型的请求,如下源码:
switch (request.type) {
......case OpCode.getData: {......Stat stat = new Stat();byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,getDataRequest.getWatch() ? cnxn : null);//为tru则会传递一个 ServerCnxn cnxn = request.cnxn;对象到实际的注册方法中,否则给nullrsp = new GetDataResponse(b, stat);break;}
......}
- 如上,从getData请求的处理逻辑可以看到当getDataRequest.getwatch为true的时候,Zookeeper就认为当前客户端请求需要进行Watcher注册,于是将当前的ServerCnxn对象和数据节点路径传入getData方法
- ServerCnxn是一个Zookeeper客户端和服务器之间的链接接口,代表了一个客户端和服务器的链接,ServerCnxn接口默认实现是NIOServerCNxn,同时3.4.0版本开始引入了Netty实现:NettyServerCnxn,都实现了Watcher接口并且实现process接口,所有把他看成一个Watcher对象,如下ServerCnxn对象以及两种process实现
public abstract class ServerCnxn implements Stats, Watcher {
......public abstract void process(WatchedEvent event);......
}
- 继续追getData源码,getZkDataBase获取到的ZKDatabase 对象,其中DataTree 对象是现在Zookeeper现有的节点数据的树形存储,我们可以通过path来从这获取到对应节点信息,如下获取DataNode,初始化节点状态,将DataNode天骄到WatchManager 对象中的WatchTable和watch2Paths中
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,getDataRequest.getWatch() ? cnxn : null);
//如下getData实现
public byte[] getData(String path, Stat stat, Watcher watcher)throws KeeperException.NoNodeException {DataNode n = nodes.get(path);if (n == null) {throw new KeeperException.NoNodeException();}synchronized (n) {n.copyStat(stat);if (watcher != null) {dataWatches.addWatch(path, watcher);}return n.data;}}
- Watchmanager是Zookeeper服务端Watcher的管理者,内部管理的WatcherTable和Watch2Paths,所以一个节点存储了两次,不过是从如下两个未存存储
- watchTable是从数据节点路径的粒度来托管Watcher
- watch2Paths是从Watcher的粒度来空值时间触发需要出发的数据节点。
*/
public class WatchManager {private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);private final HashMap<String, HashSet<Watcher>> watchTable =new HashMap<String, HashSet<Watcher>>();private final HashMap<Watcher, HashSet<String>> watch2Paths =new HashMap<Watcher, HashSet<String>>();......}
- WatcherManager数据结构如下
WatcherManager |
---|
- watchTable: HashMap<String, HashSet>(); + watch2Paths :new HashMap<Watcher, HashSet>(); |
+ addwatch(String ,Watcher): void + removeWatcher(Watcher): void + triggerWatch(String, EventType):Set +Trigger |
上一篇Zookeeper–ZAB与Paxos算法联系与区别
下一篇Zookeeper–Watcher机制源码剖析二