RabbitMQ中的Publish-Subscribe模式

在现代分布式系统中,消息队列(Message Queue)是实现异步通信和解耦系统的关键组件。RabbitMQ 是一个功能强大且广泛使用的开源消息代理,支持多种消息传递模式。其中,Publish/Subscribe(发布/订阅)模式是一种常见且重要的模式,它允许消息发布者将消息广播给多个订阅者。

本文将深入探讨 RabbitMQ 中的 Publish/Subscribe 模式,包括其工作原理、实现方式、适用场景以及最佳实践。


1. Publish/Subscribe 模式简介

1.1 什么是 Publish/Subscribe 模式?

Publish/Subscribe(发布/订阅)模式是一种消息传递模式,它将消息的发送者(发布者)和接收者(订阅者)解耦。发布者将消息发布到一个交换机(Exchange),而订阅者通过绑定到交换机的**队列(Queue)**来接收消息。

与点对点模式(如工作队列)不同,Publish/Subscribe 模式允许多个订阅者接收相同的消息,从而实现消息的广播。

在这里插入图片描述

1.2 核心概念

在 RabbitMQ 中,Publish/Subscribe 模式依赖以下核心组件:

  • 发布者(Publisher):发送消息的客户端。
  • 交换机(Exchange):接收发布者发送的消息,并根据规则将消息路由到队列。
  • 队列(Queue):存储消息的缓冲区。
  • 订阅者(Subscriber):从队列中消费消息的客户端。
  • 绑定(Binding):定义交换机和队列之间的关系。

2. Publish/Subscribe 模式的工作原理

2.1 交换机的作用

在 RabbitMQ 中,消息不会直接发送到队列,而是发送到交换机。交换机根据绑定规则将消息路由到相应的队列。

RabbitMQ 提供了多种类型的交换机,其中最常用的是:

  • Fanout 交换机:将消息广播到所有绑定到它的队列,忽略路由键(Routing Key)。
  • Direct 交换机:根据消息的路由键将消息路由到匹配的队列。
  • Topic 交换机:支持更复杂的路由规则,允许使用通配符匹配路由键。
  • Headers 交换机:根据消息的头部属性进行路由。

在 Publish/Subscribe 模式中,通常使用 Fanout 交换机,因为它能够将消息广播到所有绑定的队列。

2.2 消息的广播过程

  1. 发布者将消息发送到交换机。
  2. 交换机接收到消息后,将消息广播到所有绑定的队列。
  3. 订阅者从队列中消费消息。

3. Java 实现 Publish/Subscribe 模式

以下是使用 Java 和 RabbitMQ Java Client 实现 Publish/Subscribe 模式的完整示例。

3.1 添加依赖

在 Maven 项目中,添加 RabbitMQ Java Client 依赖:

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version>
</dependency>

3.2 创建发布者(Publisher)

发布者负责将消息发送到交换机。以下是发布者的代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;public class Publisher {private static final String EXCHANGE_NAME = "publisher_subscriber";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.200.138");factory.setPort(5672);factory.setVirtualHost("/test");factory.setUsername("test");factory.setPassword("test");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明一个 Fanout 交换机channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 发布消息String message = "Hello, Subscribers!";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));System.out.println(" [x] Sent '" + message + "'");}}
}

3.3 创建订阅者(Subscriber)

订阅者负责从队列中消费消息。以下是订阅者的代码:

import com.rabbitmq.client.*;import java.nio.charset.StandardCharsets;public class Subscriber {private static final String EXCHANGE_NAME = "publisher_subscriber";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.200.138");factory.setPort(5672);factory.setVirtualHost("/test");factory.setUsername("test");factory.setPassword("test");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明一个 Fanout 交换机channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 创建一个临时队列,并绑定到交换机String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 定义消息处理函数DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] Received '" + message + "'");};// 开始消费消息channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}

3.4 运行示例

  • 启动多个订阅者,在不同的终端窗口中运行多个订阅者实例

    启动多个订阅者后,能在RabbitMQ终端页面,能看到多个临时的队列,但交换机只有一个publisher_subscriber

在这里插入图片描述

在这里插入图片描述

  • 启动发布者,在另一个终端窗口中运行发布者
3.4.1 观察输出

所有订阅者都会收到发布者发送的消息。例如:

发布者输出:

 [x] Sent 'Hello, Subscribers!'

订阅者输出:

 [*] Waiting for messages. To exit press CTRL+C[x] Received 'Hello, Subscribers!'

在这里插入图片描述


4. 代码解析

4.1 发布者代码解析

  • 连接工厂ConnectionFactory 用于创建到 RabbitMQ 服务器的连接。
  • 交换机声明channel.exchangeDeclare(EXCHANGE_NAME, "fanout") 声明一个 Fanout 交换机。
  • 消息发布channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8)) 将消息发送到交换机。

4.2 订阅者代码解析

  • 临时队列channel.queueDeclare().getQueue() 创建一个非持久化的、独占的临时队列。
  • 队列绑定channel.queueBind(queueName, EXCHANGE_NAME, "") 将队列绑定到交换机。
  • 消息处理DeliverCallback 定义了如何处理接收到的消息。
  • 消费消息channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }) 开始消费消息。

5. Publish/Subscribe 模式的适用场景

5.1 日志记录

在分布式系统中,日志记录是一个常见的需求。使用 Publish/Subscribe 模式,可以将日志消息广播给多个日志处理器,分别将日志写入文件、数据库或发送到监控系统。

5.2 实时通知

在社交网络或即时通讯应用中,可以使用 Publish/Subscribe 模式向多个用户发送实时通知。例如,当用户发布新动态时,通知所有关注者。

5.3 分布式缓存更新

在分布式缓存系统中,当缓存数据更新时,可以使用 Publish/Subscribe 模式通知所有缓存节点同步更新。

5.4 事件驱动架构

在事件驱动架构中,Publish/Subscribe 模式用于实现事件的广播。例如,当用户注册成功时,发布一个事件,通知多个服务(如邮件服务、积分服务)执行相应的操作。


6. 最佳实践

6.1 使用持久化

为了确保消息不会丢失,建议将交换机和队列设置为持久化。例如:

channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
channel.queueDeclare("my_queue", true, false, false, null);

6.2 处理消息确认

在生产环境中,建议启用消息确认机制,确保消息被成功消费。例如:

channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });

6.3 避免消息积压

在高并发场景下,可能会出现消息积压的情况。可以通过设置队列的最大长度或使用**死信队列(DLX)**来处理积压的消息。

6.4 监控和报警

使用 RabbitMQ 的管理界面或监控工具(如 Prometheus + Grafana)监控消息队列的状态,并设置报警规则,及时发现和解决问题。


7. 总结

Publish/Subscribe 模式是 RabbitMQ 中一种强大且灵活的消息传递模式,适用于需要将消息广播给多个订阅者的场景。通过使用 Fanout 交换机,可以轻松实现消息的广播,同时结合持久化、消息确认和监控机制,可以构建高可靠性的分布式系统。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/64206.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【iOS】OC高级编程 iOS多线程与内存管理阅读笔记——自动引用计数(四)

目录 ARC规则 规则 对象型变量不能作为C语言结构体的成员 显式转换id和void* 属性 数组 ARC规则 规则 在ARC有效的情况下编译源代码必须遵守一定的规则&#xff1a; 主要解释一下最后两条 对象型变量不能作为C语言结构体的成员 要把对象型变量加入到结构体成员中时&a…

路由引入问题(双点双向路由回馈问题)

简介 总所周知&#xff0c;路由引入import又称路由重分发redistribute&#xff0c;为了解决不同路由协议进程间路由信息不互通而使用的技术&#xff0c;由于不同路由协议的算法、机制、开销等因素的差异&#xff0c;它们之间无法直接交换路由信息。因此&#xff0c;路由引入技…

26. Three.js案例-自定义多面体

26. Three.js案例-自定义多面体 实现效果 知识点 WebGLRenderer WebGLRenderer 是 Three.js 中用于渲染场景的主要类。它支持 WebGL 渲染&#xff0c;并提供了多种配置选项。 构造器 new THREE.WebGLRenderer(parameters) 参数类型描述parametersObject可选参数对象&…

【在Linux世界中追寻伟大的One Piece】HTTP Session

目录 1 -> 引入HTTP Session 1.1 -> 定义 1.2 -> 工作原理 1.3 -> 安全性 1.4 -> 超时和失效 1.5 -> 用途 2 -> 模拟session行为 3 -> 实验测试session 1 -> 引入HTTP Session 1.1 -> 定义 HTTP Session是服务器用来跟踪用户与服务器交…

Docker-Dockerfile、registry

Dockerfile 一、概述 1、commit的局限 很容易制作简单的镜像&#xff0c;但碰到复杂的情况就十分不方便&#xff0c;例如碰到下面的情况&#xff1a; 需要设置默认的启动命令需要设置环境变量需要指定镜像开放某些特定的端口 2、Dockerfile是什么 Dockerfile是一种更强大的镜…

蓝桥杯刷题——day1

蓝桥杯刷题——day1 题目一题干题目解析代码 题目二题干题目解析代码 题目一 题干 给定一个字符串 s &#xff0c;验证 s 是否是 回文串 &#xff0c;只考虑字母和数字字符&#xff0c;可以忽略字母的大小写。本题中&#xff0c;将空字符串定义为有效的 回文串 。 题目链接&a…

【多模态文档智能】OCR-free感知多模态大模型技术链路及训练数据细节

目前的一些多模态大模型的工作倾向于使用MLLM进行推理任务&#xff0c;然而&#xff0c;纯OCR任务偏向于模型的感知能力&#xff0c;对于文档场景&#xff0c;由于文字密度较高&#xff0c;现有方法往往通过增加图像token的数量来提升性能。这种策略在增加新的语言时&#xff0…

如何在 Ubuntu 22.04 上使用 Fail2Ban 保护 SSH

前言 SSH&#xff0c;这玩意儿&#xff0c;简直是连接云服务器的标配。它不仅好用&#xff0c;还很灵活。新的加密技术出来&#xff0c;它也能跟着升级&#xff0c;保证核心协议的安全。但是&#xff0c;再牛的协议和软件&#xff0c;也都有可能被攻破。SSH 在网上用得这么广&…

供应链系统设计-中台系统设计系列(三)- 好中台的标准之稳定原则

概述 在上一篇供应链系统设计-中台系统设计系列&#xff08;二&#xff09;- 好中台的标准之复用原则中&#xff0c;我们以复用原则为主&#xff0c;讨论了以下3点&#xff1a; 前台业务效率提升&#xff1a;好的中台能够显著提高前台业务的效率&#xff0c;通过将前台业务中通…

CTF 攻防世界 Web: FlatScience write-up

题目名称-FlatScience 网址 index 目录中没有发现提示信息&#xff0c;链接会跳转到论文。 目前没有发现有用信息&#xff0c;尝试目录扫描。 目录扫描 注意到存在 robots.txt 和 login.php。 访问 robots.txt 这里表明还存在 admin.php admin.php 分析 在这里尝试一些 sql…

axios请求拦截器和响应拦截器,封装naive-ui的 Loading Bar加载条和useMessage消息提示

接之前的博客设计从0开始边做边学&#xff0c;用vue和python做一个博客&#xff0c;非规范化项目&#xff0c;怎么简单怎么弄&#xff0c;跑的起来有啥毛病解决啥毛病&#xff08;三&#xff09;&#xff0c;目前已经完成了基本的功能demo&#xff0c;但是请求接口不可能每个页…

Blue Ocean 在Jenkins上创建Pipeline使用详解

BlueOcean是Jenkins的一个插件,它提供了一套可视化操作界面来帮助用户创建、编辑Pipeline任务。以下是对BlueOcean中Pipeline操作的详细解释: 一、安装与启动BlueOcean 安装:在Jenkins的“系统管理”->“插件管理”->“可选插件”中搜索“BlueOcean”,然后点击“Ins…

opencv——识别图片颜色并绘制轮廓

图像边缘检测 本实验要用到Canny算法&#xff0c;Canny边缘检测方法常被誉为边缘检测的最优方法。 首先&#xff0c;Canny算法的输入端应为图像的二值化结果&#xff0c;接收到二值化图像后&#xff0c;需要按照如下步骤进行&#xff1a; 高斯滤波。计算图像的梯度和方向。非极…

基础库urllib的使用

学习爬虫&#xff0c;其基本的操作便是模拟浏览器向服务器发出请求&#xff0c;那么我们需要从哪个地方做起呢?请求需要我们自己构造吗?我们需要关心请求这个数据结构怎么实现吗?需要了解 HTTP、TCP、IP层的网络传输通信吗?需要知道服务器如何响应以及响应的原理吗? 可能…

剑指Offer|day4 LCR 004. 只出现一次的数字 II

LCR 004. 只出现一次的数字 II 给你一个整数数组 nums &#xff0c;除某个元素仅出现 一次 外&#xff0c;其余每个元素都恰出现 **三次 。**请你找出并返回那个只出现了一次的元素。 示例 1&#xff1a; 输入&#xff1a;nums [2,2,3,2] 输出&#xff1a;3提示&#xff1a…

Mysql学习笔记之SQL-1

上篇文章我们介绍了Mysql的安装&#xff0c;这篇文章我们介绍Mysql的操作语言SQL 1. 简介 sql全称&#xff08;Structured Query Language&#xff09;是结构化查询语言&#xff0c;操作关系型数据库的编程语言&#xff0c;定义了一套操作关系型数据库统一标准 2. sql分类 …

埃隆马斯克X-AI发布Grok-2大模型,快来体验~

引言 近年来&#xff0c;人工智能技术的快速发展推动了大语言模型的广泛应用。无论是日常生活中的智能助手&#xff0c;还是行业中的自动化解决方案&#xff0c;大语言模型都扮演着越来越重要的角色。2024年&#xff0c;X-AI推出了新一代的大模型——Grok-2&#xff0c;这款模…

PostgreSQL的学习心得和知识总结(一百六十三)|深入理解PostgreSQL数据库之 GUC参数compute_query_id 的使用和实现

目录结构 注&#xff1a;提前言明 本文借鉴了以下博主、书籍或网站的内容&#xff0c;其列表如下&#xff1a; 1、参考书籍&#xff1a;《PostgreSQL数据库内核分析》 2、参考书籍&#xff1a;《数据库事务处理的艺术&#xff1a;事务管理与并发控制》 3、PostgreSQL数据库仓库…

多线程编程杂谈(上)

问题 线程执行的过程中可以强制退出吗&#xff1f; 主动退出&#xff1f;被动退出&#xff1f; 问题抽象示例 需要解决的问题 g_run 全局变量需要保护吗&#xff1f; 如何编码使得线程中每行代码的执行可被 g_run 控制&#xff1f; 线程代码在被 g_run 控制并 "强制退…

【Git】:企业级开发和多人协作开发啊

目录 多人协作 模拟配置多人协作环境 多人同一分支开发 多人不同分支开发 远程分支删除后的问题 企业级开发模型 系统开发环境 分支设计规范 多人协作 模拟配置多人协作环境 目前&#xff0c;我们所完成的工作如下&#xff1a; 基本完成 Git 的所有本地库的相关操作&#xff…