SpringBoot集成Apache RocketMQ详解

文章目录

  • 0. 前言
  • 1. Spring Boot 集成Apache RocketMQ详细步骤
    • 1.1.添加依赖
    • 1.2.配置RocketMQ
    • 1.3.创建消息生产者(Producer)
    • 1.4.创建消息消费者(Consumer)
  • 2. 测试验证
  • 3. 常见报错
  • 4. 参考文档
  • 5. 源码地址

在这里插入图片描述

0. 前言

上个章节我们学习了RocketMQ的学习环境安装,讲了两种安装方式 1. docker使用官方镜像安装,2.使用源码方式安装。安装教程如下
如果已经安装了RocketMQ 学习环境可以略过此章节《【实践篇(一)】RocketMQ入门之学习环境搭建》
本章节,我们学习Spring Boot 集成Apache RocketMQ。并验证 在SpringBoot应用中展示如何使用Apache RocketMQ的生产者(Producer)进行消息发送。
这段代码实现了以下类型的消息发送:
使用Apache RocketMQ 官方的依赖库 RocketMQTemplate,实现同步、异步等消息。

  1. 同步消息:使用syncSend方法,生产者会等待消息服务器回复确认后才会继续发送下一条消息。

  2. 异步消息:使用asyncSend方法,生产者发送消息后不等待服务器回复,直接发送下一条消息。

  3. 单向消息:使用sendOneWay方法,生产者只负责发送消息,不等待服务器回复,也不关注发送结果。

  4. 顺序消息:使用sendOrderly方法,按照消息的发送顺序进行消费(First-In-First-Out)。

  5. 延迟消息:使用sendDelayed方法,消息被发送后,不会立即被消费,等待特定的延迟时间后,才能被消费。

  6. 批量消息:使用sendBatch方法,一次发送多条消息,可以有效提高发送的吞吐量。

关于RocketMQ消息的消息模型介绍和使用,我专门写了一篇博客,搭建可以了解
《RocketMQ 消息传递模型》https://blog.csdn.net/wangshuai6707/article/details/132863088

1. Spring Boot 集成Apache RocketMQ详细步骤

1.1.添加依赖

在SpringBoot项目的pom.xml文件中添加RocketMQ的依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.15</version><relativePath/></parent><groupId>com.icepip.project</groupId><artifactId>springboot-icepip-rocketMQ-example</artifactId><version>0.0.1-SNAPSHOT</version><name>springboot-icepip-rocketMQ-example</name><description>Spring boot 集成rocketMQ 示例</description><properties><java.version>8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.4</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-thymeleaf</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

1.2.配置RocketMQ

application.properties文件中配置RocketMQ的相关信息:

rocketmq.name-server=你的RocketMQ服务IP:9876
rocketmq.producer.group=my-producer
# 刚开始未配置 导致超时报错
rocketmq.producer.sendMessageTimeout=10000

1.3.创建消息生产者(Producer)

package com.icepip.project.mqtt.controller;import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.ArrayList;
import java.util.List;
/***  SpringBoot集成Apache RocketMQ详解* @author 冰点* @version 1.0.0* @date 2023/9/9 17:02*/@RestController
@RequestMapping("/producer")
public class ProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 同步发送消息到指定主题* @param message* @return*/@GetMapping("/syncSend")public String syncSend(String message) {// 同步发送消息到指定主题rocketMQTemplate.syncSend("test-topic", message);return "Sync message: " + message + " sent";}/*** 异步发送消息到指定主题* @param message* @return*/@GetMapping("/asyncSend")public String asyncSend(String message) {// 异步发送消息到指定主题rocketMQTemplate.asyncSend("test-topic", MessageBuilder.withPayload(message).build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("Async message sent successfully, result: " + sendResult);}@Overridepublic void onException(Throwable throwable) {System.err.println("Failed to send async message: " + throwable.getMessage());}}, 3000, 3); // 3000 ms timeout, delay level 3return "Async message: " + message + " sent";}/*** 发送单向消息到指定主题,无需等待Broker的确认* @param message* @return*/@GetMapping("/sendOneWay")public String sendOneWay(String message) {// 发送单向消息到指定主题,无需等待Broker的确认rocketMQTemplate.sendOneWay("test-topic", message);return "OneWay message: " + message + " sent";}// 发送顺序消息@GetMapping("/sendOrderly")public String sendOrderly(String message) {// 发送顺序消息到指定主题rocketMQTemplate.syncSendOrderly("test-topic", message, "order");return "Orderly message: " + message + " sent";}// 发送延迟消息@GetMapping("/sendDelayed")public String sendDelayed(String message) {// 发送延迟消息到指定主题,延迟级别为3rocketMQTemplate.syncSend("test-topic", MessageBuilder.withPayload(message).build(), 1000, 3);return "Delayed message: " + message + " sent";}// 发送批量消息@GetMapping("/sendBatch")public String sendBatch() {List<String> messages = new ArrayList<>();messages.add("message1");messages.add("message2");// 批量发送消息到指定主题rocketMQTemplate.syncSend("test-topic", messages);return "Batch messages sent";}
}

1.4.创建消息消费者(Consumer)

package com.icepip.project.mqtt.handler;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;/*** 定义一个消费者,监听test-topic主题的消息* @author 冰点* @version 1.0.0* @date 2023/9/9 16:29*/@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "my-consumer_test-topic")
public class MyConsumer implements RocketMQListener<String>{// 当收到消息时,该方法将被调用@Overridepublic void onMessage(String message) {System.out.println("Received message: "+ message);}
}

2. 测试验证

在这里插入图片描述
在这里插入图片描述

3. 常见报错

  1. See http://rocketmq.apache.org/docs/faq/ for further details.; nested exception is org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [6386]ms, Topic: test-topic, BrokersSent: [698f11314447, 698f11314447, 698f11314447]
    See http://rocketmq.apache.org/docs/faq/ for further details.] with root cause

org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.0.8:10911> failed
解决办法,修改Broker的IP为宿主机IP
进容器修改配置文件,修改完启动服务 。启动之前先kill 掉容器里原来的Broker。
nohup sh mqbroker -c /home/rocketmq/rocketmq-4.9.2/broker.conf &
在这里插入图片描述

4. 参考文档

  1. 官方文档链接:https://rocketmq.apache.org/docs/

  2. GitHub链接:https://github.com/apache/rocketmq-spring

5. 源码地址

我的github https://github.com/wangshuai67/icepip-springboot-action-examples

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/80885.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

宝塔安装python和openssl

宝塔安装python和openssl OpenSSL Centos7 openssl 升级 1.1.1k.tar.gz centos7系统安装Vicuna&#xff08;小羊驼&#xff09;聊天机器人 CentOS中输入yum报错&#xff1a;sudo: unable to execute /bin/yum: No such file or directory opensslrpm安装指南-让你的网站更加…

4G版本云音响设置教程腾讯云平台版本

文章目录 4G本云音响设置教程介绍一、申请设备三元素1.腾讯云物联网平台2.创建产品3.设置产品参数4.添加设备5.获取三元素 二、设置设备三元素1.打开MQTTConfigTools2.计算MQTT参数3.使用USB连接设备4.设置参数 三、腾讯云物联网套件协议使用说明1.推送协议信息2.topic规则说明…

JDK16特性

文章目录 一、JAVA16概述二、语法层面变化1.密封类&#xff08;第二次预览&#xff09;2.instanceof 的模式匹配3.记录类习惯4基于值的类的警告 三、API层面变化1 Vector API&#xff08;孵化器&#xff09; 四、其他变化1.启用 C14 语言功能2.从 Mercurial 迁移到 Git3.ZGC&am…

Microsoft Excel 101 简介

什么是 Microsoft Excel&#xff1f; Microsoft Excel 是一个电子表格程序&#xff0c;用于记录和分析数值数据。 将电子表格想像成构成表格的列和行的集合。 字母通常分配给列&#xff0c;数字通常分配给行。 列和行相交的点称为像元。 单元格的地址由代表列的字母和代表行的…

【C++】LeetCode 160 相交链表

今天再写一道算法题&#xff08;这两周都写算法题有点摆烂&#xff09; 题目 给你两个单链表的头节点 headA 和 headB &#xff0c;请你找出并返回两个单链表相交的起始节点。如果两个链表不存在相交节点&#xff0c;返回 null 。 图示两个链表在节点 c1 开始相交&#xff1…

安防监控/视频汇聚/云存储/AI智能视频分析平台EasyCVR下级海康设备无法级联是什么原因?

安防视频监控平台/视频集中存储/云存储/磁盘阵列EasyCVR可拓展性强、视频能力灵活、部署轻快&#xff0c;可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等&#xff0c;以及支持厂家私有协议与SDK接入&#xff0c;包括海康Ehome、海大宇等设备的SDK等。 有用户反馈&…

[JAVAee]spring-Bean对象的执行流程与生命周期

执行流程 spring中Bean对象的执行流程大致分为四步: 启动Spring容器实例化Bean对象Bean对象注册到Spring容器中将Bean对象装配到所需的类中 ①启动Spring容器,在main方法中获取spring上下文对象并配备spring. import demo.*;import org.springframework.context.Applicati…

POJ 2785 4 Values whose Sum is 0 折半枚举

一、题目大意 从四个数组A[ ],B[ ],C[ ],D[ ]中分别取一个元素a b c d&#xff0c;使得 a b c d 0&#xff0c;找出所有a b c d 的解的数量&#xff0c;认为下标不同&#xff0c;但值相同的元素为不同元素。 二、解题思路 如果暴力枚举&#xff0c;一定超时&#xff0c;…

pt26django教程

admin 后台数据库管理 django 提供了比较完善的后台管理数据库的接口&#xff0c;可供开发过程中调用和测试使用 django 会搜集所有已注册的模型类&#xff0c;为这些模型类提拱数据管理界面&#xff0c;供开发者使用 创建后台管理帐号: [rootvm mysite2]# python3 manage.…

LeetCode 753. 破解保险箱【欧拉回路,DFS】困难

本文属于「征服LeetCode」系列文章之一&#xff0c;这一系列正式开始于2021/08/12。由于LeetCode上部分题目有锁&#xff0c;本系列将至少持续到刷完所有无锁题之日为止&#xff1b;由于LeetCode还在不断地创建新题&#xff0c;本系列的终止日期可能是永远。在这一系列刷题文章…

在标准的C++ 语法中,请问有 MyCppClass* mycppclass 这样的变量定义方式吗?

在标准的C 语法中&#xff0c;请问有 MyCppClass*& mycppclass 这样的变量定义方式吗&#xff1f; Author: Lycan Date: 2023/9/16 11:53 Note: 以下问题解答通过大模型生成&#xff0c;主要用于个人学习和备忘&#xff0c;仅供参考&#xff0c;若有错误或者侵权&#xff…

如何从第一性原则的原理分解数学问题

如何从第一性原则的原理分解数学问题 摘要&#xff1a;牛津大学入学考试题目展示了所有优秀数学家都使用的系统的第一原则推理&#xff0c;而GPT4仍然在这方面有困难 作者&#xff1a;Keith McNulty 我们中的许多人都熟悉直角三角形的边的规则。根据毕达哥拉斯定理&#xff0c;…

2023年墨西哥 SP/BMV IPC 研究报告

第一章 指数概况 1.1 指数基本情况 墨西哥 S&P/BMV IPC 指数衡量在墨西哥证券交易所 (Bolsa Mexicana de Valores, BMV)上市&#xff0c;规模最大、流动性最高的股票表现。提供一个覆盖墨西哥股市的广泛、具有代表性且可轻易复制的指数。根据多元化要求&#xff0c;按市值…

【深度学习】 Python 和 NumPy 系列教程(廿五):Matplotlib详解:3、多子图和布局:subplot()函数

目录 一、前言 二、实验环境 三、Matplotlib详解 1、2d绘图类型 2、3d绘图类型 3、多子图和布局 1. subplot()函数 简单示例 一、前言 Python是一种高级编程语言&#xff0c;由Guido van Rossum于1991年创建。它以简洁、易读的语法而闻名&#xff0c;并且具有强大的功能…

使用Selenium和Python自动预订车票

在本文中&#xff0c;我们将探讨如何使用Selenium和Python自动预订车票。我们将以12306.cn网站为例&#xff0c;演示自动化预订车票的过程。通过阅读本文&#xff0c;您将更好地了解如何使用Selenium与网页进行交互。 准备工作 首先&#xff0c;我们需要安装Selenium库。您可…

多表查询——“MySQL数据库”

各位CSDN的uu们好呀&#xff0c;今天&#xff0c;小雅兰的内容是MySQL数据库中的多表查询啦&#xff0c;这个内容确实是一个难点&#xff0c;下面&#xff0c;让我们进入多表查询的世界吧&#xff01;&#xff01;&#xff01; 一个案例引发的多表连接 多表查询分类讲解 SQL9…

bootstrap table export的使用,bootstrap table表格前端导出

第一步&#xff1a;html页面导入2个JS文件 1.依赖文件tableExport.js&#xff0c;该文件的内容如下&#xff1a; use strict;(function ($) {$.fn.tableExport function (options) {let docData;const defaults {csvEnclosure: ",csvSeparator: ,,csvUseBOM: true,dat…

vue前端拿到后端pdf与zip等重新打包为一个新的zip包

目录 vue前端拿到后端pdf与zip等重新打包为一个新的zip包code.vue vue前端拿到后端pdf与zip等重新打包为一个新的zip包 code.vue const urlList [{fileUrl:https://XX.zip,fileName:我是文件.zip},{fileUrl:https://XXX.pdf,fileName:我是pdf.pdf}]this.downloadZip(urlList)…

精品SpringCloud的高校招生信息管理系统-微服务分布式

《[含文档PPT源码等]精品基于SpringCloud实现的高校招生信息管理系统-微服务-分布式》该项目含有源码、文档、PPT、配套开发软件、软件安装教程、项目发布教程等 软件开发环境及开发工具&#xff1a; 开发语言&#xff1a;Java 框架&#xff1a;springcloud JDK版本&#x…

FL Studio v21.1.1.3750 Producer Edition inc crack官方中文免费激活版功能介绍及百度网盘下载

FL Studio v21.1.1.3750 Producer Edition inc crack官方中文免费激活版是一款功能强大的软件音乐制作环境或数字音频工作站&#xff08;DAW&#xff09;。它代表了25多年的创新发展&#xff0c;在一个软件包中拥有您所需的一切&#xff0c;以创作、编排、录制、编辑、混音和掌…