Zookeeper Java客户端实战
ZooKeeper应用的开发主要通过Java客户端API去连接和操作ZooKeeper集群。
-
ZooKeeper官方的Java客户端API。
-
第三方的Java客户端API,比如Curator。
ZooKeeper官方的客户端API提供了基本的操作:创建会话、创建节点、读取节点、更新数据、删除节点和检查节点是否存在等。
对于实际开发来说,ZooKeeper官方API有一些不足之处:
-
ZooKeeper的Watcher监测是一次性的,每次触发之后都需要重新进行注册。
-
会话超时之后没有实现重连机制。
-
异常处理烦琐,ZooKeeper提供了很多异常,对于开发人员来说可能根本不知道应该如何处理这些抛出的异常。
-
仅提供了简单的byte[]数组类型的接口,没有提供Java POJO级别的序列化数据处理接口。
-
创建节点时如果抛出异常,需要自行检查节点是否存在。
-
无法实现级联删除。
总之,ZooKeeper官方API功能比较简单,在实际开发过程中比较笨重,一般不推荐使用。
Zookeeper 原生Java客户端使用
引入zookeeper client依赖
<!-- zookeeper client -->
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.0</version>
</dependency>
注意:保持与服务端版本一致,避免兼容性的问题
ZooKeeper常用构造器
ZooKeeper (connectString, sessionTimeout, watcher)
参数 | 描述 |
---|---|
connectString | 逗号分隔的列表,每个ZooKeeper节点是一个host.port对,host 是机器名或者IP地址,port是ZooKeeper节点对客户端提供服务的端口号。客户端会任意选取connectString 中的一个节点建立连接。 |
sessionTimeout | session timeout时间。 |
watcher | 接收到来自ZooKeeper集群的事件。 |
使用 zookeeper 原生 API,连接zookeeper集群
public class ZkClientDemo
{private static final String CONNECT_STR = "你的公网IP:2181";private final static String CLUSTER_CONNECT_STR = "192.168.65.156:2181,192.168.65.190:2181,192.168.65.200:2181";public static void main(String[] args)throws Exception{final CountDownLatch countDownLatch = new CountDownLatch(1);ZooKeeper zooKeeper = new ZooKeeper(CONNECT_STR, 4000, new Watcher(){@Overridepublic void process(WatchedEvent event){if (Event.KeeperState.SyncConnected == event.getState() && event.getType() == Event.EventType.None){// 如果收到了服务端的响应事件,连接成功countDownLatch.countDown();System.out.println("连接建立");}}});System.out.printf("连接中");countDownLatch.await();// CONNECTEDSystem.out.println(zooKeeper.getState());// 创建持久节点zooKeeper.create("/user", "gao".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}
}
Zookeeper主要方法
方法 | 功能 |
---|---|
create(path, data, acl,createMode) | 创建一个给定路径的 znode,并在 znode 保存 data[]的 数据,createMode指定 znode 的类型。 |
delete(path, version) | 如果给定 path 上的 znode 的版本和给定的 version 匹配, 删除 znode |
exists(path, watch) | 判断给定 path 上的 znode 是否存在,并在 znode 设置一个 watch |
getData(path, watch) | 返回给定 path 上的 znode 数据,并在 znode 设置一个 watch。 |
setData(path, data, version) | 如果给定 path 上的 znode 的版本和给定的 version 匹配,设置 znode 数据。 |
getChildren(path, watch) | 返回给定 path 上的 znode 的子 znode 名字,并在 znode 设置一个 watch。 |
sync(path) | 把客户端 session 连接节点和 leader 节点进行同步。 |
方法特点:
-
所有获取 znode 数据的 API 都可以设置一个 watch 用来监控 znode 的变化。
-
所有更新 znode 数据的 API 都有两个版本: 无条件更新版本和条件更新版本。如果 version 为 -1,更新为无条件更新。否则只有给定的 version 和 znode 当前的 version 一样,才会进行更新。
-
所有的方法都有同步和异步两个版本。同步版本的方法发送请求给 ZooKeeper 并等待服务器的响应。异步版本把请求放入客户端的请求队列,然后马上返回。异步版本通过 callback 来接受来自服务端的响应。
同步创建节点:
public void createTest() throws KeeperException, InterruptedException
{String path = zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);log.info("created path: {}",path);
}
异步创建节点:
public void createAsycTest() throws InterruptedException
{zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT,(rc, path, ctx, name) -> log.info("rc {},path {},ctx {},name {}",rc,path,ctx,name),"context");TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}
修改数据:
public void setTest() throws KeeperException, InterruptedException
{Stat stat = new Stat();byte[] data = zooKeeper.getData(ZK_NODE, false, stat);log.info("修改前: {}",new String(data));zooKeeper.setData(ZK_NODE, "changed!".getBytes(), stat.getVersion());byte[] dataAfter = zooKeeper.getData(ZK_NODE, false, stat);log.info("修改后: {}",new String(dataAfter));
}
Curator开源客户端使用
Curator是Netflix公司开源的一套ZooKeeper客户端框架,和ZkClient一样它解决了非常底层的细节开发工作,包括连接、重连、反复注册Watcher的问题以及NodeExistsException异常等。
Curator是Apache基金会的顶级项目之一,Curator具有更加完善的文档,另外还提供了一套易用性和可读性更强的Fluent风格的客户端API框架。
Curator还为ZooKeeper客户端框架提供了一些比较普遍的、开箱即用的、分布式开发用的解决方案,例如Recipe、共享锁服务、Master选举机制和分布式计算器等,帮助开发者避免了“重复造轮子”的无效开发工作。
在实际的开发场景中,使用Curator客户端就足以应付日常的ZooKeeper集群操作的需求。
官网:Apache Curator
引入依赖
-
curator-framework是对ZooKeeper的底层API的一些封装。
-
curator-client提供了一些客户端的操作,例如重试策略等。
-
curator-recipes封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。
<!-- zookeeper client -->
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.0</version>
</dependency><!--curator-->
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.1.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions>
</dependency>
创建一个客户端实例
使用curator-framework包操作ZooKeeper前,首先要创建一个客户端实例(CuratorFramework类型的对象)
-
使用工厂类CuratorFrameworkFactory的静态newClient()方法
//重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)
//创建客户端实例
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
//启动客户端
client.start();
-
使用工厂类CuratorFrameworkFactory的静态builder构造者方法
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.128.129:2181").sessionTimeoutMs(5000) // 会话超时时间.connectionTimeoutMs(5000) // 连接超时时间.retryPolicy(retryPolicy).namespace("base") // 包含隔离名称.build();
client.start();
-
connectionString:服务器地址列表,一个或多个。(多个地址列表用逗号分隔)
-
retryPolicy:重试策略,当客户端异常退出或者与服务端失去连接的时候,可以通过设置客户端重新连接 ZooKeeper 服务端。( Curator 内部,通过判断服务器返回的 keeperException 状态代码判断是否重试)
策略名称 | 描述 |
---|---|
ExponentialBackoffRetry | 重试一组次数,重试之间的睡眠时间增加 |
RetryNTimes | 重试最大次数 |
RetryOneTime | 只重试一次 |
RetryUntilElapsed | 在给定的时间结束之前重试 |
-
超时时间:Curator 客户端创建过程中,有两个超时时间的设置。一个是 sessionTimeoutMs 会话超时时间,用来设置该条会话在 ZooKeeper 服务端的失效时间。另一个是 connectionTimeoutMs 客户端创建会话的超时时间,用来限制客户端发起一个会话连接到接收 ZooKeeper 服务端应答的时间。
创建节点
public void testCreate() throws Exception
{String path = curatorFramework.create().forPath("/curator-node");curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/curator-node","some-data".getBytes())log.info("curator create node :{} successfully.",path);
}
一次性创建带层级结构的节点
public void testCreateWithParent() throws Exception
{String pathWithParent="/node-parent/sub-node-1";String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);log.info("curator create node :{} successfully.",path);
}
获取数据
public void testGetData() throws Exception
{byte[] bytes = curatorFramework.getData().forPath("/curator-node");log.info("get data from node :{} successfully.",new String(bytes));
}
更新节点
public void testSetData() throws Exception
{curatorFramework.setData().forPath("/curator-node","changed!".getBytes());byte[] bytes = curatorFramework.setData().forPath("/curator-node");log.info("get data from node /curator-node :{} successfully.",new String(bytes));
}
删除节点
public void testDelete() throws Exception
{String pathWithParent="/node-parent";curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);
}
guaranteed:保障删除成功,底层工作方式是:只要该客户端的会话有效,就会在后台持续发起删除请求,直到该数据节点在 ZooKeeper 服务端被删除。
deletingChildrenIfNeeded:指定了该函数后,系统在删除该数据节点的时候会以递归的方式直接删除其子节点,以及子节点的子节点。
异步接口
public interface BackgroundCallback
{/*** Called when the async background operation completes** @param client the client* @param event operation result details* @throws Exception errors*/public void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
}
默认在 EventThread 中调用
public void test() throws Exception
{//inBackground 异步处理默认在EventThread中执行curatorFramework.getData().inBackground((item1, item2) -> {log.info(" background: {}", item2);}).forPath(ZK_NODE);TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}
指定线程池
public void test() throws Exception
{ExecutorService executorService = Executors.newSingleThreadExecutor();curatorFramework.getData().inBackground((item1, item2) -> {log.info(" background: {}", item2);},executorService).forPath(ZK_NODE);TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}
Curator 监听器
/*** Receives notifications about errors and background events*/
public interface CuratorListener
{/*** Called when a background task has completed or a watch has triggered** @param client client* @param event the event* @throws Exception any errors*/public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
}
Curator 引入了 Cache 来实现对 Zookeeper 服务端事件监听,Cache 提供了反复注册的功能。Cache 分为两类注册类型:节点监听和子节点监听。
node cache:NodeCache 对某一个节点进行监听
@Slf4j
public class NodeCacheTest extends AbstractCuratorTest{public static final String NODE_CACHE="/node-cache";@Testpublic void testNodeCacheTest() throws Exception {createIfNeed(NODE_CACHE);NodeCache nodeCache = new NodeCache(curatorFramework, NODE_CACHE);nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {log.info("{} path nodeChanged: ",NODE_CACHE);printNodeData();}});nodeCache.start();}public void printNodeData() throws Exception {byte[] bytes = curatorFramework.getData().forPath(NODE_CACHE);log.info("data: {}",new String(bytes));}
}
path cache: PathChildrenCache 会对子节点进行监听,但是不会对二级子节点进行监听,
@Slf4j
public class PathCacheTest extends AbstractCuratorTest{public static final String PATH="/path-cache";@Testpublic void testPathCache() throws Exception {createIfNeed(PATH);PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, PATH, true);pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {log.info("event: {}",event);}});// 如果设置为true则在首次启动时就会缓存节点内容到Cache中pathChildrenCache.start(true);}
}
tree cache:TreeCache 使用一个内部类TreeNode来维护这个一个树结构。并将这个树结构与ZK节点进行了映射。所以TreeCache 可以监听当前节点下所有节点的事件。
@Slf4j
public class TreeCacheTest extends AbstractCuratorTest{public static final String TREE_CACHE="/tree-path";@Testpublic void testTreeCache() throws Exception {createIfNeed(TREE_CACHE);TreeCache treeCache = new TreeCache(curatorFramework, TREE_CACHE);treeCache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {log.info(" tree cache: {}",event);}});treeCache.start();}
}
Zookeeper在分布式命名服务中的实战
ZooKeeper的命名服务主要是利用ZooKeeper节点的树形分层结构和子节点的顺序维护能力,来为分布式系统中的资源命名。
典型的分布式命名服务有:
-
分布式API目录
-
分布式节点命名
-
分布式ID生成器
分布式API目录
为分布式系统中各种API接口服务的名称、链接地址,提供类似JNDI(Java命名和目录接口)中的文件系统的功能。借助于ZooKeeper的树形分层结构就能提供分布式的API调用功能。
著名的Dubbo分布式框架就是应用了ZooKeeper的分布式的JNDI功能。在Dubbo中,使用ZooKeeper维护的全局服务接口API的地址列表。大致的思路为:
-
服务提供者(Service Provider)在启动的时候,向ZooKeeper上的指定节点/dubbo/${serviceName}/providers写入自己的API地址,这个操作就相当于服务的公开。
-
服务消费者(Consumer)启动的时候,订阅节点/dubbo/{serviceName}/providers下的服务提供者的URL地址,获得所有服务提供者的API。
分布式节点命名
一个分布式节点通常有很多节点,并且节点数量不固定。业务膨胀和流量洪峰会有新的节点加入集群,服务故障和网络波动等原因有节点退出集群。
那么分布式中大量的节点要如何命名呢?
可用于生成集群节点的编号的方案:
1、使用数据库的自增ID特性,用数据表存储机器的MAC地址或者IP来维护。
2、使用ZooKeeper持久顺序节点的顺序特性来维护节点的NodeId编号。
-
启动节点服务,连接ZooKeeper,检查命名服务根节点是否存在,如果不存在,就创建系统的根节点。
-
在根节点下创建一个临时顺序ZNode节点,取回ZNode的编号把它作为分布式系统中节点的NODEID。
-
如果临时节点太多,可以根据需要删除临时顺序ZNode节点。
分布式ID生成器
分布式ID生成器的使用场景:
-
大量的数据记录,需要分布式ID。
-
大量的系统消息,需要分布式ID。
-
大量的请求日志,如restful的操作记录,需要唯一标识,以便进行后续的用户行为分析和调用链路分析。
-
分布式节点的命名服务,往往也需要分布式ID。
分布式ID生成系统需要满足条件:
1、全局唯一:不能出现重复ID。
2、高可用:ID生成系统是基础系统,被许多关键系统调用,一旦宕机,就会造成严重影响。
分布式的ID生成器方案有:
1、Java的UUID。
2、分布式缓存Redis生成ID:利用Redis的原子操作INCR和INCRBY,生成全局唯一的ID。
3、Twitter的SnowFlake算法。
4、ZooKeeper生成ID:利用ZooKeeper的顺序节点,生成全局唯一的ID。
5、MongoDb的ObjectId:MongoDB是一个分布式的非结构化NoSQL数据库,每插入一条记录会自动生成全局唯一的一个“_id”字段值,它是一个12字节的字符串,可以作为分布式系统中全局唯一的ID。
基于Zookeeper实现分布式ID生成器
ZooKeeper具备自动编号能力的节点类型:
-
PERSISTENT_SEQUENTIAL 持久化顺序节点
-
EPHEMERAL_SEQUENTIAL 临时顺序节点
ZooKeeper的每一个节点都会为它的第一级子节点维护一份顺序编号,记录每个子节点创建的先后顺序,顺序编号分布式同步且全局唯一。
通过创建ZooKeeper的临时顺序节点的方法,生成全局唯一的ID
@Slf4j
public class IDMaker extends CuratorBaseOperations
{private String createSeqNode(String pathPefix) throws Exception{CuratorFramework curatorFramework = getCuratorFramework();// 创建一个临时顺序节点String destPath = curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(pathPefix);return destPath;}public String makeId(String path) throws Exception{String str = createSeqNode(path);if (null != str){// 获取末尾的序号int index = str.lastIndexOf(path);if (index >= 0){index += path.length();return index <= str.length() ? str.substring(index) : "";}}return str;}// 测试public static void main(String[] args) throws InterruptedException{IDMaker idMaker = new IDMaker();String pathPrefix = "/idmarker/id-";for (int i = 0; i < 5; i++){new Thread(() -> {for (int j = 0; j < 10; j++){String id = null;try{id = idMaker.makeId(pathPrefix);log.info("{}线程第{}个创建的id为{}", Thread.currentThread().getName(), j, id);}catch (Exception e){e.printStackTrace();}}}, "thread" + i).start();}Thread.sleep(Integer.MAX_VALUE);}
}public class CuratorBaseOperations
{public CuratorFramework getCuratorFramework(){RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("你的公网IP:2181").sessionTimeoutMs(5000) // 会话超时时间.connectionTimeoutMs(5000) // 连接超时时间.retryPolicy(retryPolicy).namespace("base") // 包含隔离名称.build();client.start();return client;}
}
基于Zookeeper实现SnowFlakeID算法
Twitter(推特)的SnowFlake算法是一种著名的分布式服务器用户ID生成算法。SnowFlake算法所生成的ID是一个64bit的长整型数字。这个64bit被划分成四个部分,其中后面三个部分分别表示时间戳、工作机器ID、序列号。
SnowFlakeID分为四个部分:
(1)第一位 占用1 bit,其值始终是0,没有实际作用。
(2)时间戳 占用41 bit,精确到毫秒,总共可以容纳约69年的时间。
(3)工作机器id占用10 bit,最多可以容纳1024个节点。
(4)序列号 占用12 bit。这个值在同一毫秒同一节点上从0开始不断累加,最多可以累加到4095。
在工作节点达到1024顶配的场景下,SnowFlake算法在同一毫秒最多可以生成的ID数量为:1024 * 4096 =4194304
,在绝大多数并发场景下都是够用的。
SnowFlake算法的优点:
-
生成ID时不依赖于数据库,完全在内存生成,高性能和高可用性。
-
容量大,每秒可生成几百万个ID。
-
ID呈趋势递增,后续插入数据库的索引树时,性能较高。
SnowFlake算法的缺点:
-
依赖于系统时钟的一致性,如果某台机器的系统时钟回拨了,有可能造成ID冲突,或者ID乱序。
-
在启动之前,如果这台机器的系统时间回拨过,那么有可能出现ID重复的危险。
基于zookeeper实现雪花算法:
public class SnowflakeIdGenerator
{/*** 单例*/public static SnowflakeIdGenerator INSTANCE = new SnowflakeIdGenerator();/*** 初始化单例** @param workerId 节点Id,最大8091* @return the 单例*/public synchronized void init(long workerId){if (workerId > MAX_WORKER_ID){// zk分配的workerId过大throw new IllegalArgumentException("woker Id wrong: " + workerId);}INSTANCE.workerId = workerId;}private SnowflakeIdGenerator(){}/*** 开始使用该算法的时间为: 2017-01-01 00:00:00*/private static final long START_TIME = 1483200000000L;/*** worker id 的bit数,最多支持8192个节点*/private static final int WORKER_ID_BITS = 13;/*** 序列号,支持单节点最高每毫秒的最大ID数1024*/private static final int SEQUENCE_BITS = 10;/*** 最大的 worker id ,8091 -1 的补码(二进制全1)右移13位, 然后取反*/private static final long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);/*** 最大的序列号,1023 -1 的补码(二进制全1)右移10位, 然后取反*/private static final long MAX_SEQUENCE = ~(-1L << SEQUENCE_BITS);/*** worker 节点编号的移位*/private static final long WORKER_ID_SHIFT = SEQUENCE_BITS;/*** 时间戳的移位*/private static final long TIMESTAMP_LEFT_SHIFT = WORKER_ID_BITS + SEQUENCE_BITS;/*** 该项目的worker 节点 id*/private long workerId;/*** 上次生成ID的时间戳*/private long lastTimestamp = -1L;/*** 当前毫秒生成的序列*/private long sequence = 0L;/*** Next id long.** @return the nextId*/public Long nextId(){return generateId();}/*** 生成唯一id的具体实现*/private synchronized long generateId(){long current = System.currentTimeMillis();if (current < lastTimestamp){// 如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过,出现问题返回-1return -1;}if (current == lastTimestamp){// 如果当前生成id的时间还是上次的时间,那么对sequence序列号进行+1sequence = (sequence + 1) & MAX_SEQUENCE;if (sequence == MAX_SEQUENCE){// 当前毫秒生成的序列数已经大于最大值,那么阻塞到下一个毫秒再获取新的时间戳current = this.nextMs(lastTimestamp);}}else{// 当前的时间戳已经是下一个毫秒sequence = 0L;}// 更新上次生成id的时间戳lastTimestamp = current;// 进行移位操作生成int64的唯一ID// 时间戳右移动23位long time = (current - START_TIME) << TIMESTAMP_LEFT_SHIFT;// workerId 右移动10位long workerId = this.workerId << WORKER_ID_SHIFT;return time | workerId | sequence;}/*** 阻塞到下一个毫秒*/private long nextMs(long timeStamp){long current = System.currentTimeMillis();while (current <= timeStamp){current = System.currentTimeMillis();}return current;}public static void main(String[] args){SnowflakeIdGenerator instance = SnowflakeIdGenerator.INSTANCE;instance.init(1000);System.out.println(instance.generateId());System.out.println(instance.nextId());}
}
zookeeper实现分布式队列
常见的消息队列有:RabbitMQ,RocketMQ,Kafka等。Zookeeper作为一个分布式的小文件管理系统,同样能实现简单的队列功能。
Zookeeper不适合大数据量存储,官方并不推荐作为队列使用,但由于实现简单,集群搭建较为便利,因此在一些吞吐量不高的小型系统中还是比较好用的。
设计思路
1、创建队列根节点:在Zookeeper中创建一个持久节点,用作队列的根节点。所有队列元素的节点将放在这个根节点下。
2、实现入队操作:当需要将一个元素添加到队列时,可以在队列的根节点下创建一个临时有序节点。节点的数据可以包含队列元素的信息。
3、实现出队操作:当需要从队列中取出一个元素时,可以执行以下操作:
-
获取根节点下的所有子节点。
-
找到具有最小序号的子节点。
-
获取该节点的数据。
-
删除该节点。
-
返回节点的数据。
/*** 入队* @param data* @throws Exception*/
public void enqueue(String data) throws Exception {// 创建临时有序子节点zk.create(QUEUE_ROOT + "/queue-", data.getBytes(StandardCharsets.UTF_8),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}/*** 出队* @return* @throws Exception*/
public String dequeue() throws Exception {while (true) {List<String> children = zk.getChildren(QUEUE_ROOT, false);if (children.isEmpty()) {return null;}Collections.sort(children);for (String child : children) {String childPath = QUEUE_ROOT + "/" + child;try {byte[] data = zk.getData(childPath, false, null);zk.delete(childPath, -1);return new String(data, StandardCharsets.UTF_8);} catch (KeeperException.NoNodeException e) {// 节点已被其他消费者删除,尝试下一个节点}}}
}
使用Apache Curator实现分布式队列
Apache Curator是一个ZooKeeper客户端的封装库,提供了许多高级功能,其中就包括分布式队列。
public class CuratorDistributedQueueDemo {private static final String QUEUE_ROOT = "/curator_distributed_queue";public static void main(String[] args) throws Exception {CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181",new ExponentialBackoffRetry(1000, 3));client.start();// 定义队列序列化和反序列化QueueSerializer<String> serializer = new QueueSerializer<String>() {@Overridepublic byte[] serialize(String item) {return item.getBytes();}@Overridepublic String deserialize(byte[] bytes) {return new String(bytes);}};// 定义队列消费者QueueConsumer<String> consumer = new QueueConsumer<String>() {@Overridepublic void consumeMessage(String message) throws Exception {System.out.println("消费消息: " + message);}@Overridepublic void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {}};// 创建分布式队列DistributedQueue<String> queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_ROOT).buildQueue();queue.start();// 生产消息for (int i = 0; i < 5; i++) {String message = "Task-" + i;System.out.println("生产消息: " + message);queue.put(message);Thread.sleep(1000);}Thread.sleep(10000);queue.close();client.close();}
}
注意事项
使用Curator的DistributedQueue时,默认情况下不使用锁。当调用QueueBuilder的lockPath()方法并指定一个锁节点路径时,才会启用锁。如果不指定锁节点路径,那么队列操作可能会受到并发问题的影响。
在创建分布式队列时,指定一个锁节点路径可以帮助确保队列操作的原子性和顺序性。分布式环境中,多个消费者可能同时尝试消费队列中的消息。如果不使用锁来同步这些操作,可能会导致消息被多次处理或者处理顺序出现混乱。如果应用场景允许消息被多次处理,或者处理顺序不是关键问题,可以不使用锁。这样可以提高队列操作的性能,因为不再需要等待获取锁。
// 创建分布式队列
QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, serializer, "/order");
//指定了一个锁节点路径/orderlock,用于实现分布式锁,以保证队列操作的原子性和顺序性。
queue = builder.lockPath("/orderlock").buildQueue();
//启动队列,这时队列开始监听ZooKeeper中/order节点下的消息。
queue.start();