Preface
This article takes a look at the LeaderElector used by storm nimbus.
Nimbus
org/apache/storm/daemon/nimbus/Nimbus.java
```java
public static void main(String[] args) throws Exception {
    Utils.setupDefaultUncaughtExceptionHandler();
    launch(new StandaloneINimbus());
}

public static Nimbus launch(INimbus inimbus) throws Exception {
    Map<String, Object> conf = Utils.merge(ConfigUtils.readStormConfig(),
                                           ConfigUtils.readYamlConfig("storm-cluster-auth.yaml", false));
    boolean fixupAcl = (boolean) conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_FIXUP);
    boolean checkAcl = fixupAcl || (boolean) conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_CHECK);
    if (checkAcl) {
        AclEnforcement.verifyAcls(conf, fixupAcl);
    }
    return launchServer(conf, inimbus);
}

private static Nimbus launchServer(Map<String, Object> conf, INimbus inimbus) throws Exception {
    StormCommon.validateDistributedMode(conf);
    validatePortAvailable(conf);
    StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
    final Nimbus nimbus = new Nimbus(conf, inimbus, metricsRegistry);
    nimbus.launchServer();
    final ThriftServer server = new ThriftServer(conf, new Processor<>(nimbus), ThriftConnectionType.NIMBUS);
    metricsRegistry.startMetricsReporters(conf);
    Utils.addShutdownHookWithDelayedForceKill(() -> {
        metricsRegistry.stopMetricsReporters();
        nimbus.shutdown();
        server.stop();
    }, 10);
    if (ClientAuthUtils.areWorkerTokensEnabledServer(server, conf)) {
        nimbus.initWorkerTokenManager();
    }
    LOG.info("Starting nimbus server for storm version '{}'", STORM_VERSION);
    server.serve();
    return nimbus;
}

public Nimbus(Map<String, Object> conf, INimbus inimbus, IStormClusterState stormClusterState, NimbusInfo hostPortInfo,
              BlobStore blobStore, TopoCache topoCache, ILeaderElector leaderElector,
              IGroupMappingServiceProvider groupMapper, StormMetricsRegistry metricsRegistry) throws Exception {
    //......
    if (blobStore == null) {
        blobStore = ServerUtils.getNimbusBlobStore(conf, this.nimbusHostPortInfo, null);
    }
    this.blobStore = blobStore;
    if (topoCache == null) {
        topoCache = new TopoCache(blobStore, conf);
    }
    if (leaderElector == null) {
        leaderElector = Zookeeper.zkLeaderElector(conf, zkClient, blobStore, topoCache, stormClusterState,
                                                  getNimbusAcls(conf), metricsRegistry);
    }
    this.leaderElector = leaderElector;
    this.blobStore.setLeaderElector(this.leaderElector);
    //......
}

public void launchServer() throws Exception {
    try {
        BlobStore store = blobStore;
        IStormClusterState state = stormClusterState;
        NimbusInfo hpi = nimbusHostPortInfo;
        LOG.info("Starting Nimbus with conf {}", ConfigUtils.maskPasswords(conf));
        validator.prepare(conf);
        //add to nimbuses
        state.addNimbusHost(hpi.getHost(),
                            new NimbusSummary(hpi.getHost(), hpi.getPort(), Time.currentTimeSecs(), false, STORM_VERSION));
        leaderElector.addToLeaderLockQueue();
        this.blobStore.startSyncBlobs();
        for (ClusterMetricsConsumerExecutor exec : clusterConsumerExceutors) {
            exec.prepare();
        }
        if (isLeader()) {
            for (String topoId : state.activeStorms()) {
                transition(topoId, TopologyActions.STARTUP, null);
            }
            clusterMetricSet.setActive(true);
        }
        //......
    } catch (Exception e) {
        if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
            throw e;
        }
        if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, e)) {
            throw e;
        }
        LOG.error("Error on initialization of nimbus", e);
        Utils.exitProcess(13, "Error on initialization of nimbus");
    }
}
```
- The Nimbus constructor calls Zookeeper.zkLeaderElector to create the leaderElector
- The launchServer method then calls leaderElector.addToLeaderLockQueue() to join the leader election; a sketch of this lifecycle follows below
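To make that lifecycle concrete, here is a minimal sketch of how a nimbus-like component might drive ILeaderElector, assuming a conf and blobStore already set up the way Nimbus does; the class and method names are hypothetical wrappers, while the ILeaderElector calls are the ones shown in LeaderElectorImp later in this article:

```java
import java.util.Map;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.nimbus.ILeaderElector;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.zookeeper.Zookeeper;

public class ElectionLifecycleSketch {
    // conf and blobStore are assumed to be initialized as Nimbus does;
    // zkLeaderElector here is the 1.1.0 factory shown below
    static void run(Map conf, BlobStore blobStore) throws Exception {
        ILeaderElector elector = Zookeeper.zkLeaderElector(conf, blobStore);
        elector.addToLeaderLockQueue();          // join the election, as launchServer does
        if (elector.isLeader()) {
            // only the elected leader transitions active topologies to STARTUP
        }
        NimbusInfo leader = elector.getLeader(); // current leader, whether or not we won
        elector.removeFromLeaderLockQueue();     // leave the election queue on shutdown
    }
}
```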
Zookeeper.zkLeaderElector
storm-core-1.1.0-sources.jar!/org/apache/storm/zookeeper/Zookeeper.java
```java
public static ILeaderElector zkLeaderElector(Map conf, BlobStore blobStore) throws UnknownHostException {
    return _instance.zkLeaderElectorImpl(conf, blobStore);
}

protected ILeaderElector zkLeaderElectorImpl(Map conf, BlobStore blobStore) throws UnknownHostException {
    List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
    Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
    CuratorFramework zk = mkClientImpl(conf, servers, port, "", conf);
    String leaderLockPath = conf.get(Config.STORM_ZOOKEEPER_ROOT) + "/leader-lock";
    String id = NimbusInfo.fromConf(conf).toHostPortString();
    AtomicReference<LeaderLatch> leaderLatchAtomicReference =
        new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));
    AtomicReference<LeaderLatchListener> leaderLatchListenerAtomicReference =
        new AtomicReference<>(leaderLatchListenerImpl(conf, zk, blobStore, leaderLatchAtomicReference.get()));
    return new LeaderElectorImp(conf, servers, zk, leaderLockPath, id, leaderLatchAtomicReference,
                                leaderLatchListenerAtomicReference, blobStore);
}
```
- Here a LeaderLatch is created on the /leader-lock path, and leaderLatchListenerImpl creates the matching LeaderLatchListener
- Finally, a LeaderElectorImp is constructed as the ILeaderElector implementation; the sketch after this list reproduces the same latch setup with plain Curator
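The same construction can be reproduced with Curator directly. Below is a minimal, self-contained sketch that builds a LeaderLatch the way zkLeaderElectorImpl does; the 127.0.0.1:2181 connect string, the /storm root, and the nimbus1:6627 participant id are assumptions for illustration (storm derives them from its conf):

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.Participant;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderLockDemo {
    public static void main(String[] args) throws Exception {
        // assumed local ZooKeeper; storm reads servers/port from its conf instead
        CuratorFramework zk = CuratorFrameworkFactory
                .newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        zk.start();
        zk.blockUntilConnected();

        // mirrors zkLeaderElectorImpl: STORM_ZOOKEEPER_ROOT + "/leader-lock",
        // with the nimbus host:port string as the participant id
        String leaderLockPath = "/storm/leader-lock";
        String id = "nimbus1:6627"; // stand-in for NimbusInfo.toHostPortString()
        LeaderLatch latch = new LeaderLatch(zk, leaderLockPath, id);

        latch.start();  // joins the election queue
        latch.await();  // blocks until this participant becomes leader
        System.out.println(latch.getId() + " is now leader");

        for (Participant p : latch.getParticipants()) {
            System.out.println(p.getId() + " leader=" + p.isLeader());
        }

        latch.close();  // gives up leadership and leaves the queue
        zk.close();
    }
}
```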
leaderLatchListenerImpl
storm-core-1.1.0-sources.jar!/org/apache/storm/zookeeper/Zookeeper.java
```java
// Leader latch listener that will be invoked when we either gain or lose leadership
public static LeaderLatchListener leaderLatchListenerImpl(final Map conf, final CuratorFramework zk,
                                                          final BlobStore blobStore, final LeaderLatch leaderLatch)
    throws UnknownHostException {
    final String hostName = InetAddress.getLocalHost().getCanonicalHostName();
    return new LeaderLatchListener() {
        final String STORM_JAR_SUFFIX = "-stormjar.jar";
        final String STORM_CODE_SUFFIX = "-stormcode.ser";
        final String STORM_CONF_SUFFIX = "-stormconf.ser";

        @Override
        public void isLeader() {
            Set<String> activeTopologyIds = new TreeSet<>(Zookeeper.getChildren(zk,
                    conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false));
            Set<String> activeTopologyBlobKeys = populateTopologyBlobKeys(activeTopologyIds);
            Set<String> activeTopologyCodeKeys = filterTopologyCodeKeys(activeTopologyBlobKeys);
            Set<String> allLocalBlobKeys = Sets.newHashSet(blobStore.listKeys());
            Set<String> allLocalTopologyBlobKeys = filterTopologyBlobKeys(allLocalBlobKeys);

            // this finds all active topologies blob keys from all local topology blob keys
            Sets.SetView<String> diffTopology = Sets.difference(activeTopologyBlobKeys, allLocalTopologyBlobKeys);
            LOG.info("active-topology-blobs [{}] local-topology-blobs [{}] diff-topology-blobs [{}]",
                     generateJoinedString(activeTopologyIds), generateJoinedString(allLocalTopologyBlobKeys),
                     generateJoinedString(diffTopology));
            if (diffTopology.isEmpty()) {
                Set<String> activeTopologyDependencies = getTopologyDependencyKeys(activeTopologyCodeKeys);

                // this finds all dependency blob keys from active topologies from all local blob keys
                Sets.SetView<String> diffDependencies = Sets.difference(activeTopologyDependencies, allLocalBlobKeys);
                LOG.info("active-topology-dependencies [{}] local-blobs [{}] diff-topology-dependencies [{}]",
                         generateJoinedString(activeTopologyDependencies), generateJoinedString(allLocalBlobKeys),
                         generateJoinedString(diffDependencies));
                if (diffDependencies.isEmpty()) {
                    LOG.info("Accepting leadership, all active topologies and corresponding dependencies found locally.");
                } else {
                    LOG.info("Code for all active topologies is available locally, but some dependencies are not found locally, giving up leadership.");
                    closeLatch();
                }
            } else {
                LOG.info("code for all active topologies not available locally, giving up leadership.");
                closeLatch();
            }
        }

        @Override
        public void notLeader() {
            LOG.info("{} lost leadership.", hostName);
        }

        //......

        private void closeLatch() {
            try {
                leaderLatch.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    };
}
```
- leaderLatchListenerImpl returns an implementation of the LeaderLatchListener interface
- The isLeader callback performs validation: once zookeeper elects this node as leader, if not all active topologies or not all of their dependencies are present locally, it calls leaderLatch.close() to give up leadership (see the sketch after this list)
- The notLeader callback mainly just logs the event
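Stripped of the blob-store details, the abdication pattern in isLeader() is: run a local readiness check, and close the latch if it fails. A minimal sketch of that pattern, where isLocalStateComplete() is a hypothetical stand-in for storm's active-topology and dependency checks:

```java
import java.io.IOException;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;

public class GuardedLeaderListener implements LeaderLatchListener {
    private final LeaderLatch leaderLatch;

    public GuardedLeaderListener(LeaderLatch leaderLatch) {
        this.leaderLatch = leaderLatch;
    }

    // hypothetical stand-in for comparing active topology blob keys
    // (and their dependencies) against the local blob store
    private boolean isLocalStateComplete() {
        return false;
    }

    @Override
    public void isLeader() {
        if (!isLocalStateComplete()) {
            // same move as storm's listener: closing the latch gives up
            // leadership so another, fully-synced nimbus can take over
            try {
                leaderLatch.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void notLeader() {
        // storm's implementation just logs the loss of leadership here
    }
}
```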
LeaderElectorImp
org/apache/storm/zookeeper/LeaderElectorImp.java
```java
public class LeaderElectorImp implements ILeaderElector {
    private static Logger LOG = LoggerFactory.getLogger(LeaderElectorImp.class);
    private final Map<String, Object> conf;
    private final List<String> servers;
    private final CuratorFramework zk;
    private final String leaderlockPath;
    private final String id;
    private final AtomicReference<LeaderLatch> leaderLatch;
    private final AtomicReference<LeaderLatchListener> leaderLatchListener;
    private final BlobStore blobStore;
    private final TopoCache tc;
    private final IStormClusterState clusterState;
    private final List<ACL> acls;
    private final StormMetricsRegistry metricsRegistry;

    public LeaderElectorImp(Map<String, Object> conf, List<String> servers, CuratorFramework zk, String leaderlockPath,
                            String id, AtomicReference<LeaderLatch> leaderLatch,
                            AtomicReference<LeaderLatchListener> leaderLatchListener, BlobStore blobStore,
                            final TopoCache tc, IStormClusterState clusterState, List<ACL> acls,
                            StormMetricsRegistry metricsRegistry) {
        this.conf = conf;
        this.servers = servers;
        this.zk = zk;
        this.leaderlockPath = leaderlockPath;
        this.id = id;
        this.leaderLatch = leaderLatch;
        this.leaderLatchListener = leaderLatchListener;
        this.blobStore = blobStore;
        this.tc = tc;
        this.clusterState = clusterState;
        this.acls = acls;
        this.metricsRegistry = metricsRegistry;
    }

    @Override
    public void prepare(Map<String, Object> conf) {
        // no-op for zookeeper implementation
    }

    @Override
    public void addToLeaderLockQueue() throws Exception {
        // if this latch is already closed, we need to create new instance.
        if (LeaderLatch.State.CLOSED.equals(leaderLatch.get().getState())) {
            leaderLatch.set(new LeaderLatch(zk, leaderlockPath));
            LeaderListenerCallback callback = new LeaderListenerCallback(conf, zk, leaderLatch.get(), blobStore, tc,
                                                                         clusterState, acls, metricsRegistry);
            leaderLatchListener.set(Zookeeper.leaderLatchListenerImpl(callback));
            LOG.info("LeaderLatch was in closed state. Resetted the leaderLatch and listeners.");
        }
        // Only if the latch is not already started we invoke start
        if (LeaderLatch.State.LATENT.equals(leaderLatch.get().getState())) {
            leaderLatch.get().addListener(leaderLatchListener.get());
            leaderLatch.get().start();
            LOG.info("Queued up for leader lock.");
        } else {
            LOG.info("Node already in queue for leader lock.");
        }
    }

    @Override
    // Only started latches can be closed.
    public void removeFromLeaderLockQueue() throws Exception {
        if (LeaderLatch.State.STARTED.equals(leaderLatch.get().getState())) {
            leaderLatch.get().close();
            LOG.info("Removed from leader lock queue.");
        } else {
            LOG.info("leader latch is not started so no removeFromLeaderLockQueue needed.");
        }
    }

    @Override
    public boolean isLeader() throws Exception {
        return leaderLatch.get().hasLeadership();
    }

    @Override
    public NimbusInfo getLeader() {
        try {
            return Zookeeper.toNimbusInfo(leaderLatch.get().getLeader());
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    @Override
    public List<NimbusInfo> getAllNimbuses() throws Exception {
        List<NimbusInfo> nimbusInfos = new ArrayList<>();
        Collection<Participant> participants = leaderLatch.get().getParticipants();
        for (Participant participant : participants) {
            nimbusInfos.add(Zookeeper.toNimbusInfo(participant));
        }
        return nimbusInfos;
    }

    @Override
    public void close() {
        //Do nothing now.
    }
}
```
- LeaderElectorImp implements the ILeaderElector interface
- addToLeaderLockQueue checks whether the latch is already closed and, if so, creates a fresh one; it then checks the latch state and, if it has not been started yet, calls start to join the election
- A closed latch is replaced rather than reused for two reasons: calling methods on an already closed latch throws an exception, and a node that zk elects as leader may fail storm's leadership preconditions and close the latch to give up leadership, after which it must still be able to re-queue (a sketch of this re-queue pattern follows below)
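The state handling reduces to a small pattern, sketched below with the listener wiring omitted; the zk client and leaderLockPath arguments are assumed to be set up as in the earlier examples. A LeaderLatch is single-use, so a CLOSED instance cannot be restarted and has to be replaced:

```java
import java.util.concurrent.atomic.AtomicReference;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;

public final class LatchRequeue {
    // mirrors LeaderElectorImp: the latch lives in an AtomicReference so it can be swapped
    static void requeue(AtomicReference<LeaderLatch> ref, CuratorFramework zk,
                        String leaderLockPath) throws Exception {
        if (LeaderLatch.State.CLOSED.equals(ref.get().getState())) {
            // a closed latch throws on further use; build a new instance
            ref.set(new LeaderLatch(zk, leaderLockPath));
        }
        if (LeaderLatch.State.LATENT.equals(ref.get().getState())) {
            ref.get().start(); // LATENT -> STARTED: joins the election queue
        }
        // STARTED: already queued up for the leader lock, nothing to do
    }
}
```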
Summary
- storm nimbus's LeaderElector is implemented mainly on top of the LeaderLatch recipe from zookeeper's curator recipes
- storm nimbus installs a custom LeaderLatchListener that validates a newly elected nimbus: all active topologies and all of their dependencies must be present locally, otherwise it gives up leadership
doc
- Highly Available Nimbus Design