ZooKeeper編程02--多線程的分佈式鎖

面向過程版:

package distributedLockProcess;import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class dl {private static final Logger LOG = LoggerFactory.getLogger(dl.class);//确保所有线程运行结束;private static final String CONNECTION_STRING = "192.168.1.201:2181,192.168.1.202:2181,192.168.1.203:2181";private static final int SESSION_TIMEOUT = 10000;private static final String GROUP_PATH = "/disLocks";private static final String SUB_PATH = "/disLocks/sub";private static final int THREAD_NUM = 10; public static CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM);public static void main(String[] args) {for(int i=0; i < THREAD_NUM; i++){final int threadId = i;new Thread(){@Overridepublic void run() {final CountDownLatch countDownLatch = new CountDownLatch(1);try{//此線程連接ZooKeeperZooKeeper zk = new ZooKeeper(CONNECTION_STRING, SESSION_TIMEOUT, new Watcher(){@Overridepublic void process(WatchedEvent event){if (event.getState() == KeeperState.SyncConnected){countDownLatch.countDown();}}});countDownLatch.await();System.out.println(Thread.currentThread().getName() + " --- ZooKeeper.connect()");//GROUP_PATH不存在的话,由一个线程创建即可;if(zk.exists(GROUP_PATH, false)==null){LOG.info( Thread.currentThread().getName() + "节点创建成功, Path: "+ zk.create( GROUP_PATH,("该节点由线程"+Thread.currentThread().getName() + "创建").getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT )+ ", content: " + ("该节点由线程"+Thread.currentThread().getName() + "创建") );}String selfPath = zk.create(SUB_PATH,null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);LOG.info(Thread.currentThread().getName()+"创建锁路径:"+selfPath);if(checkMinPath(zk, selfPath)){LOG.info(Thread.currentThread().getName() + "获取锁成功,赶紧干活!");dosomething();threadSemaphore.countDown();try {if(zk.exists(selfPath,false) == null){LOG.error(Thread.currentThread().getName()+"本节点已不在了...");return;}zk.delete(selfPath, -1);LOG.info(Thread.currentThread().getName() + "删除本节点:"+selfPath);zk.close();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (KeeperException e) {// TODO Auto-generated catch blocke.printStackTrace();}}} catch (Exception e){LOG.error("【第"+threadId+"个线程】 抛出的异常:");e.printStackTrace();}}}.start();}try {
//	        	Thread.sleep(60000);threadSemaphore.await();LOG.info("所有线程运行结束!");} catch (Exception e) {e.printStackTrace();}}protected static boolean checkMinPath(final ZooKeeper zk, final String selfPath) throws KeeperException, InterruptedException {List<String> subNodes = zk.getChildren(GROUP_PATH, false);Collections.sort(subNodes);int index = subNodes.indexOf( selfPath.substring(GROUP_PATH.length()+1));switch (index){case -1:{LOG.error(Thread.currentThread().getName()+"本节点已不在了..."+selfPath);return false;}case 0:{LOG.info(Thread.currentThread().getName()+"子节点中,我果然是老大"+selfPath);return true;}default:{final String waitPath = GROUP_PATH +"/"+ subNodes.get(index - 1);LOG.info(Thread.currentThread().getName()+"获取子节点中,排在我前面的"+waitPath);try{zk.getData(waitPath, new Watcher(){@Overridepublic void process(WatchedEvent event) {if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {  LOG.info(Thread.currentThread().getName()+ "收到情报,排我前面的家伙已挂,我是不是可以出山了?");  try {  if(checkMinPath(zk, selfPath)){  LOG.info(Thread.currentThread().getName() + "获取锁成功,赶紧干活!");dosomething();threadSemaphore.countDown();try {if(zk.exists(selfPath,false) == null){LOG.error(Thread.currentThread().getName()+"本节点已不在了...");} else {zk.delete(selfPath, -1);LOG.info(Thread.currentThread().getName() + "删除本节点:"+selfPath);zk.close();                                    			}} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (KeeperException e) {// TODO Auto-generated catch blocke.printStackTrace();}}  } catch ( Exception e) {  e.printStackTrace();  }  } }}, new Stat());}catch(KeeperException e){if(zk.exists(waitPath,false) == null){LOG.info(Thread.currentThread().getName()+"子节点中,排在我前面的"+waitPath+"已失踪,幸福来得太突然?");return checkMinPath(zk, selfPath);}else{throw e;}} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}return false;}protected static void dosomething() {System.out.println("我正在獨享資源互斥地進行工作。。。");}
}

面向對象重構版:

package distributedLockObject;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;public class AbstractZooKeeper implements Watcher{protected ZooKeeper zookeeper;protected CountDownLatch countDownLatch = new CountDownLatch(1);public ZooKeeper connect(String hosts, int SESSION_TIMEOUT) throws IOException, InterruptedException{zookeeper = new ZooKeeper(hosts, SESSION_TIMEOUT, this);countDownLatch.await();System.out.println("AbstractZooKeeper.connect()");return zookeeper;}public void process(WatchedEvent event){if (event.getState() == KeeperState.SyncConnected){countDownLatch.countDown();}}public void close() throws InterruptedException{zookeeper.close();}
}

package distributedLockObject;import java.util.Collections;
import java.util.List;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class DistributedLock    {private ZooKeeper zk = null;private String selfPath;private String waitPath;private String LOG_PREFIX_OF_THREAD=Thread.currentThread().getName();private static final String GROUP_PATH = "/disLocks";private static final String SUB_PATH = "/disLocks/sub";private static final Logger LOG = LoggerFactory.getLogger(DistributedLock.class);private Watcher  watcher;public DistributedLock(ZooKeeper  zk ) {this.zk = zk; }public Watcher getWatcher() {return watcher;}public void setWatcher(Watcher watcher) {this.watcher = watcher;}/*** 获取锁* @return*/public boolean  getLock()  throws KeeperException, InterruptedException {selfPath = zk.create(SUB_PATH,null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);LOG.info(LOG_PREFIX_OF_THREAD+"创建锁路径:"+selfPath);if(checkMinPath()){return true;}return false;}/*** 创建节点* @param path 节点path* @param data 初始数据内容* @return*/public boolean createPath( String path, String data  ) throws KeeperException, InterruptedException {if(zk.exists(path, false)==null){LOG.info( LOG_PREFIX_OF_THREAD + "节点创建成功, Path: "+ this.zk.create( path,data.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT )+ ", content: " + data );}return true;}public void unlock(){try {if(zk.exists(this.selfPath,false) == null){LOG.error(LOG_PREFIX_OF_THREAD+"本节点已不在了...");return;}zk.delete(this.selfPath, -1);LOG.info(LOG_PREFIX_OF_THREAD + "删除本节点:"+selfPath);zk.close();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (KeeperException e) {// TODO Auto-generated catch blocke.printStackTrace();}}/*** 检查自己是不是最小的节点* @return*/public  boolean checkMinPath() throws KeeperException, InterruptedException {List<String> subNodes = zk.getChildren(GROUP_PATH, false);Collections.sort(subNodes);int index = subNodes.indexOf( selfPath.substring(GROUP_PATH.length()+1));switch (index){case -1:{LOG.error(LOG_PREFIX_OF_THREAD+"本节点已不在了..."+selfPath);return false;}case 0:{LOG.info(LOG_PREFIX_OF_THREAD+"子节点中,我果然是老大"+selfPath);return true;}default:{this.waitPath = GROUP_PATH +"/"+ subNodes.get(index - 1);LOG.info(LOG_PREFIX_OF_THREAD+"获取子节点中,排在我前面的"+waitPath);try{zk.getData(waitPath, this.watcher, new Stat());return false;}catch(KeeperException e){if(zk.exists(waitPath,false) == null){LOG.info(LOG_PREFIX_OF_THREAD+"子节点中,排在我前面的"+waitPath+"已失踪,幸福来得太突然?");return checkMinPath();}else{throw e;}}}}}public String getWaitPath() {return waitPath;}}

package distributedLockObject;public interface DoTemplate {void dodo();
}

package distributedLockObject;import java.io.IOException;
import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;public class LockService {//确保所有线程运行结束;private static final String CONNECTION_STRING = "192.168.1.201:2181,192.168.1.202:2181,192.168.1.203:2181";private static final int THREAD_NUM = 10; public static   CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM);private static final String GROUP_PATH = "/disLocks";private static final int SESSION_TIMEOUT = 10000;AbstractZooKeeper az = new AbstractZooKeeper();public void doService(DoTemplate doTemplate){try {ZooKeeper zk = az.connect(CONNECTION_STRING,SESSION_TIMEOUT);DistributedLock dc = new DistributedLock(zk);LockWatcher lw = new LockWatcher(dc,doTemplate);dc.setWatcher(lw);//GROUP_PATH不存在的话,由一个线程创建即可;dc.createPath(GROUP_PATH, "该节点由线程"+Thread.currentThread().getName() + "创建");boolean rs = dc.getLock();if (rs==true) {lw.dosomething();dc.unlock();}} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (KeeperException e) {// TODO Auto-generated catch blocke.printStackTrace();}}
}

package distributedLockObject;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class LockWatcher implements Watcher{private static final Logger LOG = LoggerFactory.getLogger(LockWatcher.class);private DistributedLock distributedLock;private  DoTemplate doTemplate;public LockWatcher(DistributedLock distributedLock,DoTemplate doTemplate) {// TODO Auto-generated constructor stubthis.distributedLock = distributedLock;this.doTemplate = doTemplate;}@Overridepublic void process(WatchedEvent event) {// TODO Auto-generated method stubif (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(distributedLock.getWaitPath())) {  LOG.info(Thread.currentThread().getName()+ "收到情报,排我前面的家伙已挂,我是不是可以出山了?");  try {  if(distributedLock.checkMinPath()){  dosomething();distributedLock.unlock();}  } catch ( Exception e) {  e.printStackTrace();  }  } }public   void dosomething(){LOG.info(Thread.currentThread().getName() + "获取锁成功,赶紧干活!");doTemplate.dodo();TestLock.threadSemaphore.countDown();}}

package distributedLockObject;
import java.util.concurrent.CountDownLatch;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class TestLock {private static final Logger LOG = LoggerFactory.getLogger(TestLock.class);//确保所有线程运行结束;private static final int THREAD_NUM = 10; public static   CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM);public static void main(String[] args) {for(int i=0; i < THREAD_NUM; i++){final int threadId = i;new Thread(){@Overridepublic void run() {try{new LockService().doService(new DoTemplate() {@Overridepublic void dodo() {// TODO Auto-generated method stubLOG.info("我要修改一个文件。。。。"+threadId);}});} catch (Exception e){LOG.error("【第"+threadId+"个线程】 抛出的异常:");e.printStackTrace();}}}.start();}try {
//	        	Thread.sleep(60000);threadSemaphore.await();LOG.info("所有线程运行结束!");} catch (Exception e) {e.printStackTrace();}}
}


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/387130.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

01 Python变量和数据类型

Python变量和数据类型 1 数据类型 计算机&#xff0c;顾名思义就是可以做数学计算的机器&#xff0c;因此&#xff0c;计算机程序理所当然也可以处理各种数值。 但是&#xff0c;计算机能处理的远不止数值&#xff0c;还可以处理文本、图形、音频、视频、网页等各种各样的数…

初识Python-1

1&#xff0c;计算机基础。 2&#xff0c;python历史。 宏观上&#xff1a;python2 与 python3 区别&#xff1a; python2 源码不标准&#xff0c;混乱&#xff0c;重复代码太多&#xff0c; python3 统一 标准&#xff0c;去除重复代码。 3&#xff0c;python的环境。 编译型&…

02 List、Tuple、Dict、Set

List 线性表 创建List&#xff1a; >>> classmates [Michael, Bob, Tracy] >>> L [Michael, 100, True] #可以在list中包含各种类型的数据 >>> empty_list [] #空List 按索引访问List&#xff1a; >>> print L[0] #索引从0开始…

Jenkins的一些代码

pipeline {agent anyenvironment { def ITEMNAME "erp"def DESTPATH "/home/ops/testpipe"def codePATH"/var/lib/jenkins/workspace/test_pipeline"}stages { stage(代码拉取){steps {echo "checkout from ${ITEMNAME}"git url:…

利用layui前端框架实现对不同文件夹的多文件上传

利用layui前端框架实现对不同文件夹的多文件上传 问题场景&#xff1a; 普通的input标签实现多文件上传时&#xff0c;只能对同一个文件夹下的多个文件进行上传&#xff0c;如果要同时上传两个或多个文件夹下的文件&#xff0c;是无法实现的。这篇文章就是利用layui中的插件&am…

ps、grep和kill联合使用杀掉进程

ps、grep和kill联合使用杀掉进程例如要杀掉hello这个进程&#xff0c;使用下面这个命令就能直接实现。ps -ef |grep hello |awk {print $2}|xargs kill -9这里是输出ps -ef |grep hello 结果的第二列的内容然后通过xargs传递给kill -9,其实第二列内容就是hello的进程号&#xf…

03 控制語句

if语句 if age > 18 print your age is, age else print teenager Python代码的缩进规则&#xff1a;具有相同缩进的代码被视为代码块。 if age > 18 print adult elif age > 6 print teenager elif age > 3 print kid else print baby for循环 L [Adam, L…

yum 来安装 nodejs

要通过 yum 来安装 nodejs 和 npm 需要先给 yum 添加 epel 源&#xff0c;添加方法在 centos 添加epel和remi源 中##添加 epel 源 64位: rpm -ivh http://download.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm32位: rpm -ivh http://download.fedoraproj…

yzh的神仙题

U66905 zz题 考虑一个点权值被计算了多少次。。。不知 所以对未来承诺&#xff0c;方便直接算上总数&#xff01; 然后其实是给边定向&#xff0c;即先删除fa和son的哪一个 f[x][j]&#xff0c;会计算j次 无法转移 f[x][j][k]&#xff0c;其中会从子树计算k次。 当边从儿子指向…

04 函數

內置函數 Python內置了很多有用的函數&#xff0c;可以直接調用。 要調用一個函數&#xff0c;需要知道函數的名稱和參數。 可以直接從Python的官方網站查看文檔&#xff1a;http://docs.python.org/2/library >>> abs(-20) >>> help(abs) >>>…

iview render的时候可以写控件的基本格式

render: (h, params) > {return h(div, [h(Button, {props: {type: id,size: small},style: {marginRight: 5px},on: {click: () > {this.pojectshow(this.datatable[params.index].id)}}}, 详情),h(Button, {props: {type: id,size: small},style: {marginRight: 5px},o…

ES6基本使用

var let 度可用于声明变量. 区别&#xff1a;1、let&#xff1a;只在let命令所在代码块内有效 2、let 不存在变量提升&#xff08;内部影响不到外部&#xff09; var b [];for(var j0;j<10;j){let dj;b[j]function(){console.log(d);};}b[3]() //3 3、let 不允许在相同作用…

Axios的Vue插件(添加全局请求/响应拦截器)

/*** file Axios的Vue插件&#xff08;添加全局请求/响应拦截器&#xff09;*/// https://github.com/mzabriskie/axios import axios from axios// 拦截request,设置全局请求为ajax请求 axios.interceptors.request.use((config) > {config.headers[X-Requested-With] XML…

05 切片、迭代、列表生成

切片 >>> L [Adam, Lisa, Bart, Paul] >>> L[0:3] #取前3个元素 >>> L[:3] >>> L[1:3] >>> L[:] >>> L[::2] #第三个参数表示每2个元素取一个元素&#xff0c;也就是隔一个取一个 [Adam,Bart] >>>…

一个例子彻底搞懂C++的虚函数和纯虚函数

学习C的多态性&#xff0c;你必然听过虚函数的概念&#xff0c;你必然知道有关她的种种语法&#xff0c;但你未必了解她为什么要那样做&#xff0c;未必了解她种种行为背后的所思所想。深知你不想在流于表面语法上的蜻蜓点水似是而非&#xff0c;今天我们就一起来揭开挡在你和虚…

利用Caffe实现mnist的数据训练

阿里云的参考文档&#xff1a;https://help.aliyun.com/document_detail/49571.html在文档里提供了caffe的一个案例&#xff0c;利用Caffe实现mnist的数据训练。准备的数据源可以在“深度学习案例代码及数据下载”页找到Caffe数据下载并解压。要训练自己的图片&#xff0c;还是…

06 函数式編程

1 函数式编程简介 函数&#xff1a;function 函数式&#xff1a;functional 一种编程范式 特点&#xff1a; 把计算视为函数而非指令 纯函数式编程&#xff1a;不需要变量&#xff0c;没有副作用&#xff0c;测试简单 支持高阶函数&#xff0c;代码简洁 Python支持的函数式…

Android SDK开发

目前我们的应用内使用了 ArcFace 的人脸检测功能&#xff0c;其他的我们并不了解&#xff0c;所以这里就和大家分享一下我们的集成过程和一些使用心得 集成 ArcFace FD 的集成过程非常简单 在 ArcFace FD 的文档上有说明支持的系统为 5.0 及以上系统&#xff0c;但其实在 4.4 系…

jQuery WeUI 上传

jQuery WeUI 是专为微信公众账号开发而设计的一个框架&#xff0c;jQuery WeUI的官网&#xff1a;http://jqweui.com/ 需求&#xff1a;需要在微信公众号网页添加上传图片功能 技术选型&#xff1a;实现上传图片功能可选百度的WebUploader、饿了么的Element和微信的jQuery WeUI…

07 模块

模块和包的概念 等同于java中的Package 模块名文件名&#xff08;无后缀&#xff09; 在文件系統中&#xff0c;包就是文件夾&#xff0c;模块就是xxx.py文件 每层包下面都有__init__.py文件 导入模块 >>> import math >>> math.pow(2, 0.5) >>…