go消息队列RabbitMQ - 订阅模式-fanout

1、发布订阅 

订阅模式,消息被路由投递给多个队列,一个消息被多个消费者获取。

1) 可以有多个消费者
2) 每个消费者有自己的queue(队列)
3) 每个队列都要绑定到Exchange(交换机)
4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
5) 交换机把消息发送给绑定过的所有队列
6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

  • 相关场景:邮件群发,群聊天,广播(广告)

2、Exchanges(交换器)

消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费

有几种交换器类型可用:directtopicheaders 和 fanout。我们将集中讨论最后一个——fanout

2.1 创建交换器

创建一个这种类型的交换器,并给它起个名字叫logs

err = ch.ExchangeDeclare("logs",   // name"fanout", // typetrue,     // durablefalse,    // auto-deletedfalse,    // internalfalse,    // no-waitnil,      // arguments
)

fanout(扇出)交换器非常简单。正如你可能从名称中猜测的那样,它只是将接收到的所有消息广播到它知道的所有队列中。 

2.2 临时队列

也是自动删除队列吗,和普通队列在使用上没有什么区别,唯一的区别是,当消费者断开连接时,队列将会被删除。自动删除队列允许的消费者没有限制,也就是说当这个队列上最后一个消费者断开连接才会执行删除。

自动删除队列只需要在声明队列时,设置属性auto-delete标识为true即可。系统声明的随机队列,缺省就是自动删除的。

q, err := ch.QueueDeclare("",    // 空字符串作为队列名称false, // 非持久队列false, // delete when unusedtrue,  // 独占队列(当前声明队列的连接关闭后即被删除)false, // no-waitnil,   // arguments
)

上述方法返回时,生成的队列实例包含RabbitMQ生成的随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg

2.3 交换器与队列绑定

 

交换器和队列之间的关系称为绑定

err = ch.QueueBind(q.Name, // queue name"",     // routing key"logs", // exchangefalse,nil,
)

从现在开始,logs交换器将会把消息添加到我们的队列中。

2.4 发布消息到交换机

例如,发布到fanout交换器:

body := bodyFrom(os.Args)
err = ch.Publish("logs", // exchange"",     // routing keyfalse,  // mandatoryfalse,  // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})

 3 完整代码

产生日志消息的生产程序与上一教程看起来没有太大不同。最重要的变化是我们现在希望将消息发布到logs交换器,而不是空的消息交换器。发送时,我们需要提供一个routingKey,但是对于fanout型交换器,它的值可以被忽略(传空字符串)。下面是emit_log.go脚本的代码:

package mainimport ("log""os""strings""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs",   // name"fanout", // typetrue,     // durablefalse,    // auto-deletedfalse,    // internalfalse,    // no-waitnil,      // arguments)failOnError(err, "Failed to declare an exchange")body := bodyFrom(os.Args)err = ch.Publish("logs", // exchange"",     // routing keyfalse,  // mandatoryfalse,  // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})failOnError(err, "Failed to publish a message")log.Printf(" [x] Sent %s", body)
}func bodyFrom(args []string) string {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "hello"} else {s = strings.Join(args[1:], " ")}return s
}

(emit_logs.go源码)

如你所见,在建立连接之后,我们声明了交换器。此步骤是必需的,因为禁止发布到不存在的交换器。

如果没有队列绑定到交换器,那么消息将丢失,但这对我们来说是ok的。如果没有消费者在接收,我们可以安全地丢弃该消息。

receive_logs.go的代码:

package mainimport ("log""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs",   // name"fanout", // typetrue,     // durablefalse,    // auto-deletedfalse,    // internalfalse,    // no-waitnil,      // arguments)failOnError(err, "Failed to declare an exchange")q, err := ch.QueueDeclare("",    // namefalse, // durablefalse, // delete when unusedtrue,  // exclusivefalse, // no-waitnil,   // arguments)failOnError(err, "Failed to declare a queue")err = ch.QueueBind(q.Name, // queue name"",     // routing key"logs", // exchangefalse,nil,)failOnError(err, "Failed to bind a queue")msgs, err := ch.Consume(q.Name, // queue"",     // consumertrue,   // auto-ackfalse,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args)failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf(" [x] %s", d.Body)}}()log.Printf(" [*] Waiting for logs. To exit press CTRL+C")<-forever
}

(receive_logs.go源码)

如果要将日志保存到文件,只需打开控制台并输入:

go run receive_logs.go > logs_from_rabbit.log

如果希望在屏幕上查看日志,请切换到一个新的终端并运行:

go run receive_logs.go

当然,要发出日志,请输入:

go run emit_log.go

使用rabbitmqctl list_bindings命令,你可以验证代码是否确实根据需要创建了绑定关系和队列。在运行两个receive_logs.go程序后,你应该看到类似以下内容:

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

对结果的解释很简单:数据从logs交换器进入了两个由服务器分配名称的队列。这正是我们想要的。

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/668502.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

c++阶梯之类与对象(中)

目录 1.类的6个默认成员函数 2. 构造函数 2.1 构造函数概念的引出 2.2 构造函数的特性 3. 析构函数 3.1 析构函数的概念 3.2 特性 未使用构造与析构的版本 使用了构造与析构函数的版本 4. 拷贝构造函数 4.1 拷贝构造函数的概念 4.2 特性 结语 本节我们来认识…

使用 PyTorch 构建 NLP 聊天机器人

一、说明 聊天机器人提供自动对话&#xff0c;可以帮助用户完成任务或寻求信息。随着深度学习的最新进展&#xff0c;聊天机器人正变得越来越具有对话性和实用性。这个全面的教程将利用 PyTorch 和 Python 从头开始构建聊天机器人&#xff0c;涵盖模型架构、数据准备、训练循环…

AIGC技术讲解以及应用的落地

简介 近期&#xff0c;火爆的“AI绘画”、图片转AI图&#xff0c;智能聊天软件ChatGPT&#xff0c;引起了人们广泛关注。人工智能潜力再次被证明&#xff0c;而这三个概念均来自同一个领域&#xff1a;AIGC。AIGC到底是什么&#xff1f;为什么如此引人关注&#xff1f;AIGC能产…

Linux ---- Shell编程之免交互

一、Here Document 多行重定向 1、Here Document定义 使用I/O重定向的方式将命令列表提供给交互式程序标准输入的一种替代品Here Document 是标准输 入的一种替代品&#xff0c;可以帮助脚本开发人员不必使用临时文件来构建输入信息&#xff0c;而是直接就地生产出一个文件…

Java Lambda 表达式以及对 Lambda 表达式的简化

Java Lambda 表达式以及对 Lambda 表达式的简化 LambdaDemo.java package com.zhong.lambdademo;import java.util.Arrays; import java.util.Comparator;public class LambdaDemo {public static void main(String[] args) {Student[] student new Student[5];student[0] ne…

(15)求两个整数的平均值

文章目录 每日一言题目解题思路代码结语 每日一言 现在&#xff0c;我怕的并不是那艰苦严峻的生活&#xff0c;而是不能再学习和认识我迫切想了解的世界。对我来说&#xff0c;不学习&#xff0c;毋宁死。——罗蒙诺索夫 题目 输入两个整数m和n&#xff0c;写一个函数average…

createvm

New-VM -Name “new 5” -Generation 2 -BootDevice CD -NoVHD Set-VMDvdDrive -VMName TestVM -Path .\WinBuild.iso Set-VMFirmware “Test VM” -EnableSecureBoot Off Start-VM -Name TestVM debug-vm testuefiisov2 -InjectNonMaskableInterrupt -Force Write-Host $MyI…

GrayLog踩坑历险记

背景 GrayLog作为ELK的替代产品&#xff0c;是新生代的日志采集框架。在一个采集节点日志的需求中&#xff0c;因为节点很多&#xff0c;产生的日志也很多&#xff0c;因此尝试了使用GrayLog进行日志的采集。下面记录一下使用GrayLog中遇到的坑和解决方案。 一、部署与启动 …

论软件外包模式与企业信息化建设

企业信息化是企业在发展过程中的推进器&#xff0c;没有也可以勉强存活&#xff0c;但是谈不上加速。软件外包模式就是建造推进器的方法。软件外包模式大体分为以下几类&#xff1a; (1)购买现成的&#xff0c;在上面进行定制开发 代表用友&#xff0c;金蝶厂商&#x…

(十三)Java开发扩展之软件包与安装——JDK和MySQL

文章目录 1、RPM1.1、什么是RPM&#xff1f;1.2、RPM包的名称格式1.2.1、RPM查询命令1.2.2、RPM卸载命令1.2.3、RPM安装命令 2、YUM2.1、什么是YUM?2.2、yum安装程序命令 3、安装JDK4、安装MySQL 1、RPM 1.1、什么是RPM&#xff1f; RPM&#xff08;RedHat Package Manager&a…

《学成在线》微服务实战项目实操笔记系列(P1~P49)【上】

《学成在线》项目实操笔记系列【上】&#xff0c;跟视频的每一P对应&#xff0c;全系列12万字&#xff0c;涵盖详细步骤与问题的解决方案。如果你操作到某一步卡壳&#xff0c;参考这篇&#xff0c;相信会带给你极大启发。同时也欢迎大家提问与讨论&#xff0c;我会尽力帮大家解…

阿里面试:Seata如何实现RC?保证事务的隔离性?

尼恩说在前面 在40岁老架构师 尼恩的读者交流群(50)中&#xff0c;最近有小伙伴拿到了一线互联网企业如阿里、滴滴、极兔、有赞、希音、百度、网易、美团的面试资格&#xff0c;遇到很多很重要的面试题&#xff1a; Seata 如何实现 RC &#xff1f;保证事务的隔离性&#xff1…

使用sql创建数据库以及常用的sql简介

SQL简介 SQL&#xff08;Structured Query Language&#xff09;是一种用于管理关系数据库管理系统的标准编程语言。它用于执行各种操作&#xff0c;如查询、更新、插入和删除数据库中的数据。SQL是一种声明性语言&#xff0c;这意味着它指定了需要执行的操作&#xff0c;但不需…

docker进阶问题二

如何使用Docker的容器调试和故障排查工具&#xff1f; Docker提供了一系列工具和命令来帮助开发者调试和排查容器中的问题。以下是一些常用的调试和故障排查方法&#xff1a; 1. 容器日志 查看容器日志是最基本的调试手段。使用docker logs命令可以查看容器的标准输出&#…

Transformer实战-系列教程3:Vision Transformer 源码解读1

&#x1f6a9;&#x1f6a9;&#x1f6a9;Transformer实战-系列教程总目录 有任何问题欢迎在下面留言 本篇文章的代码运行界面均在Pycharm中进行 本篇文章配套的代码资源已经上传 Vision Transformer 源码解读1 Vision Transformer 源码解读2 Vision Transformer 源码解读3 Vis…

Vue 实现动态路由

Vue 实现动态路由 Vue中实现动态路由主要涉及到两个方面&#xff1a;一是路由的动态添加&#xff0c;二是基于路由的参数变化来动态渲染组件。这通常在使用Vue Router时进行配置和实现。以下是实现动态路由的一些基本步骤和概念&#xff1a; 安装和设置Vue Router npm insta…

kubesphere部署k8s-v1.23.10

功能&#xff1a; &#x1f578; 部署 Kubernetes 集群 &#x1f517; Kubernetes 多集群管理 &#x1f916; Kubernetes DevOps &#x1f50e; 云原生可观测性 &#x1f9e9; 基于 Istio 的微服务治理 &#x1f4bb; 应用商店 &#x1f4a1; Kubernetes 边缘节点管理 &#x1…

latex论文写作遇到的问题

图一&#xff1a; 图二&#xff1a; 图三&#xff1a; 使用模版的时候将图一转为图二&#xff1a;在.tex文件开头导言部分加上&#xff1a; \usepackage{titletoc} \titlecontents{section}[0pt]{\addvspace{1.5pt}\filright\bf}{\contentspush{第\thecontentslabel\ 章\qu…

2024.2.4 awd总结

防御阶段 感觉打了几次awd&#xff0c;前面阶段还算比较熟练 1.ssh连接 靶机登录 修改密码 [root8 ~]# passwd Changing password for user root. New password: Retype new password: 2.xftp连接 备份网站源码 我觉得这步还是非常重要的&#xff0c;万一后面被删站。。…

【幻兽帕鲁】如何快速部署私人服务器

看了许多关于如何部署服务器的&#xff0c;大部分都是要买阿里云或者腾讯云的服务器并且至少四核以上才能保证流畅运行。 但是对于想搭建私服但又没有技术的小白&#xff0c;确实是有点难度了。购买云服务器后还要配置服务器&#xff0c;配置OpenVPN、PalServer&#xff0c;doc…