A look at Storm Nimbus's LeaderElector

This article takes a look at the LeaderElector used by Storm Nimbus.

Nimbus

org/apache/storm/daemon/nimbus/Nimbus.java

    public static void main(String[] args) throws Exception {
        Utils.setupDefaultUncaughtExceptionHandler();
        launch(new StandaloneINimbus());
    }

    public static Nimbus launch(INimbus inimbus) throws Exception {
        Map<String, Object> conf = Utils.merge(ConfigUtils.readStormConfig(),
                                               ConfigUtils.readYamlConfig("storm-cluster-auth.yaml", false));
        boolean fixupAcl = (boolean) conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_FIXUP);
        boolean checkAcl = fixupAcl || (boolean) conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_CHECK);
        if (checkAcl) {
            AclEnforcement.verifyAcls(conf, fixupAcl);
        }
        return launchServer(conf, inimbus);
    }

    private static Nimbus launchServer(Map<String, Object> conf, INimbus inimbus) throws Exception {
        StormCommon.validateDistributedMode(conf);
        validatePortAvailable(conf);
        StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
        final Nimbus nimbus = new Nimbus(conf, inimbus, metricsRegistry);
        nimbus.launchServer();
        final ThriftServer server = new ThriftServer(conf, new Processor<>(nimbus), ThriftConnectionType.NIMBUS);
        metricsRegistry.startMetricsReporters(conf);
        Utils.addShutdownHookWithDelayedForceKill(() -> {
            metricsRegistry.stopMetricsReporters();
            nimbus.shutdown();
            server.stop();
        }, 10);
        if (ClientAuthUtils.areWorkerTokensEnabledServer(server, conf)) {
            nimbus.initWorkerTokenManager();
        }
        LOG.info("Starting nimbus server for storm version '{}'", STORM_VERSION);
        server.serve();
        return nimbus;
    }

    public Nimbus(Map<String, Object> conf, INimbus inimbus, IStormClusterState stormClusterState, NimbusInfo hostPortInfo,
                  BlobStore blobStore, TopoCache topoCache, ILeaderElector leaderElector, IGroupMappingServiceProvider groupMapper,
                  StormMetricsRegistry metricsRegistry)
        throws Exception {
        //......
        if (blobStore == null) {
            blobStore = ServerUtils.getNimbusBlobStore(conf, this.nimbusHostPortInfo, null);
        }
        this.blobStore = blobStore;

        if (topoCache == null) {
            topoCache = new TopoCache(blobStore, conf);
        }
        if (leaderElector == null) {
            leaderElector = Zookeeper.zkLeaderElector(conf, zkClient, blobStore, topoCache, stormClusterState, getNimbusAcls(conf),
                                                      metricsRegistry);
        }
        this.leaderElector = leaderElector;
        this.blobStore.setLeaderElector(this.leaderElector);
        //......
    }

    public void launchServer() throws Exception {
        try {
            BlobStore store = blobStore;
            IStormClusterState state = stormClusterState;
            NimbusInfo hpi = nimbusHostPortInfo;

            LOG.info("Starting Nimbus with conf {}", ConfigUtils.maskPasswords(conf));
            validator.prepare(conf);

            //add to nimbuses
            state.addNimbusHost(hpi.getHost(),
                                new NimbusSummary(hpi.getHost(), hpi.getPort(), Time.currentTimeSecs(), false, STORM_VERSION));
            leaderElector.addToLeaderLockQueue();
            this.blobStore.startSyncBlobs();

            for (ClusterMetricsConsumerExecutor exec: clusterConsumerExceutors) {
                exec.prepare();
            }

            if (isLeader()) {
                for (String topoId : state.activeStorms()) {
                    transition(topoId, TopologyActions.STARTUP, null);
                }
                clusterMetricSet.setActive(true);
            }
            //......
        } catch (Exception e) {
            if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
                throw e;
            }
            if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, e)) {
                throw e;
            }
            LOG.error("Error on initialization of nimbus", e);
            Utils.exitProcess(13, "Error on initialization of nimbus");
        }
    }

  • If no leaderElector is passed in, the Nimbus constructor calls Zookeeper.zkLeaderElector to create one
  • The launchServer method calls leaderElector.addToLeaderLockQueue() to take part in the leader election

Zookeeper.zkLeaderElector

storm-core-1.1.0-sources.jar!/org/apache/storm/zookeeper/Zookeeper.java

    public static ILeaderElector zkLeaderElector(Map conf, BlobStore blobStore) throws UnknownHostException {
        return _instance.zkLeaderElectorImpl(conf, blobStore);
    }

    protected ILeaderElector zkLeaderElectorImpl(Map conf, BlobStore blobStore) throws UnknownHostException {
        List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
        Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
        CuratorFramework zk = mkClientImpl(conf, servers, port, "", conf);
        String leaderLockPath = conf.get(Config.STORM_ZOOKEEPER_ROOT) + "/leader-lock";
        String id = NimbusInfo.fromConf(conf).toHostPortString();
        AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));
        AtomicReference<LeaderLatchListener> leaderLatchListenerAtomicReference =
            new AtomicReference<>(leaderLatchListenerImpl(conf, zk, blobStore, leaderLatchAtomicReference.get()));
        return new LeaderElectorImp(conf, servers, zk, leaderLockPath, id, leaderLatchAtomicReference,
            leaderLatchListenerAtomicReference, blobStore);
    }

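  • A LeaderLatch is created on the /leader-lock path under the configured ZooKeeper root (Config.STORM_ZOOKEEPER_ROOT), and leaderLatchListenerImpl is then used to create the LeaderLatchListener
  • Finally, a LeaderElectorImp is created and returned as the ILeaderElector
  • Note that this snippet comes from the storm-core 1.1.0 sources, so its two-argument zkLeaderElector(conf, blobStore) signature differs from the seven-argument call seen in the Nimbus constructor above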
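
To make the two inputs of the LeaderLatch concrete, here is a minimal plain-Java sketch of how the lock path and the participant id are put together. It does not use Storm or Curator. The class LeaderLockNamingSketch, its helper methods and the sample host and port are hypothetical, and both the key string "storm.zookeeper.root" and the host:port id format are assumptions about what Config.STORM_ZOOKEEPER_ROOT and NimbusInfo.toHostPortString() resolve to.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: mirrors how zkLeaderElectorImpl derives the latch path and id.
class LeaderLockNamingSketch {

    // Assumed value of Config.STORM_ZOOKEEPER_ROOT.
    static final String ZK_ROOT_KEY = "storm.zookeeper.root";

    // Same concatenation as in the snippet above: <root> + "/leader-lock".
    static String leaderLockPath(Map<String, Object> conf) {
        return conf.get(ZK_ROOT_KEY) + "/leader-lock";
    }

    // Assumed shape of NimbusInfo.toHostPortString(): "host:port".
    static String participantId(String host, int port) {
        return host + ":" + port;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(ZK_ROOT_KEY, "/storm");
        System.out.println(leaderLockPath(conf));             // /storm/leader-lock
        System.out.println(participantId("nimbus-1", 6627));  // nimbus-1:6627
    }
}
```

Every Nimbus that points at the same root ends up on the same latch path, and the id is what lets other participants tell the contenders apart.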

leaderLatchListenerImpl

storm-core-1.1.0-sources.jar!/org/apache/storm/zookeeper/Zookeeper.java

    // Leader latch listener that will be invoked when we either gain or lose leadership
    public static LeaderLatchListener leaderLatchListenerImpl(final Map conf, final CuratorFramework zk,
                                                              final BlobStore blobStore, final LeaderLatch leaderLatch)
        throws UnknownHostException {
        final String hostName = InetAddress.getLocalHost().getCanonicalHostName();
        return new LeaderLatchListener() {
            final String STORM_JAR_SUFFIX = "-stormjar.jar";
            final String STORM_CODE_SUFFIX = "-stormcode.ser";
            final String STORM_CONF_SUFFIX = "-stormconf.ser";

            @Override
            public void isLeader() {
                Set<String> activeTopologyIds = new TreeSet<>(
                    Zookeeper.getChildren(zk, conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false));
                Set<String> activeTopologyBlobKeys = populateTopologyBlobKeys(activeTopologyIds);
                Set<String> activeTopologyCodeKeys = filterTopologyCodeKeys(activeTopologyBlobKeys);
                Set<String> allLocalBlobKeys = Sets.newHashSet(blobStore.listKeys());
                Set<String> allLocalTopologyBlobKeys = filterTopologyBlobKeys(allLocalBlobKeys);

                // this finds all active topologies blob keys from all local topology blob keys
                Sets.SetView<String> diffTopology = Sets.difference(activeTopologyBlobKeys, allLocalTopologyBlobKeys);
                LOG.info("active-topology-blobs [{}] local-topology-blobs [{}] diff-topology-blobs [{}]",
                         generateJoinedString(activeTopologyIds), generateJoinedString(allLocalTopologyBlobKeys),
                         generateJoinedString(diffTopology));

                if (diffTopology.isEmpty()) {
                    Set<String> activeTopologyDependencies = getTopologyDependencyKeys(activeTopologyCodeKeys);

                    // this finds all dependency blob keys from active topologies from all local blob keys
                    Sets.SetView<String> diffDependencies = Sets.difference(activeTopologyDependencies, allLocalBlobKeys);
                    LOG.info("active-topology-dependencies [{}] local-blobs [{}] diff-topology-dependencies [{}]",
                             generateJoinedString(activeTopologyDependencies), generateJoinedString(allLocalBlobKeys),
                             generateJoinedString(diffDependencies));

                    if (diffDependencies.isEmpty()) {
                        LOG.info("Accepting leadership, all active topologies and corresponding dependencies found locally.");
                    } else {
                        LOG.info("Code for all active topologies is available locally, but some dependencies are not found locally, giving up leadership.");
                        closeLatch();
                    }
                } else {
                    LOG.info("code for all active topologies not available locally, giving up leadership.");
                    closeLatch();
                }
            }

            @Override
            public void notLeader() {
                LOG.info("{} lost leadership.", hostName);
            }

            //......

            private void closeLatch() {
                try {
                    leaderLatch.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }

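  • leaderLatchListenerImpl returns an implementation of the LeaderLatchListener interface
  • The isLeader callback performs some checks: when this node is elected leader through ZooKeeper, it calls leaderLatch.close() to give up leadership if the blobs of all active topologies are not available locally, or if not all of their dependencies are available locally
  • The notLeader callback only writes a log message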
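
The two-stage check in isLeader boils down to two set differences. The following is a minimal sketch of that decision in plain Java, not Storm's code: the class LeadershipCheckSketch, the Decision enum and the sample blob keys are hypothetical. As a simplification it compares against all local blob keys in both stages, where the original first filters the local keys down to topology blobs.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified stand-in for the checks performed in isLeader().
class LeadershipCheckSketch {

    enum Decision { ACCEPT, GIVE_UP_MISSING_TOPOLOGY_BLOBS, GIVE_UP_MISSING_DEPENDENCIES }

    // Returns the elements of 'required' that are absent from 'available'.
    static Set<String> missing(Set<String> required, Set<String> available) {
        Set<String> diff = new TreeSet<>(required);
        diff.removeAll(available);
        return diff;
    }

    // Stage 1: topology blobs must all be local. Stage 2: so must their dependencies.
    static Decision decide(Set<String> activeTopologyBlobKeys,
                           Set<String> activeTopologyDependencies,
                           Set<String> allLocalBlobKeys) {
        if (!missing(activeTopologyBlobKeys, allLocalBlobKeys).isEmpty()) {
            return Decision.GIVE_UP_MISSING_TOPOLOGY_BLOBS;
        }
        if (!missing(activeTopologyDependencies, allLocalBlobKeys).isEmpty()) {
            return Decision.GIVE_UP_MISSING_DEPENDENCIES;
        }
        return Decision.ACCEPT;
    }

    public static void main(String[] args) {
        Set<String> local = Set.of("t1-stormjar.jar", "t1-stormcode.ser", "t1-stormconf.ser", "dep-a.jar");
        Set<String> t1 = Set.of("t1-stormjar.jar", "t1-stormcode.ser", "t1-stormconf.ser");

        // Everything is local, so leadership is kept.
        System.out.println(decide(t1, Set.of("dep-a.jar"), local));
        // A second topology's jar is missing locally.
        System.out.println(decide(Set.of("t1-stormjar.jar", "t2-stormjar.jar"), Set.of(), local));
        // Topology blobs are local, but one dependency is not.
        System.out.println(decide(t1, Set.of("dep-a.jar", "dep-b.jar"), local));
    }
}
```

In the real listener both "give up" outcomes lead to the same action, closing the latch; the sketch keeps them apart only to show which stage failed.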

LeaderElectorImp

org/apache/storm/zookeeper/LeaderElectorImp.java

    public class LeaderElectorImp implements ILeaderElector {
        private static Logger LOG = LoggerFactory.getLogger(LeaderElectorImp.class);
        private final Map<String, Object> conf;
        private final List<String> servers;
        private final CuratorFramework zk;
        private final String leaderlockPath;
        private final String id;
        private final AtomicReference<LeaderLatch> leaderLatch;
        private final AtomicReference<LeaderLatchListener> leaderLatchListener;
        private final BlobStore blobStore;
        private final TopoCache tc;
        private final IStormClusterState clusterState;
        private final List<ACL> acls;
        private final StormMetricsRegistry metricsRegistry;

        public LeaderElectorImp(Map<String, Object> conf, List<String> servers, CuratorFramework zk, String leaderlockPath, String id,
                                AtomicReference<LeaderLatch> leaderLatch, AtomicReference<LeaderLatchListener> leaderLatchListener,
                                BlobStore blobStore, final TopoCache tc, IStormClusterState clusterState, List<ACL> acls,
                                StormMetricsRegistry metricsRegistry) {
            this.conf = conf;
            this.servers = servers;
            this.zk = zk;
            this.leaderlockPath = leaderlockPath;
            this.id = id;
            this.leaderLatch = leaderLatch;
            this.leaderLatchListener = leaderLatchListener;
            this.blobStore = blobStore;
            this.tc = tc;
            this.clusterState = clusterState;
            this.acls = acls;
            this.metricsRegistry = metricsRegistry;
        }

        @Override
        public void prepare(Map<String, Object> conf) {
            // no-op for zookeeper implementation
        }

        @Override
        public void addToLeaderLockQueue() throws Exception {
            // if this latch is already closed, we need to create new instance.
            if (LeaderLatch.State.CLOSED.equals(leaderLatch.get().getState())) {
                leaderLatch.set(new LeaderLatch(zk, leaderlockPath));
                LeaderListenerCallback callback = new LeaderListenerCallback(conf, zk, leaderLatch.get(), blobStore, tc, clusterState, acls,
                                                                             metricsRegistry);
                leaderLatchListener.set(Zookeeper.leaderLatchListenerImpl(callback));
                LOG.info("LeaderLatch was in closed state. Resetted the leaderLatch and listeners.");
            }
            // Only if the latch is not already started we invoke start
            if (LeaderLatch.State.LATENT.equals(leaderLatch.get().getState())) {
                leaderLatch.get().addListener(leaderLatchListener.get());
                leaderLatch.get().start();
                LOG.info("Queued up for leader lock.");
            } else {
                LOG.info("Node already in queue for leader lock.");
            }
        }

        @Override
        // Only started latches can be closed.
        public void removeFromLeaderLockQueue() throws Exception {
            if (LeaderLatch.State.STARTED.equals(leaderLatch.get().getState())) {
                leaderLatch.get().close();
                LOG.info("Removed from leader lock queue.");
            } else {
                LOG.info("leader latch is not started so no removeFromLeaderLockQueue needed.");
            }
        }

        @Override
        public boolean isLeader() throws Exception {
            return leaderLatch.get().hasLeadership();
        }

        @Override
        public NimbusInfo getLeader() {
            try {
                return Zookeeper.toNimbusInfo(leaderLatch.get().getLeader());
            } catch (Exception e) {
                throw Utils.wrapInRuntime(e);
            }
        }

        @Override
        public List<NimbusInfo> getAllNimbuses() throws Exception {
            List<NimbusInfo> nimbusInfos = new ArrayList<>();
            Collection<Participant> participants = leaderLatch.get().getParticipants();
            for (Participant participant : participants) {
                nimbusInfos.add(Zookeeper.toNimbusInfo(participant));
            }
            return nimbusInfos;
        }

        @Override
        public void close() {
            //Do nothing now.
        }
    }

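  • LeaderElectorImp implements the ILeaderElector interface
  • addToLeaderLockQueue first checks whether the latch is already closed and, if so, creates a new latch and listener; it then checks the latch state and, if the latch has not been started yet, calls start to join the election
  • A closed latch has to be replaced for two reasons: first, calling methods on a latch that is already closed throws an exception; second, a node that zk elects as leader but that does not meet Storm's own leader conditions gives up leadership by closing its latch, so it needs a fresh latch to rejoin the election later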
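
The latch lifecycle behind addToLeaderLockQueue and removeFromLeaderLockQueue can be sketched without Curator. In the example below, FakeLatch is a hypothetical stand-in that only models a one-way LATENT -> STARTED -> CLOSED state machine; the assumption that start and close throw on an invalid state reflects my understanding of LeaderLatch and is not taken from the source. The boolean return values are added purely so the behaviour can be observed.

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustrative model of the latch handling in LeaderElectorImp (no ZooKeeper involved).
class LatchLifecycleSketch {

    enum State { LATENT, STARTED, CLOSED }

    // Hypothetical latch: states only move forward, so a closed latch cannot be reused.
    static final class FakeLatch {
        private State state = State.LATENT;

        State getState() { return state; }

        void start() {
            if (state != State.LATENT) throw new IllegalStateException("cannot be started more than once");
            state = State.STARTED;
        }

        void close() {
            if (state != State.STARTED) throw new IllegalStateException("already closed or never started");
            state = State.CLOSED;
        }
    }

    private final AtomicReference<FakeLatch> latch = new AtomicReference<>(new FakeLatch());

    // Returns true if this call actually queued the node up for the lock.
    boolean addToLeaderLockQueue() {
        if (latch.get().getState() == State.CLOSED) {
            latch.set(new FakeLatch());          // a closed latch must be replaced
        }
        if (latch.get().getState() == State.LATENT) {
            latch.get().start();
            return true;
        }
        return false;                            // already queued
    }

    // Returns true if this call actually closed a started latch.
    boolean removeFromLeaderLockQueue() {
        if (latch.get().getState() == State.STARTED) {
            latch.get().close();
            return true;
        }
        return false;
    }

    State state() { return latch.get().getState(); }

    public static void main(String[] args) {
        LatchLifecycleSketch elector = new LatchLifecycleSketch();
        System.out.println(elector.addToLeaderLockQueue());      // true
        System.out.println(elector.addToLeaderLockQueue());      // false
        System.out.println(elector.removeFromLeaderLockQueue()); // true
        System.out.println(elector.addToLeaderLockQueue());      // true
        System.out.println(elector.state());                     // STARTED
    }
}
```

Holding the latch in an AtomicReference, as the original does, is what allows it to be swapped for a new instance while callers keep using the same elector object.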

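Summary

  • Storm Nimbus's LeaderElector is built mainly on LeaderLatch, one of the ZooKeeper recipes provided by Apache Curator
  • Storm Nimbus supplies its own LeaderLatchListener, which validates a Nimbus that has just become leader: it must have all active topologies and all of their dependencies available locally, otherwise it gives up leadership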

doc

  • Highly Available Nimbus Design

Reposted from: https://my.oschina.net/go4it/blog/2239477
