RocketMQ笔记(五)SpringBoot整合RocketMQ批量发送消息

目录

    • 一、简介
      • 1.1、特点
    • 二、Maven依赖
    • 三、application配置
    • 四、批量发送
      • 4.1、同步消息
      • 4.2、异步消息
      • 4.3、顺序消息
      • 4.4、关于异步批量发送
      • 4.5、结论
    • 五、其他

一、简介

  在之前的文章中,我讲过了,同步发送单条消息,异步发送单条消息,发送单向消息,发送顺序消息,今天我们讲讲如何批量发送消息,主要还是使用方法RocketMQTemplatesyncSend方法。

1.1、特点

  批量发送和单条发送消息的主要区别有以下几点:

  • 网络开销 发送单条消息时,每个消息都需要单独建立网络连接、发送数据包、等待响应等,网络开销较大。批量发送可以将多条消息打包在一起发送,减少网络连接建立的次数,降低网络开销
  • 吞吐量 由于批量发送减少了网络开销,所以可以在单位时间内发送更多的消息,提高了吞吐量。在高并发高流量场景下,批量发送能够发挥更好的性能
  • 消息顺序 单条发送消息的顺序是有序的,后发送的在队列中排在前发送的后面。而对于批量发送,一个批次内的消息顺序是固定的,但不同批次之间的消息顺序是无序的,会按照到达顺序存储在队列中。如果需要严格消息顺序,单条发送更合适
  • 消息重试 如果批量发送的一个批次中有部分消息发送失败,需重发整个批次,没有选择重发其中部分消息的功能(涉及幂等性问题)。单条发送失败时只需重发该单条消息
  • 编程复杂度 批量发送需要构造MessageBatch或Message列表对象,编程略微复杂些。单条发送只需构造单个Message对象

二、Maven依赖

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>rocketmq</artifactId><groupId>com.alian</groupId><version>1.0.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>06-send-batched-message</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>com.alian</groupId><artifactId>common-rocketmq-dto</artifactId><version>1.0.0-SNAPSHOT</version></dependency></dependencies></project>

  父工程已经在我上一篇文章里,通用公共包也在我上一篇文章里有说明,包括消费者。具体参考:RocketMQ笔记(一)SpringBoot整合RocketMQ发送同步消息

三、application配置

application.properties

server.port=8005# rocketmq地址
rocketmq.name-server=192.168.0.234:9876
# 默认的生产者组
rocketmq.producer.group=batched_group
# 发送同步消息超时时间
rocketmq.producer.send-message-timeout=3000
# 用于设置在消息发送失败后,生产者是否尝试切换到下一个服务器。设置为 true 表示启用,在发送失败时尝试切换到下一个服务器
rocketmq.producer.retry-next-server=true
# 用于指定消息发送失败时的重试次数
rocketmq.producer.retry-times-when-send-failed=3
# 设置消息压缩的阈值,为0表示禁用消息体的压缩
rocketmq.producer.compress-message-body-threshold=0

四、批量发送

  在 RocketMQ 中,RocketMQTemplatesyncSend方法,它允许你批量发送同步消息,主要参数:

  • topic:(普通消息都发送到topic=string_message_topic
  • Collection<T>:消息集合

测试类都引入依赖

	@Autowiredprivate RocketMQTemplate rocketMQTemplate;

4.1、同步消息

    @Testpublic void syncSendBatchStringMessagesWithBuilder() {String topic = "string_message_topic";String message = "超级喜欢Golang语言:";List<Message<String>> messageList = new ArrayList<>();for (int i = 0; i < 10; i++) {Message<String> rocketMessage = MessageBuilder.withPayload(message + i)// 设置消息类型.setHeader(MessageHeaders.CONTENT_TYPE, "text/plain").build();// 加入到列表messageList.add(rocketMessage);}// 使用syncSend发送批量消息SendResult sendResult = rocketMQTemplate.syncSend(topic, messageList);log.info("同步批量发送普通消息结果:{}",sendResult);}

运行结果:

[_GROUP_STRING_1] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 同步批量发送普通消息:0
[_GROUP_STRING_2] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 同步批量发送普通消息:1
[_GROUP_STRING_4] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 同步批量发送普通消息:3
[_GROUP_STRING_5] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 同步批量发送普通消息:4
[_GROUP_STRING_3] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 同步批量发送普通消息:2
[_GROUP_STRING_6] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 同步批量发送普通消息:5
[_GROUP_STRING_7] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 同步批量发送普通消息:6
[_GROUP_STRING_8] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 同步批量发送普通消息:7
[_GROUP_STRING_9] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 同步批量发送普通消息:8
[GROUP_STRING_10] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 同步批量发送普通消息:9

4.2、异步消息

    @Testpublic void asyncSendBatchStringMessageWithBuilder() {String topic = "string_message_topic";String message = "Alian超级喜欢Golang语言:";List<Message<String>> messageList = new ArrayList<>();for (int i = 0; i < 10; i++) {Message<String> rocketMessage = MessageBuilder.withPayload(message + i)// 设置消息类型.setHeader(MessageHeaders.CONTENT_TYPE, "text/plain").build();// 加入到列表messageList.add(rocketMessage);}// 使用asyncSend发送批量消息rocketMQTemplate.asyncSend(topic, messageList, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 异步发送成功的回调逻辑log.info("异步批量发送普通消息成功: " + sendResult);}@Overridepublic void onException(Throwable e) {// 异步发送失败的回调逻辑log.info("异步批量发送普通消息失败: " + e.getMessage());}});}

运行结果:

[_GROUP_STRING_1] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 异步批量发送普通消息:0
[_GROUP_STRING_8] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 异步批量发送普通消息:1
[_GROUP_STRING_3] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 异步批量发送普通消息:7
[_GROUP_STRING_6] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 异步批量发送普通消息:4
[_GROUP_STRING_9] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 异步批量发送普通消息:2
[_GROUP_STRING_5] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 异步批量发送普通消息:6
[GROUP_STRING_10] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 异步批量发送普通消息:3
[_GROUP_STRING_2] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 异步批量发送普通消息:8
[_GROUP_STRING_4] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 异步批量发送普通消息:9
[_GROUP_STRING_7] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 异步批量发送普通消息:5

4.3、顺序消息

  在 RocketMQ 中,RocketMQTemplatesyncSendOrderly方法,它允许你批量发送同步消息,主要参数:

  • topic:(和之前有区别,普通消息都发送到topic=ordered_string_message_topic
  • Collection<T>:消息集合
  • hashKey:通过hashKey发送到同一个队列
    @Testpublic void syncSendBatchOrderlyStringMessagesWithBuilder() {String topic = "ordered_string_message_topic";String message = "同步批量发送顺序消息,超级喜欢Go语言:";List<Message<String>> messageList = new ArrayList<>();for (int i = 0; i < 10; i++) {Message<String> rocketMessage = MessageBuilder.withPayload(message + i)// 设置消息类型.setHeader(MessageHeaders.CONTENT_TYPE, "text/plain").build();// 加入到列表messageList.add(rocketMessage);}// 使用syncSendOrderly发送批量顺序消息,消费者线程设置为1SendResult sendResult = rocketMQTemplate.syncSendOrderly(topic, messageList, "alian_sync_ordered");log.info("批量发送顺序消息发送结果:{}",sendResult);}

运行结果:

[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢Go语言:0
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢Go语言:1
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢Go语言:2
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢Go语言:3
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢Go语言:4
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢Go语言:5
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢Go语言:6
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢Go语言:7
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢Go语言:8
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢Go语言:9

  所以我之前说批量发送消息的topic不一样,因为

@Slf4j
@Service
@RocketMQMessageListener(topic = "ordered_string_message_topic", consumerGroup = "ORDERED_GROUP_STRING", consumeMode = ConsumeMode.ORDERLY)
public class StringMessageConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("字符串消费者接收到的消息: {}", message);// 处理消息的业务逻辑}
}

  顺序消息要顺序消费,也就是每次是一个线程去消费,相当于单线程,也就有序了。关键就是配置了:consumeMode = ConsumeMode.ORDERLY

  当然,我们也可以把消费者线程数设置为 consumeThreadNumber = 1,也就是单线程消费了,从而确保了消息的顺序消费(指单实例):

@RocketMQMessageListener(topic = "ordered_string_message_topic", consumerGroup = "CONCURRENT_GROUP_STRING",consumeThreadNumber = 1)
public class StringMessageConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("字符串消费者接收到的消息: {}", message);// 处理消息的业务逻辑}
}

4.4、关于异步批量发送

  有可能你会写下面的异步批量发送顺序消息

	@Testpublic void asyncSendBatchOrderlyStringMessageWithBuilder2() {String topic = "ordered_string_message_topic";String message = "Alian超级喜欢Golang语言:";List<String> messageList = new ArrayList<>();for (int i = 0; i < 10; i++) {// 加入到列表messageList.add(message + i);}// 使用 asyncSendOrderly 发送批量消息rocketMQTemplate.asyncSendOrderly(topic, messageList, "alian_async_ordered", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 异步发送成功的回调逻辑log.info("异步消息发送字符串消息成功: " + sendResult);}@Overridepublic void onException(Throwable e) {// 异步发送失败的回调逻辑log.info("异步消息发送字符串消息失败: " + e.getMessage());}});}

  其实这个是不对的,最终的结果是一个把你这里的messageList,当做了一个消息列表接收了,如下结果:

[GROUP_STRING_18] com.alian.ordered.StringMessageConsumer  : 字符串消费者接收到的消息: ["Alian超级喜欢Golang语言:0","Alian超级喜欢Golang语言:1","Alian超级喜欢Golang语言:2","Alian超级喜欢Golang语言:3","Alian超级喜欢Golang语言:4","Alian超级喜欢Golang语言:5","Alian超级喜欢Golang语言:6","Alian超级喜欢Golang语言:7","Alian超级喜欢Golang语言:8","Alian超级喜欢Golang语言:9"]

  RocketMQ对于单条消息和批量消息在队列中是如何被处理的?

  • 对于单条发送的消息,RocketMQ会按照队列中的顺序,将每条消息分发给一个消费者线程。因此,即使有多个消费者线程,由于每条消息都被单独处理,消费的顺序仍然会与发送的顺序一致。

  • 对于批量发送的消息,情况就有所不同。批量消息是作为一个整体发送的,因此在队列中,它们被视为一个单独的实体。当RocketMQ从队列中取出批量消息时,它会将整个批量消息作为一个整体分发给一个消费者线程。如果有多个消费者线程,由于操作系统的线程调度策略,处理批量消息的线程可能会在处理消息的过程中被调度出去,从而允许其他线程处理后面的消息。这样就可能导致消费的顺序与发送的顺序不一致。

4.5、结论

为此我测试了多次,得到结论:

  • 单条发送消息到同一个队列,使用多个消费线程消费该队列,由于消息本身是有序的,所以消费顺序也是有序的
  • 单批次批量发送消息到同一个队列,使用单个消费线程消费该队列,由于消费线程是单一的,所以消费顺序也是有序的
  • 单批次批量发送消息到同一个队列,使用多个消费线程消费时,消费顺序就不是有序的了

五、其他

  既然知道批量消息是作为一个整体的,那么肯定就会对消息大小有限制,在 Apache RocketMQ 中,批量消息的大小默认限制是4MB。这意味着,你不能发送总大小超过4MB的批量消息。

如果你想修改这个限制,你需要修改RocketMQ的配置。具体的修改方法如下:

  1. 找到RocketMQ的配置文件broker.conf,这个文件通常位于RocketMQ安装目录的conf目录下。
  2. broker.conf文件中,找到maxMessageSize这个配置项。这个配置项决定了批量消息的最大大小。
  3. 修改maxMessageSize的值为你想要的大小。注意,这个值是以字节为单位的,所以如果你想设置批量消息的最大大小为8MB,你应该设置maxMessageSize=8388608
  4. 保存并关闭broker.conf文件。
  5. 重启RocketMQBroker服务,以使新的配置生效。

  虽然你可以通过修改配置来增加批量消息的最大大小,但是你应该谨慎地考虑这个决定。增加批量消息的最大大小可能会增加Broker的内存使用量,并可能影响到消息的发送和接收性能。因此,在修改这个配置之前,你应该先考虑你的应用的需求和Broker的性能。

  因为优先的是@RocketMQMessageListener 注解中设置 consumerGroupmessageModel 参数。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/786728.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

yolov5目标检测可视化界面pyside6源码(无登录版)

一、软件简介&#xff1a; 这是基于yolov5-7.0目标检测实现的的可视化目标检测源码 本套项目没有用户登录的功能&#xff0c;如需用户登录版&#xff0c;看另一篇文章&#xff1a;yolov5pyside6登录用户管理目标检测可视化源码_yolov5用户登入功能-CSDN博客 ①程序中图片和图标…

稳定性生产总结

本期我们来谈下稳定性生产这个话题&#xff0c;稳定性建设目标有两个&#xff1a;降发生、降影响&#xff0c; 在降发生中的措施是做到三点&#xff1a;系统高可用、 高性能、 高质量&#xff0c;三高问题确实是一个很热的话题&#xff0c;里面涉及很多点。 在降影响中要做到…

Windows系统搭建TortoiseSVN客户端并实现无公网IP访问内网服务端

文章目录 前言1. TortoiseSVN 客户端下载安装2. 创建检出文件夹3. 创建与提交文件4. 公网访问测试 前言 TortoiseSVN是一个开源的版本控制系统&#xff0c;它与Apache Subversion&#xff08;SVN&#xff09;集成在一起&#xff0c;提供了一个用户友好的界面&#xff0c;方便用…

Wheel Controller 3D

Wheel Controller 3D是Unity内置WheelCollider的完整替代品。它允许更真实的车辆行为、完全定制和3D地面检测。 Wheel Controller 3D是Unity内置WheelCollider的完整替代品。它允许更真实的车辆行为、完全定制和3D地面检测。 如果您正在寻找包含Wheel Controller 3D的完整车辆物…

路径规划——搜索算法详解(六):LPA*算法详解与Matlab代码

上文讲解了D*算法&#xff0c;D*算法为在动态环境下进行路径规划的场景提出了可行的解决方案&#xff0c;本文将继续介绍另外一种动态规划路径的方法——Lifelong Planning A*&#xff08;LPA*&#xff09;算法。 该算法可以看作是A*的增量版本&#xff0c;是一种在固定起始点…

idea开发 java web 酒店推荐系统bootstrap框架开发协同过滤算法web结构java编程计算机网页

一、源码特点 java 酒店推荐推荐系统是一套完善的完整信息系统&#xff0c;结合java web开发和bootstrap UI框架完成本系统 采用协同过滤算法进行推荐 &#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式…

萨科微slkor(www.slkoric.com)半导体

萨科微slkor&#xff08;www.slkoric.com&#xff09;半导体技术总监&#xff0c;清华大学李老师介绍说&#xff0c;IGBT器件与MOSFET在技术上的主要区别在于&#xff0c;在IGBT芯片背面引入了一个P掺杂的集电极。从MOSFET拓展至IGBT主要存在IGBT器件设计和IGBT器件加工工艺两方…

Layui三级联动插件使用方法

Layui高版本中没有在提供三级联动这个动画了&#xff0c;而是封装成了一个插件&#xff0c;使用方式也很简单 官网 省市县区三级联动下拉选择器 layarea - Layui 第三方扩展组件平台 (layuion.com)https://dev.layuion.com/extend/layarea/#doc html页面约束 整个选择器需要…

如何在 Oracle 中使用 CREATE SEQUENCE 语句

在本文中&#xff0c;我们将讨论 Oracle CREATE SEQUENCE 语句&#xff0c;其主要目的是提供一种可靠的方法来生成唯一且连续的数值&#xff0c;通常用于数据库表中的主键字段。此功能对于维护数据完整性和效率、确保不同记录之间的标识符有序分配尤其重要。从本质上讲&#xf…

日记本(源码+文档)

日记本&#xff08;小程序、ios、安卓都可部署&#xff09; 文件包含内容程序简要说明功能项目截图客户端首页日记列表 书写日记个人中心设置密码锁拨打客服热线修改信息退出登录登录页输入密码锁注册页 后端管理登录页首页管理员列表管理用户管理日记列表管理日记数据 文件包含…

【stm32】USART编码部分--详细步骤

USART编码部分(文章最后附上源码) 如果看不懂步骤可以根据源码参考此篇文章就能轻而易举学会USART通信啦&#xff01; 编码步骤 第一步 开启时钟 把需要用到的USART和GPIO的时钟打开 第二部 GPIO初始化 把TX配置成复用输出&#xff0c;RX配置成输入(上拉输入、浮空输入)。…

C++ 注册Nacos

下载源码&#xff1a; git clone GitHub - nacos-group/nacos-sdk-cpp: C client for Nacos 编译源码 cd nacos-sdk-cpp cmake . make 生成库文件 在nacos-sdk-cpp 下 注册nacos 将include 和libnacos-cli.so libnacos-cli-static.a 放入你的工程 如果Nacos服务地址:…

ExpressionUtil的应用

ExpressionUtil是什么 ExpressionUtil是一个工具类&#xff0c;用于处理表达式相关的操作。它提供了一些方法&#xff0c;方便用户在程序中处理表达式相关的计算、比较、转换等操作。例如&#xff0c;可以使用ExpressionUtil计算一个数学表达式的结果&#xff0c;比较两个表达式…

代码随想录笔记|C++数据结构与算法学习笔记-栈和队列(〇)|stack、queue、单调队列和优先级队列(priority_queue)、大顶堆和小顶堆

文章目录 stack容器stack 基本概念常用接口构造函数赋值操作数据存取大小操作 queue容器queue常用接口构造函数赋值操作数据存取大小操作 栈和队列的灵魂四问C中stack,queue是容器吗我们使用的stack,queue属于哪个版本的STL我们使用的STL中stack,queue是如何实现的&#xff1f;…

SAP CAP篇十六:写个ERP的会计系统吧,Part III

本文目录 本系列文章目标开发步骤数据库表设计Service 定义生成Fiori App更新CDS Annotation更新Entity: Companies更新Entity&#xff1a;Accounts App运行 本系列文章 SAP CAP篇一: 快速创建一个Service&#xff0c;基于Java的实现 SAP CAP篇二&#xff1a;为Service加上数据…

volatile关键字的作用

volatile 关键字告诉编译器 i 是随时可能发生变化的&#xff0c;每次使用它的时候必须从内存中取出 i 的值&#xff0c;因而编译器生成的汇编代码会重新从 i 的地址处读取数据放在 k 中。 所以说使用 volatile 声明的变量的值的时候&#xff0c;系统总是重新从它所在的内存读…

NLP学习路线总结:从入门到精通

自然语言处理&#xff08;Natural Language Processing&#xff0c;NLP&#xff09;是人工智能领域的重要分支&#xff0c;它致力于使计算机能够理解、解释和生成人类语言。NLP技术的应用范围广泛&#xff0c;涵盖了机器翻译、情感分析、语义理解、信息抽取等诸多领域。对于想要…

每日一题---存在重复元素(1)和(2)

文章目录 一、存在重复数组1,1.题目展示1.2.解题思路1.3.参考代码 二、存在重复元素||2.1.题目展示2.2.解题思路2.3.参考代码 大家学习完了数组&#xff0c;指针等内容可以进行刷题了&#xff0c;刷题不仅可以增加大家的代码量&#xff0c;也可以积累自己的经验&#xff0c;言归…

C语言之指针的指向地址和指针的内容总结(八十九)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a;多媒…

量化交易入门(三十九)怎么获取A股历史数据

前面我们都是以美股的苹果股票为例进行策略和技术指标的回测&#xff0c;量化交易对中国A股是否适用呢&#xff1f;我们怎么样免费获取A股的股票数据呢&#xff1f;我给你们介绍三个免费的数据平台Tushare 、AkShare和Baostock。 1、Tushare Tushare是一个免费、开源的Pytho…