ZooKeeper was designed from the start to provide consistency guarantees for distributed systems. This article explains how to build a distributed lock on top of ZooKeeper's ephemeral sequential nodes.
Contents
1. Theory
1.1 Node types
1.2 Watchers
1.3 How the lock works
2. A hand-written ZooKeeper distributed lock
2.1 Dependencies
2.2 Constants
2.3 The lock implementation
2.4 Usage
3. A ZooKeeper distributed lock with Curator
3.1 Dependencies
3.2 Usage
1. Theory
Like Linux, ZooKeeper organizes its nodes as a directory tree, with levels of the hierarchy separated by /.
Each data node in ZooKeeper is called a znode; it is the smallest unit of data ZooKeeper manages. Because ZooKeeper is primarily a coordination service, for performance and consistency reasons the amount of data stored on a single node is limited (1 MB by default).
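As a small illustration of the tree model, the sketch below connects with the native Java client and lists the children of the root path /. The address 127.0.0.1:2181 and the 2000 ms timeout are example values only.

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.util.List;
import java.util.concurrent.CountDownLatch;

public class TreeWalkDemo {
    public static void main(String[] args) throws Exception {
        CountDownLatch connected = new CountDownLatch(1);
        // Connect to a local ZooKeeper server (address/timeout are example values)
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 2000, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connected.countDown();
            }
        });
        connected.await();

        // "/" is the root of the tree; every path below it is a znode
        List<String> roots = zk.getChildren("/", false);
        System.out.println("children of /: " + roots);

        zk.close();
    }
}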
1.1 Node types
A znode can be one of four types (a minimal creation sketch follows the list):
1. Persistent (PERSISTENT): the node stays in the tree until it is explicitly deleted.
2. Persistent sequential (PERSISTENT_SEQUENTIAL): like a persistent node, but ZooKeeper automatically appends a sequence number to the node name, reflecting creation order.
3. Ephemeral (EPHEMERAL): the node is removed automatically when the session between the client and ZooKeeper ends.
4. Ephemeral sequential (EPHEMERAL_SEQUENTIAL): like an ephemeral node, but ZooKeeper automatically appends a sequence number to the node name, reflecting creation order.
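To make the four modes concrete, here is a minimal sketch using the native ZooKeeper Java client. The connection settings and the /demo-* paths are example values and are not part of the lock implementation later in this article.

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

public class NodeTypeDemo {
    public static void main(String[] args) throws Exception {
        CountDownLatch connected = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 2000, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connected.countDown();
            }
        });
        connected.await();

        // Persistent node: survives until explicitly deleted
        zk.create("/demo-persistent", "data".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // Persistent sequential node: actual name becomes e.g. /demo-pseq0000000001
        zk.create("/demo-pseq", "data".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);

        // Ephemeral node: removed automatically when this session closes
        zk.create("/demo-ephemeral", "data".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        // Ephemeral sequential node: ephemeral plus an auto-appended sequence number
        zk.create("/demo-eseq", "data".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        zk.close();
    }
}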
1.2 Watchers
The Watcher mechanism is one of ZooKeeper's most important features. A watch can be registered on a node; when the watched event occurs, ZooKeeper sends a notification to the client and the watcher's callback method is invoked.
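A minimal sketch of registering a watch with exists() is shown below; the path /demo-node and the connection settings are example values. Note that a watch registered this way fires only once and must be re-registered if you want to keep listening.

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

public class WatcherDemo {
    public static void main(String[] args) throws Exception {
        CountDownLatch connected = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 2000, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connected.countDown();
            }
        });
        connected.await();

        // Register a one-shot watch on /demo-node; the callback runs when the
        // node is created, deleted, or its data changes.
        zk.exists("/demo-node", event -> System.out.println(
                "event " + event.getType() + " on " + event.getPath()));

        Thread.sleep(60_000); // keep the session alive so the callback can fire
        zk.close();
    }
}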
1.3 How the lock works
First we create a persistent node "/locks" to hold the child nodes that represent the lock. (In a real system you could split this into finer-grained persistent nodes per lock target, e.g. "/locks/bilibili/comment/publish".)
When a client tries to acquire the lock, it creates an ephemeral sequential node under "/locks", e.g. "seq-00001".
After the node is created, the client checks whether its node has the smallest sequence number among all children.
If it is the smallest, the client holds the lock: it runs its business logic and deletes its ephemeral node when it is done.
If it is not the smallest, spinning and repeatedly re-checking would waste resources, so the usual approach is to register a watcher instead: the client watches the deletion of the node immediately before its own. As long as a node is ahead of it, the client does not hold the lock and blocks; once the preceding node is deleted, it is the client's turn to hold the lock and it can proceed with its business logic. (Strictly speaking, a more robust implementation re-checks whether it is now the smallest node after waking up, because the predecessor may have disappeared due to a session expiring rather than a normal release; the simplified implementation below skips this check.)
2. A hand-written ZooKeeper distributed lock
2.1 Dependencies
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>RELEASE</version>
    <scope>test</scope>
</dependency>
<!-- logging -->
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.8.2</version>
</dependency>
<!-- zookeeper -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.6</version>
</dependency>
<dependency>
    <groupId>org.junit.jupiter</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>RELEASE</version>
    <scope>compile</scope>
</dependency>
2.2 Constants
public interface ZkConstants {
    // Connection address
    String connectString = "127.0.0.1:2181";
    // Session timeout in milliseconds
    int sessionTimeout = 2000;
}
2.3 The lock implementation
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class DistributedLock {

    // ZooKeeper client connection
    private ZooKeeper zkClient;
    // Latch released once the connection is established
    private CountDownLatch connectLatch = new CountDownLatch(1);
    // Path of the node (lock) immediately before ours
    private String waitPath;
    // Latch released when the preceding node is deleted
    private CountDownLatch waitLatch = new CountDownLatch(1);
    // Node (lock) created by this client
    private String createNode;

    /**
     * Constructor: initializes the client connection.
     *
     * @throws IOException
     * @throws InterruptedException
     * @throws KeeperException
     */
    public DistributedLock() throws IOException, InterruptedException, KeeperException {
        // Open the connection
        zkClient = new ZooKeeper(ZkConstants.connectString, ZkConstants.sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // Connection established: release connectLatch
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();
                }
                // The preceding node was deleted
                if (watchedEvent.getType() == Event.EventType.NodeDeleted
                        && watchedEvent.getPath().equals(waitPath)) {
                    // Wake up the client waiting on the next node
                    waitLatch.countDown();
                }
            }
        });

        // Wait until the connection is up before going on
        connectLatch.await();

        // Check whether the root node /locks exists
        Stat exists = zkClient.exists("/locks", false);
        if (exists == null) {
            // Create the root node as a persistent node
            zkClient.create("/locks", "locks".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    /**
     * Acquire the lock.
     *
     * @throws InterruptedException
     * @throws KeeperException
     */
    public void zkLock() throws InterruptedException, KeeperException {
        // Create the ephemeral sequential node
        createNode = zkClient.create("/locks/" + "seq-", null,
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        // Check whether the node we created has the smallest sequence number
        List<String> children = zkClient.getChildren("/locks", false);
        if (children.size() == 1) {
            return;
        } else {
            // Sort the children to find our position in the queue
            Collections.sort(children);
            // Strip the path prefix to get the name of our node
            String thisNode = createNode.substring("/locks/".length());
            // Find our position
            int index = children.indexOf(thisNode);
            if (index == -1) {
                System.out.println("Unexpected state: created node not found among children");
            } else if (index == 0) {
                // Smallest sequence number: we hold the lock
                return;
            } else {
                // Watch the node immediately before ours
                waitPath = "/locks/" + children.get(index - 1);
                // true means: use the watcher registered when zkClient was created
                zkClient.getData(waitPath, true, null);
                waitLatch.await();
            }
        }
    }

    /**
     * Release the lock.
     *
     * @throws InterruptedException
     * @throws KeeperException
     */
    public void zkUnLock() throws InterruptedException, KeeperException {
        // Delete our ephemeral sequential node
        zkClient.delete(createNode, -1);
    }
}
2.4 Usage
import org.apache.zookeeper.KeeperException;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DistributedLockTest {

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        DistributedLock lock1 = new DistributedLock();
        DistributedLock lock2 = new DistributedLock();

        // Thread 1 competes for the lock
        CompletableFuture.supplyAsync(() -> {
            try {
                lock1.zkLock();
                System.out.println("Thread " + Thread.currentThread().getName() + " acquired the lock...");
                Thread.sleep(5000);
                lock1.zkUnLock();
                System.out.println("Thread " + Thread.currentThread().getName() + " released the lock...");
            } catch (InterruptedException | KeeperException e) {
                throw new RuntimeException(e);
            }
            return true;
        }, executorService);

        // Thread 2 competes for the lock
        CompletableFuture.supplyAsync(() -> {
            try {
                lock2.zkLock();
                System.out.println("Thread " + Thread.currentThread().getName() + " acquired the lock...");
                Thread.sleep(5000);
                lock2.zkUnLock();
                System.out.println("Thread " + Thread.currentThread().getName() + " released the lock...");
            } catch (InterruptedException | KeeperException e) {
                throw new RuntimeException(e);
            }
            return true;
        }, executorService);

        executorService.shutdown();
    }
}
3. A ZooKeeper distributed lock with Curator
In a real production environment you would not hand-write this much code for a distributed lock. Much of it should be reusable, juggling CountDownLatch instances makes the code hard to follow, and the version above does not handle reentrancy, exception handling, and similar concerns.
The framework most widely used in production for ZooKeeper distributed locks is Curator.
3.1 Dependencies
<!-- ... omitted ... -->
<!-- Curator -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>4.3.0</version>
</dependency>
3.2 Usage
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CuratorLockTest {

    public static void main(String[] args) {
        // Two clients competing for the same distributed lock
        InterProcessLock lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
        InterProcessLock lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");

        ExecutorService executorService = Executors.newFixedThreadPool(2);

        executorService.execute(() -> {
            try {
                lock1.acquire();
                System.out.println("Thread 1 acquired the lock");
                // Curator locks are reentrant
                lock1.acquire();
                System.out.println("Thread 1 acquired the lock again");
                Thread.sleep(5000);
                lock1.release();
                System.out.println("Thread 1 released the lock");
                lock1.release();
                System.out.println("Thread 1 released the lock again");
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });

        executorService.execute(() -> {
            try {
                lock2.acquire();
                System.out.println("Thread 2 acquired the lock");
                lock2.acquire();
                System.out.println("Thread 2 acquired the lock again");
                Thread.sleep(5000);
                lock2.release();
                System.out.println("Thread 2 released the lock");
                lock2.release();
                System.out.println("Thread 2 released the lock again");
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });

        executorService.shutdown();
    }

    public static CuratorFramework getCuratorFramework() {
        // Exponential backoff retry: 4000 ms base sleep, at most 3 retries
        ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(4000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(ZkConstants.connectString)
                .connectionTimeoutMs(ZkConstants.sessionTimeout)
                .sessionTimeoutMs(ZkConstants.sessionTimeout)
                .retryPolicy(exponentialBackoffRetry)
                .build();
        client.start();
        System.out.println("ZooKeeper client started...");
        return client;
    }
}
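Besides the blocking acquire() used above, InterProcessMutex also supports acquiring with a timeout, which avoids blocking forever when the lock is contended. A minimal sketch, reusing the getCuratorFramework() helper from the test above; the path /locks/report and the 3-second timeout are example values:

import org.apache.curator.framework.recipes.locks.InterProcessMutex;

import java.util.concurrent.TimeUnit;

public class TimedAcquireDemo {
    public static void main(String[] args) throws Exception {
        InterProcessMutex lock = new InterProcessMutex(
                CuratorLockTest.getCuratorFramework(), "/locks/report");
        // Try to acquire the lock, giving up after 3 seconds
        if (lock.acquire(3, TimeUnit.SECONDS)) {
            try {
                System.out.println("Lock acquired, doing work...");
            } finally {
                lock.release();
            }
        } else {
            System.out.println("Could not acquire the lock within 3 seconds");
        }
    }
}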
I hope this helps you understand ZooKeeper distributed locks.