zookeeper应用之分布式队列

队列这种数据结构都不陌生,特点就是先进先出。有很多常用的消息中间件可以有现成的该部分功能,这里使用zookeeper基于发布订阅模式来实现分布式队列。对应的会有一个生产者和一个消费者。

这里理论上还是使用顺序节点。生产者不断产生新的顺序子节点,消费者watcher监听节点新增事件来消费消息。

生产者:

CuratorFramework client = ...
client.start();
String path = "/testqueue";
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(path,"11".getBytes())

消费者:

CuratorFramework client = ...
client.start();
String path = "/testqueue";
PathChildrenCache pathCache = new PathChildrenCache(client,path,true);
pathCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {if(event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED){ChildData data = event.getData();//handle msgclient.delete().forPath(data.getPath());}}
});
pathCache.start();

使用curator queue:

先来使用基本的队列类DistributedQueue。

DistributedQueue的初始化需要提交准备几个参数:

client连接就不多说了:

CuratorFramework client = ...

QueueSerializer:这个主要是用来指定对消息data进行序列化和反序列化

这里就搞一个简单的字符串类型:

QueueSerializer<String> serializer = new QueueSerializer<String>() {@Overridepublic byte[] serialize(String item) {return item.getBytes();}@Overridepublic String deserialize(byte[] bytes) {return new String(bytes);}
};

QueueConsumer消息consumer,当有新消息来的时候会调用consumer.consumeMessage()来处理消息

这里也搞个简单的string类型的处理consumer

QueueConsumer<String> consumer = new QueueConsumer<String>() {@Overridepublic void consumeMessage(String s) throws Exception {System.out.println("receive msg:"+s);}@Overridepublic void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {//TODO}
};

队列消息发布:

//队列节点路径
String queuePath = "/queue";
//使用上面准备的几个参数构造DistributedQueue对象
DistributedQueue<String> queue =  QueueBuilder.builder(client,consumer,serializer,queuePath).buildQueue();
queue.start();
//调用put方法生产消息
queue.put("hello");
queue.put("msg");
Thread.sleep(2000);
queue.put("3");

这样在启动测试程序在,consumer的consumeMessage方法就会收到queue.put的消息。

这里有个问题有没有发现,在初始化queue的时候需要指定consumer,那岂不是只能同一个程序中生产消费,何来的分布式?

其实这里在queue对象创建的时候consumer可以为null,这个时候queue就只生产消息。具体的逻辑需要看下DistributedQueue类的源码。

在DistributedQueue类的构造函数有一步设置isProducerOnly属性

isProducerOnly = (consumer == null);

然后在start()方法会根据isProducerOnly来判断启动方式

if ( !isProducerOnly || (maxItems != QueueBuilder.NOT_SET) )
{childrenCache.start();
}if ( !isProducerOnly )
{service.submit(new Callable<Object>(){@Overridepublic Object call(){runLoop();return null;}});
}

这里看到consumer为空,两个if不成立,不会初始化对那个的消息消费逻辑wather监听。只需要在另一个程序里创建queue启动时指定consumer即可。

源码分析

先从消息的发布也就是put方法

首先调用makeItemPath()获取创建节点路径:

ZKPaths.makePath(queuePath, QUEUE_ITEM_NAME);

这里QUEUE_ITEM_NAME=“queue-”。

然后调用internalPut()方法来创建节点路径

//先累加消息数量putCount
putCount.incrementAndGet();
//使用serializer序列化消息数据
byte[]              bytes = ItemSerializer.serialize(multiItem, serializer);
//根据background来创建节点
if ( putInBackground )
{doPutInBackground(item, path, givenMultiItem, bytes);
}
else
{doPutInForeground(item, path, givenMultiItem, bytes);
}

看doPutInForeground里就是具体的创建节点了

//创建节点
client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(path, bytes);
//哦,错了这里putCount不是总消息数,是正在创建消息数,创建完再回减
synchronized(putCount)
{putCount.decrementAndGet();putCount.notifyAll();
}//如果有对应的lisener依次调用
putListenerContainer.forEach(listener -> {if ( item != null ){listener.putCompleted(item);}else{listener.putMultiCompleted(givenMultiItem);}
});

消息的发布就完成了。

然后是消息的consumer,这里肯定是使用的watcher。这里还是回到前面start方法处根据isProducerOnly属性判断有两步操作:

1、childrenCache.start();

childrenCache初始化是在queue的构造函数里

childrenCache = new ChildrenCache(client, queuePath)

其start方法会调用

private final CuratorWatcher watcher = new CuratorWatcher()
{@Overridepublic void process(WatchedEvent event) throws Exception{if ( !isClosed.get() ){sync(true);}}
};private final BackgroundCallback  callback = new BackgroundCallback(){@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception{if ( event.getResultCode() == KeeperException.Code.OK.intValue() ){setNewChildren(event.getChildren());}}};void start() throws Exception{sync(true);}private synchronized void sync(boolean watched) throws Exception{if ( watched ){//走这里client.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path);}else{client.getChildren().inBackground(callback).forPath(path);}}

这里先把代码都贴上,看到内部定义了一个watcher和callback。这里inBackground就是watcher到事件使用callback进行处理,最后是调用到setNewChildren方法

private synchronized void setNewChildren(List<String> newChildren)
{if ( newChildren != null ){Data currentData = children.get();//将数据设置到children变量里,消息版本+1children.set(new Data(newChildren, currentData.version + 1));//notifyAll() 等待线程获取消息notifyFromCallback();}
}

这里有引入了一个children变量,然后将数据设置到了该变量里。

private final AtomicReference<Data> children = new AtomicReference<Data>(new Data(Lists.<String>newArrayList(), 0));

children其实是线程间通信一个共享数据容器变量。这里设置了数据,然后具体的数据消费在下一步。

2、线程池里丢了个任务去执行runLoop();方法。

回到DistributedQueue.start的第二步,执行runLoop()方法,看名字就应该知道了一直轮询获取消息。

还是来看代码吧

private void runLoop()
{long         currentVersion = -1;long         maxWaitMs = -1;//while一直轮询while ( state.get() == State.STARTED  ){try{//从childrenCache里获取数据ChildrenCache.Data      data = (maxWaitMs > 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion);currentVersion = data.version;List<String>        children = Lists.newArrayList(data.children);sortChildren(children); // makes sure items are processed in the correct orderif ( children.size() > 0 ){maxWaitMs = getDelay(children.get(0));if ( maxWaitMs > 0 ){continue;}}else{continue;}/**处理数据 这里取出消息后会删除节点,然后使用serializer反序列化节点数据,调用consumer.consumeMessage来处理消息**/processChildren(children, currentVersion);}}}
}

这里获取数据使用了childrenCache.blockingNextGetData

synchronized Data blockingNextGetData(long startVersion, long maxWait, TimeUnit unit) throws InterruptedException
{long            startMs = System.currentTimeMillis();boolean         hasMaxWait = (unit != null);long            maxWaitMs = hasMaxWait ? unit.toMillis(maxWait) : -1;//数据版本没变一直wait等待while ( startVersion == children.get().version ){if ( hasMaxWait ){long        elapsedMs = System.currentTimeMillis() - startMs;long        thisWaitMs = maxWaitMs - elapsedMs;if ( thisWaitMs <= 0 ){break;}wait(thisWaitMs);}else{wait();}}return children.get();
}

这里就有wait阻塞等消息,当消息来时候会被唤醒。

其它类型队列:

curator对优先队列(DistributedPriorityQueue)、延迟队列(DistributedDelayQueue)都有对应的实现,有兴趣的自己看吧。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/154900.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java修仙记之记录一次与前端女修士论道的经历

文章开始之前&#xff0c;想跟我念一句&#xff1a;福生无量天尊&#xff0c;无量寿佛&#xff0c;阿弥陀佛 第一场论道&#xff1a;id更新之争 一个天气明朗的下午&#xff0c;前端的小美女长发姐告诉我&#xff1a;嘿&#xff0c;小后端&#xff0c;你的代码报错了 我答道&am…

SpringBoot 全局请求参数转驼峰、响应参数转换为下划线

文章目录 前言请求参数将下划线转换为驼峰响应参数将驼峰转换为下划线方式一 使用Jackson方式处理方式二 在配置文件中修改jackson.default-property-inclusion 说明jackson.property-naming-strategy 说明前言 在开发SpringBoot项目时,我们经常需要处理参数的命名规范。有时…

springboot -sse -flux 服务器推送消息

先说BUG处理&#xff0c;遇到提示异步问题 Async support must be enabled on a servlet and for all filters involved in async request processing. This is done in Java code using the Servlet API or by adding "<async-supported>true</async-supported&…

如何保障亚马逊多账户的安全,防止关联?

在亚马逊平台上拥有多个账户可以扩大销售渠道&#xff0c;但同时也需要注意账户的安全&#xff0c;以防止被关联。本文将介绍一些重要的措施&#xff0c;帮助您保护亚马逊多账户的安全&#xff0c;预防账号关联。 一、亚马逊关联是什么&#xff1f; 在亚马逊平台上&#xff0…

单节点服务架构

单节点的服务架构&#xff1a; LNMP l:lilnux系统 n:nginx静态页面&#xff0c;转发动态请求 m:mysql数据库&#xff0c;后端服务器&#xff0c;保存用户和密码信息&#xff0c;以及论坛的信息 p:PHP&#xff0c;处理动态请求&#xff0c;动态请求转发数据库&#xff0c;然…

3PC(三阶段提交)

三阶段提交 3PC&#xff08;Three-Phase Commit&#xff09;是一种分布式系统中用于实现事务一致性的协议&#xff0c;它是在2PC&#xff08;Two-Phase Commit&#xff09;的基础上发展而来&#xff0c;旨在解决2PC的一些缺点。与2PC的两个阶段&#xff08;准备和提交&#xf…

iptables的一次修复日志

iptables的一次修复日志 搭建配置wireguard后&#xff0c;使用内网连接设备十分方便&#xff0c;我采用的是星型连接&#xff0c;即每个节点都连接到中心节点&#xff0c;但是突然发生了重启wg后中心节点不转发流量的问题&#xff0c;即每个接入的节点只能与中心节点连接&…

M2 Mac Xcode编译报错 ‘***.framework/‘ for architecture arm64

In /Users/fly/Project/Pods/YYKit/Vendor/WebP.framework/WebP(anim_decode.o), building for iOS Simulator, but linking in object file built for iOS, file /Users/fly/Project/Pods/YYKit/Vendor/WebP.framework/WebP for architecture arm64 这是我当时编译模拟器时报…

Mars3d-vue最简项目模板集成使用Mars3d的UI控件样板

备注说明&#xff1a; 1.小白可看步骤一二&#xff0c;进阶小白可直接看步骤三 步骤一&#xff1a;新建文件夹<uitest>&#xff0c;在mars3d仓库拉一份最简项目模板&#xff1a; git clone mars3d-vue-template: Vue3.x 技术栈下的Mars3D项目模板 步骤二&#xff1a;运…

java: 无法访问org.mybatis.spring.annotation.MapperScan

java: 无法访问org.mybatis.spring.annotation.MapperScan错误的类文件: /E:/maven/repository/org/mybatis/mybatis-spring/3.0.1/mybatis-spring-3.0.1.jar!/org/mybatis/spring/annotation/MapperScan.class类文件具有错误的版本 61.0, 应为 52.0请删除该文件或确保该文件位…

本地部署 EmotiVoice易魔声 多音色提示控制TTS

本地部署 EmotiVoice易魔声 多音色提示控制TTS EmotiVoice易魔声 介绍ChatGLM3 Github 地址部署 EmotiVoice准备模型文件准备预训练模型推理 EmotiVoice易魔声 介绍 EmotiVoice是一个强大的开源TTS引擎&#xff0c;支持中英文双语&#xff0c;包含2000多种不同的音色&#xff…

网站为什么一定要安装SSL证书

随着互联网的普及和发展&#xff0c;网络安全问题日益凸显。在这个信息爆炸的时代&#xff0c;保护用户隐私和数据安全已经成为各大网站和企业的首要任务。而SSL证书作为一种网络安全技术&#xff0c;已经成为网站必备的安全工具。那么&#xff0c;为什么网站一定要安装SSL证书…

electron项目开机自启动

一、效果展示&#xff1a;界面控制是否需要开机自启动 二、代码实现&#xff1a; 1、在渲染进程login.html中&#xff0c;画好界面&#xff0c;默认勾选&#xff1b; <div class"intro">开机自启动 <input type"checkbox" id"checkbox&quo…

C++纯虚函数和抽象类 制作饮品案例(涉及知识点:继承,多态,实例化继承抽象类的子类,多文件实现项目)

一.纯虚函数的由来 在多态中&#xff0c;通常父类中虚函数的实现是毫无意义的&#xff0c;主要都是调用子类重写的内容。例如&#xff1a; #include<iostream>using namespace std;class AbstractCalculator { public:int m_Num1;int m_Num2;virtual int getResult(){r…

PHP手动为第三方类添加composer自动加载

有时候我们要使用的第三方的类库&#xff08;SDK&#xff09;没用用composer封装好&#xff0c;无法用composer进行安装&#xff0c;怎么办呢&#xff1f;&#xff1f;&#xff1f; 步骤如下&#xff1a; 第一步、下载需要的SDK文件包&#xff0c;把它放在vendor目录下 第二步…

SSM高考志愿辅助推荐系统-计算机毕业设计附源码21279

目 录 摘要 1 绪论 1.1 研究背景 1.2研究意义 1.3论文结构与章节安排 2 高考志愿辅助推荐系统分析 2.1 可行性分析 2.1.1 技术可行性分析 2.1.2 经济可行性分析 2.1.3 法律可行性分析 2.2 系统功能分析 2.2.1 功能性分析 2.2.2 非功能性分析 2.3 系统用例分析 2…

竞赛选题 身份证识别系统 - 图像识别 深度学习

文章目录 0 前言1 实现方法1.1 原理1.1.1 字符定位1.1.2 字符识别1.1.3 深度学习算法介绍1.1.4 模型选择 2 算法流程3 部分关键代码 4 效果展示5 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 毕业设计 图像识别 深度学习 身份证识别…

java面试-zookeeper

1、什么是zap协议 ZAB 协议总共包含以下两部分内容&#xff1a; ZAB 协议通过两阶段提交的方式来确保分布式系统的一致性。这两阶段分别是&#xff1a;准备阶段和提交阶段。在准备阶段&#xff0c;一个节点&#xff08;称为 Leader&#xff09;向其他节点&#xff08;称为 Fol…

K8S精进之路-控制器Deployment-(1)

在K8S中&#xff0c;最小运行单位为POD,它是一个逻辑概念&#xff0c;其实是一组共享了某些资源的容器组。POD是能运行多个容器的&#xff0c;Pod 里的所有容器&#xff0c;共享的是同一个 Network Namespace&#xff0c;并且可以声明共享同一个 Volume。在POD中能够hold住网络…

Python数据分析实战-爬取以某个关键词搜索的最新的500条新闻的标题和链接(附源码和实现效果)

实现功能 通过百度引擎&#xff0c;爬取以“开源之夏”为搜索关键词最新的500条新闻的标题和链接 实现代码 1.安装所需的库&#xff1a;你需要安装requests和beautifulsoup4库。可以使用以下命令通过pip安装&#xff1a; pip install requests beautifulsoup42.发起搜索请求…