本文Java代码地址: https://gitee.com/blackjie_1/ljUp/tree/master/zookeeperDemo
个人博客网站:什么是快乐
基于docker 安装
拉取zookeeper 3.4.10
docker pull zookeeper:3.4.10
启动服务端
docker run -d -p 2181:2181 -v /root/docker/zookeeper:/home/zookeeper --name zookeeper_1 --restart always zookeeper:3.4.10
启动客户端
docker run -it --``rm` `--link zookeeper_one:zookeeper zookeeper zkCli.sh -server zookeeper
或者
docker exec -it zookeeper_1 zkCli.sh
数据模型
其数据结构类似一个树形结构,每个节点可以拥有子节点并能存储1M的数据
1、持久化
2、临时节点 -e
3、持久化顺序节点 -s
4、临时顺序节点 -es
客户端命令
连接本地zookeeper
docker exec -it zookeeper_1 zkCli.sh
或者
docker run -it --rm --link zookeeper_1:zookeeper_1 zookeeper:3.4.10 zkCli.sh -server zookeeper_1
退出
quit
查看节点
ls /节点名称
创建节点
create /节点名称 [数据] create -e /节点名称 [数据] 临时节点,当前会话断开时,临时节点会删除create -s /节点名称 [数据] 顺序节点,节点名称后会有编号create -s /节点名称 [数据] 临时的顺序节点
获取数据
get /节点名称
设置数据
set /节点名称 [数据]
删除
delete /节点名称delete all /节点名称 删除节点及节点下所有节点
Java代码操作
maven依赖
<!-- 本次学习zookeeper版本是3.4.10--><dependency><groupId>org.apache.curator</groupId><artifactId>curator-client</artifactId><version>2.12.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>2.12.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>2.12.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId></dependency>
增删改查
连接
public void curatorFramework() {//重试策略ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(3000, 10);//第一种方式
// CuratorFramework client = CuratorFrameworkFactory
// .newClient(zookeeperUrl, 60000, 15000, exponentialBackoffRetry);//第二种方法client = CuratorFrameworkFactory.builder().connectString("192.168.106.128:2181").connectionTimeoutMs(15000).sessionTimeoutMs(60000).retryPolicy(exponentialBackoffRetry).namespace("lj").build();client.start();}
新增
/*** zookeeper 创建节点 持久、临时、顺序 带有数据* create* 1、创建节点并带有数据* 2、设置节点类型* 3、创建多级节点*/@Testpublic void curatorCreate() throws Exception {//基本创建String app1 = client.create().forPath("/app1");//创建带有数据的节点String s1 = client.create().forPath("/app2", "李杰_app2".getBytes(StandardCharsets.UTF_8));//创建节点默认类型:持久化。可通过withMode方法设置类型String s2 = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3", "李杰_临时".getBytes(StandardCharsets.UTF_8));//创建多级节点//creatingParentsIfNeeded :创建多级节点,如果父节点不存在则创建父节点String s3 = client.create().creatingParentsIfNeeded().forPath("/app4/app5", "李杰_多级节点".getBytes(StandardCharsets.UTF_8));System.out.println(s3);}
查询
/*** zookeeper 查询节点* 1、查询字节点 ls /* 2、获取数据 get /* 3、查询节点状态 ls -s*/@Testpublic void curatorQuery() throws Exception {//获取数据 getDatabyte[] bytes = client.getData().forPath("/app1");//查询子节点 getChildrenList<String> strings = client.getChildren().forPath("/");//查询子节点信息+数据信息//stat 用于获取节点信息,结果会放在stat对象中Stat stat = new Stat();byte[] bytes1 = client.getData().storingStatIn(stat).forPath("/app2");System.out.println(new String(bytes1));}
修改
/*** zookeeper 修改节点数据* 1、修改数据* 2、根据版本修改数据*/@Testpublic void curatorUpdate() throws Exception {//1、修改数据
// client.setData().forPath("/app2","app2_修改".getBytes(StandardCharsets.UTF_8));//2、根据版本修改数据 withVersion//获取版本号Stat stat = new Stat();client.getData().storingStatIn(stat).forPath("/app2");//根据版本号修改数据client.setData().withVersion(stat.getVersion()).forPath("/app2","app2_version_update".getBytes(StandardCharsets.UTF_8));}
删除
/*** zookeeper 删除节点* 1、删除单个节点* 2、删除带有子节点的节点* 3、必须成功的删除* 4、回调函数*/@Testpublic void curatorDelete() throws Exception {//1、删除数据client.delete().forPath("/app1");//2、删除带有子节点的节点 deletingChildrenIfNeededclient.delete().deletingChildrenIfNeeded().forPath("/app4");//3、删除子节点 (必须删除成功,本质是重试策略) guaranteedclient.delete().guaranteed().forPath("/app4");//4、回调函数 inBackground 。在删除后执行的方法client.delete().guaranteed().inBackground(new BackgroundCallback(){@Overridepublic void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {System.out.println("我被删除了");System.out.println(curatorEvent);}}).forPath("/app4");}
监听器
* 监听节点
* 1、监听单个节点 nodeCache
* 2、监听节点下所有子节点 PathChildrenCache
* 3、监听单个节点和节点下的所有子节点 TreeCache
监听单个节点
@Testpublic void curatorFrameworkWatch() throws Exception {//监听单个节点//1、创建监听对象NodeCache nodeCache = new NodeCache(client,"/app2",false);//2、注册监听nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {byte[] data = nodeCache.getCurrentData().getData();System.out.println("节点发生改变,当前值:"+new String(data));}});//3、开启监听nodeCache.start();while (true){}}
监听某节点下的所有子节点
/*** 监听某节点的所有子节点* @throws Exception*/@Testpublic void curatorFrameworkWatchChildren() throws Exception {//监听某节点的所有子节点//1、创建监听对象PathChildrenCachePathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2", true);//2、注册监听pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {//会监听很多数据,包括节点新增,删除,修改连接等System.out.println("节点发生改变");System.out.println(event);//监听子节点数据发生变化if(PathChildrenCacheEvent.Type.CHILD_UPDATED.equals(event.getType())){// 确实是子节点数据发生变化,获取变化后的值byte[] data = event.getData().getData();String s = new String(data);System.out.println(s);}}});//3、开启监听pathChildrenCache.start();while (true){}}
监听某节点和其所有的子节点
/*** 监听某节点和其所有子节点* @throws Exception*/@Testpublic void curatorFrameworkWatchAll() throws Exception {//1、创建监听对象PathChildrenCacheTreeCache pathChildrenCache = new TreeCache(client, "/app2");//2、注册监听pathChildrenCache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework curatorFramework, TreeCacheEvent event) throws Exception {//会监听很多数据,包括节点新增,删除,修改连接等System.out.println("节点发生改变");System.out.println(event);//监听子节点数据发生变化if(TreeCacheEvent.Type.NODE_UPDATED.equals(event.getType())){// 确实是节点数据发生变化,获取变化后的值byte[] data = event.getData().getData();String s = new String(data);System.out.println(s);}}});//3、开启监听pathChildrenCache.start();while (true){}}
分布式锁
简略概念:多机器下的对于锁的处理。实现方式:
1、redis (性能高,但是不是很可靠)
2、数据库实现(获得锁:数据库新增一条唯一数据。释放锁:删除新增的数据。锁等待:等新增成功。此思想同样可以用redis实现。)
3、zookeeper
Java代码实现
本次使用的锁是InterProcessMutex
主要步骤:
1、构建CuratorFramework client 对象
2、通过client 构建InterProcessMutex 对象:lock= new InterProcessMutex(client, “/lock”);
3、执行业务前获取锁:boolean acquire = lock.acquire(5, TimeUnit.SECONDS);
4、业务结束后释放锁:lock.release();
模拟售票
public class ZookeeperLockTests {private static class Tick12306{private int tick=100;public int buyTick(){int result=0;if(tick>0){result=tick;tick--;}else{System.out.println("无票了");return -1000;}return result;}}private static class OtherTick implements Runnable{//抢票机构名称private String name;//12306票池private Tick12306 tick12306;//分布式锁private InterProcessMutex lock;public OtherTick(String name,Tick12306 tick12306){this.name=name;this.tick12306=tick12306;//重试策略ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(3000, 10);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.106.128:2181").connectionTimeoutMs(15000).sessionTimeoutMs(60000).retryPolicy(exponentialBackoffRetry).namespace("lj").build();client.start();lock = new InterProcessMutex(client, "/lock");}//抢票@Overridepublic void run() {while (tick12306.tick>0){try {//获取锁boolean acquire = lock.acquire(5, TimeUnit.SECONDS);if(acquire){System.out.println(this.name+"抢票:"+tick12306.buyTick());}}catch (Exception e){e.printStackTrace();}finally {try {//锁释放lock.release();} catch (Exception e) {e.printStackTrace();}}}}}public static void main(String[] args) {Tick12306 tick12306 = new Tick12306();OtherTick t1 = new OtherTick("携程", tick12306);OtherTick t2 = new OtherTick("飞猪", tick12306);Thread thread1 = new Thread(t1);Thread thread2 = new Thread(t2);thread1.start();thread2.start();}}