NameServer
NameServer 作为注册中心,提供路由注册、路由剔除、路由发现功能。它舍弃强一致性以保证高可用:集群中各个节点之间不会实时通讯,其中一个节点下线之后,其余节点仍然可以继续提供路由功能。
启动入口
org.apache.rocketmq.namesrv.NamesrvStartup#main0
public static void main0(String[] args) {try {//启动namesrv之前的准备,命令行准备、parseCommandlineAndConfigFile(args);//创建namesrv控制器createAndStartNamesrvController();} catch (Throwable e) {e.printStackTrace();System.exit(-1);}
}public static void parseCommandlineAndConfigFile(String[] args) throws Exception {System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));//PackageConflictDetect.detectFastjson();// 1.构建命令行参数Options options = ServerUtil.buildCommandlineOptions(new Options());CommandLine commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());if (null == commandLine) {System.exit(-1);return;}// 2.创建namesrvconfig对象 namesrvConfig = new NamesrvConfig();// 3、创建netty配置,监听9876端口nettyServerConfig = new NettyServerConfig();nettyClientConfig = new NettyClientConfig();nettyServerConfig.setListenPort(9876);controllerConfig = new ControllerConfig();//4、解析启动命令-c参数if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');if (file != null) {InputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(file)));properties = new Properties();properties.load(in);MixAll.properties2Object(properties, namesrvConfig);MixAll.properties2Object(properties, nettyServerConfig);MixAll.properties2Object(properties, nettyClientConfig);MixAll.properties2Object(properties, controllerConfig);namesrvConfig.setConfigStorePath(file);System.out.printf("load config properties file OK, %s%n", file);in.close();}}//5、解析启动命令-p参数if (commandLine.hasOption('p')) {MixAll.printObjectProperties(null, namesrvConfig);MixAll.printObjectProperties(null, nettyServerConfig);MixAll.printObjectProperties(null, nettyClientConfig);MixAll.printObjectProperties(null, controllerConfig);System.exit(0);}MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);if (null == namesrvConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();JoranConfigurator configurator = new 
JoranConfigurator();configurator.setContext(lc);lc.reset();//处理配置信息configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);MixAll.printObjectProperties(log, namesrvConfig);MixAll.printObjectProperties(log, nettyServerConfig);}
/**
 * Builds the name server controller, starts it, and reports the successful boot
 * (including the configured serialize type) to both the log and standard output.
 *
 * @throws Exception if creating or starting the controller fails
 */
public static void createAndStartNamesrvController() throws Exception {
    start(createNamesrvController());

    final String bootMessage =
        "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
    log.info(bootMessage);
    System.out.printf("%s%n", bootMessage);
}
org.apache.rocketmq.namesrv.NamesrvController#initializepublic boolean initialize() {loadConfig();//实例化netty服务端和客户端initiateNetworkComponents();initiateThreadExecutors();registerProcessor();startScheduleService();initiateSslContext();initiateRpcHooks();return true;}
org.apache.rocketmq.namesrv.NamesrvController#startpublic void start() throws Exception {//this.remotingServer.start();// In test scenarios where it is up to OS to pick up an available port, set the listening port back to configif (0 == nettyServerConfig.getListenPort()) {nettyServerConfig.setListenPort(this.remotingServer.localListenPort());}this.remotingClient.updateNameServerAddressList(Collections.singletonList(RemotingUtil.getLocalAddress()+ ":" + nettyServerConfig.getListenPort()));this.remotingClient.start();if (this.fileWatchService != null) {this.fileWatchService.start();}this.routeInfoManager.start();}
//启动bootstrap、channel、bind
org.apache.rocketmq.remoting.netty.NettyRemotingServer#start@Overridepublic void start() {this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private final AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});prepareSharableHandlers();serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_REUSEADDR, true).childOption(ChannelOption.SO_KEEPALIVE, false).childOption(ChannelOption.TCP_NODELAY, true).localAddress(new InetSocketAddress(this.nettyServerConfig.getBindAddress(),this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0,nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});addCustomConfig(serverBootstrap);try {ChannelFuture sync = serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();if (0 == nettyServerConfig.getListenPort()) {this.nettyServerConfig.setListenPort(addr.getPort());}log.info("RemotingServer started, listening {}:{}", this.nettyServerConfig.getBindAddress(),this.nettyServerConfig.getListenPort());this.remotingServerTable.put(this.nettyServerConfig.getListenPort(), this);} catch (Exception e) {throw new IllegalStateException(String.format("Failed to bind to %s:%d", nettyServerConfig.getBindAddress(),nettyServerConfig.getListenPort()), e);}if (this.channelEventListener != null) 
{this.nettyEventExecutor.start();}this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);}
//启动客户端
org.apache.rocketmq.remoting.netty.NettyRemotingClient#startpublic void start() {if (this.defaultEventExecutorGroup == null) {this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());}});}Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, false).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();if (nettyClientConfig.isUseTLS()) {if (null != sslContext) {pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));LOGGER.info("Prepend SSL handler");} else {LOGGER.warn("Connections are insecure as SSLContext is null!");}}ch.pipeline().addLast(nettyClientConfig.isDisableNettyWorkerGroup() ? 
null : defaultEventExecutorGroup,new NettyEncoder(),new NettyDecoder(),new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),new NettyConnectManageHandler(),new NettyClientHandler());}});if (nettyClientConfig.getClientSocketSndBufSize() > 0) {LOGGER.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());}if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {LOGGER.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());}if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) {LOGGER.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark());handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()));}if (nettyClientConfig.isClientPooledByteBufAllocatorEnable()) {handler.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingClient.this.scanResponseTable();} catch (Throwable e) {LOGGER.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);// this.timer.scheduleAtFixedRate(new TimerTask() {
// @Override
// public void run() {
// try {
// NettyRemotingClient.this.scanChannelTablesOfNameServer();
// } catch (Exception e) {
// LOGGER.error("scanChannelTablesOfNameServer exception", e);
// }
// }
// }, 1000 * 3, 10 * 1000);this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingClient.this.scanAvailableNameSrv();} catch (Exception e) {LOGGER.error("scanAvailableNameSrv exception", e);}}}, 0, this.nettyClientConfig.getConnectTimeoutMillis());}
路由注册
1、Broker服务每隔30秒向Namesrv发送一个心跳包。
org.apache.rocketmq.broker.BrokerController#start public void start() throws Exception {this.shouldStartTime = System.currentTimeMillis() + messageStoreConfig.getDisappearTimeAfterStart();if (messageStoreConfig.getTotalReplicas() > 1 && this.brokerConfig.isEnableSlaveActingMaster() || this.brokerConfig.isEnableControllerMode()) {isIsolated = true;}if (this.brokerOuterAPI != null) {this.brokerOuterAPI.start();}startBasicService();if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);this.registerBrokerAll(true, false, true);}scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {@Overridepublic void run2() {try {if (System.currentTimeMillis() < shouldStartTime) {BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);return;}if (isIsolated) {BrokerController.LOG.info("Skip register for broker is isolated");return;}BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {BrokerController.LOG.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));if (this.brokerConfig.isEnableSlaveActingMaster()) {scheduleSendHeartbeat();scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {@Overridepublic void run2() {try {BrokerController.this.syncBrokerMemberGroup();} catch (Throwable e) {BrokerController.LOG.error("sync BrokerMemberGroup error. ", e);}}}, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), TimeUnit.MILLISECONDS));}if (this.brokerConfig.isEnableControllerMode()) {scheduleSendHeartbeat();}if (brokerConfig.isSkipPreOnline()) {startServiceWithoutCondition();}}
org.apache.rocketmq.broker.BrokerController#scheduleSendHeartbeatprotected void scheduleSendHeartbeat() {scheduledFutures.add(this.brokerHeartbeatExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {@Overridepublic void run2() {if (isIsolated) {return;}try {BrokerController.this.sendHeartbeat();} catch (Exception e) {BrokerController.LOG.error("sendHeartbeat Exception", e);}}}, 1000, brokerConfig.getBrokerHeartbeatInterval(), TimeUnit.MILLISECONDS));}
2、broker向namesrv注册服务信息:topic信息、queue信息、集群信息。
2-1、broker利用netty发送服务信息
//org.apache.rocketmq.broker.BrokerController#registerBrokerAll
/**
 * Collects the broker's topic configuration (data version, topic config table and
 * topic-queue mapping info) and registers it with the name servers when registration is
 * forced or {@code needRegister} reports that it is required.
 *
 * <p>If the broker permission lacks read or write, every topic's permission is masked
 * with the broker permission before it is sent.
 *
 * @param checkOrderConfig passed through to the registration result handling
 * @param oneway           passed through to the outer API registration call
 * @param forceRegister    register even when {@code needRegister} would say no
 */
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
    final TopicConfigAndMappingSerializeWrapper payload = new TopicConfigAndMappingSerializeWrapper();

    payload.setDataVersion(this.getTopicConfigManager().getDataVersion());
    payload.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable());
    payload.setTopicQueueMappingInfoMap(
        this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream()
            .map(mapping -> new AbstractMap.SimpleImmutableEntry<>(
                mapping.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(mapping.getValue())))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));

    final boolean readableAndWriteable =
        PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
            && PermName.isReadable(this.getBrokerConfig().getBrokerPermission());
    if (!readableAndWriteable) {
        // Replace the table with copies whose permission is limited to the broker's permission.
        ConcurrentHashMap<String, TopicConfig> maskedTable = new ConcurrentHashMap<>();
        for (TopicConfig source : payload.getTopicConfigTable().values()) {
            TopicConfig masked = new TopicConfig(
                source.getTopicName(),
                source.getReadQueueNums(),
                source.getWriteQueueNums(),
                source.getPerm() & this.brokerConfig.getBrokerPermission(),
                source.getTopicSysFlag());
            maskedTable.put(source.getTopicName(), masked);
        }
        payload.setTopicConfigTable(maskedTable);
    }

    if (!forceRegister && !needRegister(
        this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.brokerConfig.getRegisterBrokerTimeoutMills(),
        this.brokerConfig.isInBrokerContainer())) {
        return;
    }
    doRegisterBrokerAll(checkOrderConfig, oneway, payload);
}

/**
 * Sends the registration through the broker outer API and hands the results to
 * {@code handleRegisterBrokerResult}. Only logs and returns when the broker has shut down.
 *
 * @param checkOrderConfig   passed through to the result handling
 * @param oneway             passed through to the outer API registration call
 * @param topicConfigWrapper topic configuration to register
 */
protected void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
    TopicConfigSerializeWrapper topicConfigWrapper) {

    if (!shutdown) {
        final List<RegisterBrokerResult> results = this.brokerOuterAPI.registerBrokerAll(
            this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.getHAServerAddr(),
            topicConfigWrapper,
            this.filterServerManager.buildNewFilterServerList(),
            oneway,
            this.brokerConfig.getRegisterBrokerTimeoutMills(),
            this.brokerConfig.isEnableSlaveActingMaster(),
            this.brokerConfig.isCompressedRegister(),
            this.brokerConfig.isEnableSlaveActingMaster() ? this.brokerConfig.getBrokerNotActiveTimeoutMillis() : null,
            this.getBrokerIdentity());

        handleRegisterBrokerResult(results, checkOrderConfig);
    } else {
        BrokerController.LOG.info("BrokerController#doResterBrokerAll: broker has shutdown, no need to register any more.");
    }
}
将broker的信息通过netty发送到namesrv上。
//org.apache.rocketmq.broker.BrokerController#handleRegisterBrokerResult
/**
 * Applies the first non-null registration result: optionally updates the HA master
 * address on the message store, sets the master address on the slave synchronizer, and
 * optionally refreshes the order topic config. Later results are ignored.
 *
 * @param registerBrokerResultList results of the registration; entries may be null
 * @param checkOrderConfig         whether to update the order topic config from the result's KV table
 */
protected void handleRegisterBrokerResult(List<RegisterBrokerResult> registerBrokerResultList,
    boolean checkOrderConfig) {
    for (RegisterBrokerResult candidate : registerBrokerResultList) {
        if (candidate == null) {
            continue;
        }

        final boolean refreshHaMaster =
            this.updateMasterHAServerAddrPeriodically && candidate.getHaServerAddr() != null;
        if (refreshHaMaster) {
            this.messageStore.updateHaMasterAddress(candidate.getHaServerAddr());
        }

        this.slaveSynchronize.setMasterAddr(candidate.getMasterAddr());

        if (checkOrderConfig) {
            this.getTopicConfigManager().updateOrderTopicConfig(candidate.getKvTable());
        }
        // Only the first non-null result is used.
        return;
    }
}
org.apache.rocketmq.broker.out.BrokerOuterAPI#sendHeartbeatToController
org.apache.rocketmq.broker.out.BrokerOuterAPI#sendHeartbeatViaDataVersion
org.apache.rocketmq.broker.out.BrokerOuterAPI#sendHeartbeat/*** Send heartbeat to controller*/public void sendHeartbeatToController(final String controllerAddress,final String clusterName,final String brokerAddr,final String brokerName,final Long brokerId,final int timeoutMills,final boolean isInBrokerContainer,final int epoch,final long maxOffset,final long confirmOffset) {if (StringUtils.isEmpty(controllerAddress)) {return;}final BrokerHeartbeatRequestHeader requestHeader = new BrokerHeartbeatRequestHeader();requestHeader.setClusterName(clusterName);requestHeader.setBrokerAddr(brokerAddr);requestHeader.setBrokerName(brokerName);requestHeader.setEpoch(epoch);requestHeader.setMaxOffset(maxOffset);requestHeader.setConfirmOffset(confirmOffset);brokerOuterExecutor.execute(new AbstractBrokerRunnable(new BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {@Overridepublic void run2() {RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.BROKER_HEARTBEAT, requestHeader);try {//使用netty客户端发送数据BrokerOuterAPI.this.remotingClient.invokeOneway(controllerAddress, request, timeoutMills);} catch (Exception e) {LOGGER.error("Error happen when send heartbeat to controller {}", controllerAddress, e);}}});}
org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeOneway@Overridepublic void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {final Channel channel = this.getAndCreateChannel(addr);if (channel != null && channel.isActive()) {try {doBeforeRpcHooks(addr, request);this.invokeOnewayImpl(channel, request, timeoutMillis);} catch (RemotingSendRequestException e) {LOGGER.warn("invokeOneway: send request exception, so close the channel[{}]", addr);this.closeChannel(addr, channel);throw e;}} else {this.closeChannel(addr, channel);throw new RemotingConnectException(addr);}}
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeOnewayImplpublic void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {request.markOnewayRPC();boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);if (acquired) {final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);try {channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {once.release();if (!f.isSuccess()) {log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");}});} catch (Exception e) {once.release();log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);}} else {if (timeoutMillis <= 0) {throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");} else {String info = String.format("invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreOnewayValue: %d",timeoutMillis,this.semaphoreOneway.getQueueLength(),this.semaphoreOneway.availablePermits());log.warn(info);throw new RemotingTimeoutException(info);}}}
2-2、namesrv接受并且注册服务信息到列表
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker
public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final String zoneName,final Long timeoutMillis,final Boolean enableActingMaster,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {RegisterBrokerResult result = new RegisterBrokerResult();try {this.lock.writeLock().lockInterruptibly();//init or update the cluster infoSet<String> brokerNames = ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Set<String>>) this.clusterAddrTable, clusterName, k -> new HashSet<>());brokerNames.add(brokerName);boolean registerFirst = false;BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null == brokerData) {registerFirst = true;brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());this.brokerAddrTable.put(brokerName, brokerData);}boolean isOldVersionBroker = enableActingMaster == null;brokerData.setEnableActingMaster(!isOldVersionBroker && enableActingMaster);brokerData.setZoneName(zoneName);Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();boolean isMinBrokerIdChanged = false;long prevMinBrokerId = 0;if (!brokerAddrsMap.isEmpty()) {prevMinBrokerId = Collections.min(brokerAddrsMap.keySet());}if (brokerId < prevMinBrokerId) {isMinBrokerIdChanged = true;}//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>//The same IP:PORT must only have one record in brokerAddrTablebrokerAddrsMap.entrySet().removeIf(item -> null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey());//If Local brokerId stateVersion bigger than the registering one,String oldBrokerAddr = brokerAddrsMap.get(brokerId);if (null != oldBrokerAddr && !oldBrokerAddr.equals(brokerAddr)) {BrokerLiveInfo oldBrokerInfo = brokerLiveTable.get(new BrokerAddrInfo(clusterName, oldBrokerAddr));if (null != oldBrokerInfo) {long oldStateVersion = 
oldBrokerInfo.getDataVersion().getStateVersion();long newStateVersion = topicConfigWrapper.getDataVersion().getStateVersion();if (oldStateVersion > newStateVersion) {log.warn("Registered Broker conflicts with the existed one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +"Old BrokerAddr:{}, Old Version:{}, New BrokerAddr:{}, New Version:{}.",clusterName, brokerName, brokerId, oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);//Remove the rejected brokerAddr from brokerLiveTable.brokerLiveTable.remove(new BrokerAddrInfo(clusterName, brokerAddr));return result;}}}if (!brokerAddrsMap.containsKey(brokerId) && topicConfigWrapper.getTopicConfigTable().size() == 1) {log.warn("Can't register topicConfigWrapper={} because broker[{}]={} has not registered.",topicConfigWrapper.getTopicConfigTable(), brokerId, brokerAddr);return null;}String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);registerFirst = registerFirst || (StringUtils.isEmpty(oldAddr));boolean isMaster = MixAll.MASTER_ID == brokerId;boolean isPrimeSlave = !isOldVersionBroker && !isMaster&& brokerId == Collections.min(brokerAddrsMap.keySet());if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {ConcurrentMap<String, TopicConfig> tcTable =topicConfigWrapper.getTopicConfigTable();if (tcTable != null) {for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,topicConfigWrapper.getDataVersion(), brokerName,entry.getValue().getTopicName())) {final TopicConfig topicConfig = entry.getValue();if (isPrimeSlave) {// Wipe write perm for prime slavetopicConfig.setPerm(topicConfig.getPerm() & (~PermName.PERM_WRITE));}this.createAndUpdateQueueData(brokerName, topicConfig);}}}if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = 
TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();//the topicQueueMappingInfoMap should never be null, but can be emptyfor (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>());}//Note asset brokerName equal entry.getValue().getBname()//here use the mappingDetail.bnametopicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());}}}BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, brokerAddr);BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddrInfo,new BrokerLiveInfo(System.currentTimeMillis(),timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,topicConfigWrapper == null ? new DataVersion() : topicConfigWrapper.getDataVersion(),channel,haServerAddr));if (null == prevBrokerLiveInfo) {log.info("new broker registered, {} HAService: {}", brokerAddrInfo, haServerAddr);}if (filterServerList != null) {if (filterServerList.isEmpty()) {this.filterServerTable.remove(brokerAddrInfo);} else {this.filterServerTable.put(brokerAddrInfo, filterServerList);}}if (MixAll.MASTER_ID != brokerId) {String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if (masterAddr != null) {BrokerAddrInfo masterAddrInfo = new BrokerAddrInfo(clusterName, masterAddr);BrokerLiveInfo masterLiveInfo = this.brokerLiveTable.get(masterAddrInfo);if (masterLiveInfo != null) {result.setHaServerAddr(masterLiveInfo.getHaServerAddr());result.setMasterAddr(masterAddr);}}}if (isMinBrokerIdChanged && namesrvConfig.isNotifyMinBrokerIdChanged()) {notifyMinBrokerIdChanged(brokerAddrsMap, null,this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());}} catch (Exception e) {log.error("registerBroker Exception", e);} finally 
{this.lock.writeLock().unlock();}return result;}
路由剔除
NameServer和每个Broker保持长连接,每隔30秒接收Broker发送的心跳包,同时自身每隔10秒扫描BrokerLiveTable,判断当前时间与上次收到心跳的时间之差是否大于120秒;如果超过,则认为该Broker不可用,并剔除路由表中该Broker的相关信息。
org.apache.rocketmq.namesrv.NamesrvController#startScheduleService定时扫描
public void scanNotActiveBroker() {try {log.info("start scanNotActiveBroker");for (Entry<BrokerAddrInfo, BrokerLiveInfo> next : this.brokerLiveTable.entrySet()) {long last = next.getValue().getLastUpdateTimestamp();long timeoutMillis = next.getValue().getHeartbeatTimeoutMillis();if ((last + timeoutMillis) < System.currentTimeMillis()) {RemotingUtil.closeChannel(next.getValue().getChannel());log.warn("The broker channel expired, {} {}ms", next.getKey(), timeoutMillis);this.onChannelDestroy(next.getKey());}}} catch (Exception e) {log.error("scanNotActiveBroker exception", e);}
}public void onChannelDestroy(BrokerAddrInfo brokerAddrInfo) {UnRegisterBrokerRequestHeader unRegisterRequest = new UnRegisterBrokerRequestHeader();boolean needUnRegister = false;if (brokerAddrInfo != null) {try {try {this.lock.readLock().lockInterruptibly();needUnRegister = setupUnRegisterRequest(unRegisterRequest, brokerAddrInfo);} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("onChannelDestroy Exception", e);}}if (needUnRegister) {//unregistrationQueue.offer(unRegisterRequest);boolean result = this.submitUnRegisterBrokerRequest(unRegisterRequest);log.info("the broker's channel destroyed, submit the unregister request at once, " +"broker info: {}, submit result: {}", unRegisterRequest, result);}}private boolean setupUnRegisterRequest(UnRegisterBrokerRequestHeader unRegisterRequest,BrokerAddrInfo brokerAddrInfo) {unRegisterRequest.setClusterName(brokerAddrInfo.getClusterName());unRegisterRequest.setBrokerAddr(brokerAddrInfo.getBrokerAddr());for (Entry<String, BrokerData> stringBrokerDataEntry : this.brokerAddrTable.entrySet()) {BrokerData brokerData = stringBrokerDataEntry.getValue();if (!brokerAddrInfo.getClusterName().equals(brokerData.getCluster())) {continue;}for (Entry<Long, String> entry : brokerData.getBrokerAddrs().entrySet()) {Long brokerId = entry.getKey();String brokerAddr = entry.getValue();if (brokerAddr.equals(brokerAddrInfo.getBrokerAddr())) {unRegisterRequest.setBrokerName(brokerData.getBrokerName());unRegisterRequest.setBrokerId(brokerId);return true;}}}return false;}
路由发现
路由发现不是实时的:路由变化后,NameServer不会主动推送给客户端,而是等待producer定期拉取最新的路由信息。这样的设计降低了NameServer实现的复杂性;当路由发生变化时,通过消息发送端的容错机制来保证消息发送的高可用。