消息中间件 RocketMQ 高级功能和源码分析(十)
一、消息中间件 RocketMQ 源码分析: 消息消费概述
1、集群模式和广播模式
消息消费以组的模式开展,一个消费组内可以包含多个消费者,每一个消费者组可订阅多个主题,消费组之间有集群模式和广播模式两种消费模式。
- 集群模式,主题下的同一条消息只允许被其中一个消费者消费。
- 广播模式,主题下的同一条消息,将被集群内的所有消费者消费一次。
2、消息服务器与消费者之间的消息传递也有两种模式:推模式、拉模式。
所谓的拉模式,是消费端主动拉起拉消息请求,
而推模式是消息达到消息服务器后,推送给消息消费者。
RocketMQ 消息推模式的实现基于拉模式,在拉模式上包装一层,一个拉取任务完成后开始下一个拉取任务。
3、集群模式下,多个消费者如何对消息队列进行负载呢?消息队列负载机制遵循一个通用思想:一个消息队列同一个时间只允许被一个消费者消费,一个消费者可以消费多个消息队列。
4、RocketMQ 支持局部顺序消息消费,也就是保证同一个消息队列上的消息顺序消费。不支持消息全局顺序消费,如果要实现某一个主题的全局顺序消费,可以将该主题的队列数设置为1,牺牲高可用性。
二、消息中间件 RocketMQ 源码分析: 消息消费初探
1、消息推送模式
2、 消息消费重要方法
void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName):发送消息确认
Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) :获取消费者对主题分配了那些消息队列
void registerMessageListener(final MessageListenerConcurrently messageListener):注册并发事件监听器
void registerMessageListener(final MessageListenerOrderly messageListener):注册顺序消息事件监听器
void subscribe(final String topic, final String subExpression):基于主题订阅消息,消息过滤使用表达式
void subscribe(final String topic, final String fullClassName,final String filterClassSource):基于主题订阅消息,消息过滤使用类模式
void subscribe(final String topic, final MessageSelector selector) :订阅消息,并指定队列选择器
void unsubscribe(final String topic):取消消息订阅
3、 DefaultMQPushConsumer
//消费者组
private String consumerGroup;
//消息消费模式
private MessageModel messageModel = MessageModel.CLUSTERING;
//指定消费开始偏移量(最大偏移量、最小偏移量、启动时间戳)开始消费
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
//集群模式下的消息队列负载策略
private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
//订阅信息
private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
//消息业务监听器
private MessageListener messageListener;
//消息消费进度存储器
private OffsetStore offsetStore;
//消费者最小线程数量
private int consumeThreadMin = 20;
//消费者最大线程数量
private int consumeThreadMax = 20;
//并发消息消费时处理队列最大跨度
private int consumeConcurrentlyMaxSpan = 2000;
//每1000次流控后打印流控日志
private int pullThresholdForQueue = 1000;
//推模式下任务间隔时间
private long pullInterval = 0;
//推模式下任务拉取的条数,默认32条
private int pullBatchSize = 32;
//每次传入MessageListener#consumerMessage中消息的数量
private int consumeMessageBatchMaxSize = 1;
//是否每次拉取消息都订阅消息
private boolean postSubscriptionWhenPull = false;
//消息重试次数,-1代表16次
private int maxReconsumeTimes = -1;
//消息消费超时时间
private long consumeTimeout = 15;
三、消息中间件 RocketMQ 源码分析: 消息消费启动流程
1、消息消费启动流程 示例图:
2、 代码:DefaultMQPushConsumerImpl#start
public synchronized void start() throws MQClientException {switch (this.serviceState) {case CREATE_JUST:this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());this.serviceState = ServiceState.START_FAILED;//检查消息者是否合法this.checkConfig();//构建主题订阅信息this.copySubscription();//设置消费者客户端实例名称为进程IDif (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {this.defaultMQPushConsumer.changeInstanceNameToPID();}//创建MQClient实例this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);//构建rebalanceImplthis.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactorthis.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookLisif (this.defaultMQPushConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();} else {switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING: //消息消费广播模式,将消费进度保存在本地this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING: //消息消费集群模式,将消费进度保存在远端Brokerthis.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);}this.offsetStore.load//创建顺序消息消费服务if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());//创建并发消息消费服务} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}//消息消费服务启动this.consumeMessageService.start();//注册消费者实例boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;this.consumeMessageService.shutdown();throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);//启动消费者客户端mQClientFactory.start();log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The PushConsumer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}this.updateTopicSubscribeInfoWhenSubscriptionChanged();this.mQClientFactory.checkClientInBroker();this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();this.mQClientFactory.rebalanceImmediately();
}
四、消息中间件 RocketMQ 源码分析:消息拉取介绍
1、消息拉取 介绍
-
消息消费模式有两种模式:广播模式与集群模式。广播模式比较简单,每一个消费者需要拉取订阅主题下所有队列的消息。
-
在集群模式下,同一个消费者组内有多个消息消费者,同一个主题存在多个消费队列,消费者通过负载均衡的方式消费消息。
-
消息队列负载均衡,通常的作法是一个消息队列在同一个时间只允许被一个消费消费者消费,一个消息消费者可以同时消费多个消息队列。
2、 PullMessageService 实现机制
从 MQClientInstance 的启动流程中可以看出,RocketMQ 使用一个单独的线程 PullMessageService 来负责消息的拉取。
3、 代码:PullMessageService#run
public void run() {log.info(this.getServiceName() + " service started");//循环拉取消息while (!this.isStopped()) {try {//从请求队列中获取拉取消息请求PullRequest pullRequest = this.pullRequestQueue.take();//拉取消息this.pullMessage(pullRequest);} catch (InterruptedException ignored) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);}}log.info(this.getServiceName() + " service end");
}
4、 PullRequest
private String consumerGroup; //消费者组
private MessageQueue messageQueue; //待拉取消息队列
private ProcessQueue processQueue; //消息处理队列
private long nextOffset; //待拉取的MessageQueue偏移量
private boolean lockedFirst = false; //是否被锁定
5、 代码:PullMessageService#pullMessage
private void pullMessage(final PullRequest pullRequest) {//获得消费者实例final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {//强转为推送模式消费者DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;//推送消息impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}
}
6、 ProcessQueue 实现机制
ProcessQueue 是 MessageQueue 在消费端的重现、快照。PullMessageService 从消息服务器默认每次拉取 32 条消息,按照消息的队列偏移量顺序存放在 ProcessQueue 中,PullMessageService 然后将消息提交到消费者消费线程池,消息成功消费后从 ProcessQueue 中移除。
7、 属性
//消息容器
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
//读写锁
private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
//ProcessQueue总消息树
private final AtomicLong msgCount = new AtomicLong();
//ProcessQueue队列最大偏移量
private volatile long queueOffsetMax = 0L;
//当前ProcessQueue是否被丢弃
private volatile boolean dropped = false;
//上一次拉取时间戳
private volatile long lastPullTimestamp = System.currentTimeMillis();
//上一次消费时间戳
private volatile long lastConsumeTimestamp = System.currentTimeMillis();
8、 方法
//移除消费超时消息
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer)
//添加消息
public boolean putMessage(final List<MessageExt> msgs)
//获取消息最大间隔
public long getMaxSpan()
//移除消息
public long removeMessage(final List<MessageExt> msgs)
//将consumingMsgOrderlyTreeMap中消息重新放在msgTreeMap,并清空consumingMsgOrderlyTreeMap
public void rollback()
//将consumingMsgOrderlyTreeMap消息清除,表示成功处理该批消息
public long commit()
//重新处理该批消息
public void makeMessageToCosumeAgain(List<MessageExt> msgs)
//从processQueue中取出batchSize条消息
public List<MessageExt> takeMessags(final int batchSize)
五、消息中间件 RocketMQ 源码分析: 客户端发起拉取消息请求
1、消息拉取基本流程 示例图:
2、客户端发起拉取请求
代码:DefaultMQPushConsumerImpl#pullMessage
public void pullMessage(final PullRequest pullRequest) {//从pullRequest获得ProcessQueuefinal ProcessQueue processQueue = pullRequest.getProcessQueue();//如果处理队列被丢弃,直接返回if (processQueue.isDropped()) {log.info("the pull request[{}] is dropped.", pullRequest.toString());return;}//如果处理队列未被丢弃,更新时间戳pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());try {this.makeSureStateOK();} catch (MQClientException e) {log.warn("pullMessage exception, consumer state not ok", e);this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);return;}//如果处理队列被挂起,延迟1s后再执行if (this.isPause()) {log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);return;}//获得最大待处理消息数量long cachedMessageCount = processQueue.getMsgCount().get();//获得最大待处理消息大小long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);//从数量进行流控if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;}//从消息大小进行流控if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;}//获得订阅信息final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());if (null == subscriptionData) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);log.warn("find the consumer's subscription failed, {}", pullRequest);return;//与服务端交互,获取消息this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(),subExpression,subscriptionData.getExpressionType(),subscriptionData.getSubVersion(),pullRequest.getNextOffset(),this.defaultMQPushConsumer.getPullBatchSize(),sysFlag,commitOffsetValue,BROKER_SUSPEND_MAX_TIME_MILLIS,CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,CommunicationMode.ASYNC,pullCallback);}
六、消息中间件 RocketMQ 源码分析: Broker 组装消息
1、消息服务端 Broker 组装消息 示例图:
2、 代码:PullMessageProcessor#processRequest
//构建消息过滤器
MessageFilter messageFilter;
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,this.brokerController.getConsumerFilterManager());
} else {messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,this.brokerController.getConsumerFilterManager());
}
//调用MessageStore.getMessage查找消息
final GetMessageResult getMessageResult =this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), //消费组名称 requestHeader.getTopic(), //主题名称requestHeader.getQueueId(), //队列IDrequestHeader.getQueueOffset(), //待拉取偏移量requestHeader.getMaxMsgNums(), //最大拉取消息条数messageFilter //消息过滤器);
3、 代码:DefaultMessageStore#getMessage
GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
long nextBeginOffset = offset; //查找下一次队列偏移量
long minOffset = 0; //当前消息队列最小偏移量
long maxOffset = 0; //当前消息队列最大偏移量
GetMessageResult getResult = new GetMessageResult();
final long maxOffsetPy = this.commitLog.getMaxOffset(); //当前commitLog最大偏移量
//根据主题名称和队列编号获取消息消费队列
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);...
minOffset = consumeQueue.getMinOffsetInQueue();
maxOffset = consumeQueue.getMaxOffsetInQueue();
//消息偏移量异常情况校对下一次拉取偏移量
if (maxOffset == 0) { //表示当前消息队列中没有消息status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;nextBeginOffset = nextOffsetCorrection(offset, 0);
} else if (offset < minOffset) { //待拉取消息的偏移量小于队列的其实偏移量status = GetMessageStatus.OFFSET_TOO_SMALL;nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else if (offset == maxOffset) { //待拉取偏移量为队列最大偏移量status = GetMessageStatus.OFFSET_OVERFLOW_ONE;nextBeginOffset = nextOffsetCorrection(offset, offset);
} else if (offset > maxOffset) { //偏移量越界status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;if (0 == minOffset) {nextBeginOffset = nextOffsetCorrection(offset, minOffset);} else {nextBeginOffset = nextOffsetCorrection(offset, maxOffset);}
}
...
//根据偏移量从CommitLog中拉取32条消息
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
4、 代码:PullMessageProcessor#processRequest
//根据拉取结果填充responseHeader
response.setRemark(getMessageResult.getStatus().name());
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
responseHeader.setMinOffset(getMessageResult.getMinOffset());
responseHeader.setMaxOffset(getMessageResult.getMaxOffset());//判断如果存在主从同步慢,设置下一次拉取任务的ID为主节点
switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {case ASYNC_MASTER:case SYNC_MASTER:break;case SLAVE:if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);}break;
}
...
//GetMessageResult与Response的Code转换
switch (getMessageResult.getStatus()) {case FOUND: //成功response.setCode(ResponseCode.SUCCESS);break;case MESSAGE_WAS_REMOVING: //消息存放在下一个commitLog中response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); //消息重试break;case NO_MATCHED_LOGIC_QUEUE: //未找到队列case NO_MESSAGE_IN_QUEUE: //队列中未包含消息if (0 != requestHeader.getQueueOffset()) {response.setCode(ResponseCode.PULL_OFFSET_MOVED);requestHeader.getQueueOffset(),getMessageResult.getNextBeginOffset(),requestHeader.getTopic(),requestHeader.getQueueId(),requestHeader.getConsumerGroup());} else {response.setCode(ResponseCode.PULL_NOT_FOUND);}break;case NO_MATCHED_MESSAGE: //未找到消息response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);break;case OFFSET_FOUND_NULL: //消息物理偏移量为空response.setCode(ResponseCode.PULL_NOT_FOUND);break;case OFFSET_OVERFLOW_BADLY: //offset越界response.setCode(ResponseCode.PULL_OFFSET_MOVED);// XXX: warn and notify melog.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());break;case OFFSET_OVERFLOW_ONE: //offset在队列中未找到response.setCode(ResponseCode.PULL_NOT_FOUND);break;case OFFSET_TOO_SMALL: //offset未在队列中response.setCode(ResponseCode.PULL_OFFSET_MOVED);requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),getMessageResult.getMinOffset(), channel.remoteAddress());break;default:assert false;break;
}
...
//如果CommitLog标记可用,并且当前Broker为主节点,则更新消息消费进度
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
storeOffsetEnable = storeOffsetEnable&& this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}
七、消息中间件 RocketMQ 源码分析: 消息拉取客户端处理服务端响应
1、消息拉取客户端处理消息 示例图:
2、 代码:MQClientAPIImpl#processPullResponse
private PullResult processPullResponse(final RemotingCommand response) throws MQBrokerException, RemotingCommandException {PullStatus pullStatus = PullStatus.NO_NEW_MSG;//判断响应结果switch (response.getCode()) {case ResponseCode.SUCCESS:pullStatus = PullStatus.FOUND;break;case ResponseCode.PULL_NOT_FOUND:pullStatus = PullStatus.NO_NEW_MSG;break;case ResponseCode.PULL_RETRY_IMMEDIATELY:pullStatus = PullStatus.NO_MATCHED_MSG;break;case ResponseCode.PULL_OFFSET_MOVED:pullStatus = PullStatus.OFFSET_ILLEGAL;break;default:throw new MQBrokerException(response.getCode(), response.getRemark());}//解码响应头PullMessageResponseHeader responseHeader =(PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);//封装PullResultExt返回return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
}
3、 PullResult 类
private final PullStatus pullStatus; //拉取结果
private final long nextBeginOffset; //下次拉取偏移量
private final long minOffset; //消息队列最小偏移量
private final long maxOffset; //消息队列最大偏移量
private List<MessageExt> msgFoundList; //拉取的消息列表
4、 代码:DefaultMQPushConsumerImpl$PullCallback#OnSuccess
//将拉取到的消息存入processQueue
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
//将processQueue提交到consumeMessageService中供消费者消费
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);
//如果pullInterval大于0,则等待pullInterval毫秒后将pullRequest对象放入到PullMessageService中的pullRequestQueue队列中
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
八、消息中间件 RocketMQ 源码分析: 拉取消息的流程小结
上一节关联链接请点击:
# 消息中间件 RocketMQ 高级功能和源码分析(九)