Zookeeper(五)Zokeeper 环境搭建与Curator使用

目录

  • 一 环境搭建
    • 1.1 单机环境搭建
    • 1.2 可视化工具ZooKeeper Assistant
    • 1.3 集群环境搭建
  • 二 常用命令
    • 1.1 命令行语法
    • 1.2 数据节点信息
    • 1.3 节点类型
  • 三 CuratorAPI使用
    • 3.1 依赖
    • 3.1 创建会话
    • 3.2 基本使用增删改查
    • 3.3 ACL权限控制
    • 3.4 分布式锁
    • 3.5 分布式计数器
    • 3.6 分布式Barrier
    • 3.7 主从节点选举
    • 3.8 NodeCache监听
    • 3.9 PathChildrenCache监听
    • 3.10 TreeCache监听

  • 官网:Apache ZooKeeper

一 环境搭建

1.1 单机环境搭建

  • 必要环境:JDK
  • 下载地址:https://zookeeper.apache.org/
  • 历史版本:https://archive.apache.org/dist/zookeeper/

image.png
image.png
image.png

  • 我这里是本地环境,说名一下,无脑解压一下,放在本地环境目录

image.png

  • 复制配置文件一份zoo_sample未zoo

image.png

  • 修改配置文件

image.png

  • 启动

image.png

  • 查看

image.png

1.2 可视化工具ZooKeeper Assistant

  • 下载地址:http://www.redisant.cn/za

image.png

  • 查看状况

image.png

1.3 集群环境搭建

  • 这里我是伪集群环境搭建,注意集群环境只能是奇数

image.png

  • 这里我模拟三套服务器环境,一个主节点,两个从节点,主要是配置文件的变化
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=D:\\Tools\\Zookeeper\\ServerA\\data
dataLogDir=D:\\Tools\\Zookeeper\\ServerA\\log
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpHost=0.0.0.0
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
server.1=localhost:2886:3886
server.2=localhost:2887:3887
server.3=localhost:2888:3888

image.png
image.png
server.A=B:C:D;其中 A 是一个数字,表示这个是第几号服务器;B 是这个服务器的 ip 地址;C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。如果是伪集群的配置方式,由于 B 都是一样,所以不同的 Zookeeper 实例通信端口号不能一样,所以要给它们分配不同的端口号。

  • myid 建立,依次写入1,2,3,id被称为Server ID,用来标识该机器在集群中的机器序号。同时,在每台ZooKeeper机器上,我们都需要在数据目录(即dataDir参数指定的那个目录)下创建一个 myid文件,该文件只有一行内容,并且是一个数字,即对应于每台机器的Server ID数字。

image.png

  • 后面的B,C一样的,一键启动脚本
@echo off
start cmd /k "cd /d D:\Tools\Zookeeper\ServerA\bin && call zkServer.cmd"
echo Starting ZooKeeper ServerA...
start cmd /k "cd /d D:\Tools\Zookeeper\ServerB\bin && call zkServer.cmd"
echo Starting ZooKeeper ServerB...
start cmd /k "cd /d D:\Tools\Zookeeper\ServerC\bin && call zkServer.cmd"
echo Starting ZooKeeper ServerC...
echo All ZooKeeper servers have been started.
  • 查看启动日志可以看到主从节点情况

image.png

二 常用命令

1.1 命令行语法

命令行语法功能描述
help显示所有操作命令
ls path使用ls命令来查看当前znode的子节点[可监听], -w 监听子节点变化, -s 附加次级信息
create普通创建, -s 含有序列, -e 临时(重启或超时消失)
get path获得节点的值[可监听] -w 监听节点内容变化, -s 附加次级信息
set设置节点的具体值
stat查看节点的状态
delete删除节点
deleteall递归删除节点

1.2 数据节点信息

[zk: bigdata01:2181(CONNECTED) 5] ls -s /
[zookeeper]cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
  1. cZxid: 创建的事务zxid
    每次修改Zookeeper状态都会产生一个zookeeper事务ID, 事务ID是Zookeeper中所有修改总的次序. 每次修改都有唯一的zxid, 如果zxid1小于zxid2, 那么zxid1在zxid2之前发生.
  2. ctime: znode被创建的毫秒数(从1970开始)
  3. mzxid: znode最后更新的事务zxid
  4. mtime: znode最后修改的毫秒数(从1970开始)
  5. pZxid: znode最后更新的子节点zxid
  6. cversion: znode子节点变化号, znode子节点修改次数
  7. dataversion:znode 数据变化号
  8. aclVersion:znode 访问控制列表的变化号
  9. ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点则是 0。
  10. dataLength:znode 的数据长度
  11. numChildren:znode 子节点数量

1.3 节点类型


这个命令就需要自己去练了

三 CuratorAPI使用

3.1 依赖

        <dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.0.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.0.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-x-discovery</artifactId><version>4.0.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-test</artifactId><version>4.0.0</version><scope>test</scope></dependency>

3.1 创建会话

使用CuratorFrameworkFactory这个工厂类的两个静态方法来创建一个客户端
image.png
image.png
image.png

package com.shu;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;import java.util.ArrayList;
import java.util.List;/*** @author 31380* @description* @create 2024/3/16 18:39*/
public class CuratorUtils {/*** 创建连接** @param connectionString  连接地址* @param sessionTimeout    会话超时时间* @param connectionTimeout 连接超时时间* @return*/public static CuratorFramework createCuratorFramework(String connectionString, int sessionTimeout, int connectionTimeout) {return CuratorFrameworkFactory.builder().connectString(connectionString).sessionTimeoutMs(sessionTimeout).retryPolicy(new ExponentialBackoffRetry(1000, 3)).connectionTimeoutMs(connectionTimeout).build();}/*** 创建连接** @param connectionString  连接地址* @param sessionTimeout    会话超时时间* @param connectionTimeout 连接超时时间* @param retryPolicy       重试策略* @return*/public static CuratorFramework createCuratorFrameworkWithRetry(String connectionString,int sessionTimeout,int connectionTimeout,RetryPolicy retryPolicy) {return CuratorFrameworkFactory.builder().connectString(connectionString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(connectionTimeout).retryPolicy(retryPolicy).build();}/*** 创建一个隔离的命名空间*/public static CuratorFramework createNamespaceCuratorFramework(String connectionString, int sessionTimeout, int connectionTimeout, String namespace) {return CuratorFrameworkFactory.builder().connectString(connectionString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(connectionTimeout).namespace(namespace).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();}/*** ZooDefs.Perms.READ:读权限* ZooDefs.Perms.WRITE:写权限* ZooDefs.Perms.CREATE:创建子节点权限* ZooDefs.Perms.DELETE:删除权限* ZooDefs.Perms.ADMIN:管理权限* ZooDefs.Perms.ALL:所有权限* 以下是一些常用的身份验证方案:* Ids.ANYONE_ID_UNSAFE:表示任何人都可以访问* Ids.AUTH_IDS:表示使用已验证的用户身份* Ids.OPEN_ACL_UNSAFE:表示开放的ACL,任何人都可以访问*  ACL acl = new ACL(ZooDefs.Perms.READ, new Id("myUser", "myPassword"));* @return*/public static List<ACL> getAclList() {ArrayList<ACL> acls = new ArrayList<>();// 权限设置ACL acl = new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.ANYONE_ID_UNSAFE);// 添加权限acls.add(acl);return acls;}}

3.2 基本使用增删改查

  • 新增
package com.shu.base;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;import java.util.List;/*** @author 31380* @description* @create 2024/3/16 18:43*/
public class CuratorCreatTest {/*** 总结:curator创建节点方法* 1.创建节点,如果节点已经存在则抛出异常 create().forPath()* 2.withMode():节点类型: CreateMode.EPHEMERAL 临时节点,CreateMode.PERSISTENT 永久节点* 3.递归创建节点 creatingParentsIfNeeded()* 4.查询所有子节点 getChildren().forPath()* 5.删除节点 delete().forPath()* 6.判断节点是否存在 checkExists().forPath()* 7.关闭连接 close()* @param args* @throws Exception*/public static void main(String[] args) throws Exception {CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000);System.out.println("连接成功!");curatorClint.start();// 创建节点,如果节点已经存在则抛出异常try {curatorClint.create().forPath("/test");} catch (Exception e) {System.out.println("创建节点失败!"+e.getMessage());}// 删除节点try {curatorClint.delete().forPath("/test");System.out.println("删除节点成功!");} catch (Exception e) {System.out.println("删除节点失败!"+e.getMessage());}/*** 临时节点(EPHEMERAL):临时创建的,会话结束节点自动被删除,也可以手动删除,临时节点不能拥有子节点.* 持久节点(PERSISTENT):创建后永久存在,除非主动删除。*/// 临时节点,当会话结束后,节点自动删除curatorClint.create().withMode(CreateMode.EPHEMERAL).forPath("/secondPath", "hello,word".getBytes());System.out.println("临时节点:"+new String(curatorClint.getData().forPath("/secondPath")));// 永久节点curatorClint.create().withMode(CreateMode.PERSISTENT).forPath("/thirdPath", "hello,word".getBytes());System.out.println("永久节点:"+new String(curatorClint.getData().forPath("/thirdPath")));// 递归创建节点curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/parent/child", "hello,word".getBytes());System.out.println("递归创建节点:"+new String(curatorClint.getData().forPath("/parent/child")));// 查询所有子节点List<String> list= curatorClint.getChildren().forPath("/");System.out.println(list);// 关闭连接curatorClint.close();}
}
  • 读取
package com.shu.base;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.data.Stat;import java.util.Date;/*** @author 31380* @description 读取节点数据* @create 2024/3/17 11:28*/
public class CuratorReadTest {/*** 总结:* 1. 读取单个节点数据:curatorClint.getData().forPath("/base/test")* 2. 读取多个节点数据:curatorClint.getChildren().forPath("/test").forEach(System.out::println)* 3. 读取节点数据并获取 stat:curatorClint.getData().storingStatIn(stat).forPath("/base/test")* 4:Stat:节点状态,包含节点的版本、数据长度、子节点数量、创建时间、修改时间、最近一次修改的事务 ID、数据版本、ACL 版本、临时节点* @param args*/public static void main(String[] args) {// 地址String connectString = "127.0.0.1:2181"; // 确保连接字符串正确CuratorFramework curatorClint = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);System.out.println("连接成功!");curatorClint.start();// 读取单个节点数据try {byte[] bytes = curatorClint.getData().forPath("/base/test");System.out.println(new String(bytes));System.out.println("读取节点数据成功!");} catch (Exception e) {System.out.println("读取节点数据失败!"+e.getMessage());}// 读取多个节点数据try {curatorClint.getChildren().forPath("/test").forEach(System.out::println);System.out.println("读取多个节点数据成功!");} catch (Exception e) {System.out.println("读取多个节点数据失败!"+e.getMessage());}try {Stat stat = new Stat();byte[] data = curatorClint.getData().storingStatIn(stat).forPath("/base/test");String dataString = new String(data);System.out.println("节点数据:" + dataString);System.out.println("节点状态:");System.out.println("  节点创建版本:" + stat.getCversion());System.out.println("  数据长度:" + stat.getDataLength());System.out.println("  子节点数量:" + stat.getNumChildren());System.out.println("  创建时间:" + new Date(stat.getCtime()));System.out.println("  修改时间:" + new Date(stat.getMtime()));System.out.println("  最近一次修改的事务 ID:" + stat.getMzxid());System.out.println("  数据版本:" + stat.getVersion());System.out.println("  ACL 版本:" + stat.getAversion());System.out.println("  临时节点:" + stat.getEphemeralOwner());System.out.println("读取节点数据并获取 stat 成功!");} catch (Exception e) {System.out.println("读取节点数据并获取 stat 失败:" + e.getMessage());}// 关闭连接curatorClint.close();}
}
  • 删除
package com.shu.base;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;/*** @author 31380* @description* @create 2024/3/16 19:25*/
public class CuratorDeleteTest {/*** 总结:* 1. 删除节点:delete().forPath("/test")* 2. 如果存在子节点,删除子节点:delete().deletingChildrenIfNeeded().forPath("/parent")* 3. 递归删除节点:delete().deletingChildrenIfNeeded().forPath("/secondPath")* 4. 判断节点是否存在:checkExists().forPath("/secondPath")* 5. 关闭连接:close()* @param args* @throws Exception*/public static void main(String[] args) throws Exception {CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000);System.out.println("连接成功!");curatorClint.start();// 删除节点try {curatorClint.delete().forPath("/test");System.out.println("删除节点成功!");} catch (Exception e) {System.out.println("删除节点失败!"+e.getMessage());}// 如果存在子节点,删除子节点try {curatorClint.delete().deletingChildrenIfNeeded().forPath("/parent");System.out.println("删除节点成功!");} catch (Exception e) {System.out.println("删除节点失败!"+e.getMessage());}// 递归删除节点curatorClint.delete().deletingChildrenIfNeeded().forPath("/secondPath");// 判断节点是否存在if (curatorClint.checkExists().forPath("/secondPath") == null) {System.out.println("节点不存在!");} else {System.out.println("节点存在!");}// 关闭连接curatorClint.close();}
}
  • 修改
package com.shu.base;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.data.Stat;/*** @author 31380* @description* @create 2024/3/17 11:35*/
public class CuratorUpdateTest {/*** 总计* 1. 更新节点:setData().forPath("/test", "hello,word".getBytes())* 2. 指定版本更新节点:setData().withVersion(1).forPath("/test", "hello,word".getBytes())* @param args*/public static void main(String[] args) throws Exception {// 地址String connectString = "127.0.0.1:2181";//创建节点CuratorFramework curatorClint = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);System.out.println("连接成功!");curatorClint.start();// 更新节点try {curatorClint.setData().forPath("/base/test", "hello,word1111".getBytes());// 获取节点数据byte[] bytes = curatorClint.getData().forPath("/base/test");System.out.println(new String(bytes));System.out.println("更新节点成功!");} catch (Exception e) {System.out.println("更新节点失败!"+e.getMessage());}// 先获取节点的版本号Stat stat = new Stat();byte[] data = curatorClint.getData().storingStatIn(stat).forPath("/base/test");String dataString = new String(data);System.out.println("节点数据:" + dataString);System.out.println("节点状态:");System.out.println("  数据版本:" + stat.getVersion());// 指定版本更新节点:CAS 机制try {curatorClint.setData().withVersion(stat.getVersion()).forPath("/base/test", "hello,word2222".getBytes());// 获取节点数据byte[] bytes = curatorClint.getData().forPath("/base/test");System.out.println(new String(bytes));System.out.println("更新节点成功!");} catch (Exception e) {System.out.println("更新节点失败!"+e.getMessage());}}
}
  • 异步创建
package com.shu.base;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** @author 31380* @description Curator异步操作* @create 2024/3/17 11:42*/
public class CuratorAyncTest {/*** 总结:* 1 异步操作:inBackground()* 2.创建节点,如果节点已经存在则抛出异常 create().forPath()* 3.递归创建节点 creatingParentsIfNeeded()* @param args* @throws Exception*/public static void main(String[] args) throws Exception {// 地址String connectString = "127.0.0.1:2181";//创建节点CuratorFramework curatorClint = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);System.out.println("连接成功!");curatorClint.start();CountDownLatch cdl = new CountDownLatch(2);ExecutorService executorService = Executors.newFixedThreadPool(2);curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground((client, event) -> {System.out.println("Code:" + event.getResultCode());System.out.println("Type:" + event.getType());System.out.println("Path:" + event.getPath());cdl.countDown();}, executorService).forPath("/test1", "hello,word".getBytes());curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground((client, event) -> {System.out.println("Code:" + event.getResultCode());System.out.println("Type:" + event.getType());System.out.println("Path:" + event.getPath());cdl.countDown();}).forPath("/test2", "hello,word".getBytes());cdl.await();executorService.shutdown();curatorClint.close();}/*** 事件类型* CREATE, // 创建* DELETE, // 删除* EXISTS, // 存在* GET_DATA, // 获取数据* SET_DATA, // 设置数据* CHILDREN, // 子节点* SYNC, // 同步* GET_ACL, // 获取ACL* SET_ACL, // 设置ACL* TRANSACTION, // 事务* GET_CONFIG, // 获取配置* RECONFIG, // 重新配置* WATCHED, // 监听* REMOVE_WATCHES, // 移除监听* CLOSING; // 关闭* @param args*//*** 响应码* OK(0), // OK* CONNECTIONLOSS(-4), // 连接丢失* MARSHALLINGERROR(-7), // 编组错误* UNIMPLEMENTED(-9), // 未实现* OPERATIONTIMEOUT(-10), // 操作超时* BADARGUMENTS(-8), // 错误参数* APIERROR(-100), // API错误* NONODE(-101), // 无节点·*/}
  • 不同的顺序节点
package com.shu.base;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;/*** @author 31380* @description* @create 2024/3/17 11:03*/
public class CuratorSEQCreat {/*** 临时顺序节点(EPHEMERAL_SEQUENTIAL):具有临时节点特征,但是它会有序列号。* 持久顺序节点(PERSISTENT_SEQUENTIAL):具有持久节点特征,但是它会有序列号。* @param args*/public static void main(String[] args) {CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000);System.out.println("连接成功!");curatorClint.start();// 创建一个持久顺序节点A-1,A-2,A-3try {curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test/A", "hello,word".getBytes());curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test/A", "hello,word".getBytes());curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test/A", "hello,word".getBytes());System.out.println("创建节点成功!");} catch (Exception e) {System.out.println("创建节点失败!"+e.getMessage());}// 创建一个临时顺序节点B-1,B-2,B-3try {curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/test/B", "hello,word".getBytes());curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/test/B", "hello,word".getBytes());curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/test/B", "hello,word".getBytes());System.out.println("创建节点成功!");} catch (Exception e) {System.out.println("创建节点失败!"+e.getMessage());}// 关闭连接curatorClint.close();}
}
  • 事务
package com.shu.base;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;import java.util.Collection;/*** @author 31380* @description TODO* @create 2024/3/16 19:28*/
public class CuratorTransactionTest {public static void main(String[] args) throws Exception {CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000);System.out.println("连接成功!");curatorClint.start();Collection<CuratorTransactionResult> commit = curatorClint.inTransaction().create().forPath("/xiao", "456".getBytes()).and().setData().forPath("/xiao", "123".getBytes()).and().commit();for (CuratorTransactionResult result : commit) {System.out.println(result.getForPath() + "--->" + result.getType());}curatorClint.close();}
}

3.3 ACL权限控制

package com.shu.acl;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;/*** @author 31380* @description* @create 2024/3/16 19:56*/
public class CuratorAclTest {public static void main(String[] args) {CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000);System.out.println("连接成功!");curatorClint.start();// 创建节点,ACL为ip:try {curatorClint.create().withACL(CuratorUtils.getAclList()).forPath("/test");System.out.println("创建节点成功!");} catch (Exception e) {System.out.println("创建节点失败!"+e.getMessage());}}
}/*** @description* @author 31380* @create 2024/3/17 11:12* Schema 代表权限控制模式,分别为:* ● World 任何人* ● Auth 不需要ID* ● Digest 用户名和密码方式的认证* ● IP Address IP地址方式的认证* perms(权限),ZooKeeper支持如下权限* ● CREATE: 创建子节点* ● READ: 获取子节点与自身节点的数据信息* ● WRITE:在Znode节点上写数据* ● DELETE:删除子节点* ● ADMIN:设置ACL权限* ————————————————*/
package com.shu.acl;

3.4 分布式锁

package com.shu.lock;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;import java.text.SimpleDateFormat;
import java.util.concurrent.CountDownLatch;/*** @author 31380* @description 分布式锁* @create 2024/3/17 13:12*/
public class LockTest {/*** 分布式锁:InterProcessMutex* 1: 获取锁,acquire()* 2: 释放锁,release()* 3: 创建 InterProcessMutex 对象* 4: 调用 acquire() 方法获取锁* 5: 业务操作* 6: 调用 release() 方法释放锁* @param args*/public static void main(String[] args) {CountDownLatch latch = new CountDownLatch(10);String connectString = "127.0.0.1:2181";String lockPath = "/lock";CuratorFramework curatorFramework = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);curatorFramework.start();InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath);for (int i = 0; i < 10; i++) {new Thread(() -> {try {latch.await();lock.acquire();System.out.println(Thread.currentThread().getName() + "获取到锁");// 模拟业务操作,生成订单号Thread.sleep(1000);SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");String orderNo = sdf.format(System.currentTimeMillis());System.out.println("生成的订单号:" + orderNo);} catch (Exception e) {e.printStackTrace();} finally {try {lock.release();} catch (Exception e) {e.printStackTrace();}}}, "Thread-" + i).start();latch.countDown();}}
}

3.5 分布式计数器

package com.shu.lock;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;/*** @author 31380* @description 分布式计数器* @create 2024/3/17 13:20*/
public class RecipeDisAtomicIntTest {/*** 分布式计数器:DistributedAtomicInteger* 1、创建DistributedAtomicInteger对象* 2、调用add方法* 3、获取当前值** @param args*/public static void main(String[] args) {String connectString = "127.0.0.1:2181";String connectString2 = "127.0.0.1:2182";String connectString3 = "127.0.0.1:2183";CuratorFramework curatorFramework = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);curatorFramework.start();DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(curatorFramework, "/atomic", null);try {AtomicValue<Integer> added = atomicInteger.add(1);System.out.println("1Result: " + added.succeeded());// 获取当前值System.out.println("2Result: " + added.postValue());} catch (Exception e) {throw new RuntimeException(e);}// 客户端2CuratorFramework curatorFramework2 = CuratorUtils.createCuratorFramework(connectString2, 1000, 1000);curatorFramework2.start();DistributedAtomicInteger atomicInteger2 = new DistributedAtomicInteger(curatorFramework2, "/atomic", null);try {AtomicValue<Integer> added = atomicInteger2.add(1);System.out.println("2Result: " + added.succeeded());// 获取当前值System.out.println("2Result: " + added.postValue());} catch (Exception e) {throw new RuntimeException(e);}// 客户端3CuratorFramework curatorFramework3 = CuratorUtils.createCuratorFramework(connectString3, 1000, 1000);curatorFramework3.start();DistributedAtomicInteger atomicInteger3 = new DistributedAtomicInteger(curatorFramework3, "/atomic", null);try {AtomicValue<Integer> added = atomicInteger3.add(1);System.out.println("3Result: " + added.succeeded());// 获取当前值System.out.println("3Result: " + added.postValue());} catch (Exception e) {throw new RuntimeException(e);}}}

3.6 分布式Barrier

package com.shu.lock;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;/*** @author 31380* @description* 分布式Barrier:分布式 Barrier 是一种常见的同步原语,用于在分布式系统中协调多个进程或线程的执行顺序。* 它可以用来实现诸如等待直到所有参与者都准备好,然后一起执行某项任务,或者等待直到某些条件达成后再继续执行的场景。* @create 2024/3/17 13:27*/
public class CycliBarrierTest {static DistributedBarrier barrier;public static void main(String[] args) {String connectString = "127.0.0.1:2181";String path="/barrier";CuratorFramework curatorFramework = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);curatorFramework.start();// 等待所有的线程到达barrier 10个线程for (int i = 0; i < 10; i++) {new Thread(() -> {try {barrier = new DistributedBarrier(curatorFramework, path);System.out.println(Thread.currentThread().getName() + "号barrier设置");barrier.setBarrier();barrier.waitOnBarrier();System.out.println("启动...");} catch (Exception e) {e.printStackTrace();}}, "Thread-" + i).start();}try {Thread.sleep(2000);barrier.removeBarrier();} catch (Exception e) {e.printStackTrace();}}
}

3.7 主从节点选举

package com.shu.master;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;/*** @author 31380* @description 主节点选举* @create 2024/3/17 13:03*/
public class MasterSelectTest {/*** 主节点选举:LeaderSelector* 1、创建LeaderSelector对象* 2、调用start方法* 3、添加监听器* 4、关闭连接* @param args*/public static void main(String[] args) {// 地址String connectString = "127.0.0.1:2181";String connectString2 = "127.0.0.1:2182";String connectString3 = "127.0.0.1:2183";// 创建并连接 CuratorFramework 实例CuratorFramework curatorFramework1 = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);CuratorFramework curatorFramework2 = CuratorUtils.createCuratorFramework(connectString2, 1000, 1000);CuratorFramework curatorFramework3 = CuratorUtils.createCuratorFramework(connectString3, 1000, 1000);curatorFramework1.start();curatorFramework2.start();curatorFramework3.start();// 第一个节点LeaderSelector leaderSelector1 = new LeaderSelector(curatorFramework1, "/master1", new LeaderSelectorListenerAdapter() {@Overridepublic void takeLeadership(CuratorFramework curatorFramework) throws Exception {System.out.println("节点1成为master节点");Thread.sleep(10000);System.out.println("节点1完成master操作,释放master权利");}});leaderSelector1.autoRequeue();leaderSelector1.start();// 第二个节点LeaderSelector leaderSelector2 = new LeaderSelector(curatorFramework2, "/master1", new LeaderSelectorListenerAdapter() {@Overridepublic void takeLeadership(CuratorFramework curatorFramework) throws Exception {System.out.println("节点2成为master节点");Thread.sleep(10000);System.out.println("节点2完成master操作,释放master权利");}});leaderSelector2.autoRequeue();leaderSelector2.start();// 第三个节点LeaderSelector leaderSelector3 = new LeaderSelector(curatorFramework3, "/master1", new LeaderSelectorListenerAdapter() {@Overridepublic void takeLeadership(CuratorFramework curatorFramework) throws Exception {System.out.println("节点3成为master节点");Thread.sleep(10000);System.out.println("节点3完成master操作,释放master权利");}});leaderSelector3.autoRequeue();leaderSelector3.start();try {Thread.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();}}}

3.8 NodeCache监听

package com.shu.watch;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;/*** @author 31380* @description* @create 2024/3/16 19:38*/
public class CuratorNodeCacheTest {/*** NodeCache:监听节点的新增、修改操作* 1、创建NodeCache对象* 2、调用start方法* 3、添加监听器* 4、关闭连接* @param args* @throws Exception*/public static void main(String[] args) throws Exception {String path = "/test";CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000);System.out.println("连接成功!");curatorClint.start();final NodeCache nodeCache = new NodeCache(curatorClint, path);nodeCache.start();nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {System.out.println("监听事件触发");System.out.println("重新获得节点内容为:" + new String(nodeCache.getCurrentData().getData()));}});curatorClint.setData().forPath(path,"456".getBytes());curatorClint.setData().forPath(path,"789".getBytes());curatorClint.setData().forPath(path,"123".getBytes());curatorClint.setData().forPath(path,"222".getBytes());curatorClint.setData().forPath(path,"333".getBytes());curatorClint.setData().forPath(path,"444".getBytes());Thread.sleep(15000);}
}

3.9 PathChildrenCache监听

package com.shu.watch;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;/*** @author 31380* @description* @create 2024/3/16 19:43*/
public class CuratorPathChildrenCacheTest {/*** PathChildrenCache:监听子节点的新增、修改、删除操作* @param args* @throws Exception*/public static void main(String[] args) throws Exception {CuratorFramework client = getClient();String parentPath = "/p1";PathChildrenCache pathChildrenCache = new PathChildrenCache(client,parentPath,false);/* * StartMode:初始化方式* POST_INITIALIZED_EVENT:异步初始化。初始化后会触发事件* NORMAL:异步初始化* BUILD_INITIAL_CACHE:同步初始化* */pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {System.out.println("事件类型:"  + event.getType() + ";操作节点:" + event.getData().getPath());switch(event.getType()){case CHILD_ADDED:System.out.println("新增子节点:" + event.getData().getPath());break;case CHILD_UPDATED:System.out.println("更新子节点:" + event.getData().getPath());break;case CHILD_REMOVED:System.out.println("删除子节点:" + event.getData().getPath());break;default:break;}}});String path = "/p1/c1";client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);Thread.sleep(1000); // 此处需留意,如果没有现成睡眠则无法触发监听事件client.delete().forPath(path);Thread.sleep(15000);}private static CuratorFramework getClient(){RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181").retryPolicy(retryPolicy).sessionTimeoutMs(6000).connectionTimeoutMs(3000).namespace("demo").build();client.start();return client;}
}

3.10 TreeCache监听

package com.shu.watch;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;/*** @author 31380* @description* @create 2024/3/16 19:44*/
public class CuratorWatcher3 {private static final String CONNECT_ADDR = "127.0.0.1:2181";private static final int SESSION_TIMEOUT = 5000;public static void main(String[] args) throws Exception {RetryPolicy policy = new ExponentialBackoffRetry(1000, 10);CuratorFramework curator = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(policy).build();curator.start();TreeCache treeCache = new TreeCache(curator, "/treeCache");treeCache.start();treeCache.getListenable().addListener((curatorFramework, treeCacheEvent) -> {switch (treeCacheEvent.getType()) {case NODE_ADDED:System.out.println("NODE_ADDED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData())+ ",状态:" + treeCacheEvent.getData().getStat());break;case NODE_UPDATED:System.out.println("NODE_UPDATED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData())+ ",状态:" + treeCacheEvent.getData().getStat());break;case NODE_REMOVED:System.out.println("NODE_REMOVED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData())+ ",状态:" + treeCacheEvent.getData().getStat());break;default:break;}});curator.create().forPath("/treeCache", "123".getBytes());curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/treeCache/c1", "456".getBytes());curator.setData().forPath("/treeCache", "789".getBytes());curator.setData().forPath("/treeCache/c1", "910".getBytes());curator.delete().forPath("/treeCache/c1");curator.delete().forPath("/treeCache");Thread.sleep(5000);curator.close();}
}

详细介绍参考书籍《从Paxos到Zookeeper:分布式一致性原理与实践》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/757669.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

婴儿洗衣机硬核测评:希亦、鲸立、小吉婴儿洗衣机性能大比拼!

如果你非常注重婴儿衣物的卫生问题&#xff0c;那么婴儿洗衣机则是非常理想的选择。毕竟&#xff0c;在婴儿吃奶或者接触其他材料时&#xff0c;其抵抗力是比较弱的&#xff0c;再加上普通洗衣机无法对婴儿的衣物进行有效的消毒处理&#xff0c;轻则会对婴儿的健康造成威胁&…

基于 HBase Phoenix 构建实时数仓(5)—— 用 Kafka Connect 做实时数据同步

目录 一、总体架构 二、安装配置 MySQL 1. 创建 mysql 用户 2. 建立 MySQL 使用的目录 3. 解压安装包 4. 配置环境变量 5. 创建 MySQL 配置文件 6. MySQL 系统初始化 7. 启动 mysql 服务器 8. 创建 dba 用户 三、配置 MySQL 主从复制 四、安装部署 Kafka Connector…

密码学——传统加密技术和公钥加密

传统加密技术和公开密钥 传统加密技术基本概念基本原理公开密钥基本概念基本原理传统加密技术 传统加密,即对称加密或称之为单钥加密,是公钥加密技术出现之前的主流加密技术,甚至在现在,仍然具有广泛应用。 基本概念 回顾一下一些基本概念: 明文,原始的消息和数据,也…

gin | gin环境搭建与示例工程

要安装Gin软件包&#xff0c;需要先安装Go并设置Go工作区。 1. 下载并安装 gin&#xff1a; go get -u github.com/gin-gonic/gin 2. 将 gin 引入到代码中&#xff1a; import "github.com/gin-gonic/gin" 3. (可选) 如果使用诸如 http.StatusOK 之类的常量&a…

23.python标准库之turtle库

一、窗体函数 turtle.setup(width, height, startx, starty) width:窗口宽度 height:窗口高度 startx:窗口与屏幕左侧距离&#xff08;单位象素&#xff09; starty:窗口与屏幕顶部距离&#xff08;单位象素&#xff09; 二、画笔状态函数 三、画笔运动函数

Java字符串精通之旅:从新手到专家

目录 一、字符串的创建 1.直接赋值 2.使用构造方法 二、字符串不可变性 三、常用操作 1.字符串长度 2.连接字符串 3.格式化字符串 四、示例代码&#xff1a;String类应用 五、String中常用的方法 在Java编程世界里&#xff0c;字符串无疑是最常用的数据类型之一。不论…

想要把PDF文件转TXT文本编辑改动怎么办?三秒钟帮你搞定 PDF编辑器

pdf是一种便携文件格式&#xff0c;是由Adobe公司所开发的独特的跨平台文件格式。PDF文件以PostScript语言图象模型为基础&#xff0c;无论在哪种打印机上都可保证精确的颜色和准确的打印效果&#xff0c;即PDF会忠实地再现原稿的每一个字符、颜色以及图象。有点遗憾的是&#…

golang实现循环队列

思路&#xff1a; 基于数组实现。当容量为k时&#xff0c;我们初始化一个容量为k1的数组arr&#xff0c;方便区分队列空和满。 当rearfront时&#xff0c;判断队列为空&#xff1b; 当(rear1) % len(arr) front时&#xff0c;判断队列为满&#xff1b; package mainimport (&…

【好用】Star超36.8k,一个的免费通用数据库管理工具

关于数据库管理工具&#xff0c;大家可能都在用SQLyog、Navicat、MySQL-Front、SQL Studio、MySQL Workbench等等&#xff0c;这些管理工具不是不好用&#xff0c;就是要变魔术才可以用&#xff0c;今天 V 哥给大家推荐一个即好用&#xff0c;又免费的可视化通用数据库管理工具…

Git 删除.git 目录

Git 删除.git 目录 如上图&#xff0c;删除项目中的.git目录 ①在windows系统上&#xff0c;进入该项目的根目录 ②在根目录下打开Git bash ③使用rm -rf .git命令递归删除.git目录 rm -rf .git在删除后想重新初始化&#xff0c;可以参考下面链接里的内容 Git初始化及远程推送…

腾讯云优惠券领取的几种方法,助你降低云服务成本

腾讯云优惠券领取的几种方法&#xff0c;助你降低云服务成本 一、最新优惠卷二、最新活动 腾讯云—腾讯倾力打造的云计算品牌&#xff0c;以卓越科技能力助力各行各业数字化转型&#xff0c;为全球客户提供领先的云计算、大数据、人工智能服务&#xff0c;以及定制化行业解决方…

刷题日记:面试经典 150 题 DAY6

刷题日记&#xff1a;面试经典 150 题 DAY6 392. 判断子序列167. 两数之和 II - 输入有序数组11. 盛最多水的容器15. 三数之和209. 长度最小的子数组 392. 判断子序列 原题链接 392. 判断子序列 双指针&#xff0c;i指向s&#xff0c;j指向t 如果s[i]t[j]&#xff0c;则匹配…

实验11-1-9 藏尾诗(PTA)

题目&#xff1a; 本题要求编写一个解密藏尾诗的程序。 注&#xff1a;在 2023 年 1 月 17 日 15 点 14 分以后&#xff0c;该题数据修改为 UTF-8 编码。 输入格式&#xff1a; 输入为一首中文藏尾诗&#xff0c;一共四句。每句一行&#xff0c;但句子不一定是等长的&#…

JDK1.8超详细安装教程

1、下载jdk1.8 大家可以直接去百度云盘下载&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/187N6CU9Gu4bjtOz5_cjd-A?pwd3535 提取码&#xff1a;35352、开始安装 双击下载好的.exe文件&#xff0c;点击下一步 修改安装路径&#xff0c;点击下一步 会顺带安装jre…

Redux 的工作流程

Redux 是一个用于管理 JavaScript 应用程序状态的库。它的工作流程主要包括以下步骤&#xff1a; 1、Action 创建&#xff1a;当用户与应用程序交互时&#xff08;例如点击按钮&#xff09;&#xff0c;会触发一个 Action。Action 是一个描述了发生了什么的普通 JavaScript 对象…

107 在携带请求体的情况下, hutool 将 get 请求转换为了 post 请求

前言 本问题主要是来自于同事 情况大致如下, 同样的代码 一个是测试用例, 一个是生产环境的应用, 访问同一个第三方服务, 参数什么的完全一致 但是 出现的问题就是 测试用例能够拿到正确的对方的响应, 但是 生产环境的应用 却是拿到的对方的报错 然后 我开始以为是 是否…

前端面试题详解

前端面试 1.app如何实现登陆成功&#xff0c;卸载app重新安装再进入获取上一次已经登陆的信息&#xff1f; 要实现前端APP在登录成功后&#xff0c;即使卸载并重新安装也能获取上一次已经登录的信息&#xff0c;通常涉及以下几个关键步骤&#xff1a; 1. 使用持久化存储 在APP…

【CKA模拟题】学会JSONPath,精准定位Pod信息!

题干 For this question, please set this context (In exam, diff cluster name) kubectl config use-context kubernetes-adminkubernetesyou have a script named pod-filter.sh . Update this script to include a command that filters and displays the label with the…

安卓Android入门

安卓作为日常生活中不可缺少的移动操作系统&#xff0c;在5G的发展和应用过程中发挥着其重要的作用。 5G是第五代移动通信技术&#xff0c;拥有更快的速度、更高的带宽、更低的延迟和更大的连接密度。这一技术的快速发展为移动设备和应用提供了更多的可能性。 安卓和5G的关系…

哪些患者不适用于数字OT训练系统进行康复训练

数字OT&#xff08;Occupational Therapy&#xff0c;职业治疗&#xff09;训练系统是一种通过数字化技术辅助患者进行康复训练的方法。尽管数字OT训练系统可以帮助很多患者进行康复训练&#xff0c;但并非所有患者都适合使用该系统。以下是一些不适合使用数字OT训练系统进行康…