SpringBoot整合RabbitMQ direct交换机、fanout交换机、topic交换机

PS 常见错误
1、有匹配到交换机,但是没有匹配到绑定的队列。(交换机没有绑定队列)- not route
2、没有匹配到交换机。(交换机名称错误,not found - exchange)
3、交换机和队列都没有匹配(和第二种状态一样,没有匹配到交换机,直接返回。)

本篇主要讲的是
直连交换机(amq.direct)、
扇形交换机(amq.fanout)、
主题交换机(amq.topic)。
MQ中交换机和队列是绑定的关系,其中这三个交换机之间的差别就是,绑定路由键值的不同,看以下代码和解释:

1.BindingBuilder.bind(queue()).to(exchange()).with(routingkey);
2.BindingBuilder.bind(queue()).to(exchange());
第一种是直连交换机和主题交换机的用法。
第二种是扇形交换机的用法,不需要绑定路由键。
queue()方法:自定义的队列,也可以使用已存在的队列new Queue("已存在的队列名或自定义队列名",true,true,false)1.durable,是否持久化,默认为false。若为true,则将队列放入到磁盘中。否则只存再内存中,宕机后,所有数据都会消失。2.exclusive(排他性队列),是否仅当前连接可见,默认为false。若为true,则该队列在当前连接断开后,就会删除该队列,其优先级高于durable,也就是说无论是否设置为持久化,该操作都生效。3.autoDelete,自动删除队列,默认为false。若为true,则当消费者宕机后,会自动删除该队列。此时若生产者持续发送消息,这期间的发送内容都会丢失。这些自己测试一下就知道了。
exchange()方法:自定义的交换机,也可以使用已存在的交换机。(直连交换机) new DirectExchange("已存在交换机名或自定义交换机名",true);1.durable
routinekey:交换机与队列之间绑定的路由键。发送消息时,routinekey错误,则无法获取队列,则无法进行消息发送。直连交换机和主题交换机是必须加路由键,扇形交换机则不是必须加的。

项目依赖及配置

POM的依赖文件,基于SpringBoot 2.4.3版本,生产者项目和消费者项目都可以使用这样的依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.4.3</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.mq.producter</groupId><artifactId>mqproducter</artifactId><version>0.0.1-SNAPSHOT</version><name>mqproducter</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

application.properties配置如下,生产者和消费者都可用一样的。

spring.application.name=MqProducer
##配置rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

1、直连交换机

直连交换机与队列进行绑定时,需要绑定一个路由键X。此时若有消息载体中携带了X路由键,则会找到对应的队列,并将消息存放到对应的队列中。

直连交换机 及 队列配置代码

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
/*** 直连交换机配置*/
public class DirectConfig {@BeanQueue directQueue(){return new Queue("directQueue",true);}@BeanDirectExchange createExchange(){//原本存在的交换机,都是持久性的。return new DirectExchange("amq.direct");}@BeanBinding bindingExchange(){//可以绑定自定义交换机或者绑定已存在的交换机return BindingBuilder.bind(directQueue()).to(createExchange()).with("directRouting");}
}

监听直连交换机 及 队列配置

监听直连交换机绑定的队列

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
@RabbitListener(queues = {"directQueue"})
public class DirectListener {@RabbitHandlerpublic void process(Map m){System.out.println("DirectListener接受到的消息为:"+m.toString());}
}

2、扇形交换机(广播)

扇形交换机与队列进行绑定,不需要绑定路由键,就算绑定了也没有作用。若扇形交换机中绑定了N个队列,此时有消息载体到了扇形交换机,则会被存放到扇形交换机绑定的所有队列中,消费者消费时,就会消费其所有队列的消息。

扇形交换机 及 队列配置代码

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
/*** 直连交换机配置*/
public class FanoutConfig {@BeanQueue firstFanoutQueue(){return new Queue("firstFanoutQueue",true);}@BeanQueue secondFanoutQueue(){return new Queue("secondFanoutQueue",true);}@BeanFanoutExchange createFanoutExchange(){return new FanoutExchange("amq.fanout");}@BeanBinding bindingFirstQueue(){//可以绑定自定义交换机或者绑定已存在的交换机//需要注意的是,扇形交换机with就不需要了return BindingBuilder.bind(firstFanoutQueue()).to(createFanoutExchange());}@BeanBinding bindingSeccondQueue(){//可以绑定自定义交换机或者绑定已存在的交换机return BindingBuilder.bind(secondFanoutQueue()).to(createFanoutExchange());}
}

监听扇形交换机 及 队列配置

因为有N个队列,此时需要创建N个监听器,或者在一个监听器中,监听N个队列,这样子简单的监听就没办法对不同的队列进行逻辑处理了。
FirstFanoutListener.java

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
@RabbitListener(queues = {"firstFanoutQueue"})
public class FirstFanoutListener {@RabbitHandlerpublic void process(Map m){System.out.println("FirstFanoutListener接受到的消息为:"+m.toString());}
}

SecondFanoutListener.java

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
@RabbitListener(queues = {"secondFanoutQueue"})
public class SecondFanoutListener {@RabbitHandlerpublic void process(Map m){System.out.println("SecondFanoutListener接受到的消息为:"+m.toString());}
}

这两个类监听的是扇形交换机绑定的两个队列,若有消息载体进入扇形交换机,这两个监听类都会实时消费数据。

3、主题交换机

主题交换机和队列进行绑定时,可以通过通配符来进行路由键的模糊匹配。路由键可以使用topic.* 或者topic.#表示。其中*号代表了,句点后必须有1个或多个值。#号代表了,句点后必须有0个或多个值。
 主题交换机是很强大的,它可以代替直连交换机和扇形交换机使用。若路由键中设置#号(说明路由键可以为空),则可以代替扇形交换机。若路由键不包含 *号或者#号(说明不存在模糊匹配,需要全匹配。)则可以代替直连交换机使用。

看一下以下的例子:
 若有交换机绑定了队列1(topic.thing)、队列2(topic.
 此时若有消息载体携带topic.thing路由键,则消息就会进入队列1和队列2中。
 若有消息载体携带topic.other,则消息只会进入到队列2中,因为
号匹配了other。这就说明句点后可以输入任意字符。

主题交换机 及 队列配置代码

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
/*** 直连交换机配置*/
public class TopicConfig {@BeanQueue firstTopicQueue(){return new Queue("firstTopicQueue",true);}@BeanQueue secondTopicQueue(){return new Queue("secondTopicQueue",true);}@BeanTopicExchange createTopicExchange(){//选择已存在的交换机return new TopicExchange("amq.topic");}@BeanBinding bindingFirstTopicQueue(){//可以绑定自定义交换机或者绑定已存在的交换机//需要注意的是,扇形交换机with就不需要了return BindingBuilder.bind(firstTopicQueue()).to(createTopicExchange()).with("topic.thing");}@BeanBinding bindingSecondTopicQueue(){//可以绑定自定义交换机或者绑定已存在的交换机return BindingBuilder.bind(secondTopicQueue()).to(createTopicExchange()).with("topic.#");}
}

主题交换机的代码有所不同,在路由键中使用了通配符#

监听主题交换机 及 队列配置

因为有N个队列,此时需要创建N个监听器,或者在一个监听器中,监听N个队列,这样子简单的监听就没办法对不同的队列进行逻辑处理了。
FirstTopicListener.java

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
@RabbitListener(queues = {"firstTopicQueue"})
public class FirstTopicListener {@RabbitHandlerpublic void process(Map m){System.out.println("FirstTopicListener接受到的消息为:"+m.toString());}
}

SecondTopicListener.java

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
@RabbitListener(queues = {"secondTopicQueue"})
public class SecondTopicListener {@RabbitHandlerpublic void process(Map m){System.out.println("SecondTopicListener接受到的消息为:"+m.toString());}
}

这两个类监听的是主题交换机绑定的两个队列,若有消息载体进入扇形交换机,携带绑定的路由键,就会进入到指定队列,不同的监听器就会实时消费。

发送信息

发送信息接口代码

交换机对应的简单的配置代码都已经完成了,接下来就写一下发送消息的接口。

package com.mq.producter.mqproducter.action;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;@RestController
@RequestMapping("/mq")
public class ProducerAction {@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/sending")public void sending(){try{String message = "This is a first message!";Map<String,Object> m = commonMap(message);rabbitTemplate.convertAndSend("amq.direct","directRouting",m);}catch(Exception e){e.printStackTrace();}}@RequestMapping("/sendtofanout")public void sendToFanout(){try{String message = "This is a second message to Fanout!";Map<String,Object> m = commonMap(message);rabbitTemplate.convertAndSend("amq.fanout",null,m);}catch(Exception e){e.printStackTrace();}}@RequestMapping("/sendtotopic1")public void sendToTopicMan(){try{String message = "This is a first message to topic : Thing!";Map<String,Object> m = commonMap(message);rabbitTemplate.convertAndSend("amq.topic","topic.thing",m);}catch(Exception e){e.printStackTrace();}}@RequestMapping("/sendtotopic2")public void sendToTopicWoman(){try{String message = "This is a first message to topic : other!";Map<String,Object> m = commonMap(message);rabbitTemplate.convertAndSend("amq.topic","topic.asdasd",m);}catch(Exception e){e.printStackTrace();}}public Map<String,Object> commonMap(String message){String uuid = String.valueOf(UUID.randomUUID());String date = new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date());Map<String,Object> m = new HashMap<>();m.put("messageId", uuid);m.put("createTime",date);m.put("message",message);return m;}
}

生产者和消费者都开发完成了,将两个项目都启动后,调用接口,往MQ中发送数据。

1.调用发送到直连交换机的接口

消费者端实时消费数据:

DirectListener接受到的消息为:{createTime=2024-02-25 13:55, messageId=cf0b8188-6fb4-43cb-86f2-ad0286889ceb, message=This is a first message!}

2.调用发送到扇形交换机的接口

FirstFanoutListener接受到的消息为:{createTime=2024-02-25 13:56, messageId=afe4206e-46e8-4e9c-9a99-d38d130b54e1, message=This is a second message to Fanout!}
SecondFanoutListener接受到的消息为:{createTime=2024-02-25 13:56, messageId=afe4206e-46e8-4e9c-9a99-d38d130b54e1, message=This is a second message to Fanout!}

两个监听器都实时消费了数据。

3.调用发送到主题交换机的两个接口

调用发送消息载体中携带topic.thing的接口:

SecondTopicListener{createTime=2024-02-25 14:02, messageId=7f412bbd-4032-476b-a578-cd7a0aa04e1d, message=This is a first message to topic : Man!}
FirstTopicListener接受到的消息为:{createTime=2024-02-25 14:02, messageId=7f412bbd-4032-476b-a578-cd7a0aa04e1d, message=This is a first message to topic : Man!}

明明topic.thing路由绑定的是队列firstTopicQueue,但是secondTopicQueue却也收到了,为什么呢?这是因为队列secondTopicQueue的路由键配置的是topic.*,那么句点后无论是什么都会和匹配成功。
若调用发送消息载体中携带topic.other的接口:

SecondTopicListener{createTime=2024-02-25 14:05, messageId=ad408aef-d678-4d06-be5c-7982403b3b48, message=This is a first message to topic : Woman!}

那么此刻只有队列2是匹配的。
以上就是简单的生产者与消费者的代码。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/3965.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue 组件单元测试深度探索:组件交互与状态变更 专业解析和实践

在Vue组件单元测试中&#xff0c;验证组件之间的交互&#xff08;如父组件与子组件、兄弟组件之间的通信&#xff09;以及状态变更的正确性对于保证整个应用的协调运作至关重要。本文详细介绍了父组件向子组件传递props、子组件向父组件发送事件、兄弟组件通过共享状态&#xf…

自然语言处理 (NLP) 的技术演变史

一、简述 本文的目标是了解自然语言处理 (NLP) 的历史&#xff0c;包括 Transformer 体系结构如何彻底改变该领域并帮助我们创建大型语言模型 (LLM)。 基础模型&#xff08;如 GPT-4&#xff09;是最先进的自然语言处理模型&#xff0c;旨在理解、生成人类语言并与之交互。 要理…

鸿蒙小案例-搜索高亮

搜索高亮目前官方也没有可以现成的组件&#xff0c;但是需求来了&#xff0c;怎么办&#xff0c;只能摸索着自己写一个 目前官方API中最接近的应该是 richText组件了&#xff0c;富文本组件&#xff0c;当然可以实现&#xff0c;但是有不少问题 1.大小调整太麻烦&#xff0c;跟…

MySQL中截取字符串有哪些方法

文章目录 一、SUBSTRING() 或 SUBSTR() 函数二、LEFT() 函数三、RIGHT() 函数四、使用字符串连接和定位函数截取五、 正则表达式截取六、SUBSTRING_INDEX() 函数&#xff1a; 在MySQL中&#xff0c;你可以使用多种方法来截取字符串。以下是一些常用的方法&#xff1a; 一、SUB…

焊接机器人-常见焊接工艺参数

常见焊接工艺参数 常见焊缝平焊立焊 常见焊接工艺调试方法注意事项 常见焊缝 常见的焊缝一般见于不需要坡口焊的规则钢构件&#xff1a;如H型钢、H型牛角杠、T型梁、弧形梁等。 平焊 参数Value电流(安培)200A电压(伏特 )20V摆弧-振幅(毫米)4-6mm摆弧- 频率(Hz)1Hz摆弧- 两侧…

国产3D自研技术如何突围?眸瑞科技给3D建设、管理带来全新模式

眸瑞科技是全球领先的数字孪生引擎技术及服务提供商&#xff0c;它专注于让一切3D模型在全网多端轻量化处理与展示&#xff0c;为行业数字化转型升级与数字孪生应用提供成套的国产自研3D可视化技术、产品与服务。 引言 眸瑞科技是全球领先的数字孪生引擎技术及服务提供商&…

【MyBatisPlus】一、公共字段填充配置

目录 一、实体类配置 二、配置MyBatis Plus元对象处理器 三、接口字段自动填充 在使用mybatisplus项目中设置公共字段填充&#xff0c;可以按如下进行配置 一、实体类配置 TableField(value "create_time",fill FieldFill.INSERT)private LocalDateTime createTime…

【C++】哈希思想

目录 哈希介绍&#xff1a; 一&#xff0c;位图 1-1&#xff0c;位图的认识 1-2&#xff0c;位图的简单实现 1-3&#xff0c;位图的应用 二&#xff0c;布隆过滤器 2-1&#xff0c;布隆过滤器的认识 2-2&#xff0c;布隆过滤器的简单实现 2-3&#xff0c;布隆过滤器的…

Kafka 3.x.x 入门到精通(06)——Kafka进阶

Kafka 3.x.x 入门到精通&#xff08;06&#xff09;&#x1f449;&#x1f449;&#x1f449;&#x1f449; Kafka进阶 3. Kafka进阶3.1 Controller选举3.2 Broker上线下线3.3 数据偏移量定位3.4 Topic删除3.5 日志清理和压缩3.7 页缓存3.8 零拷贝3.9 顺写日志3.10 Linux集群部…

Debian12使用宝塔国际aaPanel无法安装Docker

宝塔国际aaPanel自带安装Docker&#xff0c;安装了几次都失败&#xff0c;最后仔细看了安装日志&#xff0c;才发现其中的问题。 复制 --2023-11-28 13:42:13-- https://node.aapanel.com/install/0/docker_install_en.sh Resolving node.aapanel.com (node.aapanel.com)...…

Dockerfile镜像构建实战

一、构建Apache镜像 cd /opt/ #建立工作目录 mkdir /opt/apache cd apache/vim Dockerfile #基于的基础镜像 FROM centos:7 #维护镜像的用户信息 MAINTAINER this is apache image <cyj> #镜像操作指令安装Apache软件 RUN yum install -y httpd #开启80端口 EXPOSE 80 #…

Java23种设计模式-行为型模式之解释器模式

解释器模式&#xff08;Interpreter Pattern&#xff09;&#xff1a;定义了一种文法&#xff0c;并且对于任何该文法的句子&#xff0c;都能够解释和执行。可以将复杂的问题分解成一系列简单的表达式&#xff0c;然后使用解释器来解释这些表达式。 涉及角色&#xff1a; 抽象…

从零开始利用MATLAB进行FPGA设计(五)详解双口RAM

创作于谱仪算法设计过程中的数字能谱生成模块设计。 往期回顾&#xff1a; 从零开始利用MATLAB进行FPGA设计&#xff08;四&#xff09;生成优化HDL代码 从零开始利用MATLAB进行FPGA设计&#xff08;三&#xff09;将Simulink模型转化为定点数据类型 目录 1.关于双口RAM …

php反序列化字符串逃逸

字符串逃逸 字符串逃逸是通过改变序列化字符串的长度造成的php反序列化漏洞 一般是因为替换函数使得字符串长度发生变化&#xff0c;不论变长还是变短&#xff0c;原理都大致相同 在学习之前&#xff0c;要先了解序列化字符串的结构&#xff0c;在了解结构的基础上才能更好理解…

JavaScript算法|前 K 个高频元素、寻找峰值和合并区间、 搜索二维矩阵 II和计算右侧小于当前元素的个数

文章目录 前 K 个高频元素方法一方法二代码 寻找峰值方法一 取最大值方法二 暴力法方法三 二分法 合并区间方法一 合并重叠方法二 合并重叠 搜索二维矩阵 II方法一 暴力法方法二 相邻比较法 计算右侧小于当前元素的个数方法一 暴力法方法二 排序法 前 K 个高频元素 给定一个非…

前端工程化详解

目录 开发前1.使用脚手架工具2.使用编译工具 开发中1.代码规范2.公共方法抽离3.公共组件抽离4.公共样式抽离5.icon 图片 国际化文案 常量等静态数据规划管理6.业务模块区分7.项目版本管理工具8.开发IDE以及代码检查工具 开发结束1.单元测试2.项目打包3.项目发布 开发前 1.使用…

低代码信创开发核心技术(四)动态元数据系统设计

一、概述 在当今快速发展的信息技术领域&#xff0c;动态元数据系统扮演着至关重要的角色。它不仅能够提供数据的描述信息&#xff0c;还能动态地适应业务需求的变化&#xff0c;从而提高系统的灵活性和可扩展性。构建一个动态元数据系统意味着我们可以在不重启系统的情况下&a…

【机器学习】机器学习学习笔记 - 监督学习 - KNN线性回归岭回归 - 02

监督学习 KNN (k-nearest neighbors) KNN 是用 k 个最近邻的训练数据集来寻找未知对象分类的一种算法 from sklearn import neighbors# 分类 # 创建KNN分类器模型并进行训练 classifier neighbors.KNeighborsClassifier(num_neighbors, weightsdistance) classifier.fit(X,…

前端面试题合集

1.对前端监控的理解&#xff1f; 异常监控&#xff08;监控前端页面的报错&#xff09;> try / catch 、window.onerror、window.addEventListener、Vue.config.errorHandle JS 代码运行错误、语法错误等&#xff1b;AJAX 请求错误&#xff1b;静态资源加载错误&#xff1…

硬件21、接线端子XH2.54、2.54排针排母、2510接插件、PH2.0、町洋接线端子5.08、ISP接口JTAG插座

XH2.54端子的间距为2.54毫米&#xff0c;2.54排针排母的间距也是2.54mm&#xff0c;2510接插件也是2.54、而PH2.0端子的间距为2.0毫米&#xff0c;町洋接线端子插针间的距离是5.08mm&#xff0c;ISP接口JTAG插座针脚的间距一般也是2.54mm XH2.54 针脚间距为2.54mm 插头 接线…