Spring Boot整合RocketMQ实现延迟消息消费

导包
     <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.3</version></dependency>
添加配置信息
application配置文件
# rocketMq地址
rocketmq.name-server=106.52.60.215:9876
# 生产者分组
rocketmq.producer.group=myGroup
rocketmq.producer.topics=topic1
# 消费者分组
rocketmq.consumer.group=myGroup
# topic
rocketmq.consumer.topics=topic1
# 表示顺序消费模式
rocketmq.consumer.consume-mode=ORDERLY
# 消费者的最大线程数,即消费消息的线程池大小。默认值为20,如果不需要处理大量的消息,可以将其调小。
rocketmq.consumer.consume-thread-max=1
# 表示每次消费消息的最大数量,即一次性消费的最大消息数。默认值为1,即每次只消费一条消息。如果需要批量消费消息,可以将其调大。但是需要注意的是,批量消费消息可能会影响消费的效率和消息的顺序性。
rocketmq.consumer.consume-message-batch-max-size=1
yml配置文件
rocketmq:consumer:consume-message-batch-max-size: 1consume-mode: ORDERLYconsume-thread-max: 1group: myGrouptopics: topic1name-server: 106.52.60.215:9876producer:group: myGrouptopics: topic1
生产者发送消息
同步发现消息

在Spring Boot中,可以使用RocketMQTemplate来发送消息。设置消息的延迟级别,可以使用RocketMQTemplatesend(Message message, long timeout, int delayLevel)方法,其中delayLevel为延迟级别,单位为秒

RocketMQ支持18个级别的延迟时间,分别为1s、5s、10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h

import com.songzixain.Blog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;/*** <p>* Description: 消息生产者* </p>** @author * @version * @create * @see */
@Slf4j
@RestController
public class MyProducer1 {@Value("${rocketmq.producer.topics}")private String topic;@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** description 同步发送延迟消息** @param:  []* @return* @Date   2023/3/11*/@GetMapping("syncSendTest")public void sendDelayMsg() {Blog blog = new Blog();blog.setBlogName("余十步");blog.setUrl("yushibu");// delayTimeLevel代表延迟级别  messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hint delayTimeLevel = 3;Message<Blog> message = MessageBuilder.withPayload(blog).setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel).build();SendResult sendResult = rocketMQTemplate.syncSend(topic, message, 3000, delayTimeLevel);log.info("消息发送成功,时间:{} 发送内容:{}", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());log.info("发送结果:{}", sendResult);}
}
异步发送消息(推荐)
import com.songzixain.Blog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;/*** <p>* Description: 消息生产者* </p>** @author * @version * @create * @see */
@Slf4j
@RestController
public class DemoProducers {@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** description 延迟消息发送** @param:  [user]* @return* @Date   2023/3/11*/@RequestMapping("/asyncSendTest")public  String asyncSendTest(){Blog blog = new Blog();blog.setBlogName("余十步");blog.setUrl("yushibu");// 构建消息体Message<Blog> msg = MessageBuilder.withPayload(blog).build();rocketMQTemplate.asyncSend("topic1", msg, new SendCallback() {// 发送成功@Overridepublic void onSuccess(SendResult sendResult) {log.info("消息发送成功,时间:{} 发送内容:{}",  LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());}// 发送失败@Overridepublic void onException(Throwable throwable) {log.info("消息发送失败,时间:{} 发送内容:{}",  LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());}// ps:3 代表第三个延迟10s   延迟级别:"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"},3000,3);return "发送成功";}
}
消息消费者
import com.songzixain.Blog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;/*** <p>* Description: 消息生产者* </p>** @author * @version * @create * @see 
@Slf4j
@RestController
public class MyProducer2 {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Value("${rocketmq.producer.topics}")private String topic;/*** description 延迟消息发送* 在上面的代码中,我们使用了RocketMQTemplate的syncSend方法来发送消息。* 其中,第一个参数是消息的主题,第二个参数是消息内容,第三个参数是延迟时间(单位为毫秒)* ,第四个参数是发送消息的重试次数。* @param:  [user]* @return* @Date   2023/3/11*/@RequestMapping("/asyncSendTest")public  String asyncSendTest(){Blog blog = new Blog();blog.setBlogName("余十步");blog.setUrl("yushibu");// 构建消息体Message<Blog> msg = MessageBuilder.withPayload(blog).build();rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {// 发送成功@Overridepublic void onSuccess(SendResult sendResult) {log.info("消息发送成功,时间:{} 发送内容:{}",  LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());}// 发送失败@Overridepublic void onException(Throwable throwable) {log.info("消息发送失败,时间:{} 发送内容:{}",  LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());}// ps:3 代表第三个延迟10s   延迟级别:"messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"},3000,4);return "发送成功";}
}
启动测试

启动请求:http://localhost:8081/asyncSendTest

控制台打印

可以看到,消息生产者设置的延迟级别是3,对应延迟了10秒钟

延迟级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

需要注意的是,顺序消费模式下,同一个消费者组内的消费者只会有一个线程消费同一个队列中的消息,这样才能保证消息的顺序性。

通过以上步骤,就可以使用RocketMQ实现消息延迟功能了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/34499.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ubuntu+Apache2 搭建Gerrit 环境

一、前言 时隔多年&#xff0c;好久没有更新CSDN 博客了&#xff0c;主要原因有如下两点&#xff1a; 1、平时工作繁忙&#xff0c;无暇更新。 2、工作内容涉及信息安全&#xff0c;一些工作经验积累不便更新到互联网上。 最近一直在折腾搭建Gerrit 环境&#xff0c;最开始是…

基于Transformer的自监督学习在NLP中的前沿应用

1. 引言 自然语言处理&#xff08;NLP&#xff09;领域正经历一场由自监督学习&#xff08;Self-Supervised Learning, SSL&#xff09;和Transformer架构共同驱动的革命。自监督学习通过巧妙地利用未标注数据&#xff0c;大大减少了对人工标注的依赖&#xff0c;而Transforme…

基于IM948(Low-cost IMU+蓝牙)模块的高精度PDR(Pedestrian Dead Reckoning)定位系统 — 可以供模块和配套代码

一、背景与意义 行人PDR定位系统中的PDR&#xff08;Pedestrian Dead Reckoning&#xff0c;即行人航位推算&#xff09;背景意义在于其提供了一种在GPS信号不可用或不可靠的环境下&#xff0c;对行人进行精确定位和导航的解决方案。以下是关于PDR背景意义的详细描述&#xff1…

Shopee、Lazada测评,是找服务商呢?还是建议自己养号补单呢?

目前大部分Shopee、Lazada的卖家由于运营成本的增加&#xff0c;都会找服务商测评来打造权重&#xff0c;但是找服务商有很多不靠谱&#xff0c;建议还是自行精养一批号&#xff0c;账号在手里比较安全可控&#xff0c;随时随地可以送测&#xff0c;精准搜索关键词货比三家下单…

【日记】希望文竹长得越来越好吧(856 字)

正文 为什么昨天给老师提早说了今天上课…… 今天都要忙死了。不论上午下午都手忙脚乱。上午之前的存量客户来开新账户&#xff0c;流程卡在客户经理尽调那里。恰好那个客户经理还是部门主管&#xff0c;我们没一个人敢催。向副行长汇报情况&#xff0c;又跟客户说。客户跟他们…

【Android】android studio简单实现图书馆借阅管理系统

希望文章能给到你启发和灵感&#xff5e; 点赞收藏关注 支持一下吧&#xff5e; 阅读指南 序幕一、基础环境说明1.1 硬件环境1.2 软件环境 二、整体设计2.1 数据库逻辑处理&#xff1a;2.2 登录/注册模块2.3 功能界面初始化&#xff1a;2.4 图书管理模块2.5 图书租借服务2.6 读…

Java25年还有更多的工作岗位适合二本学生就业吗?

Java作为一种广泛使用的编程语言。尽管技术领域不断发展和变化&#xff0c;Java依然在许多行业中占据重要地位。以下是一些原因&#xff0c;刚好我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「JAVA的资料从专业入门到高级教程」&#xff0c; 点个关注在评论区…

基于Java的软件测试管理系统【附源码】

毕业&#xff08;设计&#xff09;论文 题 目&#xff1a; 软件测试管理系统 学 号&#xff1a; 姓 名&#xff1a; 院 部&#xff1a; 专 业&#xff1a; 班 级&#xff1a; 指导教师&#xff1a; 职 称&#xff1a; 完成日期&#xff1a; 年 月 日 摘要 随着信息技术的不断…

[leetcode]insert-into-a-binary-search-tree

. - 力扣&#xff08;LeetCode&#xff09; class Solution { public:TreeNode* insertIntoBST(TreeNode* root, int val) {if (root nullptr) {return new TreeNode(val);}TreeNode* pos root;while (pos ! nullptr) {if (val < pos->val) {if (pos->left nullptr…

如何从0构建一款类jest工具

Jest工作原理 Jest 是一个流行的 JavaScript 测试框架&#xff0c;特别适用于 React 项目&#xff0c;但它也可以用来测试任何 JavaScript 代码。Jest 能够执行用 JavaScript 编写的测试文件的原因在于其设计和内部工作原理。下面是 Jest 的工作原理及其内部机制的详细解释&…

NetSuite Account Merge 科目合并功能分析

最近项目中&#xff0c;客户有提到过能否将不用的Account与新建的Account进行合并&#xff0c;即我们所说的Merge功能&#xff5e;可以&#xff0c;但是该功能有使用的限制&#xff0c;比如最直接的一点需要注意&#xff0c;不同类型的Account是不可以使用Merge功能的&#xff…

汽车软件开发者的必修课:ASPICE 4.0主要特点、优势及与之前版本的变化之处

ASPICE&#xff08;汽车SPICE&#xff09;4.0是专为汽车行业量身定制的过程评估模型&#xff0c;旨在确保软件和系统开发过程的质量和可靠性。它是更广泛的 ISO/IEC 330xx 系列标准的一部分&#xff0c;源自通用 SPICE&#xff08;软件流程改进和能力确定&#xff09;框架。 AS…

批归一化(Batch Normalization)和层归一化(Layer Normalization)的作用

在深度学习领域&#xff0c;归一化技术被广泛用于加速神经网络的训练速度并提高其稳定性。本文将介绍两种常见的归一化方法&#xff1a;批归一化&#xff08;Batch Normalization, BN&#xff09;和层归一化&#xff08;Layer Normalization, LN&#xff09;&#xff0c;并通过…

ATA-7025高压放大器的优势如何

高压放大器是一类在电子领域中具有重要作用的设备&#xff0c;其主要功能是将输入信号的电压放大到更高的水平。在许多应用中&#xff0c;高压放大器展现出独特的优势&#xff0c;下面将介绍高压放大器的优势以及它们在不同领域的应用。 高压放大器的优势 1.信号驱动能力强 高压…

ATA-3040C功率放大器的基本要求包括什么

功率放大器是电子设备中常用的一个组件&#xff0c;用于将输入信号增强到足够大的电平&#xff0c;以驱动负载而不失真。要设计一个高效和性能优越的功率放大器&#xff0c;需要考虑多个基本要求和设计考虑因素。下面安泰电子将介绍功率放大器的基本要求&#xff0c;以及如何满…

中兴光猫破解telnet配置命令汇总

中兴光猫telnet配置命令汇总 | LogDicthttps://www.logdict.com/archives/zhong-xing-guang-mao-telnetpei-zhi-ming-ling-hui-zong

【王道数据结构笔记】单链表的基本操作之指定结点的后插操作(代码分析)

🎈个人主页:豌豆射手^ 🎉欢迎 👍点赞✍评论⭐收藏 🤗收录专栏:数据结构 🤝希望本文对您有所裨益,如有不足之处,欢迎在评论区提出指正,让我们共同学习、交流进步! 【王道数据结构笔记】单链表的基本操作之指定结点的后插操作(代码分析) 引言一 代码二 分析总结…

【LeetCode:2741. 特别的排列 + 递归 + 记忆化搜索 + 动态规划】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

解决GPU 显存未能完全释放

一、 现象 算法同学反馈显存未能完全释放。 二、解决方法 一条命令搞定 注意&#xff1a;执行时注意不要误杀其他的python进程&#xff0c;需要确认好。 我的这条命令是将所有python进程都杀死了 ps -elf | grep python | awk {print $4} | xargs kill -s 9

使用AI技术实现语言练习

使用人工智能技术实现语言场景练习&#xff0c;可以有效地提高学习者的语言能力&#xff0c;包括口语、听力、阅读和写作。以下是一些常见的应用场景。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#xff0c;欢迎交流合作。 1. 口语练习 虚拟对话伙伴: 利用…