RabbitMQ(消息队列)

RabbitMQ

它是消息中间件,是在消息的传输过程中保存消息的容器,实现应用程序和应用程序之间通信的中间产品。目前主流消息队列通讯协议是AMQP(二进制传输,支持多种语言)、JMS(HTTP传输,只支持Java)。

特点:每秒十万左右级别、消息延迟在微秒级、完整的消息确认机制、并发能力强、性能好。

常见MQ

  • ActiveMQ基于JMS,每秒数万级别
  • RabbitMQ基于AMQP,每秒十万级别
  • RocketMQ是阿里的产品,基于JMS,每秒十万级别,经历过双十一
  • Kafka自定义协议,每秒百万级别

体系结构

分为:服务器、交换器、队列;

服务器:负责管理所有的交换器和队列,一个RabbitMQ内有多个服务器,(为了避免每次发送消息都建立TCP连接,有了很多的服务器,每个线程建立单独的服务器进行通讯)每个服务器负责一部分交换器和队列,之间通过 HTTP 协议通信;

交换机:负责接收、路由、传递消息,支持多种交换器类型,每个类型有不同的消息传递方式和使用场景;

队列:负责存储消息,支持多种队列,都有不同的存储方式;

安装:

使用docker方式

# 拉取镜像
docker pull rabbitmq:3.8.12-management
# 注意:在拉取镜像,遇到missing signature key问题,需要提升docker的版本# 运行容器
# -d 参数:后台运行 Docker 容器
# -e 参数:设置容器内的环境变量,这里我们设置了登录RabbitMQ管理后台的默认用户和密码
docker run --name rabbitmq -d -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq:3.8.12-management# 启动成功,MQ的客户端页面,输入所设置的用户名和密码
# 访问:http://xxx:15672# 如果访问不通,需要开放端口
firewall-cmd --zone=plublic -add-pord=5672/tcp --add-pord=15672/tcp --permanent
success
firewall-cmd --reload
success

发送消息

需要引包

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version>
</dependency>

生产者

// 生产者 - 产生消息
public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("IP地址");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("123456");// 创建连接Connection connection = connectionFactory.newConnection();// 创建频道Channel channel = connection.createChannel();// 创建队列,参数:队列名称、是否定义持久化队列、是否独占本次连接、是否在不使用的时候自动删除队列、其他参数channel.queueDeclare("new_queue", true, false, false, null);String message = "发送的消息的内容:123";// 参数:交换机名称,默认Default Exchange、队列名称、配置信息、消息内容channel.basicPublish("", "new_queue", null, message.getBytes());// 关闭资源  channel.close();connection.close();
}

创建一个队列,并有消息待查看,点击该队列的名称,在Get messages处,可以查看该消息的信息;

查看队列:

在这里插入图片描述
在这里插入图片描述
消费者

// 消费者 - 要消费消息
public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("IP地址");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("guest");factory.setPassword("123456");// 创建连接 Connection        Connection connection = factory.newConnection();// 创建频道  Channel channel = connection.createChannel();// 接收消息  DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当收到消息后,会自动执行该方法  // 参数:标识、获取一些信息,交换机等、配置信息、数据(消息内容)@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:" + consumerTag);System.out.println("Exchange:" + envelope.getExchange());System.out.println("RoutingKey:" + envelope.getRoutingKey());System.out.println("properties:" + properties);System.out.println("body:" + new String(body));}};// 监听程序,用来监听消息,参数:队列、是否自动确认、回调对象  channel.basicConsume("new_queue", true, consumer);
}

如果有被消费者消费,会在管理页中,该队列的 Ready进行-1;

工作模式

简单模式

生产者(只有一个)、消费者(只有一个)、消息储存在队列中;

工作队列模式

生产者(只有一个)、消费者(有多个)、消息储存在队列中;消费者谁抢到算谁的。

发布订阅模式

生产者(只有一个)、消费者(有多个)、交换机、多个队列;

生产者把消息发送给交换机,交换机处理消息取决于交换机的类型,交换机根据类型把消费存在对应的队列中,消费者(多个)满足规则都可以得到消息;

交换机有3种类型

➢ Fanout:广播,将消息交给所有绑定到交换机的队列

➢ Direct:定向,把消息交给符合指定routing key 的队列

➢ Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

路由模式

队列与交换机的绑定,使用Direct,消费者监听的队列,该队列与交换机绑定的路由件匹配,该消费者可以收到消息。其他消费者也监听该队列,但路由件不匹配,不会收到消息。

查看交换机

在这里插入图片描述

交换机与队列的绑定

在这里插入图片描述
在这里插入图片描述

主题模式(通配符匹配)

这个与路由模式类似,只是这个支持通配符绑定,# 匹配一个或多个词,* 匹配不多不少恰好1个词。

创建

创建交换机

在这里插入图片描述

创建队列

在这里插入图片描述

队列与交换机绑定

在这里插入图片描述

整合SpringBoot

引包

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

更改配置文件

spring: rabbitmq: host: IP地址port: 5672 username: guest password: 123456 virtual-host: /# 来保证消息的可靠性publisher-confirm-type: CORRELATED # 交换机的确认publisher-returns: true # 队列的确认

代码

// 生产者
@Autowired  
private RabbitTemplate rabbitTemplate;public void testSend() {  rabbitTemplate.convertAndSend("交换机","路由键","消息内容");  
}  
// 消费者 (durable 是否持久化)
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "队列名字", durable = "true"),exchange = @Exchange(value = "交换机"),key = {"路由键"}
))
public void process(String dateString,Message message,Channel channel) {log.info("消息内容:"+ dateString);
}

配置类

生产者保障消息的可靠性

生产者 - 保障消息是否发送到队列或者交换机

@Component
public class MQAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}/*** 确认消息是否发送到交换机*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("消息发送到交换机 - 成功!数据:" + correlationData);} else {log.info("消息发送到交换机 - 失败!数据:" + correlationData + " 原因:" + cause);}}/*** 确认消息是否发送到队列 - 只有发送失败的时候才会调用该方法*/@Overridepublic void returnedMessage(ReturnedMessage returned) {log.info("消息主体: " + new String(returned.getMessage().getBody()));log.info("应答码: " + returned.getReplyCode());log.info("描述:" + returned.getReplyText());log.info("消息使用的交换器 exchange : " + returned.getExchange());log.info("消息使用的路由键 routing : " + returned.getRoutingKey());}}

消费者保障消息的可靠性

消费者 - 保障消息真的收到,改为手动确认ACK

spring:rabbitmq:...listener:simple:acknowledge-mode: manual # 把消息确认模式改为手动确认

消费者手动ACK

@RabbitListener( // 设置绑定关系bindings = @QueueBinding(// 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"),// 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),// 配置路由键信息key = {ROUTING_KEY}
))
public void processMessage2(String dataString, Message message, Channel channel) throws IOException {// 注意: 重置消息,需要考虑幂等性long tag = message.getMessageProperties().getDeliveryTag();try {log.info("消费者 - 接收消息:" + dataString);// System.out.println(10 / 0);  // 手动制造出异常,让消息回到队列中channel.basicAck(tag, false);} catch (Exception e) {// 获取信息,查看此消息是否曾经被投递过Boolean redelivered = message.getMessageProperties().getRedelivered();if (!redelivered) {// 没有被投递过,那就重新放回队列,重新投递,再试一次channel.basicNack(tag, false, true);} else {// 已经被投递过,且这一次仍然进入了 catch 块,那么返回拒绝此消息,不会回到MQ队列中channel.basicReject(tag, false);}}
}

消费者-限流

大量消息进入队列中,消息队列中消息有1万,设置每次最多从队列取回1000个消息,并发能力只能处理1000个请求,消费端-最多只处理1000个请求;

限流配置

spring:rabbitmq:...listener:simple:acknowledge-mode: manual # 把消息确认模式改为手动确认prefetch: 10  # 设置消费者,每次最多从消息队列服务器取回多少消息,并不是一下把队列中的消息全部取走

从队列中Ready,变成所设置的值,从此起到了限流的作用,消息者处理ACK个数也下降。

在这里插入图片描述

消息超时

可在创建队列的时候,设置参数:x-message-ttl = 3000 (毫秒值),当我们生产者发送消息到队列,队列里的消息没有被消费者消费时,可通过队列里的消息超时时间,进行丢弃消息。

@Autowired
private RabbitTemplate rabbitTemplate;MessagePostProcessor processor = (Message message) -> {// 设定超时时间,单位 (毫秒)message.getMessageProperties().setExpiration("7000");return message;
};
rabbitTemplate.convertAndSend("交换机名", "路由键", "生产者发送消息 - 消息超时 - " + processor);

死信

一个消息无法被消费,会变成死信;产生的原因:消费者拒绝:basicNackbasicReject这两个方法不把消息重新放回队列、队列溢出:队列里的消息达到数量的限制、消息超时:超时时间未被消费;

解决:丢弃(一般不重要数据)、入库(记录日志、后续处理)、监听(进入死信队列,监听死信队列进行处理)

前提准备:

创建死信交换机、死信队列、死信路由键,互相绑定;

创建正常交换机、队列、路由键,互相绑定;
在这里插入图片描述

// 前提把 死信、正常都创建好,绑定好
// 监听正常队列
@RabbitListener(queues = {‘正常队列’})
public void processMessageNormal(String dateString, Message message, Channel channel) throws IOException {// 监听正常队列,但是拒绝消息,进行消息拒绝log.info("【正常】接受到消息:" + dateString);;channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
// 监听死信队列
@RabbitListener(queues = {‘死信队列’})
public void processMessageDead(String dataString, Message message, Channel channel) throws IOException {// 监听死信队列log.info("【死信】接受到消息 = " + dataString);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

延迟队列

方案一:

生产者发送消息,到队列中,该队列配置消息超时时间,并没有消费者进行监听该队列(进行监听消费),超时进入死信队列,(监听死信队列)就是延迟队列的一种配置。

方案二:

安装插件,默认消息存放最多2天

地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

下载插件的网址:https://www.rabbitmq.com/community-plugins.html

事务

该事务处理仅仅在java层面,生产者1发送消息,生产者2发送消息,保障其中有一个发送失败,都要失败,需要添加配置类:

@Configuration
@Data
public class RabbitConfig {@Beanpublic RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}@Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;}
}

惰性队列

一般队列创建是默认,并不是惰性队列,惰性队列适用场景:在非常长的队列(百万条消息),生产者的速度超过消费者,消费者处理慢,使用惰性队列;它把消息放在队列中,并不是马上进行持久化操作,是在有空闲时,当队列达到百分之多少时,再进行数据持久化操作。

// x-queue-mode 参数,可在插件安装配置
@Queue(value = 队列名字, durable = "true", autoDelete = "false", arguments = {@Argument(name = "x-queue-mode", value = "lazy")
})

队列消息优先级

创建队列,如果这个值是0,代表优先级无效,设置的值优先级不能超过该值,优先级越高占用内存资源越多。
在这里插入图片描述

生产者配置

@Resource
private RabbitTemplate rabbitTemplate;// 生产者发送消息 1  - 优先级是1
rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "生产者发送消息 - 演示优先级:1", message -> {message.getMessageProperties().setPriority(1);return message;
});
// 生产者发送消息 2  - 优先级是2
rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "生产者发送消息 - 演示优先级:2", message -> {message.getMessageProperties().setPriority(2);return message;
});
// 生产者发送消息 3  - 优先级是3
rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "生产者发送消息 - 演示优先级:3", message -> {message.getMessageProperties().setPriority(3);return message;
});
// 生产者发送消息1、消息2、消息3的顺序(先进先出)
// 但消费者,消费优先消费:消息3、消息2、消息1 (改变了消费的顺序)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/860774.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

要离职了,记录一下个人在用的 Mac 应用

大家好&#xff0c;我是楷鹏。 通用 飞书 说起来不信&#xff0c;第一个推荐的是【飞书】&#xff0c;飞书是目前用过最舒服的项目管理应用了。 单拎出来一个飞书文档&#xff0c;功能和体验远超市面上腾讯文档、石墨文档、语雀等等。 现在飞书还支持个人版&#xff0c;No…

【系统架构设计师】六、信息系统基础知识(定义|分类|企业信息化系统|生命周期|建设原则|开发方法)

目录 一、信息系统的定义 二、信息系统的分类 三、企业使用的信息化系统 四、信息系统的生命周期 五、信息系统建设原则 六、信息系统的开发方法 6.1 结构化方法 6.2 原型法 6.3 构件化开发方法 6.4 面向服务的方法 6.5 面向对象的方法 6.6 敏捷方法 历年真题考情&#x…

还是国产大模型靠谱!这里有一个OpenAI API用户特别搬家计划

近日&#xff0c;一场风波在科技圈引起了广泛的关注。6月25日凌晨&#xff0c;OpenAI向大量开发者发送邮件&#xff0c;通知他们&#xff1a;“您的组织有流量来自来OpenAl目前不支持的地区。从7月9日起&#xff0c;我们将采取额外措施&#xff0c;停止OpenAI不支持的国家和地区…

iOS开发者模式自带弱网测试工具

弱网测试的思路 弱网功能测试&#xff1a;2G/3G/4G、高延时、高丢包 无网状态测试&#xff1a;断网功能测试、本地数据存储 用户体验关注&#xff1a;响应时间、页面呈现、超时文案、超时重连、安全及大流量风险 网络切换测试&#xff1a;WIFI → 4G/3G/2G → 网多状态切换…

①分析胃癌组蛋白脱乙酰酶HDS模型-配对转录组差异

目录 HDS评分构建 ①数据加载 ②评分计算 做样本及评分展示图 ①数据处理 ②进行作图 分析配对的单细胞及转录组胃癌数据的 HDS评分,数据源于gastric-cancer - GitCode①胃癌单细胞和配对转录组揭示胃肿瘤微环境(文献和数据)_代码笔记:处理迄今为止最大的单细胞胃癌数…

贪吃蛇项目GameStart部分:对游戏的初始化

接上一篇文章介绍完需要使用到的WIN32API的相关知识&#xff0c;本篇文章让我们来开始使用他们来创建我们的贪吃蛇欢迎界面以及游戏所需要的地图。 准备工作&#xff1a; 为了后面我们构建贪吃蛇游戏所需要的各项函数便于观察&#xff0c;同时便于我们的函数声明&#xff0c;在…

【源码+文档+调试讲解】企业人才引进服务平台

摘 要 随着信息时代的来临&#xff0c;过去的传统管理方式缺点逐渐暴露&#xff0c;对过去的传统管理方式的缺点进行分析&#xff0c;采取计算机方式构建企业人才引进服务平台。本文通过课题背景、课题目的及意义相关技术&#xff0c;提出了一种企业信息、招聘信息、应聘信息等…

Python-爬虫 下载天涯论坛帖子

为了爬取的高效性&#xff0c;实现的过程中我利用了python的threading模块&#xff0c;下面是threads.py模块&#xff0c;定义了下载解析页面的线程&#xff0c;下载图片的线程以及线程池 import threading import urllib2 import Queue import re thread_lock threading.RL…

300 KVA(240kW、180KVAR)系列负载组

交流 固定式/永久式 电阻式和电抗性 300 KVA&#xff08;240kW、180KVAR&#xff09; 480 伏交流电 60赫兹 这是一款紧凑、多功能的 300 KVA 固定/永久负载组&#xff0c;用于测试备用发电机和地面电源装置。负载组可用于测试在 480V 电压下最大 300KVA 的任何负载。…

可穿戴式手持气象仪

TH-SQ17在快节奏的现代生活中&#xff0c;我们越来越依赖各种智能设备来辅助我们的决策和行动。其中&#xff0c;气象信息的重要性不言而喻&#xff0c;它不仅关系到我们的出行安全&#xff0c;更影响着我们的日常生活安排。如今&#xff0c;一款革命性的产品——可穿戴式手持气…

Qt 实战(6)事件 | 6.1、事件机制

文章目录 一、事件1、基本概念2、事件描述3、事件循环4、事件分发4.1、QApplication::notify()4.2、QObject::event() 5、事件传递6、事件处理器 前言&#xff1a; Qt 框架中的事件机制&#xff08;Event Mechanism&#xff09;是一种核心功能&#xff0c;它允许应用程序以事件…

文华WH7主图多空预警系统指标公式源码

RSV:(CLOSE-LLV(LOW,9))/(HHV(HIGH,9)-LLV(LOW,9))*100;//收盘价与N周期最低值做差&#xff0c;N周期最高值与N周期最低值做差&#xff0c;两差之间做比值定义为RSV K:SMA(RSV,3,1);//RSV的移动平均 D:SMA(K,3,1);//K值的移动平均 DIFF : EMA(CLOSE,12) - EMA(CLOSE,26); D…

springboot集成达梦数据库,打包后,tomcat中启动报错

背景&#xff1a;springboot集成达梦数据库8&#xff0c;在工具idea中正常使用&#xff0c;但是打包后&#xff0c;无法启动&#xff0c;报错 pom引入的依赖 但是这种情况&#xff0c;只有在idea中启动没问题的解决方法 需要修改引入的依赖&#xff0c;再次打包就可以 <d…

考研数学一有多难?130+背后的残酷真相

考研数学一很难 大家平时在网上上看到很多人说自己考了130&#xff0c;其实这些人只占参加考研数学人数的极少部分&#xff0c;有个数据可以展示出来考研数学到底有多难&#xff1a; 在几百万考研大军中&#xff0c;能考到120分以上的考生只有2%。绝大多数人的分数集中在30到…

Lua流媒体服务器支持(MP4视频、桌面直播、摄像头)

本来在做FFMPEG的项目&#xff0c;忽然想到Lua封装FFMPEG与SRS实现一个简易的直播网站何尝不是一个大胆的想法。 示例为初级版本&#xff0c;主要是用来验证可行性和功能性DEMO 演示效果&#xff1a; Lua流媒体直播服务器(支持MP4、桌面直播、摄像头)_哔哩哔哩_bilibili 代码简…

【SSM】医疗健康平台-用户端-体检预约

知识目标 了解FreeMarker&#xff0c;能够简述FreeMarker的作用和生成文件的原理 熟悉FreeMarker的常用指令&#xff0c;能够在FTL标签中正确使用assign指令、include指令、if指令和list指令 掌握显示套餐列表功能的实现 掌握显示套餐详情功能的实现 掌握体检预约功能的实现…

【源码】最新源支付系统源码 V7版全开源 免授权 附搭建教程

最新源支付系统源码_V7版全开源_免授权_附详细搭建教程_站长亲测 YPay是专为个人站长打造的聚合免签系统&#xff0c;拥有卓越的性能和丰富的功能。它采用全新轻量化的界面UI&#xff0c;让您能更方便快捷地解决知识付费和运营赞助的难题。同时&#xff0c;它基于高性能的thin…

el-form重置后input无法输入问题

新增用户遇到的问题&#xff1a; 如果你没有为 formData 设置默认值&#xff0c;而只是将其初始化为空对象 {}&#xff0c;则在打开dialog时&#xff0c;正常输入&#xff0c; formdata会变成如下 但是&#xff0c;打开后&#xff0c;直接使用 resetFields 或直接清空表单&…

宜搭低代码开发高级认证例题1-待办列表

1、进行中待办和已完成待办界面相同 关键代码就是重要度默认为1星 2、新增自定义页面Todolist 2.1主要参数设置-新建远和API getTodoList和getDoneList代码相同 绑定代码&#xff1a;/${window.pageConfig.appType || window.g_config.appKey}/v1/form/searchFormDatas.json …

福州代理记账服务财务专业知识会计助手

福州的代理记服务可探索企业和个体工商户处理财务和会计工作。选择合适的代理记服务不仅可以节省成本&#xff0c;还可以确保财务工作专业、合规。以下是一些关于代理记服务的关键信息和财务信息&#xff0c;供您参考&#xff1a; https://www.9733.cn/news/detail/180.html …