go消息队列RabbitMQ - 订阅模式-direct

1.发布订阅

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在向 Exchange发送消息时,也必须指定消息的 RoutingKey。
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

2.绑定

绑定可以采用额外的routing_key参数。为了避免与Channel.Publish参数混淆,我们将其称为binding key。这是我们如何使用键创建绑定的方法:

err = ch.QueueBind(q.Name,    // queue name"black",   // routing key"logs",    // exchangefalse,nil)

 3.直连交换器

direct交换器背后的路由算法很简单——消息进入其binding key与消息的routing key完全匹配的队列。

direct-exchang

绑定了两个队列的direct交换器X。第一个队列绑定键为orange,第二个队列绑定为两个,一个绑定键为black,另一个为green

在这种设置中,使用orange路由键发布到交换器的消息将被路由到队列Q1。路由键为blackgreen的消息将转到Q2。所有其他消息将被丢弃。

4.多重绑定

direct-exchange-multiple

用相同的绑定键绑定多个队列是完全合法的。在我们的示例中,我们可以使用绑定键blackXQ1之间添加绑定。在这种情况下,direct交换器的行为将类似fanout,并将消息广播到所有匹配的队列。带有black路由键的消息将同时传递给Q1Q2

5.发送日志

在日志系统中使用这个模型,发送消息到direct交换器。这样,接收脚本将能够选择其想要接收的日志级别。

先创建一个direct交换器:

err = ch.ExchangeDeclare("logs_direct", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments
)

指定routing key发送一条消息:

body := bodyFrom(os.Args)
err = ch.Publish("logs_direct",         // exchangeseverityFrom(os.Args), // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),
})

为了简化问题,我们假设“严重性”可以是“info”、“warning”、“error”之一。

6.订阅

为感兴趣的每种严重性(日志级别)创建一个新的绑定。绑定的routing key通过os.Args获取

q, err := ch.QueueDeclare("",    // namefalse, // durablefalse, // delete when unusedtrue,  // exclusivefalse, // no-waitnil,   // arguments
)
failOnError(err, "Failed to declare a queue")if len(os.Args) < 2 {log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])os.Exit(0)
}
// 建立多个绑定关系
for _, s := range os.Args[1:] {log.Printf("Binding queue %s to exchange %s with routing key %s",q.Name, "logs_direct", s)err = ch.QueueBind(q.Name,        // queue names,             // routing key"logs_direct", // exchangefalse,nil)failOnError(err, "Failed to bind a queue")
}

7.完整示例

img

emit_log_direct.go脚本的代码:

package mainimport ("log""os""strings""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs_direct", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments)failOnError(err, "Failed to declare an exchange")body := bodyFrom(os.Args)err = ch.Publish("logs_direct",         // exchangeseverityFrom(os.Args), // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})failOnError(err, "Failed to publish a message")log.Printf(" [x] Sent %s", body)
}func bodyFrom(args []string) string {var s stringif (len(args) < 3) || os.Args[2] == "" {s = "hello"} else {s = strings.Join(args[2:], " ")}return s
}func severityFrom(args []string) string {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "info"} else {s = os.Args[1]}return s
}

receive_logs_direct.go的代码:

package mainimport ("log""os""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs_direct", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments)failOnError(err, "Failed to declare an exchange")q, err := ch.QueueDeclare("",    // namefalse, // durablefalse, // delete when unusedtrue,  // exclusivefalse, // no-waitnil,   // arguments)failOnError(err, "Failed to declare a queue")if len(os.Args) < 2 {log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])os.Exit(0)}for _, s := range os.Args[1:] {log.Printf("Binding queue %s to exchange %s with routing key %s",q.Name, "logs_direct", s)err = ch.QueueBind(q.Name,        // queue names,             // routing key"logs_direct", // exchangefalse,nil)failOnError(err, "Failed to bind a queue")}msgs, err := ch.Consume(q.Name, // queue"",     // consumertrue,   // auto ackfalse,  // exclusivefalse,  // no localfalse,  // no waitnil,    // args)failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf(" [x] %s", d.Body)}}()log.Printf(" [*] Waiting for logs. To exit press CTRL+C")<-forever
}

如果你只想将“warning”和“err”(而不是“info”)级别的日志消息保存到文件中,只需打开控制台并输入:

go run receive_logs_direct.go warning error > logs_from_rabbit.log

如果你想在屏幕上查看所有日志消息,请打开一个新终端并执行以下操作:

go run receive_logs_direct.go info warning error
# => [*] Waiting for logs. To exit press CTRL+C

例如,要发出error日志消息,只需输入:

go run emit_log_direct.go error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

(这里是(emit_log_direct.go)和(receive_logs_direct.go)的完整源码)

参考文章:

go rabbitmq Routing模式 - 范斯猫 (fansimao.com)

RabbitMQ Go语言客户端教程4——路由 - 范斯猫 (fansimao.com)

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/678716.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第 384 场 LeetCode 周赛题解

A 修改矩阵 模拟 class Solution { public:vector<vector<int>> modifiedMatrix(vector<vector<int>> &matrix) {int m matrix.size(), n matrix[0].size();vector<int> mx(n, INT32_MIN);for (int i 0; i < m; i)for (int j 0; j &l…

Java微服务学习Day1

文章目录 认识微服务服务拆分及远程调用服务拆分服务远程调用提供者与消费者 Eureka注册中心介绍构建EurekaServer注册user-serviceorder-service完成服务拉取 Ribbon负载均衡介绍原理策略饥饿加载 Nacos注册中心介绍配置分级存储负载均衡环境隔离nacos注册中心原理 认识微服务…

Python : 使用python实现学生管理系统的功能,详细注释

一、学生管理系统 学生描述&#xff1a;姓名、年龄、成绩 学生管理系统功能&#xff1a;添加学生信息、删除学生信息、根据姓名修改学生信息、根据姓名查询学生信息、显示所有学生信息、退出系统 二、代码说明 1. 将每一个学生的信息放一个元组中&#xff0c;再把元组添加到列表…

单片机基础入门:简单介绍51单片机的工作原理

在电子技术领域&#xff0c;单片机是实现智能化控制不可或缺的关键元件。它们集成了许多功能于一身&#xff0c;成为了各种电子系统的心脏。为了更好地理解单片机如何工作&#xff0c;本文将重点介绍51单片机的基本组成和工作原理。 51单片机是一种广泛使用的微控制器&#xf…

【UE 求职】学了虚幻引擎可以应聘哪些岗位?

目录 1 领域1.1 游戏开发领域1.2 影视和动画制作1.3 建筑和工程可视化1.4 模拟和训练1.5 其他领域 2 如何做好一份简历1. 明确简历目标2. 突出UE5相关技能3. 展示相关项目经验4. 教育背景5. 专业经验6. 软技能7. 证书和奖项8. 定制化和校对 &#x1f64b;‍♂️ 作者&#xff1…

使用PHPStudy搭建本地web网站并实现任意浏览器公网访问

文章目录 [toc]使用工具1. 本地搭建web网站1.1 下载phpstudy后解压并安装1.2 打开默认站点&#xff0c;测试1.3 下载静态演示站点1.4 打开站点根目录1.5 复制演示站点到站网根目录1.6 在浏览器中&#xff0c;查看演示效果。 2. 将本地web网站发布到公网2.1 安装cpolar内网穿透2…

springcloud分布式架构网上商城源码和论文

首先,论文一开始便是清楚的论述了系统的研究内容。其次,剖析系统需求分析,弄明白“做什么”,分析包括业务分析和业务流程的分析以及用例分析,更进一步明确系统的需求。然后在明白了系统的需求基础上需要进一步地设计系统,主要包罗软件架构模式、整体功能模块、数据库设计。本项…

Unity Meta Quest MR 开发(四):使用 Scene API 和 Depth API 实现深度识别和环境遮挡

文章目录 &#x1f4d5;教程说明&#x1f4d5;Scene API 实现遮挡&#x1f4d5;Scene API 实现遮挡的缺点&#x1f4d5;Depth API 实现遮挡⭐导入 Depth API⭐修改环境配置⭐添加 EnvironmentDepthOcclusion 预制体⭐给物体替换遮挡 Shader⭐取消现实手部的遮挡效果 此教程相关…

Peter算法小课堂—背包问题

我们已经学过好久好久的动态规划了&#xff0c;动态规划_Peter Pan was right的博客-CSDN博客 那么&#xff0c;我用一张图片来概括一下背包问题。 大家有可能比较疑惑&#xff0c;优化决策怎么优化呢&#xff1f;答案是&#xff0c;滚动数组&#xff0c;一个神秘而简单的东西…

点云标注工具

目录 3d手势识别 c 3d关键点&#xff0c;Bounding Box Labels Rectangle Labels KITTI 3D Ground Truth Annotator c标注工具 3d手势识别 GitHub - 99xtaewoo/Automated-Hand-3D-pose-annotation-Tool: Automated Hand 3D pose annotation Tool c 3d关键点&#xff0c;Bou…

bcdedit /store 填什么,Windows11的BCD文件在哪里?

Windows11为EFI引导&#xff0c;bcd文件在 EFI分区的 \EFI\Microsoft\Boot\BCD 可以选择挂载EFI分区&#xff0c;或者使用如下方式&#xff0c;该路径可充当盘符使用。 例 bcdedit /store Z:\EFI\Microsoft\Boot\BCD /enum /v

【LeetCode每日一题】二维前缀和基本概念与案例

二维前缀和 根据某个块块 的 左上角坐标&#xff0c;和右下角坐标 求出 块块的累加和。 304. 二维区域和检索 - 矩阵不可变 /*** param {number[][]} matrix*/ var NumMatrix function(matrix) {let row matrix.length;let col matrix[0].length;// 初始化一个二维数组&am…

项目02《游戏-13-开发》Unity3D

基于 项目02《游戏-12-开发》Unity3D &#xff0c; 任务 &#xff1a;宠物系统 及 人物头像血条 首先在主面板MainPanel预制体中新建一个Panel&#xff0c; 命名为PlayerInfo 新建Image&#xff0c;作为头像 新建Slider&#xff0c;作为血条 对Panel组件添加一个水…

PE 特征码定位修改程序清单 uiAccess

requestedExecutionLevel level"asInvoker" uiAccess"false" 可以修改这一行来启用禁用原程序的盾牌图标&#xff0c;似乎作用不大。以前没事写的一个小玩意&#xff0c;记录一下。 等同于这里的设置&#xff1a; 截图 代码如下&#xff1a; #include …

谷粒商城【成神路】-【7】——库存系统

目录 &#x1f9c8;1.仓库维护 &#x1f35f;&#x1f35f;1.1配置网关陆游规则 &#x1f35f;&#x1f35f;1.2修改模糊查询 &#x1f95e;2.仓库库存 &#x1f37f;3.采购需需求 &#x1f35f;&#x1f35f;3.1采购的模糊检索 &#x1f35f;&#x1f35f;3.2合并…

system V——进程间通信

上一篇博客中我介绍了system V进程间通信中的内存共享&#xff0c;但是其中还有两 种通信方式&#xff1a;消息队列、和信号量&#xff0c;接下来我将简单介绍一下&#xff0c;消息队列和 信号量以及操作系统是如何看待system V进程间通信的。1. 消息队列 a. 大致介绍 消息队…

4核8G服务器配置性能怎么样?12M带宽配置服务器能干什么?

腾讯云轻量4核8G12M轻量应用服务器支持多少人同时在线&#xff1f;通用型-4核8G-180G-2000G&#xff0c;2000GB月流量&#xff0c;系统盘为180GB SSD盘&#xff0c;12M公网带宽&#xff0c;下载速度峰值为1536KB/s&#xff0c;即1.5M/秒&#xff0c;假设网站内页平均大小为60KB…

CVE-2012-1823 漏洞复现

CVE-2012-1823 PHP SAPI 与运行模式 首先&#xff0c;介绍一下PHP的运行模式。 下载PHP源码&#xff0c;可以看到其中有个目录叫sapi。sapi在PHP中的作用&#xff0c;类似于一个消息的“传递者”&#xff0c;比如在《Fastcgi协议分析 && PHP-FPM未授权访问漏洞 &…

中年低端中产程序员从西安出发到海南三亚低成本吃喝万里行:西安-南宁-湛江-雷州-徐闻-博鳌-陵水-三亚-重庆-西安

文章大纲 旅途规划来回行程的确定南宁 - 北海 - 湛江轮渡成为了最终最大的不确定性&#xff01;感谢神州租车气温与游玩地点总体花费 游玩过程出发时间&#xff1a;Day1-1月25日星期四&#xff0c;西安飞南宁路途中&#xff1a;Day2-1月26日星期五&#xff0c;南宁-湛江-住雷州…

工业自动化监控界面与网页、移动UI大相径庭,不能机械照搬。

工业自动化系统监控界面与网页UI、移动UI设计的不同之处主要体现在以下几个方面&#xff1a; 设备和数据展示&#xff1a;工业自动化系统监控界面需要展示大量的设备状态和实时数据&#xff0c;如传感器数据、设备运行状态等。相比之下&#xff0c;网页UI和移动UI设计更注重内容…