目录
原生java客户端实战
常用API
代码
Curator客户端实战
1. maven依赖
2. 初始化客户端
3. 重试策略
4. 增删改成API
5. 监听器API
分布式ID生成器
顺序节点生成分布式ID
实现雪花算法
zookeeper实现分布式队列
原生java客户端实战
常用API
- create(path, data, acl,createMode): 创建一个给定路径的 znode,并在 znode 保存 data[]的 数据,createMode指定 znode 的类型。
- delete(path, version):如果给定 path 上的 znode 的版本和给定的 version 匹配, 删除 znode。
- exists(path, watch):判断给定 path 上的 znode 是否存在,并在 znode 设置一个 watch。
- getData(path, watch):返回给定 path 上的 znode 数据,并在 znode 设置一个 watch。
- setData(path, data, version):如果给定 path 上的 znode 的版本和给定的 version 匹配,设置 znode 数据。
- getChildren(path, watch):返回给定 path 上的 znode 的孩子 znode 名字,并在 znode 设置一个 watch。
- sync(path):把客户端 session 连接节点和 leader 节点进行同步。
代码
1. 引入maven
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.0</version></dependency>
2. 增删改查API
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ZkJavaClient {private final static String CLUSTER_CONNECT_STR="192.168.6.128:2181,192.168.6.128:2182,192.168.6.128:2183";private static ZooKeeper zooKeeper;// 初始化zk客户端@BeforeClasspublic static void initZookeeper() throws IOException, InterruptedException {System.out.println("initZookeeper");CountDownLatch countDownLatch =new CountDownLatch(1);zooKeeper = new ZooKeeper(CLUSTER_CONNECT_STR, 3000, new Watcher() {@Overridepublic void process(WatchedEvent event) {if(Event.KeeperState.SyncConnected==event.getState()&& event.getType()== Event.EventType.None){//如果收到了服务端的响应事件,连接成功countDownLatch.countDown();System.out.println("zookeeper连接建立");}}});System.out.println("zookeeper连接中...");countDownLatch.await();// 打印连接状态System.out.println(zooKeeper.getState());}public static String getUniqueNode(String node) {return node + "-" + System.currentTimeMillis();}private static String syncNode = getUniqueNode("/user");// 新增-同步@Testpublic void a_createSync() throws InterruptedException, KeeperException {String result = zooKeeper.create(syncNode, "kk".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);System.out.println("同步创建node成功" + result);}// 新增-异步@Testpublic void b_createASync() throws InterruptedException, KeeperException {zooKeeper.create(getUniqueNode("/user-sync"), "kk".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,new AsyncCallback.StringCallback() {@Overridepublic void processResult(int rc, String path, Object ctx, String name) {System.out.println(String.format("异步创建node成功 rc %s, path %s,ctx %s,name %s",rc,path,ctx,name));}}, "context");Thread.sleep(1000 * 2);}// 更新-同步@Testpublic void c_updateSync() throws InterruptedException, KeeperException {Stat stat = new Stat();byte[] data = zooKeeper.getData(syncNode, false, stat);System.out.println("node = " + syncNode + "修改前: " + new String(data));stat = zooKeeper.setData(syncNode, "kk2".getBytes(), stat.getVersion());System.out.println("同步修改node = " + syncNode + "成功 " + stat);data = zooKeeper.getData(syncNode, false, stat);System.out.println("node = " + syncNode + "修改后: " + new String(data));}// 删除@Testpublic void d_delSync() throws InterruptedException, KeeperException {Stat stat = new Stat();byte[] data = zooKeeper.getData(syncNode, false, stat);System.out.println("node = " + syncNode + "删除前: " + new String(data));zooKeeper.delete(syncNode, stat.getVersion());System.out.println("同步删除node = " + syncNode + "成功 ");stat = zooKeeper.exists(syncNode, null);System.out.println("node = " + syncNode + "删除后: " + stat);}
}
Curator客户端实战
1. maven依赖
Curator 包含了几个包:
- curator-framework是对ZooKeeper的底层API的一些封装。
- curator-client提供了一些客户端的操作,例如重试策略等。
- curator-recipes封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。
<!-- zookeeper client -->
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.0</version>
</dependency><!--curator-->
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.1.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions>
</dependency>
2. 初始化客户端
private final static String CLUSTER_CONNECT_STR="192.168.6.128:2181,192.168.6.128:2182,192.168.6.128:2183";private static CuratorFramework client;@BeforeClasspublic static void initZookeeper() {System.out.println("initZookeeper");RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);client = CuratorFrameworkFactory.builder().connectString(CLUSTER_CONNECT_STR).sessionTimeoutMs(5000) // 会话超时时间.connectionTimeoutMs(5000) // 连接超时时间.retryPolicy(retryPolicy).namespace("Curator-test") // 包含隔离名称.build();client.start();}
3. 重试策略
// 定义重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
当客户端异常退出或者与服务端失去连接的时候,可以通过设置客户端重新连接 ZooKeeper 服务端。而 Curator 提供了 一次重试、多次重试等不同种类的实现方式。在 Curator 内部,可以通过判断服务器返回的 keeperException 的状态代码来判断是否进行重试处理,如果返回的是 OK 表示一切操作都没有问题,而 SYSTEMERROR 表示系统或服务端错误。
策略名称 | 描述 |
ExponentialBackoffRetry | 重试一组次数,重试之间的睡眠时间增加 |
RetryNTimes | 重试最大次数 |
RetryOneTime | 只重试一次 |
RetryUntilElapsed | 在给定的时间结束之前重试 |
4. 增删改成API
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ZkCuratorClient {private final static String CLUSTER_CONNECT_STR="192.168.6.128:2181,192.168.6.128:2182,192.168.6.128:2183";private static CuratorFramework client;@BeforeClasspublic static void initZookeeper() {System.out.println("initZookeeper");RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);client = CuratorFrameworkFactory.builder().connectString(CLUSTER_CONNECT_STR).sessionTimeoutMs(5000) // 会话超时时间.connectionTimeoutMs(5000) // 连接超时时间.retryPolicy(retryPolicy).namespace("Curator-test") // 包含隔离名称.build();client.start();}private static String TMP_PATH = getUniqueNode("/curator-kk");;// 创建单节点@Testpublic void a_Create() throws Exception {
// String path = client.create().forPath("/curator-node");String pathResult = client.create().withMode(CreateMode.PERSISTENT).forPath(TMP_PATH, "kk".getBytes());System.out.println("同步创建node成功, path = " + TMP_PATH + " result = " + pathResult);}public static String getUniqueNode(String node) {return node + "-" + System.currentTimeMillis();}// 创建父子节点@Testpublic void b_Create_Parent() throws Exception {String pathWithParent = getUniqueNode("/kk-parent/kk-sub-1");String pathResult = client.create().creatingParentsIfNeeded().forPath(pathWithParent, "kk_son".getBytes());System.out.println("同步创建node成功, path = " + pathWithParent + " result = " + pathResult);}// 更新节点@Testpublic void c_SetData() throws Exception {Stat stat = client.setData().forPath(TMP_PATH, "changed!".getBytes());System.out.println("更新node成功, path = " + TMP_PATH + " result = " + stat);}// 查询节点@Testpublic void d_GetData() throws Exception {byte[] bytes = client.getData().forPath(TMP_PATH);System.out.println("查询node成功, path = " + TMP_PATH + " result = " + new String(bytes));}// 删除节点@Testpublic void e_Delete() throws Exception {String pathWithParent="/kk-parent";client.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);}// 查询节点 - 异步@Testpublic void f_GetData_Async() throws Exception {client.getData().inBackground((item1, item2) -> {System.out.println(" background: val " + new String(item2.getData()) + " item2 = " + item2);}).forPath(TMP_PATH);Thread.sleep(1000 * 2);}// 查询节点 - 异步 - 指定线程池@Testpublic void g_GetData_Async_Excutor() throws Exception {ExecutorService executorService = Executors.newSingleThreadExecutor();client.getData().inBackground((item1, item2) -> {System.out.println(" background: val " + new String(item2.getData()) + " item2 = " + item2);},executorService).forPath(TMP_PATH);Thread.sleep(1000 * 2);}
}
5. 监听器API
- NodeCache: 监听单节点
- PathChildrenCache: 监听子节点
- TreeCache: 监听所有层级子节点(树节点)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ZkCuratorWatchClient {private final static String CLUSTER_CONNECT_STR="192.168.6.128:2181,192.168.6.128:2182,192.168.6.128:2183";private static CuratorFramework client;@BeforeClasspublic static void initZookeeper() {System.out.println("initZookeeper");RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);client = CuratorFrameworkFactory.builder().connectString(CLUSTER_CONNECT_STR).sessionTimeoutMs(5000) // 会话超时时间.connectionTimeoutMs(5000) // 连接超时时间.retryPolicy(retryPolicy).namespace("Curator-test") // 包含隔离名称.build();client.start();}private static String TMP_PATH = "/curator-kk-w";;// 添加单节点监听器-永久@Testpublic void a_addWatch() throws Exception {createIfNeed(TMP_PATH);NodeCache nodeCache = new NodeCache(client, TMP_PATH);nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {System.out.println(TMP_PATH + " path nodeChanged");print_GetData();}});nodeCache.start();Thread.sleep(1000 * 300);}// 添加子节点(Child)监听器-永久@Testpublic void b_addWatch_Child() throws Exception {createIfNeed(TMP_PATH);PathChildrenCache nodeCache = new PathChildrenCache(client, TMP_PATH, true);nodeCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework cli, PathChildrenCacheEvent event) throws Exception {ChildData data = event.getData();System.out.println(" path nodeChanged" + data.getPath() + " type = " + event.getType() + " val = " + new String(data.getData()));}});nodeCache.start();Thread.sleep(1000 * 300);}// 添加所有子节点(Tree)监听器-永久@Testpublic void testTreeCache() throws Exception {createIfNeed(TMP_PATH);TreeCache treeCache = new TreeCache(client, TMP_PATH);treeCache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {ChildData data = event.getData();System.out.println(" path nodeChanged" + data.getPath() + " type = " + event.getType() + " val = " + new String(data.getData()));}});treeCache.start();Thread.sleep(1000 * 300);}private void createIfNeed(String path) throws Exception {Stat stat = client.checkExists().forPath(path);System.out.println(stat);if (stat == null) {String pathResult = client.create().withMode(CreateMode.PERSISTENT).forPath(path, "kk".getBytes());System.out.println("同步创建node成功, path = " + path + " result = " + pathResult);}}public void print_GetData() throws Exception {byte[] bytes = client.getData().forPath(TMP_PATH);System.out.println("查询node成功, path = " + TMP_PATH + " result = " + new String(bytes));}
}
分布式ID生成器
- java的UUID
- mongo的ObjectId
- Redis的incr生成id
- Twitter的SnowFlake算法
- zookeeper的顺序节点
顺序节点生成分布式ID
public class IDMaker{private static CuratorFramework client;private final static String CLUSTER_CONNECT_STR="192.168.6.128:2181,192.168.6.128:2182,192.168.6.128:2183";// 初始化客户端static {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);client = CuratorFrameworkFactory.builder().connectString(CLUSTER_CONNECT_STR).sessionTimeoutMs(5000) // 会话超时时间.connectionTimeoutMs(5000) // 连接超时时间.retryPolicy(retryPolicy).namespace("Curator-test") // 包含隔离名称.build();client.start();}// 创建临时顺序节点private String createSeqNode(String pathPefix) throws Exception {//创建一个临时顺序节点String destPath = client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(pathPefix);return destPath;}// 生成分布式idpublic String makeId(String path) throws Exception {String str = createSeqNode(path);if(null != str){//获取末尾的序号int index = str.lastIndexOf(path);if(index>=0){index+=path.length();return index<=str.length() ? str.substring(index):"";}}return str;}// 测试-多线程批量生成idpublic static void main(String[] args) throws Exception {String path = "/idmarker/id-";IDMaker idMaker = new IDMaker();for (int i = 0; i < 5; i++) {new Thread(() -> {for (int j = 0; j < 10; j++) {try {String id = idMaker.makeId(path);System.out.println(Thread.currentThread().getName() + " 第" + j + "生产的id = " +id);} catch (Exception e) {System.err.println(e);}}}, "thread-" + i).start();}Thread.sleep(1000 * 300);}
}
实现雪花算法
==