Zookeeper--Watcher机制源码剖析一

Watcher-- 数据变更通知

  • 我们知道Zookeeper提供来分布式数据的订阅/发布功能,一个典型的发布/订阅模型系统定义了一种一对多的订阅关系,能让多个订阅者同时监听某个主题对象,当这个被监听对象自身状态发生变化时候,会通知所有订阅者,Zookeeper中引入了Watcher机制来实现这种分布式通知功能,Zookeeper允许客户端向服务器节点注册一个Watcher监听,当服务器端节点发生指定触发的事件就会触发这个Watcher,之后服务端会向指定客户端发送一个事件通知,这样来实现一个分布式通知的功能,如下图所示的一个流程:
    在这里插入图片描述
  • 上图中流程,Zookeeper的Watcher机制主要包括客户端线程,客户端WatcherManager,和Zookeeper服务器,流程上简单的说:
    • 客户端向Zookeeper服务器注册成功Watcher同时,将Watcher对象存储在客户端的WatcherManager
    • 当Zookeeper服务器触发Watcher事件后,向客户端发送通知
    • 客户端线程从WatcherManager中捞出对应的Watcher对象来执行回调逻辑

Watcher接口

  • 在Zookeeper中,接口Watcher表示一个标准的事件处理器,订阅来通知相关的逻辑,我们可以看他的源码:
    • EventType:事件类型
    • KeeperState:通知状态
    • Process(WatchedEvent event):会调方法
  • 其中事件类型和通知状态是有对应关系,如下表中所示
KeeperStateEventType触发条件解释说明
SyncConnectedNone客户端与服务器成功建立连接客户端和服务器处于连接状态
SyncConnectedNodeCreatedWatcher 监听的对应数据节点成功创建客户端和服务器处于连接状态
SyncConnectedNodeDeletedWatcher监听的数据节点成功删除客户端和服务器处于连接状态
SyncConnectedNodeDataChangedWatcher监听的数据节点内容变更客户端和服务器处于连接状态
SyncConnectedNodeChildrenChangedWatcher监听的对应数据节点列表发生变更客户端和服务器处于连接状态
DisconnectedNone客户端与Zookeeper服务器断开连接客户端和服务器断开了连接
ExpiredNone会话超时客户端回话失效,通常同时也会收到SessionExpiredException异常
AuthFailedNone两种情况:使用错误scheme进行权限检查, SASL权限检查失败通常同时收到AuthFailedException异常
Unknown3.1.0后废弃
NoSYncConnected3.1.0后废弃
  • 如上列举了Zookeeper中常见的几个通知状态和事件类型,其中针对NodeDateChange事件说明的节点的变更并不一定是内容变化,可能版本号DataVersion变化也是一样会触发。

  • 回调方法process 是Watcher接口中的一个回调方法,当Zookeeper服务器端向客户端发送一个Watcher事件通知的时候,客户端会对相应的Process方法进行回调,从而实现对事件的处理,Process方法定义如下

 void process(WatchedEvent event);
  • 如上参数WatcherEvent包含了一个事件的基本属性:
public class WatchedEvent {private final KeeperState keeperState; //通知状态private final EventType eventType; //  事件类型private String path; //   节点路径/*** Create a WatchedEvent with specified type, state and path*/public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {this.keeperState = keeperState;this.eventType = eventType;this.path = path;}
......
}
  • Zookeeper服务端生成WatchedEvent事件后会调用getWrapper方法将字节包装成一个可序列化的WatcherEvent,其实这是一个事务,都是对服务端事件的一个封装,不同的是WatchedEvent是我们逻辑事件中的一个对象,主要用来我们程序内部的事件容器,而WatcherEvent因为实现了序列化的接口,因此可以用于网络传输
  • 在服务端得到WatcherEvent后,通过网络传到客户端,还原成一个WatchedEvent,并传递给process,然后process方法根据入参就可以解析完整的服务端事件了。

工作机制

  • Zookeeper的Watcher机制可以有如下三个过程:
    • 客户端注册Watcher
    • 服务端处理watcher
    • 客户端回调Watcher
  • 以下类图说明各组件之间的关系:
    在这里插入图片描述
客户端注册Watcher
  • 我们通过如下部分源码来分析Watcher的客户端注册,我们创建一个Zookeeper的客户端对象实例时,可以向构造方法中传入一个默认的Watcher:
//我们调用的方法
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException {this(connectString, sessionTimeout, watcher, false);
}
//实际上初始化的方法
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException {LOG.info("Initiating client connection, connectString={} sessionTimeout={} watcher={}", new Object[]{connectString, Integer.valueOf(sessionTimeout), watcher});if(clientConfig == null) {clientConfig = new ZKClientConfig();}this.clientConfig = clientConfig;this.watchManager = this.defaultWatchManager();this.watchManager.defaultWatcher = watcher;ConnectStringParser connectStringParser = new ConnectStringParser(connectString);this.hostProvider = aHostProvider;this.cnxn = this.createConnection(connectStringParser.getChrootPath(), this.hostProvider, sessionTimeout, this, this.watchManager, this.getClientCnxnSocket(), canBeReadOnly);this.cnxn.start();}
  • 如上源码中我们给定的Watcher对象实际上被保存在客户端ZKWatcherManager的defaultWatcher中,另外Zookeeper客户端也可以通过getData,getChildren,exist三个接口来向Zookeeper服务器注册Watcher,无论哪一种都一样,我们用getData方法的源码来分析:
 public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException {PathUtils.validatePath(path); //校验Path格式正确性ZooKeeper.DataWatchRegistration wcb = null;if(watcher != null) {wcb = new ZooKeeper.DataWatchRegistration(watcher, path);//封装DataWatchRegistration}String serverPath = this.prependChroot(path);RequestHeader h = new RequestHeader();h.setType(4);GetDataRequest request = new GetDataRequest();request.setPath(serverPath);request.setWatch(watcher != null);GetDataResponse response = new GetDataResponse();ReplyHeader r = this.cnxn.submitRequest(h, request, response, wcb);if(r.getErr() != 0) {throw KeeperException.create(Code.get(r.getErr()), path);} else {if(stat != null) {DataTree.copyStat(response.getStat(), stat);}return response.getData();}}
  • 如上源码中参数Path, Watcher对象,getData接口注册Watcher后,做了两件事情
    • 先用这两个参数封装来一个DataWatchRegistration,其实就是初始化来Zookeeper服务器中的WatchRegistration里面的 watcher,clientPath,这部分用来暂时存储注册信息保存节点和Watcher的对应关系
    • 接着会向客户端请求request进行标记,将其设置为“使用watcher监听”。
  • 接着继续往下SubmitRequest方法:
ReplyHeader r = this.cnxn.submitRequest(h, request, response, wcb);
public ReplyHeader submitRequest(RequestHeader h,Record request,Record response,WatchRegistration watchRegistration,WatchDeregistration watchDeregistration) throws InterruptedException {ReplyHeader r = new ReplyHeader();Packet packet = queuePacket(h,r,request,response,null, null,null, null, watchRegistration,  watchDeregistration);......return r;}
  • 这个步骤中又一次将ClientCnxn中的WatchRegistration封装到Packet中,Zookeeper中,Packet可以被看作是一个最小通信协议单元,用于进行客户端与服务器之间的网络传输,任何需要传输的对象都需要包装成一个Packet对象,接着他被放入发送队列,如下queuePacket代码:
public Packet queuePacket(RequestHeader h,ReplyHeader r,Record request,Record response,AsyncCallback cb,String clientPath,String serverPath,Object ctx,WatchRegistration watchRegistration,WatchDeregistration watchDeregistration) {Packet packet = null;// Note that we do not generate the Xid for the packet yet. It is// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),// where the packet is actually sent.packet = new Packet(h, r, request, response, watchRegistration);packet.cb = cb;packet.ctx = ctx;packet.clientPath = clientPath;packet.serverPath = serverPath;packet.watchDeregistration = watchDeregistration;// The synchronized block here is for two purpose:// 1. synchronize with the final cleanup() in SendThread.run() to avoid race// 2. synchronized against each packet. So if a closeSession packet is added,// later packet will be notified.synchronized (state) {......outgoingQueue.add(packet);....}
  • 我们继续追这个outgoingQueue 队列,可以看到随后Zookeeper客户端会向服务器端发送这个请求,同时等待请求的返回,王朝请求发送后,会由客户端的SendThread线程的readResponse方法负责接受来自服务端的响应,finishPacket方法会从Packet中取出对于的Watcher并注册到ZKWatcherManager中去。
protected void finishPacket(Packet p) {int err = p.replyHeader.getErr();if (p.watchRegistration != null) {p.watchRegistration.register(err);}......
  • 如Packet中的Watchregistration就是我们刚才第一步getData中保存的节点对应的Watcher的注册信息。现在他又从这部分中取出来封装的Watcher,如下具体的register方法:
public void register(int rc) {if (shouldAddWatch(rc)) {Map<String, Set<Watcher>> watches = getWatches(rc);synchronized (watches) {Set<Watcher> watchers = watches.get(clientPath);if (watchers == null) {watchers = new HashSet<Watcher>();watches.put(clientPath, watchers);}watchers.add(watcher);}}}//getWatchesprotected final ZKWatchManager watchManager;protected Map<String, Set<Watcher>> getWatches(int rc) {return watchManager.dataWatches;}
  • 如上register方法中客户端将之前暂时保存的Watcher取出来之后,放入到getWatcher获取到的一个Map对象中,这个Mp对象就是ZkWatcherManager中的一个dataWatches,我们将刚才存入WatchRegistration中的临时信息取出用来初始化ZKWatchManager.dataWatches,用于将数据节点的路径和watcher对象进行一一映射,这样就完成来客户端Watcher的注册,整个Watcher流程如下
    在这里插入图片描述

  • 如上流程中我们每次调用getData都会注册一个Watcher,如果这些Watcher都随着请求发送到服务器的话肯定会内存紧张,现实是这样的码,我们可以看之前代码中负责传输数据的对象Packet中,我们将WatchRegistration封装进去,如下Packet中的序列化方法createBB:

 public void createBB() {try {ByteArrayOutputStream baos = new ByteArrayOutputStream();BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);boa.writeInt(-1, "len"); // We'll fill this in laterif (requestHeader != null) {requestHeader.serialize(boa, "header");//封装requestHeader}if (request instanceof ConnectRequest) {//封装requestrequest.serialize(boa, "connect");// append "am-I-allowed-to-be-readonly" flagboa.writeBool(readOnly, "readOnly");} else if (request != null) {request.serialize(boa, "request");}baos.close();this.bb = ByteBuffer.wrap(baos.toByteArray());this.bb.putInt(this.bb.capacity() - 4);this.bb.rewind();} catch (IOException e) {LOG.warn("Unexpected exception", e);}}
  • 如上源码中可以看到并没有整个对象完全序列化进去,zookeeper只是将requestHeader和request两个属性进行序列化,WatchRegistration并没有被序列化到底层字节数组中,所以不会进行网络传输

服务端处理Watcher

  • 上面讲解了客户端注册Watcher的过程,并且已经了解了最终客户端不会将Watcher对象真正床底到服务器,那么,服务端是怎么样完成客户端的Watcher注册,一下我们对这部分文件进行解析。
ServerCnxn存储
  • 我们先看下服务器接收Watcher并将其存储起来的过程,如下Zookeeper服务端处理Watcher序列图:
    在这里插入图片描述
  • 我们先从源头分析客户端给了服务器那些信息,如下Zookeeper类中getData方法的源码:
......
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getData);
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
GetDataResponse response = new GetDataResponse();
......
  • 如上RequestHeader中type类型设置的 4 ,request中给定了节点path路径,以及一个boolean类型的watcher标识是否天剑监听。服务端收到来自客户端的请求后,在FinalRequestProcessor.processRequest()中会判断当前请求的类型type来做一个策略来决定处理不同类型的请求,如下源码:
switch (request.type) {
......case OpCode.getData: {......Stat stat = new Stat();byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,getDataRequest.getWatch() ? cnxn : null);//为tru则会传递一个 ServerCnxn cnxn = request.cnxn;对象到实际的注册方法中,否则给nullrsp = new GetDataResponse(b, stat);break;}
......}
  • 如上,从getData请求的处理逻辑可以看到当getDataRequest.getwatch为true的时候,Zookeeper就认为当前客户端请求需要进行Watcher注册,于是将当前的ServerCnxn对象和数据节点路径传入getData方法
  • ServerCnxn是一个Zookeeper客户端和服务器之间的链接接口,代表了一个客户端和服务器的链接,ServerCnxn接口默认实现是NIOServerCNxn,同时3.4.0版本开始引入了Netty实现:NettyServerCnxn,都实现了Watcher接口并且实现process接口,所有把他看成一个Watcher对象,如下ServerCnxn对象以及两种process实现
public abstract class ServerCnxn implements Stats, Watcher {
......public abstract void process(WatchedEvent event);......
}

在这里插入图片描述

  • 继续追getData源码,getZkDataBase获取到的ZKDatabase 对象,其中DataTree 对象是现在Zookeeper现有的节点数据的树形存储,我们可以通过path来从这获取到对应节点信息,如下获取DataNode,初始化节点状态,将DataNode天骄到WatchManager 对象中的WatchTable和watch2Paths中
 byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,getDataRequest.getWatch() ? cnxn : null);
//如下getData实现
public byte[] getData(String path, Stat stat, Watcher watcher)throws KeeperException.NoNodeException {DataNode n = nodes.get(path);if (n == null) {throw new KeeperException.NoNodeException();}synchronized (n) {n.copyStat(stat);if (watcher != null) {dataWatches.addWatch(path, watcher);}return n.data;}}
  • Watchmanager是Zookeeper服务端Watcher的管理者,内部管理的WatcherTable和Watch2Paths,所以一个节点存储了两次,不过是从如下两个未存存储
    • watchTable是从数据节点路径的粒度来托管Watcher
    • watch2Paths是从Watcher的粒度来空值时间触发需要出发的数据节点。
 */
public class WatchManager {private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);private final HashMap<String, HashSet<Watcher>> watchTable =new HashMap<String, HashSet<Watcher>>();private final HashMap<Watcher, HashSet<String>> watch2Paths =new HashMap<Watcher, HashSet<String>>();......}
  • WatcherManager数据结构如下
WatcherManager
- watchTable: HashMap<String, HashSet>(); + watch2Paths :new HashMap<Watcher, HashSet>();
+ addwatch(String ,Watcher): void + removeWatcher(Watcher): void + triggerWatch(String, EventType):Set +Trigger

上一篇Zookeeper–ZAB与Paxos算法联系与区别
下一篇Zookeeper–Watcher机制源码剖析二

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/310603.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

.NET Core 3.1 的REST 和gRPC 性能测试

看到越南小哥 的github 上的Evaluating Performance of REST vs. gRPC &#xff0c; 使用的是.NET Core 3.0 , 今天我把它升级到.NET Core 3.1 同样做了一个测试&#xff0c;文章的结果和他的博客文章是一样的&#xff1a;https://dev.to/thangchung/performance-benchmark-grp…

VS快捷键大全(超全)

1.注释&#xff1a;CTRLKC 2.取消注释&#xff1a;CTRLKU 3.设置断点调试&#xff1a;F9&#xff0c;断点行不执行 4.回到上一个光标位置&#xff1a;CTRL 5.前进到下一个光标位置&#xff1a;CTRLShift 6.复制整行代码&#xff1a;光标停在该行&#xff0c;CTRLC&#xf…

C++ 在派生类中使用using声明改变基类成员的可访问性

通过在类的内部使用using声明语句 , 我们可以将该类的直接或间接基类中的任何可访问成员标记出来 (只限于非私有成员) 。using声明语句中名字的访问权限由该using声明语句之前的访问说明符来决定。 例子如下: //.h class Base { public:int base_public 1;void func1(); pro…

Zookeeper--Watcher机制源码剖析二

Watcher触发 我们从实际操作时候的表现来看Watcher的触发&#xff0c;比如Zookeeper中NodeDataChanged时间的触发是“Watcher监听的对应数据节点的数据内容发生变更”&#xff0c;需要修改节点数据那么必然和数据节点存储的位置DataTree有关系&#xff0c;我们从这里去寻找修改…

吴军《硅谷来信》工作篇学习总结

【学习总结】| 作者 / Edison Zhou这是恰童鞋骚年的第215篇原创文章2018年在得到App上订阅了吴军老师的《硅谷来信》&#xff0c;从此每天的碎片时间就开始听吴军老师在大洋彼岸寄来的信件了。整个来信涵盖了职业发展、工作效率、业余生活等多个主题&#xff0c;从吴军老师的视…

[剑指offer]面试题1:赋值运算符函数

面试题1&#xff1a;赋值运算符函数 题目&#xff1a;如下为类型CMyString的声明&#xff0c;请为该类型添加赋值运算符函数。 class CMyString { public:CMyString(char *pData nullptr);CMyString(const CMyString &str);~CMyString(void); private:char *m_pDate; };经…

Zookeeper实践与应用-- Nginx负载均衡差异

Nginx/ZooKeeper 负载均衡的差异 Nginx 是我们常见的反向代理服务器&#xff0c;也被广泛的用作负载均衡服务器ZooKeeper是分布式协调服务框架&#xff0c;有时也被用来做负载均衡 Nginx Nginx负载均衡配置非常简单&#xff0c;吧多个Web Server配置到nginx中&#xff0c;用…

从对我的质疑说起,谈谈Linux下的文件删除

特特本来就是个刚毕业的小菜&#xff0c;很多知识都是靠着大家的指点才慢慢学会的。之前在一篇"纯属虚构"的文章 (鹅厂后台开发工程师的工作日常) 提到使用 rm 命令删除一个近 100 G 的 log 文件。很荣幸&#xff0c;这篇文章被一个大号转载了&#xff0c;获得了很不…

[剑指offer]面试题3:二维数组中的查找

面试题3&#xff1a;二维数组中的查找 题目&#xff1a;在一个二维数组中&#xff0c;每一行都按照从左到右递增的顺序排序&#xff0c;每一列都按照从上到下递增的顺序排序。请完成一个函数&#xff0c;输入这样的一个二维数组和一个整数&#xff0c;判断数组中是否含有该整数…

Zookeeper实践与应用--分布式锁实现

分布式锁 分布式锁是控制分布式系统之间同步访问资源的一种方式&#xff0c;如果不同系统是同一个系统的不同主机之间共享一个或一组资源&#xff0c;那么访问这些资源的时候&#xff0c;往往需要通过一些呼哧手段来防止彼此之间的干扰保证统一性&#xff0c;因此需要分布式锁…

关于 Blazor Server Side 的一些杂项, 感想

在2016年, 本人就开始了一个内部项目, 其特点就是用C#构建DOM树, 然后把DOM同步到浏览器中显示. 并且在一些小工程中使用.3年下来, 效果很不错, 但因为是使用C#来构建控件树, 在没有特定语法的情况下, 代码风格不是那么好.典型的风格大概是这样的:这个模式挺好的, 有点嫌弃C#代…

重现江湖!大数据高并发——架构师秘籍

大数据高并发的话题屡见不鲜&#xff0c;各种应对的方式方法也四处可见。然而笔试面试中一问就懵&#xff0c;简直是高薪拦路虎。为什么呢&#xff1f;究其原因&#xff0c;还是思路不清晰&#xff0c;缺乏实操&#xff0c;所以一问就倒。作为专注.Net领域十几年的老司机&#…

[剑指offer]面试题4:替换空格

面试题4&#xff1a;替换空格 题目&#xff1a;请实现一个函数&#xff0c;把字符串中的每个空格替换成"%20"。例如输入“We are happy.”&#xff0c;则输出“We%20are%20happy.”。 ❖ 时间复杂度为O&#xff08;n2&#xff09;的解法&#xff0c;不足以拿到Offer…

Zookeepe实践与应用--分布队列

分布式队列 接触到不少分布式队列的产品&#xff0c;比如&#xff0c;ActiveMq&#xff0c;RocketMQ&#xff0c;kafka等消息中间价&#xff0c;现在我们看看Zookeeper实现的分布式队列。分布式队列简单讲就可以分两个部分&#xff0c;一种是先进先出&#xff0c;另外一种是等…

[剑指offer]面试题5:从尾到头打印链表

面试题5&#xff1a;从尾到头打印链表 题目&#xff1a;输入一个链表的头结点&#xff0c;从尾到头反过来打印出每个结点的值。 链表结点定义如下: struct ListNode {int m_nKey;ListNode *m_pNext; };通常打印是一个只读操作&#xff0c;我们不希望打印时修改内容。 假设面…

ASP.NET Core+Quartz.Net实现web定时任务

点击蓝色“Dotnet Plus”关注我哟加个“星标”&#xff0c;每天清晨 07:25&#xff0c;干货推送&#xff01;作为一枚后端程序狗&#xff0c;项目实践常遇到定时任务的工作&#xff0c;最容易想到的的思路就是利用Windows计划任务/wndows service程序/Crontab程序等主机方法在主…

Redis基础数据结构内部实现简单介绍

5种基础数据结构 Redis有5种基础数据结构&#xff0c;分别是&#xff1a;String&#xff08;字符串&#xff09;&#xff0c;list&#xff08;列表&#xff09;&#xff0c;hash&#xff08;字典&#xff09;&#xff0c;set&#xff08;集合&#xff09;&#xff0c;zset&…

[剑指offer]面试题7:用两个栈实现队列

面试题7&#xff1a;用两个栈实现队列 题目&#xff1a;用两个栈实现一个队列。队列的声明如下&#xff0c;请实现它的两个函数appendTail和deleteHead&#xff0c;分别完成在队列尾部插入结点和在队列头部删除结点的功能。 用两个栈模拟一个队列的操作: 代码如下: #include …

ASP.NET CORE WEBAPI文件下载

最近要使用ASP.NET CORE WEBAPI用来下载文件&#xff0c;使用的.NET CORE 3.1。考虑如下场景&#xff1a;文件是程序生成的。文件应该能兼容各种格式。浏览器可以感知进行下载。准备经过简单的调研&#xff0c;得到以下结论。ASP.NET CORE 提供FileResult这种类型的ActionResul…

Redis高级数据结构原理解析-bitmap,hyperloglog

Redis 位图 开发过程中&#xff0c;我们可能遇到这种场景记录用户的打卡情况&#xff0c;签到情况&#xff0c;这些场景只有两种结果&#xff0c;有或者没有&#xff0c;加入记录的数据量比较大&#xff0c;比如用一年的数据&#xff0c;如果用Redis中普通key/value&#xff0…