Zookeeper(九)客户端的启动流程

目录

  • 一 ZooKeeper会话的创建与连接
    • 1.1 会话的创建
      • 1.1.1 ClientWatchManager
      • 1.1.2 ConnectStringParser
      • 1.1.3 HostProvider
      • 1.1.4 ClientCnxn
    • 1.2 会话的连接
      • 1.2.1 SendThread
      • 1.2.2 eventThread
  • 二 ZooKeeper会话的响应
    • 2.1 接受服务端响应
  • 三 ClientCnxn 详解
    • 3.1 Packet
    • 3.2 队列
    • 3.3 ClientCnxnSocket:底层Socket通信层

  • 官网:Apache ZooKeeper

ZooKeeper的客户端主要由以下几个核心组件组成。

  • ZooKeeper实例:客户端的入口。
  • ClientWatchManager:客户端Watcher管理器。
  • HostProvider:客户端地址列表管理器。
  • ClientCnxn:客户端核心线程,其内部又包含两个线程,即SendThread和EventThread。前者是一个I/O线程,主要负责ZooKeeper客户端和服务端之间的网络I/O通信;后者是一个事件线程,主要负责对服务端事件进行处理。

一 ZooKeeper会话的创建与连接

  • ZooKeeper客户端的初始化与启动环节,实际上就是ZooKeeper对象的实例化过程

image.png

  • 我们可以看到需要的参数:设置默认Watcher,设置ZooKeeper服务器地址列表,创建ClientCnxn。
  • 如果在ZooKeeper的构造方法中传入一个Watcher对象的话,那么ZooKeeper就会将这个Watcher对象保存在ZKWatchManager的defaultWatcher中,作为整个客户端会话期间的默认Watcher。

1.1 会话的创建

private final ZKWatchManager watchManager = new ZKWatchManager();public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,boolean canBeReadOnly)
throws IOException
{LOG.info("Initiating client connection, connectString=" + connectString+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);watchManager.defaultWatcher = watcher;ConnectStringParser connectStringParser = new ConnectStringParser(connectString);HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());cnxn = new ClientCnxn(connectStringParser.getChrootPath(),hostProvider, sessionTimeout, this, watchManager,getClientCnxnSocket(), canBeReadOnly);cnxn.start();
}
  • 通过调用ZooKeeper的构造方法来实例化一个ZooKeeper对象,在初始化过程中,会创建一个客户端的Watcher管理器:ClientWatchManager。
  • 如果在构造方法中传入了一个Watcher对象,那么客户端会将这个对象作为默认Watcher保存在ClientWatchManager中。
  • 对于构造方法中传入的服务器地址,客户端会将其存放在服务器地址列表管理器HostProvider中。
  • ZooKeeper客户端首先会创建一个网络连接器ClientCnxn,用来管理客户端与服务器的网络交互。另外,客户端在创建ClientCnxn的同时,还会初始化客户端两个核心队列outgoingQueue和pendingQueue,分别作为客户端的请求发送队列和服务端响应的等待队列。

1.1.1 ClientWatchManager

  • ZKWatchManager类的主要作用是管理和触发Zookeeper客户端的监视事件(watches)。
  • 在Zookeeper中,监视事件是一种机制,允许客户端在Zookeeper服务上注册兴趣,当指定节点的数据发生变化或者子节点列表发生变化时,客户端会收到通知。
  • 这个类中定义了三个Map类型的成员变量,分别用来存储数据监视(dataWatches)、存在监视(existWatches)和子节点监视(childWatches)的Watcher集合。Watcher是一个接口,它的实现类可以定义当特定事件发生时客户端需要执行的操作。
  • materialize方法是ClientWatchManager接口的一个实现,它的作用是根据事件类型和状态来触发相应的Watcher。方法的参数包括Zookeeper的连接状态(KeeperState)、事件类型(EventType)和客户端路径(clientPath)。

根据事件类型,materialize方法会执行以下操作:

  1. None类型:表示没有特定的事件发生,此时会触发默认监视器(defaultWatcher),并且如果配置了禁用自动重置监视器,并且当前连接状态不是同步连接状态,那么会清除所有的监视器集合。
  2. NodeDataChanged和NodeCreated类型:表示节点数据发生变化或者新节点被创建,此时会移除与指定路径相关的数据监视器和存在监视器,并将它们添加到结果集合中。
  3. NodeChildrenChanged类型:表示子节点列表发生变化,此时会移除与指定路径相关的子节点监视器,并将它们添加到结果集合中。
  4. NodeDeleted类型:表示节点被删除,此时会移除与指定路径相关的数据监视器、存在监视器和子节点监视器,并将它们添加到结果集合中。如果在存在监视器中发现了一个不应该出现的节点被删除的情况,将会记录一条警告日志。
  5. 其他未处理的事件类型:将会记录一条错误日志,并抛出运行时异常。

1.1.2 ConnectStringParser

  • 解析字符串转为InetSocketAddress存在集合中
    private final ArrayList<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>();
  • 在3.2.0及其之后版本的ZooKeeper中,添加了“Chroot”特性[插图],该特性允许每个客户端为自己设置一个命名空间(Namespace)。如果一个ZooKeeper客户端设置了Chroot,那么该客户端对服务器的任何操作,都将会被限制在其自己的命名空间下。
 private final String chrootPath;

1.1.3 HostProvider

在ConnectStringParser解析器中会对服务器地址做一个简单的处理,并将服务器地址和相应的端口封装成一个InetSocketAddress对象,以ArrayList形式保存在ConnectStringParser.serverAddresses属性中,然后,经过处理的地址列表会被进一步封装到StaticHostProvider类中。
image.png

  • 我们可以看到默认实现:StaticHostProvider,把解析号可用的InetSocketAddress,最后进行一个打散

image.png

  • 通过调用StaticHostProvider的next()方法,能够从StaticHostProvider中获取一个可用的服务器地址。这个next()方法并非简单地从serverAddresses中依次获取一个服务器地址,而是先将随机打散后的服务器地址列表拼装成一个环形循环队列

image.png

1.1.4 ClientCnxn

image.png

  • ClientCnxn是ZooKeeper客户端的核心工作类,负责维护客户端与服务端之间的网络连接并进行一系列网络通信。
  • 客户端会创建两个核心网络线程SendThread和EventThread,前者用于管理客户端和服务端之间的所有网络I/O,后者则用于进行客户端的事件处理。
  • 同时,客户端还会将ClientCnxnSocket分配给SendThread作为底层网络I/O处理器,并初始化EventThread的待处理事件队列waitingEvents,用于存放所有等待被客户端处理的事件。
   public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {this.zooKeeper = zooKeeper;this.watcher = watcher;this.sessionId = sessionId;this.sessionPasswd = sessionPasswd;this.sessionTimeout = sessionTimeout;this.hostProvider = hostProvider;this.chrootPath = chrootPath;connectTimeout = sessionTimeout / hostProvider.size();readTimeout = sessionTimeout * 2 / 3;readOnly = canBeReadOnly;sendThread = new SendThread(clientCnxnSocket);eventThread = new EventThread();}

1.2 会话的连接

image.png

    public void start() {sendThread.start();eventThread.start();}

启动SendThread和EventThread

1.2.1 SendThread

SendThread首先会判断当前客户端的状态,进行一系列清理性工作,为客户端发送“会话创建”请求做准备。
image.png
在开始创建TCP连接之前,SendThread首先需要获取一个ZooKeeper服务器的目标地址,这通常是从HostProvider中随机获取出一个地址,然后委托给ClientCnxnSocket去创建与ZooKeeper服务器之间的TCP连接。

private void startConnect() throws IOException {state = States.CONNECTING;InetSocketAddress addr;if (rwServerAddress != null) {addr = rwServerAddress;rwServerAddress = null;} else {addr = hostProvider.next(1000);}setName(getName().replaceAll("\\(.*\\)","(" + addr.getHostName() + ":" + addr.getPort() + ")"));if (ZooKeeperSaslClient.isEnabled()) {try {String principalUserName = System.getProperty(ZK_SASL_CLIENT_USERNAME, "zookeeper");zooKeeperSaslClient =new ZooKeeperSaslClient(principalUserName+"/"+addr.getHostName());} catch (LoginException e) {// An authentication error occurred when the SASL client tried to initialize:// for Kerberos this means that the client failed to authenticate with the KDC.// This is different from an authentication error that occurs during communication// with the Zookeeper server, which is handled below.LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "+ "SASL authentication, if Zookeeper server allows it.");eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,Watcher.Event.KeeperState.AuthFailed, null));saslLoginFailed = true;}}logStartConnect(addr);clientCnxnSocket.connect(addr);}

获取到一个服务器地址后,ClientCnxnSocket负责和服务器创建一个TCP长连接。
image.png
这里有两个实现类,默认第一个,封装连接请求,ConnectRequest,最后在封装成Packet对象,放入请求发送队列outgoingQueue中去。
image.png
当客户端请求准备完毕后,就可以开始向服务端发送请求了。ClientCnxnSocket负责从outgoingQueue中取出一个待发送的Packet对象,将其序列化成ByteBuffer后,向服务端进行发送。

 void sendPacket(Packet p) throws IOException {SocketChannel sock = (SocketChannel) sockKey.channel();if (sock == null) {throw new IOException("Socket is null!");}p.createBB();ByteBuffer pbb = p.bb;sock.write(pbb);}

1.2.2 eventThread

  • 启动一个线程不断轮训事件,对不同的事件对出不同步的反应,processEvent方法是关键

image.png
首先,方法通过instanceof关键字检查传入的事件对象是否是WatcherSetEventPair类型。如果是,这意味着事件包含了一组Watcher对象,每个Watcher都会处理这个事件。代码遍历这些Watcher对象,并调用它们的process方法来处理事件。如果在处理过程中抛出异常,将会记录错误日志。
如果事件对象不是WatcherSetEventPair类型,那么它将被当作Packet类型来处理。Packet对象包含了客户端路径、回复头信息以及回调接口(cb)。代码首先检查回复头中的错误码,如果有错误,就将错误码保存在局部变量rc中。
接下来,代码根据响应的类型(例如ExistsResponse、GetDataResponse等)来调用相应的回调接口方法。这些回调接口是Zookeeper客户端用来接收操作结果的机制。例如,如果响应是GetDataResponse类型,那么代码会调用DataCallback接口的processResult方法,并传入操作结果码、客户端路径、上下文信息、节点数据和状态信息。
此外,代码还处理了MultiResponse类型,这是一种特殊的情况,表示一个请求包含了多个操作,每个操作都有自己的结果。在这种情况下,代码会遍历结果列表,并在所有操作都成功的情况下调用MultiCallback接口的processResult方法。
最后,如果回调接口是VoidCallback类型,那么代码会调用processResult方法,但不会传入任何数据,因为VoidCallback不关心操作结果。

二 ZooKeeper会话的响应

2.1 接受服务端响应

ClientCnxnSocket接收到服务端的响应后,会首先判断当前的客户端状态是否是“已初始化”,如果尚未完成初始化,那么就认为该响应一定是会话创建请求的响应,直接交由readConnectResult方法来处理该响应。
image.png

  • 反序列响应结果,使用Jute进行的

image.png

  • 连接成功后,一方面需要通知SendThread线程,进一步对客户端进行会话参数的设置,包括readTimeout和connectTimeout等,并更新客户端状态;另一方面,需要通知地址管理器HostProvider当前成功连接的服务器地址。

image.png

  • 为了能够让上层应用感知到会话的成功创建,SendThread会生成一个事件SyncConnected-None,代表客户端与服务器会话创建成功,并将该事件传递给EventThread线程。

image.png
EventThread线程收到事件后,会从ClientWatchManager管理器中查询出对应的Watcher,针对SyncConnected-None事件,那么就直接找出步骤2中存储的默认Watcher,然后将其放到EventThread的waitingEvents队列中去。
EventThread不断地从waitingEvents队列中取出待处理的Watcher对象,然后直接调用该对象的process接口方法,以达到触发Watcher的目的。

三 ClientCnxn 详解

3.1 Packet

Packet是ClientCnxn内部定义的一个对协议层的封装,作为ZooKeeper中请求与响应的载体
image.png
Packet中包含了最基本的请求头(requestHeader)、响应头(replyHeader)、请求体(request)、响应体(response)、节点路径(clientPath/serverPath)和注册的Watcher(watchRegistration)等信息。
Packet的createBB()方法负责对Packet对象进行序列化,最终生成可用于底层网络传输的ByteBuffer对象。在这个过程中,只会将requestHeader、request和readOnly三个属性进行序列化,其余属性都保存在客户端的上下文中,不会进行与服务端之间的网络传输。

 public void createBB() {try {ByteArrayOutputStream baos = new ByteArrayOutputStream();BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);boa.writeInt(-1, "len"); // We'll fill this in laterif (requestHeader != null) {requestHeader.serialize(boa, "header");}if (request instanceof ConnectRequest) {request.serialize(boa, "connect");// append "am-I-allowed-to-be-readonly" flagboa.writeBool(readOnly, "readOnly");} else if (request != null) {request.serialize(boa, "request");}baos.close();this.bb = ByteBuffer.wrap(baos.toByteArray());this.bb.putInt(this.bb.capacity() - 4);this.bb.rewind();} catch (IOException e) {LOG.warn("Ignoring unexpected exception", e);}}

3.2 队列

outgoingQueue和pendingQueueClientCnxn中,有两个比较核心的队列outgoingQueue和pendingQueue,分别代表客户端的请求发送队列和服务端响应的等待队列。Outgoing队列是一个请求发送队列,专门用于存储那些需要发送到服务端的Packet集合。Pending队列是为了存储那些已经从客户端发送到服务端的,但是需要等待服务端响应的Packet集合。

3.3 ClientCnxnSocket:底层Socket通信层

  • ClientCnxnSocket定义了底层Socket通信的接口。在ZooKeeper3.4.0以前的版本中,客户端的这个底层通信层并没有被独立出来,而是混合在了ClientCnxn代码中。
  • 但后来为了使客户端代码结构更为清晰,同时也是为了便于对底层Socket层进行扩展(例如使用Netty来实现),因此从3.4.0版本开始,抽取出了这个接口类。在使用ZooKeeper客户端的时候,可以通过在zookeeper.clientCnxnSocket这个系统变量中配置ClientCnxnSocket实现类的全类名,以指定底层Socket通信层的自定义实现,例如,-Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNIO。在ZooKeeper中,其默认的实现是ClientCnxnSocketNIO。该实现类使用Java原生的NIO接口,其核心是doIO逻辑,主要负责对请求的发送和响应接收过程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/782288.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue/html 集成对接 汉王esp370(标准版/谷歌版)

汉王eqp370版本介绍&#xff08;所有下面的资料都在我主页文件里面&#xff09; 标准版&#xff1a;只支持IE版本浏览器 谷歌版&#xff1a;支持谷歌版本浏览器 区分汉王版本的软件&#xff1a;已提供 如何区分版本 集成标准版方式 原理&#xff1a;exe的ocx组件安装后&#xf…

计算机视觉的应用27-关于VoVNetV2模型的应用场景,VoVNetV2模型结构介绍

大家好&#xff0c;我是微学AI&#xff0c;今天给大家介绍一下计算机视觉的应用27-关于VoVNetV2模型的应用场景&#xff0c;VoVNetV2模型结构介绍。VoVNetV2&#xff08;Visual Object-Driven Representation Learning Network Version 2&#xff09;是一种深度学习模型&#x…

建立动态MGRE隧道的配置方法

目录 一、实验拓扑 1.1通用配置 1.1.1地址配置 1.1.2静态缺省指向R5&#xff0c;实现公网互通 1.1.3MGRE协议配置 1.1.4配置静态 二、Shortcut方式 三、Normal方式&#xff08;非shortcut&#xff09; 四、总结 一、实验拓扑 下面两种配置方法皆使用静态方式 1.1通用配…

Sentinel原理及实践

Sentinel 是什么 Sentinel 是面向分布式、多语言异构化服务架构的流量治理组件&#xff0c;主要以流量为切入点&#xff0c;从流量路由、流量控制、流量整形、熔断降级、系统自适应过载保护、热点流量防护等多个维度来帮助开发者保障微服务的稳定性。 为什么使用sentinel&…

文件的顺序读写——顺序读写函数——fgets、fgetc、fputs、 fputc

✨✨ 欢迎大家来到莉莉的博文✨✨ &#x1f388;&#x1f388;养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; 目录 一、fgetc和fputc函数 1.1 fputc 1.2 fgetc 二、fputs和fgets函数 2.1 fputs函数 2.2 fgets函数 一、fgetc和fputc函数 1.1 fputc 返回类…

基于单片机自行车码表系统设计

**单片机设计介绍&#xff0c;基于单片机自行车码表系统设计 文章目录 一 概要二、功能设计三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于单片机自行车码表系统设计主要涵盖了硬件设计、软件设计以及功能实现等多个方面。以下是对该设计概要的详细描述&#xff1a…

计算机网络——31数据链路层和局域网引论和服务

数据链路层和局域网 WAN&#xff1a;网络形式采用点到点链路 带宽大&#xff0c;距离远&#xff08;延迟大&#xff09; 贷款延迟积大 如果采用多点连接方式 竞争方式&#xff1a;一旦冲突代价大令牌等协调方式&#xff1a;在其中协调节点的发送代价大 点到点链路的链路层服…

每日面经分享(Spring Boot: part3 Service层)

SpringBoot Service层的作用 a. 封装业务逻辑&#xff1a;Service层负责封装应用程序的业务逻辑。Service层是控制器&#xff08;Controller&#xff09;和数据访问对象&#xff08;DAO&#xff09;之间的中间层&#xff0c;负责处理业务规则和业务流程。通过将业务逻辑封装在S…

设计模式(9):外观模式

一.迪米特法则(最少知识原则) 一个软件实体应当尽可能少的与其他实体发生相互作用。 二.外观模式 为子系统提供统一的入口&#xff0c;封装子系统的复杂性&#xff0c;便于客户端调用。它的核心是什么呢&#xff0c;就是为我们的子系统提供一个统一的入口&#xff0c;封装子…

newOJ-1093: 分香蕉

目录 题目链接&#xff1a; 思路&#xff1a; 坑一&#xff1a; 坑二&#xff1a; 坑三&#xff1a; 代码&#xff1a; 通过10%&#xff1a; 通过80%&#xff1a; 通过100%&#xff1a; 题目链接&#xff1a; P1093 - 分香蕉 - New Online Judge (ecustacm.cn) 思路&a…

RVM安装Ruby笔记(Mac)

环境 硬件&#xff1a;Macbook Pro 系统&#xff1a;macOS 14.1 安装公钥 通过gpg安装公钥失败&#xff0c;报错如下&#xff1a; 换了几个公钥地址&#xff08;hkp://subkeys.pgp.net&#xff0c;hkp://keys.gnupg.net&#xff0c;hkp://pgp.mit.edu&#xff09;&#xff0c;…

使用Flink实现MySQL到Kafka的数据流转换

使用Flink实现MySQL到Kafka的数据流转换 本篇博客将介绍如何使用Flink将数据从MySQL数据库实时传输到Kafka&#xff0c;这是一个常见的用例&#xff0c;适用于需要实时数据connector的场景。 环境准备 在开始之前&#xff0c;确保你的环境中已经安装了以下软件&#xff1a;…

ML-Decoder: Scalable and Versatile Classification Head

1、引言 论文链接&#xff1a;https://openaccess.thecvf.com/content/WACV2023/papers/Ridnik_ML-Decoder_Scalable_and_Versatile_Classification_Head_WACV_2023_paper.pdf 因为 transformer 解码器分类头[1] 在少类别多标签分类数据集上表现得很好&#xff0c;但由于其查询…

PHP的定时任务框架的taskPHP3.0学习记录2(环境要求、配置Redis、crontab执行时间语法、命令操作以及Screen全屏窗口管理器)

环境要求 php版本> 5.5开启socket扩展开启pdo扩展开启shmop扩展 echo <pre>; echo --; $requiredVersion 5.6.0; $currentVersion phpversion(); if (version_compare($currentVersion, $requiredVersion, >)) {echo "1.PHP版本满足要求&#xff0c;当前版…

c语言:vs2022写一个一元二次方程(包含虚根)

求一元二次方程 的根&#xff0c;通过键盘输入a、b、c&#xff0c;根据△的值输出对应x1和x2的值(保留一位小数)(用if语句完成)。 //一元二次方程的实现 #include <stdio.h> #include <math.h> #include <stdlib.h> int main() {double a, b, c, delta, x1…

数据结构 - 算法效率|时间复杂度|空间复杂度

目录 1.算法效率 2.时间复杂度 2.1定义 2.2大O渐近表示法 2.3常见时间复杂度计算举例 3.空间复杂度 3.1定义 3.2常见空间复杂度计算举例 1.算法效率 算法的效率常用算法复杂度来衡量&#xff0c;算法复杂度描述了算法在输入数据规模变化时&#xff0c;其运行时间和空间…

opejdk11 java 启动流程 java main方法怎么被jvm执行

java启动过程 java main方法怎么被jvm执行 java main方法是怎么被jvm调用的 1、jvm main入口 2、执行JLI_Launch方法 3、执行JVMInit方法 4、执行ContinueInNewThread方法 5、执行CallJavaMainInNewThread方法 6、创建线程执行ThreadJavaMain方法 7、执行ThreadJavaMain方法…

Last-Modified:HTTP缓存控制机制解析

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

docker配置github仓库ghcr国内镜像加速

文章目录 说明ghcr.io简介配置镜像命令地址命令行方式1panel面板方式方式一&#xff1a;配置镜像加速&#xff0c;命令行拉取方式二&#xff1a;配置镜像仓库&#xff0c;可视化拉取 说明 由于使用的容器需要从github下载镜像&#xff0c;服务器在国外下载速度很慢&#xff0c…

26. UE5 RPG同步面板属性(二)

在上一篇&#xff0c;我们解析了UI属性面板的实现步骤&#xff1a; 首先我们需要通过c去实现创建GameplayTag&#xff0c;这样可以在c和UE里同时获取到Tag创建一个DataAsset类&#xff0c;用于设置tag对应的属性和显示内容创建AttributeMenuWidgetController实现对应逻辑 并且…