Rabbitmq消息积压问题如何解决以及如何进行限流

一、增加处理能力
优化系统架构、增加服务器资源、采用负载均衡等手段,以提高系统的处理能力和并发处理能力。通过增加服务器数量或者优化代码,确保系统能够及时处理所有的消息。

二、异步处理
将消息的处理过程设计为异步执行,即接收到消息立即返回响应,然后将消息放入队列中进行后续处理。这样可以避免同步请求的阻塞,提高系统的吞吐量和响应速度。

三、消息分片
如果消息体较大或者复杂,可以考虑将消息分片处理。将消息拆分为多个小的部分进行处理,减少单个消息的处理时间,从而提高整体处理能力。

四、集群扩展
根据实际情况,可以考虑通过添加更多的节点来扩展消息处理的集群规模,实现分布式部署和负载均衡,以应对大量消息的处理需求。

五、优化数据库操作
如果消息的处理涉及到数据库操作,可以考虑对数据库查询和写入进行性能优化,如建立索引、合理使用缓存等,以减少数据库的压力。

六、监控和报警
建立监控系统,实时监测消息队列的积压情况、处理延迟等指标,并设置相应的报警机制,及时发现和解决潜在问题,确保消息的正常处理。需要根据具体的场景和需求来选择适合的解决方案,综合考虑各种因素,以提高系统的消息处理能力和性能。

七、增加消费者数量
增加消费者可以提高消息处理速度,从而减少消息积压。可以根据消息的类型和优先级分配消费者,使消息得到及时处理。

八、增加队列
可以增加队列数量来缓解消息积压。根据消息的类型和优先级,可以将不同类型的消息存储在不同的队列中,更好的管理消息流量

九、设置消息的过期时间
可以设置消息的过期时间,当消息在队列中等待时间超过指定时间时,会被自动删除,直到消息被正确处理或超过最大重试次数为止。

十、使用限流机制
可以使用限流机制来控制消费者的消费速度,避免消息过多导致消费者无法及时处理。可以使用Qos机制,设置每个消费者同时处理消息的最大数量,从而保证系统的性能和稳定性。

如何进行限流了

电商中秒杀请求,属于瞬间大流量,同一时刻会有大量的请求涌入到系统中,可能导致系统挂掉。应付这种瞬间大流量的其中一种方式,便是利用消息队列

1、利用消息队列先进先出的特性,将请求进行削峰;
2、控制好消费端的消费速度,进行必要的限流。

在消费端,要做到上面提到的第2点,在Spring Boot RabbitMQ中只需要利用@RabbitListener注解,做一些简单配置就可以了。


一个listener对应多个consumer


默认情况一下,一个listener对应一个consumer,如果想对应多个,有两种方式。

方式一:直接在application.yml文件中配置

spring:rabbitmq:listener:simple:concurrency: 5max-concurrency: 10

这个是个全局配置,应用里的任何队列对应的listener都至少有5个consumer,但是千万别这么做,因为一般情况下,一个listener对应一个consumer是够用的。只是针对部分场景,才需要一对多。

方式二:直接在@RabbitListener上配置

@Component
public class SpringBootMsqConsumer {@RabbitListener(queues = "spring-boot-direct-queue",concurrency = "5-10")public void receive(Message message) {System.out.println("receive message:" + new String(message.getBody()));}
}

利用@RabbitListener中的concurrency属性进行指定就行。例如上面的

concurrency = “5-10”

就表示最小5个,最大10个consumer。启动消费端应用后,找到spring-boot-direct-queue这个队列的consumer,会发现有5个。

在这里插入图片描述

这5个消费者都可以从spring-boot-direct-queue这个队列中获取消息,加快队列中消息的消费速度,提高吞吐量。


限流


我们经过压测,来判断consumer的消费能力,如果单位时间内,consumer到达的消息太多,也可能把消费者压垮。
得到压测数据后,可以在@RabbitListener中配置prefetch count

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SpringBootMsqConsumer {@RabbitListener(queues = "spring-boot-direct-queue",concurrency = "5-10",containerFactory = "mqConsumerlistenerContainer")public void receive(Message message) {System.out.println("receive message:" + new String(message.getBody()));}
}

只需要在@RabbitListener中,用containerFactory指定一个监听器工厂类就行。这里用的是:

containerFactory = “mqConsumerlistenerContainer”

定义监听器工厂类很简单。

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMqConfig {@Autowiredprivate CachingConnectionFactory connectionFactory;@Bean(name = "mqConsumerlistenerContainer")public SimpleRabbitListenerContainerFactory mqConsumerlistenerContainer(){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setPrefetchCount(50);return factory;}
}

上面有一句factory.setPrefetchCount(50);,就是用于设置prefetch count的,启动后,会在spring-boot-direct-queue队列的consumer中体现出来。
在这里插入图片描述
配置成功后,consumer单位时间内接收到消息就是50条。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/60618.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于机器学习的fNIRS信号质量控制方法

摘要 尽管功能性近红外光谱(fNIRS)在神经系统研究中的应用越来越广泛,但fNIRS信号处理仍未标准化,并且受到经验和手动操作的高度影响。在任何信号处理过程的开始阶段,信号质量控制(SQC)对于防止错误和不可靠结果至关重要。在fNIRS分析中&…

FreeSWITCH 1.10.10 简单图形化界面5 - 使用百度TTS

FreeSWITCH 1.10.10 简单图形化界面5 - 使用百度TTS 0、 界面预览1、注册百度AI开放平台,开通语音识别服务2、获取AppID/API Key/Secret Key3、 安装百度语音合成sdk4、合成代码5、在PBX中使用百度TTS6、音乐文件-TTS7、拨号规则-tts_command 0、 界面预览 http://…

网络有源号角(50W-100W)社区小区广播 工地语音播报,隧道广播,钢铁广播广播系统

网络有源号角(50W-100W)社区小区广播 工地语音播报,隧道广播,钢铁广播广播系统 SV-7042T 50W网络有源号角 SV-7042T是深圳锐科达电子有限公司的一款壁挂式网络有源号角,具有10/100M以太网接口,可将网络音…

ceph源码阅读 erasure-code

1、ceph纠删码 纠删码(Erasure Code)是比较流行的数据冗余的存储方法,将原始数据分成k个数据块(data chunk),通过k个数据块计算出m个校验块(coding chunk)。把nkm个数据块保存在不同的节点,通过n中的任意k个块还原出原始数据。EC包含编码和解…

解密Spring MVC异常处理:从局部到全局,打造稳固系统的关键步骤

😀前言 在现代软件开发中,异常处理是不可或缺的一部分,它能够有效地提高系统的稳定性和健壮性。在Spring MVC框架中,异常处理机制起着至关重要的作用,它允许开发者在程序运行过程中捕获、处理和报告异常,从…

Qt/C++编写视频监控系统80-远程回放视频流

一、前言 远程回放NVR或者服务器上的视频文件,一般有三种方式,第一种是调用厂家的SDK,这个功能最全,但是缺点明显就是每个厂家的设备都有自己的SDK,只兼容自家的设备,如果你的软件需要接入多个厂家的&…

【深入解读Redis系列】Redis系列(五):切片集群详解

首发博客地址 https://blog.zysicyj.top/ 系列文章地址[1] 如果 Redis 内存很大怎么办? 假设一台 32G 内存的服务器部署了一个 Redis,内存占用了 25G,会发生什么? 此时最明显的表现是 Redis 的响应变慢,甚至非常慢。 这…

分类预测 | MATLAB实现SSA-CNN-SVM基于麻雀算法优化卷积支持向量机分类预测

分类预测 | MATLAB实现SSA-CNN-SVM基于麻雀算法优化卷积支持向量机分类预测 目录 分类预测 | MATLAB实现SSA-CNN-SVM基于麻雀算法优化卷积支持向量机分类预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 MATLAB实现SSA-CNN-SVM基于麻雀算法优化卷积支持向量机分类预测…

数学建模:数据的预处理

🔆 文章首发于我的个人博客:欢迎大佬们来逛逛 文章目录 数据预处理数据变换数据清洗缺失值处理异常值处理 数据预处理 数据变换 常见的数据变换的方式:通过某些简单的函数进行数据变换。 x ′ x 2 x ′ x x ′ log ⁡ ( x ) ∇ f ( x k )…

Redis 持久化和发布订阅

一、持久化 Redis 是内存数据库,如果不将内存中的数据库状态保存到磁盘,那么一旦服务器进程退出,服务器中的数据库状态也会消失。所以 Redis 提供了持久化功能! 1.1、RDB(Redis DataBase) 1.1.1 …

第一方支付、第二方支付、第三方支付、第三方支付是什么?

我相信关于支付行业大家多多少少都有一些自己的理解,但是具体的一些名词如标题中的这些,第一方、第二方、第三方支付,到底指的是什么? 第一方支付 也就是现金支付,其本质的意义就是指货币支付,从最早出现货…

猜拳游戏小程序源码 大转盘积分游戏小程序源码 积分游戏小程序源码

简介: 猜拳游戏大转盘积分游戏小程序前端模板源码,一共五个静态页面,首页、任务列表、大转盘和猜拳等五个页面 图片:

Spring与Mybatis集成且Aop整合

目录 一、集成 1.1 集成的概述 1.2 集成的优点 1.3 代码示例 二、整合 2.1 整合概述 2.2 整合进行分页 一、集成 1.1 集成的概述 集成是指将不同的组件、部分或系统组合在一起,以形成一个整体功能完整的解决方案。它是通过连接、交互和协调组件之间的关系来实…

MybatisPlus-插件篇

文章目录 一、前言二、插件1、分页插件2.1.1、引入依赖2.1.1、配置分页插件2.1.3、使用分页方法 2、乐观锁插件2.1、引入依赖2.2、添加版本字段2.3、配置乐观锁插件2.4、执行更新操作 三、总结 一、前言 本文将详细介绍mybatisplus中常用插件的使用。 二、插件 1、分页插件 …

Texlive2023与Texstudio2023卸载与安装(详细全程)

早在两年前安装了texlive2020,最近重新使用总是报错,好像是因为版本过低。我就找了个时间更新一下texlive版本,全程如下。 1、卸载texlive老版本 1)找到texlive目录,比如我的是D:\texlive\2022\tlpkg\installer&…

链路聚合原理

文章目录 一、定义二、功能三、负载分担四、分类五、常用命令 首先可以看下思维导图,以便更好的理解接下来的内容。 一、定义 在网络中,端口聚合是一种将连接到同一台交换机的多个物理端口捆绑在一起,形成一个逻辑端口的技术。通过端口聚合&…

Llama模型结构解析(源码阅读)

目录 1. LlamaModel整体结构流程图2. LlamaRMSNorm3. LlamaMLP4. LlamaRotaryEmbedding 参考资料: https://zhuanlan.zhihu.com/p/636784644 https://spaces.ac.cn/archives/8265 ——《Transformer升级之路:2、博采众长的旋转式位置编码》 前言&#x…

Uniapp笔记(五)uniapp语法4

本章目标 授权登录【难点、重点】 条件编译【理解】 小程序分包【理解】 一、授权登录 我的模块其实是两个组件&#xff0c;一个是登录组件&#xff0c;一个是用户信息组件&#xff0c;根据用户的登录状态判断是否要显示那个组件 1、登录的基本布局 <template><…

【java】【已解决】IDEA启动报错:Lombok Requires Annotation Processing

解决办法&#xff1a; 1、根据异常提示操作&#xff1a; 直接点击错误提示后面的蓝色标识【Enable】&#xff08;小编点完了所以变灰色&#xff09;&#xff0c;此操作等价于下面的步骤&#xff1a; 【File】-->【Settings】-->【Build】-->【Compiler】-->【Ann…

【数学建模】-- 模糊综合评价

模糊综合评价&#xff08;Fuzzy Comprehensive Evaluation&#xff09;是一种用于处理不确定性和模糊性信息的决策分析方法。它通常用于解决复杂的多指标决策问题&#xff0c;其中各指标之间可能存在交叉影响和模糊性的情况。模糊综合评价通过将不确定性和模糊性量化&#xff0…