大数据Zookeeper--案例

文章目录

  • 服务器动态上下线监听案例
    • 需求
    • 需求分析
    • 具体实现
    • 测试
  • Zookeeper分布式锁案例
    • 原生Zookeeper实现分布式锁
    • Curator框架实现分布式锁
  • Zookeeper面试重点
    • 选举机制
    • 生产集群安装多少zk合适
    • zk常用命令

服务器动态上下线监听案例

需求

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知
到主节点服务器的上下线。

需求分析

服务器动态上下线

具体实现

1)先在集群上创建/servers节点

[zk: localhost:2181(CONNECTED) 10] create /servers "servers" 
Created /servers

2)在Idea中创建包名:com.yudan.case1

3)服务器端向Zookeeper注册代码

import org.apache.zookeeper.*;import java.io.IOException;public class DistributeServer {private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";private int sessionTime = 100000;private ZooKeeper zk;public static void main(String[] args) throws IOException, InterruptedException, KeeperException {DistributeServer server = new DistributeServer();// 1、获取zk连接server.getConnect();// 2、注册服务器到zk集群server.regist(args[0]);// 3、启动 业务逻辑(睡觉)server.business();}// 创建到 zk 的客户端连接private void getConnect() throws IOException {zk = new ZooKeeper(connectString, sessionTime, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {}});}// 注册到服务器private void regist(String hostname) throws InterruptedException, KeeperException {String create = zk.create("/servers/"+hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);System.out.println(hostname + " " + "is online");}// 业务功能private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}
}

4)客户端代码

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;public class DistributeClient {private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";private int sessionTime = 100000;private ZooKeeper zk;public static void main(String[] args) throws IOException, InterruptedException, KeeperException {DistributeClient client = new DistributeClient();// 1、获取zk连接client.getConnect();// 2、监听/servers下面子节点的增加和删除client.getServersList();// 3、业务逻辑(睡觉)client.business();}// 创建到 zk 的客户端连接private void getConnect() throws IOException {zk = new ZooKeeper(connectString, sessionTime, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {// 再次启动监听try {getServersList();} catch (InterruptedException e) {throw new RuntimeException(e);} catch (KeeperException e) {throw new RuntimeException(e);}}});}// 获取服务器列表信息private void getServersList() throws InterruptedException, KeeperException {// 获取服务器子节点信息,并且对父节点进行监听List<String> children = zk.getChildren("/servers", true);// 存储服务器信息列表 ArrayList<String> servers = new ArrayList<>();// 遍历所有节点,获取节点中的主机名称信息 for (String child : children) {byte[] data = zk.getData("/servers/" + child, false, null);servers.add(new String(data));}// 打印System.out.println(servers);}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}
}

测试

1)在Linux命令行上操作增加减少服务器

(1)启动DistributeClient 客户端

(2)在hadoop102上zk的客户端/servers目录上创建临时带序号节点

[zk: localhost:2181(CONNECTED) 1]  create -e -s /servers/hadoop102 "hadoop102" 
[zk: localhost:2181(CONNECTED) 2]  create -e -s /servers/hadoop103 "hadoop103"

(3)观察Idea控制台变化

[hadoop102, hadoop103]

(4)执行删除操作

[zk: localhost:2181(CONNECTED) 8]  delete /servers/hadoop1020000000000 

(5)观察Idea控制台变化

[hadoop103] 

2)在Idea上操作增加减少服务器

(1)启动DistributeClient 客户端(如果已经启动过,不需要重启)

(2)启动DistributeServer 服务

  • 点击Edit Configurations…
    在这里插入图片描述
  • 在弹出的窗口中(Program arguments)输入想启动的主机,例如,hadoop102
    在这里插入图片描述
  • 回到DistributeServer的main方法,右键,在弹出的窗口中点击Run “DistributeServer.main()”
  • 观察DistributeServer控制台,提示hadoop102 is online
  • 观察DistributeClient控制台,提示hadoop102已经上线

Zookeeper分布式锁案例

什么叫做分布式锁呢?

比如说"进程1"在使用该资源的时候,会先去获得锁,"进程1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
分布式锁

原生Zookeeper实现分布式锁

1)分布式锁实现

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;public class DistributeLock {private final String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";private final int sessionTime = 100000;private final ZooKeeper zk;// 当前client等待的子节点private String waitPath;// zookeeper节点等待private CountDownLatch waitLatch = new CountDownLatch(1);// zookeeper连接private CountDownLatch connectLatch = new CountDownLatch(1);// 当前client创建的子节点private String currentMode;// 和 zk 服务建立连接,并创建根节点public DistributeLock() throws IOException, InterruptedException, KeeperException {// 1、获取连接zk = new ZooKeeper(connectString, sessionTime, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {// connectLatch 如果连接上zk 可以释放// 连接建立时, 打开latch, 唤醒wait在该latch上的线程if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {connectLatch.countDown();}// waitLatch 需要释放// 发生了waitPath的删除事件if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {waitLatch.countDown();}}});// 等待 zookeeper正常连接后,往下走程序connectLatch.await();// 2、判断根节点/locks是否存在Stat stat = zk.exists("/locks", false);if (stat == null) {// 创建一下根节点zk.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}}// 对zk加锁public void zkLock() {// 创建对应的临时带序号节点try {currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);// 判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是, 监听他序号的前一个节点List<String> children = zk.getChildren("/locks", false);// 如果children 只有一个值,那就直接获取锁;如果有多个节点,需要判断,哪个节点最小if (children.size() == 1) {return;} else {// 对children集合内的节点进行排序Collections.sort(children);// 获取节点名称 seq-String thisNode = currentMode.substring("/locks/".length());// 通过seq- 获取到该节点在children集合中的位置int index = children.indexOf(thisNode);// 判断if (index == -1) {System.out.println("数据异常");} else if (index == 0) {// 就一个节点,可以获取锁了return;}else {// 需要监听前一个节点waitPath = "/locks/" + children.get(index-1);zk.getData(waitPath,true,null);// 等待监听waitLatch.await();return;}}} catch (KeeperException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);}}// 对zk解锁public void unzkLock() {// 删除节点try {zk.delete(currentMode,-1);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (KeeperException e) {throw new RuntimeException(e);}}
}

2)分布式锁测试

(1)创建两个线程

import org.apache.zookeeper.KeeperException;import java.io.IOException;public class DistributeLockTest {public static void main(String[] args) throws IOException, InterruptedException, KeeperException {// 创建分布式锁1final DistributeLock lock1 = new DistributeLock();// 创建分布式锁2final DistributeLock lock2 = new DistributeLock();new Thread(new Runnable() {@Overridepublic void run() {// 获取锁对象try {lock1.zkLock();System.out.println("线程1 启动,获取到锁");Thread.sleep(5 * 1000);lock1.unzkLock();System.out.println("线程1 释放锁");} catch (InterruptedException e) {throw new RuntimeException(e);}}}).start();new Thread(new Runnable() {@Overridepublic void run() {// 获取锁对象try {lock2.zkLock();System.out.println("线程2 启动,获取到锁");Thread.sleep(5 * 1000);lock2.unzkLock();System.out.println("线程2 释放锁");} catch (InterruptedException e) {throw new RuntimeException(e);}}}).start();}
}

(2)观察控制台变化

线程1获取锁 
线程1释放锁 
线程2获取锁 
线程2释放锁

Curator框架实现分布式锁

1)原生的Java API开发存在的问题

(1)会话连接是异步的,需要自己去处理。比如使用CountDownLatch

(2)Watch需要重复注册,不然就不能生效

(3)开发的复杂性还是比较高的

(4)不支持多节点删除和创建。需要自己去递归

2)Curator是一个专门解决分布式锁的框架,解决了原生Java API开发分布式遇到的问题。

详情请查看官方文档:https://curator.apache.org/index.html

3)Curator 案例实操

(1)添加依赖

<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.3.0</version> 
</dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.3.0</version> 
</dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>4.3.0</version> 
</dependency> 

(2)代码实现

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;public class CuratorLockTest {public static void main(String[] args) {// 创建分布式锁1InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");// 创建分布式锁2InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");new Thread(new Runnable() {@Overridepublic void run() {try {lock1.acquire();System.out.println("线程1 获取到锁");lock1.acquire();System.out.println("线程1 再次获取到锁");Thread.sleep(5 * 1000);lock1.release();System.out.println("线程1 释放锁");lock1.release();System.out.println("线程1 再次释放锁");} catch (Exception e) {throw new RuntimeException(e);}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {lock2.acquire();System.out.println("线程2 获取到锁");lock2.acquire();System.out.println("线程2 再次获取到锁");Thread.sleep(5 * 1000);lock2.release();System.out.println("线程2 释放锁");lock2.release();System.out.println("线程2 再次释放锁");} catch (Exception e) {throw new RuntimeException(e);}}}).start();}// 分布式锁初始化private static CuratorFramework getCuratorFramework() {// 重试策略,初始时间3秒,重试3次ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("hadoop102:2181,hadoop103:2181,hadoop104:2181").connectionTimeoutMs(100000).sessionTimeoutMs(100000).retryPolicy(policy).build();// 启动客户端client.start();System.out.println("zookeeper 启动成功!");return client;}
}

(2)观察控制台变化:

线程1获取锁 
线程1再次获取锁 
线程1释放锁 
线程1再次释放锁 
线程2获取锁 
线程2再次获取锁 
线程2释放锁 
线程2再次释放锁

Zookeeper面试重点

选举机制

半数机制,超过半数的投票通过,即通过。

(1)第一次启动选举规则:

投票过半数时,服务器id大的胜出

(2)第二次启动选举规则:

①EPOCH大的直接胜出

②EPOCH相同,事务id大的胜出

③事务id相同,服务器id大的胜出

生产集群安装多少zk合适

安装奇数台。

生产经验:

  • 10台服务器:3台zk;
  • 20台服务器:5台zk;
  • 100台服务器:11台zk;
  • 200台服务器:11台zk

zk常用命令

ls、get、create、delete

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/670370.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CentOS 8 安装配置 Hadoop3.3.6 伪分布式安装方式(适用于开发和调试)

1.配置服务器ssh免密登录&#xff0c;否则后面启动会报错&#xff1a;尝试通过SSH连接到主机出现认证错误的提示 配置服务器ssh免密登录&#xff1a; 1.生成SSH密钥对&#xff08;如果尚未生成&#xff09;&#xff1a; 执行下面的命令生成密钥对&#xff0c;一直回车即可 ssh…

为后端做准备

这里写目录标题 flask 文件上传与接收flask应答&#xff08;接收请求&#xff08;文件、数据&#xff09;flask请求&#xff08;上传文件&#xff09;传递参数和文件 argparse 不从命令行调用参数1、设置default值2、"从命令行传入的参数".split()3、[--input,内容] …

2024年华为OD机试真题-数组去重和排序-Python-OD统一考试(C卷)

题目描述: 给定一个乱序的数组,删除所有的重复元素,使得每个元素只出现一次,并且按照出现的次 数从高到低进行排序,相同出现次数按照第一次出现顺序进行先后排序。 输入描述: 一个数组 输出描述: 去重排序后的数组 补充说明: 数组大小不超过100 数组元素值大小不超过10…

代码随想录day18--二叉树的应用6

LeetCode530.二叉搜索树的最小绝对差值 题目描述&#xff1a; 给你一个二叉搜索树的根节点 root &#xff0c;返回 树中任意两不同节点值之间的最小差值 。 差值是一个正数&#xff0c;其数值等于两值之差的绝对值。 示例 1&#xff1a; 输入&#xff1a;root [4,2,6,1,3] …

CSDN2024年我的创作纪念日1024天|不忘初心|努力上进|积极向前

CSDN2024年我的创作纪念日1024天| 学习成长机遇&#xff1a;学习成长收获&#xff1a;2023年度总结数据&#xff1a;2024新领域的探索&#xff1a;日常和自己的感慨&#xff1a;2024憧憬和规划&#xff1a;创作纪念日总结&#xff1a; 学习成长机遇&#xff1a; 大家好&#x…

SpringBoot-基础篇03

之前搭建了整个开发环境实现了登录注册&#xff0c;springBoot整合mybatis完成增删改查&#xff0c;今天完成分页查询&#xff0c;使用阿里云oss存储照片等资源&#xff0c;后期会尝试自己搭建分布式文件系统来实现。 一&#xff0c;SpringBootMybatis完成分页查询 1&#xff…

天线阵列车载应用——第1章 介绍 1.1节 汽车工业中的天线阵列:应用和频率范围

1.1 汽车工业中的天线阵列:应用和频率范围 无线通信系统的发展需要新的技术来支持更高质量的通信、新的服务和应用。近年来&#xff0c;汽车无线通信市场得到了极大的扩展。现代汽车使用不同的服务:AM/FM收音机、卫星广播(SDARS)、移动电话通信、数字音频广播(DAB)、远程无钥匙…

零基础学编程从入门到精通,系统化的编程视频教程上线,中文编程开发语言工具构件之缩放控制面板构件用法

一、前言 零基础学编程从入门到精通&#xff0c;系统化的编程视频教程上线&#xff0c;中文编程开发语言工具构件之缩放控制面板构件用法 编程入门视频教程链接 https://edu.csdn.net/course/detail/39036 编程工具及实例源码文件下载可以点击最下方官网卡片——软件下载—…

监控室脱岗检测系统-人员脱岗监测报警方案---豌豆云

人员脱岗检测算法自动识别比如保安值班室,监控室中的人员离岗行为,并自动告警给管理人员,约束了工作人员擅自离岗行为。 人员脱岗检测,对违规动作/危险行为/行为规范做精确识别,打造人员脱岗检测,将视频图像智能识别系统应用在企业日常运营管理中,降低生产成本。 应用场景&am…

mysql事务锁

Lock - 事务锁 与 latch 的区别 lock对象是事务&#xff0c;用来锁定的是数据库中的对象&#xff0c;如表、行、页。并且一般lock的对象仅在事务commit或rollback后进行释放&#xff08;不同事务隔离级别释放的时间可能不同&#xff09;。此外&#xff0c;lock&#xff0c;正…

3.0 Zookeeper linux 服务端集群搭建步骤

本章节将示范三台 zookeeper 服务端集群搭建步骤。 所需准备工作&#xff0c;创建三台虚拟机环境并安装好 java 开发工具包 JDK&#xff0c;可以使用 VM 或者 vagrantvirtualbox 搭建 centos/ubuntu 环境&#xff0c;本案例基于宿主机 windows10 系统同时使用 vagrantvirtualb…

发送get请求并且发送请求头(header),java实现

发送get请求时&#xff0c;发送请求头&#xff08;Header&#xff09;中的内容 方便第二次调用其他url时传递参数&#xff0c;例如userCode或者租户编码 调用方式 Autowired private HttpServletRequest request;先注入HttpServletRequestpublic xxx xxx(){String url &quo…

docker程序镜像的制作

目录 一、每种资源的预安装&#xff08;基础&#xff09; 安装 nginx安装 redis 二、dockerfile文件制作&#xff08;基础&#xff09; 打包 redis 镜像 创建镜像制作空间制作dockerfile 打包 nginx 镜像 三、创建组合镜像&#xff08;方式一&#xff09; 生成centos容器并…

vue3+echarts:Vue中使用echarts从后端获取数据并赋值显示

//由于前后端交互,所以使用axios发送请求 const Count ref(null); //设备种类数值 const Name ref(null); //设备种类名称 //设备种类 饼图 const pieChart () > {const getpieChart echarts.init(document.getElementById("deviceKind"));// 创建图标getpieC…

位置内插 PI:基于Positional Interpolation扩大模型的上下文窗口

位置内插 PI&#xff1a;基于Positional Interpolation扩大模型的上下文窗口 如何在不牺牲性能或从头训练的情况下&#xff0c;扩展大型语言模型的上下文窗口以处理长文档或长对话&#xff1f; 论文&#xff1a;https://arxiv.org/pdf/2306.15595.pdf 这篇论文介绍了一种名为位…

详解Python3的垃圾回收机制

Python的垃圾回收机制主要包括两个部分&#xff1a;引用计数和循环引用检测。 引用计数法 内部采用 引用计数法&#xff0c;为每个对象维护引用次数&#xff0c;并据此回收不在需要的垃圾对象。 由于引用计数法存在重大缺陷&#xff0c;循环引用时由内存泄露风险&#xff0c…

自动驾驶TPM技术杂谈 ———— Unix常用命令行

文章目录 介绍常用命令 —— A常用命令 —— C常用命令 —— D常用命令 —— E常用命令 —— F常用命令 —— G常用命令 —— H常用命令 —— I常用命令 —— J常用命令 —— K常用命令 —— L常用命令 —— M常用命令 —— N常用命令 —— P常用命令 —— Q常用命令 —— R常用…

Vue3——创建一个应用

文章目录 创建应用实例挂载应用没有模板的组件的挂载 应用配置多个应用实例 其实使用脚手架创建的vue项目的main.js文件中已经为我们配置好 vue应用的创建。 import { createApp } from vue import App from ./App.vue const app createApp(App) app.mount(#app)创建应用实例…

C++ JSON解析

JSON解析 JSONCPPC实现JSON解析器 JSONCPP JSONCPP源码链接&#xff1a;https://github.com/open-source-parsers/jsoncpp JSOCPP源码下载以后&#xff0c;首先复制一份include文件夹下的json文件夹&#xff0c;头文件留着后续备用。 使用Cmake生成项目。在IDE中编译jsoncpp_…

【Nicn的刷题日常】之打印整数二进制的奇数位和偶数位

目录 1.题目描述 2.解题思路 3.解题 1.题目描述 获取一个整数二进制序列中所有的偶数位和奇数位&#xff0c;分别打印出二进制序列 2.解题思路 1. 提取所有的奇数位&#xff0c;如果该位是1&#xff0c;输出1&#xff0c;是0则输出0 2. 以同样的方式提取偶数位置检测n…