RabbitMQ 发布订阅

 RabbitMQ 发布订阅视频学习地址:

简单模式下RabbitMQ 发布者发布消息 消费者消费消息

Publist/Subscribe 发布订阅

RabbitMQ 中,发布订阅模式是一种消息传递方式,其中发送者(发布者)不会将消息直接发送到特 定的接收者(订阅者)。而是将消息发送到一个交换机,交换机将消息转发到绑定到该交换机的每个队 ,每个绑定交换机的队列都将接收到消息。消费者(订阅者)监听自己的队列 并进行消费 。

 

场景 : 开放平台 开发者订阅了某个开放平台的 api 之后,数据有变化就会自动获取到最新的

 

 

 

在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:

 

P :生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给 X (交换机)
C :消费者,消息的接收者,会一直等待消息到来
Queue :消息队列,接收消息、缓存消息
Exchange :交换机( X )。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递 交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange 的类型。
Exchange 有常见以下 3 种类型:
Fanout :广播,将消息交给所有绑定到交换机的队列
Direct :定向,把消息交给符合指定 routing key 的队列
Topic :通配符,把消息交给符合 routing pattern (路由模式) 的队列
Exchange (交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失

 

RabbitMQ 发布订阅模式的一些应用场景:  

 

1. 数据提供商与应用商 :例如中国气象局向多个门户网站提供气象数据。
2. 新闻机构 :将独家新闻发布给多个订阅者,但可能需要根据新闻类型进行更精细的路由。
3. 商城系统 :新添加商品后,同时更新缓存和数据库。
4. 用户通知 :用户充值或转账成功后,通过多种方式(如短信、邮件)通知用户。
5. 消息广播 :将消息广播到多个消费者,例如系统公告、活动通知等。
6. 降低耦合 :生产者和消费者通过 RabbitMQ 进行解耦,不需要直接连接,提高系统的灵活性和可
扩展性。
7. 异步处理 :生产者发送消息后,消费者可以异步处理,提高系统的响应速度和并发处理能力。

 

生产者
emit_log.go

 

package main
import (
"context"
"log"
"os"
"strings"
"github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Printf("%s: %s", msg, err)
}
}
func bodyForm(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
func main() {
// 连接到RabbitMQ服务器
conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建一个通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
//声明一个交换机
err = ch.ExchangeDeclare(
"logs", //name 交换机名称
"fanout", //交换机类型 Fanout 广播
true, //durable 持久化
false, //autoDelete 是否自动删除
false, //internal 是否内部使用 设置为 false 时,表示无论如何这个交换器都不是
内置的
false, //noWait 是否等待服务器响应 参数通常默认为False,意味着操作会同步进
行并等待服务器的响应
nil, // 其他属性
)
failOnError(err, "Failed to declare an exchange")
//发送消息
body := bodyForm(os.Args)
// 发布消息到交换机,并指定路由键
err = ch.PublishWithContext(
context.Background(),
"logs", // 交换器的名称
"", // 队列名
false, // mandatory 必须发送到队列 ,false表示如果交换器无法根据自身的类型和路
由键找到一个符合条件的队列丢弃
false, //immediate 参数设置为 false 时,表示消息不需要立即被消费者接收
amqp091.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent: %s", body)
}

 

消费者
receive_log.go

 

package main
import (
"log"
"github.com/rabbitmq/amqp091-go"
)
func failOnError2(err error, msg string) {
if err != nil {
log.Printf("%s: %s", msg, err)
}
}
func main() {
//建立连接
conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
failOnError2(err, "Failed to connect to RabbitMQ")
defer conn.Close()
//创建一个Channel
ch, err := conn.Channel()
failOnError2(err, "Failed to open a channel")
defer ch.Close()
//声明一个交换机
err = ch.ExchangeDeclare(
"logs", // 交换机名称
"fanout", // 交换机类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否内部使用
false, // 是否等待服务器响应
nil, // 其他属性
)
failOnError2(err, "Failed to declare an exchange")
// 声明一个临时队列
q, err := ch.QueueDeclare(
"", // 队列名称,留空表示由RabbitMQ自动生成
false, // 是否持久化
false, // 是否自动删除(当没有任何消费者连接时)
true, // 是否排他队列(仅限于当前连接)
false, // 是否等待服务器响应
nil, // 其他属性
)
failOnError2(err, "Failed to declare a queue")
// 将队列绑定到交换机上
err = ch.QueueBind(
q.Name, // 队列名称
"", // 路由键,留空表示接收交换机的所有消息
"logs", // 交换机名称
false, // 是否等待服务器响应
nil, // 其他属性
)
failOnError2(err, "Failed to bind a queue")
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者标识符,留空表示由RabbitMQ自动生成
true, // 是否自动应答
false, // 是否独占模式(仅限于当前连接)
false, // 是否等待服务器响应
false, // noLocal
nil, // 其他属性
)
// msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
failOnError2(err, "Failed to register a consumer")
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [x] Waiting for logs. To exit press CTRL+C")
<-forever
}

 运行

# 如果你想保存日志文件
go run receive_log.go > logs_from_rabbit.log
# 如果你想再终端看到日志
go run receive_log.go
# shell2
go run emit_log.go

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/15371.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于open3d对kitti数据集检测结果可视化

前言 KITTI数据集是自动驾驶和计算机视觉领域中一个广泛使用的基准数据集&#xff0c;它提供了丰富的传感器数据&#xff0c;包括激光雷达、相机和GPS等。Open3D是一个功能强大的3D数据处理和可视化库&#xff0c;支持多种3D数据格式。本文将介绍如何使用Open3D对KITTI数据集的…

详解 Spring MVC(Spring MVC 简介)

什么是 Spring MVC&#xff1f; Spring MVC 是 Spring 框架提供的一个基于 MVC 模式的轻量级 Web 框架&#xff0c;是 Spring 为表示层开发提供的一整套完整的解决方案&#xff0c;Spring MVC 使用了 MVC 架构模式&#xff0c;将 Web 层职责解耦&#xff0c;基于请求驱动模型&…

基于Java、SpringBoot和uniapp在线考试系统安卓APP和微信小程序

摘要 基于Java、SpringBoot和uniapp的在线考试系统安卓APP微信小程序是一种结合了现代Web开发技术和移动应用技术的解决方案&#xff0c;旨在为教育机构提供一个方便、高效和灵活的在线考试平台。该系统采用Java语言进行后端开发&#xff0c;使用SpringBoot框架简化企业级应用…

SpringCloud微服务之Nacos、Feign、GateWay详解

SpringCloud微服务之Nacos、Feign、GateWay详解 1、Nacos配置管理1.1、统一配置管理1.1.1、在nacos中添加配置文件1.1.2、从微服务拉取配置 1.2、配置热更新1.2.1、方式一1.2.2、方式二 1.3、配置共享1.3.1、配置共享的优先级 1.4、搭建nacos集群1.4.1、初始化数据库1.4.2、下载…

plt多子图设置

import matplotlib.pyplot as plt# 使用 subplots 函数创建一个 2x3 的子图网格 fig, axs plt.subplots(nrows2, ncols3, figsize(16, 10)) # 调整 figsize 来改变图像大小# 遍历每个子图&#xff0c;并绘制一些内容&#xff08;这里只是简单的示例&#xff09; for ax in ax…

C语言之函数和函数库以及自己制作静态动态链接库并使用

一&#xff1a;函数的本质 1&#xff1a;C语言为什么会有函数 &#xff08;1&#xff09;整个程序分为多个源文件&#xff0c;一个文件分为多个函数&#xff0c;一个函数分成多个语句&#xff0c;这就是整个程序的组织形式。这样的组织好处在于&#xff1a;分化问题、、便于程序…

分布式版本控制工具 git

git 是什么 分布式版本控制工具。github 是代码托管平台。 git 有什么用 保存文件的所有修改记录。使用版本号&#xff08;sha1 哈希值&#xff09; 进行区分。随时可浏览历史版本记录。可还原到历史指定版本。对比不同版本的文件差异。 为什么要使用 git 多人协作开发一个大…

STM32手写超频到128M函数

今天学习了野火的STM32教程学会了如何设置STM32的时钟频率&#xff0c;步骤比较详细&#xff0c;也很容易理解&#xff0c;就是视频教程不能跳着看&#xff0c;只能一节节的看&#xff0c;不然会知识不连贯&#xff0c;造成有些知识不理解&#xff0c;连续着看还是没有什么难度…

docker-file 网络

docker挂载 1.绑定挂载&#xff08;Bind Mounts&#xff09;&#xff1a;绑定挂载是将主机上的文件或目录挂载到容器中。 docker run -v /host/path:/container/path image_name 2.卷挂载&#xff08;Volume Mounts&#xff09;&#xff1a;卷挂载将 Docker 数据卷挂载到容器中…

【CTF Web】CTFShow web4 Writeup(SQL注入+PHP+字符型注入)

web4 1 管理员阿呆又失败了&#xff0c;这次一定要堵住漏洞 解法 注意到&#xff1a; <!-- flag in id 1000 -->拦截很多种字符&#xff0c;连 select 也不给用了。 if(preg_match("/or|\-|\\\|\/|\\*|\<|\>|\!|x|hex|\(|\)|\|select/i",$id)){die(&q…

Android开发-Android开发中的TCP与UDP通信策略的实现

Android 开发中的 TCP 与 UDP 通信策略的实现 1. 前言2. 准备工作3. Kotlin 中 TCP 通信实现客户端代码示例&#xff1a;服务器代码示例&#xff1a; 4. Kotlin 中 UDP 通信实现客户端代码示例&#xff1a;服务器代码示例&#xff1a; 5. TCP 与 UDP 应用场景分析TCP 实现可靠传…

搭建访问阿里云百炼大模型环境

最近这波大降价&#xff0c;还有限时免费&#xff0c;还不赶快试试在线大模型&#xff1f;下面整理访问百炼平台的千问模型方法。 创建RAM子账号并授权 创建RAM子账号 1. “访问控制RAM”入口&#xff08;控制台URL&#xff09; 然后点击进入“RAM管理控制台” 2. 添加用户 …

vue 区分多环境打包

需求&#xff1a;区分不同的环境&#xff08;测试、正式环境&#xff09;&#xff0c;接口文档地址不同&#xff1b; 配置步骤&#xff1a; 1、在根目录下面新建 .env.xxx 文件&#xff08;xxx 根据环境不同配置&#xff09; 文件中一定要配置的参数项为&#xff1a;NODE_ENV…

【Python搞定车载自动化测试】——Python实现CAN总线Bootloader刷写(含Python源码)

系列文章目录 【Python搞定车载自动化测试】系列文章目录汇总 文章目录 系列文章目录&#x1f4af;&#x1f4af;&#x1f4af; 前言&#x1f4af;&#x1f4af;&#x1f4af;一、环境搭建1.软件环境2.硬件环境 二、目录结构三、源码展示1.诊断基础函数方法2.诊断业务函数方法…

qmt量化教程4----订阅全推数据

文章链接 qmt量化教程4----订阅全推数据 (qq.com) 上次写了订阅单股数据的教程 量化教程3---miniqmt当作第三方库设置&#xff0c;提供源代码 全推就主动推送&#xff0c;当行情有变化就会触发回调函数&#xff0c;推送实时数据&#xff0c;可以理解为数据驱动类型&#xff0…

vs2019 c++ 函数的返回值是对象的值传递时候,将调用对象的移动构造函数

以前倒没有注意过这个问题。但编译器这么处理也符合移动构造的语义。因为本来函数体内的变量也要离开作用域被销毁回收了。测试如下&#xff1a; 谢谢

[SCTF2019]babyre

打开看看还是有花指令 解除后首先pass1是解maze&#xff0c;好像又是三维的 x是25&#xff0c;也就是向下跳五层,注意是立体的 得到 passwd1&#xff1a; ddwwxxssxaxwwaasasyywwdd 接着往下看 有一个加密函数IDA逆向常用宏定义_lodword-CSDN博客 unsigned __int64 __fastca…

primeflex样式库笔记 Display相关的案例

回顾 宽度设置的基本总结 w-full&#xff1a;表示widtdh&#xff1a;100%&#xff1b;占满父容器的宽度。 w-screen&#xff1a;表示占满整个屏幕的宽度。 w-1到w-12&#xff0c;是按百分比划分宽度&#xff0c;数字越大&#xff0c;占据的比例就越大。 w-1rem到w-30rem&…

Oracle的安装以及一些相关问题

系列文章目录 Oracle的安装以及一些相关问题 文章目录 系列文章目录前言一、Oracle的安装二、常用命令三、误删dbf四、PLSQL乱码五、oracle更换数据库字符集总结 前言 一段时间没更新&#xff0c;主要最近一直在找工作&#xff0c;最终还是顺着春招找到工作了&#xff0c;现在…

七大经典排序算法——冒泡排序

文章目录 &#x1f4d1;冒泡排序介绍&#x1f324;️代码实现&#x1f324;️做个简单的优化&#x1f324;️复杂度和稳定性分析☁️结语 &#x1f4d1;冒泡排序介绍 冒泡排序是一种简单但效率较低的排序算法。它重复地比较相邻的两个元素&#xff0c;如果顺序不对则交换它们&…