MessageQueue --- RabbitMQ WorkQueue and Prefetch

MessageQueue --- RabbitMQ WorkQueue and Prefetch

  • 什么是WorkQueue
  • 分发机制 --- RoundRobin
  • 分发机制 --- Prefetch
    • Spring example use prefetch --- Fair Dispatch

什么是WorkQueue

  • Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
  • 当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用workqueu模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。

在这里插入图片描述

分发机制 — RoundRobin

工作机制:

  • 默认模式:当多个消费者订阅同一个队列时,RabbitMQ 会依次将消息分发给每个消费者,按顺序循环分配。
  • 示例:
    队列中有消息 M1, M2, M3, M4,消费者 C1 和 C2 同时订阅。
    分发顺序为:M1 → C1,M2 → C2,M3 → C1,M4 → C2。

特点:

  • 简单高效:无需额外配置,适合消费者处理速度相近的场景。

潜在问题:

  • 若消费者处理速度差异较大,可能导致某些消费者空闲,而其他消费者积压消息。
  • 例如:C1 处理速度慢,C2 处理速度快,但 C1 仍会分配到一半的消息,造成负载不均衡。

Example

//消息发送
//循环发送,模拟大量消息堆积现象。
@Test
public void testWorkQueue() throws InterruptedException {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, message_";for (int i = 0; i < 50; i++) {// 发送消息,每20毫秒发送一次,相当于每秒发送50条消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}
//消息接收
//模拟多个消费者绑定同一个队列,我们添加2个方法,
//并且设置不同睡眠时间模拟不同性能读取
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);
}

在这里插入图片描述

  • 消费者1很快完成了自己的25条消息
  • 消费者2却在缓慢的处理自己的25条消息。
  • 也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。

下面我们介绍prefetch机制,可以做到fair dispatch

分发机制 — Prefetch

工作机制:

  • 配置预取计数(Prefetch Count):通过设置 basicQos 参数,限制每个消费者未确认(unacknowledged)的消息数量。
  • 进入prefetch的消息仍会被保留在队列中,但是同时也会发给消费者等待处理
    在 RabbitMQ 的原始队列(Queue)中,会被标记为 “Unacked”(未确认)状态。
    这些消息不会被其他消费者获取(即使设置了 prefetch 的消费者崩溃)。
    只有消费者显式发送 ack 或 nack 后,消息才会从队列中移除(或重新排队)。

消息状态变化流程

  • 消息推送给消费者:
    RabbitMQ 将消息标记为 “Unacked”,但仍在队列中(占用内存或磁盘,取决于队列持久化配置)。
    此时消息对其他消费者不可见。
  • 消费者处理消息:
    若成功处理并发送 ack → 消息从队列中物理删除。
    若发送 nack(requeue=true) → 消息重新变为 “Ready” 状态,可被其他消费者获取。
    若发送 nack(requeue=false) 或者超时→ 消息被放入死信队列,如果没有配置死信队列则被丢弃

示例:

Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue", false, consumer);
Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(0); // No limit for this consumer,allowing any number of unacknowledged messages.
channel.basicConsume("my-queue", false, consumer);
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10, false); // Per consumer limit
channel.basicQos(15, true);  // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);
//这两个消费者之间总共最多只能有 15 条未确认消息,且每个消费者最多处理 10 条消息。
//由于需要在 Channel 和队列之间协调全局限制,该模式的性能会低于前述示例(存在额外开销)

特点:

  • 负载均衡:处理速度快的消费者会获取更多消息,避免空闲
  • 可以一次性发送多个消息给消费者处理,减少网络开销
  • 可靠性:需配合手动确认(ack)机制,确保消息处理成功后才从队列移除。
  • 适用场景:消费者处理速度差异较大时(如耗时任务),能显著提升整体吞吐量。
  • Automatic acknowledgement mode or manual acknowledgement mode with unlimited prefetch should be used with care. 通常设为 100~300,平衡吞吐与内存占用。

Note:

  • AMQP 0-9-1 协议是channel level prefetch,通过 basic.qos 方法限制channel上的未确认消息数
  • channel level有很大缺陷,由于单个channel可能从多个queue消费消息,channel与queue之间需要为每条消息进行协调,以确保不超出限制。这种机制在单机环境下效率较低,而在集群消费场景中性能会显著下降,大多数使用场景也需要consumer level prefetch
  • 所以RabbitMQ支持consumer level prefetch (也就是以上的例子)
    在这里插入图片描述

Spring example use prefetch — Fair Dispatch

  • 在spring中有一个prefetch的配置,我们修改consumer服务的application.yml文件,添加配置:
spring:rabbitmq:listener:simple:acknowledge-mode: manual # 设置确认方式为手动确认prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

在这里插入图片描述

  • 可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终总的执行耗时也在1秒左右,大大提升
  • 还可根据实际情况自定义prefetch count,达到限流的目的
spring:rabbitmq:listener:simple:acknowledge-mode: manual # 设置确认方式为手动确认prefetch: 5 # 限制消费者只能接收5条消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/74729.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RNN模型与NLP应用——(9/9)Self-Attention(自注意力机制)

声明&#xff1a; 本文基于哔站博主【Shusenwang】的视频课程【RNN模型及NLP应用】&#xff0c;结合自身的理解所作&#xff0c;旨在帮助大家了解学习NLP自然语言处理基础知识。配合着视频课程学习效果更佳。 材料来源&#xff1a;【Shusenwang】的视频课程【RNN模型及NLP应用…

详解AI采集框架Crawl4AI,打造智能网络爬虫

大家好&#xff0c;Crawl4AI作为开源Python库&#xff0c;专门用来简化网页爬取和数据提取的工作。它不仅功能强大、灵活&#xff0c;而且全异步的设计让处理速度更快&#xff0c;稳定性更好。无论是构建AI项目还是提升语言模型的性能&#xff0c;Crawl4AI都能帮您简化工作流程…

从零开始玩python--python版植物大战僵尸来袭

大家好呀&#xff0c;小伙伴们&#xff01;今天要给大家介绍一个超有趣的Python项目 - 用pygame制作植物大战僵尸游戏的进阶版本。相信不少小伙伴都玩过这款经典游戏&#xff0c;今天我们就用Python来实现它&#xff0c;让编程学习变得更加有趣&#xff01;&#x1f31f; 一、…

图解AUTOSAR_SWS_FlashTest

AUTOSAR Flash Test模块详解 基于AUTOSAR 4.4.0规范的Flash测试模块分析与图解 目录 概述 1.1 Flash Test模块的作用 1.2 工作原理架构设计 2.1 整体架构 2.2 依赖关系状态管理 3.1 状态转换图 3.2 前台与后台测试模式配置结构 4.1 配置类图 4.2 关键配置参数交互流程 5.1 序列…

【mongodb】mongodb的字段类型

目录 1. 基本数据类型1.1 String1.2 Number1.3 Boolean1.4 Date1.5 Null1.6 ObjectId1.7 Array1.8 Binary Data1.9 Object 2. 特殊数据类型2.1 Regular Expression2.2 JavaScript2.3 Symbol2.4 Decimal1282.5 Timestamp2.6 MinKey/MaxKey2.7 DBPointer 3. 常用字段类型示例4. 注…

MySQL篇(五)MySQL主从同步原理深度剖析

MySQL篇&#xff08;五&#xff09;MySQL主从同步原理深度剖析 MySQL篇&#xff08;五&#xff09;MySQL主从同步原理深度剖析一、引言二、MySQL主从同步基础概念主库&#xff08;Master&#xff09;从库&#xff08;Slave&#xff09;二进制日志&#xff08;Binary Log&#x…

论文学习16:Learning Transferable Visual Models From Natural Language Supervision

代码来源 Learning Transferable Visual Models From Natural Language Supervisionhttps://arxiv.org/pdf/2103.00020 模块作用 当前最先进的计算机视觉系统被训练用于预测一组固定的、预先定义的目标类别。这种受限的监督方式限制了它们的通用性和可用性&#xff0c;因为要…

[MySQL初阶]MySQL(9)事务机制

标题&#xff1a;[MySQL初阶]MySQL&#xff08;9&#xff09;事物机制 水墨不写bug 文章目录 一、认识事务1、多线程访问数据库出现的问题2、对CURD的限制是通过事务机制实现的3、事务的四个属性4、哪些引擎支持事务 二、事务的提交与autocommit设置三、事务的隔离性和隔离级别…

spring-cloud-alibaba-nacos-config使用说明

一、核心功能与定位 Spring Cloud Alibaba Nacos Config 是 Spring Cloud Alibaba 生态中的核心组件之一&#xff0c;专为微服务架构提供动态配置管理能力。它通过整合 Nacos 的配置中心功能&#xff0c;替代传统的 Spring Cloud Config&#xff0c;提供更高效的配置集中化管理…

SonarQube数据库配置

SonarQube部署完成后&#xff0c;在浏览器地址栏输入http://IP:9000可以进入登录页面&#xff0c;以本机运行为例&#xff0c;地址为http://127.0.0.1:9000/&#xff0c;默认登录名&#xff1a;admin&#xff0c;登录密码也是admin。登录后会要求设置密码&#xff1a; 按要求设…

医药档案区块链系统

1. 医生用户模块​​ ​​目标用户​​&#xff1a;医护人员 ​​核心功能​​&#xff1a; ​​检索档案​​&#xff1a;通过关键词或筛选条件快速定位患者健康档案。​​请求授权​​&#xff1a;向个人用户发起档案访问权限申请&#xff0c;需经对方确认。​​查看档案​…

CSS3学习教程,从入门到精通, 化妆品网站 HTML5 + CSS3 完整项目(26)

化妆品网站 HTML5 CSS3 完整项目 下面是一个完整的化妆品网站项目&#xff0c;包含主页、登录页面和注册页面。我将按照您的要求提供详细的代码和注释。 1. 网站规划与需求分析 需求分析 展示化妆品产品信息提供用户注册和登录功能响应式设计&#xff0c;适配不同设备美观…

ROS2 多机时间同步(Chrony配置简明指南)

适用场景&#xff1a; 主机运行 ROS2 Humble&#xff08;发布 /scan 等&#xff09;&#xff0c;板子运行 ROS2 Foxy&#xff08;发布 /tf 等&#xff09;&#xff0c;两边通过 ROS_DOMAIN_ID 跨平台通讯。需要保证系统时间对齐&#xff0c;避免 TF 插值失败、建图抖动等问题。…

Nginx配置伪静态,URL重写

Nginx配置伪静态&#xff0c;URL重写 [ Nginx ] 在Nginx低版本中&#xff0c;是不支持PATHINFO的&#xff0c;但是可以通过在Nginx.conf中配置转发规则实现&#xff1a; location / { // …..省略部分代码if (!-e $request_filename) {rewrite ^(.*)$ /index.php?s/$1 l…

电路笔记(元器件):ADC LTC系列模数转换器的输出范围+满量程和偏移调整

LTC1740(LTC1740官方文档)是Analog Devices&#xff08;原Linear Technology&#xff09;公司生产的一款高性能、低功耗的14位模数转换器(ADC)。它通常用于需要高精度和快速采样率的应用中&#xff0c;如通信系统、数据采集设备等。同类产品 LTC1746&#xff1a;一款14位、40Ms…

续-算法-数学知识

3、欧拉函数 1、定义&#xff1a; 1~n 中与 n 互质的数的个数 例如&#xff1a;6 的有 1 2 3 4 5 6 其中&#xff0c;与 n 互质 的 数的个数为 2个分别是&#xff1a;1、5 2、计算&#xff1a; $ N p_1^{a1} p_2^{a2} p_3^{a3} … p_k^{ak} $&#xff08;例如&#x…

C/C++测试框架googletest使用示例

文章目录 文档编译安装示例参考文章 文档 https://github.com/google/googletest https://google.github.io/googletest/ 编译安装 googletest是cmake项目&#xff0c;可以用cmake指令编译 cmake -B build && cmake --build build将编译产物lib和include 两个文件夹…

LintCode第974题-求矩阵各节点的最短路径(以0为标准)

描述 给定一个由0和1组成的矩阵&#xff0c;求每个单元格最近的0的距离。 两个相邻细胞之间的距离是1。 给定矩阵的元素数不超过10,000。 在给定的矩阵中至少有一个0。 单元格在四个方向上相邻:上&#xff0c;下&#xff0c;左和右。 样例 例1: 输入: [[0,0,0],[0,0,0],[0…

Redis核心机制-缓存、分布式锁

目录 缓存 缓存更新策略 定期生成 实时生成 缓存问题 缓存预热&#xff08;Cache preheating&#xff09; 缓存穿透&#xff08;Cache penetration&#xff09; 缓存雪崩&#xff08;Cache avalanche&#xff09; 缓存击穿&#xff08;Cache breakdown&#xff09; 分…

CF每日5题(1300-1500)

最近急速补练蓝桥杯中&#xff0c;疏于cf练习。 感觉自己过题还是太慢了。 今日水题&#xff0c;我水水水水。 1- 1979C lcm 水 1400 第 i i i局赢了&#xff0c;1个硬币顶 k [ i ] k[i] k[i]个贡献&#xff0c;所以每局分硬币 x i 1 k [ i ] x_i{1\over k[i]} xi​k[i]1​个…