Springboot-RabbitMQ 消息队列使用

一、概念介绍:

RabbitMQ中几个重要的概念介绍:

  • Channels:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的 TCP 连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

  • Exchanges:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

    • 交换机类型主要有以下几种:
    • Direct Exchange(直连交换机):这种类型的交换机根据消息的Routing Key(路由键)进行精确匹配,只有绑定了相同路由键的队列才会收到消息。适用于点对点的消息传递场景。
    • Fanout Exchange(扇形交换机):这种类型的交换机采用广播模式,它会将消息发送给所有绑定到该交换机的队列,不管消息的路由键是什么。适用于消息需要被多个消费者处理的场景。
    • Topic Exchange(主题交换机):这种类型的交换机支持基于模式匹配的路由键,可以使用通配符*(匹配一个单词)和#(匹配零个或多个单词)进行匹配。适用于实现更复杂的消息路由逻辑。
    • Headers Exchange(头交换机):这种类型的交换机不处理路由键,而是根据发送的消息内容中的headers属性进行匹配。适用于需要在消息头中携带额外信息的场景。
  • Queues:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

二、引入依赖:

 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

三、添加配置信息

spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestlistener:simple:acknowledge-mode: manual  # 手动提交

四、Direct Exchange(直连交换机)模式

1、新建配置文件 RabbitDirectConfig类

package com.example.direct;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author wasin* @version 1.0* @date 2024/6/4* @description: 直连交换机--这种类型的交换机根据消息的Routing Key(路由键)进行精确匹配,* 只有绑定了相同路由键的队列才会收到消息。适用于点对点的消息传递场景*/
@Configuration
public class RabbitDirectConfig {/*** 队列名称*/public static final String QUEUE_MESSAGE ="QUEUE_MESSAGE";public static final String QUEUE_USER ="QUEUE_USER";/*** 交换机*/public static final String EXCHANGE="EXCHANGE_01";/*** 路由*/public static final String ROUTING_KEY="ROUTING_KEY_01";@Beanpublic Queue queue01() {return new Queue(QUEUE_MESSAGE, //队列名称true, //是否持久化false, //是否排他false //是否自动删除);}@Beanpublic Queue queue02() {return new Queue(QUEUE_USER, //队列名称true, //是否持久化false, //是否排他false //是否自动删除);}@Beanpublic DirectExchange exchange01() {return new DirectExchange(EXCHANGE,true, //是否持久化false //是否排他);}@Beanpublic Binding demoBinding() {return BindingBuilder.bind(queue01()).to(exchange01()).with(ROUTING_KEY);}@Beanpublic Binding demoBinding2() {return BindingBuilder.bind(queue02()).to(exchange01()).with(ROUTING_KEY);}
}

2、添加消息生产者 Producer类

package com.example.direct;import com.example.entity.User;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** @author wasin* @version 1.0* @date 2024/6/4* @description:*/
@Component
public class Producer {@ResourceRabbitTemplate rabbitTemplate;public void sendMessageByExchangeANdRoute(String message){rabbitTemplate.convertAndSend(RabbitDirectConfig.EXCHANGE, RabbitDirectConfig.ROUTING_KEY,message);}/*** 默认交换器,隐式地绑定到每个队列,路由键等于队列名称。* @param message*/public void sendMessageByQueue(String message){rabbitTemplate.convertAndSend(RabbitDirectConfig.QUEUE_MESSAGE,message);}public void sendMessage(User user){rabbitTemplate.convertAndSend(RabbitDirectConfig.QUEUE_USER,user);}
}

3、添加消息消费者

package com.example.direct;import com.example.entity.User;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author wasin* @version 1.0* @date 2024/6/4* @description:*/
@Component
public class Consumer {@RabbitListener(queues = RabbitDirectConfig.QUEUE_USER)public void onMessage(User user){System.out.println("收到的实体bean消息:"+user);}@RabbitListener(queues = RabbitDirectConfig.QUEUE_MESSAGE)public void onMessage2(String message){System.out.println("收到的字符串消息:"+message);}
}

4、 测试

package com.example;import com.example.entity.User;
import com.example.direct.Producer;
import com.example.fanout.FanoutProducer;
import com.example.topic.TopicProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
class SpringbootRabbitMqApplicationTests {@ResourceProducer producer;@Testpublic void sendMessage() throws InterruptedException {producer.sendMessageByQueue("哈哈");producer.sendMessage(new User().setAge(10).setName("wasin"));}
}

五、Topic Exchange(主题交换机)模式

1、新建RabbitTopicConfig类

package com.example.topic;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author wasin* @version 1.0* @date 2024/6/4* @description: 主题交换机--这种类型的交换机支持基于模式匹配的路由键,* 可以使用通配符*(匹配一个单词)和#(匹配零个或多个单词)进行匹配。适用于实现更复杂的消息路由逻辑。*/
@Configuration
public class RabbitTopicConfig {/*** 交换机*/public static final String EXCHANGE = "EXCHANGE_TOPIC1";/*** 队列名称*/public static final String QUEUE_TOPIC1 = "QUEUE_TOPIC";/*** 路由* "*" 与 "#",用于做模糊匹配。其中 "*" 用于匹配一个单词,"#" 用于匹配多个单词(可以是零个)* 可以匹配 aa.wasin.aa.bb  wasin.aa.bb  wasin.aa ....* aa.bb.wasin.cc 无法匹配*/public static final String ROUTING_KEY1 = "*.wasin.#";@Beanpublic Queue queue() {return new Queue(QUEUE_TOPIC1, //队列名称true, //是否持久化false, //是否排他false //是否自动删除);}@Beanpublic TopicExchange exchange() {return new TopicExchange(EXCHANGE,true, //是否持久化false //是否排他);}@Beanpublic Binding binding() {return BindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY1);}
}

2、新建 消息生产者和发送者

  • TopicProducer类
package com.example.topic;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** @author wasin* @version 1.0* @date 2024/6/4* @description:*/
@Component
public class TopicProducer {@ResourceRabbitTemplate rabbitTemplate;/*** @param routeKey 路由* @param message 消息*/public void sendMessageByQueue(String routeKey, String message){rabbitTemplate.convertAndSend(RabbitTopicConfig.EXCHANGE,routeKey,message);}}
  • TopicConsumer类
package com.example.topic;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author wasin* @version 1.0* @date 2024/6/4* @description:*/
@Slf4j
@Component
public class TopicConsumer {@RabbitListener(queues = RabbitTopicConfig.QUEUE_TOPIC1)public void onMessage2(String message){log.info("topic收到的字符串消息:{}",message);}
}

六、Fanout Exchange(扇形交换机)模式

1、 新建 RabbitFanoutConfig类

package com.example.fanout;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author wasin* @version 1.0* @date 2024/6/4* @description: 扇形交换机--这种类型的交换机采用广播模式,它会将消息发送给所有绑定到该交换机的队列,* 不管消息的路由键是什么。适用于消息需要被多个消费者处理的场景。*/
@Configuration
public class RabbitFanoutConfig {/*** 交换机*/public static final String EXCHANGE = "EXCHANGE_FANOUT";/*** 队列名称*/public static final String QUEUE_FANOUT1 = "QUEUE_FANOUT";/*** 队列名称*/public static final String QUEUE_FANOUT2 = "QUEUE_FANOUT2";@Beanpublic Queue queueFanout1() {return new Queue(QUEUE_FANOUT1, //队列名称true, //是否持久化false, //是否排他false //是否自动删除);}@Beanpublic Queue queueFanout2() {return new Queue(QUEUE_FANOUT2, //队列名称true, //是否持久化false, //是否排他false //是否自动删除);}@Beanpublic FanoutExchange exchangeFanout() {return new FanoutExchange(EXCHANGE,true, //是否持久化false //是否排他);}@Beanpublic Binding bindingFanout() {return BindingBuilder.bind(queueFanout1()).to(exchangeFanout());}@Beanpublic Binding bindingFanout2() {return BindingBuilder.bind(queueFanout2()).to(exchangeFanout());}}

2、新建 消息生产者和发送者

  • FanoutProducer类:
package com.example.fanout;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** @author wasin* @version 1.0* @date 2024/6/4* @description:*/
@Component
public class FanoutProducer {@ResourceRabbitTemplate rabbitTemplate;/*** @param message 消息*/public void sendMessageByQueue(String message) {rabbitTemplate.convertAndSend(RabbitFanoutConfig.EXCHANGE, "", message);}}
  • FanoutConsumer类
package com.example.fanout;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.io.IOException;/*** @author wasin* @version 1.0* @date 2024/6/4* @description:*/
@Slf4j
@Component
public class FanoutConsumer {/*** 手动提交* @param message* @param channel* @param tag* @throws IOException*/@RabbitListener(queues = RabbitFanoutConfig.QUEUE_FANOUT1)public void onMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {log.info("fanout1收到的字符串消息:{}",message);channel.basicAck(tag,false);}@RabbitListener(queues = RabbitFanoutConfig.QUEUE_FANOUT2)public void onMessage2(String message){log.info("fanout2到的字符串消息:{}",message);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/22249.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2021 hnust 湖科大 数字系统设计与VHDL课程 大作业 - 出租车计价器设计

2021 hnust 湖科大 数字系统设计与VHDL课程大作业-出租车计价器设计 描述 大二上的eda考查课的实验&#xff0c;额外实现了停车等待2分钟后收费1元/min。内含项目文件&#xff08;实测可运行&#xff09;&#xff0c;代码&#xff0c;报告&#xff0c;视频和照片&#xff0c;…

JavaScript函数定义,函数参数,函数调用

JavaScript函数定义&#xff1a; 在JavaScript中&#xff0c;我们可以使用关键字function来定义一个函数。函数定义的一般语法如下&#xff1a; function functionName(parameter1, parameter2, ...){// 函数体 }其中&#xff0c;functionName是函数的名称&#xff0c;可以自定…

功能强大且专业的PDF转换软件PDF Shaper Professional 14.2

PDF Shaper Professional是一款适用于Windows的程序&#xff0c;可让您在计算机上处理PDF文件。 要开始使用PDF Shaper Professional&#xff0c;您需要在Windows计算机上下载并安装该程序。您还应该有合适的驱动程序和编解码器来处理计算机上的文本和图形。 安装程序后&#…

分享一份糟糕透顶的简历,看看跟你写的一样不

最近看了一个人的简历&#xff0c;怎么说呢&#xff0c;前几年这么写没问题&#xff0c;投出去就有回复&#xff0c;但从现在开始&#xff0c;这么写肯定不行了。下面我给大家分享一下内容&#xff1a; 目录 &#x1f926;‍♀️这是简历文档截图 &#x1f937;‍♀️这是基本…

淘宝评论API调用指南,让你购物不再困扰

一、淘宝评论API概述 淘宝评论API是淘宝开放平台提供的一种服务&#xff0c;它允许开发者通过调用API接口获取淘宝商品评论数据&#xff0c;联讯数据从而为用户提供更加丰富和实用的购物决策信息。通过使用淘宝评论API&#xff0c;开发者可以轻松地实现以下功能&#xff1a; …

SwiftUI 利用 Swizz 黑魔法为系统创建的默认对象插入新协议方法(二)

功能需求 在 SwiftUI 的开发中,我们往往需要借助底层 UIKit 的“上帝之手”来进一步实现额外的定制功能。比如,在可拖放(Dragable)SwiftUI 的实现中,会缺失拖放取消的回调方法让我们这些秃头码农们“欲哭无泪” 如上图所示,我们在拖放取消时将界面中的一切改变都恢复如初…

slf4j等多个jar包冲突绑定的排查方法使用IDEA的maven help解决

1.安装 2.使用maven help解决&#xff0c;找到对应包存在的冲突 使用exclude直接解决即可

【人工智能】第四部分:ChatGPT的技术实现

人不走空 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌赋&#xff1a;斯是陋室&#xff0c;惟吾德馨 目录 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌…

dnf手游版游玩感悟

dnf手游于5月21号正式上线&#xff0c;作为一个dnf端游老玩家&#xff0c;并且偶尔上线ppk&#xff0c;自然下载了手游版&#xff0c;且玩了几天。 不得不说dnf手游的优化做到了极好的程度。 就玩法系统这块&#xff0c;因为dnf属于城镇地下城模式&#xff0c;相比…

前端工程化工具系列(三)—— Stylelint(v16.6.1):CSS/SCSS 代码质量工具

Stylelint 是 CSS/SCSS 的静态分析工具&#xff0c;用于检查其中的违规和错误。 1. 环境要求 v16 以上的 Stylelint&#xff0c;支持 Node.js 的版本为 v18.12.0。 在命令行工具中输入以下内容后回车&#xff0c;来查看当前系统中 Node.js 的版本。 node -vNode.js 推荐使用…

Shell脚本快速入门

为什么要学shell&#xff1f;能做什么&#xff1f; 答&#xff1a;CI/CD 持续集成&#xff0c;自动化部署作业方式&#xff0c;需要将一系列linux命令程序化&#xff0c;shell 就能做到。

13. 《C语言》——【strlen函数的使用和模拟实现】

文章目录 前言strlen函数strlen函数的使用strlen函数的3种方法实现方法1方法2方法3 总结 前言 各位老板好~ &#xff0c; 今天我们讲解strlen函数如何去使用以及如何去模拟实现strlen函数。希望各位老板能够给一个点赞和一个大大的关注&#xff0c;感谢各位老板&#xff01;str…

塑料焊接机熔深对激光焊接质量有什么影响

塑料焊接机的熔深对焊接质量具有直接且显著的影响。以下是熔深对焊接质量影响的详细解释&#xff1a; 1. 焊接强度&#xff1a;熔深直接决定了焊缝的截面积&#xff0c;从而影响焊接接头的强度。较深的熔深意味着焊缝的截面积更大&#xff0c;可以提供更强的结合力&#xff0c;…

OpenStreetMap部署(OSM)

参考&#xff1a;https://github.com/openstreetmap/openstreetmap-website/blob/master/DOCKER.md OpenStreeMap 部署 操作系统建议使用 Ubuntu 22 版本 安装 Docker # 更新软件包索引&#xff1a; sudo apt-get update # 允许APT使用HTTPS&#xff1a; sudo apt-get inst…

【计算机组成原理】详谈计算机发展历程

计算机发展历程 导读一、计算机的诞生1.1 历史背景1.2 计算机的发明 二、计算机硬件的发展1.1 计算机的四代变化1.1.1 第一代计算机bug的由来 1.1.2 第二代计算机1.1.3 第三代计算机半导体存储器的发展 1.1.4 第四代计算机 1.2 个人计算机的发展1.2.1 微处理器的发展1.2.2 个人…

AIGC之Stable Diffusion Web Ui 初体验

前言 Stable Diffusion辣么火&#xff0c;同学你确定不尝试一下嘛&#xff1f; 纯代码学习版本搞啦&#xff0c;Web Ui 也得试试咧 网上有很多安装Stable Diffusion Web Ui 的介绍了&#xff0c;我在这说一下我的踩坑记录 想安装的同学&#xff0c;看这个链接 万字长文&#x…

U-Net: Convolutional Networks for Biomedical Image Segmentation--论文笔记

U-Net: Convolutional Networks for Biomedical Image Segmentation 资料 1.代码地址 2.论文地址 https://arxiv.org/pdf/1505.04597 3.数据集地址 论文摘要的翻译 人们普遍认为&#xff0c;深度网络的成功训练需要数千个带注释的训练样本。在本文中&#xff0c;我们提出…

44-5 waf绕过 - SQL注入绕WAF方法

环境准备: 43-5 waf绕过 - 安全狗简介及安装-CSDN博客然后安装sqlilabs靶场:构建完善的安全渗透测试环境:推荐工具、资源和下载链接_渗透测试靶机下载-CSDN博客 一、双写绕过 打开sql靶场的第一关:http://127.0.0.1/sqli-labs-master/Less-1/?id=1 验证一下waf是否开启防…

C\C++内存管理(未完结)

文章目录 一.C\C内存分布二.C语言中动态内存管理方式&#xff1a;malloc/calloc/realloc/free三.C内存管理方式3.1.new/delete操作内置类型3.2.new和delete操作自定义类型 四.operator new与operator delete函数&#xff08;重要点进行讲解&#xff09;4.1. operator new与oper…

npm install 出错,‘proxy‘ config is set properly. See: ‘npm help config‘

背景 从远程clone下项目之后&#xff0c;使用命令 npm install 安装依赖&#xff0c;报错如下 意为&#xff1a; 报错&#xff1a; npm犯错!network与网络连通性有关的问题。 npm犯错!网络在大多数情况下&#xff0c;你背后的代理或有坏的网络设置。 npm犯错!网络 npm犯错…