RabbitMQ---work消息模型

1、work消息模型

工作队列或者竞争消费者模式
在这里插入图片描述

在第一篇教程中,我们编写了一个程序,从一个命名队列中发送并接受消息。在这里,我们将创建一个工作队列,在多个工作者之间分配耗时任务。
工作队列,又称任务队列。主要思想就是避免执行资源密集型任务时,必须等待它执行完成。相反我们稍后完成任务,我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将获取任务并最终执行作业。当你运行许多消费者时,任务将在他们之间共享,但是一个消息只能被一个消费者获取。
这个概念在Web应用程序中特别有用,因为在短的HTTP请求窗口中无法处理复杂的任务。
接下来我们来模拟这个流程:

o P:生产者:任务的发布者
o C1:消费者,领取任务并且完成任务,假设完成速度较快
o C2:消费者2:领取任务并完成任务,假设完成速度慢

面试题:避免消息堆积?

1)采用workqueue,多个消费者监听同一队列。
2)接收到消息以后,而是通过线程池,异步消费。

1.1、生产者

生产者与案例1中的几乎一样:

public class Send {private final static String QUEUE_NAME = "test_work_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 循环发布任务for (int i = 0; i < 50; i++) {// 消息内容String message = "task .. " + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}// 关闭通道和连接channel.close();connection.close();}
}

不过这里我们是循环发送50条消息。

1.2、消费者1

// 消费者1
public class Recv {private final static String QUEUE_NAME = "test_work_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道final Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者1] received : " + msg + "!");try {// 模拟完成任务的耗时:1000msThread.sleep(1000);} catch (InterruptedException e) {}// 手动ACKchannel.basicAck(envelope.getDeliveryTag(), false);}};// 监听队列。channel.basicConsume(QUEUE_NAME, false, consumer);}
}

1.3、消费者2

//消费者2
public class Recv2 {private final static String QUEUE_NAME = "test_work_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道final Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者2] received : " + msg + "!");try {// 模拟完成任务的耗时:200msThread.sleep(200);} catch (InterruptedException e) {}// 手动ACKchannel.basicAck(envelope.getDeliveryTag(), false);}};// 监听队列。channel.basicConsume(QUEUE_NAME, false, consumer);}
}

与消费者1基本类似,就是没有设置消费耗时时间。
这里是模拟有些消费者快,有些比较慢。
接下来,两个消费者一同启动,然后发送50条消息:
在这里插入图片描述
在这里插入图片描述

可以发现,两个消费者各自消费了25条消息,而且各不相同,这就实现了任务的分发。

1.4、能者多劳

• 刚才的实现有问题吗?
o 消费者1比消费者2的效率要低,一次任务的耗时较长
o 然而两人最终消费的消息数量是一样的
o 消费者2大量时间处于空闲状态,消费者1一直忙碌
• 现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。
• 怎么实现呢?
o 我们可以使用basicQos方法和prefetchCount = 1设置。
o 这告诉RabbitMQ一次不要向工作人员发送多于一条消息。
o 或者换句话说,不要向工作人员发送新消息,直到它处理并确认了前一个消息。
o 相反,它会将其分派给不是仍然忙碌的下一个工作人员。
在这里插入图片描述

再次测试:

  ![在这里插入图片描述](https://img-blog.csdnimg.cn/a25bdfcd50bf41f9a6e51076560cd15f.png)

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/59115.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SystemVerilog interface详细介绍

1. Interface概念 System Verilog中引入了接口定义&#xff0c;接口与module 等价的定义&#xff0c;是要在其他的接口、module中直接定义&#xff0c;不能写在块语句中&#xff0c;跟class是不同的。接口是将一组线捆绑起来&#xff0c;可以将接口传递给module。 2. 接口的优…

Maven入门教程(一):安装Maven环境

视频教程&#xff1a;Maven保姆级教程 Maven入门教程(一)&#xff1a;安装Maven环境 Maven入门教程(二)&#xff1a;idea/Eclipse使用Maven Maven入门教程(三)&#xff1a;Maven语法 Maven入门教程(四)&#xff1a;Nexus私服 Maven入门教程(五)&#xff1a;自定义脚手架 Maven项…

Vscode画流程图

1.下载插件 Draw.id Integration 2.桌面新建文件&#xff0c;后缀名改为XXX.drawio 在vscode打开此文件 &#xff0c;就可以进行绘制流程图啦

6.Redis-hash

hash 哈希类型中的映射关系通常称为field-value&#xff0c;⽤于区分 Redis 整体的键值对&#xff08;key-value&#xff09;&#xff0c;注意这⾥的value是指field对应的值&#xff0c;不是键&#xff08;key&#xff09;对应的值&#xff0c;请注意 value 在不同上下⽂的作⽤…

R语言和Python用泊松过程扩展:霍克斯过程Hawkes Processes分析比特币交易数据订单到达自激过程时间序列...

全文下载链接&#xff1a;http://tecdat.cn/?p25880 本文描述了一个模型&#xff0c;该模型解释了交易的聚集到达&#xff0c;并展示了如何将其应用于比特币交易数据。这是很有趣的&#xff0c;原因很多。例如&#xff0c;对于交易来说&#xff0c;能够预测在短期内是否有更多…

EVO大赛是什么

价格是你所付出的东西&#xff0c;而价值是你得到的东西 EVO大赛是什么&#xff1f; “EVO”大赛全称“Evolution Championship Series”&#xff0c;是北美最高规格格斗游戏比赛&#xff0c;大赛正式更名后已经连续举办12年&#xff0c;是全世界最大规模的格斗游戏赛事。常见…

angular抛出 ExpressionChangedAfterItHasBeenCheckedError错误分析

当变更检测完成后又更改了表达式值时&#xff0c;Angular 就会抛出 ExpressionChangedAfterItHasBeenCheckedError 错误。Angular 只会在开发模式下抛出此错误。 在开发模式下&#xff0c;Angular 在每次变更检测运行后都会执行一次附加检查&#xff0c;以确保绑定没有更改。这…

Java 8 新特性——Lambda 表达式(2)

一、Java Stream API Java Stream函数式编程接口最初在Java 8中引入&#xff0c;并且与 lambda 一起成为Java开发里程碑式的功能特性&#xff0c;它极大的方便了开放人员处理集合类数据的效率。 Java Stream就是一个数据流经的管道&#xff0c;并且在管道中对数据进行操作&…

springBoot打印精美logo

文章目录 &#x1f412;个人主页&#x1f3c5;JavaEE系列专栏&#x1f4d6;前言&#xff1a;&#x1f380;文本logo &#x1f412;个人主页 &#x1f3c5;JavaEE系列专栏 &#x1f4d6;前言&#xff1a; 本篇博客主要以提供springBoot打印精美logo &#x1f380;文本logo ??…

博流RISC-V芯片BL616开发环境搭建

文章目录 1、工具安装2、代码下载3、环境变量配置4、下载交叉编译器5、编译与下载运行6、使用ninja编译 本文分别介绍博流RISC-V芯片 BL616 在 Windows和Linux 下开发环境搭建&#xff0c;本文同时适用BL618&#xff0c;BL602&#xff0c;BL702&#xff0c;BL808系列芯片。 1、…

电商数据接口API:品牌价格监控与数据分析的重要工具

一、引言 随着电子商务的快速发展&#xff0c;传统品牌企业越来越重视在线销售市场。为了在竞争激烈的市场环境中取得成功&#xff0c;企业需要实时掌握市场动态&#xff0c;了解自身产品的销售情况、价格趋势以及竞品信息。为了实现这一目标&#xff0c;各大电商平台&#xf…

每天 26,315 美元罚款?交通安全局要求特斯拉提供 Autopilot数据

根据美国国家公路交通安全管理局&#xff08;NHTSA&#xff09;最近的特别命令&#xff0c;特斯拉公司被要求提供关于其自动驾驶功能Autopilot的相关信息。这一命令是继NHTSA于2021年8月启动初步评估后&#xff0c;在2022年6月升级为正式调查的一部分&#xff0c;NHTSA近期对特…

自学设计模式(类图、设计原则、单例模式 - 饿汉/懒汉)

设计模式需要用到面向对象的三大特性——封装、继承、多态&#xff08;同名函数具有不同的状态&#xff09; UML类图 eg.—— 描述类之间的关系&#xff08;设计程序之间画类图&#xff09; : public; #: protected; -: private; 下划线: static 属性名:类型&#xff08;默认值…

NPM 管理组织包

目录 1、关于组织范围和包 1.1 管理无作用域的包 2、使用组织设置配置npm客户端 2.1 配置您的npm客户端以使用您组织的范围 为所有新包设置组织范围 为单个包设置组织范围 2.2 将默认包可见性更改为public 将单个包的包可见性设置为public 将所有包的包可见性设置为pu…

C++新经典 | C语言

目录 一、基础之查漏补缺 1.float精度问题 2.字符型数据 3.变量初值问题 4.赋值&初始化 5.头文件之<> VS " " 6.逻辑运算 7.数组 7.1 二维数组初始化 7.2 字符数组 8.字符串处理函数 8.1 strcat 8.2 strcpy 8.3 strcmp 8.4 strlen 9.函数 …

WPF 查看绑定错误——Snoop 的基本使用

关于 可以通过 Snoop 查看 WPF 程序的 Visual Tree&#xff0c;更多介绍请看 snoopwpf 快速开始 一、下载 snoopwpf.msi 安装后打开&#xff0c;选择自己的程序&#xff0c;点击 snoop&#xff08;望远镜&#xff09; 二、筛选 点击左侧下拉列表&#xff0c;选择 Show onl…

docker desktop安装es 并连接elasticsearch-head:5

首先要保证docker安装成功&#xff0c;打开cmd&#xff0c;输入docker -v&#xff0c;出现如下界面说明安装成功了 下面开始安装es 第一步&#xff1a;拉取es镜像 docker pull elasticsearch:7.6.2第二步&#xff1a;运行容器 docker run -d --namees7 --restartalways -p 9…

Maven入门教程(二):idea/Eclipse使用Maven

视频教程&#xff1a;Maven保姆级教程 Maven入门教程(一)&#xff1a;安装Maven环境 Maven入门教程(二)&#xff1a;idea/Eclipse使用Maven Maven入门教程(三)&#xff1a;Maven语法 Maven入门教程(四)&#xff1a;Nexus私服 Maven入门教程(五)&#xff1a;自定义脚手架 4.开发…

forlium 笔记 Map

用于创建交互式地图 1 主要参数 1.1. location 地图位置 地图的经纬度 import foliumm folium.Map(location[31.186358, 121.510256],zoom_start15)m 1.2 tiles 内置样式 默认是OpenStreetMap 1.2.1 Stamen Terrain 它强调了地形特征&#xff0c;如山脉、河流和道路 m …

深度学习论文: Segment Any Anomaly without Training via Hybrid Prompt Regularization

深度学习论文: Segment Any Anomaly without Training via Hybrid Prompt Regularization Segment Any Anomaly without Training via Hybrid Prompt Regularization PDF: https://arxiv.org/pdf/2305.10724.pdf PyTorch代码: https://github.com/shanglianlm0525/CvPytorch Py…