【02】ZooKeeper经典应用场景实战一

1、ZooKeeper Java客户端实战

  • ZooKeeper应用的开发主要通过Java客户端API去连接和操作ZooKeeper集群。可供选择的Java客户端API有:
    • ZooKeeper官方的Java客户端API。
    • 第三方的Java客户端API,比如:Curator
  • ZooKeeper官方的客户端API提供了基本的操作。例如,创建会话、创建节点、读取节点、更新数据、删除节点和检查节点是否存在等。不过,对于实际开发来说,ZooKeeper官方API有一些不足之处,具体如下:
    • ZooKeeper的Watcher监测是一次性的,每次触发之后都需要重新进行注册。
    • 会话超时之后没有实现重连机制。
    • 异常处理烦琐,ZooKeeper提供了很多异常,对于开发人员来说可能根本不知道应该如何处理这些抛出的异常。
    • 仅提供了简单的byte[]数组类型的接口,没有提供Java POJO级别的序列化数据处理接口。
    • 创建节点时如果抛出异常,需要自行检查节点是否存在。
    • 无法实现级联删除。
  • 总之,ZooKeeper官方API功能比较简单,在实际开发过程中比较笨重,一般不推荐使用

1.1 ZooKeeper原生Java客户端

  • <1> 引入ZooKeeper Client的依赖

    • 注意:依赖的版本与服务端的版本最好保持一致,否则会有很多兼容性的问题

      <!-- zookeeper client -->
      <dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.0</version>
      </dependency>
      
  • <2> ZooKeeper常用构造器

    • ZooKeeper原生客户端主要使用org.apache.zookeeper.ZooKeeper这个类来使用ZK服务。
      ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
      
      • connectString:是用逗号分隔的ZK服务的列表,每个ZK节点都是 host:port 对,host是机器名称或者IP地址,port 是ZK节点对客户端提供服务的端口号。客户端会任意选取一个服务列表中的节点建立连接。
      • sessionTimeout : session timeout 会话超时时间。
      • watcher:用于监听接收到来自ZooKeeper集群的事件。
    • 使用ZK原生API连接ZK集群:
      public class ZkClientDemo {private static final  String  CONNECT_STR="localhost:2181";private final static  String CLUSTER_CONNECT_STR="192.168.65.156:2181,192.168.65.190:2181,192.168.65.200:2181";public static void main(String[] args) throws Exception {final CountDownLatch countDownLatch=new CountDownLatch(1);ZooKeeper zooKeeper = new ZooKeeper(CLUSTER_CONNECT_STR,4000, new Watcher() {@Overridepublic void process(WatchedEvent event) {if(Event.KeeperState.SyncConnected==event.getState() && event.getType()== Event.EventType.None){//如果收到了服务端的响应事件,连接成功countDownLatch.countDown();System.out.println("连接建立");}}});System.out.printf("连接中");countDownLatch.await();//CONNECTEDSystem.out.println(zooKeeper.getState());//创建持久节点zooKeeper.create("/user","admin".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}
      }
      
  • <3> ZooKeeper常用方法

    • create(String path, byte[] data, List acl, CreateMode createMode):创建一个指定路径的ZNode节点,并在节点中保存 data 中的数据,createMode 指定了节点的类型;
    • delete(String path, int version):如果指定的path路径上的节点存在且与版本号version匹配,则删除ZNode节点;
    • exists(String path, Watcher watcher):判断指定的path路径上的ZNode节点是否存在,并ZNode上设置一个watch监听;
    • getData(String path, Watcher watcher, Stat stat):返回指定path路径上的ZNode节点中的数据,并在ZNode上设置一个watch监听;
    • setData(String path, byte[] data, int version):如果指定path路径上的ZNode节点的版本与给定的version 匹配,则将ZNode节点的数据设置为data;
    • getChildren(String path, Watcher watcher):返回指定路径path上ZNode节点的孩子节点的节点名称,并在ZNode节点上设置一个watch;
    • sync(String path, AsyncCallback.VoidCallback cb, Object ctx):把客户端session连接的节点与leader节点进行同步;
  • <4> 方法特点:

    • 所有获取 ZNode 数据的API 都可以设置一个watch来监控ZNode的变化;
    • 所有更新 ZNode 数据的 API 都有两个版本: 无条件更新版本和条件更新版本。如果 version 为 -1,更新为无条件更新。否则只有给定的 version 和 ZNode 当前的 version 一样,才会进行更新,这样的更新是条件更新。
    • 所有的方法都有同步和异步两个版本。同步版本的方法发送请求给 ZooKeeper 并等待服务器的响 应。异步版本把请求放入客户端的请求队列,然后马上返回。异步版本通过 callback 来接受来 自服务端的响应。
  • <5> 同步创建节点:

    public void createTest() throws KeeperException, InterruptedException {String path = zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);log.info("created path: {}",path);
    }
    
  • <6> 异步创建节点:

    public void createAsycTest() throws InterruptedException {zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT,(rc, path, ctx, name) -> log.info("rc  {},path {},ctx {},name {}",rc,path,ctx,name),"context");TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
    }
    
  • <7> 修改节点数据:

    public void setTest() throws KeeperException, InterruptedException {Stat stat = new Stat();byte[] data = zooKeeper.getData(ZK_NODE, false, stat);log.info("修改前: {}",new String(data));zooKeeper.setData(ZK_NODE, "changed!".getBytes(), stat.getVersion());byte[] dataAfter = zooKeeper.getData(ZK_NODE, false, stat);log.info("修改后: {}",new String(dataAfter));
    }
    

1.2 第三方开源客户端 Curator

  • Curator是Netflix公司开源的一套ZooKeeper客户端框架,和ZkClient一样它解决了非常底层的细节开发工作,包括连接、重连、反复注册Watcher的问题以及NodeExistsException异常等。

  • Curator是Apache基金会的顶级项目之一,Curator具有更加完善的文档,另外还提供了一套易用性和可读性更强的Fluent风格的客户端API框架。

  • Curator还为ZooKeeper客户端框架提供了一些比较普遍的、开箱即用的、分布式开发用的解决方案,例如Recipe、共享锁服务、Master选举机制和分布式计算器等,帮助开发者避免了“重复造轮子”的无效开发工作。

  • 官网:https://curator.apache.org/

  • <1> 引入依赖:

    • Curator 包含以下几个包:
      • curator-framework 是对ZooKeeper底层API的一些封装。
      • curator-client 提供了一些客户端的操作,例如重试策略等。
      • curator-recipes 封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。
        <!-- zookeeper client -->
        <dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.0</version>
        </dependency><!--curator-->
        <dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.1.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions>
        </dependency>
        
  • <2> 创建一个客户端实例:

    • 在使用 curator-framework 操作ZooKeeper前,首先要创建一个客户端实例。这是一个CuratorFramework类型的对象,有以下两种方法:
      • 使用工厂类CuratorFrameworkFactory的静态newClient()方法
        java // 重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3) //创建客户端实例 CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy); //启动客户端 client.start();
      • 使用工厂类 CuratorFrameworkFactory 的静态 builder 构造者方法。
        	//随着重试次数增加重试时间间隔变大,指数倍增长baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.128.129:2181").sessionTimeoutMs(5000)  // 会话超时时间.connectionTimeoutMs(5000) // 连接超时时间.retryPolicy(retryPolicy).namespace("base") // 包含隔离名称.build();client.start();```
        
      • connectionString服务器地址列表,在指定服务器地址列表的时候可以是一个地址,也可以是多个地址。如果是多个地址,那么每个服务器地址列表用逗号分隔, 如 host1:port1,host2:port2,host3;port3 。
      • retryPolicy重试策略,当客户端异常退出或者与服务端失去连接的时候,可以通过设置客户端重新连接 ZooKeeper 服务端。而 Curator 提供了 一次重试、多次重试等不同种类的实现方式。在 Curator 内部,可以通过判断服务器返回的 keeperException 的状态代码来判断是否进行重试处理,如果返回的是 OK 表示一切操作都没有问题,而 SYSTEMERROR 表示系统或服务端错误。
      • 超时时间:Curator 客户端创建过程中,有两个超时时间的设置。一个是 sessionTimeoutMs 会话超时时间,用来设置该条会话在 ZooKeeper 服务端的失效时间。另一个是 connectionTimeoutMs 客户端创建会话的超时时间,用来限制客户端发起一个会话连接到接收 ZooKeeper 服务端应答的时间。sessionTimeoutMs 作用在服务端,而 connectionTimeoutMs 作用在客户端。
  • <3> 创建节点:

    • 创建节点的方式如下面的代码所示,回顾我们之前课程中讲到的内容,描述一个节点要包括节点的类型,即临时节点还是持久节点、节点的数据信息、节点是否是有序节点等属性和性质。
      public void testCreate() throws Exception {String path = curatorFramework.create().forPath("/curator-node");curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/curator-node","some-data".getBytes())log.info("curator create node :{}  successfully.",path);
      }
      
    • 在 Curator 中,可以使用 create 函数创建数据节点,并通过 withMode 函数指定节点类型(持久化节点,临时节点,顺序节点,临时顺序节点,持久化顺序节点等),默认是持久化节点,之后调用 forPath 函数来指定节点的路径和数据信息。
  • <4> 一次性创建带层级结构的节点:

    • 代码如下:

      public void testCreateWithParent() throws Exception {String pathWithParent="/node-parent/sub-node-1";String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);log.info("curator create node :{}  successfully.",path);
      }
      
  • <5> 获取数据:

    • 代码如下:
      public void testGetData() throws Exception {byte[] bytes = curatorFramework.getData().forPath("/curator-node");log.info("get data from  node :{}  successfully.",new String(bytes));
      }
      
  • <6> 更新节点:

    • 我们通过客户端实例的 setData() 方法更新 ZooKeeper 服务上的数据节点,在setData 方法的后边,通过 forPath 函数来指定更新的数据节点路径以及要更新的数据。
      public void testSetData() throws Exception {curatorFramework.setData().forPath("/curator-node","changed!".getBytes());byte[] bytes = curatorFramework.setData().forPath("/curator-node");log.info("get data from  node /curator-node :{}  successfully.",new String(bytes));
      }
      
  • <7> 删除节点:

    • guaranteed:该函数的功能如字面意思一样,主要起到一个保障删除成功的作用,其底层工作方式是:只要该客户端的会话有效,就会在后台持续发起删除请求,直到该数据节点在 ZooKeeper 服务端被删除。
    • deletingChildrenIfNeeded:指定了该函数后,系统在删除该数据节点的时候会以递归的方式直接删除其子节点,以及子节点的子节点。
      public void testDelete() throws Exception {String pathWithParent="/node-parent";curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);
      }
      
  • <8> 异步接口:

    • Curator 引入了BackgroundCallback 接口,用来处理服务器端返回来的信息,这个处理过程是在异步线程中调用,默认在 EventThread 中调用,也可以自定义线程池。
      public interface BackgroundCallback
      {/*** Called when the async background operation completes** @param client the client* @param event operation result details* @throws Exception errors*/public void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
      }
      
    • 如上接口,主要参数为 client 客户端, 和 服务端事件 event。inBackground 异步处理默认在EventThread中执行:
      public void test() throws Exception {curatorFramework.getData().inBackground((item1, item2) -> {log.info(" background: {}", item2);}).forPath(ZK_NODE);TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
      }
      
    • 指定线程池:
      public void test() throws Exception {ExecutorService executorService = Executors.newSingleThreadExecutor();curatorFramework.getData().inBackground((item1, item2) -> {log.info(" background: {}", item2);},executorService).forPath(ZK_NODE);TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
      }
      
  • <9> Curator 监听器:

    • 针对 background 通知和错误通知,使用此监听器之后,调用inBackground方法会异步获得监听
      public interface CuratorListener
      {/*** Called when a background task has completed or a watch has triggered** @param client client* @param event the event* @throws Exception any errors*/public void  eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
      }
      
    • Curator Caches:
      • Curator 引入了 Cache 来实现对 Zookeeper 服务端事件监听,Cache 事件监听可以理解为一个本地缓存视图与远程 Zookeeper 视图的对比过程。Cache 提供了反复注册的功能。Cache 分为两类注册类型:节点监听和子节点监听。
      • node cache:对某一个节点进行监听
        public NodeCache(CuratorFramework client,String path)
        
      • 可以通过注册监听器来实现,对当前节点数据变化的处理
        public void addListener(NodeCacheListener listener)
        
        public class NodeCacheTest extends AbstractCuratorTest{public static final String NODE_CACHE="/node-cache";@Testpublic void testNodeCacheTest() throws Exception {createIfNeed(NODE_CACHE);NodeCache nodeCache = new NodeCache(curatorFramework, NODE_CACHE);nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {log.info("{} path nodeChanged: ",NODE_CACHE);printNodeData();}});nodeCache.start();}public void printNodeData() throws Exception {byte[] bytes = curatorFramework.getData().forPath(NODE_CACHE);log.info("data: {}",new String(bytes));}
        }
        
    • Path Cache:
      • PathChildrenCache 会对子节点进行监听,但是不会对二级子节点进行监听

        public PathChildrenCache(CuratorFramework client,String path,boolean cacheData)
        
      • 可以通过注册监听器来实现,对当前节点的子节点数据变化的处理

        public void addListener(PathChildrenCacheListener listener)
        
        public class PathCacheTest extends AbstractCuratorTest{public static final String PATH="/path-cache";@Testpublic void testPathCache() throws Exception {createIfNeed(PATH);PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, PATH, true);pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {log.info("event:  {}",event);}});// 如果设置为true则在首次启动时就会缓存节点内容到Cache中pathChildrenCache.start(true);}
        }
        
    • Tree Cache:
      • TreeCache 使用一个内部类TreeNode来维护这个一个树结构。并将这个树结构与ZK节点进行了映射。所以TreeCache 可以监听当前节点下所有节点的事件。
        public TreeCache(CuratorFramework client, String path,boolean cacheData)
        
      • 可以通过注册监听器来实现,对当前节点的子节点,及递归子节点数据变化的处理
        public void addListener(TreeCacheListener listener)
        
        public class TreeCacheTest extends AbstractCuratorTest{public static final String TREE_CACHE="/tree-path";@Testpublic void testTreeCache() throws Exception {createIfNeed(TREE_CACHE);TreeCache treeCache = new TreeCache(curatorFramework, TREE_CACHE);treeCache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {log.info(" tree cache: {}",event);}});treeCache.start();}
        }
        

2、ZooKeeper在分布式命名服务中的应用

  • 命名服务是为系统中的资源提供标识能力。ZooKeeper的命名服务主要是利用ZooKeeper节点的树形分层结构和子节点的顺序维护能力,来为分布式系统中的资源命名。
  • 典型的应用场景有:
    • 分布式API目录
    • 分布式节点命名
    • 分布式ID生成器

2.1 分布式API目录

  • 为分布式系统中各种API接口服务的名称、链接地址,提供类似JNDI(Java命名和目录接口)中的文件系统的功能。借助于ZooKeeper的树形分层结构就能提供分布式的API调用功能。
  • 著名的Dubbo分布式框架就是应用了ZooKeeper的分布式的JNDI功能。在Dubbo中,使用ZooKeeper维护的全局服务接口API的地址列表。大致的思路为:
    • 服务提供者(Service Provider)在启动的时候,向ZooKeeper上的指定节点/dubbo/${serviceName}/providers写入自己的API地址,这个操作就相当于服务的公开。
    • 服务消费者(Consumer)启动的时候,订阅节点/dubbo/{serviceName}/providers下的服务提供者的URL地址,获得所有服务提供者的API。

2.2 分布式节点命名

  • 一个分布式系统通常会由很多的节点组成,节点的数量不是固定的,而是不断动态变化的。比如说,当业务不断膨胀和流量洪峰到来时,大量的节点可能会动态加入到集群中。而一旦流量洪峰过去了,就需要下线大量的节点。再比如说,由于机器或者网络的原因,一些节点会主动离开集群。
  • 如何为大量的动态节点命名呢?一种简单的办法是可以通过配置文件,手动为每一个节点命名。但是,如果节点数据量太大,或者说变动频繁,手动命名则是不现实的,这就需要用到分布式节点的命名服务。
  • 可用于生成集群节点的编号的方案:
    (1)使用数据库的自增ID特性,用数据表存储机器的MAC地址或者IP来维护。
    (2)使用ZooKeeper持久顺序节点的顺序特性来维护节点的NodeId编号。
  • 在第2种方案中,集群节点命名服务的基本流程是:
    • 启动节点服务,连接ZooKeeper,检查命名服务根节点是否存在,如果不存在,就创建系统的根节点。
    • 在根节点下创建一个临时顺序ZNode节点,取回ZNode的编号把它作为分布式系统中节点的NODEID。
    • 如果临时节点太多,可以根据需要删除临时顺序ZNode节点。

2.3 分布式ID生成器

  • 在分布式系统中,分布式ID生成器的使用场景非常之多:

    • 大量的数据记录,需要分布式ID。
    • 大量的系统消息,需要分布式ID。
    • 大量的请求日志,如restful的操作记录,需要唯一标识,以便进行后续的用户行为分析和调用链路分析。
    • 分布式节点的命名服务,往往也需要分布式ID。
  • 传统的数据库自增主键已经不能满足需求。在分布式系统环境中,迫切需要一种全新的唯一ID系统,这种系统需要满足以下需求:
    (1)全局唯一:不能出现重复ID。
    (2)高可用:ID生成系统是基础系统,被许多关键系统调用,一旦宕机,就会造成严重影响。

  • 有哪些分布式的ID生成器方案呢?大致如下:

    • 1.Java的UUID。
    • 2.分布式缓存Redis生成ID:利用Redis的原子操作INCR和INCRBY,生成全局唯一的ID。
    • 3.Twitter的SnowFlake算法。
    • 4.ZooKeeper生成ID:利用ZooKeeper的顺序节点,生成全局唯一的ID。
    • 5.MongoDb的ObjectId:MongoDB是一个分布式的非结构化NoSQL数据库,每插入一条记录会自动生成全局唯一的一个“_id”字段值,它是一个12字节的字符串,可以作为分布式系统中全局唯一的ID。
  • <1> 基于Zookeeper实现分布式ID生成器

    • 在ZooKeeper节点的四种类型中,其中有以下两种类型具备自动编号的能力
      • PERSISTENT_SEQUENTIAL持久化顺序节点。
      • EPHEMERAL_SEQUENTIAL临时顺序节点。
    • ZooKeeper的每一个节点都会为它的第一级子节点维护一份顺序编号,会记录每个子节点创建的先后顺序,这个顺序编号是分布式同步的,也是全局唯一的。
    • 可以通过创建ZooKeeper的临时顺序节点的方法,生成全局唯一的ID
      public class IDMaker extends CuratorBaseOperations {private String createSeqNode(String pathPefix) throws Exception {CuratorFramework curatorFramework = getCuratorFramework();//创建一个临时顺序节点String destPath = curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(pathPefix);return destPath;}public String  makeId(String path) throws Exception {String str = createSeqNode(path);if(null != str){//获取末尾的序号int index = str.lastIndexOf(path);if(index>=0){index+=path.length();return index<=str.length() ? str.substring(index):"";}}return str;}
      }
      
    • 测试:
      @Test
      public void testMarkId() throws Exception {IDMaker idMaker = new IDMaker();idMaker.init();String pathPrefix = "/idmarker/id-";for(int i=0;i<5;i++){new Thread(()->{for (int j=0;j<10;j++){String id = null;try {id = idMaker.makeId(pathPrefix);log.info("{}线程第{}个创建的id为{}",Thread.currentThread().getName(),j,id);} catch (Exception e) {e.printStackTrace();}}},"thread"+i).start();}Thread.sleep(Integer.MAX_VALUE);}
      
  • <2> 基于Zookeeper实现SnowFlakeID算法:

    • Twitter(推特)的SnowFlake雪花算法是一种著名的分布式服务器用户ID生成算法。SnowFlake算法所生成的ID是一个64bit的长整型数字,如图10-2所示。这个64bit被划分成四个部分,其中后面三个部分分别表示时间戳、工作机器ID、序列号。

    • SnowFlakeID的四个部分,具体介绍如下:

      • (1)第一位 占用1 bit,其值始终是0,没有实际作用。
      • (2)时间戳 占用41 bit,精确到毫秒,总共可以容纳约69年的时间。
      • (3)工作机器id占用10 bit,最多可以容纳1024个节点。
      • (4)序列号 占用12 bit。这个值在同一毫秒同一节点上从0开始不断累加,最多可以累加到4095。
    • 在工作节点达到1024顶配的场景下,SnowFlake算法在同一毫秒最多可以生成的ID数量为: 1024 * 4096 =4194304,在绝大多数并发场景下都是够用的。

    • SnowFlake算法的优点

      • 生成ID时不依赖于数据库,完全在内存生成,高性能和高可用性。
      • 容量大,每秒可生成几百万个ID。
      • ID呈趋势递增,后续插入数据库的索引树时,性能较高。
    • SnowFlake算法的缺点

      • 依赖于系统时钟的一致性,如果某台机器的系统时钟回拨了,有可能造成ID冲突,或者ID乱序。
      • 在启动之前,如果这台机器的系统时间回拨过,那么有可能出现ID重复的危险。
    • 基于zookeeper实现雪花算法:

      public class SnowflakeIdGenerator {/*** 单例*/public static SnowflakeIdGenerator instance =new SnowflakeIdGenerator();/*** 初始化单例** @param workerId 节点Id,最大8091* @return the 单例*/public synchronized void init(long workerId) {if (workerId > MAX_WORKER_ID) {// zk分配的workerId过大throw new IllegalArgumentException("woker Id wrong: " + workerId);}instance.workerId = workerId;}private SnowflakeIdGenerator() {}/*** 开始使用该算法的时间为: 2017-01-01 00:00:00*/private static final long START_TIME = 1483200000000L;/*** worker id 的bit数,最多支持8192个节点*/private static final int WORKER_ID_BITS = 13;/*** 序列号,支持单节点最高每毫秒的最大ID数1024*/private final static int SEQUENCE_BITS = 10;/*** 最大的 worker id ,8091* -1 的补码(二进制全1)右移13位, 然后取反*/private final static long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);/*** 最大的序列号,1023* -1 的补码(二进制全1)右移10位, 然后取反*/private final static long MAX_SEQUENCE = ~(-1L << SEQUENCE_BITS);/*** worker 节点编号的移位*/private final static long WORKER_ID_SHIFT = SEQUENCE_BITS;/*** 时间戳的移位*/private final static long TIMESTAMP_LEFT_SHIFT = WORKER_ID_BITS + SEQUENCE_BITS;/*** 该项目的worker 节点 id*/private long workerId;/*** 上次生成ID的时间戳*/private long lastTimestamp = -1L;/*** 当前毫秒生成的序列*/private long sequence = 0L;/*** Next id long.** @return the nextId*/public Long nextId() {return generateId();}/*** 生成唯一id的具体实现*/private synchronized long generateId() {long current = System.currentTimeMillis();if (current < lastTimestamp) {// 如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过,出现问题返回-1return -1;}if (current == lastTimestamp) {// 如果当前生成id的时间还是上次的时间,那么对sequence序列号进行+1sequence = (sequence + 1) & MAX_SEQUENCE;if (sequence == MAX_SEQUENCE) {// 当前毫秒生成的序列数已经大于最大值,那么阻塞到下一个毫秒再获取新的时间戳current = this.nextMs(lastTimestamp);}} else {// 当前的时间戳已经是下一个毫秒sequence = 0L;}// 更新上次生成id的时间戳lastTimestamp = current;// 进行移位操作生成int64的唯一ID//时间戳右移动23位long time = (current - START_TIME) << TIMESTAMP_LEFT_SHIFT;//workerId 右移动10位long workerId = this.workerId << WORKER_ID_SHIFT;return time | workerId | sequence;}/*** 阻塞到下一个毫秒*/private long nextMs(long timeStamp) {long current = System.currentTimeMillis();while (current <= timeStamp) {current = System.currentTimeMillis();}return current;}
      }
      

3、ZooKeeper实现分布式队列

  • 常见的消息队列有:RabbitMQ,RocketMQ,Kafka等。Zookeeper作为一个分布式的小文件管理系统,同样能实现简单的队列功能。Zookeeper不适合大数据量存储,官方并不推荐作为队列使用,但由于实现简单,集群搭建较为便利,因此在一些吞吐量不高的小型系统中还是比较好用的。

3.1 设计思路

  • 1、创建队列根节点:在Zookeeper中创建一个持久节点,用作队列的根节点。所有队列元素的节点将放在这个根节点下。
  • 2、实现入队操作:当需要将一个元素添加到队列时,可以在队列的根节点下创建一个临时有序节点。节点的数据可以包含队列元素的信息。
  • 3、实现出队操作:当需要从队列中取出一个元素时,可以执行以下操作:
    • 获取根节点下的所有子节点。
    • 找到具有最小序号的子节点。
    • 获取该节点的数据。
    • 删除该节点。
    • 返回节点的数据。
    /*** 入队* @param data* @throws Exception*/
    public void enqueue(String data) throws Exception {// 创建临时有序子节点zk.create(QUEUE_ROOT + "/queue-", data.getBytes(StandardCharsets.UTF_8),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }/*** 出队* @return* @throws Exception*/
    public String dequeue() throws Exception {while (true) {List<String> children = zk.getChildren(QUEUE_ROOT, false);if (children.isEmpty()) {return null;}Collections.sort(children);for (String child : children) {String childPath = QUEUE_ROOT + "/" + child;try {byte[] data = zk.getData(childPath, false, null);zk.delete(childPath, -1);return new String(data, StandardCharsets.UTF_8);} catch (KeeperException.NoNodeException e) {// 节点已被其他消费者删除,尝试下一个节点}}}
    }
    

3.2 使用Apache Curator实现分布式队列

  • Apache Curator是一个ZooKeeper客户端的封装库,提供了许多高级功能,包括分布式队列。

    public class CuratorDistributedQueueDemo {private static final String QUEUE_ROOT = "/curator_distributed_queue";public static void main(String[] args) throws Exception {CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181",new ExponentialBackoffRetry(1000, 3));client.start();// 定义队列序列化和反序列化QueueSerializer<String> serializer = new QueueSerializer<String>() {@Overridepublic byte[] serialize(String item) {return item.getBytes();}@Overridepublic String deserialize(byte[] bytes) {return new String(bytes);}};// 定义队列消费者QueueConsumer<String> consumer = new QueueConsumer<String>() {@Overridepublic void consumeMessage(String message) throws Exception {System.out.println("消费消息: " + message);}@Overridepublic void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {}};// 创建分布式队列DistributedQueue<String> queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_ROOT).buildQueue();queue.start();// 生产消息for (int i = 0; i < 5; i++) {String message = "Task-" + i;System.out.println("生产消息: " + message);queue.put(message);Thread.sleep(1000);}Thread.sleep(10000);queue.close();client.close();}
    }
    

3.3 注意事项

  • 使用Curator的DistributedQueue时,默认情况下不使用锁。当调用QueueBuilder的lockPath()方法并指定一个锁节点路径时,才会启用锁。如果不指定锁节点路径,那么队列操作可能会受到并发问题的影响。

  • 在创建分布式队列时,指定一个锁节点路径可以帮助确保队列操作的原子性和顺序性。分布式环境中,多个消费者可能同时尝试消费队列中的消息。如果不使用锁来同步这些操作,可能会导致消息被多次处理或者处理顺序出现混乱。当然,并非所有场景都需要指定锁节点路径。如果您的应用场景允许消息被多次处理,或者处理顺序不是关键问题,那么可以不使用锁。这样可以提高队列操作的性能,因为不再需要等待获取锁。

    // 创建分布式队列
    QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, serializer, "/order");
    //指定了一个锁节点路径/orderlock,用于实现分布式锁,以保证队列操作的原子性和顺序性。
    queue = builder.lockPath("/orderlock").buildQueue();
    //启动队列,这时队列开始监听ZooKeeper中/order节点下的消息。
    queue.start();
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/58555.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

信息安全工程师(73)网络安全风险评估过程

一、确定评估目标 此阶段需要明确评估的范围、目标和要求。评估目标通常包括特定的网络系统、信息系统或网络基础设施&#xff0c;评估范围可能涉及整个组织或仅特定部门。明确评估要求有助于确保评估过程的针对性和有效性。 二、收集信息 在评估开始之前&#xff0c;需要对目标…

Vmos pro-虚拟机 解锁永久vip

[应用名称] 应用名称&#xff1a;Vmos pro [应用版本] 应用版本&#xff1a;2.99 [软件大小] 软件大小&#xff1a;32.2mb [应用简介] 应用简介&#xff1a;Vmos Pro这款安卓虚拟机平台&#xff0c;提供了多样化的ROM版本选择。用户可根据自身需求更换ROM&#xff0c;调…

华为OD机试 - 最多购买宝石数目 - 滑动窗口(Python/JS/C/C++ 2024 C卷 100分)

华为OD机试 2024E卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试真题&#xff08;Python/JS/C/C&#xff09;》。 刷的越多&#xff0c;抽中的概率越大&#xff0c;私信哪吒&#xff0c;备注华为OD&#xff0c;加入华为OD刷题交流群&#xff0c;…

1:基本电路专题:R(电阻)的介绍

说实话这个其实我不想写的&#xff0c;因为这个是初中的知识&#xff0c;并没有很难&#xff0c;但是为了保持整齐性&#xff0c;我还是写了一下关于这个的知识点。是电子学中三大基本无源元件之一。&#xff08;R&#xff08;电阻&#xff09;,L&#xff08;电感&#xff09;,…

基于SpringBoot的“CSGO赛事管理系统”的设计与实现(源码+数据库+文档+PPT)

基于SpringBoot的“CSGO赛事管理系统”的设计与实现&#xff08;源码数据库文档PPT) 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;SpringBoot 工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 系统首页界面图 赛事信息界面图 赛事通知界面…

漏洞挖掘 | 通过域混淆绕过实现账户接管

由于这是一个私有项目&#xff0c;我将使用 example.com 来代替。 很长一段时间以来&#xff0c;我一直想在漏洞赏金项目中找到一个账户接管&#xff08;ATO&#xff09;漏洞。于是&#xff0c;我开始探索项目范围内的 account.example.com。 我做的第一件事就是注册一个新账…

视觉目标检测标注xml格式文件解析可视化 - python 实现

视觉目标检测任务&#xff0c;通常用 labelimage标注&#xff0c;对应的标注文件为xml。 该示例来源于开源项目&#xff1a;https://gitcode.com/DataBall/DataBall-detections-100s/overview 读取 xml 标注文件&#xff0c;并进行可视化示例如下&#xff1a; #-*-coding:ut…

地理征服营销与开源 AI 智能名片 2 + 1 链动模式 S2B2C 商城小程序的融合创新

摘要&#xff1a;本文探讨了地理征服营销这一创新营销策略与开源 AI 智能名片 2 1 链动模式 S2B2C 商城小程序的融合应用。首先阐述地理征服营销的概念和实施要点&#xff0c;接着介绍开源 AI 智能名片 2 1 链动模式 S2B2C 商城小程序的功能与优势&#xff0c;分析二者结合如…

三周精通FastAPI:24 OAuth2 实现简单的 Password 和 Bearer 验证

官网文档&#xff1a;https://fastapi.tiangolo.com/zh/tutorial/security/simple-oauth2/ OAuth2 实现简单的 Password 和 Bearer 验证 本章添加上一章示例中欠缺的部分&#xff0c;实现完整的安全流。 获取 username 和 password 首先&#xff0c;使用 FastAPI 安全工具获…

字节青训-兔群繁殖之谜

问题描述 生物学家小 R 正在研究一种特殊的兔子品种的繁殖模式。这种兔子的繁殖遵循以下规律&#xff1a; 每对成年兔子每个月会生育一对新的小兔子&#xff08;一雌一雄&#xff09;。新生的小兔子需要一个月成长&#xff0c;到第二个月才能开始繁殖。兔子永远不会死亡。 小 R…

MiniWord

1.nuget 下载配置 2.引用 3. var value = new Dictionary<string, object>() { ["nianfen"] = nianfen, ["yuefen"] = yuefen, ["yuefenjian1"] = (int.Par…

计算机毕业设计Python+大模型恶意木马流量检测与分类 恶意流量监测 随机森林模型 深度学习 机器学习 数据可视化 大数据毕业设计 信息安全 网络安全

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; Python大模型恶意木马流量检…

04.DDD与CQRS

学习视频来源&#xff1a;DDD独家秘籍视频合集 https://space.bilibili.com/24690212/channel/collectiondetail?sid1940048&ctype0 文章目录 定义职责分离DDD与CQRS的关系领域模型和查询模型特点命令场景的领域模型查询场景的查询模型 架构方案领域事件方案1&#xff1a…

专业140+总分410+武汉大学807信号与系统考研经验武大原936电子信息与通信工程,真题,大纲,参考书。

考研专业课807信号与系统(原936)140&#xff0c;总分410&#xff0c;顺利被武汉大学录取&#xff0c;群 里不少同学希望总结一下复习经验&#xff0c;回看这一年有得有失&#xff0c;总结一下希望给大家有些参考。考研还需从自身情况出发&#xff0c;制定适合自己的复习计划&am…

eclipse下载与安装(汉化教程)超详细

目录 一、下载eclipse安装包 三、配置eclipse 代码自动补全功能 安装汉化包 中英文切换 四、用eclipse写hello world 一、下载eclipse安装包 1、首先进入 eclipse官网 如下&#xff1a; 2、这里面有很多版本&#xff1b;我们小白一般选择第二个&#xff0c;向下滑动&…

【Kettle的安装与使用】使用Kettle实现mysql和hive的数据传输(使用Kettle将mysql数据导入hive、将hive数据导入mysql)

文章目录 一、安装1、解压2、修改字符集3、启动 二、实战1、将hive数据导入mysql2、将mysql数据导入到hive 一、安装 Kettle的安装包在文章结尾 1、解压 在windows中解压到一个非中文路径下 2、修改字符集 修改 spoon.bat 文件 "-Dfile.encodingUTF-8"3、启动…

RHCE笔记-DNS服务器

一.DNS简介 DNS&#xff08;域名系统&#xff09;是一种互联网服务&#xff0c;负责将我们熟悉的域名&#xff08;比如 www.example.com&#xff09;转换为计算机能理解的IP地址&#xff08;比如 192.0.2.1&#xff09;。这样&#xff0c;当你在浏览器中输入网址时&#xff0c;…

利用QGIS工具手动绘制线轨迹并生成地理信息geojson文件

前端想要获得一个完整的shp文件或者geojson的地理信息文件&#xff0c;可以利用QGIS工具手动绘制你想要的数据点位&#xff0c;然后导出图层生成对应的文件即可。 1、新建临时图层 选择线图层&#xff0c;点击ok创建临时图层。 2、绘制线图层 在工具栏中选择添加线要素&#…

面试记录(1)

java中的抽象类和接口的区别&#xff1a; 相同点 (1) 都可以被继承 (2) 都不能被实例化 (3) 都可以包含方法声明 (4) 派生类必须实现未实现的方法 不同点 1.关键字不同&#xff1a; ​ ① 继承抽象类的关键字是extends&#xff0c;而实现接口的关键字是implements&#xff1b;…

构建您自己的 RAG 应用程序:使用 Ollama、Python 和 ChromaDB 在本地设置 LLM 的分步指南

在数据隐私至关重要的时代&#xff0c;建立自己的本地语言模型 &#xff08;LLM&#xff09; 为公司和个人都提供了至关重要的解决方案。本教程旨在指导您完成使用 Ollama、Python 3 和 ChromaDB 创建自定义聊天机器人的过程&#xff0c;所有这些机器人都托管在您的系统本地。以…