目录
- 一 ZooKeeper会话的创建与连接
- 1.1 会话的创建
- 1.1.1 ClientWatchManager
- 1.1.2 ConnectStringParser
- 1.1.3 HostProvider
- 1.1.4 ClientCnxn
- 1.2 会话的连接
- 1.2.1 SendThread
- 1.2.2 eventThread
- 二 ZooKeeper会话的响应
- 2.1 接受服务端响应
- 三 ClientCnxn 详解
- 3.1 Packet
- 3.2 队列
- 3.3 ClientCnxnSocket:底层Socket通信层
- 官网:Apache ZooKeeper
ZooKeeper的客户端主要由以下几个核心组件组成。
- ZooKeeper实例:客户端的入口。
- ClientWatchManager:客户端Watcher管理器。
- HostProvider:客户端地址列表管理器。
- ClientCnxn:客户端核心线程,其内部又包含两个线程,即SendThread和EventThread。前者是一个I/O线程,主要负责ZooKeeper客户端和服务端之间的网络I/O通信;后者是一个事件线程,主要负责对服务端事件进行处理。
一 ZooKeeper会话的创建与连接
- ZooKeeper客户端的初始化与启动环节,实际上就是ZooKeeper对象的实例化过程
- 我们可以看到需要的参数:设置默认Watcher,设置ZooKeeper服务器地址列表,创建ClientCnxn。
- 如果在ZooKeeper的构造方法中传入一个Watcher对象的话,那么ZooKeeper就会将这个Watcher对象保存在ZKWatchManager的defaultWatcher中,作为整个客户端会话期间的默认Watcher。
1.1 会话的创建
private final ZKWatchManager watchManager = new ZKWatchManager();public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,boolean canBeReadOnly)
throws IOException
{LOG.info("Initiating client connection, connectString=" + connectString+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);watchManager.defaultWatcher = watcher;ConnectStringParser connectStringParser = new ConnectStringParser(connectString);HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());cnxn = new ClientCnxn(connectStringParser.getChrootPath(),hostProvider, sessionTimeout, this, watchManager,getClientCnxnSocket(), canBeReadOnly);cnxn.start();
}
- 通过调用ZooKeeper的构造方法来实例化一个ZooKeeper对象,在初始化过程中,会创建一个客户端的Watcher管理器:ClientWatchManager。
- 如果在构造方法中传入了一个Watcher对象,那么客户端会将这个对象作为默认Watcher保存在ClientWatchManager中。
- 对于构造方法中传入的服务器地址,客户端会将其存放在服务器地址列表管理器HostProvider中。
- ZooKeeper客户端首先会创建一个网络连接器ClientCnxn,用来管理客户端与服务器的网络交互。另外,客户端在创建ClientCnxn的同时,还会初始化客户端两个核心队列outgoingQueue和pendingQueue,分别作为客户端的请求发送队列和服务端响应的等待队列。
1.1.1 ClientWatchManager
- ZKWatchManager类的主要作用是管理和触发Zookeeper客户端的监视事件(watches)。
- 在Zookeeper中,监视事件是一种机制,允许客户端在Zookeeper服务上注册兴趣,当指定节点的数据发生变化或者子节点列表发生变化时,客户端会收到通知。
- 这个类中定义了三个Map类型的成员变量,分别用来存储数据监视(dataWatches)、存在监视(existWatches)和子节点监视(childWatches)的Watcher集合。Watcher是一个接口,它的实现类可以定义当特定事件发生时客户端需要执行的操作。
- materialize方法是ClientWatchManager接口的一个实现,它的作用是根据事件类型和状态来触发相应的Watcher。方法的参数包括Zookeeper的连接状态(KeeperState)、事件类型(EventType)和客户端路径(clientPath)。
根据事件类型,materialize方法会执行以下操作:
- None类型:表示没有特定的事件发生,此时会触发默认监视器(defaultWatcher),并且如果配置了禁用自动重置监视器,并且当前连接状态不是同步连接状态,那么会清除所有的监视器集合。
- NodeDataChanged和NodeCreated类型:表示节点数据发生变化或者新节点被创建,此时会移除与指定路径相关的数据监视器和存在监视器,并将它们添加到结果集合中。
- NodeChildrenChanged类型:表示子节点列表发生变化,此时会移除与指定路径相关的子节点监视器,并将它们添加到结果集合中。
- NodeDeleted类型:表示节点被删除,此时会移除与指定路径相关的数据监视器、存在监视器和子节点监视器,并将它们添加到结果集合中。如果在存在监视器中发现了一个不应该出现的节点被删除的情况,将会记录一条警告日志。
- 其他未处理的事件类型:将会记录一条错误日志,并抛出运行时异常。
1.1.2 ConnectStringParser
- 解析字符串转为InetSocketAddress存在集合中
private final ArrayList<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>();
- 在3.2.0及其之后版本的ZooKeeper中,添加了“Chroot”特性[插图],该特性允许每个客户端为自己设置一个命名空间(Namespace)。如果一个ZooKeeper客户端设置了Chroot,那么该客户端对服务器的任何操作,都将会被限制在其自己的命名空间下。
private final String chrootPath;
1.1.3 HostProvider
在ConnectStringParser解析器中会对服务器地址做一个简单的处理,并将服务器地址和相应的端口封装成一个InetSocketAddress对象,以ArrayList形式保存在ConnectStringParser.serverAddresses属性中,然后,经过处理的地址列表会被进一步封装到StaticHostProvider类中。
- 我们可以看到默认实现:StaticHostProvider,把解析号可用的InetSocketAddress,最后进行一个打散
- 通过调用StaticHostProvider的next()方法,能够从StaticHostProvider中获取一个可用的服务器地址。这个next()方法并非简单地从serverAddresses中依次获取一个服务器地址,而是先将随机打散后的服务器地址列表拼装成一个环形循环队列
1.1.4 ClientCnxn
- ClientCnxn是ZooKeeper客户端的核心工作类,负责维护客户端与服务端之间的网络连接并进行一系列网络通信。
- 客户端会创建两个核心网络线程SendThread和EventThread,前者用于管理客户端和服务端之间的所有网络I/O,后者则用于进行客户端的事件处理。
- 同时,客户端还会将ClientCnxnSocket分配给SendThread作为底层网络I/O处理器,并初始化EventThread的待处理事件队列waitingEvents,用于存放所有等待被客户端处理的事件。
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {this.zooKeeper = zooKeeper;this.watcher = watcher;this.sessionId = sessionId;this.sessionPasswd = sessionPasswd;this.sessionTimeout = sessionTimeout;this.hostProvider = hostProvider;this.chrootPath = chrootPath;connectTimeout = sessionTimeout / hostProvider.size();readTimeout = sessionTimeout * 2 / 3;readOnly = canBeReadOnly;sendThread = new SendThread(clientCnxnSocket);eventThread = new EventThread();}
1.2 会话的连接
public void start() {sendThread.start();eventThread.start();}
启动SendThread和EventThread
1.2.1 SendThread
SendThread首先会判断当前客户端的状态,进行一系列清理性工作,为客户端发送“会话创建”请求做准备。
在开始创建TCP连接之前,SendThread首先需要获取一个ZooKeeper服务器的目标地址,这通常是从HostProvider中随机获取出一个地址,然后委托给ClientCnxnSocket去创建与ZooKeeper服务器之间的TCP连接。
private void startConnect() throws IOException {state = States.CONNECTING;InetSocketAddress addr;if (rwServerAddress != null) {addr = rwServerAddress;rwServerAddress = null;} else {addr = hostProvider.next(1000);}setName(getName().replaceAll("\\(.*\\)","(" + addr.getHostName() + ":" + addr.getPort() + ")"));if (ZooKeeperSaslClient.isEnabled()) {try {String principalUserName = System.getProperty(ZK_SASL_CLIENT_USERNAME, "zookeeper");zooKeeperSaslClient =new ZooKeeperSaslClient(principalUserName+"/"+addr.getHostName());} catch (LoginException e) {// An authentication error occurred when the SASL client tried to initialize:// for Kerberos this means that the client failed to authenticate with the KDC.// This is different from an authentication error that occurs during communication// with the Zookeeper server, which is handled below.LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "+ "SASL authentication, if Zookeeper server allows it.");eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,Watcher.Event.KeeperState.AuthFailed, null));saslLoginFailed = true;}}logStartConnect(addr);clientCnxnSocket.connect(addr);}
获取到一个服务器地址后,ClientCnxnSocket负责和服务器创建一个TCP长连接。
这里有两个实现类,默认第一个,封装连接请求,ConnectRequest,最后在封装成Packet对象,放入请求发送队列outgoingQueue中去。
当客户端请求准备完毕后,就可以开始向服务端发送请求了。ClientCnxnSocket负责从outgoingQueue中取出一个待发送的Packet对象,将其序列化成ByteBuffer后,向服务端进行发送。
void sendPacket(Packet p) throws IOException {SocketChannel sock = (SocketChannel) sockKey.channel();if (sock == null) {throw new IOException("Socket is null!");}p.createBB();ByteBuffer pbb = p.bb;sock.write(pbb);}
1.2.2 eventThread
- 启动一个线程不断轮训事件,对不同的事件对出不同步的反应,processEvent方法是关键
首先,方法通过instanceof关键字检查传入的事件对象是否是WatcherSetEventPair类型。如果是,这意味着事件包含了一组Watcher对象,每个Watcher都会处理这个事件。代码遍历这些Watcher对象,并调用它们的process方法来处理事件。如果在处理过程中抛出异常,将会记录错误日志。
如果事件对象不是WatcherSetEventPair类型,那么它将被当作Packet类型来处理。Packet对象包含了客户端路径、回复头信息以及回调接口(cb)。代码首先检查回复头中的错误码,如果有错误,就将错误码保存在局部变量rc中。
接下来,代码根据响应的类型(例如ExistsResponse、GetDataResponse等)来调用相应的回调接口方法。这些回调接口是Zookeeper客户端用来接收操作结果的机制。例如,如果响应是GetDataResponse类型,那么代码会调用DataCallback接口的processResult方法,并传入操作结果码、客户端路径、上下文信息、节点数据和状态信息。
此外,代码还处理了MultiResponse类型,这是一种特殊的情况,表示一个请求包含了多个操作,每个操作都有自己的结果。在这种情况下,代码会遍历结果列表,并在所有操作都成功的情况下调用MultiCallback接口的processResult方法。
最后,如果回调接口是VoidCallback类型,那么代码会调用processResult方法,但不会传入任何数据,因为VoidCallback不关心操作结果。
二 ZooKeeper会话的响应
2.1 接受服务端响应
ClientCnxnSocket接收到服务端的响应后,会首先判断当前的客户端状态是否是“已初始化”,如果尚未完成初始化,那么就认为该响应一定是会话创建请求的响应,直接交由readConnectResult方法来处理该响应。
- 反序列响应结果,使用Jute进行的
- 连接成功后,一方面需要通知SendThread线程,进一步对客户端进行会话参数的设置,包括readTimeout和connectTimeout等,并更新客户端状态;另一方面,需要通知地址管理器HostProvider当前成功连接的服务器地址。
- 为了能够让上层应用感知到会话的成功创建,SendThread会生成一个事件SyncConnected-None,代表客户端与服务器会话创建成功,并将该事件传递给EventThread线程。
EventThread线程收到事件后,会从ClientWatchManager管理器中查询出对应的Watcher,针对SyncConnected-None事件,那么就直接找出步骤2中存储的默认Watcher,然后将其放到EventThread的waitingEvents队列中去。
EventThread不断地从waitingEvents队列中取出待处理的Watcher对象,然后直接调用该对象的process接口方法,以达到触发Watcher的目的。
三 ClientCnxn 详解
3.1 Packet
Packet是ClientCnxn内部定义的一个对协议层的封装,作为ZooKeeper中请求与响应的载体
Packet中包含了最基本的请求头(requestHeader)、响应头(replyHeader)、请求体(request)、响应体(response)、节点路径(clientPath/serverPath)和注册的Watcher(watchRegistration)等信息。
Packet的createBB()方法负责对Packet对象进行序列化,最终生成可用于底层网络传输的ByteBuffer对象。在这个过程中,只会将requestHeader、request和readOnly三个属性进行序列化,其余属性都保存在客户端的上下文中,不会进行与服务端之间的网络传输。
public void createBB() {try {ByteArrayOutputStream baos = new ByteArrayOutputStream();BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);boa.writeInt(-1, "len"); // We'll fill this in laterif (requestHeader != null) {requestHeader.serialize(boa, "header");}if (request instanceof ConnectRequest) {request.serialize(boa, "connect");// append "am-I-allowed-to-be-readonly" flagboa.writeBool(readOnly, "readOnly");} else if (request != null) {request.serialize(boa, "request");}baos.close();this.bb = ByteBuffer.wrap(baos.toByteArray());this.bb.putInt(this.bb.capacity() - 4);this.bb.rewind();} catch (IOException e) {LOG.warn("Ignoring unexpected exception", e);}}
3.2 队列
outgoingQueue和pendingQueueClientCnxn中,有两个比较核心的队列outgoingQueue和pendingQueue,分别代表客户端的请求发送队列和服务端响应的等待队列。Outgoing队列是一个请求发送队列,专门用于存储那些需要发送到服务端的Packet集合。Pending队列是为了存储那些已经从客户端发送到服务端的,但是需要等待服务端响应的Packet集合。
3.3 ClientCnxnSocket:底层Socket通信层
- ClientCnxnSocket定义了底层Socket通信的接口。在ZooKeeper3.4.0以前的版本中,客户端的这个底层通信层并没有被独立出来,而是混合在了ClientCnxn代码中。
- 但后来为了使客户端代码结构更为清晰,同时也是为了便于对底层Socket层进行扩展(例如使用Netty来实现),因此从3.4.0版本开始,抽取出了这个接口类。在使用ZooKeeper客户端的时候,可以通过在zookeeper.clientCnxnSocket这个系统变量中配置ClientCnxnSocket实现类的全类名,以指定底层Socket通信层的自定义实现,例如,-Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNIO。在ZooKeeper中,其默认的实现是ClientCnxnSocketNIO。该实现类使用Java原生的NIO接口,其核心是doIO逻辑,主要负责对请求的发送和响应接收过程。