Zookeeper详解

1.Zookeeper概述

1.Zookeeper概念

Zookeeper是 Apache Hadoop 项目下的一个子项目,是一个树形目录服务

Zookeeper 翻译过来就是动物园管理员,他是用来管 Hadoop(大象)、Hive(蜜蜂)、Pig(小猪)的管理员。简称zk

Hadoop: 存储海量数据和分析海量数据的工具

Hive: 基于Hadoop的一个数据仓库工具,用来进行数据提取、转化、加载

Pig: 基于Hadoop的大规模数据分析平台

Zookeeper 是一个开源的,分布式应用程序的协调服务

Zookeeper 提供的主要功能包括:

注册中心:比如Dubbo的注册中心

分布式锁:比如卖电影票有好多入口:官网、猫眼、淘票票

配置管理:比如用来保存项目的各种配置信息(数据库连接信息);常用的配置中心:Apollo(百度开源)、Nacos(阿里开源)

2.ZooKeeper 命令操作

1.数据模型

ZooKeeper 是一个树形目录服务,其数据模型和Unix的文件系统目录树很类似,拥有一个层次化结构。

这里面的每一个节点都被称为:ZNode,每个节点上都会保存自己的数据和节点信息。

节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下

节点可以分为四大类:

持久节点(PERSISTENT):创建后一直存在,直到主动删除此节点

临时节点(EPHEMERAL):生命周期依赖于客户端会话,对应客户端会话失效后节点自动清除:-e

持久顺序节点(PERSISTENT_SEQUENTIAL) :持久顺序节点,创建后一直存在,直到主动删除此节点:-s

临时顺序节点(EPHEMERAL_SEQUENTIAL) :临时节点在客户端会话失效后节点自动清:-es

2.服务端命令

启动 ZooKeeper 服务: ./zkServer.sh start

查看 ZooKeeper 服务状态: ./zkServer.sh status

停止 ZooKeeper 服务: ./zkServer.sh stop

重启 ZooKeeper 服务: ./zkServer.sh restart

3.客户端常用命令

连接ZooKeeper服务端

./zkCli.sh -server localhost:2181

若为连接本机可直接

./zkCli.sh

显示指定目录下节点 

查看根节点
ls /

查看指定节点
ls /zookeeper

 

创建根节点

create /节点path value 

获取节点

create /节点path value 

设置节点

set /节点path value 

创建子节点

create /节点path/子节点 

删除单个节点

delete /节点path 

删除带有子节点的节点

deleteall /节点path 

查看命令帮助

help 

4.创建临时节点和顺序节点

创建临时节点

create -e /节点path value 

再次使用zk-cli连接服务端,验证刚才创建的临时节点已经没了

创建顺序节点,zk会自动在节点路径后边添加序号

create -s /节点path value

创建临时顺序节点

create -es /节点path value

czxid:节点被创建的事务ID

ctime: 创建时间

mZxid: 最后一次被更新的事务ID

mtime: 修改时间

pzxid:子节点列表最后一次被更新的事务ID

cversion:子节点的版本号

dataversion:数据版本号

aclversion:权限版本号

ephemeralOwner:用于临时节点,代表临时节点的事务ID,如果为持久节点则为0

dataLength:节点存储的数据的长度

numChildren:当前节点的子节点个数 

查询节点详细信息

ls –s /节点path 

2..ZooKeeper JavaAPI 操作 

1.Curator介绍

Curator 是 Apache ZooKeeper 的Java客户端库,目标是简化 ZooKeeper 客户端的使用

常见的ZooKeeper Java API :

原生Java API

ZkClient

Curator

Curator 最初是 Netfix 研发的,后来捐献了 Apache 基金会,目前是 Apache 的顶级项目

官网:Welcome to Apache Curator | Apache Curator

2.建立连接

创建项目curator-zk

pom.xml中添加Curator和日志坐标 

    <properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><!--单元测试--><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.10</version><scope>test</scope></dependency><!--curator--><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.0.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.0.0</version></dependency><!--日志--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.21</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.21</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build>

日志配置文件:log4j.properties

log4j.rootLogger=off,stdoutlog4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%d{yyyy-MM-dd HH/:mm/:ss}]%-5p %c(line/:%L) %x-%m%n

在java测试包下创建com\curator包创建测试类CuratorTest

package com.curator;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.Test;public class CuratorTest {/** 建立连接*/@Testpublic void testConnect() {/** connectString         连接字符串。zk server 地址和端口* sessionTimeoutMs      会话差事时间 单位ms* connectionTimeoutMs   连接超时时间 单位ms* retryPolicy           重试策略*///重试策略RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);//第一种方式/*CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.235.129:2181",60*1000,15*1000,retryPolicy);*///常用第二种方式,更直观CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.235.129:2181").sessionTimeoutMs(60 * 1000).connectionTimeoutMs(15 * 1000).retryPolicy(retryPolicy).namespace("ljb").build();//开启连接client.start();}}

3.创建节点

@Before:初始化方法(对于每一个测试方法都要执行一次)

@After:释放资源(对于每一个测试方法运行完后都要执行一次)

package com.curator;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;public class CuratorTest {/** 建立连接*/private CuratorFramework client;@Beforepublic void testConnect() {/** connectString         连接字符串。zk server 地址和端口* sessionTimeoutMs      会话差事时间 单位ms* connectionTimeoutMs   连接超时时间 单位ms* retryPolicy           重试策略*///重试策略RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);//第一种方式/*CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.235.129:2181",60*1000,15*1000,retryPolicy);*///常用第二种方式,更直观client = CuratorFrameworkFactory.builder().connectString("192.168.235.129:2181").sessionTimeoutMs(60 * 1000).connectionTimeoutMs(15 * 1000).retryPolicy(retryPolicy).namespace("ljb").build();//开启连接client.start();}/** 创建节点:create 持久 临时 顺序 数据* 1.基本创建* 2.创建节点 带有数据* 3.设置节点的类型* 4.创建多级节点 /app1/p1*/@Testpublic void testCreate() throws Exception {//1. 基本创建 :create().forPath("")//如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储String path = client.create().forPath("/app1");System.out.println(path);}@Testpublic void testCreate2() throws Exception {//2. 创建节点 带有数据:create().forPath("",data)//节点默认类型:持久化String path = client.create().forPath("/app2", "hehe".getBytes());System.out.println(path);}@Testpublic void testCreate3() throws Exception {//3. 设置节点的类型:create().withMode().forPath("",data)//创建临时节点String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");System.out.println(path);}@Testpublic void testCreate4() throws Exception {//创建多级节点  /app1/p1 :create().creatingParentsIfNeeded().forPath("",data)//creatingParentsIfNeeded():如果父节点不存在,则创建父节点String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1");System.out.println(path);}@Afterpublic void close() {if (client != null) {client.close();}}}

4.查询节点

//====================get===============================/*** 查询节点:* 1. 查询数据:get: getData().forPath()* 2. 查询子节点: ls: getChildren().forPath()* 3. 查询节点状态信息:ls -s:getData().storingStatIn(状态对象).forPath()*/@Testpublic void testGet1() throws Exception {//1. 查询数据:getbyte[] data = client.getData().forPath("/app1");System.out.println(new String(data));}@Testpublic void testGet2() throws Exception {// 2. 查询子节点: lsList<String> path = client.getChildren().forPath("/");System.out.println(path);}@Testpublic void testGet3() throws Exception {Stat status = new Stat();System.out.println(status);//3. 查询节点状态信息:ls -s// store status in ...client.getData().storingStatIn(status).forPath("/app1");System.out.println(status);}

5.修改节点 

    //=====================set==================/*** 修改数据* 1. 基本修改数据:setData().forPath()* 2. 根据版本修改: setData().withVersion().forPath()* * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。** @throws Exception*/@Testpublic void testSet() throws Exception {client.setData().forPath("/app1", "ljb".getBytes());}@Testpublic void testSetForVersion() throws Exception {Stat status = new Stat();//3. 查询节点状态信息:ls -sclient.getData().storingStatIn(status).forPath("/app1");int version = status.getVersion();//查询出来的 3System.out.println(version);client.setData().withVersion(version).forPath("/app1", "hehe".getBytes());}

6.删除结点

    //=================delete=================/*** 删除节点: delete deleteall* 1. 删除单个节点:delete().forPath("/app1");* 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1");* 3. 必须成功的删除:为了防止网络抖动。本质就是重试。  client.delete().guaranteed().forPath("/app2");* 4. 回调:inBackground*/@Testpublic void testDelete() throws Exception {// 1. 删除单个节点client.delete().forPath("/app1");}@Testpublic void testDelete2() throws Exception {//2. 删除带有子节点的节点client.delete().deletingChildrenIfNeeded().forPath("/app4");}@Testpublic void testDelete3() throws Exception {//3. 必须成功的删除, 底层是自动重试client.delete().guaranteed().forPath("/app2");}@Testpublic void testDelete4() throws Exception {//4. 回调client.delete().guaranteed().inBackground(new BackgroundCallback(){@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception {System.out.println("我被删除了~");System.out.println(event);}}).forPath("/app1");}

完整代码 

package com.curator;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;import java.util.List;public class CuratorTest {/** 建立连接*/private CuratorFramework client;@Beforepublic void testConnect() {/** connectString         连接字符串。zk server 地址和端口* sessionTimeoutMs      会话差事时间 单位ms* connectionTimeoutMs   连接超时时间 单位ms* retryPolicy           重试策略*///重试策略RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);//第一种方式/*CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.235.129:2181",60*1000,15*1000,retryPolicy);*///常用第二种方式,更直观client = CuratorFrameworkFactory.builder().connectString("192.168.235.129:2181").sessionTimeoutMs(60 * 1000).connectionTimeoutMs(15 * 1000).retryPolicy(retryPolicy).namespace("ljb").build();//开启连接client.start();}
//======================create=====================/** 创建节点:create 持久 临时 顺序 数据* 1.基本创建* 2.创建节点 带有数据* 3.设置节点的类型* 4.创建多级节点 /app1/p1*/@Testpublic void testCreate() throws Exception {//1. 基本创建 :create().forPath("")//如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储String path = client.create().forPath("/app1");System.out.println(path);}@Testpublic void testCreate2() throws Exception {//2. 创建节点 带有数据:create().forPath("",data)//节点默认类型:持久化String path = client.create().forPath("/app2", "hehe".getBytes());System.out.println(path);}@Testpublic void testCreate3() throws Exception {//3. 设置节点的类型:create().withMode().forPath("",data)//创建临时节点String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");System.out.println(path);}@Testpublic void testCreate4() throws Exception {//创建多级节点  /app1/p1 :create().creatingParentsIfNeeded().forPath("",data)//creatingParentsIfNeeded():如果父节点不存在,则创建父节点String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1");System.out.println(path);}
//====================get===============================/*** 查询节点:* 1. 查询数据:get: getData().forPath()* 2. 查询子节点: ls: getChildren().forPath()* 3. 查询节点状态信息:ls -s:getData().storingStatIn(状态对象).forPath()*/@Testpublic void testGet1() throws Exception {//1. 查询数据:getbyte[] data = client.getData().forPath("/app1");System.out.println(new String(data));}@Testpublic void testGet2() throws Exception {// 2. 查询子节点: lsList<String> path = client.getChildren().forPath("/");System.out.println(path);}@Testpublic void testGet3() throws Exception {Stat status = new Stat();System.out.println(status);//3. 查询节点状态信息:ls -s// store status in ...client.getData().storingStatIn(status).forPath("/app1");System.out.println(status);}//=====================set==================/** 修改数据* 1. 基本修改数据:setData().forPath()* 2. 根据版本修改: setData().withVersion().forPath()* * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。*/@Testpublic void testSet() throws Exception {client.setData().forPath("/app1", "ljb".getBytes());}@Testpublic void testSetForVersion() throws Exception {Stat status = new Stat();//3. 查询节点状态信息:ls -sclient.getData().storingStatIn(status).forPath("/app1");int version = status.getVersion();//查询出来的 3System.out.println(version);client.setData().withVersion(version).forPath("/app1", "hehe".getBytes());}//=================delete=================/*** 删除节点: delete deleteall* 1. 删除单个节点:delete().forPath("/app1");* 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1");* 3. 必须成功的删除:为了防止网络抖动。本质就是重试。  client.delete().guaranteed().forPath("/app2");* 4. 回调:inBackground*/@Testpublic void testDelete() throws Exception {// 1. 删除单个节点client.delete().forPath("/app1");}@Testpublic void testDelete2() throws Exception {//2. 删除带有子节点的节点client.delete().deletingChildrenIfNeeded().forPath("/app4");}@Testpublic void testDelete3() throws Exception {//3. 必须成功的删除, 底层是自动重试client.delete().guaranteed().forPath("/app2");}@Testpublic void testDelete4() throws Exception {//4. 回调client.delete().guaranteed().inBackground(new BackgroundCallback(){@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception {System.out.println("我被删除了~");System.out.println(event);}}).forPath("/app1");}@Afterpublic void close() {if (client != null) {client.close();}}
}

7.Watch监听概念 

ZooKeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性

ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者

ZooKeeper 原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便,需要开发人员自己反复注册Watcher,比较繁琐

Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听

ZooKeeper提供了三种Watcher:

NodeCache : 只监听某一个特定的节点

PathChildrenCache : 监控一个ZNode的子节点 

TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合

8.Watch监听-NodeCache

只监听某一个特定的节点

package com.curator;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;public class CuratorWatcherTest {/** 建立连接*/private CuratorFramework client;@Beforepublic void testConnect() {/** connectString         连接字符串。zk server 地址和端口* sessionTimeoutMs      会话差事时间 单位ms* connectionTimeoutMs   连接超时时间 单位ms* retryPolicy           重试策略*///重试策略RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);//第一种方式/*CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.235.129:2181",60*1000,15*1000,retryPolicy);*///常用第二种方式,更直观client = CuratorFrameworkFactory.builder().connectString("192.168.235.129:2181").sessionTimeoutMs(60 * 1000).connectionTimeoutMs(15 * 1000).retryPolicy(retryPolicy).namespace("ljb").build();//开启连接client.start();}@Afterpublic void close() {if (client != null) {client.close();}}/*** 演示 NodeCache:给指定一个节点注册监听器*/@Testpublic void testNodeCache() throws Exception {//1. 创建NodeCache对象NodeCache nodeCache = new NodeCache(client,"/app1");//2. 注册监听nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {System.out.println("节点变化了~");//获取修改节点后的数据byte[] data = nodeCache.getCurrentData().getData();System.out.println(new String(data));}});//3. 开启监听.如果设置为true,则开启监听是,加载缓冲数据nodeCache.start(true);while (true){}}
}

9.Watch监听-PathChildrenCache

监控一个ZNode的子节点

package com.curator;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;public class CuratorWatcherTest {/** 建立连接*/private CuratorFramework client;@Beforepublic void testConnect() {/** connectString         连接字符串。zk server 地址和端口* sessionTimeoutMs      会话差事时间 单位ms* connectionTimeoutMs   连接超时时间 单位ms* retryPolicy           重试策略*///重试策略RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);//第一种方式/*CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.235.129:2181",60*1000,15*1000,retryPolicy);*///常用第二种方式,更直观client = CuratorFrameworkFactory.builder().connectString("192.168.235.129:2181").sessionTimeoutMs(60 * 1000).connectionTimeoutMs(15 * 1000).retryPolicy(retryPolicy).namespace("ljb").build();//开启连接client.start();}@Afterpublic void close() {if (client != null) {client.close();}}@Testpublic void testPathChildrenCache() throws Exception {//1.创建监听对象PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);//2. 绑定监听器pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {    			@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {System.out.println("子节点变化了~");System.out.println(event);//监听子节点的数据变更,并且拿到变更后的数据//1.获取类型PathChildrenCacheEvent.Type type = event.getType();//2.判断类型是否是updateif(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){System.out.println("数据变了!!!");byte[] data = event.getData().getData();System.out.println(new String(data));}}});//3. 开启pathChildrenCache.start();while (true){}}
}

10.Watch监听-TreeCache 

可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合

package com.curator;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;public class CuratorWatcherTest {/** 建立连接*/private CuratorFramework client;@Beforepublic void testConnect() {/** connectString         连接字符串。zk server 地址和端口* sessionTimeoutMs      会话差事时间 单位ms* connectionTimeoutMs   连接超时时间 单位ms* retryPolicy           重试策略*///重试策略RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);//第一种方式/*CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.235.129:2181",60*1000,15*1000,retryPolicy);*///常用第二种方式,更直观client = CuratorFrameworkFactory.builder().connectString("192.168.235.129:2181").sessionTimeoutMs(60 * 1000).connectionTimeoutMs(15 * 1000).retryPolicy(retryPolicy).namespace("ljb").build();//开启连接client.start();}@Afterpublic void close() {if (client != null) {client.close();}}@Testpublic void testTreeCache() throws Exception {//1. 创建监听器TreeCache treeCache = new TreeCache(client,"/app2");//2. 注册监听treeCache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {System.out.println("节点变化了");System.out.println(event);}});//3. 开启treeCache.start();while (true){}}
}

3.分布式锁

1.概述

在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized(同步)或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下,没有任何问题

但当我们的应用是分布式集群工作的情况下,属于多JVM下的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题

那么就需要一种更加高级的锁机制,来处理这种跨机器的进程之间的数据同步问题——这就是分布式锁

2.zookeeper分布式锁原理

核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点 

1.客户端获取锁时,在lock节点下创建临时顺序节点。

2.然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。

3.如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。

4.如果发现比自己小的那个节点被删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点是否是lock子节点中序号最小的;如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听

为什么创建的是临时顺序节点: 如果是持久化结点,如果获取锁的节点宕机,锁就不会被释放,如果是临时的锁就会被自动释放。是顺序节点是因为要找最小的节点所以要排个顺序

3.模拟12306售票案例

在Curator中有五种锁方案:

InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)

InterProcessMutex:分布式可重入排它锁

InterProcessReadWriteLock:分布式读写锁

InterProcessMultiLock:将多个锁作为单个实体管理的容器

InterProcessSemaphoreV2:共享信号量

创建线程进行加锁设置

package com.curator;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;import java.util.concurrent.TimeUnit;public class Ticket12306 implements Runnable{private int tickets = 10;//数据库的票数private InterProcessMutex lock ;//在构造方法中创建连接,并且初始化锁public Ticket12306() {//重试策略RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);//2.第二种方式//CuratorFrameworkFactory.builder();CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.235.129:2181").sessionTimeoutMs(60 * 1000).connectionTimeoutMs(15 * 1000).retryPolicy(retryPolicy).build();//开启连接client.start();try {if (client.checkExists().forPath("/lock") != null) {//删除之前测试遗留的/lock节点client.delete().deletingChildrenIfNeeded().forPath("/lock");}} catch (Exception e) {e.printStackTrace();}lock = new InterProcessMutex(client, "/lock");}@Overridepublic void run() {while(true){//获取锁try {lock.acquire(3, TimeUnit.SECONDS);//时间与时间单位if(tickets > 0){System.out.println(Thread.currentThread()+":"+tickets);Thread.sleep(100);tickets--;}} catch (Exception e) {e.printStackTrace();}finally {//释放锁try {lock.release();} catch (Exception e) {e.printStackTrace();}}}}
}

测试

package com.curator;public class LockTest {public static void main(String[] args) {Ticket12306 ticket12306 = new Ticket12306();//创建客户端Thread t1 = new Thread(ticket12306,"携程");Thread t2 = new Thread(ticket12306,"飞猪");t1.start();t2.start();}
}

 

4.Zookeeper 集群搭建 

1.Zookeeper集群概述

Leader选举:

Serverid:服务器ID

比如有三台服务器,编号分别是1,2,3

编号越大在选择算法中的权重越大

Zxid:数据ID

服务器中存放的最大数据ID,值越大说明数据越新,在选举算法中数据越新权重越大

在Leader选举的过程中,如果某台ZooKeeper获得了超过半数的选票,则此ZooKeeper就可以成为Leader了

2.集群搭建

搭建要求

真实的集群是需要部署在不同的服务器上的,但是在我们测试时同时启动很多个虚拟机内存会吃不消,所以我们通常会搭建伪集群,也就是把所有的服务都搭建在一台虚拟机上,用端口进行区分。

我们这里要求搭建一个三个节点的Zookeeper集群(伪集群)

3.搭建准备

建立/usr/local/zookeeper-cluster目录,将解压后的Zookeeper复制到以下三个目录

mkdir /usr/local/zookeeper-cluster

cp -r  apache-zookeeper-3.8.3-bin /usr/local/zookeeper-cluster/zookeeper-1

cp -r  apache-zookeeper-3.8.3-bin /usr/local/zookeeper-cluster/zookeeper-2

cp -r  apache-zookeeper-3.8.3-bin /usr/local/zookeeper-cluster/zookeeper-3

创建data目录 ,并且将 conf下zoo_sample.cfg 文件改名为 zoo.cfg

mkdir /usr/local/zookeeper-cluster/zookeeper-1/data
mkdir /usr/local/zookeeper-cluster/zookeeper-2/data
mkdir /usr/local/zookeeper-cluster/zookeeper-3/data

mv  /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo_sample.cfg  /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg

mv  /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo_sample.cfg  /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg

mv  /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo_sample.cfg  /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg

配置每一个Zookeeper 的dataDir 和 clientPort 分别为2181 2182 2183

vim /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
dataDir=/usr/local/zookeeper-cluster/zookeeper-1/data

clientPort=2181

vim /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg

clientPort=2182
dataDir=/usr/local/zookeeper-cluster/zookeeper-2/data

vim /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg

clientPort=2183
dataDir=/usr/local/zookeeper-cluster/zookeeper-3/data

4.配置集群

在每个zookeeper的 data 目录下创建一个 myid 文件,内容分别是1、2、3 。这个文件就是记录每个服务器的ID

echo 1 >/usr/local/zookeeper-cluster/zookeeper-1/data/myid
echo 2 >/usr/local/zookeeper-cluster/zookeeper-2/data/myid
echo 3 >/usr/local/zookeeper-cluster/zookeeper-3/data/myid
 

在每一个zookeeper 的 zoo.cfg配置客户端访问端口(clientPort)和集群服务器IP列表

vim /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
vim /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg
vim /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg

server.1=192.168.235.129:2881:3881
server.2=192.168.235.129:2882:3882
server.3=192.168.235.129:2883:3883

注:server.服务器ID=服务器IP地址:服务器之间通信端口:服务器之间投票选举端口

5.启动集群

启动集群就是分别启动每个实例

/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh start
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh start
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh start

启动后查询一下每个实例的运行状态

/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh status
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh status

先查询第一个服务:Mode为follower表示是跟随者(从)

再查询第二个服务:Mod 为leader表示是领导者(主)

查询第三个为跟随者(从)

6.模拟集群异常 

首先我们先测试如果是从服务器挂掉,会怎么样

把3号服务器停掉,观察1号和2号,发现状态并没有变化

/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh stop

/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh status
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status

由此得出结论,3个节点的集群,从服务器挂掉,集群正常

再把1号服务器(从服务器)也停掉,查看2号(主服务器)的状态,发现已经停止运行了

/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh stop

/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status

3个节点的集群,2个从服务器都挂掉,主服务器也无法运行。因为可运行的机器没有超过集群总数量的半数

我们再次把1号服务器启动起来,发现2号服务器又开始正常工作了。而且依然是领导者

/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh start

/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status

我们把3号服务器也启动起来,把2号服务器停掉,停掉后观察1号和3号的状态

/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh start
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh stop

/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh status
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh status

发现新的leader产生了

当集群中的主服务器挂了,集群中的其他服务器会自动进行选举状态,然后产生新得leader

我们再次测试,当我们把2号服务器重新启动起来启动后,会发生什么?2号服务器会再次成为新的领导吗

/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh start

/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh status

我们会发现,2号服务器启动后依然是跟随者(从服务器),3号服务器依然是领导者(主服务器),没有撼动3号服务器的领导地位

当领导者产生后,再次有新服务器加入集群,不会影响到现任领导者

7.Zookeeper 集群核心理论

在ZooKeeper集群服务中有三个角色:

Leader 领导者 :

1.处理事务请求(增删改操作)

2.集群内部各服务器的调度者

Follower 跟随者 :

1.处理客户端非事务请求(查询操作),转发事务请求给Leader服务器

2.参与Leader选举投票

Observer 观察者:

处理客户端非事务请求(查询操作),转发事务请求给Leader服务器

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/736195.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

canvas实现水印逻辑分析

目录 效果图一、相关文档二、分析三、实现1、将水印文字转为水印图片2、给刚生成的水印图片加入旋转以及间隔&#xff08;1&#xff09;旋转位移&#xff08;2&#xff09;间隔位移&#xff08;3&#xff09;最后使用toDataURL导出为png图片 3、将生成的水印图片依次排布在需要…

【Python数据结构与判断2/7】数据和判断小结

目录 序言 print() 变量 赋值 四种数据类型 字符串 格式化输出 四则运算 取整与取模 比较运算 逻辑运算 判断 if语句 if-else语句 if-elif-else语句 Tips 空值、0、非0非空值 实战案例 输入密码 短信模板 总结 序言 今天将对前面学过的内容进行一个复习小结…

科技引领品质:飞利浦智能锁“12年免费换新机”重塑行业新标杆

随着智能锁行业的竞争愈发火热&#xff0c;各大品牌在技术创新和服务升级方面不断推陈出新。售后服务的形态正发生深刻变化&#xff0c;从传统的保修维修到如今的技术支持、24小时在线客服等&#xff0c;各大品牌都在不断地提升售后服务水平&#xff0c;以创新的服务理念和先进…

配置与管理DNS服务器

配置与管理DNS服务器 **1&#xff0c;什么是DNS&#xff1f;**负责将域名转换成实际想对应的ip地址&#xff0c;这个过程交域名解析。 **2&#xff0c;域名解析的方法&#xff1a;**分布式&#xff0c;层次结构的数据库系统。根域&#xff0c;顶级域&#xff0c;二级域&#…

sql注入基础学习

1.常用SQL语句 01、显示数据库 show databases&#xff1b; 02、打开数据库 use db name&#xff1b; 03、显示数据表 show tables&#xff1b; 04、显示表结构 describe table_name&#xff1b; 05、显示表中各字段信息&#xff0c;即表结构 show columns from table_nam…

面向对象的编程语言是什么意思?——跟老吕学Python编程

面向对象的编程语言是什么意思&#xff1f;——跟老吕学Python编程 面向对象是什么意思&#xff1f;面向对象的定义面向对象的早期发展面向对象的背景1.审视问题域的视角2.抽象级别3.封装体4.可重用性 面向对象的特征面向对象的开发方法面向对象程序设计基本思想实现 面向对象的…

数据结构-稀疏数组

文章目录 1、什么是稀疏数组&#xff1f;2、稀疏数组的存储流程3、代码实现4、运行结果 1、什么是稀疏数组&#xff1f; 当一个数组中大部分元素为0&#xff0c;或者为同一个值的数组时&#xff0c;可以使用稀疏数组来保存该数组。 2、稀疏数组的存储流程 记录数组一共有几行…

活体检测(点头,摇头,张嘴等动态识别)

活体检测&#xff08;点头&#xff0c;摇头&#xff0c;张嘴等动态识别&#xff09; 某本书里有一句话&#xff0c;等我去读、去拍案。 田间的野老&#xff0c;等我去了解、去惊识。 山风与发&#xff0c;冷泉与舌&#xff0c; 流云与眼&#xff0c;松涛与耳&#xff0c; 他们等…

21、状态模式(行为性模式)

版本一、get状态指针 #include <iostream> using namespace std;//前置声明 class Context;//状态 class State{ public://4个状态virtual void toUp (Context& context){ }virtual void toDown (Context& context){ }virtual void toLeft (Context& cont…

架构学习总结:企业架构=业务+数据+技术+应用架构

最近再次研读DAMA数据管理知识体系,结合工作对什么是企业架构?如何开展企业架构设计工作有一些新的认识,供大家参考。企业架构包括企业的业务架构、数据架构、技术架构和应用架构,要想做好企业的信息化数字化建设规划,这四个架构都不可缺少,这四个方面的内容共同组成了企…

[云原生] k8s配置资源管理

一、Secret的资源配置 1.1 Secret配置的相关说明 Secret 是用来保存密码、token、密钥等敏感数据的 k8s 资源&#xff0c;这类数据虽然也可以存放在 Pod 或者镜像中&#xff0c;但是放在 Secret 中是为了更方便的控制如何使用数据&#xff0c;并减少暴露的风险。 Secret 有…

【JavaSE】抽象类与接口

Object 类 类 java.lang.Object是类层次结构的根类&#xff0c;即所有类的父类。 除Object类之外的任何一个Java类&#xff0c;全部直接或间接的继承于Object类。由此&#xff0c;Object类也被称为根父类。Object类中声明的成员具有通用性&#xff0c;并且Object类中没有声明…

300W-500W-700W-1000W超薄制动电阻

EAK制动电阻&#xff0c;最大连续功率&#xff1a;300 W--1000W 制动电阻器&#xff0c;用于带有中低功率变频器 或作为充电电阻器的驱动器。 安装在变频器附近。 防护等级 IP 20 / IP 54 可根据要求提供更高的防护等级 测试电压 2.5 kV AC 可根据要求提供其他容量和安装…

SMART PLC自适应低通滤波器(收放卷线速度滤波)

一阶低通滤波器更多内容请参考信号处理专栏相关文章,常用链接如下: 1、SMART PLC 低通滤波器和模拟量采集应用 https://rxxw-control.blog.csdn.net/article/details/136595982https://rxxw-control.blog.csdn.net/article/details/1365959822、SMART PLC双线性变换和后向差…

Docker笔记-进入运行中的镜像,查看日志等操作

docker搭建好后&#xff0c;查看运行的docker镜像&#xff1a; docker ps -a 进入运行的容器&#xff0c;命令如下&#xff1a; docker exec -it <容器ID> /bin/bash # 或者&#xff0c;直接用容器里面的命令&#xff0c;比如mysql镜像 docker exec -it <容器ID>…

网络协议常见问题

网络协议常见问题 OSI&#xff08;Open Systems Interconnection&#xff09;模型OSI 封装 TCP/IP协议栈IP数据报的报头TCP头格式UDP头格式TCP (3-way shake)三次握手建立连接&#xff1a;为什么三次握手才可以初始化 Socket、序列号和窗口大小并建立 TCP 连接。每次建立TCP连接…

蓝桥杯单片机---第十二届省赛题目解析

文章目录 比赛题目一、代码相关定义、声明1.头文件声明2.变量声明 二、主要函数1.main函数2.按键扫描3.数码管显示4.电压模式1、2输出 & LED显示5.定时器中断6.消除85C显示 三、次要函数1.初始化函数Init2.按键函数Key3.LED函数Led4.数码管函数Seg5.iic函数中6.onewire函数…

【LeetCode】17.电话号码的字母组合

题目 链接&#xff1a;17. 电话号码的字母组合 - 力扣&#xff08;LeetCode&#xff09; 给定一个仅包含数字2-9的字符串&#xff0c;返回所有它能表示的字母组合。答案可以按任意顺序返回 给出数字到字母的映射如下&#xff08;与电话按键相同&#xff09;。注意 1 不对应任何…

Vue ECharts line3D点击空白处重置图表视角- 附完整示例

ECharts&#xff1a;一个基于 JavaScript 的开源可视化图表库。 目录 效果 一、介绍 1、官方文档&#xff1a;Apache ECharts 2、官方示例 二、准备工作 1、安装依赖包 2、示例版本 三、使用步骤 1、在单页面引入 echarts 2、指定容器并设置容器宽高 3、数据处理&…

leetcode 热题 100_反转链表

题解一&#xff1a; 迭代&#xff1a;逐步修改节点指针&#xff0c;注意在修改前要保存下一个节点指针。 class Solution {public ListNode reverseList(ListNode head) {ListNode pre null;while (head! null) {ListNode temp head.next;head.next pre;pre head;head te…