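Table of Contents
- I. ZooKeeper Consensus Algorithms
- 1. The Paxos Algorithm
- 1.1 Overview
- 1.2 Algorithm Flow
- 1.3 Weaknesses of the Algorithm
- 2. The ZAB Protocol
- 2.1 Overview
- 2.2 Contents of the Zab Protocol
- 3. The CAP Theorem
- II. Source Code Walkthrough
- 1. Supporting Source Code
- 1.1 Persistence Source Code (for reference)
- 1.2 Serialization Source Code
- 2. ZK Server Initialization Source Code
- 2.1 Startup Script Analysis
- 2.2 ZK Server Entry Point
- 2.3 Parsing zoo.cfg and myid
- 2.4 Deleting Expired Snapshots
- 2.5 Initializing the Communication Component
- 3. ZK Server Data Loading Source Code
- 3.1 Cold Start: Restoring Snapshot Data
- 3.2 Cold Start: Replaying the Transaction Log
- 4. ZK Leader Election Source Code
- 4.1 Election Preparation
- 4.2 Running the Election
- 5. Follower and Leader State Synchronization Source Code
- 6. Server Startup
- 6.1 Leader Startup
- 6.2 Follower Startup
- 7. Client Startup
- 7.1 Creating ZooKeeperMain
- 7.2 Initializing the Watcher
- 7.3 Parsing the Connection Address
- 7.4 Creating the Connection
- 7.5 Executing run()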
I. ZooKeeper Consensus Algorithms
1. The Paxos Algorithm
1.1 Overview
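Paxos is a consensus algorithm based on message passing with a high degree of fault tolerance. The problem it solves is how a distributed system can quickly and correctly agree on a single data value, while guaranteeing that failures such as crashed nodes or lost and delayed messages can never break the consistency of the system as a whole (Byzantine faults are not covered).
In a Paxos system, the nodes are divided into Proposers, Acceptors and Learners (note: a single node can play several roles at once). A complete run of the Paxos algorithm has three phases:
Prepare phase
- The Proposer sends a Prepare request carrying a proposal number to a set of Acceptors, asking each of them for a Promise
- Each Acceptor answers a Prepare request with a Promise if its number is higher than any it has already promised; the Promise means it will not accept lower-numbered proposals
Accept phase
- After receiving Promises from a majority of Acceptors, the Proposer sends an Accept request (the actual proposal) to the Acceptors
- Each Acceptor accepts the proposal unless it has meanwhile promised a higher-numbered one
Learn phase: the Proposer sends the resulting decision to all Learners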
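The Prepare and Accept rules above can be condensed into a minimal single-decree sketch. It assumes in-memory Acceptors, a Proposer that calls them synchronously and no lost messages. The class and method names are hypothetical and do not come from ZooKeeper, and records require Java 16+.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal single-decree Paxos sketch: in-memory Acceptors, no networking or failures.
// Names are illustrative only; this is not ZooKeeper code.
class PaxosSketch {

    // Reply to a Prepare: whether a Promise was made, plus any value already accepted.
    record Promise(boolean ok, long acceptedN, String acceptedValue) {}

    static class Acceptor {
        long promisedN = -1;        // highest proposal number promised so far
        long acceptedN = -1;        // number of the accepted proposal (-1 = none)
        String acceptedValue = null;

        // Phase 1: promise to ignore proposals numbered below n.
        Promise prepare(long n) {
            if (n > promisedN) {
                promisedN = n;
                return new Promise(true, acceptedN, acceptedValue);
            }
            return new Promise(false, acceptedN, acceptedValue);
        }

        // Phase 2: accept unless a higher-numbered Prepare has been promised.
        boolean accept(long n, String value) {
            if (n >= promisedN) {
                promisedN = n;
                acceptedN = n;
                acceptedValue = value;
                return true;
            }
            return false;
        }
    }

    // Proposer: runs both phases; returns the chosen value, or null if it failed.
    static String propose(List<Acceptor> acceptors, long n, String myValue) {
        int majority = acceptors.size() / 2 + 1;
        int promises = 0;
        long highestAccepted = -1;
        String value = myValue;
        for (Acceptor a : acceptors) {
            Promise p = a.prepare(n);
            if (p.ok()) {
                promises++;
                // Adopt the value of the highest-numbered proposal already accepted.
                if (p.acceptedN() > highestAccepted) {
                    highestAccepted = p.acceptedN();
                    value = p.acceptedValue();
                }
            }
        }
        if (promises < majority) return null;
        int accepts = 0;
        for (Acceptor a : acceptors) {
            if (a.accept(n, value)) accepts++;
        }
        return accepts >= majority ? value : null; // Learn phase would broadcast this value
    }

    public static void main(String[] args) {
        List<Acceptor> acceptors = new ArrayList<>();
        for (int i = 0; i < 3; i++) acceptors.add(new Acceptor());
        System.out.println(propose(acceptors, 1, "A")); // A
        System.out.println(propose(acceptors, 2, "B")); // A (must adopt the chosen value)
        System.out.println(propose(acceptors, 1, "C")); // null (stale proposal number)
    }
}
```

The second call shows the key safety rule. A Proposer whose Promises carry an already accepted value must propose that value instead of its own.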
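1.2 Algorithm Flow
1.3 Weaknesses of the Algorithm
On a complex network, a distributed system that uses Paxos may take a long time to converge, or even fall into livelock. The cause is having more than one Proposer. Several Proposers keep competing for the Acceptors, and each new Prepare with a higher number invalidates a rival's pending proposal, so agreement is delayed again and again. To address this, an improved Paxos was proposed: one node is elected as Leader, and only the Leader may issue proposals. With a single Proposer per Paxos run, livelock cannot occur.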
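2. The ZAB Protocol
2.1 Overview
Zab borrows ideas from Paxos. It is an atomic broadcast protocol with crash recovery, designed specifically for ZooKeeper. Based on this protocol, ZooKeeper lets exactly one server (the Leader) handle write transaction requests from outside, and the Leader then replicates the data to the other Follower nodes. In other words, in ZooKeeper only the Leader can issue proposals.
2.2 Contents of the Zab Protocol
The Zab protocol has two basic modes: message broadcast and crash recovery. In message broadcast, the Leader turns each write into a proposal, sends it to the Followers and commits it once a majority has acknowledged it. In crash recovery, which is entered when the Leader fails or loses contact with a majority, a new Leader is elected and the Followers are synchronized with it.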
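The message-broadcast side can be sketched as a Leader that counts ACKs in memory. It assigns each write a zxid, counts itself as the first ACK, and treats the proposal as committable once a strict majority of the ensemble has acknowledged it. Networking, crash recovery and the in-order commit guarantee of real ZAB are left out, and all names are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Leader-side sketch of ZAB message broadcast (no networking, no crash recovery,
// no in-order commit enforcement). Names are illustrative, not ZooKeeper's.
class ZabBroadcastSketch {
    private final int ensembleSize;                            // voting members, Leader included
    private final long epoch;
    private long counter = 0;                                  // low 32 bits of the zxid
    private final Map<Long, Set<Long>> acks = new HashMap<>(); // zxid -> sids that acked

    ZabBroadcastSketch(int ensembleSize, long epoch) {
        this.ensembleSize = ensembleSize;
        this.epoch = epoch;
    }

    // Turn a write into a proposal: zxid = (epoch << 32) | counter.
    long propose(long leaderSid) {
        long zxid = (epoch << 32) | (++counter);
        Set<Long> set = new HashSet<>();
        set.add(leaderSid);                  // the Leader acknowledges its own proposal
        acks.put(zxid, set);
        return zxid;                         // would be sent to Followers as a PROPOSAL
    }

    // Record a Follower's ACK; returns true exactly when a majority is first reached.
    boolean onAck(long zxid, long sid) {
        Set<Long> set = acks.get(zxid);
        if (set == null) return false;       // unknown or already committed
        set.add(sid);
        if (set.size() > ensembleSize / 2) { // strict majority
            acks.remove(zxid);
            return true;                     // would broadcast COMMIT
        }
        return false;
    }

    public static void main(String[] args) {
        ZabBroadcastSketch leader = new ZabBroadcastSketch(5, 1);
        long zxid = leader.propose(1);
        System.out.println(Long.toHexString(zxid)); // 100000001
        System.out.println(leader.onAck(zxid, 2));  // false: 2 of 5
        System.out.println(leader.onAck(zxid, 3));  // true: 3 of 5 is a majority
    }
}
```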
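3. The CAP Theorem
The CAP theorem states that a distributed system cannot satisfy all three of the following requirements at the same time:
- Consistency (C)
  In a distributed environment, consistency means that data stays identical across all of its replicas. Under a consistency requirement, when the system performs an update starting from a consistent state, it must still be in a consistent state afterwards.
- Availability (A)
  Availability means the service must always be available: every user request receives a result within a bounded amount of time.
- Partition tolerance (P)
  When the system runs into any network partition, it must keep operating and serving requests, unless the entire network has failed.
At most two of these three requirements can be satisfied at once. Partition tolerance cannot be given up in a distributed system, so the practical choice is between CP and AP. ZooKeeper guarantees CP:
- ZooKeeper does not guarantee that every request is served. (Note: in extreme situations ZooKeeper may drop some requests, and the client has to retry to get a result.) In this sense, ZooKeeper does not guarantee availability.
- While a Leader election is in progress, the cluster cannot serve requests.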
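II. Source Code Walkthrough
ZooKeeper source download: https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/
1. Supporting Source Code
1.1 Persistence Source Code (for reference)
The data on the Leader and on each Follower is kept both in memory and on disk, so the in-memory data has to be persisted to disk. The persistence-related classes live in the org.apache.zookeeper.server.persistence package. The two core interfaces are SnapShot (snapshots) and TxnLog (transaction log):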
```java
public interface SnapShot {
    // deserialize: restore the DataTree and sessions from the snapshot
    long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException;

    // serialize: write the DataTree and sessions to a snapshot file
    void serialize(DataTree dt, Map<Long, Integer> sessions, File name) throws IOException;

    /**
     * find the most recent snapshot file
     */
    File findMostRecentSnapshot() throws IOException;

    // release resources
    void close() throws IOException;
}

public interface TxnLog {
    // set the server statistics
    void setServerStats(ServerStats serverStats);

    // roll over to a new log file
    void rollLog() throws IOException;

    // append a transaction
    boolean append(TxnHeader hdr, Record r) throws IOException;

    // read transactions starting at the given zxid
    TxnIterator read(long zxid) throws IOException;

    // get the last zxid that was logged
    long getLastLoggedZxid() throws IOException;

    // truncate the log to the given zxid
    boolean truncate(long zxid) throws IOException;

    // get the DbId
    long getDbId() throws IOException;

    // commit (flush) the log
    void commit() throws IOException;

    // time spent syncing the log
    long getTxnLogSyncElapsedTime();

    // close the log
    void close() throws IOException;

    // iterator for reading the log
    public interface TxnIterator {
        // get the transaction header
        TxnHeader getHeader();

        // get the transaction body
        Record getTxn();

        // move to the next record
        boolean next() throws IOException;

        // release resources
        void close() throws IOException;

        // get the storage size
        long getStorageSize() throws IOException;
    }
}
```
1.2 Serialization Source Code
The zookeeper-jute module contains ZooKeeper's serialization code.
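Jute-generated classes follow a simple pattern: each record writes its fields in a fixed order and reads them back in the same order. The hand-rolled sketch below illustrates that pattern on top of java.io.DataOutputStream. It does not use jute itself. The SimpleRecord interface and SessionEntry class are hypothetical, and the string encoding (int length, then UTF-8 bytes, with -1 for null) is an assumption modeled on jute's BinaryOutputArchive.

```java
import java.io.*;
import java.nio.charset.StandardCharsets;

// Hand-rolled illustration of the jute Record pattern; not jute's actual classes.
class JuteStyleSketch {

    interface SimpleRecord {
        void serialize(DataOutput out) throws IOException;
        void deserialize(DataInput in) throws IOException;
    }

    // Hypothetical record resembling a session entry: (id, timeout, path).
    static class SessionEntry implements SimpleRecord {
        long id;
        int timeout;
        String path;

        public void serialize(DataOutput out) throws IOException {
            out.writeLong(id);          // fields are written in a fixed order...
            out.writeInt(timeout);
            writeString(out, path);
        }

        public void deserialize(DataInput in) throws IOException {
            id = in.readLong();         // ...and read back in exactly the same order
            timeout = in.readInt();
            path = readString(in);
        }
    }

    // Assumed encoding: int length followed by UTF-8 bytes; -1 marks null.
    static void writeString(DataOutput out, String s) throws IOException {
        if (s == null) { out.writeInt(-1); return; }
        byte[] b = s.getBytes(StandardCharsets.UTF_8);
        out.writeInt(b.length);
        out.write(b);
    }

    static String readString(DataInput in) throws IOException {
        int len = in.readInt();
        if (len == -1) return null;
        byte[] b = new byte[len];
        in.readFully(b);
        return new String(b, StandardCharsets.UTF_8);
    }

    static byte[] toBytes(SimpleRecord r) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        r.serialize(new DataOutputStream(bos));
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        SessionEntry e = new SessionEntry();
        e.id = 42L; e.timeout = 30000; e.path = "/app";
        byte[] bytes = toBytes(e);
        SessionEntry copy = new SessionEntry();
        copy.deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
        // 8-byte long + 4-byte int + 4-byte length + 4 bytes of "/app" = 20
        System.out.println(bytes.length + " " + copy.id + " " + copy.timeout + " " + copy.path);
        // 20 42 30000 /app
    }
}
```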
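2. ZK Server Initialization Source Code
2.1 Startup Script Analysis
This is what zkServer.sh start actually executes under the hood, which is why the program's entry point is the QuorumPeerMain.java class:
nohup "$JAVA"
+ a set of launch arguments
+ $ZOOMAIN (org.apache.zookeeper.server.quorum.QuorumPeerMain)
+ "$ZOOCFG" (zkEnv.sh sets ZOOCFG="zoo.cfg")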
2.2 ZK Server Entry Point
Find the QuorumPeerMain class in the source:
```java
public static void main(String[] args) {
    // create a zk node
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        // initialize and run the node; args holds the zoo.cfg path from the launch arguments
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        ... ...
    }
    LOG.info("Exiting normally");
    System.exit(0);
}

protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException {
    // holds the zk configuration
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        // 1 parse the arguments: zoo.cfg and myid
        config.parse(args[0]);
    }

    // 2 start a scheduled task that deletes expired snapshots (disabled by default)
    // Start and schedule the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
            .getDataDir(), config.getDataLogDir(), config
            .getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();

    if (args.length == 1 && config.isDistributed()) {
        // 3 start in cluster mode
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}
```
2.3 Parsing zoo.cfg and myid
```java
public void parse(String path) throws ConfigException {
    LOG.info("Reading configuration from: " + path);

    try {
        // verify the file path and that the file exists
        File configFile = (new VerifyingFileFactory.Builder(LOG)
                .warnForRelativePath()
                .failForNonExistingPath()
                .build()).create(path);

        Properties cfg = new Properties();
        FileInputStream in = new FileInputStream(configFile);
        try {
            // load the configuration file
            cfg.load(in);
            configFileStr = path;
        } finally {
            in.close();
        }

        // parse the configuration
        parseProperties(cfg);
    } catch (IOException e) {
        throw new ConfigException("Error processing " + path, e);
    } catch (IllegalArgumentException e) {
        throw new ConfigException("Error processing " + path, e);
    }
    ... ...
}

// at the very end of parseProperties(cfg) is a call to setupQuorumPeerConfig
void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode)
        throws IOException, ConfigException {
    quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode);
    setupMyId();
    setupClientPort();
    setupPeerType();
    checkValidity();
}

private void setupMyId() throws IOException {
    File myIdFile = new File(dataDir, "myid");
    // standalone server doesn't need myid file.
    if (!myIdFile.isFile()) {
        return;
    }
    BufferedReader br = new BufferedReader(new FileReader(myIdFile));
    String myIdString;
    try {
        myIdString = br.readLine();
    } finally {
        br.close();
    }
    try {
        // parse the id from the myid file and assign it to serverId
        serverId = Long.parseLong(myIdString);
        MDC.put("myid", myIdString);
    } catch (NumberFormatException e) {
        throw new IllegalArgumentException("serverid " + myIdString
                + " is not a number");
    }
}
```
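The sketch below simplifies what parse and setupMyId achieve, assuming the zoo.cfg and myid contents are already available as strings. It loads the configuration with java.util.Properties (as parse does), collects the server.N entries and reads the numeric id. The helper names are hypothetical, and the real parsing covers many more keys, dynamic config files and validation.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Simplified illustration of zoo.cfg + myid parsing; helper names are hypothetical.
class ZooCfgSketch {

    // Returns sid -> "host:quorumPort:electionPort" for every server.N key.
    static Map<Long, String> parseServers(String zooCfg) throws IOException {
        Properties cfg = new Properties();
        cfg.load(new StringReader(zooCfg));
        Map<Long, String> servers = new TreeMap<>();
        for (String key : cfg.stringPropertyNames()) {
            if (key.startsWith("server.")) {
                long sid = Long.parseLong(key.substring("server.".length()));
                servers.put(sid, cfg.getProperty(key).trim());
            }
        }
        return servers;
    }

    // Roughly mirrors setupMyId(): the first line of the myid file must be a number.
    static long parseMyId(String myIdFileContents) {
        String line = myIdFileContents.lines().findFirst().orElse("").trim();
        try {
            return Long.parseLong(line);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("serverid " + line + " is not a number");
        }
    }

    public static void main(String[] args) throws IOException {
        String cfg = "tickTime=2000\n"
                + "dataDir=/tmp/zookeeper\n"
                + "clientPort=2181\n"
                + "server.2=hadoop102:2888:3888\n"
                + "server.3=hadoop103:2888:3888\n"
                + "server.4=hadoop104:2888:3888\n";
        Map<Long, String> servers = parseServers(cfg);
        long myid = parseMyId("2\n");
        System.out.println(servers.containsKey(myid) + " " + servers.get(myid));
        // true hadoop102:2888:3888
    }
}
```

In a correctly configured ensemble, the parsed myid must appear among the server.N keys. Otherwise QuorumPeer.start() later fails with "My id ... not in the peer list".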
2.4 Deleting Expired Snapshots
A scheduled task can be started to delete expired snapshots. This feature is disabled by default.
```java
// 2 start a scheduled task that deletes expired snapshots (disabled by default)
// config.getSnapRetainCount() = 3: minimum number of snapshots to keep
// config.getPurgeInterval() = 0: the default 0 means disabled
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(),
        config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();

public void start() {
    if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
        LOG.warn("Purge task is already running.");
        return;
    }
    // by default purgeInterval = 0, so the task is disabled and we return here
    // Don't schedule the purge task with zero or negative purge interval.
    if (purgeInterval <= 0) {
        LOG.info("Purge task is not scheduled.");
        return;
    }
    // create a timer
    timer = new Timer("PurgeTask", true);
    // create the snapshot cleanup task
    TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
    // with purgeInterval = 1, check for expired snapshots once an hour and delete them
    timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));
    purgeTaskStatus = PurgeTaskStatus.STARTED;
}

static class PurgeTask extends TimerTask {
    private File logsDir;
    private File snapsDir;
    private int snapRetainCount;

    public PurgeTask(File dataDir, File snapDir, int count) {
        logsDir = dataDir;
        snapsDir = snapDir;
        snapRetainCount = count;
    }

    @Override
    public void run() {
        LOG.info("Purge task started.");
        try {
            // delete the expired data
            PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
        } catch (Exception e) {
            LOG.error("Error occurred while purging.", e);
        }
        LOG.info("Purge task completed.");
    }
}

public static void purge(File dataDir, File snapDir, int num) throws IOException {
    if (num < 3) {
        throw new IllegalArgumentException(COUNT_ERR_MSG);
    }
    FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
    List<File> snaps = txnLog.findNRecentSnapshots(num);
    int numSnaps = snaps.size();
    if (numSnaps > 0) {
        purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
    }
}
```
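The retention rule behind PurgeTxnLog.purge can be sketched in three steps. First, order the snapshots by the zxid in their file name (ZooKeeper names them snapshot.<zxid in hex>). Then keep the newest `retain` files and mark the rest for deletion. This sketch only handles snapshot names. The real code also deletes transaction logs that are no longer needed, and the method names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration of "keep the N most recent snapshots"; method names are hypothetical.
class SnapRetentionSketch {

    // "snapshot.1a" -> 0x1a: the suffix is the snapshot's zxid in hex
    static long zxidOf(String fileName) {
        return Long.parseLong(fileName.substring(fileName.lastIndexOf('.') + 1), 16);
    }

    static List<String> toDelete(List<String> snapshots, int retain) {
        if (retain < 3) {
            // purge() rejects values below 3 in the same way
            throw new IllegalArgumentException("snapRetainCount should be 3 or more");
        }
        List<String> sorted = new ArrayList<>(snapshots);
        // newest first: descending by the zxid in the file name
        sorted.sort((a, b) -> Long.compare(zxidOf(b), zxidOf(a)));
        if (sorted.size() <= retain) {
            return new ArrayList<>();
        }
        return new ArrayList<>(sorted.subList(retain, sorted.size()));
    }

    public static void main(String[] args) {
        List<String> snaps = List.of("snapshot.100000001", "snapshot.200000005",
                "snapshot.100000009", "snapshot.300000002", "snapshot.200000001");
        System.out.println(toDelete(snaps, 3));
        // [snapshot.100000009, snapshot.100000001]
    }
}
```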
2.5 Initializing the Communication Component
```java
if (args.length == 1 && config.isDistributed()) {
    runFromConfig(config);
} else {
    LOG.warn("Either no config or no quorum defined in config, running "
            + " in standalone mode");
    // there is only server in the quorum -- run as standalone
    ZooKeeperServerMain.main(args);
}

// the communication layer defaults to NIO
public void runFromConfig(QuorumPeerConfig config)
        throws IOException, AdminServerException {
    ......
    LOG.info("Starting quorum peer");
    try {
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;

        // initialize the communication component; NIO by default
        if (config.getClientPortAddress() != null) {
            // zookeeperAdmin.md:
            // Default is `NIOServerCnxnFactory`
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(),
                    config.getMaxClientCnxns(), false);
        }

        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                    config.getMaxClientCnxns(), true);
        }

        // copy the parsed settings onto this zookeeper node
        quorumPeer = getQuorumPeer();
        quorumPeer.setTxnFactory(new FileTxnSnapLog(
                config.getDataLogDir(),
                config.getDataDir()));
        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
        quorumPeer.enableLocalSessionsUpgrading(
                config.isLocalSessionsUpgradingEnabled());
        //quorumPeer.setQuorumPeers(config.getAllMembers());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConfigFileName(config.getConfigFilename());

        // manages zk data storage
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier() != null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        quorumPeer.initConfigInZKDatabase();

        // manages zk communication
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
        quorumPeer.setSslQuorum(config.isSslQuorum());
        quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
        if (config.sslQuorumReloadCertFiles) {
            quorumPeer.getX509Util().enableCertFileReloading();
        }
        ......
        quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
        quorumPeer.initialize();

        // start zk
        quorumPeer.start();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    }
}
```
Initialize the NIO server socket. At this point it is not yet accepting connections. Press Ctrl + Alt + B on configure to find its implementation in NIOServerCnxnFactory.java:
```java
@Override
public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException {
    ......
    // initialize the NIO server socket and bind it to the client port (2181 by default)
    // so that it can accept client requests
    this.ss = ServerSocketChannel.open();
    ss.socket().setReuseAddress(true);
    LOG.info("binding to port " + addr);
    // bind the client port
    ss.socket().bind(addr);
    ss.configureBlocking(false);
    acceptThread = new AcceptThread(ss, addr, selectorThreads);
}
```
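The same socket setup as configure can be reproduced with the plain JDK NIO API. This sketch binds to port 0 (any free port) on 127.0.0.1 instead of 2181, so it does not clash with a running server. It also registers OP_ACCEPT on a Selector as a stand-in for the AcceptThread. It is an illustration, not ZooKeeper code.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Standalone illustration of the open/reuse/bind/non-blocking steps in configure().
class NioBindSketch {

    static ServerSocketChannel openAndBind(InetSocketAddress addr) throws IOException {
        ServerSocketChannel ss = ServerSocketChannel.open();
        ss.socket().setReuseAddress(true); // same SO_REUSEADDR setting as configure()
        ss.socket().bind(addr);
        ss.configureBlocking(false);       // required before registering with a Selector
        return ss;
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel ss = openAndBind(new InetSocketAddress("127.0.0.1", 0));
             Selector selector = Selector.open()) {
            // Register interest in new connections, as an accept thread would.
            SelectionKey key = ss.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("bound to " + ss.getLocalAddress() + ", accept interest: "
                    + ((key.interestOps() & SelectionKey.OP_ACCEPT) != 0));
        }
    }
}
```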
3. ZK Server Data Loading Source Code
3.1 Cold Start: Restoring Snapshot Data
```java
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    // cold-start data recovery
    loadDataBase();
    // start the communication factory
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    // prepare the election environment
    startLeaderElection();
    // run the election
    super.start();
}

private void loadDataBase() {
    try {
        // load the data from disk into memory and rebuild the DataTree
        // zk operations come in two kinds: transactional and non-transactional
        // transactional, e.g. zk.create(): each one gets a globally unique zxid (64 bits):
        //   the high 32 bits are the epoch (the term number of each leader),
        //   the low 32 bits are the txid (transaction counter)
        // non-transactional, e.g. zk.getData()
        zkDb.loadDataBase();

        // load the epochs
        long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
        long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
        ......
}

public long restore(DataTree dt, Map<Long, Integer> sessions,
                    PlayBackListener listener) throws IOException {
    // restore the snapshot data into the DataTree
    long deserializeResult = snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);

    RestoreFinalizer finalizer = () -> {
        // replay the transaction log into the DataTree
        long highestZxid = fastForwardFromEdits(dt, sessions, listener);
        return highestZxid;
    };
    ......
}
```
Press Ctrl + Alt + B on deserialize to open its implementation in FileSnap.java:
```java
public long deserialize(DataTree dt, Map<Long, Integer> sessions)
        throws IOException {
    ......
    // try the most recent snapshots one by one (it stops at the first valid one)
    for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
        snap = snapList.get(i);
        LOG.info("Reading snapshot " + snap);
        // set up the streams for deserialization
        try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
             CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32())) {
            InputArchive ia = BinaryInputArchive.getArchive(crcIn);
            // deserialize and restore the data into the DataTree
            deserialize(dt, sessions, ia);
            ......
}

public void deserialize(DataTree dt, Map<Long, Integer> sessions,
                        InputArchive ia) throws IOException {
    FileHeader header = new FileHeader();
    header.deserialize(ia, "fileheader");
    if (header.getMagic() != SNAP_MAGIC) {
        throw new IOException("mismatching magic headers "
                + header.getMagic() +
                " != " + FileSnap.SNAP_MAGIC);
    }
    // restore the snapshot data into the DataTree
    SerializeUtils.deserializeSnapshot(dt, ia, sessions);
}

public static void deserializeSnapshot(DataTree dt, InputArchive ia,
                                       Map<Long, Integer> sessions) throws IOException {
    int count = ia.readInt("count");
    while (count > 0) {
        long id = ia.readLong("id");
        int to = ia.readInt("timeout");
        sessions.put(id, to);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                    "loadData --- session in archive: " + id
                    + " with timeout: " + to);
        }
        count--;
    }
    // restore the snapshot data into the DataTree
    dt.deserialize(ia, "tree");
}

public void deserialize(InputArchive ia, String tag) throws IOException {
    aclCache.deserialize(ia);
    nodes.clear();
    pTrie.clear();
    String path = ia.readString("path");
    // restore every DataNode in the snapshot into the DataTree
    while (!"/".equals(path)) {
        // create a node object on each iteration
        DataNode node = new DataNode();
        ia.readRecord(node, "node");
        // put the DataNode into the DataTree
        nodes.put(path, node);
        synchronized (node) {
            aclCache.addUsage(node.acl);
        }
        int lastSlash = path.lastIndexOf('/');
        if (lastSlash == -1) {
            root = node;
        } else {
            // look up the parent node
            String parentPath = path.substring(0, lastSlash);
            DataNode parent = nodes.get(parentPath);
            if (parent == null) {
                throw new IOException("Invalid Datatree, unable to find " +
                        "parent " + parentPath + " of path " + path);
            }
            // register this node as a child of its parent
            parent.addChild(path.substring(lastSlash + 1));
            // track container, TTL and ephemeral nodes
            long eowner = node.stat.getEphemeralOwner();
            EphemeralType ephemeralType = EphemeralType.get(eowner);
            if (ephemeralType == EphemeralType.CONTAINER) {
                containers.add(path);
            } else if (ephemeralType == EphemeralType.TTL) {
                ttls.add(path);
            } else if (eowner != 0) {
                HashSet<String> list = ephemerals.get(eowner);
                if (list == null) {
                    list = new HashSet<String>();
                    ephemerals.put(eowner, list);
                }
                list.add(path);
            }
        }
        path = ia.readString("path");
    }
    nodes.put("/", root);
    // we are done with deserializing the
    // the datatree
    // update the quotas - create path trie
    // and also update the stat nodes
    setupQuota();

    aclCache.purgeUnused();
}
```
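The reconstruction loop in DataTree.deserialize can be sketched without ZooKeeper's classes. The sketch makes three assumptions, all implied by the code above: entries arrive parent-first, the root is stored under the empty path "" (so lastIndexOf('/') returns -1 for it), and the string "/" terminates the list. Node payloads are reduced to strings, and the class is hypothetical.

```java
import java.util.*;

// Illustration of rebuilding a tree from parent-first (path, data) entries.
class DataTreeRebuildSketch {

    static class Node {
        final String data;
        final Set<String> children = new TreeSet<>();
        Node(String data) { this.data = data; }
    }

    // entries: {path, data} pairs in snapshot order, terminated by a {"/", null} sentinel
    static Map<String, Node> rebuild(List<String[]> entries) {
        Map<String, Node> nodes = new HashMap<>();
        Node root = null;
        Iterator<String[]> it = entries.iterator();
        String[] e = it.next();
        while (!"/".equals(e[0])) {
            String path = e[0];
            Node node = new Node(e[1]);
            nodes.put(path, node);
            int lastSlash = path.lastIndexOf('/');
            if (lastSlash == -1) {
                root = node;                                  // the "" entry is the root
            } else {
                String parentPath = path.substring(0, lastSlash);
                Node parent = nodes.get(parentPath);          // parent must already exist
                if (parent == null) {
                    throw new IllegalStateException("Invalid Datatree, unable to find parent "
                            + parentPath + " of path " + path);
                }
                parent.children.add(path.substring(lastSlash + 1));
            }
            e = it.next();
        }
        nodes.put("/", root);                                 // root is reachable as "/" too
        return nodes;
    }

    public static void main(String[] args) {
        List<String[]> snapshot = List.of(
                new String[]{"", ""},
                new String[]{"/app", "v1"},
                new String[]{"/app/cfg", "x=1"},
                new String[]{"/zookeeper", ""},
                new String[]{"/", null});
        Map<String, Node> tree = rebuild(snapshot);
        System.out.println(tree.get("/").children + " " + tree.get("/app").children);
        // [app, zookeeper] [cfg]
    }
}
```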
3.2 Cold Start: Replaying the Transaction Log
Back in the restore method of the FileTxnSnapLog.java class:
```java
public long restore(DataTree dt, Map<Long, Integer> sessions,
                    PlayBackListener listener) throws IOException {
    // restore the snapshot data into the DataTree
    long deserializeResult = snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);

    RestoreFinalizer finalizer = () -> {
        // replay the transaction log into the DataTree
        long highestZxid = fastForwardFromEdits(dt, sessions, listener);
        return highestZxid;
    };
    ......
}

public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,
                                 PlayBackListener listener) throws IOException {
    // most of the data has already been restored from the snapshot; only the
    // transactions from the snapshot's zxid + 1 onwards still need to be replayed
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid + 1);
    // highest zxid so far: starts at the snapshot's zxid and is updated
    // as each logged transaction is applied
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    try {
        // starting after lastProcessedZxid, keep replaying the remaining transactions from the log
        while (true) {
            // get the transaction header (contains the zxid)
            hdr = itr.getHeader();
            if (hdr == null) {
                //empty logs
                return dt.lastProcessedZxid;
            }
            if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                LOG.error("{}(highestZxid) > {}(next log) for type {}",
                        highestZxid, hdr.getZxid(), hdr.getType());
            } else {
                highestZxid = hdr.getZxid();
            }
            try {
                // apply this logged transaction to the DataTree
                processTransaction(hdr, dt, sessions, itr.getTxn());
            } catch (KeeperException.NoNodeException e) {
                throw new IOException("Failed to process transaction type: " +
                        hdr.getType() + " error: " + e.getMessage(), e);
            }
            listener.onTxnLoaded(hdr, itr.getTxn());
            if (!itr.next())
                break;
        }
    } finally {
        if (itr != null) {
            itr.close();
        }
    }
    return highestZxid;
}
```
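The combined effect of restore and fastForwardFromEdits can be sketched in three steps. Start from the snapshot state, re-apply only the logged transactions with a zxid above the snapshot's lastProcessedZxid, and report the highest zxid reached. The Txn record and the "set path to value" operation are hypothetical simplifications of TxnHeader/Record and processTransaction. Records require Java 16+.

```java
import java.util.*;

// Illustration of snapshot + log replay; Txn and the map-based "tree" are hypothetical.
class ReplaySketch {

    record Txn(long zxid, String path, String value) {}

    // Applies the log tail to `tree` and returns the highest zxid now reflected in it.
    static long fastForward(Map<String, String> tree, long lastProcessedZxid, List<Txn> log) {
        long highestZxid = lastProcessedZxid;
        for (Txn t : log) {
            if (t.zxid() <= lastProcessedZxid) continue; // already contained in the snapshot
            tree.put(t.path(), t.value());               // stands in for processTransaction
            highestZxid = Math.max(highestZxid, t.zxid());
        }
        return highestZxid;
    }

    public static void main(String[] args) {
        Map<String, String> tree = new HashMap<>(Map.of("/a", "1")); // state from the snapshot
        List<Txn> log = List.of(new Txn(0x100000001L, "/a", "1"),
                                new Txn(0x100000002L, "/a", "2"),
                                new Txn(0x100000003L, "/b", "3"));
        long highest = fastForward(tree, 0x100000001L, log);
        System.out.println(Long.toHexString(highest) + " " + new TreeMap<>(tree));
        // 100000003 {/a=2, /b=3}
    }
}
```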
4. ZK Leader Election Source Code
For the overall election process, refer to the earlier ZooKeeper basics notes.
4.1 Election Preparation
```java
@Override
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    loadDataBase();
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    // prepare for the election
    startLeaderElection();
    super.start();
}

synchronized public void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            // create the vote
            // (1) a vote consists of: epoch (the leader's term number), zxid (the id of
            //     the transactions executed during a leader's term), myid (the serverid)
            // (2) at the start of an election, every server votes for itself
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
        ......
    // create the election algorithm instance
    this.electionAlg = createElectionAlgorithm(electionType);
}

protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;
    ......
    case 3:
        // 1 create the QuorumCnxManager, which handles all network traffic during the election
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        if (oldQcm != null) {
            LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
            oldQcm.halt();
        }
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            // 2 start the listener thread
            listener.start();
            // 3 get ready to start the election
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        ......
}

// initialize the network communication component
public QuorumCnxManager createCnxnManager() {
    return new QuorumCnxManager(this,
            this.getId(),
            this.getView(),
            this.authServer,
            this.authLearner,
            this.tickTime * this.syncLimit,
            this.getQuorumListenOnAllIPs(),
            this.quorumCnxnThreadsSize,
            this.isQuorumSaslAuthEnabled());
}

public QuorumCnxManager(QuorumPeer self,
                        final long mySid,
                        Map<Long, QuorumPeer.QuorumServer> view,
                        QuorumAuthServer authServer,
                        QuorumAuthLearner authLearner,
                        int socketTimeout,
                        boolean listenOnAllIPs,
                        int quorumCnxnThreadsSize,
                        boolean quorumSaslAuthEnabled) {
    // create the various queues
    this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
    this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
    this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
    this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();

    String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
    if (cnxToValue != null) {
        this.cnxTO = Integer.parseInt(cnxToValue);
    }

    this.self = self;
    this.mySid = mySid;
    this.socketTimeout = socketTimeout;
    this.view = view;
    this.listenOnAllIPs = listenOnAllIPs;

    initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
            quorumSaslAuthEnabled);

    // Starts listener thread that waits for connection requests
    listener = new Listener();
    listener.setName("QuorumPeerListener");
}
```
Initializing the listener thread: open QuorumCnxManager.Listener and find its run method.
```java
@Override
public void run() {
    ......
    LOG.info("My election bind port: " + addr.toString());
    setName(addr.toString());
    // bind the server address
    ss.bind(addr);
    // loop until shut down
    while (!shutdown) {
        try {
            // block until a connection request arrives
            client = ss.accept();
            ......
}
```
Election preparation: open FastLeaderElection.
```java
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
    this.stop = false;
    this.manager = manager;
    starter(self, manager);
}

private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;

    // initialize the queues and the messenger
    sendqueue = new LinkedBlockingQueue<ToSend>();
    recvqueue = new LinkedBlockingQueue<Notification>();
    this.messenger = new Messenger(manager);
}
```
4.2 Running the Election
QuorumPeer.java calls super.start(). QuorumPeer is a thread (it extends ZooKeeperThread), so this starts a new thread that runs the run() method of QuorumPeer.java. When ZooKeeper starts, every server is first in the LOOKING state. The election then makes one server the Leader and the others Followers.
```java
@Override
public void run() {
    ......
    while (running) {
        switch (getPeerState()) {
        case LOOKING:
            LOG.info("LOOKING");
            ......
            // run the election; when it ends, this returns the winning vote (the new Leader)
            setCurrentVote(makeLEStrategy().lookForLeader());
            ......
        case FOLLOWING:
            try {
                LOG.info("FOLLOWING");
                setFollower(makeFollower(logFactory));
                follower.followLeader();
            } catch (Exception e) {
                LOG.warn("Unexpected exception", e);
            } finally {
                follower.shutdown();
                setFollower(null);
                updateServerState();
            }
            break;
        case LEADING:
            LOG.info("LEADING");
            try {
                setLeader(makeLeader(logFactory));
                leader.lead();
                setLeader(null);
            } catch (Exception e) {
                LOG.warn("Unexpected exception", e);
            } finally {
                if (leader != null) {
                    leader.shutdown("Forcing shutdown");
                    setLeader(null);
                }
                updateServerState();
            }
            break;
        }
        start_fle = Time.currentElapsedTime();
    }
} finally {
    ......
}
```
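Press Ctrl + Alt + B on lookForLeader() to open its implementation in FastLeaderElection.java:
```java
public Vote lookForLeader() throws InterruptedException {
    ......
    try {
        // during a normal startup, every other server sends me its vote
        // recvset keeps the latest valid vote from each server
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

        // keeps votes from outside the current election
        // (from servers that are already FOLLOWING or LEADING)
        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

        // how long to wait for incoming notifications; finalizeWait defaults to 200 ms (0.2 s)
        int notTimeout = finalizeWait;

        // logicalclock is incremented each time a new election round starts;
        // until a valid epoch is available, the logical clock stands in for it
        // leader election rule: compare epoch (term), then zxid (transaction id),
        // then serverid (myid); the largest one becomes leader
        synchronized (this) {
            // update the logical clock: every new election increments it
            logicalclock.incrementAndGet();
            // update the proposal (serverid, zxid, epoch)
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id = " + self.getId() +
                ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        // broadcast my vote to the other servers
        sendNotifications();

        /*
         * Loop in which we exchange notifications until we find a leader
         */
        // keep running rounds until a leader is elected
        while ((self.getPeerState() == ServerState.LOOKING) &&
                (!stop)) {
            ......
}
```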
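The comparison rule in the comments above (epoch first, then zxid, then server id) can be sketched as a small predicate. The sketch also includes the zxid layout described in section 3.1: the high 32 bits are the epoch and the low 32 bits are the counter. Vote here is a simplified stand-in for ZooKeeper's Vote class. The real FastLeaderElection.totalOrderPredicate also takes quorum weights into account. Records require Java 16+.

```java
// Illustration of the vote ordering used during election; Vote is a simplified stand-in.
class VoteOrderSketch {

    record Vote(long sid, long zxid, long epoch) {}

    // true if `incoming` should replace `current` as this server's proposal
    static boolean beats(Vote incoming, Vote current) {
        if (incoming.epoch() != current.epoch()) return incoming.epoch() > current.epoch();
        if (incoming.zxid() != current.zxid()) return incoming.zxid() > current.zxid();
        return incoming.sid() > current.sid();
    }

    // zxid layout: high 32 bits = epoch, low 32 bits = counter within that epoch
    static long epochOf(long zxid) { return zxid >>> 32; }
    static long counterOf(long zxid) { return zxid & 0xffffffffL; }

    public static void main(String[] args) {
        Vote s1 = new Vote(1, 0x100000005L, 1);
        Vote s2 = new Vote(2, 0x100000005L, 1);
        Vote s3 = new Vote(3, 0x100000004L, 1);
        System.out.println(beats(s2, s1)); // true: same epoch and zxid, higher sid
        System.out.println(beats(s3, s2)); // false: smaller zxid loses despite higher sid
        System.out.println(epochOf(0x100000005L) + " " + counterOf(0x100000005L)); // 1 5
    }
}
```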
Open sendNotifications, which broadcasts this server's vote to the other servers:
```java
private void sendNotifications() {
    // iterate over the voting participants and send a vote to each server
    for (long sid : self.getCurrentAndNextConfigVoters()) {
        QuorumVerifier qv = self.getQuorumVerifier();
        // build the vote to send
        ToSend notmsg = new ToSend(ToSend.mType.notification,
                proposedLeader,
                proposedZxid,
                logicalclock.get(),
                QuorumPeer.ServerState.LOOKING,
                sid,
                proposedEpoch, qv.toString().getBytes());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
                    Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
                    " (n.round), " + sid + " (recipient), " + self.getId() +
                    " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
        }
        // put the vote into the send queue
        sendqueue.offer(notmsg);
    }
}
```
In the FastLeaderElection.java class, find the WorkerSender thread:
```java
class WorkerSender extends ZooKeeperThread {
    ......
    public void run() {
        while (!stop) {
            try {
                // block on the queue, waiting for a vote to send
                ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                if (m == null) continue;
                // process the vote to send
                process(m);
            } catch (InterruptedException e) {
                break;
            }
        }
        LOG.info("WorkerSender is down");
    }

    /**
     * Called by run() once there is a new message to send.
     *
     * @param m     message to send
     */
    void process(ToSend m) {
        ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
                m.leader,
                m.zxid,
                m.electionEpoch,
                m.peerEpoch,
                m.configData);
        // send the vote
        manager.toSend(m.sid, requestBuffer);
    }
}

public void toSend(Long sid, ByteBuffer b) {
    // a message addressed to myself goes straight into my own recvQueue
    if (this.mySid == sid) {
        b.position(0);
        addToRecvQueue(new Message(b.duplicate(), sid));
    } else {
        // for another server, create its send queue or reuse the existing one,
        // and put the message into that queue
        ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
                SEND_CAPACITY);
        ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
        if (oldq != null) {
            addToSendQueue(oldq, b);
        } else {
            addToSendQueue(bq, b);
        }
        // make sure a connection exists so the vote gets sent
        connectOne(sid);
    }
}
```
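The routing in toSend can be sketched with java.util.concurrent queues. A message to our own sid goes straight to the receive queue. Anything else goes into a per-sid queue that is created once with putIfAbsent. The sketch assumes a send-queue capacity of 1 and drops the oldest queued message when the queue is full, so only the newest vote per peer is kept. Connection setup (connectOne) is omitted, and the class is hypothetical.

```java
import java.util.concurrent.*;

// Single-threaded illustration of per-peer vote routing; not ZooKeeper's QuorumCnxManager.
class ToSendSketch {
    static final int SEND_CAPACITY = 1; // assumed capacity

    final long mySid;
    final BlockingQueue<String> recvQueue = new LinkedBlockingQueue<>();
    final ConcurrentHashMap<Long, ArrayBlockingQueue<String>> queueSendMap = new ConcurrentHashMap<>();

    ToSendSketch(long mySid) { this.mySid = mySid; }

    void toSend(long sid, String msg) {
        if (sid == mySid) {
            recvQueue.offer(msg);            // loop back: no network needed
            return;
        }
        ArrayBlockingQueue<String> bq = new ArrayBlockingQueue<>(SEND_CAPACITY);
        ArrayBlockingQueue<String> oldq = queueSendMap.putIfAbsent(sid, bq);
        ArrayBlockingQueue<String> q = (oldq != null) ? oldq : bq;
        if (q.remainingCapacity() == 0) {
            q.poll();                        // full: discard the stale vote
        }
        q.offer(msg);
    }

    public static void main(String[] args) {
        ToSendSketch mgr = new ToSendSketch(1);
        mgr.toSend(1, "vote(1)");            // to self
        mgr.toSend(2, "vote(1) round 1");
        mgr.toSend(2, "vote(3) round 1");    // replaces the stale vote for sid 2
        System.out.println(mgr.recvQueue + " " + mgr.queueSendMap.get(2L));
        // [vote(1)] [vote(3) round 1]
    }
}
```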
Set up a connection with the target server, then create and start the sender and receiver threads:
```java
// connectOne -> connectOne -> initiateConnection -> startConnection
private boolean startConnection(Socket sock, Long sid)
        throws IOException {
    DataOutputStream dout = null;
    DataInputStream din = null;
    try {
        // send data to the other server through the output stream
        BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
        dout = new DataOutputStream(buf);

        // Sending id and challenge
        // represents protocol version (in other words - message type)
        dout.writeLong(PROTOCOL_VERSION);
        dout.writeLong(self.getId());
        String addr = formatInetAddr(self.getElectionAddress());
        byte[] addr_bytes = addr.getBytes();
        dout.writeInt(addr_bytes.length);
        dout.write(addr_bytes);
        dout.flush();

        // read the votes sent by the other server through the input stream
        din = new DataInputStream(
                new BufferedInputStream(sock.getInputStream()));
    } catch (IOException e) {
        LOG.warn("Ignoring exception reading or writing challenge: ", e);
        closeSocket(sock);
        return false;
    }

    // authenticate learner
    QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
    if (qps != null) {
        // TODO - investigate why reconfig makes qps null.
        authLearner.authenticate(sock, qps.hostname);
    }

    // if the other server's id is larger than mine, I may not keep a connection
    // I initiated to it, so I close my side
    if (sid > self.getId()) {
        LOG.info("Have smaller server identifier, so dropping the " +
                "connection: (" + sid + ", " + self.getId() + ")");
        closeSocket(sock);
        // Otherwise proceed with the connection
    } else {
        // initialize the sender and the receiver
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);

        if (vsw != null)
            vsw.finish();

        senderWorkerMap.put(sid, sw);
        queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
                SEND_CAPACITY));

        // start the sender and receiver threads
        sw.start();
        rw.start();

        return true;
    }
    return false;
}
```
Open SendWorker and RecvWorker and look at their run methods (not shown here). Then, in the FastLeaderElection.java class, find the WorkerReceiver thread:
```java
public void run() {
    Message response;
    while (!stop) {
        // Sleeps on receive
        try {
            // take an election vote (sent by another server) from the recvQueue
            response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
            ......
        } catch (InterruptedException e) {
            ......
}
```
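5. Follower and Leader State Synchronization Source Code
After the election, each node updates its state according to its role: the elected Leader switches its state to LEADING, and the other nodes switch to FOLLOWING. The Leader's entry point is leader.lead(); the Follower's entry point is follower.followLeader().
Notes:
- The Follower must tell the Leader its state (epoch, zxid, sid). To do so, it has to find out which server is the Leader, open a connection to it and send its own information. After receiving it, the Leader replies with the corresponding information.
- Once the Leader knows the Follower's state, it decides which kind of data synchronization to use: DIFF, TRUNC or SNAP.
- The data synchronization is carried out.
- When the Leader has received ACKs from more than half of the Followers, it enters normal operation and cluster startup is complete.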
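In summary, the synchronization methods are:
- DIFF: both sides already match, so nothing needs to be done
- TRUNC: the Follower's zxid is larger than the Leader's, so the Follower has to roll back
- COMMIT: the Leader's zxid is larger than the Follower's, so the Leader sends the missing Proposals to the Follower, which commits and executes them (in LearnerHandler, these Proposal/COMMIT packets follow a DIFF packet)
- SNAP: if the Follower has no data at all (or is too far behind), the Leader synchronizes it with a full snapshot, serializing all of its data to the Follower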
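The four cases above can be sketched as a decision over the Follower's last zxid and the range of committed proposals the Leader keeps in memory ([minCommittedLog, maxCommittedLog]). The case listed as COMMIT corresponds to DIFF_WITH_PROPOSALS here. This is a simplification: LearnerHandler.syncFollower also consults the on-disk transaction logs and handles epoch boundaries, and the enum names are hypothetical.

```java
// Simplified illustration of choosing a sync mode; not LearnerHandler's actual logic.
class SyncModeSketch {

    enum Mode { DIFF_NOTHING, DIFF_WITH_PROPOSALS, TRUNC, SNAP }

    static Mode choose(long peerLastZxid, long leaderLastZxid,
                       long minCommittedLog, long maxCommittedLog) {
        if (peerLastZxid == leaderLastZxid) {
            return Mode.DIFF_NOTHING;         // already in sync
        }
        if (peerLastZxid > maxCommittedLog) {
            return Mode.TRUNC;                // follower is ahead: roll back to maxCommittedLog
        }
        if (peerLastZxid >= minCommittedLog) {
            return Mode.DIFF_WITH_PROPOSALS;  // resend the missing PROPOSAL + COMMIT pairs
        }
        return Mode.SNAP;                     // too far behind (or empty): full snapshot
    }

    public static void main(String[] args) {
        long min = 0x100000001L, max = 0x100000064L;
        System.out.println(choose(max, max, min, max));          // DIFF_NOTHING
        System.out.println(choose(0x100000070L, max, min, max)); // TRUNC
        System.out.println(choose(0x100000032L, max, min, max)); // DIFF_WITH_PROPOSALS
        System.out.println(choose(0L, max, min, max));           // SNAP
    }
}
```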
6. Server Startup
6.1 Leader Startup
Search the project for the Leader class, then press Ctrl + F to find lead():
```java
void lead() throws IOException, InterruptedException {
    ... ...
    // start the zookeeper service
    startZkServer();
    ... ...
}

private synchronized void startZkServer() {
    ......
    // start the zookeeper service
    zk.startup();
    ......
}

// LeaderZooKeeperServer.java -> super.startup();
// ZooKeeperServer.java
public synchronized void startup() {
    if (sessionTracker == null) {
        createSessionTracker();
    }
    startSessionTracker();
    // set up the request-processing chain
    setupRequestProcessors();

    registerJMX();

    setState(State.RUNNING);
    notifyAll();
}

// ZooKeeperServer's default chain; LeaderZooKeeperServer overrides this method with a longer one
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this,
            finalProcessor);
    ((SyncRequestProcessor) syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor) firstProcessor).start();
}
```
Open PrepRequestProcessor and look at its run method.
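The chain wired by setupRequestProcessors is a chain of responsibility: each stage does its part and passes the request on. Below is a synchronous sketch of the three stages shown above. In ZooKeeper, the first two stages run as threads fed by queues, and LeaderZooKeeperServer overrides setupRequestProcessors with a longer chain. RequestProcessor here is a one-method stand-in for the real interface, and the stage bodies only record what they would do.

```java
import java.util.ArrayList;
import java.util.List;

// Synchronous illustration of Prep -> Sync -> Final; stage bodies are placeholders.
class ProcessorChainSketch {

    interface RequestProcessor { void processRequest(String request); }

    static final List<String> trace = new ArrayList<>();

    static RequestProcessor buildChain() {
        // built back to front, like setupRequestProcessors
        RequestProcessor finalProcessor = req -> trace.add("final: apply " + req + " and reply");
        RequestProcessor syncProcessor = req -> {
            trace.add("sync: log " + req + " to disk");
            finalProcessor.processRequest(req);
        };
        RequestProcessor prepProcessor = req -> {
            trace.add("prep: validate " + req + ", assign zxid");
            syncProcessor.processRequest(req);
        };
        return prepProcessor; // plays the role of firstProcessor
    }

    public static void main(String[] args) {
        buildChain().processRequest("create /app");
        trace.forEach(System.out::println);
        // prep: validate create /app, assign zxid
        // sync: log create /app to disk
        // final: apply create /app and reply
    }
}
```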
6.2 Follower Startup
Search the project for the Follower class, then press Ctrl + F to find followLeader():
```java
void followLeader() throws InterruptedException {
    ......
    while (this.isRunning()) {
        readPacket(qp);
        processPacket(qp);
    }
    ......
}

void readPacket(QuorumPacket pp) throws IOException {
    synchronized (leaderIs) {
        leaderIs.readRecord(pp, "packet");
    }
    if (LOG.isTraceEnabled()) {
        final long traceMask =
                (pp.getType() == Leader.PING) ? ZooTrace.SERVER_PING_TRACE_MASK
                        : ZooTrace.SERVER_PACKET_TRACE_MASK;
        ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp);
    }
}

protected void processPacket(QuorumPacket qp) throws Exception {
    switch (qp.getType()) {
    case Leader.PING:
        ping(qp);
        break;
    case Leader.PROPOSAL:
        TxnHeader hdr = new TxnHeader();
        Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
        if (hdr.getZxid() != lastQueued + 1) {
            LOG.warn("Got zxid 0x"
                    + Long.toHexString(hdr.getZxid())
                    + " expected 0x"
                    + Long.toHexString(lastQueued + 1));
        }
        lastQueued = hdr.getZxid();

        if (hdr.getType() == OpCode.reconfig) {
            SetDataTxn setDataTxn = (SetDataTxn) txn;
            QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
            self.setLastSeenQuorumVerifier(qv, true);
        }

        fzk.logRequest(hdr, txn);
        break;
    case Leader.COMMIT:
        fzk.commit(qp.getZxid());
        break;
    case Leader.COMMITANDACTIVATE:
        // get the new configuration from the request
        Request request = fzk.pendingTxns.element();
        SetDataTxn setDataTxn = (SetDataTxn) request.getTxn();
        QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));

        // get new designated leader from (current) leader's message
        ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
        long suggestedLeaderId = buffer.getLong();
        boolean majorChange =
                self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
        // commit (writes the new config to ZK tree (/zookeeper/config)
        fzk.commit(qp.getZxid());
        if (majorChange) {
            throw new Exception("changes proposed in reconfig");
        }
        break;
    case Leader.UPTODATE:
        LOG.error("Received an UPTODATE message after Follower started");
        break;
    case Leader.REVALIDATE:
        revalidate(qp);
        break;
    case Leader.SYNC:
        fzk.sync();
        break;
    default:
        LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
        break;
    }
}
```
7. Client Startup
When the ZooKeeper client is started with zkCli.sh, it invokes ZooKeeperMain.java. Find ZooKeeperMain and its entry point, the main() method:
```java
public static void main(String args[]) throws CliException, IOException, InterruptedException {
    ZooKeeperMain main = new ZooKeeperMain(args);
    main.run();
}
```
7.1 Creating ZooKeeperMain
```java
public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
    cl.parseOptions(args);
    System.out.println("Connecting to " + cl.getOption("server"));
    connectToZK(cl.getOption("server"));
}

protected void connectToZK(String newHost) throws InterruptedException, IOException {
    if (zk != null && zk.getState().isAlive()) {
        zk.close();
    }

    host = newHost;
    boolean readOnly = cl.getOption("readonly") != null;
    if (cl.getOption("secure") != null) {
        System.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
        System.out.println("Secure connection is enabled");
    }
    zk = new ZooKeeperAdmin(host, Integer.parseInt(cl.getOption("timeout")), new MyWatcher(), readOnly);
}
```
7.2 Initializing the Watcher
Follow the ZooKeeperAdmin constructor down the call chain until you reach:
```java
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
                 boolean canBeReadOnly, HostProvider aHostProvider,
                 ZKClientConfig clientConfig) throws IOException {
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

    if (clientConfig == null) {
        clientConfig = new ZKClientConfig();
    }
    this.clientConfig = clientConfig;
    watchManager = defaultWatchManager();
    // assign the watcher to the default watcher
    watchManager.defaultWatcher = watcher;
    ConnectStringParser connectStringParser = new ConnectStringParser(
            connectString);
    hostProvider = aHostProvider;

    // the client-side endpoint for communicating with the server
    cnxn = createConnection(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), canBeReadOnly);
    cnxn.start();
}
```
7.3 解析连接地址
public ConnectStringParser(String connectString) {
    // parse out chroot, if any
    int off = connectString.indexOf('/');
    if (off >= 0) {
        String chrootPath = connectString.substring(off);
        // ignore "/" chroot spec, same as null
        if (chrootPath.length() == 1) {
            this.chrootPath = null;
        } else {
            PathUtils.validatePath(chrootPath);
            this.chrootPath = chrootPath;
        }
        connectString = connectString.substring(0, off);
    } else {
        this.chrootPath = null;
    }
    // Split "hadoop102:2181,hadoop103:2181,hadoop104:2181" on commas
    List<String> hostsList = split(connectString, ",");
    for (String host : hostsList) {
        int port = DEFAULT_PORT;
        int pidx = host.lastIndexOf(':');
        if (pidx >= 0) {
            // otherwise : is at the end of the string, ignore
            if (pidx < host.length() - 1) {
                port = Integer.parseInt(host.substring(pidx + 1));
            }
            host = host.substring(0, pidx);
        }
        serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
    }
}

public class InetSocketAddress extends SocketAddress {
    // Private implementation class pointed to by all public methods.
    private static class InetSocketAddressHolder {
        // The hostname of the Socket Address
        private String hostname;
        // The IP address of the Socket Address
        private InetAddress addr;
        // The port number of the Socket Address
        private int port;
        ... ...
    }
    ... ...
}
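This is a minimal standalone sketch of the ConnectStringParser rules shown above. It makes a few assumptions. The class name ConnectStringSketch is hypothetical. Plain String.split stands in for ZooKeeper's own split helper. Path validation (PathUtils.validatePath) is left out. The sketch shows two rules: everything from the first '/' onward is treated as the chroot, and a host without ":port" falls back to the default client port (assumed to be 2181 here).

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified re-implementation of ConnectStringParser's logic, for illustration only
public class ConnectStringSketch {
    static final int DEFAULT_PORT = 2181; // assumed default ZooKeeper client port

    final String chrootPath; // null when there is no chroot (or only a bare "/")
    final List<InetSocketAddress> serverAddresses = new ArrayList<>();

    ConnectStringSketch(String connectString) {
        // Everything from the first '/' onward is the chroot path
        int off = connectString.indexOf('/');
        if (off >= 0) {
            String chroot = connectString.substring(off);
            this.chrootPath = chroot.length() == 1 ? null : chroot;
            connectString = connectString.substring(0, off);
        } else {
            this.chrootPath = null;
        }
        // Split on commas; a missing ":port" falls back to the default port
        for (String host : connectString.split(",")) {
            int port = DEFAULT_PORT;
            int pidx = host.lastIndexOf(':');
            if (pidx >= 0) {
                if (pidx < host.length() - 1) {
                    port = Integer.parseInt(host.substring(pidx + 1));
                }
                host = host.substring(0, pidx);
            }
            // Unresolved address: no DNS lookup happens at parse time
            serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
        }
    }

    public static void main(String[] args) {
        ConnectStringSketch p = new ConnectStringSketch("hadoop102:2181,hadoop103:2181,hadoop104/app");
        System.out.println(p.chrootPath);
        for (InetSocketAddress a : p.serverAddresses) {
            System.out.println(a.getHostString() + ":" + a.getPort());
        }
    }
}
```

Running main prints /app, followed by hadoop102:2181, hadoop103:2181 and hadoop104:2181. The last host had no explicit port, so it received the default.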
7.4 Create the communication channel
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly, HostProvider aHostProvider,
        ZKClientConfig clientConfig) throws IOException {
    ......
    // The endpoint through which the client communicates with the server
    cnxn = createConnection(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), canBeReadOnly);
    cnxn.start();
}

protected ClientCnxn createConnection(String chrootPath,
        HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
        ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
        boolean canBeReadOnly) throws IOException {
    return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this,
            watchManager, clientCnxnSocket, canBeReadOnly);
}

// Keep following the overloaded constructors until you reach this one
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
        ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
        long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
    this.zooKeeper = zooKeeper;
    this.watcher = watcher;
    this.sessionId = sessionId;
    this.sessionPasswd = sessionPasswd;
    this.sessionTimeout = sessionTimeout;
    this.hostProvider = hostProvider;
    this.chrootPath = chrootPath;
    connectTimeout = sessionTimeout / hostProvider.size();
    readTimeout = sessionTimeout * 2 / 3;
    readOnly = canBeReadOnly;
    sendThread = new SendThread(clientCnxnSocket);
    eventThread = new EventThread();
    this.clientConfig = zooKeeper.getClientConfig();
    initRequestTimeout();
}
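Here is a quick worked example of the two timeouts set in the ClientCnxn constructor. It assumes a 30000 ms session timeout and the three-server connect string used earlier; both values are illustrative. As in the constructor, integer division is used. ClientTimeouts is a hypothetical helper class.

```java
// Illustrative arithmetic only: mirrors the two assignments in the ClientCnxn constructor
public class ClientTimeouts {
    // Each server gets an equal share of the session timeout for its connect attempt
    static int connectTimeout(int sessionTimeout, int serverCount) {
        return sessionTimeout / serverCount;
    }

    // Two thirds of the session timeout, used as the read timeout
    static int readTimeout(int sessionTimeout) {
        return sessionTimeout * 2 / 3;
    }

    public static void main(String[] args) {
        int sessionTimeout = 30000; // ms, assumed value
        int servers = 3;            // e.g. hadoop102, hadoop103, hadoop104
        System.out.println("connectTimeout = " + connectTimeout(sessionTimeout, servers) + " ms"); // 10000
        System.out.println("readTimeout    = " + readTimeout(sessionTimeout) + " ms");            // 20000
    }
}
```

Because the division is per server, adding servers to the connect string shortens each connect attempt. A client with three servers and a 30 s session gives each server 10 s to accept the connection.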
Open SendThread and find its run() method:
// ZooKeeperThread is a Thread; SendThread extends it, so its run() method executes once the thread is started
@Override
public void run() {
    clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
    int to;
    long lastPingRwServer = Time.currentElapsedTime();
    final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
    InetSocketAddress serverAddress = null;
    // Inside the loop, keep sending and keep receiving
    while (state.isAlive()) {
        try {
            if (!clientCnxnSocket.isConnected()) {
                // don't re-establish connection if we are closing
                if (closing) {
                    break;
                }
                if (rwServerAddress != null) {
                    serverAddress = rwServerAddress;
                    rwServerAddress = null;
                } else {
                    serverAddress = hostProvider.next(1000);
                }
                // Start connecting to the server
                startConnect(serverAddress);
                ......
            // If we are in read-only mode, seek for read/write server
            if (state == States.CONNECTEDREADONLY) {
                long now = Time.currentElapsedTime();
                int idlePingRwServer = (int) (now - lastPingRwServer);
                if (idlePingRwServer >= pingRwTimeout) {
                    lastPingRwServer = now;
                    idlePingRwServer = 0;
                    pingRwTimeout =
                        Math.min(2*pingRwTimeout, maxPingRwTimeout);
                    pingRwServer();
                }
                to = Math.min(to, pingRwTimeout - idlePingRwServer);
            }
            // Receive and process the server's responses
            clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
            ......
}

private void startConnect(InetSocketAddress addr) throws IOException {
    ......
    logStartConnect(addr);
    // Establish the connection
    clientCnxnSocket.connect(addr);
}
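In read-only mode, run() keeps probing for a read/write server. After each probe it doubles pingRwTimeout, up to maxPingRwTimeout. The sketch below replays only that update rule. The starting value of 100 ms and the 60000 ms cap are assumptions for illustration; they are not values read from this listing. PingRwBackoff is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the capped exponential backoff applied to pingRwTimeout in CONNECTEDREADONLY state
public class PingRwBackoff {
    // Returns the successive waits between read/write-server probes
    static List<Integer> schedule(int initial, int max, int steps) {
        List<Integer> out = new ArrayList<>();
        int pingRwTimeout = initial;
        for (int i = 0; i < steps; i++) {
            out.add(pingRwTimeout);
            pingRwTimeout = Math.min(2 * pingRwTimeout, max); // same update as in run()
        }
        return out;
    }

    public static void main(String[] args) {
        // 100 ms start and 60 s cap are assumed values chosen for illustration
        System.out.println(schedule(100, 60000, 12));
        // [100, 200, 400, 800, 1600, 3200, 6400, 12800, 25600, 51200, 60000, 60000]
    }
}
```

Capping the doubling keeps a read-only client from probing aggressively while the quorum is down. It still guarantees a new probe at least once per cap interval.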
Press Ctrl + Alt + B on connect to jump to its implementation class, ClientCnxnSocketNIO.java:
@Override
void connect(InetSocketAddress addr) throws IOException {
    SocketChannel sock = createSock();
    try {
        registerAndConnect(sock, addr);
    } catch (IOException e) {
        LOG.error("Unable to open socket to " + addr);
        sock.close();
        throw e;
    }
    initialized = false;
    /*
     * Reset incomingBuffer
     */
    lenBuffer.clear();
    incomingBuffer = lenBuffer;
}

void registerAndConnect(SocketChannel sock, InetSocketAddress addr)
        throws IOException {
    sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
    boolean immediateConnect = sock.connect(addr);
    if (immediateConnect) {
        sendThread.primeConnection();
    }
}

// primeConnection() is a method of ClientCnxn.SendThread
void primeConnection() throws IOException {
    LOG.info("Socket connection established, initiating session, client: {}, server: {}",
            clientCnxnSocket.getLocalSocketAddress(),
            clientCnxnSocket.getRemoteSocketAddress());
    // Mark that this is no longer the first connection
    isFirstConnect = false;
    ... ...
}
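registerAndConnect() follows the standard Java NIO pattern:

1. Register the channel for OP_CONNECT.
2. Call connect().
3. If the connect does not complete immediately, wait for the selector to report OP_CONNECT, then call finishConnect(). This last step is what doTransport() does.

The standalone sketch below demonstrates the pattern against a local ServerSocketChannel bound to an ephemeral port. NioConnectSketch and its connect method are hypothetical names. Note that a channel must be put into non-blocking mode before it can be registered with a Selector.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Standalone sketch of the register-then-connect pattern; not ZooKeeper code
public class NioConnectSketch {
    // Returns true once the TCP connection is established, false if it did not complete in time
    static boolean connect(InetSocketAddress addr, int maxSelects) throws IOException {
        try (Selector selector = Selector.open();
             SocketChannel sock = SocketChannel.open()) {
            sock.configureBlocking(false);                    // required before register()
            sock.register(selector, SelectionKey.OP_CONNECT);
            if (sock.connect(addr)) {
                return true;                                  // connected immediately
            }
            for (int i = 0; i < maxSelects; i++) {
                selector.select(1000);
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey k = it.next();
                    it.remove();
                    // OP_CONNECT fired: complete the handshake (throws if the connect failed)
                    if (k.isConnectable() && ((SocketChannel) k.channel()).finishConnect()) {
                        return true;
                    }
                }
            }
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // ephemeral local port
            InetSocketAddress addr = (InetSocketAddress) server.getLocalAddress();
            System.out.println("connected: " + connect(addr, 5));
        }
    }
}
```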
Press Ctrl + Alt + B on doTransport to jump to its implementation in ClientCnxnSocketNIO.java:
@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn)
        throws IOException, InterruptedException {
    selector.select(waitTimeOut);
    Set<SelectionKey> selected;
    synchronized (this) {
        selected = selector.selectedKeys();
    }
    // Everything below and until we get back to the select is
    // non blocking, so time is effectively a constant. That is
    // Why we just have to do this once, here
    updateNow();
    for (SelectionKey k : selected) {
        SocketChannel sc = ((SocketChannel) k.channel());
        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
            if (sc.finishConnect()) {
                updateLastSendAndHeard();
                updateSocketAddresses();
                sendThread.primeConnection();
            }
        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
            // Read the server's responses (doIO also writes queued packets when the socket is writable)
            doIO(pendingQueue, cnxn);
        }
    }
    if (sendThread.getZkState().isConnected()) {
        if (findSendablePacket(outgoingQueue,
                sendThread.tunnelAuthInProgress()) != null) {
            enableWrite();
        }
    }
    selected.clear();
}
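connect() resets incomingBuffer to lenBuffer. This hints at how doIO() (not shown here) reads responses: first a 4-byte length, then a payload of that size, then back to lenBuffer for the next packet. The sketch below reproduces that two-phase read with heap ByteBuffers, assuming a big-endian int length prefix. The class LengthPrefixedReader and its methods are hypothetical. Details of the real implementation are left out, such as the maximum packet size check and the special handling of the first (connect) response.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a length-prefixed read: 4-byte length first, then the payload
public class LengthPrefixedReader {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4); // holds the length header
    private ByteBuffer incomingBuffer = lenBuffer;               // start by reading a length
    final List<byte[]> packets = new ArrayList<>();

    // Feed whatever bytes arrived; a chunk may hold partial or multiple frames
    void onBytes(ByteBuffer chunk) {
        while (chunk.hasRemaining()) {
            // Copy as many bytes as fit into the current target buffer
            int n = Math.min(chunk.remaining(), incomingBuffer.remaining());
            ByteBuffer slice = chunk.duplicate();
            slice.limit(slice.position() + n);
            incomingBuffer.put(slice);
            chunk.position(chunk.position() + n);
            if (!incomingBuffer.hasRemaining()) {
                incomingBuffer.flip();
                if (incomingBuffer == lenBuffer) {
                    // Header complete: allocate a buffer sized for the payload
                    int len = lenBuffer.getInt();
                    if (len < 0) throw new IllegalStateException("Bad length " + len);
                    incomingBuffer = ByteBuffer.allocate(len);
                } else {
                    // Payload complete: hand it over, then go back to reading a length
                    packets.add(incomingBuffer.array());
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                }
            }
        }
    }

    // Builds one frame: big-endian int length followed by the UTF-8 bytes
    static ByteBuffer frame(String s) {
        byte[] body = s.getBytes(StandardCharsets.UTF_8);
        ByteBuffer b = ByteBuffer.allocate(4 + body.length);
        b.putInt(body.length).put(body);
        b.flip();
        return b;
    }

    public static void main(String[] args) {
        LengthPrefixedReader r = new LengthPrefixedReader();
        ByteBuffer all = ByteBuffer.allocate(64);
        all.put(frame("hello")).put(frame("zk"));
        all.flip();
        // Deliver the bytes in two arbitrary chunks to simulate partial socket reads
        ByteBuffer first = all.duplicate();
        first.limit(6);
        ByteBuffer rest = all.duplicate();
        rest.position(6);
        r.onBytes(first);
        r.onBytes(rest);
        for (byte[] p : r.packets) {
            System.out.println(new String(p, StandardCharsets.UTF_8)); // hello, then zk
        }
    }
}
```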
7.5 Execute run()
public static void main(String args[]) throws CliException, IOException, InterruptedException {
    ZooKeeperMain main = new ZooKeeperMain(args);
    main.run();
}

void run() throws CliException, IOException, InterruptedException {
    ......
        if (jlinemissing) {
            System.out.println("JLine support is disabled");
            BufferedReader br =
                new BufferedReader(new InputStreamReader(System.in));
            String line;
            while ((line = br.readLine()) != null) {
                // Read commands one line at a time
                executeLine(line);
            }
        }
    } else {
        // Command line args non-null. Run what was passed.
        processCmd(cl);
    }
    System.exit(exitCode);
}

public void executeLine(String line) throws CliException, InterruptedException, IOException {
    if (!line.equals("")) {
        cl.parseCommand(line);
        addToHistory(commandCount, line);
        // Process the client command
        processCmd(cl);
        commandCount++;
    }
}

protected boolean processCmd(MyCommandOptions co) throws CliException, IOException, InterruptedException {
    boolean watch = false;
    try {
        // Parse and execute the command
        watch = processZKCmd(co);
        exitCode = 0;
    } catch (CliException ex) {
        exitCode = ex.getExitCode();
        System.err.println(ex.getMessage());
    }
    return watch;
}

protected boolean processZKCmd(MyCommandOptions co) throws CliException, IOException, InterruptedException {
    String[] args = co.getArgArray();
    String cmd = co.getCommand();
    if (args.length < 1) {
        usage();
        throw new MalformedCommandException("No command entered");
    }
    if (!commandMap.containsKey(cmd)) {
        usage();
        throw new CommandNotFoundException("Command not found " + cmd);
    }
    boolean watch = false;
    LOG.debug("Processing " + cmd);
    if (cmd.equals("quit")) {
        zk.close();
        System.exit(exitCode);
    } else if (cmd.equals("redo") && args.length >= 2) {
        Integer i = Integer.decode(args[1]);
        if (commandCount <= i || i < 0) { // don't allow redoing this redo
            throw new MalformedCommandException("Command index out of range");
        }
        cl.parseCommand(history.get(i));
        if (cl.getCommand().equals("redo")) {
            throw new MalformedCommandException("No redoing redos");
        }
        history.put(commandCount, history.get(i));
        processCmd(cl);
    } else if (cmd.equals("history")) {
        for (int i = commandCount - 10; i <= commandCount; ++i) {
            if (i < 0) continue;
            System.out.println(i + " - " + history.get(i));
        }
    } else if (cmd.equals("printwatches")) {
        if (args.length == 1) {
            System.out.println("printwatches is " + (printWatches ? "on" : "off"));
        } else {
            printWatches = args[1].equals("on");
        }
    } else if (cmd.equals("connect")) {
        if (args.length >= 2) {
            connectToZK(args[1]);
        } else {
            connectToZK(host);
        }
    }
    // Below commands all need a live connection
    if (zk == null || !zk.getState().isAlive()) {
        System.out.println("Not connected");
        return false;
    }
    // execute from commandMap
    CliCommand cliCmd = commandMapCli.get(cmd);
    if (cliCmd != null) {
        cliCmd.setZk(zk);
        watch = cliCmd.parse(args).exec();
    } else if (!commandMap.containsKey(cmd)) {
        usage();
    }
    return watch;
}
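The path from a typed line to a command works as follows:

1. executeLine() records the line in history and calls processCmd().
2. processCmd() calls processZKCmd().
3. processZKCmd() handles a few built-ins locally (quit, redo, history, printwatches, connect). Otherwise it looks the command up in commandMapCli and executes it.

The miniature below imitates that flow with a map of lambdas. MiniCli and its echo/history handlers are hypothetical stand-ins, not ZooKeeper's CliCommand classes. The exit code of 1 for an unknown command is also an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical miniature of the executeLine -> processCmd -> command-map lookup flow
public class MiniCli {
    final Map<String, Function<String[], String>> commands = new HashMap<>();
    final List<String> history = new ArrayList<>();
    int exitCode = 0;

    MiniCli() {
        // Stand-ins for real command implementations such as ls or get
        commands.put("echo", a -> String.join(" ", Arrays.copyOfRange(a, 1, a.length)));
        commands.put("history", a -> String.join(", ", history));
    }

    // Like executeLine(): skip empty lines, record history, then dispatch
    String executeLine(String line) {
        if (line.isEmpty()) return null;
        history.add(line);
        String[] args = line.trim().split("\\s+");
        Function<String[], String> cmd = commands.get(args[0]);
        if (cmd == null) {
            exitCode = 1; // assumed non-zero code, like a caught CliException
            return "Command not found " + args[0];
        }
        exitCode = 0;
        return cmd.apply(args);
    }

    public static void main(String[] args) {
        MiniCli cli = new MiniCli();
        System.out.println(cli.executeLine("echo hello zk")); // hello zk
        System.out.println(cli.executeLine("foo"));           // Command not found foo
        System.out.println(cli.executeLine("history"));       // echo hello zk, foo, history
    }
}
```

As in ZooKeeperMain, the line is added to history before it is dispatched. That is why the history command lists itself.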