RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)

文章目录

  • RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)
    • 一、引言
    • 二、简介
    • 三、准备工作
      • 3.1 说明
      • 3.2 生成项目
    • 四、实战
      • 4.1 交换机(Exchanges)
      • 4.2 临时队列(Temporary Queues)
      • 4.3 绑定(Bindings)
      • 4.4 整合代码
        • 发布程序
        • 订阅程序
      • 4.5 验证一:先广播后订阅
      • 4.6 验证二:先订阅后广播
    • 五、结论

RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)

一、引言

在快节奏的软件开发世界中,我们经常面临需要将消息发送给多个接收者的场景。例如,在构建日志监控系统、实时通知系统等场景时,我们希望一个事件的发生能够被多个服务同时感知和处理。这时,发布/订阅模式(Publish/Subscribe)就显得尤为重要。在本教程中,我们将通过一个简单的例子来学习如何使用RabbitMQ实现发布/订阅模式。

二、简介

在上一篇教程中,我们学习了如何使用RabbitMQ实现工作队列(Work Queues)。今天,我们将探索工作队列的进阶应用——发布/订阅模式,这是一种允许多个接收者(Subscribers)监听同一个消息通道,并在消息发布时接收通知的机制。发布/订阅模式的核心在于解耦消息的发送者(Publisher)和接收者(Subscribers),发送者不需要知道有哪些接收者,只需要将消息发送到一个交换机(Exchange),而接收者则订阅这个交换机来接收消息。

三、准备工作

3.1 说明

在本教程中,我们将使用RabbitMQ的.NET客户端来创建一个简单的发布/订阅系统。我们将创建一个名为logs的fanout类型的交换机,并将所有日志消息广播给所有订阅了该交换机的队列。

3.2 生成项目

首先,我们需要生成两个项目:

EmitLogApp:用于模拟日志消息的发布者。
ReceiveLogsApp:用于接收并打印日志消息的订阅者。
我们可以使用以下命令来创建这两个项目:

dotnet new console --name EmitLog
cd EmitLog
dotnet add package RabbitMQ.Client
cd ..
dotnet new console --name ReceiveLogs
cd ReceiveLogs
dotnet add package RabbitMQ.Client

这些命令创建了两个新的控制台应用程序,一个用于发送日志消息,另一个用于接收并打印日志消息。

四、实战

4.1 交换机(Exchanges)

在之前的教程中,我们直接将消息发送到队列。现在,我们需要引入交换机(Exchange)的概念。在RabbitMQ中,生产者从不直接向队列发送消息,而是发送到交换机,然后由交换机将消息推送到一个或多个队列。交换机的行为由交换机类型定义。

我们将创建一个名为logsfanout类型的交换机,它将广播所有接收到的消息给所有绑定到它的队列。
在这里插入图片描述

channel.ExchangeDeclare("logs", ExchangeType.Fanout);

4.2 临时队列(Temporary Queues)

在我们的日志系统中,我们希望每个运行的接收者程序都能接收到所有日志消息。因此,我们不需要为队列指定名称,而是让服务器为我们生成一个随机名称。同时,我们希望在消费者断开连接后队列能自动删除。

在.NET客户端中,我们可以创建一个非持久性、独占的、自动删除的队列,并让服务器为我们生成一个名称:

var queueName = channel.QueueDeclare().QueueName;

4.3 绑定(Bindings)

我们已经创建了一个fanout交换机和一个队列。现在,我们需要告诉交换机将消息发送到我们的队列。交换机和队列之间的关系称为绑定(Binding)。

channel.QueueBind(queue: queueName, exchange: "logs", routingKey: string.Empty);

在这里插入图片描述

4.4 整合代码

发布程序
using RabbitMQ.Client;
using System.Text;await PublishMessagesAsync(10);
/// <summary>
/// 发布指定次数的消息到RabbitMQ队列
/// </summary>
/// <param name="loopCount">消息发送的次数</param>
/// <returns>Task对象,表示异步操作</returns>
async Task PublishMessagesAsync(int loopCount)
{// 循环发送指定次数的消息for (int i = 1; i <= loopCount; i++){// 调用SendMessageToQueue方法发送消息,并包含当前迭代次数await SendMessageToQueue($"Iteration {i} - Hello World");// 这里可以添加延迟,如果需要的话// await Task.Delay(1000);}Console.ReadLine();
}
/// <summary>
/// 向RabbitMQ队列发送一条消息
/// </summary>
/// <param name="message">要发送的消息内容</param>
/// <returns>Task对象,表示异步操作</returns>
async Task SendMessageToQueue(string message)
{// 创建连接工厂,并设置RabbitMQ服务器地址为localhostvar factory = new ConnectionFactory { HostName = "localhost" };// 使用异步方式创建连接using var connection = await factory.CreateConnectionAsync();// 使用异步方式创建通道using var channel = await connection.CreateChannelAsync();//声明名为"logs"的fanout类型的交换机await channel.ExchangeDeclareAsync(exchange: "logs", type: ExchangeType.Fanout);// 将消息内容编码为字节数组var body = Encoding.UTF8.GetBytes(message);// 异步发布消息到队列await channel.BasicPublishAsync(exchange: "logs", routingKey: string.Empty, body: body);Console.WriteLine($" [x] Sent {message}");
}
订阅程序
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
// 创建连接工厂,并设置RabbitMQ服务器地址为localhos
var factory = new ConnectionFactory { HostName = "localhost" };
// 使用异步方式创建连接
using var connection = await factory.CreateConnectionAsync();
// 使用异步方式创建通道
using var channel = await connection.CreateChannelAsync();// 异步声明一个名为"logs"的fanout类型的交换机
// 交换机会将所有接收到的消息广播给所有绑定到它的队列
await channel.ExchangeDeclareAsync(exchange: "logs",type: ExchangeType.Fanout);// 声明一个由服务器命名的队列,这样每个消费者都会有一个唯一的队列
// 这使得我们可以有多个消费者同时接收消息,而不会相互干扰
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;// 将由服务器创建的队列绑定到"logs"交换机
// 这样,交换机就会将消息发送到这个队列
await channel.QueueBindAsync(queue: queueName, exchange: "logs", routingKey: string.Empty);
// 输出提示信息,表示消费者正在等待日志消息
Console.WriteLine(" [*] Waiting for logs.");// 创建一个异步事件驱动的消费者
var consumer = new AsyncEventingBasicConsumer(channel);// 设置当消费者接收到消息时的事件处理程序
consumer.ReceivedAsync += (model, ea) =>
{// 从接收到的消息中提取消息体并转换为字符串byte[] body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);// 打印接收到的消息Console.WriteLine($" [x] {message}");// 返回Task.CompletedTask以满足异步事件处理的签名要求return Task.CompletedTask;
};
// 开始消费指定队列的消息
// 这个调用会告诉RabbitMQ服务器,我们有一个消费者准备好接收"logs"交换机绑定的队列中的消息
await channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

4.5 验证一:先广播后订阅

运行EmitLogApp\bin\Debug\net8.0\EmitLogApp.exe,即先发布广播;再运行ReceiveLogsApp\bin\Debug\net8.0\ReceiveLogsApp.exe进行订阅广播。可以发现,没有接收到任何内容。原因是需要先启动订阅者(消费者),再启动广播(发布者/生产者)才可以接收到消息
在这里插入图片描述

4.6 验证二:先订阅后广播

先运行ReceiveLogsApp\bin\Debug\net8.0\ReceiveLogsApp.exe
再运行EmitLogApp\bin\Debug\net8.0\EmitLogApp.exe;可以发现每个消费者(订阅者)都收到了相同信息。
在这里插入图片描述

五、结论

在本教程中,我们深入探讨了RabbitMQ发布/订阅模式的概念和实现。通过构建一个简单的日志系统,我们学习了如何创建fanout类型的交换机,以及如何发送和接收消息。以下是我们从本教程中获得的关键要点:

  1. 解耦发送者和接收者:发布/订阅模式允许发送者和接收者之间没有直接的联系,发送者只需要将消息发送到交换机,而接收者则订阅交换机来接收消息。

  2. 消息广播fanout类型的交换机会将所有接收到的消息广播给所有绑定到它的队列,这对于日志系统、事件通知等场景非常有用。

  3. 临时队列:我们使用了临时队列来接收消息,这样每个订阅者都会有自己的队列,并且在订阅者断开连接后,队列会自动删除。

  4. 动态订阅:订阅者可以随时订阅或取消订阅交换机,这使得系统具有很高的灵活性和动态性。

通过这些机制,我们能够建立一个高效的发布/订阅系统,它不仅能够提高系统的响应速度,还能够在面对各种异常情况时保持消息的可靠性和持久性。这些知识为我们在实际开发中实现复杂的事件驱动架构提供了坚实的基础。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/60028.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

html兼容性问题处理

文章目录 HTML5兼容性问题及解决方法1. 标签支持问题2. 兼容性检测3. 属性值支持问题4. 媒体支持问题5. Web API支持问题6. CSS兼容性问题7. 特定浏览器问题的解决方法 HTML5兼容性问题及解决方法 HTML5作为一种新的标记语言&#xff0c;虽然带来了许多新特性和改进&#xff0…

CANoe发送和接收CAN DataBase(DBC文件)数据

目录 1、接收CAN数据&#xff0c;DBC解析数据内容 2、发送DBC定义的CAN报文 CANoe是一款强大的网络仿真和开发工具&#xff0c;它支持发送和接收基于CAN DataBase&#xff08;DBC文件&#xff09;的数据。 1、接收CAN数据&#xff0c;DBC解析数据内容 要使用CANoe接收CAN数…

【第六课】Rust所有权系统(二)

目录 前言 借用和引用 借用规则 切片和迭代器 总结 前言 上节课介绍了Rust中的所有权系统&#xff0c;简单回顾一下&#xff0c;rust的内存系统系统&#xff0c;每一块内存都有一个主人&#xff0c;主人对这块内存有着读写和释放的权限&#xff0c;当主人离开作用域之后&am…

ISUP协议视频平台EasyCVR私有化部署视频平台如何实现RTMP推流将大疆无人机的视频画面回传?

在现代视频监控和流媒体技术领域&#xff0c;EasyCVR视频融合云平台以其卓越的性能和灵活性&#xff0c;成为了跨区域、网络化视频监控综合管理的理想选择。作为TSINGSEE青犀视频“云边端”架构体系中的核心组件&#xff0c;私有化部署视频平台EasyCVR不仅能够实现视频数据的集…

LeetCode 2816.翻倍以链表形式表示的数字

题目&#xff1a; 给你一个 非空 链表的头节点 head &#xff0c;表示一个不含前导零的非负数整数。 将链表 翻倍 后&#xff0c;返回头节点 head 。 思路&#xff1a; 思路一&#xff1a;反转链表&#xff0c;两个相同的链表求和 思路二&#xff1a;如果不考虑进位&#…

力扣(leetcode)面试经典150题——27. 移除元素

题目 给你一个数组 nums 和一个值 val&#xff0c;你需要 原地 移除所有数值等于 val 的元素。元素的顺序可能发生改变。然后返回 nums 中与 val 不同的元素的数量。 假设 nums 中不等于 val 的元素数量为 k&#xff0c;要通过此题&#xff0c;您需要执行以下操作&#xff1a…

VueDPlayer视频插件的使用

VueDPlayer 是一个基于 DPlayer 的 Vue 封装组件&#xff0c;DPlayer 是一个 HTML5 视频播放器&#xff0c;支持弹幕、视频倍速播放、视频预加载等功能。 以下是如何在 Vue.js 项目中使用 VueDPlayer 的步骤&#xff1a; 1.安装 VueDPlayer&#xff1a; 使用 npm 或 yarn 安装…

排序算法 -归并排序

文章目录 1. 归并排序&#xff08;Merge Sort&#xff09;1.1 简介1.2 归并排序的步骤1.3 归并排序c 语言实现代码说明 1.4 时间复杂度1.5 空间复杂度1.6 动画 1. 归并排序&#xff08;Merge Sort&#xff09; 1.1 简介 归并排序&#xff08;Merge Sort&#xff09;是一种基于…

前端开发之 节流与防抖

防抖节流的作用是什么&#xff1f; 节流&#xff08;throttle&#xff09;与 防抖&#xff08;debounce&#xff09;都是为了限制函数的执行频次&#xff0c;以优化函数触发频率过高导致的响应速度跟不上触发频率&#xff0c;出现延迟&#xff0c;假死或卡顿的现象。 其实很多前…

unity 一个物体随键盘上下左右旋转和前进的脚本

注意&#xff1a;脚本挂在gamaobject 上面 &#xff0c;操作对象的目标 this.gameObject 为操作对象 using System.Collections; using System.Collections.Generic; using UnityEngine;public class changePosition : MonoBehaviour {//操作对象的目标 this.gameObject 为操…

C# 事件编程详解

文章目录 1.什么是事件?2.事件的声明与使用2.1 声明事件2.2 订阅与触发事件3.事件的核心概念3.1 事件处理委托3.2 自定义事件参数4.事件的高级用法4.1 多播委托与事件4.2 事件解除订阅4.3 自定义事件访问器5.事件的应用场景5.1 GUI 应用程序中的事件5.2 基于事件的编程模型5.3…

C# 属性与结构

C# 属性 C# 属性&#xff0c;属性是一种特殊的类成员。 我们使用预定义的 set 和 get 方法来访问和修改它们。 属性读取和写入会转换为获取和设置方法调用。 与使用自定义方法调用&#xff08;例如object.GetName()&#xff09;相比&#xff0c;使用字段符号&#xff08;例如o…

Linux系统性能调优技巧详解

Linux系统性能调优技巧详解 在Linux系统中,性能调优是确保系统在高负载下依然能够稳定、高效运行的重要环节。调优的目标包括优化系统资源的利用率(如CPU、内存、磁盘和网络),减少瓶颈,并提升系统的响应速度。本文将深入探讨Linux系统性能调优的技巧,并结合代码使用案例…

视频里的音频怎么提取出来成单独文件?音频提取照着这些方法做

在数字时代&#xff0c;视频与音频的分离与重组已成为日常需求之一。无论是出于制作背景音乐、保存讲座内容&#xff0c;还是编辑播客素材&#xff0c;提取视频中的音频并将其保存为单独文件都显得尤为重要。视频里的音频怎么提取出来成单独文件&#xff1f;本文将详细介绍几种…

基于Canny边缘检测和轮廓检测

这段代码实现了基于Canny边缘检测和轮廓检测&#xff0c;从图像中筛选出面积较大的矩形&#xff0c;并使用OpenCV和Matplotlib显示结果。主要流程如下&#xff1a; 步骤详解&#xff1a; 读取图像&#xff1a; img cv2.imread(U:/1.png)使用cv2.imread()加载图像。 转换为灰…

cisco防火墙在内网通过外网域名进行访问的配置

1.配置主机的access-list列表 access-list outside_acl extended permit tcp any 192.168.1.123 2.对主机和端口进行映射&#xff0c; 2.1 nat (inside,outside) source static 192.168.1.123 interface service stcp80 stcp8800 注释&#xff1a;先对主机进行外网映射…

React(一)

文章目录 项目地址一、创建第一个react项目二、JSX语法2.1 生成列表2.2 大括号识别JS的表达式2.3 列表循环array2.4 条件判断以及假值显示2.5 复杂条件渲染2.6 事件监听和绑定2.7 使用Fregments返回多个根标签2.8 多条件渲染2.9 导出子组件 三、组件3.1 设置组件3.2 props给子组…

记录一下在原有的接口中增加文件上传☞@RequestPart

首先&#xff0c;咱声明一下&#xff1a; RequestBody和 MultipartFile 不可以 同时使用&#xff01;&#xff01;&#xff01; 因为这两者预期的请求内容类型不同。RequestBody 预期请求的 Content-Type 是 application/json 或 application/xml&#xff0c;而 MultipartFile …

HTTPSOK ---助力阿里云免费 SSL 证书自动续期

目前许多用户面临着 SSL 证书过期续期的难题&#xff0c;尤其是对于阿里云的 免费 SSL 证书&#xff0c;每三个月需要手动申请和更新。为了帮助用户更轻松地管理 SSL 证书&#xff0c;现推出了强大的 HTTPSOK 服务&#xff0c;为用户提供了更便捷的自动续期和管理解决方案。 什…

5G的SUCI、SUPI、5G-GUTI使用场景及关系

使用场景(来源于对23.501、23.502、33.501、23.003的理解) 1、UE初始注册时&#xff0c;根据HN Public Key把SUPI加密成SUCI&#xff0c;并发送初始注册请求 2、AMF转发SUCI给AUSF和UDM进行认证&#xff0c;并获取解密后的SUPI 3、AMF根据SUPI生成一个5G-GUTI&#xff0c;并保…