springboot与rabbitmq的整合【演示5种基本交换机】

前言
👏作者简介:我是笑霸final,一名热爱技术的在校学生。
📝个人主页:个人主页1 || 笑霸final的主页2
📕系列专栏:后端专栏
📧如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步👀
🔥如果感觉博主的文章还不错的话,👍点赞👍 + 👀关注👀 + 🤏收藏🤏

话不多说 直接开干

目录

  • 一 导入maven坐标与配置
  • 二、直连交换机direct exchange
    • 2.1配置类QueueConfig
    • 2.2消息提供者
    • 2.2消息消费者
    • 2.3测试类
  • 三、默认交换机default exchange
    • 3.1配置类和消息提供者
    • 3.2消息消费者
    • 3.3测试结果
  • 四、扇型交换机fanout exchange
    • 4.1配置类
    • 4.2消息提供者
    • 4.3消息消费者
    • 4.4测试类
  • 五、主题交换机topic exchanges
    • 5.1配置类
    • 5.2消息提供者
    • 5.3消息消费者
    • 5.4测试
  • 六、头交换机 headers exchange
    • 6.1配置类
    • 6.2创建消息提供者
    • 6.3消息消费者
    • 6、4测试结果

一 导入maven坐标与配置

<!--rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

基础配置文件

spring:rabbitmq:username: 你的用户名password: 你的密码host: rabbitmq安装的主机的 ip地址port: 5672 #端口号

二、直连交换机direct exchange

直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的。

  • 将一个队列 绑定到 某个交换机上,同时赋予该绑定一个路由键(routing key)
  • 当一个携带着路由键为routingKey01的消息被发送给直连交换机时,交换机会把它路由给绑定值同样为routingKey01的队列。

直连交换机经常用来循环分发任务给多个工作者(workers)。当这样做的时候,我们需要明白一点,在AMQP 0-9-1中,消息的负载均衡是发生在消费者(consumer)之间的,而不是队列(queue)之间。

在这里插入图片描述

2.1配置类QueueConfig

@Configuration
public class QueueConfig {/*** 创建一个队列  队列名为direct1* */@Beanpublic Queue queue01(){return new Queue("direct1",true);//true表示持久化}/*** 创建一个直连交换机 名为directExchange* */@Beanpublic DirectExchange directExchange(){return new DirectExchange("directExchange");}/*** 在让队列和直连交换机绑定在一起* */@Beanpublic Binding binding(){Binding binding= BindingBuilder.bind(queue01()).to(directExchange()).with("routingKey01");return binding;}}

2.2消息提供者

@Component
public class MqProducer {@Resourceprivate RabbitTemplate rabbitTemplate;public void sent_test(Object o){//convertAndSend(交换机的名字,交换机中路由键名称,参数)rabbitTemplate.convertAndSend("directExchange",//交换机名字"routingKey01",//路由keyo);}
}

2.2消息消费者


@Component
@Slf4j
public class MqConsumer {/*** 接收消息*/@RabbitListener(queues = {"direct1"})public void receivedD(Message message, Channel channel)throws Exception{String msg=new String(message.getBody());log.info("当前时间:{},消费者1收到消息:{}",new Date().toString(),msg);}}

我写了两个消费者内容一致

2.3测试类

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class SpringRunnerTest {@Resourceprivate MqProducer mqProducer;//注入消息提供者@Testpublic void test_send() throws InterruptedException {// 循环发送消息while (true) {mqProducer.sent_test("你好,我是Lottery 001");Thread.sleep(3500);}}
}

测试结果
在这里插入图片描述

三、默认交换机default exchange

默认交换机(default exchange)实际上一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称队列名称 相同

3.1配置类和消息提供者

/**
*配置类
*/
@Configuration
public class QueueConfig {
//只需要创建一个队列
//每个`新建队列`(queue)都会`自动`绑定到`默认交换机`上,
//绑定的`路由键(routing //key)名称`与`队列名称` 相同@Beanpublic Queue queue02(){return new Queue("def");}}
/**
*消息提供者
*/
@Component
public class MqProducer {@Resourceprivate RabbitTemplate rabbitTemplate;public void def_sent_test(Object obj){//convertAndSend(交换机的名字,交换机中路由键名称,参数)rabbitTemplate.convertAndSend(//没有名字(名字为空字符串)"","def",obj);//消息内容}
}

默认交换机名字是空字符串 。每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称队列名称 相同

3.2消息消费者

@Component
@Slf4j
public class MqConsumer {/*** 接收消息*/@RabbitListener(queues = {"def"})public void receivedD02(Message message, Channel channel)throws Exception{String msg=new String(message.getBody());log.info("当前时间:{},消费者收到消息:{}",new Date().toString(),msg);}}

3.3测试结果

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class SpringRunnerTest {@Resourceprivate MqProducer mqProducer;//注入消息提供者@Testpublic void test_send02() throws InterruptedException {// 循环发送消息while (true) {mqProducer.def_sent_test("测试默认交换机");Thread.sleep(3500);}}
}

在这里插入图片描述

四、扇型交换机fanout exchange

扇型交换机(fanout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。扇型用来交换机处理消息的广播路由(broadcast routing)
这个交换机上的路由键将失效
在这里插入图片描述

4.1配置类

@Configuration
public class QueueConfig {/*** 创建多个队列* @return*/@Beanpublic Queue queue03_1(){return new Queue("fanout03_1");}@Beanpublic Queue queue03_2(){return new Queue("fanout03_2");}@Beanpublic Queue queue03_3(){return new Queue("fanout03_3");}/*** 创建一个扇形交换机*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("fanoutExchange");}/*** 队列和扇形交换机绑定*/@Beanpublic Binding binding_3_1(){Binding binding= BindingBuilder.bind(queue03_1()).to(fanoutExchange());return binding;}@Beanpublic Binding binding_3_2(){Binding binding= BindingBuilder.bind(queue03_2()).to(fanoutExchange());return binding;}@Beanpublic Binding binding_3_3(){Binding binding= BindingBuilder.bind(queue03_3()).to(fanoutExchange());return binding;}
}

4.2消息提供者

 @Component
public class MqProducer {@Resourceprivate RabbitTemplate rabbitTemplate;/*** 扇形交换机*/public void fanout_sent_test(Object o){//convertAndSend(交换机的名字,交换机中路由键名称,参数)rabbitTemplate.convertAndSend("fanoutExchange","",//扇形交换机也没有路由建o);}}

注意:扇形交换机也没有路由key 也用空字符串

4.3消息消费者

@Component
@Slf4j
public class MqConsumer {@RabbitListener(queues = {"fanout03_1"})public void receivedD03_1(Message message, Channel channel)throws Exception{String msg=new String(message.getBody());log.info("绑定队列一 当前时间:{},消费者收到消息:{}",new Date().toString(),msg);}@RabbitListener(queues = {"fanout03_2"})public void receivedD03_2(Message message, Channel channel)throws Exception{String msg=new String(message.getBody());log.info("绑定队列二 当前时间:{},消费者收到消息:{}",new Date().toString(),msg);}@RabbitListener(queues = {"fanout03_3"})public void receivedD03_3(Message message, Channel channel)throws Exception{String msg=new String(message.getBody());log.info("绑定队列三 当前时间:{},消费者收到消息:{}",new Date().toString(),msg);}
}

4.4测试类

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class SpringRunnerTest {@Resourceprivate MqProducer mqProducer;//注入消息提供者@Testpublic void test_send03() throws InterruptedException {int a=1;// 循环发送消息while (true) {mqProducer.fanout_sent_test("测试扇形交换机 第"+ a++ +"次循环");Thread.sleep(3500);}}
}

在这里插入图片描述

五、主题交换机topic exchanges

主题交换机(topic exchanges)通过对消息的路由键队列到交换机的绑定模式之间的匹配,将消息路由 一个或多个队列。主题交换机经常用来实现各种分发/订阅模式及其变种。主题交换机通常用来实现消息的多播路由(multicast routing)。

在这里插入图片描述

5.1配置类

@Configuration
public class QueueConfig {/*** 创建;两个队列*/@Beanpublic Queue topicQueue_1(){return new Queue("topicQueue_1");}@Beanpublic Queue topicQueue_2(){return new Queue("topicQueue_2");}/*** 创建主题交换机*/@Beanpublic TopicExchange TopicExchange(){return new TopicExchange("TopicExchange");}/*** 根据不同的key绑定不同的队列*/@Beanpublic Binding bindingTopicExchange_1(){Binding binding= BindingBuilder.bind(topicQueue_1()).to(TopicExchange()).with("key1");return binding;}@Beanpublic Binding bindingTopicExchange_2(){Binding binding= BindingBuilder.bind(topicQueue_2()).to(TopicExchange()).with("key2");return binding;}
}

5.2消息提供者

@Component
public class MqProducer {@Resourceprivate RabbitTemplate rabbitTemplate;/*** 主题交换机*/public void topic_sent_test(Object o,String key){rabbitTemplate.convertAndSend("TopicExchange",key, //后面动态的传递keyo);}
}

5.3消息消费者

@Component
@Slf4j
public class MqConsumer1 {/*** 接收消息*/@RabbitListener(queues = {"topicQueue_1"})public void topicQueue_1(Message message, Channel channel)throws Exception{String msg=new String(message.getBody());log.info("队列一 当前时间:{},消费者收到消息:{}",new Date().toString(),msg);}@RabbitListener(queues = {"topicQueue_2"})public void topicQueue_2(Message message, Channel channel)throws Exception{String msg=new String(message.getBody());log.info("队列二 当前时间:{},消费者收到消息:{}",new Date().toString(),msg);}}

5.4测试

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class SpringRunnerTest {@Resourceprivate MqProducer mqProducer;//注入消息提供者@Testpublic void test_send04() throws InterruptedException {// 循环发送消息int a=1;while (true) {if(a%2 == 0){mqProducer.topic_sent_test("!!给队列二的消息==第"+ a++ +"次循环","key2");}else{mqProducer.topic_sent_test("!!给队列一的消息==第"+ a++ +"次循环","key1");}Thread.sleep(3500);}}
}

在这里插入图片描述

使用案例:

  • 分发有关于特定地理位置的数据,例如销售点
  • 由多个工作者(workers)完成的后台任务,每个工作者负责处理某些特定的任务
  • 股票价格更新(以及其他类型的金融数据更新)
  • 涉及到分类或者标签的新闻更新(例如,针对特定的运动项目或者队伍)
  • 云端的不同种类服务的协调
  • 分布式架构/基于系统的软件封装,其中每个构建者仅能处理一个特定的架构或者系统。

六、头交换机 headers exchange

有时消息的路由操作会涉及到多个属性,此时使用消息头就比用路由键更容易表达,头交换机(headers exchange)就是为此而生的。头交换机使用多个消息属性代替 路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
在这里插入图片描述

6.1配置类

@Configuration
public class QueueConfig {/*** 创建2个队列*/@Bean(name = "headersQ1")public Queue queue1() {return new Queue("headersQ1");}@Bean(name = "headersQ2")public Queue queue2() {return new Queue("headersQ2");}/*** 创建交换机* @return*/@Beanpublic HeadersExchange headersExchange() {return new HeadersExchange("headersExchange");}/*** 绑定交换机和队列*/@Beanpublic Binding binding1() {HashMap<String, Object> header = new HashMap<>();header.put("queue", "queue1");header.put("bindType", "whereAll");return BindingBuilder.bind(queue1()).to(headersExchange()).whereAll(header).match();}@Beanpublic Binding binding2() {HashMap<String, Object> header = new HashMap<>();header.put("queue", "queue2");header.put("bindType", "whereAny");return BindingBuilder.bind(queue2()).to(headersExchange()).whereAny(header).match();}
}

6.2创建消息提供者

@Component
public class MqProducer {@Resourceprivate RabbitTemplate rabbitTemplate;/*** 头交换机* @param msg*/public void headers_send(String msg,int a) {//a用来控制头信息 达到传递给不同的队列效果MessageProperties messageProperties = new MessageProperties();if(  a % 3 ==0){messageProperties.setHeader("queue", "queue2");messageProperties.setHeader("bindType", "whereAny");}else{messageProperties.setHeader("queue", "queue1");messageProperties.setHeader("bindType", "whereAll");}Message message = new Message(msg.getBytes(), messageProperties);rabbitTemplate.convertAndSend("headersExchange", null, message);}
}

6.3消息消费者

@Component
@Slf4j
public class MqConsumer1 {/*** 接收消息*/@RabbitListener(queues = "headersQ1")public void receive1(String msg) {log.info("接收到 headersQ1 发送的消息:" + msg);}@RabbitListener(queues = "headersQ2")public void receive2(String msg) {log.info("接收到 headersQ2 发送的消息:" + msg);}}

6、4测试结果

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class SpringRunnerTest {@Resourceprivate MqProducer mqProducer;//注入消息提供者@Testpublic void test_headers_send() throws InterruptedException {// 循环发送消息int a=1;while (true) {mqProducer.headers_send("消息"+a,a++);Thread.sleep(3500);}}
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/3522.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微信小程序下拉刷新

一、如何设置微信小程序所有页面都可以下拉刷新呢&#xff1f; 1、在app.json的"window"中进行配置 &#xff08;1&#xff09;把"backgroundTextStyle":“light"改为"backgroundTextStyle”:“dark” &#xff08;2&#xff09;添加"enab…

【0】冒泡排序

前言 通过函数模板技术设计一个冒泡排序算法&#xff0c;领悟泛型编程的思想和冒泡排序的思想&#xff0c;然后使用QTest测试各种输入值&#xff0c;养成先写测试代码&#xff0c;后写程序代码的习惯 0x0 编写一个int版本的冒泡函数 1.不管要排序的数组长度多长&#xff0c;外…

5_2-点赞功能-结合(多线程)or(多线程兼异步)定时持久化到数据库-应用redis的scan方法

0、前提引入&#xff1a; 视频讲解&#xff1a; 5.2-点赞功能-结合多线程定时持久化到数据库-应用redis的scan方法_哔哩哔哩_bilibili 项目前身&#xff1a; 5-点赞功能-定时持久化到数据库-pipelinelua-优化ByScan_哔哩哔哩_bilibili https://www.bilibili.com/video/BV1…

CASAIM与大疆达成全自动化测量技术合作,CASAIM IS全自动化蓝光测量仪实现无人机叶片全尺寸检测及质量控制

近期&#xff0c;CASAIM与大疆达成全自动化测量技术合作&#xff0c;CASAIM IS全自动化蓝光测量仪实现无人机叶片全尺寸检测及质量控制。 无人机行业在过去几年里取得了迅猛发展&#xff0c;大疆是全球领先的无人飞行器控制系统及无人机解决方案的研发商和生产商&#xff0c;客…

Spring-AOP(面向切面)

Spring-AOP(面向切面) 场景模拟(计算器) 功能接口 public interface Calculator {int add(int i, int j);int minus(int i, int j);int multiply(int i, int j);int div(int i, int j); }实现类 public class CalculateLogImpl implements Calculator {Overridepublic int …

PALO ALTO NETWORKS 的新一代防火墙如何保护企业安全

轻松采用创新技术、阻止网络攻击得逞并专注更重要的工作 IT 的快速发展已改变网络边界的面貌。数据无处不在&#xff0c;用户可随时随地从各类设备访问这些数据。同时&#xff0c;IT 团队正在采用云、分析和自动化来加速新应用的交付以及推动业务发展。这些根本性的转变带来了…

Go 框架推荐

基础库 castgoframe Kafka kafka-gosaramagoka Redis go-redis 本地缓存 freecachebigcache Nacos & Viper nacos-sdk-govipernacos-viper Golang Web gingo-zeroREST client Go Grpc go-groc Plugin ** go-plugin HBase go-hbase Postgresql pgx-ormpg…

【Linux】- 任务调度和定时任务

任务调度和定时任务 1 crond 任务调度2 at 定时任务 1 crond 任务调度 crontab 进行 定时任务的设置 任务调度&#xff1a;是指系统在某个时间执行的特定的命令或程序。 任务调度分类&#xff1a;1.系统工作&#xff1a;有些重要的工作必须周而复始地执行。如病毒扫描等 个别…

ChatGPT 最佳实践指南之:系统地测试变化

Test changes systematically 系统地测试变化 Improving performance is easier if you can measure it. In some cases a modification to a prompt will achieve better performance on a few isolated examples but lead to worse overall performance on a more representa…

【Docker】Docker基本概念

Docker基本概念 1.Docker概述1.1 Docker是什么&#xff1f;1.2 Docker的宗旨1.3 容器的优点1.4 Docker与虚拟机的区别1.5 容器在内核中支持的两种技术1.6 namespace的六大类型 2.Docker核心概念2.1 镜像2.2 容器2.3 仓库 3. 知识点总结3.1 Docker是什么&#xff1f;3.2 容器和虚…

智能分析网关V2有抓拍告警但无法推送到EasyCVR,是什么原因?

我们在此前的文章中也介绍了关于智能分析网关V2接入EasyCVR平台的操作步骤&#xff0c;感兴趣的用户可以查看这篇文章&#xff1a;在EasyCVR新版本v3.3中&#xff0c;如何正确接入智能分析网关V2&#xff1f; 智能分析网关V2是基于边缘AI计算技术&#xff0c;可对前端摄像头采…

常见Redis使用问题

一 lettuce使用问题 1 问题描述 Redis Cluster集群&#xff0c;当master宕机&#xff0c;主从切换&#xff0c;客户端报错 timed out 2 原因 SpringBoot2.X版本开始Redis默认的连接池都是采用的Lettuce。当节点发生改变后&#xff0c;Letture默认是不会刷新节点拓扑的。 3…

每日一题2023.7.19|ACM模式

文章目录 C的输入方式介绍cin>>cin.get(字符变量名)cin.get(数组名,接收字符数目)cin.get()cin.getline() getline()gets()getchar() AB问题|AB问题||AB问题|||ABⅣAB问题ⅤAB问题Ⅵ C的输入方式介绍 参考博客 cin>> 最基本&#xff0c;最常用的字符或者数字的输…

el-tree 树全部展开或收起

绑定属性expanded&#xff0c;树自带方法this.$refs.tree.store.root.expanded&#xff0c;在mounted方法中给树方法赋值expandAll false&#xff0c;具体代码实现详情如下&#xff1a; html代码&#xff1a; <template><el-treeref"tree":data"sho…

java建造者模式

在Java中实现建造者模式&#xff0c;可以通过创建一个建造者类&#xff08;Builder&#xff09;和一个产品类&#xff08;Product&#xff09;来完成。下面是一个简单的示例&#xff1a; 首先&#xff0c;我们创建一个产品类&#xff08;Product&#xff09;&#xff0c;其中包…

rabbitmq部署(docker方式)

前言&#xff1a;rabbitmq一旦有漏洞&#xff0c;版本升级麻烦&#xff0c;于是改为docker部署 环境&#xff1a;centos7 #停掉之前的服务 systemctl stop rabbitmq-server systemctl disable rabbitmq-server 查了官网&#xff0c;当前3.11.x 最高版本是3.11.19, 虽然3.12…

jupyter定制数学函数

from math import * #导入绘图模块 import numpy as np #导入数值计算模块 import matplotlib.pyplot as plt #导入绘图模块 plt.rcParams[font.sans-serif][SimHei] #绘图中文 plt.rcParams[axes.unicode_minus]False #绘图负号 import mpl_toolkits.axisartist as axisartist…

微信公众平台自定义菜单 /事件推送

用户点击自定义菜单后&#xff0c;微信会把点击事件推送给开发者&#xff0c;请注意&#xff0c;点击菜单弹出子菜单&#xff0c;不会产生上报。请注意&#xff0c;第3个到第8个的所有事件&#xff0c;仅支持微信iPhone5.4.1以上版本&#xff0c;和Android5.4以上版本的微信用户…

react中使用redux,但是通过useEffect监听不到redux中的数据变化?

在React中使用Redux并通过useEffect监听Redux中的数据变化时&#xff0c;需要使用react-redux库中的useSelector钩子来选择需要监听的Redux状态。useSelector函数允许您从Redux存储中选择和获取特定的状态。 以下是一种在React组件中使用Redux&#xff0c;并通过useEffect监听…

安卓通过adb pull和adb push 手机与电脑之间传输文件

1.可以参考这篇文章 https://www.cnblogs.com/hhddcpp/p/4247923.html2.根据上面的文章&#xff0c;我做了如下修改 //设置/system为可读写&#xff1a; adb remount //复制手机中的文件到电脑中。需要在电脑中新建一个文件夹&#xff0c;我新建的文件夹为ce文件夹 adb pull …