1.zookeeper启动入口
在zkServer.sh的启动命令中,我们可以找到zookeeper启动的关键类org.apache.zookeeper.server.quorum.QuorumPeerMain
QuorumPeerMain#main
我们可以直接看org.apache.zookeeper.server.quorum.QuorumPeerMain中的main方法,从下面的main方法中,我们可以看到初始化了一个QuorumPeerMain对象,并进行调用initializeAndRun方法
public static void main(String[] args) {QuorumPeerMain main = new QuorumPeerMain();try {main.initializeAndRun(args);} catch (IllegalArgumentException e) {LOG.error("Invalid arguments, exiting abnormally", e);LOG.info(USAGE);System.err.println(USAGE);System.exit(2);} catch (ConfigException e) {LOG.error("Invalid config, exiting abnormally", e);System.err.println("Invalid config, exiting abnormally");System.exit(2);} catch (DatadirException e) {LOG.error("Unable to access datadir, exiting abnormally", e);System.err.println("Unable to access datadir, exiting abnormally");System.exit(3);} catch (AdminServerException e) {LOG.error("Unable to start AdminServer, exiting abnormally", e);System.err.println("Unable to start AdminServer, exiting abnormally");System.exit(4);} catch (Exception e) {LOG.error("Unexpected exception, exiting abnormally", e);System.exit(1);}LOG.info("Exiting normally");System.exit(0);}
QuorumPeerMain#initializeAndRun
我们紧接看这个initializeAndRun方法,方法里的代码如下:
protected void initializeAndRun(String[] args)throws ConfigException, IOException, AdminServerException{QuorumPeerConfig config = new QuorumPeerConfig();if (args.length == 1) {//解析参数信息config.parse(args[0]);}// Start and schedule the the purge task// 启动一个定时任务进行定时清理日志信息DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());purgeMgr.start();if (args.length == 1 && config.isDistributed()) {//作为集群启动runFromConfig(config);} else {LOG.warn("Either no config or no quorum defined in config, running "+ " in standalone mode");// there is only server in the quorum -- run as standalone// 如果没有配置文件的信息的话,就单机进行启动ZooKeeperServerMain.main(args);}}
QuorumPeerMain#runFromConfig
在上面的initializeAndRun方法中我们看到了zookeeper 根据配置文件进行判断是否是单机启动还是集群启动,这块我们就按照集群启动的方式进行启动。我们紧接看这个runFromConfig方法。
public void runFromConfig(QuorumPeerConfig config)throws IOException, AdminServerException{/注册log4j的beantry {ManagedUtil.registerLog4jMBeans();} catch (JMException e) {LOG.warn("Unable to register log4j JMX control", e);}//从这块进行开始启动quorum peer 一个quorum peer就对应着一个zookeeper机器节点LOG.info("Starting quorum peer, myid=" + config.getServerId());try {//创建网络连接工厂ServerCnxnFactory cnxnFactory = null;ServerCnxnFactory secureCnxnFactory = null;if (config.getClientPortAddress() != null) {cnxnFactory = ServerCnxnFactory.createFactory();cnxnFactory.configure(config.getClientPortAddress(),config.getMaxClientCnxns(),false);}if (config.getSecureClientPortAddress() != null) {secureCnxnFactory = ServerCnxnFactory.createFactory();secureCnxnFactory.configure(config.getSecureClientPortAddress(),config.getMaxClientCnxns(),true);}//从这开始就是对quorumPeer的属性的设置//里面有很多属性暂时可以先不了解 // 一个Zookeeper主节点的启动quorumPeer = getQuorumPeer();//磁盘数据管理组件quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(),config.getDataDir()));quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());//quorumPeer.setQuorumPeers(config.getAllMembers());quorumPeer.setElectionType(config.getElectionAlg());quorumPeer.setMyid(config.getServerId());quorumPeer.setTickTime(config.getTickTime());quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());quorumPeer.setInitLimit(config.getInitLimit());quorumPeer.setSyncLimit(config.getSyncLimit());quorumPeer.setConfigFileName(config.getConfigFilename());// zk的内存数据库quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);if (config.getLastSeenQuorumVerifier()!=null) {quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);}quorumPeer.initConfigInZKDatabase();quorumPeer.setCnxnFactory(cnxnFactory);quorumPeer.setSecureCnxnFactory(secureCnxnFactory);quorumPeer.setSslQuorum(config.isSslQuorum());quorumPeer.setUsePortUnification(config.sh