Zookeeper(五)Zokeeper 环境搭建与Curator使用

目录

  • 一 环境搭建
    • 1.1 单机环境搭建
    • 1.2 可视化工具ZooKeeper Assistant
    • 1.3 集群环境搭建
  • 二 常用命令
    • 1.1 命令行语法
    • 1.2 数据节点信息
    • 1.3 节点类型
  • 三 CuratorAPI使用
    • 3.1 依赖
    • 3.1 创建会话
    • 3.2 基本使用增删改查
    • 3.3 ACL权限控制
    • 3.4 分布式锁
    • 3.5 分布式计数器
    • 3.6 分布式Barrier
    • 3.7 主从节点选举
    • 3.8 NodeCache监听
    • 3.9 PathChildrenCache监听
    • 3.10 TreeCache监听

  • 官网:Apache ZooKeeper

一 环境搭建

1.1 单机环境搭建

  • 必要环境:JDK
  • 下载地址:https://zookeeper.apache.org/
  • 历史版本:https://archive.apache.org/dist/zookeeper/

image.png
image.png
image.png

  • 我这里是本地环境,说名一下,无脑解压一下,放在本地环境目录

image.png

  • 复制配置文件一份zoo_sample未zoo

image.png

  • 修改配置文件

image.png

  • 启动

image.png

  • 查看

image.png

1.2 可视化工具ZooKeeper Assistant

  • 下载地址:http://www.redisant.cn/za

image.png

  • 查看状况

image.png

1.3 集群环境搭建

  • 这里我是伪集群环境搭建,注意集群环境只能是奇数

image.png

  • 这里我模拟三套服务器环境,一个主节点,两个从节点,主要是配置文件的变化
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=D:\\Tools\\Zookeeper\\ServerA\\data
dataLogDir=D:\\Tools\\Zookeeper\\ServerA\\log
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpHost=0.0.0.0
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
server.1=localhost:2886:3886
server.2=localhost:2887:3887
server.3=localhost:2888:3888

image.png
image.png
server.A=B:C:D;其中 A 是一个数字,表示这个是第几号服务器;B 是这个服务器的 ip 地址;C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。如果是伪集群的配置方式,由于 B 都是一样,所以不同的 Zookeeper 实例通信端口号不能一样,所以要给它们分配不同的端口号。

  • myid 建立,依次写入1,2,3,id被称为Server ID,用来标识该机器在集群中的机器序号。同时,在每台ZooKeeper机器上,我们都需要在数据目录(即dataDir参数指定的那个目录)下创建一个 myid文件,该文件只有一行内容,并且是一个数字,即对应于每台机器的Server ID数字。

image.png

  • 后面的B,C一样的,一键启动脚本
@echo off
start cmd /k "cd /d D:\Tools\Zookeeper\ServerA\bin && call zkServer.cmd"
echo Starting ZooKeeper ServerA...
start cmd /k "cd /d D:\Tools\Zookeeper\ServerB\bin && call zkServer.cmd"
echo Starting ZooKeeper ServerB...
start cmd /k "cd /d D:\Tools\Zookeeper\ServerC\bin && call zkServer.cmd"
echo Starting ZooKeeper ServerC...
echo All ZooKeeper servers have been started.
  • 查看启动日志可以看到主从节点情况

image.png

二 常用命令

1.1 命令行语法

命令行语法功能描述
help显示所有操作命令
ls path使用ls命令来查看当前znode的子节点[可监听], -w 监听子节点变化, -s 附加次级信息
create普通创建, -s 含有序列, -e 临时(重启或超时消失)
get path获得节点的值[可监听] -w 监听节点内容变化, -s 附加次级信息
set设置节点的具体值
stat查看节点的状态
delete删除节点
deleteall递归删除节点

1.2 数据节点信息

[zk: bigdata01:2181(CONNECTED) 5] ls -s /
[zookeeper]cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
  1. cZxid: 创建的事务zxid
    每次修改Zookeeper状态都会产生一个zookeeper事务ID, 事务ID是Zookeeper中所有修改总的次序. 每次修改都有唯一的zxid, 如果zxid1小于zxid2, 那么zxid1在zxid2之前发生.
  2. ctime: znode被创建的毫秒数(从1970开始)
  3. mzxid: znode最后更新的事务zxid
  4. mtime: znode最后修改的毫秒数(从1970开始)
  5. pZxid: znode最后更新的子节点zxid
  6. cversion: znode子节点变化号, znode子节点修改次数
  7. dataversion:znode 数据变化号
  8. aclVersion:znode 访问控制列表的变化号
  9. ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点则是 0。
  10. dataLength:znode 的数据长度
  11. numChildren:znode 子节点数量

1.3 节点类型


这个命令就需要自己去练了

三 CuratorAPI使用

3.1 依赖

        <dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.0.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.0.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-x-discovery</artifactId><version>4.0.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-test</artifactId><version>4.0.0</version><scope>test</scope></dependency>

3.1 创建会话

使用CuratorFrameworkFactory这个工厂类的两个静态方法来创建一个客户端
image.png
image.png
image.png

package com.shu;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;import java.util.ArrayList;
import java.util.List;/*** @author 31380* @description* @create 2024/3/16 18:39*/
public class CuratorUtils {/*** 创建连接** @param connectionString  连接地址* @param sessionTimeout    会话超时时间* @param connectionTimeout 连接超时时间* @return*/public static CuratorFramework createCuratorFramework(String connectionString, int sessionTimeout, int connectionTimeout) {return CuratorFrameworkFactory.builder().connectString(connectionString).sessionTimeoutMs(sessionTimeout).retryPolicy(new ExponentialBackoffRetry(1000, 3)).connectionTimeoutMs(connectionTimeout).build();}/*** 创建连接** @param connectionString  连接地址* @param sessionTimeout    会话超时时间* @param connectionTimeout 连接超时时间* @param retryPolicy       重试策略* @return*/public static CuratorFramework createCuratorFrameworkWithRetry(String connectionString,int sessionTimeout,int connectionTimeout,RetryPolicy retryPolicy) {return CuratorFrameworkFactory.builder().connectString(connectionString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(connectionTimeout).retryPolicy(retryPolicy).build();}/*** 创建一个隔离的命名空间*/public static CuratorFramework createNamespaceCuratorFramework(String connectionString, int sessionTimeout, int connectionTimeout, String namespace) {return CuratorFrameworkFactory.builder().connectString(connectionString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(connectionTimeout).namespace(namespace).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();}/*** ZooDefs.Perms.READ:读权限* ZooDefs.Perms.WRITE:写权限* ZooDefs.Perms.CREATE:创建子节点权限* ZooDefs.Perms.DELETE:删除权限* ZooDefs.Perms.ADMIN:管理权限* ZooDefs.Perms.ALL:所有权限* 以下是一些常用的身份验证方案:* Ids.ANYONE_ID_UNSAFE:表示任何人都可以访问* Ids.AUTH_IDS:表示使用已验证的用户身份* Ids.OPEN_ACL_UNSAFE:表示开放的ACL,任何人都可以访问*  ACL acl = new ACL(ZooDefs.Perms.READ, new Id("myUser", "myPassword"));* @return*/public static List<ACL> getAclList() {ArrayList<ACL> acls = new ArrayList<>();// 权限设置ACL acl = new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.ANYONE_ID_UNSAFE);// 添加权限acls.add(acl);return acls;}}

3.2 基本使用增删改查

  • 新增
package com.shu.base;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;import java.util.List;/*** @author 31380* @description* @create 2024/3/16 18:43*/
public class CuratorCreatTest {/*** 总结:curator创建节点方法* 1.创建节点,如果节点已经存在则抛出异常 create().forPath()* 2.withMode():节点类型: CreateMode.EPHEMERAL 临时节点,CreateMode.PERSISTENT 永久节点* 3.递归创建节点 creatingParentsIfNeeded()* 4.查询所有子节点 getChildren().forPath()* 5.删除节点 delete().forPath()* 6.判断节点是否存在 checkExists().forPath()* 7.关闭连接 close()* @param args* @throws Exception*/public static void main(String[] args) throws Exception {CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000);System.out.println("连接成功!");curatorClint.start();// 创建节点,如果节点已经存在则抛出异常try {curatorClint.create().forPath("/test");} catch (Exception e) {System.out.println("创建节点失败!"+e.getMessage());}// 删除节点try {curatorClint.delete().forPath("/test");System.out.println("删除节点成功!");} catch (Exception e) {System.out.println("删除节点失败!"+e.getMessage());}/*** 临时节点(EPHEMERAL):临时创建的,会话结束节点自动被删除,也可以手动删除,临时节点不能拥有子节点.* 持久节点(PERSISTENT):创建后永久存在,除非主动删除。*/// 临时节点,当会话结束后,节点自动删除curatorClint.create().withMode(CreateMode.EPHEMERAL).forPath("/secondPath", "hello,word".getBytes());System.out.println("临时节点:"+new String(curatorClint.getData().forPath("/secondPath")));// 永久节点curatorClint.create().withMode(CreateMode.PERSISTENT).forPath("/thirdPath", "hello,word".getBytes());System.out.println("永久节点:"+new String(curatorClint.getData().forPath("/thirdPath")));// 递归创建节点curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/parent/child", "hello,word".getBytes());System.out.println("递归创建节点:"+new String(curatorClint.getData().forPath("/parent/child")));// 查询所有子节点List<String> list= curatorClint.getChildren().forPath("/");System.out.println(list);// 关闭连接curatorClint.close();}
}
  • 读取
package com.shu.base;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.data.Stat;import java.util.Date;/*** @author 31380* @description 读取节点数据* @create 2024/3/17 11:28*/
public class CuratorReadTest {/*** 总结:* 1. 读取单个节点数据:curatorClint.getData().forPath("/base/test")* 2. 读取多个节点数据:curatorClint.getChildren().forPath("/test").forEach(System.out::println)* 3. 读取节点数据并获取 stat:curatorClint.getData().storingStatIn(stat).forPath("/base/test")* 4:Stat:节点状态,包含节点的版本、数据长度、子节点数量、创建时间、修改时间、最近一次修改的事务 ID、数据版本、ACL 版本、临时节点* @param args*/public static void main(String[] args) {// 地址String connectString = "127.0.0.1:2181"; // 确保连接字符串正确CuratorFramework curatorClint = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);System.out.println("连接成功!");curatorClint.start();// 读取单个节点数据try {byte[] bytes = curatorClint.getData().forPath("/base/test");System.out.println(new String(bytes));System.out.println("读取节点数据成功!");} catch (Exception e) {System.out.println("读取节点数据失败!"+e.getMessage());}// 读取多个节点数据try {curatorClint.getChildren().forPath("/test").forEach(System.out::println);System.out.println("读取多个节点数据成功!");} catch (Exception e) {System.out.println("读取多个节点数据失败!"+e.getMessage());}try {Stat stat = new Stat();byte[] data = curatorClint.getData().storingStatIn(stat).forPath("/base/test");String dataString = new String(data);System.out.println("节点数据:" + dataString);System.out.println("节点状态:");System.out.println("  节点创建版本:" + stat.getCversion());System.out.println("  数据长度:" + stat.getDataLength());System.out.println("  子节点数量:" + stat.getNumChildren());System.out.println("  创建时间:" + new Date(stat.getCtime()));System.out.println("  修改时间:" + new Date(stat.getMtime()));System.out.println("  最近一次修改的事务 ID:" + stat.getMzxid());System.out.println("  数据版本:" + stat.getVersion());System.out.println("  ACL 版本:" + stat.getAversion());System.out.println("  临时节点:" + stat.getEphemeralOwner());System.out.println("读取节点数据并获取 stat 成功!");} catch (Exception e) {System.out.println("读取节点数据并获取 stat 失败:" + e.getMessage());}// 关闭连接curatorClint.close();}
}
  • 删除
package com.shu.base;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;/*** @author 31380* @description* @create 2024/3/16 19:25*/
public class CuratorDeleteTest {/*** 总结:* 1. 删除节点:delete().forPath("/test")* 2. 如果存在子节点,删除子节点:delete().deletingChildrenIfNeeded().forPath("/parent")* 3. 递归删除节点:delete().deletingChildrenIfNeeded().forPath("/secondPath")* 4. 判断节点是否存在:checkExists().forPath("/secondPath")* 5. 关闭连接:close()* @param args* @throws Exception*/public static void main(String[] args) throws Exception {CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000);System.out.println("连接成功!");curatorClint.start();// 删除节点try {curatorClint.delete().forPath("/test");System.out.println("删除节点成功!");} catch (Exception e) {System.out.println("删除节点失败!"+e.getMessage());}// 如果存在子节点,删除子节点try {curatorClint.delete().deletingChildrenIfNeeded().forPath("/parent");System.out.println("删除节点成功!");} catch (Exception e) {System.out.println("删除节点失败!"+e.getMessage());}// 递归删除节点curatorClint.delete().deletingChildrenIfNeeded().forPath("/secondPath");// 判断节点是否存在if (curatorClint.checkExists().forPath("/secondPath") == null) {System.out.println("节点不存在!");} else {System.out.println("节点存在!");}// 关闭连接curatorClint.close();}
}
  • 修改
package com.shu.base;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.data.Stat;/*** @author 31380* @description* @create 2024/3/17 11:35*/
public class CuratorUpdateTest {/*** 总计* 1. 更新节点:setData().forPath("/test", "hello,word".getBytes())* 2. 指定版本更新节点:setData().withVersion(1).forPath("/test", "hello,word".getBytes())* @param args*/public static void main(String[] args) throws Exception {// 地址String connectString = "127.0.0.1:2181";//创建节点CuratorFramework curatorClint = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);System.out.println("连接成功!");curatorClint.start();// 更新节点try {curatorClint.setData().forPath("/base/test", "hello,word1111".getBytes());// 获取节点数据byte[] bytes = curatorClint.getData().forPath("/base/test");System.out.println(new String(bytes));System.out.println("更新节点成功!");} catch (Exception e) {System.out.println("更新节点失败!"+e.getMessage());}// 先获取节点的版本号Stat stat = new Stat();byte[] data = curatorClint.getData().storingStatIn(stat).forPath("/base/test");String dataString = new String(data);System.out.println("节点数据:" + dataString);System.out.println("节点状态:");System.out.println("  数据版本:" + stat.getVersion());// 指定版本更新节点:CAS 机制try {curatorClint.setData().withVersion(stat.getVersion()).forPath("/base/test", "hello,word2222".getBytes());// 获取节点数据byte[] bytes = curatorClint.getData().forPath("/base/test");System.out.println(new String(bytes));System.out.println("更新节点成功!");} catch (Exception e) {System.out.println("更新节点失败!"+e.getMessage());}}
}
  • 异步创建
package com.shu.base;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** @author 31380* @description Curator异步操作* @create 2024/3/17 11:42*/
public class CuratorAyncTest {/*** 总结:* 1 异步操作:inBackground()* 2.创建节点,如果节点已经存在则抛出异常 create().forPath()* 3.递归创建节点 creatingParentsIfNeeded()* @param args* @throws Exception*/public static void main(String[] args) throws Exception {// 地址String connectString = "127.0.0.1:2181";//创建节点CuratorFramework curatorClint = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);System.out.println("连接成功!");curatorClint.start();CountDownLatch cdl = new CountDownLatch(2);ExecutorService executorService = Executors.newFixedThreadPool(2);curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground((client, event) -> {System.out.println("Code:" + event.getResultCode());System.out.println("Type:" + event.getType());System.out.println("Path:" + event.getPath());cdl.countDown();}, executorService).forPath("/test1", "hello,word".getBytes());curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground((client, event) -> {System.out.println("Code:" + event.getResultCode());System.out.println("Type:" + event.getType());System.out.println("Path:" + event.getPath());cdl.countDown();}).forPath("/test2", "hello,word".getBytes());cdl.await();executorService.shutdown();curatorClint.close();}/*** 事件类型* CREATE, // 创建* DELETE, // 删除* EXISTS, // 存在* GET_DATA, // 获取数据* SET_DATA, // 设置数据* CHILDREN, // 子节点* SYNC, // 同步* GET_ACL, // 获取ACL* SET_ACL, // 设置ACL* TRANSACTION, // 事务* GET_CONFIG, // 获取配置* RECONFIG, // 重新配置* WATCHED, // 监听* REMOVE_WATCHES, // 移除监听* CLOSING; // 关闭* @param args*//*** 响应码* OK(0), // OK* CONNECTIONLOSS(-4), // 连接丢失* MARSHALLINGERROR(-7), // 编组错误* UNIMPLEMENTED(-9), // 未实现* OPERATIONTIMEOUT(-10), // 操作超时* BADARGUMENTS(-8), // 错误参数* APIERROR(-100), // API错误* NONODE(-101), // 无节点·*/}
  • 不同的顺序节点
package com.shu.base;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;/*** @author 31380* @description* @create 2024/3/17 11:03*/
public class CuratorSEQCreat {/*** 临时顺序节点(EPHEMERAL_SEQUENTIAL):具有临时节点特征,但是它会有序列号。* 持久顺序节点(PERSISTENT_SEQUENTIAL):具有持久节点特征,但是它会有序列号。* @param args*/public static void main(String[] args) {CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000);System.out.println("连接成功!");curatorClint.start();// 创建一个持久顺序节点A-1,A-2,A-3try {curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test/A", "hello,word".getBytes());curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test/A", "hello,word".getBytes());curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test/A", "hello,word".getBytes());System.out.println("创建节点成功!");} catch (Exception e) {System.out.println("创建节点失败!"+e.getMessage());}// 创建一个临时顺序节点B-1,B-2,B-3try {curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/test/B", "hello,word".getBytes());curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/test/B", "hello,word".getBytes());curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/test/B", "hello,word".getBytes());System.out.println("创建节点成功!");} catch (Exception e) {System.out.println("创建节点失败!"+e.getMessage());}// 关闭连接curatorClint.close();}
}
  • 事务
package com.shu.base;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;import java.util.Collection;/*** @author 31380* @description TODO* @create 2024/3/16 19:28*/
public class CuratorTransactionTest {public static void main(String[] args) throws Exception {CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000);System.out.println("连接成功!");curatorClint.start();Collection<CuratorTransactionResult> commit = curatorClint.inTransaction().create().forPath("/xiao", "456".getBytes()).and().setData().forPath("/xiao", "123".getBytes()).and().commit();for (CuratorTransactionResult result : commit) {System.out.println(result.getForPath() + "--->" + result.getType());}curatorClint.close();}
}

3.3 ACL权限控制

package com.shu.acl;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;/*** @author 31380* @description* @create 2024/3/16 19:56*/
public class CuratorAclTest {public static void main(String[] args) {CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000);System.out.println("连接成功!");curatorClint.start();// 创建节点,ACL为ip:try {curatorClint.create().withACL(CuratorUtils.getAclList()).forPath("/test");System.out.println("创建节点成功!");} catch (Exception e) {System.out.println("创建节点失败!"+e.getMessage());}}
}/*** @description* @author 31380* @create 2024/3/17 11:12* Schema 代表权限控制模式,分别为:* ● World 任何人* ● Auth 不需要ID* ● Digest 用户名和密码方式的认证* ● IP Address IP地址方式的认证* perms(权限),ZooKeeper支持如下权限* ● CREATE: 创建子节点* ● READ: 获取子节点与自身节点的数据信息* ● WRITE:在Znode节点上写数据* ● DELETE:删除子节点* ● ADMIN:设置ACL权限* ————————————————*/
package com.shu.acl;

3.4 分布式锁

package com.shu.lock;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;import java.text.SimpleDateFormat;
import java.util.concurrent.CountDownLatch;/*** @author 31380* @description 分布式锁* @create 2024/3/17 13:12*/
public class LockTest {/*** 分布式锁:InterProcessMutex* 1: 获取锁,acquire()* 2: 释放锁,release()* 3: 创建 InterProcessMutex 对象* 4: 调用 acquire() 方法获取锁* 5: 业务操作* 6: 调用 release() 方法释放锁* @param args*/public static void main(String[] args) {CountDownLatch latch = new CountDownLatch(10);String connectString = "127.0.0.1:2181";String lockPath = "/lock";CuratorFramework curatorFramework = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);curatorFramework.start();InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath);for (int i = 0; i < 10; i++) {new Thread(() -> {try {latch.await();lock.acquire();System.out.println(Thread.currentThread().getName() + "获取到锁");// 模拟业务操作,生成订单号Thread.sleep(1000);SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");String orderNo = sdf.format(System.currentTimeMillis());System.out.println("生成的订单号:" + orderNo);} catch (Exception e) {e.printStackTrace();} finally {try {lock.release();} catch (Exception e) {e.printStackTrace();}}}, "Thread-" + i).start();latch.countDown();}}
}

3.5 分布式计数器

package com.shu.lock;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;/*** @author 31380* @description 分布式计数器* @create 2024/3/17 13:20*/
public class RecipeDisAtomicIntTest {/*** 分布式计数器:DistributedAtomicInteger* 1、创建DistributedAtomicInteger对象* 2、调用add方法* 3、获取当前值** @param args*/public static void main(String[] args) {String connectString = "127.0.0.1:2181";String connectString2 = "127.0.0.1:2182";String connectString3 = "127.0.0.1:2183";CuratorFramework curatorFramework = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);curatorFramework.start();DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(curatorFramework, "/atomic", null);try {AtomicValue<Integer> added = atomicInteger.add(1);System.out.println("1Result: " + added.succeeded());// 获取当前值System.out.println("2Result: " + added.postValue());} catch (Exception e) {throw new RuntimeException(e);}// 客户端2CuratorFramework curatorFramework2 = CuratorUtils.createCuratorFramework(connectString2, 1000, 1000);curatorFramework2.start();DistributedAtomicInteger atomicInteger2 = new DistributedAtomicInteger(curatorFramework2, "/atomic", null);try {AtomicValue<Integer> added = atomicInteger2.add(1);System.out.println("2Result: " + added.succeeded());// 获取当前值System.out.println("2Result: " + added.postValue());} catch (Exception e) {throw new RuntimeException(e);}// 客户端3CuratorFramework curatorFramework3 = CuratorUtils.createCuratorFramework(connectString3, 1000, 1000);curatorFramework3.start();DistributedAtomicInteger atomicInteger3 = new DistributedAtomicInteger(curatorFramework3, "/atomic", null);try {AtomicValue<Integer> added = atomicInteger3.add(1);System.out.println("3Result: " + added.succeeded());// 获取当前值System.out.println("3Result: " + added.postValue());} catch (Exception e) {throw new RuntimeException(e);}}}

3.6 分布式Barrier

package com.shu.lock;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;/*** @author 31380* @description* 分布式Barrier:分布式 Barrier 是一种常见的同步原语,用于在分布式系统中协调多个进程或线程的执行顺序。* 它可以用来实现诸如等待直到所有参与者都准备好,然后一起执行某项任务,或者等待直到某些条件达成后再继续执行的场景。* @create 2024/3/17 13:27*/
public class CycliBarrierTest {static DistributedBarrier barrier;public static void main(String[] args) {String connectString = "127.0.0.1:2181";String path="/barrier";CuratorFramework curatorFramework = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);curatorFramework.start();// 等待所有的线程到达barrier 10个线程for (int i = 0; i < 10; i++) {new Thread(() -> {try {barrier = new DistributedBarrier(curatorFramework, path);System.out.println(Thread.currentThread().getName() + "号barrier设置");barrier.setBarrier();barrier.waitOnBarrier();System.out.println("启动...");} catch (Exception e) {e.printStackTrace();}}, "Thread-" + i).start();}try {Thread.sleep(2000);barrier.removeBarrier();} catch (Exception e) {e.printStackTrace();}}
}

3.7 主从节点选举

package com.shu.master;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;/*** @author 31380* @description 主节点选举* @create 2024/3/17 13:03*/
public class MasterSelectTest {/*** 主节点选举:LeaderSelector* 1、创建LeaderSelector对象* 2、调用start方法* 3、添加监听器* 4、关闭连接* @param args*/public static void main(String[] args) {// 地址String connectString = "127.0.0.1:2181";String connectString2 = "127.0.0.1:2182";String connectString3 = "127.0.0.1:2183";// 创建并连接 CuratorFramework 实例CuratorFramework curatorFramework1 = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);CuratorFramework curatorFramework2 = CuratorUtils.createCuratorFramework(connectString2, 1000, 1000);CuratorFramework curatorFramework3 = CuratorUtils.createCuratorFramework(connectString3, 1000, 1000);curatorFramework1.start();curatorFramework2.start();curatorFramework3.start();// 第一个节点LeaderSelector leaderSelector1 = new LeaderSelector(curatorFramework1, "/master1", new LeaderSelectorListenerAdapter() {@Overridepublic void takeLeadership(CuratorFramework curatorFramework) throws Exception {System.out.println("节点1成为master节点");Thread.sleep(10000);System.out.println("节点1完成master操作,释放master权利");}});leaderSelector1.autoRequeue();leaderSelector1.start();// 第二个节点LeaderSelector leaderSelector2 = new LeaderSelector(curatorFramework2, "/master1", new LeaderSelectorListenerAdapter() {@Overridepublic void takeLeadership(CuratorFramework curatorFramework) throws Exception {System.out.println("节点2成为master节点");Thread.sleep(10000);System.out.println("节点2完成master操作,释放master权利");}});leaderSelector2.autoRequeue();leaderSelector2.start();// 第三个节点LeaderSelector leaderSelector3 = new LeaderSelector(curatorFramework3, "/master1", new LeaderSelectorListenerAdapter() {@Overridepublic void takeLeadership(CuratorFramework curatorFramework) throws Exception {System.out.println("节点3成为master节点");Thread.sleep(10000);System.out.println("节点3完成master操作,释放master权利");}});leaderSelector3.autoRequeue();leaderSelector3.start();try {Thread.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();}}}

3.8 NodeCache监听

package com.shu.watch;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;/*** @author 31380* @description* @create 2024/3/16 19:38*/
public class CuratorNodeCacheTest {/*** NodeCache:监听节点的新增、修改操作* 1、创建NodeCache对象* 2、调用start方法* 3、添加监听器* 4、关闭连接* @param args* @throws Exception*/public static void main(String[] args) throws Exception {String path = "/test";CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000);System.out.println("连接成功!");curatorClint.start();final NodeCache nodeCache = new NodeCache(curatorClint, path);nodeCache.start();nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {System.out.println("监听事件触发");System.out.println("重新获得节点内容为:" + new String(nodeCache.getCurrentData().getData()));}});curatorClint.setData().forPath(path,"456".getBytes());curatorClint.setData().forPath(path,"789".getBytes());curatorClint.setData().forPath(path,"123".getBytes());curatorClint.setData().forPath(path,"222".getBytes());curatorClint.setData().forPath(path,"333".getBytes());curatorClint.setData().forPath(path,"444".getBytes());Thread.sleep(15000);}
}

3.9 PathChildrenCache监听

package com.shu.watch;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;/*** @author 31380* @description* @create 2024/3/16 19:43*/
public class CuratorPathChildrenCacheTest {/*** PathChildrenCache:监听子节点的新增、修改、删除操作* @param args* @throws Exception*/public static void main(String[] args) throws Exception {CuratorFramework client = getClient();String parentPath = "/p1";PathChildrenCache pathChildrenCache = new PathChildrenCache(client,parentPath,false);/* * StartMode:初始化方式* POST_INITIALIZED_EVENT:异步初始化。初始化后会触发事件* NORMAL:异步初始化* BUILD_INITIAL_CACHE:同步初始化* */pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {System.out.println("事件类型:"  + event.getType() + ";操作节点:" + event.getData().getPath());switch(event.getType()){case CHILD_ADDED:System.out.println("新增子节点:" + event.getData().getPath());break;case CHILD_UPDATED:System.out.println("更新子节点:" + event.getData().getPath());break;case CHILD_REMOVED:System.out.println("删除子节点:" + event.getData().getPath());break;default:break;}}});String path = "/p1/c1";client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);Thread.sleep(1000); // 此处需留意,如果没有现成睡眠则无法触发监听事件client.delete().forPath(path);Thread.sleep(15000);}private static CuratorFramework getClient(){RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181").retryPolicy(retryPolicy).sessionTimeoutMs(6000).connectionTimeoutMs(3000).namespace("demo").build();client.start();return client;}
}

3.10 TreeCache监听

package com.shu.watch;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;/*** @author 31380* @description* @create 2024/3/16 19:44*/
public class CuratorWatcher3 {private static final String CONNECT_ADDR = "127.0.0.1:2181";private static final int SESSION_TIMEOUT = 5000;public static void main(String[] args) throws Exception {RetryPolicy policy = new ExponentialBackoffRetry(1000, 10);CuratorFramework curator = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(policy).build();curator.start();TreeCache treeCache = new TreeCache(curator, "/treeCache");treeCache.start();treeCache.getListenable().addListener((curatorFramework, treeCacheEvent) -> {switch (treeCacheEvent.getType()) {case NODE_ADDED:System.out.println("NODE_ADDED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData())+ ",状态:" + treeCacheEvent.getData().getStat());break;case NODE_UPDATED:System.out.println("NODE_UPDATED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData())+ ",状态:" + treeCacheEvent.getData().getStat());break;case NODE_REMOVED:System.out.println("NODE_REMOVED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData())+ ",状态:" + treeCacheEvent.getData().getStat());break;default:break;}});curator.create().forPath("/treeCache", "123".getBytes());curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/treeCache/c1", "456".getBytes());curator.setData().forPath("/treeCache", "789".getBytes());curator.setData().forPath("/treeCache/c1", "910".getBytes());curator.delete().forPath("/treeCache/c1");curator.delete().forPath("/treeCache");Thread.sleep(5000);curator.close();}
}

详细介绍参考书籍《从Paxos到Zookeeper:分布式一致性原理与实践》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/757669.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

婴儿洗衣机硬核测评:希亦、鲸立、小吉婴儿洗衣机性能大比拼!

如果你非常注重婴儿衣物的卫生问题&#xff0c;那么婴儿洗衣机则是非常理想的选择。毕竟&#xff0c;在婴儿吃奶或者接触其他材料时&#xff0c;其抵抗力是比较弱的&#xff0c;再加上普通洗衣机无法对婴儿的衣物进行有效的消毒处理&#xff0c;轻则会对婴儿的健康造成威胁&…

基于 HBase Phoenix 构建实时数仓(5)—— 用 Kafka Connect 做实时数据同步

目录 一、总体架构 二、安装配置 MySQL 1. 创建 mysql 用户 2. 建立 MySQL 使用的目录 3. 解压安装包 4. 配置环境变量 5. 创建 MySQL 配置文件 6. MySQL 系统初始化 7. 启动 mysql 服务器 8. 创建 dba 用户 三、配置 MySQL 主从复制 四、安装部署 Kafka Connector…

23.python标准库之turtle库

一、窗体函数 turtle.setup(width, height, startx, starty) width:窗口宽度 height:窗口高度 startx:窗口与屏幕左侧距离&#xff08;单位象素&#xff09; starty:窗口与屏幕顶部距离&#xff08;单位象素&#xff09; 二、画笔状态函数 三、画笔运动函数

Java字符串精通之旅:从新手到专家

目录 一、字符串的创建 1.直接赋值 2.使用构造方法 二、字符串不可变性 三、常用操作 1.字符串长度 2.连接字符串 3.格式化字符串 四、示例代码&#xff1a;String类应用 五、String中常用的方法 在Java编程世界里&#xff0c;字符串无疑是最常用的数据类型之一。不论…

想要把PDF文件转TXT文本编辑改动怎么办?三秒钟帮你搞定 PDF编辑器

pdf是一种便携文件格式&#xff0c;是由Adobe公司所开发的独特的跨平台文件格式。PDF文件以PostScript语言图象模型为基础&#xff0c;无论在哪种打印机上都可保证精确的颜色和准确的打印效果&#xff0c;即PDF会忠实地再现原稿的每一个字符、颜色以及图象。有点遗憾的是&#…

【好用】Star超36.8k,一个的免费通用数据库管理工具

关于数据库管理工具&#xff0c;大家可能都在用SQLyog、Navicat、MySQL-Front、SQL Studio、MySQL Workbench等等&#xff0c;这些管理工具不是不好用&#xff0c;就是要变魔术才可以用&#xff0c;今天 V 哥给大家推荐一个即好用&#xff0c;又免费的可视化通用数据库管理工具…

Git 删除.git 目录

Git 删除.git 目录 如上图&#xff0c;删除项目中的.git目录 ①在windows系统上&#xff0c;进入该项目的根目录 ②在根目录下打开Git bash ③使用rm -rf .git命令递归删除.git目录 rm -rf .git在删除后想重新初始化&#xff0c;可以参考下面链接里的内容 Git初始化及远程推送…

腾讯云优惠券领取的几种方法,助你降低云服务成本

腾讯云优惠券领取的几种方法&#xff0c;助你降低云服务成本 一、最新优惠卷二、最新活动 腾讯云—腾讯倾力打造的云计算品牌&#xff0c;以卓越科技能力助力各行各业数字化转型&#xff0c;为全球客户提供领先的云计算、大数据、人工智能服务&#xff0c;以及定制化行业解决方…

刷题日记:面试经典 150 题 DAY6

刷题日记&#xff1a;面试经典 150 题 DAY6 392. 判断子序列167. 两数之和 II - 输入有序数组11. 盛最多水的容器15. 三数之和209. 长度最小的子数组 392. 判断子序列 原题链接 392. 判断子序列 双指针&#xff0c;i指向s&#xff0c;j指向t 如果s[i]t[j]&#xff0c;则匹配…

JDK1.8超详细安装教程

1、下载jdk1.8 大家可以直接去百度云盘下载&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/187N6CU9Gu4bjtOz5_cjd-A?pwd3535 提取码&#xff1a;35352、开始安装 双击下载好的.exe文件&#xff0c;点击下一步 修改安装路径&#xff0c;点击下一步 会顺带安装jre…

107 在携带请求体的情况下, hutool 将 get 请求转换为了 post 请求

前言 本问题主要是来自于同事 情况大致如下, 同样的代码 一个是测试用例, 一个是生产环境的应用, 访问同一个第三方服务, 参数什么的完全一致 但是 出现的问题就是 测试用例能够拿到正确的对方的响应, 但是 生产环境的应用 却是拿到的对方的报错 然后 我开始以为是 是否…

【CKA模拟题】学会JSONPath,精准定位Pod信息!

题干 For this question, please set this context (In exam, diff cluster name) kubectl config use-context kubernetes-adminkubernetesyou have a script named pod-filter.sh . Update this script to include a command that filters and displays the label with the…

安卓Android入门

安卓作为日常生活中不可缺少的移动操作系统&#xff0c;在5G的发展和应用过程中发挥着其重要的作用。 5G是第五代移动通信技术&#xff0c;拥有更快的速度、更高的带宽、更低的延迟和更大的连接密度。这一技术的快速发展为移动设备和应用提供了更多的可能性。 安卓和5G的关系…

Qt 多元素控件

Qt开发 多元素控件 Qt 中提供的多元素控件有: QListWidgetQListViewQTableWidgetQTableViewQTreeWidgetQTreeView xxWidget 和 xxView 之间的区别 以 QTableWidget 和 QTableView 为例. QTableView 是基于 MVC 设计的控件. QTableView 自身不持有数据. 使用QTableView 的 …

OSPF特殊区域(stub\nssa)

stub区域——只有1类、2类、3类&#xff1b;完全stub区域——只有1类、2类 NSSA区域&#xff1a;本区域将自己引入的外部路由发布给其他区域&#xff0c;但不需要接收其他区域的路由 在NSSA区域的路由器上&#xff0c;引入外部路由时&#xff0c;不会转换成5类LSA&#xff0c…

HarmonyOS系统开发ArkTS常用组件切换按钮及参数

Toggle为切换按钮组件&#xff0c;一般用于两种状态之间的切换&#xff0c;例如下图中的蓝牙开关。 Toggle组件的参数&#xff1a;Toggle(options: { type: ToggleType, isOn?: boolean }) type属性用于设置Toggle组件的类型isOn属性用于设置Toggle组件的状态selectedColor()…

51-31 CVPR’24 | VastGaussian,3D高斯大型场景重建

2024 年 2 月&#xff0c;清华大学、华为和中科院联合发布的 VastGaussian 模型&#xff0c;实现了基于 3D Gaussian Splatting 进行大型场景高保真重建和实时渲染。 Abstract 现有基于NeRF大型场景重建方法&#xff0c;往往在视觉质量和渲染速度方面存在局限性。虽然最近 3D…

docker入门(四)—— docker常用命令详解

docker 常用命令 基本命令 # 查看 docker 版本 docker version # 查看一些 docker 的详细信息 docker info 帮助命令&#xff08;–help&#xff09;&#xff0c;linux必须要会看帮助文档 docker --help[rootiZbp15293q8kgzhur7n6kvZ /]# docker --helpUsage: docker [OPTI…

【C语言】结构体类型名、变量名以及typedef

文章目录 分类判断结构体成员的使用typedef 分类判断 struct tag {char m;int i; }p;假设定义了上面这一个结构体&#xff0c;tag 就是类型名&#xff0c; p 就是变量名&#xff0c; m 和 i 就是结构体成员列表。 可以这么记&#xff0c;括号前面的是类型名&#xff0c;括号后…

【vue核心技术实战精讲】1.1 Vue开篇介绍 + 1.2 Vue的起步 和 插值表达式

文章目录 准备开始适应人群vue 框架学习路线一、vue 基础1、历史介绍2、前端框架与库的区别? 二、vue的起步 和 插值表达式Stage 1&#xff1a;下载包&#xff0c;并放入项目中Stage 2&#xff1a;编码Stage 3&#xff1a;源码 与 效果 准备开始 适应人群 有一定的HTML/CSS/…