Spring Boot整合RocketMQ实现延迟消息消费

导包
     <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.3</version></dependency>
添加配置信息
application配置文件
# rocketMq地址
rocketmq.name-server=106.52.60.215:9876
# 生产者分组
rocketmq.producer.group=myGroup
rocketmq.producer.topics=topic1
# 消费者分组
rocketmq.consumer.group=myGroup
# topic
rocketmq.consumer.topics=topic1
# 表示顺序消费模式
rocketmq.consumer.consume-mode=ORDERLY
# 消费者的最大线程数,即消费消息的线程池大小。默认值为20,如果不需要处理大量的消息,可以将其调小。
rocketmq.consumer.consume-thread-max=1
# 表示每次消费消息的最大数量,即一次性消费的最大消息数。默认值为1,即每次只消费一条消息。如果需要批量消费消息,可以将其调大。但是需要注意的是,批量消费消息可能会影响消费的效率和消息的顺序性。
rocketmq.consumer.consume-message-batch-max-size=1
yml配置文件
rocketmq:consumer:consume-message-batch-max-size: 1consume-mode: ORDERLYconsume-thread-max: 1group: myGrouptopics: topic1name-server: 106.52.60.215:9876producer:group: myGrouptopics: topic1
生产者发送消息
同步发现消息

在Spring Boot中,可以使用RocketMQTemplate来发送消息。设置消息的延迟级别,可以使用RocketMQTemplatesend(Message message, long timeout, int delayLevel)方法,其中delayLevel为延迟级别,单位为秒

RocketMQ支持18个级别的延迟时间,分别为1s、5s、10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h

import com.songzixain.Blog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;/*** <p>* Description: 消息生产者* </p>** @author * @version * @create * @see */
@Slf4j
@RestController
public class MyProducer1 {@Value("${rocketmq.producer.topics}")private String topic;@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** description 同步发送延迟消息** @param:  []* @return* @Date   2023/3/11*/@GetMapping("syncSendTest")public void sendDelayMsg() {Blog blog = new Blog();blog.setBlogName("余十步");blog.setUrl("yushibu");// delayTimeLevel代表延迟级别  messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hint delayTimeLevel = 3;Message<Blog> message = MessageBuilder.withPayload(blog).setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel).build();SendResult sendResult = rocketMQTemplate.syncSend(topic, message, 3000, delayTimeLevel);log.info("消息发送成功,时间:{} 发送内容:{}", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());log.info("发送结果:{}", sendResult);}
}
异步发送消息(推荐)
import com.songzixain.Blog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;/*** <p>* Description: 消息生产者* </p>** @author * @version * @create * @see */
@Slf4j
@RestController
public class DemoProducers {@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** description 延迟消息发送** @param:  [user]* @return* @Date   2023/3/11*/@RequestMapping("/asyncSendTest")public  String asyncSendTest(){Blog blog = new Blog();blog.setBlogName("余十步");blog.setUrl("yushibu");// 构建消息体Message<Blog> msg = MessageBuilder.withPayload(blog).build();rocketMQTemplate.asyncSend("topic1", msg, new SendCallback() {// 发送成功@Overridepublic void onSuccess(SendResult sendResult) {log.info("消息发送成功,时间:{} 发送内容:{}",  LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());}// 发送失败@Overridepublic void onException(Throwable throwable) {log.info("消息发送失败,时间:{} 发送内容:{}",  LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());}// ps:3 代表第三个延迟10s   延迟级别:"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"},3000,3);return "发送成功";}
}
消息消费者
import com.songzixain.Blog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;/*** <p>* Description: 消息生产者* </p>** @author * @version * @create * @see 
@Slf4j
@RestController
public class MyProducer2 {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Value("${rocketmq.producer.topics}")private String topic;/*** description 延迟消息发送* 在上面的代码中,我们使用了RocketMQTemplate的syncSend方法来发送消息。* 其中,第一个参数是消息的主题,第二个参数是消息内容,第三个参数是延迟时间(单位为毫秒)* ,第四个参数是发送消息的重试次数。* @param:  [user]* @return* @Date   2023/3/11*/@RequestMapping("/asyncSendTest")public  String asyncSendTest(){Blog blog = new Blog();blog.setBlogName("余十步");blog.setUrl("yushibu");// 构建消息体Message<Blog> msg = MessageBuilder.withPayload(blog).build();rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {// 发送成功@Overridepublic void onSuccess(SendResult sendResult) {log.info("消息发送成功,时间:{} 发送内容:{}",  LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());}// 发送失败@Overridepublic void onException(Throwable throwable) {log.info("消息发送失败,时间:{} 发送内容:{}",  LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());}// ps:3 代表第三个延迟10s   延迟级别:"messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"},3000,4);return "发送成功";}
}
启动测试

启动请求:http://localhost:8081/asyncSendTest

控制台打印

可以看到,消息生产者设置的延迟级别是3,对应延迟了10秒钟

延迟级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

需要注意的是,顺序消费模式下,同一个消费者组内的消费者只会有一个线程消费同一个队列中的消息,这样才能保证消息的顺序性。

通过以上步骤,就可以使用RocketMQ实现消息延迟功能了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/34499.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ubuntu+Apache2 搭建Gerrit 环境

一、前言 时隔多年&#xff0c;好久没有更新CSDN 博客了&#xff0c;主要原因有如下两点&#xff1a; 1、平时工作繁忙&#xff0c;无暇更新。 2、工作内容涉及信息安全&#xff0c;一些工作经验积累不便更新到互联网上。 最近一直在折腾搭建Gerrit 环境&#xff0c;最开始是…

基于Transformer的自监督学习在NLP中的前沿应用

1. 引言 自然语言处理&#xff08;NLP&#xff09;领域正经历一场由自监督学习&#xff08;Self-Supervised Learning, SSL&#xff09;和Transformer架构共同驱动的革命。自监督学习通过巧妙地利用未标注数据&#xff0c;大大减少了对人工标注的依赖&#xff0c;而Transforme…

基于IM948(Low-cost IMU+蓝牙)模块的高精度PDR(Pedestrian Dead Reckoning)定位系统 — 可以供模块和配套代码

一、背景与意义 行人PDR定位系统中的PDR&#xff08;Pedestrian Dead Reckoning&#xff0c;即行人航位推算&#xff09;背景意义在于其提供了一种在GPS信号不可用或不可靠的环境下&#xff0c;对行人进行精确定位和导航的解决方案。以下是关于PDR背景意义的详细描述&#xff1…

Nacos、Sentinel底层核心原理

Nacos和Sentinel都是服务于微服务架构的组件&#xff0c;但它们各自承担不同的角色和功能。 ### Nacos的核心原理&#xff1a; 1. **服务发现与注册中心**&#xff1a;Nacos作为服务注册中心&#xff0c;允许服务实例在启动时注册自己&#xff0c;并在关闭时注销。客户端可以通…

如何正确选择EC油封端盖?

在机械系统中&#xff0c;EC油封端盖扮演着关键角色。正确选择密封圈不仅能确保系统的正常运行&#xff0c;还能延长设备的使用寿命。本文将从多个角度探讨如何选择合适的EC油封端盖。 分析应用环境 操作环境&#xff1a; 温度范围&#xff1a;确定操作环境的温度范围。像FK…

超越常规:深度定制Ant Design Vue组件样式

标题&#xff1a;超越常规&#xff1a;深度定制Ant Design Vue组件样式 Ant Design Vue是一个基于Vue.js的UI设计语言&#xff0c;它提供了一套企业级的高质量React组件。尽管Ant Design Vue的组件已经非常完善&#xff0c;但在某些情况下&#xff0c;我们可能需要根据特定的设…

Shopee、Lazada测评,是找服务商呢?还是建议自己养号补单呢?

目前大部分Shopee、Lazada的卖家由于运营成本的增加&#xff0c;都会找服务商测评来打造权重&#xff0c;但是找服务商有很多不靠谱&#xff0c;建议还是自行精养一批号&#xff0c;账号在手里比较安全可控&#xff0c;随时随地可以送测&#xff0c;精准搜索关键词货比三家下单…

【日记】希望文竹长得越来越好吧(856 字)

正文 为什么昨天给老师提早说了今天上课…… 今天都要忙死了。不论上午下午都手忙脚乱。上午之前的存量客户来开新账户&#xff0c;流程卡在客户经理尽调那里。恰好那个客户经理还是部门主管&#xff0c;我们没一个人敢催。向副行长汇报情况&#xff0c;又跟客户说。客户跟他们…

Python风控建模实战案例数据库(50个实战数据集,上千万数据量)

作者Toby&#xff0c;来源公众号&#xff1a;Python风控模型&#xff0c;Python风控建模实战案例数据库 风险控制建模是指利用数据和分析方法来识别、评估和管理金融风险的过程。在金融领域&#xff0c;风险控制建模通常涉及建立数学模型&#xff0c;用于评估借款人、投资组合、…

git 命令 远程分支B合并到本地自己的分支A

场景说明&#xff1a;每个同事都有自己的开发分支&#xff0c;开发完统一汇总到 dev 分支 我&#xff1a;本地开发分支A&#xff0c;正在开发&#xff0c; 同事&#xff1a;远程分支B开发完&#xff0c;提交了代码在他的分支&#xff0c; 现在需要将同事B分支的代码合并到本地的…

【Android】android studio简单实现图书馆借阅管理系统

希望文章能给到你启发和灵感&#xff5e; 点赞收藏关注 支持一下吧&#xff5e; 阅读指南 序幕一、基础环境说明1.1 硬件环境1.2 软件环境 二、整体设计2.1 数据库逻辑处理&#xff1a;2.2 登录/注册模块2.3 功能界面初始化&#xff1a;2.4 图书管理模块2.5 图书租借服务2.6 读…

Java25年还有更多的工作岗位适合二本学生就业吗?

Java作为一种广泛使用的编程语言。尽管技术领域不断发展和变化&#xff0c;Java依然在许多行业中占据重要地位。以下是一些原因&#xff0c;刚好我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「JAVA的资料从专业入门到高级教程」&#xff0c; 点个关注在评论区…

基于Java的软件测试管理系统【附源码】

毕业&#xff08;设计&#xff09;论文 题 目&#xff1a; 软件测试管理系统 学 号&#xff1a; 姓 名&#xff1a; 院 部&#xff1a; 专 业&#xff1a; 班 级&#xff1a; 指导教师&#xff1a; 职 称&#xff1a; 完成日期&#xff1a; 年 月 日 摘要 随着信息技术的不断…

[leetcode]insert-into-a-binary-search-tree

. - 力扣&#xff08;LeetCode&#xff09; class Solution { public:TreeNode* insertIntoBST(TreeNode* root, int val) {if (root nullptr) {return new TreeNode(val);}TreeNode* pos root;while (pos ! nullptr) {if (val < pos->val) {if (pos->left nullptr…

如何从0构建一款类jest工具

Jest工作原理 Jest 是一个流行的 JavaScript 测试框架&#xff0c;特别适用于 React 项目&#xff0c;但它也可以用来测试任何 JavaScript 代码。Jest 能够执行用 JavaScript 编写的测试文件的原因在于其设计和内部工作原理。下面是 Jest 的工作原理及其内部机制的详细解释&…

NetSuite Account Merge 科目合并功能分析

最近项目中&#xff0c;客户有提到过能否将不用的Account与新建的Account进行合并&#xff0c;即我们所说的Merge功能&#xff5e;可以&#xff0c;但是该功能有使用的限制&#xff0c;比如最直接的一点需要注意&#xff0c;不同类型的Account是不可以使用Merge功能的&#xff…

汽车软件开发者的必修课:ASPICE 4.0主要特点、优势及与之前版本的变化之处

ASPICE&#xff08;汽车SPICE&#xff09;4.0是专为汽车行业量身定制的过程评估模型&#xff0c;旨在确保软件和系统开发过程的质量和可靠性。它是更广泛的 ISO/IEC 330xx 系列标准的一部分&#xff0c;源自通用 SPICE&#xff08;软件流程改进和能力确定&#xff09;框架。 AS…

Kylin有哪些功能特点

Apache Kylin 是一款开源的、分布式的分析数据仓库&#xff0c;它提供 Hadoop/Spark 之上的 SQL 接口及多维分析&#xff08;OLAP&#xff09;能力以支持超大规模数据。Kylin 的功能特点主要体现在以下几个方面&#xff1a; 1. SQL接口与多维分析&#xff08;OLAP&#xff09;…

批归一化(Batch Normalization)和层归一化(Layer Normalization)的作用

在深度学习领域&#xff0c;归一化技术被广泛用于加速神经网络的训练速度并提高其稳定性。本文将介绍两种常见的归一化方法&#xff1a;批归一化&#xff08;Batch Normalization, BN&#xff09;和层归一化&#xff08;Layer Normalization, LN&#xff09;&#xff0c;并通过…

Transformer模型在图像描述生成中的革新应用

Transformer模型自从由Vaswani等人在2017年提出以来&#xff0c;已经在自然语言处理&#xff08;NLP&#xff09;领域引起了革命性的变化。特别是在图像描述生成&#xff08;Image Captioning&#xff09;任务中&#xff0c;Transformer模型展示了其卓越的性能。本文将深入探讨…