Zookeeper实现分布式队列

目录

Zookeeper分布式队列

普通方式实现

设计思路

具体实现

使用Curator实现

具体实现

注意事项


Zookeeper分布式队列

       常见的消息队列有:RabbitMQ,RocketMQ,Kafka等。Zookeeper作为一个分布式的小文件管理系统,同样能实现简单的队列功能。Zookeeper不适合大数据量存储,官方并不推荐作为队列使用,但由于实现简单,集群搭建较为便利,因此在一些吞吐量不高的小型系统中是比较好用的。

普通方式实现

设计思路

     

1.创建队列根节点
       在Zookeeper中创建一个持久节点,用作队列的根节点。所有队列元素的节点将放在这个根节点下。
2.实现入队操作
       当需要将一个元素添加到队列时,可以在队列的根节点下创建一个临时有序节点。节点的数据可以包含队列元素的信息。
3.实现出队操作
       当需要从队列中取出一个元素时,先获取根节点下的所有子节点。再找到具有最小序号的子节点,获取该节点的数据,删除该节点,然后返回节点的数据。

具体实现
/*** 入队* @param data* @throws Exception*/
public void enqueue(String data) throws Exception {// 创建临时有序子节点zk.create(QUEUE_ROOT + "/queue-", data.getBytes(StandardCharsets.UTF_8),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}/*** 出队* @return* @throws Exception*/
public String dequeue() throws Exception {while (true) {List<String> children = zk.getChildren(QUEUE_ROOT, false);if (children.isEmpty()) {return null;}Collections.sort(children);for (String child : children) {String childPath = QUEUE_ROOT + "/" + child;try {byte[] data = zk.getData(childPath, false, null);zk.delete(childPath, -1);return new String(data, StandardCharsets.UTF_8);} catch (KeeperException.NoNodeException e) {// 节点已被其他消费者删除,尝试下一个节点}}}
}

使用Curator实现

Curator是一个ZooKeeper客户端的封装库,提供了许多高级功能,包括分布式队列。

具体实现
public class CuratorDistributedQueueDemo {private static final String QUEUE_ROOT = "/curator_distributed_queue";public static void main(String[] args) throws Exception {CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181",new ExponentialBackoffRetry(1000, 3));client.start();// 定义队列序列化和反序列化QueueSerializer<String> serializer = new QueueSerializer<String>() {@Overridepublic byte[] serialize(String item) {return item.getBytes();}@Overridepublic String deserialize(byte[] bytes) {return new String(bytes);}};// 定义队列消费者QueueConsumer<String> consumer = new QueueConsumer<String>() {@Overridepublic void consumeMessage(String message) throws Exception {System.out.println("消费消息: " + message);}@Overridepublic void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {}};// 创建分布式队列DistributedQueue<String> queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_ROOT).buildQueue();queue.start();// 生产消息for (int i = 0; i < 5; i++) {String message = "Task-" + i;System.out.println("生产消息: " + message);queue.put(message);Thread.sleep(1000);}Thread.sleep(10000);queue.close();client.close();}
}
注意事项

       使用Curator的DistributedQueue时,默认情况下不使用锁。当调用QueueBuilder的lockPath()方法并指定一个锁节点路径时,才会启用锁。如果不指定锁节点路径,那么队列操作可能会受到并发问题的影响。

       在创建分布式队列时,指定一个锁节点路径可以帮助确保队列操作的原子性和顺序性。分布式环境中,多个消费者可能同时尝试消费队列中的消息。如果不使用锁来同步这些操作,可能会导致消息被多次处理或者处理顺序出现混乱。如果应用场景允许消息被多次处理,或者处理顺序不是关键问题,那么可以不使用锁。这样可以提高队列操作的性能,因为不再需要等待获取锁。

// 创建分布式队列
QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, serializer, "/order");
//指定了一个锁节点路径/orderlock,用于实现分布式锁,以保证队列操作的原子性和顺序性。
queue = builder.lockPath("/orderlock").buildQueue();
//启动队列,这时队列开始监听ZooKeeper中/order节点下的消息。
queue.start();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/657327.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【python】图形化开发pyqt6基本写法模板与基础控件属性方法整理

pyqt6的简介 首先呢Python有许多可以编写图形化界面的库&#xff0c;我们通常跟着教程的话最初会接触的tkinter&#xff0c;但是学习中会发现编写的图形化跟我们平常接触的软件有很大区别&#xff08;简单来说就是丑&#xff09;。 pyqt则是第三方库&#xff0c;在Python中算…

ETL怎么实现文件处理

在现代企业及各类组织的日常运作中&#xff0c;数据作为一种关键的信息资源&#xff0c;其管理和分析能力直接影响到决策效率与准确性。文件作为数据的主要载体&#xff0c;承载着从运营报告、客户记录、交易明细等各种类型的数据信息。这些海量且多样的文件数据在未经处理的情…

flask_django基于python的城市轨道交通公交线路查询系统vue

同时&#xff0c;随着信息社会的快速发展&#xff0c;城市轨道交通线路查询系统面临着越来越多的信息&#xff0c;因此很难获得他们对高效信息的需求&#xff0c;如何使用方便快捷的方式使查询者在广阔的海洋信息中查询&#xff0c;存储&#xff0c;管理和共享信息方面有效&…

C语言菜鸟入门·运算符(算数运算符,关系运算符,逻辑运算符,位运算符,赋值运算符,三目运算符)详细介绍

目录 ​编辑 1. 算术运算符 2. 关系运算符 3. 逻辑运算符 4. 位运算符 5. 赋值运算符 6. 杂项运算符 ↦ sizeof & 三元 6.1 sizeof&#xff08;&#xff09; 6.2 &取地址运算符 6.3 * 6.4 三目运算符 7. 运算符优先级 运算符是一种告诉编译器执行…

ElementUI组件:Link 文字链接

Link 文字链接 点击下载learnelementuispringboot项目源码 效果图 el-link.vue页面效果图 项目里el-link.vue文件代码 <script> export default {name: el_link }</script> <!--https://element.eleme.cn/#/zh-CN/component/link --> <template>&l…

嵌入式学习第十四天

1.结构体&#xff08;2&#xff09;: &#xff08;1&#xff09;结构体类型定义 &#xff08;2&#xff09;结构体变量的定义 &#xff08;3&#xff09;结构体元素的访问 &#xff08;4&#xff09;结构体的存储: 内存对齐: char 按照1字节对齐 …

C# OpenCvSharp DNN Gaze Estimation 视线估计

目录 介绍 效果 模型信息 项目 代码 frmMain.cs GazeEstimation.cs 下载 C# OpenCvSharp DNN Gaze Estimation 介绍 训练源码地址&#xff1a;https://github.com/deepinsight/insightface/tree/master/reconstruction/gaze 效果 模型信息 Inputs ----------------…

医院安全(不良)事件报告系统源码,不良事件处理的全过程管理,实现11大类不良事件类型的报告上报、流转审批、跟踪改进及统计分析功能。

医院安全&#xff08;不良&#xff09;事件报告系统源码&#xff0c;不良事件上报系统源码&#xff0c;PHP源码 医院安全&#xff08;不良&#xff09;事件报告系统提供11大类不良事件的上报、事件审核处理、时间按分析、事件跟踪与持续改进&#xff0c;事件提醒、权限控制、外…

聊聊DoIP吧

DoIP是啥? DoIP代表"Diagnostic over Internet Protocol",即互联网诊断协议。它是一种用于在车辆诊断中进行通信的网络协议。DoIP的目标是在现代汽车中实现高效的诊断和通信。通过使用互联网协议(IP)作为通信基础,DoIP使得诊断信息能够通过网络进行传输,从而提…

React通用后台模板

一. 项目初始化 1. 创建项目 环境 npm init vite 打开package.json,参考以下各模块版本: "dependencies": { "react": "^18.2.0", "react-dom": "^18.2.0", "react-redux": "^7.2.8", …

Security ❀ TCP异常报文详解

文章目录 1. TCP Out-Of-Order2. TCP Previous Segment Lost3. TCP Retransmission4. TCP Dup Ack XXX#X5. TCP Windows Update6. TCP Previous segment not captured7. 异常案例分析 TCP协议中seq和ack seq的联系&#xff1a; id4的http请求报文由客户端发向服务器&#xff0…

sqli-labs-master靶场训练笔记(1-22|新手村)

2024.1.21 level-1 &#xff08;单引号装饰&#xff09; 先根据提示建立一个get请求 在尝试使用单个单引号测试&#xff0c;成功发现语句未闭合报错 然后反手一个 order by 得到数据库共3列&#xff0c;-- 后面加字母防止浏览器吃掉 -- 操作&#xff08;有些会&#xff09…

maven helper 解决jar包冲突方法

一 概要说明 1.1 说明 首先&#xff0c;解决idea中jar包冲突&#xff0c;使用maven的插件&#xff1a;maven helper插件&#xff0c;它能够给我们罗列出来同一个jar包的不同版本&#xff0c;以及他们的来源&#xff0c;但是对不同jar包中同名的类没有办法。 1.2 依赖顺序 …

Spring | Spring的“数据库开发“ (Srping JDBC)

目录&#xff1a; Spring JDBC1.Spring JDBC的核心类 ( JdbcTemplate类 )2.Srping JDBC 的配置3.JdbcTemplate类的“常用方法”execute( )&#xff1a;直接执行“sql语句”&#xff0c;没有返回值update( ) &#xff1a;“增删改”&#xff0c;返回 “影响的行数”query( ) : “…

双链表的基本知识以及增删查改的实现

满怀热忱&#xff0c;前往梦的彼岸 前言 之前我们对单链表进行了非常细致的剖析&#xff0c;现在我们所面临的则是与之相对应的双链表&#xff0c;我会先告诉诸位它的基本知识&#xff0c;再接着把它的增删查改讲一下&#xff0c;ok&#xff0c;正文开始。 一.链表的种类 我…

其他发现:开源数据可视化分析工具DataEase介绍文档

一、 简介 DataEase 是开源的数据可视化分析工具&#xff0c;帮助用户快速分析数据并洞察业务趋势&#xff0c;从而实现业务的改进与优化。DataEase 支持丰富的数据源连接&#xff0c;能够通过拖拉拽方式快速制作图表&#xff0c;并可以方便地与他人分享。 二、 优势 1、 开…

STM32学习笔记二——STM32时钟源时钟树

目录 STM32芯片内部系统架构详细讲解&#xff1a; 1.芯片内部混乱电信号解决方案&#xff1a; 2.时钟树&#xff1a; 1.内部RC振荡器与外部晶振的选择 2. STM32 时钟源 3.STM32中几个与时钟相关的概念 4.时钟输出的使能及其流程 5.时钟设置的基本流程 时钟源——单片机…

Java多线程--同步机制解决线程安全问题方式二:同步方法

文章目录 一、同步方法&#xff08;1&#xff09;同步方法--案例11、案例12、案例1之同步监视器 &#xff08;2&#xff09;同步方法--案例21、案例2之同步监视器的问题2、案例2的补充说明 二、代码及重要说明&#xff08;1&#xff09;代码&#xff08;2&#xff09;重要说明 …

基于yolov2深度学习网络的视频手部检测算法matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 输入mp4格式的视频文件进行测试&#xff0c;视频格式为1080p30. 2.算法运行软件版本 matlab2022a 3.部分核心程序 ..........................…

Linux第40步_移植ST公司的uboot

一、查看ST公司的uboot源码包 ST公司的uboot源码包在虚拟机中的路径&#xff1a; “/home/zgq/linux/atk-mp1/stm32mp1-openstlinux-5.4-dunfell-mp1-20-06-24/sources/arm-ostl-linux-gnueabi/u-boot-stm32mp-2020.01-r0”&#xff1b; “u-boot-stm32mp-2020.01-r0”就是S…