RabbitMQ讲解与整合

RabbitMq安装

类型概念

租户
RabbitMQ 中有一个概念叫做多租户,每一个 RabbitMQ 服务器都能创建出许多虚拟的消息服务器,这些虚拟的消息服务器就是我们所说的虚拟主机(virtual host),一般简称为 vhost。
每一个 vhost 都是一个独立的小型 RabbitMQ 服务器,这个 vhost 中会有自己的消息队列、消息交换机以及相应的绑定关系等等,并且拥有自己独立的权限,不同的 vhost 中的队列和交换机不能互相绑定,这样技能保证运行安全又能避免命名冲突
交换机
在这里插入图片描述

交换机
属性意义意义
type类型direct默认的直接交换机
根据交换机下队列绑定的routingKey直接匹配
fanout扇形交换机
简单来说就是发布订阅
队列直接绑定在交换机下,统一发布消息
headers头部交换机,通过message header头部信息进行比对
可以根据定义全匹配、部分匹配等规则
topic主题交换机
通过绑定routingKey进行模糊匹配
Durability耐用
(持久化)
durable持久化,数据存放于硬盘
transient瞬态,数据存放于内存
Auto delete自动删除Yes没有绑定队列时自动删除,针对的是曾经有过但后来没有的事物
No不自动删除
Internal内部使用Yes该路由绑定的队列不会被用户消费
No不自动删除

队列

在这里插入图片描述

队列
属性意义意义
type类型Default for virtual host租户配置的默认选项,下列三种其一
默认Classic无需设置
Classic传统的队列类型
数据存储在单个节点上
不具备quorum队列的高可用性和数据保护特性
ps:单机时使用
Quorum高可用性队列
数据会被复制到多个节点
提供更好的数据可靠性和持久性
ps:部署多节点时使用
Stream特殊类型的队列
用于支持事件流处理(event streaming)
具有类似于Kafka的流式处理特性
ps:听说不成熟,暂时用不上
Durability耐用
(持久化)
durable持久化,数据存放于硬盘
transient瞬态,数据存放于内存

参数:

显示参数实际参数作用
Auto expirex-expires设置队列的过期时间,单位为毫秒。当队列在指定时间内未被使用,将会被自动删除
Message TTLx-message-ttl设置队列中消息的过期时间(Time-To-Live),单位为毫秒。消息在队列中存放的时间超过设定的过期时间后会被自动删除
Overflow behaviourx-overflow设置队列溢出行为,可选值为 drop-head(删除最旧的消息)或 reject-publish(拒绝发布新消息)
Single active consumerx-single-active-consumer配置队列是否只允许单个消费者消费消息。当设置了x-single-active-consumer参数时,表示队列只允许有一个消费者活跃地消费消息,其他消费者将被阻塞,直到当前的消费者停止消费或断开连接
Dead letter exchangex-dead-letter-exchange设置队列中的死信消息转发到的交换机名称。当消息成为死信时,将会被转发到指定的交换机
Dead letter routing keyx-dead-letter-routing-key设置死信消息转发时的路由键。死信消息将通过指定的路由键转发到目标交换机
Max lengthx-max-length设置队列的最大长度,即队列中消息的最大数量。当队列中消息数量达到设定的最大长度后,新消息将无法入队
Max length bytesx-max-length-bytes设置队列消息的最大总字节数。当队列中消息的总字节数达到设定的最大值后,新消息将无法入队
Leader locatorx-queue-leader-locator配置队列的领导者(Leader)定位器,集群中使用

SpringBoot整合

引入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.6.3</version>
</dependency>

配置数据源

spring: rabbitmq:addresses: xxx.xxx.xx.xx:5672username: adminpassword: xxxxxxvirtual-host: /

配置交换机和队列

@Component
public class RabbitMqConfig {// 定义交换机名称public static final String FANOUT_EXCHANGE = "fanout.test";@Bean(name = FANOUT_EXCHANGE)public FanoutExchange fanoutExchange() {// 交换机类型按需创建,这里用的是Fanout,发布订阅,绑定在该交换机下的队列都会收到消息// 参数2:是否持久化// 参数3:是否自动删除return new FanoutExchange(FANOUT_EXCHANGE, true, false);}//  定义队列public static final String FANOUT_QUEUE1 = "queue1";@Bean(name = FANOUT_QUEUE1)public Queue fanoutQueue1() {// 后三个不写也行,这是默认值// 参数2:是否持久化数据到磁盘(防止意外关闭数据丢失)// 参数3:是否具有排他性// 参数4:队列不再使用时是否自动删除return new Queue(FANOUT_QUEUE1, true, false, false);}public static final String FANOUT_QUEUE2 = "queue2";@Bean(name = FANOUT_QUEUE2)public Queue fanoutQueue2() {return new Queue(FANOUT_QUEUE2, true, false, false);}@Beanpublic Binding bindingSimpleQueue1(@Qualifier(FANOUT_QUEUE1) Queue fanoutQueue1,@Qualifier(FANOUT_EXCHANGE) FanoutExchange fanoutExchange) {// 将交换机和队列绑定return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@Beanpublic Binding bindingSimpleQueue2(@Qualifier(FANOUT_QUEUE2) Queue fanoutQueue2,@Qualifier(FANOUT_EXCHANGE) FanoutExchange fanoutExchange) {// 将交换机和队列绑定return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

测试发一条消息到队列

@SpringBootTest(classes = TemplateApplication.class)
public class RabbitMQTest {@AutowiredRabbitMessagingTemplate rabbitMessagingTemplate;@Testpublic void testSent(){//指定交换机->指定队列(因为创建的交换机是FanoutExchange,所以绑定该交换机的队列都会收到一条消息)rabbitMessagingTemplate.convertAndSend("fanout.test","发送数据到FanoutExchange");// 如果创建队列不绑定交换机和路由键,那么实际上会有默认的交换机和路由键,均为空,直接将消息发送给队列,队列名则和路由键保持一致,仍然可以成功发送消息。}
}

测试接收队列消息

写个监听类接收消息:

@Component
public class RabbitMqListenter {@RabbitListener(queues = {RabbitMqConfig.FANOUT_QUEUE1,RabbitMqConfig.FANOUT_QUEUE2})public void reciveLogAll(String msg) throws Exception {System.out.println("消费到数据:" + msg);}
}

-------------基础的使用到这里就结束了-------------

拓展事项

rabbitMqPusher

自己封装一个更加方便使用的发送工具,可有可无,其中可以使用RabbitMessagingTemplate和RabbitTemplate,RabbitMessagingTemplate和RabbitTemplate都是Spring AMQP提供的用于与RabbitMQ进行交互的工具类如果只是简单使用,那么RabbitMessagingTemplate就够用了,如果需要更精细的控制,可以选择使用RabbitTemplate

,但它们在使用方式和功能上有一些不同点:

RabbitMessagingTemplate:

RabbitMessagingTemplate是MessagingTemplate的子类,用于在Spring应用程序中发送和接收消息。
它提供了一种更高级别的抽象,使得在Spring框架中更容易使用消息发送和接收的功能。
可以直接与Spring的消息通道(MessageChannel)集成,方便进行消息的发送和接收。

RabbitTemplate:

RabbitTemplate是Spring AMQP提供的用于与RabbitMQ进行交互的核心类,提供了丰富的方法来发送和接收消息。
它是一个强大而灵活的工具,可以直接与RabbitMQ的交互进行细粒度的控制。
可以设置消息的属性、监听发送确认、接收确认等功能,更加灵活地处理消息发送和接收的细节。

package com.template.rabbitmq.producer.impl;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class RabbitMqPusher {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送消息* @param quene   队列名称 或 交换机名称* @param message 消息内容*/public void send(String quene, String message) {rabbitTemplate.send(quene, MessageBuilder.withBody(message.getBytes()).build());log.info("发送消息---> quene:{} ---> message:{}", message, quene);}/*** 直接发送消息到队列* 超过有效期丢弃,如果队列没有声明x-message-ttl属性则无效** @param quene      队列名称* @param message    消息内容* @param expiration 有效期(毫秒)*/public void send(String quene, String message, Integer expiration) {rabbitTemplate.send(quene, MessageBuilder.withBody(message.getBytes()).setExpiration(String.valueOf(expiration)).build());log.info("发送消息---> quene:{} ---> message:{} ---> expiration:{}", quene, message, expiration);}/*** 发送消息* 超过有效期丢弃,如果队列没有声明x-message-ttl属性则无效** @param exchange   交换机名称* @param routingKey 路由键* @param message    消息内容* @param expiration 有效期(毫秒)*/public void send(String exchange, String routingKey, String message, Integer expiration) {rabbitTemplate.send(exchange, routingKey, MessageBuilder.withBody(message.getBytes()).setExpiration(String.valueOf(expiration)).build());log.info("发送消息---> exchange:{} ---> routingKey:{} ---> message:{} ---> expiration:{}", exchange, routingKey, message, expiration);}/*** 发送消息** @param exchange   交换机名称* @param routingKey 路由键* @param message    消息内容*/public void send(String exchange, String routingKey, String message) {rabbitTemplate.send(exchange, routingKey, MessageBuilder.withBody(message.getBytes()).build());log.info("发送消息---> exchange:{} ---> routingKey:{} ---> message:{}", exchange, routingKey, message);}}

在RabbitMQ中,如果队列没有设置过期时间(即没有声明x-message-ttl属性),那么即使在发送消息时设置了消息的过期时间也会失效。消息的过期时间只有在队列设置了过期时间的情况下才会生效。
实测以上列代码的方式直接对消息设置有效期是生效的。

死信队列

和普通队列一样,只不过是对其他队列进行配置,将过期的消息路由到死信队列中。
创建死信交换机和死信路由

	// 配置交换机的文件中继续增加配置public static final String DIRECT_GP_DEAD_LETTER_EXCHANGE = "DIRECT_GP_DEAD_LETTER_EXCHANGE";public static final String DIRECT_GP_DEAD_LETTER_QUEUE = "DIRECT_GP_DEAD_LETTER_QUEUE";@Bean(DIRECT_GP_DEAD_LETTER_EXCHANGE)public DirectExchangedirectDeadLetterExchange() {return new DirectExchange(DIRECT_GP_DEAD_LETTER_EXCHANGE, true, false, new HashMap<>());}@Bean(DIRECT_GP_DEAD_LETTER_QUEUE)public Queue directDeadLetterQueue() {return new Queue(DIRECT_GP_DEAD_LETTER_QUEUE, true, false, false, new HashMap<>());}

设置队列消息有效期并绑定死信队列

	@Bean(name = DIRECT_QUEUE1)public Queue directQueue1() {HashMap<String, Object> headers = new HashMap<>();// 配置消息有效期,消息发送到队列10秒后如果未被消费者消费,则过期headers.put("x-message-ttl",10000);// 配置超期交换机,消息过期后会发送到此交换机headers.put("x-dead-letter-exchange",DIRECT_GP_DEAD_LETTER_EXCHANGE);// 配置超期routingKey,消息过期后转移消息时指定的routingKeyheaders.put("x-dead-letter-routing-key",DIRECT_GP_DEAD_LETTER_QUEUE);// 如果只配置了有效期,未配置交换机和routingKey,则消息会被直接丢弃return new Queue(DIRECT_QUEUE1, true, false, false,headers);}

配置完成后,尝试向DIRECT_QUEUE1发送一条消息,不启动消费者,10秒后消息会自动转移到死信队列中,可在可视化管理界面进行验证。

延时队列
延时队列场景举例:

预定一个会议室,两个小时后开始,要求提前十分钟通知参会人员进行开会。
如果不使用延时队列,那么就需要不断轮询,查看是否到达需要通知的时间,进行消息通知。

延时队列的实现方式:

死信队列+消息有效期
预定时间到提前十分钟通知中间有110分钟,那么创建一条通知消息,设置有效期110分钟丢入队列,不用消费者去监听,等待消息过期后路由到指定的死信队列,再去消费死信队列中的消息即可。
所以延时队列实际上是一种实现方案,而不是一种特定的队列类型。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/708058.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

NLP-词向量、Word2vec

Word2vec Skip-gram算法的核心部分 我们做什么来计算一个词在中心词的上下文中出现的概率&#xff1f; 似然函数 词已知&#xff0c;它的上下文单词的概率 相乘。 然后所有中心词的这个相乘数 再全部相乘&#xff0c;希望得到最大。 目标函数&#xff08;代价函数&#xff0…

如何用CDH+Apache DolphinScheduler开启Kerberos

搭建环境 多台linux主机搭建集群CDH 6.3.2 (Parcel)版本Apache DolphinScheduler1.3.2版本&#xff0c;本流程在CDH已搭建完成并可正常使用后&#xff0c;开启kerberos功能&#xff0c;Apache DolphinScheduler用于大数据任务管理与执行&#xff0c;是很不错的任务调度平台&am…

ZYNQ--MIG核配置

文章目录 MIG核配置界面多通道AXI读写DDR3MIG核配置界面 Clock Period: DDR3 芯片运行时钟周期,这个参数的范围和 FPGA 的芯片类型以及具体类型的速度等级有关。本实验选择 1250ps,对应 800M,这是本次实验所采用芯片可选的最大频率。注意这个时钟是 MIG IP 核产生,并输出给…

压缩视频大小的软件有哪些?5款软件推荐

压缩视频大小的软件有哪些&#xff1f;随着高清摄像设备的普及和网络速度的不断提升&#xff0c;视频文件变得越来越庞大&#xff0c;动辄数百兆甚至数GB的大小常常让用户在分享和存储时感到头疼。幸运的是&#xff0c;市面上有许多优秀的视频压缩软件可以帮助我们轻松应对这一…

NFS服务器挂载失败问题

问题 mount.nfs: requested NFS version or transport protocol is not supported背景&#xff1a;现在做嵌入式开发&#xff0c;需要在板端挂载服务器&#xff0c;读取服务器文件。挂载中遇到该问题。 挂载命令长这样 mount -t nfs -o nolock (XXX.IP):/mnt/disk1/zixi01.ch…

vue实现水印功能

目录 一、应用场景 二、实现原理 三、详细开发 1.水印的实现方式 2.防止用户通过控制台修改样式去除水印效果&#xff08;可跳过&#xff0c;有弊端&#xff09; 3.水印的使用 &#xff08;1&#xff09;单页面/全局使用 &#xff08;2&#xff09;全局使用个别页面去掉…

绘制窗口及窗口位置变化

为了方便窗口的移动 &#xff0c;及相交窗口关闭之后被遮挡窗口的重绘&#xff0c;因此给每个窗口建立一个内存BUF&#xff0c;等到不涉及内容变更的重绘&#xff0c;只需要将该BUF复制到显存之中。 然而&#xff0c;重绘时存在一个被遮挡时如何操作的问题。比如下图中依次为从…

【QT+JS】QT和JS 中的正则表达式 、QT跑JS语言

【QTJS】QT和JS 中的正则表达式 、QT跑JS语言 前言正则表达式QT 中的使用QRegExp自带的cap方法怎么用&#xff1f;QRegExp的非贪婪模式与贪婪模式 JS 中的使用 QT 跑JS 语言 前言 在看大佬的系统代码时候&#xff0c;对其中灵活用到的正则表达式和QT 跑JS 语言部分感觉很陌生&…

iOS App冷启动优化:二进制重排

原理 二进制文件中方法的加载顺序&#xff0c; 取决于方法在代码文件中的书写顺序&#xff0c;而不是调用顺序。 应用程序启动时会调用到的方法是有限的&#xff0c;但可能分散在很多个。 由于内存是分页管理的&#xff0c;要加载就要 整页加载。 这就导致很多完全还用不到的方…

网站添加pwa操作和配置manifest.json后,没有效果排查问题

pwa技术官网&#xff1a;https://web.dev/learn/pwa 应用清单manifest.json文件字段说明&#xff1a;https://web.dev/articles/add-manifest?hlzh-cn Web App Manifest&#xff1a;Web App Manifest | MDN 当网站添加了manifest.json文件后&#xff0c;也引入到html中了&a…

FPGA-FIF0模型与应用场景(IP核)

什么是FIFO FIFO (First In First Out) ,也就是先进先出。FPGA或者ASIC中使用到的FIFO一般指的是对数据的存储具有先进先出特性的一个缓存器,常被用于数据的缓存或者高速异步数据的交互。它与普通存储器的区别是没有外部读写地址线,这样使用起来相对简单,但缺点就是只能顺序写…

python脚本实现全景站点欧拉角转矩阵

效果 脚本 import numpy as np import math import csv import os from settings import *def euler_to_rotation_matrix(roll, pitch, yaw):# 计算旋转矩阵# Z-Y-X转换顺序Rz

java多线程编程(学习笔记)入门

一、多线程创建的三种方式 (1)通过继承Thread本身 (2)通过实现runnable接口 (3)通过 Callable 和 Future 创建线程 其中&#xff0c;前两种不能获取到编程的结果&#xff0c;第三种能获取到结果 二、常见的成员方法 方法名称说明String getName()返回此线程的名称void setNam…

Docker之数据卷自定义镜像

文章目录 前言一、数据卷二、自定义镜像 前言 Docker提供了一个持久化存储数据的机制&#xff0c;与容器生命周期分离&#xff0c;从而带来一系列好处&#xff1a; 总的来说Docker 数据卷提供了一种灵活、持久、可共享的存储机制&#xff0c;使得容器化应用在数据管理方面更加…

Git 指令深入浅出【3】—— 远程仓库

Git 指令深入浅出【3】—— 远程仓库 一、远程仓库&#xff08;一&#xff09;基本指令1. 配置 SSH 密钥2. 推送远程仓库其他分支推送远程仓库方法1方法2建立分支链接 方法3 3. 合并分支请求 &#xff08;二&#xff09;.gitignore 忽略文件&#xff08;三&#xff09;标签管理…

MVCC【重点】

参考链接 [1] https://www.bilibili.com/video/BV1YD4y1J7Qq/?spm_id_from333.1007.top_right_bar_window_history.content.click&vd_source0cb0c5881f5c7d76e7580fbd2f551074 [2]https://www.cnblogs.com/jelly12345/p/14889331.html [3]https://xiaolincoding.com/mysql…

基于频率增强的数据增广的视觉语言导航方法(VLN论文阅读)

基于频率增强的数据增广的视觉语言导航方法&#xff08;VLN论文阅读&#xff09; 本文提出的方法很简单&#xff0c;将原始图像增加其他随机图像的高频信息&#xff0c;得到增强的图像作为新的样本&#xff0c;与原始的样本交替训练。背后的动机是&#xff0c;vln模型对高频信息…

TV-SAM 新型零样本医学图像分割算法:GPT-4语言处理 + GLIP视觉理解 + SAM分割技术

TV-SAM 新型零样本医学图像分割算法&#xff1a;GPT-4语言处理 GLIP视觉理解 SAM分割技术 提出背景TV-SAM 方法论 提出背景 论文&#xff1a;https://arxiv.org/ftp/arxiv/papers/2402/2402.15759.pdf 代码&#xff1a;https://github.com/JZK00/TV-SAM 利用了GPT-4的强大语…

TCP/IP-常用网络协议自定义结构体

1、TCP/IP模型&#xff1a; 2、TCP/IP- 各层级网络协议&#xff08;从下往上&#xff09;&#xff1a; 1&#xff09;数据链路层&#xff1a; ARP: 地址解析协议&#xff0c;用IP地址获取MAC地址的协议&#xff0c;通过ip的地址获取mac地 …

【最新】如何将idea上的项目推送到gitee

1.打开Gitee&#xff0c;在首页&#xff0c;点击“”&#xff0c;创建一个仓库 2.填写仓库基本信息 3.下拉&#xff0c;点击“创建”&#xff0c;出现下方页面&#xff0c;证明仓库创建成功。 4.打开idea&#xff0c;下载gitee的插件&#xff08;此处默认已经下载git&#xff0…