【分布式系统】Zookeeper学习笔记

在这里插入图片描述

基本概念

Zookeeper工作机制

  1. 从设计模式角度理解: 是一个基于观察者模式设计的分布式服务管理框架; 负责存储和管理大家都关心的数据, 一旦这些数据的状态发生变化, Zookeeper就将负责通知已经在Zookeeper上注册的那些观察值做出相应的反应.

Zookeeper特点

  1. Zookeeper有: 一个领导者(Leader), 多个跟随者(Follower)组成的集群
  2. 集群中只要有半数以上节点存活, Zookeeper集群就能正常服务, 所以Zookeeper适合安装数台服务器
  3. 全局数据一致: 每个Server保存一份相同的数据副本, Client无论连接哪个Server, 数据都是一致的
  4. 更新请求顺序执行: 来自同一个Client的更新请求按其发送顺序依次执行
  5. 数据更新原子性: 一次数据更新要么成功, 要么失败
  6. 实时性: 在一定时间范围内, Client能读到最新的数据

数据结构

在这里插入图片描述

  1. Zookeeper数据模型与Unix文件系统类似
  2. 每个节点称作ZNode
  3. 每个ZNode都能够存储1MB数据
  4. 每个ZNode都可以通过其路径唯一标识

选举机制

Zookeeper第一次选举机制

假设有5台服务器, 且按顺序启动, 则会发生:

  1. 服务器1启动, 发起一次选举, 投自己一票. 此时服务器1票数为1, 选举无法完成, 状态保持为Looking
  2. 服务器2启动, 再发起一次选举. 服务器1和服务器2分别投自己1票并交换选票信息, 服务器1因为**myid小于服务器2, 所以改选票为推举服务器2**. 此时服务器1为0票, 服务器2为2票.
  3. 服务器3启动; 发起一次选举, 根据之前的交换规则, 服务器1/2都会投票服务器3, 此时选票超过半数, 服务器1,2将状态改为FOLLOWING, 服务器3状态改为LEADER
  4. 服务器4启动, 发起一次选举, 根据少数服从多数原则, 服务器4将自己选票投给服务器3, 并将自己的状态改为FOLLOWER
  5. 服务器5同服务器4, 将选票给服务器3, 并将自己状态设置为FOLLOWER

其中, 每个节点都有:

  1. 服务器ID(SID): 用来唯一标识Zookeeper集群中的机器, 每台机器不能重复, 与myid一致
  2. 事务ID(ZXID): ZXID是一个事务ID, 用来标识一次服务器状态的变更, 在某一时刻, 集群中的每一台机器的ZXID值不一定完全一致, 这和ZooKeeper服务器对客户端"更新请求"的处理逻辑相关
  3. Epoch: 每个Leader任期的代号. 没有Leader时同一轮投票过程中的逻辑时钟值是相同的. 每投完一次票这个数据就会增加

ZooKeeper非第一次选举机制

上述情况下, Leader宕机

  1. 当出现以下情况, 就会发生重新选举
    1. 服务器初始化启动
    2. 服务器运行期间无法和Leader保持连接
  2. 当一台服务器进入Leader选举流程时, 集群可能会处于一下两种状态
    1. 集群中存在Leader: 选举过程中, 宕机服务器会被告知Leader的信息, 之后会再与Leader建立连接恢复原状态
    2. 集群中不存在Leader, 且存在半数或以上的服务器正常: 正常服务器按照(EPOCH, ZXID, SID)进行排序选举

读写机制

写数据原理

访问Leader

在这里插入图片描述

  1. 写Leader
  2. Leader同步超过半数节点
  3. 返回ack
  4. Leader继续同步后面节点
访问Follower

在这里插入图片描述

  1. 写Follower
  2. Follower将请求转发给Leader
  3. Leader先完成写, 然后通知Follower写
  4. 当超过半数完成写后, 应答Follower ack
  5. Follower应答客户ACK
  6. Leader继续同步剩余节点

读流程

  1. 客户端请求Zookeeper中的数据是通过访问集群中的某一节点通常是配置的url对应的节点, 然后从该节点总请求数据
  2. 由于Zookeeper只保证了系统的强一致性, 并没有保证所有节点都完成写操作后再返回ACK, 因此客户端可能请求到脏数据

节点类型

节点分类

在这里插入图片描述

节点类型有以下几种

  1. 持久(Persistent): 客户端和服务端断开连接后, 创建的节点不删除
  2. 短暂(Ephemeral): 客户端和服务端断开连接后, 创建的节点自己删除
  3. 持久化目录节点: 客户端和ZooKeeper断开连接后, 节点依旧存在 顺序号可以为所有的顺序进行全局排序, 这样客户端可以通过序号判断事件的顺序
  4. 持久化顺序编号节点: 客户端与Zookeeper断开后, 节点依旧存在, 只是ZooKeeper给该节点进行顺序编号
  5. 临时目录节点: 断开连接后, 节点被删除
  6. 临时顺序编号目录节点: 断开连接后, 节点被删除, ZooKeeper给该节点名称进程顺序编号

节点增删查改

# 创建节点
[zk: localhost:2181(CONNECTED) 3] create /hello "data"
Created /hello
[zk: localhost:2181(CONNECTED) 5] create /hello/world "data in world"
Created /hello/world
# 查看节点内容
[zk: localhost:2181(CONNECTED) 8] ls /hello
[world]
# 查看节点信息
[zk: localhost:2181(CONNECTED) 10] ls -s /hello
[world]
cZxid = 0x500000006
ctime = Sun Aug 20 03:57:56 UTC 2023
mZxid = 0x500000006
mtime = Sun Aug 20 03:57:56 UTC 2023
pZxid = 0x500000008
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 1
# 仅查看节点信息
[zk: localhost:2181(CONNECTED) 11] stat /hello
cZxid = 0x500000026
ctime = Sun Aug 20 04:45:24 UTC 2023
mZxid = 0x500000026
mtime = Sun Aug 20 04:45:24 UTC 2023
pZxid = 0x500000026
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 0
# 获取节点中的值
[zk: localhost:2181(CONNECTED) 12] get -s /hello
data
cZxid = 0x500000006
ctime = Sun Aug 20 03:57:56 UTC 2023
mZxid = 0x500000006
mtime = Sun Aug 20 03:57:56 UTC 2023
pZxid = 0x500000008
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 1
# 创建带序号的节点
[zk: localhost:2181(CONNECTED) 14] create -s /hello/seqworld "data with sequence"
Created /hello/seqworld0000000001 #这里ZooKeeper自动为节点添加序号
[zk: localhost:2181(CONNECTED) 1]  create -s /hello/seqworld "data with sequence 2"
Created /hello/seqworld0000000002
[zk: localhost:2181(CONNECTED) 0] create /hello/world "data with sequence 2"
Node already exists: /hello/world
# 可以看到带序号的可以重复添加, 而不带序号的不能重复添加###########################临时节点############################################
# 创建临时节点
[zk: localhost:2181(CONNECTED) 2] create -e /hello/tmpworld "temp hello data"
Created /hello/tmpworld
[zk: localhost:2181(CONNECTED) 5] ls /hello
[seqworld0000000001, seqworld0000000002, tmpworld, world]
# 创建临时顺序编号节点
[zk: localhost:2181(CONNECTED) 3] create -es /hello/tmp_seq_world "temp hello data"
Created /hello/tmp_seq_world0000000004
# 退出并重进客户端
[zk: localhost:2181(CONNECTED) 6] quit
root@4dc78c0fb310:/apache-zookeeper-3.8.2-bin# zkCli.sh
# 发现临时节点全部被删除
[zk: localhost:2181(CONNECTED) 0] ls /hello
[seqworld0000000001, seqworld0000000002, world]############################修改节点###########################
# 修改/hello/world节点下的值
[zk: localhost:2181(CONNECTED) 2] set /hello/world "new data"
[zk: localhost:2181(CONNECTED) 4] get -s /hello/world
new data
cZxid = 0x500000008
ctime = Sun Aug 20 03:58:34 UTC 2023
mZxid = 0x500000018
mtime = Sun Aug 20 04:28:15 UTC 2023
pZxid = 0x500000008
cversion = 0
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 8
numChildren = 0#################################删除节点#########################
# 删除单个节点
[zk: localhost:2181(CONNECTED) 3] delete /hello/world
[zk: localhost:2181(CONNECTED) 5] ls /hello
[listen, new, seqworld0000000001, seqworld0000000002]
# 递归删除节点
[zk: localhost:2181(CONNECTED) 9] deleteall /hello
[zk: localhost:2181(CONNECTED) 8] ls /hello
Node does not exist: /hello

监听器

监听器原理

  1. main线程中创建客户端
  2. 之后会创建两个线程, 一个负责网络通信(connect), 另一个负责监听
  3. 通过connect线程将注册的监听事件发送给Zookeeper
  4. Zookeeper的注册监听器列表汇总将注册的监听事件列表添加到列表中
  5. Zookeeper监听到有数据或路径的变化, 会将这个消息发给listener线程
  6. listener线程内部调用了proces()方法

监听节点

# 创建节点
[zk: localhost:2181(CONNECTED) 6] create /hello/listen "data under listening"
Created /hello/listen
# 监听节点
[zk: localhost:2181(CONNECTED) 7] get -w /hello/listen
data under listening
[zk: localhost:2181(CONNECTED) 9] set /hello/listen "new data under listening"
WatchedEvent state:SyncConnected type:NodeDataChanged path:/hello/listen
# 监控节点路径的变化
[zk: localhost:2181(CONNECTED) 10] ls -w /hello
[listen, seqworld0000000001, seqworld0000000002, world]
# 创建子节点
[zk: localhost:2181(CONNECTED) 12] create /hello/new "data"
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/hello
Created /hello/new

应用场景

统一命名服务

  1. 在分布式环境经常需要对应用/服务进行统一命名, 便于识别

在这里插入图片描述

统一配置管理:

在这里插入图片描述

  1. 分布式环境下, 配置环境需要同步

    1. 分布式环境下所有节点的配置信息应该是一致的
    2. 对配置文件修改后, 希望能够快速同步到各个节点上
  2. 配置管理可交由Zookeeper实现

    1. 可将配置信息写入Zookeeper上的一个ZNode
    2. 各个客户端服务器监听这个ZNode

统一集群管理

在这里插入图片描述

  1. 分布式环境中, 实时掌握每个节点的状态是必要的
    1. 可以根据节点实时状态做出一些调整
  2. ZooKeeper可以实现实时监控节点的状态变化
    1. 可以将节点信息写入ZooKeeper上的一个ZNode
    2. 监听这个ZNode可以获取它的实时状态变化

服务器动态上下线

在这里插入图片描述

  1. 客户端能实时洞察到服务器上下线的变化

软负载均衡

在这里插入图片描述

  1. 在ZooKeeper中记录每台服务器的访问数, 让访问数最少得服务器去处理最新的客户端需求

基本使用

安装

在多台电脑下:

version: "3"
services:zookeeper:container_name: zookeeper restart: alwaysprivileged: trueimage: zookeeperports:- 20012:2181- 20013:2888- 20014:3888volumes:- /opt/docker/zookeeper/conf:/conf- /opt/docker/zookeeper/data:/data- /opt/docker/zookeeper/logs:/datalog

使用docker-compose构建容器

docker-compose up -d

集群配置

这里有两台电脑, 域名分别为:

  • server.passnight.local
  • replica.passnight.local
  • follower.passnight.local

他们默认的终端prompt分别为:

  • passnight@passnight-s600
  • passnight@passnight-acepc
  • passnight@passnight-centerm
集群配置流程
# 配置server的myid
passnight@passnight-s600:/opt/docker/zookeeper/data$ sudo vim myid
1
:wq
# 配置replica的myid
passnight@passnight-acepc:/opt/docker/zookeeper/data$ sudo vim myid
2
:wq
# 重启Zookeeper
passnight@passnight-acepc:/opt/docker/zookeeper/data$ docker restart zookeeper
zookeeper
passnight@passnight-s600:/opt/docker/zookeeper/data$ docker restart zookeeper
zookeeper

之后再各个配置文件中配置集群服务器信息; 这里注意ip一定要用本机ip; 端口一定要对应容器内端口; 否则会出现连接失败的问题.

# 数据路径
dataDir=/data
dataLogDir=/datalog
# 心跳时间, Zookeeper服务器与客户端/服务器与服务端信条间隔,单位为毫秒
tickTime=2000
# Leader和Follower初始化通信时限, 单位为秒
initLimit=5 
# 同步时间, 如果超过同步时间, Leader认为Follower下线, 并从服务列表中删除Follower
# clientPort=2181; 客户端端口号, 通常不做修改
syncLimit=2
autopurge.snapRetainCount=3
autopurge.purgeInterval=0
maxClientCnxns=60
standaloneEnabled=true
admin.enableServer=true
# 格式: server.[服务器编号]=[服务器地址][服务器Follower与Leader交换信息的端口][用于重新选举的端口]
# 记住这里的本机地址一定要用0.0.0.0, 端口一定要用容器内的端口, 否则会出现网络连接失败的问题
server.1=0.0.0.0:2888:3888
server.2=replica.passnight.local:20013:20014
server.3=follower.passnight.local:20013:20014

进入容器检查连接状态

# 第一台机器
passnight@passnight-s600:/opt/docker/zookeeper$ docker exec -it zookeeper bash
root@4dc78c0fb310:/apache-zookeeper-3.8.2-bin# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
# 第二台机器
passnight@passnight-centerm:/opt/docker/zookeeper$ docker exec -it zookeeper bash
root@b0c269f3f8cc:/apache-zookeeper-3.9.0-bin# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
# 第三台机器
passnight@passnight-acepc:/opt/docker/zookeeper$ docker exec -it zookeeper bash
root@0daabd03b9e0:/apache-zookeeper-3.8.2-bin# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader

客户端命令行操作

# 启动客户端
root@4dc78c0fb310:/apache-zookeeper-3.8.2-bin# zkCli.sh
Connecting to localhost:2181
# ....一些日志
[zk: localhost:2181(CONNECTED) 0]

znode节点数据信息

[zk: localhost:2181(CONNECTED) 7] ls -s /
cZxid = 0x0 
ctime = Thu Jan 01 00:00:00 UTC 1970 # 创建znode的时间戳
mZxid = 0x0 # znode最后更新的事务zxid
mtime = Thu Jan 01 00:00:00 UTC 1970 # znode最后修改的毫秒数
pZxid = 0x2e5 # znode最后更新的子节点zxid
cversion = 34 # # znode子节点变化号, znode子节点修改次数
dataVersion = 0 # znode数据变化号
aclVersion = 0 # znode访问控制表变化号
ephemeralOwner = 0x0 # 如果是临时节点, 则是znode拥有者的sessionid; 否则是0
dataLength = 0 # znode数据长度
numChildren = 12 # znode子节点数量

客户端API

依赖:

<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.9.0</version>
</dependency>

测试环境

package com.passnight.zookeeper.client;import lombok.SneakyThrows;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Before;
import org.junit.Test;import java.nio.charset.StandardCharsets;public class ZooKeeperClientTest {private final String connectString = "server.passnight.local:20012,follower.passnight.local:20012,replica.passnight.local:20012";private final int sessionTimeOut = 2000;private final Watcher watcher = event -> {};ZooKeeper zooKeeper;@Before@SneakyThrowspublic void init() {zooKeeper = new ZooKeeper(connectString, sessionTimeOut, watcher);}
}

增删查改

增加节点
    @Testpublic void create() throws InterruptedException, KeeperException {String response = zooKeeper.create("/hello", "data in /hello".getBytes(StandardCharsets.UTF_8), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);System.out.println(response);}
// /hello
读取节点
    @Testpublic void getChildren() throws InterruptedException, KeeperException {List<String> children = zooKeeper.getChildren("/", true);children.forEach(System.out::println);}
//    zookeeper
//    hello
判断节点是否存在
@Test
public void exists() throws InterruptedException, KeeperException {Stat exists = zooKeeper.exists("/data", false);System.out.println(exists == null ? "not exist" : "exist");// 输出: not existexists = zooKeeper.exists("/hello", false);System.out.println(exists == null ? "not exist" : "exist");// 输出: exist
}
监听节点
    @Testpublic void watchChange() throws InterruptedException, KeeperException {zooKeeper.getChildren("/", (WatchedEvent event) -> {try {List<String> children = zooKeeper.getChildren("/", true);children.forEach(System.out::println);} catch (KeeperException | InterruptedException e) {throw new RuntimeException(e);}});TimeUnit.DAYS.sleep(1);}
// 此时使用zkCli添加节点
// [zk: localhost:2181(CONNECTED) 1] create /new "test the listener"// 控制台输出
// zookeeper
// hello
// new

使用案例

注册中心案例

在这里插入图片描述

  1. 分布式系统中, 主节点可以有多台, 可以动态上下线, 任意一台客户端都能实时感知到主节点服务器的上下线
  2. 当服务器上线, 通知Zookeeper自己上线, 并告知相关信息
  3. 当服务器下线, 若创建的实临时节点, 对应节点会被删除; 此时Zookeeper通过监听器通知客户端该服务器已下线

zookeeper配置:

package com.passnight.zookeeper.config;import org.apache.zookeeper.Watcher;public class ZookeeperConfig {public final static String connectString = "server.passnight.local:20012,follower.passnight.local:20012,replica.passnight.local:20012";public final static int sessionTimeOut = 2000;public final static Watcher emptyWatcher = event -> {};
}

节点创建

在服务器中创建servers节点, 所谓注册中心的根节点

[zk: localhost:2181(CONNECTED) 10] create /servers "servers"
Created /servers

编写注册中心监听代码

package com.passnight.zookeeper.discovery;import com.passnight.zookeeper.config.ZookeeperConfig;
import lombok.extern.log4j.Log4j2;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;@Log4j2
public class DiscoveryServer {final static private String REGISTER_PATH = "/servers";private ZooKeeper zooKeeper;private List<String> activeServerList;CountDownLatch latch = new CountDownLatch(1);private void connect() throws IOException {zooKeeper = new ZooKeeper(ZookeeperConfig.connectString, ZookeeperConfig.sessionTimeOut, ZookeeperConfig.emptyWatcher);}private void listen() throws InterruptedException, KeeperException {activeServerList = zooKeeper.getChildren(REGISTER_PATH, (event) -> {try {listen();} catch (InterruptedException | KeeperException e) {throw new RuntimeException(e);}});log.info("server status changed: {}", activeServerList);}public static void main(String[] args) throws InterruptedException, KeeperException, IOException {DiscoveryServer server = new DiscoveryServer();server.connect();server.listen();server.latch.await();}
}

运行代码

# 使用zkCli创建节点
[zk: localhost:2181(CONNECTED) 46] create -es /servers/server1 "server1"
Created /servers/server10000000000# java客户端接收到事件
14:32:49.134 [main-EventThread] INFO com.passnight.zookeeper.discovery.Client - server status changed: [server10000000000]# 再创建一个节点
[zk: localhost:2181(CONNECTED) 47] create -es /servers/server2 "server2"
Created /servers/server20000000001
# java客户端依旧能接收到事件
14:33:15.711 [main-EventThread] INFO com.passnight.zookeeper.discovery.Client - server status changed: [server20000000001, server10000000000]# 删除节点
[zk: localhost:2181(CONNECTED) 51] delete /servers/server10000000000
# java服务端接收到了删除事件
14:40:07.598 [main-EventThread] INFO com.passnight.zookeeper.discovery.Client - server status changed: [server20000000001]

服务提供端

服务提端端代码

package com.passnight.zookeeper.discovery;import com.passnight.zookeeper.config.ZookeeperConfig;
import lombok.extern.log4j.Log4j2;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.springframework.util.Assert;import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;@Log4j2
public class ServiceServer {private ZooKeeper zooKeeper;final private static String REGISTER_PATH = "/servers/server";private void connect() throws IOException {zooKeeper = new ZooKeeper(ZookeeperConfig.connectString, ZookeeperConfig.sessionTimeOut, ZookeeperConfig.emptyWatcher);}private void register(String hostname) throws InterruptedException, KeeperException {Assert.notNull(zooKeeper, "connect to the zookeeper before registration");zooKeeper.create(REGISTER_PATH, hostname.getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);log.info("{} online", hostname);}private void serve() throws InterruptedException {TimeUnit.DAYS.sleep(Long.MAX_VALUE);}public static void main(String[] args) throws IOException, InterruptedException, KeeperException {ServiceServer serviceServer = new ServiceServer();serviceServer.connect();serviceServer.register(String.valueOf(InetAddress.getLocalHost()));serviceServer.serve();}
}

启动服务提供段

# 该日志表示服务提供端打印成功
14:43:26.363 [main] INFO com.passnight.zookeeper.discovery.ServiceServer - passnight-s600/fd12:4abe:6e6e:0:0:0:0:7f8 online# 该日志由注册中心打印, 表明服务提供端成功注册
14:43:26.369 [main-EventThread] INFO com.passnight.zookeeper.discovery.DiscoveryServer - server status changed: [server0000000002, server20000000001]

分布式锁

在这里插入图片描述

  1. 请求锁: 客户端创建临时顺序节点
  2. 序号小的节点获取锁, 处理业务; 否则对前一个节点监听
  3. 释放锁: 当监听到前一个锁释放, 则获得锁, 并处理前一步判断

分布式锁代码

package com.passnight.zookeeper.lock;import com.passnight.zookeeper.config.ZookeeperConfig;
import org.apache.zookeeper.*;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Semaphore;public class DistributeLock {final static private String LOCK_PATH = "/locks";private ZooKeeper zooKeeper;private List<String> activeServerList;Semaphore lock = new Semaphore(0);private String waitingForLock = "";private final Watcher lockWatcher = (event) -> {// 刚连接上zookeeper, 没有客户端持有锁, 释放许可if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {lock.release();}// 有节点释放锁, 释放许可if (event.getType() == Watcher.Event.EventType.NodeDeleted && event.getType().equals(waitingForLock)) {lock.release();}};private String currentLock;private void connect() throws IOException {zooKeeper = new ZooKeeper(ZookeeperConfig.connectString, ZookeeperConfig.sessionTimeOut, ZookeeperConfig.emptyWatcher);}public DistributeLock() throws IOException, InterruptedException, KeeperException {connect();if (Objects.isNull(zooKeeper.exists(LOCK_PATH, false))) {zooKeeper.create(LOCK_PATH, "locks".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}}public void lock() throws InterruptedException, KeeperException {currentLock = zooKeeper.create(LOCK_PATH + "/seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL).substring(LOCK_PATH.length() + 1);List<String> locks = zooKeeper.getChildren(LOCK_PATH, false);if (locks.size() > 1) { // 有超过一个锁请求locks.sort(String::compareTo);String minNode = locks.get(0);int currentNodeIndex = locks.indexOf(currentLock);if (!currentLock.equals(minNode)) { // 当前节点与最小节点不相等, 获取锁失败waitingForLock = locks.get(currentNodeIndex - 1);zooKeeper.getData(LOCK_PATH + "/" + waitingForLock, lockWatcher, null);lock.acquire();}}}public void unlock() throws InterruptedException, KeeperException {zooKeeper.delete(LOCK_PATH + "/" + currentLock, -1);}
}

测试用例

package com.passnight.zookeeper.lock;import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import org.apache.zookeeper.KeeperException;import java.io.IOException;
import java.util.concurrent.TimeUnit;@Log4j2
public class DistributeLockTest {public static void main(String[] args) throws IOException, InterruptedException, KeeperException {final DistributeLock lock1 = new DistributeLock();final DistributeLock lock2 = new DistributeLock();new Thread(new Runnable() {@Override@SneakyThrowspublic void run() {lock1.lock();log.info("lock acquired");TimeUnit.SECONDS.sleep(3);lock1.unlock();log.info("lock released");}}).start();new Thread(new Runnable() {@Override@SneakyThrowspublic void run() {lock2.lock();log.info("lock acquired");TimeUnit.SECONDS.sleep(3);lock2.unlock();log.info("lock released");}}).start();}
}

可以得到日志

15:51:00.984 [Thread-1] INFO com.passnight.zookeeper.lock.DistributeLockTest - lock acquired
15:51:04.000 [Thread-0] INFO com.passnight.zookeeper.lock.DistributeLockTest - lock acquired
15:51:04.000 [Thread-1] INFO com.passnight.zookeeper.lock.DistributeLockTest - lock released
15:51:07.013 [Thread-0] INFO com.passnight.zookeeper.lock.DistributeLockTest - lock released

可以看到, 线程1先获取锁, 大约三秒后, 线程0才获取锁, 于此同时线程1释放锁; 再过了3秒, 线程0才释放锁

Curator分布式锁

依赖

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.5.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.5.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-client -->
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-client</artifactId><version>5.5.0</version>
</dependency>

使用ZooKeeper客户端直接使用, 可能会存在以下问题:

  1. 异步会话连接需要自己处理
  2. Watch需要重复注册 分布式注册中心的listen()
  3. 不支持多节点的删除和创建, 要自己递归
  4. 开发复杂性较高

测试用例

package com.passnight.zookeeper.lock;import com.passnight.zookeeper.config.ZookeeperConfig;
import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;import java.util.concurrent.TimeUnit;@Log4j2
public class CuratorDistributeLock {public static void main(String[] args) {InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");new Thread(new Runnable() {@Override@SneakyThrowspublic void run() {lock1.acquire();log.info("lock acquired");lock1.acquire();log.info("lock is reentrant");TimeUnit.SECONDS.sleep(3);lock1.release();lock1.release();log.info("lock released");}}).start();new Thread(new Runnable() {@Override@SneakyThrowspublic void run() {lock2.acquire();log.info("lock acquired");TimeUnit.SECONDS.sleep(3);lock2.release();log.info("lock released");}}).start();}private static CuratorFramework getCuratorFramework() {CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString(ZookeeperConfig.connectString).connectionTimeoutMs(ZookeeperConfig.sessionTimeOut).sessionTimeoutMs(ZookeeperConfig.sessionTimeOut).retryPolicy(new ExponentialBackoffRetry(300, 3)).build();curatorFramework.start();return curatorFramework;}
}

根据输出, 可以看到成功达到分布式锁的效果. 这里要注意可重入锁要多次释放, 不能只释放一次;

16:15:48.938 [Thread-0] INFO com.passnight.zookeeper.lock.CuratorDistributeLock - lock acquired
16:15:48.938 [Thread-0] INFO com.passnight.zookeeper.lock.CuratorDistributeLock - lock is reentrant
16:15:51.948 [Thread-0] INFO com.passnight.zookeeper.lock.CuratorDistributeLock - lock released
16:15:51.987 [Thread-1] INFO com.passnight.zookeeper.lock.CuratorDistributeLock - lock acquired
16:15:55.000 [Thread-1] INFO com.passnight.zookeeper.lock.CuratorDistributeLock - lock released

ZooKeeper算法基础

拜占庭将军问题

  1. 拜占庭将军是一个协议问题, 拜占庭帝国的将军们必须全体一致决定是否攻击敌军
  2. 但这些将军在地理位置上是分隔开来的, 且将军中存在叛徒
  3. 叛徒可以任意行动以达到目标: 欺骗将军, 促成一个不是所有将军都同意的决定, 迷惑将军使他们无法做出决定

Paxos算法

  1. Paxos算法: 是一种基于消息传递且具有高度容错特性的一致性算法, 其保证一个分布式系统对某个数据达成一致, 且不论发生任何异常, 都不会破坏数据一致性
    1. 将所有节点划分为提议者(Proposer), 接受者(Acceptor)和学习者(Learner) 注意每个节点都可以身兼数职
    2. Paxos算法分为三个阶段
      1. Prepare(准备)阶段
        1. Proposer向多个Acceptor发出Propose请求Promise:
        2. Acceptor针对收到的Propose请求进行Promise
      2. Accept接受阶段
        1. Proposer收到多数Acceptor承诺的Promise后, 向Acceptor发出Propose
        2. Acceptor针对Propose请求进行Accept处理
      3. Learn学习阶段:
        1. Proposer将形成的决议发送给所有的Learner
  2. 算法流程:
    1. Proposer生成全局唯一且递增的Proposal Id, 向所有Acceptor发送Propose请求 这里无需携带内容, 只需要携带Proposal Id
    2. :Acceptor收到Proposer请求后, 做出一个承诺, 两个应答
      1. 不再接受Proposal ID小于或等于当前请求的Proposer请求
      2. 不再接受Proposal Id 小于当先请求的Accept请求
      3. 不违背以前做出的承诺下, 回复已经Accept过的提案中Proposer ID最大的那个天的ValueProposal ID, 没有则返回空值
    3. Propose: Proposer收到多数Acceptor的Promise应答后, 从应答中选择Proposal ID最大的提案的Value, 作为本次要发起的提案. 若所有应答的提案Value均为空值, 则自己可以随意决定天Value, 然后携带当前Proposal ID, 向所有Acceptor发送Propose请求
    4. Accept: Acceptor在收到Propose请求后, 在不违背自己之前做出的承诺下, 接受并持久化当前Proposal ID和提案Value
    5. Learn: Proposer收到多Acceptor的Accept后, 决议形成, 将决议发送给所有的Learner
  3. 存在问题:
    1. 倘若A1和A5都需要A3接受才能通过提案, 此时
    2. A1发送提案号1, A3承诺A1
    3. A5发送提案号2, A3承诺A5
    4. A1发送提案1, 无法获得A3的支持, 故重新发送提案3
    5. A3发送提案号3, A3承诺A1
    6. A5发送提案2, 无法获得A3的支持, 故重新发送提案
    7. 以此往复, 形成活锁

ZAB算法

  1. 为了解决上述活锁的问题, 限制只有一个Leader才能发送提案
  2. Zab协议包括两种模式: 消息广播/崩溃恢复
  3. 消息广播
    1. 在这里插入图片描述

    2. 客户端发起一个写请求

    3. Leader将客户端请求转化为事务Proposal, 并为之分配一个全局id即zxid

    4. Leader服务器为每个Follower服务器分配一个队列, 然后将需要广播的提案放入队列, 并根据FIFO策略进行发送

    5. Follower接收到提案后, 首先会以事物日志的方式将其写入到磁盘中, 成功后返回一个ACK

    6. 当Leader收到超过半数Follower的Ack响应消息后, 即认为消息发送成功, 可发送commit消息

    7. Leader向所有Follower广播commit消息, 同时自身也完成事务提交. Follower接收到commit消息后, 会将上一条事务提交

  4. 崩溃恢复: 若Leader崩溃或网络原因导致Leader断线, 就会进入崩溃恢复模式
    1. 可能存在的情况:
      1. 事务在Leader提出之后, Leader崩溃
      2. 事务在Leader上提交了, 且过半follower响应ack, 但是Leader在Commit消息发出之前挂了
    2. 崩溃恢复要求满足以下要求
      1. 被Leader提交的提案, 必须被所有Follower服务器提交
      2. 确保丢弃已经被Leader提出的, 但是还没有被提交的提案
    3. 崩溃恢复主要包含两个部分: Leader选举数据恢复
    4. 新的Leader必须满足以下条件:
      1. 新Leader不能包含未提交的提案
      2. 新选举的Leader节点中含有最大的zxid 这说明, 该节点数据时最新的
    5. Zab数据再次同步:
      1. 新Leader在正式开始工作前, Leader服务器会首先确认日志中所有的服务器是否已经被集群中过半的服务器Commit
      2. 只有当所有Follower将所有未同步的事务同步之后, 并应用到数据内存中, Leader才会将Follower加入到可用Follower列表当中

CAP理论

CAP理论告诉我们, 一个分布式系统不可能同时满足以下三种:

  1. 一致性(Consistency): 多个副本之间是否能够保持一致的特性
  2. 可用性(Available): 系统必须一致处于可用的状态, 对于用户的每个操作总是能够在有限的时间内返回结果
  3. 分区容错性(Partition Tolerance): 分布式系统在遇到任何网络分区的故障的时候, 仍然能够保证对外提供满足一致性和可用性的服务

Zookeeper保证的是CPU, 原因如下:

  1. ZooKeeper不能保证服务请求的可用性, 因为在极端情况下, ZooKeeper会丢弃某些请求
  2. ZooKeeper在选举的时候不对外提供服务

ZooKeeper源码分析

源码地址: apache/zookeeper: Apache ZooKeeper (github.com)

引用

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/35124.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

EdgeOne 边缘函数 + Hono.js + Fauna 搭建个人博客

一、背景 虽然 “博客” 已经是很多很多年前流行的东西了&#xff0c;但是时至今日&#xff0c;仍然有一部分人在维护自己的博客站点&#xff0c;输出不少高质量的文章。 我使用过几种博客托管平台或静态博客生成框架&#xff0c;前段时间使用Hono.jsFauna &#xff0c;基于 …

RK3568平台开发系列讲解(I2C篇)利用逻辑分析仪进行I2C总线的全面分析

🚀返回专栏总目录 文章目录 1. 基础协议1.1. 协议简介1.2. 物理信号1.3. 总线连接沉淀、分享、成长,让自己和他人都能有所收获!😄 1. 基础协议 1.1. 协议简介 IIC-BUS(Inter-IntegratedCircuit Bus)最早是由PHilip半导体(现在被NXP收购)于1982年开发。 主要是用来方…

将深度相机的实时三维坐标数据保存为excel文档(Python+Pyrealsense2+YOLOv8)

一、如何将数据保存为excel文档 1.excel文件库与相关使用 &#xff08;1&#xff09;导入相应的excel文件库&#xff0c;导入前先要进行pip安装&#xff0c;pip install xlwt import xlwt # 导入用于创建和写入Excel文件的库 (2) 建立一个excel文档&#xff0c;并在第0行写…

RabbitMQ中Direct交换机的用法

前言&#xff1a;比如我们的支付完成之后需要进行修改支付状态还要完成短信通知用户需要同时并发两条指令我们可以使用direct交换机进行指定两个不同的业务去完成这两件事 比如我们现在有direct.queue1/direct.queue2两个消息队列&#xff0c;一个direct交换机 我们创建完成两…

鸿蒙开发之--生命周期

开发官网 开发-HarmonyOS开发者-华为开发者联盟 UIAbility生命周期 1、首先执行onCreate(),用于页面初始化和设置页面逻辑 2、执行onWindowStageCreate()创建一个窗口&#xff0c;在这里可以使windowStage.loadContent(url&#xff0c;&#xff08;&#xff09;>{})打开一…

“拿来主义”学习无限滚动动画(附源码)

欢迎关注&#xff1a;xssy5431 小拾岁月 参考链接&#xff1a;https://mp.weixin.qq.com/s/xVTCwR1ZSn5goWmc2yimVA 动画效果 需求分析 需求中涉及无线滚动&#xff0c;说明需要使用 animation 动画。另外&#xff0c;为了方便用户点击操作&#xff0c;需要给滚动对象添加鼠标…

感谢我的辅导员—敬爱的罗老师

前言&#xff1a;快毕业了&#xff0c;想在毕业季感谢给予我帮助的老师&#xff0c;我的辅导员-罗老师是我最想感谢的大学老师。我不知道应该以什么样的方式去表达罗老师对我大学阶段的帮助&#xff0c;如果是直接发邮件&#xff0c;微信信息留言&#xff0c;可能在之后我和老师…

MySQL索引优化解决方案--索引优化(4)

排序优化 尽量避免使用Using FileSort方式排序。order by语句使用索引最左前列或使用where子句与order by子句条件组合满足索引最左前列。where子句中如果出现索引范围查询会导致order by索引失效。 优化案例 联表查询优化 分组查询优化 慢查询日志

架构是怎样练成的-楼宇监控系统案例

目录 概要 项目背景 原系统设计方案 改进后的设计方案 小结 概要 绝大多数人掌握的架构都是直接学习&#xff0c;慢慢地才能体会到一个架构的好处。架构是一种抽象&#xff0c;是为了复用目的而对代码做的抽象。通过一个项目的改造&#xff0c;理解架构是如何产生的&…

Kubernetes Prometheus 系例 | kubernetes 部署 Kafka exporter监控Kafka集群

prometheus 监控 kafka 常见的有两种开源方案&#xff1b; 部署 exporter 或 jmx 配置监控。 项目地址&#xff1a; kafka_exporter&#xff1a;https://github.com/danielqsj/kafka_exporter jmx_exporter&#xff1a;https://github.com/prometheus/jmx_exporter 本文采用kaf…

【日常记录】【JS】优雅检测用户是否在指定元素的外部点击

文章目录 1、界面基本布局2、代码实现3、参考链接 1、界面基本布局 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0">…

搜索python包的说明

当我发现bug时&#xff0c;就怀疑是sns包的版本问题了&#xff08;原代码是原作者以前成功运行的代码&#xff09;&#xff0c;于是直接到网上搜&#xff0c;找到对应的说明文档 根据该示例代码进行改写&#xff1a; 达成目的。

【漏洞复现】用友 UFIDA saveDoc.ajax 任意文件上传漏洞

免责声明&#xff1a; 本文内容旨在提供有关特定漏洞或安全漏洞的信息&#xff0c;以帮助用户更好地了解可能存在的风险。公布此类信息的目的在于促进网络安全意识和技术进步&#xff0c;并非出于任何恶意目的。阅读者应该明白&#xff0c;在利用本文提到的漏洞信息或进行相关测…

ONLYOFFICE 8.1版本桌面编辑器测评

https://www.onlyoffice.com/zh/ 随着工作方式的不断演变&#xff0c;文档编辑软件成为了我们日常工作中不可或缺的一部分。而ONLYOFFICE作为一款开源且功能丰富的办公套件&#xff0c;其最新推出的8.1版本在原有基础上进行了大量的优化与更新&#xff0c;旨在提供更流畅、更安…

后端返回base64文件流下载

后端返回base64文件流: 前端处理&#xff1a; downloadTemplate () {this.$API.downloadTemplate().then(({ data }) > {const binaryString atob(data) // 解码base64字符串const byteArray new Uint8Array(binaryString.length) // 创建一个Uint8Arrayfor (let i 0; i…

2.优化算法之滑动窗口1

1.长度最小的子数组 . - 力扣&#xff08;LeetCode&#xff09; &#xff08;1&#xff09;题目描述 给定一个含有 n 个正整数的数组和一个正整数 target 。 找出该数组中满足其总和大于等于 target 的长度最小的 子数组 [numsl, numsl1, ..., numsr-1, numsr] &#xff0c;…

计算机视觉——opencv快速入门(一) opencv的介绍与安装

什么是opencv OpenCV&#xff08;Open Source Computer Vision Library&#xff09;是一个开源的计算机视觉和机器学习软件库。它旨在提供广泛的图像和视频处理功能&#xff0c;支持多种编程语言&#xff08;主要包括C, Python, Java等&#xff09;和操作系统&#xff08;如Li…

生产环境:CentOS 7 Docker 20.10.19离线部署(为离线部署k8s做准备)

背景描述&#xff1a;离线部署Docker环境 在现代IT基础设施中&#xff0c;Docker已经成为应用容器化的标准工具。它简化了应用程序的部署和管理&#xff0c;使开发者和运维工程师能够以更高的效率和一致性进行工作。然而&#xff0c;在某些场景下&#xff0c;由于安全性、网络…

代码随想录算法训练营第三十四天|56. 合并区间、738.单调递增的数字、968.监控二叉树

56. 合并区间 题目链接&#xff1a;56. 合并区间 文档讲解&#xff1a;代码随想录 状态&#xff1a;无语&#xff0c;这题从右边界排序做不了&#xff01; 思路&#xff1a; 排序&#xff1a;按照区间的起始位置进行排序&#xff0c;这样后面处理时可以顺序合并重叠区间。合并…

数据结构-线性表的链式表示

目录 前言一、线性表的链式表示和实现1.1 线性表的表示1.2 基本操作的实现1.3 线性表的链式表示的优缺点 总结 前言 本篇文章主要介绍线性表的链式表示 一、线性表的链式表示和实现 1.1 线性表的表示 线性表的链式表示又称为链式存储结构或链式映像 链式存储定义&#xff1…