RabbitMQ---交换机-Fanout-Direct

  • Publisher:生产者,不再发送消息到队列中,而是发给交换机
  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
  • Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
  • Consumer:消费者,与以前一样,订阅队列,没有变化

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

Fanout交换机

在这里插入图片描述
简单点来说,就是生产者把消息发给交换机,交换机根据路由(绑定规则)来转发消息给队列,消费者订阅队列,获得消息。

Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。
在广播模式下,消息发送流程是这样的:
image.png

  • 1) 可以有多个队列
  • 2) 每个队列都要绑定到Exchange(交换机)
  • 3) 生产者发送的消息,只能发送到交换机
  • 4) 交换机把消息发送给绑定过的所有队列
  • 5) 订阅队列的消费者都能拿到消息
    注意:我下面的代码都是在上一个加依赖的基础上的,可看我上一个文档

01利用官方文档的

消息发送:


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.util.Scanner;public class EmitLog {//定义交换机private static final String EXCHANGE_NAME = "fanout-exchange";public static void main(String[] argv) throws Exception {//创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");//建立连接和通道try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {//通过channel.exchangeDeclare方法声明了一个名为EXCHANGE_NAME(即"logs")的交换机// 并指定了其类型为fanout。fanout类型的交换机会将消息广播到所有与之绑定的队列中。channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//发送消息Scanner scanner=new Scanner(System.in);while(scanner.hasNext()){String message = scanner.nextLine();//channel.basicPublish方法将消息发布到前面声明的交换机中。// 注意,这里的routingKey(即第二个参数)为空字符串"",因为对于fanout类型的交换机来说,routingKey是不起作用的。channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}}
}

接收:

注意一定要创建队列,不然只有交换机没用

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class ReceiveLogs {private static final String EXCHANGE_NAME = "fanout-exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();//创建通道Channel channel1 = connection.createChannel();Channel channel2 = connection.createChannel();//声明交换机channel1.exchangeDeclare(EXCHANGE_NAME, "fanout");channel2.exchangeDeclare(EXCHANGE_NAME, "fanout");//队列的名字String queueName = "星星";//创建队列channel1.queueDeclare(queueName, true, false, false, null);channel1.queueBind(queueName, EXCHANGE_NAME, "");String queueName1 = "晨晨";//创建队列channel2.queueDeclare(queueName1, true, false, false, null);channel2.queueBind(queueName1, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [1] Received '" + message + "'");};DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [2] Received '" + message + "'");};channel1.basicConsume(queueName, true, deliverCallback, consumerTag -> { });channel2.basicConsume(queueName1, true, deliverCallback2, consumerTag -> { });}
}

请添加图片描述
请添加图片描述
请添加图片描述
可以看到fanout类型,生产者发送一个消息,所有的消费者都能接收到,这个类型不用设置路由

02注解形式的:

消息发送

先在配置类中声明交换机,队列,以及绑定关系

   public static final String FANOUT_QUEUE_1 = "fanout.queue.1";public static final String FANOUT_QUEUE_2 = "fanout.queue.2";public static final String FANOUT_EXCHANGE = "fanout.exchange";@Beanpublic Queue fanoutQueue1() {return new Queue(FANOUT_QUEUE_1);}@Beanpublic Queue fanoutQueue2() {return new Queue(FANOUT_QUEUE_2);}@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(FANOUT_EXCHANGE);}@Beanpublic Binding binding1() {return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());}@Beanpublic Binding binding2() {return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());}

Test类中添加测试方法:

@Test
public void testFanoutExchange() {// 交换机名称String exchangeName = "fanout.exchange";// 消息String message = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, "", message);
}

消息接收

在添加两个方法,作为消费者:

@RabbitListener(queues = "fanout.queue.1")
public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}@RabbitListener(queues = "fanout.queue.2")
public void listenFanoutQueue2(String msg) {System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

在这里插入图片描述

总结

交换机的作用是什么?

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • FanoutExchange的会将消息路由到每个绑定的队列

Direct交换机

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
image.png
在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

此处我省略了官方文档那种,直接springboot注解那种的,而且不再bean。而是全注解那种的,也是对黑马代码的进一步优化,不手动操作添加

下面是黑马的案例
案例需求如图
image.png

  1. 声明一个名为hmall.direct的交换机
  2. 声明队列direct.queue1,绑定hmall.directbindingKeybludred
  3. 声明队列direct.queue2,绑定hmall.directbindingKeyyellowred
  4. consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
  5. 在publisher中编写测试方法,向hmall.direct发送消息

先在配置类加入下面的然后启动一下:

    /** 基于注解的来声明交换机和队列及其绑定关系 */@RabbitListener( bindings = @QueueBinding(exchange = @Exchange(name = "heima.direct", type = ExchangeTypes.DIRECT),value = @org.springframework.amqp.rabbit.annotation.Queue(name = "direct.queue1"),key = {"red", "blue"}))public void rabbitListener5(String message) {System.out.println("红蓝: " + message);}@RabbitListener( bindings = @QueueBinding(exchange = @Exchange(name = "heima.direct", type = ExchangeTypes.DIRECT),value = @org.springframework.amqp.rabbit.annotation.Queue(name = "direct.queue2"),key = {"yellow","red"}))public void rabbitListener6(String message) {System.out.println("黄红: " + message);}

消息发送

在Test类中添加测试方法:

    @Testpublic void testSendDirectExchange() {// 交换机名称String exchangeName = "heima.direct";// 消息String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);}@Testpublic void testSendDirectExchange01() {// 交换机名称String exchangeName = "heima.direct";// 消息String message = "最新报道,哥斯拉是居民自治巨型气球,虚惊一场!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "blue", message);}

消息接收:

    @RabbitListener(queues = "direct.queue1")public void listenDirectQueue1(String msg) {System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");}@RabbitListener(queues = "direct.queue2")public void listenDirectQueue2(String msg) {System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");}

先点击测试的那个red的运行一下,在启动项目:
在这里插入图片描述

我们再切换为blue这个key:

你会发现,只有消费者1收到了消息:
在这里插入图片描述
###总结
描述下Direct交换机与Fanout交换机的差异?

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/14513.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何一键生成多个文本二维码?excel表格批量生码的方法

现在很多人会将文本信息做成二维码来展示,当有同类型内容生成大量二维码时,可以使用将文本导入excel表格的方式,将表格中的每条数据批量生成二维码,可以有效提升二维码制作的速度和效率。下面就让小编来将具体的操作步骤分享给大家…

二叉树顺序结构及链式结构

一.二叉树的顺序结构 1.定义:使用数组存储数据,一般使用数组只适合表示完全二叉树,此时不会有空间的浪费 注:二叉树的顺序存储在逻辑上是一颗二叉树,但是在物理上是一个数组,此时需要程序员自己想清楚调整…

手动安装maven依赖到本地仓库

使用mvn install命令安装jar包到指定的仓库。 命令如下: mvn install:install-file -Dmaven.repo.localC:\Users\liyong.m2\repository -DgroupIdcom.aspose -DartifactIdwords -Dversion18.4 -Dpackagingjar -DfileC:\Users\liyong\Desktop\jar\words-18.4.jar 解释…

grafana + Prometheus + node-exporter + pushgateway + alertmanager的监控解决方案

业内比较著名的监控解决方案,据笔者所知,大概是三套: 一个是zabbix的解决方案,一个是prometheusgrafana,一个是ELK zabbix比较重,而且原生支持监控SNMP,自带一个仪表盘,不需要额外…

docker redis 持久化

1、拉取redis镜像 docker pull redis:latest 2、 mkdir /data/redis 3、填充redis.conf文件及根据需求修改相应的配置 •通过官网地址找到对应版本的配置文件 •将配置信息复制到redis.conf中 •常见的修改配置 https://redis.io/docs/latest/operate/oss_and_stack/managem…

高仿果汁导航模板

参考原文:果汁导航风格模板_1234FCOM专注游戏工具及源码例子分享 极速云

基于springboot的毕业设计系统的开发源码

风定落花生,歌声逐流水,大家好我是风歌,混迹在java圈的辛苦码农。今天要和大家聊的是一款基于springboot的毕业设计系统的开发。项目源码以及部署相关请联系风歌,文末附上联系信息 。 项目简介: 毕业设计系统能够实现…

学习通高分免费刷课实操教程

文章目录 概要整体架构流程详细步骤云上全平台登录步骤小结 概要 我之前提到过一个通过浏览器的三个脚本就可以免费高分刷课的文章,由于不方便拍视频进行实操演示,然后写下了这个实操教程,之前的三个脚本划到文章末尾 整体架构流程 整体大…

窗口函数 | rows between …… and ……

ROWS BETWEEN ... AND ... 是 SQL 窗口函数中的一个子句&#xff0c;用于定义窗口函数操作的行范围。窗口函数允许用户对一组相关的记录执行计算&#xff0c;这些记录被称为窗口。 基本语法 <窗口函数> OVER ( [PARTITION BY <列名>] ORDER BY <列名> [AS…

华为云之Zabbix监控平台部署实践

华为云之Zabbix监控平台部署实践 一、本次实践介绍1.1 实践环境简介1.3 本次实践完成目标 二、 相关服务介绍2.1 华为云ECS云服务器介绍2.2 Zabbix介绍 三、环境准备工作3.1 预置实验环境3.2 查看预置环境信息 四、登录华为云4.1 登录华为云4.2 查看ECS状态4.3 连接ECS弹性云服…

力扣HOT100 - 287. 寻找重复数

解题思路&#xff1a; 快慢指针 第一步&#xff0c;慢指针每次移动一步&#xff0c;快指针每次移动两步&#xff0c;直到它们相遇。这一步保证了它们在环中相遇。 接下来&#xff0c;将其中一个指针&#xff08;快指针或慢指针&#xff09;重置到起点&#xff08;即数组的第一…

SpringBoot实现邮箱验证码

自行创建一个SpringBoot项目 导入SpringBoot所需要的邮箱验证码的包 <!--邮件发送--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-mail</artifactId><version>2.6.1</version>…

前后端部署笔记

windows版&#xff1a; 如果傻呗公司让用win电脑部署&#xff0c;类似于我们使用笔记本做局域网服务器&#xff0c;社内使用。 1.安装win版的nginx、mysql、node、jdk等 2.nginx开机自启参考Nginx配置及开机自启动&#xff08;Windows环境&#xff09;_nginx开机自启动 wind…

UPPAAL使用方法

UPPAAL使用方法 由于刚开始学习时间自动机及其使用方法&#xff0c;对UPPAAL使用不太熟悉&#xff0c;网上能找到的教程很少&#xff0c;摸索了很久终于成功实现一个小例子&#xff0c;所以记录一下详细教程。 这里用到的例子参考【UPPAAL学习笔记】1&#xff1a;基本使用示例…

专业级润滑油,一站式批发服务

要为机械设备提供持久稳定的动力保障吗&#xff1f;选择我们的专业级润滑油&#xff0c;让您的设备运转更顺畅&#xff0c;效率更高。 我们专业从事润滑油批发多年&#xff0c;以优质的产品、合理的价格和完善的服务赢得了广大客户的信赖。无论是汽车、机械还是工业设备&#x…

【Vue3】env环境变量的配置和使用(区分cli和vite)

原文作者&#xff1a;我辈李想 版权声明&#xff1a;文章原创&#xff0c;转载时请务必加上原文超链接、作者信息和本声明。 文章目录 前言一、env文件二、vue3cli加载env1..env配置2..dev配置&#xff08;其他环境参考&#xff09;3.package.json文件4.使用 三、vue3vite加载e…

【html5】03-新表单元素及属性

目录 1 引言 2 智能表单控件-type 3 表单属性 form input 5 答疑--解决required自定义提示信息 1 引言 HTML5引入了一系列新的表单输入类型&#xff0c;如email、url、number、range、date、time、datetime-local、month、week、search、color和tel等。这些新类型增强了表…

FFmpeg源码:bytestream_get_byte函数解析

一、引言 FFmpeg源码中经常使用到bytestream_get_byte这个函数&#xff0c;比如使用FFmpeg对BMP图片进行解析&#xff0c;其源码会调用函数bmp_decode_frame&#xff0c;而该函数内部会通过bytestream_get_byte读取BMP 的header。本文讲解函数bytestream_get_byte的作用和内部…

Spark SQL 中DataFrame DSL的使用

在上一篇文章中已经大致说明了DataFrame APi,下面我们具体介绍DataFrame DSL的使用。DataFrame DSL是一种命令式编写Spark SQL的方式&#xff0c;使用的是一种类sql的风格语法。 文章链接&#xff1a; 一、单词统计案例引入 import org.apache.spark.sql.{DataFrame, SaveMod…

Xinstall助力实现App间直接跳转,提升用户体验

在移动互联网时代&#xff0c;App已成为我们日常生活中不可或缺的一部分。然而&#xff0c;在使用各类App时&#xff0c;我们经常会遇到需要在不同App之间切换的情况&#xff0c;这时如果能够直接跳转&#xff0c;将会大大提升用户体验。而Xinstall正是这样一款能够帮助开发者实…