SpringBoot集成Apache RocketMQ详解

文章目录

  • 0. 前言
  • 1. Spring Boot 集成Apache RocketMQ详细步骤
    • 1.1.添加依赖
    • 1.2.配置RocketMQ
    • 1.3.创建消息生产者(Producer)
    • 1.4.创建消息消费者(Consumer)
  • 2. 测试验证
  • 3. 常见报错
  • 4. 参考文档
  • 5. 源码地址

在这里插入图片描述

0. 前言

上个章节我们学习了RocketMQ的学习环境安装,讲了两种安装方式 1. docker使用官方镜像安装,2.使用源码方式安装。安装教程如下
如果已经安装了RocketMQ 学习环境可以略过此章节《【实践篇(一)】RocketMQ入门之学习环境搭建》
本章节,我们学习Spring Boot 集成Apache RocketMQ。并验证 在SpringBoot应用中展示如何使用Apache RocketMQ的生产者(Producer)进行消息发送。
这段代码实现了以下类型的消息发送:
使用Apache RocketMQ 官方的依赖库 RocketMQTemplate,实现同步、异步等消息。

  1. 同步消息:使用syncSend方法,生产者会等待消息服务器回复确认后才会继续发送下一条消息。

  2. 异步消息:使用asyncSend方法,生产者发送消息后不等待服务器回复,直接发送下一条消息。

  3. 单向消息:使用sendOneWay方法,生产者只负责发送消息,不等待服务器回复,也不关注发送结果。

  4. 顺序消息:使用sendOrderly方法,按照消息的发送顺序进行消费(First-In-First-Out)。

  5. 延迟消息:使用sendDelayed方法,消息被发送后,不会立即被消费,等待特定的延迟时间后,才能被消费。

  6. 批量消息:使用sendBatch方法,一次发送多条消息,可以有效提高发送的吞吐量。

关于RocketMQ消息的消息模型介绍和使用,我专门写了一篇博客,搭建可以了解
《RocketMQ 消息传递模型》https://blog.csdn.net/wangshuai6707/article/details/132863088

1. Spring Boot 集成Apache RocketMQ详细步骤

1.1.添加依赖

在SpringBoot项目的pom.xml文件中添加RocketMQ的依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.15</version><relativePath/></parent><groupId>com.icepip.project</groupId><artifactId>springboot-icepip-rocketMQ-example</artifactId><version>0.0.1-SNAPSHOT</version><name>springboot-icepip-rocketMQ-example</name><description>Spring boot 集成rocketMQ 示例</description><properties><java.version>8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.4</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-thymeleaf</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

1.2.配置RocketMQ

application.properties文件中配置RocketMQ的相关信息:

rocketmq.name-server=你的RocketMQ服务IP:9876
rocketmq.producer.group=my-producer
# 刚开始未配置 导致超时报错
rocketmq.producer.sendMessageTimeout=10000

1.3.创建消息生产者(Producer)

package com.icepip.project.mqtt.controller;import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.ArrayList;
import java.util.List;
/***  SpringBoot集成Apache RocketMQ详解* @author 冰点* @version 1.0.0* @date 2023/9/9 17:02*/@RestController
@RequestMapping("/producer")
public class ProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 同步发送消息到指定主题* @param message* @return*/@GetMapping("/syncSend")public String syncSend(String message) {// 同步发送消息到指定主题rocketMQTemplate.syncSend("test-topic", message);return "Sync message: " + message + " sent";}/*** 异步发送消息到指定主题* @param message* @return*/@GetMapping("/asyncSend")public String asyncSend(String message) {// 异步发送消息到指定主题rocketMQTemplate.asyncSend("test-topic", MessageBuilder.withPayload(message).build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("Async message sent successfully, result: " + sendResult);}@Overridepublic void onException(Throwable throwable) {System.err.println("Failed to send async message: " + throwable.getMessage());}}, 3000, 3); // 3000 ms timeout, delay level 3return "Async message: " + message + " sent";}/*** 发送单向消息到指定主题,无需等待Broker的确认* @param message* @return*/@GetMapping("/sendOneWay")public String sendOneWay(String message) {// 发送单向消息到指定主题,无需等待Broker的确认rocketMQTemplate.sendOneWay("test-topic", message);return "OneWay message: " + message + " sent";}// 发送顺序消息@GetMapping("/sendOrderly")public String sendOrderly(String message) {// 发送顺序消息到指定主题rocketMQTemplate.syncSendOrderly("test-topic", message, "order");return "Orderly message: " + message + " sent";}// 发送延迟消息@GetMapping("/sendDelayed")public String sendDelayed(String message) {// 发送延迟消息到指定主题,延迟级别为3rocketMQTemplate.syncSend("test-topic", MessageBuilder.withPayload(message).build(), 1000, 3);return "Delayed message: " + message + " sent";}// 发送批量消息@GetMapping("/sendBatch")public String sendBatch() {List<String> messages = new ArrayList<>();messages.add("message1");messages.add("message2");// 批量发送消息到指定主题rocketMQTemplate.syncSend("test-topic", messages);return "Batch messages sent";}
}

1.4.创建消息消费者(Consumer)

package com.icepip.project.mqtt.handler;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;/*** 定义一个消费者,监听test-topic主题的消息* @author 冰点* @version 1.0.0* @date 2023/9/9 16:29*/@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "my-consumer_test-topic")
public class MyConsumer implements RocketMQListener<String>{// 当收到消息时,该方法将被调用@Overridepublic void onMessage(String message) {System.out.println("Received message: "+ message);}
}

2. 测试验证

在这里插入图片描述
在这里插入图片描述

3. 常见报错

  1. See http://rocketmq.apache.org/docs/faq/ for further details.; nested exception is org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [6386]ms, Topic: test-topic, BrokersSent: [698f11314447, 698f11314447, 698f11314447]
    See http://rocketmq.apache.org/docs/faq/ for further details.] with root cause

org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.0.8:10911> failed
解决办法,修改Broker的IP为宿主机IP
进容器修改配置文件,修改完启动服务 。启动之前先kill 掉容器里原来的Broker。
nohup sh mqbroker -c /home/rocketmq/rocketmq-4.9.2/broker.conf &
在这里插入图片描述

4. 参考文档

  1. 官方文档链接:https://rocketmq.apache.org/docs/

  2. GitHub链接:https://github.com/apache/rocketmq-spring

5. 源码地址

我的github https://github.com/wangshuai67/icepip-springboot-action-examples

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/80885.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

4G版本云音响设置教程腾讯云平台版本

文章目录 4G本云音响设置教程介绍一、申请设备三元素1.腾讯云物联网平台2.创建产品3.设置产品参数4.添加设备5.获取三元素 二、设置设备三元素1.打开MQTTConfigTools2.计算MQTT参数3.使用USB连接设备4.设置参数 三、腾讯云物联网套件协议使用说明1.推送协议信息2.topic规则说明…

JDK16特性

文章目录 一、JAVA16概述二、语法层面变化1.密封类&#xff08;第二次预览&#xff09;2.instanceof 的模式匹配3.记录类习惯4基于值的类的警告 三、API层面变化1 Vector API&#xff08;孵化器&#xff09; 四、其他变化1.启用 C14 语言功能2.从 Mercurial 迁移到 Git3.ZGC&am…

Microsoft Excel 101 简介

什么是 Microsoft Excel&#xff1f; Microsoft Excel 是一个电子表格程序&#xff0c;用于记录和分析数值数据。 将电子表格想像成构成表格的列和行的集合。 字母通常分配给列&#xff0c;数字通常分配给行。 列和行相交的点称为像元。 单元格的地址由代表列的字母和代表行的…

【C++】LeetCode 160 相交链表

今天再写一道算法题&#xff08;这两周都写算法题有点摆烂&#xff09; 题目 给你两个单链表的头节点 headA 和 headB &#xff0c;请你找出并返回两个单链表相交的起始节点。如果两个链表不存在相交节点&#xff0c;返回 null 。 图示两个链表在节点 c1 开始相交&#xff1…

安防监控/视频汇聚/云存储/AI智能视频分析平台EasyCVR下级海康设备无法级联是什么原因?

安防视频监控平台/视频集中存储/云存储/磁盘阵列EasyCVR可拓展性强、视频能力灵活、部署轻快&#xff0c;可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等&#xff0c;以及支持厂家私有协议与SDK接入&#xff0c;包括海康Ehome、海大宇等设备的SDK等。 有用户反馈&…

[JAVAee]spring-Bean对象的执行流程与生命周期

执行流程 spring中Bean对象的执行流程大致分为四步: 启动Spring容器实例化Bean对象Bean对象注册到Spring容器中将Bean对象装配到所需的类中 ①启动Spring容器,在main方法中获取spring上下文对象并配备spring. import demo.*;import org.springframework.context.Applicati…

pt26django教程

admin 后台数据库管理 django 提供了比较完善的后台管理数据库的接口&#xff0c;可供开发过程中调用和测试使用 django 会搜集所有已注册的模型类&#xff0c;为这些模型类提拱数据管理界面&#xff0c;供开发者使用 创建后台管理帐号: [rootvm mysite2]# python3 manage.…

LeetCode 753. 破解保险箱【欧拉回路,DFS】困难

本文属于「征服LeetCode」系列文章之一&#xff0c;这一系列正式开始于2021/08/12。由于LeetCode上部分题目有锁&#xff0c;本系列将至少持续到刷完所有无锁题之日为止&#xff1b;由于LeetCode还在不断地创建新题&#xff0c;本系列的终止日期可能是永远。在这一系列刷题文章…

如何从第一性原则的原理分解数学问题

如何从第一性原则的原理分解数学问题 摘要&#xff1a;牛津大学入学考试题目展示了所有优秀数学家都使用的系统的第一原则推理&#xff0c;而GPT4仍然在这方面有困难 作者&#xff1a;Keith McNulty 我们中的许多人都熟悉直角三角形的边的规则。根据毕达哥拉斯定理&#xff0c;…

2023年墨西哥 SP/BMV IPC 研究报告

第一章 指数概况 1.1 指数基本情况 墨西哥 S&P/BMV IPC 指数衡量在墨西哥证券交易所 (Bolsa Mexicana de Valores, BMV)上市&#xff0c;规模最大、流动性最高的股票表现。提供一个覆盖墨西哥股市的广泛、具有代表性且可轻易复制的指数。根据多元化要求&#xff0c;按市值…

【深度学习】 Python 和 NumPy 系列教程(廿五):Matplotlib详解:3、多子图和布局:subplot()函数

目录 一、前言 二、实验环境 三、Matplotlib详解 1、2d绘图类型 2、3d绘图类型 3、多子图和布局 1. subplot()函数 简单示例 一、前言 Python是一种高级编程语言&#xff0c;由Guido van Rossum于1991年创建。它以简洁、易读的语法而闻名&#xff0c;并且具有强大的功能…

多表查询——“MySQL数据库”

各位CSDN的uu们好呀&#xff0c;今天&#xff0c;小雅兰的内容是MySQL数据库中的多表查询啦&#xff0c;这个内容确实是一个难点&#xff0c;下面&#xff0c;让我们进入多表查询的世界吧&#xff01;&#xff01;&#xff01; 一个案例引发的多表连接 多表查询分类讲解 SQL9…

精品SpringCloud的高校招生信息管理系统-微服务分布式

《[含文档PPT源码等]精品基于SpringCloud实现的高校招生信息管理系统-微服务-分布式》该项目含有源码、文档、PPT、配套开发软件、软件安装教程、项目发布教程等 软件开发环境及开发工具&#xff1a; 开发语言&#xff1a;Java 框架&#xff1a;springcloud JDK版本&#x…

FL Studio v21.1.1.3750 Producer Edition inc crack官方中文免费激活版功能介绍及百度网盘下载

FL Studio v21.1.1.3750 Producer Edition inc crack官方中文免费激活版是一款功能强大的软件音乐制作环境或数字音频工作站&#xff08;DAW&#xff09;。它代表了25多年的创新发展&#xff0c;在一个软件包中拥有您所需的一切&#xff0c;以创作、编排、录制、编辑、混音和掌…

Linux命令200例:dip用于用户与远程主机建立通信连接

&#x1f3c6;作者简介&#xff0c;黑夜开发者&#xff0c;CSDN领军人物&#xff0c;全栈领域优质创作者✌。CSDN专家博主&#xff0c;阿里云社区专家博主&#xff0c;2023年6月csdn上海赛道top4。 &#x1f3c6;数年电商行业从业经验&#xff0c;历任核心研发工程师&#xff0…

主题教育活动知识竞赛小程序界面分享

主题教育活动知识竞赛小程序界面分享

微服务保护-授权规则

个人名片&#xff1a; 博主&#xff1a;酒徒ᝰ. 个人简介&#xff1a;沉醉在酒中&#xff0c;借着一股酒劲&#xff0c;去拼搏一个未来。 本篇励志&#xff1a;三人行&#xff0c;必有我师焉。 本项目基于B站黑马程序员Java《SpringCloud微服务技术栈》&#xff0c;SpringCloud…

lighttpd以及socket和WebSocket编程

综述 本文涉及到下图绿色背景部分的内容&#xff1a; 左侧位于Linux下&#xff0c;其中包括lighttpd和socket程序&#xff1b;右侧是WebSocket程序。两者通过网络交互。 本文介绍lighttpd的基本使用方式&#xff0c;并通过编程完成一个socket服务器与浏览器端的WebSocket客户…

本地docker注册证书docker login连接到harbor仓库、利用shell脚本将大量镜像pull、tag、push到私有harbor仓库

1. 本地docker注册证书docker login连接到harbor仓库&#xff1a; 我们使用docker login/push/pull去与Harbor打交道&#xff0c;上传下载镜像等。 但是可能会出现x509: certificate signed by unknown authority之类的错误。 [roottest01 harbor.dev]# docker login harbor.d…

数学建模__动态规划

动态规划就是&#xff0c;将任务每一步均记录下来&#xff0c;以便将来重复使用时能够直接调用 问题描述&#xff1a;给定n个物品&#xff0c;每个物品的重量是Wi,价值是Vi&#xff0c;但是背包最多能装下capacity重量的物品&#xff0c;问我们如何选择才能利益最大化。 这里涉…