【转】RabbitMQ六种队列模式-2.工作队列模式

前言

RabbitMQ六种队列模式-简单队列
RabbitMQ六种队列模式-工作队列 [本文]
RabbitMQ六种队列模式-发布订阅
RabbitMQ六种队列模式-路由模式
RabbitMQ六种队列模式-主题模式

上文我们了解了 RabbitMQ 六种队列模式中的简单队列,代码也是非常的简单,比较容易理解。

但是简单队列有个缺点,简单队列是一一对应的关系,即点对点,一个生产者对应一个消费者,按照这个逻辑,如果我们有一些比较耗时的任务,也就意味着需要大量的时间才能处理完毕,显然简单队列模式并不能满足我们的工作需求,我们今天再来看看工作队列。

文章目录

1. 什么是工作队列2. 代码部分2.1 生产者2.2 消费者3. 循环分发3.1 启动生产者3.2 启动两个消费者3.3 公平分发4. 消息持久化4.1 问题背景4.2 参数配置5. 工作队列总结

1. 什么是工作队列

工作队列:用来将耗时的任务分发给多个消费者(工作者)

主要解决问题:处理资源密集型任务,并且还要等他完成。有了工作队列,我们就可以将具体的工作放到后面去做,将工作封装为一个消息,发送到队列中,一个工作进程就可以取出消息并完成工作。如果启动了多个工作进程,那么工作就可以在多个进程间共享。

工作队列也称为公平性队列模式,怎么个说法呢?

循环分发,假如我们拥有两个消费者,默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者,平均而言,每个消费者将获得相同数量的消息,这种分发消息的方式称为轮询。

看代码吧。

2. 代码部分

2.1 生产者

创建50个消息

public class Producer2 {/** 队列名称 */private static final String QUEUE_NAME = "test_queue";public static void main(String[] args) throws IOException, TimeoutException {/** 1.获取连接 */Connection newConnection = MQConnectionUtils.newConnection();/** 2.创建通道 */Channel channel = newConnection.createChannel();/**3.创建队列声明 */channel.queueDeclare(QUEUE_NAME, false, false, false, null);/**保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */channel.basicQos(1);for (int i = 1; i <= 50; i++) {String msg = "生产者消息_" + i;System.out.println("生产者发送消息:" + msg);/**4.发送消息 */channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());}channel.close();newConnection.close();}}

2.2 消费者

public class Customer2_1 {/*** 队列名称*/private static final String QUEUE_NAME = "test_queue";public static void main(String[] args) throws IOException, TimeoutException {System.out.println("001");/** 1.获取连接 */Connection newConnection = MQConnectionUtils.newConnection();/** 2.获取通道 */final Channel channel = newConnection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */channel.basicQos(1);DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {String msgString = new String(body, "UTF-8");System.out.println("消费者获取消息:" + msgString);try {Thread.sleep(1000);} catch (Exception e) {} finally {/** 手动回执消息 */channel.basicAck(envelope.getDeliveryTag(), false);}}};/** 3.监听队列 */channel.basicConsume(QUEUE_NAME, false, defaultConsumer);}}

3. 循环分发

3.1 启动生产者

3.2 启动两个消费者

在生产者中我们发送了50条消息进入队列,而上方消费者启动图里很明显的看到轮询的效果,就是每个消费者会分到相同的队列任务。

3.3 公平分发

由于上方模拟的是非常简单的消息队列的消费,假如有一些非常耗时的任务,某个消费者在缓慢地进行处理,而另一个消费者则空闲,显然是非常消耗资源的。

再举一个例子,一个1年的程序员,跟一个3年的程序员,分配相同的任务量,明显3年的程序员处理起来更加得心应手,很快就无所事事了,但是3年的程序员拿着非常高的薪资!显然3年的程序员应该承担更多的责任,那怎么办呢?

公平分发。

其实发生上述问题的原因是 RabbitMQ 收到消息后就立即分发出去,而没有确认各个工作者未返回确认的消息数量,类似于TCP/UDP中的UDP,面向无连接。

因此我们可以使用 basicQos 方法,并将参数 prefetchCount 设为1,告诉 RabbitMQ 我每次值处理一条消息,你要等我处理完了再分给我下一个。这样 RabbitMQ 就不会轮流分发了,而是寻找空闲的工作者进行分发。

关键性代码:

/** 2.获取通道 */
final Channel channel = newConnection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */
channel.basicQos(1);

4. 消息持久化

4.1 问题背景

上边我们提到的公平分发是由消费者收取消息时确认解决的,但是这里面又会出现被 kill 的情况。

当有多个消费者同时收取消息,且每个消费者在接收消息的同时,还要处理其它的事情,且会消耗很长的时间。在此过程中可能会出现一些意外,比如消息接收到一半的时候,一个消费者死掉了。

这种情况要使用消息接收确认机制,可以执行上次宕机的消费者没有完成的事情。

但是在默认情况下,我们程序创建的消息队列以及存放在队列里面的消息,都是非持久化的。当RabbitMQ死掉了或者重启了,上次创建的队列、消息都不会保存。

怎么办呢?

4.2 参数配置

参数配置一:生产者创建队列声明时,修改第二个参数为 true

/**3.创建队列声明 */
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

参数配置二:生产者发送消息时,修改第三个参数为MessageProperties.PERSISTENT_TEXT_PLAIN

for (int i = 1; i <= 50; i++) {String msg = "生产者消息_" + i;System.out.println("生产者发送消息:" + msg);/**4.发送消息 */channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
}

5. 工作队列总结

1、循环分发:消费者端在信道上打开消息应答机制,并确保能返回接收消息的确认信息,这样可以保证消费者发生故障也不会丢失消息。

2、消息持久化:服务器端和客户端都要指定队列的持久化和消息的持久化,这样可以保证RabbitMQ重启,队列和消息也不会丢失。

3、公平分发:指定消费者接收的消息个数,避免出现消息均匀推送出现的资源不合理利用的问题。

案例代码:https://www.lanzous.com/i5ydu6d

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/437073.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

redis 哨兵模式 cluster模式区别_Redis哨兵(Sentinel)模式快速入门

当主服务器宕机后&#xff0c;需要手动把一台从服务器切换为主服务器&#xff0c;这就需要人工干预&#xff0c;费事费力&#xff0c;还会造成一段时间内服务不可用。 所以更多时候&#xff0c;我们优先考虑哨兵(sentinel) 模式。Redis sentinel是Redis高可用实现方案&#xff…

IOS开发入门笔记

IB&#xff1a;IB是指Interface Builder&#xff0c;如IBAction&#xff0c;IBOutlet。IBOutlet和IBAction是什么&#xff1f;IBOutlet&#xff1a;中文翻译应该是&#xff1a;插座&#xff0c;连接点&#xff08;书上写的是出口&#xff09;。autorelease相当于Qt中的deleteLa…

【转】RabbitMQ六种队列模式-3.发布订阅模式

前言 RabbitMQ六种队列模式-简单队列RabbitMQ六种队列模式-工作队列RabbitMQ六种队列模式-发布订阅 [本文]RabbitMQ六种队列模式-路由模式RabbitMQ六种队列模式-主题模式 上文的工作队列模式是直接在生产者与消费者里声明好一个队列&#xff0c;这种情况下消息只会对应同类型的…

Microsoft C 运行时库 (CRT) 参考

链接&#xff1a;https://docs.microsoft.com/zh-cn/cpp/c-runtime-library/c-run-time-library-reference?viewmsvc-160 重要文章&#xff1a; 跨 DLL 边界传递 CRT 对象时可能的错误 如果应用程序使用多个 CRT 版本&#xff0c;将存在什么问题&#xff1f;

组建一个局域网一般会用到哪些设备_路由器和交换机的区别是什么?在实际使用中有哪些不同...

电工之家&#xff1a;www.dgzj.com QQ群&#xff1a;2179090关注电工之家官方微信公众号“电工之家”&#xff0c;收获更多经验知识。其实关于路由器和交换机确实有很多人容易混淆&#xff0c;包括猫MOdem&#xff0c;我最初就认为猫就是路由器&#xff0c;现在回想起来真的傻…

【转】RabbitMQ六种队列模式-4.路由模式

前言 RabbitMQ六种队列模式-简单队列RabbitMQ六种队列模式-工作队列RabbitMQ六种队列模式-发布订阅RabbitMQ六种队列模式-路由模式 [本文]RabbitMQ六种队列模式-主题模式 本文带大家了解 RabbitMQ 队列模式中的路由模式。 其实只要看过上篇发布模式后&#xff0c;相信路由模式…

百度二年级手工机器人_让父母少弯腰的家务神器——追觅扫拖机器人慧目F9评测...

大家好&#xff0c;我是CC。父母都已过了花甲之年&#xff0c;退休在家本应是享清福的时候&#xff0c;为了照顾双上班族的我&#xff0c;平时承担了接儿子放学和给我蹭晚餐的任务&#xff0c;操劳不少。眼看着他们一点点老去&#xff0c;总想也为他们做点什么&#xff0c;所以…

宝藏Git学习资源

Learn Git Branching 这才是真正的Git——Git内部原理揭秘&#xff01;

【转】RabbitMQ六种队列模式-5.主题模式

前言 RabbitMQ六种队列模式-简单队列RabbitMQ六种队列模式-工作队列RabbitMQ六种队列模式-发布订阅RabbitMQ六种队列模式-路由模式RabbitMQ六种队列模式-主题模式 [本文] 从前面的几篇我们依次经历了 exchange 模式从 fanout > direct 的转变过程&#xff0c;在 fanout 时&a…

javase哪部分最难_高中物理哪部分最难?这里有答案和方法!一定要收藏

高中物理最难的部分是什么?对于大多数同学来说&#xff0c;电粒子在电磁场中的运动、动力学分析以及电学实验比较难搞定&#xff0c;看看下面的方法&#xff0c;希望对你有所帮助。从应试而言&#xff0c;应是带电粒子在电磁场中的运动(力&#xff0c;运动轨迹&#xff0c;几何…

矩阵初等变换的计算细节

矩阵初等变换的计算细节&#xff1a; 随便选一行消元&#xff0c;叫做目标行。一般把目标行放到最下面&#xff0c;即运用第一种初等行变换—交换。先用第二种初等行变换&#xff0c;即数乘化简。再用第三种初等行变换&#xff0c;即倍加消&#xff0c;消不掉以后换一行消。 …

【转】分布式websocket服务器

最近在开发一个游戏的客服系统&#xff0c;同一时间咨询问题的玩家多&#xff0c;为了保证服务器高可用&#xff0c;需要利用分布式&#xff0c;另外服务器宕机还需要玩家无感知重连&#xff0c;最关键的一点是如何实现服务器的高扩展性&#xff0c;即性能不足时&#xff0c;如…

【Rational Rose使用笔记】协作图

例题 用例题亲自画一下是最好的。 例题出处&#xff1a;掌握在Rational Rose中绘制协作图&#xff08;交互图&#xff09;的操作方法 其他参考文章&#xff1a; UML–协作图详解UML中创建对象的方法&#xff1a;Rational Rose建立对象图 笔记 以下是总结的一些笔记&#x…

python输入城市名称_python 查询天气(输入城市名,输出天气)

python 查询天气&#xff0c;输入城市名&#xff0c;输出天气def get_whether(city_name):""""""city_code_dict { \北京: 101010100, 上海: 101020100, \天津: 101030100, 重庆: 101040100, \}if len(city_name) 0:print "city name is …

【转】RocketMQ的一些特性(生产者消费者配置参数的含义)

一 nameserver 相对来说&#xff0c;nameserver的稳定性非常高。原因有二&#xff1a; 1 nameserver互相独立&#xff0c;彼此没有通信关系&#xff0c;单台nameserver挂掉&#xff0c;不影响其他nameserver&#xff0c;即使全部挂掉&#xff0c;也不影响业务系统使用&#xff…

【Rational Rose使用笔记】用例图

一 先找例题&#xff1a; 掌握运用Rose工具绘制用例图的基本操作 再学习&#xff1a; UML建模——用例图&#xff08;Use Case Diagram&#xff09; 二 下面是笔记&#xff1a; 了解几种关系&#xff1a;依赖、关联、泛化、包含、扩展。 如何区别依赖、关联&#xff1f; 依…

【转】使用IIS做HTTP和WebSocket服务的反向代理

反向代理对于服务器来说是非常实用的功能&#xff0c;可以将毫不相关的网站部署到同一个域名下&#xff0c;对于使用docker的人来说可以免去大量配置上的麻烦。它还能将HTTP流量转换成HTTPS&#xff0c;多个服务只需要一个证书就能解决。 对于nginx和Apache&#xff0c;网上已…

【OSG】基础知识点合集

三大遍历&#xff1a;更新遍历、拣选遍历、绘制遍历

sqlserver获取当前时间_c#获取并显示当前日期时间

主要使用ToLongDateString、ToShortDateString、ToLongTimeString、ToShortTimeString、DayOfWeek等获取日期、时间、星期等数据。MM 两位数月份mm 两位数分钟HH 两位小时&#xff0c;24小时制hh 两位小时&#xff0c;12小时制string br ""; string a1 D…

【考研计算机】AOE关键路径

介绍 数据结构AOE网 计算题专题&#xff1a;关键路径法(CPM) 需要注意的点&#xff1a; 最重要的点&#xff0c;以下图为例&#xff1a; 一个活动有多个入口&#xff0c;这是表示此活动必须在前置的多个入口都完成时&#xff0c;才能开始。 即&#xff0c;开始条件 入口1 &…