RocketMQ消息积压如何处理

在高并发的场景下,由于消息产生速度超过消费速度,可能会导致消息积压的问题。本文将介绍 RocketMQ 消息积压的原因和如何处理积压问题。

什么是消息积压

消息积压是使用 MQ 消息队列系统中,最常见的一种性能问题。如下图所示,当生产端的生产效率大于消费端的消费效率就会造成消息处理不完的情况,也就叫 “消息积压”。

消息积压原因

消息积压的原因可以归结为以下几点:

  • 消费者处理速度慢:当消息消费者的处理能力跟不上消息产生的速度时,消息将积压在消息队列中。
  • 消息消费失败:当消息消费者由于某种原因无法正确消费消息时,消息会一直留在消息队列中,导致积压。
  • 配置不合理:如果消息队列的容量设置过小或者消费者的线程数设置过少,都可能导致消息积压。

处理消息积压的方法

当发生了消息积压,这时就得想办法赶紧把积压的消息消费完,就得考虑提高消费能力,一般有两种办法:

消费者扩容

Consume 实例数量小于 MessageQueue 的数量时,增加 Consume 实例可以对消费者进行扩容,来提高消费能力。比如一个 Topic 有 4 个 MessageQueue,2 个消费者进行消费,如果增加一个消费者,明细可以加快拉取消息的频率。

消息迁移Queue扩容

Consume 实例数量大于等于 MessageQueue 的数量时,这种情况再扩容 Consume 实例就没什么用,就得考虑扩容 MessageQueue。

可以新建一个临时的 Topic,临时的 Topic 多设置一些 MessageQueue,然后先用一些消费者把消费的数据丢到临时的 Topic,因为不用业务处理,只是转发一下消息,还是很快的。接下来用扩容的消费者去消费新的 Topic 里的数据,消费完了之后,恢复原状。比如一个 Topic 有 4 个 MessageQueue,并且有 4 个消费者进行消费。

通过前期设置提高消费能力

虽然通过扩容可以在一定程度上解决消息积压问题,但在一些特殊情况下还是会出现消息积压的问题。

消费者消息拉取的速度也取决于本地消息的消费速度,如果本地消息消费的慢,就会延迟一段时间后再去拉取。又是在什么情况下消费者会延迟一段时间后后再去拉取呢?

增加消息队列的容量

如果消息队列的容量设置过小,也有可能会导致消息积压。

可以通过增加消息队列的容量来缓解积压问题。但需要注意,过大的消息队列容量可能会增加消息处理的延迟。

消费者拉取的消息存在 ProcessQueue,消费者是有流量控制的,如果出现下面三种情况,就不会主动去拉取:

  • ProcessQueue 保存的消息数量超过阈值(默认 1000);
  • ProcessQueue 保存的消息大小超过阈值(默认 100M);
  • 对于非顺序消费的场景,ProcessQueue 中保存的最后一条和第一条消息偏移量之差超过阈值(默认 2000)。

优化消息消费的逻辑

检查消息消费逻辑是否存在性能瓶颈或者不必要的复杂计算。优化消息消费的逻辑可以提高消费速度,减少消息积压。

对于顺序消费的场景,ProcessQueue 加锁失败,也会延迟拉取,这个延迟时间是 3s。

设置消息消费失败的处理机制

当消息消费失败时,可以根据业务需求选择合适的处理方式。可以将失败的消息记录下来,后续再次消费;或者将失败的消息发送到死信队列进行处理。

监控和报警机制

建立监控和报警机制,及时发现消息积压的情况并采取相应的措施。可以通过监控指标、日志或者专业的监控工具来实现。

消息批量处理

消费者每次一条一条消费会很慢,如果再有事务的情况下会更慢。此时可能通过批量的方式获取数据,再对数据批量操作,

RocketMQ消息积压是在高并发场景下常见的问题,需要合理的处理策略来保证消息系统的稳定性和性能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/715455.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2、Redis-Hash【常用】

目录 一、Hash和String的区别 二、常用命令与演示 三、Redis中Hash类型应用场景 一、Hash和String的区别 这是String, keyvaluenameTrxcx 这是Hash, keyvaluestudentTrxcxnameTrxcxage21sexmale 可以明显的看出,String的value就是一条数据&#…

手动实现一个简单的 HTTP 请求

本文我们通过 Socket,写一个 HTTP 协议,直观的感受一下上篇文章中的请求和响应。 定义 socket server 通过上篇文章,我们知道 HTTP 协议底层是通过 Socket 实现的,所以我们先通过 socket 定义一个 server import socket#初始化 …

复试PAT乙级day34

1111~1115 1113 很难,看了题解 人类习惯用 10 进制,可能因为大多数人类有 10 根手指头,可以用于计数。这个世界上有一种叫“钱串子”(学名“蚰蜒”)的生物,有 30 只细长的手/脚,在它们的世界里…

【探索AI】十六 深度学习之第2周:深度神经网络(五)实践与应用

实践与应用 实现步骤 当您想要使用深度学习框架构建简单的深度神经网络并进行训练与评估时,您可以按照以下步骤进行操作: 步骤一:选择深度学习框架 选择您熟悉或希望学习的深度学习框架,比如TensorFlow、PyTorch、Keras等。 …

算法题目跟连系列之“手把手刷链表”

第一道 题目:https://leetcode.cn/problems/partition-list/description/ 86 Partition List 这个题解决的时候,无非就是把链表中小于X的元素摘出来形成一个链表,同时也把大于等于X的元素摘出来形成另外一个链表。最后把这两个链表合并。这个…

卷积神经网络介绍

卷积神经网络(Convolutional Neural Networks,CNN) 网络的组件:卷积层,池化层,激活层和全连接层。 CNN主要由以下层构造而成: 卷积层:Convolutional layer(CONV)池化层&#xff1a…

docker报错 fatal error: runtim: out of memory

fatal error: runtim: out of memory 真无语了 系统内存也够用 原来是虚拟机的不够用了 (原本1g已经加到2g还是会报错) 直接3台虚拟机都加到4g

多线程(进阶四:线程安全的集合类)

目录 一、多线程环境使用ArrayList 二、多线程环境使用队列 三、多线程环境使用哈希表 1、HashMap 2、Hashtable 3、ConcurrentHashMap (1)缩小了锁的粒度 (2)充分使用了CAS原子操作,减少一些加锁 (3)针对扩容操作的一些优化(化整为零&#xff…

maven 项目的创建入门

拓展阅读 maven 包管理平台-01-maven 入门介绍 Maven、Gradle、Ant、Ivy、Bazel 和 SBT 的详细对比表格 maven 包管理平台-02-windows 安装配置 mac 安装配置 maven 包管理平台-03-maven project maven 项目的创建入门 maven 包管理平台-04-maven archetype 项目原型 ma…

蓝桥杯Python B组练习——python复习2

蓝桥杯Python B组练习——python复习2 一、简介 复习python,参考书《Python编程从入门到实践》,[美]Eric Mathes著。前一部分见专栏——蓝桥杯Python B组练习 这一部分不全,不想写了 二、字典 1.一个简单的字典 来看一个游戏&#xff0…

LeetCode -55 跳跃游戏

LeetCode -55 跳跃游戏 给你一个非负整数数组 nums ,你最初位于数组的 第一个下标 。数组中的每个元素代表你在该位置可以跳跃的最大长度。 判断你是否能够到达最后一个下标,如果可以,返回 true ;否则,返回 false 。…

模拟服务器响应的测试框架:moco

第1章:引言 大家好,我是小黑,在这篇博客中,咱们要聊聊Moco测试框架。这个框架,可不是一般的小伙伴,它在模拟服务器响应这块儿,可是有不少看家本领。 首先,Moco是啥呢?简…

stable diffusion webUI之赛博菩萨【秋葉】——工具包新手安裝与使用教程

stable diffusion webUI之赛博菩萨【秋葉】——工具包新手安裝与使用教程 AI浪潮袭来,还是学习学习为妙赛博菩萨【秋葉】简介——(葉ye,四声,同叶)A绘世启动器.exe(sd-webui-aki-v4.6.x)工具包安…

【面试题解析--Java基础】回顾与加深,浅浅回顾JAVA常规八股,利用起碎片化时间。

一、Java基础 1. final 关键字的作用: 修饰类时,被修饰的类无法被继承。修饰方法时,被修饰的方法无法被重写。修饰变量时,变量为常量,初始化后无法重新赋值。 2. static 关键字的作用: 修饰变量和方法时…

住房贷款利息退税笔记

应该缴税了才能退税,如果是学生,没有缴税应该是无法退税的。 产权证明 如果是商品房,没有取得房产证,那就是房屋预售合同 扣除年度 应选择上一年 扣除比例 没有结婚,选否 申报方式

unity 数学 如何计算线和平面的交点

已知一个平面上的一点P0和法向量n,一条直线上的点L0和方向L,求该直线与该平面的交点P 如下图 首先我们要知道向量归一化点乘之后得到就是两个向量的夹角的余弦值,如果两个向量相互垂直则值是0,小于0则两个向量的夹角大于90度,大于…

(C语言)函数详解上

(C语言)函数详解上 目录: 1. 函数的概念 2. 库函数 2.1 标准库和头文件 2.2 库函数的使用方法 2.2.1 sqrt 功能 2.2.2 头文件包含 2.2.3 实践 2.2.4 库函数文档的一般格式 3. 自定义函数 3.1 函数的语法形式 3.2 函数的举例 4. 形参和实参 4.…

MySQL-CDC 新增同步表确无法捕获增量问题处理

Flink-CDC版本&#xff1a;2.3.0 问题描述 之前通过Flink-CDC捕获Mysql数据库的数据变更情况&#xff0c;代码大致如下&#xff1a; StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(flinkEnvConf);MySqlSource<String> mysql …

Redis--事务机制的详解及应用

Redis事务的概念&#xff1a; Redis事务就是将一系列命令包装成一个队列&#xff0c;在执行时候按照添加的顺序依次执行&#xff0c;中间不会被打断或者干扰&#xff0c;在执行事务中&#xff0c;其他客户端提交的命令不可以插入到执行事务的队列中&#xff0c;简单来说Redis事…

【Linux】进程优先级以及Linux内核进程调度队列的简要介绍

进程优先级 基本概念查看系统进程修改进程的优先级Linux2.6内核进程调度队列的简要介绍和进程优先级有关的概念进程切换 基本概念 为什么会存在进程优先级&#xff1f;   进程优先级用于确定在资源竞争的情况下&#xff0c;哪个进程将被操作系统调度为下一个运行的进程。进程…