RabbitMQ个人理解与基本使用

目录

一. 作用:

二. RabbitMQ的5中队列模式:

1. 简单模式

2. Work模式

3. 发布/订阅模式

4. 路由模式

5. 主题模式

三. 消息持久化:

消息过期时间

ACK应答 

四. 同步接收和异步接收:

应用场景

五. 基本使用 :

引入依赖库:

配置文件RabbitMQConfig: 

创建消息任务类: 

解析:


一. 作用:

        RabbitMQ主要用于消息队列的实现。

二. RabbitMQ的5中队列模式:

1. 简单模式

一个生产者(发送方)对应一个消费者(接收方)

2. Work模式

一个生产者对应多个消费者,但是只能有一个消费者获得消息(排他)

3. 发布/订阅模式

一个消费者将消息首先发送到fanout交换器,交换器绑定到多个队列,然后与之对应的所有消费者都能接收到消息(不排他)

4. 路由模式

生产者将消息发送到direct交换器,交换器按照关键字(Key),把消息路由到某个队列

5. 主题模式

生产者将消息发送到Topic交换器,交换器按照复杂的规则,把消息路由到某个队列

三. 消息持久化:

        消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢?答案就是消息持久化。持久化可以防止在异常情况下丢失数据。除了消息持久化之外,甚至交换器和队列都能持久化。也就是说rabbitmq的消息会被存储在磁盘上,只有当消费收到消息,rabbitmq确认消费者收到消息(Acknowledgments--简称ACK)后才会将消息从队列中删除。  

  • 消息过期时间

        如果消费者一直不接收消息,消息会一直保存在消息队列当中,短期内可能不会有什么影响,但是如果经过长时间的积累后,消息会变得很多很多 ,浪费大量的资源,内存。

        为了应对这种情况,就可以对rabbitmq设置消息的过期时间,在规定时间内消息没有被接收,就会删除掉该消息。

  • ACK应答 

        消费者接收到消息后,为了让RabbitMQ 知道,就需要返回一个ACK应答,告诉RabbitMQ消费者已经收到了消息,如果收到消息后我们需要删除该消息,只需要在ACK应答中加上deliveryTag标志位。

四. 同步接收和异步接收:

        同步接收:指消费者调用方法时,会阻塞来等待消息,直到消息被成功消费或者队列为空。(没有消息等待消息再接着处理)。

        异步接收: 指消费者不会在接收消息时阻塞,而是通过回调函数处理消息。消费者在等待消息的同时不会停下,可以处理其他任务。(当有消息时才来处理消息)。

  • 应用场景

        同步接收 :当消息的处理顺序对业务逻辑非常重要,就使用同步接收,消费者一次只处理一个消息,确保了每条消息的处理顺序。

        异步接收:当处理消息的时间比较长,或者系统的并发量大时,采用异步接收会更好。

RabbitMQ还有一个杀手锏——同时使用异步收发和同步收发。

五. 基本使用 :

引入依赖库:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.9.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency> 

配置文件RabbitMQConfig: 

import com.rabbitmq.client.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Value("${rabbitmq.factoryHost}")private String host;@Beanpublic ConnectionFactory connectionFactory() {ConnectionFactory factory = new ConnectionFactory();factory.setHost(host);factory.setPort(5672);return factory;}
}

 host配置,我将rabbitMQ放在虚拟机上的,所有ip是虚拟机的地址:

创建消息任务类: 

@Slf4j
@Component
public class MessageTask {@Autowiredprivate ConnectionFactory factory;@Autowiredprivate MessageService messageService;/** 同步发送消息* */public void send(String topic, MessageEntity entity) {//向MongoDB保存消息数据,返回消息IDString id = messageService.insertMessage(entity);//向RabbitMQ发送消息try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){//连接到某个topicchannel.queueDeclare(topic, true, false, false, null);HashMap header = new HashMap();header.put("messageId",id);//创建AMQP协议参与对象,添加附加属性AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(header).build();channel.basicPublish("",topic,properties,entity.getMsg().getBytes());log.debug("消息发送成功");} catch (Exception e){log.error(e.getMessage());throw new EmosException("向MQ发送消息失败");}}/** 异步发送消息* */@Async("AsyncTaskExecutor")public void sendAsync(String topic, MessageEntity entity) {send(topic, entity);}/** 同步接收消息* */public int receive(String topic) {int i = 0;try (//接收消息数据Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 从队列中获取消息,不自动确认channel.queueDeclare(topic, true, false, false, null);//Topic中有多少条数据未知,所以使用死循环接收数据,直到接收不到消息,退出死循环while (true) {//创建响应接收数据,禁止自动发送Ack应答GetResponse response = channel.basicGet(topic, false);if (response != null) {AMQP.BasicProperties properties = response.getProps();Map<String, Object> header = properties.getHeaders(); //获取附加属性对象String messageId = header.get("messageId").toString();byte[] body = response.getBody();//获取消息正文String message = new String(body);log.debug("从RabbitMQ接收的消息:" + message);MessageRefEntity entity = new MessageRefEntity();entity.setMessageId(messageId);entity.setReceiverId(Integer.parseInt(topic));entity.setReadFlag(false);entity.setLastFlag(true);messageService.insertRef(entity); //把消息存储在MongoDB中//数据保存到MongoDB后,才发送Ack应答,让Topic删除这条消息long deliveryTag = response.getEnvelope().getDeliveryTag();channel.basicAck(deliveryTag, false);i++;} else {break; //接收不到消息,则退出死循环}}} catch (Exception e) {log.error("执行异常", e);}return i;}/** 异步接收消息* */@Asyncpublic int receiveAsync(String topic) {return receive(topic);}/** 同步删除消息* */public void deleteQueue(String topic) {try(//接收消息数据Connection connection = factory.newConnection();Channel channel = connection.createChannel()){channel.queueDelete(topic);log.debug("成功删除消息队列:"+topic);} catch (Exception e){log.error("删除消息队列失败:",e);throw new EmosException("删除消息队列失败");}}/** 异步删除消息* */@Asyncpublic void deleteAsync(String topic) {deleteQueue(topic);}
}

解析:

channel.queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments);
  • queueName:队列的名称,用于标识消息的存储位置。
  • durable:

        true,表示队列是持久化的。

        false,表示队列是非持久化的。

  • exclusive:

        true:队列仅供当前连接使用,连接断开时队列会自动删除。

        false:队列可供多个连接共享。

  • autoDelete:
    true:当队列不再被任何消费者订阅时,队列会自动删除。
    false:队列即使没有消费者订阅也会一直存在,直到手动删除。

  • arguments:额外的参数,null表示没有额外参数

Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 60000); // 设置队列中消息的过期时间为 60 秒(60000 毫秒)

channel.queueDeclare("myQueue", true, false, false, arguments);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/62808.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前端怎么预览pdf

1.背景 后台返回了一个在线的pdf地址&#xff0c;需要我这边去做一个pdf的预览&#xff08;需求1&#xff09;&#xff0c;并且支持配置是否可以下载&#xff08;需求2&#xff09;&#xff0c;需要在当前页就能预览&#xff08;需求3&#xff09;。之前我写过一篇预览pdf的文…

Python 参数配置使用 XML 文件的教程:轻松管理你的项目配置

Python 参数配置使用 XML 文件的教程&#xff1a;轻松管理你的项目配置 一句话总结&#xff1a;当配置项存储在外部文件&#xff08;如 XML、JSON&#xff09;时&#xff0c;修改配置无需重新编译和发布代码。通过更新 XML 文件即可调整参数&#xff0c;无需更改源代码&#xf…

解决 MySQL 启动失败与大小写问题,重置数据库

技术文档&#xff1a;解决 MySQL 启动失败与大小写问题&#xff0c;重置数据库 1. 问题背景 在使用 MySQL 时&#xff0c;可能遇到以下问题&#xff1a; MySQL 启动失败&#xff0c;日志显示 “permission denied” 或 “Can’t create directory” 错误。MySQL 在修改配置文…

python webdriver-manager 实现selenium 免下载安装webdriver

python webdriver-manager 实现selenium 免下载安装webdriver selenium在自动化测试中,通常需要使用浏览器驱动来与浏览器进行交互。然而,手动下载、安装、以及管理这些驱动非常麻烦,尤其是当驱动版本频繁更新时。为此,webdriver-manager库提供了一个极简的方案,自动帮我…

滑动窗口算法专题

滑动窗口简介 滑动窗口就是利用单调性&#xff0c;配合同向双指针来优化暴力枚举的一种算法。 该算法主要有四个步骤 1. 先进进窗口 2. 判断条件&#xff0c;后续根据条件来判断是出窗口还是进窗口 3. 出窗口 4.更新结果&#xff0c;更新结果这个步骤是不确定的&#xff0c…

C# 中的Task

文章目录 前言一、Task 的基本概念二、创建 Task使用异步方法使用 Task.Run 方法 三、等待 Task 完成使用 await 关键字使用 Task.Wait 方法 四、处理 Task 的异常使用 try-catch 块使用 Task.Exception 属性 五、Task 的延续使用 ContinueWith 方法使用 await 关键字和异步方法…

【AIGC】如何高效使用ChatGPT挖掘AI最大潜能?26个Prompt提问秘诀帮你提升300%效率的!

还记得第一次使用ChatGPT时&#xff0c;那种既兴奋又困惑的心情吗&#xff1f;我是从一个对AI一知半解的普通用户&#xff0c;逐步成长为现在的“ChatGPT大神”。这一过程并非一蹴而就&#xff0c;而是通过不断的探索和实践&#xff0c;掌握了一系列高效使用的技巧。今天&#…

浩辰CAD教程004:柱梁板

文章目录 柱梁板标准柱角柱构造柱柱齐墙边绘制梁绘制楼板 柱梁板 标准柱 绘制标准柱&#xff1a; ①&#xff1a;点选插入柱子②&#xff1a;沿着一根轴线布置柱子③&#xff1a;指定的矩形区域内的轴线交点插入柱子 替换现有柱子&#xff1a;选择替换之后的柱子形状&#x…

UNIX数据恢复—UNIX系统常见故障问题和数据恢复方案

UNIX系统常见故障表现&#xff1a; 1、存储结构出错&#xff1b; 2、数据删除&#xff1b; 3、文件系统格式化&#xff1b; 4、其他原因数据丢失。 UNIX系统常见故障解决方案&#xff1a; 1、检测UNIX系统故障涉及的设备是否存在硬件故障&#xff0c;如果存在硬件故障&#xf…

桥接模式的理解和实践

桥接模式&#xff08;Bridge Pattern&#xff09;&#xff0c;又称桥梁模式&#xff0c;是一种结构型设计模式。它的核心思想是将抽象部分与实现部分分离&#xff0c;使它们可以独立地进行变化&#xff0c;从而提高系统的灵活性和可扩展性。本文将详细介绍桥接模式的概念、原理…

HTML综合

一.HTML的初始结构 <!DOCTYPE html> <html lang"en"><head><!-- 设置文本字符 --><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><!-- 设置网页…

二维码数据集,使用yolov,voc,coco标注,3044张各种二维码原始图片(未图像增强)

二维码数据集&#xff0c;使用yolov&#xff0c;voc&#xff0c;coco标注&#xff0c;3044张各种二维码原始图片&#xff08;未图像增强&#xff09; 数据集分割 训练组70&#xff05; 2132图片 有效集20&#xff05; 607图片 测试集10&#xff05; 305图…

Python爬虫技术的最新发展

在互联网的海洋中&#xff0c;数据就像是一颗颗珍珠&#xff0c;而爬虫技术就是我们手中的潜水艇。2024年&#xff0c;爬虫技术有了哪些新花样&#xff1f;让我们一起潜入这个话题&#xff0c;看看最新的发展和趋势。 1. 异步爬虫&#xff1a;速度与激情 随着现代Web应用的复…

用豆包MarsCode IDE,从0到1画出精美数据大屏!

豆包MarsCode IDE 是一个云端 AI IDE 平台&#xff0c;通过内置的 AI 编程助手&#xff0c;开箱即用的开发环境&#xff0c;可以帮助开发者更专注于各类项目的开发。 作为一名前端开发工程师&#xff0c;今天想尝试利用豆包MarsCode IDE&#xff0c;选择 Vue Echarts 创建一个…

游戏引擎学习第42天

仓库: https://gitee.com/mrxiao_com/2d_game 简介 目前我们正在研究的内容是如何构建一个基本的游戏引擎。我们将深入了解游戏开发的每一个环节&#xff0c;从最基础的技术实现到高级的游戏编程。 角色移动代码 我们主要讨论的是角色的移动代码。我一直希望能够使用一些基…

Redis是什么?Redis和MongoDB的区别在那里?

Redis介绍 Redis&#xff08;Remote Dictionary Server&#xff09;是一个开源的、基于内存的数据结构存储系统&#xff0c;它可以用作数据库、缓存和消息中间件。以下是关于Redis的详细介绍&#xff1a; 一、数据结构支持 字符串&#xff08;String&#xff09; 这是Redis最…

计算机网络中的三大交换技术详解与实现

目录 计算机网络中的三大交换技术详解与实现1. 计算机网络中的交换技术概述1.1 交换技术的意义1.2 三大交换技术简介 2. 电路交换技术2.1 理论介绍2.2 Python实现及代码详解2.3 案例分析 3. 分组交换技术3.1 理论介绍3.2 Python实现及代码详解3.3 案例分析 4. 报文交换技术4.1 …

[Python] 操作redis使用pipeline保证原子性

1. pipeline介绍 在Python中使用Redis的Pipeline可以使多个Redis命令在一个请求中批量执行&#xff0c;从而提高效率。redis-py库提供了对Redis Pipeline的支持&#xff0c;下面是一个基本的例子&#xff1a; 首先&#xff0c;确保你已安装了redis库&#xff1a; pip instal…

Bug 解决 无法正常登录或获取不到用户信息

目录 1、跨域问题 2、后端代码问题 3、前端代码问题 我相信登录这个功能是很多人做项目时候遇到第一个槛&#xff01; **看起来好像很简单的登录功能&#xff0c;实际上还是有点坑的&#xff0c;比如明明账号密码都填写正确了&#xff0c;**为什么登录后请求接口又说我没登…

论文翻译 | ChunkRAG: Novel LLM-Chunk Filtering Method for RAG Systems

摘要 使用大型语言模型&#xff08;LLM&#xff09;的检索-增强生成&#xff08;RAG&#xff09;系统经常由于检索不相关或松散相关的信息而生成不准确的响应。现有的在文档级别操作的方法无法有效地过滤掉此类内容。我们提出了LLM驱动的块过滤&#xff0c;ChunkRAG&#xff0…