Zookeeper 官方示例2-SyncPrimitive 代码解读(二)

测试命令
java jar .\ZookeeperDemo-0.0.1-SNAPSHOT.jar bTest 192.168.206.100:2181 2

1. Barrier(阻塞原语)

1.1 概念

[!quote] A barrier is a primitive that enables a group of processes to synchronize the beginning and the end of a computation. The general idea of this implementation is to have a barrier node that serves the purpose of being a parent for individual process nodes. Suppose that we call the barrier node "/b1". Each process "p" then creates a node "/b1/p". Once enough processes have created their corresponding nodes, joined processes can start the computation.

  • 阻塞是一个原语,它使一组进程能够同时开始计算。此实现的总体思想是拥有一个屏障节点,用于作为单个流程节点的父节点。
  • 假设我们将障碍节点称为“/b1”。然后,每个进程“ p”创建一个节点“/b1/p”。一旦有足够多的进程创建了相应的节点,联合进程就可以开始计算了。
  • 场景:当有些操作需要所有参与者全部准备好之后才能开始执行,并且对每个参与者来说必须等待所有参与者全部执行完毕,才算执行完毕。于是就需要一个屏障,来控制所有参与者同时开始,并等待所有参与者全部结束。

1.2 设计

  • 创建一个/b1的znode的持久化节点。
  • enter() 模拟往阻塞里增加执行进程(Join barrier)。往znode下增加子节点,并判断子节点数是否满足指定的个数n。若未满足条件则继续等待;反之则返回true。
  • leave() 模拟进程执行完毕后的离开(Wait until all reach barrier)。删除znode的子节点,并判断子节点是否大于0,若大于0则表示还有子进程没有执行完。

源码:

package com.agileluo.zookeeperdemo.barriers;  
import java.io.IOException;  
import java.net.InetAddress;  
import java.net.UnknownHostException;  
import java.nio.ByteBuffer;  
import java.util.List;  
import java.util.Random;  
import java.lang.Integer;  
import org.apache.commons.lang3.RandomStringUtils;  
import org.apache.zookeeper.CreateMode;  
import org.apache.zookeeper.KeeperException;  
import org.apache.zookeeper.WatchedEvent;  
import org.apache.zookeeper.Watcher;  
import org.apache.zookeeper.ZooKeeper;  
import org.apache.zookeeper.ZooDefs.Ids;  
import org.apache.zookeeper.data.Stat;  /**  * 1. Queue test * 1.1 Start a producer to create 100 elements *    java SyncPrimitive qTest localhost 100 p * 1.2 Start a consumer to consume 100 elements *    java SyncPrimitive qTest localhost 100 c * * 2.Barrier test * Start a barrier with 2 participants (start as many times as many participants you'd like to enter) *    java SyncPrimitive bTest localhost 2 */public class SyncPrimitive implements Watcher {  static ZooKeeper zk = null;  static Integer mutex;  String root;  static{  System.setProperty("zookeeper.sasl.client", "false");  }  SyncPrimitive(String address) {  if(zk == null){  try {  System.out.println("Starting ZK:");  zk = new ZooKeeper(address, 3000, this);  mutex = Integer.parseInt("-1");  System.out.println("Finished starting ZK: " + zk);  } catch (IOException e) {  System.out.println(e.toString());  zk = null;  }  }  //else mutex = new Integer(-1);  }  synchronized public void process(WatchedEvent event) {  synchronized (mutex) {  //System.out.println("Process: " + event.getType());  mutex.notify();  }  }  /**  * Barrier(阻塞原语)  *  A barrier is a primitive that enables a group of processes to synchronize the beginning and the end of a computation. The general idea of this implementation is to  *  have a barrier node that serves the purpose of being a parent for individual process nodes. Suppose that we call the barrier node &quot;/b1&quot;. Each process  *  &quot;p&quot; then creates a node &quot;/b1/p&quot;. Once enough processes have created their corresponding nodes, joined processes can start the computation.  *  阻塞是一个原语,它使一组进程能够同时开始计算。此实现的总体思想是拥有一个屏障节点,用于作为单个流程节点的父节点。  *  假设我们将障碍节点称为“/b1”。然后,每个进程“ p”创建一个节点“/b1/p”。一旦有足够多的进程创建了相应的节点,联合进程就可以开始计算了。  *  场景:当有些操作需要所有参与者全部准备好之后才能开始执行,并且对每个参与者来说必须等待所有参与者全部执行完毕,才算执行完毕。于是就需要一个屏障,来控制所有参与者同时开始,并等待所有参与者全部结束。  */  static public class Barrier extends SyncPrimitive {  //需要并行等待的子进程个数  int size;  /**  *  本参与者对应的子节点path  */        String name;  /**  * Barrier constructor         *         * @param address  * @param root  * @param size  */  Barrier(String address, String root, int size) {  super(address);  this.root = root;  this.size = size;  // Create barrier node(障碍节点必须是持久节点 CreateMode.PERSISTENT)  if (zk != null) {  try {  Stat s = zk.exists(root, false);  if (s == null) { // 如果根节点不存在,则创建  /**  *  zk.create(String path, byte[] data, List<ACL> acl, CreateMode createMode)                         *  第1个参数: barrier节点的path  *  第2个参数: barrier节点的data  *  第3个参数: barrier节点的权限  *  第4个参数: barrier 节点的类型,持久节点 CreateMode.PERSISTENT,子节点必须是临时节点。  */  zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,  CreateMode.PERSISTENT);  }  } catch (KeeperException e) {  System.out  .println("Keeper exception when instantiating queue: "  + e.toString());  } catch (InterruptedException e) {  System.out.println("Interrupted exception");  }  }  // My node name  try {  name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString()+ ":"+ RandomStringUtils.randomAlphabetic(4));  } catch (UnknownHostException e) {  System.out.println(e.toString());  }  }  /**  * Join barrier         *         * @return         * @throws KeeperException  * @throws InterruptedException  */  boolean enter() throws KeeperException, InterruptedException{  zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,  CreateMode.EPHEMERAL); // EPHEMERAL 临时节点  while (true) {  synchronized (mutex) {  List<String> list = zk.getChildren(root, true);  if (list.size() < size) { //判断当前根下子节点的数量,若数量小于设定的进程数,则等待。  mutex.wait();  } else {  return true;  }  }  }  }  /**  * Wait until all reach barrier         *         * @return         * @throws KeeperException  * @throws InterruptedException  */  boolean leave() throws KeeperException, InterruptedException{  zk.delete(root + "/" + name, 0); //模拟进程完成任务,删除子节点。  while (true) {  synchronized (mutex) {  List<String> list = zk.getChildren(root, true);  if (list.size() > 0) { //只要还存在子节点,就说明还有任务没有完成。  mutex.wait();  } else {  return true;  }  }  }  }  }  /**  * Producer-Consumer queue     */    static public class Queue extends SyncPrimitive {  /**  * Constructor of producer-consumer queue         *         * @param address  * @param name  */  Queue(String address, String name) {  super(address);  this.root = name;  // Create ZK node name  if (zk != null) {  try {  Stat s = zk.exists(root, false);  if (s == null) {  zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,  CreateMode.PERSISTENT);  }  } catch (KeeperException e) {  System.out  .println("Keeper exception when instantiating queue: "  + e.toString());  } catch (InterruptedException e) {  System.out.println("Interrupted exception");  }  }  }  /**  * Add element to the queue.         *         * @param i  * @return  */  boolean produce(int i) throws KeeperException, InterruptedException{  ByteBuffer b = ByteBuffer.allocate(4);  byte[] value;  // Add child with value i  b.putInt(i);  value = b.array();  zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,  CreateMode.PERSISTENT_SEQUENTIAL);  return true;  }  /**  * Remove first element from the queue.         *         * @return         * @throws KeeperException  * @throws InterruptedException  */  int consume() throws KeeperException, InterruptedException{  int retvalue = -1;  Stat stat = null;  // Get the first element available  while (true) {  synchronized (mutex) {  List<String> list = zk.getChildren(root, true);  if (list.size() == 0) {  System.out.println("Going to wait");  mutex.wait();  } else {  Integer min = Integer.parseInt((list.get(0).substring(7)));  String minNode = list.get(0);  for(String s : list){  Integer tempValue = Integer.parseInt(s.substring(7));  //System.out.println("Temporary value: " + tempValue);  if(tempValue < min) {  min = tempValue;  minNode = s;  }  }  System.out.println("Temporary value: " + root + "/" + minNode);  byte[] b = zk.getData(root + "/" + minNode,  false, stat);  zk.delete(root + "/" + minNode, 0);  ByteBuffer buffer = ByteBuffer.wrap(b);  retvalue = buffer.getInt();  return retvalue;  }  }  }  }  }  public static void main(String args[]) {  if (args[0].equals("qTest"))  queueTest(args);  else  barrierTest(args);  }  public static void queueTest(String args[]) {  Queue q = new Queue(args[1], "/app1");  System.out.println("Input: " + args[1]);  int i;  Integer max = Integer.parseInt(args[2]+"");  if (args[3].equals("p")) {  System.out.println("Producer");  for (i = 0; i < max; i++)  try{  q.produce(10 + i);  } catch (KeeperException e){  } catch (InterruptedException e){  }  } else {  System.out.println("Consumer");  for (i = 0; i < max; i++) {  try{  int r = q.consume();  System.out.println("Item: " + r);  } catch (KeeperException e){  i--;  } catch (InterruptedException e){  }  }  }  }  public static void barrierTest(String args[]) {  Barrier b = new Barrier(args[1], "/b1", Integer.parseInt(args[2]+""));  try{  boolean flag = b.enter();  System.out.println("Entered barrier: " + args[2]);  if(!flag) System.out.println("Error when entering the barrier");  } catch (KeeperException e){  } catch (InterruptedException e){  }  // Generate random integer  Random rand = new Random();  int r = rand.nextInt(100);  // Loop for rand iterations  for (int i = 0; i < r; i++) {  try {  Thread.sleep(100);  } catch (InterruptedException e) {  }  }  try{  b.leave();  } catch (KeeperException e){  } catch (InterruptedException e){  }  System.out.println("Left barrier");  }  
}

1.3 测试步骤

  • 第1步,打包 ZookeeperDemo-0.0.1-SNAPSHOT.jar
<build>  <plugins>  <plugin>  <groupId>org.apache.maven.plugins</groupId>  <artifactId>maven-jar-plugin</artifactId>  <configuration>  <archive>  <manifest>  <addClasspath>true</addClasspath>  <mainClass>com.xx.zookeeperdemo.barriers.SyncPrimitive</mainClass> </manifest>  </archive>  </configuration>  </plugin>  </plugins>  
</build>
  • 第2步,jar包目录下打开命令窗口,并执行 java -jar .\ZookeeperDemo-0.0.1-SNAPSHOT.jar bTest 192.168.206.100:2181 3
    控制台输出:

执行后,查看zookeeper的znode情况:

  • 第3步,复制第2步操作,模拟启动第2个进程
    执行后,查看zookeeper的znode情况:

  • 第4步,复制第2步操作,模拟启动第3个进程
    执行后,第1个控制台输出:

第2个控制台输出:

第3个控制台输出:

然后所有进程在随机的整数时间后输出 Left barrier

查看zookeeper的znode情况: 所有子进程创建的临时子节点都已delete

1.4 结果

能实现多个进程之间的并行协同。

1.5 注意事项

  • 为了方便在同一台IP上模拟不同的进程,在官方提供的代码基础上增加了4位长度的随机字符串。

// 官方示例:
name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());// 新增后的示例
name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString()+ ":"+ RandomStringUtils.randomAlphabetic(4));
  • 关闭SASL安全验证
static{  System.setProperty("zookeeper.sasl.client", "false");  
}

2. 队列

2.1 概念

模拟向同一队列生产/消费消息。

2.2 设计

生产消息: 往znode新增子节点。
消费消息: 往znode中取first子节点,然后删除子节点。

2.3 源码

/**  * Producer-Consumer queue */static public class Queue extends SyncPrimitive {  /**  * Constructor of producer-consumer queue     *     * @param address  * @param name  */  Queue(String address, String name) {  super(address);  this.root = name;  // Create ZK node name  if (zk != null) {  try {  Stat s = zk.exists(root, false);  if (s == null) {  zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,  CreateMode.PERSISTENT);  }  } catch (KeeperException e) {  System.out  .println("Keeper exception when instantiating queue: "  + e.toString());  } catch (InterruptedException e) {  System.out.println("Interrupted exception");  }  }  }  /**  * Add element to the queue.     *     * @param i  * @return  */  boolean produce(int i) throws KeeperException, InterruptedException{  ByteBuffer b = ByteBuffer.allocate(4);  byte[] value;  // Add child with value i  b.putInt(i);  value = b.array();  zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,  CreateMode.PERSISTENT_SEQUENTIAL);  return true;  }  /**  * Remove first element from the queue.     *     * @return     * @throws KeeperException  * @throws InterruptedException  */  int consume() throws KeeperException, InterruptedException{  int retvalue = -1;  Stat stat = null;  // Get the first element available  while (true) {  synchronized (mutex) {  List<String> list = zk.getChildren(root, true);  if (list.size() == 0) {  System.out.println("Going to wait");  mutex.wait();  } else {  Integer min = Integer.parseInt((list.get(0).substring(7)));  String minNode = list.get(0);  for(String s : list){  Integer tempValue = Integer.parseInt(s.substring(7));  //System.out.println("Temporary value: " + tempValue);  if(tempValue < min) {  min = tempValue;  minNode = s;  }  }  System.out.println("Temporary value: " + root + "/" + minNode);  byte[] b = zk.getData(root + "/" + minNode,  false, stat);  zk.delete(root + "/" + minNode, 0);  ByteBuffer buffer = ByteBuffer.wrap(b);  retvalue = buffer.getInt();  return retvalue;  }  }  }  }  
}

2.4 测试

生产消息: java SyncPrimitive qTest 192.168.206.100:2181 100 p
消费消息: java SyncPrimitive qTest 192.168.206.100:2181 100 c

2.5 结论

借助zookeeper实现消息队列的模拟。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/51912.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vulhub xxe靶机

先用御剑扫描出ip然后进入网页 进入robots.txt里面会发现俩个目录然后我们进去xxe里面 进入xxe页面进行登录&#xff0c;burp抓包 然后进入重放器 可以看到关于密码和用户名的是xml,那么就可以考虑用xxe注入 <?xml version"1.0" ?> <!DOCTYPE r [ <!…

监视SQL Server 内存使用量

文章目录 I 监视SQL Server 内存使用量确定页生存期确定当前的 SQL Server 内存利用率有关当前分配内存的信息II 等待资源池 %ls (%ld)中的内存资源来执行该查询时发生超时。 请重新运行查询。原因查看服务器级别设置的超时值资源信号灯 DMV sys.dm_exec_query_resource_semaph…

IP网络广播系统(IP网络广播系统是什么及它的优势与应用)

一、引言 在当今数字化的时代&#xff0c;音频传播技术也在不断革新。IP网络广播系统作为一种先进的音频传输解决方案&#xff0c;正逐渐在各个领域发挥重要作用。那么&#xff0c;究竟什么是IP网络广播系统呢&#xff1f;它又有着怎样独特的优势和广泛的应用呢&#xff1f;本…

深度学习(二)-损失函数+梯度下降

损失函数 损失函数&#xff08;Loss Function&#xff09;&#xff0c;也有称之为代价函数&#xff08;Cost Function&#xff09;&#xff0c;用来度量预测值和实际值之间的差异。 损失函数的作用 度量决策函数f&#xff08;x&#xff09;和实际值之间的差异。 作为模型性能…

检测CSRF漏洞的工具

免责声明此文档仅限于学习讨论与技术知识的分享&#xff0c;不得违反当地国家的法律法规。对于传播、利用文章中提供的信息而造成的任何直接或者间接的后果及损失&#xff0c;均由使用者本人负责&#xff0c;本文作者不为此承担任何责任&#xff0c;一旦造成后果请自行承担&…

二百五十九、Java——采集Kafka数据,解析成一条条数据,写入另一Kafka中(一般JSON)

一、目的 由于部分数据类型频率为1s&#xff0c;从而数据规模特别大&#xff0c;因此完整的JSON放在Hive中解析起来&#xff0c;尤其是在单机环境下&#xff0c;效率特别慢&#xff0c;无法满足业务需求。 而Flume的拦截器并不能很好的转换数据&#xff0c;因为只能采用Java方…

javascript数据结构与算法-- 二叉树

javascript数据结构与算法-- 二叉树 树是计算机科学中经常用到的一种数据结构。树是一种非线性的数据结构&#xff0c;以分成的方式存储数据&#xff0c;树被用来存储具有层级关系的数据&#xff0c;比如文件系统的文件&#xff0c;树还被用来存储有序列表。我们要研究的是二叉…

IObit Uninstaller Pro v13.6.0.5 绿色便携免安装版本 下载

功能非常强大好用的软件卸载清理工具 下载地址(资源制作整理不易&#xff0c;下载使用需付费&#xff0c;不能接受请勿浪费时间下载) 链接&#xff1a;https://pan.baidu.com/s/1I7lbixooii9ezSrp3X-y-w?pwd716l 提取码&#xff1a;716l

d3dcompiler_47.dll缺失的可能原因多种多样,那么d3dcompiler_47.dll缺失怎么修复

在数字世界的深处&#xff0c;d3dcompiler_47.dll文件扮演着至关重要的角色&#xff0c;它是Direct3D编译器的一部分&#xff0c;负责处理图形渲染和游戏运行中的关键任务。然而&#xff0c;当用户启动某个程序或游戏时&#xff0c;屏幕上突然弹出的错误提示“d3dcompiler_47.d…

苹果手机勿扰模式怎么关闭?4个方法快速关闭!

我们为了提升做事的效率以及保障休息的质量&#xff0c;在认真工作和学习&#xff0c;或者是晚上休息的时候&#xff0c;通常会打开苹果手机的勿扰模式。但当我们需要恢复苹果手机的消息通知时&#xff0c;苹果手机勿扰模式怎么关闭呢&#xff1f;今天&#xff0c;小编整理了4个…

机械学习—零基础学习日志(概率论总笔记2)

正态分布 高斯分布也叫做正态分布。假定事件A经过n次试验后发生了k次&#xff0c;把k的概率分布图画一下&#xff0c;就得到了一个中间鼓起&#xff0c;像倒扣的钟一样的对称图形。 18世纪&#xff0c;数学家棣莫弗和拉普拉斯把这种中间大&#xff0c;两头小的分布称为正态分布…

厨师帽佩戴识别摄像机

厨师帽佩戴识别摄像机 是一种用于识别厨师是否佩戴帽子的智能设备&#xff0c;其作用在于强制执行食品安全卫生标准&#xff0c;防止头发掉落入食物中。该摄像机利用人工智能和图像识别技术&#xff0c;能够识别厨师是否佩戴厨师帽。当摄像机检测到厨师未佩戴帽子时&#xff0c…

微信小程序中Towxml解析Markdown及html

一、Towxml Towxml 是一个让小程序可以解析Markdown、HTML的解析库。 二、引入 2.1 clone代码 git clone https://github.com/sbfkcel/towxml.git2.2 安装依赖 npm install2.3 打包 npm run build2.4 引入文件 将dist文件复制到微信小程序根目录&#xff0c;改名为towx…

Flutter中的Key

在Flutter 中&#xff0c;Key 是 几乎所有 widget 都具有的属性。为什么 widget 具有 Key 呢&#xff1f;Key的作用是什么&#xff1f; 什么是 Key Key是Widget、Element 和 SemanticNodes 的标识符。 Key 是Widget、Element 和 SemanticNodes的唯一标识。例如对于 Widget 在 …

数据结构之 “单链表“

&#xff08;1&#xff09;在顺表表中&#xff0c;如果是头插/删的时间复杂度是O(1)&#xff1b;尾插/删的时间复杂度是O(N) &#xff08;2&#xff09;增容一般是呈2倍的增长&#xff0c;势必会有一定的空间浪费。比如&#xff1a;申请了50个空间&#xff0c;只用了两个&#…

Type-C接口诱骗取电快充方案

Type-C XSP08Q 快充协议芯片是一种新型电源管理芯片&#xff0c;主要负责控制充电电流和电压等相关参数&#xff0c;从而实现快速充电功能。Type-C XSP08Q快充协议是在Type-C接口基础上&#xff0c;加入了XSP08Q协议芯片的支持&#xff0c;很大程度上提升了充电速度。 正常情况…

Linux——性能调优工具一览

一、CPU 1.调优工具 根据指标找工具 性能指标工具说明 平均负载 uptime、top uptime最简单、top提供了更全的指标 系统整体CPU使用率 vmstat、mpstat、top、sar、/proc/stat top、vmstat、mpstat只可以动态查看&#xff0c;而sar还可以记录历史数据 /proc/stat是其他性…

UE引擎内置插件信息 储存的位置

.uproject。图标文件可以让UE 引擎内置插件&#xff0c;配置更改,比如我希望我的DataSmithImporter插件是启用的。

STM32 ADC采样详解

Content 0x00 前言0x01 ADC配置0x02 滤波处理 0x00 前言 在单片机开发过程中&#xff0c;常常涉及到ADC的使用&#xff0c;市面上大部分便宜的传感器都是采用的ADC来获取其数据&#xff0c;如MQ-2 烟雾传感器、光敏传感器等等。 此类传感器工作原理为根据所采集到的数据变化…

大模型入门 ch01:大模型概述

本文是github上的大模型教程LLMs-from-scratch的学习笔记&#xff0c;教程地址&#xff1a;教程链接 STAGE 1&#xff1a; BUILDING 1. 数据准备与采样 LLM的预测过程&#xff0c;是一个不断预测下一个词&#xff08;准确的说是token&#xff09;的过程&#xff0c;每次根据输…