SpringBoot集成多个rabbitmq

1、pom文件

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.4.9</version>
</dependency>

2、rabbitmq的连接配置文件

spring:rabbitmq:mq1:host: xxx.xxx.xxx.xxxport: 5672username: xxxxpassword: xxxxxenable: truemq2:host: xxx.xxx.xxx.xxxport: 5672username: xxxxxpassword: xxxxxenable: true

3、mq1的相关代码  MQ1RabbitConfiguration.java

package com.pojo.config;import lombok.Data;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;@Data
@Component("mq1RabbitmqConfig")
@ConfigurationProperties(prefix = "spring.rabbitmq.mq1") //读取mq1的配置信息
@ConditionalOnProperty(name = "spring.rabbitmq.mq1.enable", havingValue = "true") //是否启用
public class MQ1RabbitConfiguration {private String host;private Integer port;private String username;private String password;@Autowiredprivate ReturnCallBack1 returnCallBack1;@Autowiredprivate ConfirmCallBack1 confirmCallBack1;@Bean(name = "mq1ConnectionFactory")//命名mq1的ConnectionFactory,如果项目中只有一个mq则不必如此@Primarypublic ConnectionFactory createConnectionFactory() {//消息队列1的连接CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);//开启发送到交换机和队列的回调connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);return connectionFactory;}@Bean(name = "mq1RabbitTemplate")//命名mq1的RabbitTemplate,如果项目中只有一个mq则不必如此@Primarypublic RabbitTemplate brainRabbitTemplate(@Qualifier("mq1ConnectionFactory") ConnectionFactory connectionFactory) {//消息生产RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//发送消息时设置强制标志,仅当提供了returnCallback时才适用rabbitTemplate.setMandatory(true);//确保消息是否发送到交换机,成功与失败都会触发rabbitTemplate.setConfirmCallback(confirmCallBack1);//确保消息是否发送到队列,成功发送不触发,失败触发rabbitTemplate.setReturnsCallback(returnCallBack1);return rabbitTemplate;}@Bean(name = "simpleRabbitListenerContainerFactory1")@Primarypublic SimpleRabbitListenerContainerFactory firstFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,@Qualifier("mq1ConnectionFactory") ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();configurer.configure(factory, connectionFactory);return factory;}@Bean(name = "subQueue01")public Queue firstQueue() {return new Queue("subQueue01");}@Bean(name = "subQueue02")public Queue secondQueue() {return new Queue("subQueue02");}@Bean(name = "subQueue03")public Queue thirdQueue() {return new Queue("subQueue03", true);}@Bean(name = "subQueue04")public Queue fourQueue() {return new Queue("subQueue04", true);}@Bean(name = "topicExchangeOne")public TopicExchange topicExchange() {
//        Direct exchange(直连交换机)
//        Fanout exchange(扇型交换机)
//        Topic exchange(主题交换机)
//        Headers exchange(头交换机)
//        Dead Letter Exchange(死信交换机)return new TopicExchange("topicExchangeOne");}@Bean(name = "binding1")public Binding binding1(@Qualifier("subQueue01") Queue queue, TopicExchange exchange) {//绑定队列1到TopicExchange  routingKey是队列1的队列名return BindingBuilder.bind(queue).to(exchange).with("subQueue01");}@Bean(name = "fanoutExchangeOne")public FanoutExchange fanoutExchange() {
//        Direct exchange(直连交换机)
//        Fanout exchange(扇型交换机)
//        Topic exchange(主题交换机)
//        Headers exchange(头交换机)
//        Dead Letter Exchange(死信交换机)return new FanoutExchange("fanoutExchangeOne");}@Bean(name = "binding3")public Binding binding3(@Qualifier("subQueue03") Queue queue, FanoutExchange exchange) {//绑定队列3到fanoutExchange  队列3和队列4都能消费fanoutExchange的消息return BindingBuilder.bind(queue).to(exchange);}@Bean(name = "binding4")public Binding binding4(@Qualifier("subQueue04") Queue queue, FanoutExchange exchange) {//绑定队列4到fanoutExchange  队列3和队列4都能消费fanoutExchange的消息return BindingBuilder.bind(queue).to(exchange);}}

ConfirmCallBack1 .java

package com.pojo.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ConfirmCallBack1 implements RabbitTemplate.ConfirmCallback {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String s) {if (!ack) {log.info("ConfirmCallBack1消息发送交换机失败:{}", s);} else {log.info("ConfirmCallBack1消息发送交换机成功");}}
}
ReturnCallBack1.java
package com.pojo.config;import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ReturnCallBack1 implements RabbitTemplate.ReturnsCallback {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info("ReturnCallBack1消息发送队列失败:{}", JSON.toJSON(returnedMessage));}
}

4、mq2的相关代码

  MQ2RabbitConfiguration.java

package com.pojo.config;import lombok.Data;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Data
@Component("mq2RabbitmqConfig")
@ConfigurationProperties(prefix = "spring.rabbitmq.mq2") //读取mq1的配置信息
@ConditionalOnProperty(name = "spring.rabbitmq.mq2.enable", havingValue = "true") //是否启用
public class MQ2RabbitConfiguration {private String host;private Integer port;private String username;private String password;@Autowiredprivate ReturnCallBack2 returnCallBack2;@Autowiredprivate ConfirmCallBack2 confirmCallBack2;@Bean(name = "mq2ConnectionFactory")   //命名mq1的ConnectionFactory,如果项目中只有一个mq则不必如此public ConnectionFactory createConnectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);//开启发送到交换机和队列的回调connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);return connectionFactory;}@Bean(name = "mq2RabbitTemplate") //命名mq1的RabbitTemplate,如果项目中只有一个mq则不必如此public RabbitTemplate brainRabbitTemplate(@Qualifier("mq2ConnectionFactory") ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//发送消息时设置强制标志,仅当提供了returnCallback时才适用rabbitTemplate.setMandatory(true);//确保消息是否发送到交换机,成功与失败都会触发rabbitTemplate.setConfirmCallback(confirmCallBack2);//确保消息是否发送到队列,成功发送不触发,失败触发rabbitTemplate.setReturnsCallback(returnCallBack2);return rabbitTemplate;}@Bean(name = "simpleRabbitListenerContainerFactory2")public SimpleRabbitListenerContainerFactory secondFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,@Qualifier("mq2ConnectionFactory") ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();configurer.configure(factory, connectionFactory);return factory;}}

ConfirmCallBack2.java

package com.pojo.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ConfirmCallBack2 implements RabbitTemplate.ConfirmCallback {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String s) {if (!ack) {log.info("ConfirmCallBack2消息发送交换机失败:{}", s);} else {log.info("ConfirmCallBack2消息发送交换机成功");}}
}

ReturnCallBack2.java

package com.pojo.config;import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ReturnCallBack2 implements RabbitTemplate.ReturnsCallback {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info("ReturnCallBack2消息发送队列失败:{}", JSON.toJSON(returnedMessage));}
}

5、消息生产者

package com.pojo.prj.controller;import com.pojo.common.anno.NoNeedLogin;
import com.pojo.common.base.ApplicationContextUtils;
import com.pojo.common.base.BaseController;
import com.pojo.util.ResponseResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;/*** <p>* 项目表 前端控制器* </p>** @author zhushangjin* @menu 项目管理* @since 2022-11-14*/
@RestController
@Slf4j
public class ProjectController extends BaseController {@Resource(name = "mq1RabbitTemplate")//初始化mq1的RabbitTemplate对象private RabbitTemplate mq1RabbitTemplate;@Resource(name = "mq2RabbitTemplate")//初始化mq1的RabbitTemplate对象private RabbitTemplate mq2RabbitTemplate;/*** 获取项目下拉列表** @return* @status done*/@GetMapping("/prj/project/list")@NoNeedLoginpublic ResponseResult<String> list() {String active = ApplicationContextUtils.getActiveProfile();logger.error(ApplicationContextUtils.getActiveProfile());return ResponseResult.ok("ReturnCallBack2");}@GetMapping("/prj/project/test1")public ResponseResult test1() {//发送到topicExchangeOne类型的交换机,根据routekey去找发送到哪个队列里,// 只有这一个队列才能收到这条消息String str = "test1test1test1test1test1";mq1RabbitTemplate.convertAndSend("topicExchangeOne","subQueue01", str);return buildResponseResult(true);}@GetMapping("/prj/project/test2")public ResponseResult test2() {//发送到direct类型的交换机,根据routekey去找发送到哪个队列里,//只有这一个队列才能收到这条消息mq2RabbitTemplate.convertAndSend("subQueue02", "test2test2test2test2test2");return buildResponseResult(true);}@GetMapping("/prj/project/test3")public ResponseResult test3() {//发送到fanout类型的交换机,跟这个交换机绑定的队列都会收到这一条消息,// 故第二个参数routekey无需填写mq1RabbitTemplate.convertAndSend("fanoutExchangeOne", null, "test3test3test3test3test3");return buildResponseResult(true);}}

6、消息消费者

Receiver1.java

package com.pojo.config;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RabbitListener(queues = "subQueue01", containerFactory = "simpleRabbitListenerContainerFactory1")
public class Receiver1 {@RabbitHandler(isDefault = true)public void process(String hello) {System.out.println("Receiver1: " + hello);}}

Receiver2.java

package com.pojo.config;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RabbitListener(queues = "subQueue02", containerFactory = "simpleRabbitListenerContainerFactory2")
public class Receiver2 {@RabbitHandler(isDefault = true)public void process(String hello) {System.out.println("Receiver2: " + hello);}}

Receiver3.java

package com.pojo.config;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RabbitListener(queues = "subQueue03", containerFactory = "simpleRabbitListenerContainerFactory1")
public class Receiver3 {@RabbitHandler(isDefault = true)public void process(String hello) {System.out.println("Receiver3 : " + hello);}}

Receiver4.java

package com.pojo.config;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RabbitListener(queues = "subQueue04", containerFactory = "simpleRabbitListenerContainerFactory1")
public class Receiver4 {@RabbitHandler(isDefault = true)public void process(String hello) {System.out.println("Receiver4 : " + hello);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/60157.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第二十五章 TCP 客户端 服务器通信 - TCP 设备的 READ 命令

文章目录 第二十五章 TCP 客户端 服务器通信 - TCP 设备的 READ 命令TCP 设备的 READ 命令READ 修改 $ZA 和 $ZB$ZA 和 READ 命令 第二十五章 TCP 客户端 服务器通信 - TCP 设备的 READ 命令 TCP 设备的 READ 命令 从服务器或客户端发出 READ 命令以读取客户端或服务器设置的…

【K8S系列】Kubernetes Pod节点ImagePullBackOff 状态及解决方案详解【已解决】

在 Kubernetes 中&#xff0c;当某个 Pod 的容器无法从指定的镜像仓库拉取镜像时&#xff0c;Pod 的状态会变为 ImagePullBackOff。这通常是因为指定的镜像不存在、镜像标签错误、认证失败或网络问题等原因。 以下是关于 ImagePullBackOff 的详细分析及解决方案。 1. ImagePull…

VMware虚拟机(Ubuntu或centOS)共享宿主机网络资源

VMware虚拟机(Ubuntu或centOS)共享宿主机网络资源 由于需要在 Linux 环境下进行一些测试工作&#xff0c;于是决定使用 VMware 虚拟化软件来安装 Ubuntu 24.04 .1操作系统。考虑到测试过程中需要访问 Github &#xff0c;要使用Docker拉去镜像等外部网络资源&#xff0c;因此产…

前列腺分割:基于边界加权(解决弱边界)、域自适应(少样本)

前列腺分割&#xff1a;基于边界加权&#xff08;解决弱边界&#xff09;、域自适应&#xff08;少样本&#xff09; 理解发现规律论文大纲观察1. 观察行为2. 变量分析3. 假设提出4. 验证过程 解法拆解 论文&#xff1a;Boundary-weighted Domain Adaptive Neural Network for …

鼠标绘制轮廓

需要对label进行提升&#xff0c;新建MyLabel类&#xff0c;并将其提升到label控件上&#xff0c;详见上篇控件提升 mylabelmouse.h #pragma once #include <QtWidgets/QMainWindow> #include "ui_mylabelmouse.h" #include <QMenu> #include "My…

C语言-详细讲解-冒泡排序与选择排序

1.冒泡排序 冒泡排序是一种比较简单的排序算法。它重复地走访要排序的数列&#xff0c;一次比较两个元素&#xff0c;如果它们的顺序错误就把它们交换过来。走访数列的工作是重复地进行直到没有再需要交换&#xff0c;也就是说该数列已经排序完成。这个名字的由来是因为越小&a…

MATLAB常见数学运算函数

MATLAB中含有许多有用的函数,可以随时调用。 a b s abs abs函数 a b s abs abs函数在MATLAB中可以求绝对值,也可以求复数的模长:c e i l ceil ceil函数 向正无穷四舍五入(如果有小数,就向正方向进一)f l o o r floor floor函数 向负无穷四舍五入(如果有小数,就向负方向…

SpringBoot 集成 Sharding-JDBC(一):数据分片

在深入探讨 Sharding-JDBC 之前&#xff0c;建议读者先了解数据库分库分表的基本概念和应用场景。如果您还没有阅读过相关的内容&#xff0c;可以先阅读我们之前的文章&#xff1a; 关系型数据库海量数据存储策略-CSDN博客 这篇文章将帮助您更好地理解分库分表的基本原理和实现…

go-zero(六) JWT鉴权

go-zero JWT鉴权 还记得我们之前登录功能&#xff0c;返回的信息是token吗&#xff1f; 这个token其实就是JSON Web Token简称JWT,它是一种开放标准&#xff08;RFC 7519&#xff09;&#xff0c;用于在网络应用环境间安全地传递声明信息。 它是一种基于 JSON 的令牌&#xf…

Flink是如何实现 End-To-End Exactly-once的?

flink 如何实现端到端的 Exactly-once? 端到端包含 Source, Transformation,Sink 三部分的Exactly-once Source&#xff1a;支持数据的replay&#xff0c;如Kafka的offset。Transformation&#xff1a;借助于checkpointSink&#xff1a;Checkpoint 两阶段事务提交 两阶段提…

ZYNQ程序固化——ZYNQ学习笔记7

一、ZYNQ启动过程 二、 SD卡启动实操 1、对ZYNQ进行配置添加Flash 2、添加SD卡 3、重新生成硬件信息 4、创建vitis工程文件 5、勾选板级支持包 6、对系统工程进行整体编译&#xff0c;生成两个Debug文件&#xff0c;如图所示。 7、插入SD卡&#xff0c;格式化为 8、考入BOOT.…

python移动鼠标但不显示移动轨迹

如果需要完全隐藏鼠标的移动轨迹&#xff0c;可以尝试通过以下两种方法解决&#xff1a; 方法 1&#xff1a;直接移动到目标点进行点击 通过直接设置鼠标的最终位置而不模拟移动&#xff0c;可以避免显示鼠标的移动轨迹。以下是优化代码&#xff1a; import pyautogui import…

进程其他知识点

/* #include <stdlib.h> void exit(int status); #include <unistd.h> void _exit(int status); status 参数&#xff1a;是进程退出时的一个状态信息。父进程回收子进程资源的时候可以获取到。 */ #include <stdio.h> #include <stdlib.h> #include &…

Go语言内存分配源码分析学习笔记

大家好&#xff0c;我是V 哥。GO GO GO&#xff0c;今天来说一说Go语言内存分配问题&#xff0c;Go语言内存分配的源码主要集中在runtime包中&#xff0c;它实现了Go语言的内存管理&#xff0c;包括初始化、分配、回收和释放等。下面来对这些过程详细分析一下&#xff0c;先赞后…

Android ART知多少?

Android 虚拟机 ART&#xff08;Android Runtime&#xff09;是 Android 平台上的应用程序运行时环境&#xff0c;用于执行应用程序的字节码。ART 自 Android 5.0&#xff08;Lollipop&#xff09;开始取代了 Dalvik&#xff0c;成为 Android 的默认运行时环境。本文将从以下几…

C++ —— 剑斩旧我 破茧成蝶—C++11

江河入海&#xff0c;知识涌动&#xff0c;这是我参与江海计划的第2篇。 目录 1. C11的发展历史 2. 列表初始化 2.1 C98传统的{} 2.2 C11中的{} 2.3 C11中的std::initializer_list 3. 右值引用和移动语义 3.1 左值和右值 3.2 左值引用和右值引用 3.3 引用延长生命周期…

使用ufw配置防火墙,允许特定范围IP访问

在ubuntu上使用 ufw&#xff08;Uncomplicated Firewall&#xff09;允许特定 IP 地址或子网访问某个端口&#xff08;如 22 端口&#xff09;非常简单。以下是具体的步骤&#xff1a; 1. 安装 UFW&#xff08;如果尚未安装&#xff09; 首先&#xff0c;确保 ufw 已经安装。…

推荐15个2024最新精选wordpress模板

以下是推荐的15个2024年最新精选WordPress模板&#xff0c;轻量级且SEO优化良好&#xff0c;适合需要高性能网站的用户。中文wordpress模板适合搭建企业官网使用。英文wordpress模板&#xff0c;适合B2C网站搭建&#xff0c;功能强大且兼容性好&#xff0c;是许多专业外贸网站的…

(计算机毕设)基于SpringBoot+Vue的房屋租赁系统的设计与实现

博主可接毕设设计&#xff01;&#xff01;&#xff01; 各种毕业设计源码只要是你有的题目我这里都有源码 摘 要 社会的发展和科学技术的进步&#xff0c;互联网技术越来越受欢迎。网络计算机的生活方式逐渐受到广大人民群众的喜爱&#xff0c;也逐渐进入了每个用户的使用。互…

python蓝桥杯刷题2

1.最短路 题解&#xff1a;这个采用暴力枚举&#xff0c;自己数一下就好了 2.门牌制作 题解&#xff1a;门牌号从1到2020&#xff0c;使用for循环遍历一遍&#xff0c;因为range函数无法调用最后一个数字&#xff0c;所以设置成1到2021即可&#xff0c;然后每一次for循环&…