面向過程版:
package distributedLockProcess;import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class dl {private static final Logger LOG = LoggerFactory.getLogger(dl.class);//确保所有线程运行结束;private static final String CONNECTION_STRING = "192.168.1.201:2181,192.168.1.202:2181,192.168.1.203:2181";private static final int SESSION_TIMEOUT = 10000;private static final String GROUP_PATH = "/disLocks";private static final String SUB_PATH = "/disLocks/sub";private static final int THREAD_NUM = 10; public static CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM);public static void main(String[] args) {for(int i=0; i < THREAD_NUM; i++){final int threadId = i;new Thread(){@Overridepublic void run() {final CountDownLatch countDownLatch = new CountDownLatch(1);try{//此線程連接ZooKeeperZooKeeper zk = new ZooKeeper(CONNECTION_STRING, SESSION_TIMEOUT, new Watcher(){@Overridepublic void process(WatchedEvent event){if (event.getState() == KeeperState.SyncConnected){countDownLatch.countDown();}}});countDownLatch.await();System.out.println(Thread.currentThread().getName() + " --- ZooKeeper.connect()");//GROUP_PATH不存在的话,由一个线程创建即可;if(zk.exists(GROUP_PATH, false)==null){LOG.info( Thread.currentThread().getName() + "节点创建成功, Path: "+ zk.create( GROUP_PATH,("该节点由线程"+Thread.currentThread().getName() + "创建").getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT )+ ", content: " + ("该节点由线程"+Thread.currentThread().getName() + "创建") );}String selfPath = zk.create(SUB_PATH,null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);LOG.info(Thread.currentThread().getName()+"创建锁路径:"+selfPath);if(checkMinPath(zk, selfPath)){LOG.info(Thread.currentThread().getName() + "获取锁成功,赶紧干活!");dosomething();threadSemaphore.countDown();try {if(zk.exists(selfPath,false) == null){LOG.error(Thread.currentThread().getName()+"本节点已不在了...");return;}zk.delete(selfPath, -1);LOG.info(Thread.currentThread().getName() + "删除本节点:"+selfPath);zk.close();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch 
(KeeperException e) {// TODO Auto-generated catch blocke.printStackTrace();}}} catch (Exception e){LOG.error("【第"+threadId+"个线程】 抛出的异常:");e.printStackTrace();}}}.start();}try {
// Thread.sleep(60000);threadSemaphore.await();LOG.info("所有线程运行结束!");} catch (Exception e) {e.printStackTrace();}}protected static boolean checkMinPath(final ZooKeeper zk, final String selfPath) throws KeeperException, InterruptedException {List<String> subNodes = zk.getChildren(GROUP_PATH, false);Collections.sort(subNodes);int index = subNodes.indexOf( selfPath.substring(GROUP_PATH.length()+1));switch (index){case -1:{LOG.error(Thread.currentThread().getName()+"本节点已不在了..."+selfPath);return false;}case 0:{LOG.info(Thread.currentThread().getName()+"子节点中,我果然是老大"+selfPath);return true;}default:{final String waitPath = GROUP_PATH +"/"+ subNodes.get(index - 1);LOG.info(Thread.currentThread().getName()+"获取子节点中,排在我前面的"+waitPath);try{zk.getData(waitPath, new Watcher(){@Overridepublic void process(WatchedEvent event) {if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) { LOG.info(Thread.currentThread().getName()+ "收到情报,排我前面的家伙已挂,我是不是可以出山了?"); try { if(checkMinPath(zk, selfPath)){ LOG.info(Thread.currentThread().getName() + "获取锁成功,赶紧干活!");dosomething();threadSemaphore.countDown();try {if(zk.exists(selfPath,false) == null){LOG.error(Thread.currentThread().getName()+"本节点已不在了...");} else {zk.delete(selfPath, -1);LOG.info(Thread.currentThread().getName() + "删除本节点:"+selfPath);zk.close(); }} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (KeeperException e) {// TODO Auto-generated catch blocke.printStackTrace();}} } catch ( Exception e) { e.printStackTrace(); } } }}, new Stat());}catch(KeeperException e){if(zk.exists(waitPath,false) == null){LOG.info(Thread.currentThread().getName()+"子节点中,排在我前面的"+waitPath+"已失踪,幸福来得太突然?");return checkMinPath(zk, selfPath);}else{throw e;}} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}return false;}protected static void dosomething() {System.out.println("我正在獨享資源互斥地進行工作。。。");}
}
面向對象重構版:
package distributedLockObject;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;public class AbstractZooKeeper implements Watcher{protected ZooKeeper zookeeper;protected CountDownLatch countDownLatch = new CountDownLatch(1);public ZooKeeper connect(String hosts, int SESSION_TIMEOUT) throws IOException, InterruptedException{zookeeper = new ZooKeeper(hosts, SESSION_TIMEOUT, this);countDownLatch.await();System.out.println("AbstractZooKeeper.connect()");return zookeeper;}public void process(WatchedEvent event){if (event.getState() == KeeperState.SyncConnected){countDownLatch.countDown();}}public void close() throws InterruptedException{zookeeper.close();}
}
package distributedLockObject;import java.util.Collections;
import java.util.List;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class DistributedLock {private ZooKeeper zk = null;private String selfPath;private String waitPath;private String LOG_PREFIX_OF_THREAD=Thread.currentThread().getName();private static final String GROUP_PATH = "/disLocks";private static final String SUB_PATH = "/disLocks/sub";private static final Logger LOG = LoggerFactory.getLogger(DistributedLock.class);private Watcher watcher;public DistributedLock(ZooKeeper zk ) {this.zk = zk; }public Watcher getWatcher() {return watcher;}public void setWatcher(Watcher watcher) {this.watcher = watcher;}/*** 获取锁* @return*/public boolean getLock() throws KeeperException, InterruptedException {selfPath = zk.create(SUB_PATH,null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);LOG.info(LOG_PREFIX_OF_THREAD+"创建锁路径:"+selfPath);if(checkMinPath()){return true;}return false;}/*** 创建节点* @param path 节点path* @param data 初始数据内容* @return*/public boolean createPath( String path, String data ) throws KeeperException, InterruptedException {if(zk.exists(path, false)==null){LOG.info( LOG_PREFIX_OF_THREAD + "节点创建成功, Path: "+ this.zk.create( path,data.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT )+ ", content: " + data );}return true;}public void unlock(){try {if(zk.exists(this.selfPath,false) == null){LOG.error(LOG_PREFIX_OF_THREAD+"本节点已不在了...");return;}zk.delete(this.selfPath, -1);LOG.info(LOG_PREFIX_OF_THREAD + "删除本节点:"+selfPath);zk.close();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (KeeperException e) {// TODO Auto-generated catch blocke.printStackTrace();}}/*** 检查自己是不是最小的节点* @return*/public boolean checkMinPath() throws KeeperException, InterruptedException {List<String> subNodes = zk.getChildren(GROUP_PATH, false);Collections.sort(subNodes);int index = subNodes.indexOf( selfPath.substring(GROUP_PATH.length()+1));switch (index){case -1:{LOG.error(LOG_PREFIX_OF_THREAD+"本节点已不在了..."+selfPath);return false;}case 
0:{LOG.info(LOG_PREFIX_OF_THREAD+"子节点中,我果然是老大"+selfPath);return true;}default:{this.waitPath = GROUP_PATH +"/"+ subNodes.get(index - 1);LOG.info(LOG_PREFIX_OF_THREAD+"获取子节点中,排在我前面的"+waitPath);try{zk.getData(waitPath, this.watcher, new Stat());return false;}catch(KeeperException e){if(zk.exists(waitPath,false) == null){LOG.info(LOG_PREFIX_OF_THREAD+"子节点中,排在我前面的"+waitPath+"已失踪,幸福来得太突然?");return checkMinPath();}else{throw e;}}}}}public String getWaitPath() {return waitPath;}}
package distributedLockObject;

/**
 * Callback holding the work that should run while the distributed lock is held.
 */
public interface DoTemplate {

    /** Performs the protected work. */
    void dodo();
}
package distributedLockObject;import java.io.IOException;
import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;public class LockService {//确保所有线程运行结束;private static final String CONNECTION_STRING = "192.168.1.201:2181,192.168.1.202:2181,192.168.1.203:2181";private static final int THREAD_NUM = 10; public static CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM);private static final String GROUP_PATH = "/disLocks";private static final int SESSION_TIMEOUT = 10000;AbstractZooKeeper az = new AbstractZooKeeper();public void doService(DoTemplate doTemplate){try {ZooKeeper zk = az.connect(CONNECTION_STRING,SESSION_TIMEOUT);DistributedLock dc = new DistributedLock(zk);LockWatcher lw = new LockWatcher(dc,doTemplate);dc.setWatcher(lw);//GROUP_PATH不存在的话,由一个线程创建即可;dc.createPath(GROUP_PATH, "该节点由线程"+Thread.currentThread().getName() + "创建");boolean rs = dc.getLock();if (rs==true) {lw.dosomething();dc.unlock();}} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (KeeperException e) {// TODO Auto-generated catch blocke.printStackTrace();}}
}
package distributedLockObject;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class LockWatcher implements Watcher{private static final Logger LOG = LoggerFactory.getLogger(LockWatcher.class);private DistributedLock distributedLock;private DoTemplate doTemplate;public LockWatcher(DistributedLock distributedLock,DoTemplate doTemplate) {// TODO Auto-generated constructor stubthis.distributedLock = distributedLock;this.doTemplate = doTemplate;}@Overridepublic void process(WatchedEvent event) {// TODO Auto-generated method stubif (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(distributedLock.getWaitPath())) { LOG.info(Thread.currentThread().getName()+ "收到情报,排我前面的家伙已挂,我是不是可以出山了?"); try { if(distributedLock.checkMinPath()){ dosomething();distributedLock.unlock();} } catch ( Exception e) { e.printStackTrace(); } } }public void dosomething(){LOG.info(Thread.currentThread().getName() + "获取锁成功,赶紧干活!");doTemplate.dodo();TestLock.threadSemaphore.countDown();}}
package distributedLockObject;
import java.util.concurrent.CountDownLatch;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class TestLock {private static final Logger LOG = LoggerFactory.getLogger(TestLock.class);//确保所有线程运行结束;private static final int THREAD_NUM = 10; public static CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM);public static void main(String[] args) {for(int i=0; i < THREAD_NUM; i++){final int threadId = i;new Thread(){@Overridepublic void run() {try{new LockService().doService(new DoTemplate() {@Overridepublic void dodo() {// TODO Auto-generated method stubLOG.info("我要修改一个文件。。。。"+threadId);}});} catch (Exception e){LOG.error("【第"+threadId+"个线程】 抛出的异常:");e.printStackTrace();}}}.start();}try {
// Thread.sleep(60000);threadSemaphore.await();LOG.info("所有线程运行结束!");} catch (Exception e) {e.printStackTrace();}}
}