Spring Boot 集成 RocketMQ 全流程指南:从依赖引入到消息收发

前言

在分布式系统中,消息中间件是解耦服务、实现异步通信的核心组件。RocketMQ 作为阿里巴巴开源的高性能分布式消息中间件,凭借其高吞吐、低延迟、高可靠等特性,成为企业级应用的首选。而 Spring Boot 通过其“约定优于配置”的设计理念,极大简化了项目开发的复杂度。本文将通过 手动连接配置连接 两种方式,详细讲解如何在 Spring Boot 中集成 RocketMQ,实现消息的同步与异步发送,并提供完整示例代码。

微信图片_20250414010059

一、环境准备

在开始前,请确保:

  1. JDK 17、Maven 3.6+、Spring Boot 2.7+。
  2. 安装RocketMQ服务(本地或远程),推荐使用RocketMQ Docker镜像快速搭建(可参考之前文章)。

二、示例—Springboot集成mq(手动连接)

通过编码方式初始化生产者,适用于需要动态控制资源的场景。

2.1 新建项目

image-20230727182626779

image-20230727182647022

image

2.2 引入依赖

       <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.4</version></dependency>

image

2.3 生产者发送消息

  • 构建一个消息生产者DefaultMQProducer实例,然后指定生产者组为jihaiProducer;
  • 指定NameServer的地址:服务器的ip:9876,因为需要从NameServer拉取Broker的信息
  • producer.start() 启动生产者
  • 构建一个内容为:技海拾贝的消息1,然后指定这个消息往jihaishibei这个topic发送
  • producer.send(msg):发送消息,打印结果
  • 关闭生产者
public class Producer {public static void main(String[] args) throws Exception {//创建一个生产者,指定生产者组为jihaiProducerDefaultMQProducer producer = new DefaultMQProducer("jihaiProducer");// 指定NameServer的地址producer.setNamesrvAddr("localhost:9876");// 第一次发送可能会超时,设置的比较大producer.setSendMsgTimeout(60000);// 启动生产者producer.start();// 创建一条消息// topic为 jihaishibei// 消息内容为 技海拾贝的消息1// tags 为 TagAMessage msg = new Message("jihaishibei", "TagA", "技海拾贝的消息1 ".getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送消息并得到消息的发送结果,然后打印SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);// 关闭生产者producer.shutdown();}}

image

启动,发送消息

image

在控制台可以看到这条消息

image

image

这里就能看到发送消息的详细信息。

左下角消息的消费的消费,因为我们还没有消费者订阅这个topic,所以左下角没数据。

2.4 消费者消费消息

  • 创建一个消费者实例对象,指定消费者组为jihaiConsumer
  • 指定NameServer的地址:服务器的ip:9876
  • 订阅 jihaishibei这个topic的所有信息
  • consumer.registerMessageListener ,这个很重要,是注册一个监听器,这个监听器是当有消息的时候就会回调这个监听器,处理消息,所以需要用户实现这个接口,然后处理消息。
  • 启动消费者

public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {// 通过push模式消费消息,指定消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("jihaiConsumer");// 指定NameServer的地址consumer.setNamesrvAddr("localhost:9876");// 订阅这个topic下的所有的消息consumer.subscribe("jihaishibei", "*");// 注册一个消费的监听器,当有消息的时候,会回调这个监听器来消费消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.printf("消费消息:%s", new String(msg.getBody()) + "\n");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();System.out.printf("Consumer Started.%n");}
}

image

启动服务,进行消费

image

在控制台,发现被jihaiConsumer这个消费者组给消费了。

image

三、示例2—Springboot集成mq(配置连接)

在 Spring Boot 中,可以通过配置文件简化 RocketMQ 的连接配置。以下是在 application.yml​ 文件中进行的配置:

3.1 配置文件修改

image

image

image

spring:application:name: rocket-mq-demorocketmq:name-server: 127.0.0.1:9876producer:group: rocket-mq-demo-producersend-message-timeout: 10000comsumer:group: rocket-mq-demo-comsumersend-message-timeout: 10000

image

3.2 添加依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.0</version>  
</dependency>

根据需要选择最新版本,从中央仓库可以查看https://central.sonatype.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter

image

image

备注:如果添加rocketmq-client依赖,先注释这个依赖

3.3 消费者service类

import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;/*** messageModel=MessageModel.CLUSTERING* 监听模式,有消息就会消费*/
@Service
@RocketMQMessageListener(topic = "jihaishibei-topic", consumerGroup = "rocket-mq-demo-comsumer", messageModel = MessageModel.CLUSTERING)
public class RocketMQConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.printf("收到消息: %s\n", s);}
}

3.4 生产者service类

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;@Service
public class RocketMQProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;private final String topic = "jihaishibei-topic";// 1.同步发送消息// 同步发送是指发送方发送一条消息后,会等待服务器返回确认信息后再进行后续操作。这种方式适用于需要可靠性保证的场景。public void createAndSend(String message){rocketMQTemplate.convertAndSend(topic, message);System.out.printf("同步发送结果: %s\n", message);}// 1.同步发送消息// 同步发送是指发送方发送一条消息后,会等待服务器返回确认信息后再进行后续操作。这种方式适用于需要可靠性保证的场景。public void sendSyncMessage(String message){SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message).build());System.out.println(sendResult.getMsgId());System.out.printf("同步发送结果: %s\n", message);}// 2.异步发送消息// 异步发送是指发送方发送消息后,不等待服务器返回确认信息,而是通过回调接口处理返回结果。这种方式适用于对响应时间要求较高的场景。public void sendAsyncMessage(String message){rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(message).build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.printf("异步发送成功: %s\n", sendResult);}@Overridepublic void onException(Throwable throwable) {System.out.printf("异步发送失败: %s\n", throwable.getMessage());}});}// 3.单向发送消息// 单向发送是指发送方只负责发送消息,不关心服务器的响应。该方式适用于对可靠性要求不高的场景,如日志收集。public void sendOneWayMessage(String message){rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(message).build());System.out.println("单向消息发送成功");}
}

3.5 测试controller类

@RequestMapping("api")
@RestController
public class RocketController {@Autowiredprivate RocketMQProducer rocketMQProducer;@GetMapping("/createAndSend")public String createAndSend(@RequestParam String message) {rocketMQProducer.createAndSend(message);return "同步消息发送成功";}@GetMapping("/sendSync")public String sendSync(@RequestParam String message) {rocketMQProducer.sendSyncMessage(message);return "同步消息发送成功";}@GetMapping("/sendAsync")public String sendAsync(@RequestParam String message) {rocketMQProducer.sendAsyncMessage(message);return "异步消息发送中";}@GetMapping("/sendOneWay")public String sendOneWay(@RequestParam String message) {rocketMQProducer.sendOneWayMessage(message);return "单向消息发送成功";}
}

3.6 启动服务

image

3.7 测试

同步消息1

image

image

image

同步消息2

image

image

image

异步消息

image

image

image

单向发送消息

image

image

image

四、结束语

本文通过手动连接与配置连接两种方式,展示了Spring Boot与RocketMQ的集成实践。手动连接帮助开发者理解底层API逻辑,而Spring Boot的配置化集成则极大简化了开发流程。无论是同步消息的可靠性保障,还是异步消息的性能优化,RocketMQ均能与Spring Boot无缝协作,为分布式系统提供高效的消息通信能力。

未来可进一步探索集群部署、消息重试机制及监控告警,以实现更健壮的消息服务。希望本文能为开发者快速构建高可用的消息系统提供参考!

求点关注-gif动图 138_爱给网_aigei_com

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/76836.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HTTPS实现安全的关键方法及技术细节

HTTPS&#xff08;HyperText Transfer Protocol Secure&#xff09;通过多种技术手段实现数据传输的安全性&#xff0c;其核心机制基于SSL/TLS协议&#xff0c;并结合数字证书、加密算法等技术。 SSL&#xff1a;Secure Sockets Layer&#xff0c;安全套接字层 TLS&#xff1a;…

Java【多线程】(8)CAS与JUC组件

目录 1.前言 2.正文 2.1CAS概念 2.2CAS两种用途 2.2.1实现原子类 2.2.2实现自旋锁 2.3缺陷&#xff1a;ABA问题 2.4JUC组件 2.4.1Callable接口 2.4.2ReentrantLock&#xff08;与synchronized对比&#xff09; 2.4.3Semaphore信号量 2.4.4CountDownLatch 3.小结 1…

【Docker】离线安装Docker

背景 离线安装Docker的必要性&#xff0c;第一&#xff0c;在目前数据安全升级的情况下&#xff0c;很多外网已经基本不好访问了。第二&#xff0c;如果公司有对外部署的需求&#xff0c;那么难免会存在对方只有内网的情况&#xff0c;那么我们就要做到学会离线安装。 下载安…

MecAgent Copilot:机械设计师的AI助手,开启“氛围建模”新时代

MecAgent Copilot作为机械设计师的AI助手,正通过多项核心技术推动机械设计进入“氛围建模”新时代。以下从功能特性、技术支撑和应用场景三方面解析其创新价值: 一、核心功能特性 ​​智能草图生成与参数化建模​​ 支持自然语言输入生成设计草图和3D模型,如输入“剖面透视…

MCU屏和RGB屏

一、MCU屏 MCU屏‌&#xff1a;全称为单片机控制屏&#xff08;Microcontroller Unit Screen&#xff09;&#xff0c;在显示屏背后集成了单片机控制器&#xff0c;因此&#xff0c;MCU屏里面有专用的驱动芯片。驱动芯片如&#xff1a;ILI9488、ILI9341、SSD1963等。驱动芯片里…

7.5 使用MobileNet v3进行图像的区分

MobileNet v3是Google在2019年提出的轻量级卷积神经网络结构,旨在提高在移动设备上的速度和准确性,广泛的用于轻量级网络。 MobileNet v3-Small的网络结构如下,它的输入是224x224的3通道彩色图片。 使用过程如下: 1.创建模型、修改最终分类数量 #1.创建mobilenet_v3_small…

构建面向大模型训练与部署的一体化架构:从文档解析到智能调度

作者&#xff1a;汪玉珠&#xff5c;算法架构师 标签&#xff1a;大模型训练、数据集构建、GRPO、自监督聚类、指令调度系统、Qwen、LLaMA3 &#x1f9ed; 背景与挑战 随着 Qwen、LLaMA3 等开源大模型不断进化&#xff0c;行业逐渐从“能跑通”迈向“如何高效训练与部署”的阶…

PostgreSQL技术大讲堂 - 第86讲:数据安全之--data_checksums天使与魔鬼

PostgreSQL技术大讲堂 - 第86讲&#xff0c;主题&#xff1a;数据安全之--data_checksums天使与魔鬼 1、data_checksums特性 2、避开DML规则&#xff0c;嫁接非法数据并合法化 3、避开约束规则&#xff0c;嫁接非法数据到表中 4、避开数据检查&#xff0c;读取坏块中的数据…

【机器学习】机器学习笔记

1 机器学习定义 计算机程序从经验E中学习&#xff0c;解决某一任务T&#xff0c;进行某一性能P&#xff0c;通过P测定在T上的表现因经验E而提高。 eg&#xff1a;跳棋程序 E&#xff1a; 程序自身下的上万盘棋局 T&#xff1a; 下跳棋 P&#xff1a; 与新对手下跳棋时赢的概率…

Ubuntu20.04 设置开机自启

参考&#xff1a; Ubuntu20.04 设置开机自启_ubuntu进bos系统-CSDN博客

数据库中存储过程的流程语句讲解

一、流程语句讲解 二、总结 一、流程语句讲解 1.1 if语句讲解 语法&#xff1a; IF condition THENstatements; ELSEIF condition THENstatements; ELSEstatements; END IF; 题目示例&#xff1a; # 判断成绩等级 # 输入学生的编号,取出学生的第一门课&#xff0c;然后判断…

kubernetes》》k8s》》ConfigMap 、Secret

configmap官网 ConfigMap是一种 API 对象&#xff0c;使用时&#xff0c; Pods 可以将其用作环境变量、命令行参数或者存储卷中的配置文件。ConfigMap将配置和Pod解耦&#xff0c;更易于配置文件的更改和管理。ConfigMap 并不提供保密或者加密功能。 如果你想存储的数据是机密的…

git在IDEA中使用技巧

git在IDEA中使用技巧 merge和rebase 参考&#xff1a;IDEA小技巧-Git的使用 git回滚、强推、代码找回 参考&#xff1a;https://www.bilibili.com/video/BV1Wa411a7Ek?spm_id_from333.788.videopod.sections&vd_source2f73252e51731cad48853e9c70337d8e cherry pick …

Spring 事务失效的原因及解决方案全解析,来复习了

Spring 事务失效是指在使用 Spring 声明式事务管理时&#xff0c;预期的事务行为&#xff08;如事务的开启、提交、回滚等&#xff09;未按预期执行&#xff0c;导致数据操作未满足 ACID 特性&#xff08;原子性、一致性、隔离性、持久性&#xff09;&#xff0c;从而引发数据不…

「出海匠」借助CloudPilot AI实现AWS降本60%,支撑AI电商高速增长

&#x1f50e;公司简介 「出海匠」&#xff08;chuhaijiang.com&#xff09;是「数绘星云」公司打造的社交内容电商服务平台&#xff0c;专注于为跨境生态参与者提供数据支持与智能化工作流。平台基于大数据与 AI 技术&#xff0c;帮助商家精准分析市场趋势、优化运营策略&…

python每日一练

题目一 输入10个整数,输出其中不同的数,即如果一个数出现了多次,只输出一次(要求按照每一个不同的数第一次出现的顺序输出)。 解题 错误题解 a list(map(int,input().split())) b [] b.append(a[i]) for i in range(2,11):if a[i] not in b:b.append(a[i]) print(b)但是会…

Docker实战:从零构建高可用的MySQL主从集群与Redis集群

在分布式系统架构中&#xff0c;数据库集群是保障数据高可用和性能的关键组件。本文将通过Docker技术&#xff0c;手把手教你搭建MySQL主从集群和Redis Cluster&#xff0c;并分享独创的优化技巧与运维实战经验。 一、为什么选择Docker部署集群&#xff1f; 传统数据库集群搭…

STM32电机库 电机控制特性

ST MC FW库提供FOC和六步法两种电机控制方式。这使得它能够驱动永磁同步电机 (PMSM) 和无刷直流电机 (BLDC)。FOC 更适合 PMSM,而六步法更适合 BLDC 电机。该固件可以驱动内嵌式PMSM 和标贴式PMSM。 ST Motor Control 固件库提供以下功能: FOC SVPWM 生成: 可配置的 PW…

Go:方法

方法声明 type point struct { X, Y float64 }// 普通函数 func Distance(p, q Point) float64 {return math.Hypot(q.x - p.x, q.y - p.Y) }// Point类型的方法 func (p Point) Distance(q Point) float64 {return math.Hypot(q.x - p.x, q.y - p.Y) }方法声明与普通函数声…

前端基础之《Vue(4)—响应式原理》

一、什么是响应式 1、响应式英文reactive 当你get/set一个变量时&#xff0c;你有办法可以“捕获到”这种行为。 2、一个普通对象和一个响应式对象对比 &#xff08;1&#xff09;普通对象 <script>// 这种普通对象不具备响应式var obj1 {a: 1,b: 2} </script>…