【Spring Cloud Alibaba】RocketMQ的基础使用,如何发送消息和消费消息

在现代分布式架构的开发中,消息队列扮演着至关重要的角色,用于解耦系统组件、保障可靠性以及实现异步通信。RocketMQ作为一款开源的分布式消息中间件,凭借其高性能、高可用性和良好的扩展性,成为了众多企业在构建高可靠性、高吞吐量应用系统时的首选。
对于Spring Cloud Alibaba的用户来说,集成RocketMQ并进行消息的发送与消费是常见的任务。本篇博客将深入介绍RocketMQ的基础使用方法,带大家一步步学习如何在Spring Cloud Alibaba中发送和消费消息。

在开始之前,确保已经完成了以下准备工作:

  1. 安装RocketMQ:确保已经在系统中成功安装了RocketMQ,并启动了相关服务。教程可以查看我上一篇博客
    【Spring Cloud Alibaba】Linux安装RocketMQ以及RocketMQ Dashboard可视化工具
  2. JDK:安装了JDK 1.8及以上版本,以便于运行Java应用程序。

文章目录

  • 🎺 第一步,搭建rocketmq项目环境
  • 🎺 第二步,生产者代码
    • 🎺普通消息
      • 🎺普通消息发送
        • 🎺同步发送
        • 🎺异步发送
        • 🎺单向模式发送
      • 🎺 普通消息接收
    • 🎺顺序消息
      • 🎺顺序消息发送
      • 🎺顺序消息接收
    • 🎺 延迟消息
      • 🎺 延迟消息发送
      • 🎺延时消息接收
    • 🎺批量消息
      • 🎺批量消息发送
      • 🎺批量消息接收
    • 🎺 事务消息
      • 🎺 事务消息发送
      • 在实际中遇到的问题

rocketmq官网地址:
https://rocketmq.apache.org/zh/docs/4.x/

这里我们讲的是4.x的版本

🎺 第一步,搭建rocketmq项目环境

        <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.1</version></dependency>

我们使用的是rocketmq-spring-boot-starter2.2.1,其中的rocketmq版本为4.9.1

🎺 第二步,生产者代码

🎺普通消息

🎺普通消息发送

🎺同步发送

在这里插入图片描述

同步发送是最常用的方式,是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式,可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。

    /*** 普通消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/02message1#31-%E5%90%8C%E6%AD%A5%E5%8F%91%E9%80%81** @return*/@GetMapping("/syncSend")public SendResult syncSend(String message) {Message<String> stringMessage = MessageBuilder.createMessage(message, new MessageHeaders(null));return rocketMQTemplate.syncSend("my-topic:*", stringMessage);}

🎺异步发送

在这里插入图片描述
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。

    /*** 普通消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/02message1#32-%E5%BC%82%E6%AD%A5%E5%8F%91%E9%80%81** @return*/@GetMapping("/asyncSend")public String asyncSend(String message) {Message<String> stringMessage = MessageBuilder.createMessage(message, new MessageHeaders(null));rocketMQTemplate.asyncSend("my-topic:*", stringMessage, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("异步发送服务器返回信息成功");}@Overridepublic void onException(Throwable throwable) {System.out.println("异步发送服务器返回信息失败");}});return "异步发送成功";}

🎺单向模式发送

在这里插入图片描述

发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

    /*** 普通消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/02message1#33-%E5%8D%95%E5%90%91%E6%A8%A1%E5%BC%8F%E5%8F%91%E9%80%81** @return*/@GetMapping("/sendOneway")public String sendOneway(String message) {Message<String> stringMessage = MessageBuilder.createMessage(message, new MessageHeaders(null));rocketMQTemplate.sendOneWay("my-topic:*", stringMessage);return "发送成功";}

🎺 普通消息接收

MQ的消费模式可以大致分为两种,一种是推Push,一种是拉Pull。

Push是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。

Pull是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。

Apache RocketMQ既提供了Push模式也提供了Pull模式。

该博客中所有消费者都使用默认的push模式

@Component
@RocketMQMessageListener(topic = "my-topic", consumerGroup = "my-group", consumeMode = ConsumeMode.CONCURRENTLY)
public class NormalRocketMQListener implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received message:"+message);}
}

messageModel = MessageModel.BROADCASTING即广播消息,每个消费者都会去消费
但是即使都消费了,但是trackType都会显示NOT_CONSUME_YET
在这里插入图片描述

🎺顺序消息

🎺顺序消息发送

顺序消息是一种对消息发送和消费顺序有严格要求的消息。
对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费

/*** 顺序消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/03message2** @return*/@GetMapping("/syncSendOrderly")public SendResult syncSendOrderly(String message) {String[] split = message.split(",");List<Message<String>> list = new ArrayList<>();for (String mes : split) {list.add(MessageBuilder.createMessage(mes, new MessageHeaders(null)));}return rocketMQTemplate.syncSendOrderly("order-topic:*", list, String.valueOf(System.currentTimeMillis()));}

🎺顺序消息接收

消费者这里要设置consumeMode = ConsumeMode.ORDERLY才能实现顺序接收

@Component
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "order-topic", consumeMode = ConsumeMode.ORDERLY)
public class OrderRocketMQListener implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received message:"+message);}
}

🎺 延迟消息

🎺 延迟消息发送

延迟消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。

    /*** 延迟消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/04message3** @return*/@GetMapping("/send")public SendResult send(String message) {Message<String> stringMessage = MessageBuilder.createMessage(message, new MessageHeaders(null));return rocketMQTemplate.syncSend("delay-topic:*", stringMessage,1000,2);}

🎺延时消息接收

@Component
@RocketMQMessageListener(topic = "delay-topic", consumerGroup = "delay-topic")
public class DelayRocketMQListener implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received message:"+message);}
}

🎺批量消息

🎺批量消息发送

在对吞吐率有一定要求的情况下,Apache RocketMQ可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。

    /*** 批量消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/05message4** @return*/@GetMapping("/send")public SendResult send(String message) {String[] split = message.split(",");List<Message<String>> list = new ArrayList<>();for (String mes : split) {list.add(MessageBuilder.createMessage(mes, new MessageHeaders(null)));}return rocketMQTemplate.syncSend("list-topic:*", list);}

🎺批量消息接收

@Component
@RocketMQMessageListener(topic = "list-topic", consumerGroup = "list-topic")
public class ListRocketMQListener implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received message:"+message);}
}

🎺 事务消息

🎺 事务消息发送

    /*** 事务消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/06message5** @return*/@GetMapping("/send")public SendResult send(String message) {Message<String> message1 = MessageBuilder.createMessage(message, new MessageHeaders(null));return rocketMQTemplate.sendMessageInTransaction("transA-topic:*", message1, null);}

在这里插入图片描述

rocketmq-springboot 提供了一个注解@RocketMQTransactionListener
使用方法:实现RocketMQLocalTransactionListener接口,并且类上加注解@RocketMQTransactionListener

@RocketMQTransactionListener
public class TopicATransactionalMessageService implements RocketMQLocalTransactionListener {private Map<String, RocketMQTransactionStrategy> strategyMap;//策略模式@Autowiredpublic TopicATransactionalMessageService(TopicAStrategy topicAStrategy,TopicBStrategy topicBStrategy) {strategyMap = new HashMap<>();strategyMap.put("transA-topic", topicAStrategy);strategyMap.put("TopicB", topicBStrategy);// ...}/*** @param msg* @param arg* @return*/@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {String topic = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TOPIC).toString();RocketMQTransactionStrategy strategy = strategyMap.get(topic);if (strategy == null) {// 如果没有对应的策略,可以抛出异常或者返回一个默认的事务状态}return strategy.executeLocalTransaction(msg, arg);}/*** @param msg* @return*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {String topic = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TOPIC).toString();RocketMQTransactionStrategy strategy = strategyMap.get(topic);if (strategy == null) {// 如果没有对应的策略,可以抛出异常或者返回一个默认的事务状态}return strategy.checkLocalTransaction(msg);}
}

在这里插入图片描述

这里可以使用策略模式来实现对每个topic的自定义策略

每个topic处理类需要实现RocketMQTransactionStrategy接口

@Service
public class TopicAStrategy implements RocketMQTransactionStrategy {@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TOPIC);Object payload = msg.getPayload();String mes = new String((byte[]) payload);if (mes.equals("1")) {return RocketMQLocalTransactionState.COMMIT;} else if (mes.equals("2")) {return RocketMQLocalTransactionState.ROLLBACK;} else {return RocketMQLocalTransactionState.UNKNOWN;}// ...}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {return RocketMQLocalTransactionState.COMMIT;// ...}
}

这样发送不同的topic都会有不同的处理策略

更多消息可查看官网
https://rocketmq.apache.org/zh/docs/4.x/introduction/02quickstart/

在实际中遇到的问题

  • conf/broker.conf 中配置autoCreateTopicEnable=true
    ,如果没有对应的topic,则会在生产者有消息发送到mq的时候自动创建对应的topic
    如果不配置该属性,且开始没有topic的时候生产者发送消息到topic会报错org.apache.rocketmq.client.exception.MQBrokerException: CODE: 17 DESC: topic[delay-topic] not exist, apply first please!

  • 不管有没有配置autoCreateTopicEnable=true,都会出现以下的情况:
    添加了消费者注解,如@RocketMQMessageListener(topic = "delay-topic", consumerGroup = "delay-topic"),程序会自动创建一个名称为%RETRY%delay-topic的topic
    如果没有对应的topic,则会一直报错org.apache.rocketmq.client.exception.MQClientException: CODE: 17 DESC: No topic route info in name server for the topic: delay-topic
    解决方法:
    在mq中手动添加对应的topic即可

  • 如果你修改了autoCreateTopicEnable=true没有效果,删除rocketmq存储数据的文件夹store即可,默认存放位置/root/store

博客中涉及到的仓库地址:
https://gitee.com/WangFuGui-Ma/spring-cloud-alibaba

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/37205.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

stable diffusion 单张图片换头roop安装配置

1.首先安装秋叶大佬的webui 2.然后在拓展里面搜索roop,下载roop插件,然后重启webui 3.重启后,在文生图和图生图的界面,就可以看到roop的入口 4.这里面,需要提前安装Visual Studio. 勾选一些必要的选项,这里可以参照b站的视频 # 秋叶版本Stablediffusion的Roop插件的安装 …

使用 Python 在 NLP 中进行文本预处理

一、说明 自然语言处理 &#xff08;NLP&#xff09; 是人工智能 &#xff08;AI&#xff09; 和计算语言学的一个子领域&#xff0c;专注于使计算机能够理解、解释和生成人类语言。它涉及计算机和自然语言之间的交互&#xff0c;允许机器以对人类有意义和有用的方式处理、分析…

安卓中常见的字节码指令介绍

问题背景 安卓开发过程中&#xff0c;经常要通过看一些java代码对应的字节码&#xff0c;来了解java代码编译后的运行机制&#xff0c;本文将通过一个简单的demo介绍一些基本的字节码指令。 问题分析 比如以下代码&#xff1a; public class test {public static void main…

Java课题笔记~ JSP编程

4.1 JSP基本语法 JSP (全称Java Server Pages) 是由 Sun Microsystems 公司倡导和许多公司参与共同创建的一种使软件开发者可以响应客户端请求&#xff0c;而动态生成 HTML、XML 或其他格式文档的Web网页的技术标准。 JSPHTMLJava JSP的本质是Servlet 访问JSP的时候&#x…

【设计模式】原型模式

原型模式&#xff08;Prototype Pattern&#xff09;是用于创建重复的对象&#xff0c;同时又能保证性能。这种类型的设计模式属于创建型模式&#xff0c;它提供了一种创建对象的最佳方式之一。 这种模式是实现了一个原型接口&#xff0c;该接口用于创建当前对象的克隆。当直接…

基于微服务+Java+Spring Cloud +Vue+UniApp +MySql实现的智慧工地云平台源码

基于微服务JavaSpring Cloud VueUniApp MySql开发的智慧工地云平台源码 智慧工地概念&#xff1a; 智慧工地就是互联网建筑工地&#xff0c;是将互联网的理念和技术引入建筑工地&#xff0c;然后以物联网、移动互联网技术为基础&#xff0c;充分应用BIM、大数据、人工智能、移…

Android布局【RelativeLayout】

文章目录 介绍常见属性根据父容器定位根据兄弟组件定位 通用属性margin 设置组件与父容器的边距padding 设置组件内部元素的边距 项目结构主要代码 介绍 RelativeLayout是一个相对布局&#xff0c;如果不指定对齐位置&#xff0c;都是默认相对于父容器的左上角的开始布局 常见…

MQTT宝典

文章目录 1.介绍2.发布和订阅3.MQTT 数据包结构4.Demo5.EMQX 1.介绍 什么是MQTT协议 MQTT&#xff08;消息队列遥测传输协议&#xff09;&#xff0c;是一种基于发布/订阅&#xff08;publish/subscribe&#xff09;模式的“轻量级”通讯协议&#xff0c;该协议构建于TCP/IP协…

安卓快速开发

1.环境搭建 Android Studio下载网页&#xff1a;https://developer.android.google.cn/studio/index.html 第一次新建工程需要等待很长时间&#xff0c;新建一个Empty Views Activity 项目&#xff0c;右上角选择要运行的机器&#xff0c;运行就安装上去了(打开USB调试)。 2…

【Linux】UDP协议——传输层

目录 传输层 再谈端口号 端口号范围划分 认识知名端口号 两个问题 netstat与iostat pidof UDP协议 UDP协议格式 UDP协议的特点 面向数据报 UDP的缓冲区 UDP使用注意事项 基于UDP的应用层协议 传输层 在学习HTTP等应用层协议时&#xff0c;为了便于理解&#xff…

【实操】2023年npm组件库的创建发布流程

2022年的实践为基础&#xff0c;2023年我再建一个组件库【ZUI】。步骤回顾&#xff1a; 2022年的npm组件包的发布删除教程_npm i ant-design/pro-components 怎么删除_啥咕啦呛的博客-CSDN博客 1.在gitee上创建一个项目,相信你是会的 2.创建初始化项目&#xff0c;看吧&#…

【新品发布】ChatWork企业知识库系统源码

系统简介 基于前后端分离架构以及Vue3、uni-app、ThinkPHP6.x、PostgreSQL、pgvector技术栈开发&#xff0c;包含PC端、H5端。 ChatWork支持问答式和文档式知识库&#xff0c;能够导入txt、doc、docx、pdf、md等多种格式文档。 导入数据完成向量化训练后&#xff0c;用户提问…

两个pdf合并成一个pdf怎么合并?这几个方法值得推荐

两个pdf合并成一个pdf怎么合并&#xff1f;pdf文件的合并是一个很常见的需求&#xff0c;特别是在处理工作文件或学习资料时。为了更好的帮助你了解如何将两个pdf文件合并成一个&#xff0c;下面就给大家详细介绍几种合并方法。 方法一&#xff1a;使用迅捷PDF转换器 这是一款…

小红书如何打造爆款引流吸粉?11个秘诀助你秒变达人!

在这个充满信息和内容的时代&#xff0c;小红书以其独特的社交平台特性和个性化内容吸引了众多用户。今天&#xff0c;我们就来揭秘小红书关注战略&#xff0c;了解如何在这个平台上打造独特的内容体验&#xff0c;与用户建立更亲近的连接。#小红书# 1、定位清晰&#xff0c;找…

【论文阅读】基于深度学习的时序预测——Pyraformer

系列文章链接 论文一&#xff1a;2020 Informer&#xff1a;长时序数据预测 论文二&#xff1a;2021 Autoformer&#xff1a;长序列数据预测 论文三&#xff1a;2022 FEDformer&#xff1a;长序列数据预测 论文四&#xff1a;2022 Non-Stationary Transformers&#xff1a;非平…

页面跳转和两个页面之间的数据传递-鸿蒙ArkTS

页面跳转和两个页面之间的数据传递-ArkTS 页面跳转和两个页面之间的数据传递-ArkTS关于router的使用**跳转页面的实现方式。**页面接受跳转传递的参数页面返回及携带参数效果代码Index页面Second页面 参考资料 页面跳转和两个页面之间的数据传递-ArkTS 本篇文章主要是对两个页面…

TiDB在科捷物流神州金库核心系统的应用与实践

业务背景 北京科捷物流有限公司于2003年在北京正式成立&#xff0c;是ISO质量管理体系认证企业、国家AAAAA级物流企业、海关AEO高级认证企业&#xff0c;注册资金1亿元&#xff0c;是中国领先的大数据科技公司——神州控股的全资子公司。科捷物流融合B2B和B2C的客户需求&#…

matlab使用教程(15)—图论基础

1.有向图和无向图 1.1什么是图&#xff1f; 图是表示各种关系的节点和边的集合&#xff1a; • 节点 是与对象对应的顶点。 • 边 是对象之间的连接。 • 图的边有时会有权重 &#xff0c;表示节点之间的每个连接的强度&#xff08;或一些其他属性&#xff09;。 这些定…

MySQL8.xx一主两从复制安装与配置

搭建环境: 查看系统版本cat /etc/redhat-release [rootwww tools]# cat /etc/redhat-release CentOS Linux release 7.9.2009 (Core) 查看内核版本cat /proc/version 目标: 一主两从 主机IP 主机名称 端口 搭建环境 安装目录192.168.1.100 docker…

19.正则表达式

19.1什么是正则表达式 ●正则表达式( Regular Expression) 是用于匹配字符串中字符组合的模式。在JavaScript中&#xff0c; 正则表达式也是对象 ●通常用来查找、替换那些符合正则表达式的文本&#xff0c;许多语言都支持正则表达式 ●正则表达式在JavaScript中的使用场景: …