Springboot整合RocketMQ 基本消息处理

目录

1. 同步消息

2. 异步消息

3. 单向消息

4. 延迟消息

5. 批量消息

6. 顺序消息

 7. Tag过滤


导入依赖

       <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId></dependency>

YAML配置

rocketmq:name-server: localhost:9876     # rocketMq的nameServer地址

1. 同步消息

同步消息是发送消息后等待Broker的响应,确保消息被成功接收。

生产者:

   @AutowiredRocketMQTemplate rocketMQTemplate;@Testvoid contextLoads() {SendResult result = rocketMQTemplate.syncSend("test", MessageBuilder.withPayload("同步消息").build());
//        SendResult result = rocketMQTemplate.syncSend("test", "同步消息");System.out.println("发送状态:" + result.getSendStatus() + " 消息id:" + result.getMsgId());}

2. 异步消息

异步消息是发送消息后不等待Broker响应,通过回调函数处理发送结果。

@AutowiredRocketMQTemplate rocketMQTemplate;@Testvoid contextLoads() {rocketMQTemplate.asyncSend("test", MessageBuilder.withPayload("异步消息").build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("发送成功"+sendResult);}@Overridepublic void onException(Throwable throwable) {System.out.println("发送失败"+throwable);}});}

3. 单向消息

单向消息是发送消息后不等待Broker响应,也没有回调函数。

    @AutowiredRocketMQTemplate rocketMQTemplate;@Testvoid contextLoads() {rocketMQTemplate.sendOneWay("test","单向消息");}

4. 延迟消息

延迟消息是设置消息的延迟时间,确保消息在指定时间后才被消费。

 @AutowiredRocketMQTemplate rocketMQTemplate;@Testvoid contextLoads() {//在RocketMQ中,timeout(超时时间)是指消息发送的最大等待时间。当你发送一个消息时,系统会等待一定的时间来获取发送结果,这个等待的时间就是超时时间。单位msMessage<String> message = MessageBuilder.withPayload("延迟消息").build();//延迟级别 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" 2对应5sSendResult result = rocketMQTemplate.syncSend("test", message, 2000, 2);}

5. 批量消息

批量消息是将多个消息打包成一个消息批次发送,提高发送效率。

    @AutowiredRocketMQTemplate rocketMQTemplate;@Testvoid contextLoads() {List<String> list = Arrays.asList("blue", "red", "pink", "yello");rocketMQTemplate.syncSend("test",list);}

上面所有生产者对应的消费者代码为:

@Component
@RocketMQMessageListener(topic = "test",consumerGroup = "test-group-consumer")
public class MQMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {String msgId = message.getMsgId();String msg = new String(message.getBody());System.out.println("消息id:"+msgId+"消息内容:"+msg);}
}

6. 顺序消息

顺序消息是保证同一个消息队列中的消息按顺序消费。

生产者代码:

    @AutowiredRocketMQTemplate rocketMQTemplate;@Testvoid contextLoads() {for(int i=0;i<10;i++){rocketMQTemplate.syncSendOrderly("test","顺序消息"+i,"1");}}

消费者代码更改:

@Component
@RocketMQMessageListener(topic = "test",consumerGroup = "test-group-consumer",consumeMode = ConsumeMode.ORDERLY)
public class MQMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {String msgId = message.getMsgId();String msg = new String(message.getBody());System.out.println("消息id:"+msgId+"消息内容:"+msg);}
}

 7. Tag过滤

消费者订阅的Tag和发送者设置的消息Tag相互匹配,则消息被投递给消费端进行消费。

生产者

    @AutowiredRocketMQTemplate rocketMQTemplate;@Testvoid contextLoads() {rocketMQTemplate.syncSend("test:test","hello");}

消费者

@Component
@RocketMQMessageListener(topic = "test",consumerGroup = "test-group-consumer",selectorType = SelectorType.TAG,selectorExpression = "test")
public class MQMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {String msgId = message.getMsgId();String msg = new String(message.getBody());System.out.println("消息id:"+msgId+"消息内容:"+msg);}
}

 @RocketMQMessageListener 注解参数如下:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/596546.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Spark精讲】性能优化:并行度

Reduce端并行度 RDD&#xff1a; 参数&#xff1a;spark.default.parallelism手动&#xff1a;groupByKey(10)&#xff0c;10即为并行度Spark SQL&#xff1a; 参数&#xff1a;spark.sql.shuffle.partitionsHive on Spark&#xff1a; 1.控制reduce个数的方式与参数 1.1.首先…

基于textcnn做微博情感文本分析

基于TextCNN&#xff08;Text Convolutional Neural Network&#xff09;进行微博情感文本分析是一种常见的文本分类方法。TextCNN利用卷积神经网络&#xff08;CNN&#xff09;结构来处理文本序列数据&#xff0c;通过卷积和池化操作捕捉文本中的局部特征&#xff0c;然后将这…

14:00面试,14:08就出来了,问的问题过于变态了。。。

从小厂出来&#xff0c;没想到在另一家公司又寄了。 到这家公司开始上班&#xff0c;加班是每天必不可少的&#xff0c;看在钱给的比较多的份上&#xff0c;就不太计较了。没想到10月一纸通知&#xff0c;所有人不准加班&#xff0c;加班费不仅没有了&#xff0c;薪资还要降40…

机器学习原理到Python代码实现之LinearRegression

Linear Regression 线性回归模型 该文章作为机器学习的第一篇文章&#xff0c;主要介绍线性回归模型的原理和实现方法。 更多相关工作请参考&#xff1a;Github 算法介绍 线性回归模型是一种常见的机器学习模型&#xff0c;用于预测一个连续的目标变量&#xff08;也称为响应变…

Spring的bean的生命周期!!!

一.单例模式 单例&#xff1a;[启动容器]--->通过构造方法&#xff08;创建对象&#xff09;---->调用set方法&#xff08;注入&#xff09;--->调用init方法&#xff08;初始化&#xff09;----[容器关闭]----->调用destroy方法&#xff08;销毁&#xff09; app…

死锁的处理策略“检测和解除”-第三十九天

目录 前言 死锁的检测 数据结构资源分配图 基于“图”检测死锁 可以消除所有边 不能消除所有边 结论 死锁定理 死锁的解除 本节思维导图 前言 如果系统中既不采取预防死锁的措施&#xff0c;也不采取避免死锁的措施&#xff0c;系统就很可能发生死锁&#xff0c;在这种…

西电期末1019.校验和计算

一.题目 二.分析与思路 难点在于逐个取出数据的每一位&#xff0c;我们编写f函数&#xff0c;使用了一个while函数&#xff0c;每次循环中用取余的运算符找到数据的个位累加&#xff0c;再将n/10&#xff0c;如此n便被去除了个位&#xff0c;十位就成了新的个位&#xff0c;最…

案例精选|淄博绿能燃气工程有限公司日志审计系统建设方案

淄博绿能燃气工程有限公司&#xff0c;成立于1994年&#xff0c;前身为淄博市煤气公司管道液化气分公司。公司业务主要涉及天然气、液化气等市政工程施工及城镇燃气供应等领域&#xff0c;具有市政公用工程施工总承包二级资质&#xff0c;《压力管道安装许可证》压力管道安装GB…

CodeFormer安装记录

1、Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running? 解决方案: systemctl daemon-reload systemctl restart docker.service 2、Error response from daemon: could not select device driver ““ with capabiliti…

利用Embedding优化搜索功能

我们继续用Gemini学习LLM编程之旅。 Embedding是一种自然语言处理 (NLP) 技术&#xff0c;可将文本转换为数值向量。Embedding捕获语义含义和上下文&#xff0c;从而导致具有相似含义的文本具有更接近的Embedding。例如&#xff0c;句子“我带我的狗去看兽医”和“我带我的猫去…

LeetCode---378周赛

题目列表 2980. 检查按位或是否存在尾随零 2981. 找出出现至少三次的最长特殊子字符串 I 2982. 找出出现至少三次的最长特殊子字符串 II 2983. 回文串重新排列查询 一、检查按位或是否存在尾随零 这题和位运算有关&#xff0c;不是很难&#xff0c;题目要求至少有两个数的…

Dubbo相关面试题及答案(2024)

1、Dubbo的基本架构是怎样的&#xff1f; Dubbo是一个高性能的Java RPC&#xff08;远程过程调用&#xff09;框架&#xff0c;它的基本架构主要由以下几个核心组件构成&#xff1a; Provider&#xff08;服务提供方&#xff09;&#xff1a; Provider是指暴露服务的服务提供者…

案例073:基于微信小程序的智慧旅游平台开发

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;SSM JDK版本&#xff1a;JDK1.8 数据库&#xff1a;mysql 5.7 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序开发软件&#xff1a;HBuilder X 小程序…

用贪心算法编程求解任务安排问题

题目&#xff1a;用贪心算法编程求解以下任务安排问题 一个单位时间任务是恰好需要一个单位时间完成的任务。给定一个单位时间任务的有限集S。关于S的一个时间表用于描述S中单位时间任务的执行次序。时间表中第1个任务从时间0 开始执行直至时间1 结束&#xff0c;第2 个任务从时…

20240104确认AIO-3399J的开发板适配ov13850摄像头不支持4K分辨率录像

20240104确认AIO-3399J的开发板适配ov13850摄像头不支持4K分辨率录像 2024/1/4 13:23 开发板&#xff1a;Firefly的AIO-3399J【RK3399】 SDK&#xff1a;rk3399-android-11-r20211216.tar.xz【Android11】 Android11.0.tar.bz2.aa【ToyBrick】 Android11.0.tar.bz2.ab Android1…

人工智能如何重塑金融服务业

在体验优先的世界中识别金融服务业中的AI使用场景 人工智能&#xff08;AI&#xff09;作为主要行业的大型组织的重要业务驱动力&#xff0c;持续受到关注。众所周知&#xff0c;传统金融服务业在采用新技术方面相对滞后&#xff0c;一些组织使用的还是上世纪50年代和60年代发…

Android Jetpack学习系列——Navigation

写在前面 Google在2018年就推出了Jetpack组件库&#xff0c;但是直到今天我才给重视起来&#xff0c;这真的不得不说是一件让人遗憾的事。过去几年的空闲时间里&#xff0c;我一直在尝试做一套自己的组件库&#xff0c;帮助自己快速开发&#xff0c;虽然也听说过Jetpack&#…

Android如何正确使用 Canvas 的 save() 和 restore() 方法

如何正确使用 Canvas 的 save() 和 restore() 方法 在Android的绘图API中&#xff0c;Canvas类提供了一系列强大的功能来绘制自定义视图。为了更高效地管理绘图状态和变换&#xff0c;Canvas类提供了save()和restore()方法。正确使用这些方法是高效绘图和避免常见错误的关键。 …

任务需求分析中的流程图、用例图、er图、类图、时序图线段、图形的作用意义

任务需求分析中的流程图、用例图、er图、类图、时序图线段、图形的作用意义 流程图 流程图中各种图形的含义及用法解析 连接线符号 连接各要素&#xff0c;表示流程的顺序或过程的方向。 批注符号 批注或说明&#xff0c;也可以做条件叙述。 子流程 流程中一部分图形的逻辑…

JS之注册事件兼容性解决方案

本章介绍注册事件兼容性的解决方案 废话不多说&#xff0c;直接上代码&#xff1a; function addEventListener(element, eventName, fn) {//判断当前浏览器是否支持 addEventListener 方法if (element.addEventListener) {element.addEventListener(eventName, fn); // 第三个…