分布式锁实现方案-基于zookeeper的分布式锁实现(原理与代码)

目录

一、基于zookeeper的分布式锁

1.1 基于Zookeeper实现分布式锁的原理

1.1.1 分布式锁特性说明

1.1.1.1 特点分析

1.1.1.2 本质

1.1.2 Zookeeper 分布式锁实现原理

1.1.2.1 Zookeeper临时顺序节点特性

1.1.2.2 Zookeeper满足分布式锁基本要求

1.1.2.3 Watcher机制

1.1.2.3 总结

1.2 分布式锁流程说明

1.2.1 分布式锁流程图

1.2.2 流程说明

1.3 分布式锁代码实现

1.3.1 自己手写,实现Lock接口

1.3.1.1 分布式锁ZookeeperDistributedLock

1.3.1.2 模拟下单处理OrderServiceHandle

1.3.1.3 订单号生成类OrderCodeGenerator

1.3.1.4 分布式锁测试类TestZookeeperDistributedLock

1.3.1.5 测试效果

1.3.2 基于Apache Curator 框架调用

1.3.2.1 maven依赖

1.3.2.2 代码实现

1.3.2.2.1 分布式锁类CuratorDistributeLock

1.3.2.2.2 测试类TestCuratorDistributedLock

1.3.2.3 执行结果


一、基于zookeeper的分布式锁

1.1 基于Zookeeper实现分布式锁的原理

1.1.1 分布式锁特性说明

1.1.1.1 特点分析
  • 每次只能一个占用锁;
  • 可以重复进入锁;
  • 只有占用者才可以解锁;
  • 获取锁和释放锁都需要原子
  • 不能产生死锁
  • 尽量满足性能
1.1.1.2 本质

同步互斥,使得处理任务能够一个一个逐步的过临界资源。

1.1.2 Zookeeper 分布式锁实现原理

1.1.2.1 Zookeeper临时顺序节点特性

zookeeper中有一种临时顺序节点,它具有以下特征:

  • 时效性,当会话结束,节点将自动被删除
  • 顺序性,当多个应用向其注册顺序节点时,每个顺序号将只能被一个应用获取
1.1.2.2 Zookeeper满足分布式锁基本要求
  1. 因为顺序性,可以让最小顺序号的应用获取到锁,从而满足分布式锁的 每次只能一个占用锁,因为只有它一个获取到,所以可以实现 重复进入 ,只要设置标识即可。锁的释放,即删除应用在zookeeper上注册的节点,因为每个节点只被自己注册拥有,所以只有自己才能删除,这样就满足只有占用者才可以解锁
  2. zookeeper的序号分配是原子的,分配后即不会再改变,让最小序号者获取锁,所以获取锁是原子的
  3. 因为注册的是临时节点,在会话期间内有效,所以不会产生死锁
  4. zookeeper注册节点的性能能满足几千,而且支持集群,能够满足大部分情况下的性能
1.1.2.3 Watcher机制

Zookeeper 允许客户端向服务端的某个 Znode 注册一个 Watcher 监听,当服务端的一些指定事
件触发了这个 Watcher,服务端会向指定客户端发送一个事件通知来实现分布式的通知功能,然
后客户端根据 Watcher 通知状态和事件类型做出业务上的改变。

在实现分布式锁的时候,主要利用这个机制,实现释放锁的时候,通知等待锁的线程竞争锁。

1.1.2.3 总结

综上可知,Zookeeper其实是基于临时顺序节点特性实现的分布式锁。当然,还结合了他的Watcher机制,实现释放锁的时候,通知等待锁的线程去竞争锁。

1.2 分布式锁流程说明

1.2.1 分布式锁流程图

1.2.2 流程说明

  1. client判断/lock目录是否存在,如果不存在则向其注册/lock的持久节点
  2. client向/lock/目录下注册/lock/Node-前缀的临时顺序节点,并得到顺序号
  3. client获取/lock/目录下的所有临时顺序子节点
  4. client判断临时子节点序号中是否存在比自身的序号小的节点。如果不存在,则获取到锁;如果存在,则对象该临时节点做watch监控
  5. 获得锁的线程,执行业务逻辑,执行完之后,删除临时节点,完成锁的释放。
  6. 等待锁的线程如果收到监控的临时节点被删除的通知,则再重复4、5、6步骤,进入下一个获得锁、释放锁的循环。

1.3 分布式锁代码实现

1.3.1 自己手写,实现Lock接口

1.3.1.1 分布式锁ZookeeperDistributedLock
package com.ningzhaosheng.distributelock.zookeeper;import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;/*** @author ningzhaosheng* @date 2024/4/17 18:13:38* @description 基于zookeeper实现的分布式锁*/
public class ZookeeperDistributedLock implements Lock {private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedLock.class);// zookeeper 地址private String ZOOKEEPER_IP_PORT = "192.168.152.130:2181";// zookeeper 锁目录private String LOCK_PATH = "/LOCK";// 创建 zookeeper客户端zkClientprivate ZkClient client = null;private CountDownLatch cdl;// 当前请求的节点前一个节点private String beforePath;// 当前请求的节点private String currentPath;/*** 初始化客户端和创建LOCK目录** @param ZOOKEEPER_IP_PORT* @param LOCK_PATH*/public ZookeeperDistributedLock(String ZOOKEEPER_IP_PORT, String LOCK_PATH) {this.ZOOKEEPER_IP_PORT = ZOOKEEPER_IP_PORT;this.LOCK_PATH = LOCK_PATH;client = new ZkClient(ZOOKEEPER_IP_PORT, 4000, 4000, new SerializableSerializer());// 判断有没有LOCK目录,没有则创建if (!this.client.exists(LOCK_PATH)) {this.client.createPersistent(LOCK_PATH);}}@Overridepublic void lock() {if (!tryLock()) {//对次小节点进行监听waitForLock();lock();} else {logger.info(Thread.currentThread().getName() + " 获得分布式锁!");}}@Overridepublic void lockInterruptibly() throws InterruptedException {}@Overridepublic boolean tryLock() {// 如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPathif (currentPath == null || currentPath.length() <= 0) {// 创建一个临时顺序节点currentPath = this.client.createEphemeralSequential(LOCK_PATH + '/', "lock");System.out.println("---------------------------->" + currentPath);}// 获取所有临时节点并排序,临时节点名称为自增长的字符串如:0000000400List<String> childrens = this.client.getChildren(LOCK_PATH);//由小到大排序所有子节点Collections.sort(childrens);//判断创建的子节点/LOCK/Node-n是否最小,即currentPath,如果当前节点等于childrens中的最小的一个就占用锁if (currentPath.equals(LOCK_PATH + '/' + childrens.get(0))) {return true;}//找出比创建的临时顺序节子节点/LOCK/Node-n次小的节点,并赋值给beforePathelse {int wz = Collections.binarySearch(childrens, currentPath.substring(6));beforePath = LOCK_PATH + '/' + childrens.get(wz - 1);}return false;}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return false;}//等待锁,对次小节点进行监听private void waitForLock() {IZkDataListener listener = new IZkDataListener() {public void handleDataDeleted(String dataPath) throws Exception {logger.info(Thread.currentThread().getName() + ":捕获到DataDelete事件!---------------------------");if (cdl != null) {cdl.countDown();}}public void handleDataChange(String dataPath, Object data) throws Exception {}};// 对次小节点进行监听,即beforePath-给排在前面的的节点增加数据删除的watcherthis.client.subscribeDataChanges(beforePath, listener);if (this.client.exists(beforePath)) {cdl = new CountDownLatch(1);try {cdl.await();} catch (InterruptedException e) {e.printStackTrace();}}this.client.unsubscribeDataChanges(beforePath, listener);}@Overridepublic void unlock() {// 删除当前临时节点client.delete(currentPath);}@Overridepublic Condition newCondition() {return null;}}
1.3.1.2 模拟下单处理OrderServiceHandle
package com.ningzhaosheng.distributelock.zookeeper;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;/*** @author ningzhaosheng* @date 2024/4/17 21:45:46* @description 模拟订单处理*/
public class OrderServiceHandle implements Runnable {private static OrderCodeGenerator ong = new OrderCodeGenerator();private Logger logger = LoggerFactory.getLogger(OrderServiceHandle.class);// 按照线程数初始化倒计数器,倒计数器private CountDownLatch cdl = null;private Lock lock = null;public OrderServiceHandle(CountDownLatch cdl, Lock lock) {this.cdl = cdl;this.lock = lock;}// 创建订单public void createOrder() {String orderCode = null;//准备获取锁lock.lock();try {// 获取订单编号orderCode = ong.getOrderCode();} catch (Exception e) {// TODO: handle exception} finally {//完成业务逻辑以后释放锁lock.unlock();}// ……业务代码logger.info("insert into DB使用id:=======================>" + orderCode);}@Overridepublic void run() {try {// 等待其他线程初始化cdl.await();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}// 创建订单createOrder();}
}
1.3.1.3 订单号生成类OrderCodeGenerator
package com.ningzhaosheng.distributelock.zookeeper;import java.text.SimpleDateFormat;
import java.util.Date;/*** @author ningzhaosheng* @date 2024/4/17 21:44:06* @description 生成订单号*/
public class OrderCodeGenerator {// 自增长序列private static int i = 0;// 按照“年-月-日-小时-分钟-秒-自增长序列”的规则生成订单编号public String getOrderCode() {Date now = new Date();SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");return sdf.format(now) + ++i;}
}
1.3.1.4 分布式锁测试类TestZookeeperDistributedLock
package com.ningzhaosheng.distributelock.zookeeper;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;/*** @author ningzhaosheng* @date 2024/4/17 21:48:28* @description zookeeper分布式锁测试类*/
public class TestZookeeperDistributedLock {public static void main(String[] args) {// zookeeper 地址String ZOOKEEPER_IP_PORT = "192.168.31.9:2181";// zookeeper 锁目录String LOCK_PATH = "/LOCK";// 线程并发数int NUM = 10;CountDownLatch cdl = new CountDownLatch(NUM);for (int i = 1; i <= NUM; i++) {// 按照线程数迭代实例化线程Lock lock = new ZookeeperDistributedLock(ZOOKEEPER_IP_PORT, LOCK_PATH);new Thread(new OrderServiceHandle(cdl, lock)).start();// 创建一个线程,倒计数器减1cdl.countDown();}}
}
1.3.1.5 测试效果

从上图执行结果中可以看出,在多线程情况下,分布式锁获取和释放正常。

1.3.2 基于Apache Curator 框架调用

1.3.2.1 maven依赖
        <dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.2.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.2.0</version></dependency>
1.3.2.2 代码实现

这里模拟业务使用分布式锁,还是使用的OrderServiceHandle类,这里只给出分布式锁实现类和测试类,不再给出OrderServiceHandle代码,可以参考上一小节的OrderServiceHandle类。

1.3.2.2.1 分布式锁类CuratorDistributeLock
package com.ningzhaosheng.distributelock.zookeeper.curator;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;/*** @author ningzhaosheng* @date 2024/4/17 22:03:45* @description 实现Lock接口(其实可以不用,可以直接使用InterProcessMutex,这里是为了和jvm的Lock锁保持一致,所以做了一层封装)*/
public class CuratorDistributeLock implements Lock {private CuratorFramework client;private InterProcessMutex mutex;public CuratorDistributeLock(String connString, String lockPath) {this(connString, lockPath, new ExponentialBackoffRetry(3000,5));}public CuratorDistributeLock(String connString, String lockPath, ExponentialBackoffRetry retryPolicy) {try {client = CuratorFrameworkFactory.builder().connectString(connString).retryPolicy(retryPolicy).build();client.start();mutex = new InterProcessMutex(client, lockPath);} catch (Exception e) {e.printStackTrace();}}@Overridepublic void lock() {try {// 获取锁mutex.acquire();} catch (Exception e) {e.printStackTrace();}}@Overridepublic void lockInterruptibly() throws InterruptedException {}@Overridepublic boolean tryLock() {return false;}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return false;}@Overridepublic void unlock() {try {// 释放锁mutex.release();} catch (Exception e) {e.printStackTrace();}}@Overridepublic Condition newCondition() {return null;}
}
1.3.2.2.2 测试类TestCuratorDistributedLock
package com.ningzhaosheng.distributelock.zookeeper.curator;import com.ningzhaosheng.distributelock.zookeeper.OrderServiceHandle;import java.util.concurrent.CountDownLatch;/*** @author ningzhaosheng* @date 2024/4/17 21:54:33* @description 基于 apache curator分布式锁测试类*/
public class TestCuratorDistributedLock {private static final String ZK_ADDRESS = "192.168.31.9:2181";private static final String LOCK_PATH = "/distributed_lock";public static void main(String[] args) {int NUM = 10;CountDownLatch cdl = new CountDownLatch(NUM);for (int i = 1; i <= NUM; i++) {// 按照线程数迭代实例化线程/** 创建CuratorDistributeLock* 其实可以不用,可以直接使用InterProcessMutex,这里是为了和jvm的Lock锁保持一致,所以做了一层封装*/CuratorDistributeLock curatorDistributeLock = new CuratorDistributeLock(ZK_ADDRESS,LOCK_PATH);new Thread(new OrderServiceHandle(cdl, curatorDistributeLock)).start();// 创建一个线程,倒计数器减1cdl.countDown();}}
}
1.3.2.3 执行结果

从执行结果可以看出,基于apche curator框架实现zookeeper锁,它也是按照临时顺序节点的顺序获取锁的,每次获得锁的节点都是最小顺序节点,然后等待锁的线程,会基于watcher机制,每次给最小临时顺序节点加回调,监听节点的变更(即释放锁的线程会删除节点),然后再重新判断最小临时顺序节点,最小的获得锁执行,依次循环完成。

好了,本次内容就分享到这,欢迎关注本博主。如果有帮助到大家,欢迎大家点赞+关注+收藏,有疑问也欢迎大家评论留言!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/1169.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

UE5增强输入系统 Enhanced Input

关键字&#xff1a; Enhanced Input 、 输入、映射、事件、鼠标、键盘、键鼠、动作、Trigger、触发器、 疑问&#xff1a; 新输入系统怎么做一个基础的案例&#xff1f;Trigger修改器中每个项都是什么功能&#xff1f;功能边界问题&#xff1a;如时刻、时段、单次事件、持续事…

Linux驱动开发——(一)设备树的基本属性及其应用

目录 一、常见基本属性 1.1 compatible属性 1.2 status属性 1.3 reg属性 1.4 #address-cells属性和#size-cells属性 二、基本属性在设备树的表现 三、基本属性在驱动代码的表现 3.1 驱动代码 3.2 驱动代码中的OF函数 3.2.1 of_find_node_by_path 3.2.2 of_find_prope…

Unity类银河恶魔城学习记录13-5,6 p146 Delete save file,p147 Encryption of saved data源代码

Alex教程每一P的教程原代码加上我自己的理解初步理解写的注释&#xff0c;可供学习Alex教程的人参考 此代码仅为较上一P有所改变的代码 【Unity教程】从0编程制作类银河恶魔城游戏_哔哩哔哩_bilibili FileDataHandler.cs using System; using System.IO; using UnityEngine; p…

Spring Boot + Thymeleaf 实现的任务发布网站

角色&#xff1a; 管理员雇主雇员 功能 雇主&#xff1a;登录、注册、发布任务、选择中标雇员、评价雇员雇员&#xff1a;登录、注册、查看任务列表、投标任务、收藏任务、完成任务管理员、登录、任务管理、雇主管理、雇员管理 部分功能截图 部署 导入数据库…

.NET 邮件发送 SMTP邮件发送

SMTP&#xff08;Simple Mail Transfer Protocol&#xff09;是用于电子邮件传输的规则集&#xff0c;可以从邮件客户端向接收电子邮件服务器发送、中继或转发邮件。发件人可使用SMTP 服务器来执行发送电子邮件的过程。SMTP服务器则是按照这些规则中转电子邮件的服务器。 IMAP…

视频质量评价 PSNR 算法详细介绍

PSNR PSNR(Peak Signal-to-Noise Ratio,峰值信噪比)是一种常用的评价图像质量的指标,尤其在图像压缩和图像处理领域。它基于最大可能的图像信号功率和图像的噪声功率之间的比率,通常用于衡量图像恢复或图像压缩算法的效果。 原理 PSNR是基于MSE(Mean Squared Error,均…

node-sass报错

node-sass报错 解决方案 有几种解决方案&#xff0c;但感觉都是为了下载vsta_sdk这个工具的。 有的电脑下载C开发程序时可以顺带下载这个插件。 可以直接下载VS之后点击下载C桌面开发&#xff0c;但是有的不行&#xff0c;所以网上也就有另外一种方式&#xff0c;就是下载V…

C# danbooru Stable Diffusion 提示词反推 OpenVINO Demo

C# danbooru Stable Diffusion 提示词反推 OpenVINO Demo 目录 说明 效果 模型信息 项目 代码 下载 说明 模型下载地址&#xff1a;https://huggingface.co/deepghs/ml-danbooru-onnx 效果 模型信息 OVVersion { BuildNumber 2023.1.0-12185-9e6b00e51cd-releases/20…

桌面软件使用到的开源库

想了解一下桌面软件开发中可能使用到的dll库 联想锁屏 libcef-常用概念-框架特点-CSDN博客 libcurl库使用详情、libcurl库的制作-CSDN博客 使用Cef和Qt做一个跨平台的多标签多窗口浏览器_cef3 多个标签-CSDN博客 cef 依赖的文件 libcef - Bigben - 博客园 (cnblogs.com) Q…

Hikyuu 2.0.2 发布,高性能量化交易研究框架

新增特性 历史财务信息入库&#xff0c;对于使用 MySQL 存储&#xff0c;可以直接使用服务端的财务数据&#xff08;之前只能在执行数据下载的机器上获取&#xff09;增加指标 FINANCE 获取相应历史财务数据&#xff0c;具体财务字段信息可通过StockManager.get_history_finan…

Android AIDL接口

一.AlDI接口简介 AIDL&#xff08;Android Interface Definition Language&#xff09;是一种 IDL 语言&#xff0c;用于生成可以在 Android 设备上两个进程之间进行进程间通信&#xff08;IPC&#xff09;的代码。 通过 AIDL&#xff0c;可以在一个进程中获取另一个进程的数据…

开源博客项目Blog .NET Core源码学习(16:App.Hosting项目结构分析-4)

本文学习并分析App.Hosting项目中前台页面的文章专栏页面和文章详情页面。< 文章专栏页面 文章专栏页面总体上为左右布局&#xff0c;左侧显示文章列表&#xff0c;右侧从上向下为关键词搜索、分类导航、热门文章等内容。整个页面使用了layui中的面包屑导航、表单、模版、流…

MySQL数据库外键约束打开与关闭 ️

MySQL数据库外键约束打开与关闭 &#x1f6e0;️ MySQL数据库外键约束打开与关闭 &#x1f6e0;️摘要 &#x1f4dd;引言 &#x1f680;正文内容&#xff08;详细介绍&#xff09; &#x1f4a1;关闭外键约束检查外键约束检查关闭的作用风险与最佳实践建议 &#x1f914; QA环…

使用Spring进行文件的上传和下载

概览 使用Spring进行文件的上传和下载Spring上传文件接口设计dubbo接口设计上传文件流的RPC的接口设计 Spring文件下载接口设计dubbo接口设计下载文件流的RPC的接口设计 spring上传文件大小控制 使用Spring进行文件的上传和下载 本文主要介绍在Spring框架下面调用微服务的dubb…

LeetCode36: 有效的数独(Java)

题目&#xff1a; 请你判断一个 9 x 9 的数独是否有效。只需要 根据以下规则 &#xff0c;验证已经填入的数字是否有效即可。 数字 1-9 在每一行只能出现一次。数字 1-9 在每一列只能出现一次。数字 1-9 在每一个以粗实线分隔的 3x3 宫内只能出现一次。&#xff08;请参考示例…

一次Ambari安装记录

引言 Ambari是一个开源的Apache项目,它提供了一个直观易用的Web界面,用于管理、监控和配置Apache Hadoop集群。它是一个集群管理工具,可以帮助管理员轻松地部署、管理和监控Hadoop集群的各种组件,如HDFS、YARN、MapReduce、Hive、HBase等。通过Ambari,用户可以在集群中添…

OerOerlikonTCO1200欧瑞康LPCVD system操作使用说明

OerOerlikonTCO1200欧瑞康LPCVD system操作使用说明

DQ-DETR: DETR WITH DYNAMIC QUERY FOR TINY OBJECTDETECTION 学习笔记

论文地址&#xff1a;https://arxiv.org/pdf/2404.03507.pdf 此DQ-DETR与IDEA提出的同名&#xff0c;该文主要集中于小目标的检测 尽管之前的类似DETR的方法在通用目标检测中取得了成功&#xff0c;但在小目标检测方面仍然具有挑战性&#xff0c;因为目标 Query 的位置信息并未…

LWIP开启ARP之后进入硬件错误中断

遇到个很奇怪的问题&#xff0c;如下图只要开启ARP之后&#xff0c;就会进入硬件错误中断&#xff0c;关掉就好了。 而无法开启ARP&#xff0c;就不能ping 通&#xff0c;所以必须要解决这个问题。 最终debug发现死在memcpy函数位置 这样原因就很好分析了&#xff0c; 共4个拷…

通过linux工具iftop命令查看视频监控平台是否收到监控摄像头的视频流(视频监控平台接收和转发的视频流)

目录 一、需求描述 二、解决思路 &#xff08;一&#xff09;问题分析 &#xff08;二&#xff09;解决思路 1、通过抓包的方式 2、通过一些linux的网络监视工具 三、需求实现 &#xff08;一&#xff09;抓包工具 1、tcpdump 2、Wireshark 3、tcptrace &#xff0…