rabbitmq 不同的消费者消费同一个队列_RabbitMQ 消费端限流、TTL、死信队列

消费端限流

1. 为什么要对消费端限流

假设一个场景,首先,我们 Rabbitmq 服务器积压了有上万条未处理的消息,我们随便打开一个消费者客户端,会出现这样情况: 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据!

当数据量特别大的时候,我们对生产端限流肯定是不科学的,因为有时候并发量就是特别大,有时候并发量又特别少,我们无法约束生产端,这是用户的行为。所以我们应该对消费端限流,用于保持消费端的稳定,当消息数量激增的时候很有可能造成资源耗尽,以及影响服务的性能,导致系统的卡顿甚至直接崩溃。

2.限流的 api 讲解

RabbitMQ 提供了一种 qos (服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于 consume 或者 channel 设置 Qos 的值)未被确认前,不进行消费新的消息。

/*** Request specific "quality of service" settings.* These settings impose limits on the amount of data the server* will deliver to consumers before requiring acknowledgements.* Thus they provide a means of consumer-initiated flow control.* @param prefetchSize maximum amount of content (measured in* octets) that the server will deliver, 0 if unlimited* @param prefetchCount maximum number of messages that the server* will deliver, 0 if unlimited* @param global true if the settings should be applied to the* entire channel rather than each consumer* @throws java.io.IOException if an error is encountered*/void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
  • prefetchSize:0,单条消息大小限制,0代表不限制
  • prefetchCount:一次性消费的消息数量。会告诉 RabbitMQ 不要同时给一个消费者推送多于 N 个消息,即一旦有 N 个消息还没有 ack,则该 consumer 将 block 掉,直到有消息 ack。
  • global:true、false 是否将上面设置应用于 channel,简单点说,就是上面限制是 channel 级别的还是 consumer 级别。当我们设置为 false 的时候生效,设置为 true 的时候没有了限流功能,因为 channel 级别尚未实现。
  • 注意:prefetchSize 和 global 这两项,rabbitmq 没有实现,暂且不研究。特别注意一点,prefetchCount 在 no_ask=false 的情况下才生效,即在自动应答的情况下这两个值是不生效的。

3.如何对消费端进行限流

  • 首先第一步,我们既然要使用消费端限流,我们需要关闭自动 ack,将 autoAck 设置为 falsechannel.basicConsume(queueName, false, consumer);
  • 第二步我们来设置具体的限流大小以及数量。channel.basicQos(0, 15, false);
  • 第三步在消费者的 handleDelivery 消费方法中手动 ack,并且设置批量处理 ack 回应为 truechannel.basicAck(envelope.getDeliveryTag(), true);

这是生产端代码,与前几章的生产端代码没有做任何改变,主要的操作集中在消费端。

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class QosProducer { public static void main(String[] args) throws Exception { //1. 创建一个 ConnectionFactory 并进行设置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); //2. 通过连接工厂来创建连接 Connection connection = factory.newConnection(); //3. 通过 Connection 来创建 Channel Channel channel = connection.createChannel(); //4. 声明 String exchangeName = "test_qos_exchange"; String routingKey = "item.add"; //5. 发送 String msg = "this is qos msg"; for (int i = 0; i < 10; i++) { String tem = msg + " : " + i; channel.basicPublish(exchangeName, routingKey, null, tem.getBytes()); System.out.println("Send message : " + tem); } //6. 关闭连接 channel.close(); connection.close(); }}

这里我们创建了两个消费者,以方便验证限流api中的 global 参数设置为 true 时不起作用.。整体结构如下图所示,两个 Consumer 都绑定在同一个队列上,这样的话两个消费者将共同消费发送的10条消息。

4dc3878d43629026932a8e33f192e33c.png
import com.rabbitmq.client.*;import java.io.IOException;public class QosConsumer { public static void main(String[] args) throws Exception { //1. 创建一个 ConnectionFactory 并进行设置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(3000); //2. 通过连接工厂来创建连接 Connection connection = factory.newConnection(); //3. 通过 Connection 来创建 Channel final Channel channel = connection.createChannel(); //4. 声明 String exchangeName = "test_qos_exchange"; String queueName = "test_qos_queue"; String queueName1 = "test_qos_queue_1"; String routingKey = "item.#"; channel.exchangeDeclare(exchangeName, "topic

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/388887.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

动量策略 python_在Python中使用动量通道进行交易

动量策略 pythonMost traders use Bollinger Bands. However, price is not normally distributed. That’s why only 42% of prices will close within one standard deviation. Please go ahead and read this article. However, I have some good news.大多数交易者使用布林…

css3 变换、过渡效果、动画

1 CSS3 选择器 1.1 基本选择器 1.2 层级 空格 > .itemli ~ .item~p 1.3 属性选择器 [attr] [attrvalue] [attr^value] [attr$value] [attr*value] [][][] 1.4 伪类选择器 :link :visited :hover :active :focus :first-child .list li:first-child :last-chi…

mysql常用的存储引擎_Mysql存储引擎

什么是存储引擎&#xff1f;关系数据库表是用于存储和组织信息的数据结构&#xff0c;可以将表理解为由行和列组成的表格&#xff0c;类似于Excel的电子表格的形式。有的表简单&#xff0c;有的表复杂&#xff0c;有的表根本不用来存储任何长期的数据&#xff0c;有的表读取时非…

android studio设计模式和文本模式切换

转载于:https://www.cnblogs.com/judes/p/9437104.html

高斯模糊为什么叫高斯滤波_为什么高斯是所有发行之王?

高斯模糊为什么叫高斯滤波高斯分布及其主要特征&#xff1a; (Gaussian Distribution and its key characteristics:) Gaussian distribution is a continuous probability distribution with symmetrical sides around its center. 高斯分布是连续概率分布&#xff0c;其中心周…

C# webbrowser 代理

百度&#xff0c;google加自己理解后&#xff0c;将所得方法总结一下&#xff1a; 方法1&#xff1a;修改注册表Software//Microsoft//Windows//CurrentVersion//Internet Settings下 ProxyEnable和ProxyServer。这种方法适用于局域网用户&#xff0c;拨号用户无效。 1p…

C MySQL读写分离连接串_Mysql读写分离

一 什么是读写分离MySQL Proxy最强大的一项功能是实现“读写分离(Read/Write Splitting)”。基本的原理是让主数据库处理事务性查询&#xff0c;而从数据库处理SELECT查询。数据库复制被用来把事务性查询导致的变更同步到集群中的从数据库。当然&#xff0c;主服务器也可以提供…

从Jupyter Notebook到脚本

16 Aug: My second article: From Scripts To Prediction API8月16日&#xff1a;我的第二篇文章&#xff1a; 从脚本到预测API As advanced beginners, we know quite a lot: EDA, ML concepts, model architectures etc…… We can write a big Jupyter Notebook, click “Re…

加勒比海兔_加勒比海海洋物种趋势

加勒比海兔Ok, here’s a million dollar question: is the Caribbean really dying? Or, more specifically, are marine species found on Caribbean reefs becoming less abundant?好吧&#xff0c;这是一个百万美元的问题&#xff1a;加勒比海真的死了吗&#xff1f; 或者…

tornado 简易教程

引言 回想Django的部署方式 以Django为代表的python web应用部署时采用wsgi协议与服务器对接&#xff08;被服务器托管&#xff09;&#xff0c;而这类服务器通常都是基于多线程的&#xff0c;也就是说每一个网络请求服务器都会有一个对应的线程来用web应用&#xff08;如Djang…

人口密度可视化_使用GeoPandas可视化菲律宾的人口密度

人口密度可视化GeoVisualization /菲律宾。 (GeoVisualization /Philippines.) Population density is a crucial concept in urban planning. Theories on how it affects economic growth are divided. Some claim, as Rappaport does, that an economy is a form of “spati…

Unity - Humanoid设置Bip骨骼导入报错

报错如下&#xff1a; 解决&#xff1a; 原因是biped骨骼必须按照Unity humanoid的要求设置&#xff0c;在max中设置如下&#xff1a; 转载于:https://www.cnblogs.com/CloudLiu/p/10746052.html

Kubernetes - - k8s - v1.12.3 OpenLDAP统一认证

1&#xff0c;基本概念 为了方便管理和集成jenkins&#xff0c;k8s、harbor、jenkins均使用openLDAP统一认证。2&#xff0c;部署openLDAP 根据之前的文档&#xff0c;openLDAP使用GFS进行数据持久化。下载对应的openLDAP文件git clone https://github.com/xiaoqshuo/k8s-clust…

srpg 胜利条件设定_英雄联盟获胜条件

srpg 胜利条件设定介绍 (Introduction) The e-sports community has been growing rapidly in the past few years, and what used to be a casual pastime has morphed into an industry projected to generate $1.8 B in revenue by 2022. While there are many video games …

机器学习 综合评价_PyCaret:机器学习综合

机器学习 综合评价Any Machine Learning project journey starts with loading the dataset and ends (continues ?!) with the finalization of the optimum model or ensemble of models for predictions on unseen data and production deployment.任何机器学习项目的旅程都…

silverlight 3D 游戏开发

http://www.postvision.net/SilverMotion/DemoTech.aspx silverlight 3D 游戏开发 时间:2010-10-22 06:33来源:开心银光 作者:黎东海 点击: 562次意外发现一个silverlight的实时3D渲染引擎。性能比开源那些强很多。 而且支持直接加载maya,3Dmax等主流3D模型文件。 附件附上它的…

皮尔逊相关系数 相似系数_皮尔逊相关系数

皮尔逊相关系数 相似系数数据科学和机器学习统计 (STATISTICS FOR DATA SCIENCE AND MACHINE LEARNING) In the last post, we analyzed the relationship between categorical variables and categorical and continuous variables. In this case, we will analyze the relati…

Kubernetes持续交付-Jenkins X的Helm部署

Jenkins X 是一个集成化的 CI / CD 平台&#xff0c;可用于 部署在Kubernetes集群或云计算中心。支持在云计算环境下简单地开发和部署应用。本项目是在Kubernetes上的安装支持工具集。 本工具集中包含&#xff1a; Jenkins - 定制好的流水线和运行环境&#xff0c;完全整合CI/C…

中国石油大学(华东)暑期集训--二进制(BZOJ5294)【线段树】

问题 C: 二进制 时间限制: 1 Sec 内存限制: 128 MB提交: 8 解决: 2[提交] [状态] [讨论版] [命题人:]题目描述 pupil发现对于一个十进制数&#xff0c;无论怎么将其的数字重新排列&#xff0c;均不影响其是不是3的倍数。他想研究对于二进制&#xff0c;是否也有类似的性质。于…

Java 8 新特性之Stream API

1. 概述 1.1 简介 Java 8 中有两大最为重要的改革&#xff0c;第一个是 Lambda 表达式&#xff0c;另外一个则是 Stream API&#xff08;java.util.stream.*&#xff09;。 Stream 是 Java 8 中处理集合的关键抽象概念&#xff0c;它可以指定你希望对集合进行的操作&#xff0c…