SpringBoot RabbitMQ收发消息、配置及原理

今天分析SpringBoot通过自动配置集成RabbitMQ的原理以及使用。

AMQP概念

RabbitMQ是基于AMQP协议的message broker,所以我们首先要对AMQP做一个简单的了解。

AMQP (Advanced Message Queuing Protocol) is a messaging protocol that enables conforming client applications to communicate with conforming messaging middleware brokers.

AMQP是Advanced Message Queuing Protocol的缩写,是一个消息收发客户端与消息中间件都要遵守的消息通讯协议。

AMQP消息模型如下图:
在这里插入图片描述
消息有发布者发送给Exchange,Exchange通过绑定路由规则路由该消息到消息队列中,之后消息中间件投递该消息给指定的消费者、或者由消费者按需拉取/消费消息。

消息丢失问题:网络原因、或者消费者消费逻辑的原因均可能导致消息发送(接收)失败,AMQP通过消息确认(message acknowledgements)机制来处理消息发送失败:消息送达消费者后,消费者需发送确认消息通知broker。启用消息确认机制后,broker必须在收到确认之后才能从队列中移除消息,确保消息不丢失。

消息无法送达问题:消息无法送达消费者的情况下,该消息会根据参数设置被broker返回给消息发送者、直接丢弃、或者丢入“死信”队列。

Exchange Type

AMQP消息发送给Exchange,Exchange接收到消息后路由给0个或多个队列,路由规则依赖于Exchange Type。

AMQP包含如下Exange Type:

  1. Direct exchange:路由匹配后直接投递
  2. Fanout exchange:消息投递给绑定到当前Exchange的所有队列(不需要匹配)。
  3. Topic exchange:根据订阅主题投递消息。
  4. Headers exchange:路由规则定义在消息头信息中,根据头信息匹配投递。

队列

队列是AMQP中存储消息的地方,队列具有如下属性:

  1. Name:队列名称。每一个队列都有名称,不指定名称则broker会自动生成一个名称。"amq."开头的队列名称是AMQP的保留名称,不允许应用层使用。
  2. Durable (the queue will survive a broker restart):存活周期。队列创建的时候可以被声明为持久队列或临时队列,持久队列的元数据存储在硬盘、而临时队列存储在内存。
  3. Exclusive (used by only one connection and the queue will be deleted when that connection closes):独占队列,连接关闭后队列删除。
  4. Auto-delete (queue that has had at least one consumer is deleted when last consumer unsubscribes):队列必须绑定至少一个消费者,没有消费者绑定到给队列则队列自动删除。
  5. Arguments (optional; used by plugins and broker-specific features such as message TTL, queue length limit, etc):其他消息参数。

队列使用之前必须首先声明(通过声明指定消息队列参数),声明队列等同于创建队列,声明队列时如果队列已经存在,则:声明的参数与已存在队列参数相同,对当前已存在队列没有任何影响,如果声明的参数与已存在队列参数不同,抛出异常。

绑定

绑定指的是Exchange路由消息到队列的规则。队列首先必须绑定到Exchange之后,Exchange才能发送消息到该队列。routing key是绑定的可选属性,特定Exchange(Direct exchange)发送消息到队列时会根据routing key进行匹配。

消费者

消费者通过以下两种方式从队列中消费消息:

  1. Push方式:消息主动投递给消费者(推荐方式)。
  2. Poll方式:消费者轮询获取(性能原因,不推荐)。

((Consumer) Delivery Acknowledgements)投递确认

由于网络原因,或者消费者应用在消息消费过程中发生错误导致消息消费失败------均可导致消息丢失。

为了尽可能避免消息丢失,AMQP提供了两种消息确认机制:

  1. 自动确认模式:broker发送消息之后确认。
  2. 显式确认:消费者反馈确认消息。

显式确认模式下,由消费者决定确认消息的反馈时机:收到消息后立即反馈、或者消息持久化之后反馈、或者消息完全被消费(消费业务处理完成)之后反馈。

如果消费者一直不给broker发送确认信息,broker会重新投递消息给其他消费者、如果当前没有其他消费者注册到当前队列的话,broker会一直保存该消息直到有新的消费者注册上来之后发送。

拒信

消费者可给broker返回拒绝消息指令,以告知broker消费者处理消息失败。根据消费者的指令,broker可以丢弃该消息、或者重新进入队列等待再次消费。进入队列等待再次消费的情况下,消费者需注意避免造成该消息的循环投递。

Prefetching Messages

多个消费者共享同一队列的场景下,broker可以控制每一消费者最多可以并发处理的消息数量(消费者返回消息确认前brokenr最多投递到该消费者的消息数量)。这种机制其实是一种简单的load balance,一个消费者如果长时间不返回消息确认,极有可能说明该消费者的消费能力出现问题。

Connections

与数据库连接类似,AMQP客户端(生产者或者消费者)通过connections连接broker、并通过connection收发消息。

Channels

多数场景下,客户端都需要创建多个连接以便并发处理消息,但是同时创建多个TCP连接在性能和网络管理上都很不划算,因此,AMQP通过channels多路复用TCP连接:一个连接可以包含多个Channels,所有的客户端操作都依赖于Channels执行,每一个Channels都独立运行互不影响。

RabbitMQ

RabbitMQ is the most widely deployed open source message broker.

RabbitMQ是基于AMQP协议的、Erlang开发的被广泛部署的开源消息队列。

有关RabbitMQ的详细特性本文不展开讨论。

RabbitMQ的安装

RabbitMQ基于Erlang,所以安装RabbitMQ之前,你必须已经安装好了Erlang。不同版本的RabbitMQ对Erlang的版本也有不同的要求,详情参见https://www.rabbitmq.com/which-erlang.html。

根据下载安装Erlang/OTP 25.3.2。

之后从RabbitMQ官网(https://www.rabbitmq.com/install-windows.html)下载安装RabbitMQ(rabbitmq-server-3.12.12.exe)。

SpringBoot项目配置RabbitMQ

RabbitMQ is a lightweight, reliable, scalable, and portable message broker based on the AMQP protocol. Spring uses RabbitMQ to communicate through the AMQP protocol.

Spring使用RabbitMQ作为AMQP协议下的消息队列broker,SpringBoot提供了对RabbitMQ的基于autoConfiguration的支持。

pom文件引入RabbitMQ

	<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

yml文件配置

application.yml文件中配置RabbitMQ:

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestvirtual-host: /#确认消息已发送到交换机(Exchange)publisher-confirm-type: correlated#确认消息已发送到队列(Queue)publisher-returns: true#消费方消息确认listener:simple:#确认方式:手动确认acknowledge-mode: manual# 拒信不重新入队列default-requeue-rejected: falseretry:enabled: true #监听重试是否可用max-attempts: 5 #最大重试次数,默认为3retryTime: 120000 # 重试间隔时间(毫秒)

发送消息

创建一个RabbitMQ消息发送服务并注入到IoC容器中:

@Component
public class RabbitMQService {private final AmqpAdmin amqpAdmin;private final AmqpTemplate amqpTemplate;public RabbitMQService(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {this.amqpAdmin = amqpAdmin;this.amqpTemplate = amqpTemplate;}public void sendMessage(String message){amqpTemplate.convertAndSend(message);}
}

消息接收

使用@RabbitListener注解接收来自于RabbitMQ的消息:

    @RabbitListener(queues = "someQueue")public void processMessage(String content) {log.info("recieve message from rabbitMQ:"+content);}

需要特别注意:

By default, if retries are disabled and the listener throws an exception, the delivery is retried indefinitely. You can modify this behavior in two ways: Set the defaultRequeueRejected property to false so that zero re-deliveries are attempted or throw an AmqpRejectAndDontRequeueException to signal the message should be rejected. The latter is the mechanism used when retries are enabled and the maximum number of delivery attempts is reached.

默认的重试被关闭的情况下,如果消息监听器抛出异常、发送方将会无限重试。我们可以通过如下两种方式避免这种情况:设置defaultRequeueRejected参数为false,因此失败消息不会重新进入队列。或者抛出AmqpRejectAndDontRequeueException异常表明消息应当被丢弃。第二种机制也是重试机制打开后、重试次数达到上限后采用的机制。

RabbitMQ涉及的内容比较多,详情后补。

春节愉快,开工大吉!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/686935.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

(三十九)大数据实战——Prometheus监控平台的部署搭建

前言 Prometheus监控&#xff08;Prometheus Monitoring&#xff09;是一种开源的系统监控和警报工具。它最初由SoundCloud开发并于2012年发布&#xff0c;并在2016年加入了云原生计算基金会&#xff08;CNCF&#xff09;。Prometheus监控旨在收集、存储和查询各种指标数据&am…

GEE使用 Sentinel-1 SAR影像 和 Otsu 方法绘制洪水地图

洪水是世界上最常见、破坏性最大的自然灾害之一,造成了巨大的生命和财产损失。此外,随着气候变化的影响,近年来,洪灾变得更加频繁和不可预测。为了最大限度地减少生命和财产损失,必须迅速发现洪水蔓延的情况,并及时采取必要的干预措施。洪水蔓延探测大多使用光学传感器或…

大模型基础架构入门

大模型架构 Prefix Decoder 和 Causal Decoder 和 Encoder-Decoder 区别 在于 attention mask不同&#xff1a; https://zhuanlan.zhihu.com/p/626310493 为何现在的大模型大部分是Decoder only结构&#xff1f; https://www.zhihu.com/question/588325646/answer/335725261…

JVM-JVM中对象的结构

对象内存布局 对象里的三个区&#xff1a; 对象头&#xff08;Header&#xff09;&#xff1a;Java对象头占8byte。如果是数组则占12byte。因为JVM里数组size需要使用4byte存储。 标记字段MarkWord&#xff1a; 用于存储对象自身的运行时数据&#xff0c;它是synchronized实现轻…

STM32——OLED菜单

文章目录 一.补充二. 二级菜单代码 简介&#xff1a;首先在我的51 I2C里面有OLED详细讲解&#xff0c;本期代码从51OLED基础上移植过来的&#xff0c;可以先看完那篇文章&#xff0c;在看这个&#xff0c;然后按键我是用的定时器扫描不会堵塞程序,可以翻开我的文章有单独的定时…

MIPS指令集处理器设计(支持64条汇编指令)

一、题目背景和意义 二、国内外研究现状 (略) 三、MIPS指令集处理器设计与实现 (一).MIPS指令集功能性梳理 1.MIPS指令集架构 (1).mips基础指令集格式总结 MIPS是&#xff08;Microcomputer without interlocked pipeline stages&#xff09;[10]的缩写&#xff0c;含义是…

C++友元->全局函数做友元、类做友元、成员函数做友元

全局函数做友元代码&#xff1a; #include<iostream> using namespace std; #include<string> //建筑物类 class Building { //告诉编译器 goodGay全局函数 是 Building类的好朋友&#xff0c;可以访问类中的私有内容 friend void goodGay(Building * bu…

第14讲投票帖子详情实现

投票帖子详情实现 后端,根据id查询投票帖子信息&#xff1a; /*** 根据id查询* param id* return*/ GetMapping("/{id}") public R findById(PathVariable(value "id")Integer id){Vote vote voteService.getById(id);WxUserInfo wxUserInfo wxUserInf…

电商+支付双系统项目------设计数据库

这篇文章将详细介绍电商支付双系统项目的数据库设计。数据库在该项目中扮演着至关重要的角色&#xff0c;它负责存储和管理用户信息、商品数据、订单记录以及支付交易等关键数据。通过精心设计和优化数据库结构&#xff0c;可以实现高效的数据存储和检索&#xff0c;确保系统的…

JavaScript中的常见算法

一.排序算法 1.冒泡排序 冒泡排序比较所有相邻的两个项&#xff0c;如果第一个比第二个大&#xff0c;则交换它们。元素项向上移动至 正确的顺序&#xff0c;就好像气泡升至表面一样。 function bubbleSort(arr) {const { length } arrfor (let i 0; i < length - 1; i)…

详解自定义类型:枚举与联合体!

目录 ​编辑 一、枚举类型 1.枚举类型的声明 2.枚举类型的优点 3.枚举类型的使用 二、联合体类型(共用体&#xff09; 1.联合体类型的声明 2.联合体的特点 3.相同成员的结构体和联合体的对比 4.联合体大小的计算 5.用联合体判断大小端 三.完结散花 悟已往之不谏&…

深入浅出了解谷歌「Gemini大模型」发展历程

Google在2023年12月官宣了Gemini模型&#xff0c;随后2024年2月9日才宣布Gemini 1.0 Ultra正式对公众服务&#xff0c;并且开始收费。现在2024年2月14日就宣布了Gemini 1.5 Pro&#xff0c;史诗级多模态最强MoE首破100万极限上下文纪录&#xff01;&#xff01;&#xff01;Gem…

数据分析 — Pandas 数据加载、存储和清洗

目录 一、文件读取1、常见文件读取函数2、read_csv()3、read_table()4、read_excel()5、read_json()6、read_html()7、大文件读取 二、数据保存1、csv2、excel3、json4、html5、MySQL1、连接数据库2、MySQL 存储到本地3、本地存储到 MySQL 三、数据清洗1、处理缺失值1、判断数据…

Aster实现一台电脑当两台使——副屏使用独立win账号

前言&#xff1a;笔者每年回家&#xff0c;都面临着想要和小伙伴一起玩游戏&#xff0c;但小伙伴没有电脑/只有低配电脑的问题。与此同时&#xff0c;笔者自身的电脑是高配置的电脑&#xff0c;因此笔者想到&#xff0c;能否在自己的电脑上运行游戏&#xff0c;在小伙伴的电脑上…

LaTeX中的documentclass命令:指定文档的类型和整体布局

诸神缄默不语-个人CSDN博文目录 documentclass 是 LaTeX 中一个基础且重要的命令&#xff0c;用于定义文档的整体布局和样式。这个命令告诉 LaTeX 编译器文档是属于哪一类的&#xff0c;比如是文章、报告、书籍等&#xff0c;每一类都有其预定义的格式和结构。 文章目录 基本语…

怎么恢复电脑重装前的数据?介绍几种有效的方法

在日常生活和工作中&#xff0c;电脑已成为我们不可或缺的工具。然而&#xff0c;有时候我们会遇到一些突发情况&#xff0c;比如电脑系统崩溃需要重新安装系统。在这个过程中&#xff0c;我们可能会失去一些重要的数据&#xff0c;比如照片、文档、视频等。这些数据可能包含着…

基于springboot车辆充电桩管理系统源码和论文

随着信息化时代的到来&#xff0c;管理系统都趋向于智能化、系统化&#xff0c;车辆充电桩管理系统也不例外&#xff0c;但目前国内仍都使用人工管理&#xff0c;市场规模越来越大&#xff0c;同时信息量也越来越庞大&#xff0c;人工管理显然已无法应对时代的变化&#xff0c;…

马斯克评 OpenAI 视频模型,接地气又一针见血

马斯克评 OpenAI Sora 昨天&#xff0c;OpenAI 发布了首个视频生成模型 Sora。 一位 X&#xff08;前推特&#xff09;用户分享了 Sora 官网所展示的生成视频&#xff1a;一名女子在东京街头漫步。 该用户评论称&#xff1a;"OpenAI 今天宣布了 Sora&#xff0c;它使用混合…

为什么MySQL不建议使用TEXT字段?

当我们深入探讨“为什么MySQL不建议使用TEXT字段&#xff1f;”这一问题时&#xff0c;可以从一下多个方面来详细理解这个问题&#xff1a; 1. 性能问题 性能问题是MySQL不建议使用TEXT字段的一个重要原因。TEXT字段通常以外部存储方式保存&#xff0c;而不是像固定长度或可变…

C# winfrom实例:四路激光测距雷达数据采集和波形图绘制

1.所述产品 产品型号&#xff1a;TFmini Plus 相关资料下载地址&#xff1a;http://www.benewake.com/download 产品名称&#xff1a;TFmini Plus激光雷达模组制造商公司&#xff1a;北醒&#xff08;北京&#xff09;光子科技有限公司 2.产品功能&#xff1a;TFmini Plus是基…