RocketMq详解:二、SpringBoot集成RocketMq

在上一章中我们对Rocket的基础知识、特性以及四大核心组件进行了详细的介绍,本章带着大家一起去在项目中具体的进行应用,并设计将其作为一个工具包只提供消息的分发服务和业务模块进行解耦

在进行本章的学习之前,需要确保你的可以正常启动和访问RocketMq服务,还未安装的可以移步至此:《docker安装rocketMq》

1.添加maven依赖

		<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version></dependency>

2.项目结构

在这里插入图片描述

3.配置管理

rocketmq:name-server: 127.0.0.1:9876# 生产者producer:group: boot_group_1# 消息发送超时时间send-message-timeout: 3000# 消息最大长度4Mmax-message-size: 4096# 消息发送失败重试次数retry-times-when-send-failed: 3# 异步消息发送失败重试次数retry-times-when-send-async-failed: 2# 消费者consumer:group: boot_group_1# 每次提取的最大消息数pull-batch-size: 5

上面的配置如果是在分布式环境下也可以配置在Apollo或nacos等配置中心里进行动态配置

4.配置类

在配置类中主要定义两个Bean的加载,即RocketMQTemplate和DefaultMQProducer,主要是提供消息发送的能力,即生产消息;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author ninesun* @ClassName RocketMqConfig* @description: 消息中间件配置类* @date 2024年05月19日* @version: 1.0*/
@Configuration
public class RocketMqConfig {@Value("${rocketmq.name-server}")private String nameServer;@Value("${rocketmq.producer.group}")private String producerGroup;@Value("${rocketmq.producer.send-message-timeout}")private Integer sendMsgTimeout;@Value("${rocketmq.producer.max-message-size}")private Integer maxMessageSize;@Value("${rocketmq.producer.retry-times-when-send-failed}")private Integer retryTimesWhenSendFailed;@Value("${rocketmq.producer.retry-times-when-send-async-failed}")private Integer retryTimesWhenSendAsyncFailed;@Beanpublic RocketMQTemplate rocketMqTemplate() {RocketMQTemplate rocketMqTemplate = new RocketMQTemplate();rocketMqTemplate.setProducer(defaultMqProducer());return rocketMqTemplate;}@Beanpublic DefaultMQProducer defaultMqProducer() {DefaultMQProducer producer = new DefaultMQProducer();producer.setNamesrvAddr(this.nameServer);producer.setProducerGroup(this.producerGroup);producer.setSendMsgTimeout(this.sendMsgTimeout);producer.setMaxMessageSize(this.maxMessageSize);producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);producer.setRetryTimesWhenSendAsyncFailed(this.retryTimesWhenSendAsyncFailed);return producer;}
}

5.基础用法

5.1 消息生产

编写一个生产者接口类,分别使用RocketMQTemplate和DefaultMQProducer实现消息发送的功能,然后可以通过Dashboard控制面板查看消息详情

  • 编写Controller进行消息的发送
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;@RestController
@Slf4j
public class TestController01 {@Resourceprivate RocketMQTemplate rocketMqTemplate;@Resourceprivate DefaultMQProducer defaultMqProducer;/*** 利用rocketMqTemplate发送消息** @return*/@GetMapping("/send/msg1")public String sendMsg1() {try {// 构建消息主体Map<String, String> msgBody = new HashMap<>();msgBody.put("data", "利用rocketMqTemplate发送消息");// 发送消息rocketMqTemplate.convertAndSend("boot-mq-topic", JSON.toJSONString(msgBody));} catch (Exception e) {e.printStackTrace();}return "OK";}/*** 利用DefaultMQProducer发送消息* @return*/@GetMapping("/send/msg2")public String sendMsg2() {try {// 构建消息主体,此处可以用对象代替,为了方便演示,使用mapMap<String, String> msgBody = new HashMap<>();msgBody.put("data", "利用DefaultMQProducer发送消息");// 构建消息对象Message message = new Message();message.setTopic("boot-mq-topic");message.setTags("boot-mq-tag");message.setKeys("boot-mq-key");message.setBody(JSON.toJSONString(msgBody).getBytes());// 发送消息,打印日志SendResult sendResult = defaultMqProducer.send(message);log.info("msgId:{},sendStatus:{}", sendResult.getMsgId(), sendResult.getSendStatus());} catch (Exception e) {e.printStackTrace();}return "OK";}
}

自己自行测试,访问这两个接口后,我们可以在Dashboard控制面板查看到:
在这里插入图片描述

5.2 消息消费者

接下来,我们创建一个消息消费者。在Spring Boot项目中,我们可以使用@RocketMQMessageListener注解来定义一个消息消费者。

import com.alibaba.fastjson.JSON;
import com.example.demo.po.MessageData;
import com.example.demo.po.User;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;/*** @author hb24795* @ClassName BootMqConsumer* @description: 消费者* @date 2024年05月26日* @version: 1.0*/
@Service
@RocketMQMessageListener(topic = "boot-mq-topic", consumerGroup = "boot_group_1")
public class BootMqConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.printf("------- StringConsumer received:");System.out.println(message);}
}

当然你可以设置更多的消费前置条件:

@Component
@RocketMQMessageListener(topic = "your_topic_name", consumerGroup = "your_consumer_group_name",selectorExpression = "your_tag", selectorType = ExpressionType.TAG)
public class MyConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 处理消息的逻辑System.out.println("Received message: " + message);}
}

注意:此处的consumerGroup要和我们配置的group对应
在这里插入图片描述

在这里插入图片描述

上面我们已经完成了一个基本的从消息发送到消息消费的逻辑,大家可以在自己的项目中结合设计模式,aop等来定制化自己的消息中间件,但是消息的类型远不止这几个,在实现之前我们先把消息的发送代码和我们的业务代码进行解耦

@Component
public class MessageProduct {@Resourceprivate RocketMQTemplate rocketMqTemplate;@Resourceprivate DefaultMQProducer defaultMqProducer;public SendResult SendMessage(String topic, Object data, List<String> keys, String tags) throws MQBrokerException, RemotingException, InterruptedException, MQClientException {Message message = new Message();if (StringUtils.isBlank(topic)) {return null;} else {message.setTopic(topic);}if (data != null) {message.setBody(JSON.toJSONString(data).getBytes());}if (!CollectionUtils.isEmpty(keys)) {message.setKeys(keys);}if (StringUtils.isBlank(tags)) {message.setTags(tags);}message.setBody(JSON.toJSONString(data).getBytes());// 发送消息,打印日志return defaultMqProducer.send(message);}
}

5.3 延迟消息

RocketMQ 支持延迟消息发送,但并非任意时间,而是有特定的延迟等级。在较旧的版本中,延迟等级从1到18,每个等级对应一个固定的延迟时间范围。然而,随着RocketMQ的更新发展,其对于延迟消息的支持变得更加灵活。

根据最新的资料,RocketMQ 5.x版本支持的最大延迟时间可以非常长,可达一年之久,这意味着在一定意义上,它能够满足大部分应用场景对于延迟消息的需要,尽管这并不等同于可以任意指定任意时间精度的延迟。

我们先看看必须通过指定的等级进行延迟的实现:

RocketMQ不支持任意时间的延迟,只有18个等级的延迟时间,默认是:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。从头到尾共18个等级时间,s、m、h、d,分别表示秒、分、时、天。

默认的18个等级对应的时间可以修改,在broker.conf中增加如下配置,根据自身需求修改时间,然后重启broker。

1、延迟消息的原理

如果让你来设计RocketMQ的延迟消息,你会怎么设计?
1.延迟消息也是个消息,只是多了延迟时间,既然是消息,不管是不是要立刻处理,先找个临时Topic存储起来。

2.Topic里面实际上是一个个队列,那所有的延迟消息要存在一个队列里吗?不要放在同一个队列里,因为消息各自都有不同的延迟时间,如果放在一个队列里,会牵扯到其余问题:比如排序、比如记录消费位置等。所以是按延迟时间分开存。

3.消息已经存起来了,那怎么处理呢?既然涉及到了延迟时间,那自然启动线程去定时获取消息,判断消息的延迟时间是否已经到达,到达之后则取出来投放到目的Topic。

讲到这里,延迟消息的架构图基本浮现出来了:
在这里插入图片描述

实际上RocketMQ在设计延迟消息时,跟上面的思路基本类似,不在赘述,额外补充几点:

1.消息进入Broker后,会被存储在TopicSCHEDULE_TOPIC_XXXX中,只是在Dashboard中看不到。
TopicSCHEDULE_TOPIC_XXXX中有18个消息队列,分别存储18个延迟等级对应的消息。

2.RocketMQ 在启动时,会从broker.conf中获取18个等级对应的时间,延迟等级和时间的关系会存在放到DelayLevelTable中。

3.RocketMQ会开启18个定时任务每隔100ms,从TopicSCHEDULE_TOPIC_XXXX判断18个队列里的第一个消息是否可以被投放,如果可以投放,则在投放到原本的目的Topic。
判断逻辑:存入时间+延迟时间 > 当前时间。
在这里插入图片描述

说到这里,估计你也能猜到,为什么不支持自定义延迟时间了,核心原因还是性能问题。

试想一下,如果设计成任意时间,那么就不可能使用18个队列了,更不可能使用无限个队列了,只可能使用单个队列。

但是如果使用单个队列,按照先进先出的存放的话,那出现需要后进先出的消息怎么办?那只能对整个队列进行排序,如果消息量很大,每次有消息进来都需要排序,那CPU肯定会被玩爆。

而且队列里的消息被消费后,都会记录偏移量,如果每次有消息进来都要排序,那偏移量则失去意义,增加了消息丢失的风险。

所以,RocketMQ的这种18个延迟时间等级的设计,虽然在延迟时间的自由度上作出了妥协,但是基本满足业务,性能也很优秀。

2.具体实现

我们对上面的通用发送进行修改:

    public SendResult SendMessage(String topic, Object data, List<String> keys, String tags, Integer delayLevel) throws MQBrokerException, RemotingException, InterruptedException, MQClientException {Message message = new Message();if (StringUtils.isBlank(topic)) {return null;} else {message.setTopic(topic);}if (data != null) {message.setBody(JSON.toJSONString(data).getBytes());}if (!CollectionUtils.isEmpty(keys)) {message.setKeys(keys);}if (StringUtils.isBlank(tags)) {message.setTags(tags);}if (delayLevel != null) {message.setDelayTimeLevel(delayLevel);}message.setBody(JSON.toJSONString(data).getBytes());// 发送消息,打印日志return defaultMqProducer.send(message);}

此处设置的等级5,可能对应1分钟的延迟,但具体等级和时间的映射关系可以根据RocketMQ服务器的配置有所不同
我们测试一下:

    @GetMapping("/send/msg2")public String sendMsg2() {try {// 构建消息主体,此处可以用对象代替,为了方便演示,使用map![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/3799dce071c44aec923eb35d5a418f2e.png#pic_center)User user = User.builder().id(1).name("ninesun").build();SendResult sendResult = messageProduct.SendMessage("boot-mq-topic", user, Collections.singletonList(user.getId().toString()), null, null);log.info("msgId:{},sendStatus:{}", sendResult.getMsgId(), sendResult.getSendStatus());} catch (Exception e) {e.printStackTrace();}return "OK";}

如果我们想实现任意时间的延迟,可以利用上面的延迟等级进行实现

我只需要根据自定义的延迟时间获取延迟等级
首先自定义一个消息体,用于存储必要的信息

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class DelayMessageDTO<T> {private T data;private Integer delayTime;private String topic;private List<String> keys;private String tags;
}

设计一个简单的算法用于获取延迟等级

private static final Integer[] delayTimes = {1, 5, 10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200};private Integer getDelayLevel(Integer delayTime) {if (delayTime == null || delayTime == 0) {return null;}for (int i = 0; i < delayTimes.length; i++) {int level = i + 1;if (delayTime.equals(delayTimes[i])) {return level;}if (delayTime > delayTimes[i] && delayTime < delayTimes[i + 1]) {return level;}}return null;}

新增一个消费者专门用于处理该消息的延迟

@Service
@RocketMQMessageListener(topic = "delay-consumer-topic", consumerGroup = "boot_group_1")
@Slf4j
public class DelayMqConsumer implements RocketMQListener<String> {@ResourceMessageProduct messageProduct;@Overridepublic void onMessage(String message) {log.info("收到延迟消费消息,消息:{}", message);System.out.println(message);DelayMessageDTO delayMessageDTO = JSON.parseObject(message, DelayMessageDTO.class);log.info("剩余:{}s", delayMessageDTO.getDelayTime());try {messageProduct.SendDelayMessage(delayMessageDTO);} catch (MQBrokerException e) {throw new RuntimeException(e);} catch (RemotingException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (MQClientException e) {throw new RuntimeException(e);}}
}

消息的发送

    public SendResult SendDelayMessage(DelayMessageDTO data) throws MQBrokerException, RemotingException, InterruptedException, MQClientException {// 发送消息,打印日志if (data == null) {return null;}Integer delayLevel = this.getDelayLevel(data.getDelayTime());if (delayLevel == null) {return this.SendMessage(data.getTopic(), data, data.getKeys(), data.getTags(), delayLevel);}Integer delayTime = data.getDelayTime() - delayTimes[delayLevel - 1];data.setDelayTime(delayTime);return this.SendMessage("delay-consumer-topic", data, data.getKeys(), data.getTags(), delayLevel);}

在这里插入图片描述

在这里插入图片描述

至此我们就可以简单的实现一个消息的任意时间延迟,但是实际的延迟实现中,我们并不推荐该延迟方式,因为这种延迟完全依赖于mq的性能,如果遇到消息的积压等,我们的延迟将变得十分不可靠,很多开源社区中推荐:

  • 外部存储与调度:将消息和期望的发送时间存储到数据库或缓存中(如Redis),然后使用一个定时任务(如Quartz、Spring Scheduler)定期检查这些存储的消息,当达到预定时间时,再通过DefaultMQProducer发送出去。
  • 利用死信队列与TTL:虽然这不是直接实现任意时间延迟的方式,但可以通过设置消息的TTL(生存时间)和死信队列机制间接实现。消息到期后成为死信,触发特定逻辑进行处理或重定向到另一个队列进行实际发送。

在RocketMQ5.0版本,支持任意时段的延迟消息。在Github中最新的版本中已经解决了这个问题,[ISSUE #6203] Allow to publish delay message with arbitrary timestamp #6204

我们可以直接使用以下方式去实现:

    @Resourceprivate RocketMQTemplate rocketMqTemplate;public SendResult SendMessage(DelayMessageDTO data) {return rocketMqTemplate.syncSendDelayTimeSeconds(data.getTopic(), JSON.toJSONString(data), data.getDelayTime());}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/851221.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL中的数据库约束

目录 导读&#xff1a; 约束类型 1、not null&#xff08;不能为空&#xff09; 2、unique(唯一) 3、default(默认值约束) 4、primary key(唯一)与unique 相同点&#xff1a; 不同点&#xff1a; auto_increment&#xff1a; 5、foreign key(外键) 语法形式&#xff…

康姿百德集团公司官网床垫价格透明,品质睡眠触手可及

选择康姿百德床垫&#xff0c;价格透明品质靠谱&#xff0c;让你拥有美梦连连 在当今社会&#xff0c;良好的睡眠质量被越来越多的人所重视。睡眠不仅关系到我们第二天的精力状态&#xff0c;更长远地影响着我们的身体健康。因此&#xff0c;选择一款合适的床垫对于获得优质睡…

antdv 穿梭框

antd的穿梭框的数据貌似只接收key和title&#xff0c;而且必须是字符串&#xff08;我测试不是字符串的不行&#xff09;&#xff0c; 所以要把后端返回的数据再处理一下得到我们想要的数据 除了实现简单的穿梭框功能&#xff0c;还想要重写搜索事件&#xff0c;想达到的效果是…

FastAPI:在大模型中使用fastapi对外提供接口

通过本文你可以了解到&#xff1a; 如何安装fastapi&#xff0c;快速接入如何让大模型对外提供API接口 往期文章回顾&#xff1a; 1.大模型学习资料整理&#xff1a;大模型学习资料整理&#xff1a;如何从0到1学习大模型&#xff0c;搭建个人或企业RAG系统&#xff0c;如何评估…

解决!word转pdf时,怎样保持图片不失真

#今天用word写了期末设计报告&#xff0c;里面有很多过程的截图&#xff0c;要打印出来&#xff0c;想到pdf图片不会错位&#xff0c;就转成了pdf&#xff0c;发现图片都成高糊了&#xff0c;找了好多方法&#xff0c;再不下载其他软件和插件的情况下&#xff0c;导出拥有清晰的…

BarTender 常见的使用要点

BarTender 简述 BarTender是由美国海鸥科技&#xff08;Seagull Scientific&#xff09;推出的一款条码打印软件&#xff0c;被广泛应用于标签、条形码、证卡和RFID标记的设计和打印领域。它在全球范围内拥有众多用户&#xff0c;被公认为标签打印方面的全球领先者。BarTender…

JavaScript基础用法(变量定义、输入输出、转义符、注释和编码规范)

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

史上最详细四叉树地图不同技术应用和代码详解

四叉树地图在计算机和机器人领域应用的很广&#xff0c;但是初学者可能会发现四叉树地图有各种不同的实现方式&#xff0c;很多在机器人领域不适用或是在计算机存储领域不适用。今天我就讲解下各类四叉树的实现方式和应用场景。 史上最详细四叉树地图不同技术应用和代码详解 本…

Bio-Info每日一题:Rosalind-06-Counting Point Mutations

&#x1f389; 进入生物信息学的世界&#xff0c;与Rosalind一起探索吧&#xff01;&#x1f9ec; Rosalind是一个在线平台&#xff0c;专为学习和实践生物信息学而设计。该平台提供了一系列循序渐进的编程挑战&#xff0c;帮助用户从基础到高级掌握生物信息学知识。无论你是初…

Navicat导入json文件(json文件数据导入到MySQL表中)

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

多目标融合参数搜索

多目标融合 权重分类目人群。 trick normlize 不同Score之间含义、量级和分布差异较大&#xff1a;评分计算的不同部分的意义、范围和分布存在显著差异&#xff0c;这使得直接比较或融合它们的结果变得困难。显式反馈&#xff08;如点赞率&#xff09;存在用户间差异&#…

【9】openssl 代码调试

0x01 前言 最近在学习密码学&#xff0c;但是国密算法(SM2&#xff0c;SM3,SM4,SM9)的细节都在openssl项目里&#xff0c;当然一些国际算法也在。想着看下代码执行过程和理论结合起来。中间走了一些弯路&#xff0c;做个笔记。 0x02 openssl安装 一开始认为是不是直接下载好的…

Layui实现下拉多选功能

1、问题概述? 提供源码下载 在项目中有很多地方需要使用到下拉框,并且实现选择多个信息,下面是展示。 支持如下功能: 1、分页 2、主题自定义 3、国际化 4、下拉方向 5、Tips修改等 6、Style自定义样式 7、取值 8、赋值 2、资源准备及测试? 2.1、资源下载

Leetcode 力扣113. 路径总和 II (抖音号:708231408)

给你二叉树的根节点 root 和一个整数目标和 targetSum &#xff0c;找出所有 从根节点到叶子节点 路径总和等于给定目标和的路径。 叶子节点 是指没有子节点的节点。 示例 1&#xff1a; 输入&#xff1a;root [5,4,8,11,null,13,4,7,2,null,null,5,1], targetSum 22 输出&a…

中山大学和字节发布「视频虚拟试穿」扩散模型VITON-DiT,一键生成换装后视频!

视频虚拟试穿技术日益受到关注&#xff0c;然而现有的工作局限于将服装图像转移到姿势和背景简单的视频上&#xff0c;对于随意拍摄的视频则效果不佳。最近&#xff0c;Sora 揭示了 Diffusion Transformer (DiT) 在生成具有真实场景的逼真视频方面的可扩展性&#xff0c;可以说…

Kubernetes入门-大简介

目录 何为微服务 何为云原生 何为编排器 “Kubernetes”这个名字来自希腊语&#xff0c;意思是“舵手”舵手是一个航海/航行术语&#xff0c;指掌舵的人从本质上说&#xff0c;Kubernetes是云原生微服务(cloud-native microservice)应用的编排器(orchestrator) 何为微服务 …

WordPress 高级缓存插件 W3 Total Cache 开启支持 Brotli 压缩算法

今天明月给大家分享一下 WordPress 高级缓存插件 W3 Total Cache 开启支持 Brotli 压缩算法的教程&#xff0c;在撰写【WordPress 高级缓存插件 W3 Total Cache Pro 详细配置教程】一文的时候明月就发现 W3 Total Cache 已经支持 Brotli 压缩算法了&#xff0c;可惜的是在安装完…

ctfshow-web入门-命令执行(web53-web55)

目录 1、web53 2、web54 3、web55 1、web53 这里的代码有点不一样&#xff0c;说一下这两种的区别&#xff1a; &#xff08;1&#xff09;直接执行 system($c); system($c);这种方式会直接执行命令 $c 并将命令的输出直接发送到标准输出&#xff08;通常是浏览器&#xff…

【qsort函数】

前言 我们要学习qsort函数并利用冒泡函数仿照qsort函数 首先我们要了解一下qsort&#xff08;快速排序&#xff09; 这是函数的的基本参数 void qsort (void* base, size_t num, size_t size,int (*compar)(const void*,const void*)); 简单解释一下 base&#xff1a;指向…

23.在游戏中按下Home键呼出辅助窗口

上一个内容&#xff1a;22.钩子注入原理 在 22.钩子注入原理 它的代码上进行修改 效果图&#xff1a; 首先在CWndMain.h文件中添加下图红框里的东西 ChangeShowState函数的实现 void CWndMain::ChangeShowState() {UiShow !UiShow;ShowWindow(UiShow); } OnInitDialog函数…