go消息队列RabbitMQ - 订阅模式-direct

1.发布订阅

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在向 Exchange发送消息时,也必须指定消息的 RoutingKey。
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

2.绑定

绑定可以采用额外的routing_key参数。为了避免与Channel.Publish参数混淆,我们将其称为binding key。这是我们如何使用键创建绑定的方法:

err = ch.QueueBind(q.Name,    // queue name"black",   // routing key"logs",    // exchangefalse,nil)

 3.直连交换器

direct交换器背后的路由算法很简单——消息进入其binding key与消息的routing key完全匹配的队列。

direct-exchang

绑定了两个队列的direct交换器X。第一个队列绑定键为orange,第二个队列绑定为两个,一个绑定键为black,另一个为green

在这种设置中,使用orange路由键发布到交换器的消息将被路由到队列Q1。路由键为blackgreen的消息将转到Q2。所有其他消息将被丢弃。

4.多重绑定

direct-exchange-multiple

用相同的绑定键绑定多个队列是完全合法的。在我们的示例中,我们可以使用绑定键blackXQ1之间添加绑定。在这种情况下,direct交换器的行为将类似fanout,并将消息广播到所有匹配的队列。带有black路由键的消息将同时传递给Q1Q2

5.发送日志

在日志系统中使用这个模型,发送消息到direct交换器。这样,接收脚本将能够选择其想要接收的日志级别。

先创建一个direct交换器:

err = ch.ExchangeDeclare("logs_direct", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments
)

指定routing key发送一条消息:

body := bodyFrom(os.Args)
err = ch.Publish("logs_direct",         // exchangeseverityFrom(os.Args), // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),
})

为了简化问题,我们假设“严重性”可以是“info”、“warning”、“error”之一。

6.订阅

为感兴趣的每种严重性(日志级别)创建一个新的绑定。绑定的routing key通过os.Args获取

q, err := ch.QueueDeclare("",    // namefalse, // durablefalse, // delete when unusedtrue,  // exclusivefalse, // no-waitnil,   // arguments
)
failOnError(err, "Failed to declare a queue")if len(os.Args) < 2 {log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])os.Exit(0)
}
// 建立多个绑定关系
for _, s := range os.Args[1:] {log.Printf("Binding queue %s to exchange %s with routing key %s",q.Name, "logs_direct", s)err = ch.QueueBind(q.Name,        // queue names,             // routing key"logs_direct", // exchangefalse,nil)failOnError(err, "Failed to bind a queue")
}

7.完整示例

img

emit_log_direct.go脚本的代码:

package mainimport ("log""os""strings""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs_direct", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments)failOnError(err, "Failed to declare an exchange")body := bodyFrom(os.Args)err = ch.Publish("logs_direct",         // exchangeseverityFrom(os.Args), // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})failOnError(err, "Failed to publish a message")log.Printf(" [x] Sent %s", body)
}func bodyFrom(args []string) string {var s stringif (len(args) < 3) || os.Args[2] == "" {s = "hello"} else {s = strings.Join(args[2:], " ")}return s
}func severityFrom(args []string) string {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "info"} else {s = os.Args[1]}return s
}

receive_logs_direct.go的代码:

package mainimport ("log""os""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs_direct", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments)failOnError(err, "Failed to declare an exchange")q, err := ch.QueueDeclare("",    // namefalse, // durablefalse, // delete when unusedtrue,  // exclusivefalse, // no-waitnil,   // arguments)failOnError(err, "Failed to declare a queue")if len(os.Args) < 2 {log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])os.Exit(0)}for _, s := range os.Args[1:] {log.Printf("Binding queue %s to exchange %s with routing key %s",q.Name, "logs_direct", s)err = ch.QueueBind(q.Name,        // queue names,             // routing key"logs_direct", // exchangefalse,nil)failOnError(err, "Failed to bind a queue")}msgs, err := ch.Consume(q.Name, // queue"",     // consumertrue,   // auto ackfalse,  // exclusivefalse,  // no localfalse,  // no waitnil,    // args)failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf(" [x] %s", d.Body)}}()log.Printf(" [*] Waiting for logs. To exit press CTRL+C")<-forever
}

如果你只想将“warning”和“err”(而不是“info”)级别的日志消息保存到文件中,只需打开控制台并输入:

go run receive_logs_direct.go warning error > logs_from_rabbit.log

如果你想在屏幕上查看所有日志消息,请打开一个新终端并执行以下操作:

go run receive_logs_direct.go info warning error
# => [*] Waiting for logs. To exit press CTRL+C

例如,要发出error日志消息,只需输入:

go run emit_log_direct.go error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

(这里是(emit_log_direct.go)和(receive_logs_direct.go)的完整源码)

参考文章:

go rabbitmq Routing模式 - 范斯猫 (fansimao.com)

RabbitMQ Go语言客户端教程4——路由 - 范斯猫 (fansimao.com)

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/678716.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第 384 场 LeetCode 周赛题解

A 修改矩阵 模拟 class Solution { public:vector<vector<int>> modifiedMatrix(vector<vector<int>> &matrix) {int m matrix.size(), n matrix[0].size();vector<int> mx(n, INT32_MIN);for (int i 0; i < m; i)for (int j 0; j &l…

Java微服务学习Day1

文章目录 认识微服务服务拆分及远程调用服务拆分服务远程调用提供者与消费者 Eureka注册中心介绍构建EurekaServer注册user-serviceorder-service完成服务拉取 Ribbon负载均衡介绍原理策略饥饿加载 Nacos注册中心介绍配置分级存储负载均衡环境隔离nacos注册中心原理 认识微服务…

ChatGPT学习大纲

引言 在2023年2月份左右开始使用ChatGPT时&#xff0c;就被它强大的理解能力和应答效果所折服&#xff0c;这期间一直在断断续续的学习和使用&#xff0c;也没形成一个完整的学习过程&#xff0c;最近刚好有空&#xff0c;就寻思着好好再学习总结一下&#xff0c;故写出了ChatG…

Python : 使用python实现学生管理系统的功能,详细注释

一、学生管理系统 学生描述&#xff1a;姓名、年龄、成绩 学生管理系统功能&#xff1a;添加学生信息、删除学生信息、根据姓名修改学生信息、根据姓名查询学生信息、显示所有学生信息、退出系统 二、代码说明 1. 将每一个学生的信息放一个元组中&#xff0c;再把元组添加到列表…

java中使用Lambda表达式实现参数化方法

Lambda表达式实现参数化方法说明 Lambda表达式在Java中是一种简洁、函数式的方式来表示匿名函数。它们特别适用于那些需要一个函数作为参数的方法&#xff0c;即函数式接口。参数化方法&#xff08;通常指的是泛型方法&#xff09;是那些可以接受类型参数的方法&#xff0c;这…

2.3 Verilog 数据类型

Verilog 最常用的 2 种数据类型就是线网&#xff08;wire&#xff09;与寄存器&#xff08;reg&#xff09;&#xff0c;其余类型可以理解为这两种数据类型的扩展或辅助。 线网&#xff08;wire&#xff09; wire 类型表示硬件单元之间的物理连线&#xff0c;由其连接的器件输…

单片机基础入门:简单介绍51单片机的工作原理

在电子技术领域&#xff0c;单片机是实现智能化控制不可或缺的关键元件。它们集成了许多功能于一身&#xff0c;成为了各种电子系统的心脏。为了更好地理解单片机如何工作&#xff0c;本文将重点介绍51单片机的基本组成和工作原理。 51单片机是一种广泛使用的微控制器&#xf…

Android 车载应用之快速入门

一、Android Automotive OS 概览 车载 Android 系统也被称为 Android Automotive OS,是对原始 Android 系统的一个功能扩充版本。与手机系统一样,Android Automotive OS 源代码完全开放,第三方供应商和汽车制造商可以官方源码的基础上自行开发和拓展,无论是编程语言还是各…

CI/CD到底是啥?持续集成/持续部署概念解释

前言 大家好&#xff0c;我是chowley&#xff0c;日常工作中&#xff0c;我每天都在接触CI/CD&#xff0c;今天就给出我心中的答案。 在现代软件开发中&#xff0c;持续集成&#xff08;Continuous Integration&#xff0c;CI&#xff09;和持续部署&#xff08;Continuous D…

【UE 求职】学了虚幻引擎可以应聘哪些岗位?

目录 1 领域1.1 游戏开发领域1.2 影视和动画制作1.3 建筑和工程可视化1.4 模拟和训练1.5 其他领域 2 如何做好一份简历1. 明确简历目标2. 突出UE5相关技能3. 展示相关项目经验4. 教育背景5. 专业经验6. 软技能7. 证书和奖项8. 定制化和校对 &#x1f64b;‍♂️ 作者&#xff1…

Day41- 动态规划part09

一、打家劫舍 题目一&#xff1a;198. 打家劫舍 198. 打家劫舍 你是一个专业的小偷&#xff0c;计划偷窃沿街的房屋。每间房内都藏有一定的现金&#xff0c;影响你偷窃的唯一制约因素就是相邻的房屋装有相互连通的防盗系统&#xff0c;如果两间相邻的房屋在同一晚上被小偷闯…

使用PHPStudy搭建本地web网站并实现任意浏览器公网访问

文章目录 [toc]使用工具1. 本地搭建web网站1.1 下载phpstudy后解压并安装1.2 打开默认站点&#xff0c;测试1.3 下载静态演示站点1.4 打开站点根目录1.5 复制演示站点到站网根目录1.6 在浏览器中&#xff0c;查看演示效果。 2. 将本地web网站发布到公网2.1 安装cpolar内网穿透2…

springcloud分布式架构网上商城源码和论文

首先,论文一开始便是清楚的论述了系统的研究内容。其次,剖析系统需求分析,弄明白“做什么”,分析包括业务分析和业务流程的分析以及用例分析,更进一步明确系统的需求。然后在明白了系统的需求基础上需要进一步地设计系统,主要包罗软件架构模式、整体功能模块、数据库设计。本项…

Unity Meta Quest MR 开发(四):使用 Scene API 和 Depth API 实现深度识别和环境遮挡

文章目录 &#x1f4d5;教程说明&#x1f4d5;Scene API 实现遮挡&#x1f4d5;Scene API 实现遮挡的缺点&#x1f4d5;Depth API 实现遮挡⭐导入 Depth API⭐修改环境配置⭐添加 EnvironmentDepthOcclusion 预制体⭐给物体替换遮挡 Shader⭐取消现实手部的遮挡效果 此教程相关…

从Unity到Three.js(模型文件加载)

模型加载功能探索&#xff0c;用blender导出了个glb格式的cube进行的测试。 初接触js语法&#xff0c;回调注册的地方直接使用匿名函数总感觉脑子跟不上&#xff0c;反应不过来&#xff0c;就把加载后的回调简单封装了下&#xff0c; 官方文档是直接使用的匿名函数。 另外看官方…

带你了解软件系统架构的演变详解

在这个数字时代&#xff0c;我们身边无处不在的软件系统扮演着无比重要的角色。你曾想过背后那复杂的系统是如何演变而来的吗&#xff1f;本文将深入浅出&#xff0c;以小白的视角&#xff0c;描绘软件系统架构的绚丽蜕变历程&#xff0c;让我们一同踏上这场感性而技术的冒险之…

Peter算法小课堂—背包问题

我们已经学过好久好久的动态规划了&#xff0c;动态规划_Peter Pan was right的博客-CSDN博客 那么&#xff0c;我用一张图片来概括一下背包问题。 大家有可能比较疑惑&#xff0c;优化决策怎么优化呢&#xff1f;答案是&#xff0c;滚动数组&#xff0c;一个神秘而简单的东西…

刘谦龙年春晚魔术模拟

守岁共此时 代码 直接贴代码了&#xff0c;异常处理有点问题&#xff0c;正常流程能跑通 package com.yuhan.snginx.util.chunwan;import java.util.*;/*** author yuhan* since 2024/02/10*/ public class CWMS {static String[] num {"A", "2", &quo…

更换商品图片日期JSON格式报错 - 序列化与反序列化日期格式设置

报错信息 msg: “服务端异常&#xff0c;请联系管理员JSON parse error: Cannot deserialize value of type java.util.Date from String “2023-11-13 13:13:35”: not a valid representation (error: Failed to parse Date value ‘2023-11-13 13:13:35’: Cannot parse da…

点云标注工具

目录 3d手势识别 c 3d关键点&#xff0c;Bounding Box Labels Rectangle Labels KITTI 3D Ground Truth Annotator c标注工具 3d手势识别 GitHub - 99xtaewoo/Automated-Hand-3D-pose-annotation-Tool: Automated Hand 3D pose annotation Tool c 3d关键点&#xff0c;Bou…