Zookeeper:实现“分布式锁”的 Demo

Zookeeper 能保证数据的强一致性,用户任何时候都可以相信集群中每个节点的数据都是相同的。一个用户创建一个节点作为锁,另一个用户检测该节点,如果存在,代表别的用户已经锁住,如果不存在,则可以创建一个节点,代表拥有一个锁。

本篇内容包括:Demo 概述、代码实现、测试结果


文章目录

    • 一、Demo 概述
        • 1、关于 zookeeper “命名服务协调”
        • 2、Demo 设计
        • 3、Demo 前提
    • 二、代码实现
        • 1、引用 Maven 依赖
        • 2、ConnectionWatcher 类创建 Zookeeper 连接
        • 3、ActiveKeyValueStore 类读写 Zookeeper 数据
        • 4、ZkLock 类实现分布式锁
    • 三、测试结果


一、Demo 概述

1、关于 zookeeper “命名服务协调”

Zookeeper 能保证数据的强一致性,用户任何时候都可以相信集群中每个节点的数据都是相同的。一个用户创建一个节点作为锁,另一个用户检测该节点,如果存在,代表别的用户已经锁住,如果不存在,则可以创建一个节点,代表拥有一个锁。

2、Demo 设计

分布式锁本质,就是多个资源竞争者对一份资源的排他占有

  • 我们设置多个线程,分别在同一 path 下创建节点
  • 没个线程获取当前 path 下子节点,看最小子节点是否为自身,是则加锁成功(更好的方式是用 Watcher 对前一个地址监控,这里图方便用子节点排序取最小的方式 )
  • 线程加锁成功后,执行任务,执行完毕后解锁

3、Demo 前提

参考:Mac通过Docker安装Zookeeper集群


二、代码实现

1、引用 Maven 依赖

        <!--    选择对应的Zookeeper版本    --><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.7.0</version></dependency>

2、ConnectionWatcher 类创建 Zookeeper 连接

import java.io.IOException;
import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;public class ConnectionWatcher implements Watcher {private final CountDownLatch connectedSignal = new CountDownLatch(1);private static final int SESSION_TIMEOUT = 5000;protected ZooKeeper zk;public void connect(String hosts) throws IOException, InterruptedException {zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);connectedSignal.await();}@Overridepublic void process(WatchedEvent event) {if (event.getState() == Event.KeeperState.SyncConnected) {connectedSignal.countDown();}}public void close() throws InterruptedException {zk.close();}}

3、ActiveKeyValueStore 类读写 Zookeeper 数据

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;public class ActiveKeyValueStore extends ConnectionWatcher {private static final Charset CHARSET = StandardCharsets.UTF_8;int state = 0;/*** 写入节点数据** @param path  节点地址* @param value 数据值* @throws InterruptedException 中断异常* @throws KeeperException      ZooKeeper异常*/public void write(String path, String value) throws InterruptedException, KeeperException {Stat stat = zk.exists(path, false);if (stat == null) {if (value == null) {zk.create(path, null,ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);} else {zk.create(path, value.getBytes(CHARSET),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}} else {if (value == null) {zk.setData(path, null, -1);} else {zk.setData(path, value.getBytes(CHARSET), -1);}}}public boolean lock(String path, String name) throws InterruptedException, KeeperException {boolean flag = tryLock(path, name);if (flag) {state++;}return flag;}public boolean tryLock(String path, String name) throws InterruptedException, KeeperException {String lockPath = path + "/" + name;zk.create(lockPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);List<String> waits = readChildren(path, null);Collections.sort(waits);if (waits.get(0).equals(name)) {return true;}CountDownLatch latch = new CountDownLatch(1);for (int i = 0; i < waits.size(); i++) {String cur = waits.get(i);if (!cur.equalsIgnoreCase(name)) {continue;}String prePath = path + "/" + waits.get(i - 1);zk.exists(prePath, new Watcher() {@Overridepublic void process(WatchedEvent event) {latch.countDown();}});break;}latch.await();return true;}public boolean unlock(String path, String name) {if (state > 1) {state--;return true;}String lockPath = path + "/" + name;try {Stat stat = zk.exists(lockPath, false);int version = stat.getVersion();zk.delete(lockPath, version);state--;return true;} catch (Exception e) {System.out.println("unlock:" + lockPath + " ,exception,");}return false;}/*** 获取所有子节点** @param path    节点地址* @param watcher watcher* @return 所有子节点* @throws InterruptedException 中断异常* @throws KeeperException      ZooKeeper异常*/public List<String> readChildren(String path, Watcher watcher) throws InterruptedException, KeeperException {List<String> childrens = null;if (watcher == null) {childrens = zk.getChildren(path, false);} else {childrens = zk.getChildren(path, watcher, null);}return childrens;}
}

4、ZkLock 类实现分布式锁

import lombok.SneakyThrows;
import org.apache.zookeeper.KeeperException;import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;public class ZkLock {/*** 开启的线程数,模拟多客户端操作*/private static final int CLIENTS_NUM = 3;private final ActiveKeyValueStore store;public ZkLock(String hosts) throws IOException, InterruptedException {//定义一个类store = new ActiveKeyValueStore();//连接Zookeeperstore.connect(hosts);}public static void testLock() {//线程计数器控制业务的执行final CountDownLatch countDownLatch = new CountDownLatch(CLIENTS_NUM);for (int i = 0; i < CLIENTS_NUM; i++) {new Thread() {@Overridepublic void run() {}}.start();}try {// 堵塞线程,任务执行完后释放countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) throws IOException, InterruptedException, KeeperException {String hosts = "localhost:2181";ZkLock zkLock = new ZkLock(hosts);// 创建父节点zkLock.store.write("/lock4", "父亲节点");//CountDownLatch latch = new CountDownLatch(CLIENTS_NUM);for (int i = 0; i < CLIENTS_NUM; i++) {int finalI = i;new Thread() {@SneakyThrows@Overridepublic void run() {String name = "Thread-" + String.valueOf(finalI);zkLock.store.lock("/lock4", name);TimeUnit.SECONDS.sleep(2);System.out.println("线程-" + name + "执行完毕");latch.countDown();zkLock.store.unlock("/lock4", name);}}.start();}latch.await();System.out.println("end ...");}}

三、测试结果

ZkLock 代码测试结果如下:

线程-Thread-0执行完毕
线程-Thread-1执行完毕
线程-Thread-2执行完毕
end ...

通过 ZkLock 打印的信息可以看出,已经成功模拟实现分布式锁

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/535441.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JavaIO流:案例

java.io 包下需要掌握的流有 16 个&#xff0c;本篇内容包括&#xff1a;java.io包下需要掌握的流、Java IO 案例。 文章目录一、java.io包下需要掌握的流二、Java IO 案例1、Demo 1&#xff08;FileInputStream&#xff09;2、Demo 2&#xff08;FileInputStream&#xff09;3…

比对excel数据

#!/usr/bin/env pythonimport openpyxl from openpyxl.styles import PatternFill from openpyxl.styles import colors from openpyxl.styles import Font, Color aD:/测算单位设置/比对/吉林/tmp001.xlsx bD:/测算单位设置/比对/吉林/国网吉林电力.xlsx cD:/测算单位设置/比对…

CPU 是如何执行任务的

前言 你清楚下面这几个问题吗&#xff1f; 有了内存&#xff0c;为什么还需要 CPU Cache&#xff1f; CPU 是怎么读写数据的&#xff1f; 如何让 CPU 能读取数据更快一些&#xff1f; CPU 伪共享是如何发生的&#xff1f;又该如何避免&#xff1f; CPU 是如何调度任务的&a…

Ansible 的自动化运维

1、Ansible 特点 Ansible 自 2012 年发布以来&#xff0c;很快在全球流行&#xff0c;其特点如下&#xff1a; Ansible 基于 Python 开发&#xff0c;运维工程师对其二次开发相对比较容易&#xff1b; Ansible 丰富的内置模块&#xff0c;几乎可以满足一切要求&#xff1b; …

Shell 信号发送与捕捉

1、Linux信号类型 信号&#xff08;Signal&#xff09;&#xff1a;信号是在软件层次上对中断机制的一种模拟&#xff0c;通过给一个进程发送信号&#xff0c;执行相应的处理函数。 进程可以通过三种方式来响应一个信号&#xff1a; 1&#xff09;忽略信号&#xff0c;即对信…

运维面试题总结

集群相关 简述 ETCD 及其特点&#xff1f; etcd 是 CoreOS 团队发起的开源项目&#xff0c;是一个管理配置信息和服务发现&#xff08;service discovery&#xff09;的项目&#xff0c;它的目标是构建一个高可用的分布式键值&#xff08;key-value&#xff09;数据库&#x…

详解设计模式:建造者模式

建造者模式&#xff08;Builder Pattern&#xff09;也叫做生成器模式&#xff0c;是 GoF 的 23 种设计模式的一种&#xff0c;它将一个复杂对象的构建与它的表示分离&#xff0c;使得同样的构建过程可以创建不同的表示。 当我们需要实列化一个复杂的类&#xff0c;以得到不同结…

图文并茂 VLAN 详解,让你看一遍就理解 VLAN

一、为什么需要VLAN 1.1、什么是VLAN? VLAN(Virtual LAN)&#xff0c;翻译成中文是“虚拟局域网”。LAN可以是由少数几台家用计算机构成的网络&#xff0c;也可以是数以百计的计算机构成的企业网络。VLAN所指的LAN特指使用路由器分割的网络——也就是广播域。 在此让我们先复习…

认识VLAN,并学会VLAN的划分和网络配置实例

VLAN的划分和网络的配置实例 1、VLAN基础知识 VLAN&#xff08;Virtual Local Area Network&#xff09;的中文名为&#xff1a;“虚拟局域网”&#xff0c;注意和VPN&#xff08;虚拟专用网&#xff09;进行区分。 VLAN是一种将局域网设备从逻辑上划分&#xff08;不是从物…

VLAN划分及配置注意事项

VLAN&#xff08;Virtual Local Area Network&#xff09;即虚拟局域网&#xff0c;是将一个物理的LAN在逻辑上划分成多个广播域的通信技术。VLAN内的主机间可以直接通信&#xff0c;而VLAN间不能直接通信&#xff0c;从而将广播报文限制在一个VLAN内。VLAN之间的通信是通过第3…

Docker原理剖析

一、简介 1、了解Docker的前生LXC LXC为Linux Container的简写。可以提供轻量级的虚拟化&#xff0c;以便隔离进程和资源&#xff0c;而且不需要提供指令解释机制以及全虚拟化的其他复杂性。相当于C中的NameSpace。容器有效地将由单个操作系统管理的资源划分到孤立的组中&#…

获取Linux内存、cpu、磁盘IO等信息

#!/bin/bash # 获取要监控的本地服务器IP地址 IPifconfig | grep inet | grep -vE inet6|127.0.0.1 | awk {print $2} echo "IP地址&#xff1a;"$IP# 获取cpu总核数 cpu_numgrep -c "model name" /proc/cpuinfo echo "cpu总核数&#xff1a;"$c…

Docker容器网络解析

Docker 容器网络的发展历史 在 Dokcer 发布之初&#xff0c;Docker 是将网络、管理、安全等集成在一起的&#xff0c;其中网络模块可以为容器提供桥接网络、主机网络等简单的网络功能。 从 1.7 版本开始&#xff0c;Docker正是把网络和存储这两部分的功能都以插件化形式剥离出来…

将指定excel的一列数据提取到另一个excel的指定列

#!/usr/bin/env python import openpyxl bjD:/地市县公司/西藏台账数据分析-设备台帐分析.xlsx wb openpyxl.load_workbook (bj) get_sheets wb.sheetnames #print(get_sheets) TA01TA01 TA02TA02 TA03TA03 TE01TE01 YG201YG201 YG202YG202 YG203YG203 YG204YG204 YG205YG205…

Docker 数据管理介绍

默认容器的数据是保存在容器的可读写层&#xff0c;当容器被删除时其上的数据也会丢失&#xff0c;所以为了实现数据的持久性则需要选择一种数据持久技术来保存数据。官方提供了三种存储方式&#xff1a;Volumes、Bind mounts和tmpfs。前面还介绍了&#xff1a;Docker 服务终端…

Docker 数据持久化的三种方案

容器中的数据可以存储在容器层。但是将数据存放在容器层存在以下问题&#xff1a; 数据不是持久化。意思是如果容器删除了&#xff0c;这些数据也就没了 主机上的其它进程不方便访问这些数据 对这些数据的I/O会经过存储驱动&#xff0c;然后到达主机&#xff0c;引入了一层间…

Git 存储原理及相关实现

Git 是目前最流行的版本控制系统&#xff0c;从本地开发到生产部署&#xff0c;我们每天都在使用 Git 进行我们的版本控制&#xff0c;除了日常使用的命令之外&#xff0c;如果想要对 Git 有更深一步的了解&#xff0c;那么研究下 Git 的底层存储原理将会对理解 Git 及其使用非…

Git内部原理

Git有什么特点&#xff1f; fast&#xff0c;scalable&#xff0c;distributed revision control system&#xff08;快速&#xff0c;可扩展的分布式版本控制系统&#xff09; 几乎所有操作都是本地执行 每一个clone都是整个生命周期的完整副本 the stupid content tracker&a…

git存储原理

四种数据类型 实际上Git基于数据类型的不同&#xff0c;把对象分为四种&#xff1a;数据对象、树对象、提交对象、标签对象。Git文件系统的设计思路与linux文件系统相似&#xff0c;即将文件的内容与文件的属性分开存储&#xff0c;文件内容以“装满字节的袋子”存储在文件系统…

详解设计模式:中介者模式

中介者模式&#xff08;Mediator Pattern&#xff09;也被称为调停者模式&#xff0c;是在 GoF 23 种设计模式中定义了的行为型模式。 中介者模式 是用来降低多个对象和类之间的通信复杂性。这种模式提供了一个中介类&#xff0c;该类通常处理不同类之间的通信&#xff0c;并支…