rabbitmq 不同的消费者消费同一个队列_RabbitMQ 消费端限流、TTL、死信队列

消费端限流

1. 为什么要对消费端限流

假设一个场景,首先,我们 Rabbitmq 服务器积压了有上万条未处理的消息,我们随便打开一个消费者客户端,会出现这样情况: 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据!

当数据量特别大的时候,我们对生产端限流肯定是不科学的,因为有时候并发量就是特别大,有时候并发量又特别少,我们无法约束生产端,这是用户的行为。所以我们应该对消费端限流,用于保持消费端的稳定,当消息数量激增的时候很有可能造成资源耗尽,以及影响服务的性能,导致系统的卡顿甚至直接崩溃。

2.限流的 api 讲解

RabbitMQ 提供了一种 qos (服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于 consume 或者 channel 设置 Qos 的值)未被确认前,不进行消费新的消息。

/*** Request specific "quality of service" settings.* These settings impose limits on the amount of data the server* will deliver to consumers before requiring acknowledgements.* Thus they provide a means of consumer-initiated flow control.* @param prefetchSize maximum amount of content (measured in* octets) that the server will deliver, 0 if unlimited* @param prefetchCount maximum number of messages that the server* will deliver, 0 if unlimited* @param global true if the settings should be applied to the* entire channel rather than each consumer* @throws java.io.IOException if an error is encountered*/void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
  • prefetchSize:0,单条消息大小限制,0代表不限制
  • prefetchCount:一次性消费的消息数量。会告诉 RabbitMQ 不要同时给一个消费者推送多于 N 个消息,即一旦有 N 个消息还没有 ack,则该 consumer 将 block 掉,直到有消息 ack。
  • global:true、false 是否将上面设置应用于 channel,简单点说,就是上面限制是 channel 级别的还是 consumer 级别。当我们设置为 false 的时候生效,设置为 true 的时候没有了限流功能,因为 channel 级别尚未实现。
  • 注意:prefetchSize 和 global 这两项,rabbitmq 没有实现,暂且不研究。特别注意一点,prefetchCount 在 no_ask=false 的情况下才生效,即在自动应答的情况下这两个值是不生效的。

3.如何对消费端进行限流

  • 首先第一步,我们既然要使用消费端限流,我们需要关闭自动 ack,将 autoAck 设置为 falsechannel.basicConsume(queueName, false, consumer);
  • 第二步我们来设置具体的限流大小以及数量。channel.basicQos(0, 15, false);
  • 第三步在消费者的 handleDelivery 消费方法中手动 ack,并且设置批量处理 ack 回应为 truechannel.basicAck(envelope.getDeliveryTag(), true);

这是生产端代码,与前几章的生产端代码没有做任何改变,主要的操作集中在消费端。

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class QosProducer { public static void main(String[] args) throws Exception { //1. 创建一个 ConnectionFactory 并进行设置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); //2. 通过连接工厂来创建连接 Connection connection = factory.newConnection(); //3. 通过 Connection 来创建 Channel Channel channel = connection.createChannel(); //4. 声明 String exchangeName = "test_qos_exchange"; String routingKey = "item.add"; //5. 发送 String msg = "this is qos msg"; for (int i = 0; i < 10; i++) { String tem = msg + " : " + i; channel.basicPublish(exchangeName, routingKey, null, tem.getBytes()); System.out.println("Send message : " + tem); } //6. 关闭连接 channel.close(); connection.close(); }}

这里我们创建了两个消费者,以方便验证限流api中的 global 参数设置为 true 时不起作用.。整体结构如下图所示,两个 Consumer 都绑定在同一个队列上,这样的话两个消费者将共同消费发送的10条消息。

4dc3878d43629026932a8e33f192e33c.png
import com.rabbitmq.client.*;import java.io.IOException;public class QosConsumer { public static void main(String[] args) throws Exception { //1. 创建一个 ConnectionFactory 并进行设置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(3000); //2. 通过连接工厂来创建连接 Connection connection = factory.newConnection(); //3. 通过 Connection 来创建 Channel final Channel channel = connection.createChannel(); //4. 声明 String exchangeName = "test_qos_exchange"; String queueName = "test_qos_queue"; String queueName1 = "test_qos_queue_1"; String routingKey = "item.#"; channel.exchangeDeclare(exchangeName, "topic

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/388887.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

动量策略 python_在Python中使用动量通道进行交易

动量策略 pythonMost traders use Bollinger Bands. However, price is not normally distributed. That’s why only 42% of prices will close within one standard deviation. Please go ahead and read this article. However, I have some good news.大多数交易者使用布林…

css3 变换、过渡效果、动画

1 CSS3 选择器 1.1 基本选择器 1.2 层级 空格 > .itemli ~ .item~p 1.3 属性选择器 [attr] [attrvalue] [attr^value] [attr$value] [attr*value] [][][] 1.4 伪类选择器 :link :visited :hover :active :focus :first-child .list li:first-child :last-chi…

webservice 启用代理服务器

您会发现你写完了一个webservice在调用的时候发现怎也没办法调用&#xff0c;一个简单的webservice怎么不能使用&#xff0c;一肚子的怨恨&#xff0c;哈哈您可能没有为webservice设置代理。 下面就给您写个调用的用例和大家分享下。其实很简单&#xff0c;但是你没有想到的时…

mysql常用的存储引擎_Mysql存储引擎

什么是存储引擎&#xff1f;关系数据库表是用于存储和组织信息的数据结构&#xff0c;可以将表理解为由行和列组成的表格&#xff0c;类似于Excel的电子表格的形式。有的表简单&#xff0c;有的表复杂&#xff0c;有的表根本不用来存储任何长期的数据&#xff0c;有的表读取时非…

android studio设计模式和文本模式切换

转载于:https://www.cnblogs.com/judes/p/9437104.html

高斯模糊为什么叫高斯滤波_为什么高斯是所有发行之王?

高斯模糊为什么叫高斯滤波高斯分布及其主要特征&#xff1a; (Gaussian Distribution and its key characteristics:) Gaussian distribution is a continuous probability distribution with symmetrical sides around its center. 高斯分布是连续概率分布&#xff0c;其中心周…

C# webbrowser 代理

百度&#xff0c;google加自己理解后&#xff0c;将所得方法总结一下&#xff1a; 方法1&#xff1a;修改注册表Software//Microsoft//Windows//CurrentVersion//Internet Settings下 ProxyEnable和ProxyServer。这种方法适用于局域网用户&#xff0c;拨号用户无效。 1p…

C MySQL读写分离连接串_Mysql读写分离

一 什么是读写分离MySQL Proxy最强大的一项功能是实现“读写分离(Read/Write Splitting)”。基本的原理是让主数据库处理事务性查询&#xff0c;而从数据库处理SELECT查询。数据库复制被用来把事务性查询导致的变更同步到集群中的从数据库。当然&#xff0c;主服务器也可以提供…

golang 编写的在线redis 内存分析工具 rma4go

redis 内存分析工具 rma4go redis是一个很有名的内存型数据库&#xff0c;这里不做详细介绍。而rma4go (redis memory analyzer for golang) 是一个redis的内存分析工具&#xff0c;这个工具的主要作用是针对运行时期的redis进行内存的分析&#xff0c;统计redis中key的分布情…

从Jupyter Notebook到脚本

16 Aug: My second article: From Scripts To Prediction API8月16日&#xff1a;我的第二篇文章&#xff1a; 从脚本到预测API As advanced beginners, we know quite a lot: EDA, ML concepts, model architectures etc…… We can write a big Jupyter Notebook, click “Re…

【EasyNetQ】- 使用Future Publish调度事件

许多业务流程要求在将来某个日期安排事件。例如&#xff0c;在与客户进行初次销售联系后&#xff0c;我们可能希望在将来的某个时间安排跟进电话。EasyNetQ可以通过其Future Publish功能帮助您实现此功能。例如&#xff0c;这里我们使用FuturePublish扩展方法来安排未来一个月的…

Java这些多线程基础知识你会吗?

0、并发和并行、进程核线程、多进程和多线程的区别&#xff1a; &#xff08;这里的时间和时刻上的概念同物理上的一样&#xff09; 并发&#xff1a;在一段时间内多个任务同时执行&#xff0c;或者说是在一段很短的时间内可以执行多条程序指令&#xff0c;微观上看起来好像是可…

MySQL set names 命令_mysql set names 命令和 mysql 字符编码问题

先看下面的执行结果&#xff1a;(rootlocalhost)[(none)]mysql>show variables like character%;---------------------------------------------------------------------------------------| Variable_name | Value |---------------------------------------------------…

设置Proxy Server和SQL Server实现数据库安全

首先&#xff0c;我们需要了解一下SQL Server在WinSock上定义协议的步骤&#xff1a; 1. 在”启动”菜单上&#xff0c;指向”程序/Microsoft Proxy Server”&#xff0c;然后点击”Microsoft Management Console”。 2. 展开”Internet Information Service”,再展开运行Proxy…

Python django解决跨域请求的问题

解决方案 1.安装django-cors-headers pip3 install django-cors-headers 2.配置settings.py文件 INSTALLED_APPS [...corsheaders&#xff0c;...] MIDDLEWARE_CLASSES (...corsheaders.middleware.CorsMiddleware,django.middleware.common.CommonMiddleware, # 注意顺序...…

加勒比海兔_加勒比海海洋物种趋势

加勒比海兔Ok, here’s a million dollar question: is the Caribbean really dying? Or, more specifically, are marine species found on Caribbean reefs becoming less abundant?好吧&#xff0c;这是一个百万美元的问题&#xff1a;加勒比海真的死了吗&#xff1f; 或者…

mysql 查出相差年数_MySQL计算两个日期相差的天数、月数、年数

MySQL自带的日期函数TIMESTAMPDIFF计算两个日期相差的秒数、分钟数、小时数、天数、周数、季度数、月数、年数&#xff0c;当前日期增加或者减少一天、一周等等。SELECT TIMESTAMPDIFF(类型,开始时间,结束时间)相差的秒数&#xff1a;SELECT TIMESTAMPDIFF(SECOND,1993-03-23 0…

tornado 简易教程

引言 回想Django的部署方式 以Django为代表的python web应用部署时采用wsgi协议与服务器对接&#xff08;被服务器托管&#xff09;&#xff0c;而这类服务器通常都是基于多线程的&#xff0c;也就是说每一个网络请求服务器都会有一个对应的线程来用web应用&#xff08;如Djang…

如果你的电脑是通过代理上网的.就要用端口映射

由于公网IP地址有限&#xff0c;不少ISP都采用多个内网用户通过代理和网关路由共用一个公网IP上INTERNET的方法&#xff0c; 这样就限制了这些用户在自己计算机上架设个人网站&#xff0c;要实现在这些用户端架设网站&#xff0c;最关键的一点是&#xff0c; 怎样把多用户的内网…

人口密度可视化_使用GeoPandas可视化菲律宾的人口密度

人口密度可视化GeoVisualization /菲律宾。 (GeoVisualization /Philippines.) Population density is a crucial concept in urban planning. Theories on how it affects economic growth are divided. Some claim, as Rappaport does, that an economy is a form of “spati…