zookeeper全系列学习之分布式锁实现

文章目录

  • 前言
  • 一、分布式锁的通用实现思路
  • 二、ZK实现分布式锁的思路
  • 三、ZK实现分布式锁的编码实现
    • 1、核心工具类实现
    • 2、测试代码编写
      • 线程安全问题复现
      • 使用上面封装的`ZkLockHelper`实现的分布式锁
    • 优点
    • 缺点
  • 总结

前言

就像上篇文章zookeeper全系列学习之统一配置获取说的,有了naocs谁还用zk做配置中心呢一样,现在项目中用zk实现分布式锁的估计也很少了,但是我认为它其实是有存在的价值的,因为它的临时顺序节点的特点,当客户端不可用时他能及时识别从而避免客户端开线程去主动删除,无论是为了学习还是工作亦或是为了拓展知识面,我们还是了解下为好,下面开始正文


一、分布式锁的通用实现思路

分布式锁的概念以及常规解决方案可以参考之前的博客:聊聊分布式锁的解决方案;今天我们先分析下分布式锁的实现思路;

  • 首先,需要保证唯一性,即某一时点只能有一个线程访问某一资源;比方说待办短信通知功能,每天早上九点短信提醒所有工单的处理人处理工单,假设服务部署了20个容器,那么早上九点的时候会有20个线程启动准备发送短信,此时我们只能让一个线程执行短信发送,否则用户会收到20条相同的短信;
  • 其次,需要考虑下何时应该释放锁?这又分三种情况,一是拿到锁的线程正常结束,另一种是获取锁的线程异常退出,还有种是获取锁的线程一直阻塞;第一种情况直接释放即可,第二种情况可以通过定义下锁的过期时间然后通过定时任务去释放锁;zk的话直接通过临时节点即可;最后一种阻塞的情况也可以通过定时任务来释放,但是需要根据业务来综合判断,如果业务本身就是长时间耗时的操作那么锁的过期时间就得设置的久一点
  • 最后,当拿到锁的线程释放锁的时候,如何通知其他线程可以抢锁了呢
    这里简单介绍两种解决方案,一种是所有需要锁的线程主动轮询,固定时间去访问下看锁是否释放,但是这种方案无端增加服务器压力并且时效性无法保证;另一种就是zk的watch,监听锁所在的目录,一有变化立马得到通知

二、ZK实现分布式锁的思路

  • zk通过每个线程在同一父目录下创建临时有序节点,然后通过比较节点的id大小来实现分布式锁功能;再通过zk的watch机制实时获取节点的状态,如果被删除立即重新争抢锁;具体流程见线图:

提示:需要关注下图里判断自身不是最小节点时的监听情况,为什么不监听父节点?原因图里已有描述,这里就不再赘述

三、ZK实现分布式锁的编码实现

1、核心工具类实现

通过不断的调试,我封装了一个ZkLockHelper类,里面封装了上锁和释放锁的方法,为了方便我将zk的一些监听和回调机智也融合到一起了,并没有抽出来,下面贴上该类的全部代码

package com.darling.service.zookeeper.lock;import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.platform.commons.util.StringUtils;import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;/*** @description:* @author: dll* @date: Created in 2022/11/4 8:41* @version:* @modified By:*/
@Data
@Slf4j
public class ZkLockHelper implements AsyncCallback.StringCallback, AsyncCallback.StatCallback,Watcher, AsyncCallback.ChildrenCallback {private final String lockPath = "/lockItem";ZooKeeper zkClient;String threadName;CountDownLatch cd = new CountDownLatch(1);private String pathName;/*** 上锁*/public void tryLock() {try {log.info("线程:{}正在创建节点",threadName);zkClient.create(lockPath,(threadName).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,"AAA");log.info("线程:{}正在阻塞......",threadName);// 由于上面是异步创建所以这里需要阻塞住当前线程cd.await();} catch (InterruptedException e) {e.printStackTrace();}}/*** 释放锁*/public void unLock() {try {zkClient.delete(pathName,-1);System.out.println(threadName + " 工作结束....");} catch (Exception e) {e.printStackTrace();}}/*** create方法的回调,创建成功后在此处获取/DCSLock的子目录,比较节点ID是否最小,是则拿到锁。。。* @param rc        状态码* @param path      create方法的path入参* @param ctx       create方法的上下文入参* @param name      创建成功的临时有序节点的名称,即在path的后面加上了zk维护的自增ID;*                  注意如果创建的不是有序节点,那么此处的name和path的内容一致*/@Overridepublic void processResult(int rc, String path, Object ctx, String name) {log.info(">>>>>>>>>>>>>>>>>processResult,rx:{},path:{},ctx:{},name:{}",rc,path,ctx.toString(),name);if (StringUtils.isNotBlank(name)) {try {pathName =  name ;// 此处path需注意要写/zkClient.getChildren("/", false,this,"123");
//                List<String> children = zkClient.getChildren("/", false);
//                log.info(">>>>>threadName:{},children:{}",threadName,children);
//                // 给children排序
//                Collections.sort(children);
//                int i = children.indexOf(pathName.substring(1));
//                // 判断自身是否第一个
//                if (Objects.equals(i,0)) {
//                    // 是第一个则表示抢到了锁
//                    log.info("线程{}抢到了锁",threadName);
//                    cd.countDown();
//                }else {
//                    // 表示没抢到锁
//                    log.info("线程{}抢锁失败,重新注册监听器",threadName);
//                    zkClient.exists("/"+children.get(i-1),this,this,"AAA");
//                }} catch (Exception e) {e.printStackTrace();}}}/*** exists方法的回调,此处暂不做处理* @param rc* @param path* @param ctx* @param stat*/@Overridepublic void processResult(int rc, String path, Object ctx, Stat stat) {}/*** exists的watch监听* @param event*/@Overridepublic void process(WatchedEvent event) {//如果第一个线程锁释放了,等价于第一个线程删除了节点,此时只有第二个线程会监控的到switch (event.getType()) {case None:break;case NodeCreated:break;case NodeDeleted:zkClient.getChildren("/", false,this,"123");
//                // 此处path需注意要写"/"
//                List<String> children = null;
//                try {
//                    children = zkClient.getChildren("/", false);
//                } catch (KeeperException e) {
//                    e.printStackTrace();
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
//                log.info(">>>>>threadName:{},children:{}",threadName,children);
//                // 给children排序
//                Collections.sort(children);
//                int i = children.indexOf(pathName.substring(1));
//                // 判断自身是否第一个
//                if (Objects.equals(i,0)) {
//                    // 是第一个则表示抢到了锁
//                    log.info("线程{}抢到了锁",threadName);
//                    cd.countDown();
//                }else {
//                    /**
//                     *  表示没抢到锁;需要判断前置节点存不存在,其实这里并不是特别关心前置节点存不存在,所以其回调可以不处理;
//                     *  但是这里关注的前置节点的监听,当前置节点监听到被删除时就是其他线程抢锁之时
//                     */
//                    zkClient.exists("/"+children.get(i-1),this,this,"AAA");
//                }break;case NodeDataChanged:break;case NodeChildrenChanged:break;}}/*** getChildren方法的回调* @param rc* @param path* @param ctx* @param children*/@Overridepublic void processResult(int rc, String path, Object ctx, List<String> children) {try {log.info(">>>>>threadName:{},children:{}", threadName, children);if (Objects.isNull(children)) {return;}// 给children排序Collections.sort(children);int i = children.indexOf(pathName.substring(1));// 判断自身是否第一个if (Objects.equals(i, 0)) {// 是第一个则表示抢到了锁log.info("线程{}抢到了锁", threadName);cd.countDown();} else {// 表示没抢到锁log.info("线程{}抢锁失败,重新注册监听器", threadName);/***  表示没抢到锁;需要判断前置节点存不存在,其实这里并不是特别关心前置节点存不存在,所以其回调可以不处理;*  但是这里关注的前置节点的监听,当前置节点监听到被删除时就是其他线程抢锁之时*/zkClient.exists("/" + children.get(i - 1), this, this, "AAA");}} catch (Exception e) {e.printStackTrace();}}
}

提示:代码中注释的代码块可以关注下,原本是直接阻塞式编程,将获取所有子节点并释放锁的操作直接写在getChildren方法的回调里,后来发现当节点被删除时我们还要重新抢锁,那么代码就冗余了,于是结合响应式编程的思想,将这段核心代码放到getChildren方法的回调里,这样代码简洁了并且可以让业务更只关注于getChildren这件事了

2、测试代码编写

线程安全问题复现

package com.darling.service.zookeeper.lock;import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;/*** @description:  开启是个线程给i做递减操作,未加锁的情况下会有线程安全问题* @author: dll* @date: Created in 2022/11/8 8:32* @version:* @modified By:*/
@Slf4j
public class ZkLockTest02 {private int i = 10;@Testpublic void test() throws InterruptedException {for (int n = 0; n < 10; n++) {new Thread(new Runnable() {@SneakyThrows@Overridepublic void run() {Thread.sleep(100);incre();}}).start();}Thread.sleep(5000);log.info("i = {}",i);}/*** i递减 线程不安全*/public void incre(){
//        i.incrementAndGet();log.info("当前线程:{},i = {}",Thread.currentThread().getName(),i--);}
}
  • 上面代码运行结果如下:

使用上面封装的ZkLockHelper实现的分布式锁

package com.darling.service.zookeeper.lock;import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;/*** @description: 使用zk实现的分布式锁解决线程安全问题* @author: dll* @date: Created in 2022/11/8 8:32* @version:* @modified By:*/
@Slf4j
public class ZkLockTest03 {ZooKeeper zkClient;@Beforepublic void conn (){zkClient  = ZkUtil.getZkClient();}@Afterpublic void close (){try {zkClient.close();} catch (InterruptedException e) {e.printStackTrace();}}private int i = 10;@Testpublic void test() throws InterruptedException {for (int n = 0; n < 10; n++) {new Thread(new Runnable() {@SneakyThrows@Overridepublic void run() {Thread.sleep(100);ZkLockHelper zkHelper = new ZkLockHelper();// 这里给zkHelper设置threadName是为了后续调试的时候日志打印,便于观察存在的问题String threadName = Thread.currentThread().getName();zkHelper.setThreadName(threadName);zkHelper.setZkClient(zkClient);// tryLock上锁zkHelper.tryLock();incre();log.info("线程{}正在执行业务代码...",threadName);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}// 释放锁zkHelper.unLock();}}).start();}while (true) {}}/*** i递减 线程不安全*/public void incre(){
//        i.incrementAndGet();log.info("☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆当前线程:{},i = {}",Thread.currentThread().getName(),i--);}
}
  • 运行结果如下:
    哈哈

由于日志中掺杂着zk的日志所有此处并未截全,但是也能看到i是在按规律递减的,不会出现通过线程拿到相同值的情况
#四、zk实现分布式锁的优缺点

优点

  • 集群部署不存在单点故障问题
  • 统一视图
    zk集群每个节点对外提供的数据是一致的,数据一致性有所报障
  • 临时有序节点
    zk提供临时有序节点,这样当客户端失去连接时会自动释放锁,不用像其他方案一样当拿到锁的实例服务不可用时,需要定时任务去删除锁;临时节点的特性就是当客户端失去连接会自动删除
  • watch能力加持
    当获取不到锁时,无需客户端定期轮询争抢,只需watch前一节点即可,当有变化时会及时通知,比普通方案即及时又高效;注意这里最好只watch前一节点,如果watch整个父目录的话,当客户端并发较大时会不断有请求进出zk,给zk性能带来压力

缺点

  • 与单机版redis比较的话性能肯定较差,但是当客户端集群足够庞大且业务量足够多时肯定还是集群更加稳定
  • 极端情况下还是会出现多个线程抢到同一把锁的问题;假设某个线程拿到锁后还没执行业务代码就进入长时间的垃圾收集STW了,此时与zk的连接也会消失;然后此时别的线程的watch会被触发从而抢到锁去执行了,但是当stw的线程恢复过来时继续执行自身的业务代码,此时就会出现不一致的问题了;当然,个人认为这种设想太过极端了,毕竟如果stw时间过长肯定会影响整个集群的性能的,所以我感觉可以不必考虑,真的要解决那么再加上mysql乐观锁吧;

总结

好了,zk实现分布式锁的编码实现就到这了,后续有时间再写redis、数据库实现的,其实思路缕清了,编码实现还是相对简单的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/884055.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Uni-App-02

条件编译 条件编译概念 不同的运行平台终归有些专有的特性&#xff0c;无法实现跨平台完全兼容&#xff0c;例如&#xff1a;微信小程序导航栏右上角的关闭图标。 uni-app提供了一种“条件编译”机制&#xff0c;可以针对特定的平台编译执行特定的代码&#xff0c;否则不执行。…

【ArcGISPro】制作简单的ArcGISPro-AI助手

【python】AI Navigator的使用及搭建本机大模型_anaconda ai navigator-CSDN博客 【Python】AI Navigator对话流式输出_ai大模型流式输出效果(打字效果) python-CSDN博客 【C#】调用本机AI大模型流式返回_怎么实现调用本地大模型时实现流式输出-CSDN博客 【ArcGISPro】宣布推…

springboot 修复 Spring Framework 特定条件下目录遍历漏洞(CVE-2024-38819)

刚解决Spring Framework 特定条件下目录遍历漏洞&#xff08;CVE-2024-38816&#xff09;没几天&#xff0c;又来一个新的&#xff0c;真是哭笑不得啊。 springboot 修复 Spring Framework 特定条件下目录遍历漏洞&#xff08;CVE-2024-38816&#xff09;https://blog.csdn.ne…

可编辑97页PPT | 制造企业数字化转型战略咨询及IT总体规划方案

荐言分享&#xff1a;制造企业数字化转型是当前市场环境下的必然趋势&#xff0c;旨在通过引入先进的信息技术&#xff0c;优化业务流程&#xff0c;提升运营效率&#xff0c;增强客户体验&#xff0c;实现可持续发展。这一转型过程涉及多个方面&#xff0c;需要综合考虑企业战…

计算机视觉中的点算子:从零开始构建

Hey小伙伴们&#xff01;今天我们要聊的是一个非常基础但极其重要的计算机视觉技术——点算子&#xff08;Point Operators&#xff09;。点算子主要用于对图像的每个像素进行独立的处理&#xff0c;比如亮度调整、对比度增强、灰度化等。通过这些简单的操作&#xff0c;我们可…

001-Kotlin界面开发之Jetpack Compose Desktop学习路径

Compose Desktop学习之路 学习过程 理解Kotlin的基本语法 Compose Desktop采用Kotlin构建&#xff0c;因此对Kotlin的基本语法有很好的理解是必不可少的。你可以从官方的Kotlin文档开始。 用一句话概括&#xff0c;Kotlin是一种现代的、静态类型的编程语言&#xff0c;它结合…

小金标认证的头戴式蓝牙,QCY H3 Pro耳机,平价高音质的新选择

我发现如今市面上百元级的头戴式无线耳机都有非常出色的音质表现了&#xff0c;这其中国产品牌的表现尤为亮眼&#xff0c;与入耳式耳机相比&#xff0c;头戴式耳机拥有更大的发音单元和更包裹耳朵的耳罩设计&#xff0c;提供了更舒适稳固的佩戴体验&#xff0c;在音质和降噪效…

方法+数组

1. 方法 1. 什么是方法 方法定义&#xff1a; // []表示可写可不写[public] [static] type name ( [type formal , type formal , ...]){方法体&#xff1b;[return value ;] }[修饰符] 返回值类型 方法名称([参数类型 形参 , 参数类型 形参 ...]){方法体代码;[return 返回值…

大语言模型(LLM)入门级选手初学教程 II

模型架构 5.1 输入编码&#xff1a; i. 词元序列编码 Input Embedding Module ii. 位置编码&#xff08;Position Embedding, PE)&#xff0c;Transformer 的编码器结构本身无法识别序列中元素的顺序。 &#x1d499;&#x1d461; &#x1d497;&#x1d461; &#x1d491…

【深度学习】实验 — 动手实现 GPT【二】:注意力机制、注意力掩码、多头注意力机制

【深度学习】实验 — 动手实现 GPT【二】&#xff1a;注意力机制、多头注意力机制 注意力机制简单示例&#xff1a;单个元素的情况简单示例&#xff1a;计算所有输入词元的注意力权重推广到所有输入序列词元&#xff1a; 注意力掩码代码实现多头注意力测试 注意力机制 简单示例…

数据库(31)——事务

事务 数据库事务&#xff08;Database Transaction&#xff09;是数据库管理系统&#xff08;DBMS&#xff09;中执行的一组逻辑操作单元&#xff0c;这些操作要么全部成功执行&#xff0c;要么全部不执行&#xff0c;以保持数据的一致性和完整性。事务是确保数据可靠性的重要机…

Android 获取OAID

获取OAID 老规矩&#xff0c;直接上&#xff1a; implementation com.huawei.hms:opendevice:6.11.0.300 // 要获取华为vaid 和aaid&#xff0c;还需添加opendevice 依赖implementation(name: oaid_sdk_2.5.0, ext: aar) import android.content.Context; import android.util.…

每日互动基于 Apache DolphinScheduler 从容应对ClickHouse 大数据入库瓶颈

引言 大家好&#xff0c;我叫张琦&#xff0c;来自每日互动&#xff0c;担任大数据平台架构师。今天我将分享我们团队在基于Apache DolphinScheduler实现ClickHouse零压入库过程中的实践经验。 这个实践项目涉及到两个关键组件&#xff1a;Apache DolphinScheduler和ClickHous…

[vulnhub] Brainpan1

https://www.vulnhub.com/entry/brainpan-1,51/ 主机发现端口扫描 使用nmap扫描网段类存活主机 因为靶机是我最后添加的&#xff0c;所以靶机IP是166 nmap -sP 192.168.75.0/24 Starting Nmap 7.94SVN ( https://nmap.org ) at 2024-1…

数学建模与优化算法:从基础理论到实际应用

数学建模和优化算法&#xff0c;它们不仅帮助我们理解和描述复杂系统的行为&#xff0c;还能找到系统性能最优化的解决方案。本文将从基础的数学理论出发&#xff0c;逐步深入到各种优化算法&#xff0c;并探讨它们在实际问题中的应用。 思维导图文件可获取&#xff1a;https:…

基于Intel Gaudi AI加速器的大语言模型微调与推理优化赛题等你挑战 | CCF BDCI进行时

一年一度的行业盛事2024 CCF大数据与计算智能大赛&#xff08;简称2024 CCF BDCI&#xff09;又在激烈进行中啦&#xff01; 多个赛题等你挑战还没有报名的伙伴们抓紧时间咯&#xff0c;叫上你伙伴练起来吧&#xff01; 2024 CCF大数据与计算智能大赛 CCF大数据与计算智能大…

使用 FastGPT 工作流搭建 GitHub Issues 自动总结机器人

如今任何项目开发节奏都很快&#xff0c;及时掌握项目动态是很重要滴&#xff0c;GitHub Issues 一般都是开发者和用户反馈问题的主要渠道。 然而&#xff0c;随着 Issue 数量的增加&#xff0c;及时跟进每一个问题会变得越来越困难。 为了解决这个痛点&#xff0c;我们开发了…

Unreal Engine 5 C++(C#)开发:使用蓝图库实现插件(一)认识和了解Build.cs

目录 引言 一、创建一个C插件TextureReader插件 二、Build.cs文件 三、ModuleRules 四、TextureReader插件的构造 4.1ReadOnlyTargetRules的作用 4.2TextureReaderd的构造调用 4.3设置当前类的预编译头文件的使用模式 4.4PublicIncludePaths.AddRange与PrivateInclude…

探索C嘎嘎:初步接触STL

#1024程序员节&#xff5c;征文# 前言&#xff1a; 在前文小编讲述了模版初阶&#xff0c;其实讲述模版就是为了给讲STL提前铺垫&#xff0c;STL是C中很重要的一部分&#xff0c;各位读者朋友要知道它的份量&#xff0c;下面废话不多说&#xff0c;开始走进STL的世界。 目录&am…

指令系统 I(指令的格式、寻址)

一、指令系统 1. 指令集体系结构 指令&#xff08;机器指令&#xff09;是指示计算机执行某种操作的命令&#xff0c;是计算机运行的最小功能单位。一台计算机的所有指令的集合构成该机的指令系统&#xff0c;也称指令集。 指令系统是指令集体系结构&#xff08;ISA&#xf…