聊聊storm nimbus的LeaderElector

为什么80%的码农都做不了架构师?>>>   hot3.png

本文主要研究一下storm nimbus的LeaderElector

Nimbus

org/apache/storm/daemon/nimbus/Nimbus.java

    public static void main(String[] args) throws Exception {Utils.setupDefaultUncaughtExceptionHandler();launch(new StandaloneINimbus());}public static Nimbus launch(INimbus inimbus) throws Exception {Map<String, Object> conf = Utils.merge(ConfigUtils.readStormConfig(),ConfigUtils.readYamlConfig("storm-cluster-auth.yaml", false));boolean fixupAcl = (boolean) conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_FIXUP);boolean checkAcl = fixupAcl || (boolean) conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_CHECK);if (checkAcl) {AclEnforcement.verifyAcls(conf, fixupAcl);}return launchServer(conf, inimbus);}private static Nimbus launchServer(Map<String, Object> conf, INimbus inimbus) throws Exception {StormCommon.validateDistributedMode(conf);validatePortAvailable(conf);StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();final Nimbus nimbus = new Nimbus(conf, inimbus, metricsRegistry);nimbus.launchServer();final ThriftServer server = new ThriftServer(conf, new Processor<>(nimbus), ThriftConnectionType.NIMBUS);metricsRegistry.startMetricsReporters(conf);Utils.addShutdownHookWithDelayedForceKill(() -> {metricsRegistry.stopMetricsReporters();nimbus.shutdown();server.stop();}, 10);if (ClientAuthUtils.areWorkerTokensEnabledServer(server, conf)) {nimbus.initWorkerTokenManager();}LOG.info("Starting nimbus server for storm version '{}'", STORM_VERSION);server.serve();return nimbus;}public Nimbus(Map<String, Object> conf, INimbus inimbus, IStormClusterState stormClusterState, NimbusInfo hostPortInfo,BlobStore blobStore, TopoCache topoCache, ILeaderElector leaderElector, IGroupMappingServiceProvider groupMapper,StormMetricsRegistry metricsRegistry)throws Exception {//......if (blobStore == null) {blobStore = ServerUtils.getNimbusBlobStore(conf, this.nimbusHostPortInfo, null);}this.blobStore = blobStore;if (topoCache == null) {topoCache = new TopoCache(blobStore, conf);}if (leaderElector == null) {leaderElector = Zookeeper.zkLeaderElector(conf, zkClient, blobStore, topoCache, stormClusterState, getNimbusAcls(conf),metricsRegistry);}this.leaderElector = leaderElector;this.blobStore.setLeaderElector(this.leaderElector);//......}public void launchServer() throws Exception {try {BlobStore store = blobStore;IStormClusterState state = stormClusterState;NimbusInfo hpi = nimbusHostPortInfo;LOG.info("Starting Nimbus with conf {}", ConfigUtils.maskPasswords(conf));validator.prepare(conf);//add to nimbusesstate.addNimbusHost(hpi.getHost(),new NimbusSummary(hpi.getHost(), hpi.getPort(), Time.currentTimeSecs(), false, STORM_VERSION));leaderElector.addToLeaderLockQueue();this.blobStore.startSyncBlobs();for (ClusterMetricsConsumerExecutor exec: clusterConsumerExceutors) {exec.prepare();}if (isLeader()) {for (String topoId : state.activeStorms()) {transition(topoId, TopologyActions.STARTUP, null);}clusterMetricSet.setActive(true);}//......} catch (Exception e) {if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {throw e;}if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, e)) {throw e;}LOG.error("Error on initialization of nimbus", e);Utils.exitProcess(13, "Error on initialization of nimbus");}}
  • Nimbus在构造器里头调用Zookeeper.zkLeaderElector创建leaderElector
  • launchServer方法调用了leaderElector.addToLeaderLockQueue()参与leader选举

Zookeeper.zkLeaderElector

storm-core-1.1.0-sources.jar!/org/apache/storm/zookeeper/Zookeeper.java

    public static ILeaderElector zkLeaderElector(Map conf, BlobStore blobStore) throws UnknownHostException {return _instance.zkLeaderElectorImpl(conf, blobStore);}protected ILeaderElector zkLeaderElectorImpl(Map conf, BlobStore blobStore) throws UnknownHostException {List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);CuratorFramework zk = mkClientImpl(conf, servers, port, "", conf);String leaderLockPath = conf.get(Config.STORM_ZOOKEEPER_ROOT) + "/leader-lock";String id = NimbusInfo.fromConf(conf).toHostPortString();AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));AtomicReference<LeaderLatchListener> leaderLatchListenerAtomicReference =new AtomicReference<>(leaderLatchListenerImpl(conf, zk, blobStore, leaderLatchAtomicReference.get()));return new LeaderElectorImp(conf, servers, zk, leaderLockPath, id, leaderLatchAtomicReference,leaderLatchListenerAtomicReference, blobStore);}
  • 这里使用/leader-lock路径创建了LeaderLatch,然后使用leaderLatchListenerImpl创建了LeaderLatchListener
  • 最后使用LeaderElectorImp创建ILeaderElector

leaderLatchListenerImpl

storm-core-1.1.0-sources.jar!/org/apache/storm/zookeeper/Zookeeper.java

    // Leader latch listener that will be invoked when we either gain or lose leadershippublic static LeaderLatchListener leaderLatchListenerImpl(final Map conf, final CuratorFramework zk, final BlobStore blobStore, final LeaderLatch leaderLatch) throws UnknownHostException {final String hostName = InetAddress.getLocalHost().getCanonicalHostName();return new LeaderLatchListener() {final String STORM_JAR_SUFFIX = "-stormjar.jar";final String STORM_CODE_SUFFIX = "-stormcode.ser";final String STORM_CONF_SUFFIX = "-stormconf.ser";@Overridepublic void isLeader() {Set<String> activeTopologyIds = new TreeSet<>(Zookeeper.getChildren(zk, conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false));Set<String> activeTopologyBlobKeys = populateTopologyBlobKeys(activeTopologyIds);Set<String> activeTopologyCodeKeys = filterTopologyCodeKeys(activeTopologyBlobKeys);Set<String> allLocalBlobKeys = Sets.newHashSet(blobStore.listKeys());Set<String> allLocalTopologyBlobKeys = filterTopologyBlobKeys(allLocalBlobKeys);// this finds all active topologies blob keys from all local topology blob keysSets.SetView<String> diffTopology = Sets.difference(activeTopologyBlobKeys, allLocalTopologyBlobKeys);LOG.info("active-topology-blobs [{}] local-topology-blobs [{}] diff-topology-blobs [{}]",generateJoinedString(activeTopologyIds), generateJoinedString(allLocalTopologyBlobKeys),generateJoinedString(diffTopology));if (diffTopology.isEmpty()) {Set<String> activeTopologyDependencies = getTopologyDependencyKeys(activeTopologyCodeKeys);// this finds all dependency blob keys from active topologies from all local blob keysSets.SetView<String> diffDependencies = Sets.difference(activeTopologyDependencies, allLocalBlobKeys);LOG.info("active-topology-dependencies [{}] local-blobs [{}] diff-topology-dependencies [{}]",generateJoinedString(activeTopologyDependencies), generateJoinedString(allLocalBlobKeys),generateJoinedString(diffDependencies));if (diffDependencies.isEmpty()) {LOG.info("Accepting leadership, all active topologies and corresponding dependencies found locally.");} else {LOG.info("Code for all active topologies is available locally, but some dependencies are not found locally, giving up leadership.");closeLatch();}} else {LOG.info("code for all active topologies not available locally, giving up leadership.");closeLatch();}}@Overridepublic void notLeader() {LOG.info("{} lost leadership.", hostName);}//......private void closeLatch() {try {leaderLatch.close();} catch (IOException e) {throw new RuntimeException(e);}}};}
  • leaderLatchListenerImpl返回一个LeaderLatchListener接口的实现类
  • isLeader接口里头做了一些校验,即当被zookeeper选中为leader的时候,如果本地没有所有的active topologies或者本地没有所有dependencies,那么就需要调用leaderLatch.close()放弃leadership
  • notLeader接口主要打印一下log

LeaderElectorImp

org/apache/storm/zookeeper/LeaderElectorImp.java

public class LeaderElectorImp implements ILeaderElector {private static Logger LOG = LoggerFactory.getLogger(LeaderElectorImp.class);private final Map<String, Object> conf;private final List<String> servers;private final CuratorFramework zk;private final String leaderlockPath;private final String id;private final AtomicReference<LeaderLatch> leaderLatch;private final AtomicReference<LeaderLatchListener> leaderLatchListener;private final BlobStore blobStore;private final TopoCache tc;private final IStormClusterState clusterState;private final List<ACL> acls;private final StormMetricsRegistry metricsRegistry;public LeaderElectorImp(Map<String, Object> conf, List<String> servers, CuratorFramework zk, String leaderlockPath, String id,AtomicReference<LeaderLatch> leaderLatch, AtomicReference<LeaderLatchListener> leaderLatchListener,BlobStore blobStore, final TopoCache tc, IStormClusterState clusterState, List<ACL> acls,StormMetricsRegistry metricsRegistry) {this.conf = conf;this.servers = servers;this.zk = zk;this.leaderlockPath = leaderlockPath;this.id = id;this.leaderLatch = leaderLatch;this.leaderLatchListener = leaderLatchListener;this.blobStore = blobStore;this.tc = tc;this.clusterState = clusterState;this.acls = acls;this.metricsRegistry = metricsRegistry;}@Overridepublic void prepare(Map<String, Object> conf) {// no-op for zookeeper implementation}@Overridepublic void addToLeaderLockQueue() throws Exception {// if this latch is already closed, we need to create new instance.if (LeaderLatch.State.CLOSED.equals(leaderLatch.get().getState())) {leaderLatch.set(new LeaderLatch(zk, leaderlockPath));LeaderListenerCallback callback = new LeaderListenerCallback(conf, zk, leaderLatch.get(), blobStore, tc, clusterState, acls,metricsRegistry);leaderLatchListener.set(Zookeeper.leaderLatchListenerImpl(callback));LOG.info("LeaderLatch was in closed state. Resetted the leaderLatch and listeners.");}// Only if the latch is not already started we invoke startif (LeaderLatch.State.LATENT.equals(leaderLatch.get().getState())) {leaderLatch.get().addListener(leaderLatchListener.get());leaderLatch.get().start();LOG.info("Queued up for leader lock.");} else {LOG.info("Node already in queue for leader lock.");}}@Override// Only started latches can be closed.public void removeFromLeaderLockQueue() throws Exception {if (LeaderLatch.State.STARTED.equals(leaderLatch.get().getState())) {leaderLatch.get().close();LOG.info("Removed from leader lock queue.");} else {LOG.info("leader latch is not started so no removeFromLeaderLockQueue needed.");}}@Overridepublic boolean isLeader() throws Exception {return leaderLatch.get().hasLeadership();}@Overridepublic NimbusInfo getLeader() {try {return Zookeeper.toNimbusInfo(leaderLatch.get().getLeader());} catch (Exception e) {throw Utils.wrapInRuntime(e);}}@Overridepublic List<NimbusInfo> getAllNimbuses() throws Exception {List<NimbusInfo> nimbusInfos = new ArrayList<>();Collection<Participant> participants = leaderLatch.get().getParticipants();for (Participant participant : participants) {nimbusInfos.add(Zookeeper.toNimbusInfo(participant));}return nimbusInfos;}@Overridepublic void close() {//Do nothing now.}
}
  • LeaderElectorImp实现了ILeaderElector接口
  • addToLeaderLockQueue方法检测如果latch已经closed,则重新创建一个新的,然后检测latch的状态,如果还没有start的话,则调用start参与选举
  • 之所以对closed状态的latch创建一个,主要有两个原因:一是对已经closed的latch进行方法调用会抛异常,二是被zk选举为leader,但是不满意storm的一些leader条件会放弃leadership即close掉

小结

  • storm nimbus的LeaderElector主要是基于zookeeper recipies的LeaderLatch来实现
  • storm nimbus自定义了LeaderLatchListener,对成为leader之后的nimbus进行校验,需要本地拥有所有的active topologies以及所有dependencies,否则放弃leadership

doc

  • Highly Available Nimbus Design

转载于:https://my.oschina.net/go4it/blog/2239477

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/281933.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android框架式编程之BufferKnife

BufferKnife作为框架式编程的重要组成部分&#xff0c;使用BufferKnife能够极大的精简View层面的代码量&#xff0c;并为MVP/MVC方式提供辅助。 一、配置 compile com.jakewharton:butterknife:(insert latest version) annotationProcessor com.jakewharton:butterknife-compi…

如果我去深圳,你会见我吗

▲图/ 深圳夜景初次见易小姐&#xff0c;还是21年的春节回老家的时候。想来20年因为疫情没有回家&#xff0c;家母几次三番电话里头表达的思念以及建议一些不靠谱的回家计划&#xff0c;着实有些不忍&#xff0c;确实有似“儿行千里母担忧”之理&#xff0c;索性拿着年假和加班…

CodeForces - 1059D(二分+误差)

链接&#xff1a;CodeForces - 1059D 题意&#xff1a;给出笛卡尔坐标系上 n 个点&#xff0c;求与 x 轴相切且覆盖了所有给出点的圆的最小半径。 题解&#xff1a;二分半径即可。判断&#xff1a;假设当前二分到的半径是 R &#xff0c;因为要和 x 轴相切&#xff0c;所以圆心…

pureref 平移用不了_关于参考图管理神器 PureRef 的一些快捷键

PureRef 的一些快捷键 软件下载&#xff1a;点击这里控制(配合左键)窗口内鼠标左键     框选窗口边鼠标左键     调整窗口大小鼠标中键 或 按住Alt     移动画布鼠标滚轮 或 按住Z     缩放画布按住S     查看目标位置颜色信息(可复制16进制颜色…

Windows 10 版本信息

Windows 10 版本信息 原文 https://technet.microsoft.com/zh-cn/windows/release-info Windows 10 版本信息 Microsoft 已更新其服务模型。 半年频道每年发布两次功能更新&#xff0c;时间大概在 3 月和 9 月&#xff0c;每个版本的服务时间线为 18 个月。 从 Windows 10 版本…

开源轻量的 .NET 监控工具 - 看门狗

你好&#xff0c;这里是 Dotnet 工具箱&#xff0c;定期分享 Dotnet 有趣&#xff0c;实用的工具或组件&#xff0c;希望对您有用&#xff01;简介WatchDog 是一个使用 C# 开发的开源的轻量监控工具&#xff0c;它可以记录和查看 ASP.Net Core Web 和 WebApi 的实时消息、事件、…

python读取oracle数据库性能_用python对oracle进行简单性能测试

一、概述dba在工作中避不开的两个问题&#xff0c;sql使用绑定变量到底会有多少的性能提升&#xff1f;数据库的审计功能如果打开对数据库的性能会产生多大的影响&#xff1f;最近恰好都碰到了&#xff0c;索性做个实验。sql使用绑定变量对性能的影响开通数据库审计功能对性能的…

BZOJ 3231: [Sdoi2008]递归数列 (JZYZOJ 1353) 矩阵快速幂

http://www.lydsy.com/JudgeOnline/problem.php?id3231和斐波那契一个道理在最后加一个求和即可1 #include<cstdio>2 #include<cstring>3 #include<iostream>4 //using namespace std;5 const int maxn10010;6 const double eps1e-8;7 long long modn;8 lon…

马斯克的火箭上天了,SpaceX开源项目也登上了热榜!

python知识手册SpaceX于美国东部时间5月30日下午3&#xff1a;22分将两位美国宇航员送往国际空间站&#xff0c;虽然这只是Demo任务&#xff0c;但SpaceX已经以其卓越工程优势、低廉的发射成本赢得了全球航天产业的信赖。同时也是除美俄中这些航天国家队以外&#xff0c;唯一独…

EasyMock学习笔记

目前在接触平台侧的开发&#xff0c;发现平台侧的东西和以前javacard开发很不一样&#xff0c;看来以后要学的东西还有很多很多。今天接触了下EasyMock。 Mock 方法是单元测试中常见的一种技术&#xff0c;它的主要作用是模拟一些在应用中不容易构造或者比较复杂的对象&#xf…

app启动广告页的实现,解决了广告图片要实时更新的问题

网上很多的实现方法很多都是显示第一次的缓存的图片&#xff0c;这样就造成后台更新广告图片App不能实时展示的问题。 我的具体实现思路是&#xff1a; 1.启动时先获取启动页的图片全屏展示。 2.设计一个等待时间&#xff0c;如果超过等待时间还没拿到图片就把获取的启动页去掉…

vue中点击插入html_Vue中插入HTML代码的方法

我们需要吧Hello World插入到My name is Pjee应该如何做&#xff1f;一、使用v-htmlv-html:更新元素的 innerHTMLconst text Hello World>My name is Pjee注意&#xff1a;你的站点上动态渲染的任意 HTML 可能会非常危险&#xff0c;因为它很容易导致 XSS 攻击。请只对可信…

进程共享变量#pragma data_seg用法

#pragma data_seg介绍用#pragma data_seg建立一个新的数据段并定义共享数据&#xff0c;其具体格式为&#xff1a;   #pragma data_seg &#xff08;"shareddata")   HWND sharedwndNULL;//共享数据   #pragma data_seg() ---------------------------------…

机器视觉Halcon教程(1.介绍)

前言本期教程主要教大家如何使用Halcon机器视觉&#xff0c;通过使用Halcon, 我们可以实现一些机器视觉的应用开发。例如: OCR识别、视觉定位、缺陷检测等内容。什么是halcon&#xff1f;简单来说, Halcon就是一款应用于机器视觉的软件&#xff0c;它提供了一套开发工具&#x…

网络时间的那些事及 ntpq 详解

2019独角兽企业重金招聘Python工程师标准>>> GMT (Greenwich Mean Time)格林威治时间 UTC (Coordinated Universal Time) 协调世界时 IAT (International Atomic Time),TAI 国际原子时 CST (Chinese Standard Time), 北京时间Gentoo&#xff08;也许其他发行版也是&…

【前端芝士树】Javascript的原型与原型链

【前端芝士树】Javascript的原型、原型链以及继承机制 前端的面试中经常会遇到这个问题&#xff0c;自己也是一直似懂非懂&#xff0c;趁这个机会整理一下0. 为什么会出现原型和原型链的概念 1994年&#xff0c;网景公司&#xff08;Netscape&#xff09;发布了Navigator浏览器…

神奇的幻方2015提高组d1t1

题目描述 幻方是一种很神奇的N*N矩阵&#xff1a;它由数字1,2,3,……,N*N构成&#xff0c;且每行、每列及两条对角线上的数字之和都相同。 当N为奇数时&#xff0c;我们可以通过以下方法构建一个幻方&#xff1a; 首先将1写在第一行的中间。 之后&#xff0c;按如下方式从小到大…

goldengate mysql_使用GoldenGate实现MySQL到Oracle的数据实时同步

step 1: 配置mysql修改配置文件my.ini#for goldengatelog-bin "C:/mysql/logbin/logbin.log"binlog-format ROWlog-bin-index "C:\mysql\logindex"binlog_cache_size32mmax_binlog_cache_size512mmax_binlog_size512m添加数据库用户ggs&#xff0c;具有…

C# 反射之Activator用法举例

概述程序运行时&#xff0c;通过反射可以得到其它程序集或者自己程序集代码的各种信息&#xff0c;包括类、函数、变量等来实例化它们&#xff0c;执行它们&#xff0c;操作它们&#xff0c;实际上就是获取程序在内存中的映像&#xff0c;然后基于这个映像进行各种操作。Activa…

MyBatis批量插入

转载于:https://blog.51cto.com/12701034/1929672