NameServer源码解析

1 模块入口代码的功能

        本节介绍入口代码的功能,阅读源码的时候,很多人喜欢根据执行逻辑,先从入口代码看起。NameServer部分入口代码主要完成命令行参数解析,初始化Controller的功能。

1.1 入口函数

首先看一下NameServer的源码目录(见图10-1)。

NamesrvStartup是模块的启动入口,NamesrvController是用来协块各个调模功能的代码。

我们从启动代码开始分析,找到NamesrvStartup.java里的main函数public static void main(String[]args){main0(args);},发现它又把逻辑转到main0这个函数里。

图10-1 NameServer源码目录

1.2 解析命令行参数

main0函数主要完成两个功能,第一个功能是解析命令行参数,我们通过源码来看一看,重点是解析-c和-p参数,如代码清单10-1所示。

代码清单10-1 解析NameServer命令行参数

Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args,
    buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
    System.exit(-1);
    return null;
}
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
    String file = commandLine.getOptionValue('c');
    if (file != null) {
        InputStream in = new BufferedInputStream(new
            FileInputStream(file));
        properties = new Properties();
        properties.load(in);
        MixAll.properties2Object(properties, namesrvConfig);
        MixAll.properties2Object(properties, nettyServerConfig);
        namesrvConfig.setConfigStorePath(file);
        System.out.printf("load config properties file OK, " +
            file + "%n");
        in.close();
    }
}
if (commandLine.hasOption('p')) {
    MixAll.printObjectProperties(null, namesrvConfig);
    MixAll.printObjectProperties(null, nettyServerConfig);
    System.exit(0);
}

 

-c命令行参数用来指定配置文件的位置;-p命令行参数用来打印所有配置项的值。注意,用-p参数打印配置项的值之后程序就退出了,这是一个帮助调试的选项。

1.3 初始化NameServer的Controller

main0函数的另外一个功能是初始化Controller,如代码清单10-2所示。

代码清单10-2 初始化并启动Controller

// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
boolean initResult = controller.initialize();
if (!initResult) {
    controller.shutdown();
    System.exit(-3);
}
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log,
    new Callable<Void>() {
    @Override
    public Void call() throws Exception {
        controller.shutdown();
        return null;
    }
}));
controller.start();

 

根据解析出的配置参数,调用controller.initialize()来初始化,然后调用controller.start()让NameServer开始服务。

还有一个逻辑是注册ShutdownHookThread,当程序退出的时候会调用controller.shutdown来做退出前的清理工作。

2 NameServer的总控逻辑

NameServer的总控逻辑在NamesrvController.java代码中。NameServer是集群的协调者,它只是简单地接收其他角色报上来的状态,然后根据请求返回相应的状态。首先,NameserverController把执行线程池初始化好,如代码清单10-3所示。

代码清单10-3 线程池初始化

this.remotingExecutor =
    Executors.newFixedThreadPool(nettyServerConfig
        .getServerWorkerThreads(), new ThreadFactoryImpl
        ("RemotingExecutorThread_"));
this.registerProcessor();

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
       NamesrvController.this.routeInfoManager.scanNotActiveBroker();
    }
}, 5, 10, TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
       NamesrvController.this.kvConfigManager.printAllPeriodically();
    }
}, 1, 10, TimeUnit.MINUTES);

 

启动了一个默认是8个线程的线程池(private int serverWorkerThreads=8),还有两个定时执行的线程,一个用来扫描失效的Broker(scanNotActiveBroker),另一个用来打印配置信息(printAllPeriodically)。

然后启动负责通信的服务remotingServer,remotingServer监听一些端口,收到Broker、Client等发过来的请求后,根据请求的命令,调用不同的Processor来处理。这些不同的处理逻辑被放到上面初始化的线程池中执行,如代码清单10-4所示。

代码清单10-4 启动通信服务,关联初始化的线程池

this.remotingServer = new NettyRemotingServer(this.nettyServerConfig,
    this.brokerHousekeepingService);
……
if (namesrvConfig.isClusterTest()) {
    this.remotingServer.registerDefaultProcessor(new
            ClusterTestRequestProcessor(this, namesrvConfig
            .getProductEnvName()),
        this.remotingExecutor);
} else {
    this.remotingServer.registerDefaultProcessor(new
        DefaultRequestProcessor(this), this.remotingExecutor);
}

 

remotingServer是基于Netty封装的一个网络通信服务,要了解remoting-Server需要先对Netty有个基本的认知,后面会单独介绍。

3 核心业务逻辑处理

NameServer的核心业务逻辑,在DefaultRequestProcessor.java中可以一目了然地看出。网络通信服务模块收到请求后,就调用这个Processor来处理,如代码清单10-5所示。

代码清单10-5 根据请求码调用相应的处理逻辑

switch (request.getCode()) {
    case RequestCode.PUT_KV_CONFIG:
        return this.putKVConfig(ctx, request);
    case RequestCode.GET_KV_CONFIG:
        return this.getKVConfig(ctx, request);
    case RequestCode.DELETE_KV_CONFIG:
        return this.deleteKVConfig(ctx, request);
    case RequestCode.REGISTER_BROKER:
        Version brokerVersion = MQVersion.value2Version(request
            .getVersion());
        if (brokerVersion.ordinal() >= MQVersion.Version
            .V3_0_11.ordinal()) {
            return this.registerBrokerWithFilterServer(ctx, request);
        } else {
            return this.registerBroker(ctx, request);
        }
    case RequestCode.UNREGISTER_BROKER:
        return this.unregisterBroker(ctx, request);
    case RequestCode.GET_ROUTEINTO_BY_TOPIC:
        return this.getRouteInfoByTopic(ctx, request);
    case RequestCode.GET_BROKER_CLUSTER_INFO:
        return this.getBrokerClusterInfo(ctx, request);
    case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
        return this.wipeWritePermOfBroker(ctx, request);
    case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
        return getAllTopicListFromNameserver(ctx, request);
    case RequestCode.DELETE_TOPIC_IN_NAMESRV:
        return deleteTopicInNamesrv(ctx, request);
    case RequestCode.GET_KVLIST_BY_NAMESPACE:
        return this.getKVListByNamespace(ctx, request);
    case RequestCode.GET_TOPICS_BY_CLUSTER:
        return this.getTopicsByCluster(ctx, request);
    case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
        return this.getSystemTopicListFromNs(ctx, request);
    case RequestCode.GET_UNIT_TOPIC_LIST:
        return this.getUnitTopicList(ctx, request);
    case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
        return this.getHasUnitSubTopicList(ctx, request);
    case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
        return this.getHasUnitSubUnUnitTopicList(ctx, request);
    case RequestCode.UPDATE_NAMESRV_CONFIG:
        return this.updateConfig(ctx, request);
    case RequestCode.GET_NAMESRV_CONFIG:
        return this.getConfig(ctx, request);
    default:
        break;
}

 

逻辑主体是个switch语句,根据RequestCode调用不同的函数来处理,从RequestCode可以了解到NameServer的主要功能,比如:REGISTER_BROKER是在集群中新加入一个Broker机器;GET_ROUTEINTO_BY_TOPIC是请求获取一个Topic的路由信息;WIPE_WRITE_PERM_OF_BROKER是删除一个Broker的写权限。

4 集群状态存储

NameServer作为集群的协调者,需要保存和维护集群的各种元数据,这是通过RouteInfoManager类来实现的,如代码清单10-6所示。

代码清单10-6 RouteInfoManager的存储结构

private final HashMap<String/* topic */, List<QueueData>> topicQueue-Table;
private final HashMap<String/* brokerName */, BrokerData> brokerAddr-Table;
private final HashMap<String/* clusterName */, Set<String/* brokerName
*/>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo>
    brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter
Server */> filterServerTable;
public RouteInfoManager() {
    this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
    this.brokerAddrTable = new HashMap<String, BrokerData>(128);
    this.clusterAddrTable = new HashMap<String, Set<String>>(32);
    this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
    this.filterServerTable = new HashMap<String, List<String>>(256);
}

 

每个结构存储着一类集群信息,具体含义在第5章有介绍。了解RocketMQ各个角色的功能后,对每个结构的处理逻辑就好理解了。下面重点看一下控制访问这些结构的锁机制。

锁分为互斥锁、读写锁;也可分为可重入锁、不可重入锁。在NameServer的场景中,读取操作多,更改操作少,所以选择读写锁能大大提高效率。对于如何选择可重入和不可重入锁,重点看函数间的调用关系,比如多次获取锁的示例代码,如果这个lock是不可重入的,代码无法正常执行,如代码清单10-7所示。

代码清单10-7 多次获取锁示例

Lock lock = new Lock();
public void outer() {
    lock.lock();
    inner();
    lock.unlock();
}
public void inner() {
    lock.lock();
    //do something lock.unlock(); }
}

 

RouteInfoManager中使用的是可重入的读写锁(private final ReadWriteLock lock=new ReentrantReadWriteLock()),我们以deleteTopic函数为例,看一下锁的使用方式,如代码清单10-8所示。

代码清单10-8 锁的使用方式

public void deleteTopic(final String topic) {
    try {
        try {
            this.lock.writeLock().lockInterruptibly();
            this.topicQueueTable.remove(topic);
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("deleteTopic Exception", e);
    }
}

 

 

首先锁的获取和执行逻辑要放到一个try{}里,然后在finally{}中释放。这是一种典型的使用方式,我们可以参考这种方式实现自己的代码。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/153298.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

代码随想录算法训练营第五十九天丨 单调栈02

503.下一个更大元素II 思路 做本题之前建议先做739. 每日温度 (opens new window)和 496.下一个更大元素 I (opens new window)。 这道题和739. 每日温度 (opens new window)也几乎如出一辙。 不过&#xff0c;本题要循环数组了。 关于单调栈的讲解我在题解739. 每日温度 …

el-table 对循环产生的空白列赋默认值

1. el-table 空白列赋值 对el-table中未传数据存在空白的列赋默认值0。使用el-table 提供的插槽 slot-scope&#xff1a;{{ row || ‘0’ }} 原数据&#xff1a; <el-table-column label"集镇" :propcity ><template slot-scope"{row}">{{…

Vue实现表单效验

第一步&#xff1a;首先给form表单绑定一个rules属性 和 ref属性 <el-form :model"addFroms" label-position"right" :rules"rules" ref"ruleFormRef" label-width"100px"></el-form> 第二步&#xff1a;获取表…

TCC简介

TCC TCC&#xff08;Try-Confirm/Cancel&#xff09;是一种分布式事务处理模型&#xff0c;旨在解决分布式系统中的事务一致性问题。 三阶段 Try阶段&#xff1a; 在这个阶段&#xff0c;业务参与者尝试执行事务&#xff0c;并执行相应的业务逻辑。该阶段用于检查事务执行的…

007 OpenCV霍夫变换

目录 一、环境 二、霍夫变换原理 三、代码 一、环境 本文使用环境为&#xff1a; Windows10Python 3.9.17opencv-python 4.8.0.74 二、霍夫变换原理 OpenCV中的霍夫变换是一种用于检测图像中直线和圆的算法。它基于图像中像素的分布情况&#xff0c;通过统计像素点之间的…

2024年山东省职业院校技能大赛中职组“网络安全”赛项竞赛试题-C

2024年山东省职业院校技能大赛中职组 “网络安全”赛项竞赛试题-C 一、竞赛时间 总计&#xff1a;360分钟 二、竞赛阶段 竞赛阶段 任务阶段 竞赛任务 竞赛时间 分值 A、B模块 A-1 登录安全加固 180分钟 200分 A-2 本地安全策略设置 A-3 流量完整性保护 A-4 …

基于springboot实现智能热度分析和自媒体推送平台系统项目【项目源码】计算机毕业设计

基于springboot实现智能热度分析和自媒体推送平台演示 系统开发平台 在该自媒体分享网站中&#xff0c;Eclipse能给用户提供更多的方便&#xff0c;其特点一是方便学习&#xff0c;方便快捷&#xff1b;二是有非常大的信息储存量&#xff0c;主要功能是用在对数据库中查询和编…

纯CSS实现炫酷文本时钟

如图所示这是一个纯本文时钟效果,和传统的时钟不一样,没有表盘,也没有完整到每一分钟的数字表示当前时刻。 在这个时钟中,当前时间通过文本显示,显示的文本时间误差为+/- 4分钟,以明亮的颜色突出显示当前时间,而其余字母则较暗。 实际上这是一个实现很复杂的时钟,因为…

EOCR电机保护器的日常维护与保养技巧

EOCR是由施耐德韩国公司生产的电动机保护系列产品&#xff0c;由电子器件和互感器等部件组成&#xff0c;在正常状态下使用都有各自的机械寿命和电气寿命,若操作不当或异常条件下会加速电器元件的老化&#xff0c;缩短保护器的使用寿命&#xff0c;所以电机保护器的正确使用和正…

css 实现文字流光效果

经过调研发现大多滑块验证码中&#xff0c;有一些文字流光效果&#xff0c;因此在这里简单实现一下。 实现主要利用background 渐变背景以及backgorund-clip:text实现。具体代码如下 css部分 .slide {width: 300px;height: 40px;border: 1px solid #ccc;border-radius: 8px;…

提升工作效率,使用AnyTXT Searcher实现远程办公速查公司电脑文件——“cpolar内网穿透”

文章目录 前言1. AnyTXT Searcher1.1 下载安装AnyTXT Searcher 2. 下载安装注册cpolar3. AnyTXT Searcher设置和操作3.1 AnyTXT结合cpolar—公网访问搜索神器3.2 公网访问测试 4. 固定连接公网地址 前言 你是否遇到过这种情况&#xff0c;异地办公或者不在公司&#xff0c;想找…

linux 服务器进程、端口查找,nginx 配置日志查找,lsof 命令详解

一 、根据端口号 查看文件的部署位置 1.1 使用查看端口号对应的进程信息 方式一 &#xff1a; 使用netstat命令 netstat -tuln | grep 端口号-t&#xff1a;显示TCP连接 -u&#xff1a;显示UDP连接 -l&#xff1a;仅显示监听状态的连接 -n&#xff1a;以数字形式显示端口…

如果在手机没有root的情况下完成安卓手机数据恢复

您是否不小心从安卓设备中删除了重要数据&#xff1f; 担心如何取回您的照片、视频和文档&#xff1f; 有时您可能会不小心删除重要数据并使用安卓 root方法取回文件。 许多用户不喜欢根植他们的安卓设备&#xff0c;因为这是一种复杂的方法。 在本指南中&#xff0c;我们将向您…

控制您的音乐、视频等媒体内容

跨多个 Chrome 标签页播放音乐或声音 在计算机上打开 Chrome 。在标签页中播放音乐、视频或其他任何有声内容。您可以停留在该标签页上&#xff0c;也可以转到别处。要控制声音&#xff0c;请在右上角点击“媒体控件”图标 。您可暂停播放、转到下一首歌曲/下一个视频&#xf…

重磅,瑞士药监局 发布 EU GMP附录1《无菌药品生产》官方解读!

近日&#xff0c;瑞士药监局发布了EU GMP附录1《无菌药品生产》&#xff08;同时也是PIC/S和WHO GMP附录1&#xff09;的解读文件&#xff0c;该文件侧重于新版EU、PIC/S和WHO GMP附录1的一些最重要的变化&#xff0c;也涵盖了长期以来反复引起问题的方面。反映了检查员对这些主…

SpringBean的配置详解

Bean的基础配置 例如&#xff1a;配置UserDaoImpl由Spring容器负责管理 <beanid"userDao"class"com.xfy.dao.Impl.UserDaoImpl"></bean> 此时存储到Spring容器中的Bean的beanName是userDao&#xff0c;值是UserDaoImpl&#xff0c;可以根据bea…

Ubuntu本地快速搭建web小游戏网站,公网用户远程访问

前言 网&#xff1a;我们通常说的是互联网&#xff1b;站&#xff1a;可以理解成在互联网上的一个房子。把互联网看做一个城市&#xff0c;城市里面的每一个房子就是一个站点&#xff0c;房子里面放着你的资源&#xff0c;那如果有人想要访问你房子里面的东西怎么办&#xff1…

腾讯云轻量数据库试用初体验

腾讯云轻量数据库1核1G开箱测评&#xff0c;轻量数据库服务采用腾讯云自研的新一代云原生数据库TDSQL-C&#xff0c;轻量数据库兼100%兼容MySQL数据库&#xff0c;实现超百万级 QPS 的高吞吐&#xff0c;128TB海量分布式智能存储&#xff0c;虽然轻量数据库为单节点架构&#x…

ESP32 MicroPython AI摄像头应用⑩

ESP32 MicroPython AI摄像头应用⑩ 1、AI摄像头应用2、移动检测&#xff08;LCD显示&#xff09;3、实验内容3、参考代码4、实验结果 1、AI摄像头应用 我们小车MCU支持AI(人工智能)加速&#xff0c;可以用于加速神经网络计算和信号处理等工作的向量指令 (vector instructions)…