消息队列篇--通信协议篇--AMOP(交换机,队列绑定,消息确认,AMOP实现实例,AMOP报文,帧,AMOP消息传递模式等)

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一种开放的、跨平台的消息传递协议,旨在提供一种标准化的方式在不同的消息代理和客户端之间进行消息传递。AMQP不仅定义了消息格式和路由机制,还规定了如何建立连接、发送和接收消息等操作。它适用于多种编程语言和平台,并且支持复杂的路由机制。

特点:

  • 支持复杂的路由规则和消息传递模式。
  • 提供多种消息确认机制(如ACK)。
  • 支持消息持久化和高可用性。
  • 支持多语言客户端库。
  • 适用于企业级应用中的异步通信。

1、AMQP的核心概念

(1)、消息(Message)

消息是AMQP中传输的基本单位,通常包含两个部分:

  • 消息头(Header):包含一些元数据,如消息ID、优先级、时间戳等。
  • 消息体(Body):实际要传递的数据内容,可以是文本、字节流等形式。

(2)、交换机(Exchange)

交换机(Exchange)是AMQP中的一个关键组件,是消息进入消息队列的入口点,负责接收消息并将它们路由到一个或多个队列中。交换机根据消息的路由键(Routing Key)和绑定规则(Binding Rules)决定将消息发送到哪些队列。

常见的交换机类型包括:

  • Direct Exchange:根据精确匹配的路由键将消息发送到相应的队列。
  • Fanout Exchange:将消息广播到所有绑定的队列,不考虑路由键。
  • Topic Exchange:根据模式匹配的路由键将消息发送到相应的队列。
  • Headers Exchange:根据消息头中的属性进行路由。

(3)、队列(Queue)

队列是存储消息的地方,消费者从队列中获取消息并进行处理。每个队列都有一个唯一的名称,并且可以有多个消费者同时监听同一个队列。

(4)、绑定(Binding)

绑定是指将交换机与队列关联起来的过程。它决定了哪些消息应该被路由到哪些队列。绑定时需要指定一个路由键(Routing Key),用于确定哪些消息应该被发送到该队列。

(5)、连接(Connection)

连接是客户端与消息代理(Broker)之间的物理网络连接。通常使用TCP协议建立连接。

(6)、通道(Channel)

通道是客户端与消息代理之间的通信路径。通过通道,客户端可以在同一连接上执行多个操作(如发送和接收消息)。通道的好处是可以复用连接,减少资源开销。

(7)、确认(Acknowledgment)

确认机制确保消息已被成功处理。当消费者接收到消息后,可以选择手动或自动发送确认回执给消息代理。如果消息未被确认,消息代理会认为消息未被处理,并可能重新投递该消息。

(8)、生产者(Producer)

生产者是发送消息的应用程序或服务。它们将消息发送到交换器。

(9)、消费者(Consumer)

消费者是从队列中接收消息的应用程序或服务。它们负责处理消息内容。

2、AMQP的工作流程

原理示意图:
在这里插入图片描述

AMQP的典型工作流程包括以下几个步骤:
(1)、建立连接:客户端与消息代理(如:RabbitMQ)建立TCP连接。
(2)、创建通道:在连接上创建一个或多个通道,用于执行消息传递操作。
(3)、声明交换机和队列:声明交换机和队列,并设置相关的属性(如持久性、自动删除等)。
(4)、绑定交换机和队列:将交换机与队列绑定,并指定路由键。
(5)、发送消息:生产者通过交换机发送消息到队列。
(6)、接收消息:消费者从队列中接收消息并进行处理。
(7)、确认消息:消费者发送确认回执给消息代理,表示消息已被成功处理。

3、AMQP的优势

(1)、跨平台兼容性
AMQP是一种开放标准,允许不同的消息代理实现相互兼容。这意味着你可以选择最适合你需求的消息代理,而不必担心供应商锁定问题。
(2)、可靠性
AMQP提供了多种机制来确保消息传递的可靠性,包括持久化消息、事务支持和确认机制。
(3)、灵活性
AMQP支持多种交换器类型和绑定规则,使得它可以灵活应对各种复杂的路由需求。
(4)、安全性
AMQP支持SSL/TLS加密,确保消息在网络传输过程中的安全性。

4、常见的AMQP实现

(1)、RabbitMQ
RabbitMQ是最流行的AMQP实现之一,提供了丰富的功能和良好的社区支持。它不仅支持AMQP,还支持其他协议如MQTT、STOMP等。
(2)、Qpid
Qpid是Apache基金会的一个项目,提供了AMQP的完整实现。它包括两个主要组件:Qpid Broker和Qpid Client。
(3)、ActiveMQ
ActiveMQ是另一个流行的消息代理,虽然它的默认协议是OpenWire,但它也支持AMQP。

5、AMQP的消息传递模型

AMQP支持多种消息传递模型,具体取决于使用的交换机类型。

(1)、点对点(P2P)模型

在P2P模型中,每条消息只能由一个消费者处理。消息被发送到一个队列,然后由某个消费者从队列中获取并处理。

示例:(python)

import pika// 建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()// 声明队列
channel.queue_declare(queue='task_queue', durable=True)// 发送消息
channel.basic_publish(exchange='',routing_key='task_queue',body='Hello, AMQP!',properties=pika.BasicProperties(delivery_mode=2,   使消息持久化))print(" [x] Sent 'Hello, AMQP!'")// 关闭连接
connection.close()

(2)、发布/订阅(Pub/Sub)模型

在Pub/Sub模型中,消息可以被多个订阅者接收。消息被发送到一个交换机,然后广播到所有绑定的队列。

示例:(python)

import pika// 建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()// 声明交换机
channel.exchange_declare(exchange='logs', exchange_type='fanout')// 发送消息
channel.basic_publish(exchange='logs',routing_key='',body='Hello, AMQP!')
print(" [x] Sent 'Hello, AMQP!'")// 关闭连接
connection.close()

(3)、主题(Topic)模型

在Topic模型中,消息根据路由键的模式匹配被发送到相应的队列。路由键可以包含通配符(表示一个单词, 表示零个或多个单词)。

示例:(python)

import pika// 建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()// 声明交换机
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')// 绑定队列
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queuebinding_keys = ['.orange.', 'lazy.']
for binding_key in binding_keys:channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key=binding_key)// 接收消息
def callback(ch, method, properties, body):print(f" [x] Received {body.decode()} on {method.routing_key}")channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True
)print(' [] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()

适用场景:

  • 企业级应用中的异步通信。
  • 微服务架构中的消息传递。
  • 需要复杂路由规则和高可靠性保障的场景。

6、代码示例:(RabbitMQ示例)

(1)、添加依赖

<dependencies><!-- Spring Boot Starter for AMQP --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- Spring Boot Starter Web (Optional, if you need a REST controller) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- RabbitMQ Java Client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId></dependency>
</dependencies>

(2)、配置文件(yaml)

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest

(3)、创建配置类

创建一个配置类来定义交换器、队列和绑定关系。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 定义队列名称public static final String QUEUE_NAME = "exampleQueue";public static final String EXCHANGE_NAME = "exampleExchange";public static final String ROUTING_KEY = "example.routing.key";// 声明队列@Beanpublic Queue queue() {return new Queue(QUEUE_NAME, true); // true 表示持久化队列}// 声明交换器@Beanpublic TopicExchange exchange() {return new TopicExchange(EXCHANGE_NAME);}// 绑定队列到交换器@Beanpublic Binding binding(Queue queue, TopicExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);}
}

(4)、创建消息生产者

创建一个服务类来发送消息。

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(String message) {System.out.println("Sending message: " + message);rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, message);}
}

(5)、创建消息消费者

创建一个监听器类来接收消息。

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;@Service
public class MessageConsumer {@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)public void receiveMessage(String message) {System.out.println("Received message: " + message);}
}

7、AMOP报文

AMQP(Advanced Message Queuing Protocol)是一种二进制协议,用于在客户端和消息代理之间传递消息。与HTTP这种基于文本的协议不同,AMQP的消息是通过二进制格式进行编码的,因此直接查看原始报文并不直观。然而,我们可以从概念上理解AMQP报文的结构,并通过一些工具或库来解析和展示这些报文的内容。

(1)、AMQP报文结构

AMQP报文通常由多个帧组成,每个帧都包含一个帧头和有效载荷部分。
具体来说,在发送一条消息的过程中,常见的报文结构确实包括三个主要的帧:方法帧(Method Frame)、内容头帧(Content Header Frame)和内容体帧(Content Body Frame)。每个帧都需要在其前面拼接一个帧头。

1、帧头(Frame Header)

每个AMQP帧都以一个帧头开始,包含帧类型、通道编号和帧长度等信息。

  • 帧类型(Frame Type):标识帧的类型(如方法帧、内容头帧、内容体帧等)。
  • 通道编号(Channel Number):标识该帧所属的通道。
  • 帧长度(Frame Length):帧的有效载荷部分的长度(不包括帧头本身)。
2、帧的类型
(1)、方法帧(Method Frame)

用于表示特定的操作,如连接、打开通道、声明队列等。

(2)、内容头帧(Content Header Frame)

包含消息的元数据,如消息大小、属性等。

(3)、内容体帧(Content Body Frame)

包含实际的消息内容。

(2)、示例:完整的AMQP报文

假设我们要发送一条JSON消息到exampleQueue的队列中,以下是完整的AMQP报文结构及其二进制表示。

1、声明队列的方法帧

示例:

| Frame Type: 1       |  // 方法帧
| Channel: 1          |
| Payload Length: 17  |
| Class ID: 50        |  // Queue类
| Method ID: 10       |  // Declare方法
| Queue Name Length: 13 |
| Queue Name: "exampleQueue" |
| Durable: true       |

解释:
前三行是方帧的帧头(指定了帧的类型,通道编号和帧的长度),后面为方法帧的具体内容(队列相关的信息,包含队列的类id,方法id,名称,长度等)。

二进制表示:

01 00 01 00 00 00 11  // 帧头
00 32 00 0A           // Class ID 和 Method ID
00 00 00 0D           // 队列名称长度
65 78 61 6D 70 6C 65 51 75 65 75 65  // 队列名称 "exampleQueue"
01                    // Durable: true
2、发送消息的内容头帧

示例:

| Frame Type: 2       |  // 内容头帧
| Channel: 1          |
| Payload Length: 22  |
| Class ID: 60        |  // Basic 类
| Weight: 0           |
| Body Size: 24 bytes |
| Properties:         |
|   Content-Type: application/json |
|   Delivery-Mode: 2 (persistent) |

解释:
前三行是方帧的帧头(指定了帧的类型,通道编号和帧的长度),后面为内容头帧的具体内容。

二进制表示:

02 00 01 00 00 00 16  // 帧头
00 3C 00 00           // Class ID 和 Weight
00 00 00 18           // Body Size: 24 bytes
00 00 00 0E 61 70 70 6C 69 63 61 74 69 6F 6E 2F 6A 73 6F 6E  // Content-Type: application/json
02                    // Delivery-Mode: persistent
3、发送消息的内容体帧

示例:

| Frame Type: 3       |  // 内容体帧
| Channel: 1          |
| Payload Length: 24  |
| {"name": "John", "age": 30} |

解释:
前三行是方帧的帧头(指定了帧的类型,通道编号和帧的长度),后面为内容体帧的具体内容。

二进制表示:
03 00 01 00 00 00 18 // 帧头
7B 22 6E 61 6D 65 22 3A 20 22 4A 6F 68 6E 22 2C 20 22 61 67 65 22 3A 20 33 30 7D // JSON 消息

(3)、AMQP报文总结

在AMQP协议中,每种类型的帧(方法帧、内容头帧、内容体帧)都需要在其前面拼接一个帧头。帧头提供了关于帧的基本信息,使得接收方能够正确解析和处理这些帧。
具体步骤如下:
1、方法帧:用于执行操作(如声明队列)。
2、内容头帧:包含消息的元数据(如消息大小和属性)。
3、内容体帧:包含实际的消息内容。

AMQP本质是一种二进制协议,消息内容是无法通过文本查看的。理解AMQP报文能够让我们更好的了解其本质和原理

8、总结

AMQP是一种强大的消息传递协议,广泛应用于分布式系统中,确保不同组件之间的可靠通信。通过理解其核心概念和工作流程,开发者可以更好地利用AMQP构建高效、可扩展的应用程序。无论是微服务架构、事件驱动系统还是日志收集,AMQP都能提供坚实的通信基础。

乘风破浪!Dare to Be!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/893739.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LLaMA-Factory 微调LLaMA3

LoRA介绍 LoRA&#xff08;Low-Rank Adaptation&#xff09;是一种用于大模型微调的技术&#xff0c; 通过引入低秩矩阵来减少微调时的参数量。在预训练的模型中&#xff0c; LoRA通过添加两个小矩阵B和A来近似原始的大矩阵ΔW&#xff0c;从而减 少需要更新的参数数量。具体来…

【项目实战】—— 高并发内存池设计与实现

目录 一&#xff0c;项目介绍 1.1 关于高并发内存池 1.2 关于池化技术 1.3 关于malloc 二&#xff0c;定长内存池实现 2.1 实现详情 ​2.2 完整代码 三&#xff0c;高并发内存池整体设计 四&#xff0c;threadcache设计 4.1 整体设计 4.2 哈希桶映射对齐规则 4.3 …

设计模式的艺术-代理模式

结构性模式的名称、定义、学习难度和使用频率如下表所示&#xff1a; 1.如何理解代理模式 代理模式&#xff08;Proxy Pattern&#xff09;&#xff1a;给某一个对象提供一个代理&#xff0c;并由代理对象控制对原对象的引用。代理模式是一种对象结构型模式。 代理模式类型较多…

计算机网络 (54)系统安全:防火墙与入侵检测

前言 计算机网络系统安全是确保网络通信和数据不受未经授权访问、泄露、破坏或篡改的关键。防火墙和入侵检测系统&#xff08;IDS&#xff09;是维护网络系统安全的两大核心组件。 一、防火墙 定义与功能 防火墙是一种用来加强网络之间访问控制的特殊网络互联设备&#xff0c;它…

three.js+WebGL踩坑经验合集(3):THREE.Line的射线检测问题(不是阈值方面的,也不是难选中的问题)

笔者之所以要在标题里强调不是阈值方面&#xff0c;是因为网上的大多数文章提到线的射线检测问题&#xff0c;90%以上的文章都说是因为线太细所以难选中&#xff0c;然后让大家把线的阈值调大。 而本文所要探讨的问题则恰好相反&#xff0c;不是难选中&#xff0c;而是在某些角…

省市区三级联动

引言 在网页中&#xff0c;经常会遇到需要用户选择地区的场景&#xff0c;如注册表单、地址填写等。为了提供更好的用户体验&#xff0c;我们可以实现一个三级联动的地区选择器&#xff0c;让用户依次选择省份、城市和地区。 效果展示&#xff1a; 只有先选择省份后才可以选择…

快速搭建深度学习环境(Linux:miniconda+pytorch+jupyter notebook)

本文基于服务器端环境展开&#xff0c;使用的虚拟终端为Xshell。 miniconda miniconda是Anaconda的轻量版&#xff0c;仅包含Conda和Python&#xff0c;如果只做深度学习&#xff0c;可使用miniconda。 [注]&#xff1a;Anaconda、Conda与Miniconda Conda&#xff1a;创建和管…

BGP分解实验·11——路由聚合与条件性通告(3)

续接上&#xff08;2&#xff09;的实验。其拓扑如下&#xff1a; 路由聚合的负向也就是拆分&#xff0c;在有双出口的情况下&#xff0c;在多出口做流量分担是优选方法之一。 BGP可以根据指定来源而聚合路由&#xff0c;在产生该聚合路由的范围内的条目注入到本地BGP表后再向…

攻防世界easyRSA

解密脚本&#xff1a; p473398607161 q4511491 e17def extended_euclidean(a, b):if b 0:return a, 1, 0gcd, x1, y1 extended_euclidean(b, a % b)x y1y x1 - (a // b) * y1return gcd, x, ydef calculate_private_key(p, q, e):phi (p - 1) * (q - 1)gcd, x, y extend…

常见的多媒体框架(FFmpeg GStreamer DirectShow AVFoundation OpenMax)

1.FFmpeg FFmpeg是一个非常强大的开源多媒体处理框架&#xff0c;它提供了一系列用于处理音频、视频和多媒体流的工具和库。它也是最流行且应用最广泛的框架&#xff01; 官方网址&#xff1a;https://ffmpeg.org/ FFmpeg 的主要特点和功能&#xff1a; 编解码器支持: FFmpe…

.NET MAUI进行UDP通信(二)

上篇文章有写过一个简单的demo&#xff0c;本次对项目进行进一步的扩展&#xff0c;添加tabbar功能。 1.修改AppShell.xaml文件&#xff0c;如下所示&#xff1a; <?xml version"1.0" encoding"UTF-8" ?> <Shellx:Class"mauiDemo.AppShel…

计算机网络之链路层

本文章目录结构出自于《王道计算机考研 计算机网络_哔哩哔哩_bilibili》 02 数据链路层 在网上看到其他人做了详细的笔记&#xff0c;就不再多余写了&#xff0c;直接参考着学习吧。 1 详解数据链路层-数据链路层的功能【王道计算机网络笔记】_wx63088f6683f8f的技术博客_51C…

YOLOv11改进,YOLOv11检测头融合DSConv(动态蛇形卷积),并添加小目标检测层(四头检测),适合目标检测、分割等任务

前言 精确分割拓扑管状结构例如血管和道路,对各个领域至关重要,可确保下游任务的准确性和效率。然而,许多因素使任务变得复杂,包括细小脆弱的局部结构和复杂多变的全局形态。在这项工作中,注意到管状结构的特殊特征,并利用这一知识来引导 DSCNet 在三个阶段同时增强感知…

Flutter android debug 编译报错问题。插件编译报错

下面相关内容 都以 Mac 电脑为例子。 一、问题 起因&#xff1a;&#xff08;更新 Android studio 2024.2.2.13、 Flutter SDK 3.27.2&#xff09; 最近 2025年 1 月 左右&#xff0c;我更新了 Android studio 和 Flutter SDK 再运行就会出现下面的问题。当然 下面的提示只是其…

扣子平台音频功能:让声音也能“智能”起来

在数字化时代&#xff0c;音频内容的重要性不言而喻。无论是在线课程、有声读物&#xff0c;还是各种多媒体应用&#xff0c;音频都是传递信息、增强体验的关键元素。扣子平台的音频功能&#xff0c;为开发者和内容创作者提供了一个强大而灵活的工具&#xff0c;让音频的使用和…

RubyFPV开源代码之系统简介

RubyFPV开源代码之系统简介 1. 源由2. 工程架构3. 特性介绍&#xff08;软件&#xff09;3.1 特性亮点3.2 数字优势3.3 使用功能 4. DEMO推荐&#xff08;硬件&#xff09;4.1 天空端4.2 地面端4.3 按键硬件Raspberry PiRadxa 3W/E/C 5. 软件设计6. 参考资料 1. 源由 RubyFPV以…

将 OneLake 数据索引到 Elasticsearch - 第二部分

作者&#xff1a;来自 Elastic Gustavo Llermaly 及 Jeffrey Rengifo 本文分为两部分&#xff0c;第二部分介绍如何使用自定义连接器将 OneLake 数据索引并搜索到 Elastic 中。 在本文中&#xff0c;我们将利用第 1 部分中学到的知识来创建 OneLake 自定义 Elasticsearch 连接器…

PMP–一、二、三模–分类–14.敏捷

文章目录 敏捷中的角色职责与3个工件--题干关键词角色职责3个工件 高频考点分析&#xff08;一、过程&#xff1b;二、人员&#xff09;一、过程&#xff1a;1.1 变更管理&#xff1a;1.1.1 瀑布型变更&#xff08;一次交付、尽量限制、确定性需求 &#xff1e;风险储备&#x…

Vue2下篇

插槽&#xff1a; 基本插槽&#xff1a; 普通插槽&#xff1a;父组件向子组件传递静态内容。基本插槽只能有一个slot标签&#xff0c;因为这个是默认的位置&#xff0c;所以只能有一个 <!-- ParentComponent.vue --> <template> <ChildComponent> <p>…

【科研建模】Pycaret自动机器学习框架使用流程及多分类项目实战案例详解

Pycaret自动机器学习框架使用流程及项目实战案例详解 1 Pycaret介绍2 安装及版本需求3 Pycaret自动机器学习框架使用流程3.1 Setup3.2 Compare Models3.3 Analyze Model3.4 Prediction3.5 Save Model4 多分类项目实战案例详解4.1 ✅ Setup4.2 ✅ Compare Models4.3 ✅ Experime…