WorkQueue模型

        WorkQueues,也被称为任务队列模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时的处理。此时就可以使用work模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因为任务是不会被重复执行的。

P:生产者

C1:消费者-1,领取任务并且完成任务,假设完成速度较慢。

C2:消费者-2,领取任务并且完成任务,假设完成速度快。

1.生产者

public class Provider {//生产消息@Testpublic void testSendMessage() throws IOException, TimeoutException {Connection connection = RabbitMqUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare("work",true,false,false,null);for(int i = 1;i<=200;i++){channel.basicPublish("","work",null,(i+"hello rabbitmq").getBytes());}RabbitMqUtil.closeConnectionAndChannel(channel,connection);}
}

2.消费者-1

 

public class Consumer1 {//消费消息,这里需要用main函数,因为消费端要一直监听队列,而test测试会直接结束public static void main(String[] args) throws IOException, TimeoutException {//获取连接对象Connection connection = RabbitMqUtil.getConnection();//获取连接通道Channel channel = connection.createChannel();channel.queueDeclare("work",true,false,false,null);channel.basicConsume("work",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(2000);System.out.println("consumer1得到:"+new String(body));} catch (InterruptedException e) {e.printStackTrace();}}});//注意这里不能关闭通道和连接,因为要一直监听}
}

 3.消费者-2

public class Consumer2 {//消费消息,这里需要用main函数,因为消费端要一直监听队列,而test测试会直接结束public static void main(String[] args) throws IOException, TimeoutException {//获取连接对象Connection connection = RabbitMqUtil.getConnection();//获取连接通道Channel channel = connection.createChannel();channel.queueDeclare("work",true,false,false,null);channel.basicConsume("work",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumer2得到:"+new String(body));}});//注意这里不能关闭通道和连接,因为要一直监听}
}

4.结果分析

通过运行结果我们发现消费者1和消费2是平均处理消息的,就比如1000个消息一人处理一半。而且现在的机制是,队列中的消息会一次性全部分配给两个消费者,囤积到两个消费者处然后让两个消费者去各自慢慢消化。这样就会产生一些问题:

1.由于我们设置了消息的自动确认机制,两个消费者刚得到大量消息都还没开始消费其实就已经告诉队列我们确认完了,这样显然是不合理的。

2.消费者那边一次性囤积了大量未处理的消息,如果处理中宕机了,囤积的消息会丢失。

而且假如消费者2执行的很快,而另一个消费者1执行的很慢,这样消费者2很快执行完就空闲了,而消费者1一直迟迟执行不完。能不能改进为能者多劳的机制呢?

  • 消费者一次只接收一条未确认的消息。

  • 关闭自动确认消息。

  • 消费者处理完一条,要手动确认消息。

5.能者多劳

消费者改进:

public class Consumer1Improve {//消费消息,这里需要用main函数,因为消费端要一直监听队列,而test测试会直接结束public static void main(String[] args) throws IOException, TimeoutException {//获取连接对象Connection connection = RabbitMqUtil.getConnection();//获取连接通道Channel channel = connection.createChannel();channel.queueDeclare("work",true,false,false,null);channel.basicQos(1);//一次只接收一条//参数2:关闭自动确认channel.basicConsume("work",false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(2000);System.out.println("consumer1得到:"+new String(body));channel.basicAck(envelope.getDeliveryTag(),false);//手动确认消息} catch (InterruptedException e) {e.printStackTrace();}}});//注意这里不能关闭通道和连接,因为要一直监听}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/589642.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

gem5学习(8):创建一个简单的缓存对象--Creating a simple cache object

目录 一、SimpleCache SimObject 二、Implementing the SimpleCache 1、getSlavePort() 2、handleRequest() 3、AccessEvent() 4、accessTiming() &#xff08;1&#xff09;缓存命中&#xff1a;sendResponse() &#xff08;2&#xff09;缓存未命中&#xff1a; 三、…

matlab概率论例子

高斯概率模型&#xff1a; [f,xi] ksdensity(x): returns a probability density estimate, f, for the sample in the vector x. The estimate is based on a normal kernel function, and is evaluated at 100 equally spaced points, xi, that cover the range of the da…

Mybatis行为配置之Ⅰ—缓存

专栏精选 引入Mybatis Mybatis的快速入门 Mybatis的增删改查扩展功能说明 mapper映射的参数和结果 Mybatis复杂类型的结果映射 Mybatis基于注解的结果映射 Mybatis枚举类型处理和类型处理器 再谈动态SQL Mybatis配置入门 Mybatis行为配置之Ⅰ—缓存 Mybatis行为配置…

读书笔记1-C++ Primer Plus

C是在C语言基础上开发的一种集面向对象编程&#xff08;OOP&#xff09;、通用编程和传统的过程化编程于一体的编程语言。本书是根据2003年的ISO/ANSI C标准编写的&#xff0c;通过大量短小精悍的程序详细而全面地阐述了C的基本概念和技术。 全书分17章和10个附录&#xff0c;分…

使用WAZUH检测LD_PRELAOD劫持、SQL注入、主动响应防御

目录 1、检查后门 使用工具检测后门 1.chkrootkit 2.rkhunter 手动检查文件 检查ld.so.preload文件 2、检测LD_PRELOAD ubuntu配置 wazuh配置 3、检测SQL注入 ubuntu配置 攻击模拟 4、主动响应 wauzh的安装以及设置代理可以参考本篇&#xff1a;WAZUH的安装、设置…

Apache Flink连载(二十三):Flink HA - Flink基于Yarn HA

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频 目录 1. Yarn HA配置 ​​​​…

Cache替换算法

由于Cache很小&#xff0c;主存很大&#xff0c;Cache很容易装满&#xff0c;Cache满了怎么办&#xff1f; ——采用替换算法。 全相联映射&#xff1a;Cache完全满了才需要替换&#xff0c;需要在全局中选择替换哪一块。直接映射&#xff1a;如果对应位置非空&#xff0c;则…

linux线程与进程

简要 在Linux系统中&#xff0c;进程&#xff08;Process&#xff09;和线程&#xff08;Thread&#xff09;是操作系统中两个重要的概念&#xff0c;它们都是用于执行程序的执行单元&#xff0c;但有一些关键的区别。 在Linux系统中&#xff0c;可以使用fork系统调用创建新…

Vue3-30-路由-嵌套路由的基本使用

什么是嵌套路由 嵌套路由 &#xff1a;就是一个组件内部还希望展示其他的组件&#xff0c;使用嵌套的方式实现页面组件的渲染。 就像 根组件 通过路由渲染 普通组件一样&#xff0c;嵌套路由也是一样的道理。 嵌套路由的相关关键配置 1、<router-view> 标签 声明 被嵌套组…

在 Spring 中操作 Redis

&#x1f9f8;欢迎来到dream_ready的博客&#xff0c;&#x1f4dc;相信您对博主首页也很感兴趣o (ˉ▽ˉ&#xff1b;) &#x1f4dc;redis和缓存及相关问题和解决办法 什么是缓存预热、缓存穿透、缓存雪崩、缓存击穿 目录 1、引入依赖 2、对 Redis 的配置文件进行书写 3、S…

kivy PageLayout 的说明及例子

PageLayout 是 Kivy GUI 框架中的一个布局管理器&#xff0c;它允许开发者在同一个窗口中放置多个页面&#xff0c;用户可以通过滑动来浏览这些页面。PageLayout 的工作方式类似于一个可以滑动的标签页&#xff08;TabbedPanel&#xff09;&#xff0c;但其页面可以自由调整大小…

Linux常用命令大全总结及讲解(超详细版)

前言&#xff1a; Linux 是一个基于Linux 内核的开源类Unix 操作系统&#xff0c;Linus Torvalds于 1991 年 9 月 17 日首次发布的操作系统内核。Linux 通常打包为Linux 发行版。 Linux 最初是为基于Intel x86架构的个人计算机开发的&#xff0c;但此后被移植到的平台比任何其…

K8S 中对 Windows 节点的利用

目录 漏洞概述 漏洞详情 ​编辑 漏洞验证 补丁分析 在集群中探索 参考资料 在许多组织中&#xff0c;所运行的很大一部分服务和应用是 Windows 应用。Windows 容器提供了一种封装进程和包依赖项的方式&#xff0c;从而简化了 DevOps 实践&#xff0c;令 Windows 应用程序…

【xdma】 pcie.bar设置

FPGA优质开源项目– PCIE通信 xdma 两者保持一致 FPGA开源项目 – PCIE I/O控制卡 xdma PCIe的XDMA应用 读写部分分为两种&#xff0c;一种是数据的读写&#xff0c;另一种是配置数据的读写&#xff0c;在数据读写部分&#xff0c;DMA通过MIG控制DDR完成数据读写。配置数据…

使用 Tkinter 制作一个进制转换工具,好用!

在平时工作学习当中&#xff0c;我们经常会编写一些简单的 Python GUI 工具&#xff0c;以此来完成各种各样的自动化任务&#xff0c;比如批量处理文件&#xff0c;批量处理图片等等。当我们进行这些工具的编写之时&#xff0c;往往只关注了功能的实现&#xff0c;而忽略了页面…

基于Docker的软件环境部署脚本,持续更新~

使用时CtrlF搜索你想要的环境&#xff0c;如果没有你想要的环境&#xff0c;可以评论留言&#xff0c;会尽力补充。 本文提供的部署脚本默认参数仅适合开发测试&#xff0c;请根据实际情况调节参数。 数据库 MySQL version: 3.9 services:mysql:image: mysql:8.0.35container…

【Unity美术】Unity工程师对3D模型需要达到的了解【二】

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;Uni…

一元函数微分学——刷题(8

目录 1.题目&#xff1a;2.解题思路和步骤&#xff1a;3.总结&#xff1a;小结&#xff1a; 1.题目&#xff1a; 2.解题思路和步骤&#xff1a; 先看A&#xff0c;既然存在&#xff0c;那么f(x)和x属于同阶无穷小&#xff0c;所以f(0)0&#xff0c;没问题 再看C&#xff0c;结…

UntiyShader(七)Debug

目录 前言 一、利用假彩色图像 二、利用Visual Studio 三、帧调试器 前言 Debug&#xff08;调试&#xff09;&#xff0c;是程序员检查问题的一种方法&#xff0c;对于一个Shader调试更是一种噩梦&#xff0c;这也是Shader难写的原因之一——如果效果不对&#xff0c;我们…

ubuntu22.04安装anacoda遇到的坑

这几天把用了3年的windows10换成了ubuntu22.04 各种环境都得配置&#xff0c;本文记录下遇到的坑。 1、anacoda在ubuntu上也可以用官方也提供了安装包&#xff0c;但是没有图形界面&#xff0c;需要以命令行的方式安装和运行配置 1.1 安装&#xff1a;官网下载后&#xff0c;…