Zookeeper Java 开发,自定义分布式锁示例

文章目录

    • 一、概述
    • 二、导入依赖包
    • 三、创建锁的过程
      • 3.1 通过 create 创建节点信息
      • 3.2 AsyncCallback.StringCallback 回调函数
      • 3.3 AsyncCallback.Children2Callback 的回调函数
      • 3.4 Watcher 的回调函数
    • 四、完整示例
      • 4.1 完整分布式锁代码
      • 4.2 测试类

如果您还没有安装Zookeeper请看ZooKeeper 安装说明,Zookeeper 命令使用方法和数据说明,Zookeeper Java 开发入门。

一、概述

  • 情景:假设有10个客户端(分散的10台主机)要执行一个任务,这个任务某些过程需要保持原子性。那么我们就需要一个分布式锁。

  • 原理:通过在Zookeeper中创建序列节点来实现获得锁,删除节点来释放锁。其实质是一个按先来后到的排序过程,实现过程如下:

    • 客户端发起请求,创建锁序列节点(/lock/xxxxxxxx)

    • 获取所有锁节点,判断自己是否为最小节点

      • 如果自己是最小序列节点,则立即获得锁
      • 否则不能获得锁,但要监控前一个序列节点的状态
    • 获得锁的客户端开始执行任务。

    • 执行完任务后释放锁。

      • 由于后一个节点监控了前一个节点,当前一个节点删除时,后一个客户端会收到回调。

      • 在这个回调中再获取所有锁节点,判断自己是否为最小节点。

      • 以此类推,直到全部结束。

  • 流程如下

在这里插入图片描述

  • 如果您对没有做过 Zookeeper 开发,强烈建立先看 Zookeeper Java 开发入门。

二、导入依赖包

  • 在 pom.xml 文件中导入 Zookeeper 包,注意一般这个包的版本要和您安装的 Zookeeper 服务端版本一致。

    <dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.2</version>
    </dependency>
    

三、创建锁的过程

3.1 通过 create 创建节点信息

  • 通过 create 创建序列节点信息。他是异步方式,创建成功后会调用 AsyncCallback.StringCallback.processResult 回调函数。
    public void lock() throws InterruptedException, LockException {zooKeeper.create("/lock", "xxx".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, context);countDownLatch.await();if(StringUtils.isEmpty(this.lockNodePath)){throw new LockException("创建锁失败");}System.out.println(this.appName + " 获得锁");}

3.2 AsyncCallback.StringCallback 回调函数

  • 在 AsyncCallback.StringCallback 的回调函数中通过 getChildren 方法获取 ZooKeeper 锁节点下的所有节点信息。这个方法是异步的,调用成功后会调用 AsyncCallback.Children2Callback.processResult 回调函数。
    // AsyncCallback.StringCallback@Overridepublic void processResult(int i, String s, Object o, String s1) {if(StringUtils.isEmpty(s1)){// 这里是创建锁失败的情况。this.countDownLatch.countDown();return;}System.out.println(this.appName + " create lock node="+s1);this.lockNodePath = s1;// 获取 ZooKeeper 锁节点下的所有节点信息,以此来判断我是不是第一个创建节点,如果就获得锁,否则监控前一个节点信息。zooKeeper.getChildren("/", false, this, context);}

3.3 AsyncCallback.Children2Callback 的回调函数

  • 在 AsyncCallback.Children2Callback 的回调函数判断我是不是第一个创建节点,如果就获得锁,否则监控前一个节点信息。监控前一个节点信息使用 exists 方法,这个方法设置了 Watcher 的 processResult 回调函数
    // AsyncCallback.Children2Callback@Overridepublic void processResult(int i, String s, Object o, List<String> list, Stat stat) {Collections.sort(list);//        for (String s1 : list) {
//            System.out.println("\t "+this.lockNodePath+" previous lock node="+s1);
//        }int index = list.indexOf(lockNodePath.substring(1));if(0 == index){// 如果我现在是第一个节点,则获得锁try {zooKeeper.setData("/", this.lockNodePath.getBytes(), -1);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}this.countDownLatch.countDown();}else {// 我不是第一个节点,监控前一个节点信息(等他删除后我就是可能是第一个了)String watchNodePath = "/" + list.get(index - 1);System.out.println("\t "+this.lockNodePath+" watch node:"+ watchNodePath);zooKeeper.exists(watchNodePath, this, new StatCallback() {@Overridepublic void processResult(int i, String s, Object o, Stat stat) {}}, context);}}

3.4 Watcher 的回调函数

  • 在 Watcher 的回调函数,我们通过判断 watchedEvent.getType() 为 NodeDeleted 类型时,重新获取 ZooKeeper 锁节点下的所有节点信息,这使得消息回到了 “3.3”步,判断谁是第一个节点,然后获得得,完成整个流程。
    // Watcher@Overridepublic void process(WatchedEvent watchedEvent) {switch (watchedEvent.getType()) {case None:break;case NodeCreated:break;case NodeDeleted:zooKeeper.getChildren("/", false, this, context);break;case NodeDataChanged:break;case NodeChildrenChanged:break;case DataWatchRemoved:break;case ChildWatchRemoved:break;case PersistentWatchRemoved:break;}}

四、完整示例

4.1 完整分布式锁代码

package top.yiqifu.study.p131;import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;public class ZookeeperLock implements Watcher, AsyncCallback.StringCallback,AsyncCallback.Children2Callback {private String appName;private ZooKeeper zooKeeper;private Object context;private String lockNodePath;private CountDownLatch countDownLatch = new CountDownLatch(1);public ZookeeperLock(String name, ZooKeeper zk){this.appName = name;this.zooKeeper = zk;this.context = this;}public void lock() throws InterruptedException, LockException {zooKeeper.create("/lock", "xxx".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, context);countDownLatch.await();if(StringUtils.isEmpty(this.lockNodePath)){throw new LockException("创建锁失败");}System.out.println(this.appName + " 获得锁");}public void unlock() throws KeeperException, InterruptedException, LockException {if(StringUtils.isEmpty(this.lockNodePath)){throw new LockException("没有获得锁,无法释放锁");}zooKeeper.delete(lockNodePath, -1);System.out.println(this.appName + " 释放锁");}// AsyncCallback.StringCallback@Overridepublic void processResult(int i, String s, Object o, String s1) {if(StringUtils.isEmpty(s1)){// 这里是创建锁失败的情况。this.countDownLatch.countDown();return;}System.out.println(this.appName + " create lock node="+s1);this.lockNodePath = s1;// 获取 ZooKeeper 锁节点下的所有节点信息,以此来判断我是不是第一个创建节点,如果就获得锁,否则监控前一个节点信息。zooKeeper.getChildren("/", false, this, context);}// AsyncCallback.Children2Callback@Overridepublic void processResult(int i, String s, Object o, List<String> list, Stat stat) {Collections.sort(list);//        for (String s1 : list) {
//            System.out.println("\t "+this.lockNodePath+" previous lock node="+s1);
//        }int index = list.indexOf(lockNodePath.substring(1));if(0 == index){// 如果我现在是第一个节点,则获得锁try {zooKeeper.setData("/", this.lockNodePath.getBytes(), -1);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}this.countDownLatch.countDown();}else {// 我不是第一个节点,监控前一个节点信息(等他删除后我就是可能是第一个了)String watchNodePath = "/" + list.get(index - 1);System.out.println("\t "+this.lockNodePath+" watch node:"+ watchNodePath);zooKeeper.exists(watchNodePath, this, new StatCallback() {@Overridepublic void processResult(int i, String s, Object o, Stat stat) {}}, context);}}// Watcher@Overridepublic void process(WatchedEvent watchedEvent) {switch (watchedEvent.getType()) {case None:break;case NodeCreated:break;case NodeDeleted:zooKeeper.getChildren("/", false, this, context);break;case NodeDataChanged:break;case NodeChildrenChanged:break;case DataWatchRemoved:break;case ChildWatchRemoved:break;case PersistentWatchRemoved:break;}}public class LockException extends  Exception{public LockException(String message){super(message);}}
}

4.2 测试类

package top.yiqifu.study.p131;import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;public class Test06_ZookeeperLock {public static void main(String[] args) {try {// 创建 ZooKeeper 对象final ZooKeeper zooKeeper = testCreateZookeeper();int clientCount = 10;final CountDownLatch countDownLatch = new CountDownLatch(clientCount);for (int i = 0; i < clientCount; i++) {new Thread(new Runnable(){@Overridepublic void run() {TestLock(zooKeeper);countDownLatch.countDown();}}).start();}countDownLatch.await();} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}private static void  TestLock(ZooKeeper zooKeeper){try {String appName = Thread.currentThread().getName();ZookeeperLock zookeeperLock = new ZookeeperLock(appName, zooKeeper);// 加锁(获得分布式锁)zookeeperLock.lock();System.out.println(appName + " 执行任务");Thread.sleep(1000);// 释放锁zookeeperLock.unlock();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();} catch (ZookeeperLock.LockException e) {e.printStackTrace();}}private static ZooKeeper testCreateZookeeper() throws IOException, InterruptedException, KeeperException {final CountDownLatch countDownLatch = new CountDownLatch(1);// ZooKeeper 集群地址(没连接池的概念,是Session的概念)//String connectionString = "192.168.8.51:2181,192.168.8.52:2181,192.168.8.53:2181";String connectionString = "192.168.8.51:2181,192.168.8.52:2181,192.168.8.53:2181/aaa";// ZooKeeper Session 数据超时时间(也就是这个Session关闭后,与这个Session相关的数据在ZooKeeper中保存多信)。Integer sessionTimeout = 3000;// ZooKeeper Session 级别 Watcher(Watch只发生在读方法上,如 get、exists)final ZooKeeper zooKeeper = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {Event.KeeperState state = watchedEvent.getState();Event.EventType type = watchedEvent.getType();String path = watchedEvent.getPath();switch (state) {case Unknown:break;case Disconnected:break;case NoSyncConnected:break;case SyncConnected:countDownLatch.countDown();break;case AuthFailed:break;case ConnectedReadOnly:break;case SaslAuthenticated:break;case Expired:break;case Closed:break;}switch (type) {case None:break;case NodeCreated:break;case NodeDeleted:break;case NodeDataChanged:break;case NodeChildrenChanged:break;case DataWatchRemoved:break;case ChildWatchRemoved:break;case PersistentWatchRemoved:break;}System.out.println("Session watch state=" + state);System.out.println("Session watch type=" + type);System.out.println("Session watch path=" + path);} catch (Exception e) {e.printStackTrace();}}});countDownLatch.await();ZooKeeper.States state = zooKeeper.getState();switch (state) {case CONNECTING:break;case ASSOCIATING:break;case CONNECTED:break;case CONNECTEDREADONLY:break;case CLOSED:break;case AUTH_FAILED:break;case NOT_CONNECTED:break;}System.out.println("ZooKeeper state=" + state);return zooKeeper;}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/146021.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

切换阿里云ES方式及故障应急处理方案

一、阿里云es服务相关问题及答解 1.1 ES7.10扩容节点时间 增加节点数量需要节点拉起和数据Rebalance两步,拉起时间7.16及以上的新版本大概10分钟以内,7.16以前大概一小时,数据迁移的时间就看数据量了,一般整体在半小时以内 (需进行相关测试验证) 1.2 ES7.10扩容数据节点…

在 Node.js 中发出 HTTP 请求的 5 种方法

在 Node.js 中发出 HTTP 请求的 5 种方法 学习如何在 Node.js 中发出 HTTP 请求可能会让人感到不知所措&#xff0c;因为有数十个可用的库&#xff0c;每个解决方案都声称比上一个更高效。一些库提供跨平台支持&#xff0c;而另一些库则关注捆绑包大小或开发人员体验。 在这篇…

云轴科技ZStack信创云平台支撑长江航务管理局35套航运管理系统

信创是数字中国建设的重要组成部分&#xff0c;也是数字经济发展的关键推动力量。作为云基础软件企业&#xff0c;云轴科技ZStack产品矩阵全面覆盖数据中心云基础设施&#xff0c;ZStack信创云首批通过可信云《一云多芯IaaS平台能力要求》先进级&#xff0c;是其中唯一兼容四种…

二百零三、Flume——Flume实时采集数据频率为1s的高频率Kafka数据直接写入ODS层表的HDFS文件路径下

一、目的 在离线数仓中&#xff0c;需要用Flume去采集Kafka中的数据&#xff0c;然后写入HDFS中。 由于每种数据类型的频率、数据大小、数据规模不同&#xff0c;因此每种数据的采集需要不同的Flume配置文件。玩了几天Flume&#xff0c;感觉Flume的使用难点就是配置文件 二、…

AOF是什么?

目录 一、AOF是什么&#xff1f; 二、使用AOF 三、命令写入 四、重写机制 4.1 触发AOF 4.2 AOF执行流程 一、AOF是什么&#xff1f; AOF是Append Only File&#xff0c;是Redis中实现持久化的一种方式。以独⽴⽇志的⽅式记录每次命令&#xff0c;重启时再重新执⾏ AOF ⽂件中的…

小程序富文本图片大小问题

文章目录 概要uniapp小程序情况解决方法及完整示例 概要 在小程序使用富文本或者在nuiapp&#xff08;小程序的&#xff09;使用富文本都会转为 <rich-text nodes"<p class"p class">内容</p>”></rich-text>如果是这种情况的话在css…

自动驾驶-BEV感知综述

BEV感知综述 随着自动驾驶传感器配置多模态化、多源化&#xff0c;将多源信息在unified View下表达变得更加关键。BEV视角下构建的local map对于多源信息融合及理解更加直观简洁&#xff0c;同时对于后续规划控制模块任务的开展也更为方便。BEV感知的核心问题是&#xff1a; …

飞书开发学习笔记(八)-开发飞书小程序Demo

飞书开发学习笔记(八)-开发飞书小程序Demo 一.小程序开发概述 1.1 小程序开发概述 飞书开发文档中查看&#xff1a;小程序开发概述 飞书小程序是指可以运行在飞书客户端中的小程序&#xff0c;小程序的一套代码可以适配 Android、iOS、PC 多平台&#xff0c;且用户体验与飞书…

VUE基础的一些实战总结

目录 创建一个 Vue 应用 步骤 1&#xff1a;安装 Node.js 和 npm 步骤 2&#xff1a;安装 Vue CLI 步骤 3&#xff1a;创建 Vue 项目 步骤 4&#xff1a;启动开发服务器 步骤 5&#xff1a;访问应用程序 步骤 6&#xff1a;编辑 Vue 应用 步骤 7&#xff1a;构建和部署…

python3.8 安装 ssl 模块 和 _ctypes 模块

这文章目录 前情提要安装 openssl-1.1.1重新编译安装 python3.8-rpath 编译选项介绍python3.8 跟 python3.10 的区别那要怎么解决这个问题呢&#xff0c;我想到有四种解决方案&#xff1a; 前情提要 我在之前给 python3.10 安装 ssl 模块后以为该步骤 “对于 python3.6、pytho…

uniapp使用Canvas实现电子签名

来源&#xff1a; 公司的一个需求&#xff0c;需要给新注册的会员和客商需要增加签署协议功能&#xff1b; 之前的思路&#xff1a; 1、使用vue-signature-pad来实现电子签名&#xff0c;但是安卓手机不兼容&#xff1b; 2、uniapp插件市场来实现&#xff0c;但是对HBuilderX…

【MMC/SD/SDIO】读写操作

SD 总线是基于命令和数据流&#xff0c;它们由一个开始 Bit 发起&#xff0c;由一个停止 Bit 结束。 Command&#xff1a;命令开始一个操作。命令由 Host 驱动&#xff0c;或者给单卡&#xff08;寻址命令&#xff09;&#xff0c;或者给所有连接的卡&#xff08;广播命令&…

决策树的Boosting策略是什么

在决策树的Boosting策略中&#xff0c;最常见的算法是梯度提升决策树&#xff08;Gradient Boosting Decision Trees&#xff0c;简称GBDT&#xff09;。GBDT是一种集成学习方法&#xff0c;通过串行训练多个决策树&#xff0c;并根据前一个树的预测结果来调整下一个树的训练目…

【Linux网络】工作环境救急——关于yum安装的5个花式操作

目录 1、只下载不安装&#xff0c;离线安装软件 2、自行打包创建元数据 第一步&#xff1a;先准备好nginx的软件包&#xff0c;放在一个文件夹下 第二步&#xff1a;在本地下载createrepo命令软件&#xff0c;用于创建元信息&#xff0c;这个一定是对包的上一级目录使用命令…

无线WiFi安全渗透与攻防(六)之WEP破解-Gerix-wifi-cracker自动化破解WEP加密

WEP破解-Gerix-wifi-cracker自动化破解WEP加密 WEP破解-Gerix-wifi-cracker自动化破解WEP加密1.环境准备1.软件和kali2.下载软件,下载地址3.将软件复制到kali,解压4.进入软件目录2.破解步骤1.启动gerix-wifi-cracker-2-master软件2.设置无线网卡位Monitor Mode模式3.重新扫描…

【Python 千题 —— 基础篇】输出列表平均值

题目描述 题目描述 输出列表的平均值。题中有一个包含数字的列表 [19, 39, 130, 48, 392, 101, 92]&#xff0c;使用 for 循环输出这个列表中所有项的平均值。 输入描述 无输入。 输出描述 输出列表的平均值。 示例 示例 ① 输出&#xff1a; 列表的平均值是&#xf…

【Android】导入三方jar包/系统的framework.jar

1.Android.mk导包 1).jar包位置 与res和src同一级的libs中(没有就新建) 2).Android.mk文件 LOCAL_STATIC_ANDROID_LIBRARIES&#xff1a;android静态库&#xff0c;经常用于一些support的导包 LOCAL_JAVA_LIBRARIES&#xff1a;依赖的java库&#xff0c;一般为系统的jar…

【MySQL学习笔记-001】- 创建表、插入数据、查看数据库结构

创建employees表 当创建一个表时&#xff0c;需要指定表的名称和每个列的名称和数据类型。以下是一个示例SQL语句&#xff0c;用于创建一个名为"employees"的表&#xff0c;其中包含员工ID、姓名、职位和工资等列&#xff1a; CREATE TABLE employees (employee_id…

PCA降维Python demo

读这篇15年CVPR的文章&#x1f923;&#x1f923;&#x1f923;&#x1f923;&#x1f923; inproceedings{liu2015sparse,title{Sparse convolutional neural networks},author{Liu, Baoyuan and Wang, Min and Foroosh, Hassan and Tappen, Marshall and Pensky, Marianna},…

相机突然断电,保存的DAT视频文件如何修复

3-7 本文主要解决因相机突然断电导致拍摄的视频文件损坏的问题。 在平常使用相机拍摄视频&#xff0c;比如用单反相机、无人机拍摄视频的时候&#xff0c;如果电池突然断电&#xff0c;或者突然炸机了&#xff0c;就非常有可能会得到一个损坏的视频文件&#xff0c;比如会产生…