RocketMQ快速入门:集成spring, springboot实现各类消息消费(七)附带源码

0. 引言

rocketmq支持两种消费模式:pull和push,在实际开发中这两种模式分别是如何实现的呢,在spring框架和springboot框架中集成有什么差异?今天我们一起来探究这两个问题。

1. java client实现消息消费

1、添加依赖

		<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.8.0</version></dependency>

1.1 Push消息消费

rocketmq的push消费是通过pull模式为基础来进行模拟的,就是通过监听器,不断的pull来实现,因此其实现重点就是实现监听器

rocektmq的监听器支持2种:

  • MessageListenerConcurrently 拉取到新消息之后就提交到线程池去消费
  • MessageListenerOrderly 通过加分布式锁和本地锁保证同时只有一条线程去消费一个队列上的数据,以此保证顺序消费

这里虽然还有MessageListener类型,实际上是上述两种的父类,该方法也被弃用了
在这里插入图片描述
所以push模式的的重点就是实现MessageListenerConcurrently监听器,其内部只有一个consumeMessage方法
在这里插入图片描述
那么实现的重点就是consumeMessage方法,这里我们睡眠了10s,用于模拟该监听器运行10s

public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");consumer.setNamesrvAddr("127.0.0.1:9876");// 集群消费模式consumer.setMessageModel(MessageModel.CLUSTERING);// 设置topicconsumer.subscribe("topic_test", "*");// 注册回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {for (MessageExt msg : list) {String topic = msg.getTopic();try {String messageBody = new String(msg.getBody(), "utf-8");System.out.println(topic+":"+messageBody);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例consumer.start();Thread.sleep(10000);}

当然,如上的形式只能用于我们单元测试使用,集成在生产中时肯定不能这样用,我们需要将其注册为bean形式,并在项目启动时进行调用,让其注册为监听器

@Component
public class Consumer1PushListener implements MessageListenerConcurrently {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {String topic = msg.getTopic();try {String messageBody = new String(msg.getBody(), "utf-8");System.out.println(topic+":"+messageBody);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}@PostConstructpublic void init(){DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");consumer.setNamesrvAddr("127.0.0.1:9876");// 集群消费模式consumer.setMessageModel(MessageModel.CLUSTERING);// 注册监听器consumer.registerMessageListener(this);try{// 设置topicconsumer.subscribe("topic_test", "*");// 启动示例consumer.start();}catch (Exception e){e.printStackTrace();System.out.println("rocketmq 消费者启动失败");}}
}

我们启动项目,发送一条消息,会发现消费者可以实时消费

在这里插入图片描述
消息模式如何调整?
rocektmq 有集群模式和广播模式两种消息模式,如果需要调整的话,通过消费者的setMessageModel方法即可调整

// 集群模式
consumer.setMessageModel(MessageModel.CLUSTERING);
// 广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);

1.2 Pull消息消费

pull模式的实现更加简单,直接查看pull消费者类DefaultMQPullConsumer,其下有pull方法
在这里插入图片描述
官方给出的示例代码如下:

public static void main(String[] args) throws MQClientException {DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.start();try {MessageQueue mq = new MessageQueue();mq.setQueueId(0);mq.setTopic("topic_test");mq.setBrokerName("Broker");long offset = 26;PullResult pullResult = consumer.pull(mq, "*", offset, 32);if (pullResult.getPullStatus().equals(PullStatus.FOUND)) {System.out.printf("%s%n", pullResult.getMsgFoundList());consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());}} catch (Exception e) {e.printStackTrace();}consumer.shutdown();}

但是截止目前,该类已经被弃用了
在这里插入图片描述
更加推荐的是用DefaultLitePullConsumer类实现,其下的poll方法可以帮助我们更加方便的实现消息消费,这里需要注意,两个类,一个是pull,一个是poll,pull实际上是需要指定偏移量的,而poll则自动帮我们更新了偏移量

public static void main(String[] args) throws MQClientException {DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("group2");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.subscribe("topic_test", "*");consumer.start();try {List<MessageExt> messageList = consumer.poll(3000);for (MessageExt message : messageList) {System.out.println("pull消费:"+new String(message.getBody()));}} catch (Exception e) {e.printStackTrace();}consumer.shutdown();}

发送几条消息,运行测试
在这里插入图片描述
生产中使用时,大家可以把DefaultLitePullConsumer定义为bean, 以此减少每次资源创建的消耗,具体方式可参考上述push模式的实现代码

1.3 顺序消息消费

rocketmq中提供了两种消费处理形式:并发消费(MessageListenerConcurrently)和顺序消费(MessageListenerOrderly

并发消费消费者会创建多个线程同时消费队列消息,而顺序消费流程跟并发消费最大的区别在于,顺序消费对要处理的队列加锁,确保同一队列,同一时间,只允许一个消费线程处理

我们在之前消息发送的章节已经提前体验过顺序消费代码实现了,通过上述对监听器类型的描述,我们也能知道顺序消费的实现,就是实现MessageListenerOrderly监听器

public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");consumer.setNamesrvAddr("127.0.0.1:9876");// 集群消费模式consumer.setMessageModel(MessageModel.CLUSTERING);// 设置topicconsumer.subscribe("topic_order", "*");// 注册回调函数,处理消息consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {byte[] body = list.get(0).getBody();System.out.println("接收消息:"+new String(body, StandardCharsets.UTF_8));return ConsumeOrderlyStatus.SUCCESS;}});// 启动消费者实例consumer.start();Thread.sleep(10000);}

2. springboot实现消息消费

1、添加依赖

		<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version></dependency>

2、修改配置项

rocketmq:name-server: localhost:9876producer:group: group_test # 生产者分组,事务消息会使用send-message-timeout: 3000 # 消息发送超时时长,默认3sretry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2

2.1 push消息消费

通过实现RocketMQListener<T>接口,其中T是泛型,及消息内容的数据类型,可以是String, JSONObject,也可以是自定义数据结构类型

将监听器声明为bean,并实现onMessage方法即可

@Component
@RocketMQMessageListener(topic = "topic_test", consumerGroup = "group_test")
public class MessageListener implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("消费消息:" + s);}
}

注解中的messageModel属性可以用来设置消息模式,默认为集群模式

在这里插入图片描述

2.2 pull消息消费

添加消费者配置

rocketmq:name-server: localhost:9876consumer:group: "group_test"topic: "topic_test"

通过receive方法实现消费

 @GetMapping(value = "/poll")public void poll() {List<String> list = rocketMQTemplate.receive(String.class);for (String message : list) {System.out.println("poll消费:"+message);}}

2.3 顺序消息消费

与普通消息不同的是,要声明消费模式为顺序消费consumeMode= ConsumeMode.ORDERLY

@Component
@RocketMQMessageListener(topic = "topic_order", consumerGroup = "group_order", consumeMode= ConsumeMode.ORDERLY)
public class MessageOrderListener implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("顺序消费消息:" + s);}
}

3. 总结

消息消费相对更加简单,实际上掌握一种之后,其他类型的也就能够举一反三了,本文也只是针对最常用的类型进行列举,还有更多参数的支持,需要大家在实际应用中探索。

本文演示源码见:https://gitee.com/wuhanxue/wu_study/tree/master/demo/rocketmq_demo

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/31270.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

正定矩阵(Positive Definite Matrix)

正定矩阵&#xff08;Positive Definite Matrix&#xff09; flyfish Positive&#xff08;正数&#xff09; &#xff1a;在数学和统计学中&#xff0c;通常指大于零的数。在矩阵理论中&#xff0c;一个矩阵被称为正定&#xff0c;是因为它的性质类似于正数的性质。 Defini…

裁员裁到大动脉,是一种什么体验!

大家好啊&#xff0c;我是董董灿。 降本增效是每个当老板的人都喜欢挂在嘴边的口头禅&#xff0c;尤其是行业不景气&#xff0c;公司发展遇到瓶颈的时候。 大部分公司降本增效的手段其实非常相似&#xff0c;比较容易实施的手段也就那几种。 要么搞设备自动化和流程自动化&a…

Anthropic 发布新AI模型Claude 3.5 Sonnet

&#x1f989; AI新闻 &#x1f680; Anthropic 发布新AI模型Claude 3.5 Sonnet 摘要&#xff1a;Anthropic 发布了其最强 AI 模型 Claude 3.5 Sonnet。速度更快、处理细微差别和幽默的能力提升&#xff0c;且支持编写、编辑和执行代码。该模型通过公司网站、iPhone 应用及 A…

数据库系统概念(第八周 第一堂)(规范化关系数据库设计)(强推学习!!!)

目录 前言 E-R模型质量低的深层原因 数据依赖 函数依赖 主属性/非主属性 逻辑蕴含与闭包 Armstrongs Axiom 求解F闭包算法 求解属性集闭包算法 属性集闭包的作用 候选码求解理论和算法 候选码求解理论 无关属性 检验方法 正则覆盖 关系模式的设计 关系…

【深度学习】GPT-2,Language Models are Unsupervised Multitask Learners,【语言建模】

论文&#xff1a;https://d4mucfpksywv.cloudfront.net/better-language-models/language_models_are_unsupervised_multitask_learners.pdf 文章目录 摘要引言方法2.1 训练数据集2.2 输入表示2.3 模型3. 实验3.1 语言建模3.2 Children’s Book Test3.3 LAMBADA3.4 Winograd Sc…

自动驾驶学习-车载摄像头ISP(2)

背景 智能驾驶ISP&#xff08;Image Signal Processor&#xff0c;图像信号处理器&#xff09;在自动驾驶和辅助驾驶系统中扮演着至关重要的角色。 典型的ISP通常会对摄像头输出的RAW数据先做黑电平矫正&#xff08;BLC&#xff09;、坏点矫正&#xff08;DPC&#xff09;、数…

如何DIY出专属个性化的CSDN主页?一招教你搞定!

个人主页&#xff1a;学习前端的小z 个人专栏&#xff1a;HTML5和CSS3悦读 本专栏旨在分享记录每日学习的前端知识和学习笔记的归纳总结&#xff0c;欢迎大家在评论区交流讨论&#xff01; 文章目录 &#x1f4af;如何通过HTMLCSS自定义模板diy出自己的个性化csdn主页&#x…

SD3发布,送你3个ComfyUI工作流

大家好&#xff0c;我是每天分享AI应用的萤火君&#xff01; 这几天AI绘画界最轰动的消息莫过于Stable Diffusion 3&#xff08;简称SD3&#xff09;的发布。SD3是一个多模态的 Diffusion Transformer 模型&#xff0c;其在图像质量、排版、复杂提示理解和资源效率方面具有显著…

ADC常用的十大滤波算法(C语言)

一、限幅滤波法 1、方法&#xff1a; 根据经验判断两次采样允许的最大偏差值&#xff08;设为A&#xff09; 每次检测到新值时判断&#xff1a; a. 如果本次值与上次值之差<A&#xff0c;则本次值有效 b. 如果本次值与上次值之差>A&#xff0c;则本次值无效&#xf…

QT MQTT (二)编译与集成

一、QT MQTT 提供 MQTT 客户端服务的 Qt 专用库基于标准化发布 / 订阅协议&#xff0c;用于在设备和组件之间可靠地共享数据。MQTT 是为保证状态正确性、满足高安全标准和交换最小数据而设计的协议&#xff0c;因此被广泛应用于各种分布式系统和物联网解决方案中。 Qt开发MQT…

【Oracle篇】Oracle数据库坏块处理:rman修复坏块实践与案例分析(第七篇,总共八篇)

&#x1f4ab;《博主介绍》&#xff1a;✨又是一天没白过&#xff0c;我是奈斯&#xff0c;DBA一名✨ &#x1f4ab;《擅长领域》&#xff1a;✌️擅长Oracle、MySQL、SQLserver、阿里云AnalyticDB for MySQL(分布式数据仓库)、Linux&#xff0c;也在扩展大数据方向的知识面✌️…

git配置ssh key

一、生成ssh公钥和私钥对 打开终端&#xff0c;输入命令&#xff0c;-C 后是git邮箱&#xff0c;在 Enter file in which to save the key (/home/my/.ssh/id_rsa): 后可以输入公钥和私钥对保存路径及文件名&#xff0c;默认是 /home/my/.ssh/id_rsa&#xff0c;其它的全部按回…

从0开始C++(五):友元函数运算符重载

友元函数 介绍 C中的友元函数是一种特殊的函数&#xff0c;它可以访问和操作类的私有成员和保护成员。友元函数可以在类的内部或外部声明和定义&#xff0c;但在其声明和定义中需要使用关键字 friend 来标识。友元函数可以是全局函数&#xff0c;也可以是其他类的成员函数。 …

Web APIs--Dom获取属性操作

目录 1.DOM&#xff08;操作网页内容、用户交互&#xff09; 2.DOM对象获取&#xff08;querySelect(‘’)、querySelectAll(‘’)&#xff09; 总结&#xff1a; 3.操作元素内容&#xff08;修改元素的文本更换内容&#xff09; 1. 元素innerText 属性 2.元素.innerHTML…

第一百一十六节 Java 面向对象设计 - Java 终止块

Java 面向对象设计 - Java 终止块 ​try ​块也可以有零个或一个​ finally​ 块。 ​finally ​块总是与 ​try ​块一起使用。 语法 使用 ​finally​ 块的语法是 finally {// Code for finally block }​finally​ 块以关键字 ​finally​ 开始&#xff0c;后面紧跟一对…

深入分析 Android BroadcastReceiver (四)

文章目录 深入分析 Android BroadcastReceiver (四)1. 广播接收器的深入优化与应用1.1 实时性要求高的应用1.1.1 示例&#xff1a;音乐播放器中处理耳机插拔事件1.1.2 动态注册接收器 1.2 处理耗时操作1.2.1 示例&#xff1a;使用 IntentService 处理耗时操作 1.3 安全性管理1.…

【机器学习】深度学习赋能:基于 LSTM 的智能日志异常检测

目录 1. LSTM 简介 2. 日志序列异常检测概述 3. 数据预处理 3.1 日志解析 3.2 数据清洗 3.3 序列化 3.4 特征提取 示例代码 4. 构建 LSTM 模型 4.1 模型结构 4.2 模型构建示例 5. 训练 LSTM 模型 5.1 数据准备 5.2 模型训练 示例代码 6. 异常检测 6.1 异常分数…

处理文本内容的命令和正则表达式

处理文本内容的命令 正则表达式匹配的是文本内容&#xff0c;linux的文本三剑客 都是针对文本内容 文本三剑客&#xff1a; grep 过滤文本内容 sed 针对文本内容进行增删改查 awk 按行取列 文本三剑客都是按行进行匹配。 grep grep的作用就是使用正则表达式来匹配文本内…

虚拟现实环境下的远程教育和智能评估系统(十一)

视频帧画面知识点区域划分 知识点区域精确分割技术: 在深度学习检测模型结果基础上使用基于交并比&#xff08;IoU&#xff09;阈值的目标合并算法&#xff0c;合并过度重合目标区域面积&#xff0c;实现知识点区域精确分割 多模态知识点内容匹配策略: 图像&#xff1a;利用…

【第18章】Vue实战篇之登录界面

文章目录 前言一、数据绑定1. 数据绑定2. 数据清空 二、表单校验1. 代码2. 展示 三、登录1.登录按钮2.user.js3. login 四、展示总结 前言 上一章完成用户注册&#xff0c;这一章主要做用户登录。 一、数据绑定 登录和注册使用相同的数据绑定 1. 数据绑定 <!-- 登录表单 -…