Zookeeper--Watcher机制源码剖析二

Watcher触发

  • 我们从实际操作时候的表现来看Watcher的触发,比如Zookeeper中NodeDataChanged时间的触发是“Watcher监听的对应数据节点的数据内容发生变更”,需要修改节点数据那么必然和数据节点存储的位置DataTree有关系,我们从这里去寻找修改后触发Watcher的答案。
  • 我们从DataTree类中找到了修改节点的入口setData方法,我们从上篇内容中也知道了ServerCnxn存储到WatchManager中,并且以不同的维度存储了两份数据:
public Stat setData(String path, byte data[], int version, long zxid,long time) throws KeeperException.NoNodeException {Stat s = new Stat();DataNode n = nodes.get(path);if (n == null) {throw new KeeperException.NoNodeException();}byte lastdata[] = null;synchronized (n) {lastdata = n.data;n.data = data;n.stat.setMtime(time);n.stat.setMzxid(zxid);n.stat.setVersion(version);n.copyStat(s);}// now update if the path is in a quota subtree.String lastPrefix;if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {this.updateBytes(lastPrefix, (data == null ? 0 : data.length)- (lastdata == null ? 0 : lastdata.length));}dataWatches.triggerWatch(path, EventType.NodeDataChanged);return s;}
  • 以上setData方法流程就两个步骤:
    • 利用Path从存储节点的ConcurrentHashMap中获取节点信息,
    • 修改节点信息
    • 调用WatchManager 的triggerWatch方法
  • 可以看到以上代码是通过调用WatchManager的triggerWatch方法来触发相关事件
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {WatchedEvent e = new WatchedEvent(type,KeeperState.SyncConnected, path);HashSet<Watcher> watchers;synchronized (this) {watchers = watchTable.remove(path);......for (Watcher w : watchers) {HashSet<String> paths = watch2Paths.get(w);if (paths != null) {paths.remove(path);}}}for (Watcher w : watchers) {if (supress != null && supress.contains(w)) {continue;}w.process(e);}return watchers;}
  • 如上是triggerWatch源码中的触发逻辑有如下几个步骤:
    • 封装WatchedEvent:首先从通知参数中获取到通知状态KeeperState,事件类型EventType,节点路径Path封装成一个WatchedEvent对象
    • 查询Watcher对象:根据数据节点的节点路径从WatchTable中取出对应的Watcher,如果没有找到watcher,说明没有任何客户端在这个节点上注册过Watcher,直接退出。找到了这个Watcher将他取出来,同时直接从watchTable和watch2Path中删掉------这个步骤可以看出,watcher在服务端是一次性的,即触发一次就失效了。
    • 调用process方法来触发Watcher: 在最后的for循环中依次取出每一个watcher来调用每一个的process,我们看一下他的实现类有N多个如下图,实际调用是哪一个呢,我们得从之前的注册代码中去找答案。之前Zookeeper服务器注册到WatchManager中的是watcher的实现类ServerCnxn,所有我们直接看ServerCnxn的process实现方法就可以
      在这里插入图片描述
  • 以下ServerCnxn对应的process是一个抽象方法,他的实现是NIOServerCnxn的实现:
public abstract void process(WatchedEvent event);
synchronized public void process(WatchedEvent event) {ReplyHeader h = new ReplyHeader(-1, -1L, 0);if (LOG.isTraceEnabled()) {ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,"Deliver event " + event + " to 0x"+ Long.toHexString(this.sessionId)+ " through " + this);}// Convert WatchedEvent to a type that can be sent over the wireWatcherEvent e = event.getWrapper();sendResponse(h, e, "notification");}
  • 以上代码片段可以看出process方法逻辑比较简单以下几个步骤:
    • 将请求头标记为-1,标识当前是一个通知
    • 将watchedEvent包装成WatcherEvent,以方便进行网络传输序列化(之前篇解释过WatcherEvent用来网络传输用)
    • 向客户端发送该通知
  • 我们从上面步骤看其实他并没有处理客户端Watcher的逻辑,只是借用当前客户端连接的ServerCnxn对象来实现对客户端WatchedEvent的传递,真正的客户端Watcher回调与业务逻辑的执行肯定都在客户端这边

客户端回调Watcher

  • 对于一个来自服务器的通知是通ServerCnxn中发送出来的,同样的,客户端这边的响应也是在一个类似的类中ClientCnxn中,ClientCnxn中通过SendThread来收事件通知
SendThread接收事件通知
  • 我们来看下ClientCnxn的接收通知的源码处理:
class SendThread extends ZooKeeperThread {private long lastPingSentNs;private final ClientCnxnSocket clientCnxnSocket;private Random r = new Random(System.nanoTime());        private boolean isFirstConnect = true;void readResponse(ByteBuffer incomingBuffer) throws IOException {ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);ReplyHeader replyHdr = new ReplyHeader();replyHdr.deserialize(bbia, "header");......if (replyHdr.getXid() == -1) {// -1 means notificationif (LOG.isDebugEnabled()) {LOG.debug("Got notification sessionid:0x"+ Long.toHexString(sessionId));}WatcherEvent event = new WatcherEvent();event.deserialize(bbia, "response");// convert from a server path to a client pathif (chrootPath != null) {String serverPath = event.getPath();if(serverPath.compareTo(chrootPath)==0)event.setPath("/");else if (serverPath.length() > chrootPath.length())event.setPath(serverPath.substring(chrootPath.length()));else {LOG.warn("Got server path " + event.getPath()+ " which is too short for chroot path "+ chrootPath);}}WatchedEvent we = new WatchedEvent(event);if (LOG.isDebugEnabled()) {LOG.debug("Got " + we + " for sessionid 0x"+ Long.toHexString(sessionId));}eventThread.queueEvent( we );return;}......}
  • 对于一个服务端的响应客户端由SendThread.readResponse(ByteBuffer incomingBuffer)方法来统一处理,如果响应头replyHdr中标识的XID为-1,标识这个是一个通知类型响应,对其的处理大体上分为4个步骤
    • 反序列化:replyHdr.deserialize(bbia, “header”); 方法Zookeeper客户端接收到请求首先将字节流转换成WatcherEvent对象
    • 处理Chrootpath:如果客户端设置了chrootPath属性,那么要对服务端传过来的完整的节点路径进行chrootPath处理,生成客户端的一个相对节点路径,例如客户端设置差rootPath为/appl,那么针对服务器端传来的响应节点路径为/appl/locks,经过chrootPath处理后,就变成相对路径/locks。
    • 还原WatchedEvent:通过接收到的WatcherEvent得到WatchedEvent
    • 回调Watcher,最后将WatchedEvent对象交给EventThread线程,在下一个轮询周期中进行Watcher回调。
EventThread处理事件通知
  • 如上流程中,服务的的Watcher时间通知最终交给了EventThread线程来处理,EventThread是Zookeeper客户端中专门用来处理服务器端通知的事件线程,我们看下EventThread中是怎么处理的。由上面代码中queueEvent 方法入口:
public void queueEvent(WatchedEvent event) {if (event.getType() == EventType.None&& sessionState == event.getState()) {return;}sessionState = event.getState();// materialize the watchers based on the eventWatcherSetEventPair pair = new WatcherSetEventPair(watcher.materialize(event.getState(), event.getType(),event.getPath()),event);// queue the pair (watch set & event) for later processingwaitingEvents.add(pair);}
  • 如上 QueueEvent方法首先根据该通知事件从ZKWatchManager中取出所有相关Watcher,materalize方法如下:
    private final Map<String, Set<Watcher>> dataWatches =new HashMap<String, Set<Watcher>>();private final Map<String, Set<Watcher>> existWatches =new HashMap<String, Set<Watcher>>();private final Map<String, Set<Watcher>> childWatches =new HashMap<String, Set<Watcher>>();public Set<Watcher> materialize(Watcher.Event.KeeperState state,Watcher.Event.EventType type,String clientPath){Set<Watcher> result = new HashSet<Watcher>();switch (type) {case None:......case NodeDataChanged:case NodeCreated:synchronized (dataWatches) {addTo(dataWatches.remove(clientPath), result);}synchronized (existWatches) {addTo(existWatches.remove(clientPath), result);}break;......}}final private void addTo(Set<Watcher> from, Set<Watcher> to) {if (from != null) {to.addAll(from);}}
  • 以上代码中客户端识别出EventType后,会从相应的Watcher存储(即上代码中dataWatches,existWatches,childWatches中一个或者多个,比如NodeCreated 事件类型从dataWatches ,和 existWatches中所有watcher)中去掉对应的Watcher,此处用的remove,标识客户端的Watcher机制统一也是一次性的,触发后该Watcher就失效了。
  • 接着利用Watcher封装成一个WatcherSetEventPair,并且将这个对象加入一个阻塞队列LinkedBlockingQueue 中,并且我们在ClientCnxn类中能找到一个run方法,这个方法会不断的从阻塞队列中take出数据然后再发送对应的通知process进行串行同步处理,这里的Watcher才是真正的客户端注册的Watcher,调用这个Watcher的process方法就可以实现回调了,保证FIFO。
@Overridepublic void run() {try {isRunning = true;while (true) {Object event = waitingEvents.take();if (event == eventOfDeath) {wasKilled = true;} else {processEvent(event);}......}} catch (InterruptedException e) {
......}
......}

Watcher特性总结

  • 通过我们上面的分析,了解了Watcher机制的相关接口定义以及Watcher的各类事件,我们以Zookeeper节点的数据内容过期接口为例,从Zookeeper客户端进行Watcher注册,服务的处理Watcher以及客户端回调watcher三方面阶段讲解了ZooKeeper的Watcher工作机制,发现Watcher有以下几个特点:
一次性
  • 无论是客户端还是服务端,一个Watcher被触发,ZooKeeper都会将其从相应的存储中移除,因此,开发人员在Watcher的使用上要记住的一点是要反复注册,这样的设计有效的减轻了服务端的压力,如果注册一个Watcher后一直有效,针对那些非常频繁的节点,服务端会不断向客户端发送事件通知,无论对网络还是服务器性能都有非常大影响
客户端串行执行
  • 客户端watcher回调的过程是一个串行同步的过程,这保证了顺序性,同事需要开发人员注意不要因为一个Watcher的处理逻辑影响了整个客户端Watcher回调
轻量级
  • WatchedEvent是整个ZooKeeperWatcher通知机制的最小单元,这个数据结构只包含三部分内容:通知状态,事件类型,节点路径。也就是说,Watcher通知非常简单,只告诉客户端发生了事件,而不说明具体的内容。
    • 例如针对NodeDataChanged时间ZooKeeper的Watcher只通知客户端指定数据节点的数据内容发生了变更,而对于原始数据以及变更后的数据都无法直接获取,需要主动获取,这也是Watcher机制的一个重要特性
  • 另外客户端向服务端注册Watcher时候,并不会吧客户端真实的watcher对象传到服务端,仅仅是在客户端请求中使用boolean类型属性标记,同事服务端也只保存当前连接的ServerCnxn对象
  • 轻量级设计使Watcher机制在网络开销和服务端内存开销上都非常廉价。

上一篇Zookeeper–Watcher机制源码剖析一
下一篇Zookeeper实践与应用- Canal

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/310599.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

吴军《硅谷来信》工作篇学习总结

【学习总结】| 作者 / Edison Zhou这是恰童鞋骚年的第215篇原创文章2018年在得到App上订阅了吴军老师的《硅谷来信》&#xff0c;从此每天的碎片时间就开始听吴军老师在大洋彼岸寄来的信件了。整个来信涵盖了职业发展、工作效率、业余生活等多个主题&#xff0c;从吴军老师的视…

Zookeeper实践与应用-- Nginx负载均衡差异

Nginx/ZooKeeper 负载均衡的差异 Nginx 是我们常见的反向代理服务器&#xff0c;也被广泛的用作负载均衡服务器ZooKeeper是分布式协调服务框架&#xff0c;有时也被用来做负载均衡 Nginx Nginx负载均衡配置非常简单&#xff0c;吧多个Web Server配置到nginx中&#xff0c;用…

从对我的质疑说起,谈谈Linux下的文件删除

特特本来就是个刚毕业的小菜&#xff0c;很多知识都是靠着大家的指点才慢慢学会的。之前在一篇"纯属虚构"的文章 (鹅厂后台开发工程师的工作日常) 提到使用 rm 命令删除一个近 100 G 的 log 文件。很荣幸&#xff0c;这篇文章被一个大号转载了&#xff0c;获得了很不…

Zookeeper实践与应用--分布式锁实现

分布式锁 分布式锁是控制分布式系统之间同步访问资源的一种方式&#xff0c;如果不同系统是同一个系统的不同主机之间共享一个或一组资源&#xff0c;那么访问这些资源的时候&#xff0c;往往需要通过一些呼哧手段来防止彼此之间的干扰保证统一性&#xff0c;因此需要分布式锁…

关于 Blazor Server Side 的一些杂项, 感想

在2016年, 本人就开始了一个内部项目, 其特点就是用C#构建DOM树, 然后把DOM同步到浏览器中显示. 并且在一些小工程中使用.3年下来, 效果很不错, 但因为是使用C#来构建控件树, 在没有特定语法的情况下, 代码风格不是那么好.典型的风格大概是这样的:这个模式挺好的, 有点嫌弃C#代…

重现江湖!大数据高并发——架构师秘籍

大数据高并发的话题屡见不鲜&#xff0c;各种应对的方式方法也四处可见。然而笔试面试中一问就懵&#xff0c;简直是高薪拦路虎。为什么呢&#xff1f;究其原因&#xff0c;还是思路不清晰&#xff0c;缺乏实操&#xff0c;所以一问就倒。作为专注.Net领域十几年的老司机&#…

[剑指offer]面试题4:替换空格

面试题4&#xff1a;替换空格 题目&#xff1a;请实现一个函数&#xff0c;把字符串中的每个空格替换成"%20"。例如输入“We are happy.”&#xff0c;则输出“We%20are%20happy.”。 ❖ 时间复杂度为O&#xff08;n2&#xff09;的解法&#xff0c;不足以拿到Offer…

Zookeepe实践与应用--分布队列

分布式队列 接触到不少分布式队列的产品&#xff0c;比如&#xff0c;ActiveMq&#xff0c;RocketMQ&#xff0c;kafka等消息中间价&#xff0c;现在我们看看Zookeeper实现的分布式队列。分布式队列简单讲就可以分两个部分&#xff0c;一种是先进先出&#xff0c;另外一种是等…

ASP.NET Core+Quartz.Net实现web定时任务

点击蓝色“Dotnet Plus”关注我哟加个“星标”&#xff0c;每天清晨 07:25&#xff0c;干货推送&#xff01;作为一枚后端程序狗&#xff0c;项目实践常遇到定时任务的工作&#xff0c;最容易想到的的思路就是利用Windows计划任务/wndows service程序/Crontab程序等主机方法在主…

Redis基础数据结构内部实现简单介绍

5种基础数据结构 Redis有5种基础数据结构&#xff0c;分别是&#xff1a;String&#xff08;字符串&#xff09;&#xff0c;list&#xff08;列表&#xff09;&#xff0c;hash&#xff08;字典&#xff09;&#xff0c;set&#xff08;集合&#xff09;&#xff0c;zset&…

[剑指offer]面试题7:用两个栈实现队列

面试题7&#xff1a;用两个栈实现队列 题目&#xff1a;用两个栈实现一个队列。队列的声明如下&#xff0c;请实现它的两个函数appendTail和deleteHead&#xff0c;分别完成在队列尾部插入结点和在队列头部删除结点的功能。 用两个栈模拟一个队列的操作: 代码如下: #include …

ASP.NET CORE WEBAPI文件下载

最近要使用ASP.NET CORE WEBAPI用来下载文件&#xff0c;使用的.NET CORE 3.1。考虑如下场景&#xff1a;文件是程序生成的。文件应该能兼容各种格式。浏览器可以感知进行下载。准备经过简单的调研&#xff0c;得到以下结论。ASP.NET CORE 提供FileResult这种类型的ActionResul…

Redis高级数据结构原理解析-bitmap,hyperloglog

Redis 位图 开发过程中&#xff0c;我们可能遇到这种场景记录用户的打卡情况&#xff0c;签到情况&#xff0c;这些场景只有两种结果&#xff0c;有或者没有&#xff0c;加入记录的数据量比较大&#xff0c;比如用一年的数据&#xff0c;如果用Redis中普通key/value&#xff0…

[剑指offer]面试题8:旋转数组的最小数字

面试题8&#xff1a;旋转数组的最小数字 题目&#xff1a;把一个数组最开始的若干个元素搬到数组的末尾&#xff0c;我们称之为数组的旋转。输入一个递增排序的数组的一个旋转&#xff0c;输出旋转数组的最小元素。例如数组{3,4,5,1,2}为{1,2,3,4,5}的一个旋转&#xff0c;该数…

.NET Core + Kubernetes:快速体验

Kubernetes[1] 是目前非常主流的容器编排工具&#xff0c;在应用创建、应用部署、应用扩容、应用更新等方面都非常的方便&#xff0c;而且在应用故障时&#xff0c;也可以快速自愈。所以基于微服务架构下的产品&#xff0c;了解 Kubernetes 的使用是非常必要的&#xff0c;我猜…

[剑指offer]面试题9:斐波那契数列

面试题9&#xff1a;斐波那契数列 题目一&#xff1a;写一个函数&#xff0c;输入n&#xff0c;求斐波那契&#xff08;Fibonacci&#xff09;数列的第n项。斐波那契数列的定义如下&#xff1a; ❖ 效率很低的解法&#xff0c;挑剔的面试官不会喜欢 代码如下: long long fib(…

Redis分布式锁奥义

分布式锁 分布式系统进行逻辑处理的时候&#xff0c;经常会遇到并发问题&#xff0c;例如直播场景中&#xff0c;用户需要连麦主播&#xff0c;当多个用户在同一个时刻一起连麦时候&#xff0c;应该保证只有一个用户能连麦成功&#xff0c;我们改怎么保证这种业务场景下保证数…

.NET Core + Kubernetes:Pod

在 .NET Core Kubernetes&#xff1a;快速体验 文章中&#xff0c;已经实现将一个 .NET Core API 服务部署在 Kubernetes 集群中&#xff0c;接下来将逐步了解 Kubernetes 中各核心模块。首先当然是 Pod&#xff0c;我相信 Pod 是在接触 Kubernetes 时听到较多的一个词语&…

LBS解决方案

LBS解决方案 LBS&#xff08;基于地理位置的服务&#xff09;服务是现在移动互联网中比较常用的功能&#xff0c;例如外卖中我附近的店铺&#xff0c;通常是以客户位置坐标为中心&#xff0c;查询一定范围内的店铺信息&#xff0c;按照距离由近及原进行倒叙排序 方案一&#…

长沙IT技术圈百万年薪大佬?是否存在?

作者&#xff1a;邹溪源&#xff0c;长沙资深互联网从业者&#xff0c;架构师社区特邀嘉宾&#xff01;01引子不知不觉&#xff0c;IT技术圈开始流传起“百万年薪”的故事&#xff0c;有人问我&#xff0c;长沙有百万大佬么&#xff1f;其实我也不知道。02背景长沙自古以来就是…