# RocketMQ Message Middleware: Advanced Features and Source Code Analysis (Part 5)


I. RocketMQ source code analysis: NameServer routing metadata

1. NameServer route management in RocketMQ

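The main job of the NameServer is to give message producers and consumers the routing information for each Topic. To do that, the NameServer has to store the basic routing data and manage the Broker nodes, which includes route registration and route removal.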

2. NameServer routing metadata in RocketMQ

1) Code: RouteInfoManager


private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

2) NameServer routing entity diagram in RocketMQ:

[Figure: NameServer routing entity diagram]

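3) Notes:

topicQueueTable: queue routing information for each Topic; producers load-balance across queues according to this table when sending messages.

brokerAddrTable: basic Broker information, including the brokerName, the name of the cluster it belongs to, and the addresses of the master and slave Brokers.

clusterAddrTable: Broker cluster information; stores the names of all Brokers in each cluster.

brokerLiveTable: Broker liveness information; the NameServer replaces the entry each time it receives a heartbeat.

filterServerTable: the list of FilterServers on each Broker, used for class-mode message filtering.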
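To make the relationship between these tables more concrete, here is a minimal sketch of how three of them might be filled when Brokers register. It is not the real RouteInfoManager: the class `RouteTableSketch`, its `register` and `writeQueues` methods, the trimmed-down `QueueData`, the cluster name, and the addresses are all hypothetical names chosen for illustration, and the 4 read / 4 write queues are simply hard-coded.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified stand-in for the NameServer routing tables; not the real RouteInfoManager.
class RouteTableSketch {
    // Hypothetical, trimmed-down QueueData: one entry per (topic, brokerName).
    static final class QueueData {
        final String brokerName;
        final int readQueueNums;
        final int writeQueueNums;

        QueueData(String brokerName, int readQueueNums, int writeQueueNums) {
            this.brokerName = brokerName;
            this.readQueueNums = readQueueNums;
            this.writeQueueNums = writeQueueNums;
        }
    }

    final Map<String, List<QueueData>> topicQueueTable = new HashMap<>();
    // brokerName -> (brokerId -> address); brokerId 0 is the master.
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();

    // Records one Broker instance and a topic it serves (assumed 4 read / 4 write queues).
    void register(String cluster, String brokerName, long brokerId, String addr, String topic) {
        clusterAddrTable.computeIfAbsent(cluster, k -> new HashSet<>()).add(brokerName);
        brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, addr);

        List<QueueData> queues = topicQueueTable.computeIfAbsent(topic, k -> new ArrayList<>());
        boolean known = false;
        for (QueueData q : queues) {
            if (q.brokerName.equals(brokerName)) {
                known = true;
            }
        }
        if (!known) {
            queues.add(new QueueData(brokerName, 4, 4));
        }
    }

    // Expands a topic's route into "brokerName:queueId" entries a producer could rotate over.
    List<String> writeQueues(String topic) {
        List<String> result = new ArrayList<>();
        List<QueueData> queues = topicQueueTable.getOrDefault(topic, Collections.<QueueData>emptyList());
        for (QueueData q : queues) {
            for (int i = 0; i < q.writeQueueNums; i++) {
                result.add(q.brokerName + ":" + i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        RouteTableSketch t = new RouteTableSketch();
        t.register("DefaultCluster", "broker-a", 0L, "192.168.0.1:10911", "TopicTest");
        t.register("DefaultCluster", "broker-a", 1L, "192.168.0.2:10911", "TopicTest");
        t.register("DefaultCluster", "broker-b", 0L, "192.168.0.3:10911", "TopicTest");

        List<String> queues = t.writeQueues("TopicTest");
        System.out.println(queues.size()); // prints 8

        // Round-robin over the write queues, a simple form of send-side load balancing.
        for (int i = 0; i < 3; i++) {
            System.out.println(queues.get(i % queues.size()));
        }
    }
}
```

The master and slave of broker-a share one brokerAddrTable entry and one QueueData entry, which is why two Broker names yield eight write queues rather than twelve.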

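RocketMQ is built on a publish-subscribe model. A Topic has multiple message queues, and by default a Broker creates 4 read queues and 4 write queues for each Topic. Multiple Brokers form a cluster, and Brokers that share the same brokerName form a Master-Slave group: brokerId 0 denotes the Master, and a brokerId greater than 0 denotes a Slave. The lastUpdateTimestamp field in BrokerLiveInfo stores the time at which the last heartbeat from the Broker was received.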
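The role of lastUpdateTimestamp can be sketched as follows. This is an illustration only: `BrokerLivenessSketch`, `onHeartbeat`, `scanNotActive`, and `isAlive` are hypothetical names, and the 120-second expiry window is an assumption based on the default I recall from RocketMQ 4.x rather than something stated in the text above. Time is passed in explicitly so the behavior is easy to follow.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for brokerLiveTable: brokerAddr -> time of last heartbeat.
class BrokerLivenessSketch {
    // Assumed expiry window; not taken from the article.
    static final long EXPIRE_MILLIS = 120_000L;

    private final Map<String, Long> lastUpdateTimestamp = new HashMap<>();

    // Each heartbeat replaces the stored timestamp for that Broker address.
    void onHeartbeat(String brokerAddr, long nowMillis) {
        lastUpdateTimestamp.put(brokerAddr, nowMillis);
    }

    boolean isAlive(String brokerAddr) {
        return lastUpdateTimestamp.containsKey(brokerAddr);
    }

    // Removes every Broker whose last heartbeat is older than the expiry window
    // and returns the removed addresses.
    List<String> scanNotActive(long nowMillis) {
        List<String> removed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastUpdateTimestamp.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() > EXPIRE_MILLIS) {
                removed.add(entry.getKey());
                it.remove();
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        BrokerLivenessSketch live = new BrokerLivenessSketch();
        live.onHeartbeat("192.168.0.1:10911", 0L);
        live.onHeartbeat("192.168.0.3:10911", 100_000L);

        // At t = 130 s the first Broker has been silent for 130 s, the second for 30 s.
        System.out.println(live.scanNotActive(130_000L)); // prints [192.168.0.1:10911]
        System.out.println(live.isAlive("192.168.0.3:10911")); // prints true
    }
}
```

In a real NameServer, removing a Broker from the liveness table would also have to clean up the other routing tables; the sketch leaves that out.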

4) Example entity data:

[Figures: example data held in the routing tables]

3. Source of org.apache.rocketmq.broker.BrokerStartup.java:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.broker;

import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.NettySystemConfig;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.slf4j.LoggerFactory;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE;

public class BrokerStartup {
    public static Properties properties = null;
    public static CommandLine commandLine = null;
    public static String configFile = null;
    public static InternalLogger log;

    public static void main(String[] args) {
        start(createBrokerController(args));
    }

    public static BrokerController start(BrokerController controller) {
        try {
            controller.start();

            String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
                + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();

            if (null != controller.getBrokerConfig().getNamesrvAddr()) {
                tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
            }

            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

    public static void shutdown(final BrokerController controller) {
        if (null != controller) {
            controller.shutdown();
        }
    }

    public static BrokerController createBrokerController(String[] args) {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

        if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
            NettySystemConfig.socketSndbufSize = 131072;
        }

        if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
            NettySystemConfig.socketRcvbufSize = 131072;
        }

        try {
            //PackageConflictDetect.detectFastjson();
            Options options = ServerUtil.buildCommandlineOptions(new Options());
            commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
                new PosixParser());
            if (null == commandLine) {
                System.exit(-1);
            }

            final BrokerConfig brokerConfig = new BrokerConfig();
            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
            final NettyClientConfig nettyClientConfig = new NettyClientConfig();

            nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
                String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
            nettyServerConfig.setListenPort(10911);
            final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();

            if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
                int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
                messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
            }

            if (commandLine.hasOption('c')) {
                String file = commandLine.getOptionValue('c');
                if (file != null) {
                    configFile = file;
                    InputStream in = new BufferedInputStream(new FileInputStream(file));
                    properties = new Properties();
                    properties.load(in);

                    properties2SystemEnv(properties);
                    MixAll.properties2Object(properties, brokerConfig);
                    MixAll.properties2Object(properties, nettyServerConfig);
                    MixAll.properties2Object(properties, nettyClientConfig);
                    MixAll.properties2Object(properties, messageStoreConfig);

                    BrokerPathConfigHelper.setBrokerConfigPath(file);
                    in.close();
                }
            }

            MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);

            if (null == brokerConfig.getRocketmqHome()) {
                System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
                System.exit(-2);
            }

            String namesrvAddr = brokerConfig.getNamesrvAddr();
            if (null != namesrvAddr) {
                try {
                    String[] addrArray = namesrvAddr.split(";");
                    for (String addr : addrArray) {
                        RemotingUtil.string2SocketAddress(addr);
                    }
                } catch (Exception e) {
                    System.out.printf(
                        "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
                        namesrvAddr);
                    System.exit(-3);
                }
            }

            switch (messageStoreConfig.getBrokerRole()) {
                case ASYNC_MASTER:
                case SYNC_MASTER:
                    brokerConfig.setBrokerId(MixAll.MASTER_ID);
                    break;
                case SLAVE:
                    if (brokerConfig.getBrokerId() <= 0) {
                        System.out.printf("Slave's brokerId must be > 0");
                        System.exit(-3);
                    }

                    break;
                default:
                    break;
            }

            messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
            LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
            JoranConfigurator configurator = new JoranConfigurator();
            configurator.setContext(lc);
            lc.reset();
            configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");

            if (commandLine.hasOption('p')) {
                InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
                MixAll.printObjectProperties(console, brokerConfig);
                MixAll.printObjectProperties(console, nettyServerConfig);
                MixAll.printObjectProperties(console, nettyClientConfig);
                MixAll.printObjectProperties(console, messageStoreConfig);
                System.exit(0);
            } else if (commandLine.hasOption('m')) {
                InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
                MixAll.printObjectProperties(console, brokerConfig, true);
                MixAll.printObjectProperties(console, nettyServerConfig, true);
                MixAll.printObjectProperties(console, nettyClientConfig, true);
                MixAll.printObjectProperties(console, messageStoreConfig, true);
                System.exit(0);
            }

            log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
            MixAll.printObjectProperties(log, brokerConfig);
            MixAll.printObjectProperties(log, nettyServerConfig);
            MixAll.printObjectProperties(log, nettyClientConfig);
            MixAll.printObjectProperties(log, messageStoreConfig);

            final BrokerController controller = new BrokerController(
                brokerConfig,
                nettyServerConfig,
                nettyClientConfig,
                messageStoreConfig);
            // remember all configs to prevent discard
            controller.getConfiguration().registerConfig(properties);

            boolean initResult = controller.initialize();
            if (!initResult) {
                controller.shutdown();
                System.exit(-3);
            }

            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                private volatile boolean hasShutdown = false;
                private AtomicInteger shutdownTimes = new AtomicInteger(0);

                @Override
                public void run() {
                    synchronized (this) {
                        log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
                        if (!this.hasShutdown) {
                            this.hasShutdown = true;
                            long beginTime = System.currentTimeMillis();
                            controller.shutdown();
                            long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                            log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
                        }
                    }
                }
            }, "ShutdownHook"));

            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

    private static void properties2SystemEnv(Properties properties) {
        if (properties == null) {
            return;
        }
        String rmqAddressServerDomain = properties.getProperty("rmqAddressServerDomain", MixAll.WS_DOMAIN_NAME);
        String rmqAddressServerSubGroup = properties.getProperty("rmqAddressServerSubGroup", MixAll.WS_DOMAIN_SUBGROUP);
        System.setProperty("rocketmq.namesrv.domain", rmqAddressServerDomain);
        System.setProperty("rocketmq.namesrv.domain.subgroup", rmqAddressServerSubGroup);
    }

    private static Options buildCommandlineOptions(final Options options) {
        Option opt = new Option("c", "configFile", true, "Broker config properties file");
        opt.setRequired(false);
        options.addOption(opt);

        opt = new Option("p", "printConfigItem", false, "Print all config item");
        opt.setRequired(false);
        options.addOption(opt);

        opt = new Option("m", "printImportantConfig", false, "Print important config item");
        opt.setRequired(false);
        options.addOption(opt);

        return options;
    }
}
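A few of the rules that createBrokerController applies are easy to miss in the full listing, so the sketch below restates them in isolation: masters are forced to brokerId 0, a slave must have a brokerId greater than 0, the HA port is the listen port plus 1, and the Name Server address list is split on ";". It does not use the RocketMQ classes. `BrokerStartupRulesSketch`, `Role`, `resolveBrokerId`, `haListenPort`, `fastListenPort`, and `isValidNamesrvAddr` are hypothetical names; the address check is a simplified stand-in for RemotingUtil.string2SocketAddress; and the listen port minus 2 rule comes from BrokerController.initialize() rather than from BrokerStartup.

```java
// Hypothetical restatement of startup rules from the BrokerStartup listing; not RocketMQ code.
class BrokerStartupRulesSketch {
    enum Role { ASYNC_MASTER, SYNC_MASTER, SLAVE }

    static final long MASTER_ID = 0L;

    // Masters always get brokerId 0; a slave must be configured with an id greater than 0.
    static long resolveBrokerId(Role role, long configuredId) {
        switch (role) {
            case ASYNC_MASTER:
            case SYNC_MASTER:
                return MASTER_ID;
            case SLAVE:
                if (configuredId <= 0) {
                    throw new IllegalArgumentException("Slave's brokerId must be > 0");
                }
                return configuredId;
            default:
                return configuredId;
        }
    }

    // HA replication port derived from the main listen port.
    static int haListenPort(int listenPort) {
        return listenPort + 1;
    }

    // Port of the second ("fast") remoting server, as set in BrokerController.initialize().
    static int fastListenPort(int listenPort) {
        return listenPort - 2;
    }

    // Simplified check: every ";"-separated entry must look like host:port.
    static boolean isValidNamesrvAddr(String namesrvAddr) {
        for (String addr : namesrvAddr.split(";")) {
            int idx = addr.lastIndexOf(':');
            if (idx <= 0) {
                return false;
            }
            try {
                int port = Integer.parseInt(addr.substring(idx + 1));
                if (port < 0 || port > 65535) {
                    return false;
                }
            } catch (NumberFormatException e) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(resolveBrokerId(Role.SYNC_MASTER, 5L)); // prints 0
        System.out.println(resolveBrokerId(Role.SLAVE, 1L));       // prints 1
        System.out.println(haListenPort(10911));                   // prints 10912
        System.out.println(fastListenPort(10911));                 // prints 10909
        System.out.println(isValidNamesrvAddr("127.0.0.1:9876;192.168.0.1:9876")); // prints true
        System.out.println(isValidNamesrvAddr("127.0.0.1"));       // prints false
    }
}
```

With the default listen port of 10911 set in the listing, a Broker therefore occupies three ports: 10909, 10911, and 10912.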

4. Source of org.apache.rocketmq.broker.BrokerController.java:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.broker;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager;
import org.apache.rocketmq.broker.dledger.DLedgerRoleChangeHandler;
import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
import org.apache.rocketmq.broker.latency.BrokerFastFailure;
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener;
import org.apache.rocketmq.broker.longpolling.PullRequestHoldService;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.broker.plugin.MessageStoreFactory;
import org.apache.rocketmq.broker.plugin.MessageStorePluginContext;
import org.apache.rocketmq.broker.processor.AdminBrokerProcessor;
import org.apache.rocketmq.broker.processor.ClientManageProcessor;
import org.apache.rocketmq.broker.processor.ConsumerManageProcessor;
import org.apache.rocketmq.broker.processor.EndTransactionProcessor;
import org.apache.rocketmq.broker.processor.PullMessageProcessor;
import org.apache.rocketmq.broker.processor.QueryMessageProcessor;
import org.apache.rocketmq.broker.processor.SendMessageProcessor;
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl;
import org.apache.rocketmq.broker.util.ServiceProvider;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.Configuration;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.stats.MomentStatsItem;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.FileWatchService;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
import org.apache.rocketmq.store.stats.BrokerStats;
import org.apache.rocketmq.store.stats.BrokerStatsManager;public class BrokerController {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);private static final InternalLogger LOG_PROTECTION = InternalLoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);private static final InternalLogger LOG_WATER_MARK = InternalLoggerFactory.getLogger(LoggerName.WATER_MARK_LOGGER_NAME);private final BrokerConfig brokerConfig;private final NettyServerConfig nettyServerConfig;private final NettyClientConfig nettyClientConfig;private final MessageStoreConfig messageStoreConfig;private final ConsumerOffsetManager consumerOffsetManager;private final ConsumerManager consumerManager;private final ConsumerFilterManager consumerFilterManager;private final ProducerManager producerManager;private final ClientHousekeepingService clientHousekeepingService;private final PullMessageProcessor pullMessageProcessor;private final PullRequestHoldService pullRequestHoldService;private final MessageArrivingListener messageArrivingListener;private final Broker2Client broker2Client;private final SubscriptionGroupManager subscriptionGroupManager;private final ConsumerIdsChangeListener consumerIdsChangeListener;private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();private final BrokerOuterAPI brokerOuterAPI;private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("BrokerControllerScheduledThread"));private final SlaveSynchronize slaveSynchronize;private final BlockingQueue<Runnable> sendThreadPoolQueue;private final BlockingQueue<Runnable> pullThreadPoolQueue;private final BlockingQueue<Runnable> queryThreadPoolQueue;private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;private final 
BlockingQueue<Runnable> endTransactionThreadPoolQueue;private final FilterServerManager filterServerManager;private final BrokerStatsManager brokerStatsManager;private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();private MessageStore messageStore;private RemotingServer remotingServer;private RemotingServer fastRemotingServer;private TopicConfigManager topicConfigManager;private ExecutorService sendMessageExecutor;private ExecutorService pullMessageExecutor;private ExecutorService queryMessageExecutor;private ExecutorService adminBrokerExecutor;private ExecutorService clientManageExecutor;private ExecutorService heartbeatExecutor;private ExecutorService consumerManageExecutor;private ExecutorService endTransactionExecutor;private boolean updateMasterHAServerAddrPeriodically = false;private BrokerStats brokerStats;private InetSocketAddress storeHost;private BrokerFastFailure brokerFastFailure;private Configuration configuration;private FileWatchService fileWatchService;private TransactionalMessageCheckService transactionalMessageCheckService;private TransactionalMessageService transactionalMessageService;private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;private Future<?> slaveSyncFuture;public BrokerController(final BrokerConfig brokerConfig,final NettyServerConfig nettyServerConfig,final NettyClientConfig nettyClientConfig,final MessageStoreConfig messageStoreConfig) {this.brokerConfig = brokerConfig;this.nettyServerConfig = nettyServerConfig;this.nettyClientConfig = nettyClientConfig;this.messageStoreConfig = messageStoreConfig;this.consumerOffsetManager = new ConsumerOffsetManager(this);this.topicConfigManager = new TopicConfigManager(this);this.pullMessageProcessor = new PullMessageProcessor(this);this.pullRequestHoldService = new PullRequestHoldService(this);this.messageArrivingListener = new 
NotifyMessageArrivingListener(this.pullRequestHoldService);this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);this.consumerFilterManager = new ConsumerFilterManager(this);this.producerManager = new ProducerManager();this.clientHousekeepingService = new ClientHousekeepingService(this);this.broker2Client = new Broker2Client(this);this.subscriptionGroupManager = new SubscriptionGroupManager(this);this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);this.filterServerManager = new FilterServerManager(this);this.slaveSynchronize = new SlaveSynchronize(this);this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));this.brokerFastFailure = new BrokerFastFailure(this);this.configuration = new Configuration(log,BrokerPathConfigHelper.getBrokerConfigPath(),this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig);}public BrokerConfig 
getBrokerConfig() {return brokerConfig;}public NettyServerConfig getNettyServerConfig() {return nettyServerConfig;}public BlockingQueue<Runnable> getPullThreadPoolQueue() {return pullThreadPoolQueue;}public BlockingQueue<Runnable> getQueryThreadPoolQueue() {return queryThreadPoolQueue;}public boolean initialize() throws CloneNotSupportedException {boolean result = this.topicConfigManager.load();result = result && this.consumerOffsetManager.load();result = result && this.subscriptionGroupManager.load();result = result && this.consumerFilterManager.load();if (result) {try {this.messageStore =new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,this.brokerConfig);if (messageStoreConfig.isEnableDLegerCommitLog()) {DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);}this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);//load pluginMessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);this.messageStore = MessageStoreFactory.build(context, this.messageStore);this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));} catch (IOException e) {result = false;log.error("Failed to initialize", e);}}result = result && this.messageStore.load();if (result) {this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);this.sendMessageExecutor = new 
BrokerFixedThreadPoolExecutor(this.brokerConfig.getSendMessageThreadPoolNums(),this.brokerConfig.getSendMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.sendThreadPoolQueue,new ThreadFactoryImpl("SendMessageThread_"));this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getPullMessageThreadPoolNums(),this.brokerConfig.getPullMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.pullThreadPoolQueue,new ThreadFactoryImpl("PullMessageThread_"));this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getQueryMessageThreadPoolNums(),this.brokerConfig.getQueryMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.queryThreadPoolQueue,new ThreadFactoryImpl("QueryMessageThread_"));this.adminBrokerExecutor =Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl("AdminBrokerThread_"));this.clientManageExecutor = new ThreadPoolExecutor(this.brokerConfig.getClientManageThreadPoolNums(),this.brokerConfig.getClientManageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.clientManagerThreadPoolQueue,new ThreadFactoryImpl("ClientManageThread_"));this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getHeartbeatThreadPoolNums(),this.brokerConfig.getHeartbeatThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.heartbeatThreadPoolQueue,new ThreadFactoryImpl("HeartbeatThread_", true));this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getEndTransactionThreadPoolNums(),this.brokerConfig.getEndTransactionThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.endTransactionThreadPoolQueue,new ThreadFactoryImpl("EndTransactionThread_"));this.consumerManageExecutor =Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl("ConsumerManageThread_"));this.registerProcessor();final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();final 
long period = 1000 * 60 * 60 * 24;this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.getBrokerStats().record();} catch (Throwable e) {log.error("schedule record error.", e);}}}, initialDelay, period, TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.consumerOffsetManager.persist();} catch (Throwable e) {log.error("schedule persist consumerOffset error.", e);}}}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.consumerFilterManager.persist();} catch (Throwable e) {log.error("schedule persist consumer filter error.", e);}}}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.protectBroker();} catch (Throwable e) {log.error("protectBroker error.", e);}}}, 3, 3, TimeUnit.MINUTES);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.printWaterMark();} catch (Throwable e) {log.error("printWaterMark error.", e);}}}, 10, 1, TimeUnit.SECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());} catch (Throwable e) {log.error("schedule dispatchBehindBytes error.", e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);if (this.brokerConfig.getNamesrvAddr() != null) {this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) 
{this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.brokerOuterAPI.fetchNameServerAddr();} catch (Throwable e) {log.error("ScheduledTask fetchNameServerAddr exception", e);}}}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);}if (!messageStoreConfig.isEnableDLegerCommitLog()) {if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());this.updateMasterHAServerAddrPeriodically = false;} else {this.updateMasterHAServerAddrPeriodically = true;}} else {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.printMasterAndSlaveDiff();} catch (Throwable e) {log.error("schedule printMasterAndSlaveDiff error.", e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);}}if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {// Register a listener to reload SslContexttry {fileWatchService = new FileWatchService(new String[] {TlsSystemConfig.tlsServerCertPath,TlsSystemConfig.tlsServerKeyPath,TlsSystemConfig.tlsServerTrustCertPath},new FileWatchService.Listener() {boolean certChanged, keyChanged = false;@Overridepublic void onChanged(String path) {if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {log.info("The trust certificate changed, reload the ssl context");reloadServerSslContext();}if (path.equals(TlsSystemConfig.tlsServerCertPath)) {certChanged = true;}if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {keyChanged = true;}if (certChanged && keyChanged) {log.info("The certificate and private key changed, reload the ssl context");certChanged = keyChanged = false;reloadServerSslContext();}}private void reloadServerSslContext() {((NettyRemotingServer) remotingServer).loadSslContext();((NettyRemotingServer) 
fastRemotingServer).loadSslContext();}});} catch (Exception e) {log.warn("FileWatchService created error, can't load the certificate dynamically");}}initialTransaction();initialAcl();initialRpcHooks();}return result;}private void initialTransaction() {this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);if (null == this.transactionalMessageService) {this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());}this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);if (null == this.transactionalMessageCheckListener) {this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());}this.transactionalMessageCheckListener.setBrokerController(this);this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);}private void initialAcl() {if (!this.brokerConfig.isAclEnable()) {log.info("The broker dose not enable acl");return;}List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);if (accessValidators == null || accessValidators.isEmpty()) {log.info("The broker dose not load the AccessValidator");return;}for (AccessValidator accessValidator: accessValidators) {final AccessValidator validator = accessValidator;this.registerServerRPCHook(new RPCHook() {@Overridepublic void doBeforeRequest(String remoteAddr, RemotingCommand request) {//Do not catch the exceptionvalidator.validate(validator.parse(request, remoteAddr));}@Overridepublic void doAfterResponse(String remoteAddr, 
                    RemotingCommand request, RemotingCommand response) {
                }
            });
        }
    }

    private void initialRpcHooks() {
        List<RPCHook> rpcHooks = ServiceProvider.load(ServiceProvider.RPC_HOOK_ID, RPCHook.class);
        if (rpcHooks == null || rpcHooks.isEmpty()) {
            return;
        }
        for (RPCHook rpcHook: rpcHooks) {
            this.registerServerRPCHook(rpcHook);
        }
    }

    public void registerProcessor() {
        /**
         * SendMessageProcessor
         */
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        sendProcessor.registerSendMessageHook(sendMessageHookList);
        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
        /**
         * PullMessageProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
        this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

        /**
         * QueryMessageProcessor
         */
        NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor,
            this.queryMessageExecutor);

        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);

        /**
         * ClientManageProcessor
         */
        ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
        this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
        this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);

        this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);

        /**
         * ConsumerManageProcessor
         */
        ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
        this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
        this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);

        this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);

        /**
         * EndTransactionProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);

        /**
         * Default
         */
        AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
        this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
        this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
    }

    public BrokerStats getBrokerStats() {
        return brokerStats;
    }

    public void setBrokerStats(BrokerStats brokerStats) {
        this.brokerStats = brokerStats;
    }

    public void protectBroker() {
        if (this.brokerConfig.isDisableConsumeIfConsumerReadSlowly()) {
            final Iterator<Map.Entry<String, MomentStatsItem>> it = this.brokerStatsManager.getMomentStatsItemSetFallSize().getStatsItemTable().entrySet().iterator();
            while (it.hasNext()) {
                final Map.Entry<String, MomentStatsItem> next = it.next();
                final long fallBehindBytes = next.getValue().getValue().get();
                if (fallBehindBytes > this.brokerConfig.getConsumerFallbehindThreshold()) {
                    final String[] split = next.getValue().getStatsKey().split("@");
                    final String group = split[2];
                    LOG_PROTECTION.info("[PROTECT_BROKER] the consumer[{}] consume slowly, {} bytes, disable it", group, fallBehindBytes);
                    this.subscriptionGroupManager.disableConsume(group);
                }
            }
        }
    }

    public long headSlowTimeMills(BlockingQueue<Runnable> q) {
        long slowTimeMills = 0;
        final Runnable peek = q.peek();
        if (peek != null) {
            RequestTask rt = BrokerFastFailure.castRunnable(peek);
            slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
        }

        if (slowTimeMills < 0) {
            slowTimeMills = 0;
        }

        return slowTimeMills;
    }

    public long headSlowTimeMills4SendThreadPoolQueue() {
        return this.headSlowTimeMills(this.sendThreadPoolQueue);
    }

    public long headSlowTimeMills4PullThreadPoolQueue() {
        return this.headSlowTimeMills(this.pullThreadPoolQueue);
    }

    public long headSlowTimeMills4QueryThreadPoolQueue() {
        return this.headSlowTimeMills(this.queryThreadPoolQueue);
    }

    public long headSlowTimeMills4EndTransactionThreadPoolQueue() {
        return this.headSlowTimeMills(this.endTransactionThreadPoolQueue);
    }

    public void printWaterMark() {
        LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue());
        LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue());
        LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills: {}", this.queryThreadPoolQueue.size(), headSlowTimeMills4QueryThreadPoolQueue());
        LOG_WATER_MARK.info("[WATERMARK] Transaction Queue Size: {} SlowTimeMills: {}", this.endTransactionThreadPoolQueue.size(), headSlowTimeMills4EndTransactionThreadPoolQueue());
    }

    public MessageStore getMessageStore() {
        return messageStore;
    }

    public void setMessageStore(MessageStore messageStore) {
        this.messageStore = messageStore;
    }

    private void printMasterAndSlaveDiff() {
        long diff = this.messageStore.slaveFallBehindMuch();

        // XXX: warn and notify me
        log.info("Slave fall behind master: {} bytes", diff);
    }

    public Broker2Client getBroker2Client() {
        return broker2Client;
    }

    public ConsumerManager getConsumerManager() {
        return consumerManager;
    }

    public ConsumerFilterManager getConsumerFilterManager() {
        return consumerFilterManager;
    }

    public ConsumerOffsetManager getConsumerOffsetManager() {
        return consumerOffsetManager;
    }

    public MessageStoreConfig getMessageStoreConfig() {
        return messageStoreConfig;
    }

    public ProducerManager getProducerManager() {
        return producerManager;
    }

    public void setFastRemotingServer(RemotingServer fastRemotingServer) {
        this.fastRemotingServer = fastRemotingServer;
    }

    public PullMessageProcessor getPullMessageProcessor() {
        return pullMessageProcessor;
    }

    public PullRequestHoldService getPullRequestHoldService() {
        return pullRequestHoldService;
    }

    public SubscriptionGroupManager getSubscriptionGroupManager() {
        return subscriptionGroupManager;
    }

    public void shutdown() {
        if (this.brokerStatsManager != null) {
            this.brokerStatsManager.shutdown();
        }

        if (this.clientHousekeepingService != null) {
            this.clientHousekeepingService.shutdown();
        }

        if (this.pullRequestHoldService != null) {
            this.pullRequestHoldService.shutdown();
        }

        if (this.remotingServer != null) {
            this.remotingServer.shutdown();
        }

        if (this.fastRemotingServer != null) {
            this.fastRemotingServer.shutdown();
        }

        if (this.fileWatchService != null) {
            this.fileWatchService.shutdown();
        }

        if (this.messageStore != null) {
            this.messageStore.shutdown();
        }

        this.scheduledExecutorService.shutdown();
        try {
            this.scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }

        this.unregisterBrokerAll();

        if (this.sendMessageExecutor != null) {
            this.sendMessageExecutor.shutdown();
        }

        if (this.pullMessageExecutor != null) {
            this.pullMessageExecutor.shutdown();
        }

        if (this.adminBrokerExecutor != null) {
            this.adminBrokerExecutor.shutdown();
        }

        if (this.brokerOuterAPI != null) {
            this.brokerOuterAPI.shutdown();
        }

        this.consumerOffsetManager.persist();

        if (this.filterServerManager != null) {
            this.filterServerManager.shutdown();
        }

        if (this.brokerFastFailure != null) {
            this.brokerFastFailure.shutdown();
        }

        if (this.consumerFilterManager != null) {
            this.consumerFilterManager.persist();
        }

        if (this.clientManageExecutor != null) {
            this.clientManageExecutor.shutdown();
        }

        if (this.queryMessageExecutor != null) {
            this.queryMessageExecutor.shutdown();
        }

        if (this.consumerManageExecutor != null) {
            this.consumerManageExecutor.shutdown();
        }

        if (this.fileWatchService != null) {
            this.fileWatchService.shutdown();
        }

        if
        (this.transactionalMessageCheckService != null) {
            this.transactionalMessageCheckService.shutdown(false);
        }

        if (this.endTransactionExecutor != null) {
            this.endTransactionExecutor.shutdown();
        }
    }

    private void unregisterBrokerAll() {
        this.brokerOuterAPI.unregisterBrokerAll(
            this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId());
    }

    public String getBrokerAddr() {
        return this.brokerConfig.getBrokerIP1() + ":" + this.nettyServerConfig.getListenPort();
    }

    public void start() throws Exception {
        if (this.messageStore != null) {
            this.messageStore.start();
        }

        if (this.remotingServer != null) {
            this.remotingServer.start();
        }

        if (this.fastRemotingServer != null) {
            this.fastRemotingServer.start();
        }

        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }

        if (this.brokerOuterAPI != null) {
            this.brokerOuterAPI.start();
        }

        if (this.pullRequestHoldService != null) {
            this.pullRequestHoldService.start();
        }

        if (this.clientHousekeepingService != null) {
            this.clientHousekeepingService.start();
        }

        if (this.filterServerManager != null) {
            this.filterServerManager.start();
        }

        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            startProcessorByHa(messageStoreConfig.getBrokerRole());
            handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
        }

        this.registerBrokerAll(true, false, true);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

        if (this.brokerStatsManager != null) {
            this.brokerStatsManager.start();
        }

        if (this.brokerFastFailure != null) {
            this.brokerFastFailure.start();
        }
    }

    public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
        TopicConfig registerTopicConfig = topicConfig;
        if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
            || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
            registerTopicConfig =
                new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                    this.brokerConfig.getBrokerPermission());
        }

        ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
        topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig);
        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
        topicConfigSerializeWrapper.setDataVersion(dataVersion);
        topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);

        doRegisterBrokerAll(true, false, topicConfigSerializeWrapper);
    }

    public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
        TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

        if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
            || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
            ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
            for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
                TopicConfig tmp =
                    new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                        this.brokerConfig.getBrokerPermission());
                topicConfigTable.put(topicConfig.getTopicName(), tmp);
            }
            topicConfigWrapper.setTopicConfigTable(topicConfigTable);
        }

        if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.brokerConfig.getRegisterBrokerTimeoutMills())) {
            doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
        }
    }

    private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
        TopicConfigSerializeWrapper topicConfigWrapper)
    {
        List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
            this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.getHAServerAddr(),
            topicConfigWrapper,
            this.filterServerManager.buildNewFilterServerList(),
            oneway,
            this.brokerConfig.getRegisterBrokerTimeoutMills(),
            this.brokerConfig.isCompressedRegister());

        if (registerBrokerResultList.size() > 0) {
            RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
            if (registerBrokerResult != null) {
                if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                    this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
                }

                this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());

                if (checkOrderConfig) {
                    this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
                }
            }
        }
    }

    private boolean needRegister(final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final int timeoutMills) {

        TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
        List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills);
        boolean needRegister = false;
        for (Boolean changed : changeList) {
            if (changed) {
                needRegister = true;
                break;
            }
        }
        return needRegister;
    }

    public TopicConfigManager getTopicConfigManager() {
        return topicConfigManager;
    }

    public void setTopicConfigManager(TopicConfigManager topicConfigManager) {
        this.topicConfigManager = topicConfigManager;
    }

    public String getHAServerAddr() {
        return this.brokerConfig.getBrokerIP2() + ":" + this.messageStoreConfig.getHaListenPort();
    }

    public RebalanceLockManager getRebalanceLockManager() {
        return rebalanceLockManager;
    }

    public SlaveSynchronize getSlaveSynchronize() {
        return slaveSynchronize;
    }

    public ExecutorService getPullMessageExecutor() {
        return pullMessageExecutor;
    }

    public void setPullMessageExecutor(ExecutorService pullMessageExecutor) {
        this.pullMessageExecutor = pullMessageExecutor;
    }

    public BlockingQueue<Runnable> getSendThreadPoolQueue() {
        return sendThreadPoolQueue;
    }

    public FilterServerManager getFilterServerManager() {
        return filterServerManager;
    }

    public BrokerStatsManager getBrokerStatsManager() {
        return brokerStatsManager;
    }

    public List<SendMessageHook> getSendMessageHookList() {
        return sendMessageHookList;
    }

    public void registerSendMessageHook(final SendMessageHook hook) {
        this.sendMessageHookList.add(hook);
        log.info("register SendMessageHook Hook, {}", hook.hookName());
    }

    public List<ConsumeMessageHook> getConsumeMessageHookList() {
        return consumeMessageHookList;
    }

    public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
        this.consumeMessageHookList.add(hook);
        log.info("register ConsumeMessageHook Hook, {}", hook.hookName());
    }

    public void registerServerRPCHook(RPCHook rpcHook) {
        getRemotingServer().registerRPCHook(rpcHook);
        this.fastRemotingServer.registerRPCHook(rpcHook);
    }

    public RemotingServer getRemotingServer() {
        return remotingServer;
    }

    public void setRemotingServer(RemotingServer remotingServer) {
        this.remotingServer = remotingServer;
    }

    public void registerClientRPCHook(RPCHook rpcHook) {
        this.getBrokerOuterAPI().registerRPCHook(rpcHook);
    }

    public BrokerOuterAPI getBrokerOuterAPI() {
        return brokerOuterAPI;
    }

    public InetSocketAddress getStoreHost() {
        return storeHost;
    }

    public void setStoreHost(InetSocketAddress storeHost) {
        this.storeHost = storeHost;
    }

    public Configuration getConfiguration() {
        return this.configuration;
    }

    public BlockingQueue<Runnable> getHeartbeatThreadPoolQueue() {
        return heartbeatThreadPoolQueue;
    }

    public TransactionalMessageCheckService getTransactionalMessageCheckService() {
        return transactionalMessageCheckService;
    }

    public void setTransactionalMessageCheckService(
        TransactionalMessageCheckService transactionalMessageCheckService) {
        this.transactionalMessageCheckService =
            transactionalMessageCheckService;
    }

    public TransactionalMessageService getTransactionalMessageService() {
        return transactionalMessageService;
    }

    public void setTransactionalMessageService(TransactionalMessageService transactionalMessageService) {
        this.transactionalMessageService = transactionalMessageService;
    }

    public AbstractTransactionalMessageCheckListener getTransactionalMessageCheckListener() {
        return transactionalMessageCheckListener;
    }

    public void setTransactionalMessageCheckListener(
        AbstractTransactionalMessageCheckListener transactionalMessageCheckListener) {
        this.transactionalMessageCheckListener = transactionalMessageCheckListener;
    }

    public BlockingQueue<Runnable> getEndTransactionThreadPoolQueue() {
        return endTransactionThreadPoolQueue;
    }

    private void handleSlaveSynchronize(BrokerRole role) {
        if (role == BrokerRole.SLAVE) {
            if (null != slaveSyncFuture) {
                slaveSyncFuture.cancel(false);
            }
            this.slaveSynchronize.setMasterAddr(null);
            slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.slaveSynchronize.syncAll();
                    } catch (Throwable e) {
                        log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
                    }
                }
            }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
        } else {
            //handle the slave synchronise
            if (null != slaveSyncFuture) {
                slaveSyncFuture.cancel(false);
            }
            this.slaveSynchronize.setMasterAddr(null);
        }
    }

    public void changeToSlave(int brokerId) {
        log.info("Begin to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);

        //change the role
        brokerConfig.setBrokerId(brokerId == 0 ? 1 : brokerId); //TO DO check
        messageStoreConfig.setBrokerRole(BrokerRole.SLAVE);

        //handle the scheduled service
        try {
            this.messageStore.handleScheduleMessageService(BrokerRole.SLAVE);
        } catch (Throwable t) {
            log.error("[MONITOR] handleScheduleMessageService failed when changing to slave", t);
        }

        //handle the transactional service
        try {
            this.shutdownProcessorByHa();
        } catch (Throwable t) {
            log.error("[MONITOR] shutdownProcessorByHa failed when changing to slave", t);
        }

        //handle the slave synchronise
        handleSlaveSynchronize(BrokerRole.SLAVE);

        try {
            this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
        } catch (Throwable ignored) {

        }
        log.info("Finish to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
    }

    public void changeToMaster(BrokerRole role) {
        if (role == BrokerRole.SLAVE) {
            return;
        }
        log.info("Begin to change to master brokerName={}", brokerConfig.getBrokerName());

        //handle the slave synchronise
        handleSlaveSynchronize(role);

        //handle the scheduled service
        try {
            this.messageStore.handleScheduleMessageService(role);
        } catch (Throwable t) {
            log.error("[MONITOR] handleScheduleMessageService failed when changing to master", t);
        }

        //handle the transactional service
        try {
            this.startProcessorByHa(BrokerRole.SYNC_MASTER);
        } catch (Throwable t) {
            log.error("[MONITOR] startProcessorByHa failed when changing to master", t);
        }

        //if the operations above are totally successful, we change to master
        brokerConfig.setBrokerId(0); //TO DO check
        messageStoreConfig.setBrokerRole(role);

        try {
            this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
        } catch (Throwable ignored) {

        }
        log.info("Finish to change to master brokerName={}", brokerConfig.getBrokerName());
    }

    private void startProcessorByHa(BrokerRole role) {
        if (BrokerRole.SLAVE != role) {
            if (this.transactionalMessageCheckService != null) {
                this.transactionalMessageCheckService.start();
            }
        }
    }

    private void shutdownProcessorByHa() {
        if (this.transactionalMessageCheckService != null) {
            this.transactionalMessageCheckService.shutdown(true);
        }
    }
}
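
In `start()` above, the periodic `registerBrokerAll` task is scheduled with the period `Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000))`, so the broker's heartbeat to the NameServer always fires every 10 to 60 seconds regardless of the configured value. When `forceRegister` is false, `needRegister` additionally skips the registration unless at least one NameServer reports a change. The following is a minimal standalone sketch of those two rules only; the class `RegisterPeriodSketch` and its method names are hypothetical and are not part of RocketMQ.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; it only mirrors two expressions from BrokerController.
final class RegisterPeriodSketch {

    // Same clamp as in BrokerController.start(): the period is forced into [10000, 60000] ms.
    static int effectivePeriodMillis(int configuredMillis) {
        return Math.max(10000, Math.min(configuredMillis, 60000));
    }

    // Same loop shape as in needRegister(): true as soon as one entry reports a change.
    static boolean anyChanged(List<Boolean> changeList) {
        for (Boolean changed : changeList) {
            if (changed) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // 1000 * 30 is the default of registerNameServerPeriod in BrokerConfig.
        System.out.println(effectivePeriodMillis(1000 * 30));
        System.out.println(effectivePeriodMillis(5000));
        System.out.println(effectivePeriodMillis(120000));
        System.out.println(anyChanged(Arrays.asList(false, true, false)));
    }
}
```

With the default configuration the clamp has no effect; it only matters when `registerNameServerPeriod` is set outside the allowed range.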

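The `FileWatchService.Listener` registered in `initialize()` reloads the SSL context immediately when the trust certificate changes, but waits until both the server certificate and the private key have changed before reloading, presumably so that a half-updated certificate/key pair is never loaded. The sketch below replays that decision logic in isolation. The class `TlsReloadSketch`, the file names, and the `reloadCount` counter are assumptions made for illustration; the real listener calls `loadSslContext()` on the two Netty servers instead of counting.

```java
// Hypothetical stand-in for the anonymous listener in BrokerController; not RocketMQ code.
final class TlsReloadSketch {
    // Assumed example paths; the broker reads the real ones from TlsSystemConfig.
    static final String CERT = "server.pem";
    static final String KEY = "server.key";
    static final String TRUST = "ca.pem";

    private boolean certChanged;
    private boolean keyChanged;
    int reloadCount; // how many times the SSL context would have been reloaded

    void onChanged(String path) {
        if (path.equals(TRUST)) {
            reloadCount++; // trust certificate: reload right away
        }
        if (path.equals(CERT)) {
            certChanged = true;
        }
        if (path.equals(KEY)) {
            keyChanged = true;
        }
        if (certChanged && keyChanged) {
            // Both halves of the pair are new: reset the flags and reload once.
            certChanged = keyChanged = false;
            reloadCount++;
        }
    }

    public static void main(String[] args) {
        TlsReloadSketch listener = new TlsReloadSketch();
        listener.onChanged(CERT);
        System.out.println(listener.reloadCount); // 0: the key has not changed yet
        listener.onChanged(KEY);
        System.out.println(listener.reloadCount); // 1: certificate and key both changed
        listener.onChanged(TRUST);
        System.out.println(listener.reloadCount); // 2: trust certificate reloads immediately
    }
}
```
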
5、org.apache.rocketmq.common.BrokerConfig.java source code:

/*
 * D:\RocketMQ\rocketmq-master\common\src\main\java\org\apache\rocketmq\common\BrokerConfig.java
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.common;

import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;

public class BrokerConfig {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);

    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
    @ImportantField
    private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
    @ImportantField
    private String brokerIP1 = RemotingUtil.getLocalAddress();
    private String brokerIP2 = RemotingUtil.getLocalAddress();
    @ImportantField
    private String brokerName = localHostName();
    @ImportantField
    private String brokerClusterName = "DefaultCluster";
    @ImportantField
    private long brokerId = MixAll.MASTER_ID;
    private int brokerPermission = PermName.PERM_READ | PermName.PERM_WRITE;
    private int defaultTopicQueueNums = 8;
    @ImportantField
    private boolean autoCreateTopicEnable = true;

    private boolean clusterTopicEnable = true;

    private boolean brokerTopicEnable = true;
    @ImportantField
    private boolean autoCreateSubscriptionGroup = true;
    private String messageStorePlugIn = "";
    @ImportantField
    private String msgTraceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
    @ImportantField
    private boolean traceTopicEnable = false;
    /**
     * thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default
     * value is 1.
     */
    private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
    private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
    private int queryMessageThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors();

    private int adminBrokerThreadPoolNums = 16;
    private int clientManageThreadPoolNums = 32;
    private int consumerManageThreadPoolNums = 32;
    private int heartbeatThreadPoolNums = Math.min(32, Runtime.getRuntime().availableProcessors());

    /**
     * Thread numbers for EndTransactionProcessor
     */
    private int endTransactionThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors() * 2;

    private int flushConsumerOffsetInterval = 1000 * 5;

    private int flushConsumerOffsetHistoryInterval = 1000 * 60;

    @ImportantField
    private boolean rejectTransactionMessage = false;
    @ImportantField
    private boolean fetchNamesrvAddrByAddressServer = false;
    private int sendThreadPoolQueueCapacity = 10000;
    private int pullThreadPoolQueueCapacity = 100000;
    private int queryThreadPoolQueueCapacity = 20000;
    private int clientManagerThreadPoolQueueCapacity = 1000000;
    private int consumerManagerThreadPoolQueueCapacity = 1000000;
    private int heartbeatThreadPoolQueueCapacity = 50000;
    private int endTransactionPoolQueueCapacity = 100000;

    private int filterServerNums = 0;

    private boolean longPollingEnable = true;

    private long shortPollingTimeMills = 1000;

    private boolean notifyConsumerIdsChangedEnable = true;

    private boolean highSpeedMode = false;

    private boolean commercialEnable = true;
    private int commercialTimerCount = 1;
    private int commercialTransCount = 1;
    private int commercialBigCount = 1;
    private int commercialBaseCount = 1;

    private boolean transferMsgByHeap = true;
    private int maxDelayTime = 40;

    private String regionId = MixAll.DEFAULT_TRACE_REGION_ID;
    private int registerBrokerTimeoutMills = 6000;

    private boolean slaveReadEnable = false;

    private boolean disableConsumeIfConsumerReadSlowly = false;
    private long consumerFallbehindThreshold = 1024L * 1024 * 1024 * 16;

    private boolean brokerFastFailureEnable = true;
    private long waitTimeMillsInSendQueue = 200;
    private long waitTimeMillsInPullQueue = 5 * 1000;
    private long waitTimeMillsInHeartbeatQueue = 31 * 1000;
    private long waitTimeMillsInTransactionQueue = 3 * 1000;

    private long startAcceptSendRequestTimeStamp = 0L;

    private boolean traceOn = true;

    // Switch of filter bit map calculation.
    // If switch on:
    // 1. Calculate filter bit map when construct queue.
    // 2. Filter bit map will be saved to consume queue extend file if allowed.
    private boolean enableCalcFilterBitMap = false;

    // Expect num of consumers will use filter.
    private int expectConsumerNumUseFilter = 32;

    // Error rate of bloom filter, 1~100.
    private int maxErrorRateOfBloomFilter = 20;

    //how long to clean filter data after dead.Default: 24h
    private long filterDataCleanTimeSpan = 24 * 3600 * 1000;

    // whether do filter when retry.
    private boolean filterSupportRetry = false;
    private boolean enablePropertyFilter = false;

    private boolean compressedRegister = false;

    private boolean forceRegister = true;

    /**
     * This configurable item defines interval of topics registration of broker to name server. Allowing values are
     * between 10, 000 and 60, 000 milliseconds.
     */
    private int registerNameServerPeriod = 1000 * 30;

    /**
     * The minimum time of the transactional message  to be checked firstly, one message only exceed this time interval
     * that can be checked.
     */
    @ImportantField
    private long transactionTimeOut = 6 * 1000;

    /**
     * The maximum number of times the message was checked, if exceed this value, this message will be discarded.
     */
    @ImportantField
    private int transactionCheckMax = 15;

    /**
     * Transaction message check interval.
     */
    @ImportantField
    private long transactionCheckInterval = 60 * 1000;

    /**
     * Acl feature switch
     */
    @ImportantField
    private boolean aclEnable = false;

    public static String localHostName() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            log.error("Failed to obtain the host name", e);
        }

        return "DEFAULT_BROKER";
    }

    public boolean isTraceOn() {
        return traceOn;
    }

    public void setTraceOn(final boolean traceOn) {
        this.traceOn = traceOn;
    }

    public long getStartAcceptSendRequestTimeStamp() {
        return startAcceptSendRequestTimeStamp;
    }

    public void setStartAcceptSendRequestTimeStamp(final long startAcceptSendRequestTimeStamp) {
        this.startAcceptSendRequestTimeStamp = startAcceptSendRequestTimeStamp;
    }

    public long getWaitTimeMillsInSendQueue() {
        return waitTimeMillsInSendQueue;
    }

    public
    void setWaitTimeMillsInSendQueue(final long waitTimeMillsInSendQueue) {
        this.waitTimeMillsInSendQueue = waitTimeMillsInSendQueue;
    }

    public long getConsumerFallbehindThreshold() {
        return consumerFallbehindThreshold;
    }

    public void setConsumerFallbehindThreshold(final long consumerFallbehindThreshold) {
        this.consumerFallbehindThreshold = consumerFallbehindThreshold;
    }

    public boolean isBrokerFastFailureEnable() {
        return brokerFastFailureEnable;
    }

    public void setBrokerFastFailureEnable(final boolean brokerFastFailureEnable) {
        this.brokerFastFailureEnable = brokerFastFailureEnable;
    }

    public long getWaitTimeMillsInPullQueue() {
        return waitTimeMillsInPullQueue;
    }

    public void setWaitTimeMillsInPullQueue(final long waitTimeMillsInPullQueue) {
        this.waitTimeMillsInPullQueue = waitTimeMillsInPullQueue;
    }

    public boolean isDisableConsumeIfConsumerReadSlowly() {
        return disableConsumeIfConsumerReadSlowly;
    }

    public void setDisableConsumeIfConsumerReadSlowly(final boolean disableConsumeIfConsumerReadSlowly) {
        this.disableConsumeIfConsumerReadSlowly = disableConsumeIfConsumerReadSlowly;
    }

    public boolean isSlaveReadEnable() {
        return slaveReadEnable;
    }

    public void setSlaveReadEnable(final boolean slaveReadEnable) {
        this.slaveReadEnable = slaveReadEnable;
    }

    public int getRegisterBrokerTimeoutMills() {
        return registerBrokerTimeoutMills;
    }

    public void setRegisterBrokerTimeoutMills(final int registerBrokerTimeoutMills) {
        this.registerBrokerTimeoutMills = registerBrokerTimeoutMills;
    }

    public String getRegionId() {
        return regionId;
    }

    public void setRegionId(final String regionId) {
        this.regionId = regionId;
    }

    public boolean isTransferMsgByHeap() {
        return transferMsgByHeap;
    }

    public void setTransferMsgByHeap(final boolean transferMsgByHeap) {
        this.transferMsgByHeap = transferMsgByHeap;
    }

    public String getMessageStorePlugIn() {
        return messageStorePlugIn;
    }

    public void setMessageStorePlugIn(String messageStorePlugIn) {
        this.messageStorePlugIn = messageStorePlugIn;
    }

    public boolean isHighSpeedMode() {
        return highSpeedMode;
    }

    public void setHighSpeedMode(final boolean highSpeedMode) {
        this.highSpeedMode = highSpeedMode;
    }

    public String getRocketmqHome() {
        return rocketmqHome;
    }

    public void setRocketmqHome(String rocketmqHome) {
        this.rocketmqHome = rocketmqHome;
    }

    public String getBrokerName() {
        return brokerName;
    }

    public void setBrokerName(String brokerName) {
        this.brokerName = brokerName;
    }

    public int getBrokerPermission() {
        return brokerPermission;
    }

    public void setBrokerPermission(int brokerPermission) {
        this.brokerPermission = brokerPermission;
    }

    public int getDefaultTopicQueueNums() {
        return defaultTopicQueueNums;
    }

    public void setDefaultTopicQueueNums(int defaultTopicQueueNums) {
        this.defaultTopicQueueNums = defaultTopicQueueNums;
    }

    public boolean isAutoCreateTopicEnable() {
        return autoCreateTopicEnable;
    }

    public void setAutoCreateTopicEnable(boolean autoCreateTopic) {
        this.autoCreateTopicEnable = autoCreateTopic;
    }

    public String getBrokerClusterName() {
        return brokerClusterName;
    }

    public void setBrokerClusterName(String brokerClusterName) {
        this.brokerClusterName = brokerClusterName;
    }

    public String getBrokerIP1() {
        return brokerIP1;
    }

    public void setBrokerIP1(String brokerIP1) {
        this.brokerIP1 = brokerIP1;
    }

    public String getBrokerIP2() {
        return brokerIP2;
    }

    public void setBrokerIP2(String brokerIP2) {
        this.brokerIP2 = brokerIP2;
    }

    public int getSendMessageThreadPoolNums() {
        return sendMessageThreadPoolNums;
    }

    public void setSendMessageThreadPoolNums(int sendMessageThreadPoolNums) {
        this.sendMessageThreadPoolNums = sendMessageThreadPoolNums;
    }

    public int getPullMessageThreadPoolNums() {
        return pullMessageThreadPoolNums;
    }

    public void setPullMessageThreadPoolNums(int pullMessageThreadPoolNums) {
        this.pullMessageThreadPoolNums = pullMessageThreadPoolNums;
    }

    public int getQueryMessageThreadPoolNums() {
        return queryMessageThreadPoolNums;
    }

    public void setQueryMessageThreadPoolNums(final int queryMessageThreadPoolNums) {
        this.queryMessageThreadPoolNums = queryMessageThreadPoolNums;
    }

    public int getAdminBrokerThreadPoolNums() {
        return adminBrokerThreadPoolNums;
    }

    public void setAdminBrokerThreadPoolNums(int adminBrokerThreadPoolNums) {
        this.adminBrokerThreadPoolNums = adminBrokerThreadPoolNums;
    }

    public int getFlushConsumerOffsetInterval() {
        return flushConsumerOffsetInterval;
    }

    public void setFlushConsumerOffsetInterval(int flushConsumerOffsetInterval) {
        this.flushConsumerOffsetInterval = flushConsumerOffsetInterval;
    }

    public int getFlushConsumerOffsetHistoryInterval() {
        return flushConsumerOffsetHistoryInterval;
    }

    public void setFlushConsumerOffsetHistoryInterval(int flushConsumerOffsetHistoryInterval) {
        this.flushConsumerOffsetHistoryInterval = flushConsumerOffsetHistoryInterval;
    }

    public boolean isClusterTopicEnable() {
        return clusterTopicEnable;
    }

    public void setClusterTopicEnable(boolean clusterTopicEnable) {
        this.clusterTopicEnable = clusterTopicEnable;
    }

    public String getNamesrvAddr() {
        return namesrvAddr;
    }

    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }

    public long getBrokerId() {
        return brokerId;
    }

    public void setBrokerId(long brokerId) {
        this.brokerId = brokerId;
    }

    public boolean isAutoCreateSubscriptionGroup() {
        return autoCreateSubscriptionGroup;
    }

    public void setAutoCreateSubscriptionGroup(boolean autoCreateSubscriptionGroup) {
        this.autoCreateSubscriptionGroup = autoCreateSubscriptionGroup;
    }

    public boolean isRejectTransactionMessage() {
        return rejectTransactionMessage;
    }

    public void setRejectTransactionMessage(boolean rejectTransactionMessage) {
        this.rejectTransactionMessage = rejectTransactionMessage;
    }

    public boolean isFetchNamesrvAddrByAddressServer() {
        return fetchNamesrvAddrByAddressServer;
    }

    public void setFetchNamesrvAddrByAddressServer(boolean fetchNamesrvAddrByAddressServer) {
        this.fetchNamesrvAddrByAddressServer = fetchNamesrvAddrByAddressServer;
    }

    public int getSendThreadPoolQueueCapacity() {
        return sendThreadPoolQueueCapacity;
    }

    public void setSendThreadPoolQueueCapacity(int sendThreadPoolQueueCapacity) {
        this.sendThreadPoolQueueCapacity = sendThreadPoolQueueCapacity;
    }

    public int getPullThreadPoolQueueCapacity() {
        return pullThreadPoolQueueCapacity;
    }

    public void setPullThreadPoolQueueCapacity(int pullThreadPoolQueueCapacity) {
        this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity;
    }

    public int getQueryThreadPoolQueueCapacity() {
        return queryThreadPoolQueueCapacity;
    }

    public void setQueryThreadPoolQueueCapacity(final int queryThreadPoolQueueCapacity) {
        this.queryThreadPoolQueueCapacity = queryThreadPoolQueueCapacity;
    }

    public boolean isBrokerTopicEnable() {
        return brokerTopicEnable;
    }

    public void setBrokerTopicEnable(boolean brokerTopicEnable) {
        this.brokerTopicEnable = brokerTopicEnable;
    }

    public int getFilterServerNums() {
        return filterServerNums;
    }

    public void setFilterServerNums(int filterServerNums) {
        this.filterServerNums = filterServerNums;
    }

    public boolean isLongPollingEnable() {
        return longPollingEnable;
    }

    public void setLongPollingEnable(boolean longPollingEnable) {
        this.longPollingEnable = longPollingEnable;
    }

    public boolean isNotifyConsumerIdsChangedEnable() {
        return notifyConsumerIdsChangedEnable;
    }

    public void setNotifyConsumerIdsChangedEnable(boolean notifyConsumerIdsChangedEnable) {
        this.notifyConsumerIdsChangedEnable = notifyConsumerIdsChangedEnable;
    }

    public long getShortPollingTimeMills() {
        return shortPollingTimeMills;
    }

    public void setShortPollingTimeMills(long shortPollingTimeMills) {
        this.shortPollingTimeMills = shortPollingTimeMills;
    }

    public int getClientManageThreadPoolNums() {
        return clientManageThreadPoolNums;
    }

    public void setClientManageThreadPoolNums(int clientManageThreadPoolNums) {
        this.clientManageThreadPoolNums = clientManageThreadPoolNums;
    }

    public boolean isCommercialEnable() {
        return commercialEnable;
    }

    public void setCommercialEnable(final boolean commercialEnable) {
        this.commercialEnable = commercialEnable;
    }

    public int getCommercialTimerCount() {
        return commercialTimerCount;
    }

    public void setCommercialTimerCount(final int commercialTimerCount) {
        this.commercialTimerCount = commercialTimerCount;
    }

    public int getCommercialTransCount() {
        return
commercialTransCount;}public void setCommercialTransCount(final int commercialTransCount) {this.commercialTransCount = commercialTransCount;}public int getCommercialBigCount() {return commercialBigCount;}public void setCommercialBigCount(final int commercialBigCount) {this.commercialBigCount = commercialBigCount;}public int getMaxDelayTime() {return maxDelayTime;}public void setMaxDelayTime(final int maxDelayTime) {this.maxDelayTime = maxDelayTime;}public int getClientManagerThreadPoolQueueCapacity() {return clientManagerThreadPoolQueueCapacity;}public void setClientManagerThreadPoolQueueCapacity(int clientManagerThreadPoolQueueCapacity) {this.clientManagerThreadPoolQueueCapacity = clientManagerThreadPoolQueueCapacity;}public int getConsumerManagerThreadPoolQueueCapacity() {return consumerManagerThreadPoolQueueCapacity;}public void setConsumerManagerThreadPoolQueueCapacity(int consumerManagerThreadPoolQueueCapacity) {this.consumerManagerThreadPoolQueueCapacity = consumerManagerThreadPoolQueueCapacity;}public int getConsumerManageThreadPoolNums() {return consumerManageThreadPoolNums;}public void setConsumerManageThreadPoolNums(int consumerManageThreadPoolNums) {this.consumerManageThreadPoolNums = consumerManageThreadPoolNums;}public int getCommercialBaseCount() {return commercialBaseCount;}public void setCommercialBaseCount(int commercialBaseCount) {this.commercialBaseCount = commercialBaseCount;}public boolean isEnableCalcFilterBitMap() {return enableCalcFilterBitMap;}public void setEnableCalcFilterBitMap(boolean enableCalcFilterBitMap) {this.enableCalcFilterBitMap = enableCalcFilterBitMap;}public int getExpectConsumerNumUseFilter() {return expectConsumerNumUseFilter;}public void setExpectConsumerNumUseFilter(int expectConsumerNumUseFilter) {this.expectConsumerNumUseFilter = expectConsumerNumUseFilter;}public int getMaxErrorRateOfBloomFilter() {return maxErrorRateOfBloomFilter;}public void setMaxErrorRateOfBloomFilter(int maxErrorRateOfBloomFilter) 
{this.maxErrorRateOfBloomFilter = maxErrorRateOfBloomFilter;}public long getFilterDataCleanTimeSpan() {return filterDataCleanTimeSpan;}public void setFilterDataCleanTimeSpan(long filterDataCleanTimeSpan) {this.filterDataCleanTimeSpan = filterDataCleanTimeSpan;}public boolean isFilterSupportRetry() {return filterSupportRetry;}public void setFilterSupportRetry(boolean filterSupportRetry) {this.filterSupportRetry = filterSupportRetry;}public boolean isEnablePropertyFilter() {return enablePropertyFilter;}public void setEnablePropertyFilter(boolean enablePropertyFilter) {this.enablePropertyFilter = enablePropertyFilter;}public boolean isCompressedRegister() {return compressedRegister;}public void setCompressedRegister(boolean compressedRegister) {this.compressedRegister = compressedRegister;}public boolean isForceRegister() {return forceRegister;}public void setForceRegister(boolean forceRegister) {this.forceRegister = forceRegister;}public int getHeartbeatThreadPoolQueueCapacity() {return heartbeatThreadPoolQueueCapacity;}public void setHeartbeatThreadPoolQueueCapacity(int heartbeatThreadPoolQueueCapacity) {this.heartbeatThreadPoolQueueCapacity = heartbeatThreadPoolQueueCapacity;}public int getHeartbeatThreadPoolNums() {return heartbeatThreadPoolNums;}public void setHeartbeatThreadPoolNums(int heartbeatThreadPoolNums) {this.heartbeatThreadPoolNums = heartbeatThreadPoolNums;}public long getWaitTimeMillsInHeartbeatQueue() {return waitTimeMillsInHeartbeatQueue;}public void setWaitTimeMillsInHeartbeatQueue(long waitTimeMillsInHeartbeatQueue) {this.waitTimeMillsInHeartbeatQueue = waitTimeMillsInHeartbeatQueue;}public int getRegisterNameServerPeriod() {return registerNameServerPeriod;}public void setRegisterNameServerPeriod(int registerNameServerPeriod) {this.registerNameServerPeriod = registerNameServerPeriod;}public long getTransactionTimeOut() {return transactionTimeOut;}public void setTransactionTimeOut(long transactionTimeOut) {this.transactionTimeOut = 
transactionTimeOut;}public int getTransactionCheckMax() {return transactionCheckMax;}public void setTransactionCheckMax(int transactionCheckMax) {this.transactionCheckMax = transactionCheckMax;}public long getTransactionCheckInterval() {return transactionCheckInterval;}public void setTransactionCheckInterval(long transactionCheckInterval) {this.transactionCheckInterval = transactionCheckInterval;}public int getEndTransactionThreadPoolNums() {return endTransactionThreadPoolNums;}public void setEndTransactionThreadPoolNums(int endTransactionThreadPoolNums) {this.endTransactionThreadPoolNums = endTransactionThreadPoolNums;}public int getEndTransactionPoolQueueCapacity() {return endTransactionPoolQueueCapacity;}public void setEndTransactionPoolQueueCapacity(int endTransactionPoolQueueCapacity) {this.endTransactionPoolQueueCapacity = endTransactionPoolQueueCapacity;}public long getWaitTimeMillsInTransactionQueue() {return waitTimeMillsInTransactionQueue;}public void setWaitTimeMillsInTransactionQueue(long waitTimeMillsInTransactionQueue) {this.waitTimeMillsInTransactionQueue = waitTimeMillsInTransactionQueue;}public String getMsgTraceTopicName() {return msgTraceTopicName;}public void setMsgTraceTopicName(String msgTraceTopicName) {this.msgTraceTopicName = msgTraceTopicName;}public boolean isTraceTopicEnable() {return traceTopicEnable;}public void setTraceTopicEnable(boolean traceTopicEnable) {this.traceTopicEnable = traceTopicEnable;}public boolean isAclEnable() {return aclEnable;}public void setAclEnable(boolean aclEnable) {this.aclEnable = aclEnable;}
}
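
The long run of accessors above exists mainly so that the bean can be filled from a properties file at startup. The sketch below shows one way such binding can work: for every public one-argument `setXxx` method, look up the key `xxx` and convert the string to the parameter type. `DemoBrokerConfig` and `ConfigBinder` are hypothetical names made up for this illustration, and only a few field types are handled; it is not RocketMQ's own helper.

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical stand-in for a config bean, with the same accessor style as BrokerConfig.
final class DemoBrokerConfig {
    private String brokerName = "DEFAULT";
    private long brokerId = 0L;
    private boolean aclEnable = false;

    public String getBrokerName() { return brokerName; }
    public void setBrokerName(String brokerName) { this.brokerName = brokerName; }
    public long getBrokerId() { return brokerId; }
    public void setBrokerId(long brokerId) { this.brokerId = brokerId; }
    public boolean isAclEnable() { return aclEnable; }
    public void setAclEnable(boolean aclEnable) { this.aclEnable = aclEnable; }
}

// Hypothetical binder: maps property keys onto matching setters via reflection.
final class ConfigBinder {
    static void apply(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() <= 3 || m.getParameterCount() != 1) {
                continue;
            }
            // setBrokerName -> brokerName
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String raw = props.getProperty(key);
            if (raw == null || raw.isEmpty()) {
                continue; // keep the field's default value
            }
            Class<?> type = m.getParameterTypes()[0];
            Object value;
            if (type == String.class) {
                value = raw;
            } else if (type == int.class) {
                value = Integer.parseInt(raw);
            } else if (type == long.class) {
                value = Long.parseLong(raw);
            } else if (type == boolean.class) {
                value = Boolean.parseBoolean(raw);
            } else {
                continue; // unsupported type in this sketch
            }
            try {
                m.invoke(target, value);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot set " + key, e);
            }
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("brokerName", "broker-a");
        props.setProperty("brokerId", "1");
        DemoBrokerConfig config = new DemoBrokerConfig();
        apply(props, config);
        System.out.println(config.getBrokerName() + " / " + config.getBrokerId());
    }
}
```

Keys without a matching setter are ignored, and fields without a key keep their defaults, which is why each field in the bean declares a default value.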

6. Source of org.apache.rocketmq.remoting.netty.NettyServerConfig.java:

/*
 * D:\RocketMQ\rocketmq-master\remoting\src\main\java\org\apache\rocketmq\remoting\netty\NettyServerConfig.java
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.remoting.netty;

public class NettyServerConfig implements Cloneable {
    private int listenPort = 8888;
    private int serverWorkerThreads = 8;
    private int serverCallbackExecutorThreads = 0;
    private int serverSelectorThreads = 3;
    private int serverOnewaySemaphoreValue = 256;
    private int serverAsyncSemaphoreValue = 64;
    private int serverChannelMaxIdleTimeSeconds = 120;

    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    private boolean serverPooledByteBufAllocatorEnable = true;

    /**
     * make make install
     *
     *
     * ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \
     * --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd
     */
    private boolean useEpollNativeSelector = false;

    public int getListenPort() {
        return listenPort;
    }

    public void setListenPort(int listenPort) {
        this.listenPort = listenPort;
    }

    public int getServerWorkerThreads() {
        return serverWorkerThreads;
    }

    public void setServerWorkerThreads(int serverWorkerThreads) {
        this.serverWorkerThreads = serverWorkerThreads;
    }

    public int getServerSelectorThreads() {
        return serverSelectorThreads;
    }

    public void setServerSelectorThreads(int serverSelectorThreads) {
        this.serverSelectorThreads = serverSelectorThreads;
    }

    public int getServerOnewaySemaphoreValue() {
        return serverOnewaySemaphoreValue;
    }

    public void setServerOnewaySemaphoreValue(int serverOnewaySemaphoreValue) {
        this.serverOnewaySemaphoreValue = serverOnewaySemaphoreValue;
    }

    public int getServerCallbackExecutorThreads() {
        return serverCallbackExecutorThreads;
    }

    public void setServerCallbackExecutorThreads(int serverCallbackExecutorThreads) {
        this.serverCallbackExecutorThreads = serverCallbackExecutorThreads;
    }

    public int getServerAsyncSemaphoreValue() {
        return serverAsyncSemaphoreValue;
    }

    public void setServerAsyncSemaphoreValue(int serverAsyncSemaphoreValue) {
        this.serverAsyncSemaphoreValue = serverAsyncSemaphoreValue;
    }

    public int getServerChannelMaxIdleTimeSeconds() {
        return serverChannelMaxIdleTimeSeconds;
    }

    public void setServerChannelMaxIdleTimeSeconds(int serverChannelMaxIdleTimeSeconds) {
        this.serverChannelMaxIdleTimeSeconds = serverChannelMaxIdleTimeSeconds;
    }

    public int getServerSocketSndBufSize() {
        return serverSocketSndBufSize;
    }

    public void setServerSocketSndBufSize(int serverSocketSndBufSize) {
        this.serverSocketSndBufSize = serverSocketSndBufSize;
    }

    public int getServerSocketRcvBufSize() {
        return serverSocketRcvBufSize;
    }

    public void setServerSocketRcvBufSize(int serverSocketRcvBufSize) {
        this.serverSocketRcvBufSize = serverSocketRcvBufSize;
    }

    public boolean isServerPooledByteBufAllocatorEnable() {
        return serverPooledByteBufAllocatorEnable;
    }

    public void setServerPooledByteBufAllocatorEnable(boolean serverPooledByteBufAllocatorEnable) {
        this.serverPooledByteBufAllocatorEnable = serverPooledByteBufAllocatorEnable;
    }

    public boolean isUseEpollNativeSelector() {
        return useEpollNativeSelector;
    }

    public void setUseEpollNativeSelector(boolean useEpollNativeSelector) {
        this.useEpollNativeSelector = useEpollNativeSelector;
    }

    @Override
    public Object clone() throws CloneNotSupportedException {
        return (NettyServerConfig) super.clone();
    }
}
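
NettyServerConfig is the only config class in this part that implements `Cloneable`. A likely reason is that a caller can copy the whole configuration and change a single field, for example to open a second listener on a neighbouring port. The sketch below illustrates that pattern with a hypothetical, trimmed-down `DemoServerConfig`; the `derive` helper and the port offset are assumptions for illustration, not code taken from the listing.

```java
// Hypothetical two-field stand-in for NettyServerConfig.
final class DemoServerConfig implements Cloneable {
    private int listenPort = 8888;
    private int serverWorkerThreads = 8;

    public int getListenPort() { return listenPort; }
    public void setListenPort(int listenPort) { this.listenPort = listenPort; }
    public int getServerWorkerThreads() { return serverWorkerThreads; }
    public void setServerWorkerThreads(int n) { this.serverWorkerThreads = n; }

    @Override
    public DemoServerConfig clone() {
        try {
            // All fields are primitives, so the shallow copy is a full copy.
            return (DemoServerConfig) super.clone();
        } catch (CloneNotSupportedException e) {
            throw new AssertionError("Cloneable is implemented", e);
        }
    }
}

final class CloneDemo {
    // Copies the base config and shifts only the listen port.
    static DemoServerConfig derive(DemoServerConfig base, int portOffset) {
        DemoServerConfig copy = base.clone();
        copy.setListenPort(base.getListenPort() + portOffset);
        return copy;
    }

    public static void main(String[] args) {
        DemoServerConfig main = new DemoServerConfig();
        main.setListenPort(10911);
        DemoServerConfig second = derive(main, -2);
        System.out.println(main.getListenPort() + " -> " + second.getListenPort());
    }
}
```

Because the copy is independent, later changes to the derived config do not leak back into the original one.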

7. Source of org.apache.rocketmq.remoting.netty.NettyClientConfig.java:

/*
 * D:\RocketMQ\rocketmq-master\remoting\src\main\java\org\apache\rocketmq\remoting\netty\NettyClientConfig.java
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.remoting.netty;

public class NettyClientConfig {
    /**
     * Worker thread number
     */
    private int clientWorkerThreads = 4;
    private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
    private int clientOnewaySemaphoreValue = NettySystemConfig.CLIENT_ONEWAY_SEMAPHORE_VALUE;
    private int clientAsyncSemaphoreValue = NettySystemConfig.CLIENT_ASYNC_SEMAPHORE_VALUE;
    private int connectTimeoutMillis = 3000;
    private long channelNotActiveInterval = 1000 * 60;

    /**
     * IdleStateEvent will be triggered when neither read nor write was performed for
     * the specified period of this time. Specify {@code 0} to disable
     */
    private int clientChannelMaxIdleTimeSeconds = 120;

    private int clientSocketSndBufSize = NettySystemConfig.socketSndbufSize;
    private int clientSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    private boolean clientPooledByteBufAllocatorEnable = false;
    private boolean clientCloseSocketIfTimeout = false;

    private boolean useTLS;

    public boolean isClientCloseSocketIfTimeout() {
        return clientCloseSocketIfTimeout;
    }

    public void setClientCloseSocketIfTimeout(final boolean clientCloseSocketIfTimeout) {
        this.clientCloseSocketIfTimeout = clientCloseSocketIfTimeout;
    }

    public int getClientWorkerThreads() {
        return clientWorkerThreads;
    }

    public void setClientWorkerThreads(int clientWorkerThreads) {
        this.clientWorkerThreads = clientWorkerThreads;
    }

    public int getClientOnewaySemaphoreValue() {
        return clientOnewaySemaphoreValue;
    }

    public void setClientOnewaySemaphoreValue(int clientOnewaySemaphoreValue) {
        this.clientOnewaySemaphoreValue = clientOnewaySemaphoreValue;
    }

    public int getConnectTimeoutMillis() {
        return connectTimeoutMillis;
    }

    public void setConnectTimeoutMillis(int connectTimeoutMillis) {
        this.connectTimeoutMillis = connectTimeoutMillis;
    }

    public int getClientCallbackExecutorThreads() {
        return clientCallbackExecutorThreads;
    }

    public void setClientCallbackExecutorThreads(int clientCallbackExecutorThreads) {
        this.clientCallbackExecutorThreads = clientCallbackExecutorThreads;
    }

    public long getChannelNotActiveInterval() {
        return channelNotActiveInterval;
    }

    public void setChannelNotActiveInterval(long channelNotActiveInterval) {
        this.channelNotActiveInterval = channelNotActiveInterval;
    }

    public int getClientAsyncSemaphoreValue() {
        return clientAsyncSemaphoreValue;
    }

    public void setClientAsyncSemaphoreValue(int clientAsyncSemaphoreValue) {
        this.clientAsyncSemaphoreValue = clientAsyncSemaphoreValue;
    }

    public int getClientChannelMaxIdleTimeSeconds() {
        return clientChannelMaxIdleTimeSeconds;
    }

    public void setClientChannelMaxIdleTimeSeconds(int clientChannelMaxIdleTimeSeconds) {
        this.clientChannelMaxIdleTimeSeconds = clientChannelMaxIdleTimeSeconds;
    }

    public int getClientSocketSndBufSize() {
        return clientSocketSndBufSize;
    }

    public void setClientSocketSndBufSize(int clientSocketSndBufSize) {
        this.clientSocketSndBufSize = clientSocketSndBufSize;
    }

    public int getClientSocketRcvBufSize() {
        return clientSocketRcvBufSize;
    }

    public void setClientSocketRcvBufSize(int clientSocketRcvBufSize) {
        this.clientSocketRcvBufSize = clientSocketRcvBufSize;
    }

    public boolean isClientPooledByteBufAllocatorEnable() {
        return clientPooledByteBufAllocatorEnable;
    }

    public void setClientPooledByteBufAllocatorEnable(boolean clientPooledByteBufAllocatorEnable) {
        this.clientPooledByteBufAllocatorEnable = clientPooledByteBufAllocatorEnable;
    }

    public boolean isUseTLS() {
        return useTLS;
    }

    public void setUseTLS(boolean useTLS) {
        this.useTLS = useTLS;
    }
}
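
The names `clientOnewaySemaphoreValue` and `clientAsyncSemaphoreValue` suggest that each value sizes a semaphore that caps the number of in-flight one-way or asynchronous requests. The sketch below shows that general pattern with `java.util.concurrent.Semaphore`; `BoundedInvoker` and its methods are hypothetical and are not the remoting module's actual classes.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: admits at most maxInFlight concurrent requests.
final class BoundedInvoker {
    private final Semaphore permits;

    BoundedInvoker(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight, true);
    }

    // Returns true if the request may be sent; false if no permit became
    // available within the timeout (the caller should fail fast).
    boolean tryBegin(long timeoutMillis) {
        try {
            return permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    // Must be called exactly once per admitted request, when the response
    // arrives or the request times out.
    void complete() {
        permits.release();
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        BoundedInvoker invoker = new BoundedInvoker(2);
        System.out.println(invoker.tryBegin(10)); // admitted
        System.out.println(invoker.tryBegin(10)); // admitted
        System.out.println(invoker.tryBegin(10)); // rejected: both permits in use
        invoker.complete();
        System.out.println(invoker.tryBegin(10)); // admitted again
    }
}
```

A small cap protects the client from piling up unanswered requests when a remote node is slow, at the price of rejecting callers once the cap is reached.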

8. Source of org.apache.rocketmq.store.config.MessageStoreConfig.java:
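
Two getters in this class are not plain accessors: `getMapedFileSizeConsumeQueue()` rounds the configured size up to a whole multiple of `ConsumeQueue.CQ_STORE_UNIT_SIZE`, and `getDiskMaxUsedSpaceRatio()` clamps the configured value to the range 10 to 95. The sketch below isolates both calculations. `StoreConfigMath` is a hypothetical helper, and the unit size of 20 bytes is an assumption made for the example, since the constant's value is not shown in the listing.

```java
// Hypothetical helper isolating two derived getters of MessageStoreConfig.
final class StoreConfigMath {
    // Assumed size in bytes of one ConsumeQueue entry (not shown in the listing).
    static final int UNIT_SIZE = 20;

    // Rounds size up to the next multiple of UNIT_SIZE, so a mapped file
    // always holds a whole number of entries.
    static int roundUpToUnit(int size) {
        int factor = (int) Math.ceil(size / (UNIT_SIZE * 1.0));
        return factor * UNIT_SIZE;
    }

    // Keeps the disk usage ratio within [10, 95], whatever was configured.
    static int clampDiskRatio(int ratio) {
        if (ratio < 10) {
            return 10;
        }
        if (ratio > 95) {
            return 95;
        }
        return ratio;
    }

    public static void main(String[] args) {
        System.out.println(roundUpToUnit(300000 * UNIT_SIZE)); // already aligned
        System.out.println(roundUpToUnit(6000001));            // rounded up
        System.out.println(clampDiskRatio(99));                // clamped
    }
}
```

Doing the normalisation in the getter means a bad value in the config file is tolerated silently instead of being rejected at load time.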

/*
 * D:\RocketMQ\rocketmq-master\store\src\main\java\org\apache\rocketmq\store\config\MessageStoreConfig.java
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.store.config;

import java.io.File;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.store.ConsumeQueue;

public class MessageStoreConfig {
    //The root directory in which the log data is kept
    @ImportantField
    private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";

    //The directory in which the commitlog is kept
    @ImportantField
    private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
        + File.separator + "commitlog";

    // CommitLog file size,default is 1G
    private int mapedFileSizeCommitLog = 1024 * 1024 * 1024;
    // ConsumeQueue file size,default is 30W
    private int mapedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE;
    // enable consume queue ext
    private boolean enableConsumeQueueExt = false;
    // ConsumeQueue extend file size, 48M
    private int mappedFileSizeConsumeQueueExt = 48 * 1024 * 1024;
    // Bit count of filter bit map.
    // this will be set by pipe of calculate filter bit map.
    private int bitMapLengthConsumeQueueExt = 64;

    // CommitLog flush interval
    // flush data to disk
    @ImportantField
    private int flushIntervalCommitLog = 500;

    // Only used if TransientStorePool enabled
    // flush data to FileChannel
    @ImportantField
    private int commitIntervalCommitLog = 200;

    /**
     * introduced since 4.0.x. Determine whether to use mutex reentrantLock when putting message.<br/>
     * By default it is set to false indicating using spin lock when putting message.
     */
    private boolean useReentrantLockWhenPutMessage = false;

    // Whether schedule flush,default is real-time
    @ImportantField
    private boolean flushCommitLogTimed = false;
    // ConsumeQueue flush interval
    private int flushIntervalConsumeQueue = 1000;
    // Resource reclaim interval
    private int cleanResourceInterval = 10000;
    // CommitLog removal interval
    private int deleteCommitLogFilesInterval = 100;
    // ConsumeQueue removal interval
    private int deleteConsumeQueueFilesInterval = 100;
    private int destroyMapedFileIntervalForcibly = 1000 * 120;
    private int redeleteHangedFileInterval = 1000 * 120;
    // When to delete,default is at 4 am
    @ImportantField
    private String deleteWhen = "04";
    private int diskMaxUsedSpaceRatio = 75;
    // The number of hours to keep a log file before deleting it (in hours)
    @ImportantField
    private int fileReservedTime = 72;
    // Flow control for ConsumeQueue
    private int putMsgIndexHightWater = 600000;
    // The maximum size of a single log file,default is 512K
    private int maxMessageSize = 1024 * 1024 * 4;
    // Whether check the CRC32 of the records consumed.
    // This ensures no on-the-wire or on-disk corruption to the messages occurred.
    // This check adds some overhead,so it may be disabled in cases seeking extreme performance.
    private boolean checkCRCOnRecover = true;
    // How many pages are to be flushed when flush CommitLog
    private int flushCommitLogLeastPages = 4;
    // How many pages are to be committed when commit data to file
    private int commitCommitLogLeastPages = 4;
    // Flush page size when the disk in warming state
    private int flushLeastPagesWhenWarmMapedFile = 1024 / 4 * 16;
    // How many pages are to be flushed when flush ConsumeQueue
    private int flushConsumeQueueLeastPages = 2;
    private int flushCommitLogThoroughInterval = 1000 * 10;
    private int commitCommitLogThoroughInterval = 200;
    private int flushConsumeQueueThoroughInterval = 1000 * 60;
    @ImportantField
    private int maxTransferBytesOnMessageInMemory = 1024 * 256;
    @ImportantField
    private int maxTransferCountOnMessageInMemory = 32;
    @ImportantField
    private int maxTransferBytesOnMessageInDisk = 1024 * 64;
    @ImportantField
    private int maxTransferCountOnMessageInDisk = 8;
    @ImportantField
    private int accessMessageInMemoryMaxRatio = 40;
    @ImportantField
    private boolean messageIndexEnable = true;
    private int maxHashSlotNum = 5000000;
    private int maxIndexNum = 5000000 * 4;
    private int maxMsgsNumBatch = 64;
    @ImportantField
    private boolean messageIndexSafe = false;
    private int haListenPort = 10912;
    private int haSendHeartbeatInterval = 1000 * 5;
    private int haHousekeepingInterval = 1000 * 20;
    private int haTransferBatchSize = 1024 * 32;
    @ImportantField
    private String haMasterAddress = null;
    private int haSlaveFallbehindMax = 1024 * 1024 * 256;
    @ImportantField
    private BrokerRole brokerRole = BrokerRole.ASYNC_MASTER;
    @ImportantField
    private FlushDiskType flushDiskType = FlushDiskType.ASYNC_FLUSH;
    private int syncFlushTimeout = 1000 * 5;
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    private long flushDelayOffsetInterval = 1000 * 10;
    @ImportantField
    private boolean cleanFileForciblyEnable = true;
    private boolean warmMapedFileEnable = false;
    private boolean offsetCheckInSlave = false;
    private boolean debugLockEnable = false;
    private boolean duplicationEnable = false;
    private boolean diskFallRecorded = true;
    private long osPageCacheBusyTimeOutMills = 1000;
    private int defaultQueryMaxNum = 32;

    @ImportantField
    private boolean transientStorePoolEnable = false;
    private int transientStorePoolSize = 5;
    private boolean fastFailIfNoBufferInStorePool = false;

    private boolean enableDLegerCommitLog = false;
    private String dLegerGroup;
    private String dLegerPeers;
    private String dLegerSelfId;

    public boolean isDebugLockEnable() {
        return debugLockEnable;
    }

    public void setDebugLockEnable(final boolean debugLockEnable) {
        this.debugLockEnable = debugLockEnable;
    }

    public boolean isDuplicationEnable() {
        return duplicationEnable;
    }

    public void setDuplicationEnable(final boolean duplicationEnable) {
        this.duplicationEnable = duplicationEnable;
    }

    public long getOsPageCacheBusyTimeOutMills() {
        return osPageCacheBusyTimeOutMills;
    }

    public void setOsPageCacheBusyTimeOutMills(final long osPageCacheBusyTimeOutMills) {
        this.osPageCacheBusyTimeOutMills = osPageCacheBusyTimeOutMills;
    }

    public boolean isDiskFallRecorded() {
        return diskFallRecorded;
    }

    public void setDiskFallRecorded(final boolean diskFallRecorded) {
        this.diskFallRecorded = diskFallRecorded;
    }

    public boolean isWarmMapedFileEnable() {
        return warmMapedFileEnable;
    }

    public void setWarmMapedFileEnable(boolean warmMapedFileEnable) {
        this.warmMapedFileEnable = warmMapedFileEnable;
    }

    public int getMapedFileSizeCommitLog() {
        return mapedFileSizeCommitLog;
    }

    public void setMapedFileSizeCommitLog(int mapedFileSizeCommitLog) {
        this.mapedFileSizeCommitLog = mapedFileSizeCommitLog;
    }

    public int getMapedFileSizeConsumeQueue() {
        int factor = (int) Math.ceil(this.mapedFileSizeConsumeQueue / (ConsumeQueue.CQ_STORE_UNIT_SIZE * 1.0));
        return (int) (factor * ConsumeQueue.CQ_STORE_UNIT_SIZE);
    }

    public void setMapedFileSizeConsumeQueue(int mapedFileSizeConsumeQueue) {
        this.mapedFileSizeConsumeQueue = mapedFileSizeConsumeQueue;
    }

    public boolean isEnableConsumeQueueExt() {
        return enableConsumeQueueExt;
    }

    public void setEnableConsumeQueueExt(boolean enableConsumeQueueExt) {
        this.enableConsumeQueueExt = enableConsumeQueueExt;
    }

    public int getMappedFileSizeConsumeQueueExt() {
        return mappedFileSizeConsumeQueueExt;
    }

    public void setMappedFileSizeConsumeQueueExt(int mappedFileSizeConsumeQueueExt) {
        this.mappedFileSizeConsumeQueueExt = mappedFileSizeConsumeQueueExt;
    }

    public int getBitMapLengthConsumeQueueExt() {
        return bitMapLengthConsumeQueueExt;
    }

    public void setBitMapLengthConsumeQueueExt(int bitMapLengthConsumeQueueExt) {
        this.bitMapLengthConsumeQueueExt = bitMapLengthConsumeQueueExt;
    }

    public int getFlushIntervalCommitLog() {
        return flushIntervalCommitLog;
    }

    public void setFlushIntervalCommitLog(int flushIntervalCommitLog) {
        this.flushIntervalCommitLog = flushIntervalCommitLog;
    }

    public int getFlushIntervalConsumeQueue() {
        return flushIntervalConsumeQueue;
    }

    public void setFlushIntervalConsumeQueue(int flushIntervalConsumeQueue) {
        this.flushIntervalConsumeQueue = flushIntervalConsumeQueue;
    }

    public int getPutMsgIndexHightWater() {
        return putMsgIndexHightWater;
    }

    public void setPutMsgIndexHightWater(int putMsgIndexHightWater) {
        this.putMsgIndexHightWater = putMsgIndexHightWater;
    }

    public int getCleanResourceInterval() {
        return cleanResourceInterval;
    }

    public void setCleanResourceInterval(int cleanResourceInterval) {
        this.cleanResourceInterval = cleanResourceInterval;
    }

    public int getMaxMessageSize() {
        return maxMessageSize;
    }

    public void setMaxMessageSize(int maxMessageSize) {
        this.maxMessageSize = maxMessageSize;
    }

    public boolean isCheckCRCOnRecover() {
        return checkCRCOnRecover;
    }

    public boolean getCheckCRCOnRecover() {
        return checkCRCOnRecover;
    }

    public void setCheckCRCOnRecover(boolean checkCRCOnRecover) {
        this.checkCRCOnRecover = checkCRCOnRecover;
    }

    public String getStorePathCommitLog() {
        return storePathCommitLog;
    }

    public void setStorePathCommitLog(String storePathCommitLog) {
        this.storePathCommitLog = storePathCommitLog;
    }

    public String getDeleteWhen() {
        return deleteWhen;
    }

    public void setDeleteWhen(String deleteWhen) {
        this.deleteWhen = deleteWhen;
    }

    public int getDiskMaxUsedSpaceRatio() {
        if (this.diskMaxUsedSpaceRatio < 10)
            return 10;

        if (this.diskMaxUsedSpaceRatio > 95)
            return 95;

        return diskMaxUsedSpaceRatio;
    }

    public void setDiskMaxUsedSpaceRatio(int diskMaxUsedSpaceRatio) {
        this.diskMaxUsedSpaceRatio = diskMaxUsedSpaceRatio;
    }

    public int getDeleteCommitLogFilesInterval() {
        return deleteCommitLogFilesInterval;
    }

    public void setDeleteCommitLogFilesInterval(int deleteCommitLogFilesInterval) {
        this.deleteCommitLogFilesInterval = deleteCommitLogFilesInterval;
    }

    public int getDeleteConsumeQueueFilesInterval() {
        return deleteConsumeQueueFilesInterval;
    }

    public void setDeleteConsumeQueueFilesInterval(int deleteConsumeQueueFilesInterval) {
        this.deleteConsumeQueueFilesInterval = deleteConsumeQueueFilesInterval;
    }

    public int getMaxTransferBytesOnMessageInMemory() {
        return maxTransferBytesOnMessageInMemory;
    }

    public void setMaxTransferBytesOnMessageInMemory(int maxTransferBytesOnMessageInMemory) {
        this.maxTransferBytesOnMessageInMemory = maxTransferBytesOnMessageInMemory;
    }

    public int getMaxTransferCountOnMessageInMemory() {
        return maxTransferCountOnMessageInMemory;
    }

    public void setMaxTransferCountOnMessageInMemory(int maxTransferCountOnMessageInMemory) {
        this.maxTransferCountOnMessageInMemory = maxTransferCountOnMessageInMemory;
    }

    public int getMaxTransferBytesOnMessageInDisk() {
        return maxTransferBytesOnMessageInDisk;
    }

    public void setMaxTransferBytesOnMessageInDisk(int maxTransferBytesOnMessageInDisk) {
        this.maxTransferBytesOnMessageInDisk = maxTransferBytesOnMessageInDisk;
    }

    public int getMaxTransferCountOnMessageInDisk() {
        return maxTransferCountOnMessageInDisk;
    }

    public void setMaxTransferCountOnMessageInDisk(int maxTransferCountOnMessageInDisk) {
        this.maxTransferCountOnMessageInDisk = maxTransferCountOnMessageInDisk;
    }

    public int getFlushCommitLogLeastPages() {
        return flushCommitLogLeastPages;
    }

    public void setFlushCommitLogLeastPages(int flushCommitLogLeastPages) {
        this.flushCommitLogLeastPages = flushCommitLogLeastPages;
    }

    public int getFlushConsumeQueueLeastPages() {
        return flushConsumeQueueLeastPages;
    }

    public void setFlushConsumeQueueLeastPages(int flushConsumeQueueLeastPages) {
        this.flushConsumeQueueLeastPages = flushConsumeQueueLeastPages;
    }

    public int getFlushCommitLogThoroughInterval() {
        return flushCommitLogThoroughInterval;
    }

    public void setFlushCommitLogThoroughInterval(int flushCommitLogThoroughInterval) {
        this.flushCommitLogThoroughInterval = flushCommitLogThoroughInterval;
    }

    public int getFlushConsumeQueueThoroughInterval() {
        return flushConsumeQueueThoroughInterval;
    }

    public void setFlushConsumeQueueThoroughInterval(int flushConsumeQueueThoroughInterval) {
        this.flushConsumeQueueThoroughInterval = flushConsumeQueueThoroughInterval;
    }

    public int getDestroyMapedFileIntervalForcibly() {
        return destroyMapedFileIntervalForcibly;
    }

    public void setDestroyMapedFileIntervalForcibly(int destroyMapedFileIntervalForcibly) {
        this.destroyMapedFileIntervalForcibly = destroyMapedFileIntervalForcibly;
    }

    public int getFileReservedTime() {
        return fileReservedTime;
    }

    public void setFileReservedTime(int fileReservedTime) {
        this.fileReservedTime = fileReservedTime;
    }

    public int getRedeleteHangedFileInterval() {
        return redeleteHangedFileInterval;
    }

    public void setRedeleteHangedFileInterval(int redeleteHangedFileInterval) {
        this.redeleteHangedFileInterval = redeleteHangedFileInterval;
    }

    public int getAccessMessageInMemoryMaxRatio() {
        return accessMessageInMemoryMaxRatio;
    }

    public void setAccessMessageInMemoryMaxRatio(int accessMessageInMemoryMaxRatio) {
        this.accessMessageInMemoryMaxRatio = accessMessageInMemoryMaxRatio;
    }

    public boolean isMessageIndexEnable() {
        return messageIndexEnable;
    }

    public void setMessageIndexEnable(boolean messageIndexEnable) {
        this.messageIndexEnable = messageIndexEnable;
    }

    public int getMaxHashSlotNum() {
        return maxHashSlotNum;
    }

    public void setMaxHashSlotNum(int maxHashSlotNum) {
        this.maxHashSlotNum = maxHashSlotNum;
    }

    public int getMaxIndexNum() {
        return maxIndexNum;
    }

    public void setMaxIndexNum(int maxIndexNum) {
        this.maxIndexNum = maxIndexNum;
    }

    public int getMaxMsgsNumBatch() {
        return maxMsgsNumBatch;
    }

    public void setMaxMsgsNumBatch(int maxMsgsNumBatch) {
        this.maxMsgsNumBatch = maxMsgsNumBatch;
    }

    public int getHaListenPort() {
        return haListenPort;
    }

    public void setHaListenPort(int haListenPort) {
        this.haListenPort = haListenPort;
    }

    public int getHaSendHeartbeatInterval() {
        return haSendHeartbeatInterval;
    }

    public void setHaSendHeartbeatInterval(int haSendHeartbeatInterval) {
        this.haSendHeartbeatInterval = haSendHeartbeatInterval;
    }

    public int getHaHousekeepingInterval() {
        return haHousekeepingInterval;
    }

    public void setHaHousekeepingInterval(int haHousekeepingInterval) {
        this.haHousekeepingInterval = haHousekeepingInterval;
    }

    public BrokerRole getBrokerRole() {
        return brokerRole;
    }

    public void setBrokerRole(BrokerRole brokerRole) {
        this.brokerRole = brokerRole;
    }

    public void setBrokerRole(String brokerRole) {
        this.brokerRole = BrokerRole.valueOf(brokerRole);
    }

    public int getHaTransferBatchSize() {
        return haTransferBatchSize;
    }

    public void setHaTransferBatchSize(int haTransferBatchSize) {
        this.haTransferBatchSize = haTransferBatchSize;
    }

    public int getHaSlaveFallbehindMax() {
        return haSlaveFallbehindMax;
    }

    public void setHaSlaveFallbehindMax(int haSlaveFallbehindMax) {
        this.haSlaveFallbehindMax = haSlaveFallbehindMax;
    }

    public FlushDiskType getFlushDiskType() {
        return flushDiskType;
    }

    public void setFlushDiskType(FlushDiskType flushDiskType) {
        this.flushDiskType = flushDiskType;
    }

    public void setFlushDiskType(String type) {
        this.flushDiskType = FlushDiskType.valueOf(type);
    }

    public int getSyncFlushTimeout() {
        return syncFlushTimeout;
    }

    public void setSyncFlushTimeout(int syncFlushTimeout) {
        this.syncFlushTimeout = syncFlushTimeout;
    }

    public String getHaMasterAddress() {
        return haMasterAddress;
    }

    public void setHaMasterAddress(String haMasterAddress) {
        this.haMasterAddress = haMasterAddress;
    }

    public String getMessageDelayLevel() {
        return messageDelayLevel;
    }

    public void setMessageDelayLevel(String messageDelayLevel) {
        this.messageDelayLevel = messageDelayLevel;
    }

    public long getFlushDelayOffsetInterval() {
        return flushDelayOffsetInterval;
    }

    public void setFlushDelayOffsetInterval(long flushDelayOffsetInterval) {
        this.flushDelayOffsetInterval = flushDelayOffsetInterval;
    }

    public boolean isCleanFileForciblyEnable() {
        return cleanFileForciblyEnable;
    }

    public void setCleanFileForciblyEnable(boolean cleanFileForciblyEnable) {
        this.cleanFileForciblyEnable = cleanFileForciblyEnable;
    }

    public boolean isMessageIndexSafe() {
        return messageIndexSafe;
    }

    public void setMessageIndexSafe(boolean messageIndexSafe) {
        this.messageIndexSafe = messageIndexSafe;
    }

    public boolean isFlushCommitLogTimed() {
        return flushCommitLogTimed;
    }

    public void setFlushCommitLogTimed(boolean flushCommitLogTimed) {
        this.flushCommitLogTimed = flushCommitLogTimed;
    }

    public String getStorePathRootDir() {
        return storePathRootDir;
    }

    public void setStorePathRootDir(String storePathRootDir) {
        this.storePathRootDir = storePathRootDir;
    }

    public int getFlushLeastPagesWhenWarmMapedFile() {
        return flushLeastPagesWhenWarmMapedFile;
    }

    public void setFlushLeastPagesWhenWarmMapedFile(int flushLeastPagesWhenWarmMapedFile) {
        this.flushLeastPagesWhenWarmMapedFile = flushLeastPagesWhenWarmMapedFile;
    }

    public boolean isOffsetCheckInSlave() {
        return offsetCheckInSlave;
    }

    public void setOffsetCheckInSlave(boolean offsetCheckInSlave) {
        this.offsetCheckInSlave = offsetCheckInSlave;
    }

    public int getDefaultQueryMaxNum() {
        return defaultQueryMaxNum;
    }

    public void setDefaultQueryMaxNum(int defaultQueryMaxNum) {
        this.defaultQueryMaxNum = defaultQueryMaxNum;
    }

    /**
     * Enable transient commitLog store pool only if transientStorePoolEnable is true and the FlushDiskType is
     * ASYNC_FLUSH
     *
     * @return <tt>true</tt> or <tt>false</tt>
     */
    public boolean isTransientStorePoolEnable() {
        return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType()
            && BrokerRole.SLAVE != getBrokerRole();
    }

    public void setTransientStorePoolEnable(final boolean transientStorePoolEnable) {
        this.transientStorePoolEnable = transientStorePoolEnable;
    }

    public int getTransientStorePoolSize() {
        return transientStorePoolSize;
    }

    public void setTransientStorePoolSize(final int transientStorePoolSize) {
        this.transientStorePoolSize = transientStorePoolSize;
    }

    public int getCommitIntervalCommitLog() {
        return commitIntervalCommitLog;
    }

    public void setCommitIntervalCommitLog(final int commitIntervalCommitLog) {
        this.commitIntervalCommitLog = commitIntervalCommitLog;
    }

    public boolean isFastFailIfNoBufferInStorePool() {
        return fastFailIfNoBufferInStorePool;
    }

    public void setFastFailIfNoBufferInStorePool(final boolean fastFailIfNoBufferInStorePool) {
        this.fastFailIfNoBufferInStorePool = 
fastFailIfNoBufferInStorePool;}public boolean isUseReentrantLockWhenPutMessage() {return useReentrantLockWhenPutMessage;}public void setUseReentrantLockWhenPutMessage(final boolean useReentrantLockWhenPutMessage) {this.useReentrantLockWhenPutMessage = useReentrantLockWhenPutMessage;}public int getCommitCommitLogLeastPages() {return commitCommitLogLeastPages;}public void setCommitCommitLogLeastPages(final int commitCommitLogLeastPages) {this.commitCommitLogLeastPages = commitCommitLogLeastPages;}public int getCommitCommitLogThoroughInterval() {return commitCommitLogThoroughInterval;}public void setCommitCommitLogThoroughInterval(final int commitCommitLogThoroughInterval) {this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval;}public String getdLegerGroup() {return dLegerGroup;}public void setdLegerGroup(String dLegerGroup) {this.dLegerGroup = dLegerGroup;}public String getdLegerPeers() {return dLegerPeers;}public void setdLegerPeers(String dLegerPeers) {this.dLegerPeers = dLegerPeers;}public String getdLegerSelfId() {return dLegerSelfId;}public void setdLegerSelfId(String dLegerSelfId) {this.dLegerSelfId = dLegerSelfId;}public boolean isEnableDLegerCommitLog() {return enableDLegerCommitLog;}public void setEnableDLegerCommitLog(boolean enableDLegerCommitLog) {this.enableDLegerCommitLog = enableDLegerCommitLog;}
}

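II. RocketMQ Source Code Analysis: Route Registration (Sending Heartbeats)

1. Route registration diagram:

[Figure: route registration flow; image not included in this text]

2. Route registration: sending heartbeats

RocketMQ implements route registration through the heartbeat between Broker and NameServer. On startup, a Broker sends a heartbeat to every NameServer in the cluster, and then repeats it every 30 s. When a NameServer receives a heartbeat, it updates the lastUpdateTimestamp of the corresponding BrokerLiveInfo in its brokerLiveTable cache. The NameServer also scans brokerLiveTable every 10 s; if no heartbeat has arrived from a Broker for 120 s, the NameServer removes that Broker's routing information and closes the socket connection.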
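The interplay of heartbeats and the expiry scan can be illustrated with a small model. This is only a sketch under stated assumptions: `LivenessSketch`, its methods, and the broker addresses are hypothetical names made up for illustration, and timestamps are passed in explicitly so the behavior is deterministic. Only the 120 s threshold comes from the text.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical, simplified model of a liveness table; not RocketMQ's actual class.
class LivenessSketch {
    static final long EXPIRE_MILLIS = 120_000L; // 120 s, as described in the text

    // brokerAddr -> timestamp (ms) of the last heartbeat received
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    // Called whenever a heartbeat arrives from a broker.
    void onHeartbeat(String brokerAddr, long nowMillis) {
        lastHeartbeat.put(brokerAddr, nowMillis);
    }

    // Called by the periodic scan; removes expired brokers and returns how many were removed.
    int scan(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + EXPIRE_MILLIS < nowMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    boolean isAlive(String brokerAddr) {
        return lastHeartbeat.containsKey(brokerAddr);
    }

    public static void main(String[] args) {
        LivenessSketch table = new LivenessSketch();
        table.onHeartbeat("10.0.0.1:10911", 0L);
        table.onHeartbeat("10.0.0.2:10911", 0L);
        table.onHeartbeat("10.0.0.1:10911", 30_000L); // only the first broker keeps reporting
        System.out.println(table.scan(125_000L));     // the silent broker is evicted
        System.out.println(table.isAlive("10.0.0.1:10911"));
    }
}
```

Because the scan only runs periodically, a crashed broker can stay in the table somewhat longer than 120 s: in this model, until the first scan after the threshold has passed.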

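3. Code: BrokerController#start


// Register the Broker with every NameServer
this.registerBrokerAll(true, false, true);
// Report Broker information to the NameServers periodically (every 30 s per the text above).
// The period comes from brokerConfig.getRegisterNameServerPeriod(), clamped to the range 10 s to 60 s;
// the first run is delayed by 10 s.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
        } catch (Throwable e) {
            log.error("registerBrokerAll Exception", e);
        }
    }
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);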
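The period expression in the scheduling call is easy to misread, so here is a tiny sketch of what it evaluates to. `RegisterPeriodSketch` and `effectivePeriodMillis` are hypothetical names; only the `Math.max(10000, Math.min(..., 60000))` expression is taken from the snippet.

```java
class RegisterPeriodSketch {
    // Same clamping expression as in the scheduling call:
    // values below 10 s are raised to 10 s, values above 60 s are lowered to 60 s.
    static long effectivePeriodMillis(long configuredMillis) {
        return Math.max(10_000L, Math.min(configuredMillis, 60_000L));
    }

    public static void main(String[] args) {
        System.out.println(effectivePeriodMillis(1_000L));   // 10000
        System.out.println(effectivePeriodMillis(30_000L));  // 30000
        System.out.println(effectivePeriodMillis(300_000L)); // 60000
    }
}
```

In other words, whatever period is configured, the heartbeat interval stays between 10 s and 60 s, which keeps it safely below the 120 s expiry window.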

4. Code: BrokerOuterAPI#registerBrokerAll


// Get the NameServer address list
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
// Iterate over all NameServers
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
    // Build the request header
    final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
    requestHeader.setBrokerAddr(brokerAddr);
    requestHeader.setBrokerId(brokerId);
    requestHeader.setBrokerName(brokerName);
    requestHeader.setClusterName(clusterName);
    requestHeader.setHaServerAddr(haServerAddr);
    requestHeader.setCompressed(compressed);
    // Build the request body
    RegisterBrokerBody requestBody = new RegisterBrokerBody();
    requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
    requestBody.setFilterServerList(filterServerList);
    final byte[] body = requestBody.encode(compressed);
    final int bodyCrc32 = UtilAll.crc32(body);
    requestHeader.setBodyCrc32(bodyCrc32);
    // One latch count per NameServer, so the caller can wait for all registrations
    final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
    for (final String namesrvAddr : nameServerAddressList) {
        brokerOuterExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // Register with each NameServer individually
                    RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                    if (result != null) {
                        registerBrokerResultList.add(result);
                    }
                    log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                } catch (Exception e) {
                    log.warn("registerBroker Exception, {}", namesrvAddr, e);
                } finally {
                    countDownLatch.countDown();
                }
            }
        });
    }
    try {
        // Wait for all registrations, but no longer than the timeout
        countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
    }
}
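The fan-out pattern used here (one task per NameServer, a `CountDownLatch` to wait for all of them with a timeout) can be tried in isolation. The sketch below is a minimal stand-alone version under stated assumptions: `FanOutSketch`, `register`, and `registerAll` are hypothetical names, the "remote call" just echoes the address, and the result list is a `CopyOnWriteArrayList` chosen here for thread safety rather than taken from the source.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class FanOutSketch {
    // Hypothetical stand-in for the remote registration call; it only echoes the address.
    static String register(String namesrvAddr) {
        return "ok:" + namesrvAddr;
    }

    // Sends one request per address in parallel and waits up to timeoutMillis for all of them.
    static List<String> registerAll(List<String> addrs, long timeoutMillis) {
        List<String> results = new CopyOnWriteArrayList<>();
        if (addrs == null || addrs.isEmpty()) {
            return results;
        }
        ExecutorService executor = Executors.newFixedThreadPool(Math.min(4, addrs.size()));
        CountDownLatch latch = new CountDownLatch(addrs.size());
        try {
            for (String addr : addrs) {
                executor.execute(() -> {
                    try {
                        results.add(register(addr));
                    } finally {
                        latch.countDown(); // count down even if the call fails
                    }
                });
            }
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        } finally {
            executor.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> results = registerAll(Arrays.asList("ns1:9876", "ns2:9876"), 3000L);
        System.out.println(results.size());
    }
}
```

Counting down in `finally` matters: a failed registration against one NameServer must not keep the caller waiting for the full timeout.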

5. Code: BrokerOuterAPI#registerBroker


if (oneway) {
    try {
        // One-way call: send the request without waiting for a response
        this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
    } catch (RemotingTooMuchRequestException e) {
        // Ignore
    }
    return null;
}
// Synchronous call: wait for the NameServer's response
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);

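III. RocketMQ Source Code Analysis: Route Registration (Handling the Request)

1. Handling the registration request, diagram:

The network processor class org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor inspects the request type; if the type is REGISTER_BROKER, the request is forwarded to RouteInfoManager#registerBroker.

[Figure: handling of the registration request; image not included in this text]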

2. Code: DefaultRequestProcessor#processRequest


// The request is a Broker registration
case RequestCode.REGISTER_BROKER:
    Version brokerVersion = MQVersion.value2Version(request.getVersion());
    if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
        return this.registerBrokerWithFilterServer(ctx, request);
    } else {
        // Register the Broker
        return this.registerBroker(ctx, request);
    }

3. Code: DefaultRequestProcessor#registerBroker


RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
    requestHeader.getClusterName(),
    requestHeader.getBrokerAddr(),
    requestHeader.getBrokerName(),
    requestHeader.getBrokerId(),
    requestHeader.getHaServerAddr(),
    topicConfigWrapper,
    null,
    ctx.channel()
);

4. Code: RouteInfoManager#registerBroker

Maintaining the routing information


// Acquire the write lock
this.lock.writeLock().lockInterruptibly();
// Maintain clusterAddrTable
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
    brokerNames = new HashSet<String>();
    this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
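Because clusterAddrTable maps a cluster name to a set, registering the same broker name repeatedly (once per heartbeat) leaves the table unchanged. A minimal sketch of that get-or-create step follows; `ClusterTableSketch` and its methods are hypothetical, and `computeIfAbsent` is used here as a shorter equivalent of the null check in the snippet, not as what the source does.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class ClusterTableSketch {
    // clusterName -> names of the brokers in that cluster
    private final Map<String, Set<String>> clusterAddrTable = new HashMap<>();

    // Creates the set on first use, then adds the broker name (a no-op if already present).
    void addBroker(String clusterName, String brokerName) {
        clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>()).add(brokerName);
    }

    int brokerCount(String clusterName) {
        Set<String> names = clusterAddrTable.get(clusterName);
        return names == null ? 0 : names.size();
    }

    public static void main(String[] args) {
        ClusterTableSketch table = new ClusterTableSketch();
        table.addBroker("DefaultCluster", "broker-a");
        table.addBroker("DefaultCluster", "broker-a"); // repeated heartbeat, no duplicate
        table.addBroker("DefaultCluster", "broker-b");
        System.out.println(table.brokerCount("DefaultCluster")); // 2
    }
}
```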

5. Maintaining brokerAddrTable


// Maintain brokerAddrTable
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
// First registration for this brokerName: create the BrokerData
if (null == brokerData) {
    registerFirst = true;
    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
    this.brokerAddrTable.put(brokerName, brokerData);
}
// Not the first registration: update the Broker.
// If the same address is already registered under a different brokerId, drop the stale entry.
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
    Entry<Long, String> item = it.next();
    if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
        it.remove();
    }
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
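The loop that removes an address registered under a different brokerId is the subtle part of this step: it covers the case where the same machine re-registers with a new id (for example, a former slave now reporting as id 0). The sketch below isolates that logic; `BrokerAddrSketch` and `register` are hypothetical names, and the return value mirrors the `null == oldAddr` check.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

class BrokerAddrSketch {
    // brokerId -> brokerAddr for a single brokerName (id 0 is the master)
    final Map<Long, String> brokerAddrs = new HashMap<>();

    // Returns true when no address was previously stored under this brokerId.
    boolean register(long brokerId, String brokerAddr) {
        Iterator<Map.Entry<Long, String>> it = brokerAddrs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> entry = it.next();
            // Same address under a different id: remove the stale mapping first.
            if (brokerAddr != null && brokerAddr.equals(entry.getValue()) && brokerId != entry.getKey()) {
                it.remove();
            }
        }
        String oldAddr = brokerAddrs.put(brokerId, brokerAddr);
        return oldAddr == null;
    }

    public static void main(String[] args) {
        BrokerAddrSketch data = new BrokerAddrSketch();
        System.out.println(data.register(1L, "10.0.0.2:10911")); // true: first time as id 1
        System.out.println(data.register(0L, "10.0.0.2:10911")); // true: same address, now id 0
        System.out.println(data.brokerAddrs);                    // only the id-0 entry remains
        System.out.println(data.register(0L, "10.0.0.2:10911")); // false: plain heartbeat
    }
}
```

Without the removal loop, the second call would leave the same address listed under both id 1 and id 0.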

6. Maintaining topicQueueTable


// Maintain topicQueueTable (only for a master, and only if the topic config changed or this is the first registration)
if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
        ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();
        if (tcTable != null) {
            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                this.createAndUpdateQueueData(brokerName, entry.getValue());
            }
        }
    }
}

7. Code: RouteInfoManager#createAndUpdateQueueData


private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
    // Create the QueueData
    QueueData queueData = new QueueData();
    queueData.setBrokerName(brokerName);
    queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
    queueData.setReadQueueNums(topicConfig.getReadQueueNums());
    queueData.setPerm(topicConfig.getPerm());
    queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());
    // Look up the queue list for this topic in topicQueueTable
    List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
    // No entry for this topic yet: create the list and add queueData
    if (null == queueDataList) {
        queueDataList = new LinkedList<QueueData>();
        queueDataList.add(queueData);
        this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
        log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
    } else {
        // Decide whether queueData has to be added
        boolean addNewOne = true;
        Iterator<QueueData> it = queueDataList.iterator();
        while (it.hasNext()) {
            QueueData qd = it.next();
            // Same brokerName: this Broker already has an entry for the topic
            if (qd.getBrokerName().equals(brokerName)) {
                if (qd.equals(queueData)) {
                    // Unchanged: nothing to add
                    addNewOne = false;
                } else {
                    // Changed: remove the old entry so the new one replaces it
                    log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd, queueData);
                    it.remove();
                }
            }
        }
        // New or changed entry: add it to queueDataList
        if (addNewOne) {
            queueDataList.add(queueData);
        }
    }
}
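The else branch above amounts to an "upsert keyed by brokerName": an identical entry is left alone, a changed entry is replaced, and an entry for a new broker is appended. A reduced sketch, assuming a hypothetical `QueueInfo` class that stands in for QueueData with only three fields:

```java
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;

class QueueDataSketch {
    // Hypothetical, trimmed-down stand-in for QueueData.
    static final class QueueInfo {
        final String brokerName;
        final int readQueueNums;
        final int writeQueueNums;

        QueueInfo(String brokerName, int readQueueNums, int writeQueueNums) {
            this.brokerName = brokerName;
            this.readQueueNums = readQueueNums;
            this.writeQueueNums = writeQueueNums;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof QueueInfo)) return false;
            QueueInfo other = (QueueInfo) o;
            return brokerName.equals(other.brokerName)
                && readQueueNums == other.readQueueNums
                && writeQueueNums == other.writeQueueNums;
        }

        @Override
        public int hashCode() {
            return Objects.hash(brokerName, readQueueNums, writeQueueNums);
        }
    }

    // Keeps at most one entry per brokerName; a changed entry replaces the old one.
    static void upsert(List<QueueInfo> list, QueueInfo incoming) {
        boolean addNewOne = true;
        Iterator<QueueInfo> it = list.iterator();
        while (it.hasNext()) {
            QueueInfo existing = it.next();
            if (existing.brokerName.equals(incoming.brokerName)) {
                if (existing.equals(incoming)) {
                    addNewOne = false; // identical: keep the existing entry
                } else {
                    it.remove();       // changed: drop the old entry
                }
            }
        }
        if (addNewOne) {
            list.add(incoming);
        }
    }

    public static void main(String[] args) {
        List<QueueInfo> queues = new LinkedList<>();
        upsert(queues, new QueueInfo("broker-a", 4, 4));
        upsert(queues, new QueueInfo("broker-a", 4, 4)); // unchanged
        upsert(queues, new QueueInfo("broker-a", 8, 8)); // replaced
        upsert(queues, new QueueInfo("broker-b", 4, 4)); // appended
        System.out.println(queues.size()); // 2
    }
}
```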

8. Maintaining brokerLiveTable


// Maintain brokerLiveTable: record the time of this heartbeat for brokerAddr
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
    new BrokerLiveInfo(
        System.currentTimeMillis(),
        topicConfigWrapper.getDataVersion(),
        channel,
        haServerAddr));

9. Maintaining filterServerTable


// Maintain filterServerTable from the reported filterServerList
if (filterServerList != null) {
    if (filterServerList.isEmpty()) {
        this.filterServerTable.remove(brokerAddr);
    } else {
        this.filterServerTable.put(brokerAddr, filterServerList);
    }
}
// For a slave, return the master's address and HA server address in the result
if (MixAll.MASTER_ID != brokerId) {
    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
    if (masterAddr != null) {
        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
        if (brokerLiveInfo != null) {
            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
            result.setMasterAddr(masterAddr);
        }
    }
}

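IV. RocketMQ Source Code Analysis: Route Deletion

1. Route deletion

A Broker sends a heartbeat to the NameServer every 30 s. The heartbeat carries the BrokerId, the Broker address, the Broker name, the name of the cluster the Broker belongs to, and the list of FilterServers associated with the Broker. If a Broker crashes, however, the NameServer stops receiving heartbeats, so how does it evict the failed Broker? The NameServer scans brokerLiveTable every 10 s. If the lastUpdateTimestamp of a BrokerLiveInfo is more than 120 s older than the current time, the Broker is considered failed: the NameServer removes it, closes the connection to it, and updates topicQueueTable, brokerAddrTable, brokerLiveTable, and filterServerTable.

2. RocketMQ has two triggers for deleting routing information

  • The NameServer periodically scans brokerLiveTable and compares the time of the last heartbeat with the current system time; if the difference exceeds 120 s, the Broker is removed.
  • When a Broker shuts down normally, it executes the unregisterBroker command.

Both triggers delete routes in the same way: every entry related to the Broker is removed from the relevant routing tables.

[Figure: route deletion flow; image not included in this text]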

3. Code: NamesrvController#initialize


// Scan for inactive Brokers every 10 s (first run after 5 s)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        NamesrvController.this.routeInfoManager.scanNotActiveBroker();
    }
}, 5, 10, TimeUnit.SECONDS);

4. Code: RouteInfoManager#scanNotActiveBroker


public void scanNotActiveBroker() {
    // Get an iterator over brokerLiveTable
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    // Iterate over brokerLiveTable
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        // The last heartbeat is more than 120 s older than the current time
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            // Close the connection
            RemotingUtil.closeChannel(next.getValue().getChannel());
            // Remove the Broker from brokerLiveTable
            it.remove();
            // Update the other routing tables
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}

5. Code: RouteInfoManager#onChannelDestroy


// Acquire the write lock, then remove the Broker address from brokerLiveTable and filterServerTable
this.lock.writeLock().lockInterruptibly();
this.brokerLiveTable.remove(brokerAddrFound);
this.filterServerTable.remove(brokerAddrFound);

6. Maintaining brokerAddrTable


// Maintain brokerAddrTable
String brokerNameFound = null;
boolean removeBrokerName = false;
Iterator<Entry<String, BrokerData>> itBrokerAddrTable = this.brokerAddrTable.entrySet().iterator();
// Iterate over brokerAddrTable
while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
    BrokerData brokerData = itBrokerAddrTable.next().getValue();
    // Iterate over the addresses registered under this brokerName
    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
    while (it.hasNext()) {
        Entry<Long, String> entry = it.next();
        Long brokerId = entry.getKey();
        String brokerAddr = entry.getValue();
        // Remove the entry whose address matches the destroyed channel
        if (brokerAddr.equals(brokerAddrFound)) {
            brokerNameFound = brokerData.getBrokerName();
            it.remove();
            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                brokerId, brokerAddr);
            break;
        }
    }
    // If this brokerName has no addresses left, remove the brokerName entry as well
    if (brokerData.getBrokerAddrs().isEmpty()) {
        removeBrokerName = true;
        itBrokerAddrTable.remove();
        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
            brokerData.getBrokerName());
    }
}

7. Maintaining clusterAddrTable

// Maintain clusterAddrTable
if (brokerNameFound != null && removeBrokerName) {
    Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
    // Iterate over clusterAddrTable
    while (it.hasNext()) {
        Entry<String, Set<String>> entry = it.next();
        // Cluster name
        String clusterName = entry.getKey();
        // Broker names in this cluster
        Set<String> brokerNames = entry.getValue();
        // Remove brokerNameFound from brokerNames
        boolean removed = brokerNames.remove(brokerNameFound);
        if (removed) {
            log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                brokerNameFound, clusterName);
            if (brokerNames.isEmpty()) {
                log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                    clusterName);
                // The cluster no longer contains any Broker: remove the cluster
                it.remove();
            }
            break;
        }
    }
}

8. Maintaining topicQueueTable


// Maintain topicQueueTable
if (removeBrokerName) {
    // Iterate over topicQueueTable
    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable = this.topicQueueTable.entrySet().iterator();
    while (itTopicQueueTable.hasNext()) {
        Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
        // Topic name
        String topic = entry.getKey();
        // Queue list of the topic
        List<QueueData> queueDataList = entry.getValue();
        // Iterate over the queues of this topic
        Iterator<QueueData> itQueueData = queueDataList.iterator();
        while (itQueueData.hasNext()) {
            // Remove the queue entries that belong to the inactive Broker
            QueueData queueData = itQueueData.next();
            if (queueData.getBrokerName().equals(brokerNameFound)) {
                itQueueData.remove();
                log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                    topic, queueData);
            }
        }
        // If the topic has no queues left, remove the topic
        if (queueDataList.isEmpty()) {
            itTopicQueueTable.remove();
            log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                topic);
        }
    }
}
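The nested removal above (drop one broker's entries from every topic, then drop topics that became empty) can be reproduced on a simplified table. In this sketch each topic maps to a plain list of broker names instead of `List<QueueData>`; `TopicCleanupSketch` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class TopicCleanupSketch {
    // topic -> names of the brokers hosting queues for it (simplified stand-in for List<QueueData>)
    private final Map<String, List<String>> topicQueueTable = new HashMap<>();

    void addQueue(String topic, String brokerName) {
        topicQueueTable.computeIfAbsent(topic, k -> new ArrayList<>()).add(brokerName);
    }

    // Removes the broker from every topic and deletes topics left without any broker.
    void removeBroker(String brokerName) {
        Iterator<Map.Entry<String, List<String>>> it = topicQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            List<String> brokers = it.next().getValue();
            brokers.removeIf(brokerName::equals);
            if (brokers.isEmpty()) {
                it.remove(); // safe: removal goes through the iterator
            }
        }
    }

    boolean hasTopic(String topic) {
        return topicQueueTable.containsKey(topic);
    }

    int brokerCount(String topic) {
        List<String> brokers = topicQueueTable.get(topic);
        return brokers == null ? 0 : brokers.size();
    }

    public static void main(String[] args) {
        TopicCleanupSketch table = new TopicCleanupSketch();
        table.addQueue("TopicA", "broker-a");
        table.addQueue("TopicA", "broker-b");
        table.addQueue("TopicB", "broker-a");
        table.removeBroker("broker-a");
        System.out.println(table.hasTopic("TopicA")); // true, broker-b still serves it
        System.out.println(table.hasTopic("TopicB")); // false, no broker left
    }
}
```

Removing through the iterator, as the original code does, avoids a `ConcurrentModificationException` that a direct `map.remove(...)` inside the loop would cause.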

9. Releasing the write lock

// Release the write lock
finally {
    this.lock.writeLock().unlock();
}
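All of the table updates above run between acquiring and releasing the write lock. A minimal, self-contained sketch of that pattern with `ReentrantReadWriteLock` follows; `RouteTableLockSketch` and its single map are hypothetical and stand in for the routing tables.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class RouteTableLockSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, String> table = new HashMap<>();

    // Writers take the write lock; the lock is acquired before the try block,
    // so unlock() runs only if the lock is actually held.
    boolean put(String key, String value) {
        try {
            lock.writeLock().lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        try {
            table.put(key, value);
            return true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Readers share the read lock and may run concurrently with each other.
    String get(String key) {
        lock.readLock().lock();
        try {
            return table.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        RouteTableLockSketch routes = new RouteTableLockSketch();
        routes.put("broker-a", "10.0.0.1:10911");
        System.out.println(routes.get("broker-a"));
    }
}
```

A read-write lock suits this workload as the article describes it: route lookups can proceed in parallel, while registration and deletion get exclusive access.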

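V. RocketMQ Source Code Analysis: Route Discovery and Summary

1. Route discovery

Route discovery in RocketMQ is not real-time. When the routing of a Topic changes, the NameServer does not push the change to clients; instead, clients periodically pull the latest routing information for the topic.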
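The consequence of a pull model is that a client can hold stale routes until its next refresh. The sketch below makes this visible with two hypothetical classes, `NameServerStub` and `ClientStub`; routes are plain strings and `refresh` is called by hand, whereas a real client would call it from a timer.

```java
import java.util.HashMap;
import java.util.Map;

class RoutePullSketch {
    // Hypothetical stand-in for the NameServer: topic -> current route.
    static final class NameServerStub {
        private final Map<String, String> routes = new HashMap<>();

        void publish(String topic, String route) {
            routes.put(topic, route);
        }

        String lookup(String topic) {
            return routes.get(topic);
        }
    }

    // Hypothetical client that caches routes and only updates them when it pulls.
    static final class ClientStub {
        private final NameServerStub nameServer;
        private final Map<String, String> cache = new HashMap<>();

        ClientStub(NameServerStub nameServer) {
            this.nameServer = nameServer;
        }

        // One pull: copy the current route into the local cache (or drop it if gone).
        void refresh(String topic) {
            String route = nameServer.lookup(topic);
            if (route != null) {
                cache.put(topic, route);
            } else {
                cache.remove(topic);
            }
        }

        String cachedRoute(String topic) {
            return cache.get(topic);
        }
    }

    public static void main(String[] args) {
        NameServerStub nameServer = new NameServerStub();
        ClientStub client = new ClientStub(nameServer);
        nameServer.publish("TopicA", "broker-a");
        client.refresh("TopicA");
        nameServer.publish("TopicA", "broker-b");         // the route changes on the server
        System.out.println(client.cachedRoute("TopicA")); // still broker-a: nothing was pushed
        client.refresh("TopicA");
        System.out.println(client.cachedRoute("TopicA")); // broker-b after the next pull
    }
}
```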

2. Code: DefaultRequestProcessor#getRouteInfoByTopic


public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetRouteInfoRequestHeader requestHeader =
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
    // Call RouteInfoManager, which fills the List<QueueData>, List<BrokerData>, and filterServer fields of
    // TopicRouteData from the routing tables topicQueueTable, brokerAddrTable, and filterServerTable
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
    // If routing information was found and ordered messaging is enabled, read the
    // order-topic configuration from the NameServer KVConfig and attach it to the route
    if (topicRouteData != null) {
        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                    requestHeader.getTopic());
            topicRouteData.setOrderTopicConf(orderTopicConf);
        }
        byte[] content = topicRouteData.encode();
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
    // No routing information for this topic
    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
    return response;
}

3. NameServer summary

[Figure: NameServer summary diagram; image not included in this text]

