RocketMQ 自动注入消费者

目录

  • 前言
  • 一、情景介绍
  • 二、问题分析
  • 三、代码实现


前言

之前接到一个需求,我们项目的技术负责人希望通过配置的形式,在项目启动的时候自动根据配置生成对应的消费者

觉得还有点意思,随即记录一下~


一、情景介绍

比如我这里有一个消费者 MessageConsumer

@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "mike-group",topic = "mike-message",selectorExpression = "TAG_MESSAGE_CONSUMER",consumeThreadMax = 6,consumeTimeout = 60L)
public class MessageConsumer implements RocketMQListener<NotifyMessage> {@Overridepublic void onMessage(NotifyMessage notifyMessage) {System.err.println("我收到啦~~");System.err.println("message = " + notifyMessage);}
}

在项目启动的时候会根据 @RocketMQMessageListener 注解上的配置生成一个消费者

假如我还需要一个 MessageConsumer 消费者,其 selectorExpression 的配置为 TAG_MESSAGE_CONSUMER_01consumeThreadMax 要设置为 8

通常情况下我们会再复制一个 MessageConsumer 命名为 MessageConsumer_01,然后在新的消费者上改对应的配置,例如:

@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "mike-group-01",topic = "mike-message",selectorExpression = "TAG_MESSAGE_CONSUMER_01",consumeThreadMax = 8,consumeTimeout = 60L)
public class MessageConsumer_01 implements RocketMQListener<NotifyMessage> {@Overridepublic void onMessage(NotifyMessage notifyMessage) {System.err.println("我收到啦~~");System.err.println("message = " + notifyMessage);}
}

这样做虽然没啥问题,只是这两个类除了配置不一样,其它的代码都是一摸一样的,倘若之后还要有一个 selectorExpression = TAG_MESSAGE_CONSUMER_02 的消费者,那我又得再复制一个 MessageConsumer,这样就造成了大量的代码冗余

所以就希望通过读取配置文件生成对应配置的消费者


二、问题分析

要如何实现这个功能,可以去看下 RocketMQ 的源码,看 Spring 是如何创建 RocketMQ 的消费者的

源码如下:

org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer#initRocketMQPushConsumer

在这里插入图片描述

在该方法中可以看到 Spring 是如何初始化消费者,参照这个方法,只需要在项目启动完成后,将初始化从注解上获取消费者配置的地方换成从配置文件上获取就可以了

通过实现 ApplicationListener<ApplicationReadyEvent> 可以监听项目是否启动完成


三、代码实现

因为消费者是需要通过配置文件的配置来自动生成,那么可以将需要自动生成的消费者(比如 MessageConsumer)其 @RocketMQMessageListener 的配置注释掉

@Slf4j
@Service
//@RocketMQMessageListener(
//        consumerGroup = "mike-group",
//        topic = "mike-message",
//        selectorExpression = "TAG_MESSAGE_CONSUMER",
//        consumeThreadMax = 6,
//        consumeTimeout = 60L)
public class MessageConsumer implements RocketMQListener<NotifyMessage> {@Overridepublic void onMessage(NotifyMessage notifyMessage) {System.err.println("我收到啦~~");System.err.println("message = " + notifyMessage);}
}

配置文件上自动注入消费者的配置最好和 @RocketMQMessageListener 的属性相同,并且可以配置多个自动注入的消费者,那么对应的映射文件可以这么写

AutoConsumerProperties.java

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Configuration;import java.util.List;@Data
@RefreshScope
@Configuration
@ConfigurationProperties(prefix = "auto-consumer")
public class AutoConsumerProperties {private List<AutoConsumer> messageConsumer;
}

AutoConsumer.java

import lombok.Data;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.SelectorType;@Data
public class AutoConsumer {private String consumerGroup;private String topic;private SelectorType selectorType = SelectorType.TAG;private String selectorExpression = "*";private ConsumeMode consumeMode = ConsumeMode.CONCURRENTLY;private MessageModel messageModel = MessageModel.CLUSTERING;private int consumeThreadMin = 64;private int consumeThreadMax = 64;private long consumeTimeout = 15L;private String accessKey;private String secretKey;private boolean enableMsgTrace;private String customizedTraceTopic;private String nameServer;private String accessChannel;
}

核心代码:

ConsumerStarted.java

import cn.hutool.core.collection.CollUtil;
import com.mike.common.core.utils.JacksonUtil;
import com.mike.server.message.config.properties.AutoConsumer;
import com.mike.server.message.config.properties.AutoConsumerProperties;
import com.mike.server.message.consumer.MessageConsumer;
import com.mike.server.message.domain.entity.NotifyMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;@Slf4j
@Component
public class ConsumerStarted implements ApplicationContextAware, ApplicationListener<ApplicationReadyEvent> /* , InitializingBean, SmartLifecycle */ {@Value("${rocketmq.name-server:}")private String nameServer;@Value("${rocketmq.consumer.topic:}")private String topic;@Value("${rocketmq.consumer.access-key:}")private String accessKey;@Value("${rocketmq.consumer.secret-key:}")private String secretKey;@Resourceprivate AutoConsumerProperties autoConsumerProperties;@Resourceprivate MessageConsumer messageConsumer;private ApplicationContext applicationContext;private final static boolean enableMsgTrace = true;private final static String customizedTraceTopic = null;@Override@SuppressWarnings("all")public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {// 需要等到程序启动完全之后再去启动initConsumer();}public void initConsumer() {List<AutoConsumer> messageConsumers = autoConsumerProperties.getMessageConsumer();if (CollUtil.isEmpty(messageConsumers)) return;final RocketMQListener<NotifyMessage> messageConsumerListener = messageConsumer;this.autoGenerateConsumer(messageConsumers, messageConsumerListener, NotifyMessage.class);}@SuppressWarnings("all")private <R> void autoGenerateConsumer(List<AutoConsumer> autoConsumers, RocketMQListener<R> rocketMQListener, Class<R> objClass) {// 根据 tag 自动生成对应的消费者for (AutoConsumer autoConsumer : autoConsumers) {String consumerGroup = autoConsumer.getConsumerGroup();String nameServer = getValueOrDefault(autoConsumer.getNameServer(), this.nameServer);String topic = getValueOrDefault(autoConsumer.getTopic(), this.topic);String accessKey = getValueOrDefault(autoConsumer.getAccessKey(), this.accessKey);String secretKey = getValueOrDefault(autoConsumer.getSecretKey(), this.secretKey);try {Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");Assert.notNull(nameServer, "Property 'nameServer' is required");Assert.notNull(topic, "Property 'topic' is required");DefaultMQPushConsumer consumer;RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(this.applicationContext.getEnvironment(), accessKey, secretKey);if (Objects.nonNull(rpcHook)) {consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);consumer.setVipChannelEnabled(false);} else {consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace, customizedTraceTopic);}consumer.setInstanceName(RocketMQUtil.getInstanceName(this.nameServer));consumer.setNamesrvAddr(this.nameServer);consumer.setAccessChannel(AccessChannel.LOCAL);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.setConsumeThreadMin(autoConsumer.getConsumeThreadMin());consumer.setConsumeThreadMax(autoConsumer.getConsumeThreadMax());if (consumer.getConsumeThreadMax() < consumer.getConsumeThreadMin()) {consumer.setConsumeThreadMin(consumer.getConsumeThreadMax());}switch (autoConsumer.getMessageModel()) {case BROADCASTING:consumer.setMessageModel(MessageModel.BROADCASTING);break;case CLUSTERING:consumer.setMessageModel(MessageModel.CLUSTERING);break;default:throw new IllegalArgumentException("Property 'messageModel' was wrong.");}switch (autoConsumer.getSelectorType()) {case TAG:consumer.subscribe(topic, autoConsumer.getSelectorExpression());break;case SQL92:consumer.subscribe(topic, MessageSelector.bySql(autoConsumer.getSelectorExpression()));break;default:throw new IllegalArgumentException("Property 'selectorType' was wrong.");}switch (autoConsumer.getConsumeMode()) {case ORDERLY:consumer.setMessageListener(new DefaultMessageListenerOrderly(autoConsumer, rocketMQListener, objClass));break;case CONCURRENTLY:consumer.setMessageListener(new DefaultMessageListenerConcurrently(autoConsumer, rocketMQListener, objClass));break;default:throw new IllegalArgumentException("Property 'consumeMode' was wrong.");}consumer.start();log.info("Consumer Start Success: {}:{}", topic, autoConsumer.getSelectorExpression());} catch (MQClientException e) {e.printStackTrace();log.info("Consumer Start Failed: {}:{}", topic, autoConsumer.getSelectorExpression());}}}private String getValueOrDefault(String value, String defaultValue) {return StringUtils.isNotBlank(value)? value: defaultValue;}@Override@SuppressWarnings("all")public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}public class DefaultMessageListenerOrderly<T> implements MessageListenerOrderly {private final AutoConsumer autoConsumer;private final RocketMQListener<T> rocketMQListener;private final Class<T> objClass;public DefaultMessageListenerOrderly(AutoConsumer autoConsumer, RocketMQListener<T> rocketMQListener, Class<T> objClass) {this.autoConsumer = autoConsumer;this.rocketMQListener = rocketMQListener;this.objClass = objClass;}public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext context) {for (MessageExt messageExt : msgList) {log.info("group[{}]-tag[{}] consume start ->>>", autoConsumer.getConsumerGroup(), autoConsumer.getSelectorExpression());log.debug("received msg: {}", messageExt);try {long now = System.currentTimeMillis();this.rocketMQListener.onMessage(doConvertMessage(messageExt, this.objClass));long costTime = System.currentTimeMillis() - now;log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);} catch (Exception var9) {log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), var9);final long suspendCurrentQueueTimeMillis = 1000L;context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}return ConsumeOrderlyStatus.SUCCESS;}}public class DefaultMessageListenerConcurrently <T> implements MessageListenerConcurrently {private final AutoConsumer autoConsumer;private final RocketMQListener<T> rocketMQListener;private final Class<T> objClass;public DefaultMessageListenerConcurrently(AutoConsumer autoConsumer, RocketMQListener<T> rocketMQListener, Class<T> objClass) {this.autoConsumer = autoConsumer;this.rocketMQListener = rocketMQListener;this.objClass = objClass;}public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : msgList) {log.info("group[{}]-tag[{}] consume start ->>>", autoConsumer.getConsumerGroup(), autoConsumer.getSelectorExpression());log.debug("received msg: {}", messageExt);try {long now = System.currentTimeMillis();this.rocketMQListener.onMessage(doConvertMessage(messageExt, objClass));long costTime = System.currentTimeMillis() - now;log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);} catch (Exception var9) {log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), var9);final int delayLevelWhenNextConsume = 0;context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}@SuppressWarnings("unchecked")private <T> T doConvertMessage(MessageExt messageExt, Class<T> objClass) {if (Objects.equals(objClass, MessageExt.class)) {return (T)messageExt;} else {String str = new String(messageExt.getBody(), StandardCharsets.UTF_8);if (Objects.equals(objClass, String.class)) {return (T)str;} else {if (objClass != null) {return JacksonUtil.fromJson(str, objClass);} else {log.info("convert failed. str:{}, msgType:{}", str, null);throw new RuntimeException("cannot convert message to " + null);}}}}
}

配置文件 yml 新增自动注入消费者的配置

auto-consumer:message-consumer:- consumer-group: mico-grouptopic: mike-messageselector-expression: TAG_MESSAGE_CONSUMERconsume-thread-max: 6- consumer-group: mike-group-01topic: mike-messageselector-expression: TAG_MESSAGE_CONSUMER_01consume-thread-max: 8- consumer-group: mike-group-02topic: mike-messageselector-expression: TAG_MESSAGE_CONSUMER_02consume-thread-max: 10

如果是配置在 properties 文件中,配置如下:

auto-consumer.message-consumer[0].consumer-group = mico-group
auto-consumer.message-consumer[0].topic = mike-message
auto-consumer.message-consumer[0].selector-expression = TAG_MESSAGE_CONSUMER
auto-consumer.message-consumer[0].consume-thread-max = 6auto-consumer.message-consumer[1].consumer-group = mico-group-01
auto-consumer.message-consumer[1].topic = mike-message
auto-consumer.message-consumer[1].selector-expression = TAG_MESSAGE_CONSUMER_01
auto-consumer.message-consumer[1].consume-thread-max = 8auto-consumer.message-consumer[2].consumer-group = mico-group-02
auto-consumer.message-consumer[2].topic = mike-message
auto-consumer.message-consumer[2].selector-expression = TAG_MESSAGE_CONSUMER_02
auto-consumer.message-consumer[2].consume-thread-max = 10

启动项目进行验证,观察是否有三个消费者被创建

在这里插入图片描述

从日志上看确实根据配置文件自动创建了三个不同的消费者

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/59861.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据结构(C语言版)(第2版) 课后习题答案 李冬梅

数据结构(C语言版)(第2版) 第1章 绪论 1.简述下列概念:数据、数据元素、数据项、数据对象、数据结构、逻辑结构、存储结构、抽象数据类型。 答案: 数据:是客观事物的符号表示,指所有能输入到计算机中并被计算机程序处理的符号的总称。如数学计算中用到的整数和实数…

Vue 自定义icon组件封装SVG图标

通过自定义子组件CustomIcon.vue使用SVG图标&#xff0c;相比iconfont下载文件、重新替换更节省时间。 子组件包括&#xff1a; 1. Icons.vue 存放所有SVG图标的path 2. CustomIcon.vue 通过icon的id索引对应的图标 使用的时候需要将 <Icons></Icons> 引到使用的…

吴恩达深度学习笔记:卷积神经网络(Foundations of Convolutional Neural Networks)4.9-4.10

目录 第四门课 卷积神经网络&#xff08;Convolutional Neural Networks&#xff09;第四周 特殊应用&#xff1a;人脸识别和神经风格转换&#xff08;Special applications: Face recognition &Neural style transfer&#xff09;4.9 内容代价函数&#xff08;Content cos…

界面控件DevExpress WPF中文教程:Data Grid——卡片视图设置

DevExpress WPF拥有120个控件和库&#xff0c;将帮助您交付满足甚至超出企业需求的高性能业务应用程序。通过DevExpress WPF能创建有着强大互动功能的XAML基础应用程序&#xff0c;这些应用程序专注于当代客户的需求和构建未来新一代支持触摸的解决方案。 无论是Office办公软件…

LLM训练”中的“分布式训练并行技术;分布式训练并行技术

目录 “LLM训练”中的“分布式训练并行技术” 分布式训练并行技术 数据并行 流水线并行:按阶段(stage)进行切分 张量并行 序列并行 多维混合并行 自动并行 MOE并行 重要的分布式AI框架 “LLM训练”中的“分布式训练并行技术” 随着深度学习技术的不断发展,特别是…

Ubuntu开启FTP与SSH服务

在配置开发环境时&#xff0c;这两个配置感觉是最有用的&#xff0c;开启FTP服务可以将远程linux上的文件映射到Windows上&#xff0c;不管是使用虚拟机还是嵌入式linux设备&#xff0c;特别在开发写代码的时候&#xff0c;映射到Windows上使用VS code打开编写比在linux上编写舒…

虚拟现实技术及其在教育领域的应用

&#x1f493; 博客主页&#xff1a;瑕疵的CSDN主页 &#x1f4dd; Gitee主页&#xff1a;瑕疵的gitee主页 ⏩ 文章专栏&#xff1a;《热点资讯》 虚拟现实技术及其在教育领域的应用 虚拟现实技术及其在教育领域的应用 虚拟现实技术及其在教育领域的应用 引言 虚拟现实技术概述…

搜维尔科技:Varjo XR-4在教育科研领域应用

医学教育与培训&#xff1a; • 解剖学教学&#xff1a;传统的解剖学教学依赖于教科书、图片或实体标本&#xff0c;学生对于人体结构的空间关系理解存在一定难度。而使用Varjo头显&#xff0c;学生可以沉浸在虚拟的人体解剖环境中&#xff0c;全方位、多角度地观察人体的各个…

Java 源码中的 Unicode 逃逸问题,别被注释给骗了

背景 看了一段项目源码&#xff0c;定义了一个 List 对象&#xff0c;往该列表对象 add 的代码前面有注释符号&#xff0c;但是程序运行时列表中却存在对象&#xff0c;为什么呢&#xff1f;仔细看了一下&#xff0c;注释符号和 add 代码之间有一个特殊符号 \u000d&#xff0c…

基于python的机器学习(一)—— 基础知识(Scikit-learn安装)

目录 一、机器学习基础 1.1 机器学习概述 1.2 监督学习、无监督学习和强化学习 1.3 聚类、分类、回归、标注 1.3.1 聚类 1.3.2 分类 1.3.3 回归 1.3.4 标注 1.4 机器学习、人工智能和数据挖掘 1.5 机器学习的三个要素 二、Scikit-learn 机器学习库 2.1 Scikit-lea…

React 入门课程 - 使用CDN编程React

1. 第一个React 注意&#xff1a;在vscode里&#xff0c;使用Live Server来运行html文件。 index.html <html><head><link rel"stylesheet" href"index.css"><script crossorigin src"https://unpkg.com/react17/umd/react.de…

23isctf

where_is_the_flag 1.打开环境&#xff0c;上面有一句话木马&#xff0c;直接蚁剑上 flag1&#xff1a;蚁剑连接上就可以直接看见&#xff0c;FLAG1:Yunxi{d0c0 flag2:在根目录下就有 797a-4697- flag3&#xff1a; 在主页面有一个start.sh里面有提示信息 4dfe-9b48-50ff…

传统RAG流程;密集检索器,稀疏检索器:中文的M3E

目录 传统RAG流程 相似性搜索中:神经网络的密集检索器,稀疏检索器 密集检索器 BGE系列模型 text-embedding-ada-002模型 M3E模型 稀疏检索器 示例一:基于TF-IDF的稀疏检索器 示例二:基于BM25的稀疏检索器 稀疏检索器的特点与优势 传统RAG流程 相似性搜索中:神经…

黑马程序员linux学习【持续更新】

Linux基础 一、Linux简介 1.分类 不同领域的主流操作系统&#xff0c;主要分为下 几类&#xff1a;桌面操作系统、服务器操作系统、移动设备操作系统、嵌入式操作系统。 桌面操作系统 操作系统特点Windows用户数量最多MacOS操作体验好&#xff0c;办公人士首选Linux用户数…

02多线程基础知识

目录 1. 线程与进程 进程&#xff08;Process&#xff09; 线程&#xff08;Thread&#xff09; 2. 并发和并行 并发&#xff08;Concurrency&#xff09; 并行&#xff08;Parallelism&#xff09; 3. CPU 调度 定义 类型 调度算法 上下文切换 4.线程间的状态流转…

brainpy 动力学编程基础

文章参考&#xff1a; 《神经计算建模实战——基于brainpy》 吴思 【brainpy学习笔记】基础知识2(动力学模型的编程基础)-CSDN博客 Brainpy手册 文章目录 积分器&#xff1a;定义ODE函数数值积分方法 更新函数和动力系统计算介绍什么是brainpy.DynamicalSystem&#xff1f;如…

数据结构之二叉树--前序,中序,后序详解(含源码)

二叉树 二叉树不能轻易用断言&#xff0c;因为树一定有空 二叉树链式结构的实现 在学习二叉树的基本操作前&#xff0c;需先要创建一棵二叉树&#xff0c;然后才能学习其相关的基本操作。 typedef int BTDataType; typedef struct BinaryTreeNode {BTDataType _data;struct B…

【NativeUI下的data table备注信息的快捷输入-会议签到补充】

NativeUI下的data table备注信息的快捷输入-会议签到补充 概述结构本文任务子组件在列中定制显示父组件的备注补充父组件的便捷输入按钮父组件快捷按钮给子组件的备注用最后固定在底部 概述 本文讲述Vue3的数据和函数在父组件,子组件的交互,以NativeUI的datatable为载体,实现签…

从本地到云端:Linux上快速搭建Cloudreve云盘并实现远程管理

文章目录 前言1. 安装Docker2. 使用Docker拉取镜像3. 创建并启动Cloudreve容器4. 本地访问测试5. 公网远程访问本地Cloudreve5.1 内网穿透工具安装5.2 创建远程连接公网地址5.3 使用固定公网地址远程访问 前言 大家好&#xff01;今天我们要聊聊如何在Linux系统上&#xff0c;…

如何简化App Store提现?——作为游戏开发者的跨境收款体验分享

目录 如何简化App Store提现&#xff1f;——作为游戏开发者的跨境收款体验分享跨境收款常见的几个问题使用万里汇收款后的体验1. 结算流程简单&#xff0c;到账更快2. 多场景收付更灵活3. 多种支付方式支持 使用后的效果&#xff1a;资金管理更高效个人建议 如何简化App Store…