支持断线重连、永久watcher、递归操作 ZooKeeper 客户端

项目介绍

ZooKeeper本质上是一个分布式的小文件存储系统。原本是Apache Hadoop的一个组件,现在被拆分为一个Hadoop的独立子项目。

Zookeeper 作为一个分布式的服务框架,主要用来解决分布式集群中应用系统的一致性问题,它能提供基于类似于文件系统的目录节点树方式的数据存储,但是 Zookeeper 并不是用来专门存储数据的,它的作用主要是用来维护和监控你存储的数据的状态变化。通过监控这些数据状态的变化,从而可以达到基于数据的集群管理。

Zookeeper 在Windows安装和使用,可参考http://www.cnblogs.com/shanyou/p/3221990.html

ZookeeperClient是在https://github.com/shayhatsor/zookeeper基础上的再次封装,使开发者更方便使用ZooKeeper相关的功能。

ZookeeperClient实现了断线重连,会话过期重连,永久监听,子节点数据变化的监听。并且加入了常用功能,例如分布式锁,Leader选举,分布式队列

,项目地址https://github.com/milanyangbo/ZooKeeper.Net

支持的平台

.NET 4及以上,目前尚不支持.NET Core, .NET Core推荐用另外一个项目 支持断线重连、永久watcher、递归操作并且能跨平台(.NET Core)的ZooKeeper异步客户端 

使用说明

下面列一下常用的使用方法,不仅限于此哦!

一、创建ZKClient对象

创建ZKClient对象 有两种方式可以方便的创建ZKClient对象

  1. 使用构造函数创建

 string address = "localhost:2181";
   ZKClient zkClient1 = new ZKClient(address);    
   ZKClient zkClient2 = new ZKClient(address, TimeSpan.FromMilliseconds(10000));  
   ZKClient zkClient3 = new ZKClient(address, TimeSpan.FromMilliseconds(10000), TimeSpan.FromMilliseconds(10000));  
   ZKClient zkClient4 = new ZKClient(address, TimeSpan.FromMilliseconds(30000), TimeSpan.FromMilliseconds(10000), new SerializableSerializer());      
   ZKClient zkClient5 = new ZKClient(address, TimeSpan.FromMilliseconds(30000), TimeSpan.FromMilliseconds(10000), new SerializableSerializer(), TimeSpan.FromMilliseconds(60000)); 


  1. 使用辅助类创建

string address = "localhost:2181";  
   ZKClient zkClient = ZKClientBuilder.NewZKClient(address)  
                              .SessionTimeout(30000)//可选  
                              .Serializer(new SerializableSerializer())//可选  
                              .RetryTimeout(60000)//可选  
                              .ConnectionTimeout(10000)//可选  
                              .Build(); //创建实例


二、节点的新增、更新、删除和获取

新增节点

  1. 常规新增节点

    父节点不存在会抛出异常

    await zkClient.CreateAsync("/test1", "123", CreateMode.EPHEMERAL);  await zkClient.CreateAsync("/test1-1", 123, CreateMode.EPHEMERAL_SEQUENTIAL);  await zkClient.CreateAsync("/test1-2", 123, CreateMode.PERSISTENT);  await zkClient.CreateAsync("/test1-3", 123, CreateMode.PERSISTENT_SEQUENTIAL);
  1. 递归新增节点(新增节点及其父节点)

    如果父节点不存在会被一并创建。
    对于PERSISTENT类型的节点,递归创建,父节点和子节点都创建为PERSISTENT。
    对于EPHEMERAL类型的节点,递归创建,父节点都是PERSISTENT类型,而最后一级节点才是EPHEMERAL类型。(因为EPHEMERAL不能拥有子节点)
    注意:第二个参数为节点的值,指的的最后一级节点的值。

 string path = "/test8/1/2/3";     //递归创建节点及父节点
    await zkClient.CreateRecursiveAsync(path, "abc", CreateMode.PERSISTENT);     await zkClient.CreateRecursiveAsync(path, "123", ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);

  1. 特殊的EPHEMERAL类型节点

    特殊类型的EPHEMERAL节点,该节点在会话失效被删除后,重新连接会被自动创建。


更新节点数据

   string path = "/test";   await zkClient.SetDataAsync(path, "456");   //带期望版本号的更新,如果真实的版本号与期望版本号不一致会更新失败,抛出异常await zkClient.SetDataAsync(path, "123", 2);

删除节点

  1. 常规删除

    bool flag = await zkClient.DeleteAsync("/test");//删除任意版本bool flag = await zkClient.DeleteAsync("/test",1);//删除指定版本
  1. 递归删除(删除节点及子节点)

    string path = "/test";    await zkClient.DeleteRecursiveAsync(path);//如果/test下有多个子节点,会被一并删除

获取节点数据

    string path = "/test";    await zkClient.GetDataAsync<string>(path); //如果节点不存在抛出异常await zkClient.GetDataAsync<string>(path, true); //如果节点不存在返回nullStat stat = (await zkClient.GetZKDataAsync<string>(path)).stat; //获得数据以及stat信息

等待节点创建

    string path = "/test";    //等待直到超时或者节点创建成功。await zkClient.WaitUntilExistsAsync(path, TimeSpan.FromMilliseconds(5000));

三、权限管理

ZooKeeper的权限管理亦即ACL控制功能通过Server、Client两端协调完成:
Server端:
一个ZooKeeper的节点(znode)存储两部分内容:数据和状态,状态中包含ACL信息。创建一个znode会产生一个ACL列表,列表中每个ACL包括:

验证模式(scheme)
具体内容(Id)(当scheme=“digest”时,Id为用户名密码,例如“root:J0sTy9BCUKubtK1y8pkbL7qoxSw=”)
权限(perms)

ZooKeeper提供了如下几种验证模式(scheme):

digest:Client端由用户名和密码验证,譬如user:password,digest的密码生成方式是Sha1摘要的base64形式
auth:不使用任何id,代表任何已确认用户。
ip:Client端由IP地址验证,譬如172.2.0.0/24
world:固定用户为anyone,为所有Client端开放权限
super:在这种scheme情况下,对应的id拥有超级权限,可以做任何事情(cdrwa)

注意的是,exists操作和getAcl操作并不受ACL许可控制,因此任何客户端可以查询节点的状态和节点的ACL。
节点的权限(perms)主要有以下几种:

Create 允许对子节点Create操作
Read 允许对本节点GetChildren和GetData操作
Write 允许对本节点SetData操作
Delete 允许对子节点Delete操作
Admin 允许对本节点setAcl操作
Znode ACL权限用一个int型数字perms表示,perms的5个二进制位分别表示setacl、delete、create、write、read。比如0x1f=adcwr,0x1=----r,0x15=a-c-r。

四、监听相关

注意:对于断开连接时间过长造成的会话过期,由于服务器端在会话过期后会删除客户端设置的监听。

即便客户端在会话过期后自动连接成功,但是在会话过期到会话重建这段时间客户端监听的节点仍可能发生了改变,

而具体哪些变了或是没变,客户端是无法感知到的。

为了避免丢掉任何数据改变的事件,所有的监听器的都有一个回调方法(SessionExpiredHandler),用来处理会话过期这种特殊情况。

    SessionExpiredHandler = async (path) =>{                    await Task.Run(() =>{                        Console.WriteLine(path);});};

订阅节点的信息改变(创建节点,删除节点,添加子节点)

    IZKChildListener childListener = new ZKChildListener();    //子节点内容变化childListener.ChildChangeHandler = async (parentPath, currentChilds) =>{               await Task.Run(() =>{                     Console.WriteLine(parentPath);                     Console.WriteLine(string.Join(".", currentChilds));});};     //子节点数量变化childListener.ChildCountChangedHandler = async (parentPath, currentChilds) =>{               await Task.Run(() =>{                     Console.WriteLine(parentPath);                     Console.WriteLine(string.Join(".", currentChilds));});};     //"/testUserNode" 监听的节点,可以是现在存在的也可以是不存在的 zkClient.SubscribeChildChanges("/testUserNode3", childListener);         

订阅节点的数据内容的变化

    IZKDataListener dataListener = new ZKDataListener();    // 节点创建和节点内容变化dataListener.DataCreatedOrChangeHandler = async (dataPath, data) =>{                await Task.Run(() =>{                      Console.WriteLine(dataPath + ":" + Convert.ToString(data));});};     // 节点删除dataListener.DataDeletedHandler = async (dataPath) =>{                 await Task.Run(() =>{                      Console.WriteLine(dataPath);});};      // 节点创建dataListener.DataCreatedHandler = async (dataPath, data) =>{                   await Task.Run(() =>{                       Console.WriteLine(dataPath + ":" + Convert.ToString(data));});};       // 节点内容变化dataListener.DataChangeHandler = async (dataPath, data) =>{                    await Task.Run(() =>{                        Console.WriteLine(dataPath);});};      zkClient.SubscribeDataChanges("/testUserNode", dataListener);

客户端状态监听

    IZKStateListener stateListener = new ZKStateListener();    //状态改变stateListener.StateChangedHandler = async (state) =>{                await Task.Run(() =>{                   Console.WriteLine(state.ToString());});};      //会话失效stateListener.SessionExpiredHandler = async (path) =>{                  await Task.Run(() =>{                     Console.WriteLine(path);});};       //创建会话stateListener.NewSessionHandler = async () =>{                   await Task.Run(() =>{});};        //会话失败stateListener.SessionEstablishmentErrorHandler = async (ex) =>{                  await Task.Run(() =>{                        Console.WriteLine(ex.Message);});};         zkClient.SubscribeStateChanges(stateListener);

五、扩展功能


分布式锁

 using (var zkClient = new ZKClient(TestUtil.zkServers)){    await zkClient.CreateRecursiveAsync("/zk/lock", null, CreateMode.Persistent);    //创建分布式锁, 非线程安全类,每个线程请创建单独实例。var _lock = new ZKDistributedLock(zkClient, "/zk/lock");    await _lock.LockAsync(); //获得锁//do sometingawait _lock.UnLockAsync();//释放锁}

Leader选举

 using (var zkClient = new ZKClient(TestUtil.zkServers)){          await zkClient.CreateRecursiveAsync("/zk/leader", null, CreateMode.Persistent);        var listener = new ZKLeaderSelectorListener();         
listener.takeLeadership = async (client, selector) =>{ Console.WriteLine("I am the leader-" + await selector.GetLeaderAsync()); selector.Close();}; var selector = new ZKLeaderSelector("id", true, zkClient, "/zk/leader", listener); //启动并参与Leader选举selector.Start(); //获得当前主服务的IDawait selector.GetLeaderAsync(); //如果要退出Leader选举selector.Close(); }

分布式队列

using (var zkClient = new ZKClient(TestUtil.zkServers)){         
       await zkClient.CreateRecursiveAsync("/zk/queue", null, CreateMode.PERSISTENT); var queue = new ZKDistributedQueue<long>(new ZKClient(TestUtil.zkServers), "/zk/queue") await queue.OfferAsync("123");//放入元素var value = await queue.PollAsync();//删除并获取顶部元素var value = await queue.PeekAsync(); //获取顶部元素,不会删除    }


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/325813.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mongdb总结

#清屏 cls --windows #操作数据库 show dbs --查看所有数据库 use dbName --使用已有的dbName数据库&#xff0c;或者创建新的数据库dbName&#xff0c;如果一个数据库没有表就不存在 db --显示数据库名称 #操作集合 show collections …

JavaScript操作BOM简单案例

需要两个页面index.html和Test.html&#xff0c;可以直接运行&#xff0c;每个功能都已经注释完整&#xff0c;index.html页面的代码&#xff1a; <!DOCTYPE html> <html><head><meta charset"UTF-8"><title></title></head…

Java中“/”,“.”所代表的文件路径

转载自 Java中“/”&#xff0c;“.”所代表的文件路径 我们在开发的过程中&#xff0c;经常会去读、写文件。在读写文件的时候&#xff0c;就不得不写文件的路径&#xff0c;使用相对路径的方式有两种&#xff1a;”/”和 “.” 。在写文件的路径的时候&#xff0c;需要了解一…

Hibernate框架(1)

1.Hibernate框架简述 Hibernate的核心组件 在基于MVC设计模式的JAVA WEB应用中&#xff0c;Hibernate可以作为模型层/数据访问层。它通过配置文件(hibernate.properties或hibernate.cfg.xml)和映射文件(***.hbm.xml)把JAVA对象或PO(Persistent Object,持久化对象)映射到数据库中…

通过 Transifex 中文化开源软件

如果您对于汉化软件充满热情, 我软已经发布了以下的开源产品在 Transifex 平台&#xff0c;让社区的小伙伴们参与翻译以及审核: 如何参与? – Transifex 的新手 登录 Transifex 如果您第一次使用 Transifex, 您可以新建立一个账号或是通过您的 GitHub, Google 或 LinkedIn 账号…

JavaScript操作DOM元素

一、DOM&#xff1a; Document Object Model&#xff08;文档对象模型&#xff09; 二、DOM的分类&#xff1a; 1.DOM Core&#xff08;核心&#xff09; 2.HTML-DOM 3.CSS-DOM 三、节点属性&#xff1a; 1.lastElementChild:最后一个节点 2.firstElementChild:第一个节点 3.ne…

hibernate的lazy的使用

引用&#xff1a;https://blog.csdn.net/Vincent_yuan1991/article/details/53482487 一&#xff1a; lazy&#xff0c;延迟加载 Lazy的有效期&#xff1a;只有在session打开的时候才有效&#xff1b;session关闭后lazy就没效了。 lazy策略可以用在&#xff1a; 标签上&#x…

React中路由组件与一般组件

四、路由组件与一般组件 1.写法不同&#xff1a;一般组件&#xff1a;<Demo/>路由组件&#xff1a;<Route path"/demo" component{Demo}/>2.存放位置不同&#xff1a;一般组件&#xff1a;components路由组件&#xff1a;pages3.接收到的props不同&#…

第六期.Net开源社群联合分享--除了情结和价格,Azure最适合什么场景?等你来讲趟坑的实战经验!

嘿嘿&#xff0c;大家好啊&#xff01;好荣幸啊这一期&#xff0c;能够咱们.NET开源社区一块来做这次线上分享会。 我就是各位小伙伴可爱而且博学而且低调而且人见人爱花见花开而且谦虚但是经常口不择言的主持人老板娘Grace。 这次有新朋友&#xff0c;有老朋友&#xff0c;有…

jQuery选择器整理

第六章jQuery选择器一、jQuery选择器的分类&#xff1a; 1.基本选择器 2.层次选择器 3.属性选择器 4.基本过滤选择器 5.可见性过滤选择器 二、jQuery的基本选择器&#xff1a; ///基本选择器 //id选择器 //$("#div1").css(“background”,“red”); //class选择器 /…

遍历HashMap的四种方法

在Map集合中 values():方法是获取集合中的所有的值----没有键&#xff0c;没有对应关系&#xff0c; KeySet(): 将Map中所有的键存入到set集合中。因为set具备迭代器。所有可以迭代方式取出所有的键&#xff0c;再根据get方法。获取每一个键对应的值。 keySet():迭代后只能通…

支付系统的防重设计

转载自 支付系统的防重设计 导读 “目前在互联网应用的大部分支付场景中&#xff0c;对接支付宝、微信移动支付产品这样需要用户参与支付流程的支付方式已经变得非常普遍&#xff0c;类似的还有PC端银行网银支付&#xff1b;而通过绑定用户银行卡、对接银行卡快捷支付通道直接…

React中antd的按需引入+自定主题

antd的按需引入自定主题 1.安装依赖&#xff1a;yarn add react-app-rewired customize-cra babel-plugin-import less less-loader2.修改package.json...."scripts": {"start": "react-app-rewired start","build": "react-app-…

Windows Server Containers 支持 Windows 开发者使用 Docker

在过去几年里&#xff0c;Docker 和容器已成为全球开发界和企业最热门的话题之一。去年秋天发布的 Windows Server 2016 支持 Windows 开发者使用容器&#xff0c;使得这一热门话题再次升温。Windows 和 Docker 是如何走到一起的&#xff1f; 一切始于 2014 年隆重举办的普吉特…

漫画:什么是二叉堆?(修正版)

转载自 漫画&#xff1a;什么是二叉堆&#xff1f;&#xff08;修正版&#xff09; 什么是二叉堆&#xff1f; 二叉堆本质上是一种完全二叉树&#xff0c;它分为两个类型&#xff1a; 1.最大堆 2.最小堆 什么是最大堆呢&#xff1f;最大堆任何一个父节点的值&#xff0c;都…

React路由组件传递参数

向路由组件传递参数 1.params参数路由链接(携带参数)&#xff1a;<Link to/demo/test/tom/18}>详情</Link>注册路由(声明接收)&#xff1a;<Route path"/demo/test/:name/:age" component{Test}/>接收参数&#xff1a;this.props.match.params2.se…

jQuery选择器案例之——index.js

$(function(){//alert(a);///基本选择器//id选择器//$("#div1").css("background","red");//class选择器//$(".p1").css("color","red");//标签选择器//$("p").css("color","red")…

漫画:什么是堆排序

转载自 漫画&#xff1a;什么是堆排序 在上一篇漫画中&#xff0c;小灰介绍了 二叉堆 这样一种强大的数据结构&#xff1a; 漫画&#xff1a;什么是二叉堆&#xff1f;&#xff08;修正版&#xff09; 那么&#xff0c;这个二叉堆怎样来使用呢&#xff1f;我们这一期将会详…

监控——《微服务设计》读书笔记

在单块应用的世界里&#xff0c;当我们遇到问题时&#xff0c;我们至少清楚从哪里开始调查。网站访问速度&#xff1f;网站访问异常&#xff1f;CPU占用过高&#xff1f;这些都是单块应用程序的问题&#xff0c;单一的故障点会极大地简化对问题的排查。 而现在我们面对了多个微…