RocketMQ快速入门:集成spring, springboot实现各类消息消费(七)附带源码

0. 引言

rocketmq支持两种消费模式:pull和push,在实际开发中这两种模式分别是如何实现的呢,在spring框架和springboot框架中集成有什么差异?今天我们一起来探究这两个问题。

1. java client实现消息消费

1、添加依赖

		<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.8.0</version></dependency>

1.1 Push消息消费

rocketmq的push消费是通过pull模式为基础来进行模拟的,就是通过监听器,不断的pull来实现,因此其实现重点就是实现监听器

rocektmq的监听器支持2种:

  • MessageListenerConcurrently 拉取到新消息之后就提交到线程池去消费
  • MessageListenerOrderly 通过加分布式锁和本地锁保证同时只有一条线程去消费一个队列上的数据,以此保证顺序消费

这里虽然还有MessageListener类型,实际上是上述两种的父类,该方法也被弃用了
在这里插入图片描述
所以push模式的的重点就是实现MessageListenerConcurrently监听器,其内部只有一个consumeMessage方法
在这里插入图片描述
那么实现的重点就是consumeMessage方法,这里我们睡眠了10s,用于模拟该监听器运行10s

public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");consumer.setNamesrvAddr("127.0.0.1:9876");// 集群消费模式consumer.setMessageModel(MessageModel.CLUSTERING);// 设置topicconsumer.subscribe("topic_test", "*");// 注册回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {for (MessageExt msg : list) {String topic = msg.getTopic();try {String messageBody = new String(msg.getBody(), "utf-8");System.out.println(topic+":"+messageBody);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例consumer.start();Thread.sleep(10000);}

当然,如上的形式只能用于我们单元测试使用,集成在生产中时肯定不能这样用,我们需要将其注册为bean形式,并在项目启动时进行调用,让其注册为监听器

@Component
public class Consumer1PushListener implements MessageListenerConcurrently {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {String topic = msg.getTopic();try {String messageBody = new String(msg.getBody(), "utf-8");System.out.println(topic+":"+messageBody);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}@PostConstructpublic void init(){DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");consumer.setNamesrvAddr("127.0.0.1:9876");// 集群消费模式consumer.setMessageModel(MessageModel.CLUSTERING);// 注册监听器consumer.registerMessageListener(this);try{// 设置topicconsumer.subscribe("topic_test", "*");// 启动示例consumer.start();}catch (Exception e){e.printStackTrace();System.out.println("rocketmq 消费者启动失败");}}
}

我们启动项目,发送一条消息,会发现消费者可以实时消费

在这里插入图片描述
消息模式如何调整?
rocektmq 有集群模式和广播模式两种消息模式,如果需要调整的话,通过消费者的setMessageModel方法即可调整

// 集群模式
consumer.setMessageModel(MessageModel.CLUSTERING);
// 广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);

1.2 Pull消息消费

pull模式的实现更加简单,直接查看pull消费者类DefaultMQPullConsumer,其下有pull方法
在这里插入图片描述
官方给出的示例代码如下:

public static void main(String[] args) throws MQClientException {DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.start();try {MessageQueue mq = new MessageQueue();mq.setQueueId(0);mq.setTopic("topic_test");mq.setBrokerName("Broker");long offset = 26;PullResult pullResult = consumer.pull(mq, "*", offset, 32);if (pullResult.getPullStatus().equals(PullStatus.FOUND)) {System.out.printf("%s%n", pullResult.getMsgFoundList());consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());}} catch (Exception e) {e.printStackTrace();}consumer.shutdown();}

但是截止目前,该类已经被弃用了
在这里插入图片描述
更加推荐的是用DefaultLitePullConsumer类实现,其下的poll方法可以帮助我们更加方便的实现消息消费,这里需要注意,两个类,一个是pull,一个是poll,pull实际上是需要指定偏移量的,而poll则自动帮我们更新了偏移量

public static void main(String[] args) throws MQClientException {DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("group2");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.subscribe("topic_test", "*");consumer.start();try {List<MessageExt> messageList = consumer.poll(3000);for (MessageExt message : messageList) {System.out.println("pull消费:"+new String(message.getBody()));}} catch (Exception e) {e.printStackTrace();}consumer.shutdown();}

发送几条消息,运行测试
在这里插入图片描述
生产中使用时,大家可以把DefaultLitePullConsumer定义为bean, 以此减少每次资源创建的消耗,具体方式可参考上述push模式的实现代码

1.3 顺序消息消费

rocketmq中提供了两种消费处理形式:并发消费(MessageListenerConcurrently)和顺序消费(MessageListenerOrderly

并发消费消费者会创建多个线程同时消费队列消息,而顺序消费流程跟并发消费最大的区别在于,顺序消费对要处理的队列加锁,确保同一队列,同一时间,只允许一个消费线程处理

我们在之前消息发送的章节已经提前体验过顺序消费代码实现了,通过上述对监听器类型的描述,我们也能知道顺序消费的实现,就是实现MessageListenerOrderly监听器

public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");consumer.setNamesrvAddr("127.0.0.1:9876");// 集群消费模式consumer.setMessageModel(MessageModel.CLUSTERING);// 设置topicconsumer.subscribe("topic_order", "*");// 注册回调函数,处理消息consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {byte[] body = list.get(0).getBody();System.out.println("接收消息:"+new String(body, StandardCharsets.UTF_8));return ConsumeOrderlyStatus.SUCCESS;}});// 启动消费者实例consumer.start();Thread.sleep(10000);}

2. springboot实现消息消费

1、添加依赖

		<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version></dependency>

2、修改配置项

rocketmq:name-server: localhost:9876producer:group: group_test # 生产者分组,事务消息会使用send-message-timeout: 3000 # 消息发送超时时长,默认3sretry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2

2.1 push消息消费

通过实现RocketMQListener<T>接口,其中T是泛型,及消息内容的数据类型,可以是String, JSONObject,也可以是自定义数据结构类型

将监听器声明为bean,并实现onMessage方法即可

@Component
@RocketMQMessageListener(topic = "topic_test", consumerGroup = "group_test")
public class MessageListener implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("消费消息:" + s);}
}

注解中的messageModel属性可以用来设置消息模式,默认为集群模式

在这里插入图片描述

2.2 pull消息消费

添加消费者配置

rocketmq:name-server: localhost:9876consumer:group: "group_test"topic: "topic_test"

通过receive方法实现消费

 @GetMapping(value = "/poll")public void poll() {List<String> list = rocketMQTemplate.receive(String.class);for (String message : list) {System.out.println("poll消费:"+message);}}

2.3 顺序消息消费

与普通消息不同的是,要声明消费模式为顺序消费consumeMode= ConsumeMode.ORDERLY

@Component
@RocketMQMessageListener(topic = "topic_order", consumerGroup = "group_order", consumeMode= ConsumeMode.ORDERLY)
public class MessageOrderListener implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("顺序消费消息:" + s);}
}

3. 总结

消息消费相对更加简单,实际上掌握一种之后,其他类型的也就能够举一反三了,本文也只是针对最常用的类型进行列举,还有更多参数的支持,需要大家在实际应用中探索。

本文演示源码见:https://gitee.com/wuhanxue/wu_study/tree/master/demo/rocketmq_demo

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/31270.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

uniapp获取视频第一帧作为封面图

uniapp获取视频第一帧作为封面图&#xff0c;前提是在oss里面存储的 视频路径 ‘?x-oss-processvideo/snapshot,t_0,f_jpg’ <image :src"item.video_url ?x-oss-processvideo/snapshot,t_0,f_jpg" mode"aspectFill"></image>

学生如何利用假期提升个人能力?

学生利用假期提升个人能力可以通过多种方式实现&#xff0c;以下是一些建议&#xff1a; 1. **学习新技能**&#xff1a;利用假期时间学习一门新语言、编程、艺术、音乐、体育等&#xff0c;这些技能不仅能够丰富个人生活&#xff0c;还能在未来的学业和职业发展中发挥作用。 …

JavaScript流程控制分支

目录 一、流程控制 二、顺序流程控制 三、分支流程控制 if 语句 1.分支结构 2.if 语句 3.if else 语句 &#xff08;双分支语句&#xff09; 4.if else if 语句 &#xff08;多分支语句&#xff09; 四、三元表达式 五、分支流程控制switch语句 1.分支流程控制switch语句 …

什么是iPaaS?

一、iPaaS简介 iPaaS&#xff0c;即集成平台即服务&#xff08;Integration Platform as a Service&#xff09;&#xff0c;是一种基于云计算的自助服务模型&#xff0c;它为企业提供了一种标准化的应用程序集成方式。能够促进开发、执行和治理集成流程&#xff0c;连接本地和…

正定矩阵(Positive Definite Matrix)

正定矩阵&#xff08;Positive Definite Matrix&#xff09; flyfish Positive&#xff08;正数&#xff09; &#xff1a;在数学和统计学中&#xff0c;通常指大于零的数。在矩阵理论中&#xff0c;一个矩阵被称为正定&#xff0c;是因为它的性质类似于正数的性质。 Defini…

真的不用太焦虑,普通人怎么选都是错

作为一个在职场摸爬滚打多年的过来人&#xff0c;我想跟大家分享一些我的感悟。 这些年看着身边的年轻人们&#xff0c;总是为工作的选择而焦虑不已&#xff1a;他们担心选错了行业&#xff0c;误入歧途&#xff1b;担心选错了公司&#xff0c;前途渺茫。然而&#xff0c;我想告…

STM32 运行atof函数进入hard fault中断

目前为了糊口,做了硬件工程师,因此博客也很久没更新了。目前也只能业余时间自己玩玩喜欢的东西。 最近在研究FOC时候,发现STM32在运行“atof”函数时候,导致程序进入了hard fault中断中。 事情的起因是这样的: 我已经通过Jlink的RTT功能,替代了单片机的串口日志输出。翻阅…

裁员裁到大动脉,是一种什么体验!

大家好啊&#xff0c;我是董董灿。 降本增效是每个当老板的人都喜欢挂在嘴边的口头禅&#xff0c;尤其是行业不景气&#xff0c;公司发展遇到瓶颈的时候。 大部分公司降本增效的手段其实非常相似&#xff0c;比较容易实施的手段也就那几种。 要么搞设备自动化和流程自动化&a…

Anthropic 发布新AI模型Claude 3.5 Sonnet

&#x1f989; AI新闻 &#x1f680; Anthropic 发布新AI模型Claude 3.5 Sonnet 摘要&#xff1a;Anthropic 发布了其最强 AI 模型 Claude 3.5 Sonnet。速度更快、处理细微差别和幽默的能力提升&#xff0c;且支持编写、编辑和执行代码。该模型通过公司网站、iPhone 应用及 A…

【Qt6.3 基础教程 11】 深入探索列表型控件:QListWidget和QComboBox

文章目录 前言QListWidget&#xff1a;便捷的项目列表主要特性示例&#xff1a;使用QListWidget QComboBox&#xff1a;下拉选择的高效实现主要特性示例&#xff1a;使用QComboBox 结合Model/View架构使用总结 前言 在任何现代用户界面中&#xff0c;列表是展示项目集合的重要…

笔记-python map函数

map()函数是Python内置的高阶函数&#xff0c;它接收一个函数f和一个list作为参数。通过将函数f依次应用于list的每个元素&#xff0c;map()函数会生成一个新的list并返回。 例如&#xff0c;对于list [1, 2, 3, 4, 5, 6, 7, 8, 9]&#xff0c;如果我们想要计算list中每个元素…

数据库系统概念(第八周 第一堂)(规范化关系数据库设计)(强推学习!!!)

目录 前言 E-R模型质量低的深层原因 数据依赖 函数依赖 主属性/非主属性 逻辑蕴含与闭包 Armstrongs Axiom 求解F闭包算法 求解属性集闭包算法 属性集闭包的作用 候选码求解理论和算法 候选码求解理论 无关属性 检验方法 正则覆盖 关系模式的设计 关系…

【深度学习】GPT-2,Language Models are Unsupervised Multitask Learners,【语言建模】

论文&#xff1a;https://d4mucfpksywv.cloudfront.net/better-language-models/language_models_are_unsupervised_multitask_learners.pdf 文章目录 摘要引言方法2.1 训练数据集2.2 输入表示2.3 模型3. 实验3.1 语言建模3.2 Children’s Book Test3.3 LAMBADA3.4 Winograd Sc…

自动驾驶学习-车载摄像头ISP(2)

背景 智能驾驶ISP&#xff08;Image Signal Processor&#xff0c;图像信号处理器&#xff09;在自动驾驶和辅助驾驶系统中扮演着至关重要的角色。 典型的ISP通常会对摄像头输出的RAW数据先做黑电平矫正&#xff08;BLC&#xff09;、坏点矫正&#xff08;DPC&#xff09;、数…

如何DIY出专属个性化的CSDN主页?一招教你搞定!

个人主页&#xff1a;学习前端的小z 个人专栏&#xff1a;HTML5和CSS3悦读 本专栏旨在分享记录每日学习的前端知识和学习笔记的归纳总结&#xff0c;欢迎大家在评论区交流讨论&#xff01; 文章目录 &#x1f4af;如何通过HTMLCSS自定义模板diy出自己的个性化csdn主页&#x…

SD3发布,送你3个ComfyUI工作流

大家好&#xff0c;我是每天分享AI应用的萤火君&#xff01; 这几天AI绘画界最轰动的消息莫过于Stable Diffusion 3&#xff08;简称SD3&#xff09;的发布。SD3是一个多模态的 Diffusion Transformer 模型&#xff0c;其在图像质量、排版、复杂提示理解和资源效率方面具有显著…

ADC常用的十大滤波算法(C语言)

一、限幅滤波法 1、方法&#xff1a; 根据经验判断两次采样允许的最大偏差值&#xff08;设为A&#xff09; 每次检测到新值时判断&#xff1a; a. 如果本次值与上次值之差<A&#xff0c;则本次值有效 b. 如果本次值与上次值之差>A&#xff0c;则本次值无效&#xf…

QT MQTT (二)编译与集成

一、QT MQTT 提供 MQTT 客户端服务的 Qt 专用库基于标准化发布 / 订阅协议&#xff0c;用于在设备和组件之间可靠地共享数据。MQTT 是为保证状态正确性、满足高安全标准和交换最小数据而设计的协议&#xff0c;因此被广泛应用于各种分布式系统和物联网解决方案中。 Qt开发MQT…

【Oracle篇】Oracle数据库坏块处理:rman修复坏块实践与案例分析(第七篇,总共八篇)

&#x1f4ab;《博主介绍》&#xff1a;✨又是一天没白过&#xff0c;我是奈斯&#xff0c;DBA一名✨ &#x1f4ab;《擅长领域》&#xff1a;✌️擅长Oracle、MySQL、SQLserver、阿里云AnalyticDB for MySQL(分布式数据仓库)、Linux&#xff0c;也在扩展大数据方向的知识面✌️…

Python 学习 第三册 第13章 动态规划

----用教授的方式学习 目录 13.1 又见斐波那契数列 13.2 动态规划与 0/1 背包问题 13.3 动态规划与分治算法 13.1 又见斐波那契数列 一个很直观的斐波那契数列的递归实现: def fib(n): """假设n是非负整数返回第n个斐波那契数""" …