Zookeeper 能保证数据的强一致性,用户任何时候都可以相信集群中每个节点的数据都是相同的。一个用户创建一个节点作为锁,另一个用户检测该节点,如果存在,代表别的用户已经锁住,如果不存在,则可以创建一个节点,代表拥有一个锁。
~
本篇内容包括:Demo 概述、代码实现、测试结果
文章目录
- 一、Demo 概述
- 1、关于 zookeeper “命名服务协调”
- 2、Demo 设计
- 3、Demo 前提
- 二、代码实现
- 1、引用 Maven 依赖
- 2、ConnectionWatcher 类创建 Zookeeper 连接
- 3、ActiveKeyValueStore 类读写 Zookeeper 数据
- 4、ZkLock 类实现分布式锁
- 三、测试结果
一、Demo 概述
1、关于 zookeeper “命名服务协调”
Zookeeper 能保证数据的强一致性,用户任何时候都可以相信集群中每个节点的数据都是相同的。一个用户创建一个节点作为锁,另一个用户检测该节点,如果存在,代表别的用户已经锁住,如果不存在,则可以创建一个节点,代表拥有一个锁。
2、Demo 设计
分布式锁本质,就是多个资源竞争者对一份资源的排他占有
- 我们设置多个线程,分别在同一 path 下创建节点
- 没个线程获取当前 path 下子节点,看最小子节点是否为自身,是则加锁成功(更好的方式是用 Watcher 对前一个地址监控,这里图方便用子节点排序取最小的方式 )
- 线程加锁成功后,执行任务,执行完毕后解锁
3、Demo 前提
参考:Mac通过Docker安装Zookeeper集群
二、代码实现
1、引用 Maven 依赖
<!-- 选择对应的Zookeeper版本 --><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.7.0</version></dependency>
2、ConnectionWatcher 类创建 Zookeeper 连接
import java.io.IOException;
import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;public class ConnectionWatcher implements Watcher {private final CountDownLatch connectedSignal = new CountDownLatch(1);private static final int SESSION_TIMEOUT = 5000;protected ZooKeeper zk;public void connect(String hosts) throws IOException, InterruptedException {zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);connectedSignal.await();}@Overridepublic void process(WatchedEvent event) {if (event.getState() == Event.KeeperState.SyncConnected) {connectedSignal.countDown();}}public void close() throws InterruptedException {zk.close();}}
3、ActiveKeyValueStore 类读写 Zookeeper 数据
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;public class ActiveKeyValueStore extends ConnectionWatcher {private static final Charset CHARSET = StandardCharsets.UTF_8;int state = 0;/*** 写入节点数据** @param path 节点地址* @param value 数据值* @throws InterruptedException 中断异常* @throws KeeperException ZooKeeper异常*/public void write(String path, String value) throws InterruptedException, KeeperException {Stat stat = zk.exists(path, false);if (stat == null) {if (value == null) {zk.create(path, null,ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);} else {zk.create(path, value.getBytes(CHARSET),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}} else {if (value == null) {zk.setData(path, null, -1);} else {zk.setData(path, value.getBytes(CHARSET), -1);}}}public boolean lock(String path, String name) throws InterruptedException, KeeperException {boolean flag = tryLock(path, name);if (flag) {state++;}return flag;}public boolean tryLock(String path, String name) throws InterruptedException, KeeperException {String lockPath = path + "/" + name;zk.create(lockPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);List<String> waits = readChildren(path, null);Collections.sort(waits);if (waits.get(0).equals(name)) {return true;}CountDownLatch latch = new CountDownLatch(1);for (int i = 0; i < waits.size(); i++) {String cur = waits.get(i);if (!cur.equalsIgnoreCase(name)) {continue;}String prePath = path + "/" + waits.get(i - 1);zk.exists(prePath, new Watcher() {@Overridepublic void process(WatchedEvent event) {latch.countDown();}});break;}latch.await();return true;}public boolean unlock(String path, String name) {if (state > 1) {state--;return true;}String lockPath = path + "/" + name;try {Stat stat = zk.exists(lockPath, false);int version = stat.getVersion();zk.delete(lockPath, version);state--;return true;} catch (Exception e) {System.out.println("unlock:" + lockPath + " ,exception,");}return false;}/*** 获取所有子节点** @param path 节点地址* @param watcher watcher* @return 所有子节点* @throws InterruptedException 中断异常* @throws KeeperException ZooKeeper异常*/public List<String> readChildren(String path, Watcher watcher) throws InterruptedException, KeeperException {List<String> childrens = null;if (watcher == null) {childrens = zk.getChildren(path, false);} else {childrens = zk.getChildren(path, watcher, null);}return childrens;}
}
4、ZkLock 类实现分布式锁
import lombok.SneakyThrows;
import org.apache.zookeeper.KeeperException;import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;public class ZkLock {/*** 开启的线程数,模拟多客户端操作*/private static final int CLIENTS_NUM = 3;private final ActiveKeyValueStore store;public ZkLock(String hosts) throws IOException, InterruptedException {//定义一个类store = new ActiveKeyValueStore();//连接Zookeeperstore.connect(hosts);}public static void testLock() {//线程计数器控制业务的执行final CountDownLatch countDownLatch = new CountDownLatch(CLIENTS_NUM);for (int i = 0; i < CLIENTS_NUM; i++) {new Thread() {@Overridepublic void run() {}}.start();}try {// 堵塞线程,任务执行完后释放countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) throws IOException, InterruptedException, KeeperException {String hosts = "localhost:2181";ZkLock zkLock = new ZkLock(hosts);// 创建父节点zkLock.store.write("/lock4", "父亲节点");//CountDownLatch latch = new CountDownLatch(CLIENTS_NUM);for (int i = 0; i < CLIENTS_NUM; i++) {int finalI = i;new Thread() {@SneakyThrows@Overridepublic void run() {String name = "Thread-" + String.valueOf(finalI);zkLock.store.lock("/lock4", name);TimeUnit.SECONDS.sleep(2);System.out.println("线程-" + name + "执行完毕");latch.countDown();zkLock.store.unlock("/lock4", name);}}.start();}latch.await();System.out.println("end ...");}}
三、测试结果
ZkLock 代码测试结果如下:
线程-Thread-0执行完毕
线程-Thread-1执行完毕
线程-Thread-2执行完毕
end ...
通过 ZkLock 打印的信息可以看出,已经成功模拟实现分布式锁