Zookeeprt实战(待完善)

目录

原生java客户端实战

常用API

代码

Curator客户端实战

1. maven依赖

2. 初始化客户端

3. 重试策略

4. 增删改成API

5. 监听器API

分布式ID生成器

顺序节点生成分布式ID

实现雪花算法

zookeeper实现分布式队列


原生java客户端实战

常用API

  • create(path, data, acl,createMode): 创建一个给定路径的 znode,并在 znode 保存 data[]的 数据,createMode指定 znode 的类型。
  • delete(path, version):如果给定 path 上的 znode 的版本和给定的 version 匹配, 删除 znode。
  • exists(path, watch):判断给定 path 上的 znode 是否存在,并在 znode 设置一个 watch。
  • getData(path, watch):返回给定 path 上的 znode 数据,并在 znode 设置一个 watch。
  • setData(path, data, version):如果给定 path 上的 znode 的版本和给定的 version 匹配,设置 znode 数据。
  • getChildren(path, watch):返回给定 path 上的 znode 的孩子 znode 名字,并在 znode 设置一个 watch。
  • sync(path):把客户端 session 连接节点和 leader 节点进行同步。

代码

1. 引入maven

        <dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.0</version></dependency>

2. 增删改查API 


@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ZkJavaClient {private final static  String CLUSTER_CONNECT_STR="192.168.6.128:2181,192.168.6.128:2182,192.168.6.128:2183";private static ZooKeeper zooKeeper;// 初始化zk客户端@BeforeClasspublic static void initZookeeper() throws IOException, InterruptedException {System.out.println("initZookeeper");CountDownLatch countDownLatch =new CountDownLatch(1);zooKeeper = new ZooKeeper(CLUSTER_CONNECT_STR, 3000, new Watcher() {@Overridepublic void process(WatchedEvent event) {if(Event.KeeperState.SyncConnected==event.getState()&& event.getType()== Event.EventType.None){//如果收到了服务端的响应事件,连接成功countDownLatch.countDown();System.out.println("zookeeper连接建立");}}});System.out.println("zookeeper连接中...");countDownLatch.await();// 打印连接状态System.out.println(zooKeeper.getState());}public static String  getUniqueNode(String node) {return node + "-" + System.currentTimeMillis();}private static  String syncNode = getUniqueNode("/user");// 新增-同步@Testpublic void a_createSync() throws InterruptedException, KeeperException {String result = zooKeeper.create(syncNode, "kk".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);System.out.println("同步创建node成功" + result);}// 新增-异步@Testpublic void b_createASync() throws InterruptedException, KeeperException {zooKeeper.create(getUniqueNode("/user-sync"), "kk".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,new AsyncCallback.StringCallback() {@Overridepublic void processResult(int rc, String path, Object ctx, String name) {System.out.println(String.format("异步创建node成功 rc  %s, path %s,ctx %s,name %s",rc,path,ctx,name));}}, "context");Thread.sleep(1000 * 2);}// 更新-同步@Testpublic void c_updateSync() throws InterruptedException, KeeperException {Stat stat = new Stat();byte[] data = zooKeeper.getData(syncNode, false, stat);System.out.println("node = " + syncNode + "修改前: " + new String(data));stat = zooKeeper.setData(syncNode, "kk2".getBytes(), stat.getVersion());System.out.println("同步修改node = " + syncNode + "成功   " + stat);data = zooKeeper.getData(syncNode, false, stat);System.out.println("node = " + syncNode + "修改后: " + new String(data));}// 删除@Testpublic void d_delSync() throws InterruptedException, KeeperException {Stat stat = new Stat();byte[] data = zooKeeper.getData(syncNode, false, stat);System.out.println("node = " + syncNode + "删除前: " + new String(data));zooKeeper.delete(syncNode, stat.getVersion());System.out.println("同步删除node = " + syncNode + "成功   ");stat = zooKeeper.exists(syncNode, null);System.out.println("node = " + syncNode + "删除后: " + stat);}
}

Curator客户端实战

1. maven依赖

Curator 包含了几个包:

  • curator-framework是对ZooKeeper的底层API的一些封装。
  • curator-client提供了一些客户端的操作,例如重试策略等。
  • curator-recipes封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。
<!-- zookeeper client -->
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.0</version>
</dependency><!--curator-->
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.1.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions>
</dependency>

2. 初始化客户端

private final static  String CLUSTER_CONNECT_STR="192.168.6.128:2181,192.168.6.128:2182,192.168.6.128:2183";private static CuratorFramework client;@BeforeClasspublic static void initZookeeper() {System.out.println("initZookeeper");RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);client = CuratorFrameworkFactory.builder().connectString(CLUSTER_CONNECT_STR).sessionTimeoutMs(5000)  // 会话超时时间.connectionTimeoutMs(5000) // 连接超时时间.retryPolicy(retryPolicy).namespace("Curator-test") // 包含隔离名称.build();client.start();}

3. 重试策略

// 定义重试策略        
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

        当客户端异常退出或者与服务端失去连接的时候,可以通过设置客户端重新连接 ZooKeeper 服务端。而 Curator 提供了 一次重试、多次重试等不同种类的实现方式。在 Curator 内部,可以通过判断服务器返回的 keeperException 的状态代码来判断是否进行重试处理,如果返回的是 OK 表示一切操作都没有问题,而 SYSTEMERROR 表示系统或服务端错误。

策略名称

描述

ExponentialBackoffRetry

重试一组次数,重试之间的睡眠时间增加

RetryNTimes

重试最大次数

RetryOneTime

只重试一次

RetryUntilElapsed

在给定的时间结束之前重试

4. 增删改成API


@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ZkCuratorClient {private final static  String CLUSTER_CONNECT_STR="192.168.6.128:2181,192.168.6.128:2182,192.168.6.128:2183";private static CuratorFramework client;@BeforeClasspublic static void initZookeeper() {System.out.println("initZookeeper");RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);client = CuratorFrameworkFactory.builder().connectString(CLUSTER_CONNECT_STR).sessionTimeoutMs(5000)  // 会话超时时间.connectionTimeoutMs(5000) // 连接超时时间.retryPolicy(retryPolicy).namespace("Curator-test") // 包含隔离名称.build();client.start();}private static String TMP_PATH = getUniqueNode("/curator-kk");;// 创建单节点@Testpublic void a_Create() throws Exception {
//        String path = client.create().forPath("/curator-node");String pathResult = client.create().withMode(CreateMode.PERSISTENT).forPath(TMP_PATH, "kk".getBytes());System.out.println("同步创建node成功, path = " + TMP_PATH + " result = " + pathResult);}public static String  getUniqueNode(String node) {return node + "-" + System.currentTimeMillis();}// 创建父子节点@Testpublic void b_Create_Parent() throws Exception {String pathWithParent = getUniqueNode("/kk-parent/kk-sub-1");String pathResult = client.create().creatingParentsIfNeeded().forPath(pathWithParent, "kk_son".getBytes());System.out.println("同步创建node成功, path = " + pathWithParent + " result = " + pathResult);}// 更新节点@Testpublic void c_SetData() throws Exception {Stat stat = client.setData().forPath(TMP_PATH, "changed!".getBytes());System.out.println("更新node成功, path = " + TMP_PATH + " result = " + stat);}// 查询节点@Testpublic void d_GetData() throws Exception {byte[] bytes = client.getData().forPath(TMP_PATH);System.out.println("查询node成功, path = " + TMP_PATH + " result = " + new String(bytes));}// 删除节点@Testpublic void e_Delete() throws Exception {String pathWithParent="/kk-parent";client.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);}// 查询节点 - 异步@Testpublic void f_GetData_Async() throws Exception {client.getData().inBackground((item1, item2) -> {System.out.println(" background:  val  " + new String(item2.getData()) + " item2 = " + item2);}).forPath(TMP_PATH);Thread.sleep(1000 * 2);}// 查询节点 - 异步 - 指定线程池@Testpublic void g_GetData_Async_Excutor() throws Exception {ExecutorService executorService = Executors.newSingleThreadExecutor();client.getData().inBackground((item1, item2) -> {System.out.println(" background:  val  " + new String(item2.getData()) + " item2 = " + item2);},executorService).forPath(TMP_PATH);Thread.sleep(1000 * 2);}
}

5. 监听器API

  • NodeCache: 监听单节点
  • PathChildrenCache: 监听子节点
  • TreeCache: 监听所有层级子节点(树节点)

@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ZkCuratorWatchClient {private final static  String CLUSTER_CONNECT_STR="192.168.6.128:2181,192.168.6.128:2182,192.168.6.128:2183";private static CuratorFramework client;@BeforeClasspublic static void initZookeeper() {System.out.println("initZookeeper");RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);client = CuratorFrameworkFactory.builder().connectString(CLUSTER_CONNECT_STR).sessionTimeoutMs(5000)  // 会话超时时间.connectionTimeoutMs(5000) // 连接超时时间.retryPolicy(retryPolicy).namespace("Curator-test") // 包含隔离名称.build();client.start();}private static String TMP_PATH = "/curator-kk-w";;// 添加单节点监听器-永久@Testpublic void a_addWatch() throws Exception {createIfNeed(TMP_PATH);NodeCache nodeCache = new NodeCache(client, TMP_PATH);nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {System.out.println(TMP_PATH + " path nodeChanged");print_GetData();}});nodeCache.start();Thread.sleep(1000 * 300);}// 添加子节点(Child)监听器-永久@Testpublic void b_addWatch_Child() throws Exception {createIfNeed(TMP_PATH);PathChildrenCache nodeCache = new PathChildrenCache(client, TMP_PATH, true);nodeCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework cli, PathChildrenCacheEvent event) throws Exception {ChildData data = event.getData();System.out.println(" path nodeChanged" + data.getPath() + " type = " + event.getType() + " val = " + new String(data.getData()));}});nodeCache.start();Thread.sleep(1000 * 300);}// 添加所有子节点(Tree)监听器-永久@Testpublic void testTreeCache() throws Exception {createIfNeed(TMP_PATH);TreeCache treeCache = new TreeCache(client, TMP_PATH);treeCache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {ChildData data = event.getData();System.out.println(" path nodeChanged" + data.getPath() + " type = " + event.getType() + " val = " + new String(data.getData()));}});treeCache.start();Thread.sleep(1000 * 300);}private void createIfNeed(String path) throws Exception {Stat stat = client.checkExists().forPath(path);System.out.println(stat);if  (stat == null) {String pathResult = client.create().withMode(CreateMode.PERSISTENT).forPath(path, "kk".getBytes());System.out.println("同步创建node成功, path = " + path + " result = " + pathResult);}}public void print_GetData() throws Exception {byte[] bytes = client.getData().forPath(TMP_PATH);System.out.println("查询node成功, path = " + TMP_PATH + " result = " + new String(bytes));}
}

分布式ID生成器

  • java的UUID
  • mongo的ObjectId
  • Redis的incr生成id
  • Twitter的SnowFlake算法
  • zookeeper的顺序节点

顺序节点生成分布式ID


public class IDMaker{private static CuratorFramework client;private final static  String CLUSTER_CONNECT_STR="192.168.6.128:2181,192.168.6.128:2182,192.168.6.128:2183";// 初始化客户端static  {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);client = CuratorFrameworkFactory.builder().connectString(CLUSTER_CONNECT_STR).sessionTimeoutMs(5000)  // 会话超时时间.connectionTimeoutMs(5000) // 连接超时时间.retryPolicy(retryPolicy).namespace("Curator-test") // 包含隔离名称.build();client.start();}// 创建临时顺序节点private String createSeqNode(String pathPefix) throws Exception {//创建一个临时顺序节点String destPath = client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(pathPefix);return destPath;}// 生成分布式idpublic String  makeId(String path) throws Exception {String str = createSeqNode(path);if(null != str){//获取末尾的序号int index = str.lastIndexOf(path);if(index>=0){index+=path.length();return index<=str.length() ? str.substring(index):"";}}return str;}// 测试-多线程批量生成idpublic static void main(String[] args) throws Exception {String path = "/idmarker/id-";IDMaker idMaker = new IDMaker();for (int i = 0; i < 5; i++) {new Thread(() -> {for (int j = 0; j < 10; j++) {try {String id = idMaker.makeId(path);System.out.println(Thread.currentThread().getName() + " 第" + j + "生产的id = " +id);} catch (Exception e) {System.err.println(e);}}}, "thread-" + i).start();}Thread.sleep(1000 * 300);}
}

实现雪花算法

==

zookeeper实现分布式队列

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/587636.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue3+vite 项目常用库

特别注意&#xff1a;需要使用管理员权限来打开命令提示符&#xff0c;不然会出现各种报错 yarn npm install -g yarn vite 1.使用vite创建项目 yarn create vite 2.安装包和运行 //或yarn yarn yarn dev 3.在vs code安装volar插件和Ant Design Vue Helper插件 需要注意…

2. 使用 Python 解释器

2.1. 调用 Python 解释器 Python 解释器通常被安装在目标机器的 /usr/local/bin/python 目录下。将 /usr/local/bin 目录包含进 Unix shell 的搜索路径里&#xff0c;以确保可以通过输入: python命令来启动它。由于 Python 解释器的安装路径是可选的&#xff0c;这也可能是其…

【一分钟】ThinkPHP v6.0 (poc-yaml-thinkphp-v6-file-write)环境复现及poc解析

写在前面 一分钟表示是非常短的文章&#xff0c;只会做简单的描述。旨在用较短的时间获取有用的信息 环境下载 官方环境下载器&#xff1a;https://getcomposer.org/Composer-Setup.exe 下载文档时可以设置代理&#xff0c;不然下载不上&#xff0c;你懂的 下载成功 cmd cd…

Redis经典五大类型源码及底层实现(二)

&#x1f44f;作者简介&#xff1a;大家好&#xff0c;我是爱吃芝士的土豆倪&#xff0c;24届校招生Java选手&#xff0c;很高兴认识大家&#x1f4d5;系列专栏&#xff1a;Spring源码、JUC源码、Kafka原理、分布式技术原理、数据库技术&#x1f525;如果感觉博主的文章还不错的…

【PHP】函数array_reduce()使用场景

目录 1.计算数组中所有元素的和 2.计算数组中所有元素的乘积 3.将多个字符串连接在一起 4.对数组中的元素进行逻辑计算 5.取出第一个满足条件的数组&#xff0c;筛选有用数组 6.array_reduce()函数的基本语法&#xff1a; array_reduce 函数通常用于对数组中的元素进行累…

数据结构:第7章:查找(复习)

顺序查找&#xff1a; ASL 折半查找&#xff1a; 这里 j 表示 二叉查找树的第 j 层 二叉排序树&#xff1a; 二叉排序树&#xff08;Binary Search Tree&#xff0c;BST&#xff09;是一种特殊的二叉树&#xff0c;定义&#xff1a; 对于二叉排序树的每个节点&#xff0c;…

全球电商平台API数据稳定接入

API是什么&#xff1f; API就是接口&#xff0c;就是通道&#xff0c;负责一个程序和其他软件的沟通&#xff0c;本质是预先定义的函数。”比如&#xff1a;电脑需要调用手机里面的信息&#xff0c;这时候你会拿一根数据线将电脑手机连接起来&#xff0c;电脑和手机上连接数据…

Linux学习笔记(一)

如果有自己的物理服务器请先查看[这篇文章](https://blog.csdn.net/yasinawolaopo/article/details/132391128)文章目录 网卡配置Linux基础指令ls:列出目录内容cd(mkdir.rmkdir): 切换文件夹(创建,删除操作)cp:复制文件或目录mv:文件/文件夹移动cat:查看文件vi:文件查看编辑man…

纯前端 文件预览方法汇总

以下是通过javaScript的方法实现文件预览的方法汇总&#xff1a; 1.使用HTML5的File API和Canvas来预览图片文件&#xff1a; <!DOCTYPE html> <html> <head><title>Image Preview</title> </head> <body><input type"fil…

二进制文件分割器

二进制文件分割器 时间: 2023.12.29 作者: FlameCyclone 自己写的一个能方便分割文件的小工具 使用说明 输出文件名 输出文件名规则前缀文件名开始固定名称序号(10/16进制显示, 宽度以输出最大序号为准)分割范围(16进制显示, 宽度以输出最大范围为准)CRC32校验码8字符组成…

touchHLE实战之游戏

前面推荐了touchHLE&#xff0c;号称可以玩旧的IOS游戏&#xff0c;但是国外还是管理的很严格的&#xff0c;一直没有找到合适的游戏文件测试。最近&#xff0c;发现官网上公布了开发者赠送的一款游戏&#xff0c;试了下完美运行。 看到国外贴吧reddit上有人推荐可用的ipa资源&…

蓝桥杯C/C++程序设计——成绩统计

题目描述 小蓝给学生们组织了一场考试&#xff0c;卷面总分为 100 分&#xff0c;每个学生的得分都是一个 0 到 100 的整数。 如果得分至少是 60 分&#xff0c;则称为及格。如果得分至少为 85 分&#xff0c;则称为优秀。 请计算及格率和优秀率&#xff0c;用百分数表示&am…

不同语言告别2023,迎接2024

一、序言 1.一名合格的程序员&#xff0c;始于Hello World&#xff0c;终于Hello World&#xff0c;用不同语言表达2023最后一天。 2.在这一年里&#xff0c;博主新接触了VUE、Python、人工智能、JAVA的框架SprinBoot、微服务等&#xff0c;然后一路来感谢大家的支持&#xf…

ClickHouse基础知识(一):ClickHouse 入门

1. ClickHouse 入门 ClickHouse 是俄罗斯的 Yandex 于 2016 年开源的列式存储数据库&#xff08;DBMS&#xff09;&#xff0c;使用 C 语言编写&#xff0c;主要用于在线分析处理查询&#xff08;OLAP&#xff09;&#xff0c;能够使用 SQL 查询实时生成分析数据报告。 2. Cl…

python使用selenium控制浏览器进行爬虫

这里以谷歌浏览器为例&#xff0c;需要安装一下chromedriver&#xff0c;其他浏览器也有相对应的driver&#xff0c;chromedriver下载地址&#xff1a;https://googlechromelabs.github.io/chrome-for-testing/ 然后是打开python环境安装一下依赖pip install selenium&#xf…

【低代码平台】10个开源免费Airtable 的替代方案

Airtable是一个易于使用的简单低代码平台&#xff0c;有助于团队协作管理复杂的数据表&#xff0c;并创建定制的工作流程。把它想象成一个类固醇上的云电子表格。 Airtable还简化了数据输入过程&#xff0c;连接和集成第三方服务和应用程序&#xff0c;并提供了许多数据导入/导…

毅速:3D打印技术传统模具行业影响深远

随着3D打印技术的不断发展和完善&#xff0c;一系列的优势使其在模具制造领域的应用越来越广泛&#xff0c;这一技术在模具行业的应用将为整个行业带来变革。 首先&#xff0c;3D打印技术将大幅提高模具制造的精度和效率。传统的模具制造过程中&#xff0c;由于加工设备的限制和…

gitee(码云)仓库内容更新,使用TortoiseGit同步本地仓库和远程仓库

前言&#xff1a; 网上有很多同步仓库教程&#xff0c;但都是git命令行操作。这篇使用TortoiseGit可视化操作同步本地仓库和远程仓库 克隆本地仓库&#xff0c;上传远程仓库&#xff0c;下载TortoiseGit可以看这篇使用gitee&#xff08;码云&#xff09;上传自己的代码&#xf…

Altium Designer20中遇到的问题和解决办法记录

最近二战考完研了&#xff0c;重新拾起之前学的一些项目&#xff0c;最近在优化以前话的四层PCB版的时候发现了在使用AD使碰到一些问题现在记录如下&#xff1a; 1.Altium Designer 中的 Clearance Constraint 错误如何修改 &#xff1a; 我遇到的报错如下&#xff1a;  这…

Vue模板编译

Vue模板编译 Vue生命周期中&#xff0c;在初始化阶段各项工作做完之后调用了vm.$mount方法&#xff0c;该方法的调用标志着初始化阶段的结束和进入下一个阶段&#xff0c;从官方文档给出的生命周期流程图中可以看到&#xff0c;下一个阶段就进入了模板编译阶段(created和befor…