文章目录
- 服务器动态上下线监听案例
- 需求
- 需求分析
- 具体实现
- 测试
- Zookeeper分布式锁案例
- 原生Zookeeper实现分布式锁
- Curator框架实现分布式锁
- Zookeeper面试重点
- 选举机制
- 生产集群安装多少zk合适
- zk常用命令
服务器动态上下线监听案例
需求
某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知
到主节点服务器的上下线。
需求分析
具体实现
1)先在集群上创建/servers节点
[zk: localhost:2181(CONNECTED) 10] create /servers "servers"
Created /servers
2)在Idea中创建包名:com.yudan.case1
3)服务器端向Zookeeper注册代码
import org.apache.zookeeper.*;import java.io.IOException;public class DistributeServer {private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";private int sessionTime = 100000;private ZooKeeper zk;public static void main(String[] args) throws IOException, InterruptedException, KeeperException {DistributeServer server = new DistributeServer();// 1、获取zk连接server.getConnect();// 2、注册服务器到zk集群server.regist(args[0]);// 3、启动 业务逻辑(睡觉)server.business();}// 创建到 zk 的客户端连接private void getConnect() throws IOException {zk = new ZooKeeper(connectString, sessionTime, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {}});}// 注册到服务器private void regist(String hostname) throws InterruptedException, KeeperException {String create = zk.create("/servers/"+hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);System.out.println(hostname + " " + "is online");}// 业务功能private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}
}
4)客户端代码
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;public class DistributeClient {private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";private int sessionTime = 100000;private ZooKeeper zk;public static void main(String[] args) throws IOException, InterruptedException, KeeperException {DistributeClient client = new DistributeClient();// 1、获取zk连接client.getConnect();// 2、监听/servers下面子节点的增加和删除client.getServersList();// 3、业务逻辑(睡觉)client.business();}// 创建到 zk 的客户端连接private void getConnect() throws IOException {zk = new ZooKeeper(connectString, sessionTime, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {// 再次启动监听try {getServersList();} catch (InterruptedException e) {throw new RuntimeException(e);} catch (KeeperException e) {throw new RuntimeException(e);}}});}// 获取服务器列表信息private void getServersList() throws InterruptedException, KeeperException {// 获取服务器子节点信息,并且对父节点进行监听List<String> children = zk.getChildren("/servers", true);// 存储服务器信息列表 ArrayList<String> servers = new ArrayList<>();// 遍历所有节点,获取节点中的主机名称信息 for (String child : children) {byte[] data = zk.getData("/servers/" + child, false, null);servers.add(new String(data));}// 打印System.out.println(servers);}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}
}
测试
1)在Linux命令行上操作增加减少服务器
(1)启动DistributeClient 客户端
(2)在hadoop102上zk的客户端/servers目录上创建临时带序号节点
[zk: localhost:2181(CONNECTED) 1] create -e -s /servers/hadoop102 "hadoop102"
[zk: localhost:2181(CONNECTED) 2] create -e -s /servers/hadoop103 "hadoop103"
(3)观察Idea控制台变化
[hadoop102, hadoop103]
(4)执行删除操作
[zk: localhost:2181(CONNECTED) 8] delete /servers/hadoop1020000000000
(5)观察Idea控制台变化
[hadoop103]
2)在Idea上操作增加减少服务器
(1)启动DistributeClient 客户端(如果已经启动过,不需要重启)
(2)启动DistributeServer 服务
- 点击Edit Configurations…
- 在弹出的窗口中(Program arguments)输入想启动的主机,例如,hadoop102
- 回到DistributeServer的main方法,右键,在弹出的窗口中点击Run “DistributeServer.main()”
- 观察DistributeServer控制台,提示hadoop102 is online
- 观察DistributeClient控制台,提示hadoop102已经上线
Zookeeper分布式锁案例
什么叫做分布式锁呢?
比如说"进程1"在使用该资源的时候,会先去获得锁,"进程1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
原生Zookeeper实现分布式锁
1)分布式锁实现
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;public class DistributeLock {private final String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";private final int sessionTime = 100000;private final ZooKeeper zk;// 当前client等待的子节点private String waitPath;// zookeeper节点等待private CountDownLatch waitLatch = new CountDownLatch(1);// zookeeper连接private CountDownLatch connectLatch = new CountDownLatch(1);// 当前client创建的子节点private String currentMode;// 和 zk 服务建立连接,并创建根节点public DistributeLock() throws IOException, InterruptedException, KeeperException {// 1、获取连接zk = new ZooKeeper(connectString, sessionTime, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {// connectLatch 如果连接上zk 可以释放// 连接建立时, 打开latch, 唤醒wait在该latch上的线程if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {connectLatch.countDown();}// waitLatch 需要释放// 发生了waitPath的删除事件if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {waitLatch.countDown();}}});// 等待 zookeeper正常连接后,往下走程序connectLatch.await();// 2、判断根节点/locks是否存在Stat stat = zk.exists("/locks", false);if (stat == null) {// 创建一下根节点zk.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}}// 对zk加锁public void zkLock() {// 创建对应的临时带序号节点try {currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);// 判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是, 监听他序号的前一个节点List<String> children = zk.getChildren("/locks", false);// 如果children 只有一个值,那就直接获取锁;如果有多个节点,需要判断,哪个节点最小if (children.size() == 1) {return;} else {// 对children集合内的节点进行排序Collections.sort(children);// 获取节点名称 seq-String thisNode = currentMode.substring("/locks/".length());// 通过seq- 获取到该节点在children集合中的位置int index = children.indexOf(thisNode);// 判断if (index == -1) {System.out.println("数据异常");} else if (index == 0) {// 就一个节点,可以获取锁了return;}else {// 需要监听前一个节点waitPath = "/locks/" + children.get(index-1);zk.getData(waitPath,true,null);// 等待监听waitLatch.await();return;}}} catch (KeeperException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);}}// 对zk解锁public void unzkLock() {// 删除节点try {zk.delete(currentMode,-1);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (KeeperException e) {throw new RuntimeException(e);}}
}
2)分布式锁测试
(1)创建两个线程
import org.apache.zookeeper.KeeperException;import java.io.IOException;public class DistributeLockTest {public static void main(String[] args) throws IOException, InterruptedException, KeeperException {// 创建分布式锁1final DistributeLock lock1 = new DistributeLock();// 创建分布式锁2final DistributeLock lock2 = new DistributeLock();new Thread(new Runnable() {@Overridepublic void run() {// 获取锁对象try {lock1.zkLock();System.out.println("线程1 启动,获取到锁");Thread.sleep(5 * 1000);lock1.unzkLock();System.out.println("线程1 释放锁");} catch (InterruptedException e) {throw new RuntimeException(e);}}}).start();new Thread(new Runnable() {@Overridepublic void run() {// 获取锁对象try {lock2.zkLock();System.out.println("线程2 启动,获取到锁");Thread.sleep(5 * 1000);lock2.unzkLock();System.out.println("线程2 释放锁");} catch (InterruptedException e) {throw new RuntimeException(e);}}}).start();}
}
(2)观察控制台变化
线程1获取锁
线程1释放锁
线程2获取锁
线程2释放锁
Curator框架实现分布式锁
1)原生的Java API开发存在的问题
(1)会话连接是异步的,需要自己去处理。比如使用CountDownLatch
(2)Watch需要重复注册,不然就不能生效
(3)开发的复杂性还是比较高的
(4)不支持多节点删除和创建。需要自己去递归
2)Curator是一个专门解决分布式锁的框架,解决了原生Java API开发分布式遇到的问题。
详情请查看官方文档:https://curator.apache.org/index.html
3)Curator 案例实操
(1)添加依赖
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.3.0</version>
</dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.3.0</version>
</dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>4.3.0</version>
</dependency>
(2)代码实现
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;public class CuratorLockTest {public static void main(String[] args) {// 创建分布式锁1InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");// 创建分布式锁2InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");new Thread(new Runnable() {@Overridepublic void run() {try {lock1.acquire();System.out.println("线程1 获取到锁");lock1.acquire();System.out.println("线程1 再次获取到锁");Thread.sleep(5 * 1000);lock1.release();System.out.println("线程1 释放锁");lock1.release();System.out.println("线程1 再次释放锁");} catch (Exception e) {throw new RuntimeException(e);}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {lock2.acquire();System.out.println("线程2 获取到锁");lock2.acquire();System.out.println("线程2 再次获取到锁");Thread.sleep(5 * 1000);lock2.release();System.out.println("线程2 释放锁");lock2.release();System.out.println("线程2 再次释放锁");} catch (Exception e) {throw new RuntimeException(e);}}}).start();}// 分布式锁初始化private static CuratorFramework getCuratorFramework() {// 重试策略,初始时间3秒,重试3次ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("hadoop102:2181,hadoop103:2181,hadoop104:2181").connectionTimeoutMs(100000).sessionTimeoutMs(100000).retryPolicy(policy).build();// 启动客户端client.start();System.out.println("zookeeper 启动成功!");return client;}
}
(2)观察控制台变化:
线程1获取锁
线程1再次获取锁
线程1释放锁
线程1再次释放锁
线程2获取锁
线程2再次获取锁
线程2释放锁
线程2再次释放锁
Zookeeper面试重点
选举机制
半数机制,超过半数的投票通过,即通过。
(1)第一次启动选举规则:
投票过半数时,服务器id大的胜出
(2)第二次启动选举规则:
①EPOCH大的直接胜出
②EPOCH相同,事务id大的胜出
③事务id相同,服务器id大的胜出
生产集群安装多少zk合适
安装奇数台。
生产经验:
- 10台服务器:3台zk;
- 20台服务器:5台zk;
- 100台服务器:11台zk;
- 200台服务器:11台zk
zk常用命令
ls、get、create、delete