go消息队列RabbitMQ - 订阅模式-fanout

1、发布订阅 

订阅模式,消息被路由投递给多个队列,一个消息被多个消费者获取。

1) 可以有多个消费者
2) 每个消费者有自己的queue(队列)
3) 每个队列都要绑定到Exchange(交换机)
4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
5) 交换机把消息发送给绑定过的所有队列
6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

  • 相关场景:邮件群发,群聊天,广播(广告)

2、Exchanges(交换器)

消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费

有几种交换器类型可用:directtopicheaders 和 fanout。我们将集中讨论最后一个——fanout

2.1 创建交换器

创建一个这种类型的交换器,并给它起个名字叫logs

err = ch.ExchangeDeclare("logs",   // name"fanout", // typetrue,     // durablefalse,    // auto-deletedfalse,    // internalfalse,    // no-waitnil,      // arguments
)

fanout(扇出)交换器非常简单。正如你可能从名称中猜测的那样,它只是将接收到的所有消息广播到它知道的所有队列中。 

2.2 临时队列

也是自动删除队列吗,和普通队列在使用上没有什么区别,唯一的区别是,当消费者断开连接时,队列将会被删除。自动删除队列允许的消费者没有限制,也就是说当这个队列上最后一个消费者断开连接才会执行删除。

自动删除队列只需要在声明队列时,设置属性auto-delete标识为true即可。系统声明的随机队列,缺省就是自动删除的。

q, err := ch.QueueDeclare("",    // 空字符串作为队列名称false, // 非持久队列false, // delete when unusedtrue,  // 独占队列(当前声明队列的连接关闭后即被删除)false, // no-waitnil,   // arguments
)

上述方法返回时,生成的队列实例包含RabbitMQ生成的随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg

2.3 交换器与队列绑定

 

交换器和队列之间的关系称为绑定

err = ch.QueueBind(q.Name, // queue name"",     // routing key"logs", // exchangefalse,nil,
)

从现在开始,logs交换器将会把消息添加到我们的队列中。

2.4 发布消息到交换机

例如,发布到fanout交换器:

body := bodyFrom(os.Args)
err = ch.Publish("logs", // exchange"",     // routing keyfalse,  // mandatoryfalse,  // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})

 3 完整代码

产生日志消息的生产程序与上一教程看起来没有太大不同。最重要的变化是我们现在希望将消息发布到logs交换器,而不是空的消息交换器。发送时,我们需要提供一个routingKey,但是对于fanout型交换器,它的值可以被忽略(传空字符串)。下面是emit_log.go脚本的代码:

package mainimport ("log""os""strings""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs",   // name"fanout", // typetrue,     // durablefalse,    // auto-deletedfalse,    // internalfalse,    // no-waitnil,      // arguments)failOnError(err, "Failed to declare an exchange")body := bodyFrom(os.Args)err = ch.Publish("logs", // exchange"",     // routing keyfalse,  // mandatoryfalse,  // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})failOnError(err, "Failed to publish a message")log.Printf(" [x] Sent %s", body)
}func bodyFrom(args []string) string {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "hello"} else {s = strings.Join(args[1:], " ")}return s
}

(emit_logs.go源码)

如你所见,在建立连接之后,我们声明了交换器。此步骤是必需的,因为禁止发布到不存在的交换器。

如果没有队列绑定到交换器,那么消息将丢失,但这对我们来说是ok的。如果没有消费者在接收,我们可以安全地丢弃该消息。

receive_logs.go的代码:

package mainimport ("log""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs",   // name"fanout", // typetrue,     // durablefalse,    // auto-deletedfalse,    // internalfalse,    // no-waitnil,      // arguments)failOnError(err, "Failed to declare an exchange")q, err := ch.QueueDeclare("",    // namefalse, // durablefalse, // delete when unusedtrue,  // exclusivefalse, // no-waitnil,   // arguments)failOnError(err, "Failed to declare a queue")err = ch.QueueBind(q.Name, // queue name"",     // routing key"logs", // exchangefalse,nil,)failOnError(err, "Failed to bind a queue")msgs, err := ch.Consume(q.Name, // queue"",     // consumertrue,   // auto-ackfalse,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args)failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf(" [x] %s", d.Body)}}()log.Printf(" [*] Waiting for logs. To exit press CTRL+C")<-forever
}

(receive_logs.go源码)

如果要将日志保存到文件,只需打开控制台并输入:

go run receive_logs.go > logs_from_rabbit.log

如果希望在屏幕上查看日志,请切换到一个新的终端并运行:

go run receive_logs.go

当然,要发出日志,请输入:

go run emit_log.go

使用rabbitmqctl list_bindings命令,你可以验证代码是否确实根据需要创建了绑定关系和队列。在运行两个receive_logs.go程序后,你应该看到类似以下内容:

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

对结果的解释很简单:数据从logs交换器进入了两个由服务器分配名称的队列。这正是我们想要的。

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/668502.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

c++阶梯之类与对象(中)

目录 1.类的6个默认成员函数 2. 构造函数 2.1 构造函数概念的引出 2.2 构造函数的特性 3. 析构函数 3.1 析构函数的概念 3.2 特性 未使用构造与析构的版本 使用了构造与析构函数的版本 4. 拷贝构造函数 4.1 拷贝构造函数的概念 4.2 特性 结语 本节我们来认识…

使用 PyTorch 构建 NLP 聊天机器人

一、说明 聊天机器人提供自动对话&#xff0c;可以帮助用户完成任务或寻求信息。随着深度学习的最新进展&#xff0c;聊天机器人正变得越来越具有对话性和实用性。这个全面的教程将利用 PyTorch 和 Python 从头开始构建聊天机器人&#xff0c;涵盖模型架构、数据准备、训练循环…

AIGC技术讲解以及应用的落地

简介 近期&#xff0c;火爆的“AI绘画”、图片转AI图&#xff0c;智能聊天软件ChatGPT&#xff0c;引起了人们广泛关注。人工智能潜力再次被证明&#xff0c;而这三个概念均来自同一个领域&#xff1a;AIGC。AIGC到底是什么&#xff1f;为什么如此引人关注&#xff1f;AIGC能产…

Linux ---- Shell编程之免交互

一、Here Document 多行重定向 1、Here Document定义 使用I/O重定向的方式将命令列表提供给交互式程序标准输入的一种替代品Here Document 是标准输 入的一种替代品&#xff0c;可以帮助脚本开发人员不必使用临时文件来构建输入信息&#xff0c;而是直接就地生产出一个文件…

(15)求两个整数的平均值

文章目录 每日一言题目解题思路代码结语 每日一言 现在&#xff0c;我怕的并不是那艰苦严峻的生活&#xff0c;而是不能再学习和认识我迫切想了解的世界。对我来说&#xff0c;不学习&#xff0c;毋宁死。——罗蒙诺索夫 题目 输入两个整数m和n&#xff0c;写一个函数average…

GrayLog踩坑历险记

背景 GrayLog作为ELK的替代产品&#xff0c;是新生代的日志采集框架。在一个采集节点日志的需求中&#xff0c;因为节点很多&#xff0c;产生的日志也很多&#xff0c;因此尝试了使用GrayLog进行日志的采集。下面记录一下使用GrayLog中遇到的坑和解决方案。 一、部署与启动 …

(十三)Java开发扩展之软件包与安装——JDK和MySQL

文章目录 1、RPM1.1、什么是RPM&#xff1f;1.2、RPM包的名称格式1.2.1、RPM查询命令1.2.2、RPM卸载命令1.2.3、RPM安装命令 2、YUM2.1、什么是YUM?2.2、yum安装程序命令 3、安装JDK4、安装MySQL 1、RPM 1.1、什么是RPM&#xff1f; RPM&#xff08;RedHat Package Manager&a…

《学成在线》微服务实战项目实操笔记系列(P1~P49)【上】

《学成在线》项目实操笔记系列【上】&#xff0c;跟视频的每一P对应&#xff0c;全系列12万字&#xff0c;涵盖详细步骤与问题的解决方案。如果你操作到某一步卡壳&#xff0c;参考这篇&#xff0c;相信会带给你极大启发。同时也欢迎大家提问与讨论&#xff0c;我会尽力帮大家解…

阿里面试:Seata如何实现RC?保证事务的隔离性?

尼恩说在前面 在40岁老架构师 尼恩的读者交流群(50)中&#xff0c;最近有小伙伴拿到了一线互联网企业如阿里、滴滴、极兔、有赞、希音、百度、网易、美团的面试资格&#xff0c;遇到很多很重要的面试题&#xff1a; Seata 如何实现 RC &#xff1f;保证事务的隔离性&#xff1…

Transformer实战-系列教程3:Vision Transformer 源码解读1

&#x1f6a9;&#x1f6a9;&#x1f6a9;Transformer实战-系列教程总目录 有任何问题欢迎在下面留言 本篇文章的代码运行界面均在Pycharm中进行 本篇文章配套的代码资源已经上传 Vision Transformer 源码解读1 Vision Transformer 源码解读2 Vision Transformer 源码解读3 Vis…

kubesphere部署k8s-v1.23.10

功能&#xff1a; &#x1f578; 部署 Kubernetes 集群 &#x1f517; Kubernetes 多集群管理 &#x1f916; Kubernetes DevOps &#x1f50e; 云原生可观测性 &#x1f9e9; 基于 Istio 的微服务治理 &#x1f4bb; 应用商店 &#x1f4a1; Kubernetes 边缘节点管理 &#x1…

latex论文写作遇到的问题

图一&#xff1a; 图二&#xff1a; 图三&#xff1a; 使用模版的时候将图一转为图二&#xff1a;在.tex文件开头导言部分加上&#xff1a; \usepackage{titletoc} \titlecontents{section}[0pt]{\addvspace{1.5pt}\filright\bf}{\contentspush{第\thecontentslabel\ 章\qu…

2024.2.4 awd总结

防御阶段 感觉打了几次awd&#xff0c;前面阶段还算比较熟练 1.ssh连接 靶机登录 修改密码 [root8 ~]# passwd Changing password for user root. New password: Retype new password: 2.xftp连接 备份网站源码 我觉得这步还是非常重要的&#xff0c;万一后面被删站。。…

【幻兽帕鲁】如何快速部署私人服务器

看了许多关于如何部署服务器的&#xff0c;大部分都是要买阿里云或者腾讯云的服务器并且至少四核以上才能保证流畅运行。 但是对于想搭建私服但又没有技术的小白&#xff0c;确实是有点难度了。购买云服务器后还要配置服务器&#xff0c;配置OpenVPN、PalServer&#xff0c;doc…

解锁亚马逊测评防关联新技术:亚马逊鲲鹏系统

在亚马逊测评的过程中&#xff0c;一直以来都存在着一些技术难题&#xff0c;特别是在模拟买家行为时需要考虑诸多因素&#xff0c;包括关键词搜索、IP地址切换以及防关联等。然而&#xff0c;最新的技术突破&#xff0c;亚马逊鲲鹏系统正是为了解决这些问题而诞生的。 首先&am…

No matching client found for package name ‘com.unity3d.player‘

2024年2月5日更新 下面的一系列操作最终可能都无用&#xff0c;大致这问题出现原因是我在Unity采用了Android方式接入Firebase&#xff0c;而Android接入实际上和Unity接入方式有配置上的不一样&#xff0c;我就是多做了几步操作如下。https://firebase.google.com/docs/androi…

【Java】Redis入门

1. Redis入门 1.1 Redis简介 Redis是一个基于内存的key-value结构数据库。Redis 是互联网技术领域使用最为广泛的存储中间件。 官网&#xff1a;https://redis.io 中文网&#xff1a;https://www.redis.net.cn/ key-value结构存储&#xff1a; 主要特点&#xff1a; 基于内…

docker复习笔记01(小滴课堂)安装+部署mysql

查看内核版本。 关闭防火墙&#xff1a; 查看docker版本&#xff1a; 下载阿里yum源&#xff1a; 再看一下yum版本都有哪些&#xff1a; 我们可以看的docker-ce了。 安装它&#xff1a; 设置docker服务开机启动&#xff1a; 更新日志文件&#xff1a; 启动docker&#xff1a; …

CSS写渐变边框线条

box-sizing: border-box; border-top: 1px solid; border-image: linear-gradient(to right, red, blue) 1;

STM32F407移植OpenHarmony笔记9

继上一篇笔记&#xff0c;已经完成liteos内核的基本功能适配。 今天尝试启动OHOS和XTS兼容性测试。 如何启动OHOS&#xff1f; OHOS系统初始化接口是OHOS_SystemInit(void)&#xff0c;在内核初始化完成后&#xff0c;就能调用。 extern void OHOS_SystemInit(void); OHOS_Sys…