使用 Spring-RocketMQ 时,只需要引入 `rocketmq-spring-boot-starter` 包,并且定义以下消费者,就可以很简单地实现消息消费:
@Component
@RocketMQMessageListener(topic = "first-topic", consumerGroup = "my-producer-group", selectorExpression = "tag1")
public class RocketMQConsumer implements RocketMQListener<String>{@Overridepublic void onMessage(String message) {System.out.println(message);}
可以看到,只需要添加 `@RocketMQMessageListener` 注解,并实现 `RocketMQListener` 接口,就可以完成消息的接收、处理逻辑。
`@RocketMQMessageListener` 实现原理
可以看到,在 `ListenerContainerConfiguration` 中获取了所有加了 `@RocketMQMessageListener` 注解的 bean:
ListenerContainerConfiguration#afterSingletonsInstantiated//ListenerContainerConfiguration实现了SmartInitializingSingleton接口,会在bean都实例化完之后,触发afterSingletonsInstantiated方法@Overridepublic void afterSingletonsInstantiated() {//获取所有加了RocketMQMessageListener注解的beanMap<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class).entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));//循环调用registerContainer方法beans.forEach(this::registerContainer);}
private void registerContainer(String beanName, Object bean) {......//拿到RocketMQMessageListener注解RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);//获取注解上定义的consumerGroupString consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());//获取注解上定义的topicString topic = this.environment.resolvePlaceholders(annotation.topic());//定义beanNameString containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),counter.incrementAndGet());GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;//注册bean 调用createRocketMQListenerContainer初始化一些属性genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,() -> createRocketMQListenerContainer(containerBeanName, bean, annotation));DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,DefaultRocketMQListenerContainer.class);if (!container.isRunning()) {try {//调用start方法container.start();} catch (Exception e) {log.error("Started container failed. {}", container, e);throw new RuntimeException(e);}}log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);}
`createRocketMQListenerContainer` 里面就是初始化了 `DefaultRocketMQListenerContainer` 这个对象,并且设置了一些消费相关的属性,比如 `nameServer`、`topic`、`tags`、`consumerGroup`(消费者组)、`rocketMQListener`(我们定义的消费监听者)等。
可以看到,这里面并没有定义具体的消费者实例。
//DefaultRocketMQListenerContainer定义 实现了InitializingBean接口,在bean初始化的时候会调用afterPropertiesSet方法
public class DefaultRocketMQListenerContainer implements InitializingBean,RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {@Overridepublic void afterPropertiesSet() throws Exception {//通过方法名可以看到是初始化MQ消费者实例initRocketMQPushConsumer();this.messageType = getMessageType();this.methodParameter = getMethodParameter();log.debug("RocketMQ messageType: {}", messageType);}
private void initRocketMQPushConsumer() throws MQClientException {......if (Objects.nonNull(rpcHook)) {//初始化DefaultMQPushConsumer对象consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),enableMsgTrace, this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));consumer.setVipChannelEnabled(false);consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup));} else {log.debug("Access-key or secret-key not configure in " + this + ".");consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));}//消息模式 广播还是集群switch (messageModel) {case BROADCASTING:consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);break;case CLUSTERING:consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);break;default:throw new IllegalArgumentException("Property 'messageModel' was wrong.");}//筛选方式 TAG和SQL92switch (selectorType) {case TAG:consumer.subscribe(topic, selectorExpression);break;case SQL92:consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));break;default:throw new IllegalArgumentException("Property 'selectorType' was wrong.");}//消费模式 顺序和并发switch (consumeMode) {case ORDERLY:consumer.setMessageListener(new DefaultMessageListenerOrderly());break;case CONCURRENTLY:consumer.setMessageListener(new DefaultMessageListenerConcurrently());break;default:throw new IllegalArgumentException("Property 'consumeMode' was wrong.");}}
回到上面的 `registerContainer`,最后拿到 `DefaultRocketMQListenerContainer` 的 bean,调用 `start` 方法:
DefaultRocketMQListenerContainer#start@Overridepublic void start() {if (this.isRunning()) {throw new IllegalStateException("container already running. " + this.toString());}try {//当前consumer就是上面分析的DefaultMQPushConsumerconsumer.start();} catch (MQClientException e) {throw new IllegalStateException("Failed to start RocketMQ push consumer", e);}this.setRunning(true);log.info("running container: {}", this.toString());}
DefaultMQPushConsumer#start@Overridepublic void start() throws MQClientException {setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));//可以看到调用的是defaultMQPushConsumerImpl.start()方法this.defaultMQPushConsumerImpl.start();if (null != traceDispatcher) {try {traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());} catch (MQClientException e) {log.warn("trace dispatcher start failed ", e);}}}
`defaultMQPushConsumerImpl` 是什么时候初始化的呢?
上面说到了初始化 `DefaultMQPushConsumer` 对象,点进去看它的构造方法:
/**
 * Constructor: stores the basic settings, creates the inner DefaultMQPushConsumerImpl
 * and, when message tracing is enabled, installs the trace dispatcher and hook.
 */
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
    AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
    this.consumerGroup = consumerGroup;
    this.namespace = namespace;
    this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    // defaultMQPushConsumerImpl is initialised right here in the constructor. The name suggests
    // it is the implementation behind DefaultMQPushConsumer; it is wired in by composition
    // rather than by implementing an interface.
    defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
    if (enableMsgTrace) {
        try {
            AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
            dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
            traceDispatcher = dispatcher;
            this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
                new ConsumeMessageTraceHookImpl(traceDispatcher));
        } catch (Throwable e) {
            // Trace setup failure is only logged; construction still succeeds.
            log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
        }
    }
}
接着看 `DefaultMQPushConsumerImpl` 的 `start` 方法:
DefaultMQPushConsumerImpl#startpublic synchronized void start() throws MQClientException {switch (this.serviceState) {case CREATE_JUST://顺序消息 ConsumeMessageOrderlyService处理if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());//并发消息 ConsumeMessageConcurrentlyService处理} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}this.consumeMessageService.start();//调用start方法mQClientFactory.start();}
MQClientInstance#startpublic void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();}// Start request-response channelthis.mQClientAPIImpl.start();// Start various schedule tasksthis.startScheduledTask();// Start pull service 可以看到这里开启拉取消息this.pullMessageService.start();// Start rebalance servicethis.rebalanceService.start();// Start push servicethis.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case START_FAILED:throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);default:break;}}}
ServiceThread#startpublic void start() {log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);if (!started.compareAndSet(false, true)) {return;}stopped = false;//new了一个Thread对象,this表示自己就是一个Runnable对象this.thread = new Thread(this, getServiceName());this.thread.setDaemon(isDaemon);//调用start方法this.thread.start();}
// ServiceThread implements Runnable and is abstract, so look for its concrete subclasses.
public abstract class ServiceThread implements Runnable {
}
可以看到 `PullMessageService` 和我们要找的有关,找到它的 `run` 方法:
PullMessageService#run@Overridepublic void run() {log.info(this.getServiceName() + " service started");//通过while循环拉取消息while (!this.isStopped()) {try {//消息存入LinkedBlockingQueue中,通过take方法阻塞获取PullRequest pullRequest = this.pullRequestQueue.take();//调用pullMessage处理消息this.pullMessage(pullRequest);} catch (InterruptedException ignored) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);}}log.info(this.getServiceName() + " service end");}
private void pullMessage(final PullRequest pullRequest) {//选择一个消费者实例final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {//转换为DefaultMQPushConsumerImpl对象,应该很熟悉吧DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;//调用pullMessage方法继续处理impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}}
public void pullMessage(final PullRequest pullRequest) {......PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {//调用consumeMessageService的submitConsumeRequest方法//consumeMessageService上面提到过,包含顺序和并发消费DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);......}@Overridepublic void onException(Throwable e) {if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("execute the pull request exception", e);}DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);}};}
ConsumeMessageOrderlyService#submitConsumeRequest@Overridepublic void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispathToConsume) {if (dispathToConsume) {//ConsumeRequest是一个RunnableConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);//提交到线程池中处理this.consumeExecutor.submit(consumeRequest);}}
来看 `ConsumeRequest` 的 `run` 方法:
ConsumeMessageOrderlyService.ConsumeRequest#run@Overridepublic void run() {......//核心在这里消费消息 status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);}
DefaultRocketMQListenerContainer.DefaultMessageListenerOrderly#consumeMessage @Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt messageExt : msgs) {log.debug("received msg: {}", messageExt);try {long now = System.currentTimeMillis();//处理消息handleMessage(messageExt);long costTime = System.currentTimeMillis() - now;log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);} catch (Exception e) {log.warn("consume message failed. messageExt:{}", messageExt, e);context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}return ConsumeOrderlyStatus.SUCCESS;}
DefaultRocketMQListenerContainer#handleMessageprivate void handleMessage(MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {if (rocketMQListener != null) {//可以看到最终调用到onMessage方法,也就是开头我们实现的接口中的onMessage方法rocketMQListener.onMessage(doConvertMessage(messageExt));} else if (rocketMQReplyListener != null) {Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt));Message<?> message = MessageBuilder.withPayload(replyContent).build();org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message));consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(replyMessage, new SendCallback() {@Override public void onSuccess(SendResult sendResult) {if (sendResult.getSendStatus() != SendStatus.SEND_OK) {log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());} else {log.info("Consumer replies message success.");}}@Override public void onException(Throwable e) {log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());}});}}
至此整个流程也就通了
总结
`@RocketMQMessageListener` 相当于定义一个消费者,`topic`、`consumerGroup`、`selectorExpression`、`consumeMode`、`messageModel` 定义了消费者的一些属性
实现 `RocketMQListener` 接口来处理具体消费逻辑
每个消费者初始化了一个 `DefaultRocketMQListenerContainer` 对象,该对象中包含消费者实例和消费者的属性
服务启动的时候开启一个线程,循环从阻塞队列中获取拉取请求(`PullRequest`),队列为空时就一直阻塞;拿到消息后,最终会调用自己实现的 `onMessage` 方法