Zookeeper笔记

为什么要使用Zookeeper

dubbo需要一个注册中心,而Zookeeper是我们在使用Dubbo是官方推荐的注册中心
在这里插入图片描述
在这里插入图片描述

Zookeeper介绍

在这里插入图片描述
Zookeeper的集群机制
Zookeeper是为了其他分布式程序提供服务的,所以不能随便就挂了。Zookeeper的集群机制采取的是半数存活机制。也就是整个集群节点中有半数以上的节点存活,那么整个集群环境可用。这也是为什么说zk的集群最好是奇数个节点。

zk的作用

序号功能
1为别的分布式程序服务的
2本身就是一个分布式程序
3主从协调 服务器节点动态上下线 统一配置管理 分布式共享锁 统一名称服务
4管理(存储,读取)用户程序提交的数据 并为用户程序提供数据节点监听服务

Zookeeper集群节点的角色

Leader
Leader是Zookeeper集群工作的核心,其主要工作是:

  1. 事务请求的唯一调度和处理者,保证集群事务处理的顺序性。
  2. 集群内部各服务器的调度者

Follower
Follower是zookeeper集群的跟随者,主要工作是:

  1. 处理客户端非事务性请求(读取数据),转发事务请求给Leader服务器
  2. 参与事务请求Proposal的投票
  3. 参与Leader选举投票

Observer
Observer充当观察者的角色,观察Zookeeper集群的最小状态变化并将这些状态同步过来,其对于非事务请求可以独立处理,对于事务请求,会转给Leader节点进行处理。Observer不会参与投票,包括事务请求Proposal的投票和Leader选举投票

集群环境准备

节点的映射关系

每个节点设置相应的ip和主机名的映射关系,方便集群环境的部署
修改hosts配置文件中的信息
在这里插入图片描述

配置免密登录
生成公钥和私钥

ssh-keygen

输入命令后根据提示,四次回车即可
在这里插入图片描述

发送公钥给需要免密登录的节点

ssh-copy-id zk01
ssh-copy-id zk02
ssh-copy-id zk03

在这里插入图片描述
节点和节点发送文件通过scp命令实现

	scp -r b.txt bobo01:/root/

在这里插入图片描述
关闭防火墙
查看防火墙状态

firewall-cmd --state

停止防火墙

systemctl stop firewall.service

禁止开机启动

systemctl disable firewall.service

Zookeeper的选举机制

Leader主要作用是保证分布式数据一致性,即每个节点的存储的数据同步。

服务器初始化时Leader选举
Zookeeper由于自身的性质,一般建议选取奇数个节点进行搭建分布式服务器集群。以3个节点组成的服务器集群为例,说明服务器初始化时的选举过程。启动第一台安装Zookeeper的节点时,无法单独进行选举,启动第二台时,两节点之间进行通信,开始选举Leader。

  1. 每个Server投出一票。第一次他们都投给自己作为Leader,投票内容未(SID,ZXID)。
    SID为Server的id,即启动ZK时配置文件中的myid;
    ZXID为事务id,为节点的更新程序,ZXID越大,代表Server对ZK节点的操作越新。由于服务器初始化,
    每个Sever上的Znode为0,所以Server1投的票为(1,0),Server2为(2,0)。两Server将各自投票发给集群中其他机器。
  2. 每个Server接收来自其他Server的投票。集群中的每个Server先判断投票的有效性,如检查是不是本轮的投票,是不是来Looking状态的服务器投的票。
  3. 对投票结果进行处理。处理规则为:
    • 首先对比ZXID。ZXID大的服务器优先作为Leader
    • 若ZXID系统,如初始化时,每个Server的ZXID都是0
    • 就会比较sid即myid,myid大的选出来做Leader。
      首次选举对于Server而言,他接受到的投票为(2,0),因为自身的票为(1,0),所以此时它会选举Server2为Leader,
      将自己的更新为(2,0)。而Server2收到的投票为Server1的(1,0)由于比他自己小,
      Server2的投票不变。Server1和Server2再次将票投出,投出的票都为(2,0)
  4. 统计投票。每次投票后,服务器都会统计投票信息,如果判定某个Server有过半的票数,俺么该Server就是Leader。首次投票对于Server1和Server2而言,统计出已经有两台机器接收了(2,0)的投票信息,此时认为选出了Leader。
  5. 改变服务器的状态。当确定了Leader之后,每个Server更新自己的状态,
    Leader将状态更新为Leading,Follower将状态更新为Following。
    在这里插入图片描述
    服务器运行期间的Leader选举
    ZK运行期间,如果有新的Server加入,或非Leader节点挂了,那么LEader会同步数据给新的Server或寻找其他备用Server替代宕机的Server。若Leader宕机,此时集群暂停对外服务,开始在内部选举新的Leader。假设当前集群中有Server1、Server2、Server3三台服务器,Server2为当前集群的Leader,由于意外情况,Server2宕机了,便开始进入选举状态。过程如下

1 变更状态。其他非Observer服务器将自己的状态改变成Looking,开始进入Leader选举。
2. 每个Server投出1张票(myid,ZXID),由于集群运行过,所以每个Server的ZXID可能不同。
假设Server1的ZXID为145,Server3的为122,第一轮投票中,Server1和Server3都投自己,
票分别为(1,145)、(3,122),将自己的票发送给集群中所有机器。
3. 每个Server接收接收来自其他Server的投票,接下来的步骤与初始化时相同。

Zookeeper客户端使用

配置Zookeeper的环境变量
为了简化我们每次操作Zookeeper而不用进入到Zookeeper的安装目录,我们可以将Zookeeper的安装信息配置到系统的环境变量中

vim /etc/profile

添加的内容

export ZOOKEPPER_HOME=/opt/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin

执行source命令

source /etc/profile

我们就可以在节点的任意位置操作Zookeeper了,通过scp命令将profile文件发送到其他几个节点上

scp /etc/profile zk02:/etc/

客户端连接
通过bin目录下的zkCli.sh 命令连接即可

	zkCli.sh

在这里插入图片描述
zkCli.sh默认连接的是当前节点的Zookeeper节点,如果我们要连接其他节点执行如下命令即可

	zkCli.sh -timeout 5000 -server zk02:2181

数据操作

Zookeeper的数据结构

  1. 层次化的目录结构,命名符合常规文件系统规范
  2. 每个节点在Zookeeper中叫做znode,并且有一个唯一的路径标识
  3. 节点znode可以包含数据和子节点(但是EPHEMERAL类型的节点不能有子节点)
  4. 客户端应用可以在节点上设置监听器
    在这里插入图片描述

节点类型

1).znode有两种类型:

短暂性(ephemeral)(断开连接自己删除)
持久性(persistent)(断开连接不删除)

2).znode有四种形式的目录节点(默认是persistent)如下

序号节点类型描述
1PERSISTENT持久节点
2PERSISTENT_SEQUENTIAL持久有序节点(顺序节点)
3EPHEMERAL短暂节点 (临时节点)
4EPHEMERAL_SEQUENTIAL短暂有序节点 (临时顺序节点)

创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,有父节点维护在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序

常用命令
Zookeeper作为Dubbo的注册中心用来保存我们各个服务的节点信息,显示Zookeeper是可以实现输出的存储操作的,我们来看下Zookeeper中存储操作的基本命令

ls
​ ls用来查看某个节点下的子节点信息
在这里插入图片描述
增强的命令,查看节点下的子节点及当前节点的属性信息 ls2或者 ls -s 命令
在这里插入图片描述
create
​ 创建节点信息
在这里插入图片描述
get

​ get命令用来查看节点的数据
在这里插入图片描述
如果要查看节点的属性信息那么我们可以通过get -s 来实现

delete
​ delete只能删除没有子节点的节点要删除非空节点可以通过 rmr 或者 deleteall 命令实现
在这里插入图片描述
set

​ set命令可以用来修改节点的内容。
在这里插入图片描述
事件监听
监听某个节点的数据内容变化,通过get命令 带 -w 参数即可,在3.4版本的Zookeeper中是通过 get path watch 来说实现监控的
在这里插入图片描述
然后我们在其他节点上修改app1节点的数据,会触监听事件

Zookeeper Java API使用

pom

	<dependencies><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.9</version></dependency><dependency><groupId>com.github.sgroschupf</groupId><artifactId>zkclient</artifactId><version>0.1</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency></dependencies>

连接ZK服务,并监听节点变化

@Slf4j
public class ConfigCenter {private final static  String CONNECT_STR="192.168.40.243:2181";private final static Integer  SESSION_TIMEOUT=30*1000;private static ZooKeeper zooKeeper=null;private static CountDownLatch countDownLatch=new CountDownLatch(1);public static void main(String[] args) throws IOException, InterruptedException, KeeperException {zooKeeper=new ZooKeeper(CONNECT_STR, SESSION_TIMEOUT, new Watcher() {@Overridepublic void process(WatchedEvent event) {if (event.getType()== Event.EventType.None&& event.getState() == Event.KeeperState.SyncConnected){log.info("连接已建立");countDownLatch.countDown();}}});countDownLatch.await();MyConfig myConfig = new MyConfig();myConfig.setKey("anykey");myConfig.setName("anyName");ObjectMapper objectMapper=new ObjectMapper();byte[] bytes = objectMapper.writeValueAsBytes(myConfig);String s = zooKeeper.create("/myconfig", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);Watcher watcher = new Watcher() {@SneakyThrows@Overridepublic void process(WatchedEvent event) {if (event.getType()== Event.EventType.NodeDataChanged&& event.getPath()!=null && event.getPath().equals("/myconfig")){log.info(" PATH:{}  发生了数据变化" ,event.getPath());//循环监听//监听结束后,重新设置byte[] data = zooKeeper.getData("/myconfig", this, null);MyConfig newConfig = objectMapper.readValue(new String(data), MyConfig.class);log.info("数据发生变化: {}",newConfig);}}};byte[] data = zooKeeper.getData("/myconfig", watcher, null);MyConfig originalMyConfig = objectMapper.readValue(new String(data), MyConfig.class);log.info("原始数据: {}", originalMyConfig);//        TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);}

在这里插入图片描述
standalone版本

@Slf4j
public abstract class  StandaloneBase {private static final String CONNECT_STR="192.168.109.200:2181";private static final int SESSION_TIMEOUT=30 * 1000;private static ZooKeeper zooKeeper =null;private static CountDownLatch countDownLatch = new CountDownLatch(1);private Watcher watcher =new Watcher() {@Overridepublic void process(WatchedEvent event) {if (event.getState() == Event.KeeperState.SyncConnected&& event.getType()== Event.EventType.None){countDownLatch.countDown();log.info("连接建立");}}};@Beforepublic void init(){try {log.info(" start to connect to zookeeper server: {}",getConnectStr());zooKeeper=new ZooKeeper(getConnectStr(), getSessionTimeout(), watcher);log.info(" 连接中...");countDownLatch.await();} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}public static ZooKeeper getZooKeeper() {return zooKeeper;}@Afterpublic void test(){try {TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();}}protected String getConnectStr(){return CONNECT_STR;}protected int getSessionTimeout() {return SESSION_TIMEOUT;}
}
private String first_node = "/first-node";/*** 创建节点*/@Testpublic void testCreate() throws KeeperException, InterruptedException {ZooKeeper zooKeeper = getZooKeeper();String s = zooKeeper.create(first_node, "first".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);log.info("Create:{}",s);}/*** 获得节点数据*/@Testpublic void testGetData(){Watcher watcher = new Watcher() {@Overridepublic void process(WatchedEvent event) {if (event.getPath()!=null && event.getPath().equals(first_node)&& event.getType()== Event.EventType.NodeDataChanged){log.info(" PATH: {}  发现变化",first_node);try {byte[] data = getZooKeeper().getData(first_node, this, null);log.info(" data: {}",new String(data));} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}}};try {byte[] data = getZooKeeper().getData(first_node, watcher, null);  //log.info(" data: {}",new String(data));} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}/*** 异步获取节点数据*/@Testpublic void  asyncTest(){String userId="xxx";getZooKeeper().getData("/test", false, (rc, path, ctx, data, stat) -> {Thread thread = Thread.currentThread();log.info(" Thread Name: {},   rc:{}, path:{}, ctx:{}, data:{}, stat:{}",thread.getName(),rc, path, ctx, data, stat);},"test");log.info(" over .");}/*** 判断节点是否存在*/@Testpublic void exist() throws  Exception{ZooKeeper zooKeeper = getZooKeeper();// true表示的是使用Zookeeper中的watchStat stat = zooKeeper.exists(first_node, true);if(stat != null){System.out.println("节点存在"+ stat.getNumChildren());}else{System.out.println("节点不存在 ....");}}/*** 获取某个节点下面的所有的子节点*/@Testpublic void getChildrens() throws Exception{ZooKeeper zooKeeper = getZooKeeper();List<String> childrens = zooKeeper.getChildren(first_node, true);for (String children : childrens) {// System.out.println(children);// 获取子节点中的数据byte[] data = zooKeeper.getData(first_node+"/" + children, false, null);System.out.println(children+":" + new String(data));}}/*** 修改节点的内容*/@Testpublic void setData() throws Exception{// -1 不指定版本 自动维护Stat stat = zooKeeper.setData(first_node+"/a1", "666666".getBytes(), -1);System.out.println(stat);//  指定版本 自动维护//ZooKeeper zooKeeper = getZooKeeper();//Stat stat = new Stat();//byte[] data = zooKeeper.getData(first_node, false, stat);//int version = stat.getVersion();//版本号//zooKeeper.setData(first_node, "third".getBytes(), version);}/*** 删除节点*/@Testpublic void deleteNode() throws Exception{zooKeeper.delete(first_node,-1);}

事件监听处理

/*** 监听Node节点下的子节点的变化*/@Testpublic void nodeChildrenChange() throws Exception{List<String> list = zooKeeper.getChildren("/app1", new Watcher() {/***              None(-1),*             NodeCreated(1),*             NodeDeleted(2),*             NodeDataChanged(3),*             NodeChildrenChanged(4),*             DataWatchRemoved(5),*             ChildWatchRemoved(6);* @param watchedEvent*/@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("--->"+ watchedEvent.getType());}});for (String s : list) {System.out.println(s);}Thread.sleep(Integer.MAX_VALUE);}/*** 监听节点内容变更*/@Testpublic void nodeDataChanged() throws Exception{byte[] data = zooKeeper.getData("/app1/a1", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("--->" + watchedEvent.getType());}}, null);System.out.println("--->" + new String(data));Thread.sleep(Integer.MAX_VALUE);}

Curator

Curator 是一套由netflix 公司开源的,Java 语言编程的 ZooKeeper 客户端框架,Curator项目
是现在ZooKeeper 客户端中使用最多,对ZooKeeper 版本支持最好的第三方客户端,并推荐使
用,Curator 把我们平时常用的很多 ZooKeeper 服务开发功能做了封装,例如 Leader 选举、
分布式计数器、分布式锁。这就减少了技术人员在使用 ZooKeeper 时的大部分底层细节开发工
作。在会话重新连接、Watch 反复注册、多种异常处理等使用场景中,用原生的 ZooKeeper
处理比较复杂。而在使用 Curator 时,由于其对这些功能都做了高度的封装,使用起来更加简
单,不但减少了开发时间,而且增强了程序的可靠性。

yaml

	<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.0.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-x-discovery</artifactId><version>5.0.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.8</version></dependency>

父类

@Slf4j
public abstract  class CuratorStandaloneBase {private static final String CONNECT_STR = "192.168.109.200:2181";private static final int sessionTimeoutMs = 60*1000;private static final int connectionTimeoutMs = 5000;private static CuratorFramework curatorFramework;@Beforepublic void init() {RetryPolicy retryPolicy = new ExponentialBackoffRetry(5000, 30);curatorFramework = CuratorFrameworkFactory.builder().connectString(getConnectStr()).retryPolicy(retryPolicy).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).canBeReadOnly(true).build();curatorFramework.getConnectionStateListenable().addListener((client, newState) -> {if (newState == ConnectionState.CONNECTED) {log.info("连接成功!");}});log.info("连接中......");curatorFramework.start();}public void createIfNeed(String path) throws Exception {Stat stat = curatorFramework.checkExists().forPath(path);if (stat==null){String s = curatorFramework.create().forPath(path);log.info("path {} created! ",s);}}public static CuratorFramework getCuratorFramework() {return curatorFramework;}/* @Afterpublic void   test(){try {TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();}}*/protected   String getConnectStr(){return CONNECT_STR;}
}
// 递归创建子节点@Testpublic void testCreateWithParent() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();String pathWithParent = "/node-parent/sub-node-1";String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);log.info("curator create node :{}  successfully.", path);}// protection 模式,防止由于异常原因,导致僵尸节点@Testpublic void testCreate() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();String forPath = curatorFramework.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/curator-node", "some-data".getBytes());log.info("curator create node :{}  successfully.", forPath);}@Testpublic void testGetData() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();byte[] bytes = curatorFramework.getData().forPath("/curator-node");log.info("get data from  node :{}  successfully.", new String(bytes));}@Testpublic void testSetData() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();curatorFramework.setData().forPath("/curator-node", "changed!".getBytes());byte[] bytes = curatorFramework.getData().forPath("/curator-node");log.info("get data from  node /curator-node :{}  successfully.", new String(bytes));}@Testpublic void testDelete() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();String pathWithParent = "/node-parent";curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);}@Testpublic void testListChildren() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();String pathWithParent = "/discovery/example";List<String> strings = curatorFramework.getChildren().forPath(pathWithParent);strings.forEach(System.out::println);}//线程池方式@Testpublic void testThreadPool() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();ExecutorService executorService = Executors.newSingleThreadExecutor();String ZK_NODE="/zk-node";curatorFramework.getData().inBackground((client, event) -> {log.info(" background: {}", event);},executorService).forPath(ZK_NODE);}

Watch机制

ZooKeeper的Watch机制是它的一个重要特性,它允许客户端在ZooKeeper节点发生变化时得到通知。通过Watch机制,客户端可以设置对指定节点的监视,并在节点发生变化(数据更新、节点删除、子节点变化等)时,ZooKeeper会通知相关的客户端。(相当与redis分布式锁中的watchDog机制,是分布式锁中十分重要的机制,用于监听比自己小的节点

Watch机制的主要特点包括:

1.一次性触发:一旦Watch被设置在一个节点上,并且该节点发生了监视的事件(例如数据更新),那么Watch就会触发通知。一次触发后,Watch就会失效,客户端需要重新设置Watch以继续监视节点。

2.轻量级通知:ZooKeeper的Watch机制是轻量级的,因为Watch只是一个通知,不包含实际数据。客户端收到Watch通知后,可以根据需要再次向ZooKeeper请求节点的最新数据。

3.顺序性:Watch通知是有序的。也就是说,如果多个Watch被设置在一个节点上,当该节点发生变化时,通知的顺序是确定的,保证了客户端可以按照特定的顺序处理通知。

4.一次性回调:客户端收到Watch通知后,需要进行一次性回调来处理事件。在处理通知的过程中,如果发生了新的变化,之前的Watch不会再次触发。

需要注意的是,Watch机制并不保证强一致性。由于网络延迟或其他因素,Watch通知可能有一定的延迟,客户端可能会收到旧的数据变更通知。因此,客户端在处理Watch通知时,需要谨慎处理,并考虑可能出现的数据不一致情况。

Watch机制是ZooKeeper实现分布式协作的关键机制之一。通过Watch,客户端可以实时获取ZooKeeper节点的变化,从而在分布式系统中做出相应的处理,实现高效的协作和协调。

ZK分布式锁

创建节点,判断顺序号是否是最小的
在这里插入图片描述
在这里插入图片描述

创建永久节点,在普通节点下创建临时顺序节点,节点之间按顺序依次监听(通过watch机制),当拿到锁的节点处理完事务后,释放锁,后一个节点监听到前一个节点释放锁后,立刻申请获得锁,以此类推
过程解析

  • 第一部分:客户端在Zookeeper集群创建临时顺序节点
  • 第二部分:判断节点是否是当前最小的节点,如果是,获取锁,反之,监听前一个节点
    原生方式实现Zookeeper的分布式锁
/*** 分布式锁 Zookeeper原生API*/
public class DistributedLock {private String connectString = "192.168.58.100:2181";private ZooKeeper client;private CountDownLatch countDownLatch = new CountDownLatch(1);private CountDownLatch waitLatch = new CountDownLatch(1);//当前节点private String currentNode;//要等待的节点private String waitPath;//1.连接Zookeeperpublic DistributedLock() throws Exception {client = new ZooKeeper(connectString, 300000, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {//连接上zk  释放if(watchedEvent.getState() == Event.KeeperState.SyncConnected){countDownLatch.countDown();}// waitLatch 需要释放,节点被删除,并且是前一个节点if(watchedEvent.getType() == Event.EventType.NodeDeleted &&watchedEvent.getPath().equals(waitPath)){waitLatch.countDown();}}});//zk 连接成功,再往下走countDownLatch.await();//2.判断节点是否存在Stat stat = client.exists("/locks", false);if(stat == null){//创建根节点client.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}}//3.加锁public void zkLock() throws KeeperException, InterruptedException {//创建临时顺序节点currentNode = client.create("/locks/" + "seq-" ,null,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);//判断创建的节点是否是最小序号节点,如果是 就获取锁,不是监听前一个节点List<String> children = client.getChildren("/locks", false);//如果集合中只有一个元素,可以直接获取锁if(children.size() == 1){return;}else{//先排序Collections.sort(children);//获取节点名称String nodeName = currentNode.substring("/locks/".length());//获取节点名称 在集合的位置int index = children.indexOf(nodeName);if(index == -1){System.out.println("数据异常");}else if(index == 0){return;}else{//需要监听前一个节点的变化waitPath = "/locks/" + children.get(index - 1);client.getData(waitPath,true,null);//等待监听执行waitLatch.await();return;}}}//解锁public void unZkLock() throws KeeperException, InterruptedException {//删除节点client.delete(currentNode,-1);}
}

Nginx代理转发
在这里插入图片描述
Curator框架实现分布式锁
实现思路
image.png

InterProcessMutex介绍

Apache Curator 内置了分布式锁的实现: InterProcessMutex

  • InterProcessMutex有两个构造方法
public InterProcessMutex(CuratorFramework client, String path){this(client, path, new StandardLockInternalsDriver());
}public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver){this(client, path, LOCK_NAME, 1, driver);
}
  • 参数说明如下
参数说明
clientcurator中zk客户端对象
path抢锁路径,同一个锁path需一致
driver可自定义lock驱动实现分布式锁
  • 主要方法
//获取锁,若失败则阻塞等待直到成功,支持重入
public void acquire() throws Exception//超时获取锁,超时失败
public boolean acquire(long time, TimeUnit unit) throws Exception//释放锁
public void release() throws Exception
  • 注意点,调用acquire()方法后需相应调用release()来释放锁

代码实现
配置类

@Configuration
public class CuratorCfg {//Curator初始化@Bean(initMethod = "start")public CuratorFramework curatorFramework(){RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.40.240:2181", retryPolicy);return client;}
}

业务代码

@RestController
public class TestController {@Autowiredprivate OrderService orderService;@Value("${server.port}")private String port;public static final String product = "/product_";@AutowiredCuratorFramework curatorFramework;@PostMapping("/stock/deduct")public Object reduceStock(Integer id) throws Exception {InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, product + id);//互斥锁try {// ...interProcessMutex.acquire();//加锁orderService.reduceStock(id);} catch (Exception e) {if (e instanceof RuntimeException) {throw e;}}finally {interProcessMutex.release();//解锁}return "ok:" + port;}
}

全局异常处理类

@ControllerAdvice
public class ExceptionHandlerController {@ExceptionHandler@ResponseStatus(value = HttpStatus.BAD_REQUEST)@ResponseBodypublic Object exceptionHandler(RuntimeException e){Map<String,Object> result=new HashMap<>(  );result.put( "status","error" );result.put( "message",e.getMessage() );return result;}
}

ZK注册中心

项目较小的话会考虑使用ZK做注册中心,原理:使用临时节点

spring.application.name=product-center1
#zookeeper 连接地址
spring.cloud.zookeeper.connect-string=192.168.40.240:2181
#将本服务注册到zookeeper,如果不希望直接被发现可以配置为false,默认为true
spring.cloud.zookeeper.discovery.register=true
spring.cloud.zookeeper.session-timeout=30000

ZAB协议

数据一致性协议,在分布式锁中半数以上的节点同步成功,才算加锁成功

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/18255.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【MySQL】下载安装以及SQL介绍

1&#xff0c;数据库相关概念 以前我们做系统&#xff0c;数据持久化的存储采用的是文件存储。存储到文件中可以达到系统关闭数据不会丢失的效果&#xff0c;当然文件存储也有它的弊端。 假设在文件中存储以下的数据&#xff1a; 姓名 年龄 性别 住址 张三 23 男 北京…

mysql进阶-用户的创建_修改_删除

1. 使用mysql单次查询 [rootVM-4-6-centos /]# mysql -h localhost -P 3306 -p mytest -e "select * from book1"; Enter password: ------------------------------------------- | id | category_id | book_name | num | ----------------------------…

第17节 R语言分析:生物统计数据集 R 编码分析和绘图

生物统计数据集 R 编码分析和绘图 生物统计学,用于对给定文件 data.csv 中的医疗数据应用 R 编码,该文件是患者人口统计数据集,包含有关来自各种祖先谱系的个体的标准信息。 数据集特征解释 脚本 output= file("Output.txt") # File name of output log sink(o…

Spring-mybatis结合的底层原理

1.项目前期准备 1.1 导入maven jar包 <dependencies><!-- spring依赖 --><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>5.2.5.RELEASE</version></depende…

js中的设计模式

设计模式 代码整体的结构会更加清楚&#xff0c;管理起来会更加方便&#xff0c;更好地维护 设计模式是一种思想 发布订阅 模块化开发 导入很多模块 容器即数组存储未来要执行的方法&#xff0c;同addEventListener 数组塌陷问题* 由于删除了元素&#xff0c;导致从删除元素的位…

easyui实用点

easyui实用点 1.下拉框&#xff08;input框只能选不能手动输入编辑&#xff09; data-options"editable:false"//不可编辑2.日期框&#xff0c;下拉框&#xff0c;文本框等class class"easyui-datebox"//不带时分秒 class"easyui-datetimebox"…

idea调节文字大小、日志颜色、git改动信息

idea调节菜单栏文字大小&#xff1a; 调节代码文字大小&#xff1a; 按住ctrl滚动滑轮可以调节代码文字大小&#xff1a; 单击文件即可在主窗口上打开显示&#xff1a; idea在控制台对不同级别的日志打印不同颜色 &#xff1a; “grep console”插件 点击某一行的时候&#x…

关于会议OA需求分析与开发功能设计

前言&#xff1a;现如今&#xff0c;企业在会议管理方面对OA系统的需求越来越高。因为会议是企业内部沟通和协作的重要环节&#xff0c;一个高效的会议管理系统可以帮助企业提升会议效率、降低成本&#xff0c;并且提高内部信息共享的效果。 目录 一&#xff0c;以下是OA系统在…

[threejs]相机与坐标

搞清相机和坐标的关系在threejs初期很重要&#xff0c;否则有可能会出现写了代码&#xff0c;运行时一片漆黑的现象&#xff0c;这种情况就有可能是因为你相机没弄对。 先来看一下threejs中的坐标(世界坐标) 坐标轴好理解&#xff0c;大家只需要知道在three中不同颜色代表的轴…

ARM单片机中断处理过程解析

前言 中断&#xff0c;在单片机开发中再常见不过了。当然对于中断的原理和执行流程都了然于胸&#xff0c;那么对于ARM单片机中断的具体处理行为&#xff0c;你真的搞清楚了吗&#xff1f; 今天来简单聊一聊&#xff0c;ARM单片机中断处理过程中的具体行为是什么样的&#xf…

一张图像相当于 16×16 个单词:用于大规模图像识别的 Transformers(视觉 Transformers)

一张图片值多少字? 一张图片胜过千言万语?无法用言语完整地描述一幅图画。但论文告诉我们一张图像相当于 1616 个单词。在这篇博客中,我将解释使用 Transformer 进行图像识别。这是一篇非常有趣的论文, 这篇论文有什么特别之处? 它很特别,因为这里我们不会使用任何卷积网…

vite+typescript项目 :找不到模块“./***.vue”或其相应的类型声明——解决方案

vue3ts报错&#xff1a; 找不到模块“./App.vue”或其相应的类型声明。ts(2307) 解决方法&#xff1a; 1、在src文件夹找到 vite-env.d.ts 加入以下代码&#xff1a; declare module *.vue {import type { DefineComponent } from vueconst vueComponent: DefineComponent<…

django使用ztree实现树状结构效果,子节点实现动态加载(l懒加载)

一、实现的效果 由于最近项目中需要实现树状结构的效果,考虑到ztree这个组件大家用的比较多,因此打算在django项目中集成ztree来实现树状的效果。最终实现的示例效果如下: 点击父节点,如果有子节点,则从后台动态请求数据,然后显示出子节点的数据。 二、实现思路 …

AndroidBanner - ViewPager

解决banner 不可见依旧轮播的问题 思考一下&#xff1a;什么时候可以轮播&#xff0c;什么时候不可以轮播 当Banner添加到屏幕上&#xff0c;且对用户可见的时候&#xff0c;可以开始轮播 当Banner从屏幕上移除&#xff0c;或者Banner不可见的时候&#xff0c;可以停止轮播 当…

P2404 自然数的拆分问题

题目 思路 最简单的dfs题之一 只需要一点点小优化 代码 #include<bits/stdc.h> using namespace std; const int maxn55; int n; int ans[maxn],s; void print(int tot) { for(int i1;i<tot-1;i) cout<<ans[i]<<""; cout<<ans[tot-1]&…

微信小程序,商城底部工具栏的实现

效果演示&#xff1a; 前提条件&#xff1a; 去阿里云矢量图标&#xff0c;下载8个图标&#xff0c;四个黑&#xff0c;四个红&#xff0c;如图&#xff1a; 新建文件夹icons&#xff0c;把图标放到该文件夹&#xff0c;然后把该文件夹移动到该项目的文件夹里面。如图所示 app…

jmeter之接口测试(http接口测试)

基础知识储备 一、了解jmeter接口测试请求接口的原理 客户端--发送一个请求动作--服务器响应--返回客户端 客户端--发送一个请求动作--jmeter代理服务器---服务器--jmeter代理服务器--服务器 二、了解基础接口知识&#xff1a; 1、什么是接口&#xff1a;前端与后台之间的…

动手学深度学习(一)预备知识

目录 一、数据操作 1. N维数组样例 2. 访问元素 3. 基础函数 &#xff08;1&#xff09; 创建一个行向量 &#xff08;2&#xff09;通过张量的shape属性来访问张量的形状和元素总数 &#xff08;3&#xff09;reshape()函数 &#xff08;4&#xff09;创建全0、全1、…

CorelDraw怎么做立体字效果?CorelDraw制作漂亮的3d立体字教程

1、打开软件CorelDRAW 2019&#xff0c;用文本工具写上我们所需要的大标题。建议字体选用比较粗的适合做标题的字体。 2、给字填充颜色&#xff0c;此时填充的颜色就是以后立体字正面的颜色。我填充了红色&#xff0c;并加上了灰色的描边。 3、选中文本&#xff0c;单击界面左侧…

java 企业工程管理系统软件源码+Spring Cloud + Spring Boot +二次开发+ MybatisPlus + Redis

&#xfeff; Java版工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离 功能清单如下&#xff1a; 首页 工作台&#xff1a;待办工作、消息通知、预警信息&#xff0c;点击可进入相应的列表 项目进度图表&#xff1a;选择&#xff08;总体或单个&am…