在NameServer与Broker启动之后,我们就可以来创建Producer进行生产消息,客户端常用的生产类是DefaultMQProducer,我们启动消费者其实就是调用该类的start方法。
初始化逻辑
通过构建一起DefaultMQProducer类来实现初始化,查看源码我们可以得到DefaultMQProducer类的构造方法存在很多类型,但是最终会去执行最后一个构造方法。
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {// 命名空间this.namespace = namespace;// 生产者组this.producerGroup = producerGroup;// 创建DefaultMQProducerImpl实例,负责发送消息defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
该构造函数主要就是用来指定命名空间,生产者组以及用来处理消息的发送的DefaultMQProducerImpl类,该类里面包含消息处理的所有方法。
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {this.defaultMQProducer = defaultMQProducer;this.rpcHook = rpcHook;// 异步发送消息的线程池队列this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);// 初始化线程this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),Runtime.getRuntime().availableProcessors(),1000 * 60,TimeUnit.MILLISECONDS,this.asyncSenderThreadPoolQueue,new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());}});
}
主要就是初始化一个异步发送消息的线程池。
启动逻辑
生产者的启动逻辑调用start方法,其实就是去调用DefaultMQProducerImpl的start方法。
public void start(final boolean startFactory) throws MQClientException {// 根据serviceState的状态来判断是否重复启动switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// 检查配置信息this.checkConfig();if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {this.defaultMQProducer.changeInstanceNameToPID();}// 客户端核心管理组件的初始化mQClientFactorythis.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);// 往核心组件中注册一个Producer,往hashMap插入数据boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);// 如果存在就报错if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}// 注册topicPublishInfoTablethis.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());if (startFactory) {// 启动客户端通讯实例mQClientFactory.start();}log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),this.defaultMQProducer.isSendMessageWithVIPChannel());this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The producer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();RequestFutureHolder.getInstance().startScheduledTask(this);
}
首先使用checkConfig去检查配置信息,比如生产者组等是否符合规范,并通过getOrCreateMQClientInstance方法根据clientId获取mQClientFactory的实例。
将生产者注册到MQClientInstance实例的ProducerTable当中,同时注册注册topicPublishInfoTable对象。
启动MQClientInstance实例,调用其start方法用于初始化netty服务,定时任务,拉取消息服务等。
最后主动调用sendHeartbeatToAllBrokerWithLock方法发送心跳信息给所有Broker并且移除超时的request请求,执行异常回调。
MQClientInstance的启动
MQClientInstance类封装了RockerMQ底层通讯处理的API,Producer与Consumer类都会使用到这个类,它是Producer,Consumer与NameServer,Broker打交道的网络通道,这个类可以理解成一个工厂,是对消费者与生产者以及控制台三者的一个合集,内部封装了netty客户端,消息的生成,消费和负载均衡的实现类等。
MQClientInstance类的初始化
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {this.clientConfig = clientConfig;this.instanceIndex = instanceIndex;// 创建netty客户端配置类this.nettyClientConfig = new NettyClientConfig();this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());// 客户端请求处理器this.clientRemotingProcessor = new ClientRemotingProcessor(this);// 创建客户端远程通信API实现类的实例this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);// 更新nameServer的地址if (this.clientConfig.getNamesrvAddr() != null) {this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());}// 客户端IDthis.clientId = clientId;// mq的admin控制台操作实现this.mQAdminImpl = new MQAdminImpl(this);// push模式下拉取消息服务this.pullMessageService = new PullMessageService(this);// 负载均衡服务器this.rebalanceService = new RebalanceService(this);this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);this.defaultMQProducer.resetClientConfig(clientConfig);// 消费者统计管理器this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",this.instanceIndex,this.clientId,this.clientConfig,MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}
初始化MQClientInstance类的时候会初始化netty客户端以及各种服务实例等。
MQClientInstance类的启动
在调用MQClientInstance的start方法时,会启动很多相关的类比如初始化netty客户端等,接下来我们就一起来分析。
public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:// 状态的二次校验this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {// 获取nameServer的地址this.mQClientAPIImpl.fetchNameServerAddr();}// Start request-response channel+// 启动mQClientAPIImpl,建立客户端与服务端的channelthis.mQClientAPIImpl.start();// Start various schedule tasks// 启动定时任务this.startScheduledTask();// Start pull service// 启动拉消息服务// 发送消息this.pullMessageService.start();// Start rebalance service// 负载均衡 rebalanceService为线程类,start方法执行run方法// 负载均衡服务,消费者与生产者建立关系this.rebalanceService.start();// Start push servicethis.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case START_FAILED:throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);default:break;}}
}
首先会去启动一个netty的客户端,在前面分析的NettyRemotingServer类里面有过类似的启动逻辑,这里就不再重复,感兴趣的客户去看一下。
调用this.startScheduledTask()启动很多的定时任务,获取nameServer地址的,更新topic路由的,清除取消Broker以及发送心跳信息到所有Broker,持久化消费者进度,广播消息持久化到本地,集群消息推送到Broker端,尝试调整消费线程池的线程数量(目前还未实现该方法)。
注意:该类为生产者和消费者公用的类,所以作用于每一个Producer与Consumer。
private void startScheduledTask() {if (null == this.clientConfig.getNamesrvAddr()) {// 定时获取nameServer的地址this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();} catch (Exception e) {log.error("ScheduledTask fetchNameServerAddr exception", e);}}}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);}// 更新topic路由this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.updateTopicRouteInfoFromNameServer();} catch (Exception e) {log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);}}}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);// 清理无效的Broker以及发送心跳信息给所有的Brokerthis.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.cleanOfflineBroker();MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();} catch (Exception e) {log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);}}}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);// 持久化消费者偏移量,即消费进度 广播消息持久化到本地,集群消息推送到Broker端this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.persistAllConsumerOffset();} catch (Exception e) {log.error("ScheduledTask persistAllConsumerOffset exception", e);}}}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);// 尝试调整push模式的消费线程池的线程数量(没有实现逻辑)this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.adjustThreadPool();} catch (Exception e) {log.error("ScheduledTask adjustThreadPool exception", e);}}}, 1, 1, TimeUnit.MINUTES);
}
更新topic路由信息
每隔30s从nameServer内部拉取并更新topic路由消息。
public void updateTopicRouteInfoFromNameServer() {// 使用set集合就是为了有效的去重Set<String> topicList = new HashSet<String>();// 从ConsumerTable中获取{Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();if (impl != null) {Set<SubscriptionData> subList = impl.subscriptions();if (subList != null) {for (SubscriptionData subData : subList) {topicList.add(subData.getTopic());}}}}}// 从producerTable中获取{Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQProducerInner> entry = it.next();MQProducerInner impl = entry.getValue();if (impl != null) {Set<String> lst = impl.getPublishTopicList();topicList.addAll(lst);}}}// 比较更新本地的路由信息for (String topic : topicList) {/*** 从nameSerer拉取到topic路由信息之后,调用topicRouteDataIsChange方法与本地* 的旧topic路由信息比较看是否更改,比较的数据包括:topic的队列信息queueDatas,* topic的broker信息brokerDatas,顺序topic配置orderTopicConf,* 消费过滤信息filterServerTable。* 当判断需要更新的时候,会更新本地的topic缓存,包括:* 1. 更新brokerName到brokerAddr的地址的映射关系,即brokerAddrTable;* 2. 更新生产者producerTable集合,更新MQProducerInner的topicPublishInfoTable属性;* 3. 更新消费者的consumerTable集合,更新MQConsumerInner的rebalanceImpl.topicSubscribeInfoTable属性;* 4. 更新topicRouteTable集合,更新本地topic路由信息。*/this.updateTopicRouteInfoFromNameServer(topic);}
}
主要逻辑就是从consumerTable以及producerTable中获取配置的所有topic集合,包括consumer订阅的topic集合以及Producer中topicPublishTable集合中的数据。
从nameSerer拉取到topic路由信息之后,调用topicRouteDataIsChange方法与本地的旧topic路由信息比较看是否更改,比较的数据包括:topic的队列信息queueDatas,topic的broker信息brokerDatas,顺序topic配置orderTopicConf,消费过滤信息filterServerTable。
清理无效的Broker
调用cleanOfflineBroker()方法区清除下线的Broker。
private void cleanOfflineBroker() {try {// 加锁if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))try {ConcurrentHashMap<String, HashMap<Long, String>> updatedTable = new ConcurrentHashMap<String, HashMap<Long, String>>(this.brokerAddrTable.size(), 1);Iterator<Entry<String, HashMap<Long, String>>> itBrokerTable = this.brokerAddrTable.entrySet().iterator();// 遍历brokerAddrTablewhile (itBrokerTable.hasNext()) {Entry<String, HashMap<Long, String>> entry = itBrokerTable.next();String brokerName = entry.getKey();HashMap<Long, String> oneTable = entry.getValue();HashMap<Long, String> cloneAddrTable = new HashMap<Long, String>(oneTable.size(), 1);cloneAddrTable.putAll(oneTable);Iterator<Entry<Long, String>> it = cloneAddrTable.entrySet().iterator();while (it.hasNext()) {Entry<Long, String> ee = it.next();String addr = ee.getValue();// 判断broker地址是否存在于topicRouteTable的任意一个topic的路由信息中,// 如果不存在则直接移除该broker地址if (!this.isBrokerAddrExistInTopicRouteTable(addr)) {it.remove();log.info("the broker addr[{} {}] is offline, remove it", brokerName, addr);}}// 集合为空就移除if (cloneAddrTable.isEmpty()) {itBrokerTable.remove();log.info("the broker[{}] name's host is offline, remove it", brokerName);} else {// 否则更新剩下的updatedTable.put(brokerName, cloneAddrTable);}}// 不为空直接更新if (!updatedTable.isEmpty()) {this.brokerAddrTable.putAll(updatedTable);}} finally {this.lockNamesrv.unlock();}} catch (InterruptedException e) {log.warn("cleanOfflineBroker Exception", e);}
}
该方法间隔30s就会被执行,遍历并更新brokerAddrTable集合,其主要的步骤就是获取每一个人对象地址,首先去本地topicRouteTable是否存在,存在即保留不存在就表名Broker以下线需要被清除,如果brokerAddrTable中的value集合也是空则会直接删除键值对。
发送心跳信息给Broker
该方法每隔30s向所有的Broker发送心跳包的定时任务方法,客户的consumer和producer都是通过该定时任务发送心跳数据包的。在其他地方也会主动调用一次该方法,例如DefaultMQProducerImpl、DefaultMQPushConsumerImpl等类的start方法的结尾都会主动调用一次该方法。
public void sendHeartbeatToAllBrokerWithLock() {// 加锁if (this.lockHeartbeat.tryLock()) {try {// 发送心跳包给所有brokerthis.sendHeartbeatToAllBroker();// 上传过滤类到Broker对应的所有Filtersrv,push模式消费使用this.uploadFilterClassSource();} catch (final Exception e) {log.error("sendHeartbeatToAllBroker exception", e);} finally {this.lockHeartbeat.unlock();}} else {log.warn("lock heartBeat, but failed. [{}]", this.clientId);}
}
首先会通过prepareHeartbeatData方法准备心跳包的数据进行发送,然后会遍历brokerAddrTable的broker地址进行循环发送心跳包。
注意:如果单单启动了生产者,这时候我们要去判断是否为Master节点,由于生产者只能与Master发送消息数据,而当我们启动消费者此时我们就需要向所有的broker发送心跳包。
持久化消费偏移量
@Override
public void persistConsumerOffset() {try {this.makeSureStateOK();Set<MessageQueue> mqs = new HashSet<MessageQueue>();Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();mqs.addAll(allocateMq);// 持久化的方法this.offsetStore.persistAll(mqs);} catch (Exception e) {log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);}
}
调用persistAllConsumerOffset()进行持久化,而持久化的功能时消费端所有的。通过调用DefaultMQPushConsumer的persistAll()方法,如果你是广播就会执行LocalFileOffsetStore的对应方法,如果你是集群就会执行RemoteBrokerOffsetStore的对应方法。
发送消息的逻辑在Producer的消息发送逻辑里面,下一章分析。
负载均衡也是相对于消费端,消费端在处理消息的时候使用的消费模式,后期去消费端里面详解。
@Override
public void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {this.waitForRunning(waitInterval);// 执行该方法进行负载均衡this.mqClientFactory.doRebalance();}log.info(this.getServiceName() + " service end");
}