RabbitMQ六种队列模式-工作队列模式

前言

RabbitMQ六种队列模式-简单队列
RabbitMQ六种队列模式-工作队列 [本文]
RabbitMQ六种队列模式-发布订阅
RabbitMQ六种队列模式-路由模式
RabbitMQ六种队列模式-主题模式

上文我们了解了 RabbitMQ 六种队列模式中的简单队列,代码也是非常的简单,比较容易理解。

但是简单队列有个缺点,简单队列是一一对应的关系,即点对点,一个生产者对应一个消费者,按照这个逻辑,如果我们有一些比较耗时的任务,也就意味着需要大量的时间才能处理完毕,显然简单队列模式并不能满足我们的工作需求,我们今天再来看看工作队列。

文章目录

文章目录

      • 前言
      • 文章目录
      • 1. 什么是工作队列
      • 2. 代码部分
        • 2.1 生产者
        • 2.2 消费者
      • 3. 循环分发
        • 3.1 启动生产者
        • 3.2 启动两个消费者
        • 3.3 公平分发
      • 4. 消息持久化
        • 4.1 问题背景
        • 4.2 参数配置
      • 5. 工作队列总结

1. 什么是工作队列

工作队列:用来将耗时的任务分发给多个消费者(工作者)

主要解决问题:处理资源密集型任务,并且还要等他完成。有了工作队列,我们就可以将具体的工作放到后面去做,将工作封装为一个消息,发送到队列中,一个工作进程就可以取出消息并完成工作。如果启动了多个工作进程,那么工作就可以在多个进程间共享。

工作队列也称为公平性队列模式,怎么个说法呢?

循环分发,假如我们拥有两个消费者,默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者,平均而言,每个消费者将获得相同数量的消息,这种分发消息的方式称为轮询。

看代码吧。

2. 代码部分

2.1 生产者

创建50个消息

public class Producer2 {/** 队列名称 */private static final String QUEUE_NAME = "test_queue";public static void main(String[] args) throws IOException, TimeoutException {/** 1.获取连接 */Connection newConnection = MQConnectionUtils.newConnection();/** 2.创建通道 */Channel channel = newConnection.createChannel();/**3.创建队列声明 */channel.queueDeclare(QUEUE_NAME, false, false, false, null);/**保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */channel.basicQos(1);for (int i = 1; i <= 50; i++) {String msg = "生产者消息_" + i;System.out.println("生产者发送消息:" + msg);/**4.发送消息 */channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());}channel.close();newConnection.close();}}

2.2 消费者

public class Customer2_1 {/*** 队列名称*/private static final String QUEUE_NAME = "test_queue";public static void main(String[] args) throws IOException, TimeoutException {System.out.println("001");/** 1.获取连接 */Connection newConnection = MQConnectionUtils.newConnection();/** 2.获取通道 */final Channel channel = newConnection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */channel.basicQos(1);DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {String msgString = new String(body, "UTF-8");System.out.println("消费者获取消息:" + msgString);try {Thread.sleep(1000);} catch (Exception e) {} finally {/** 手动回执消息 */channel.basicAck(envelope.getDeliveryTag(), false);}}};/** 3.监听队列 */channel.basicConsume(QUEUE_NAME, false, defaultConsumer);}}

3. 循环分发

3.1 启动生产者

3.2 启动两个消费者

在生产者中我们发送了50条消息进入队列,而上方消费者启动图里很明显的看到轮询的效果,就是每个消费者会分到相同的队列任务。

3.3 公平分发

由于上方模拟的是非常简单的消息队列的消费,假如有一些非常耗时的任务,某个消费者在缓慢地进行处理,而另一个消费者则空闲,显然是非常消耗资源的。

再举一个例子,一个1年的程序员,跟一个3年的程序员,分配相同的任务量,明显3年的程序员处理起来更加得心应手,很快就无所事事了,但是3年的程序员拿着非常高的薪资!显然3年的程序员应该承担更多的责任,那怎么办呢?

公平分发。

其实发生上述问题的原因是 RabbitMQ 收到消息后就立即分发出去,而没有确认各个工作者未返回确认的消息数量,类似于TCP/UDP中的UDP,面向无连接。

因此我们可以使用 basicQos 方法,并将参数 prefetchCount 设为1,告诉 RabbitMQ 我每次值处理一条消息,你要等我处理完了再分给我下一个。这样 RabbitMQ 就不会轮流分发了,而是寻找空闲的工作者进行分发。

关键性代码:

/** 2.获取通道 */
final Channel channel = newConnection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */
channel.basicQos(1);

4. 消息持久化

4.1 问题背景

上边我们提到的公平分发是由消费者收取消息时确认解决的,但是这里面又会出现被 kill 的情况。

当有多个消费者同时收取消息,且每个消费者在接收消息的同时,还要处理其它的事情,且会消耗很长的时间。在此过程中可能会出现一些意外,比如消息接收到一半的时候,一个消费者死掉了。

这种情况要使用消息接收确认机制,可以执行上次宕机的消费者没有完成的事情。

但是在默认情况下,我们程序创建的消息队列以及存放在队列里面的消息,都是非持久化的。当RabbitMQ死掉了或者重启了,上次创建的队列、消息都不会保存。

怎么办呢?

4.2 参数配置

参数配置一:生产者创建队列声明时,修改第二个参数为 true

/**3.创建队列声明 */
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

参数配置二:生产者发送消息时,修改第三个参数为MessageProperties.PERSISTENT_TEXT_PLAIN

for (int i = 1; i <= 50; i++) {String msg = "生产者消息_" + i;System.out.println("生产者发送消息:" + msg);/**4.发送消息 */channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
}

5. 工作队列总结

1、循环分发:消费者端在信道上打开消息应答机制,并确保能返回接收消息的确认信息,这样可以保证消费者发生故障也不会丢失消息。

2、消息持久化:服务器端和客户端都要指定队列的持久化和消息的持久化,这样可以保证RabbitMQ重启,队列和消息也不会。

3、公平分发:指定消费者接收的消息个数,避免出现消息均匀推送出现的资源不合理利用的问题。

案例代码:https://www.lanzous.com/i5ydu6d

我创建了一个java相关的公众号,用来记录自己的学习之路,感兴趣的小伙伴可以关注一下微信公众号哈:niceyoo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/414185.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RabbitMQ六种队列模式-简单队列模式

前言 RabbitMQ六种队列模式-简单队列 [本文] RabbitMQ六种队列模式-工作队列 RabbitMQ六种队列模式-发布订阅 RabbitMQ六种队列模式-路由模式 RabbitMQ六种队列模式-主题模式 在官网的教程中&#xff0c;描述了如上六类工作队列模式&#xff1a; 简单队列模式&#xff1a;最简…

工作290:重置新增的mode

/*4 GY定义 找到该组件下面的add方法*/add() {/* 5GY定义 给当前属性下面的title赋值*/this.title "新增";/*重置新增的mode*/this.mode[]this.show();this.IsShowtrue;this.attrthis.viewMode false;this.approveMode false/* 7GY定义 给当前的form赋值 并进行对…

串口数据字节位的理解

# STM32F4xx系列的MCU的UART数据字节组成如下图 # 发送数据做如下图示&#xff0c;data byte MSB 与 Parity 重合 # 先刨去 Parity bit&#xff0c;分析 LSB ~ MSB 的纯数据&#xff1a; > 假定数据中的 ‘1’ 个数为奇数&#xff0c;偶校验&#xff1a;Parity ‘1’&#…

RabbitMQ消息确认机制

文章目录1. 事务机制2. Confirm模式2.1 生产者2.1.1 普通Confirm模式2.1.2 批量Confirm模式2.1.3 异步Confirm模式2.2 消费者3. 其他消费者如何确保消息一定能够消费成功呢&#xff1f;由于在前面工作队列模式里面我们了解了应答模式&#xff0c;所以我们可以很自信的回答如上题…

RabbitMQ消息幂等性问题

文章目录1. 什么是幂等性&#xff1f;1.1 消息队列的幂等性1.2 模拟重试机制1.2.1 生产者代码1.2.2 消费者代码1.2.3 消费者 application.yml 配置2. 如何保证消息幂等性&#xff0c;不被重复消费&#xff1f;解决方法1. 什么是幂等性&#xff1f; 在编程中一个幂等操作的特点是…

Centos安装JDK(java环境)

王小私下问我 centos 中 jdk 怎么安装呀&#xff0c;所以再次整理了这篇基础环境搭建的文章。 1、创建java目录2、下载上传jdk3、解压jdk4、配置环境变量 1、创建java目录 首先我们创建java的安装目录 cd /usrmkdir javacd java 2、下载上传jdk 我们如上在 usr 目录下创建了 ja…

工作288:根据时间戳处理接口

<template><div class"table-list-page"><div class"query-area"><el-date-pickerv-model"value1"type"daterange"range-separator"至"start-placeholder"开始日期"end-placeholder"结…

Centos7安装MySQL(多图)

文章目录一、在线安装1、替换网易yum源2、清理缓存3、下载rpm文件4、安装MySQL数据库二、本地安装1、上传MySQL安装包2、安装依赖的程序包3、卸载mariadb程序包4、安装MySQL程序包5、修改MySQL目录权限6、初始化MySQL三、启动MySQL1.1、在线安装方式启动MySQL1.2、本地安装方式…

lower_case_table_names=1 启动报错 mysql8.0

本文为采集文章&#xff0c;主博客地址&#xff1a;https://www.cnblogs.com/niceyoo 我们知道在 Linux 环境下默认是区分大小写的&#xff0c;所以我们需要改变这种默认方式&#xff0c;经过网上各种搜索后&#xff0c;基本就是清一色的修改 lower_case_table_names&#xff0…

工作292:修改父子组件传值错误

[Vue warn]: Missing required prop: “title” 在写vue项目中&#xff0c;在子组件中通过props传值的时候&#xff0c;在父组件中没有定义的话就会看到类似的报错&#xff0c; 这个意思是calendar这个组件中通过props传递一个title属性给父组件&#xff0c;并且title属性是必…

MacOS下IDEA设置智能提示不区分大小写

本文只针对&#xff0c;IDEA-2019.2.3版本 目录地址&#xff1a; Edit -> General -> Code Completion -> Match case -> 勾选去掉 截图如下&#xff1a;

博客园文章方块背景格式

有小伙伴问到方格背景的问题&#xff0c;所以写一篇文章记录我的博客园文章背景是如何制作的。 一、辅助网站1. 一键排版2. 代码主题3. 复制二、 图床设置 一、辅助网站 辅助网址&#xff1a;Md2All 作者提供了一篇帮助文章&#xff1a;玩转公众号Markdown 其实大致看完辅助网址…

day02 pycharm 安装

pycharm 是一款现在比较主流的辅助开发软件 不选择虚拟 所以选择Existing现有的 安装后只需打开当前窗口 默认的 不需要大家新的窗口 使用鼠标滚轮来实现放大缩小 使用debug模式测试代码 转载于:https://www.cnblogs.com/zhaohongyu6688/p/8962253.html

eclipse启动项目

今天做的任务不多&#xff0c;没有自己写代码&#xff0c;上午看了些文章&#xff0c;下午我司后台给配了配项目环境&#xff0c;全装C盘了。。以后有我好受的。。 看着后台操作&#xff0c;修改了N多配置&#xff0c;tomcat、redis、zkServer.、Nginx&#xff0c;navcat、ecli…

如何写一份优秀的java程序员简历

背景&#xff1a;进入第一家公司已经工作将近两年了&#xff0c;其中闲了一年&#xff0c;在准备自己的简历的时候&#xff0c;有种江郎才尽的感觉&#xff0c;不知道怎么写&#xff0c;看来平时还是要多积累多熟悉。 PS&#xff1a;这里面的分享看完还是很受用的。 简历看得比…

macos -bash: yarn: command not found/-bash: cnpm: command not found

博客主要更新地址&#xff1a;?https://www.cnblogs.com/niceyoo -bash: cnpm: command not found -bash: yarn: command not found -bash: xxxx: command not found 如上yarn/cnpm皆通用&#xff0c;前提是安装成功后报这个错误哈&#xff01; Error: EACCES: permission den…

部署项目到jetty

一、打包项目 1、在pom.xml中添加以下依赖 <dependency><groupId>org.mortbay.jetty</groupId><artifactId>jetty-plus</artifactId><version>7.0.0.pre5</version><scope>provided</scope> </dependency> <de…

maven jar包冲突的发现与解决[工具篇]

本文是我的第177篇文章。 关于jar冲突排查解决的问题&#xff0c;相信很多小伙伴也都知道有一些&#xff0c;无非就是两类&#xff1a;命令 or 工具。 命令方式比如&#xff1a; mvn dependency:tree 工具方式比如&#xff1a; Maven Helper 而今天的主角就是 Maven Helper 了。…

@Path注解

最近用到的一个项目&#xff0c;看到Controller控制层、Method方法都是通篇的Path注解&#xff0c;由于之前并没有使用过该注解&#xff0c;故记此篇。 首先看一下项目中的使用方式&#xff1a; Path("clientWeb")public class ClientWeb { POST Path("/g…

iOS的SVN

1、cornerstone2、smart svn mac &#xff08;比较好用&#xff09;3、还xcode自带的。转载于:https://www.cnblogs.com/YangBinChina/p/8971148.html