(四)RabbitMQ高级特性(消费端限流、利用限流实现不公平分发、消息存活时间、优先级队列

Lison <dreamlison@163.com>, v1.0.0, 2023.06.23

RabbitMQ高级特性(消费端限流、利用限流实现不公平分发、消息存活时间、优先级队列

文章目录

  • RabbitMQ高级特性(消费端限流、利用限流实现不公平分发、消息存活时间、优先级队列
    • 消费端限流
    • 利用限流实现不公平分发
    • 消息存活时间
    • 优先级队列

消费端限流

在这里插入图片描述

之前我们讲过MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。

1、生产者批量发送消息

@Test
public void testSendBatch() {// 发送十条消息for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", "send message..."+i);}
}

2、消费端配置限流机制

spring:rabbitmq:host: 127.0.0.1port: 5672username: adminpassword: 1233456virtual-host: /listener:simple:# 限流机制必须开启手动签收acknowledge-mode: manual# 消费端最多拉取5条消息消费,签收后不满5条才会继续拉取消息。prefetch: 5

3、消费者监听队列

@Component
public class QosConsumer{@RabbitListener(queues = "my_queue")public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {// 1.获取消息System.out.println(new String(message.getBody()));// 2.模拟业务处理Thread.sleep(3000);// 3.签收消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);}
}

利用限流实现不公平分发

在RabbitMQ中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1处 理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1有很大一部分时间处于空闲状态。此时可以 采用不公平分发,即谁处理的快,谁处理的消息多

1、生产者批量发送消息

@Test
public void testSendBatch() {// 发送十条消息for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", "send message..."+i);}
}

端配置不公平分发

spring:rabbitmq:host: 127.0.0.1port: 5672username: adminpassword: 1233456virtual-host: /listener:simple:# 限流机制必须开启手动签收acknowledge-mode: manual# 消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发prefetch: 1

编写两个消费者

@Component
public class UnfairConsumer {// 消费者1@RabbitListener(queues = "my_queue")public void listenMessage1(Message message, Channel channel) throws Exception{//1.获取消息System.out.println("消费者1:"+new String(message.getBody(),"UTF-8"));//2. 处理业务逻辑Thread.sleep(500); // 消费者1处理快//3. 手动签收channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}// 消费者2@RabbitListener(queues = "my_queue")public void listenMessage2(Message message, Channel channel) throws Exception{//1.获取消息System.out.println("消费者2:"+new String(message.getBody(),"UTF-8"));//2. 处理业务逻辑Thread.sleep(3000);// 消费者2处理慢//3. 手动签收channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}
}

消息存活时间

RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL), 当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ 可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。

设置队列所有消息存活时间

1、在创建队列时设置其存活时间:

@Configuration
public class RabbitConfig2 {private final String EXCHANGE_NAME="my_topic_exchange2";private final String QUEUE_NAME="my_queue2";// 1.创建交换机@Bean("bootExchange2")public Exchange getExchange2(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}// 2.创建队列@Bean("bootQueue2")public Queue getMessageQueue2(){return QueueBuilder.durable(QUEUE_NAME).ttl(10000) //队列的每条消息存活10s.build();}// 3.将队列绑定到交换机@Beanpublic Binding bindMessageQueue2(@Qualifier("bootExchange2") Exchange exchange,@Qualifier("bootQueue2") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();}
}

2、生产者批量生产消息,测试存活时间

@Test
public void testSendBatch2() throws InterruptedException {// 发送十条消息for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("my_topic_exchange2", "my_routing", "send message..."+i);}
}

设置单条消息存活时间

@Test
public void testSendMessage() {//设置消息属性MessageProperties messageProperties = new MessageProperties();//设置存活时间messageProperties.setExpiration("10000");// 创建消息对象Message message = new Message("send message...".getBytes(StandardCharsets.UTF_8), messageProperties);// 发送消息rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", message);
}

注意:

1 如果设置了单条消息的存活时间,也设置了队列的存活时间,以时间短的为准。

2 消息过期后,并不会马上移除消息,只有消息消费到队列顶端时,才会移除该消息。

@Test
public void testSendMessage2() {
for (int i = 0; i < 10; i++) {
if (i == 5) {
// 1.创建消息属性
MessageProperties messageProperties = new MessageProperties();
// 2.设置存活时间
messageProperties.setExpiration(“10000”);
// 3.创建消息对象
Message message = new Message((“send message…” +i).getBytes(),messageProperties);
// 4.发送消息
rabbitTemplate.convertAndSend(“my_topic_exchange”, “my_routing”, message);
} else {
rabbitTemplate.convertAndSend(“my_topic_exchange”, “my_routing”, “sendmessage…” + i);
}
}
}
在以上案例中,i=5的消息才有过期时间,10s后消息并没有 马上被移除,但该消息已经不会被消费了,当它到达队列顶 端时会被移除。

优先级队列

假设在电商系统中有一个订单催付的场景,即客户在一段时间内未付款会给用户推送一条短信提醒,但是系统中分为大型商家和小型商家。比如像苹果,小米这样大商家一年能给我们创造很大的利润,所以在订单量大时,他们的订单必须得到优先处理,此时就需要为不同的消息设置不同的优先级,此时我们要使用优先级队列。

1、创建队列和交换机

@Configuration
public class RabbitConfig3 {private final String EXCHANGE_NAME="priority_exchange";private final String QUEUE_NAME="priority_queue";// 1.创建交换机@Bean(EXCHANGE_NAME)public Exchange priorityExchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}// 2.创建队列@Bean(QUEUE_NAME)public Queue priorityQueue(){return QueueBuilder.durable(QUEUE_NAME)//设置队列的最大优先级,最大可以设置到255,官网推荐不要超过10,,如果设置太高比较浪费资源.maxPriority(10).build();}// 3.将队列绑定到交换机@Beanpublic Binding bindPriority(@Qualifier(EXCHANGE_NAME) Exchange exchange, @Qualifier(QUEUE_NAME) Queue queue){return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();}
}

2、编写生产者

@Test
public void testPriority() {for (int i = 0; i < 10; i++) {if (i == 5) {// i为5时消息的优先级较高MessageProperties messageProperties = new MessageProperties();messageProperties.setPriority(9);Message message = new Message(("send message..." +i).getBytes(StandardCharsets.UTF_8), messageProperties);rabbitTemplate.convertAndSend("priority_exchange", "my_routing", message);} else {rabbitTemplate.convertAndSend("priority_exchange", "my_routing", "send message..." + i);}}
}

3、编写消费者

@Component
public class PriorityConsumer {@RabbitListener(queues = "priority_queue")public void listenMessage(Message message, Channel channel) throws Exception{//获取消息System.out.println(new String(message.getBody()));//手动签收channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/10425.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一个简单的 Android 版本目录(Version catalog)实现指南

一个简单的 Android 版本目录实现指南 使用 TOML 格式 在本文中&#xff0c;我们将探讨版本目录以及如何实现它。 版本目录 Gradle 版本目录使您能够以可扩展的方式添加和维护依赖项和插件。因此&#xff0c;不必在各个构建文件中硬编码依赖项名称和版本&#xff0c;而是在目…

【NodeJs】如何将Markdown文件生成HTML文件在线浏览

经常用的编辑器是Markdown&#xff0c;有自带预览排版效果功能的&#xff0c;预览的是HTML网页&#xff0c;如果想要将它转换成HTML网页文件&#xff0c;要怎么做呢。 首先&#xff0c;借助Node的插件来做&#xff0c;在使用前&#xff0c;确保电脑已安装了NodeJS应用&#xf…

Linux相关指令(上)

常见指令&#xff1a; 1 pwd&#xff1a;查看用户当前所在目录 以下面的路径为例&#xff1a; 2 ls&#xff1a;对于目录&#xff0c;该命令列出该目录下的所有子目录与文件。 对于文件&#xff0c;将列出文件名以及其他信息 ls-l&#xff08;or ll&#xff09;&#xff1a;列…

【西安交通大学】:融合传统与创新的学府之旅

【西安交通大学】&#xff1a;融合传统与创新的学府之旅 引言历史与发展学校特色学科优势院系专业校园环境与设施学生生活与社团活动校友荣誉与成就未来发展展望总结&#x1f340;小结&#x1f340; &#x1f389;博客主页&#xff1a;小智_x0___0x_ &#x1f389;欢迎关注&…

Huge and Efficient! 一文了解大规模预训练模型高效训练技术

本文分为三部分介绍了大模型高效训练所需要的主要技术&#xff0c;并展示当前较为流行的训练加速库的统计。文章也同步发布在AI Box知乎专栏&#xff08;知乎搜索 AI Box专栏&#xff09;&#xff0c;欢迎大家在知乎专栏的文章下方评论留言&#xff0c;交流探讨&#xff01; 引…

效率与质量兼备的6个设计工具!

今天本文为大家推荐的这6个设计工具&#xff0c;将帮助设计师实现高效工作&#xff0c;同时也更好地展示自己的创作力&#xff0c;一起来看看吧&#xff01; 1、即时设计 即时设计是一款国内的设计工具&#xff0c;它为设计师提供了非常多实用的设计功能和精致的设计素材&…

【C++】开源:grpc远程过程调用(RPC)配置与使用

&#x1f60f;★,:.☆(&#xffe3;▽&#xffe3;)/$:.★ &#x1f60f; 这篇文章主要介绍grpc远程过程调用&#xff08;RPC&#xff09;配置与使用。 无专精则不能成&#xff0c;无涉猎则不能通。。——梁启超 欢迎来到我的博客&#xff0c;一起学习&#xff0c;共同进步。 喜…

【转载+修改】pytorch中backward求梯度方法的具体解析

原则上&#xff0c;pytorch不支持张量对张量的求导&#xff0c;它只支持标量对张量的求导 我们先看标量对张量求导的情况 import torch xtorch.ones(2,2,requires_gradTrue) print(x) print(x.grad_fn)输出&#xff0c;由于x是被直接创建的&#xff0c;也就是说它是一个叶子节…

Linux查看内存的几种方法

PS的拼接方法 ps aux|head -1;ps aux|grep -v PID|sort -rn -k 4|head 进程的 status 比如说你要查看的进程pid是33123 cat /proc/33123/status VmRSS: 表示占用的物理内存 top PID&#xff1a;进程的ID USER&#xff1a;进程所有者 PR&#xff1a;进程的优先级别&#x…

python内置函数

https://www.runoob.com/python/python-built-in-functions.html https://www.runoob.com/python3/python3-function.html

SSD寿命和写放大测试

一、简述 SSD寿命规格&#xff0c;业界标准为TBW&#xff0c;TBW指的是Terabyte Writteb写入的兆兆字节&#xff0c;也有定义为Total Bytes Written&#xff0c;SSD使用寿命结束之前指定工作量可以写入SSD的总数据量&#xff0c;用来表达固态硬盘的寿命指标。 因为 SSD 使用 N…

同步、异步、阻塞、非阻塞

一、概念 同步与异步&#xff08;线程间调用&#xff09;的区别&#xff1a;关注的是调用方与被调用方之间的交互方式。同步调用会等待被调用方的结果返回&#xff0c;而异步调用则不会等待结果立即返回&#xff0c;可以通过回调或其他方式获取结果。 阻塞非阻塞&#xff08;…

springboot集成

maven配置 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency><groupId>org.apache.commons</groupId><artifactId>…

wordpress我的个人网站搭建

WordPress介绍 WordPress是一个功能强大且易于使用的网站管理平台。它是基于PHP和MySQL构建的&#xff0c;可以在各种不同的主机上运行。 wordpress对服务器的要求 需求最低版本要求PHP7.4 或更高版本MySQL5.6 或更高版本Web服务器任意&#xff08;如&#xff1a;Apache、Ng…

一套流程6个步骤,教你如何正确采购询价

采购询价&#xff08;RFQ&#xff09;是一种竞争性投标文件&#xff0c;用于邀请供应商或承包商就标准化或重复生产的产品或服务提交报价。 询价通常用于大批量/低价值项目&#xff0c;买方必须提供技术规格和商业要求&#xff0c;该文件有时也称为招标书或投标邀请书。询价流…

git恢复删除的分支

1.查看被删除的分支 git remote prune --dry-run origin 被删除的分支是191 2.找到被删除分支的最后一次提交记录的commit SHA值 git reflog 最后一次提交的commit SHA值是3fa7532 3.恢复分支 git checkout -b xiaomeng 3fa7532 4.恢复成功后提交到远端&#xff0c;over&…

ubuntu20.04 安装 docker engine

打开docker官网 点击上图中间的Linux&#xff0c;会是这样&#xff1a; 点击上图的左边栏的 Docker Engine,点击install, 点击 Ubuntu&#xff0c;会是这样&#xff1a; 把页面翻下来&#xff0c;先按照 Insstallation methods 中的 set up thre repository&#xff0c;执行这些…

pytorch工具——认识pytorch

目录 pytorch的基本元素操作创建一个没有初始化的矩阵创建一个有初始化的矩阵创建一个全0矩阵并可指定数据元素类型为long直接通过数据创建张量通过已有的一个张量创建相同尺寸的新张量利用randn_like方法得到相同尺寸张量&#xff0c;并且采用随机初始化的方法为其赋值采用.si…

压力测试-商场项目

1.压力测试 压力测试是给软件不断加压&#xff0c;强制其在极限的情况下运行&#xff0c;观察它可以运行到何种程度&#xff0c;从而发现性能缺陷&#xff0c;是通过搭建与实际环境相似的测试环境&#xff0c;通过测试程序在同一时间内或某一段时间内&#xff0c;向系统发送预…

项目:点餐系统1

项目简介&#xff1a;实现一个http点餐系统服务器&#xff0c;能够支持用户在浏览器访问服务器获取餐馆首页&#xff0c;进行菜品以及订单管理。 具体模型如下&#xff1a; 用户分类&#xff1a; 管理员&#xff1a;进行订单以及菜品管理&#xff08;菜品&订单的增删改查&a…