前言
我之前写了一篇快速上手ZK的文章:https://blog.csdn.net/qq_38974073/article/details/135293106
本篇最要是进一步加深学习ZK,算是一次简单的实践,巩固学习成果。
设计一个分布式锁
对锁的基本要求
- 可重入:允许同一个应用内的同一个线程重复调用同一个方法;
- 阻塞:没有拿到锁的线程将进入阻塞。
- 公平的:先来先得。
实现原理
使用zk作为发号器,每个线程申请锁时会创建一个临时有序节点:
- 节点编号最小的获得锁,完成业务操作之后删除临时节点;
- 如果不是最小编号的节点,就监听前一个节点的删除事件,并进入阻塞状态,当触发回调的事件时,唤醒阻塞线程,并重新进行获取锁操作。
锁要求实现的描述:
- 可重入:对同一个线程,不用重复获取锁,重入计数+1即可;
- 阻塞:利用
CountDownLatch
实现,当触发回调时唤醒线程; - 公平的:利用zk临时有序节点的特点进行排队,先到先申请锁。
问:申请到锁之后,网络中断怎么办?
- 临时节点随客户端关闭而被删除
问:如何避免羊群效应?
- 每个线程只监听前一个节点
关键流程
关键代码实现
锁的关键方法:
- 加锁:lock
- 解锁:unLock
- 尝试加锁:tryLock
public boolean lock() {if(Thread.currentThread().equals(thread)) {lockCount.incrementAndGet();return true;}while (true) {if (tryLock()) {thread = Thread.currentThread();lockCount.incrementAndGet();return true;}try {await();} catch (Exception e) {throw new RuntimeException(e);}}
}public synchronized boolean unlock() {if (!thread.equals(Thread.currentThread())) {return false;}int newLockCount = lockCount.decrementAndGet();if (newLockCount < 0) {throw new IllegalMonitorStateException("重入锁计数不可为负数" );}// 是否剩余重入次数if (newLockCount != 0) {return true;}// 到这一步,意味着lockCount已经为0,可以删除临时节点了try{if(client.isNodeExist(properties.getZkPath())) {client.deleteNode(lockedPathMap.get(thread));}} catch (Exception e) {return false;} finally {lockedPathMap.remove(thread);priorPathMap.remove(thread);}return true;
}protected boolean tryLock() {String lockedPath = lockedPathMap.get(Thread.currentThread());if (null == lockedPath || !client.isNodeExist(lockedPath)) {lockedPathMap.put(Thread.currentThread(), lockedPath = client.createEphemeralSeqNode(getLockPrefix()));}// 取得加锁的排队编号String lockedShortPath = getShorPath(lockedPath);List<String> waiters = getWaiters();// 如果自己是所有等待锁中的第一个,则获得锁if (checkLocked(waiters, lockedShortPath)) {return true;}// 当前线程节点是否在排队int index = Collections.binarySearch(waiters, lockedShortPath);if(index < 0) {throw new NullPointerException("可能网络抖动,连接断开,临时节点失效");}// waiters最后面的节点写入map,用来监听priorPathMap.put(Thread.currentThread(), getLockPrefix() + waiters.get(index - 1));return false;
}private boolean await() throws Exception {String priorPath = priorPathMap.get(Thread.currentThread());if (null == priorPath) {throw new NullPointerException("prior_path error");}final CountDownLatch latch = new CountDownLatch(1);// 删除事件Watcher w = watchedEvent -> {// 监测到前一个节点发生变化,接下来就可以唤起等待线程,重新尝试获取锁latch.countDown();};try{// 监听前一个节点的删除时间client.watcher(w, priorPath);} catch (KeeperException.NoNodeException e) {e.printStackTrace();return false;}return latch.await(properties.getTimeout(), TimeUnit.MILLISECONDS);
}
好了,如果你对这个感兴趣,不妨拉一下完整源码: https://gitee.com/liangshij/zk-lock-demo
源码简要说明
模块说明
- lsj-zk-lock:核心实现。
- lsj-zk-lock-spring-boot-starter:整合springboot
- lsj-zk-lock-test:使用demo
安装
经典三步走:导包、配置、使用
- 拉取代码,将lsj-zk-lock、lsj-zk-lock-spring-boot-starter通过 mvn install 命令安装到本地仓库。
- 引入依赖:
<dependency><groupId>cn.lsj</groupId><artifactId>lsj-zk-lock-spring-boot-starter</artifactId><version>2.4.2</version>
</dependency>
配置
- 配置locks和dataSource:
spring:zk:dataSource:url: "localhost"port: 2181locks:- zkPath: "/test/lock"lockName: "countLock"# 获取锁失败时,进入等待的时间,等待结束将重新尝试获取锁timeout: 5000- zkPath: "/test2/lock"lockName: "lock"timeout: 5000
使用
- 使用方式1:通过@GlobalLock注解,指定要使用那个lock
@GetMapping("test2")
@GlobalLock("countLock")
public String test2() {// 业务代码return "";
}
- 使用方式2:通过@Qualifier注解,指定要使用那个lock
@RestController
public class TestController {int count = 0;@Resource@Qualifier("lock")private ReentrantLock lock;@Resource@Qualifier("countLock")private ReentrantLock countLock;@GetMapping("test")public String test() {countLock.lock();try{for (int i = 0; i < 10000; i++) {count++;}} finally {countLock.unlock();}return String.valueOf(count);}
}