RabbitMQ不公平分发问题分析及问题解决

1.不公平分发

1.1 不公平分发策略是什么?

在 RabbitMQ 中,不公平分发(Unfair Dispatch)是指当多个消费者(Consumers)同时订阅同一个队列(Queue)时,消息的分发机制是不公平的,可能导致负载不均衡等问题。

1.2 不公平分发产生的原因?

默认情况下,RabbitMQ 采用的是轮询(Round Robin)的方式将消息(工作线程)平均分发给各个消费者,也就是理论上每一个消费者消费的消息数量都是一样的。但是实际上,可能由于消费者处理消息的速度不同(可能由于网络因素、配置因素等不同),可能就会导致有些消费者长时间处于空闲状态,有些消费者消息处理不过来,导致消息积压,导致负载不均衡等情况,会严重影响到整个系统的性能。

1.3 怎么解决?

1.3.1 公平分发

公平分发:在公平预取模式下,每个消费者一次只能从队列中预先获取一条消息。当消费者处理完当前的消息并确认后,RabbitMQ 才会将下一条消息发送给该消费者,没有处理完消息的消费者将继续处理消息。这样可以确保每个消费者都能够公平地接收和处理消息,避免某个消费者长时间占用资源导致负载不均衡。

代码演示公平分发策略,在消费者中消费消息之前,设置参数 channel.basicQos(1);

public class Work05 {//队列名称public static final String TASK_QUEUE_NAME = "ACK_QUEUE";//接受消息public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();System.out.println("C1等待接受消息处理时间较短");DeliverCallback deliverCallback = (consumerTag, message) -> {//沉睡1SSleepUtils.sleep(1);System.out.println("接受到的消息:" + new String(message.getBody(), "UTF-8"));//手动应答/*** 1.消息的标记Tag* 2.是否批量应答 false表示不批量应答信道中的消息*/channel.basicAck(message.getEnvelope().getDeliveryTag(), false);};CancelCallback cancelCallback = (consumerTag -> {System.out.println(consumerTag + "消费者取消消费接口回调逻辑");});//设置不公平分发int prefetchCount = 1;channel.basicQos(prefetchCount);//采用手动应答boolean autoAck = false;channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);}
}

开启成功,会看到如下结果:

image-20230711004411432

演示结果如下

image-20230711005222486

C2消费者处理消息时间较长,在处理消息完成之前,不会收到新的消息。C1消费者处理消息效率较高,因为处理完一条消息后将继续处理收到的新消息。

1.3.2 预取值分发

预取值分发策略:带权的消息分发,公平分发策略虽然解决了负载不均衡的问题,但是在高并发场景下会存在性能问题。

试想一下,每个消费者消息消息时都会从一个缓存区里面读取消息,这个缓存区有大小限制,既解决了消息积压的问题,又优化了消费者处理完消息之后才向队列中读取一条数据这种损耗性能的操作。

因此可以通过一个未确认的消息缓冲区,开发人员能限制此缓冲区的大小以避免缓冲区里面无限制的未确认消息问题。这个时候也是可以通过使用 basic.qos 方法设置「预取计数」值来完成的。

该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量, RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知这个情况到并再发送一条消息。消息应答和 QoS 预取值对用户吞吐量有重大影响。

通常,增加预取将提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的 RAM 消耗(随机存取存储器)应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同 100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。

预取值为 1 是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境 中。对于大多数应用来说,稍微高一点的值将是最佳的。

public class Work03 {//队列名称public static final String TASK_QUEUE_NAME = "ACK_QUEUE";//接受消息public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();System.out.println("C1等待接受消息处理时间较短");DeliverCallback deliverCallback =(consumerTag,message) ->{//沉睡1SSleepUtils.sleep(1);System.out.println("接受到的消息:"+new String(message.getBody(),"UTF-8"));//手动应答/*** 1.消息的标记Tag* 2.是否批量应答 false表示不批量应答信道中的消息*/channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = (consumerTag -> {System.out.println(consumerTag + "消费者取消消费接口回调逻辑");});//设置不公平分发//int prefetchCount = 1;//值不等于 1,则代表预取值,预取值为4int prefetchCount = 4;channel.basicQos(prefetchCount);//采用手动应答boolean autoAck = false;channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);}
}

注意:不公平分发和预取值分发都用到 basic.qos 方法,如果取值为 1,代表不公平分发,取值不为1,代表预取值分发。

image-20231217154934867

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/234916.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

I.MX6ULL_Linux_驱动篇(48)linux I2C驱动

I2C 是很常用的一个串行通信接口,用于连接各种外设、传感器等器件。本章我们来学习一下如何在 Linux 下开发 I2C 接口器件驱动,重点是学习 Linux 下的 I2C 驱动框架,按照指定的框架去编写 I2C 设备驱动。本章同样以 I.MX6U-ALPHA 开发板上的 …

逻辑判断 | 判断推理

文章目录 翻译推理前推后(A->B)后推前(B->A)且和或 翻译推理 前推后(A->B) 关键词 如果 … 那么 …若 … 则 …只要 … 就 …为了 … 一定(必须)…所有 … , … 都… 是 … 的充分条件 后推前(B->A) 关键词 只有 … 才 …除非 …否则不…

Java数据结构-模拟ArrayList集合思想,手写底层源码(1),底层数据结构是数组,编写add添加方法,正序打印和倒叙打印

package com.atguigu.structure; public class Demo02_arrayList {public static void main(String[] args) {MyGenericArrayListV1 arrayListV1 new MyGenericArrayListV1();//arr.add(element:100,index:1);下标越界,无法插入//初始化(第一次添加&…

Java 栈和队列的交互实现

文章目录 队列和栈的区别一.用队列模拟实现栈1.1入栈1.2出栈1.3返回栈顶元素1.4判断栈是否为空 二.用栈模拟实现队列2.1 入队2.2出队2.3peek2.4判断队列是否为空 三.完整代码3.1 队列模拟实现栈3.2栈模拟实现队列 队列和栈的区别 栈和队列都是常用的数据结构,它们的…

令人惊叹的代码技巧

在编程世界中,有一些令人惊叹的代码技巧和巧妙的实现方式。以下是一些我见过的令人印象深刻的代码技巧: 函数式编程魔法: 使用函数式编程的一些特性,比如高阶函数、匿名函数和Lambda表达式,可以使代码更为简洁、易读。…

飞天使-k8s知识点1-kubernetes架构简述

文章目录 名词功能要点 k8s核心要素CNCF 云原生框架简介k8s组建介绍 名词 CI 持续集成, 自动化构建和测试:通过使用自动化构建工具和自动化测试套件,持续集成可以帮助开发人员自动构建和测试他们的代码。这样可以快速检测到潜在的问题,并及早…

揭秘Spark学习框架网站:让你轻松掌握大数据处理神器!

介绍:Apache Spark是一个开源的大数据处理框架,它致力于实现高速、易用和复杂分析。Spark最初由加州大学伯克利分校的AMPLab于2009年开始开发,并于2010年成为Apache的开源项目之一。由于其出色的性能表现与丰富的功能特性,Spark已…

解决:Network ErrorAxiosError: Network Error at XMLHttpRequest.handleError

问题:Uncaught runtime errors:ERROR Network Error AxiosError: Network Error at XMLHttpRequest.handleError (webpack-internal:///./node_modules/axios/lib/adapters/xhr.js:160:14) 解决:在Controller类上加上CrossOrigin注解 /**解…

【计算机四级(网络工程师)笔记】操作系统概论

目录 一、OS的概念 1.1OS的定义 1.2OS的特征 1.2.1并发性 1.2.2共享性 1.2.3随机性 1.3研究OS的观点 1.3.1软件的观点 1.3.2资源管理器的观点 1.3.3进程的观点 1.3.4虚拟机的观点 1.3.5服务提供者的观点 二、OS的分类 2.1批处理操作系统 2.2分时操作系统 2.3实时操作系统 2.4嵌…

SpringBoot之IOCDI的详细解析

3.3.2 IOC详解 通过IOC和DI的入门程序呢,我们已经基本了解了IOC和DI的基础操作。接下来呢,我们学习下IOC控制反转和DI依赖注入的细节。 3.3.2.1 bean的声明 前面我们提到IOC控制反转,就是将对象的控制权交给Spring的IOC容器,由…

c语言编写http服务器(Linux下运行)

参考文章&#xff1a;https://blog.csdn.net/baixingyubxy/article/details/125964986?spm1001.2014.3001.5506 上面是详细讲解&#xff0c;我这篇是总结了他的代码&#xff0c;因为他没给整体代码 所有代码&#xff1a; #include <stdio.h> #include <stdlib.h&g…

爬虫入门--爬取电影TOP250-附源码解析

爬取电影TOP250 1 知识小课堂1.1 什么是爬虫1.2 爬虫能做什么 2 代码解析2.1 运行环境2.2 过程解析2.2.1 第一步&#xff1a;引入两个模块2.2.2 找到网址2.2.3 拉去页面全内容 2.2.42.3 完整代码 1 知识小课堂 1.1 什么是爬虫 爬虫&#xff0c;也叫网络蜘蛛&#xff0c;如果把…

02 ModBus TCP

目录 一、ModBus TCP 一帧数据格式 二、0x01 读线圈状态 三、0x03读保持寄存器 四、0x05写单个线圈 五、0x06 写单个寄存器 六、0x0f写多个线圈 七、0x10&#xff1a;写多个保持寄存器 八、通信过程 九、不同modbus通信模式的应用场景 一、ModBus TCP 一帧数据格式 其…

图神经网络 (GNN) 概述

GNN 作者 with DALLE 3 一、说明 神经网络是受人脑工作启发的计算模型&#xff0c;能够从复杂的非结构化数据&#xff08;如图像、文本、音频和视频&#xff09;中学习。然而&#xff0c;还有许多其他类型的数据无法用传统的神经网络轻松表示&#xff0c;例如那些具有图形结构的…

Vue2面试题:说一下对跨域的理解?

http请求分为两大类&#xff1a;普通http请求&#xff08;如百度请求&#xff09;和ajax请求&#xff08;跨域是出现在ajax请求&#xff09; 同源策略&#xff1a;在浏览器发起ajax请求时&#xff0c;当前的网址和被请求的网址协议、域名、端口号必须完全一致&#xff0c;目的是…

windows平台配置vsCode_CMake_Clang/LLVM_ninja环境与测试

cmake配置 官网下载并安装 设置系统环境变量 cmake --versionvsCode插件 Clang/LLVM配置 vsCode clangd插件&#xff0c;提示安装lsp 官网下载LLVM 安装包安装&#xff0c;设置系统环境变量 clang --versionninja配置 官网下载 解压&#xff0c;设置系统环境变量 ninja -…

基于YOLOv8的结核病预测系统设计与实现

一、项目背景 本系统的目的是通过痰液图像来检测出结核杆菌的携带者&#xff0c;及时采取治疗措施&#xff0c;在病情早期对其进行相关治疗减少结核病的传播。程序使用的样本是经过染色处理可以使得结核杆菌在显微镜拍摄的医学图像&#xff0c;通过检测医学图像中的结核杆菌诊…

操作系统系列:关于终端Shell

操作系统系列&#xff1a;关于终端 Shell在Win32上创建一个新进程重定向输入和输出 Shell Unix命令处理器或者Shell都是进程&#xff0c;它获取用户键入的命令&#xff0c;fork出一个进程&#xff0c;子进程调用exec来执行用户的命令&#xff0c;父进程等待子进程执行结束。 这…

常用的金融小知识的简单理解

m1和m2 剪刀差扩大 说明经济向好&#xff08;牛市&#xff09;&#xff0c;否则m1变为m2 剪刀差收窄&#xff0c;经济回落 社融-社会融资规模 社融数据增长意味着人们看好未来经济,敢于贷款,赚钱效应增加 cpi-居民消费价格指数 cpi上升意味着通过膨胀 ppi-生产者价格指数…

Android Canvas状态save与restore,Kotlin

Android Canvas状态save与restore&#xff0c;Kotlin private fun f1() {val bitmap BitmapFactory.decodeResource(resources, R.mipmap.pic).copy(Bitmap.Config.ARGB_8888, true)val canvas Canvas(bitmap)val paint Paint(Paint.ANTI_ALIAS_FLAG)paint.color Color.RED…