ZooKeeper 版本:zookeeper-3.4.13。该版本的原生 API 不支持递归创建节点(创建节点前,其父节点必须已经存在)。
依赖
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.4.13</version>
</dependency>
ZooKeeper 分布式锁的实现
import org.apache.zookeeper.*;import java.io.IOException;
import java.util.concurrent.CountDownLatch;/*** zk实现分布式锁*/
public class ZkLock {// zk实例private ZooKeeper zooKeeper;// 计数器,启动一个线程private static final CountDownLatch downLatch = new CountDownLatch(1);/*** 加锁操作** @param id*/public void lock(Integer id) {// 节点路径String path = "/fluyi-product-lock-" + id;// 创建节点,一定要是临时节点,防止宕机无法释放节点try {zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);} catch (Exception e) {// 加锁失败,再次尝试while (true) {// 休眠5秒try {Thread.sleep(5000);} catch (Exception ex) {ex.printStackTrace();}// 进行重连try {zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);} catch (Exception ex) {continue;}break;}}}/*** 解锁操作** @param id*/public void unlock(Integer id) {String path = "/fluyi-product-lock-" + id;// 任何版本try {zooKeeper.delete(path, -1);} catch (Exception e) {e.printStackTrace();}}// 获取实例public static ZkLock getInstance() {return Sington.getInstance();}// 初始化private ZkLock() {try {zooKeeper = new ZooKeeper("xxx.xxx.xxx.xxx:2181", 50000, new zkWatch());try {downLatch.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("建立连接=》" + zooKeeper.getState());} catch (IOException e) {e.printStackTrace();}}private static class zkWatch implements Watcher {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("接受监听事件" + watchedEvent);// 进行连接if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {// 连接成功downLatch.countDown();}}}// 单例private static class Sington {private static ZkLock zkLock;static {zkLock = new ZkLock();}private static ZkLock getInstance() {return zkLock;}}
}
测试使用
public class UnSafeThread {private static int num = 0;private static ZkLock zkLock = ZkLock.getInstance();/*** 创建10个线程*/private static CountDownLatch countDownLatch = new CountDownLatch(10);public static void incr() {zkLock.lock(1);num++;System.out.println(num);zkLock.unlock(1);}public static void main(String[] args) {for (int i = 0; i < 10; i++) {new Thread(() -> {for (int j = 0; j < 100; j++) {incr();// 短暂休眠try {Thread.sleep(10);} catch (InterruptedException e) {throw new RuntimeException(e);}countDownLatch.countDown();}}).start();}while (true) {if (countDownLatch.getCount() == 0) {System.out.println(num);break;}}}
}