RocketMQ消息队列-@RocketMQMessageListener实现原理

使用Spring-RocketMQ时,只需要引入rocketmq-spring-boot-starter包,并且定义以下消费者,就可以很简单的实现消息消费

@Component
@RocketMQMessageListener(topic = "first-topic", consumerGroup = "my-producer-group", selectorExpression = "tag1")
public class RocketMQConsumer implements RocketMQListener<String>{@Overridepublic void onMessage(String message) {System.out.println(message);}

可以看到只需要添加@RocketMQMessageListener注解,并实现RocketMQListener接口就可以完成消息的接受、处理逻辑


@RocketMQMessageListener实现原理

在这里插入图片描述

可以看到在ListenerContainerConfiguration中获取了所有加了RocketMQMessageListener注解的bean

 ListenerContainerConfiguration#afterSingletonsInstantiated//ListenerContainerConfiguration实现了SmartInitializingSingleton接口,会在bean都实例化完之后,触发afterSingletonsInstantiated方法@Overridepublic void afterSingletonsInstantiated() {//获取所有加了RocketMQMessageListener注解的beanMap<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class).entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));//循环调用registerContainer方法beans.forEach(this::registerContainer);}
    private void registerContainer(String beanName, Object bean) {......//拿到RocketMQMessageListener注解RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);//获取注解上定义的consumerGroupString consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());//获取注解上定义的topicString topic = this.environment.resolvePlaceholders(annotation.topic());//定义beanNameString containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),counter.incrementAndGet());GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;//注册bean  调用createRocketMQListenerContainer初始化一些属性genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,() -> createRocketMQListenerContainer(containerBeanName, bean, annotation));DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,DefaultRocketMQListenerContainer.class);if (!container.isRunning()) {try {//调用start方法container.start();} catch (Exception e) {log.error("Started container failed. {}", container, e);throw new RuntimeException(e);}}log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);}

createRocketMQListenerContainer里面就是初始化了DefaultRocketMQListenerContainer这个对象,并且设置了一些消费相关的属性,比如nameServertopictagsconsumerGroup消费者组,rocketMQListener我们定义的消费监听者等

可以看到这里面并没有定义具体的消费者实例

//DefaultRocketMQListenerContainer定义  实现了InitializingBean接口,在bean初始化的时候会调用afterPropertiesSet方法
public class DefaultRocketMQListenerContainer implements InitializingBean,RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {@Overridepublic void afterPropertiesSet() throws Exception {//通过方法名可以看到是初始化MQ消费者实例initRocketMQPushConsumer();this.messageType = getMessageType();this.methodParameter = getMethodParameter();log.debug("RocketMQ messageType: {}", messageType);}
    private void initRocketMQPushConsumer() throws MQClientException {......if (Objects.nonNull(rpcHook)) {//初始化DefaultMQPushConsumer对象consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),enableMsgTrace, this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));consumer.setVipChannelEnabled(false);consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup));} else {log.debug("Access-key or secret-key not configure in " + this + ".");consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));}//消息模式 广播还是集群switch (messageModel) {case BROADCASTING:consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);break;case CLUSTERING:consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);break;default:throw new IllegalArgumentException("Property 'messageModel' was wrong.");}//筛选方式  TAG和SQL92switch (selectorType) {case TAG:consumer.subscribe(topic, selectorExpression);break;case SQL92:consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));break;default:throw new IllegalArgumentException("Property 'selectorType' was wrong.");}//消费模式 顺序和并发switch (consumeMode) {case ORDERLY:consumer.setMessageListener(new DefaultMessageListenerOrderly());break;case CONCURRENTLY:consumer.setMessageListener(new DefaultMessageListenerConcurrently());break;default:throw new IllegalArgumentException("Property 'consumeMode' was wrong.");}}

回到上面registerContainer,最后拿到DefaultRocketMQListenerContainer的bean,调用start方法

    DefaultRocketMQListenerContainer#start@Overridepublic void start() {if (this.isRunning()) {throw new IllegalStateException("container already running. " + this.toString());}try {//当前consumer就是上面分析的DefaultMQPushConsumerconsumer.start();} catch (MQClientException e) {throw new IllegalStateException("Failed to start RocketMQ push consumer", e);}this.setRunning(true);log.info("running container: {}", this.toString());}
    DefaultMQPushConsumer#start@Overridepublic void start() throws MQClientException {setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));//可以看到调用的是defaultMQPushConsumerImpl.start()方法this.defaultMQPushConsumerImpl.start();if (null != traceDispatcher) {try {traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());} catch (MQClientException e) {log.warn("trace dispatcher start failed ", e);}}}

defaultMQPushConsumerImpl是什么时候初始化的呢

上面说到初始化DefaultMQPushConsumer对象时,点进去构造方法

    public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {this.consumerGroup = consumerGroup;this.namespace = namespace;this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;//可以看到就在构造方法里面初始化的,通过名字可以猜想就是DefaultMQPushConsumer的实现类,但是并不是通过实现接口的方式,而是组合的方式defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);if (enableMsgTrace) {try {AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());traceDispatcher = dispatcher;this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageTraceHookImpl(traceDispatcher));} catch (Throwable e) {log.error("system mqtrace hook init failed ,maybe can't send msg trace data");}}}

接口看DefaultMQPushConsumerImplstart方法

    DefaultMQPushConsumerImpl#startpublic synchronized void start() throws MQClientException {switch (this.serviceState) {case CREATE_JUST://顺序消息 ConsumeMessageOrderlyService处理if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());//并发消息 ConsumeMessageConcurrentlyService处理} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}this.consumeMessageService.start();//调用start方法mQClientFactory.start();}
     MQClientInstance#startpublic void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();}// Start request-response channelthis.mQClientAPIImpl.start();// Start various schedule tasksthis.startScheduledTask();// Start pull service 可以看到这里开启拉取消息this.pullMessageService.start();// Start rebalance servicethis.rebalanceService.start();// Start push servicethis.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case START_FAILED:throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);default:break;}}}
    ServiceThread#startpublic void start() {log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);if (!started.compareAndSet(false, true)) {return;}stopped = false;//new了一个Thread对象,this表示自己就是一个Runnable对象this.thread = new Thread(this, getServiceName());this.thread.setDaemon(isDaemon);//调用start方法this.thread.start();}
//ServiceThread实现了Runnable接口,并且是抽象的,找实现类
public abstract class ServiceThread implements Runnable {
}

在这里插入图片描述

可以看到PullMessageService和我们找的有关,找到它的run方法

    PullMessageService#run@Overridepublic void run() {log.info(this.getServiceName() + " service started");//通过while循环拉取消息while (!this.isStopped()) {try {//消息存入LinkedBlockingQueue中,通过take方法阻塞获取PullRequest pullRequest = this.pullRequestQueue.take();//调用pullMessage处理消息this.pullMessage(pullRequest);} catch (InterruptedException ignored) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);}}log.info(this.getServiceName() + " service end");}
    private void pullMessage(final PullRequest pullRequest) {//选择一个消费者实例final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {//转换为DefaultMQPushConsumerImpl对象,应该很熟悉吧DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;//调用pullMessage方法继续处理impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}}
    public void pullMessage(final PullRequest pullRequest) {......PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {//调用consumeMessageService的submitConsumeRequest方法//consumeMessageService上面提到过,包含顺序和并发消费DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);......}@Overridepublic void onException(Throwable e) {if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("execute the pull request exception", e);}DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);}};}
   ConsumeMessageOrderlyService#submitConsumeRequest@Overridepublic void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispathToConsume) {if (dispathToConsume) {//ConsumeRequest是一个RunnableConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);//提交到线程池中处理this.consumeExecutor.submit(consumeRequest);}}

来看ConsumeRequestrun方法

       ConsumeMessageOrderlyService.ConsumeRequest#run@Overridepublic void run() {......//核心在这里消费消息       status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);}
DefaultRocketMQListenerContainer.DefaultMessageListenerOrderly#consumeMessage        @Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt messageExt : msgs) {log.debug("received msg: {}", messageExt);try {long now = System.currentTimeMillis();//处理消息handleMessage(messageExt);long costTime = System.currentTimeMillis() - now;log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);} catch (Exception e) {log.warn("consume message failed. messageExt:{}", messageExt, e);context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}return ConsumeOrderlyStatus.SUCCESS;}
    DefaultRocketMQListenerContainer#handleMessageprivate void handleMessage(MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {if (rocketMQListener != null) {//可以看到最终调用到onMessage方法,也就是开头我们实现的接口中的onMessage方法rocketMQListener.onMessage(doConvertMessage(messageExt));} else if (rocketMQReplyListener != null) {Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt));Message<?> message = MessageBuilder.withPayload(replyContent).build();org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message));consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(replyMessage, new SendCallback() {@Override public void onSuccess(SendResult sendResult) {if (sendResult.getSendStatus() != SendStatus.SEND_OK) {log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());} else {log.info("Consumer replies message success.");}}@Override public void onException(Throwable e) {log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());}});}}

至此整个流程也就通了


总结

@RocketMQMessageListener相当于定义一个消费者,topic、consumerGroup、selectorExpression、consumeMode、messageModel定义了消费者的一些属性

实现RocketMQListener接口来处理具体消费逻辑

每个消费者初始化了一个DefaultRocketMQListenerContainer对象,该对象中包含消费实例和消费者的属性

服务启动的时候开启一个线程轮训队列中的消息,如果没有就一直阻塞,拿到消息后,最终会调用自己实现的onMessage方法

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/65801.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SEAN代码(1)

代码地址 首先定义一个trainer。 trainer Pix2PixTrainer(opt)在Pix2PixTrainer内部&#xff0c;首先定义Pix2PixModel模型。 self.pix2pix_model Pix2PixModel(opt)在Pix2PixModel内部定义生成器&#xff0c;判别器。 self.netG, self.netD, self.netE self.initialize_…

Ansible学习笔记10

1、在group1的被管理机里的mariadb里创建一个abc库&#xff1b; 1&#xff09; 然后我们到agent主机上进行检查&#xff1a; 可以看到数据库已经创建成功。 再看几个其他命令&#xff1a; #a组主机重启mysql&#xff0c;并设置开机自启 ansible a -m service -a "namemy…

HDMI 输出实验

FPGA教程学习 第十四章 HDMI 输出实验 文章目录 FPGA教程学习前言实验原理实验过程程序设计时钟模块&#xff08;video_pll&#xff09;彩条产生模块&#xff08;color_bar)配置数据查找表模块&#xff08;lut_adv7511&#xff09;I2C Master 寄存器配置模块&#xff08;i2c_c…

elasticSearch+kibana+logstash+filebeat集群改成https认证

文章目录 一、生成相关证书二、配置elasticSearh三、配置kibana四、配置logstash五、配置filebeat六、连接https es的java api 一、生成相关证书 ps&#xff1a;主节点操作 切换用户&#xff1a;su es 进入目录&#xff1a;cd /home/es/elasticsearch-7.6.2 创建文件&#x…

Adobe Illustrator 2023 for mac安装教程,可用。

Adobe Illustrator 是行业标准的矢量图形应用程序&#xff0c;可以为印刷、网络、视频和移动设备创建logos、图标、绘图、排版和插图。数以百万计的设计师和艺术家使用Illustrator CC创作&#xff0c;从网页图标和产品包装到书籍插图和广告牌。此版本是2023版本&#xff0c;适配…

LeetCode(力扣)236. 二叉树的最近公共祖先Python

LeetCode236. 二叉树的最近公共祖先 题目链接代码 题目链接 https://leetcode.cn/problems/lowest-common-ancestor-of-a-binary-tree/ 代码 # Definition for a binary tree node. # class TreeNode: # def __init__(self, x): # self.val x # self.…

C语言深入理解指针(非常详细)(二)

目录 指针运算指针-整数指针-指针指针的关系运算 野指针野指针成因指针未初始化指针越界访问指针指向的空间释放 如何规避野指针指针初始化注意指针越界指针不使用时就用NULL避免返回局部变量的地址 assert断言指针的使用和传址调用传址调用例子&#xff08;strlen函数的实现&a…

The Cherno——OpenGL

The Cherno——OpenGL 1. 欢迎来到OpenGL OpenGL是一种跨平台的图形接口&#xff08;API&#xff09;&#xff0c;就是一大堆我们能够调用的函数去做一些与图像相关的事情。特殊的是&#xff0c;OpenGL允许我们访问GPU&#xff08;Graphics Processing Unit 图像处理单元&…

Python小知识 - 如何使用Python的Flask框架快速开发Web应用

如何使用Python的Flask框架快速开发Web应用 现在越来越多的人把Python作为自己的第一语言来学习&#xff0c;Python的简洁易学的语法以及丰富的第三方库让人们越来越喜欢上了这门语言。本文将介绍如何使用Python的Flask框架快速开发Web应用。 Flask是一个使用Python编写的轻量级…

Spring Boot中通过maven进行多环境配置

上文 java Spring Boot将不同配置拆分入不同文件管理 中 我们说到了&#xff0c;多环境的多文件区分管理 说到多环境 其实不止我们 Spring Boot有 很多的东西都有 那么 这就有一个问题 如果 spring 和 maven 都配置了环境 而且他们配的不一样 那么 会用谁的呢&#xff1f; 此…

《TCP/IP网络编程》阅读笔记--基于Windows实现Hello Word服务器端和客户端

目录 1--Hello Word服务器端 2--客户端 3--编译运行 3-1--编译服务器端 3-2--编译客户端 3-3--运行 1--Hello Word服务器端 // gcc hello_server_win.c -o hello_server_win -lwsock32 // hello_server_win 9190 #include <stdio.h> #include <stdlib.h> #i…

【算法刷题-双指针篇】

目录 1.leetcode-27. 移除元素2.leetcode-344. 反转字符串3.leetcode-剑指 Offer 05. 替换空格4.leetcode-206. 反转链表5.leetcode-19. 删除链表的倒数第 N 个结点6.leetcode-面试题 02.07. 链表相交7.leetcode-142. 环形链表 II8.leetcode-15. 三数之和9.leetcode-18. 四数之…

Git使用——GitHub项目回退版本

查看历史版本 使用git log命令查看项目的历史版本&#xff1a; 可以一直回车&#xff0c;直到找到想要的历史版本&#xff0c;复制commit后面的那一串id。 恢复历史版本 执行命令 git reset --hard 版本号&#xff1a; git reset --hard 39ac3ea2448e81ea992b7c4fdad9252983…

ARM 汇编基础知识

1.为什么学习汇编&#xff1f; 我们在进行嵌入式 Linux 开发的时候是绝对要掌握基本的 ARM 汇编&#xff0c;因为 Cortex-A 芯片一 上电 SP 指针还没初始化&#xff0c; C 环境还没准备好&#xff0c;所以肯定不能运行 C 代码&#xff0c;必须先用汇编语言设置好 C 环境…

七、Linux中一些符号的含义和宿主目录的介绍

1、Linux中一些符号的含义 在Linux命令行中&#xff0c;会看到如下一些符号&#xff0c;含义如下。 符号含义. 代表当前目录..代表上一层目录&#xff0c;当前目录的父目录-代表前一个目录&#xff0c;刚才从哪个目录cd过来~代表当前用户的宿主目录/代表根目录$普通用户的命…

两个线程同步执行:解决乱箭穿心(STL/Windows/Linux)

C自学精简教程 目录(必读) C并发编程入门 目录 多线程同步 线程之间同步是指线程等待其他线程执行完某个动作之后再执行&#xff08;本文情况&#xff09;。 线程同步还可以是像十字路口的红绿灯一样&#xff0c;只允许一个方向的车同行&#xff0c;其他方向的车等待。 本…

Mac 如何判断下载Mac with Intel Chip 还是 Mac with Apple Chip

如下图&#xff0c;当我们在 Mac系统 下载客户端时&#xff0c;有两种选择&#xff1a;Mac with Intel Chip 、 Mac with Apple Chip 如何判断要下载哪一种&#xff1f; 需要判断本机Mac是在Inter芯片还是Apple芯片上运行的。方法如下&#xff1a; 点击屏幕左上角Apple标志&a…

DHorse v1.3.2 发布,基于 k8s 的发布平台

版本说明 新增特性 构建版本、部署应用时的线程池可配置化&#xff1b; 优化特性 构建版本跳过单元测试&#xff1b; 解决问题 解决Vue应用详情页面报错的问题&#xff1b;解决Linux环境下脚本运行失败的问题&#xff1b;解决下载Maven安装文件失败的问题&#xff1b; 升…

Docker技术--Docker中的网络问题

1.docker中的网络通信 如果想要弄清楚docker中的网络通信问题,其实需要弄清楚这几个问题就可以:容器与容器之间的通信、容器与外部网络之间的通信、外部网络与容器之间的通信。 -a:容器与容器之间的通信,如下所示: 在默认情况下,docker使用网桥(Bridge模式)与NAT通信。这…

已解决module ‘pip‘ has no attribute ‘pep425tags‘报错问题(如何正确查看pip版本、支持、32位、64位方法汇总)

本文摘要&#xff1a;本文已解决module ‘pip‘ has no attribute ‘pep425tags‘的相关报错问题&#xff0c;并总结提出了几种可用解决方案。同时结合人工智能GPT排除可能得隐患及错误。并且最后说明了如何正确查看pip版本、支持、32位、64位方法汇总 &#x1f60e; 作者介绍&…