ZooKeeper 客户端API操作

文章目录

  • 一、节点信息
    • 1、创建节点
    • 2、获取子节点并监听节点变化
    • 3、判断节点是否存在
    • 4、客户端向服务端写入数据
      • 写入请求直接发给 Leader 节点
      • 写入请求直接发给 follow 节点
  • 二、服务器动态上下线监听
    • 1、监听过程
    • 2、代码
  • 三、分布式锁
    • 1、什么是分布式锁?
    • 2、Curator 框架实现分布式锁

一、节点信息

前提:centos102、centos103、centos104 服务器都已经开启

pom.xml 依赖

<dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.17.1</version></dependency><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.7</version></dependency><dependency><groupId>org.junit.jupiter</groupId><artifactId>junit-jupiter</artifactId><version>RELEASE</version><scope>compile</scope></dependency>
</dependencies>

log4j.properties 配置

# 设置全局的日志记录级别为 INFO
log4j.rootLogger=INFO, stdout# 控制台输出
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n# 文件输出
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

1、创建节点

zkClient.java 代码

// 注意:逗号后面不能有空格
private String connectString = "centos102:2181,centos103:2181,centos104:2181";
private int sessionTimeout = 2000;
private ZooKeeper zkClient;// 创建客户端
@Before
public void init() throws IOException {zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent event) {}});
}// 创建子节点
@Test
public void create() throws InterruptedException, KeeperException {String nodeCreated = zkClient.create("/frost", "cat".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}

运行创建子节点,看看是否创建了该节点

在这里插入图片描述

2、获取子节点并监听节点变化

@Test
public void getChildren() throws InterruptedException, KeeperException {List<String> children = zkClient.getChildren("/", true);for (String child : children) {System.out.println(child);}
}

那如果此时我再创建一个节点,此时控制台没有任何变化,我想要创建一个节点控制台能够看到相关变化怎么办?此时只需要将程序保持不结束,然后将客户端查看子节点函数放入监听器中。

3、判断节点是否存在

@Test
public void exit() throws InterruptedException, KeeperException {Stat stat = zkClient.exists("/frost", false);System.out.println(stat == null ? "not exits" : "exits");
}

4、客户端向服务端写入数据

写入请求直接发给 Leader 节点

  1. 客户端发送写入请求,leader节点执行写入操作
  2. leader通知follow1执行写入操作
  3. folllow1写入完毕给leader返回确认ack
  4. 现在半数以上服务器完成写入,leader给客户端发送确认ack
  5. leader通知follow2写入
  6. follow2写入完毕给leader发送确认ack
    在这里插入图片描述

写入请求直接发给 follow 节点

  1. 客户端发送写入请求,
  2. follow1 将写入请求发送给leader
  3. leader节点执行写入操作,然后leader通知follow1执行写入操作
  4. folllow1写入完毕给leader返回确认ack
  5. 现在半数以上服务器完成写入,leader给follow1发送确认ack
  6. follow1给客户端发送确认ack
  7. leader通知follow2写入
  8. follow2写入完毕给leader发送确认ack

在这里插入图片描述

二、服务器动态上下线监听

1、监听过程

在这里插入图片描述

以下红色字体写错,应该是下线则通知注册监听器的客户端
在这里插入图片描述

对于ZooKeeper集群来说,客户端和服务器都相当于客户端,区别在于:服务器在ZooKeeper集群中是创建节点,客户端在ZooKeeper是监听信息。

在这里插入图片描述

2、代码

服务器注册到zk集群

import org.apache.zookeeper.*;
import java.io.IOException;public class DistributeServer {private String connectString = "centos102:2181,centos103:2181,centos104:2181";private int sessionTimeout = 2000;ZooKeeper zk;public static void main(String[] args) throws IOException, InterruptedException, KeeperException {DistributeServer server = new DistributeServer();// 1. 获取zk连接server.getConnect();// 2. 注册服务器到 zk 集群server.regist(args[0]);// 3. 启动业务逻辑(睡觉)server.business();}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}private void regist(String hostname) throws InterruptedException, KeeperException {String create = zk.create("/servers", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); // 临时带序号的节点System.out.println(hostname + "is online");}private void getConnect() throws IOException {zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent event) {}});}
}

客户端进行监听

import org.apache.zookeeper.*;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;public class DistributeClient {private String connectString = "centos102:2181,centos103:2181,centos104:2181";private int sessionTimeout = 2000;ZooKeeper zk;public static void main(String[] args) throws IOException, InterruptedException, KeeperException {DistributeClient client = new DistributeClient();// 1. 获取zk连接client.getConnect();// 2. 监听/servers下子节点的增加和删除client.getServerList();// 3. 启动业务逻辑(睡觉)client.business();}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}private void getServerList() throws InterruptedException, KeeperException {List<String> children = zk.getChildren("/servers", true);ArrayList<String> servers = new ArrayList<>();for (String child : children) {byte[] data = zk.getData("/servers/" + child, false, null);servers.add(new String(data));}System.out.println(servers);}private void getConnect() throws IOException {zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent event) {try {getServerList();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}});}
}

启动客户端,然后在服务器上进行增加节点监听
在这里插入图片描述

删除节点监听
在这里插入图片描述

因为我们服务端的代码传参了,所以我们需要设置一下这个参数:
在这里插入图片描述

下图代表服务端启动的是hadoop102节点
在这里插入图片描述

先把客户端启动起来,发现有一个节点hadoop101:
在这里插入图片描述

在启动服务端,hadoop102上线:
在这里插入图片描述

然后返回看客户端的监听,发现节点有变化,打印出所有节点 [hadoop102, hadoop101]
在这里插入图片描述

此时我们修改一下再此启动服务端让 hadoop103 上线:
在这里插入图片描述

返回客户端查看发现 hadoop102 下线,hadoop103 上线
在这里插入图片描述

三、分布式锁

1、什么是分布式锁?

比如说"进程1"在使用该资源的时候,会先去获得锁,"进程1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。

在这里插入图片描述

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;public class DistributedLock {private final String connectString = "centos102:2181,centos103:2181,centos104:2181";private final int sessionTimeout = 2000;ZooKeeper zk;private CountDownLatch connectLatch = new CountDownLatch(1);private CountDownLatch waitLatch = new CountDownLatch(1);// 前一个节点private String waitPath;// 当前节点String currentMode;public DistributedLock() throws IOException, InterruptedException, KeeperException {// 1. 获取连接zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent event) {// connectLatch,如果连接上zk,可以释放if (event.getState() == Event.KeeperState.SyncConnected) {connectLatch.countDown();}// waitLatch,需要释放if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {waitLatch.countDown();}}});// 等待zk正常连接后往下走connectLatch.await();// 2. 判断根节点/lock是否存在Stat stat = zk.exists("/locks", false);if (stat == null) {// 创建根节点(永久节点)zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}}// 对 zk 加锁public void zkLock() {// 创建对应的临时带序号的节点try {currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);// 判断创建的节点是否是最小的序号节点,如果是,获取到锁;如果不是,监听前一个节点List<String> children = zk.getChildren("/locks", false);// 如果 children 只有一个节点,直接获取锁;如果有多个节点,需要判断,谁最小if (children.size() == 1) {return;}else {// 排序Collections.sort(children);// 获取节点名称seq00000001String thisNode = currentMode.substring("/locks/".length());// 通过seq00000001获取该节点在children当中的位置int index = children.indexOf(thisNode);if (index == -1) {System.out.println("数据异常");}else if (index == 0) {// 该节点为第一个,获取锁直接返回return;}else {// 不是第一个,监听前一个节点waitPath = "/locks/" + children.get(index - 1);zk.getData(waitPath, true, null);// 等待监听结束waitLatch.await();return;}}} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}// 解锁public void unZkLock() throws InterruptedException, KeeperException {// 删除节点zk.delete(currentMode, -1);}
}

测试

import org.apache.zookeeper.KeeperException;import java.io.IOException;public class DistributedLockTest {public static void main(String[] args) throws IOException, InterruptedException, KeeperException {final DistributedLock lock1 = new DistributedLock();final DistributedLock lock2 = new DistributedLock();new Thread(new Runnable() {@Overridepublic void run() {try {lock1.zkLock();System.out.println("线程1启动,获取到锁");Thread.sleep(5000);lock1.unZkLock();System.out.println("线程1释放锁");} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {lock2.zkLock();System.out.println("线程2启动,获取到锁");Thread.sleep(5000);lock2.unZkLock();System.out.println("线程2释放锁");} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}}).start();}}

2、Curator 框架实现分布式锁

原生JAVA API出现的问题:
(1)会话是异步的,需要自己去连接
(2)Watch需要重复注册,不然就不能生效
(3)开发的复杂性还是比较高
(4)不支持多节点的删除和创建,需要自己去递归

Curator 是一个专门解决分布式锁的框架,解决了原生JAVA API开发分布式遇到的的问题
curator 官方文档:https://curator.apache.org/index.html

pom.xml 文件添加依赖

<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.3.0</version>
</dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.3.0</version>
</dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-client</artifactId><version>4.3.0</version>
</dependency>
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;public class CuratorLockTest {public static void main(String[] args) {// 创建分布式锁1InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");// 创建分布式锁2InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");new Thread(new Runnable() {@Overridepublic void run() {try {lock1.acquire();System.out.println("线程1获取到锁");lock1.acquire();System.out.println("线程1获取到锁");Thread.sleep(5 * 1000);lock1.release();System.out.println("线程1释放锁");lock1.release();System.out.println("线程1再次释放锁");} catch (Exception e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {lock2.acquire();System.out.println("线程2获取到锁");lock2.acquire();System.out.println("线程2获取到锁");Thread.sleep(5 * 1000);lock2.release();System.out.println("线程2释放锁");lock2.release();System.out.println("线程2再次释放锁");} catch (Exception e) {e.printStackTrace();}}}).start();}private static CuratorFramework getCuratorFramework() {ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("centos102:2181,centos103:2181,centos104:2181").connectionTimeoutMs(2000).sessionTimeoutMs(2000).retryPolicy(policy).build();// 启动客户端client.start();System.out.println("zookeeper 启动成功");return client;}
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/58102.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

qt 滚动条 美化

qt QScrollBar 滚动条分为竖直与水平滚动条&#xff0c;两者设置上类似&#xff0c;但也有一些不同&#xff0c;下面主要讲述美化及注意事项。 一、竖直滚动条 竖直滚动条分为7个部分&#xff1a; sub-line、 up-arrow 、sub-page、 hanle、 add-line、 dow-arrow、 add-pag…

线性回归模型与检验 6个适用条件

当因变量与自变量间存在线性相关关系时&#xff0c;可以使用线性回归分析方法确定它们之间的相互依赖的定量关系。此处所说的定量关系&#xff0c;并非严格的因果关系&#xff0c;而是自变量X对因变量Y的影响或预测的作用。 例如分析广告费、产品单价、产品满意度、服务满意度…

说它是谁就是谁—Python语言中的鸭子类型

鸭子类型&#xff08;Duck Typing&#xff09;是动态类型语言中的一种类型推断风格&#xff0c;尤其在Python语言中得到了广泛的应用。它的核心思想是&#xff1a;“如果它走起路来像鸭子&#xff0c;叫起来像鸭子&#xff0c;那么它就是鸭子”。这句话的意思是&#xff0c;我们…

python_httpstat库

Python httpstat是一个基于Python的命令行工具&#xff0c;用于测量HTTP请求的性能和状态信息。它能够向目标服务器发送HTTP请求&#xff0c;并显示详细的统计信息&#xff0c;包括DNS解析时间、建立连接时间、TLS/SSL握手时间、首字节时间、总时间等。这些信息对于排查网络问题…

详解varint,zigzag编码, 以及在Go标准库中的实现

文章目录 为啥需要varint编码为啥需要zigzag编码varint编码解码 zigzag编码解码 局限性 为啥需要varint编码 当我们用定长数字类型int32来表示整数时&#xff0c;为了传输一个整数1&#xff0c;我们需要传输00000000 00000000 00000000 00000001 32 个 bits&#xff0c;而有价…

SQLite3库增删改查实现数据管理

1. SQLite3简介 SQLite3是一个轻量级的、嵌入式的关系型数据库管理系统&#xff0c;在保存测序数据或结果等时可使用&#xff0c;简单高效&#xff0c;并且有无需服务器、单文件存储数据、支持标准SQL、支持跨平台等优势。 本文以Sqlite3数据库为基础&#xff0c;创建代码示例…

tomcat基本配置

目录 1.java容器简介介绍 2.部署tomcat 2.1上传jdk 2.2创建一个软连接 2.3配置环境变量 2.4读取环境文件并且查看java版本 2.5检查jdk tomcat信息 2.6启动tomcat 2.7检测 3.tomcat 目录结构 3.1总体目录 3.2 bin目录 3.3conf 3.4 logs日志 4.运行代码 4.…

如何确保电子商务网站服务器的正常运行时间

对于电商网站而言&#xff0c;服务器的正常运行时间至关重要。网站宕机会直接影响销售额、客户体验以及品牌声誉。本文将详细探讨如何监控并保障服务器的正常运行时间&#xff0c;确保您的电商网站始终保持在线状态&#xff0c; 为什么监控正常运行时间很重要&#xff1f; 减…

【Oracle实验】字段为空的,无法通过排除判断

Oracle相关文档&#xff0c;希望互相学习&#xff0c;共同进步 风123456789&#xff5e;-CSDN博客 1.场景描述 需求&#xff1a;查询不是某个机构的数据。 同事SQL&#xff1a;where substr(bank_code,1,9) not in(014009001)&#xff1b; 看SQL似乎没什么问题&#xff0c;分析…

【modbus协议】libmodbus库移植基于linux平台

文章目录 下载库函数源码编译路径添加libmodbus 源码分析核心数据结构常用接口函数 开发 TCP Server 端开发TCP Client 端 下载库函数源码 编译路径添加 libmodbus 源码分析 核心数据结构 modbus_t结构体&#xff1a; 这是 libmodbus 的核心数据结构&#xff0c;代表一个 Mod…

OSPF特殊区域及其他特性

不用的链路这状态信息没必要一直保存&#xff0c;要不路由器承受不了。用OSPF 特殊区域解决 1. Stub区域和Totally Stub区域 R1作为ASBR引入多个外部网段&#xff0c;如果Area 2是普通区域&#xff0c;则R3将向该区域注入5类和4类LSA。 当把Area 2配置为Stub区域后&#xff1a…

node升级package.json中的版本

由于项目使用时间过老&#xff0c;升级对应包版本&#xff0c;可以使用新功能 1.使用npm-check-updates这个工具&#xff0c;先全局安装 npm install -g npm-check-updates2.检查package.json中dependencies的最新版本 ncu3.更新dependencies到新版本 ncu -u也是一样的 npx…

探索Python安全字符串处理的奥秘:MarkupSafe库揭秘

文章目录 探索Python安全字符串处理的奥秘&#xff1a;MarkupSafe库揭秘第一部分&#xff1a;背景介绍第二部分&#xff1a;MarkupSafe是什么&#xff1f;第三部分&#xff1a;如何安装MarkupSafe&#xff1f;第四部分&#xff1a;MarkupSafe的简单使用方法1. 使用escape函数2.…

机器视觉运动控制一体机在DELTA并联机械手视觉上下料应用

市场应用背景 DELTA并联机械手是由三个相同的支链所组成&#xff0c;每个支链包含一个转动关节和一个移动关节&#xff0c;具有结构紧凑、占地面积小、高速高灵活性等特点&#xff0c;可在有限的空间内进行高效的作业&#xff0c;广泛应用于柔性上下料、包装、分拣、装配等需要…

【C++】类和对象(二):this指针

大家好&#xff0c;我是苏貝&#xff0c;本篇博客带大家了解C的this指针&#xff0c;如果你觉得我写的还不错的话&#xff0c;可以给我一个赞&#x1f44d;吗&#xff0c;感谢❤️ 目录 1 this指针的引出2 this指针的特性 1 this指针的引出 我们先来定义一个日期类Date 问&am…

华为原生鸿蒙操作系统的发布有何重大意义和影响:

#1024程序员节 | 征文# 一、华为原生鸿蒙操作系统的发布对中国的意义可以从多个层面进行分析&#xff1a; 1. 技术自主创新 鸿蒙操作系统的推出标志着中国在操作系统领域的自主创新能力的提升。过去&#xff0c;中国在高端操作系统方面依赖于外国技术&#xff0c;鸿蒙的发布…

开发涉及的安全规范整理

文章目录 前言安全场景与措施API调用方式鉴权参数校验日志打印数据保存加密 总结 前言 这篇文章我们来整理下写代码和方案设计中的安全规范问题&#xff0c;内容偏服务端&#xff0c;即使是入门的新人&#xff0c;如果你对安全有所了解会让成熟规范的团队对你高看一眼。安全经常…

用HTML构建酷炫的文件上传下载界面

1. 基础HTML结构 首先&#xff0c;我们构建一个基本的HTML结构&#xff0c;包括一个表单用于文件上传&#xff0c;以及一个列表用于展示已上传文件&#xff1a; HTML <!DOCTYPE html> <html> <head><title>酷炫文件上传下载</title><link …

健康养生的重要性

养生之道&#xff0c;健康相随 在快节奏的现代生活中&#xff0c;养生健康已成为我们不可忽视的话题。随着生活水平的提高&#xff0c;人们越来越注重身体的保养与健康的维护。那么&#xff0c;如何才能做到养生健康&#xff0c;让身体与心灵都得到滋养呢&#xff1f; 首先&a…

鱼跃医疗助力退役军人事务部“高原情暖老兵项目”

10月17日-22日&#xff0c;在退役军人事务部指导下&#xff0c;中国老龄事业发展基金会联合腾讯SSV时光实验室、腾讯天籁实验室等机构发起的“情暖老兵&#xff0c;守望相助—老兵听力关怀计划”项目走进西藏&#xff0c;为退伍老兵提供听力健康筛查服务。西藏鱼跃医疗投资有限…