RabbitMQ-工作模式(Publish模式Routing模式)

在这里插入图片描述

文章目录

  • 发布/订阅(Publish/Subscribe)
    • 交换机
    • 临时队列
    • 绑定
    • 总体代码示例
  • 路由(Routing)
    • 绑定
    • 直连交换机
    • 多重绑定
    • 发送日志
    • 订阅
    • 总体代码示例

更多相关内容可查看

发布/订阅(Publish/Subscribe)

构建一个简单的日志系统

  • 我们将通过构建一个简单的日志系统来说明这个模式。它将包含两个程序 – 第一个程序将发出日志消息,第二个程序将接收并打印它们。
  • 在我们的日志系统中,每个运行中的接收程序都将收到这些消息。这样,我们就能够运行一个接收程序并将日志定向到磁盘;同时,我们也能够运行另一个接收程序,并在屏幕上看到日志。

基本上,发布的日志消息将被广播到所有的接收程序。

交换机

Rabbit中完整的消息模型:

  • 生产者是发送消息的用户应用程序。
  • 队列是存储消息的缓冲区。
  • 消费者是接收消息的用户应用程序。

在RabbitMQ中消息模型的核心思想是,生产者从不直接发送任何消息到队列。实际上,很多时候生产者甚至不知道消息是否会被发送到任何队列。

相反,生产者只能将消息发送到一个交换机。交换机是一个非常简单的东西。它一边从生产者接收消息,另一边将它们推送到队列。交换机必须确切地知道如何处理收到的消息。它应该被附加到特定的队列吗?它应该被附加到多个队列吗?还是应该被丢弃。这些规则由交换机类型定义。
在这里插入图片描述

有几种可用的交换机类型:direct、topic、headers和fanout。我们将专注于最后一种类型 – fanout。让我们创建一个这种类型的交换机,并称其为logs:

channel.exchangeDeclare("logs", "fanout");

fanout交换机非常简单。正如你可能从名称中猜到的那样,它只是将接收到的所有消息广播到它所知道的所有队列。这正是我们的日志记录器所需要的。

列出交换机
要列出服务器上的交换机,您可以运行非常有用的rabbitmqctl命令:

sudo rabbitmqctl list_exchanges

在这个列表中会有一些amq.*交换机和默认(未命名)交换机。这些是默认创建的,无需考虑

之前对交换机一无所知,但仍然能够将消息发送到队列。这是因为我们使用的是默认交换,通过空字符串("")来标识它。

回想一下之前发布消息的方式:

channel.basicPublish("", "hello", null, message.getBytes());

第一个参数是交换机的名称空字符串表示默认或无名交换机:如果存在指定 routingKey 的队列,则消息会被路由到该队列。

现在,我们可以将消息发布到我们命名的交换机:

channel.basicPublish("logs", "", null, message.getBytes());

这样,我们将消息发布到名为 logs 的交换机中,而不是默认的无名交换机。

临时队列

在之前,我们使用的队列都有特定的名称。能够给队列命名对我们来说非常重要 - 我们需要将工作者指向同一个队列。在想要在生产者和消费者之间共享队列时,给队列命名非常重要。

但是对于我们的日志记录器来说情况并非如此。我们希望收到所有日志消息,而不仅仅是其中的一部分。我们也只对当前正在流动的消息感兴趣,而不是旧消息。为了解决这个问题,我们需要两件事情。

  • 首先,每当我们连接到 Rabbit 时,我们都需要一个全新的空队列。为此,我们可以创建一个具有随机名称的队列,或者更好的是让服务器为我们选择一个随机的队列名称。
  • 其次,一旦我们断开消费者连接,队列应该自动删除。

在 Java 中,当我们向 queueDeclare() 方法提供没有参数时,我们创建一个非持久化、独占、自动删除的队列,并且由服务器生成一个名称:

String queueName = channel.queueDeclare().getQueue();

在这一点上,queueName 包含一个随机的队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg

绑定

在这里插入图片描述

我们已经创建了一个fanout 交换机和一个队列。现在我们需要告诉交换机将消息发送到我们的队列。交换机和队列之间的这种关系称为绑定

channel.queueBind(queueName, "logs", "");

从现在开始,logs 交换机将会将消息追加到我们的队列中。

列出绑定
您可以使用以下命令列出现有的绑定:

rabbitmqctl list_bindings

总体代码示例

在这里插入图片描述
发出日志消息的 producer 程序:logsroutingKeyfanoutEmitLog.java

public class EmitLog {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {//指定交换机类型-fanoutchannel.exchangeDeclare(EXCHANGE_NAME, "fanout");String message = argv.length < 1 ? "info: Hello World!" :String.join(" ", argv);//绑定交换机 发送消息到交换机EXCHANGE_NAME中channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}
}

如果还没有队列绑定到交换机,则消息将丢失, 但这对我们来说没关系,如果还没有消费者在获取,我们可以安全地丢弃该消息。

代码为:ReceiveLogs.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class ReceiveLogs {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//指定交换机类型-fanoutchannel.exchangeDeclare(EXCHANGE_NAME, "fanout");String queueName = channel.queueDeclare().getQueue();//绑定交换机跟队列channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
}

如果你想将日志保存到文件中,只需打开控制台并输入以下命令:

java -cp $CP ReceiveLogs > logs_from_rabbit.log

如果你希望在屏幕上看到日志,开启一个新的终端并运行:

java -cp $CP ReceiveLogs

当然,要发出日志,只需输入:

java -cp $CP EmitLog

使用 rabbitmqctl list_bindings 命令,你可以验证代码实际上是否按我们想要的方式创建了绑定和队列。如果有两个 ReceiveLogs.java 程序正在运行,你应该会看到类似如下的输出:

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

结果的解释很简单:来自 exchange logs 的数据发送到两个带有服务器分配名称的队列中。这正是我们想要的。

路由(Routing)

绑定

在前面的示例中,我们已经创建了绑定。你可能还记得 代码如下:

channel.queueBind(queueName, EXCHANGE_NAME, "");

绑定是交换机和队列之间的关系。这可以简单地理解为:队列对来自该交换机的消息感兴趣。

绑定可以接受一个额外的routingKey 参数。为了避免与basic_publish参数混淆,我们将其称为绑定键。以下是我们如何使用键创建绑定的示例:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

绑定键的含义取决于交换机类型。我们先前使用的fanout 交换机简单地忽略了它的值。

直连交换机

在我们之前的教程中,我们的日志系统将所有消息广播给所有消费者。我们希望扩展其功能,以允许根据消息的严重性进行过滤。例如,我们可能希望一个将日志消息写入磁盘的程序只接收关键错误,而不浪费磁盘空间来记录警告或信息日志消息。

我们之前使用的是fanout 交换机,它并没有提供太多的灵活性 - 它只能进行无脑广播。

相反,我们将使用直连交换机。直连交换机背后的路由算法很简单 - 消息将被发送到绑定键与消息的路由键完全匹配的队列中。

为了说明这一点,考虑以下设置:

橙黑绿P直接问₁Q₂C₁

在这个设置中,我们可以看到直连交换机 X 与两个绑定到它的队列。第一个队列绑定的绑定键是 orange,而第二个队列有两个绑定,一个绑定键为 black,另一个为 green

在这样的设置中,使用路由键 orange发布到交换机的消息将被路由到队列 Q1。具有路由键 black 或 green的消息将发送到 Q2。所有其他消息将被丢弃。

多重绑定

黑黑P直接问₁Q₂C₁C₂

将多个队列与相同的绑定键绑定是完全合法的。在我们的示例中,我们可以添加一个在交换机 X 和队列 Q1 之间的绑定,绑定键为black。在这种情况下,直连交换机将像fanout 交换机一样行为,将消息广播到所有匹配的队列。具有路由键 black 的消息将被传递到 Q1 和 Q2。

发送日志

我们将使用这个模型来构建我们的日志系统。与使用fanout 交换机不同,我们将消息发送到一个直连交换机。我们将日志严重程度作为路由键提供。这样,接收程序就能够选择它想要接收的严重程度。让我们先专注于发送日志。

和往常一样,我们首先需要创建一个交换机:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

接下来,我们准备发送一条消息:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

我们假设 ‘severity’ 可以是 'info'、'warning' 或 'error' 中的一个。

订阅

接收消息的方式与之前的教程类似,只有一个例外 - 我们将为每个我们感兴趣的严重程度创建一个新的绑定。

String queueName = channel.queueDeclare().getQueue();for(String severity : argv){channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

总体代码示例

错误信息警告错误P直接amq.gen-S9b...amq.gen-Ag1...C₁C₂

生产者类的代码:EmitLogDirect.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class EmitLogDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {//指定交换机类型channel.exchangeDeclare(EXCHANGE_NAME, "direct");//指定日志类型String severity = getSeverity(argv);String message = getMessage(argv);//发送日志channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + severity + "':'" + message + "'");}}//..
}

消费者代码为:ReceiveLogsDirect.java

import com.rabbitmq.client.*;public class ReceiveLogsDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//指定交换机类型channel.exchangeDeclare(EXCHANGE_NAME, "direct");//获取交换机随机名字String queueName = channel.queueDeclare().getQueue();if (argv.length < 1) {System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");System.exit(1);}for (String severity : argv) {//指定日志类型channel.queueBind(queueName, EXCHANGE_NAME, severity);}System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
}

如果你只想将 ‘warning’ 和 ‘error’(而不是 ‘info’)日志消息保存到文件中,只需打开控制台并输入以下命令:

java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log

如果你想在屏幕上看到所有的日志消息,打开一个新的终端并执行以下命令:

java -cp $CP ReceiveLogsDirect info warning error
# => [*] Waiting for logs. To exit press CTRL+C

例如,要发出一个错误日志消息,只需输入以下命令:

java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/24832.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

(delphi11最新学习资料) Object Pascal 学习笔记---第14章泛型第3节(接口引用 vs 泛型接口约束)

14.3.4 接口引用 vs 泛型接口约束 ​ 在上一个例子中&#xff0c;我定义了一个泛型类&#xff0c;可以与实现特定接口的任何对象一起使用。我也可以通过创建基于接口引用的标准&#xff08;非泛型&#xff09;类来获得类似的效果。实际上&#xff0c;我可以定义一个类&#xf…

前端构建工具总结

前端构建工具是前端开发中的重要组成部分&#xff0c;它们能够帮助开发者自动化地处理、优化和打包前端资源&#xff0c;提高开发效率和项目的可维护性。以下是对一些常用前端构建工具的总结&#xff1a; Webpack 功能&#xff1a;Webpack是一个强大的前端构建工具&#xff0c…

[EFI]ASUS Vivobook 16x M1603QA 电脑 Hackintosh 黑苹果efi引导文件

黑果魏叔提供硬件型号驱动情况 主板ASUS Vivobook 16x M1603QA 处理器AMD Ryzen 5 5600H已驱动 内存8GB DDR4 on board 8GB DDR4 SO-DIMM已驱动 硬盘SSD INTEL 512GB 670P M.2 SSDPEKNU512GZX1 PCIe 3.0 x4 NVMe已驱动 显卡AMD Radeon RX Vega 7已驱动 声卡瑞昱 AMD Hi…

vue antdesgin table 动态表头动态数据示例

以下是一个基于 Vue 和 Ant Design Vue 的示例&#xff0c;可以动态生成表格的表头和数据&#xff1a; <template><div><a-button click"addColumn">添加列</a-button><a-table :columns"columns" :dataSource"dataSource…

HC-05蓝牙模块配置连接和使用

文章目录 1. 前期准备 2. 进入AT模式 3. 电脑串口配置 4. 配置过程 5. 主从机蓝牙连接 6. 蓝牙模块HC-05和电脑连接 1. 前期准备 首先需要准备一个USB转TTL连接器&#xff0c;电脑安装一个串口助手&#xff0c;然后按照下面的连接方式将其相连。 VCCVCCGNDGNDRXDTXDTXD…

ICLR24大模型提示(8) | 退一步思考:在大型语言模型中通过抽象引发推理

【摘要】我们提出了一种简单的提示技术&#xff0c;即后退提示法&#xff0c;它使 LLM 能够进行抽象&#xff0c;从包含特定细节的实例中得出高级概念和第一原理。通过使用概念和原理来指导推理&#xff0c;LLM 显著提高了遵循正确推理路径解决问题的能力。我们使用 PaLM-2L、G…

Facebook企业户 | Facebook公共主页经营

Facebook作为社交媒体巨头&#xff0c;拥有庞大的用户基数&#xff0c;因此&#xff0c;有效经营公共主页是获取持续流量、提升客户信任度和粘性、促进产品或服务销售与转化的关键。要优化Facebook主页&#xff0c;关注以下几点&#xff1a; 1、参与度是关键指标&#xff1a;因…

Python YOLOv5 7.0 基于深度学习的口罩检测识别系统

目录 1&#xff0c;演示视频和资源下载 1.1 演示视频 1.2 资源下载 2&#xff0c;数据集 3&#xff0c;代码 3.1 带 PyQt5 UI 的检测程序&#xff0c;基于YOLOv5 7.0 3.1.1 根据训练结果进行检测 3.1.2 自动保存每张图片/每帧的检测结果 3.1.3 筛选查看每张图片/每帧检…

Linux---sudo命令

文章目录 目录 文章目录 一.sudo命令简介 二.sudo 命令的特点 三.sudo 相关文件 四.sudo 命令授权配置 一.sudo命令简介 sudo 命令全称“SuperUser Do”&#xff0c;是Linux系统中的一个命令能够使普通用户以超级用户身份去执行某些命令。 二.sudo 命令的特点 sudo能够授权…

HC05蓝牙模块与笔记本蓝牙连接

文章目录 1. 电脑和蓝牙模块连接 2. 串口软件调试 1. 电脑和蓝牙模块连接 HC05支持SPP协议&#xff0c;使用PC主机自带蓝牙&#xff0c;或者笔记本加蓝牙适配器。与HC05连接后&#xff0c;可在电脑端虚拟出串口&#xff0c;这样上位机软件就可以像操作串口一样与HC05通信。对…

Websocket前端与后端:深度探索与实战应用

Websocket前端与后端&#xff1a;深度探索与实战应用 在现代网络应用中&#xff0c;Websocket以其双向通信和实时交互的特性&#xff0c;逐渐成为前后端通信的优选方案。然而&#xff0c;对于许多开发者而言&#xff0c;Websocket的前后端实现仍然充满了困惑和挑战。本文将从四…

[ROS 系列学习教程] 建模与仿真 - 使用 Arbotix 控制机器人

ROS 系列学习教程(总目录) 本文目录 一、Arbotix 简介二、安装Arbotix三、配置Arbotix控制器四、配置launch启动文件五、数据交互接口六、在rviz中仿真控制机器人6.1 直接发topic控制6.2 使用键盘控制6.3 编写代码控制机器人移动 前面讲了机器人的建模&#xff0c;是静态的&…

二进制中1的个数-java

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 前言 一、二进制中1的个数 二、算法思路 1.将一个整数转化成二进制形式 2.查询一个数的二进制数中的第k位是多少 3.lowbit(x)操作 三、代码如下 1.代码如下&…

合并两个有序的单链表

25计算机考研&#xff0c;数据结构知识点整理&#xff08;内容借鉴了王道408&#xff0b;数据结构教材&#xff09;&#xff0c;还会不断完善所整理的内容&#xff0c;后续的内容也会不断更新&#xff08;可以关注&#xff09;&#xff0c;若有错误和不足欢迎各位朋友指出! 目…

Rust-03-数据类型

在 Rust 中&#xff0c;每一个值都属于某一个 数据类型&#xff0c;这告诉 Rust 它被指定为何种数据&#xff0c;以便明确数据处理方式。Rust 是 静态类型语言&#xff0c;也就是说在编译时就必须知道所有变量的类型。根据值及其使用方式&#xff0c;编译器通常可以推断出我们想…

ChatTTS 文字生成语言本地模型部署

ChatTTS部署 官方信息 [ChatTTS首页](https://chattts.com/)搭建步骤 1、下载源码 git clone https://github.com/2noise/ChatTTS.git 2、按照环境 pip install torch ChatTTS pip install -r requirements.txt 3、下载模型 git clone https://www.modelscope.cn/pzc163/ch…

[Vue3:axios]:实现实现登陆页面前后端请求,并用Vite解决跨域问题

文章目录 一&#xff1a;前置依赖查看依赖安装 axios&#xff1a;npm install axios 二&#xff1a;配置文件&#xff1a;创建一个用于全局使用的axios实例&#xff0c;并在main.js或main.ts文件中将其配置为全局属性。根目录mainjs文件引入axios 三&#xff1a;登录页面发送登…

【React】dayjs -- 格式化时间 的使用

中文文档 1.安装 Node.js 项目中使用 Day.js&#xff0c;只需使用 npm 安装 npm install dayjs其它地方使用&#xff0c;参考中文文档 2.引入 import dayjs from dayjs3.使用 时间格式化 dayjs().format() // 默认返回的是 ISO8601 格式字符串 2020-04-02T08:02:17-05:0…

Polar Web【中等】xxe

Polar Web【中等】xxe Contents Polar Web【中等】xxe思路&探索EXP运行&总结 思路&探索 如题目所示&#xff0c;此题考查XXE漏洞&#xff0c;具体细节需要逐步深挖 打开站点&#xff0c;提示了flag所在的文件&#xff0c;点击按钮&#xff0c;可见php的配置信息&am…

一款免费文件夹同步工具,旨在帮助用户在不同磁盘或文件夹间进行文件和目录的复制、移动和同步工作

一、简介 1、一款免费文件夹同步工具&#xff0c;旨在帮助用户在不同磁盘或文件夹间进行文件和目录的复制、移动和同步工作。这款工具因其简单易用、高度可定制化的特点&#xff0c;受到了广大用户的青睐。SyncToy支持多种同步模式&#xff0c;包括镜像同步、单向同步以及增量同…