1. Spring Boot 项目中添加 ZooKeeper 以及对应的客户端依赖,pom.xml 文件如下:
<!-- ZooKeeper component -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.9.1</version>
</dependency>
<!-- Includes the Curator components -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-zookeeper</artifactId>
    <version>6.2.2</version>
</dependency>
2.application.yml 文件中配置zookeeper连接的相关配置信息
zookeeper:
  # ZooKeeper server address (host:port)
  connectString: 127.0.0.1:2181
  # Session timeout, in milliseconds
  # NOTE(review): this is smaller than connectionTimeoutMs below - confirm that is intended
  sessionTimeoutMs: 3000
  # Connection timeout, in milliseconds
  connectionTimeoutMs: 60000
  # Maximum number of retries
  maxRetries: 3
  # Initial sleep time between retries, in milliseconds
  baseSleepTimeMs: 1000
3.java配置的方式添加zookeeper相关的配置
package com.jinyi.up.zk.config;import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;/*** @author huangchong* @date 2024/3/5 20:48* @desc*/
@Slf4j
@Configuration
public class ZookeeperConfig {@Value("${zookeeper.connectString}")private String connectString;@Value("${zookeeper.baseSleepTimeMs}")private int baseSleepTimeMs;@Value("${zookeeper.maxRetries}")private int maxRetries ;@Value("${zookeeper.connectionTimeoutMs}")int connectionTimeoutMs ;@Value("${zookeeper.sessionTimeoutMs}")int sessionTimeoutMs ;private static CuratorFramework client = null ;/*** 初始化*/@PostConstructpublic void init (){// 重试策略RetryPolicy policy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);//通过工厂创建Curatorclient = CuratorFrameworkFactory.builder().connectString(connectString).connectionTimeoutMs(connectionTimeoutMs).sessionTimeoutMs(sessionTimeoutMs).retryPolicy(policy).build();//开启连接client.start();log.info("zookeeper 初始化完成...");}@Beanpublic CuratorFramework getClient (){return client ;}/*** 分布式锁bean 注入spring管理中*/@Beanpublic InterProcessMutex distributedLock() throws Exception {//使用了Curator提供的InterProcessMutex来创建一个分布式锁。我们使用ZooKeeper的路径/lock来表示锁的资源。InterProcessMutex distributedLock = new InterProcessMutex(client, "/lock");return distributedLock;}
}
4.Zookeeper基础操作服务和分布式锁服务编码
package com.jinyi.up.client.service;import com.jinyi.up.zk.process.AbstractListenerProcess;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;/*** @author huangchong* @date 2024/3/5 21:39* @desc*/
@Slf4j
@Service
public class ZookeeperService {@Resourceprivate CuratorFramework client;/*** 查询节点数据** @param nodePath 节点* @return {@link String}*/public String queryData(String nodePath) {try {Stat stat = client.checkExists().forPath(nodePath);if (stat != null) {byte[] bytes = client.getData().forPath(nodePath);return new String(bytes, StandardCharsets.UTF_8);}return null;} catch (Exception e) {log.error("查询节点数据失败:", e);return null;}}/*** 创建节点** @param mode 节点类型* @param nodePath 节点路径* @param nodeData 节点数据* @return {@link String}*/public String create(CreateMode mode, String nodePath, String nodeData) {try {Stat stat = client.checkExists().forPath(nodePath);if (stat == null) {return client.create().withMode(mode).forPath(nodePath, nodeData.getBytes());} else {return null;}} catch (Exception e) {log.error("创建节点失败:", e);return null;}}/*** 更新节点数据** @param nodePath 节点路径* @param nodeData 节点数据* @return {@link Stat}*/public boolean update(String nodePath, String nodeData) {try {Stat stat = client.checkExists().forPath(nodePath);if (stat != null) {stat = client.setData().forPath(nodePath, nodeData.getBytes());}return stat != null;} catch (Exception e) {log.error("更新节点失败:", e);return false;}}/*** 删除节点** @param nodePath v* @return {@link boolean}*/public boolean delete(String nodePath) {try {Stat stat = client.checkExists().forPath(nodePath);if (stat != null) {client.delete().forPath(nodePath);}return true;} catch (Exception e) {log.error("删除节点失败:", e);return false;}}/*** 监听一个节点** @param nodePath 被监听节点路径* @return {@link }*/public boolean addWatchNodeListener(String nodePath) {CuratorCache curatorCache = CuratorCache.builder(client, nodePath).build();CuratorCacheListener listener = CuratorCacheListener.builder().forNodeCache(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {log.info("监听到节点变动");//TODO}}).build();curatorCache.listenable().addListener(listener);curatorCache.start();return true;}/*** 监听子孙节点 支持子节点的子节点监听* TreeCache监听节点自己和所有子节点们** @param nodePath 被监听节点路径* 
@param processer 监听后处理* @return {@link }*/public boolean addWatchTreeListener(String nodePath, AbstractListenerProcess processer) {CuratorCache curatorCache = CuratorCache.builder(client, nodePath).build();CuratorCacheListener listener = CuratorCacheListener.builder().forTreeCache(client, new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) {log.info("监听到子节点变动,变动类型:{}", treeCacheEvent.getType().toString());processer.process(curatorFramework, treeCacheEvent);}}).build();curatorCache.listenable().addListener(listener);curatorCache.start();return true;}
}
package com.jinyi.up.zk.service;import com.jinyi.up.zk.process.AbstractListenerProcess;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;/*** @author huangchong* @date 2024/3/5 21:39* @desc*/
@Slf4j
@Service
public class ZookeeperLockService {@Resourceprivate InterProcessMutex distributedLock;public void doProtectedOperation() throws Exception {//acquire()方法获取锁distributedLock.acquire();try {// 执行需要保护的代码块} finally {distributedLock.release();}}
}
5.watcher机制事件处理抽象封装
package com.jinyi.up.zk.process;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;/*** @author huangchong* @date 2024/3/5 21:58* @desc*/
/**
 * Base type for handlers of tree-cache change events.
 */
public abstract class AbstractListenerProcess {

    /**
     * Handles a change event for the watched node itself or for any of its descendants.
     *
     * @param client ZooKeeper client
     * @param event  the change event
     */
    public abstract void process(CuratorFramework client, TreeCacheEvent event);
}
package com.jinyi.up.zk.process;import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;/*** @author huangchong* @date 2024/3/5 21:58* @desc*/
@Slf4j
public class WatcherTreeListenerProcess extends AbstractListenerProcess{/*** 实际处理监听节点自己和所有子节点们变更事件** @param client zk客户端* @param event 子节点事件* @return {@link }*/@Overridepublic void process(CuratorFramework client, TreeCacheEvent event) {//事件pathString path = event.getData().getPath();switch (event.getType()) {case NODE_ADDED:log.info("新增子节点:" + path);break;case NODE_UPDATED:log.info("更新子节点:" + path);break;case NODE_REMOVED:log.info("删除子节点:" + path);break;default:break;}}
}
6.基本操作的单元测试代码
package com.jinyi.zookeeper;import com.jinyi.up.zk.ZookeeperApplication;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
// Base class for the ZooKeeper tests; it carries the Spring Boot test configuration.
// The classes attribute below references the @SpringBootApplication entry class.
@SpringBootTest(classes = {ZookeeperApplication.class})
public abstract class BaseZkBootTest {
}
package com.jinyi.zookeeper;import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;/*** @author huangchong* @date 2024/3/5 21:00* @desc*/
@Slf4j
public class ZookeeperBaseTest extends BaseZkBootTest {@Resourceprivate CuratorFramework client;@Testpublic void testAddPersistentNode() throws Exception {// 创建一个持久化节点/persistent_node,断开连接时不会自动删除client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/persistent_node");}@Testpublic void testZnodeExists() throws Exception {// 判断节点是否存在,persistent_node2不存在所以stat2是nullStat stat = client.checkExists().forPath("/persistent_node");log.info(String.valueOf(stat));Stat stat2 = client.checkExists().forPath("/persistent_node2");log.info(String.valueOf(stat2));}@Testpublic void testSetData() throws Exception {// 设置节点数据client.setData().forPath("/persistent_node", "persistent_node_data".getBytes(StandardCharsets.UTF_8));}@Testpublic void testCreateAndSet() throws Exception {// 创建一个持久化节点并设置节点数据client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("//persistent_node1", "persistent_node_data1".getBytes(StandardCharsets.UTF_8));}@Testpublic void testGetData() throws Exception {// 查询节点数据byte[] data = client.getData().forPath("/persistent_node1");log.info(new String(data, StandardCharsets.UTF_8));}@Testpublic void testDelete() throws Exception {// 删除节点client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/persistent_node1");}@Testpublic void testReadLock() throws Exception {// 读写锁-读InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock-read");lock.readLock().acquire();log.info("获取-ReadLock");lock.readLock().release();}@Testpublic void testWriteLock() throws Exception {// 读写锁-写InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock-write");lock.writeLock().acquire();log.info("获取-WriteLock");lock.writeLock().release();}
}