构建项目
zk版本:3.5.7,引入4.0.0的curator版本,Curator依赖的版本只能比zookeeper依赖的版本高。
Curator简单介绍
Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等,现在是Apache的开源项目。
Curator封装了很多功能(分布式锁、leader选举、分布式队列、共享计数器等等),更加简单易用
Curator对比zookeeper原生API
- 原生API的超时重连,需要手动操作,而Curator封装了很多重连策略,自动重连
- 原生API不支持递归创建节点,Curator可以递归创建节点
- 是对原生API的进一步封装,功能更多,更容易使用
- Curator 是Fluent的API风格(依赖于方法链、提高代码的易读性)
pom文件引入
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>curator-zk</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><!-- https://mvnrepository.com/artifact/junit/junit --><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.10</version><scope>test</scope></dependency><!-- curator--><!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework --><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.0.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes --><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.0.0</version></dependency><!-- 日志--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.21</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.21</version><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build>
</project>
log4j.properties(resource目录下)
### set log levels - for more verbose logging change 'info' to 'debug' ###
log4j.rootLogger=off, stdout
### direct log messages to stdout ###
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d{yyyy-MM-dd HH/:mm/:ss}]-%5p %c(line/:%L) %x-%m%n
建立连接
package com.tang.curator;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.Test;public class CuratorTest {/*** 建立连接*/@Testpublic void testConnect() {//重试策略ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000,10);//第一种创建方式:通过构造器构造CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 60*1000, 15*1000, retryPolicy);//第二种创建方式:通过builder模式链式编程client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181").sessionTimeoutMs(60*1000).connectionTimeoutMs(15*1000).retryPolicy(retryPolicy).namespace("zkProject").build();//开启连接client.start();}
}
构造参数 | 意义 |
---|---|
connectString | 连接url,集群用逗号分割 ,127.0.0.1:2181,127.0.0.1:2182 |
sessionTimeoutMs | 会话超时时间(建立连接后,会话断开的重连时间) 单位:毫秒,默认60秒 |
connectionTimeoutMs | 连接超时时间 单位:毫秒,默认15秒 |
retryPolicy | 重试策略 |
namespace | 默认一个根目录,以后的所有创建都会在此目录下进行 |
RetryPolicy相关(重试连接策略)
ExponentialBackoffRetry
特点:采用指数退避算法来实现重试机制。每次重试失败后,等待时间间隔会逐渐增加,直到达到最大重试次数或者重试成功为止。这种策略适用于需要逐渐延长重试间隔以减少对系统的冲击,同时确保最终能够成功连接的场景。
实现:ExponentialBackoffRetry实现了RetryPolicy接口,并提供了一些构造方法来设置重试的参数,如最大重试次数、初始等待时间、最大等待时间等
RetryNTimes
特点:简单地重试N次,不考虑重试之间的时间间隔。这种策略适用于希望快速重试且对重试次数有明确限制的场景。
实现:通过指定重试次数N来实例化该策略,当重试次数达到N时停止重试
RetryOneTime
特点:只重试一次,即如果初次尝试失败,则进行一次重试,然后停止。这种策略适用于对重试要求不高的场景,或者作为其他策略的补充。
注意:虽然这种策略较为简单,但在某些情况下可能仍然有用,特别是当希望快速失败而不是无限重试时。
RetryUntilElapsed
特点:在指定的总时间内不断重试,直到成功或时间耗尽。这种策略适用于对时间有明确要求,且希望在这段时间内尽可能多地尝试连接的场景。
实现:需要指定一个最长等待时间,在这段时间内会不断尝试重试,直到成功或时间到期
RetryForever
特点:永远重试,直到成功为止。这种策略适用于对连接成功有极高要求,且不介意无限重试的场景。
注意:在实际应用中应谨慎使用此策略,以避免因无限重试而导致的资源耗尽或其他问题
添加节点
有四种添加节点的方式:
- 基本创建:create().forPath()
- 创建带数据的节点:create().forPath(path,value)
- 设置节点类型:create().withMode().forPath()
- 创建多节点(父节点不存在会报错,创建时需要调用:creatingParentContainersIfNeeded方法)
节点类型是一个CreateMode的枚举,有以下四种类型:
节点名称 | 枚举 | 解释 |
---|---|---|
持久节点 | Persistent | 节点创建后,会一直保存在ZooKeeper中,直到被明确删除 |
持久顺序节点 | PersistentSequential | 在创建子节点时,ZooKeeper会自动为节点名称添加一个数字后缀,以保证子节点的创建顺序 |
临时节点 | Ephemeral | 节点的生命周期与创建它的会话绑定在一起,会话结束后,节点也会被自动删除 |
临时顺序节点 | EphemeralSequential | 与持久顺序节点类似,但节点的生命周期是临时的,与会话绑定 |
/*** 创建节点*/@Testpublic void create() throws Exception {//1.基本创建//如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储client.create().forPath("/test");//2.创建带数据的节点,如果这里的path还是/test就会报错client.create().forPath("/test1","test".getBytes());}@Testpublic void testCreate2() throws Exception {// 设置节点类型 (默认:持久化)String path = client.create().withMode(CreateMode.EPHEMERAL) // 设置临时模式.forPath("/app3");//输出path为/app3System.out.println(path);}@Testpublic void testCreate3() throws Exception {// 创建多节点String path = client.create().creatingParentContainersIfNeeded() // 父节点不存在,则创建父节点.forPath("/app4/p2");//输出path为/app4/p2System.out.println(path);}
查询节点
- 查询数据:client.getData().forPath()
- 查询子节点:client.getChildren().forPath()
- 查询节点信息状态: client.getData().storingStatIn(stat).forPath()
/*** 查询节点* @throws Exception*/@Testpublic void testGet() throws Exception {//1.查询数据:getbyte[] data = client.getData().forPath("/app1");System.out.println("1.查询数据:"+new String(data));// 2.查询子节点:lsList<String> path = client.getChildren().forPath("/app4");System.out.println("2."+path);//3.查询节点状态信息:ls -sStat stat = new Stat();client.getData().storingStatIn(stat).forPath("/app1");System.out.println("3."+stat.toString());}
修改节点
- 基本数据修改: client.setData().forPath()
- 根据版本修改:client.setData().withVersion().forPath()
一般使用第二种修改方式,version 是通过查询出来的,目的是为了让其他客户端不干扰我修改(原子性操作)
/*** 1.基本修改数据:.setData().forPath()* * @throws Exception*/@Testpublic void testSet() throws Exception {client.setData().forPath("/test","MarryChristmas".getBytes());}/***2.根据版本修改* version 是通过查询出来的,目的是为了让其他客户端不干扰我(原子性操作)* @throws Exception*/@Testpublic void testSetForVersion() throws Exception {int version = 0;Stat stat = new Stat();client.getData().storingStatIn(stat).forPath("/test");version = stat.getVersion();client.setData().withVersion(version).forPath("/test","HappyNewYear".getBytes());}
删除节点
- 删除单个节点:client.delete().forPath()
- 删除带有子节点的节点: client.delete().deletingChildrenIfNeeded().forPath()
- 必须成功的删除:为了防止网络波动,本质就是重试:client.delete().guaranteed().forPath()
- 回调:inBackground()
/*** 删除节点* 1.删除单个节点* 2.删除带有子节点的节点* 3.必须成功的删除:为了防止网络抖动,本质就是重试* 4.回掉* @throws Exception*/@Testpublic void testDelete() throws Exception {client.delete().forPath("/test1");client.delete().deletingChildrenIfNeeded().forPath("/app4");client.delete().guaranteed().forPath("/app4");client.delete().guaranteed().inBackground(new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {System.out.println("水逆退散!");}}).forPath("/test");}
完整代码:
package com.tang.curator;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;import java.util.List;public class CuratorTest {private CuratorFramework client;/*** 建立连接*/@Before//@Before表示在测试方法执行前执行@Testpublic void testConnect() {//重试策略ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000,10);//第一种创建方式:通过构造器构造client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 60*1000, 15*1000, retryPolicy);//第二种创建方式:通过builder模式链式编程client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181").sessionTimeoutMs(60*1000).connectionTimeoutMs(15*1000).retryPolicy(retryPolicy).namespace("zkProject").build();//开启连接client.start();}/*** 创建节点*/@Testpublic void create() throws Exception {//1.基本创建//如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储client.create().forPath("/test");//2.创建带数据的节点,如果这里的path还是/test就会报错client.create().forPath("/test1","test".getBytes());}@Testpublic void testCreate2() throws Exception {// 设置节点类型 (默认:持久化)String path = client.create().withMode(CreateMode.EPHEMERAL) // 设置临时模式.forPath("/app3");//输出path为/app3System.out.println(path);}@Testpublic void testCreate3() throws Exception {// 创建多节点String path = client.create().creatingParentContainersIfNeeded() // 父节点不存在,则创建父节点.forPath("/app4/p2");//输出path为/app4/p2System.out.println(path);}/*** 查询节点* @throws Exception*/@Testpublic void testGet() throws Exception {//1.查询数据:getbyte[] data = client.getData().forPath("/app1");System.out.println("1.查询数据:"+new String(data));// 2.查询子节点:lsList<String> path = client.getChildren().forPath("/app4");System.out.println("2."+path);//3.查询节点状态信息:ls -sStat stat = new Stat();client.getData().storingStatIn(stat).forPath("/app1");System.out.println("3."+stat.toString());}/*** 1.基本修改数据:.setData().forPath()** @throws Exception*/@Testpublic void testSet() throws Exception {client.setData().forPath("/test","MarryChristmas".getBytes());}/***2.根据版本修改* version 是通过查询出来的,目的是为了让其他客户端不干扰我(原子性操作)* @throws Exception*/@Testpublic void testSetForVersion() throws Exception {int version = 0;Stat stat = new Stat();client.getData().storingStatIn(stat).forPath("/test");version = stat.getVersion();client.setData().withVersion(version).forPath("/test","HappyNewYear".getBytes());}/*** 删除节点* 1.删除单个节点* 2.删除带有子节点的节点* 3.必须成功的删除:为了防止网络抖动,本质就是重试* 4.回掉* @throws Exception*/@Testpublic void testDelete() throws Exception {client.delete().forPath("/test1");client.delete().deletingChildrenIfNeeded().forPath("/app4");client.delete().guaranteed().forPath("/app4");client.delete().guaranteed().inBackground(new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {System.out.println("水逆退散!");}}).forPath("/test");}@Afterpublic void close() {if (client != null) {client.close();}}
}
参考博客:
https://blog.csdn.net/qq_37774171/article/details/122514318