zookeeper案例

目录

案例一:服务器动态上下线

服务端:

(1)先获取zookeeper连接

(2)注册服务器到zookeeper集群:

(3)业务逻辑(睡眠):

服务端代码如下:

客户端:

(1)获取zookeeper的连接:

(2)监听/servers下边的子节点的增减:

客户端代码如下:

案例二:ZooKeeper 分布式锁

分布式锁是什么?

锁的实现:

构造函数:

加锁函数:

解锁函数:

整体代码:

测试类代码 :

Curator 框架实现分布式锁案例:

实现步骤:

代码如下:


该案例主要也是客户端监听原理,客户端监听服务器的上下线情况

先在集群上创建/servers 节点(用于存储连接的服务器的主机和该服务器的节点数)相当于zookeeper集群

案例一:服务器动态上下线

服务端:

(1)先获取zookeeper连接

        创建类对象

该类为我们创建的服务端类:

        DistributeServer server = new DistributeServer();

        获取zookeeper连接:

自己创建连接方法:

    private void getconnect() throws IOException {zk = new ZooKeeper(connectstring, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {}});}

 让后server对象在main函数中调用

(2)注册服务器到zookeeper集群:

注册是需要注册到zookeeper集群的/servers路径下,需要指定参数进行创建

private void regestServer(String hostname) throws InterruptedException, KeeperException {
zk.create(parentNode+"/"+hostname,hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);//  需要创建有序的临时节点所以-e(暂时) -s(有序)System.out.println("服务器"+hostname+"已注册连接");}

(3)业务逻辑(睡眠):

    private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}

服务端代码如下:

package com.tangxiaocong.case1;
import org.apache.zookeeper.*;
import java.io.IOException;
/*** @Date 2023/8/10 19:06* @Author */
public class DistributeServer {private static String connectstring="hadoop102:2181,hadoop103:2181,hadoop104:2181";private static int sessionTimeout=2000;private ZooKeeper zk =null;private String parentNode = "/servers";public static void main(String[] args) throws IOException, InterruptedException, KeeperException {//获取zk连接//创建DistributeServer server = new DistributeServer();server.getconnect();//注册服务器到zk集群//注册是需要在/servers节点下创建所开启的服务器的路径server.regestServer(args[0]);//业务逻辑(实际是延时让它睡觉---不然会注册完成就关闭)server.business();}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}private void regestServer(String hostname) throws InterruptedException, KeeperException {
zk.create(parentNode+"/"+hostname,hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);//  需要创建有序的临时节点所以-e(暂时) -s(有序)System.out.println("服务器"+hostname+"已注册连接");}private void getconnect() throws IOException {zk = new ZooKeeper(connectstring, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {}});}
}

客户端:

(1)获取zookeeper的连接:

        先创建客户端对象,在进行构建获取zookeeper连接的方法,本方法对process方法进行了重写,填写了再发生上下线的运行逻辑

 private void getConnect() throws IOException {zk= new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {getServerList();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}});}

(2)监听/servers下边的子节点的增减:

        构建方法client.getServerList()来进行监听:

代码逻辑就是通过getChildren()方法获取指定目录下的所有子目录并开启监听

再进行遍历,把遍历结果封装到一个集合中,最后进行输出

 private void getServerList() throws InterruptedException, KeeperException {List<String> children = zk.getChildren("/servers", true);//该方法会获取指定路径下的所有子节点//true 会走初始化中的watch 也可以自己创建watch//把所有的服务器都封装到一个集合ArrayList<String> list = new ArrayList<>();for (String child : children) {byte[] data = zk.getData("/servers" +"/"+ child, false, null);//上边已经便利到一个服务器对象,再进行添加list.add(new String(data));}System.out.println(list);}

(3)业务逻辑同服务端不在赘述。

客户端代码如下:

package com.tangxiaocong.case1;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/*** @Date 2023/8/10 21:27* @Author * 客户端的监听功能*/
public class DistributeClient {
private String connectString="hadoop102:2181,hadoop103:2181,hadoop104:2181";private int sessionTimeout=2000;private ZooKeeper zk=null;public static void main(String[] args) throws IOException, InterruptedException, KeeperException {//获取zk连接DistributeClient client = new DistributeClient();client.getConnect();//监听/servers下边的子节点的增减client.getServerList();//业务逻辑(睡眠)client.business();}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}private void getServerList() throws InterruptedException, KeeperException {List<String> children = zk.getChildren("/servers", true);//该方法会获取指定路径下的所有子节点//true 会走初始化中的watch 也可以自己创建watch//把所有的服务器都封装到一个集合ArrayList<String> list = new ArrayList<>();for (String child : children) {byte[] data = zk.getData("/servers" +"/"+ child, false, null);//上边已经便利到一个服务器对象,再进行添加list.add(new String(data));}System.out.println(list);}private void getConnect() throws IOException {zk= new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {getServerList();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}});}
}

案例二:ZooKeeper 分布式锁

分布式锁是什么?

日常使用计算机的时候,我们的电脑不会只开一个进程,但是当“进程1”在访问某些资源的时候,不能被其他进程所访问,它就会去获得锁,把她所访问的资源进行锁上,对该资源进行独占。"进程 1"用完该资源以后就将锁释放掉,让其 他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的 访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。

锁的实现:

构造函数:

在该类中首先要实现构造方法,构造方法与类名相同,在该方法中需要获取连接,重写process方法,在该方法中实现释放CountDownLatch的类对象,有两种情况,正常连接释放一种,不是正常连接状态,则释放另一种。在构造方法中还要判断是否存在“/locks”路径,存在则正常退出,不存在则创建该路径。

加锁函数:

使用ZooKeeper对象进行创建节点(临时有序),让后获取“/locks”路径下的所有节点序号,对结果进行判断,如果返回的List集合只有一个节点,则直接返回,默认加锁,不用再做监听工作。如果不是只有一个节点,则对List集合进行排序,再获取他的节点名称,通过indexOf函数来获取该名称节点的下标。如果为-1,则数据异常,为0 则为最小节点,则直接退出,进行加锁不需要设置监听,结果为其他则需要设置监听,先设置监听字符串,当状态不发生改变会一致阻塞,只有上锁节点让位后会调用process方法进行释放。

解锁函数:

解锁就是直接删除节点即可

整体代码:

package com.tangxiaocong.case2;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/*** @Date 2023/8/12 19:56* @Author */
public class DistributedLock {final    private String connectString="hadoop102:2181,hadoop103:2181,hadoop104:2181";final  private int sessionTimeout=2000;final    private   ZooKeeper zk;private String waitPath;private String currentModu;//为了程序的健壮性,创建该对象   等待操作final   private CountDownLatch waitLach=new CountDownLatch(1);final   private CountDownLatch countDownLatch=new CountDownLatch(1);public DistributedLock() throws IOException, InterruptedException, KeeperException {//获取连接zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {//  connectLatch  如果正常连接zk  可以释放if (watchedEvent.getState()==Event.KeeperState.SyncConnected){countDownLatch.countDown();}//检测到删除节点并且是前一个节点则释放waitlatchif (watchedEvent.getType()==Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){waitLach.countDown();}}});//等待是否正常连接  正常(已)连接会释放  否则阻塞countDownLatch.await();// 判断是否存在lock锁Stat stat = zk.exists("/locks", false);if (stat==null){//创建该节点String s = zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.CONTAINER);}}//对zk加锁public void zkLock()  {//创建临时的带序号的节点try {currentModu = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);List<String> children = zk.getChildren("/locks", false);//如果只有一个节点   则直接获取if(children.size()==1){return;}else {//排序Collections.sort(children);//直接从s后边开始   开始的下标就是length的长度String substring = currentModu.substring("/locks/".length());//通过substring来获取在List集合中的下标位置int index = children.indexOf(substring);if (index==-1){System.out.println("数据异常");}else if (index==0){return;}else {//  需要监听上一个节点waitPath="/locks/"+children.get(index-1);zk.getData(waitPath,true,new Stat());//等待监听waitLach.await();return;}}} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}//判断创建的节点是否是最小序号的节点 如果是则获取锁  不是则监听他的前一个节点}//对zk解锁public void unzkLock(){
//删除节点try {//-1  是版本号zk.delete(this.currentModu,-1);} catch (InterruptedException  | KeeperException e) {e.printStackTrace();}}
}

测试类代码 :

package com.tangxiaocong.case2;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
/*** @Date 2023/8/12 22:31* @Author 唐晓聪*/
public class DistributedLockTest
{public static void main(String[] args) throws IOException, InterruptedException, KeeperException {//创建两个客户端对象final    DistributedLock lock1 = new DistributedLock();final   DistributedLock lock2 = new DistributedLock();new Thread(new Runnable() {@Overridepublic void run() {try {  lock1.zkLock();System.out.println("线程1启动获得锁");Thread.sleep(5*1000);lock1.unzkLock();System.out.println("线程1释放锁");} catch (Exception e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {lock2.zkLock();System.out.println("线程2启动获得锁");Thread.sleep(5*1000);lock2.unzkLock();System.out.println("线程2释放锁");} catch (Exception e) {e.printStackTrace();}}}).start();}
}

Curator 框架实现分布式锁案例:

该案例是直接使用API进行实现分布式锁

实现步骤:

创建分布式锁对象,new InterProcessMutex(),参数1为所要连接的客户端,参数2为监听路径

参数1传入的为getCuratorFramework()自定义函数,

该函数通过工厂类的方式进行建立连接,返回创建好的客户端,让后start启动客户端

创建完分布式锁对象后创建两个线程,在线程中进行获得锁,释放锁的操作。

代码如下:

package com.tangxiaocong.case3;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
/*** @Date 2023/8/13 20:07* @Author */
public class CuratorLockTest {public static void main(String[] args) {//创建分布式锁1//参数1   所连接的客户端 参数2 监听路径InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");//创建分布式锁2InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");//创建线程new Thread(new Runnable() {@Overridepublic void run() {try {lock1.acquire();System.out.println("thread 1 acquire lock");lock1.acquire();System.out.println("thread 1 again acquire lock");Thread.sleep(5*1000);lock1.release();System.out.println("thread 1 relax lock");lock1.release();System.out.println("thread 1 again relax lock");System.out.println();} catch (Exception e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {lock2.acquire();System.out.println("thread 2 acquire lock");lock2.acquire();System.out.println("thread 2 again acquire lock");Thread.sleep(5*1000);lock2.release();System.out.println("thread 2 relax lock");lock2.release();System.out.println("thread 2 again relax lock");} catch (Exception e) {e.printStackTrace();}}}).start();}private static CuratorFramework getCuratorFramework() {ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);//通过工厂类的方式进行建立连接CuratorFramework client = CuratorFrameworkFactory.builder().connectString("hadoop102:2181,hadoop102:2181,hadoop104:2181").connectionTimeoutMs(2000).sessionTimeoutMs(2000).retryPolicy(policy)//连接失败后  间隔多少秒下次间隔.build();client.start();System.out.println("zookeeper  success start  !!!!!");return client;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/36212.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java+Excel+POI+testNG基于数据驱动做一个简单的接口测试【杭州多测师_王sir】

一、创建一个apicases.xlsx放入到eclipse的resource里面&#xff0c;然后refresh刷新一下 二、在pom.xml文件中加入poi和testng的mvn repository、然后在eclipse的对应目录下放入features和plugins&#xff0c;重启eclipse就可以看到testNG了 <!--poi excel解析 --><d…

运维监控学习笔记3

DELL的IPMI页面的登录&#xff1a; 风扇的状态&#xff1a; 电源温度&#xff1a;超过70度就告警&#xff1a; 日志信息&#xff1a; 可以看到更换过磁盘。 iDRAC的设置 虚拟控制台&#xff1a;启动远程控制台&#xff1a; 可以进行远程控制。 机房工程师帮我们接远程控制&…

【云原生】kubernetes中容器的资源限制

目录 1 metrics-server 2 指定内存请求和限制 3 指定 CPU 请求和限制 资源限制 在k8s中对于容器资源限制主要分为以下两类: 内存资源限制: 内存请求&#xff08;request&#xff09;和内存限制&#xff08;limit&#xff09;分配给一个容器。 我们保障容器拥有它请求数量的…

【云原生】K8S集群

目录 一、调度约束1.1 POT的创建过程1.1调度过程 二、指定节点调度2.1 通过标签选择节点 三、亲和性3.1requiredDuringSchedulingIgnoredDuringExecution&#xff1a;硬策略3.1 preferredDuringSchedulingIgnoredDuringExecution&#xff1a;软策略3.3Pod亲和性与反亲和性3.4使…

山东布谷科技直播平台搭建游戏开发技术分享:数据存储的重要意义

在市场上的热门的直播平台中&#xff0c;有很多小程序为用户提供各种各样的功能&#xff0c;这其中就有很多游戏小程序&#xff0c;当今社会独生子女众多&#xff0c;很多作为独生子女的用户都会去选择一个能够社交互动的APP来填补内心的空虚&#xff0c;而直播平台的实时互动的…

SAP 选择屏幕组件名描述翻译时字符长度不够问题处理

问题&#xff1a;有时候我们在开发report程序的时候&#xff0c;要求程序显示支持中英文&#xff0c;如果程序是在中文环境下开发的时候&#xff0c;需要进行翻译处理&#xff0c;但是我们发现选择屏幕上的组件的描述支持的默认长度是30位&#xff0c;如果超过该如何处理呢 解…

构建一个LLM应用所需的所有信息

一、说明 您是否对大型语言模型&#xff08;LLM&#xff09;的潜力感兴趣&#xff0c;并渴望创建您的第一个基于LLM的应用程序&#xff1f;或者&#xff0c;也许您是一位经验丰富的开发人员&#xff0c;希望简化工作流程&#xff1f;看看DemoGPT就是您的最佳选择。该工具旨在简…

【软件测试】Linux环境下Docker搭建+Docker搭建MySQL服务(详细)

目录&#xff1a;导读 前言 一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 Linux之docker搭…

CDN(内容分发网络)

CDN的全称是 Content Delivery Network, 即内容分发网络。CDN是构建在现有网络基础之上的智能虚拟网络&#xff0c;依靠部署在各地的边缘服务器&#xff0c;通过中心平台的负载均衡、内容分发、调度等功能模块&#xff0c;使用户就近获取所需内容&#xff0c;降低网络拥塞&a…

详谈MongoDB的那些事

概念区分 什么是关系型数据库 关系型数据库&#xff08;Relational Database&#xff09;是一种基于关系模型的数据库管理系统&#xff08;DBMS&#xff09;。在关系型数据库中&#xff0c;数据以表格的形式存储&#xff0c;表格由行和列组成&#xff0c;行表示数据记录&…

神秘的ip地址8.8.8.8,到底是什么类型的DNS服务器?

下午好&#xff0c;我的网工朋友。 DNS&#xff0c;咱们网工配置网络连接或者路由器时&#xff0c;高低得和这玩意儿打交道吧。 它是互联网中用于将人类可读的域名&#xff08;例如http://www.example.com&#xff09;转换为计算机可理解的IP地址&#xff08;例如192.0.2.1&a…

元宇宙核能发电VR模拟仿真实训教学为建设新型电力系统提供重要支撑

随着“碳达峰、碳中和”目标与建设新型能源体系的提出&#xff0c;在元宇宙环境下建设电力系统是未来发展的趋势。以物联网、区块链、数字孪生、混合现实等技术为主要代表的元宇宙技术体系及其在电力和能源系统中的应用&#xff0c;将会促进智能电网的发展&#xff0c;为建设新…

Oracle 知识篇+分区表上的索引由global改为local注意事项

★ 知识点 二、知识点 Local型索引有如下优点 1.Only one index partition must be rebuilt when a maintenance operation other than SPLIT PARTITION or ADD PARTITION is performed on an underlying table partition. 2.The duration of a partition maintenance opera…

【uniapp】使用Vs Code开发uniapp:

文章目录 一、使用命令行创建uniapp项目&#xff1a;二、安装插件与配置&#xff1a;三、编译和运行:四、修改pinia&#xff1a; 一、使用命令行创建uniapp项目&#xff1a; 二、安装插件与配置&#xff1a; 三、编译和运行: 该项目下的dist》dev》mp-weixin文件导入微信开发者…

unity vscode 代码关联 跳转 BUG

一早打开电脑发现代码关联失效了&#xff0c;目测可能跟昨天一些插件更新有关 结论 就这货&#xff0c;开了就没法提示代码关联&#xff0c;估计预览版全是BUG。 另一个坑 同期有个unity插件也是预览版&#xff0c;“非常好使”&#xff0c;当场去世。评论点开有好几个人说用…

替代阿托斯DLKZOR-T/DLHZO-TES直动式伺服阀比例阀

DLKZOR-T/DLKZOR-TES直动式伺服阀比例阀结构&#xff1a; 1&#xff0c;LVDT传感器 2&#xff0c;比例电磁铁 3&#xff0c;阀体 4&#xff0c;阀套 5&#xff0c;阀芯 6&#xff0c;复位弹簧 7&#xff0c;集成数字放大器 8&#xff0c;七芯插头 9&#xff0c;RS232通…

[保研/考研机试] 杨辉三角形 西北工业大学复试上机题 C++实现

题目描述 Time Limit: 1000 ms Memory Limit: 256 mb 输入n值&#xff0c;使用递归函数&#xff0c;求杨辉三角形中各个位置上的值。 输入描述: 一个大于等于2的整型数n 输出描述: 题目可能有多组不同的测试数据&#xff0c;对于每组输入数据&#xff0c; 按题目的要求输…

15.3.2 【Linux】系统的配置文件:/etc/crontab,/etc/cron.d/*

这个“ crontab -e ”是针对使用者的 cron 来设计的&#xff0c;如果是“系统的例行性任务”时&#xff0c; 该怎么办呢&#xff1f;是否还是需要以 crontab -e 来管理你的例行性工作调度呢&#xff1f;当然不需要&#xff0c;你只要编辑/etc/crontab 这个文件就可以。有一点需…

机器学习终极指南:特征工程(01/2) — 第 -2 部分

西姆兰吉特辛格 一、介绍 欢迎来到“机器学习终极指南”的第二部分。在第一部分中&#xff0c;我们讨论了探索性数据分析 &#xff08;EDA&#xff09;&#xff0c;这是机器学习管道中的关键步骤。在这一部分中&#xff0c;我们将深入研究特征工程&#xff0c;这是机器学习过程…

使用shift关键字,写一个带二级命令的脚本(如:docker run -a -b -c中的run)

省流&#xff1a;shift关键字 探索思路 最近有一个小小的需求&#xff0c;写一个类似于docker run -a -b -c这样的脚本&#xff0c;这个脚本名为doline&#xff0c;它本身可以执行&#xff08;doline -a -b -c&#xff09;&#xff0c;同时又带有几个如run、init、start这样的…