RabbitMQ不公平分发与预取值

1.分发简介

RabbitMQ不设置的话默认采用轮询方式分发消息,你一个我一个(公平);但实际生活中,由于处理速度不同,若还采用轮询方式分发会导致处理速度快的空等待,因此我们采用不公平分发

2.不公平分发

消费者这侧设置即可,以之前的Worker3和Worker4为例

2.1.Worker3

package com.hong.rabbitmq3;import com.hong.utils.RabbitMQUtil;
import com.hong.utils.SleepUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;/*** @Description: 消息手动应答时不丢失,放回队列重新消费* @Author: hong* @Date: 2023-12-16 23:05* @Version: 1.0**/
public class Worker3 {private static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();System.out.println("worker3等待接收消息,处理速度快");DeliverCallback deliverCallback = (comsumerTag, message) -> {SleepUtil.sleep(1);System.out.println("接收到的消息:"+  new String(message.getBody(),"UTF-8"));//手动应答/*** 第一个参数:消息标识* 第二个参数是否批量:true批量*/channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!");/** 不公平分发* 不设置或设置0 公平分发(轮询分发,RabbitMQ默认消息分发方式)* 1  不公平分发*/channel.basicQos(1);//手动应答falsechannel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);}
}

2.2.Worker4

package com.hong.rabbitmq3;import com.hong.utils.RabbitMQUtil;
import com.hong.utils.SleepUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;/*** @Description: 消息手动应答时不丢失, 放回队列重新消费* @Author: hong* @Date: 2023-12-16 23:05* @Version: 1.0**/
public class Worker4 {private static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();System.out.println("worker4等待接收消息,处理速度慢");DeliverCallback deliverCallback = (comsumerTag, message) -> {SleepUtil.sleep(20);System.out.println("接收到的消息:"+  new String(message.getBody(),"UTF-8"));//手动应答/*** 第一个参数:消息标识* 第二个参数是否批量:true批量*/channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!");/** 不公平分发* 不设置或设置0 公平分发(轮询分发,RabbitMQ默认消息分发方式)* 1  不公平分发*/channel.basicQos(1);channel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);}
}

在这里插入图片描述

3.结果

启动Task3,Worker3,Worker4发现处理速度快的Worker3在Worker4还没处理完第一条消息时已处理了多条消息(能者多劳/强者多劳)
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

4.预取值

不公平分发不管处理速度如何都是将消息分发给相对空闲的消费者,而预取值可以认为是未确认的消息缓冲区,该值时通道上允许未确认消息的最大值。一旦达到此值RabbitMQ在该通道上传递消息,除非至少有一个未应答的消息被ack.
还是只在消费者这侧修改,以之前的Worker3和Worker4为例

4.1.Worker3

Worker3处理速度快,设置预取值为5

package com.hong.rabbitmq4;import com.hong.utils.RabbitMQUtil;
import com.hong.utils.SleepUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;/*** @Description: 预取值* @Author: hong* @Date: 2023-12-18 23:05* @Version: 1.0**/
public class Worker3 {private static final String TASK_QUEUE_NAME = "prefetch_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();System.out.println("worker3等待接收消息,处理速度快");DeliverCallback deliverCallback = (comsumerTag, message) -> {SleepUtil.sleep(1);System.out.println("接收到的消息:"+  new String(message.getBody(),"UTF-8"));//手动应答/*** 第一个参数:消息标识* 第二个参数是否批量:true批量*/channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!");/** 不公平分发* 不设置或设置0 公平分发(轮询分发,RabbitMQ默认消息分发方式)* 1  不公平分发* 5*/channel.basicQos(5);//手动应答falsechannel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);}
}

4.2.Worker4

Worker4处理速度慢,设置预取值为2

package com.hong.rabbitmq4;import com.hong.utils.RabbitMQUtil;
import com.hong.utils.SleepUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;/*** @Description: 预取值* @Author: hong* @Date: 2023-12-18 23:05* @Version: 1.0**/
public class Worker4 {private static final String TASK_QUEUE_NAME = "prefetch_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();System.out.println("worker4等待接收消息,处理速度慢");DeliverCallback deliverCallback = (comsumerTag, message) -> {SleepUtil.sleep(20);System.out.println("接收到的消息:"+  new String(message.getBody(),"UTF-8"));//手动应答/*** 第一个参数:消息标识* 第二个参数是否批量:true批量*/channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!");/** 不公平分发* 不设置或设置0 公平分发(轮询分发,RabbitMQ默认消息分发方式)* 1  不公平分发*/channel.basicQos(2);channel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);}
}

5.预取值结果

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
预取值也是一种不公平分发,不公平总是将消息转给相对空闲的消费者,预取值是提前设置好的每个消费者处理的数量,有点类似权重。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/241058.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue常用的指令

1、v-if 它其实取值就是true或者false,它是控制元素是否需要被渲染。 2、v-else-if 因为除了v-if这个条件可能还有别的条件。 3、v-else 它除了不满足v-if和v-else-if之后的东西。 4、v-show 它取值也是true或者false,其实跟样式里面display属性是…

Vue3中使用props和emits详解

前言 在Vue3中,父子组件之间的数据传递是一个常见的需求。本文将介绍如何在Vue3中传递对象,并且在子组件中访问和修改父组件对象中的属性值,以及子组件如何调用父组件中的方法。 在 Vue 3 中,父子组件之间传值有以下作用&#xf…

微信小程序开发学习(上强度):从0开始写项目

前置知识 1、配置插件 微信小程序 基础模板引入sass的两种方法_微信小程序使用sass-CSDN博客 之后在对应页面里新建一个scss文件,写css 2、注册小程序,有个自己的appid,不用测试号了 5.1.注册小程序账号获取appid及个人和企业版差异_哔哩…

Flutter 三: Dart

1 数据类型 数字(number) int double 字符串转换成 num int.parse(“1”) double.parse(“1”);double 四舍五入保留两位小数 toStringAsFixed(2) 返回值为stringdouble 直接舍弃小数点后几位的数据 可使用字符串截取的方式 字符串(string) 单引号 双引号 三引号三引号 可以输…

positivessl多域名ev证书

PositiveSSL是Sectigo的子品牌,拥有类型丰富的SSL数字证书,旗下的SSL证书产品为众多企业和个人开发者营造了安全的网络环境,对网站信息进行加密服务,支持兼容所有现代浏览器和移动设备。今天就随SSL盾小编了解PositiveSSL旗下的多…

C++面向对象(OOP)编程-STL详解(vector)

本文主要介绍STL六大组件,并主要介绍一些容器的使用。 目录 1 泛型编程 2 CSTL 3 STL 六大组件 4 容器 4.1 顺序性容器 4.1.1 顺序性容器的使用场景 4.2 关联式容器 4.2.1 关联式容器的使用场景 4.3 容器适配器 4.3.1 容器适配器的使用场景 5 具体容器的…

生物信息学R分析工具包ggkegg的详细使用方法

ggkegg介绍 ggkegg 是一个用于生物信息学研究的工具,可以用于分析和解释基因组学数据,并将其与已知的KEGG数据库进行比较。ggkegg 是从 KEGG 获取信息并使用 ggplot2 和 ggraph 进行解析、分析和可视化的工具包,结合其他使用 KEGG 进行生物功…

数据流图_DFD图_精简易上手

数据流图(DFD)是一种图形化技术,它描绘信息流和数据从输人移动到输出的过程中所经受的变换。 首先给出一个数据流图样例 基本的四种图形 直角矩形:代表源点或终点,一般来说,是人,如例图的仓库管理员和采购员圆形(也可以画成圆角矩形):是处理,一般来说,是动作,是动词名词的形式…

PromptNER: Prompt Locating and Typing for Named Entity Recognition

原文链接: https://aclanthology.org/2023.acl-long.698.pdf ACL 2023 介绍 问题 目前将prompt方法应用在ner中主要有两种方法:对枚举的span类型进行预测,或者通过构建特殊的prompt来对实体进行定位。但作者认为这些方法存在以下问题&#xf…

mySQL数据库用户管理

目录 1.创建外键约束 外键的定义 主键表和外键表的理解 具体操作 2.数据库用户管理 新建用户 查看用户信息 重命名用户 删除用户 修改当前和其他用户登录密码 忘记 root密码的解决办法 3.数据库用户授权 授予权限 查看权限 撤销权限 1.创建外键约束 外键的定义…

【视觉实践】使用Mediapipe进行目标检测:杯子检测和椅子检测实践

目录 1 Mediapipe 2 Solutions 3 安装mediapipe 4 实践 1 Mediapipe Mediapipe是google的一个开源项目,可以提供开源的、跨平台的常用机器学习(machine learning,ML)方案。MediaPipe是一个用于构建机器学习管道</

计算机毕业设计 基于SpringBoot的房屋租赁管理系统的设计与实现 Java实战项目 附源码+文档+视频讲解

博主介绍&#xff1a;✌从事软件开发10年之余&#xff0c;专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精…

JS中页面跳转的几种方法

我们在html中学的页面跳转最常用的就是a标签了把&#xff01; a标签跳转 直接跳转页面&#xff1a;比较适用于页面中固定的地址 <a href"页面路径">跳转</a> js中window对象location进行跳转 是指覆盖当前页面进行跳转 window。loaction。href&quo…

.Net 访问电子邮箱-LumiSoft.Net,好用

序言&#xff1a; 网上找了很多关于.Net如何访问电子邮箱的方法&#xff0c;但是大多数都达不到想要的需求&#xff0c;只有一些 收发邮件。因此 花了很大功夫去看 LumiSoft.Net.dll 的源码&#xff0c;总算做出自己想要的结果了&#xff0c;果然学习诗人进步。 介绍&#xff…

华为OD机试 - 灰度图存储(Java JS Python C)

题目描述 黑白图像常采用灰度图的方式存储,即图像的每个像素填充一个灰色阶段值,256阶灰图是一个灰阶值取值范围为 0~255 的灰阶矩阵,0表示全黑,255表示全白,范围内的其他值表示不同的灰度。 但在计算机中实际存储时,会使用压缩算法,其中一个种压缩格式描述如如下: 1…

ceph集群搭建详细教程(ceph-deploy)

ceph-deploy比较适合生产环境&#xff0c;不是用cephadm搭建。相对麻烦一些&#xff0c;但是并不难&#xff0c;细节把握好就行&#xff0c;只是命令多一些而已。 实验环境 服务器主机public网段IP&#xff08;对外服务&#xff09;cluster网段IP&#xff08;集群通信&#x…

C语言中的关键字

Static 静态局部变量 结果&#xff1a; a作为静态局部变量&#xff0c;第一次进入该函数的时候&#xff0c;进行第一次变量的初始化&#xff0c;在程序整个运行期间都不释放。&#xff08;因为下一次调用还继续使用上次调用结束的数值&#xff09; 但是其作用域为局部作用域&…

大语言模型的三种主要架构 Decoder-Only、Encoder-Only、Encoder-Decoder

现代大型语言模型&#xff08;LLM&#xff09;的演变进化树&#xff0c;如下图&#xff1a; https://arxiv.org/pdf/2304.13712.pdf 基于 Transformer 模型以非灰色显示&#xff1a; decoder-only 模型在蓝色分支&#xff0c; encoder-only 模型在粉色分支&#xff0c; encod…

数据权限篇

文章目录 1. 如何实现数据权限&#xff08;内核&#xff09;1.1 原理1.2 源码实现&#xff0c;mybatis如何重写sql1.2.1 重写sql1.2.2 解析sql1.2.3 DataPermissionDatabaseInterceptor 1. 如何实现数据权限&#xff08;内核&#xff09; 1.1 原理 面对复杂多变的需求&#xf…

LeetCode-23 合并 K 个升序链表

LeetCode-23 合并 K 个升序链表 给你一个链表数组&#xff0c;每个链表都已经按升序排列。 请你将所有链表合并到一个升序链表中&#xff0c;返回合并后的链表。 示例 1&#xff1a; 输入&#xff1a;lists [[1,4,5],[1,3,4],[2,6]] 输出&#xff1a;[1,1,2,3,4,4,5,6] 解…