Golang学习笔记_RabbitMQ的原理架构和使用

RabbitMQ 简介

  1. 实现了高级消息队列协议(Advanced Message Queuing Protcol)AMQP
  2. 消息队列中间件的作用(Redis实现MQ里面有写过,这里简单带过)
    1. 解耦
    2. 削峰
    3. 异步处理
    4. 缓存
    5. 消息通信
    6. 提高扩展性

RabbitMQ 架构理解

channel
binding
channel
channel
channel
Producer 生产者
Exchange交换机
Queue消息队列
Consumer消费者
Consumer消费者
Consumer消费者
  1. binding(绑定):交换机将消息路由给Queue所遵循的规则,可以定义一个路由键,用于交换机筛选特定的Queue
    1. Routing_Key(路由键):Producer 和 Consumer 协商一致的 key 策略。主要在交换机的 direct(直连)和 topic(主题) 模式下使用,fanout(广播)模式下不使用Routing_Key
  2. Exchange(交换机):主要功能是分发消息给特定的Queue,只负责转发,不具备存储消息的功能。Exchange有以下四种模式:
    1. direct(直连模式),根据携带的Routing_Key来筛选特定的Queue进行消息投递。是RabbitMQ的默认类型,可以不指定Routing_Key,在创建时会默认生成与Queue重名。
    2. hander(头模式),使用场景不多,消息路由涉及多个属性的时候,交换机使用多属性来代替Routing_key建立路由规则,还可以定义匹配单词的个数,例如any为有一个单词满足条件就匹配成功。all为所有单词都满足条件才匹配成功。
    3. fanout(广播模式),不看Routing_Key。只根据Exchange和Queue的binding情况来分发信息。所有与之binding的queue都将接收到同一条消息。
    4. topic(主题模式),相当于模糊查询。topic的routing_key是使用 . 来进行隔断的。有两种匹配方法:
      1. " * " 匹配一个单词,例子如下
      2. " # " 匹配0个~多个单词,例子如下
rabbitMQ.* == rabbitMQ.topic != rabbitMQ.topic.topic
rabbitMQ.# == rabbit.topic == rabbit.topic.topic
  1. Queue(消息队列的存储数据结构):
    1. 存储方式:
      • 持久化,在Server本地硬盘存储一份
      • 临时队列,重启后丢失数据
      • 自动删除,不存在用户连接则删除queue
    2. 队列对ACK请求的不同情况
      • consumer 接收并 ack,queue 删除数据并向 consumer 发送新消息
      • consumer 接收但是未 ack 就断开了连接,queue 会认为消息并未传送成功,consumer 再次连接时会重新发送消息
      • 如果consumer 接收消息成功 ,但是忘记 ack 则 queue 不会重复发送消息
      • 如果 consumer 拒收消息,则 queue 会向另外满足条件的 consumer 继续发送这条消息

RabbitMQ 工作流程

Producer方向
  1. Producer 与 RabbitMQ Broker 建立连接,开启一个信道 channel
  2. 声明交换机并设置属性(交换机类型、持久化等)
  3. 声明Queue并设置属性(持久化,自动删除等)
  4. 通过Routing_key来binding交换机和Queue
  5. 发送信息给交换,交换机根据Routing_key来确认投递的queue
  6. 查找成功后将消息存到queue
  7. 查找失败将消息丢弃或抛回给生产者
  8. 关闭channel
Consumer方向
  1. 与 queue 建立连接,开启channel
  2. 向queue请求队列中的msg
  3. 等待queue回应,开始接收消息
  4. 消息处理完成后 返回回调确认ack
  5. queue 将确认的消息从队列中删除
  6. 关闭channel

RabbitMQ的两种部署方式

Meta Data : 元数据(描述数据的数据)

  • vhost meta data : 为Queue、Exchange、Binding提供命名空间级别的隔离
  • exchange meta data:记录路由的名称类型和属性
  • binding mate data:映射 routing_key和queue之间的绑定关系
  • queue mate data:表队列名称和属性
普通模式

对于该模式的两个节点,消息只会存在其中一个节点,另一个节点只保存mate data,当consumer 连接节点2访问节点1的数据信息时,消息会在两个节点中传递。
该模式下p和c应尽量连接每个节点,这样起到线性拓展的作用。
但存在一个问题,如果节点上还有未消费的消息,但是节点挂了。如果节点设置了持久化,则需要在节点重启的时候消息才会恢复。如果未设置持久化,则消息会丢失。

镜像模式

消息存在多个节点中,消息会在节点与节点之间同步,可实现高可用(当一个节点挂了,另一个节点可以接替其位置,继续工作)但会降低性能,因为大量消息进入和同步,会占用大量带宽,但是为了保证高可靠性需要取舍。

面试题

  • Q:如何保证消息不被重复消费?
    • A:MQ通过确认机制ACK,进行确认。确认后消息从queue中删除,保证消息不被重复消费的。如果因为网络原因ack没有成功发出,导致消息重新投递。可以使用全局唯一消息id来避免。
    1. 消息发送者发送消息时携带一个全局唯一的消息id
    2. 消费者监听到消息后,根据id在redis或者db中查询是否存在消费记录
    3. 如果没有消费就正常消费,消费完毕后,写入redis或者db
    4. 如果消息消费过则直接丢弃
  • Q:如何保证消息的消费顺序?
    • A:RabbitMQ中存在一个设置,叫独占队列。即在同一时间只有一个消费者会消费消息。从而制止了异步操作,保证消费顺序。或者一个Producer对一个Consumer
  • Q:如何保证数据一致性?
    • A:因为MQ的使用场景多为分布式系统,所以一般不追求强一致性。而保证最终一致性就可以。
    • 而保证数据最终一致性,可以采用消息补偿机制。即消息在消费者处理完之后调用生产者的API修改数据状态。如未调用API则判断为消息处理失败或出错。此时间隔一段时间后重新投递消息进行再次操作。
    • 消费者收到消息,处理完毕后,发送一条响应消息给生产者也是消息补偿机制,本意是确认消费者成功消费消息。ACK也是处理方法

RabbitMQ的使用(Golang使用amqp包)

代码部分参考 upup小亮的博客

代码只是简单的操作,主要是熟悉流程。对于如何创建Queue和绑定Exchange之类的操作有个了解。

Simple(简单收发模式,只有一个Queue)

Simple运行机制与WorkQueue相似,只是一个Consumer与多个Consumer的区别。多个Consumer之间存在竞争关系,所以工作队列是创建多个Consumer,多个竞争只有一个可以获取消息消费。消费成功后ack消息删除。
演示代码放到一起了:

WorkQueue 工作队列

生产者

// simple and work queue
func main2() {// 连接到 rabbitMQconn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {log.Fatalf("无法创建连接:%s", err)return}// 默认关闭defer conn.Close()// 创建通道Channelch, err := conn.Channel()if err != nil {log.Fatalf("无法创建channel:%s", err)return}// 通道关闭defer ch.Close()// 创建存储队列queue, err := ch.QueueDeclare("hello", // 队列名称false, // 持久化设置,可以为true根据需求选择false, // 自动删除,没有用户连接删除queue一般不选用false, //独占false, //等待服务器确认nil)   //参数if err != nil {fmt.Println(err)log.Fatalf("无法声明队列:%s", err)return}var body string// 发送信息for i := 0; i < 10; i++ {fmt.Println(i)body = "Hello RabbitMQ" + string(i)err = ch.Publish("",queue.Name,false, // 必须发送到消息队列false, // 不等待服务器确认amqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})if err != nil {log.Fatalf("消息生产失败:%s", err)continue}}
}

消费者

	// create conn// 如果同时运行两个这样的consumer代码,就是工作队列。只有一个consumer就是simpleconn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {log.Fatalf("无法创建连接:%s", err)return}defer conn.Close()// create channelch, err := conn.Channel()if err != nil {log.Fatalf("无法创建channel:%s", err)return}defer ch.Close()// create queuequeue, err := ch.QueueDeclare("hello",false,false,false,false,nil)if err != nil {log.Fatalf("无法创建queue:%s", err)return}// 消费信息msgs, err := ch.Consume(queue.Name,"",true,false,false,false,nil)if err != nil {log.Fatalf("无法消费信息:%s", err)return}for msg := range msgs {log.Println(string(msg.Body))}return

pub/sub 发布订阅模式

发布订阅模式可以创建两个Queue,绑定到同一个Exchange中
生产者这边只需要跟交换机对接,而交换机类型为fanout

func main() {// 连接到 rabbitMQconn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {log.Fatalf("无法创建连接:%s", err)}// 默认关闭defer conn.Close()// 创建通道Channelch, err := conn.Channel()if err != nil {log.Fatalf("无法创建channel:%s", err)}defer ch.Close()// create exchangeex := ch.ExchangeDeclare("exchange1", // 交换机名称"fanout",    // 交换机类型true,        // 是否持久化false,       // 是否自动删除false,       // 是否内部使用false,       // 是否等待服务器响应nil,         // 其他属性)fmt.Println(ex)body := "Hello RabbitMQ for Pub/Sub"err = ch.Publish("exchange1","", // routing key 可以为空,因为fanout不看routing keyfalse,false,amqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})if err != nil {log.Fatalf("err %s:", err)}log.Println(body)
}

消费者:创建交换机,类型为fanout,创建队列,绑定交换机(创建多个consumer绑定同一个queue和同一个交换机。这样发送一个消息,所有的consumer都能收到。== 发布订阅模型)

	// Pub/Sub// Create connconn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil{log.Fatalf(err)}defer conn.Close()// channel createch, err := conn.Channel()if err != nil{log.Fatalf(err)}defer ch.Close()// exchange createex := ch.ExchangeDeclare("exchange1","fanout",true,false,false,false,nil)fmt.Println(ex)// queue createqueue, err := ch.QueueDeclare("hello",false,false,false,false,nil)if err != nil{log.Fatalf(err)}err = ch.QueueBind(queue.Name,"","exchange1",false,nil)if err != nil{log.Fatalf(err)}msgs, err := ch.Consume(queue.Name,"",true,false,false,false,nil)if err != nil{log.Fatalf(err)}go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)}}()log.Printf("Waiting for messages. To exit press CTRL+C")<-make(chan struct{}) // 阻塞主goroutine
}

Routing 模式(对特定的队列投递消息)

生产者

func main() {// 连接到 rabbitMQconn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {log.Fatalf("无法创建连接:%s", err)}// 默认关闭defer conn.Close()// 创建通道Channelch, err := conn.Channel()if err != nil {log.Fatalf("无法创建channel:%s", err)}defer ch.Close()// create exchangeex := ch.ExchangeDeclare("exchange1", // 交换机名称"direct",    // 交换机类型true,        // 是否持久化false,       // 是否自动删除false,       // 是否内部使用false,       // 是否等待服务器响应nil,         // 其他属性)fmt.Println(ex)body := "Hello RabbitMQ for direct routing"// 发布消息到交换机,并指定路由键err = ch.Publish("logs_direct", // 交换机名称"routing_key", // 路由键false,         // 是否等待服务器响应false,         // 是否立即将消息写入磁盘amqp.Publishing{ContentType: "text/plain",Body:        []byte(body),},)if err != nil{log.Fatalf("无法创建send msg:%s", err)}log.Printf("Sent message: %s", message)

消费者

func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil{log.Fatalf("无法创建send msg:%s", err)}defer conn.Close()// 创建一个通道ch, err := conn.Channel()if err != nil{log.Fatalf("无法创建send msg:%s", err)}defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare("logs_direct", // 交换机名称"direct",      // 交换机类型true,          // 是否持久化false,         // 是否自动删除false,         // 是否内部使用false,         // 是否等待服务器响应nil,           // 其他属性)if err != nil{log.Fatalf("无法创建send msg:%s", err)}// 声明一个临时队列q, err := ch.QueueDeclare("",    // 队列名称,留空表示由RabbitMQ自动生成,因为定义了key所以队列名可以是随意的,毕竟是依靠key来进行匹配的false, // 是否持久化false, // 是否自动删除(当没有任何消费者连接时)true,  // 是否排他队列(仅限于当前连接)false, // 是否等待服务器响应nil,   // 其他属性)// 将队列绑定到交换机上,并指定要接收的路由键err = ch.QueueBind(q.Name,        // 队列名称"routing_key",      // 路由键"logs_direct", // 交换机名称false,         // 是否等待服务器响应nil,           // 其他属性)if err != nil{log.Fatalf("无法创建send msg:%s", err)}// 订阅消息msgs, err := ch.Consume(q.Name, // 队列名称"",     // 消费者标识符,留空表示由RabbitMQ自动生成true,   // 是否自动应答false,  // 是否独占模式(仅限于当前连接)false,  // 是否等待服务器响应false,  // 其他属性nil,    // 其他属性)failOnError(err, "Failed to register a consumer")// 接收消息的goroutinego func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)}}()log.Printf("Waiting for messages. To exit press CTRL+C")<-make(chan struct{}) // 阻塞主goroutine

topic

func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil{log.Fatalf(err)}defer conn.Close()// 创建一个通道ch, err := conn.Channel()if err != nil{log.Fatalf(err)}defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare("logs_topic", // 交换机名称"topic",      // 交换机类型true,         // 是否持久化false,        // 是否自动删除false,        // 是否内部使用false,        // 是否等待服务器响应nil,          // 其他属性)if err != nil{log.Fatalf(err)}// 定义要发送的消息的路由键和内容routingKey := "example.key.das"message := "Hello, RabbitMQ!"// 发布消息到交换机,并指定路由键err = ch.Publish("logs_topic", // 交换机名称routingKey,   // 路由键false,        // 是否等待服务器响应false,        // 是否立即发送amqp.Publishing{ContentType: "text/plain",Body:        []byte(message),},)if err != nil{log.Fatalf(err)}log.Printf("Sent message: %s", message)
}

消费者

func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil{log.Fatalf(err)}defer conn.Close()// 创建一个通道ch, err := conn.Channel()if err != nil{log.Fatalf(err)}defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare("logs_topic", // 交换机名称"topic",      // 交换机类型true,         // 是否持久化false,        // 是否自动删除false,        // 是否内部使用false,        // 是否等待服务器响应nil,          // 其他属性)if err != nil{log.Fatalf(err)}// 声明一个临时队列q, err := ch.QueueDeclare("",    // 队列名称,留空表示由RabbitMQ自动生成false, // 是否持久化false, // 是否自动删除(当没有任何消费者连接时)true,  // 是否排他队列(仅限于当前连接)false, // 是否等待服务器响应nil,   // 其他属性)if err != nil{log.Fatalf(err)}// 将队列绑定到交换机上,并指定要接收的路由键err = ch.QueueBind(q.Name,       // 队列名称"example.#",  // 路由键,可以使用通配符*匹配一个单词"logs_topic", // 交换机名称false,        // 是否等待服务器响应nil,          // 其他属性)if err != nil{log.Fatalf(err)}// 创建一个消费者通道msgs, err := ch.Consume(q.Name, // 队列名称"",     // 消费者标识符,留空表示由RabbitMQ自动生成true,   // 是否自动应答false,  // 是否排他消费者false,  // 是否阻塞false,  // 是否等待服务器响应nil,    // 其他属性)if err != nil{log.Fatalf(err)}// 接收和处理消息forever := make(chan bool)go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)}}()log.Printf("Waiting for messages...")// 阻塞<-forever
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/824739.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

upload-labs靶场详解

靶场环境 下载链接&#xff1a;https://codeload.github.com/c0ny1/upload-labs/zip/refs/heads/master 使用小皮集成环境来完成这个靶场 将文件放到WWW目录下就可以进行访问 进入关卡后页面呈现&#xff1a; Pass-01&#xff08;前端绕过&#xff09; 我们先尝试上传一个web.…

[svelte]属性和逻辑块

属性 / Default values • Svelte 教程 | Svelte 中文网 属性 Declaring props 到目前为止&#xff0c;我们只处理了内部状态——也就是说&#xff0c;这些值只能在给定的组件中访问。 在任何实际应用程序中&#xff0c;都需要将数据从一个组件向下传递到其子组件。为此&…

【Spring】-编程式事务和声明式事务

spring中控制事务的方式有两种&#xff1a;编程式事务和声明式事务&#xff0c;今天我以两种事务出发&#xff0c;对spring中实现事务的EnableTransactionManagement和Transaction两个注解的底层原理进行讨论。 一、编程式事务 什么是编程式事务&#xff1f; 硬编码的方式实现…

Adobe将Sora、Runway、Pika,集成在PR中

4月15日晚&#xff0c;全球多媒体巨头Adobe在官网宣布&#xff0c;将OpenAI的Sora、Pika 、Runway等著名第三方文生视频模型&#xff0c;集成在视频剪辑软件Premiere Pro中&#xff08;简称“PR”&#xff09;。 同时&#xff0c;Adob也会将自身研发的Firefly系列模型包括视频…

【Python】高级进阶(专版提升3)

Python 1 程序结构1.1 模块 Module1.1.1 定义1.1.2 作用1.1.3 导入1.1.3.1 import1.1.3.2 from import 1.1.4 模块变量1.1.5 加载过程1.1.6 分类 1.2 包package1.2.1 定义1.2.2 作用1.2.3 导入1.1.3.1 import1.1.3.2 from import 2 异常处理Error2.1 异常2.2 处理 3 迭代3.1 可…

InfluxDB v1.8

数据存储模型 points(数据点)time(时间戳)measurement(测量指标)field(测量值 key-value)至少一个tag(标签 key-value)零或多个和MySQL对比 series是共享同一个retention policy,measurement以及tag set的数据集合 InfluxDBMySQLmeasurementtablepointscoltagrow(with index)…

Three.js 入门——核心概念和坐标系理解

Three.js 是什么&#xff1f; 一个封装了 WebGL 的库&#xff0c;简化 WebGL 的使用 WebGL vs OpenGL OpenGL 主要被认为是一种 API&#xff08;应用程序编程接口&#xff09;&#xff0c;它为我们提供了大量可用于操作图形和图像的函数&#xff0c;主要用 C语言编写的。 然…

python辅助QQ登入

python辅助QQ登入 import pyautogui import time import random from pyautogui import ImageNotFoundException# 生成随机等待时间&#xff0c;范围在1到3秒之间 random_time random.uniform(1, 3)def find_and_click(image_path, moveFalse, execute_nextTrue):try:image_l…

【QT学习】7.事件,把文本显示在页面中(文本可变),鼠标指针切换,鼠标左键右键按下,qt设置背景样式

0.创建项目&#xff0c;事件的创建 1.事件的位置 2.这就是多态&#xff0c;子类重写父类函数&#xff0c;子类调用子类函数&#xff0c;也可以调用父类函数。但同函数名 1.要求&#xff1a;文本显示在页面中&#xff08;文本可变&#xff09; 1.文本显示在页面的核心代码 主要步…

vue3通过事件总线不同组件之间传递消息(两个组件可以没有任何关系)

首先在main.js中定义 const app createApp(App) app.config.globalProperties.$eventBus new mitt() // 创建全局事件总线对象然后在发送事件的组件中写&#xff08;js和ts导入方式不太一样&#xff0c;用法一样&#xff09; <script setup> import {getCurrentInsta…

DRF requets源码分析

【四】requets源码分析 【1】查看request传递的数据 &#xff08;1&#xff09;视图层 编写传输数据的接口查看request方法的参数 class BookAPIView(APIView):def get(self, request, *args, **kwargs):return Response({body: request.body, data: request.data, post: r…

【Web】DASCTF X GFCTF 2022十月挑战赛题解

目录 EasyPOP hade_waibo EasyLove BlogSystem EasyPOP 先读hint.php sorry.__destruct -> secret_code::secret() exp: $anew sorry(); $bnew secret_code(); $a->password"suibian"; $a->name"jay"; echo serialize($a); 真暗号啊&…

web项目中jsp页面不识别el表达式

如果使用el表达式出现下图问题 ** 解决办法 ** 这是因为maven创建项目时&#xff0c;web.xml头部声明默认是2.3&#xff0c;这个默认jsp关闭el表达式 修改web.xml文件开头的web-app的版本 <?xml version"1.0" encoding"UTF-8"?> <web-app x…

Vue3 Reactive和Ref

当你在使用Vue 3时&#xff0c;reactive 和 ref 是两个常用的响应式API。它们都是用来跟踪状态变化并在UI中进行响应式更新的。 1. ref ref 用于创建一个响应式的基本数据类型变量&#xff0c;例如数字、字符串等。它返回一个带有 .value 属性的对象&#xff0c;该属性包含了…

Python爬取猫眼电影票房 + 数据可视化

目录 主角查看与分析 爬取可视化分析猫眼电影上座率前10分析猫眼电影票房场均人次前10分析猫眼电影票票房占比分析 主角查看与分析 爬取 对猫眼电影票房进行爬取&#xff0c;首先我们打开猫眼 接着我们想要进行数据抓包&#xff0c;就要看网站的具体内容&#xff0c;通过按F12…

Postman之安装

Postman工具之介绍与安装 Postman是什么&#xff1f;Postman有几种安装方式&#xff1f; Postman是什么&#xff1f; postman是一款http客户端的模拟器&#xff0c;它可以模拟发出各种各样的网络请求&#xff0c;用于接口测试。 Postman有几种安装方式&#xff1f; 两种&…

4.17 网络编程

思维导图 select实现TCP并发服务器 #include <myhead.h> #define SER_IP "192.168.125.26" #define SER_PORT 8888int main(int argc, const char *argv[]) {int sfd socket(AF_INET,SOCK_STREAM,0);if(sfd -1){perror("socket error");return -1…

hvv准备ing

常见的SQL注入&#xff1f;sqlmap的常用命令和功能&#xff1f;SQLMAPAPI怎么使用&#xff1f;sqlmap --os-shell 原理&#xff1a;在数据交互中&#xff0c;前端的数据传入到后台处理 时&#xff0c;由于后端没有做严格的判断&#xff0c;导致其传入的“数 据”拼接到SQL语句…

c++程序员通用成长规划

一、长期计划 要有一个长期的学习计划&#xff0c;确定学习方向&#xff0c;拆分为各个模块&#xff0c;每天学习多少&#xff0c;根据实际情况灵活调整&#xff0c;一切以当前实际工作为主后续发展为辅&#xff0c;并且要坚持。这里的坚持不是强制的&#xff0c;比如今天有事…

Spring Boot深度解析:是什么、为何使用及其优势所在

在Java企业级应用开发的漫长历史中&#xff0c;Spring框架以其卓越的依赖注入和面向切面编程的能力&#xff0c;赢得了广大开发者的青睐。然而&#xff0c;随着技术的不断进步和项目的日益复杂&#xff0c;传统的Spring应用开发流程逐渐显得繁琐和低效。为了解决这一问题&#…