1 模块入口代码的功能
本节介绍入口代码的功能,阅读源码的时候,很多人喜欢根据执行逻辑,先从入口代码看起。NameServer部分入口代码主要完成命令行参数解析,初始化Controller的功能。
1.1 入口函数
首先看一下NameServer的源码目录(见图10-1)。
NamesrvStartup是模块的启动入口,NamesrvController是用来协块各个调模功能的代码。
我们从启动代码开始分析,找到NamesrvStartup.java里的main函数public static void main(String[]args){main0(args);},发现它又把逻辑转到main0这个函数里。
图10-1 NameServer源码目录
1.2 解析命令行参数
main0函数主要完成两个功能,第一个功能是解析命令行参数,我们通过源码来看一看,重点是解析-c和-p参数,如代码清单10-1所示。
代码清单10-1 解析NameServer命令行参数
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args,
buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new
FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, " +
file + "%n");
in.close();
}
}
if (commandLine.hasOption('p')) {
MixAll.printObjectProperties(null, namesrvConfig);
MixAll.printObjectProperties(null, nettyServerConfig);
System.exit(0);
}
-c命令行参数用来指定配置文件的位置;-p命令行参数用来打印所有配置项的值。注意,用-p参数打印配置项的值之后程序就退出了,这是一个帮助调试的选项。
1.3 初始化NameServer的Controller
main0函数的另外一个功能是初始化Controller,如代码清单10-2所示。
代码清单10-2 初始化并启动Controller
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log,
new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
controller.start();
根据解析出的配置参数,调用controller.initialize()来初始化,然后调用controller.start()让NameServer开始服务。
还有一个逻辑是注册ShutdownHookThread,当程序退出的时候会调用controller.shutdown来做退出前的清理工作。
2 NameServer的总控逻辑
NameServer的总控逻辑在NamesrvController.java代码中。NameServer是集群的协调者,它只是简单地接收其他角色报上来的状态,然后根据请求返回相应的状态。首先,NameserverController把执行线程池初始化好,如代码清单10-3所示。
代码清单10-3 线程池初始化
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig
.getServerWorkerThreads(), new ThreadFactoryImpl
("RemotingExecutorThread_"));
this.registerProcessor();
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
启动了一个默认是8个线程的线程池(private int serverWorkerThreads=8),还有两个定时执行的线程,一个用来扫描失效的Broker(scanNotActiveBroker),另一个用来打印配置信息(printAllPeriodically)。
然后启动负责通信的服务remotingServer,remotingServer监听一些端口,收到Broker、Client等发过来的请求后,根据请求的命令,调用不同的Processor来处理。这些不同的处理逻辑被放到上面初始化的线程池中执行,如代码清单10-4所示。
代码清单10-4 启动通信服务,关联初始化的线程池
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig,
this.brokerHousekeepingService);
……
if (namesrvConfig.isClusterTest()) {
this.remotingServer.registerDefaultProcessor(new
ClusterTestRequestProcessor(this, namesrvConfig
.getProductEnvName()),
this.remotingExecutor);
} else {
this.remotingServer.registerDefaultProcessor(new
DefaultRequestProcessor(this), this.remotingExecutor);
}
remotingServer是基于Netty封装的一个网络通信服务,要了解remoting-Server需要先对Netty有个基本的认知,后面会单独介绍。
3 核心业务逻辑处理
NameServer的核心业务逻辑,在DefaultRequestProcessor.java中可以一目了然地看出。网络通信服务模块收到请求后,就调用这个Processor来处理,如代码清单10-5所示。
代码清单10-5 根据请求码调用相应的处理逻辑
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request
.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version
.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
case RequestCode.UPDATE_NAMESRV_CONFIG:
return this.updateConfig(ctx, request);
case RequestCode.GET_NAMESRV_CONFIG:
return this.getConfig(ctx, request);
default:
break;
}
逻辑主体是个switch语句,根据RequestCode调用不同的函数来处理,从RequestCode可以了解到NameServer的主要功能,比如:REGISTER_BROKER是在集群中新加入一个Broker机器;GET_ROUTEINTO_BY_TOPIC是请求获取一个Topic的路由信息;WIPE_WRITE_PERM_OF_BROKER是删除一个Broker的写权限。
4 集群状态存储
NameServer作为集群的协调者,需要保存和维护集群的各种元数据,这是通过RouteInfoManager类来实现的,如代码清单10-6所示。
代码清单10-6 RouteInfoManager的存储结构
private final HashMap<String/* topic */, List<QueueData>> topicQueue-Table;
private final HashMap<String/* brokerName */, BrokerData> brokerAddr-Table;
private final HashMap<String/* clusterName */, Set<String/* brokerName
*/>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo>
brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter
Server */> filterServerTable;
public RouteInfoManager() {
this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
this.brokerAddrTable = new HashMap<String, BrokerData>(128);
this.clusterAddrTable = new HashMap<String, Set<String>>(32);
this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
this.filterServerTable = new HashMap<String, List<String>>(256);
}
每个结构存储着一类集群信息,具体含义在第5章有介绍。了解RocketMQ各个角色的功能后,对每个结构的处理逻辑就好理解了。下面重点看一下控制访问这些结构的锁机制。
锁分为互斥锁、读写锁;也可分为可重入锁、不可重入锁。在NameServer的场景中,读取操作多,更改操作少,所以选择读写锁能大大提高效率。对于如何选择可重入和不可重入锁,重点看函数间的调用关系,比如多次获取锁的示例代码,如果这个lock是不可重入的,代码无法正常执行,如代码清单10-7所示。
代码清单10-7 多次获取锁示例
Lock lock = new Lock();
public void outer() {
lock.lock();
inner();
lock.unlock();
}
public void inner() {
lock.lock();
//do something lock.unlock(); }
}
RouteInfoManager中使用的是可重入的读写锁(private final ReadWriteLock lock=new ReentrantReadWriteLock()),我们以deleteTopic函数为例,看一下锁的使用方式,如代码清单10-8所示。
代码清单10-8 锁的使用方式
public void deleteTopic(final String topic) {
try {
try {
this.lock.writeLock().lockInterruptibly();
this.topicQueueTable.remove(topic);
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("deleteTopic Exception", e);
}
}
首先锁的获取和执行逻辑要放到一个try{}里,然后在finally{}中释放。这是一种典型的使用方式,我们可以参考这种方式实现自己的代码。