ZooKeeper 实战(四) Curator Watch事件监听

文章目录

  • ZooKeeper 实战(四) Curator Watch事件监听
    • 0.前言
    • 1.Watch 事件监听概念
    • 2.NodeCache
      • 2.1.全参构造器参数
      • 2.2.代码DEMO
      • 2.3.日志输出
    • 3.PathChildrenCache
      • 3.1.全参构造器参数
      • 3.2.子节点监听时间类型
      • 3.2.代码DEMO
    • 4.TreeCache
      • 4.1.构造器参数
      • 4.2.代码DEMO
      • 4.3.日志输出

ZooKeeper 实战(四) Curator Watch事件监听

0.前言

上一篇博客只介绍了有关Curator中对ZNode的CRUD操作,从本篇起开始逐步介绍更加高级的API操作。

1.Watch 事件监听概念

ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。虽然ZooKeeper原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便,需要开发人员反复注册Watcher,比较繁琐。

而 Curator 引入了Cache 来实现对 ZooKeeper 服务端事件的监听。

Curator 中提供了三种 Cache(Watcher)来监听不同节点变化类型:

  • NodeCache:监听指定的节点。
  • PathChildrenCache:监听指定节点的子节点。
  • TreeCache:监听指定节点及其子孙节点。

2.NodeCache

监听指定的节点,增删改都会监听。

2.1.全参构造器参数

/*** @param: client 注册监听的客户端* @param: path 节点路径* @param: dataIsCompressed 是否开启数据压缩。传递的数据会进行压缩,传递速度快,但取数据时需要把压缩的数据进行转换,默认为false*/
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed);

2.2.代码DEMO

    @Overridepublic void run(ApplicationArguments args) throws Exception {log.info("。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。");String path = "/ahao/watcher";TimeUnit.SECONDS.sleep(3);// 创建NodeCache对象NodeCache nodeCache = new NodeCache(client,path);// 添加监听器nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {ChildData currentData = nodeCache.getCurrentData();if (currentData != null){String s = new String(currentData.getData(),StandardCharsets.UTF_8);log.info("监听{}节点发生变化,数据内容:{}",path,s);}else {log.info("监听{}节点被删除了",path);}}});// 开启监听nodeCache.start();TimeUnit.SECONDS.sleep(2);// 创建节点client.create().creatingParentsIfNeeded().forPath(path,"第一次新增".getBytes(StandardCharsets.UTF_8));TimeUnit.SECONDS.sleep(2);// 更新节点client.setData().forPath(path,"数据修改了".getBytes(StandardCharsets.UTF_8));TimeUnit.SECONDS.sleep(2);// 删除节点client.delete().deletingChildrenIfNeeded().forPath(path);}

2.3.日志输出

在这里插入图片描述

3.PathChildrenCache

监听指定节点的子节点。当一个子节点增删改时, PathChildrenCache会包含最新的子节点的数据和状态。

3.1.全参构造器参数

/*** @param: client 注册监听的客户端* @param: path 节点路径* @param: cacheData 是否缓存节点内容(包含节点状态)* @param: dataIsCompressed 是否开启数据压缩。传递的数据会进行压缩,传递速度快,但取数据时需要把压缩的数据进行转换,默认为false* @param: executorService 用于PathChildrenCache的后台线程的线程池。该线程池应该是单线程的,否则缓存可能会看到不一致的结果*/
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)

3.2.子节点监听时间类型

public enum Type
{// 子节点添加CHILD_ADDED,// 子节点的数据变更CHILD_UPDATED,// 子节点被删除CHILD_REMOVED,// 以下三个事件类型表示:当连接断开时,PathChildrenCache将继续保持其断开连接之前的状态,并且在连接恢复后,PathChildrenCache将为断开连接期间发生的所有添加、删除和更新发出正常的子事件。// 当连接状态处于ConnectionState.SUSPENDED。CONNECTION_SUSPENDED,// 当连接状态处于ConnectionState.RECONNECTEDCONNECTION_RECONNECTED,// 当连接状态处于ConnectionState.LOSTCONNECTION_LOST,// 当通过PathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT)启动监听时,该事件表示PathChildrenCache初始化完成This event signals that the initial cache has been populated.INITIALIZED
}

3.2.代码DEMO

    @Overridepublic void run(ApplicationArguments args) throws Exception {log.info("。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。");String path = "/ahao/watcher";TimeUnit.SECONDS.sleep(3);// 创建PathChildrenCache对象// 此处的cacheData参数一定要设置为true,不然Curator不会缓存数据当本地,// 那么后续pathChildrenCache.getCurrentData()得到的数据都为nullPathChildrenCache pathChildrenCache = new PathChildrenCache(client,path,true);// 添加监听器pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {if (event.getType() == PathChildrenCacheEvent.Type.INITIALIZED){log.info("PathChildrenCache初始化完,事件类型:{}", event.getType());}else {ChildData currentData = event.getData();log.info("事件类型:{},监听到的子节点发生变化:{}",event.getType(),currentData.getPath());}}});// 开启监听pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);// 创建子节点TimeUnit.SECONDS.sleep(2);client.create().creatingParentsIfNeeded().forPath(path+"/c1");client.create().creatingParentsIfNeeded().forPath(path+"/c2");client.create().creatingParentsIfNeeded().forPath(path+"/c3/age");// 修改子节点TimeUnit.SECONDS.sleep(2);client.setData().forPath(path+"/c1","c1更新了".getBytes(StandardCharsets.UTF_8));client.setData().forPath(path+"/c2","c2更新了".getBytes(StandardCharsets.UTF_8));// 删除子节点TimeUnit.SECONDS.sleep(2);client.delete().deletingChildrenIfNeeded().forPath(path+"/c3");}

3.3.日志输出

可以看出,PathChildrenCache只会监听直属子节点的变化,其非直属子节点的后代节点如/c3/age,没有发布通知。

在这里插入图片描述

4.TreeCache

监听指定节点及其子孙节点。

4.1.构造器参数

/*** @param: client 注册监听的客户端* @param: path 节点路径*/
public TreeCache(CuratorFramework client, String path)/*** @param: client 注册监听的客户端* @param: path 节点路径* @param: cacheData 是否缓存节点内容(包含节点状态)* @param: dataIsCompressed 是否开启数据压缩。传递的数据会进行压缩,传递速度快,但取数据时需要把压缩的数据进行转换,默认为false* @param: maxDepth 最大深度。最深的那个后代节点到path所需要经过的节点数* @param: executorService 用于PathChildrenCache的后台线程的线程池。该线程池应该是单线程的,否则缓存可能会看到不一致的结果* @param: createParentNodes 是否需要创建父节点。如果父节点不存在泽创建父节点(容器节点)* @param: TreeCacheSelector TreeCache选择器。根据指定的策略和条件,选择适合的缓存树来创建和维护TreeCache*/
TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final ExecutorService executorService, boolean createParentNodes, TreeCacheSelector selector)

4.2.代码DEMO

    @Overridepublic void run(ApplicationArguments args) throws Exception {log.info("。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。");String path = "/ahao/watcher/tree";TimeUnit.SECONDS.sleep(3);// 创建TreeCache对象,也可通过TreeCache.newBuilder()创建TreeCache treeCache = new TreeCache(client,path);treeCache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {if (event.getType() == TreeCacheEvent.Type.INITIALIZED){log.info("TreeCache初始化完,事件类型:{}", event.getType());}else {ChildData currentData = event.getData();log.info("事件类型:{},监听到的子节点发生变化:{}",event.getType(),currentData.getPath());}}});// 开启监听treeCache.start();// 创建节点TimeUnit.SECONDS.sleep(2);client.create().creatingParentsIfNeeded().forPath(path);client.create().creatingParentsIfNeeded().forPath(path +"/t1");client.create().creatingParentsIfNeeded().forPath(path +"/t2/ccc");// 修改子节点TimeUnit.SECONDS.sleep(2);client.setData().forPath(path,"根节点更新了".getBytes(StandardCharsets.UTF_8));client.setData().forPath(path +"/t2/ccc","/t2/ccc更新了".getBytes(StandardCharsets.UTF_8));// 删除子节点TimeUnit.SECONDS.sleep(2);client.delete().deletingChildrenIfNeeded().forPath(path +"/t2");}

4.3.日志输出

可以看出TreeCache会监听当前节点和后代节点的变化。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/619291.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Flink(十二)【容错机制】

前言 最近已经放假了,但是一直在忙一个很重要的自己的一个项目,用 JavaFX 和一个大数据组件联合开发一个功能,也算不枉我学了一次 JavaFX,收获很大,JavaFX 它作为一个 GUI 开发语言,本质还是 Java&#xff…

MSF流量加密

1、背景介绍 在MSF中生成shell,并上线运行时。都是通过http https tcp等协议传输。虽然MSF本身会对流量进行加密,但MSF太出名以致于其加密特征容易被IPS,WAF等可以检测带有攻击的特征的设备拦截或记录。 2、生成 SSL 证书 openssl req -x50…

关于运维·关于数据库面试题

目录 一、数据库类型 二、数据库引擎 三、mysql数据库类型 四、mysql的约束添加 五、主从复制原理 六、主从方式有几种 七、mysql主从数据不一致的原因 八、mysql的优化 九、什么是事务的特征 十、数据库读写分离的好处 十一、怎样优化sql语句 十二、mysql的同步方…

谷粒商城-商品服务-品牌管理-阿里云云存储+JSR303数字校验+统一异常处理

阿里云云存储OSS 分布式系统上传文件 分布式系统上传文件 单体应用上传:上传文件到服务器,想获取文件时再向服务器发请求获取文件。 分布式系统上传: 因为有多台服务器,为防止负载均衡导致获取文件时没找到对应的服务器&#xf…

实用编程调试技巧

目录 一、调试的基本步骤 二、Debug和Release的介绍 三、Windows环境调试介绍 1.调试环境的准备 2.学会快捷键 最常用的几个快捷键: 断点应用举例: 3.调试的时候查看程序当前信息 (1&#xff09…

GitHub注册新账号的操作流程(详细)

目录 第一步 进入官网,点击右上角的"Sign up" 第二步 输入email地址 第三步 设置密码 第四步 输入昵称 第五步 根据个人喜好决定要不要接收GitHub的邮件推送。然后回答他们的验证问题 第六步 输入验证码 我在注册github账号时遇到过一些阻碍&#x…

软件测试|教你使用Python绘制正多边形

简介 绘制正多边形是Python图形编程的基本任务之一。在本文中,我将为你提供一个使用Python绘制正多边形的详细教程,并提供一个示例代码。我们将使用Python的Turtle库来进行绘制。 步骤1:导入Turtle库 我们需要先安装好Python环境&#xff…

Shiro框架:Shiro内置过滤器源码解析

目录 1. 常见项目中过滤器配置 2.Url访问控制配置解析为内置过滤器 2.1 DefaultFilterChainManager构造并注册内置过滤器 2.2 构造过滤器链 3. Shiro内置过滤器解析 3.1 内置过滤器概览 3.2 公共继承类解析 3.2.1 顶层Filter接口 3.2.2 AbstractFilter 3.2.3 Nameab…

二十几种未授权访问漏洞合集

未授权访问漏洞是一个在企业内部非常常见的问题,这种问题通常都是由于安全配置不当、认证页面存在缺陷,或者压根就没有认证导致的。当某企业对外的服务端口、功能无限制开放,并且对用户的访问没有做任何限制的时候,可能会泄露出某…

PLC数组队列搜索FC(SCL代码+梯形图程序)

根据输入数据搜索输入数据队列中和输入数据相同的数,函数返回其所在队列的位置。这里我们需要用到博途PLC的数组指针功能,有关数组指针的详细使用方法,可以参考下面文章: 博途PLC数组指针: https://rxxw-control.blog.csdn.net/article/details/134761364 区间搜索FC …

常用计算电磁学算法特性与电磁软件分析

常用计算电磁学算法特性与电磁软件分析 参考网站: 计算电磁学三大数值算法FDTD、FEM、MOM ADS、HFSS、CST 优缺点和应用范围详细教程 ## 基于时域有限差分法的FDTD的计算电磁学算法(含Matlab代码)-框架介绍 参考书籍:The finite…

【python】06.函数和模块的使用

函数和模块的使用 在讲解本章节的内容之前,我们先来研究一道数学题,请说出下面的方程有多少组正整数解。 事实上,上面的问题等同于将8个苹果分成四组每组至少一个苹果有多少种方案。想到这一点问题的答案就呼之欲出了。 可以用Python的程序来…

Spring Boot 整合支付宝实现在线支付方案(沙箱环境)

文章目录 1.理解沙箱环境2.沙箱环境接入准备2.1 访问开发者控制台2.2 获取重要信息2.3 处理秘钥 3.接入支付宝支付的流程4.实现支付4.1 添加 SDK 依赖4.2 创建配置类4.3 支付宝订单管理接口实现流程4.4 支付宝支付接口实现流程 5.支付宝支付功能演示7.总结 TIP:对于…

【UEFI基础】EDK网络框架(VLAN)

VLAN VLAN代码综述 在MNP中有很多的VLAN介绍,MNP存在的一个重要原因也是为了处理VLAN,而本文介绍的NetworkPkg\VlanConfigDxe\VlanConfigDxe.inf其实只是一个帮助模块,真正的VLAN配置还是在MNP中。 VLAN同样是一个UEFI Driver Model&#…

Redis实现分布式会话

Redis实现分布式会话 1 什么是分布式会话 1 这是我么之前学过的注册登录模式 2 如果非常多的人访问,因为单台服务器的访问承受能力是有限的,那么我们就想用多态服务器来承担压力 3 一般通过负载均衡的方式来实现,来分担服务器的压力。 4 负…

【PlantUML】- 时序图

写在前面 本篇文章,我们来介绍一下PlantUML的时序图。这个相对类图来讲,比较简单,也不需要布局。读完文章,相信你就能实际操作了。 目录 写在前面一、基本概念二、具体步骤1.环境说明2.元素3.语法4.示例 三、参考资料写在后面系列…

软件测试|Python数据可视化神器——pyecharts教程(十)

使用pyecharts绘制漏斗图 简介 漏斗图(Funnel Chart)是一种用于可视化数据流程或转化率的图表类型。它通常由一系列阶段组成,每个阶段都有一个名称和一个值,表示在该阶段的转化量或数据流程的进展情况。漏斗图的名称来源于其外观…

Web自动化测试,一定得掌握的 8 个核心知识点

使用 cypress 进行端对端测试,和其他的一些框架有一个显著不同的地方,它使用 javascript 作为编程语言。传统主流的 selenium 框架是支持多语言的,大多数 QA 会的python 和 java 语言都可以编写 selenium 代码,遇到需要编写 js 代…

好用的便签有哪些?windows便签工具在哪打开?

每当我8点准时上班,在等待电脑开机的过程,我都会习惯性地思考整理今天要晚上的任务,列出所要完成的待办事项。随着每一项任务的清晰呈现,我的心情也逐渐明朗起来。当然了,这个时候,我迫切需要一款好用的便签…

VS游戏打包教程

我用得天天酷跑小游戏做的例子 1:安装打包插件 2:在解决方案里新建一个项目 3:新建一个setup项目 4:界面如下(通过右键folder,可以创建folder目录和输出) 5:素材文件 6:素材放完了就项目输出 7:创建快捷方式 右键这个主输出选择第一个create shortcut 8:将这个快捷方式,拖到,…