大数据Zookeeper--案例

文章目录

  • 服务器动态上下线监听案例
    • 需求
    • 需求分析
    • 具体实现
    • 测试
  • Zookeeper分布式锁案例
    • 原生Zookeeper实现分布式锁
    • Curator框架实现分布式锁
  • Zookeeper面试重点
    • 选举机制
    • 生产集群安装多少zk合适
    • zk常用命令

服务器动态上下线监听案例

需求

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知
到主节点服务器的上下线。

需求分析

服务器动态上下线

具体实现

1)先在集群上创建/servers节点

[zk: localhost:2181(CONNECTED) 10] create /servers "servers" 
Created /servers

2)在Idea中创建包名:com.yudan.case1

3)服务器端向Zookeeper注册代码

import org.apache.zookeeper.*;import java.io.IOException;public class DistributeServer {private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";private int sessionTime = 100000;private ZooKeeper zk;public static void main(String[] args) throws IOException, InterruptedException, KeeperException {DistributeServer server = new DistributeServer();// 1、获取zk连接server.getConnect();// 2、注册服务器到zk集群server.regist(args[0]);// 3、启动 业务逻辑(睡觉)server.business();}// 创建到 zk 的客户端连接private void getConnect() throws IOException {zk = new ZooKeeper(connectString, sessionTime, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {}});}// 注册到服务器private void regist(String hostname) throws InterruptedException, KeeperException {String create = zk.create("/servers/"+hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);System.out.println(hostname + " " + "is online");}// 业务功能private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}
}

4)客户端代码

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;public class DistributeClient {private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";private int sessionTime = 100000;private ZooKeeper zk;public static void main(String[] args) throws IOException, InterruptedException, KeeperException {DistributeClient client = new DistributeClient();// 1、获取zk连接client.getConnect();// 2、监听/servers下面子节点的增加和删除client.getServersList();// 3、业务逻辑(睡觉)client.business();}// 创建到 zk 的客户端连接private void getConnect() throws IOException {zk = new ZooKeeper(connectString, sessionTime, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {// 再次启动监听try {getServersList();} catch (InterruptedException e) {throw new RuntimeException(e);} catch (KeeperException e) {throw new RuntimeException(e);}}});}// 获取服务器列表信息private void getServersList() throws InterruptedException, KeeperException {// 获取服务器子节点信息,并且对父节点进行监听List<String> children = zk.getChildren("/servers", true);// 存储服务器信息列表 ArrayList<String> servers = new ArrayList<>();// 遍历所有节点,获取节点中的主机名称信息 for (String child : children) {byte[] data = zk.getData("/servers/" + child, false, null);servers.add(new String(data));}// 打印System.out.println(servers);}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}
}

测试

1)在Linux命令行上操作增加减少服务器

(1)启动DistributeClient 客户端

(2)在hadoop102上zk的客户端/servers目录上创建临时带序号节点

[zk: localhost:2181(CONNECTED) 1]  create -e -s /servers/hadoop102 "hadoop102" 
[zk: localhost:2181(CONNECTED) 2]  create -e -s /servers/hadoop103 "hadoop103"

(3)观察Idea控制台变化

[hadoop102, hadoop103]

(4)执行删除操作

[zk: localhost:2181(CONNECTED) 8]  delete /servers/hadoop1020000000000 

(5)观察Idea控制台变化

[hadoop103] 

2)在Idea上操作增加减少服务器

(1)启动DistributeClient 客户端(如果已经启动过,不需要重启)

(2)启动DistributeServer 服务

  • 点击Edit Configurations…
    在这里插入图片描述
  • 在弹出的窗口中(Program arguments)输入想启动的主机,例如,hadoop102
    在这里插入图片描述
  • 回到DistributeServer的main方法,右键,在弹出的窗口中点击Run “DistributeServer.main()”
  • 观察DistributeServer控制台,提示hadoop102 is online
  • 观察DistributeClient控制台,提示hadoop102已经上线

Zookeeper分布式锁案例

什么叫做分布式锁呢?

比如说"进程1"在使用该资源的时候,会先去获得锁,"进程1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
分布式锁

原生Zookeeper实现分布式锁

1)分布式锁实现

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;public class DistributeLock {private final String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";private final int sessionTime = 100000;private final ZooKeeper zk;// 当前client等待的子节点private String waitPath;// zookeeper节点等待private CountDownLatch waitLatch = new CountDownLatch(1);// zookeeper连接private CountDownLatch connectLatch = new CountDownLatch(1);// 当前client创建的子节点private String currentMode;// 和 zk 服务建立连接,并创建根节点public DistributeLock() throws IOException, InterruptedException, KeeperException {// 1、获取连接zk = new ZooKeeper(connectString, sessionTime, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {// connectLatch 如果连接上zk 可以释放// 连接建立时, 打开latch, 唤醒wait在该latch上的线程if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {connectLatch.countDown();}// waitLatch 需要释放// 发生了waitPath的删除事件if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {waitLatch.countDown();}}});// 等待 zookeeper正常连接后,往下走程序connectLatch.await();// 2、判断根节点/locks是否存在Stat stat = zk.exists("/locks", false);if (stat == null) {// 创建一下根节点zk.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}}// 对zk加锁public void zkLock() {// 创建对应的临时带序号节点try {currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);// 判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是, 监听他序号的前一个节点List<String> children = zk.getChildren("/locks", false);// 如果children 只有一个值,那就直接获取锁;如果有多个节点,需要判断,哪个节点最小if (children.size() == 1) {return;} else {// 对children集合内的节点进行排序Collections.sort(children);// 获取节点名称 seq-String thisNode = currentMode.substring("/locks/".length());// 通过seq- 获取到该节点在children集合中的位置int index = children.indexOf(thisNode);// 判断if (index == -1) {System.out.println("数据异常");} else if (index == 0) {// 就一个节点,可以获取锁了return;}else {// 需要监听前一个节点waitPath = "/locks/" + children.get(index-1);zk.getData(waitPath,true,null);// 等待监听waitLatch.await();return;}}} catch (KeeperException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);}}// 对zk解锁public void unzkLock() {// 删除节点try {zk.delete(currentMode,-1);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (KeeperException e) {throw new RuntimeException(e);}}
}

2)分布式锁测试

(1)创建两个线程

import org.apache.zookeeper.KeeperException;import java.io.IOException;public class DistributeLockTest {public static void main(String[] args) throws IOException, InterruptedException, KeeperException {// 创建分布式锁1final DistributeLock lock1 = new DistributeLock();// 创建分布式锁2final DistributeLock lock2 = new DistributeLock();new Thread(new Runnable() {@Overridepublic void run() {// 获取锁对象try {lock1.zkLock();System.out.println("线程1 启动,获取到锁");Thread.sleep(5 * 1000);lock1.unzkLock();System.out.println("线程1 释放锁");} catch (InterruptedException e) {throw new RuntimeException(e);}}}).start();new Thread(new Runnable() {@Overridepublic void run() {// 获取锁对象try {lock2.zkLock();System.out.println("线程2 启动,获取到锁");Thread.sleep(5 * 1000);lock2.unzkLock();System.out.println("线程2 释放锁");} catch (InterruptedException e) {throw new RuntimeException(e);}}}).start();}
}

(2)观察控制台变化

线程1获取锁 
线程1释放锁 
线程2获取锁 
线程2释放锁

Curator框架实现分布式锁

1)原生的Java API开发存在的问题

(1)会话连接是异步的,需要自己去处理。比如使用CountDownLatch

(2)Watch需要重复注册,不然就不能生效

(3)开发的复杂性还是比较高的

(4)不支持多节点删除和创建。需要自己去递归

2)Curator是一个专门解决分布式锁的框架,解决了原生Java API开发分布式遇到的问题。

详情请查看官方文档:https://curator.apache.org/index.html

3)Curator 案例实操

(1)添加依赖

<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.3.0</version> 
</dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.3.0</version> 
</dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>4.3.0</version> 
</dependency> 

(2)代码实现

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;public class CuratorLockTest {public static void main(String[] args) {// 创建分布式锁1InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");// 创建分布式锁2InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");new Thread(new Runnable() {@Overridepublic void run() {try {lock1.acquire();System.out.println("线程1 获取到锁");lock1.acquire();System.out.println("线程1 再次获取到锁");Thread.sleep(5 * 1000);lock1.release();System.out.println("线程1 释放锁");lock1.release();System.out.println("线程1 再次释放锁");} catch (Exception e) {throw new RuntimeException(e);}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {lock2.acquire();System.out.println("线程2 获取到锁");lock2.acquire();System.out.println("线程2 再次获取到锁");Thread.sleep(5 * 1000);lock2.release();System.out.println("线程2 释放锁");lock2.release();System.out.println("线程2 再次释放锁");} catch (Exception e) {throw new RuntimeException(e);}}}).start();}// 分布式锁初始化private static CuratorFramework getCuratorFramework() {// 重试策略,初始时间3秒,重试3次ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("hadoop102:2181,hadoop103:2181,hadoop104:2181").connectionTimeoutMs(100000).sessionTimeoutMs(100000).retryPolicy(policy).build();// 启动客户端client.start();System.out.println("zookeeper 启动成功!");return client;}
}

(2)观察控制台变化:

线程1获取锁 
线程1再次获取锁 
线程1释放锁 
线程1再次释放锁 
线程2获取锁 
线程2再次获取锁 
线程2释放锁 
线程2再次释放锁

Zookeeper面试重点

选举机制

半数机制,超过半数的投票通过,即通过。

(1)第一次启动选举规则:

投票过半数时,服务器id大的胜出

(2)第二次启动选举规则:

①EPOCH大的直接胜出

②EPOCH相同,事务id大的胜出

③事务id相同,服务器id大的胜出

生产集群安装多少zk合适

安装奇数台。

生产经验:

  • 10台服务器:3台zk;
  • 20台服务器:5台zk;
  • 100台服务器:11台zk;
  • 200台服务器:11台zk

zk常用命令

ls、get、create、delete

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/670370.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CentOS 8 安装配置 Hadoop3.3.6 伪分布式安装方式(适用于开发和调试)

1.配置服务器ssh免密登录&#xff0c;否则后面启动会报错&#xff1a;尝试通过SSH连接到主机出现认证错误的提示 配置服务器ssh免密登录&#xff1a; 1.生成SSH密钥对&#xff08;如果尚未生成&#xff09;&#xff1a; 执行下面的命令生成密钥对&#xff0c;一直回车即可 ssh…

为后端做准备

这里写目录标题 flask 文件上传与接收flask应答&#xff08;接收请求&#xff08;文件、数据&#xff09;flask请求&#xff08;上传文件&#xff09;传递参数和文件 argparse 不从命令行调用参数1、设置default值2、"从命令行传入的参数".split()3、[--input,内容] …

代码随想录day18--二叉树的应用6

LeetCode530.二叉搜索树的最小绝对差值 题目描述&#xff1a; 给你一个二叉搜索树的根节点 root &#xff0c;返回 树中任意两不同节点值之间的最小差值 。 差值是一个正数&#xff0c;其数值等于两值之差的绝对值。 示例 1&#xff1a; 输入&#xff1a;root [4,2,6,1,3] …

CSDN2024年我的创作纪念日1024天|不忘初心|努力上进|积极向前

CSDN2024年我的创作纪念日1024天| 学习成长机遇&#xff1a;学习成长收获&#xff1a;2023年度总结数据&#xff1a;2024新领域的探索&#xff1a;日常和自己的感慨&#xff1a;2024憧憬和规划&#xff1a;创作纪念日总结&#xff1a; 学习成长机遇&#xff1a; 大家好&#x…

SpringBoot-基础篇03

之前搭建了整个开发环境实现了登录注册&#xff0c;springBoot整合mybatis完成增删改查&#xff0c;今天完成分页查询&#xff0c;使用阿里云oss存储照片等资源&#xff0c;后期会尝试自己搭建分布式文件系统来实现。 一&#xff0c;SpringBootMybatis完成分页查询 1&#xff…

天线阵列车载应用——第1章 介绍 1.1节 汽车工业中的天线阵列:应用和频率范围

1.1 汽车工业中的天线阵列:应用和频率范围 无线通信系统的发展需要新的技术来支持更高质量的通信、新的服务和应用。近年来&#xff0c;汽车无线通信市场得到了极大的扩展。现代汽车使用不同的服务:AM/FM收音机、卫星广播(SDARS)、移动电话通信、数字音频广播(DAB)、远程无钥匙…

零基础学编程从入门到精通,系统化的编程视频教程上线,中文编程开发语言工具构件之缩放控制面板构件用法

一、前言 零基础学编程从入门到精通&#xff0c;系统化的编程视频教程上线&#xff0c;中文编程开发语言工具构件之缩放控制面板构件用法 编程入门视频教程链接 https://edu.csdn.net/course/detail/39036 编程工具及实例源码文件下载可以点击最下方官网卡片——软件下载—…

mysql事务锁

Lock - 事务锁 与 latch 的区别 lock对象是事务&#xff0c;用来锁定的是数据库中的对象&#xff0c;如表、行、页。并且一般lock的对象仅在事务commit或rollback后进行释放&#xff08;不同事务隔离级别释放的时间可能不同&#xff09;。此外&#xff0c;lock&#xff0c;正…

3.0 Zookeeper linux 服务端集群搭建步骤

本章节将示范三台 zookeeper 服务端集群搭建步骤。 所需准备工作&#xff0c;创建三台虚拟机环境并安装好 java 开发工具包 JDK&#xff0c;可以使用 VM 或者 vagrantvirtualbox 搭建 centos/ubuntu 环境&#xff0c;本案例基于宿主机 windows10 系统同时使用 vagrantvirtualb…

发送get请求并且发送请求头(header),java实现

发送get请求时&#xff0c;发送请求头&#xff08;Header&#xff09;中的内容 方便第二次调用其他url时传递参数&#xff0c;例如userCode或者租户编码 调用方式 Autowired private HttpServletRequest request;先注入HttpServletRequestpublic xxx xxx(){String url &quo…

docker程序镜像的制作

目录 一、每种资源的预安装&#xff08;基础&#xff09; 安装 nginx安装 redis 二、dockerfile文件制作&#xff08;基础&#xff09; 打包 redis 镜像 创建镜像制作空间制作dockerfile 打包 nginx 镜像 三、创建组合镜像&#xff08;方式一&#xff09; 生成centos容器并…

vue3+echarts:Vue中使用echarts从后端获取数据并赋值显示

//由于前后端交互,所以使用axios发送请求 const Count ref(null); //设备种类数值 const Name ref(null); //设备种类名称 //设备种类 饼图 const pieChart () > {const getpieChart echarts.init(document.getElementById("deviceKind"));// 创建图标getpieC…

位置内插 PI:基于Positional Interpolation扩大模型的上下文窗口

位置内插 PI&#xff1a;基于Positional Interpolation扩大模型的上下文窗口 如何在不牺牲性能或从头训练的情况下&#xff0c;扩展大型语言模型的上下文窗口以处理长文档或长对话&#xff1f; 论文&#xff1a;https://arxiv.org/pdf/2306.15595.pdf 这篇论文介绍了一种名为位…

C++ JSON解析

JSON解析 JSONCPPC实现JSON解析器 JSONCPP JSONCPP源码链接&#xff1a;https://github.com/open-source-parsers/jsoncpp JSOCPP源码下载以后&#xff0c;首先复制一份include文件夹下的json文件夹&#xff0c;头文件留着后续备用。 使用Cmake生成项目。在IDE中编译jsoncpp_…

【Nicn的刷题日常】之打印整数二进制的奇数位和偶数位

目录 1.题目描述 2.解题思路 3.解题 1.题目描述 获取一个整数二进制序列中所有的偶数位和奇数位&#xff0c;分别打印出二进制序列 2.解题思路 1. 提取所有的奇数位&#xff0c;如果该位是1&#xff0c;输出1&#xff0c;是0则输出0 2. 以同样的方式提取偶数位置检测n…

CGAL-3D 凸包算法

3D 凸包算法 一、概述二、静态凸包构造1. Traits 特征类2. 极端点3. 半空间相交4. 凸性检验 三、动态凸包构造四、性能 一、概述 一个点集 S∈R3 是凸的&#xff0c;如果对于任意两点 p 和 q 在集合中&#xff0c;具有端点的线段 p 和 q 包含在 S。集合的凸包 P 包含点集 S 的最…

GADM 4.1 全球国家行政区划下载

扫描文末二维码&#xff0c;关注微信公众号&#xff1a;ThsPool 后台回复g004&#xff0c;领取最新 GADM 4.1 全球国家行政区划 GADM概述 GADM&#xff0c;全称 Database of Global Administrative Areas&#xff0c;是一个开放获取的全球行政区划数据库&#xff0c;包含各国、…

APIfox编排自动化测试场景(一)

测试场景用于将多个接口请求与实际可能发生的一些特殊情况&#xff08;如条件判断、循环&#xff09;有序的组合在一起&#xff0c;来模拟一个真实业务流程&#xff0c;组成自动化测试单元。 新建目录 / 测试场景​ 打开 Apifox 后点击左侧菜单栏中的“自动化测试”&#xff…

基于Vue的移动端UI框架整理

一、Vant 官方地址&#xff1a;https://youzan.github.io/vant/#/zh-CN/ 简介&#xff1a;有赞公司开发。 特性&#xff1a;60 高质量组件、90% 单元测试覆盖率、完善的中英文文档和示例、支持按需引入、支持主题定制、支持国际化、支持 TS、支持 SSR。 特别说明&#xff1…

机器学习---概率图模型(隐马尔可夫模型、马尔可夫随机场、条件随机场)

1. 隐马尔可夫模型 机器学习最重要的任务是根据已观察到的证据&#xff08;例如训练样本&#xff09;对感兴趣的未知变量&#xff08;例如类别标 记&#xff09;进行估计和推测。概率模型&#xff08;probabilistic model&#xff09;提供了一种描述框架&#xff0c;将描述任…