RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)

文章目录

  • RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)
    • 一、引言
    • 二、简介
    • 三、准备工作
      • 3.1 说明
      • 3.2 生成项目
    • 四、实战
      • 4.1 交换机(Exchanges)
      • 4.2 临时队列(Temporary Queues)
      • 4.3 绑定(Bindings)
      • 4.4 整合代码
        • 发布程序
        • 订阅程序
      • 4.5 验证一:先广播后订阅
      • 4.6 验证二:先订阅后广播
    • 五、结论

RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)

一、引言

在快节奏的软件开发世界中,我们经常面临需要将消息发送给多个接收者的场景。例如,在构建日志监控系统、实时通知系统等场景时,我们希望一个事件的发生能够被多个服务同时感知和处理。这时,发布/订阅模式(Publish/Subscribe)就显得尤为重要。在本教程中,我们将通过一个简单的例子来学习如何使用RabbitMQ实现发布/订阅模式。

二、简介

在上一篇教程中,我们学习了如何使用RabbitMQ实现工作队列(Work Queues)。今天,我们将探索工作队列的进阶应用——发布/订阅模式,这是一种允许多个接收者(Subscribers)监听同一个消息通道,并在消息发布时接收通知的机制。发布/订阅模式的核心在于解耦消息的发送者(Publisher)和接收者(Subscribers),发送者不需要知道有哪些接收者,只需要将消息发送到一个交换机(Exchange),而接收者则订阅这个交换机来接收消息。

三、准备工作

3.1 说明

在本教程中,我们将使用RabbitMQ的.NET客户端来创建一个简单的发布/订阅系统。我们将创建一个名为logs的fanout类型的交换机,并将所有日志消息广播给所有订阅了该交换机的队列。

3.2 生成项目

首先,我们需要生成两个项目:

EmitLogApp:用于模拟日志消息的发布者。
ReceiveLogsApp:用于接收并打印日志消息的订阅者。
我们可以使用以下命令来创建这两个项目:

dotnet new console --name EmitLog
cd EmitLog
dotnet add package RabbitMQ.Client
cd ..
dotnet new console --name ReceiveLogs
cd ReceiveLogs
dotnet add package RabbitMQ.Client

这些命令创建了两个新的控制台应用程序,一个用于发送日志消息,另一个用于接收并打印日志消息。

四、实战

4.1 交换机(Exchanges)

在之前的教程中,我们直接将消息发送到队列。现在,我们需要引入交换机(Exchange)的概念。在RabbitMQ中,生产者从不直接向队列发送消息,而是发送到交换机,然后由交换机将消息推送到一个或多个队列。交换机的行为由交换机类型定义。

我们将创建一个名为logsfanout类型的交换机,它将广播所有接收到的消息给所有绑定到它的队列。
在这里插入图片描述

channel.ExchangeDeclare("logs", ExchangeType.Fanout);

4.2 临时队列(Temporary Queues)

在我们的日志系统中,我们希望每个运行的接收者程序都能接收到所有日志消息。因此,我们不需要为队列指定名称,而是让服务器为我们生成一个随机名称。同时,我们希望在消费者断开连接后队列能自动删除。

在.NET客户端中,我们可以创建一个非持久性、独占的、自动删除的队列,并让服务器为我们生成一个名称:

var queueName = channel.QueueDeclare().QueueName;

4.3 绑定(Bindings)

我们已经创建了一个fanout交换机和一个队列。现在,我们需要告诉交换机将消息发送到我们的队列。交换机和队列之间的关系称为绑定(Binding)。

channel.QueueBind(queue: queueName, exchange: "logs", routingKey: string.Empty);

在这里插入图片描述

4.4 整合代码

发布程序
using RabbitMQ.Client;
using System.Text;await PublishMessagesAsync(10);
/// <summary>
/// 发布指定次数的消息到RabbitMQ队列
/// </summary>
/// <param name="loopCount">消息发送的次数</param>
/// <returns>Task对象,表示异步操作</returns>
async Task PublishMessagesAsync(int loopCount)
{// 循环发送指定次数的消息for (int i = 1; i <= loopCount; i++){// 调用SendMessageToQueue方法发送消息,并包含当前迭代次数await SendMessageToQueue($"Iteration {i} - Hello World");// 这里可以添加延迟,如果需要的话// await Task.Delay(1000);}Console.ReadLine();
}
/// <summary>
/// 向RabbitMQ队列发送一条消息
/// </summary>
/// <param name="message">要发送的消息内容</param>
/// <returns>Task对象,表示异步操作</returns>
async Task SendMessageToQueue(string message)
{// 创建连接工厂,并设置RabbitMQ服务器地址为localhostvar factory = new ConnectionFactory { HostName = "localhost" };// 使用异步方式创建连接using var connection = await factory.CreateConnectionAsync();// 使用异步方式创建通道using var channel = await connection.CreateChannelAsync();//声明名为"logs"的fanout类型的交换机await channel.ExchangeDeclareAsync(exchange: "logs", type: ExchangeType.Fanout);// 将消息内容编码为字节数组var body = Encoding.UTF8.GetBytes(message);// 异步发布消息到队列await channel.BasicPublishAsync(exchange: "logs", routingKey: string.Empty, body: body);Console.WriteLine($" [x] Sent {message}");
}
订阅程序
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
// 创建连接工厂,并设置RabbitMQ服务器地址为localhos
var factory = new ConnectionFactory { HostName = "localhost" };
// 使用异步方式创建连接
using var connection = await factory.CreateConnectionAsync();
// 使用异步方式创建通道
using var channel = await connection.CreateChannelAsync();// 异步声明一个名为"logs"的fanout类型的交换机
// 交换机会将所有接收到的消息广播给所有绑定到它的队列
await channel.ExchangeDeclareAsync(exchange: "logs",type: ExchangeType.Fanout);// 声明一个由服务器命名的队列,这样每个消费者都会有一个唯一的队列
// 这使得我们可以有多个消费者同时接收消息,而不会相互干扰
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;// 将由服务器创建的队列绑定到"logs"交换机
// 这样,交换机就会将消息发送到这个队列
await channel.QueueBindAsync(queue: queueName, exchange: "logs", routingKey: string.Empty);
// 输出提示信息,表示消费者正在等待日志消息
Console.WriteLine(" [*] Waiting for logs.");// 创建一个异步事件驱动的消费者
var consumer = new AsyncEventingBasicConsumer(channel);// 设置当消费者接收到消息时的事件处理程序
consumer.ReceivedAsync += (model, ea) =>
{// 从接收到的消息中提取消息体并转换为字符串byte[] body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);// 打印接收到的消息Console.WriteLine($" [x] {message}");// 返回Task.CompletedTask以满足异步事件处理的签名要求return Task.CompletedTask;
};
// 开始消费指定队列的消息
// 这个调用会告诉RabbitMQ服务器,我们有一个消费者准备好接收"logs"交换机绑定的队列中的消息
await channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

4.5 验证一:先广播后订阅

运行EmitLogApp\bin\Debug\net8.0\EmitLogApp.exe,即先发布广播;再运行ReceiveLogsApp\bin\Debug\net8.0\ReceiveLogsApp.exe进行订阅广播。可以发现,没有接收到任何内容。原因是需要先启动订阅者(消费者),再启动广播(发布者/生产者)才可以接收到消息
在这里插入图片描述

4.6 验证二:先订阅后广播

先运行ReceiveLogsApp\bin\Debug\net8.0\ReceiveLogsApp.exe
再运行EmitLogApp\bin\Debug\net8.0\EmitLogApp.exe;可以发现每个消费者(订阅者)都收到了相同信息。
在这里插入图片描述

五、结论

在本教程中,我们深入探讨了RabbitMQ发布/订阅模式的概念和实现。通过构建一个简单的日志系统,我们学习了如何创建fanout类型的交换机,以及如何发送和接收消息。以下是我们从本教程中获得的关键要点:

  1. 解耦发送者和接收者:发布/订阅模式允许发送者和接收者之间没有直接的联系,发送者只需要将消息发送到交换机,而接收者则订阅交换机来接收消息。

  2. 消息广播fanout类型的交换机会将所有接收到的消息广播给所有绑定到它的队列,这对于日志系统、事件通知等场景非常有用。

  3. 临时队列:我们使用了临时队列来接收消息,这样每个订阅者都会有自己的队列,并且在订阅者断开连接后,队列会自动删除。

  4. 动态订阅:订阅者可以随时订阅或取消订阅交换机,这使得系统具有很高的灵活性和动态性。

通过这些机制,我们能够建立一个高效的发布/订阅系统,它不仅能够提高系统的响应速度,还能够在面对各种异常情况时保持消息的可靠性和持久性。这些知识为我们在实际开发中实现复杂的事件驱动架构提供了坚实的基础。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/60028.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CANoe发送和接收CAN DataBase(DBC文件)数据

目录 1、接收CAN数据&#xff0c;DBC解析数据内容 2、发送DBC定义的CAN报文 CANoe是一款强大的网络仿真和开发工具&#xff0c;它支持发送和接收基于CAN DataBase&#xff08;DBC文件&#xff09;的数据。 1、接收CAN数据&#xff0c;DBC解析数据内容 要使用CANoe接收CAN数…

【第六课】Rust所有权系统(二)

目录 前言 借用和引用 借用规则 切片和迭代器 总结 前言 上节课介绍了Rust中的所有权系统&#xff0c;简单回顾一下&#xff0c;rust的内存系统系统&#xff0c;每一块内存都有一个主人&#xff0c;主人对这块内存有着读写和释放的权限&#xff0c;当主人离开作用域之后&am…

ISUP协议视频平台EasyCVR私有化部署视频平台如何实现RTMP推流将大疆无人机的视频画面回传?

在现代视频监控和流媒体技术领域&#xff0c;EasyCVR视频融合云平台以其卓越的性能和灵活性&#xff0c;成为了跨区域、网络化视频监控综合管理的理想选择。作为TSINGSEE青犀视频“云边端”架构体系中的核心组件&#xff0c;私有化部署视频平台EasyCVR不仅能够实现视频数据的集…

排序算法 -归并排序

文章目录 1. 归并排序&#xff08;Merge Sort&#xff09;1.1 简介1.2 归并排序的步骤1.3 归并排序c 语言实现代码说明 1.4 时间复杂度1.5 空间复杂度1.6 动画 1. 归并排序&#xff08;Merge Sort&#xff09; 1.1 简介 归并排序&#xff08;Merge Sort&#xff09;是一种基于…

unity 一个物体随键盘上下左右旋转和前进的脚本

注意&#xff1a;脚本挂在gamaobject 上面 &#xff0c;操作对象的目标 this.gameObject 为操作对象 using System.Collections; using System.Collections.Generic; using UnityEngine;public class changePosition : MonoBehaviour {//操作对象的目标 this.gameObject 为操…

视频里的音频怎么提取出来成单独文件?音频提取照着这些方法做

在数字时代&#xff0c;视频与音频的分离与重组已成为日常需求之一。无论是出于制作背景音乐、保存讲座内容&#xff0c;还是编辑播客素材&#xff0c;提取视频中的音频并将其保存为单独文件都显得尤为重要。视频里的音频怎么提取出来成单独文件&#xff1f;本文将详细介绍几种…

React(一)

文章目录 项目地址一、创建第一个react项目二、JSX语法2.1 生成列表2.2 大括号识别JS的表达式2.3 列表循环array2.4 条件判断以及假值显示2.5 复杂条件渲染2.6 事件监听和绑定2.7 使用Fregments返回多个根标签2.8 多条件渲染2.9 导出子组件 三、组件3.1 设置组件3.2 props给子组…

记录一下在原有的接口中增加文件上传☞@RequestPart

首先&#xff0c;咱声明一下&#xff1a; RequestBody和 MultipartFile 不可以 同时使用&#xff01;&#xff01;&#xff01; 因为这两者预期的请求内容类型不同。RequestBody 预期请求的 Content-Type 是 application/json 或 application/xml&#xff0c;而 MultipartFile …

HTTPSOK ---助力阿里云免费 SSL 证书自动续期

目前许多用户面临着 SSL 证书过期续期的难题&#xff0c;尤其是对于阿里云的 免费 SSL 证书&#xff0c;每三个月需要手动申请和更新。为了帮助用户更轻松地管理 SSL 证书&#xff0c;现推出了强大的 HTTPSOK 服务&#xff0c;为用户提供了更便捷的自动续期和管理解决方案。 什…

5G的SUCI、SUPI、5G-GUTI使用场景及关系

使用场景(来源于对23.501、23.502、33.501、23.003的理解) 1、UE初始注册时&#xff0c;根据HN Public Key把SUPI加密成SUCI&#xff0c;并发送初始注册请求 2、AMF转发SUCI给AUSF和UDM进行认证&#xff0c;并获取解密后的SUPI 3、AMF根据SUPI生成一个5G-GUTI&#xff0c;并保…

大数据-226 离线数仓 - Flume 优化配置 自定义拦截器 拦截原理 拦截器实现 Java

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; Java篇开始了&#xff01; 目前开始更新 MyBatis&#xff0c;一起深入浅出&#xff01; 目前已经更新到了&#xff1a; Hadoop&#xff0…

PyAEDT:Ansys Electronics Desktop API 简介

在本文中&#xff0c;我将向您介绍 PyAEDT&#xff0c;这是一个 Python 库&#xff0c;旨在增强您对 Ansys Electronics Desktop 或 AEDT 的体验。PyAEDT 通过直接与 AEDT API 交互来简化脚本编写&#xff0c;从而允许在 Ansys 的电磁、热和机械求解器套件之间无缝集成。通过利…

定时器(QTimer)与随机数生成器(QRandomGenerator)的应用实践——Qt(C++)

一、QTimer与QRandomGenerator &#xff08;一&#xff09;QTimer&#xff08;定时器&#xff09;[2] QTimer类为定时功能提供了一个高级编程接口。在使用QTimer时&#xff0c;实例化一个QTimer对象并将其timeout()发射信号与合适的信号槽相连接。通过调用QTimer的start()函数…

用redis的zset实现日榜,周榜,月榜

思路&#xff1a; 1.初始化一个月的数据&#xff1a; /*** 初始化一个月数据*/Testpublic void initMonthData(){//计算当前时间小时的keylong hourSystem.currentTimeMillis()/(1000*60*60);for(int i1;i<24*30;i){String key"W_hour"(hour-i);Random random new…

通过shell脚本分析部署nginx网络服务

通过shell脚本分析部署nginx网络服务 1.接收用户部署的服务名称 [rootlocalhost xzy]# vim 1.sh [rootlocalhost xzy]# chmod x 1.sh [rootlocalhost xzy]# ./1.sh2.判断服务是否安装 已安装&#xff1b;自定义网站配置路径为/www&#xff1b;并创建共享目录和网页文件&…

威胁驱动的网络安全方法论

摘要 目前的网络安全风险管理实践很大程度上是由合规性要求驱动的&#xff0c;这使得公司/组织不得不在安全控制和漏洞上投入人力/物力。&#xff08;风险管理涉及多个方面&#xff0c;包括资产、威胁、漏洞和控制&#xff0c;并根据事故发生的可能性及造成的影响进行评估。威…

『VUE』30. 生命周期的介绍(详细图文注释)

目录 生命周期生命周期的8阶段生命周期小例子总结 欢迎关注 『VUE』 专栏&#xff0c;持续更新中 欢迎关注 『VUE』 专栏&#xff0c;持续更新中 生命周期 每个 Vue 组件实例在创建时都需要经历一系列的初始化步骤&#xff0c;比如设置好数据侦听&#xff0c;编译模板&#xf…

idea 通过git撤销commit但未push的操作

1、undo commit 适用情况&#xff1a;代码修改完了&#xff0c;已经Commit了&#xff0c;但是还未push&#xff0c;然后发现还有地方需要修改不想提交本次记录了。这时可以进行Undo Commit&#xff0c;修改后再重新Commit。注意&#xff1a;如果已经进行了Push&#xff0c;线上…

【graphics】图形绘制 C++

众所周知&#xff0c;周知所众&#xff0c;图形绘制对于竞赛学僧毫无用处&#xff0c;所以这个文章&#xff0c;专门对相关人员教学&#xff08;成长中的码农、高中僧、大学僧&#xff09;。 他人经验教学参考https://blog.csdn.net/qq_46107892/article/details/133386358?o…

Spring Boot出现java: 错误: 无效的源发行版:16的解决方式

第一步&#xff1a; 修改为SDK的目标字节码版本 第二步&#xff1a;CtrlShiftAltS进入项目结构 第三步&#xff1a;pom.xml文件中 在网上搜索和自己SDK适配的Springboot版本&#xff0c;1.8对应的是2.7.1&#xff08;可以用&#xff09; 修改Java版本为1.8 最后的最后&a…