Lock
/**
 * Minimal lock contract consisting of a {@link #lock()} / {@link #unlock()} pair.
 *
 * <p>The interface itself declares no checked exceptions and prescribes no
 * blocking, fairness or reentrancy semantics; those are defined by each
 * implementation.
 */
public interface Lock {

    /** Requests the lock; the exact acquisition semantics are implementation-defined. */
    void lock();

    /** Gives the lock back; the exact release semantics are implementation-defined. */
    void unlock();
}
ZkLock
public class ZkLock implements Lock{private CuratorFramework client;private final String zkPath;private Integer count; private final String subNodePathPrefix;private String lockedPath;private String preNodePath;private String subShortPath;private Thread thread;public ZkLock(CuratorFramework client, String zkPath) throws Exception {this.client = client;this.zkPath = zkPath;subNodePathPrefix = "node-";count = 0;init();}private void init() throws Exception {synchronized (ZkLock.class) {Stat stat;stat = client.checkExists().forPath(zkPath);if(stat == null)client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(zkPath);}}@Overridepublic void lock() {if (thread == Thread.currentThread()) { count += 1;return;}if(lockInternal()){thread = Thread.currentThread();count += 1;}}private boolean lockInternal() {try {boolean locked = tryLock(); if (locked)return true;while (!locked)locked = await();return true;} catch (Exception e) {e.printStackTrace();}return false;}private boolean tryLock() throws Exception {lockedPath = client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(zkPath + "/" + subNodePathPrefix);if (lockedPath == null){throw new Exception();}subShortPath = getShortPath(lockedPath);List<String> waiters = getWaiters();if(checkIsHeadNode(waiters))return true;int index = Collections.binarySearch(waiters, subShortPath);if (index < 0)throw new Exception();preNodePath = zkPath + "/" + waiters.get(index - 1);return false;}private String getShortPath(String path) {int index = path.lastIndexOf(zkPath + "/");if (index >= 0) {index += zkPath.length() + 1;return index <= path.length() ? 
path.substring(index) : "";}return null;}private boolean checkIsHeadNode(List<String> waiters) {Collections.sort(waiters);return subShortPath.equals(waiters.get(0));}private List<String> getWaiters() throws Exception {return client.getChildren().forPath(zkPath);}private boolean await() throws Exception {if (preNodePath == null)throw new Exception();CountDownLatch latch = new CountDownLatch(1);client.getData().usingWatcher((Watcher) watchedEvent -> latch.countDown()).forPath(preNodePath);latch.await();return true;}@Overridepublic void unlock() {if(!thread.equals(Thread.currentThread()))return;count -= 1;if (count > 0)return;try {client.delete().forPath(lockedPath);}catch (Exception e){e.printStackTrace();}}
}