WorkQueue模型

        WorkQueues,也被称为任务队列模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时的处理。此时就可以使用work模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因为任务是不会被重复执行的。

P:生产者

C1:消费者-1,领取任务并且完成任务,假设完成速度较慢。

C2:消费者-2,领取任务并且完成任务,假设完成速度快。

1.生产者

public class Provider {//生产消息@Testpublic void testSendMessage() throws IOException, TimeoutException {Connection connection = RabbitMqUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare("work",true,false,false,null);for(int i = 1;i<=200;i++){channel.basicPublish("","work",null,(i+"hello rabbitmq").getBytes());}RabbitMqUtil.closeConnectionAndChannel(channel,connection);}
}

2.消费者-1

 

public class Consumer1 {//消费消息,这里需要用main函数,因为消费端要一直监听队列,而test测试会直接结束public static void main(String[] args) throws IOException, TimeoutException {//获取连接对象Connection connection = RabbitMqUtil.getConnection();//获取连接通道Channel channel = connection.createChannel();channel.queueDeclare("work",true,false,false,null);channel.basicConsume("work",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(2000);System.out.println("consumer1得到:"+new String(body));} catch (InterruptedException e) {e.printStackTrace();}}});//注意这里不能关闭通道和连接,因为要一直监听}
}

 3.消费者-2

public class Consumer2 {//消费消息,这里需要用main函数,因为消费端要一直监听队列,而test测试会直接结束public static void main(String[] args) throws IOException, TimeoutException {//获取连接对象Connection connection = RabbitMqUtil.getConnection();//获取连接通道Channel channel = connection.createChannel();channel.queueDeclare("work",true,false,false,null);channel.basicConsume("work",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumer2得到:"+new String(body));}});//注意这里不能关闭通道和连接,因为要一直监听}
}

4.结果分析

通过运行结果我们发现消费者1和消费2是平均处理消息的,就比如1000个消息一人处理一半。而且现在的机制是,队列中的消息会一次性全部分配给两个消费者,囤积到两个消费者处然后让两个消费者去各自慢慢消化。这样就会产生一些问题:

1.由于我们设置了消息的自动确认机制,两个消费者刚得到大量消息都还没开始消费其实就已经告诉队列我们确认完了,这样显然是不合理的。

2.消费者那边一次性囤积了大量未处理的消息,如果处理中宕机了,囤积的消息会丢失。

而且假如消费者2执行的很快,而另一个消费者1执行的很慢,这样消费者2很快执行完就空闲了,而消费者1一直迟迟执行不完。能不能改进为能者多劳的机制呢?

  • 消费者一次只接收一条未确认的消息。

  • 关闭自动确认消息。

  • 消费者处理完一条,要手动确认消息。

5.能者多劳

消费者改进:

public class Consumer1Improve {//消费消息,这里需要用main函数,因为消费端要一直监听队列,而test测试会直接结束public static void main(String[] args) throws IOException, TimeoutException {//获取连接对象Connection connection = RabbitMqUtil.getConnection();//获取连接通道Channel channel = connection.createChannel();channel.queueDeclare("work",true,false,false,null);channel.basicQos(1);//一次只接收一条//参数2:关闭自动确认channel.basicConsume("work",false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(2000);System.out.println("consumer1得到:"+new String(body));channel.basicAck(envelope.getDeliveryTag(),false);//手动确认消息} catch (InterruptedException e) {e.printStackTrace();}}});//注意这里不能关闭通道和连接,因为要一直监听}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/589642.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

问题描述:与编码器对应的解码器是什么,说明一下解码器的名字由来,结构,原理,特点,用处。

问题描述&#xff1a;与编码器对应的解码器是什么&#xff0c;说明一下解码器的名字由来&#xff0c;结构&#xff0c;原理&#xff0c;特点&#xff0c;用处。 问题解答&#xff1a; 定义&#xff1a;解码器是一种电子设备或程序&#xff0c;用于将经过编码的数据转换回原始…

概率论基础

1.概率论 1.1 随机事件与概率 1.1.1 基本概念 ​ 样本点(sample point)&#xff1a; 称为试验 S S S的可能结果为样本点&#xff0c;用 ω \omega ω表示。 ​ 样本空间(sample space)&#xff1a;称试验 S S S的样本点构成的集合为样本空间&#xff0c;用 Ω \Omega Ω表示…

R语言【CoordinateCleaner】——cc_gbif(): 根据通过 method 参数定义的方法,删除或标记地理空间中异常值的记录。

cc_gbif()是R语言包coordinatecleaner中的一个函数&#xff0c;用于清理GBIF&#xff08;全球生物多样性信息设施&#xff09;数据集的地理坐标。该函数可以识别潜在的坐标错误&#xff0c;并对其进行修正或删除。 以下是cc_gbifl()函数的一般用法和主要参数&#xff1a; cc_…

把form表单数据转为json,并传给父页面

阻止form表单提交&#xff0c;表单数据转为json字符串&#xff0c;并传给父页面 // 获取表单元素var form document.getElementById(myForm);// 监听表单提交事件form.addEventListener(submit, function(event) {// 在这里处理表单提交的逻辑var rental_id $("#c-id&q…

gem5学习(8):创建一个简单的缓存对象--Creating a simple cache object

目录 一、SimpleCache SimObject 二、Implementing the SimpleCache 1、getSlavePort() 2、handleRequest() 3、AccessEvent() 4、accessTiming() &#xff08;1&#xff09;缓存命中&#xff1a;sendResponse() &#xff08;2&#xff09;缓存未命中&#xff1a; 三、…

实现3x3卷积的手写FIFO

例子来自米联科例程&#xff0c; 因为不同平台之间调IP会变麻烦&#xff0c;重新阅读手册太花时间了&#xff08;虽然我觉得fifo这种常用IP尽量掌握为好&#xff09;&#xff0c;使用手写的FIFO可以节约开发的流程。 通过这个例子也可以优化自己所使用的手写FIFO。 // by C…

matlab概率论例子

高斯概率模型&#xff1a; [f,xi] ksdensity(x): returns a probability density estimate, f, for the sample in the vector x. The estimate is based on a normal kernel function, and is evaluated at 100 equally spaced points, xi, that cover the range of the da…

Mybatis行为配置之Ⅰ—缓存

专栏精选 引入Mybatis Mybatis的快速入门 Mybatis的增删改查扩展功能说明 mapper映射的参数和结果 Mybatis复杂类型的结果映射 Mybatis基于注解的结果映射 Mybatis枚举类型处理和类型处理器 再谈动态SQL Mybatis配置入门 Mybatis行为配置之Ⅰ—缓存 Mybatis行为配置…

读书笔记1-C++ Primer Plus

C是在C语言基础上开发的一种集面向对象编程&#xff08;OOP&#xff09;、通用编程和传统的过程化编程于一体的编程语言。本书是根据2003年的ISO/ANSI C标准编写的&#xff0c;通过大量短小精悍的程序详细而全面地阐述了C的基本概念和技术。 全书分17章和10个附录&#xff0c;分…

我的2024

我的2024 前言 今天是2024年的第一天&#xff0c;2024年是我人生中比较有意义的一年吧&#xff0c;从一个无忧无虑的大学生到一位社会工作人事&#xff0c;从一个不需要考虑任何事情&#xff0c;可以每天自自在在的做自己想做的事情到即将为了自己的工作&#xff0c;父母的身…

使用WAZUH检测LD_PRELAOD劫持、SQL注入、主动响应防御

目录 1、检查后门 使用工具检测后门 1.chkrootkit 2.rkhunter 手动检查文件 检查ld.so.preload文件 2、检测LD_PRELOAD ubuntu配置 wazuh配置 3、检测SQL注入 ubuntu配置 攻击模拟 4、主动响应 wauzh的安装以及设置代理可以参考本篇&#xff1a;WAZUH的安装、设置…

代码随想录算法训练营第二十天 | 654.最大二叉树、617.合并二叉树、700.二叉搜索树中的搜索、 98.验证二叉搜索树

654.最大二叉树 题目链接&#xff1a;654.最大二叉树 给定一个不重复的整数数组 nums 。 最大二叉树 可以用下面的算法从 nums 递归地构建: 创建一个根节点&#xff0c;其值为 nums 中的最大值。递归地在最大值 左边 的 子数组前缀上 构建左子树。递归地在最大值 右边 的 子…

Apache Flink连载(二十三):Flink HA - Flink基于Yarn HA

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频 目录 1. Yarn HA配置 ​​​​…

Cache替换算法

由于Cache很小&#xff0c;主存很大&#xff0c;Cache很容易装满&#xff0c;Cache满了怎么办&#xff1f; ——采用替换算法。 全相联映射&#xff1a;Cache完全满了才需要替换&#xff0c;需要在全局中选择替换哪一块。直接映射&#xff1a;如果对应位置非空&#xff0c;则…

linux线程与进程

简要 在Linux系统中&#xff0c;进程&#xff08;Process&#xff09;和线程&#xff08;Thread&#xff09;是操作系统中两个重要的概念&#xff0c;它们都是用于执行程序的执行单元&#xff0c;但有一些关键的区别。 在Linux系统中&#xff0c;可以使用fork系统调用创建新…

Vue3-30-路由-嵌套路由的基本使用

什么是嵌套路由 嵌套路由 &#xff1a;就是一个组件内部还希望展示其他的组件&#xff0c;使用嵌套的方式实现页面组件的渲染。 就像 根组件 通过路由渲染 普通组件一样&#xff0c;嵌套路由也是一样的道理。 嵌套路由的相关关键配置 1、<router-view> 标签 声明 被嵌套组…

在 Spring 中操作 Redis

&#x1f9f8;欢迎来到dream_ready的博客&#xff0c;&#x1f4dc;相信您对博主首页也很感兴趣o (ˉ▽ˉ&#xff1b;) &#x1f4dc;redis和缓存及相关问题和解决办法 什么是缓存预热、缓存穿透、缓存雪崩、缓存击穿 目录 1、引入依赖 2、对 Redis 的配置文件进行书写 3、S…

kivy PageLayout 的说明及例子

PageLayout 是 Kivy GUI 框架中的一个布局管理器&#xff0c;它允许开发者在同一个窗口中放置多个页面&#xff0c;用户可以通过滑动来浏览这些页面。PageLayout 的工作方式类似于一个可以滑动的标签页&#xff08;TabbedPanel&#xff09;&#xff0c;但其页面可以自由调整大小…

【Python_PySide2学习笔记(二十二)】进度对话框QProgressDialog类的基本用法

进度对话框QProgressDialog类的基本用法 进度对话框QProgressDialog类的基本用法前言一、QProgressDialog 的常用方法1、创建进度对话框2、进度对话框设置窗口标题3、进度对话框隐藏"最大化"、"最小化"、"关闭"4、进度对话框设置是否自动关闭5、…

Atlas Hook 导入 Hive 元数据

Atlas 部署之后就可以导入 Hive 元数据&#xff0c;这部分工作由 Atlas 组件 Hook 来完成。初次导入 Hive 元数据需要通过执行 shell 脚本来完成&#xff0c;然后&#xff0c;Atlas 就可以自动同步增量元数据信息了。下面我介绍一下如何完成这些工作。 初次导入 Hive 元数据 …