RocketMQ如何安全的批量发送消息❓

优点:

批量发送消息可以提高rocketmq的生产者性能和吞吐量。

使用场景:

  1. 发送大量小型消息时;
  2. 需要降低消息发送延迟时;
  3. 需要提高生产者性能时;

注意事项:

  1. 消息列表的大小不能超过broker设置的最大消息大小;
  2. 消息列表的大小不能超过生产证设置的maxMessageSize 参数,此参数默认为 4MB;
  3. 批量发送消息不支持消息事务;
  4. 如果代码在发送消息列表时发生异常,则可能会发生部分消息发送成功,部分消息发送失败的情况。如果要确保所有消息都已成功发送,则需要增加错误处理逻辑和消息重试机制;


批量发送消息为什么要限制maxMessageSize❓

消息列表的大小不能超过生产者设置的maxMessageSize参数,主要是为了避免消息发送延迟和消息过大导致broker出现性能问题。如果尝试发送大于maxMessageSize的消息,RocketMQ会抛出MessageTooLargeException异常,并且消息不会被发送到broker。

如果开发者在开发时遇到了消息列表大小超过maxMessageSize的情况,可以考虑以下几种处理方式:

    1. 提升maxMessageSize参数的大小,这样可以容纳更大的消息列表。但是,需要注意在提升参数大小时,要考虑到RocketMQ broker的性能和网络带宽等因素。
    2. 考虑将消息列表进行拆分,然后分批发送。这样可以避免一次发送过多的消息。
    3. 计算消息的大小并进行压缩。可以使用一些压缩算法,如 LZ4、Snappy 等,对消息进行压缩,以减小消息的大小。
    4. 对超过 maxMessageSize 的消息进行过滤或其他处理。可以通过业务逻辑对消息进行分组或分类,对超过 maxMessageSize 的消息进行过滤或其他处理,以避免发送超出限制的消息。

代码实现

package com.resource.sync.rocketmq;import java.util.Iterator;
import java.util.List;/*** @description:消息分割,在rocketmq中,一次性发送消息的长度不可超过4mb,此时我们需要进行切割,确保消息长度小于4mb**/
public class ListSplitter<T> implements Iterator<List<T>> {/*** 分割数据大小*/private int sizeLimit;/*** 分割数据列表*/private final List<T> messages;/*** 分割索引*/private int currIndex;public ListSplitter(int sizeLimit, List<T> messages) {this.sizeLimit = sizeLimit;this.messages = messages;}@Overridepublic boolean hasNext() {return currIndex < messages.size();}@Overridepublic List<T> next() {int nextIndex = currIndex;int totalSize = 0;for (; nextIndex < messages.size(); nextIndex++) {T t = messages.get(nextIndex);totalSize = totalSize + t.toString().length();if (totalSize > sizeLimit) {break;}}List<T> subList = messages.subList(currIndex, nextIndex);currIndex = nextIndex;return subList;}
}
    private final int maxMessageSize = 1024 * 1024 * 4;/*** 消息分割(批量发送)*/private void bulkSendMsg(List<Message<String>> messageList) {// 限制数据大小ListSplitter splitter = new ListSplitter(maxMessageSize, messageList);while (splitter.hasNext()) {List<Message> nextList = splitter.next();syncBulkSendMessage("topic", nextList);}}/*** @param topic* @param list* @description:发送实时消息(批量)*/public void syncBulkSendMessage(String topic, List<Message> list) {SendResult sendResult = null;try {sendResult = rocketMQTemplate.syncSend(topic, list);if (sendResult.getSendStatus() != SendStatus.SEND_OK) {log.error("BULK_ROCKET_MQ_DISTRIBUTION_ERROR.RESULT_STATUS:{},MSG_ID:{}", sendResult.getSendStatus(), sendResult.getMsgId());}if (sendResult.getSendStatus() == SendStatus.SEND_OK) {log.info("BULK_SEND_MSG_SUCCESS.MSG_ID:{}", sendResult.getMsgId());}} catch (Exception e) {log.error("BULK_ROCKET_MQ_DISTRIBUTION_ERROR:{}", e);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/135575.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何快速教你看自己电脑cpu是几核几线程

目录 一、我们日常中说的电脑多少核多少线程&#xff0c;很多人具体不知道什么意思&#xff0c;下面举例4核和4线程什么意思。二、那么4线程又是怎么回事呢&#xff1f;三、那么知道了上面的介绍后怎么看一台电脑是几核&#xff0c;几线程呢&#xff1f; 一、我们日常中说的电脑…

JSONP 跨域访问(2), JSONP劫持

JSONP 跨域访问(2), JSONP劫持 一, 利用 XSS 漏洞执行jsonp 1. 利用过程 发现有jsonp的请求: <script type"text/javascript" src"http://192.168.112.200/security/jsonp.php?callbackjsonpCallback"></script>向xss漏洞的位置注入代码…

​软考-高级-信息系统项目管理师教程 第四版【第24章-法律法规与标准规范-思维导图】​

软考-高级-信息系统项目管理师教程 第四版【第24章-法律法规与标准规范-思维导图】 课本里章节里所有蓝色字体的思维导图

springboot 项目升级 2.7.16 踩坑

记录一下项目更新版本依赖踩坑 这个是项目最早的版本依赖 这里最初是最初是升级到 2.5.7 偷了个懒 这个版本的兼容性比较强 就选了这版本 也不用去修改就手动的去换了一下RabbitMQ的依赖 因为这边项目有AMQP 风险预警 1.spring-amqp版本低于2.4.17的用户应升级到2.4.17 2.spri…

时序预测 | MATLAB实现WOA-CNN-BiLSTM-Attention时间序列预测(SE注意力机制)

时序预测 | MATLAB实现WOA-CNN-BiLSTM-Attention时间序列预测&#xff08;SE注意力机制&#xff09; 目录 时序预测 | MATLAB实现WOA-CNN-BiLSTM-Attention时间序列预测&#xff08;SE注意力机制&#xff09;预测效果基本描述模型描述程序设计参考资料 预测效果 基本描述 1.MAT…

蓝桥杯官网练习题(移动距离)

题目描述 X 星球居民小区的楼房全是一样的&#xff0c;并且按矩阵样式排列。其楼房的编号为 1,2,3, 当排满一行时&#xff0c;从下一行相邻的楼往反方向排号。 比如&#xff1a;当小区排号宽度为 6 时&#xff0c;开始情形如下&#xff1a; 1 2 3 4 5 6 12 …

python模块之redisbloom redis布隆过滤器

一、简介 RedisBloom 是一个 Redis 模块&#xff0c;提供了布隆过滤器&#xff08;Bloom Filter&#xff09;、计数器&#xff08;Count-Min Sketch&#xff09;、Top-K&#xff08;Top-K&#xff09;、Top-K with expiry&#xff08;Top-K with Expiration&#xff09;和多样…

开发知识点-golang

golang语言学习 环境搭建win10配置go环境 ubuntu20.04安装golang介绍下载 Go 压缩包调整环境变量验证 Go 安装过程 环境搭建 win10配置go环境 中文网进行下载 https://studygolang.com/dl 配置环境变量 增加GOROOT: 新建 -->变量名为: GOROOT(必须大写) 变量值: 你安装…

2311rust无畏并发.

原文 Rust无畏并发 Rust是为了解决两个麻烦问题: 1,如何安全系统编程 2,如何无畏并发 最初,这些问题似乎是无关的,但令惊讶的是,方法竟然是相同的:使Rust安全的相同工具也可帮助解决并发问题. 内存安全和并发错误,一般认为是代码在不应访问数据时访问数据.Rust依靠所有权为…

删除word最后一页之后的空白页

最近编辑word比较多&#xff0c;有时最后一页&#xff08;最后一页内容还有可能是表格&#xff09;之后&#xff0c;还有一页空白页&#xff0c;单独按下backspace、del都删不掉&#xff0c;很让人着急。 经过查询有几种方法&#xff1a; &#xff08;1&#xff09;点击选中空…

12、填写NGINX配置部署前端;运行jar部署后端

后端可以部署的方式&#xff0c;首先直接运行jar是肯定可以的。此外&#xff0c;可以单独开docker容器运行在容器中。 但是这里运行在容器中必要性&#xff0c;其实并不大。 当前我们直接运行jar来运行后端。后面推出集成docker。 直接运行jar包的方式&#xff0c;首先需要打…

Spark Streaming

Spark Streaming Spark Streaming概念Spark Streaming操作1 netcat传入数据2 DStream 创建3 自定义数据源4 接受kafka数据DStream 转换1无状态的转换2有状态的转换updateSateByKeyWindowOperations Spark Streaming概念 Spark Streaming 用于流式数据的处理。 Spark Streaming…

临界资源,临界区,通信的干扰问题(互斥),信号量(本质,上下文切换问题,原子性,自身的安全性,操作)

目录 引入 概念 临界资源 临界区 干扰存在原因 互斥 信号量 引入 举例 概念 介绍 表示可用资源数 表示等待进程数 申请信号量 信号量的本质 全局变量? 共享内存? 不安全问题 -- 上下文切换 原子性 信号量自身的安全性 原子操作的意义 操作 引入 通信…

Collection集合 迭代器遍历Iterator 和集合增强For

迭代器遍历Iterator 标准写法: 增强For for(类型 名称 : 集合 ) 举例: 不仅可以集合也可以数组 底层仍然是iterator

【MySQL】约束

一、基本概念 1、什么是约束 约束是表级的强制规定 2、为什么使用约束 是为了保证表中数据的完整性&#xff0c;完整性又可以拆分为精确性和可靠性 3、怎么去保证数据完整性呢&#xff0c;从以下四个角度进行考虑 实体完整性&#xff1a;一张表中&#xff0c;不能存在两条完…

Power Apps-库组件连接数据表

点击添加数据 可以选择Excel或SharePoint导入 选择右侧边栏中的网站&#xff0c;再选择想要连接的数据表 点击插入&#xff0c;选择布局中的某个库&#xff0c; 选中它可以点击上方的布局&#xff0c;选择想要的样式 右侧选择数据源中的表就将组件与数据表连接起来了 如果想修…

Vite创建React项目,另外一种更加简单的方法

在上一篇blog中一个一个安装依赖dependencies&#xff0c;有没有一步到位的方法呢&#xff0c;有! 参考《React 18 Design Patterns and Best Practices Design, build, and deploy production-ready web applications with React》4th 第一章倒数第二节Vite as a solution有个…

flutter生态一统甜夏 @Android @ios @windowse @macos @linux @Web

(愿景)G o o g l e 中 国flutter生态一统天下(IT) Web Android ios Windowse Macos Linux Google中国https://space.bilibili.com/64169458 https://pub-web.flutter-io.cn 构建 Flutter Web 应用 构建 Flutter Web 应用 - Flutter 中文文档 - Flutter 中文开发者网站 …

Packet Tracer路由器连接终端设备怎么配置?

在Packet Tracer中配置一台路由器和三台终端设备可以帮助你建立一个简单的局域网&#xff0c;以下是配置的基本步骤&#xff1a; 打开Packet Tracer&#xff0c;从左侧设备栏中拖拽一个路由器和三个终端设备到工作区。 连接设备&#xff1a;使用网线将路由器的端口与每台终端设…

vue3+setup 解决:this.$refs引用子组件报错 is not a function

一、如果在父组件中以下四步都没问题的话&#xff0c;再看下面步骤 二、如果父组件引用的是index页面 请在 头部加上以下代码 &#xff08;如果是form页面请忽略这一步&#xff09; <template> <a-modalv-model:visible"visible"title"头部名称&…