Rocket mq namesrv源码分析

 NameServer

作为注册中心,提供路由注册、路由踢出、路由发现功能,舍弃强一致,保证高可用,集群中各个节点不会实时通讯,其中一个节点下线之后,会提供另外一个节点保证路由功能。

启动入口

org.apache.rocketmq.namesrv.NamesrvStartup#main0
public static void main0(String[] args) {try {//启动namesrv之前的准备,命令行准备、parseCommandlineAndConfigFile(args);//创建namesrv控制器createAndStartNamesrvController();} catch (Throwable e) {e.printStackTrace();System.exit(-1);}
}public static void parseCommandlineAndConfigFile(String[] args) throws Exception {System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));//PackageConflictDetect.detectFastjson();// 1.构建命令行参数Options options = ServerUtil.buildCommandlineOptions(new Options());CommandLine commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());if (null == commandLine) {System.exit(-1);return;}// 2.创建namesrvconfig对象   namesrvConfig = new NamesrvConfig();// 3、创建netty配置,监听9876端口nettyServerConfig = new NettyServerConfig();nettyClientConfig = new NettyClientConfig();nettyServerConfig.setListenPort(9876);controllerConfig = new ControllerConfig();//4、解析启动命令-c参数if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');if (file != null) {InputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(file)));properties = new Properties();properties.load(in);MixAll.properties2Object(properties, namesrvConfig);MixAll.properties2Object(properties, nettyServerConfig);MixAll.properties2Object(properties, nettyClientConfig);MixAll.properties2Object(properties, controllerConfig);namesrvConfig.setConfigStorePath(file);System.out.printf("load config properties file OK, %s%n", file);in.close();}}//5、解析启动命令-p参数if (commandLine.hasOption('p')) {MixAll.printObjectProperties(null, namesrvConfig);MixAll.printObjectProperties(null, nettyServerConfig);MixAll.printObjectProperties(null, nettyClientConfig);MixAll.printObjectProperties(null, controllerConfig);System.exit(0);}MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);if (null == namesrvConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();JoranConfigurator configurator = new JoranConfigurator();configurator.setContext(lc);lc.reset();//处理配置信息configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);MixAll.printObjectProperties(log, namesrvConfig);MixAll.printObjectProperties(log, nettyServerConfig);}
public static void createAndStartNamesrvController() throws Exception {NamesrvController controller = createNamesrvController();start(controller);String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();log.info(tip);System.out.printf("%s%n", tip);
}
org.apache.rocketmq.namesrv.NamesrvController#initializepublic boolean initialize() {loadConfig();//实例化netty服务端和客户端initiateNetworkComponents();initiateThreadExecutors();registerProcessor();startScheduleService();initiateSslContext();initiateRpcHooks();return true;}
org.apache.rocketmq.namesrv.NamesrvController#startpublic void start() throws Exception {//this.remotingServer.start();// In test scenarios where it is up to OS to pick up an available port, set the listening port back to configif (0 == nettyServerConfig.getListenPort()) {nettyServerConfig.setListenPort(this.remotingServer.localListenPort());}this.remotingClient.updateNameServerAddressList(Collections.singletonList(RemotingUtil.getLocalAddress()+ ":" + nettyServerConfig.getListenPort()));this.remotingClient.start();if (this.fileWatchService != null) {this.fileWatchService.start();}this.routeInfoManager.start();}
//启动bootstrap、channel、bind
org.apache.rocketmq.remoting.netty.NettyRemotingServer#start@Overridepublic void start() {this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private final AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});prepareSharableHandlers();serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_REUSEADDR, true).childOption(ChannelOption.SO_KEEPALIVE, false).childOption(ChannelOption.TCP_NODELAY, true).localAddress(new InetSocketAddress(this.nettyServerConfig.getBindAddress(),this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0,nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});addCustomConfig(serverBootstrap);try {ChannelFuture sync = serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();if (0 == nettyServerConfig.getListenPort()) {this.nettyServerConfig.setListenPort(addr.getPort());}log.info("RemotingServer started, listening {}:{}", this.nettyServerConfig.getBindAddress(),this.nettyServerConfig.getListenPort());this.remotingServerTable.put(this.nettyServerConfig.getListenPort(), this);} catch (Exception e) {throw new IllegalStateException(String.format("Failed to bind to %s:%d", nettyServerConfig.getBindAddress(),nettyServerConfig.getListenPort()), e);}if (this.channelEventListener != null) {this.nettyEventExecutor.start();}this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);}
//启动客户端
org.apache.rocketmq.remoting.netty.NettyRemotingClient#startpublic void start() {if (this.defaultEventExecutorGroup == null) {this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());}});}Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, false).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();if (nettyClientConfig.isUseTLS()) {if (null != sslContext) {pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));LOGGER.info("Prepend SSL handler");} else {LOGGER.warn("Connections are insecure as SSLContext is null!");}}ch.pipeline().addLast(nettyClientConfig.isDisableNettyWorkerGroup() ? null : defaultEventExecutorGroup,new NettyEncoder(),new NettyDecoder(),new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),new NettyConnectManageHandler(),new NettyClientHandler());}});if (nettyClientConfig.getClientSocketSndBufSize() > 0) {LOGGER.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());}if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {LOGGER.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());}if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) {LOGGER.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark());handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()));}if (nettyClientConfig.isClientPooledByteBufAllocatorEnable()) {handler.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingClient.this.scanResponseTable();} catch (Throwable e) {LOGGER.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);//        this.timer.scheduleAtFixedRate(new TimerTask() {
//            @Override
//            public void run() {
//                try {
//                    NettyRemotingClient.this.scanChannelTablesOfNameServer();
//                } catch (Exception e) {
//                    LOGGER.error("scanChannelTablesOfNameServer exception", e);
//                }
//            }
//        }, 1000 * 3, 10 * 1000);this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingClient.this.scanAvailableNameSrv();} catch (Exception e) {LOGGER.error("scanAvailableNameSrv exception", e);}}}, 0, this.nettyClientConfig.getConnectTimeoutMillis());}

路由注册

1、Broker服务每隔30秒向Namesrv发送一个心跳包。

org.apache.rocketmq.broker.BrokerController#start public void start() throws Exception {this.shouldStartTime = System.currentTimeMillis() + messageStoreConfig.getDisappearTimeAfterStart();if (messageStoreConfig.getTotalReplicas() > 1 && this.brokerConfig.isEnableSlaveActingMaster() || this.brokerConfig.isEnableControllerMode()) {isIsolated = true;}if (this.brokerOuterAPI != null) {this.brokerOuterAPI.start();}startBasicService();if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);this.registerBrokerAll(true, false, true);}scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {@Overridepublic void run2() {try {if (System.currentTimeMillis() < shouldStartTime) {BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);return;}if (isIsolated) {BrokerController.LOG.info("Skip register for broker is isolated");return;}BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {BrokerController.LOG.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));if (this.brokerConfig.isEnableSlaveActingMaster()) {scheduleSendHeartbeat();scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {@Overridepublic void run2() {try {BrokerController.this.syncBrokerMemberGroup();} catch (Throwable e) {BrokerController.LOG.error("sync BrokerMemberGroup error. ", e);}}}, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), TimeUnit.MILLISECONDS));}if (this.brokerConfig.isEnableControllerMode()) {scheduleSendHeartbeat();}if (brokerConfig.isSkipPreOnline()) {startServiceWithoutCondition();}}
org.apache.rocketmq.broker.BrokerController#scheduleSendHeartbeatprotected void scheduleSendHeartbeat() {scheduledFutures.add(this.brokerHeartbeatExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {@Overridepublic void run2() {if (isIsolated) {return;}try {BrokerController.this.sendHeartbeat();} catch (Exception e) {BrokerController.LOG.error("sendHeartbeat Exception", e);}}}, 1000, brokerConfig.getBrokerHeartbeatInterval(), TimeUnit.MILLISECONDS));}

2、broker向namesrv注册服务信息:topic信息、queue信息、集群信息。

         2-1、broker利用netty发送服务信息
//org.apache.rocketmq.broker.BrokerController#registerBrokerAll    
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper();topicConfigWrapper.setDataVersion(this.getTopicConfigManager().getDataVersion());topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable());topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map(entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue()))).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {TopicConfig tmp =new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),topicConfig.getPerm() & this.brokerConfig.getBrokerPermission(), topicConfig.getTopicSysFlag());topicConfigTable.put(topicConfig.getTopicName(), tmp);}topicConfigWrapper.setTopicConfigTable(topicConfigTable);}if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),this.getBrokerAddr(),this.brokerConfig.getBrokerName(),this.brokerConfig.getBrokerId(),this.brokerConfig.getRegisterBrokerTimeoutMills(),this.brokerConfig.isInBrokerContainer())) {doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);}}protected void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,TopicConfigSerializeWrapper topicConfigWrapper) {if (shutdown) {BrokerController.LOG.info("BrokerController#doResterBrokerAll: broker has shutdown, no need to register any more.");return;}List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(this.brokerConfig.getBrokerClusterName(),this.getBrokerAddr(),this.brokerConfig.getBrokerName(),this.brokerConfig.getBrokerId(),this.getHAServerAddr(),topicConfigWrapper,this.filterServerManager.buildNewFilterServerList(),oneway,this.brokerConfig.getRegisterBrokerTimeoutMills(),this.brokerConfig.isEnableSlaveActingMaster(),this.brokerConfig.isCompressedRegister(),this.brokerConfig.isEnableSlaveActingMaster() ? this.brokerConfig.getBrokerNotActiveTimeoutMillis() : null,this.getBrokerIdentity());handleRegisterBrokerResult(registerBrokerResultList, checkOrderConfig);}
将broker的信息通过netty发送到namesrv上。
//org.apache.rocketmq.broker.BrokerController#handleRegisterBrokerResult
protected void handleRegisterBrokerResult(List<RegisterBrokerResult> registerBrokerResultList,boolean checkOrderConfig) {for (RegisterBrokerResult registerBrokerResult : registerBrokerResultList) {if (registerBrokerResult != null) {if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());}this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());if (checkOrderConfig) {this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());}break;}}}
org.apache.rocketmq.broker.out.BrokerOuterAPI#sendHeartbeatToController
org.apache.rocketmq.broker.out.BrokerOuterAPI#sendHeartbeatViaDataVersion
org.apache.rocketmq.broker.out.BrokerOuterAPI#sendHeartbeat/*** Send heartbeat to controller*/public void sendHeartbeatToController(final String controllerAddress,final String clusterName,final String brokerAddr,final String brokerName,final Long brokerId,final int timeoutMills,final boolean isInBrokerContainer,final int epoch,final long maxOffset,final long confirmOffset) {if (StringUtils.isEmpty(controllerAddress)) {return;}final BrokerHeartbeatRequestHeader requestHeader = new BrokerHeartbeatRequestHeader();requestHeader.setClusterName(clusterName);requestHeader.setBrokerAddr(brokerAddr);requestHeader.setBrokerName(brokerName);requestHeader.setEpoch(epoch);requestHeader.setMaxOffset(maxOffset);requestHeader.setConfirmOffset(confirmOffset);brokerOuterExecutor.execute(new AbstractBrokerRunnable(new BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {@Overridepublic void run2() {RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.BROKER_HEARTBEAT, requestHeader);try {//使用netty客户端发送数据BrokerOuterAPI.this.remotingClient.invokeOneway(controllerAddress, request, timeoutMills);} catch (Exception e) {LOGGER.error("Error happen when send heartbeat to controller {}", controllerAddress, e);}}});}
org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeOneway@Overridepublic void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {final Channel channel = this.getAndCreateChannel(addr);if (channel != null && channel.isActive()) {try {doBeforeRpcHooks(addr, request);this.invokeOnewayImpl(channel, request, timeoutMillis);} catch (RemotingSendRequestException e) {LOGGER.warn("invokeOneway: send request exception, so close the channel[{}]", addr);this.closeChannel(addr, channel);throw e;}} else {this.closeChannel(addr, channel);throw new RemotingConnectException(addr);}}
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeOnewayImplpublic void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {request.markOnewayRPC();boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);if (acquired) {final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);try {channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {once.release();if (!f.isSuccess()) {log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");}});} catch (Exception e) {once.release();log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);}} else {if (timeoutMillis <= 0) {throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");} else {String info = String.format("invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreOnewayValue: %d",timeoutMillis,this.semaphoreOneway.getQueueLength(),this.semaphoreOneway.availablePermits());log.warn(info);throw new RemotingTimeoutException(info);}}}
2-2、namesrv接受并且注册服务信息到列表 
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker
    public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final String zoneName,final Long timeoutMillis,final Boolean enableActingMaster,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {RegisterBrokerResult result = new RegisterBrokerResult();try {this.lock.writeLock().lockInterruptibly();//init or update the cluster infoSet<String> brokerNames = ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Set<String>>) this.clusterAddrTable, clusterName, k -> new HashSet<>());brokerNames.add(brokerName);boolean registerFirst = false;BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null == brokerData) {registerFirst = true;brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());this.brokerAddrTable.put(brokerName, brokerData);}boolean isOldVersionBroker = enableActingMaster == null;brokerData.setEnableActingMaster(!isOldVersionBroker && enableActingMaster);brokerData.setZoneName(zoneName);Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();boolean isMinBrokerIdChanged = false;long prevMinBrokerId = 0;if (!brokerAddrsMap.isEmpty()) {prevMinBrokerId = Collections.min(brokerAddrsMap.keySet());}if (brokerId < prevMinBrokerId) {isMinBrokerIdChanged = true;}//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>//The same IP:PORT must only have one record in brokerAddrTablebrokerAddrsMap.entrySet().removeIf(item -> null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey());//If Local brokerId stateVersion bigger than the registering one,String oldBrokerAddr = brokerAddrsMap.get(brokerId);if (null != oldBrokerAddr && !oldBrokerAddr.equals(brokerAddr)) {BrokerLiveInfo oldBrokerInfo = brokerLiveTable.get(new BrokerAddrInfo(clusterName, oldBrokerAddr));if (null != oldBrokerInfo) {long oldStateVersion = oldBrokerInfo.getDataVersion().getStateVersion();long newStateVersion = topicConfigWrapper.getDataVersion().getStateVersion();if (oldStateVersion > newStateVersion) {log.warn("Registered Broker conflicts with the existed one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +"Old BrokerAddr:{}, Old Version:{}, New BrokerAddr:{}, New Version:{}.",clusterName, brokerName, brokerId, oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);//Remove the rejected brokerAddr from brokerLiveTable.brokerLiveTable.remove(new BrokerAddrInfo(clusterName, brokerAddr));return result;}}}if (!brokerAddrsMap.containsKey(brokerId) && topicConfigWrapper.getTopicConfigTable().size() == 1) {log.warn("Can't register topicConfigWrapper={} because broker[{}]={} has not registered.",topicConfigWrapper.getTopicConfigTable(), brokerId, brokerAddr);return null;}String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);registerFirst = registerFirst || (StringUtils.isEmpty(oldAddr));boolean isMaster = MixAll.MASTER_ID == brokerId;boolean isPrimeSlave = !isOldVersionBroker && !isMaster&& brokerId == Collections.min(brokerAddrsMap.keySet());if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {ConcurrentMap<String, TopicConfig> tcTable =topicConfigWrapper.getTopicConfigTable();if (tcTable != null) {for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,topicConfigWrapper.getDataVersion(), brokerName,entry.getValue().getTopicName())) {final TopicConfig topicConfig = entry.getValue();if (isPrimeSlave) {// Wipe write perm for prime slavetopicConfig.setPerm(topicConfig.getPerm() & (~PermName.PERM_WRITE));}this.createAndUpdateQueueData(brokerName, topicConfig);}}}if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();//the topicQueueMappingInfoMap should never be null, but can be emptyfor (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>());}//Note asset brokerName equal entry.getValue().getBname()//here use the mappingDetail.bnametopicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());}}}BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, brokerAddr);BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddrInfo,new BrokerLiveInfo(System.currentTimeMillis(),timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,topicConfigWrapper == null ? new DataVersion() : topicConfigWrapper.getDataVersion(),channel,haServerAddr));if (null == prevBrokerLiveInfo) {log.info("new broker registered, {} HAService: {}", brokerAddrInfo, haServerAddr);}if (filterServerList != null) {if (filterServerList.isEmpty()) {this.filterServerTable.remove(brokerAddrInfo);} else {this.filterServerTable.put(brokerAddrInfo, filterServerList);}}if (MixAll.MASTER_ID != brokerId) {String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if (masterAddr != null) {BrokerAddrInfo masterAddrInfo = new BrokerAddrInfo(clusterName, masterAddr);BrokerLiveInfo masterLiveInfo = this.brokerLiveTable.get(masterAddrInfo);if (masterLiveInfo != null) {result.setHaServerAddr(masterLiveInfo.getHaServerAddr());result.setMasterAddr(masterAddr);}}}if (isMinBrokerIdChanged && namesrvConfig.isNotifyMinBrokerIdChanged()) {notifyMinBrokerIdChanged(brokerAddrsMap, null,this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());}} catch (Exception e) {log.error("registerBroker Exception", e);} finally {this.lock.writeLock().unlock();}return result;}

路由剔除

NameServer和每个Broker保持长连接,每隔30秒接收Broker发送的心跳包,同时自身每个10秒扫描BrokerLiveTable,比较上次收到心跳时间和当前时间比较是否大于120秒,如果超过,那么认为Broker不可用,剔除路由表中该Broker相关信息。

org.apache.rocketmq.namesrv.NamesrvController#startScheduleService定时扫描
public void scanNotActiveBroker() {try {log.info("start scanNotActiveBroker");for (Entry<BrokerAddrInfo, BrokerLiveInfo> next : this.brokerLiveTable.entrySet()) {long last = next.getValue().getLastUpdateTimestamp();long timeoutMillis = next.getValue().getHeartbeatTimeoutMillis();if ((last + timeoutMillis) < System.currentTimeMillis()) {RemotingUtil.closeChannel(next.getValue().getChannel());log.warn("The broker channel expired, {} {}ms", next.getKey(), timeoutMillis);this.onChannelDestroy(next.getKey());}}} catch (Exception e) {log.error("scanNotActiveBroker exception", e);}
}public void onChannelDestroy(BrokerAddrInfo brokerAddrInfo) {UnRegisterBrokerRequestHeader unRegisterRequest = new UnRegisterBrokerRequestHeader();boolean needUnRegister = false;if (brokerAddrInfo != null) {try {try {this.lock.readLock().lockInterruptibly();needUnRegister = setupUnRegisterRequest(unRegisterRequest, brokerAddrInfo);} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("onChannelDestroy Exception", e);}}if (needUnRegister) {//unregistrationQueue.offer(unRegisterRequest);boolean result = this.submitUnRegisterBrokerRequest(unRegisterRequest);log.info("the broker's channel destroyed, submit the unregister request at once, " +"broker info: {}, submit result: {}", unRegisterRequest, result);}}private boolean setupUnRegisterRequest(UnRegisterBrokerRequestHeader unRegisterRequest,BrokerAddrInfo brokerAddrInfo) {unRegisterRequest.setClusterName(brokerAddrInfo.getClusterName());unRegisterRequest.setBrokerAddr(brokerAddrInfo.getBrokerAddr());for (Entry<String, BrokerData> stringBrokerDataEntry : this.brokerAddrTable.entrySet()) {BrokerData brokerData = stringBrokerDataEntry.getValue();if (!brokerAddrInfo.getClusterName().equals(brokerData.getCluster())) {continue;}for (Entry<Long, String> entry : brokerData.getBrokerAddrs().entrySet()) {Long brokerId = entry.getKey();String brokerAddr = entry.getValue();if (brokerAddr.equals(brokerAddrInfo.getBrokerAddr())) {unRegisterRequest.setBrokerName(brokerData.getBrokerName());unRegisterRequest.setBrokerId(brokerId);return true;}}}return false;}

路由发现

路由发现不是实时的,路由变化后,NameServer不主动推给客户端,等待producer定期拉取最新路由信息。这样的设计方式降低了NameServer实现的复杂性,当路由发生变化时通过在消息发送端的容错机制来保证消息发送的高可用

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/178144.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【带头学C++】----- 八、C++面向对象编程 ---- 8.5 struct结构体类型增强使用说明

目录 8.5 struct结构体类型增强使用说明 8.5.1 C结构体可以定义成员函数 8.5.2 c中定义结构体变量可以不加struct关键字 8.6 bool布尔类型关键字 8.5 struct结构体类型增强使用说明 第六章对结构体的使用、内存对齐以及数组、深拷贝和浅拷贝进行了一个详细的说明&#xff0c…

【数据结构实验】排序(二)希尔排序算法的详细介绍与性能分析

文章目录 1. 引言2. 希尔排序算法原理2.1 示例说明2.2 时间复杂性分析 3. 实验内容3.1 实验题目&#xff08;一&#xff09;输入要求&#xff08;二&#xff09;输出要求 3.2 算法实现3.3 代码解析3.4 实验结果 4. 实验结论 1. 引言 排序算法在计算机科学中扮演着至关重要的角色…

Leetcode211. 添加与搜索单词 - 数据结构设计

Every day a Leetcode 题目来源&#xff1a;211. 添加与搜索单词 - 数据结构设计 解法1&#xff1a;字典树 字典树&#xff08;前缀树&#xff09;是一种树形数据结构&#xff0c;用于高效地存储和检索字符串数据集中的键。前缀树可以用 O(∣S∣) 的时间复杂度完成如下操作…

基于单片机温湿度光照自动窗帘系统设计

**单片机设计介绍&#xff0c; 基于单片机温湿度光照自动窗帘系统设计 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于单片机的温湿度光照自动窗帘系统是一种智能家居系统&#xff0c;通过使用单片机作为控制核心&#xff0c…

【IQR与MAD】原理,一文带你玩转箱型图含详细解释与代码

IQR方法 基于四分位数&#xff1a;使用数据的第一四分位数&#xff08;25%&#xff09;和第三四分位数&#xff08;75%&#xff09;来计算。 对称&#xff1a;相对于中位数对称地考虑上下界。 受极端值影响&#xff1a;如果数据中包含极端值&#xff0c;IQR可能会被拉得很大&a…

CMA认证是什么?CMA软件测试报告如何获取?

资格证书在各行各业都是一种专业性象征&#xff0c;如第三方检测机构的CMA认证&#xff0c;在相应的检测报告上加盖CMA章可获得国家以及行业认可&#xff0c;还是享受税收优惠的有力证明材料。 一、CMA认证是什么?   CMA是中国计量认证的简称&#xff0c;由省级以上人民政府…

顺序栈的基本操作(超详细)

目录 前言 一、顺序栈的定义 二、顺序栈的c语言结构描述表示 三、顺序栈中基本操作的实现 3.1顺序栈的初始化 3.2判断顺序栈是否为空 3.3求顺序栈的长度 3.4清空顺序栈 3.5销毁顺序栈 3.6顺序栈的入栈 3.7顺序栈的出栈 3.8求栈顶元素 3.9遍历顺序栈 四、顺序栈的…

JDK21发布了!面试官:来,谈下jdk21的新特性!

1.前言 JDK21 计划23年9月19日正式发布&#xff0c;尽管一直以来都是“版随意出&#xff0c;换 8 算我输”&#xff0c;但这么多年这么多版本的折腾&#xff0c;若是之前的 LTS 版本JDK17你还觉得不错&#xff0c;那 JDK21还是有必要关注一下&#xff0c;因为会有一批重要更新…

华清远见嵌入式学习——C++——作业一

作业要求&#xff1a; 代码&#xff1a; #include <iostream>using namespace std;int main() {string str;cout << "请输入一个字符串&#xff1a;" << endl;getline(cin,str);int dx0,xx0,sz0,kg0,qt0;int len str.size() 1;for(int i0;i<l…

HarmonyOS 传感器开发指南

HarmonyOS 系统传感器是应用访问底层硬件传感器的一种设备抽象概念。开发者根据传感器提供的Sensor接口&#xff0c;可以查询设备上的传感器&#xff0c;订阅传感器数据&#xff0c;并根据传感器数据定制相应的算法开发各类应用&#xff0c;比如指南针、运动健康、游戏等。 运作…

Centos 如何判断分区是mbr还是gpt格式

1 介绍 MBR 自20世纪80年代初以来的标准分区表格式每个驱动器最多支持四个主分区最多可以划分2TB的磁盘 GPT GPT是MBR分区表格式的后续每个驱动器最多支持128个分区可以将一个磁盘分区到最大到18艾字节 对小于2TB的磁盘使用MBR对大于2TB的磁盘使用GTP 2 查询方式 2.1 fdis…

校园虚拟化部署与横向扩展统一存储

项目背景 这所隶属教育部直属重点大学&#xff0c;学校设有11个学科体系&#xff0c;现有本硕博学生共29000余人&#xff0c;为积极响应“中国教育现代化2023战略部署”&#xff0c;校方制定教育信息化2.0发展目标&#xff0c;通过平台融合&#xff0c;数据驱动、技术赋能等措…

Linux常见指令基础知识

目录 初始Linux操作系统 Linux背景&#xff1a; 开源 &#xff1a; 发行版本&#xff1a; ​编辑 OS概念&#xff0c;定位&#xff1a; 使用 XShell 远程登录 Linux Linux相关知识 文件是什么&#xff1f; 路径分隔符 &#xff08;.&#xff09; 和 &#xff08;. .&…

Docker Swarm总结+Jenkins安装配置与集成(5/5)

博主介绍&#xff1a;Java领域优质创作者,博客之星城市赛道TOP20、专注于前端流行技术框架、Java后端技术领域、项目实战运维以及GIS地理信息领域。 &#x1f345;文末获取源码下载地址&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;&#x1f3fb;…

反射、枚举以及lambda表达式

1. 反射 1.1 定义 java的.class文件在运行时会被编译为一个Class对象&#xff0c;既然是对象&#xff0c;那么我们就可以通过一定的方式取到这个对象&#xff0c;然后对于这个对象进行一系列操作&#xff08;改变原本类的属性、方法&#xff09;。 这个操作就是反射&#xf…

数据仓库建模下篇

在实际业务中&#xff0c;给了我们一堆数据&#xff0c;我们怎么拿这些数据进行数仓建设呢&#xff0c;数仓工具箱作者根据自身多年的实际业务经验&#xff0c;给我们总结了如下四步。 数仓工具箱中的维度建模四步走&#xff1a; 维度建模四步走 这四步是环环相扣&#xff0c…

JOSEF 可调漏电继电器 RT-L1KS φ25mm 导轨或面板安装

RT-L系列可调漏电继电器&#xff08;以下简称继电器&#xff09;适用于交流电压至690V&#xff0c;频率为50Hz&#xff0c;电流至1500A及以下漏电继电器中性点接地电路中。 RT-L系列可调型漏电继电器 RT-L1K可调型漏电继电器 RT-L2K可调型漏电继电器 RT-L3K可调型漏电继电器…

Mybaits-plus的使用

MybatisPlus特性 润物无声&#xff1a; 只做增强不做改变&#xff0c;引入它不会对现有工程产生改变&#xff0c;如丝般顺滑。 效率至上 只需简单配置&#xff0c;即可快速进行单表CRUD操作&#xff0c;从而节省大量时间。 使用MybatisPlus依赖基本步骤 引入MybatisPlus依…

echarts点击事件

有这么个需求要点击叶片的时候跳转页面 代码&#xff1a;点击之后 报错了 解决办法 1、使用箭头函数&#xff08;箭头函数没有自己的 this&#xff0c;所以在箭头函数中使用 this 时&#xff0c;其指向与外层作用域相同。&#xff09;或者使用闭包来解决上下文的问题。 2、使…

微信可以添加多少好友?

不知道有没有小伙伴好奇&#xff0c;微信到底可以添加多少好友&#xff1f;正好这个话题也上热搜了&#xff0c;我们就来了解一下。 有网友表示&#xff0c;自己的微信好友数量有10004个&#xff0c;已经不能再添加新的微信好友了。 一个微信号&#xff0c;可以添加的好友上限…