Zookeeper分布式队列实战

目录

Zookeeper分布式队列

普通方式实现

设计思路

具体实现

使用Curator实现

具体实现

注意事项


Zookeeper分布式队列

       常见的消息队列有:RabbitMQ,RocketMQ,Kafka等。Zookeeper作为一个分布式的小文件管理系统,同样能实现简单的队列功能。Zookeeper不适合大数据量存储,官方并不推荐作为队列使用,但由于实现简单,集群搭建较为便利,因此在一些吞吐量不高的小型系统中是比较好用的。

普通方式实现

设计思路

     

1.创建队列根节点
       在Zookeeper中创建一个持久节点,用作队列的根节点。所有队列元素的节点将放在这个根节点下。
2.实现入队操作
       当需要将一个元素添加到队列时,可以在队列的根节点下创建一个临时有序节点。节点的数据可以包含队列元素的信息。
3.实现出队操作
       当需要从队列中取出一个元素时,先获取根节点下的所有子节点。再找到具有最小序号的子节点,获取该节点的数据,删除该节点,然后返回节点的数据。

具体实现
/*** 入队* @param data* @throws Exception*/
public void enqueue(String data) throws Exception {// 创建临时有序子节点zk.create(QUEUE_ROOT + "/queue-", data.getBytes(StandardCharsets.UTF_8),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}/*** 出队* @return* @throws Exception*/
public String dequeue() throws Exception {while (true) {List<String> children = zk.getChildren(QUEUE_ROOT, false);if (children.isEmpty()) {return null;}Collections.sort(children);for (String child : children) {String childPath = QUEUE_ROOT + "/" + child;try {byte[] data = zk.getData(childPath, false, null);zk.delete(childPath, -1);return new String(data, StandardCharsets.UTF_8);} catch (KeeperException.NoNodeException e) {// 节点已被其他消费者删除,尝试下一个节点}}}
}

使用Curator实现

Curator是一个ZooKeeper客户端的封装库,提供了许多高级功能,包括分布式队列。

具体实现
public class CuratorDistributedQueueDemo {private static final String QUEUE_ROOT = "/curator_distributed_queue";public static void main(String[] args) throws Exception {CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181",new ExponentialBackoffRetry(1000, 3));client.start();// 定义队列序列化和反序列化QueueSerializer<String> serializer = new QueueSerializer<String>() {@Overridepublic byte[] serialize(String item) {return item.getBytes();}@Overridepublic String deserialize(byte[] bytes) {return new String(bytes);}};// 定义队列消费者QueueConsumer<String> consumer = new QueueConsumer<String>() {@Overridepublic void consumeMessage(String message) throws Exception {System.out.println("消费消息: " + message);}@Overridepublic void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {}};// 创建分布式队列DistributedQueue<String> queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_ROOT).buildQueue();queue.start();// 生产消息for (int i = 0; i < 5; i++) {String message = "Task-" + i;System.out.println("生产消息: " + message);queue.put(message);Thread.sleep(1000);}Thread.sleep(10000);queue.close();client.close();}
}
注意事项

       使用Curator的DistributedQueue时,默认情况下不使用锁。当调用QueueBuilder的lockPath()方法并指定一个锁节点路径时,才会启用锁。如果不指定锁节点路径,那么队列操作可能会受到并发问题的影响。

       在创建分布式队列时,指定一个锁节点路径可以帮助确保队列操作的原子性和顺序性。分布式环境中,多个消费者可能同时尝试消费队列中的消息。如果不使用锁来同步这些操作,可能会导致消息被多次处理或者处理顺序出现混乱。如果应用场景允许消息被多次处理,或者处理顺序不是关键问题,那么可以不使用锁。这样可以提高队列操作的性能,因为不再需要等待获取锁。

// 创建分布式队列
QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, serializer, "/order");
//指定了一个锁节点路径/orderlock,用于实现分布式锁,以保证队列操作的原子性和顺序性。
queue = builder.lockPath("/orderlock").buildQueue();
//启动队列,这时队列开始监听ZooKeeper中/order节点下的消息。
queue.start();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/659495.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PCB过孔过电流能力计算

PCB&#xff08;印刷电路板&#xff09;过孔的过电流能力计算通常基于以下几个关键参数&#xff1a; 过孔直径&#xff08;D&#xff09;&#xff1a;过孔的直径决定了其有效导电截面积&#xff0c;进而影响载流能力。 铜厚度&#xff08;t&#xff09;&#xff1a;内层或外层…

ImportError: You must install pydot (`pip install pydot`) and install graphviz

1、安装pydot pip install pydot2、安装cudnn 官方下载页面 下载解压后&#xff0c;复制bin、lib、include的三个文件夹到到cuda安装路径中&#xff0c;这是我的cuda路径&#xff1a; C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v11.5直接复制粘贴这三个文件夹&a…

从零学习Linux操作系统 第二十三部分 系统中火墙的管理及优化

1 、实验环境设定 第一台主机需要两个网卡 另一台主机一个网卡桥接到VMnet0上 第一台主机保证能够和windows保持连接 设定第一块儿网卡能够与Windows连接 设定第二台主机能够与第一台主机连接 二、火墙中的基本名词及知识 火墙就相当于是一个表格&#xff0c;这个表格里写…

svn 安装路径

SVN客户端安装&#xff08;超详细&#xff09; 一、SVN客户端安装 1、下载安装包地址&#xff1a;https://tortoisesvn.net/downloads.html 此安装包是英文版的&#xff0c;还可以下载一个语言包&#xff0c;在同界面的下方 一直点击下一步&#xff0c;直到弹出选择红框 然…

QuertWrapper and 和or 用法

1.使用 MyBatis Plus 实现上述 SQL 查询条件可以按照以下步骤进行&#xff1a; 创建一个 QueryWrapper 对象&#xff1a;QueryWrapper<Entity> queryWrapper new QueryWrapper<>();使用 eq 方法添加等于条件和 and 条件&#xff1a;queryWrapper.eq("age&qu…

jsonwebtoken使用HS256生成token失败

项目场景&#xff1a; 用户登入将token返回给用户 问题描述 在koa中使用jsonwebtoken库生成token失败&#xff0c;找了很多原因。 const jwt require("jsonwebtoken"); const { PRIVATE_KEY } require("../config/screct");class LoginController {as…

3. Mybatis的XML配置文件(重点)

目录 1 Mybatis的XML配置文件 1.1 XML配置文件规范 1.2 XML配置文件实现 1.3 MybatisX的使用 2. Mybatis动态SQL 2.1 什么是动态SQL 2.2 动态SQL-if 2.2.1 条件查询 2.2.2更新 2.3 动态SQL-foreach 2.4 动态SQL-sql&include 1.mybatis入门 2.mybatis基本操作 1…

AI智能分析+明厨亮灶智慧管理平台助力“舌尖上的安全”

春节是中国最重要的传统节日之一&#xff0c;在春节期间&#xff0c;人们聚餐需求激增&#xff0c;餐饮业也迎来了高峰期。在这个时期&#xff0c;餐饮企业需要更加注重食品安全和卫生质量&#xff0c;以保证消费者的健康和权益&#xff0c;明厨亮灶智慧管理成为了餐饮业中备受…

记一次复杂左连接的优化之路

慢执行分析 create table t3 as select t_1.lon as lon, t_1.lat as lat, t_1.label as label, t_1.is_core as is_core, t_2.grid_id as grid_id, t_2.mid_jd as mid_jd, t_2.mid_wd as mid_wd, t_2.zs_jd as zs_jd, t_2.zs_wd as zs_wd, t_2.yx_jd as yx_jd, t_2.yx_wd as y…

基于单片机温度控制系统的研究

摘 要&#xff1a;笔者基于单片机的温度控制系统&#xff0c;从单片机选择、传感器选择、系统框架设计等方面概述了单片机的温度控制系统内涵&#xff0c;分析了其运行原理&#xff0c;列举了单片机温度控制系统设计的实操方法&#xff0c;从硬件系统、软件系统、温度检测方法…

windows 11安装跳过联网,使用本地账户登陆

windows 11安装跳过联网&#xff0c;使用本地账户登陆 第一步 断开网络&#xff0c;拔网线 第二步 安装windows11 第三步 shiftF10调出命令行 第四步 输入命令&#xff1a; OOBE\BYPASSNRO回车自动重启&#xff0c;随后继续安装选择我没有网络&#xff0c;即可跳过win…

springboot144基于mvc的高校办公室行政事务管理系统设计与实现

简介 【毕设源码推荐 javaweb 项目】基于springbootvue 的 适用于计算机类毕业设计&#xff0c;课程设计参考与学习用途。仅供学习参考&#xff0c; 不得用于商业或者非法用途&#xff0c;否则&#xff0c;一切后果请用户自负。 看运行截图看 第五章 第四章 获取资料方式 **项…

二叉搜索树操作题目:删除二叉搜索树中的结点

文章目录 题目标题和出处难度题目描述要求示例数据范围进阶 解法一思路和算法代码复杂度分析 解法二思路和算法代码复杂度分析 题目 标题和出处 标题&#xff1a;删除二叉搜索树中的结点 出处&#xff1a;450. 删除二叉搜索树中的结点 难度 5 级 题目描述 要求 给定二叉…

Ubuntu Linux 下安装和卸载cmake 3.28.2版本

一、安装cmake 1.首先&#xff0c;先从cmake官网下载cmake-3.28.2-linux-x86_64.tar.gz 2.用FinalShell 等文件上传工具&#xff0c;将这个压缩包上传到 虚拟机的某个路径去&#xff08;自选&#xff09; 3. cd /usr/local/bin/&#xff0c;然后创建cmake文件夹&#xff0c;…

pnpm : 无法加载文件 D:\tool\nvm\nvm\node_global\pnpm.ps1,因为在此系统上禁止运行脚本

你们好&#xff0c;我是金金金。 场景 新创建的项目&#xff0c;在vscode编辑器终端输入 pnpm i&#xff0c;显示报错如上 解决 在终端输入get-ExecutionPolicy(查看执行策略/权限) 输出Restricted(受限的) 终端再次输入Set-ExecutionPolicy -Scope CurrentUser命令给用户赋予…

STM32低功耗模式

一、低功耗模式介绍 STM32 的低功耗模式有 3 种&#xff1a; 1)睡眠模式&#xff08;CM3 内核停止&#xff0c;外设仍然运行&#xff09; 2)停止模式&#xff08;所有时钟都停止&#xff09; 3)待机模式&#xff08;1.8V 内核电源关闭&#xff09; 在这三种低功耗模式中&#…

Vue3项目封装一个Element-plus Pagination分页

前言:后台系统分页肯定是离不开的,但是ui框架都很多,我们可以定义封装一种格式,所有项目按到这个结构来做. 实例: 第一步:在项目components组件新建一个分页组件,用来进行封装组件. 第二步:根据官方的进行定义,官方提供的这些,需要我们封装成动态模式 第三步:代码改造 <!-…

软件工程知识梳理0-概述

学好软件工程就必须理解软件工程到底是干什么的&#xff0c;为什么需要软件工程&#xff0c;以及怎么干的&#xff01;只有理解了软件工程的本质&#xff0c;才能更好的理解软件工程中各种工程手段和方法的目的。 个人开发模式 —> 小作坊开发模式 —> 软件工程开发模式 …

zoneId、ZoneOffset、Date、LocalDateTime、ZonedDateTime、OffsetDateTime的区别

1、zoneId 2、ZoneOffset继承了zoneId 3、ZoneOffset 和 TimeZone区别 ZoneOffset 和 TimeZone 是 Java 编程语言中处理时区信息的两个不同的类。 ZoneOffset 类&#xff1a; ZoneOffset 是 Java 8 中引入的日期时间 API 的一部分&#xff0c;位于 java.time 包中。 它代表…

Unity之第一人称角色控制

目录 第一人称角色控制 &#x1f634;1、准备工作 &#x1f4fa;2、鼠标控制摄像机视角 &#x1f3ae;3、角色控制 &#x1f603;4.杂谈 第一人称角色控制 专栏Unity之动画和角色控制-CSDN博客的这一篇也有讲到角色控制器&#xff0c;是第三人称视角的&#xff0c;以小编…