为什么要使用Zookeeper
dubbo需要一个注册中心,而Zookeeper是我们在使用Dubbo是官方推荐的注册中心
Zookeeper介绍
Zookeeper的集群机制
Zookeeper是为了其他分布式程序提供服务的,所以不能随便就挂了。Zookeeper的集群机制采取的是半数存活机制
。也就是整个集群节点中有半数以上的节点存活,那么整个集群环境可用。这也是为什么说zk的集群最好是奇数个节点。
zk的作用
序号 | 功能 |
---|---|
1 | 为别的分布式程序服务的 |
2 | 本身就是一个分布式程序 |
3 | 主从协调 服务器节点动态上下线 统一配置管理 分布式共享锁 统一名称服务 |
4 | 管理(存储,读取)用户程序提交的数据 并为用户程序提供数据节点监听服务 |
Zookeeper集群节点的角色
Leader
Leader是Zookeeper集群工作的核心,其主要工作是:
- 事务请求的唯一调度和处理者,保证集群事务处理的顺序性。
- 集群内部各服务器的调度者
Follower
Follower是zookeeper集群的跟随者,主要工作是:
- 处理客户端非事务性请求(读取数据),转发事务请求给Leader服务器
- 参与事务请求Proposal的投票
- 参与Leader选举投票
Observer
Observer充当观察者的角色,观察Zookeeper集群的最小状态变化并将这些状态同步过来,其对于非事务请求可以独立处理
,对于事务请求,会转给Leader节点进行处理。Observer不会参与投票
,包括事务请求Proposal的投票和Leader选举投票
集群环境准备
节点的映射关系
每个节点设置相应的ip和主机名的映射关系,方便集群环境的部署
修改hosts配置文件中的信息
配置免密登录
生成公钥和私钥
ssh-keygen
输入命令后根据提示,四次回车即可
发送公钥给需要免密登录的节点
ssh-copy-id zk01
ssh-copy-id zk02
ssh-copy-id zk03
节点和节点发送文件通过scp命令实现
scp -r b.txt bobo01:/root/
关闭防火墙
查看防火墙状态
firewall-cmd --state
停止防火墙
systemctl stop firewall.service
禁止开机启动
systemctl disable firewall.service
Zookeeper的选举机制
Leader主要作用是保证分布式数据一致性,即每个节点的存储的数据同步。
服务器初始化时Leader选举
Zookeeper由于自身的性质,一般建议选取奇数个节点进行搭建分布式服务器集群。以3个节点组成的服务器集群为例,说明服务器初始化时的选举过程。启动第一台安装Zookeeper的节点时,无法单独进行选举,启动第二台时,两节点之间进行通信,开始选举Leader。
- 每个Server投出一票。第一次他们都投给自己作为Leader,投票内容未(SID,ZXID)。
SID为Server的id,即启动ZK时配置文件中的myid;
ZXID为事务id,为节点的更新程序,ZXID越大,代表Server对ZK节点的操作越新。由于服务器初始化,
每个Sever上的Znode为0,所以Server1投的票为(1,0),Server2为(2,0)。两Server将各自投票发给集群中其他机器。 - 每个Server接收来自其他Server的投票。集群中的每个Server先判断投票的有效性,如检查是不是本轮的投票,是不是来Looking状态的服务器投的票。
- 对投票结果进行处理。处理规则为:
- 首先对比ZXID。ZXID大的服务器优先作为Leader
- 若ZXID系统,如初始化时,每个Server的ZXID都是0
- 就会比较sid即myid,myid大的选出来做Leader。
首次选举对于Server而言,他接受到的投票为(2,0),因为自身的票为(1,0),所以此时它会选举Server2为Leader,
将自己的更新为(2,0)。而Server2收到的投票为Server1的(1,0)由于比他自己小,
Server2的投票不变。Server1和Server2再次将票投出,投出的票都为(2,0)
- 统计投票。每次投票后,服务器都会统计投票信息,如果判定某个Server有过半的票数,俺么该Server就是Leader。首次投票对于Server1和Server2而言,统计出已经有两台机器接收了(2,0)的投票信息,此时认为选出了Leader。
- 改变服务器的状态。当确定了Leader之后,每个Server更新自己的状态,
Leader将状态更新为Leading,Follower将状态更新为Following。
服务器运行期间的Leader选举
ZK运行期间,如果有新的Server加入,或非Leader节点挂了,那么LEader会同步数据给新的Server或寻找其他备用Server替代宕机的Server。若Leader宕机,此时集群暂停对外服务,开始在内部选举新的Leader。假设当前集群中有Server1、Server2、Server3三台服务器,Server2为当前集群的Leader,由于意外情况,Server2宕机了,便开始进入选举状态。过程如下
1 变更状态。其他非Observer服务器将自己的状态改变成Looking,开始进入Leader选举。
2. 每个Server投出1张票(myid,ZXID),由于集群运行过,所以每个Server的ZXID可能不同。
假设Server1的ZXID为145,Server3的为122,第一轮投票中,Server1和Server3都投自己,
票分别为(1,145)、(3,122),将自己的票发送给集群中所有机器。
3. 每个Server接收接收来自其他Server的投票,接下来的步骤与初始化时相同。
Zookeeper客户端使用
配置Zookeeper的环境变量
为了简化我们每次操作Zookeeper而不用进入到Zookeeper的安装目录,我们可以将Zookeeper的安装信息配置到系统的环境变量中
vim /etc/profile
添加的内容
export ZOOKEPPER_HOME=/opt/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin
执行source命令
source /etc/profile
我们就可以在节点的任意位置操作Zookeeper了,通过scp命令将profile文件发送到其他几个节点上
scp /etc/profile zk02:/etc/
客户端连接
通过bin目录下的zkCli.sh 命令连接即可
zkCli.sh
zkCli.sh默认连接的是当前节点的Zookeeper节点,如果我们要连接其他节点执行如下命令即可
zkCli.sh -timeout 5000 -server zk02:2181
数据操作
Zookeeper的数据结构
- 层次化的目录结构,命名符合常规文件系统规范
- 每个节点在Zookeeper中叫做znode,并且有一个唯一的路径标识
- 节点znode可以包含数据和子节点(但是EPHEMERAL类型的节点不能有子节点)
- 客户端应用可以在节点上设置监听器
节点类型
1).znode有两种类型:
短暂性(ephemeral)(断开连接自己删除)
持久性(persistent)(断开连接不删除)
2).znode有四种形式的目录节点(默认是persistent)如下
序号 | 节点类型 | 描述 |
---|---|---|
1 | PERSISTENT | 持久节点 |
2 | PERSISTENT_SEQUENTIAL | 持久有序节点(顺序节点) |
3 | EPHEMERAL | 短暂节点 (临时节点) |
4 | EPHEMERAL_SEQUENTIAL | 短暂有序节点 (临时顺序节点) |
创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,有父节点维护在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序
常用命令
Zookeeper作为Dubbo的注册中心用来保存我们各个服务的节点信息,显示Zookeeper是可以实现输出的存储操作的,我们来看下Zookeeper中存储操作的基本命令
ls
ls用来查看某个节点下的子节点信息
增强的命令,查看节点下的子节点及当前节点的属性信息 ls2或者 ls -s 命令
create
创建节点信息
get
get命令用来查看节点的数据
如果要查看节点的属性信息那么我们可以通过get -s 来实现
delete
delete只能删除没有子节点的节点要删除非空节点可以通过 rmr 或者 deleteall 命令实现
set
set命令可以用来修改节点的内容。
事件监听
监听某个节点的数据内容变化,通过get命令 带 -w 参数即可,在3.4版本的Zookeeper中是通过 get path watch
来说实现监控的
然后我们在其他节点上修改app1节点的数据,会触监听事件
Zookeeper Java API使用
pom
<dependencies><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.9</version></dependency><dependency><groupId>com.github.sgroschupf</groupId><artifactId>zkclient</artifactId><version>0.1</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency></dependencies>
连接ZK服务,并监听节点变化
@Slf4j
public class ConfigCenter {private final static String CONNECT_STR="192.168.40.243:2181";private final static Integer SESSION_TIMEOUT=30*1000;private static ZooKeeper zooKeeper=null;private static CountDownLatch countDownLatch=new CountDownLatch(1);public static void main(String[] args) throws IOException, InterruptedException, KeeperException {zooKeeper=new ZooKeeper(CONNECT_STR, SESSION_TIMEOUT, new Watcher() {@Overridepublic void process(WatchedEvent event) {if (event.getType()== Event.EventType.None&& event.getState() == Event.KeeperState.SyncConnected){log.info("连接已建立");countDownLatch.countDown();}}});countDownLatch.await();MyConfig myConfig = new MyConfig();myConfig.setKey("anykey");myConfig.setName("anyName");ObjectMapper objectMapper=new ObjectMapper();byte[] bytes = objectMapper.writeValueAsBytes(myConfig);String s = zooKeeper.create("/myconfig", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);Watcher watcher = new Watcher() {@SneakyThrows@Overridepublic void process(WatchedEvent event) {if (event.getType()== Event.EventType.NodeDataChanged&& event.getPath()!=null && event.getPath().equals("/myconfig")){log.info(" PATH:{} 发生了数据变化" ,event.getPath());//循环监听//监听结束后,重新设置byte[] data = zooKeeper.getData("/myconfig", this, null);MyConfig newConfig = objectMapper.readValue(new String(data), MyConfig.class);log.info("数据发生变化: {}",newConfig);}}};byte[] data = zooKeeper.getData("/myconfig", watcher, null);MyConfig originalMyConfig = objectMapper.readValue(new String(data), MyConfig.class);log.info("原始数据: {}", originalMyConfig);// TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);}
standalone版本
@Slf4j
public abstract class StandaloneBase {private static final String CONNECT_STR="192.168.109.200:2181";private static final int SESSION_TIMEOUT=30 * 1000;private static ZooKeeper zooKeeper =null;private static CountDownLatch countDownLatch = new CountDownLatch(1);private Watcher watcher =new Watcher() {@Overridepublic void process(WatchedEvent event) {if (event.getState() == Event.KeeperState.SyncConnected&& event.getType()== Event.EventType.None){countDownLatch.countDown();log.info("连接建立");}}};@Beforepublic void init(){try {log.info(" start to connect to zookeeper server: {}",getConnectStr());zooKeeper=new ZooKeeper(getConnectStr(), getSessionTimeout(), watcher);log.info(" 连接中...");countDownLatch.await();} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}public static ZooKeeper getZooKeeper() {return zooKeeper;}@Afterpublic void test(){try {TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();}}protected String getConnectStr(){return CONNECT_STR;}protected int getSessionTimeout() {return SESSION_TIMEOUT;}
}
private String first_node = "/first-node";/*** 创建节点*/@Testpublic void testCreate() throws KeeperException, InterruptedException {ZooKeeper zooKeeper = getZooKeeper();String s = zooKeeper.create(first_node, "first".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);log.info("Create:{}",s);}/*** 获得节点数据*/@Testpublic void testGetData(){Watcher watcher = new Watcher() {@Overridepublic void process(WatchedEvent event) {if (event.getPath()!=null && event.getPath().equals(first_node)&& event.getType()== Event.EventType.NodeDataChanged){log.info(" PATH: {} 发现变化",first_node);try {byte[] data = getZooKeeper().getData(first_node, this, null);log.info(" data: {}",new String(data));} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}}};try {byte[] data = getZooKeeper().getData(first_node, watcher, null); //log.info(" data: {}",new String(data));} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}/*** 异步获取节点数据*/@Testpublic void asyncTest(){String userId="xxx";getZooKeeper().getData("/test", false, (rc, path, ctx, data, stat) -> {Thread thread = Thread.currentThread();log.info(" Thread Name: {}, rc:{}, path:{}, ctx:{}, data:{}, stat:{}",thread.getName(),rc, path, ctx, data, stat);},"test");log.info(" over .");}/*** 判断节点是否存在*/@Testpublic void exist() throws Exception{ZooKeeper zooKeeper = getZooKeeper();// true表示的是使用Zookeeper中的watchStat stat = zooKeeper.exists(first_node, true);if(stat != null){System.out.println("节点存在"+ stat.getNumChildren());}else{System.out.println("节点不存在 ....");}}/*** 获取某个节点下面的所有的子节点*/@Testpublic void getChildrens() throws Exception{ZooKeeper zooKeeper = getZooKeeper();List<String> childrens = zooKeeper.getChildren(first_node, true);for (String children : childrens) {// System.out.println(children);// 获取子节点中的数据byte[] data = zooKeeper.getData(first_node+"/" + children, false, null);System.out.println(children+":" + new String(data));}}/*** 修改节点的内容*/@Testpublic void setData() throws Exception{// -1 不指定版本 自动维护Stat stat = zooKeeper.setData(first_node+"/a1", "666666".getBytes(), -1);System.out.println(stat);// 指定版本 自动维护//ZooKeeper zooKeeper = getZooKeeper();//Stat stat = new Stat();//byte[] data = zooKeeper.getData(first_node, false, stat);//int version = stat.getVersion();//版本号//zooKeeper.setData(first_node, "third".getBytes(), version);}/*** 删除节点*/@Testpublic void deleteNode() throws Exception{zooKeeper.delete(first_node,-1);}
事件监听处理
/*** 监听Node节点下的子节点的变化*/@Testpublic void nodeChildrenChange() throws Exception{List<String> list = zooKeeper.getChildren("/app1", new Watcher() {/*** None(-1),* NodeCreated(1),* NodeDeleted(2),* NodeDataChanged(3),* NodeChildrenChanged(4),* DataWatchRemoved(5),* ChildWatchRemoved(6);* @param watchedEvent*/@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("--->"+ watchedEvent.getType());}});for (String s : list) {System.out.println(s);}Thread.sleep(Integer.MAX_VALUE);}/*** 监听节点内容变更*/@Testpublic void nodeDataChanged() throws Exception{byte[] data = zooKeeper.getData("/app1/a1", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("--->" + watchedEvent.getType());}}, null);System.out.println("--->" + new String(data));Thread.sleep(Integer.MAX_VALUE);}
Curator
Curator 是一套由netflix 公司开源的,Java 语言编程的 ZooKeeper 客户端框架,Curator项目
是现在ZooKeeper 客户端中使用最多,对ZooKeeper 版本支持最好的第三方客户端,并推荐使
用,Curator 把我们平时常用的很多 ZooKeeper 服务开发功能做了封装,例如 Leader 选举、
分布式计数器、分布式锁。这就减少了技术人员在使用 ZooKeeper 时的大部分底层细节开发工
作。在会话重新连接、Watch 反复注册、多种异常处理等使用场景中,用原生的 ZooKeeper
处理比较复杂。而在使用 Curator 时,由于其对这些功能都做了高度的封装,使用起来更加简
单,不但减少了开发时间,而且增强了程序的可靠性。
yaml
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.0.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-x-discovery</artifactId><version>5.0.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.8</version></dependency>
父类
@Slf4j
public abstract class CuratorStandaloneBase {private static final String CONNECT_STR = "192.168.109.200:2181";private static final int sessionTimeoutMs = 60*1000;private static final int connectionTimeoutMs = 5000;private static CuratorFramework curatorFramework;@Beforepublic void init() {RetryPolicy retryPolicy = new ExponentialBackoffRetry(5000, 30);curatorFramework = CuratorFrameworkFactory.builder().connectString(getConnectStr()).retryPolicy(retryPolicy).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).canBeReadOnly(true).build();curatorFramework.getConnectionStateListenable().addListener((client, newState) -> {if (newState == ConnectionState.CONNECTED) {log.info("连接成功!");}});log.info("连接中......");curatorFramework.start();}public void createIfNeed(String path) throws Exception {Stat stat = curatorFramework.checkExists().forPath(path);if (stat==null){String s = curatorFramework.create().forPath(path);log.info("path {} created! ",s);}}public static CuratorFramework getCuratorFramework() {return curatorFramework;}/* @Afterpublic void test(){try {TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();}}*/protected String getConnectStr(){return CONNECT_STR;}
}
// 递归创建子节点@Testpublic void testCreateWithParent() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();String pathWithParent = "/node-parent/sub-node-1";String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);log.info("curator create node :{} successfully.", path);}// protection 模式,防止由于异常原因,导致僵尸节点@Testpublic void testCreate() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();String forPath = curatorFramework.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/curator-node", "some-data".getBytes());log.info("curator create node :{} successfully.", forPath);}@Testpublic void testGetData() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();byte[] bytes = curatorFramework.getData().forPath("/curator-node");log.info("get data from node :{} successfully.", new String(bytes));}@Testpublic void testSetData() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();curatorFramework.setData().forPath("/curator-node", "changed!".getBytes());byte[] bytes = curatorFramework.getData().forPath("/curator-node");log.info("get data from node /curator-node :{} successfully.", new String(bytes));}@Testpublic void testDelete() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();String pathWithParent = "/node-parent";curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);}@Testpublic void testListChildren() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();String pathWithParent = "/discovery/example";List<String> strings = curatorFramework.getChildren().forPath(pathWithParent);strings.forEach(System.out::println);}//线程池方式@Testpublic void testThreadPool() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();ExecutorService executorService = Executors.newSingleThreadExecutor();String ZK_NODE="/zk-node";curatorFramework.getData().inBackground((client, event) -> {log.info(" background: {}", event);},executorService).forPath(ZK_NODE);}
Watch机制
ZooKeeper的Watch机制是它的一个重要特性,它允许客户端在ZooKeeper节点发生变化时得到通知。通过Watch机制,客户端可以设置对指定节点的监视,并在节点发生变化(数据更新、节点删除、子节点变化等)时,ZooKeeper会通知相关的客户端。(相当与redis分布式锁中的watchDog机制
,是分布式锁中十分重要的机制,用于监听比自己小的节点
)
Watch机制的主要特点包括:
1.一次性触发:一旦Watch被设置在一个节点上,并且该节点发生了监视的事件(例如数据更新),那么Watch就会触发通知。一次触发后,Watch就会失效,客户端需要重新设置Watch以继续监视节点。
2.轻量级通知:ZooKeeper的Watch机制是轻量级的,因为Watch只是一个通知,不包含实际数据。客户端收到Watch通知后,可以根据需要再次向ZooKeeper请求节点的最新数据。
3.顺序性:Watch通知是有序的。也就是说,如果多个Watch被设置在一个节点上,当该节点发生变化时,通知的顺序是确定的,保证了客户端可以按照特定的顺序处理通知。
4.一次性回调:客户端收到Watch通知后,需要进行一次性回调来处理事件。在处理通知的过程中,如果发生了新的变化,之前的Watch不会再次触发。
需要注意的是,Watch机制并不保证强一致性。由于网络延迟或其他因素,Watch通知可能有一定的延迟,客户端可能会收到旧的数据变更通知。因此,客户端在处理Watch通知时,需要谨慎处理,并考虑可能出现的数据不一致情况。
Watch机制是ZooKeeper实现分布式协作的关键机制之一。通过Watch,客户端可以实时获取ZooKeeper节点的变化,从而在分布式系统中做出相应的处理,实现高效的协作和协调。
ZK分布式锁
创建节点,判断顺序号是否是最小的
创建永久节点,在普通节点下创建临时顺序节点,节点之间按顺序依次监听(通过watch机制),当拿到锁的节点处理完事务后,释放锁,后一个节点监听到前一个节点释放锁后,立刻申请获得锁,以此类推
过程解析
- 第一部分:客户端在Zookeeper集群创建临时顺序节点
- 第二部分:判断节点是否是当前最小的节点,如果是,获取锁,反之,监听前一个节点
原生方式实现Zookeeper的分布式锁
/*** 分布式锁 Zookeeper原生API*/
public class DistributedLock {private String connectString = "192.168.58.100:2181";private ZooKeeper client;private CountDownLatch countDownLatch = new CountDownLatch(1);private CountDownLatch waitLatch = new CountDownLatch(1);//当前节点private String currentNode;//要等待的节点private String waitPath;//1.连接Zookeeperpublic DistributedLock() throws Exception {client = new ZooKeeper(connectString, 300000, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {//连接上zk 释放if(watchedEvent.getState() == Event.KeeperState.SyncConnected){countDownLatch.countDown();}// waitLatch 需要释放,节点被删除,并且是前一个节点if(watchedEvent.getType() == Event.EventType.NodeDeleted &&watchedEvent.getPath().equals(waitPath)){waitLatch.countDown();}}});//zk 连接成功,再往下走countDownLatch.await();//2.判断节点是否存在Stat stat = client.exists("/locks", false);if(stat == null){//创建根节点client.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}}//3.加锁public void zkLock() throws KeeperException, InterruptedException {//创建临时顺序节点currentNode = client.create("/locks/" + "seq-" ,null,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);//判断创建的节点是否是最小序号节点,如果是 就获取锁,不是监听前一个节点List<String> children = client.getChildren("/locks", false);//如果集合中只有一个元素,可以直接获取锁if(children.size() == 1){return;}else{//先排序Collections.sort(children);//获取节点名称String nodeName = currentNode.substring("/locks/".length());//获取节点名称 在集合的位置int index = children.indexOf(nodeName);if(index == -1){System.out.println("数据异常");}else if(index == 0){return;}else{//需要监听前一个节点的变化waitPath = "/locks/" + children.get(index - 1);client.getData(waitPath,true,null);//等待监听执行waitLatch.await();return;}}}//解锁public void unZkLock() throws KeeperException, InterruptedException {//删除节点client.delete(currentNode,-1);}
}
Nginx代理转发
Curator框架实现分布式锁
实现思路
InterProcessMutex介绍
Apache Curator 内置了分布式锁的实现: InterProcessMutex
。
- InterProcessMutex有两个构造方法
public InterProcessMutex(CuratorFramework client, String path){this(client, path, new StandardLockInternalsDriver());
}public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver){this(client, path, LOCK_NAME, 1, driver);
}
- 参数说明如下
参数 | 说明 |
---|---|
client | curator中zk客户端对象 |
path | 抢锁路径,同一个锁path需一致 |
driver | 可自定义lock驱动实现分布式锁 |
- 主要方法
//获取锁,若失败则阻塞等待直到成功,支持重入
public void acquire() throws Exception//超时获取锁,超时失败
public boolean acquire(long time, TimeUnit unit) throws Exception//释放锁
public void release() throws Exception
- 注意点,调用acquire()方法后需相应调用release()来释放锁
代码实现
配置类
@Configuration
public class CuratorCfg {//Curator初始化@Bean(initMethod = "start")public CuratorFramework curatorFramework(){RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.40.240:2181", retryPolicy);return client;}
}
业务代码
@RestController
public class TestController {@Autowiredprivate OrderService orderService;@Value("${server.port}")private String port;public static final String product = "/product_";@AutowiredCuratorFramework curatorFramework;@PostMapping("/stock/deduct")public Object reduceStock(Integer id) throws Exception {InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, product + id);//互斥锁try {// ...interProcessMutex.acquire();//加锁orderService.reduceStock(id);} catch (Exception e) {if (e instanceof RuntimeException) {throw e;}}finally {interProcessMutex.release();//解锁}return "ok:" + port;}
}
全局异常处理类
@ControllerAdvice
public class ExceptionHandlerController {@ExceptionHandler@ResponseStatus(value = HttpStatus.BAD_REQUEST)@ResponseBodypublic Object exceptionHandler(RuntimeException e){Map<String,Object> result=new HashMap<>( );result.put( "status","error" );result.put( "message",e.getMessage() );return result;}
}
ZK注册中心
项目较小的话会考虑使用ZK做注册中心,原理:使用临时节点
spring.application.name=product-center1
#zookeeper 连接地址
spring.cloud.zookeeper.connect-string=192.168.40.240:2181
#将本服务注册到zookeeper,如果不希望直接被发现可以配置为false,默认为true
spring.cloud.zookeeper.discovery.register=true
spring.cloud.zookeeper.session-timeout=30000
ZAB协议
数据一致性协议,在分布式锁中半数以上的节点同步成功,才算加锁成功