2.Zookeeper集成springboot操作节点,事件监听,分布式锁实现

1.Springboot项目中添加zookeeper 已经对应的客户端依赖 ,pom.xml文件如下

 <!-- Zookeeper组件 --><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.9.1</version></dependency><!-- 包含Curator组件 --><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-zookeeper</artifactId><version>6.2.2</version></dependency>

2.application.yml 文件中配置zookeeper连接的相关配置信息

zookeeper:#服务器地址connectString: 127.0.0.1:2181#会话超时时间sessionTimeoutMs: 3000#连接超时时间connectionTimeoutMs: 60000#最大重试次数maxRetries: 3#初始休眠时间baseSleepTimeMs: 1000

3.java配置的方式添加zookeeper相关的配置

package com.jinyi.up.zk.config;import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;/*** @author huangchong* @date 2024/3/5 20:48* @desc*/
@Slf4j
@Configuration
public class ZookeeperConfig {@Value("${zookeeper.connectString}")private String connectString;@Value("${zookeeper.baseSleepTimeMs}")private int baseSleepTimeMs;@Value("${zookeeper.maxRetries}")private int maxRetries ;@Value("${zookeeper.connectionTimeoutMs}")int connectionTimeoutMs ;@Value("${zookeeper.sessionTimeoutMs}")int sessionTimeoutMs ;private static CuratorFramework client = null ;/*** 初始化*/@PostConstructpublic void init (){// 重试策略RetryPolicy policy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);//通过工厂创建Curatorclient = CuratorFrameworkFactory.builder().connectString(connectString).connectionTimeoutMs(connectionTimeoutMs).sessionTimeoutMs(sessionTimeoutMs).retryPolicy(policy).build();//开启连接client.start();log.info("zookeeper 初始化完成...");}@Beanpublic CuratorFramework getClient (){return client ;}/*** 分布式锁bean 注入spring管理中*/@Beanpublic InterProcessMutex distributedLock() throws Exception {//使用了Curator提供的InterProcessMutex来创建一个分布式锁。我们使用ZooKeeper的路径/lock来表示锁的资源。InterProcessMutex distributedLock = new InterProcessMutex(client, "/lock");return distributedLock;}
}

4.Zookeeper基础操作服务和分布式锁服务编码

package com.jinyi.up.client.service;import com.jinyi.up.zk.process.AbstractListenerProcess;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;/*** @author huangchong* @date 2024/3/5 21:39* @desc*/
@Slf4j
@Service
public class ZookeeperService {@Resourceprivate CuratorFramework client;/*** 查询节点数据** @param nodePath 节点* @return {@link String}*/public String queryData(String nodePath) {try {Stat stat = client.checkExists().forPath(nodePath);if (stat != null) {byte[] bytes = client.getData().forPath(nodePath);return new String(bytes, StandardCharsets.UTF_8);}return null;} catch (Exception e) {log.error("查询节点数据失败:", e);return null;}}/*** 创建节点** @param mode     节点类型* @param nodePath 节点路径* @param nodeData 节点数据* @return {@link String}*/public String create(CreateMode mode, String nodePath, String nodeData) {try {Stat stat = client.checkExists().forPath(nodePath);if (stat == null) {return client.create().withMode(mode).forPath(nodePath, nodeData.getBytes());} else {return null;}} catch (Exception e) {log.error("创建节点失败:", e);return null;}}/*** 更新节点数据** @param nodePath 节点路径* @param nodeData 节点数据* @return {@link Stat}*/public boolean update(String nodePath, String nodeData) {try {Stat stat = client.checkExists().forPath(nodePath);if (stat != null) {stat = client.setData().forPath(nodePath, nodeData.getBytes());}return stat != null;} catch (Exception e) {log.error("更新节点失败:", e);return false;}}/*** 删除节点** @param nodePath v* @return {@link boolean}*/public boolean delete(String nodePath) {try {Stat stat = client.checkExists().forPath(nodePath);if (stat != null) {client.delete().forPath(nodePath);}return true;} catch (Exception e) {log.error("删除节点失败:", e);return false;}}/*** 监听一个节点** @param nodePath 被监听节点路径* @return {@link }*/public boolean addWatchNodeListener(String nodePath) {CuratorCache curatorCache = CuratorCache.builder(client, nodePath).build();CuratorCacheListener listener = CuratorCacheListener.builder().forNodeCache(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {log.info("监听到节点变动");//TODO}}).build();curatorCache.listenable().addListener(listener);curatorCache.start();return true;}/*** 监听子孙节点 支持子节点的子节点监听* TreeCache监听节点自己和所有子节点们** @param nodePath  被监听节点路径* @param processer 监听后处理* @return {@link }*/public boolean addWatchTreeListener(String nodePath, AbstractListenerProcess processer) {CuratorCache curatorCache = CuratorCache.builder(client, nodePath).build();CuratorCacheListener listener = CuratorCacheListener.builder().forTreeCache(client, new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) {log.info("监听到子节点变动,变动类型:{}", treeCacheEvent.getType().toString());processer.process(curatorFramework, treeCacheEvent);}}).build();curatorCache.listenable().addListener(listener);curatorCache.start();return true;}
}
package com.jinyi.up.zk.service;import com.jinyi.up.zk.process.AbstractListenerProcess;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;/*** @author huangchong* @date 2024/3/5 21:39* @desc*/
@Slf4j
@Service
public class ZookeeperLockService {@Resourceprivate InterProcessMutex distributedLock;public void doProtectedOperation() throws Exception {//acquire()方法获取锁distributedLock.acquire();try {// 执行需要保护的代码块} finally {distributedLock.release();}}
}

5.watcher机制事件处理抽象封装

package com.jinyi.up.zk.process;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;/*** @author huangchong* @date 2024/3/5 21:58* @desc*/
public abstract class AbstractListenerProcess {/*** 处理监听节点自己和所有子节点们变更事件** @param client       zk客户端* @param event 子节点事件* @return {@link }*/public abstract void process(CuratorFramework client, TreeCacheEvent event);
}
package com.jinyi.up.zk.process;import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;/*** @author huangchong* @date 2024/3/5 21:58* @desc*/
@Slf4j
public  class WatcherTreeListenerProcess extends AbstractListenerProcess{/*** 实际处理监听节点自己和所有子节点们变更事件** @param client zk客户端* @param event 子节点事件* @return {@link }*/@Overridepublic void process(CuratorFramework client, TreeCacheEvent event) {//事件pathString path = event.getData().getPath();switch (event.getType()) {case NODE_ADDED:log.info("新增子节点:" + path);break;case NODE_UPDATED:log.info("更新子节点:" + path);break;case NODE_REMOVED:log.info("删除子节点:" + path);break;default:break;}}
}

6.基本操作的单元测试代码

package com.jinyi.zookeeper;import com.jinyi.up.zk.ZookeeperApplication;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
///此处classes内的内容是@SpringBootApplication入口
@SpringBootTest(classes = {ZookeeperApplication.class})
public abstract class BaseZkBootTest {
}
package com.jinyi.zookeeper;import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;/*** @author huangchong* @date 2024/3/5 21:00* @desc*/
@Slf4j
public class ZookeeperBaseTest extends BaseZkBootTest {@Resourceprivate CuratorFramework client;@Testpublic void testAddPersistentNode() throws Exception {// 创建一个持久化节点/persistent_node,断开连接时不会自动删除client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/persistent_node");}@Testpublic void testZnodeExists() throws Exception {// 判断节点是否存在,persistent_node2不存在所以stat2是nullStat stat = client.checkExists().forPath("/persistent_node");log.info(String.valueOf(stat));Stat stat2 = client.checkExists().forPath("/persistent_node2");log.info(String.valueOf(stat2));}@Testpublic void testSetData() throws Exception {// 设置节点数据client.setData().forPath("/persistent_node", "persistent_node_data".getBytes(StandardCharsets.UTF_8));}@Testpublic void testCreateAndSet() throws Exception {// 创建一个持久化节点并设置节点数据client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("//persistent_node1", "persistent_node_data1".getBytes(StandardCharsets.UTF_8));}@Testpublic void testGetData() throws Exception {// 查询节点数据byte[] data = client.getData().forPath("/persistent_node1");log.info(new String(data, StandardCharsets.UTF_8));}@Testpublic void testDelete() throws Exception {// 删除节点client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/persistent_node1");}@Testpublic void testReadLock() throws Exception {// 读写锁-读InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock-read");lock.readLock().acquire();log.info("获取-ReadLock");lock.readLock().release();}@Testpublic void testWriteLock() throws Exception {// 读写锁-写InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock-write");lock.writeLock().acquire();log.info("获取-WriteLock");lock.writeLock().release();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/722456.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C++】6-8 评委打分 分数 10

6-8 评委打分 分数 10 全屏浏览 切换布局 作者 刘利 单位 惠州学院 某诗歌朗诵比赛&#xff0c;有n位评委给参赛者打分&#xff0c;计算总分时要去除最高分和对低分。 要求&#xff1a;编写名为cmax和cmin的函数分别返回最高分的和最低分元素的引用&#xff0c;带有2个形参…

leetcode面试经典算法题——1

链接&#xff1a;https://leetcode.cn/studyplan/top-interview-150/ 392. 判断子序列 给定字符串 s 和 t &#xff0c;判断 s 是否为 t 的子序列。 字符串的一个子序列是原始字符串删除一些&#xff08;也可以不删除&#xff09;字符而不改变剩余字符相对位置形成的新字符串…

292.【华为OD机试】跳马问题(广度优先搜索(BFS)JavaPythonC++JS实现)

🚀点击这里可直接跳转到本专栏,可查阅顶置最新的华为OD机试宝典~ 本专栏所有题目均包含优质解题思路,高质量解题代码(Java&Python&C++&JS分别实现),详细代码讲解,助你深入学习,深度掌握! 文章目录 一. 题目二.解题思路三.题解代码Python题解代码JAVA题解…

分布式事务(SeataServer)

SeataServer搭建 Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。AT模式是阿里首推的模式,阿里云上有商用版本的GTS(Global Transaction Servi…

JavaScript 闭包 作用域

闭包 JavaScript 中的闭包是相当重要的概念并且与作用域相关知识的指向密切相关 JavaScript 中的作用域是什么意思?闭包会在哪些场景中使用?通过定时器循环输出自增的数字通过 JS 的代码如何实现? 闭包概念一 闭包是指有权访问另外一个函数作用域中的变量的函数。 闭包…

自我对比: 通过不一致的解决视角更好地进行反思

一、写作动机&#xff1a; LLM 在自我评价时往往过于自信或随意性较大&#xff0c;提供的反馈固执或不一致&#xff0c;从而导致反思效果不佳。为了解决这个问题&#xff0c;作者提倡 "自我对比"&#xff1a; 它可以根据要求探索不同的解决角度&#xff0c;对比差异…

ChatGPT如何辅助医生改善AD患者教育的效果

特应性皮炎&#xff08;AD&#xff09;是一种常见的慢性炎症性皮肤病&#xff0c;在全球范围内造成了巨大的疾病负担。尽管在治疗方面取得了一定进展&#xff0c;但AD患者的生活质量较低&#xff0c;治疗满意度差&#xff0c;超过一半的患者认为中度至重度AD疾病控制不佳。AD的…

YOLO快速入门

Yolo简介 概述 YOLO&#xff08;You Only Look Once&#xff09;是一种流行的目标检测算法&#xff0c;由Joseph Redmon等人开发。 YOLO算法以其高效的实时性能和准确的检测能力而闻名。自YOLO的首次提出以来&#xff0c;已经经 历了多个版本的更新和改进。以下是YOLO发展史的…

周边类-找厕所小程序源码

源码获取方式 1&#xff0c;搜一搜 万能工具箱合集 点击资料库 即可进去获取 找厕所小程序源码依赖于腾讯地图的一款源码&#xff0c;腾讯地图api免费申请&#xff0c;是一款免费又永久的不需要服务器的小程序&#xff0c;起个好名字蹭蹭蹭~ 搭建教程&#xff1a; 1、下载源码…

使用css的transition属性实现抽屉功能

需求 使用css手写一个抽屉&#xff0c;并且不能遮挡住原来的页面 效果&#xff1a;&#xff08;录的gif有点卡&#xff0c;实际情况很丝滑&#xff09; 实现代码&#xff1a; <template><div class"dashboard-container"><div class"mainBox&…

Java项目:36 springboot图书个性化推荐系统的设计与实现003

作者主页&#xff1a;源码空间codegym 简介&#xff1a;Java领域优质创作者、Java项目、学习资料、技术互助 文中获取源码 项目介绍 springboot003图书个性化推荐系统的设计与实现 管理员&#xff1a;首页、个人中心、学生管理、图书分类管理、图书信息管理、图书预约管理、退…

[element]element-ui框架下载

⭐作者介绍&#xff1a;大二本科网络工程专业在读&#xff0c;持续学习Java&#xff0c;努力输出优质文章 ⭐作者主页&#xff1a;逐梦苍穹 ⭐如果觉得文章写的不错&#xff0c;欢迎点个关注一键三连&#x1f609;有写的不好的地方也欢迎指正&#xff0c;一同进步&#x1f601;…

CSS中画一条0.5px的线

采用transform: scale()的方式&#xff0c;该方法用来定义元素的2D 缩放转换&#xff1a; transform: scale(0.5,0.5); 采用meta viewport的方式 <meta name"viewport" content"widthdevice-width, initial-scale0.5, minimum-scale0.5, maximum-scale0.5…

基于Springboot的足球俱乐部管理系统(有报告)。Javaee项目,springboot项目。

演示视频&#xff1a; 基于Springboot的足球俱乐部管理系统&#xff08;有报告&#xff09;。Javaee项目&#xff0c;springboot项目。 项目介绍: 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构&#xff…

【Datawhale组队学习:Sora原理与技术实战】Attention和LLM

Attention Attention 注意力&#xff0c;从两个不同的主体开始。 论文&#xff1a;https://arxiv.org/pdf/1703.03906.pdf seq2seq代码仓&#xff1a;https://github.com/google/seq2seq 计算方法&#xff1a; 加性Attention&#xff0c;如&#xff08;Bahdanau attention&…

数据库-ODBC操作

承接Qt/C软件开发项目&#xff0c;高质量交付&#xff0c;灵活沟通&#xff0c;长期维护支持。需求所寻&#xff0c;技术正适&#xff0c;共创完美&#xff0c;欢迎私信联系&#xff01; 一、ODBC 数据源配置 打开ODBC数据源管理器&#xff1a; 在Windows搜索栏中键入“ODBC数…

Flink hello world

下载并且解压Flink Downloads | Apache Flink 启动Flink. $ ./bin/start-cluster.sh Starting cluster. Starting standalonesession daemon on host DESKTOP-T4TU7JE. Starting taskexecutor daemon on host DESKTOP-T4TU7JE. Flink 的版本附带了许多示例作业。您可以快速将…

PyTorch搭建LeNet神经网络

函数的参数 1、PyTorch Tensor的通道排序 [batch, channel, height, width] batch: 要处理的一批图像的个数 channel: 通道数&#xff08;一般是R G B 三个通道&#xff09; height: 图像的高度 width: 图像的宽度 2.Conv 2d 卷积层的参数 [in_channels, out_channels, ke…

Golang 开发实战day01 - Variable String Numeric

Golang 教程01 - Variable String Numeric 1. Go语言的重要性 Go语言&#xff0c;又称Golang&#xff0c;是一种由Google开发的静态编译型编程语言。它于2009年首次发布&#xff0c;并在短短几年内迅速流行起来。Go语言具有以下特点&#xff1a; 语法简单易学&#xff1a;Go…

【牛客】SQL137 第二快/慢用时之差大于试卷时长一半的试卷-窗口函数

描述 现有试卷信息表examination_info&#xff08;exam_id试卷ID, tag试卷类别, difficulty试卷难度, duration考试时长, release_time发布时间&#xff09;&#xff1a; idexam_idtagdifficultydurationrelease_time19001SQLhard602021-09-01 06:00:0029002Chard602021-09-0…