SpringCloud微服务 【实用篇】| RabbitMQ快速入门、SpringAMQP

目录

一:初始RabbitMQ

1. 同步和异步通讯

1.1 同步调用

1.2 异步调用

2. MQ常见框架

二:RabbitMQ快速入门

1. RabbitMQ概述和安装

2. 常见消息队列模型

3. 快速入门案例

三:SpringAMQP

1. Basic Queue 简单队列模型

2. Work Queue 工作队列模型

3. 发布订阅模型-Fanout 发布

4. 发布订阅模型-Direct 发布

5. 发布订阅模型-Topic 发布

6. 消息转换器


前些天突然发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,感兴趣的同学可以进行学习人工智能学习

一:初始RabbitMQ

1. 同步和异步通讯

同步通讯和异步通讯理解

生活中就有很多同步和异步的案例,例1:假如你现在与一个妹子聊天,采用同步通信更像是打视频电话,就像直播一样,所得到的信息都能立刻同步过去,具有一定的优势;而异步通信更像是微信聊天,别人不想理你也不知道,时效性不是那么好,但也有自己的优点。例2:假如你现在在和三个妹子聊天,同步通信只能一个妹子聊,就会错失很多良机;异步通信可以多个妹子一块聊,还不会被发现;所以那么牛的技术我们当然要好好学习!

1.1 同步调用

案例:前面学习的微服务间基于Feign的调用就属于同步方式,就存在以下问题:

耦合度高:每次加入新的需求,都要修改原来的代码

对于一个订单业务,我们支付成功后就需要更改订单服务修改订单状态,然后进行发货;支付服务调用订单服务还是存储服务都需要等待对方的响应,是实时的调用。此时一个完整的系统开发好了,如果产品经理需要增加一个短信通知服务等功能,此时就需要在支付服务里增加代码;每次增加一个业务,代码就需要更改,具有很强的耦合性!

性能下降(吞吐量):调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用的时间之和

假如现在调用支付服务需要50ms,支付服务调用其它服务都需要150ms,支付服务调用每个服务都是同步调用,所以只能进行等待当前调用完成才可以调用其它的服务;所以一个完整的服务调用下来就需要500ms,这相当于1s中只能处理请求。数以十万百万的请求过来根本顶不住,性能下降、吞吐量也下降了!

资源浪费:调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源

在支付服务等待订单服务的过程,CPU和内存都在占用着啥都不干,只有某个服务调用完成才会执行下一个,在等待的过程中浪费大量资源,资源利用的不够充分!

级联失败:如果服务提供者出现问题,所有调用方都会跟着出问题,如同多米诺骨牌一样,迅速导致整个微服务群故障

假如现在存储服务挂了,此时支付服务进行访问,就会一直进入阻塞状态,这个请求就不会被释放,后面阻塞的越来越多,等待资源耗尽,支付服务就进不去了,相当于支付服务也挂了;所以造成整个服务就瘫痪了!

总结同步调用:

优点:时效性强,可以立即得到结果。

缺点:耦合度高、性能和吞吐能力下降、有额外的资源消耗、有级联失联问题。

1.2 异步调用

异步调用常见实现就是事件驱动模式

 在支付服务与其它服务之间引入一个Broker(事件代理者)。一旦有人支付成功就是一个事件,这个事件交给Broker来管理;而订单、仓储等服务就会找Broker这个老大哥,一旦有人支付成功你要通知我们(订阅事件);所以一旦有人支付成功,Broker就会发布支付成功事件(这里通知完就会返回给用户,不会等待其它服务响应完)去通知其它服务有人支付成功了,此时其它服务就会去修改订单状态!

优势一:服务解耦

原来增加业务需要更改业务的代码,现在就不用了;因为现在支付服务不负责调用,只负责发送一个事件到Broker,至于是谁接收?什么时间接收?有没有完成?完全不用管。所以一旦有新的业务只需要订阅新的Broker事件即可(到时候直接大喇嘛一喊,就能通知到你)!注:这样将来增加或删除业务就不需要更改代码,只需要订阅或取消订阅事件即可

优势二:性能提升,吞吐量提升

以前的耗时是总耗时加在一起50+150*3=500ms,现在只要支付成功,支付服务就向Broker发布事件,立刻就能返还给用户支付成功50+10=60ms。而Broker通知其它服务,什么时候去完成?多久去完成?完全不用管。

优势三:服务没有强依赖,不担心级联失败问题,没有资源浪费

支付服务相当于借用Broker去通知而不是调用,此时仓储服务挂了,也和我没关系,只需要重启仓储服务即可。既然没有强的依赖关系,我不调用你,也不需要等待你,所以就没有了资源浪费。

优势四:流量削峰

假设现在有多个用户发出请求,此时Broker就起到一个缓冲的作用,把请求都放到让订单服务、仓储等服务按照自己的能力去处理业务,处理完再去Broker取,现在此时的压力是Broker扛着。

总结异步调用:

优点:耦合度低,性能和吞吐量提升,故障隔离,没有资源消耗,没有级联失联问题,流量消峰。

缺点:依赖于Broker的可靠性、安全性、吞吐能力,架构复杂了,业务没有明显的流程线,不好追踪管理。

2. MQ常见框架

MQ (MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Brokeri

MQ常见的四种实现:RabbitMQ、ActiveMQ、RocketMQ、Kafka 

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延迟:RabbitMQ、Kafka

二:RabbitMQ快速入门

1. RabbitMQ概述和安装

RabbitMQ概述

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQ

RabbitMQ安装

单机部署:基于Centos7虚拟机中使用Docker来安装!

第一步:下载镜像

①在线拉取

docker pull rabbitmq:3-management

②从本地加载,使用本地已经安装的镜像包 

上传到虚拟机目录后(例如tmp目录),使用命令加载镜像即可:

docker load -i mq.tar

第二步:安装MQ

执行下面的命令来运行MQ容器:

docker run \-e RABBITMQ_DEFAULT_USER=itcast \ # -e设置环境变量:用户名和密码-e RABBITMQ_DEFAULT_PASS=123321 \--name mq \--hostname mq1 \ # --hostname配置主机名,集群部署需要配置这个-p 15672:15672 \ # 管理平台的端口-p 5672:5672 \ # 消息通信的端口-d \rabbitmq:3-management

第三步:查看状态

docker ps

成功启动

第四步:登录管理品台页面

注:如果出现第二天登录不上的情况,请重启docker,service docker restart

192.168.#.#:15672 # 前面是虚拟机IP,后面是端口

输入设置的账户密码

需要注意的是:每个用户都需要有自己独享的虚拟主机

RabbitMQ的结构和概念

Publisher是消息的发送者,Consumer是消息的消费者。发送者将来会把消息发送到exchange(交换机),交换机会把消息路由到queue(队列),队列负责暂存消息;而后消费者从队列中获取消息,然后处理消息!

注:每创建一个用户都对应一个VirtualHost(虚拟主机),各个虚拟主机之间是相互隔离的,看不到,这样可以避免干扰。

总结RabbitMQ中的几个概念:

①channel:操作MQ的工具; 

②exchange:路由消息到队列中 ;

③queue:缓存消息 ;

④virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组、隔离;

2. 常见消息队列模型

MQ的官方文档中给出了7个MQ的Demo示例,其中与消息发送和接收有关系的就是前5个:

①其中前2个命名为基本消息队列(BasicQueue)工作消息队列(WorkQueue),这两种有一个共同的特征:消息的发送和接收都是基于队列来完成的(没有通过交换机),其中P代表发送者、C代表消费者、中间的红色部分代表消息队列。

②后3个都属于发布订阅(Publish、Subscribe),只是交换机类型不同分为三种:Fanout Exchange(广播)Direct Exchange(路由)Topic Exchange(主题),其中紫色的部分就代表交换机。

3. 快速入门案例

HelloWorld案例---》基本消息队列入门

注:mq-demo是父工程用来做依赖管理,consumer和publisher是两个子工程

官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:

publisher:消息发布者,将消息发送到队列;

queue:消息队列,负责接受并缓存消息;

consumer:订阅队列,处理队列中的消息;

注:其中queue是由MQ进行管理的,所以我们只需要写publisher和consumer这两部分代码

mq-demo父工程

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.itcast.demo</groupId><artifactId>mq-demo</artifactId><version>1.0-SNAPSHOT</version><modules><module>publisher</module><module>consumer</module></modules><packaging>pom</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.9.RELEASE</version><relativePath/></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
</project>

publisher消息的发送者

其中启动类和yml文件是SpringBoot工程必备的,没什么好说的,最主要的是测试类

①首先要先创建连接,需要连接工厂ConnectioFactory

②根据连接工厂,去设置连接的信息:连接的地址、端口号、虚拟主机、用户名、密码

③前面连接工厂和参数都准备好了,然后就是调用连接工厂ConnectionFactory的newConnection方法,正式建立连接connection

④正式建立连接后,就需要调用connection的createChannel建立通道channnel,这样生产者和消费者才能完成消息的发送和接收;

⑤通道有了就可以基于通道向队列queue中发送消息了,首先是声明了队列的名称,然后调用通道的queueDeclare方法向队列中发送消息;

⑥有了队列,生产者就可以向队列中发送消息了,把准备的消息发送到队列当中,以字节的形式发送出去。

⑦最后在关闭通道和连接。

注:无论是声明队列还是向队列中发送消息实际上使用的都是通道channel

package cn.itcast.mq.helloworld;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class PublisherTest {@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.150.101");factory.setPort(5672); // 5672是通信的端口,15672是管理的接口factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("发送消息成功:【" + message + "】");// 5.关闭通道和连接channel.close();connection.close();}
}

1. 正式建立连接connection,管理界面就会有连接的信息 

2. 连接正式建立后,就会创建通道channel供消息的发送和接收使用

3. 根据通道向队列queue发送消息

4. 消息发送到队列后,就关闭通道和连接(发完就不管了,解除了耦合)

①控制台

② 管理的页面queue,都表示消息已经成功发出去

Consumer消息的接收者

①消费者就需要从队列中接收消息,所以也会有创建连接工厂、准备参数、创建通道等操作,这些代码不变;

②值得注意的是在这里我们又创建了队列,这是为什么呢?这是因为我们生产者和消费者的启动顺序是不同的,万一消费者先启动找队列找不到怎办?为了避免这种情况的发生都声明了对列。并且如果这个对列已经创建过了不会再次创建;

③下面实际上就相当于回调函数,调用basicConsume方法,表示消费一条消息,那么去干什么呢,什么行为?这里就采用了匿名内部类对象DefaultConsumer(默认的消费者),重写了handleDelivery方法(处理投递的消息),把处理的行为挂载到队列queueName当中;一旦消息队列中有了消息,这个回调函数就会执行。

package cn.itcast.mq.helloworld;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.2.129");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){@Override public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.处理消息String message = new String(body); // 发的时候是字节,接的时候也必须是字节,这里在转换为字符串System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息。。。。");}
}

此时的执行结果

此时先打印的是 ”等待接收消息。。。。” ,实际上这就是回调机制,前面的代码只是让回调函数和队列进行绑定,此时的消息还没过来,代码会继续执行,一直到MQ把消息投递过来才会打印。这也再次证明了是异步的!

一旦消息被消费,队列中的就会被删除!

三:SpringAMQP

前面我们使用官方的API实现了简单的MQ程序,但是发现程序非常的麻烦;接下来就学习一下SpringAMQP,大大简化了消息的发送和接收。

什么是SpringAMQP

SpringAmqp的官方地址:Spring AMQP,是应用间消息通信的一种协议,与语言平台无关。

AMQP:在学习SpringAMQP之前需要先了解一下AMQP,Advanced Message Queuing Protocol(高级消息队列协议),是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。

Spring AMQP:是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象spring-rabbit是底层的默认实现

①用于异步处理入站消息的监听器容器;

②用于发送和接收消息的 RabbitTemplate

RabbitAdmin 实现自动化的声明队列、交换和绑定,自动创建队列;

接下来就是用SpringAMQP实现消息队列的五种类型!

1. Basic Queue 简单队列模型

案例:利用SpringAMQP实现HelloWorld中的基础消息队列功能

第一步:在父工程中引入spring-amqp的依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--SpringBoot的单元测试依赖-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId>
</dependency>

第二步:在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列

①在publisher服务中编写application.yml,添加mq连接信息

以配置的方式制定建立连接的一些信息。

spring:rabbitmq:host: 192.168.2.129  # IP地址port: 5672  # 端口virtual-host: /  # 虚拟主机username: itcast # 用户名password: 123321 # 密码

②在publisher服务中新建一个测试类,编写测试方法:

直接使用RabbitTemplate工具类发送信息即可。

注:springamqd不会帮你创建队列,只能存在已有的队列中,所以要自己提前在浏览器的控制页面上创建这个对列!

package cn.itcast.mq.spring;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {// 注入RabbitTemplate@Autowiredprivate RabbitTemplate rabbitTemplate;// 调用工具类的方法@Testpublic void testSendMessageSimpleQueue(){// 第一个参数队列的名称String queueName = "simple2.queue";// 第二个参数消息String message = "hello SpringAMQP!";rabbitTemplate.convertAndSend(queueName,message);}
}

成功发送

第三步:在consumer服务中编写消费逻辑,绑定simple.queue这个队列,进行监听

①在consumer服务中编写application.yml,添加mq连接信息:

spring:rabbitmq:host: 192.168.2.129port: 5672virtual-host: /username: itcastpassword: 123321

②在consumer服务中新建一个类,添加@Component注解,类中声明方法添加@RabbitListener注解,编写消费逻辑:

package cn.itcast.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component // 纳入Spring管理
public class SpringRabbitListener {// 声明监听那个队列@RabbitListener(queues = "simple2.queue")// 行为,封装成方法public void listenSimpleQueueMessage(String msg){ // Spring会把消息传递过来给msg参数System.out.println("消费者接收到的消息是:"+msg);}
}

运行主函数,启动上面的Bean

2. Work Queue 工作队列模型

前面已经学习了简单队列的发送和接收,一旦有人拿到消息,就会从队列中删除,其它消费者根本拿不到。那如果有多个消息怎么办呢?就可以基于上述的特性让多个消费者合作处理。接下来就学习一下Work queue(工作队列)可以提高消息处理速度,避免队列消息堆积

案例:模拟WorkQueue,实现一个队列绑定多个消费者

第一步:在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue

package cn.itcast.mq.spring;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {// 注入RabbitTemplate@Autowiredprivate RabbitTemplate rabbitTemplate;// 发送消息@Testpublic void testSendMessageWorkQueue(){String queueName = "simple2.queue";String message = "hello--->";// 利用for循环发送50条消息for (int i = 1; i <= 50; i++) {rabbitTemplate.convertAndSend(queueName,message+i);// 休眠20毫秒try {Thread.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}}}
}

第二步:在consumer服务中定义两个消息监听者,都监听simple.queue队列

注:消费者1每秒处理50条消息,消费者2每秒处理10条消息。

package cn.itcast.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalDateTime;@Component // 纳入Spring管理
public class SpringRabbitListener {// 消费者1@RabbitListener(queues = "simple2.queue")// 行为,封装成方法public void listenWorkQueue1Message(String msg){System.out.println("消费者1接收到的消息是:"+msg+ LocalDateTime.now());// 每秒处理50条消息try {Thread.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}}// 消费者2@RabbitListener(queues = "simple2.queue")// 行为,封装成方法public void listenWorkQueue2Message(String msg){System.out.println("消费者2接收到的消息是---》"+msg+LocalDateTime.now());// 每秒处理10条消息try {Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}}
}

执行结果:

理论上1秒处理完,实际上却是2秒才处理完,并没有做到能者多劳,消费者1实际上在1秒内很快就处理完消息,而消费者2因为能力不够却需要2秒。实际上这是因为MQ的预取机制,才开始就优先从队列中拿过来,并没有考虑到消费能力如何!

第三步:消费预取限制

修改application.yml文件,设置preFetch这个值,可以控制预取消息的上限!

spring:rabbitmq:host: 192.168.150.101 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机 username: itcast # 用户名password: 123321 # 密码listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

执行结果:能者多劳,可以在1秒内完成

3. 发布订阅模型-Fanout 发布

发布订阅模式

发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者,实现方式是加入了exchange(交换机)。到底是发给谁?这是由交换机的类型决定的:

①Fanout:广播;

②Direct:路由; 

③Topic:话题;

注意:exchange负责消息路由,而不是存储,路由失败则消息丢失;消息的存储是由队列完成的

发布订阅-Fanout Exchange

Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue!

案例:利用SpringAMQP演示FanoutExchange的使用

第一步:在consumer服务中,声明队列(Queue)、交换机(Exchange),并将两者绑定(Binding)

①SpringAMQP提供了声明交换机、队列、绑定关系的API,例如:

②在consumer服务创建一个类,添加@Configuration注解,并声明FanoutExchange、Queue和绑定关系对象Binding,代码如下:

package cn.itcast.mq.config;import com.rabbitmq.client.impl.AMQImpl;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {// 声明交换机fanout.exchange@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("fanout.exchange");}// 声明队列1 fanout.queue1@Beanpublic Queue queue1(){return new Queue("fanout.queue1");}@Bean// 声明队列2 fanout.queue2public Queue queue2(){return new Queue("fanout.queue2");}// 进行绑定@Beanpublic Binding bindingQueue1(FanoutExchange fanoutExchange,Queue queue1){return BindingBuilder.bind(queue1).to(fanoutExchange);}@Beanpublic Binding bindingQueue2(FanoutExchange fanoutExchange,Queue queue2){return BindingBuilder.bind(queue2).to(fanoutExchange);}}

成功声明交换机

成功声明队列

绑定成功

第二步:在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

package cn.itcast.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalDateTime;
import java.time.LocalTime;@Component // 纳入Spring管理
public class SpringRabbitListener {// 声明监听那个队列@RabbitListener(queues = "fanout.queue1")// 行为,封装成方法public void listenFanoutQueueMessage1(String msg){ // Spring会把消息传递过来给msg参数System.out.println("消费者1接收到的消息是:"+msg);}@RabbitListener(queues = "fanout.queue2")// 行为,封装成方法public void listenFanoutQueueMessage2(String msg){ // Spring会把消息传递过来给msg参数System.out.println("消费者2接收到的消息是:"+msg);}}

第三步:在publisher中编写测试方法,向交换机itcast.fanout发送消息

注:以前是发送到queue,现在是发送到exchange,注意区别!

package cn.itcast.mq.spring;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {// 注入RabbitTemplate@Autowiredprivate RabbitTemplate rabbitTemplate;// 向交换机发送消息@Testpublic void testSendFanoutExchange(){// 交换机String exchangeName = "fanout.exchange";// 信息String message = "Hello eyeryone";// 发送消息rabbitTemplate.convertAndSend(exchangeName,"",message); // 中间参数routingKey后面会讲,这里先设置为空}
}

执行结果:

4. 发布订阅模型-Direct 发布

发布订阅-DirectExchange

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式。

①每一个Queue都与Exchange设置一个BindingKey;相当于暗号密码!

②发布者发送消息到Exchange时,也要指定一个消息的RoutingKey;与上面的BindingKey对上就发给谁!

③Exchange将消息路由到BindingKey与消息RoutingKey一致的队列;并且一个队列能绑定多个key;如果两个队列的BindingKey都能与RountingKey对上就都会发送(就相当于广播)!

声明单个key

声明多个key 

案例:利用SpringAMQP演示DirectExchange的使用

第一步:在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

注:前面使用Bean方式声明一个个类,发现太麻烦了,所以这里就学习一下使用利用@RabbitListener注解声明Exchange、Queue、RoutingKey。

package cn.itcast.mq.listener;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalDateTime;
import java.time.LocalTime;@Component // 纳入Spring管理
public class SpringRabbitListener {// DirectExchange,使用注解的形式绑定@RabbitListener(bindings = @QueueBinding(value = @Queue("direct.queue1"),exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void LitenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:["+msg+"]");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void ListenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息:["+msg+"]");}}

声明后启动!查看控制页面,成功绑定

第二步:在publisher中编写测试方法,向itcast. direct发送消息

package cn.itcast.mq.spring;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {// 注入RabbitTemplate@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendDirectExchange(){String exchangeName = "itcast.direct";String message = "hello blue";rabbitTemplate.convertAndSend(exchangeName,"blue",message);}
}

此时RoutingKey为blue,只有direct.quque1能接收到:

如果此时RoutingKey为red

@Test
public void testSendDirectExchange(){String exchangeName = "itcast.direct";String message = "hello red";rabbitTemplate.convertAndSend(exchangeName,"red",message);
}

则direct.quque1和direct.queue2都能接收到:

总结:所以相对于Fanout Exchange,Direct Exchange更加的灵活,可以通过key这个标记把消息传递给某一个或者所有,Fanout Exchange可以看做是Direct Exchange的一种特殊存在。

5. 发布订阅模型-Topic 发布

发布订阅-TopicExchange

TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以点 “ .” 分割。 例如:china.news 代表中国的新闻消息; japan.news 则代表日本新闻。

Queue与Exchange指定BindingKey时可以使用通配符:

①#:代指0个或多个单词;

②*:代指一个单词;

案例:利用SpringAMQP演示TopicExchange的使用

第一步:利用@RabbitListener声明Exchange、Queue、RoutingKey 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2

    // topic Exchange@RabbitListener(bindings = @QueueBinding(value = @Queue("topic.queue1"),exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),key = "china.#"))public void ListenTopicQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息:["+msg+"]");}@RabbitListener(bindings = @QueueBinding(value = @Queue("topic.queue2"),exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),key = "#.news"))public void ListenTopicQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息:["+msg+"]");}

成功声明与绑定 

第二步:在publisher中编写测试方法,向itcast. topic发送消息

    @Testpublic void testSendTopicExchange(){String exchangeName = "itcast.topic";String message = "It's a nice day ";rabbitTemplate.convertAndSend(exchangeName,"china.weath",message);}

此时是topic.queue1接收到消息

总结:Topic Exchange和Direct Exchange的本质相同,Topic Exchange可以指定通配符的方式来表达BindingKey,相对于Direct Exchange灵活度又变高了。

6. 消息转换器

说明:在SpringAMQP的发送方法中,接收消息的类型实际上是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送

案例:测试发送Object类型消息

在consumer中利用@Bean声明一个队列:

package cn.itcast.mq.config;import com.rabbitmq.client.impl.AMQImpl;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {// 声明一个队列@Beanpublic Queue objectQueue(){return new Queue("object.queue");}
}

发送一个Map集合到object.queue队列

package cn.itcast.mq.spring;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.util.HashMap;
import java.util.Map;@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {// 注入RabbitTemplate@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendObjectQueue(){// 准备一个Map集合Map<String,Object> msg = new HashMap<>();msg.put("name", "Jack");msg.put("age", 21);// 发送rabbitTemplate.convertAndSend("object.queue",msg);}
}

执行结果:最终的结果是通过JDK序列化转换成字节发送的

注:JDK序列化性能比较差、安全性比较差容易出现注入的情况、数据长度太长了占用额外的内存空间。

消息转换器

①Spring的对消息对象的处理是由org.springframework.amqp.support.converter.

MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化!

②如果要修改只需要定义一个MessageConverter 类型的Bean即可,推荐用JSON方式序列化!步骤如下:

第一步:在父工程中引入jackson依赖

<!--jackson依赖-->
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>

第二步:在publisher启动类声明MessageConverter,覆盖掉原来的配置

package cn.itcast.mq;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;@SpringBootApplication
public class PublisherApplication {public static void main(String[] args) {SpringApplication.run(PublisherApplication.class);}// 覆盖原理的序列化方式@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}

执行结果:成功转换成Json格式

第三步:在consumer服务中MessageConverter并监听object.queue队列并消费消息

启动类声明MessageConverter,覆盖掉原来的配置

package cn.itcast.mq;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;@SpringBootApplication
public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}// 覆盖原理的序列化方式@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}

消费object.queue队列的消息

    @RabbitListener(queues = "object.queue")public void ListenOjectQueue(Map<String,Object> msg){System.out.println("消费者接收到object.queue的消息:["+msg+"]");}

成功消费:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/612137.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ORACLE RAC DG文件路径错乱解决办法

最近接手了一个客户的RAC-RAC dg环境的维护,登录上去之后发现dg延迟了8天,由于主库的空间非常紧张,归档日志早就删除了,所以准备使用rman基于scn点的备份恢复的方案恢复dg同步 在备份完成之后,使用新的控制文件进行数据恢复的时候报错datafile 43 not found: 检查了一下发现当…

SpringBoot中使用单例模式+ScheduledExecutorService实现异步多线程任务(若依源码学习)

场景 若依前后端分离版手把手教你本地搭建环境并运行项目&#xff1a; 若依前后端分离版手把手教你本地搭建环境并运行项目_本地运行若依前后端分离-CSDN博客 设计模式-单例模式-饿汉式单例模式、懒汉式单例模式、静态内部类在Java中的使用示例&#xff1a; 设计模式-单例模…

Python requirements.txt 详解

文章目录 1 概述1.1 作用1.2 注意 2 操作2.1 生成 requirements.txt2.2 安装 requirements.txt 3 示例3.1 新建 Django 项目3.2 找到 Scripts 目录&#xff0c;执行生成 requirements.txt 命令 1 概述 1.1 作用 作用&#xff1a;记录 当前项目下 所有 依赖包及其版本号&#…

不知道题目是啥

本题是学校的集训里的题&#xff0c;所有不知道题目名字是啥&#xff0c;直接看题目就好 解题思路&#xff1a;因为字符串只含有小写字母&#xff0c;所以可以创建两个数组分别来存s和t的每个字母出现次数&#xff0c;然后遍历数组&#xff0c;如果s字符串中的某个字母比t的小&…

输电线路分布式故障诊断装置的四大特点介绍-深圳鼎信

输电线路分布式故障诊断装置是一种利用行波测距、无线通信等技术手段实现电网故障定位的设备。这对于电网的故障处理和恢复具有重要意义&#xff0c;可以帮助运维人员提高故障处理的效率&#xff0c;缩短故障处理时间&#xff0c;减少停电时间&#xff0c;提高用户的供电可靠性…

premiere简约大气3D动画logo片头Pr模板Mogrt免费下载

Premiere简约大气3D动画logo片头pr模板mogrt下载&#xff0c;无需插件&#xff0c;高清分辨率&#xff0c;易于自定义&#xff0c;包括教程&#xff0c;不包括音频和图像。免费下载&#xff1a;https://prmuban.com/37065.html

Linux学习(1):目录结构、编辑器和用户管理

Linux学习&#xff08;1&#xff09;&#xff1a;目录结构、编辑器和用户管理 1 Linux目录结构2 vi和vim编辑器2.1 快捷键练习 3 用户管理3.1 添加用户3.2 删除用户即主目录3.3 切换用户 4 用户组 1 Linux目录结构 在linux世界里&#xff0c;一切皆为文件。 linux目录结构&a…

test fuzz-05-模糊测试 kelinci AFL-based fuzzing for Java

拓展阅读 开源 Auto generate mock data for java test.(便于 Java 测试自动生成对象信息) 开源 Junit performance rely on junit5 and jdk8.(java 性能测试框架。性能测试。压测。测试报告生成。) test fuzz-01-模糊测试&#xff08;Fuzz Testing&#xff09; test fuzz-…

Gin CORS 跨域请求资源共享与中间件

Gin CORS 跨域请求资源共享与中间件 文章目录 Gin CORS 跨域请求资源共享与中间件一、同源策略1.1 什么是浏览器的同源策略&#xff1f;1.2 同源策略判依据1.3 跨域问题三种解决方案 二、CORS:跨域资源共享简介(后端技术)三 CORS基本流程1.CORS请求分类2.基本流程 四、CORS两种…

Java项目:02 基于ssm超市订单管理系统

项目介绍 基于ssm超市订单管理系统 环境&#xff1a;jdk1.8&#xff0c;mysql5.7&#xff0c;tomcat8.5&#xff0c;maven3.6 软件&#xff1a;IDEA 功能&#xff1a;超市后台管理系统&#xff0c;有订单管理&#xff0c;供应商管理&#xff0c;用户管理&#xff0c;密码修改&…

阿赵UE学习笔记——9、材质和材质实例

阿赵UE学习笔记目录 大家好&#xff0c;我是阿赵。   继续学习虚幻引擎&#xff0c;这次来了解一下UE里面关于材质的一些概念性的东西。 一、材质 材质这个概念&#xff0c;在所有三维软件里面都会有&#xff0c;比如3Dsmax里面的材质球&#xff0c;或者Unity里面的Material…

解决docker run报错:Error response from daemon: No command specified.

将docker镜像export/import之后&#xff0c;对新的镜像执行docker run时报错&#xff1a; docker: Error response from daemon: No command specified. 解决方法&#xff1a; 方案1&#xff1a; 查看容器的command&#xff1a; docker ps --no-trunc 在docker run命令上增加…

【Python】AttributeError: module ‘torch.nn‘ has no attribute ‘HardSigmoid‘

AttributeError: module ‘torch.nn’ has no attribute ‘HardSigmoid’ 这个错误是因为PyTorch的torch.nn模块中并没有HardSigmoid这个函数。是拼写的大小写问题&#xff0c;换成nn.Hardsigmoid()即可。 如下述代码出错。 import torch import torch.nn as nn hard_sigmoid…

自动化的力量可实现更好的供应商风险管理

长期以来&#xff0c;公司一直依赖制造商、服务提供商、供应商或顾问等丰富的外部各方网络来促进整体运营并从外部专业知识或产品中获益。虽然这些合作伙伴关系通常是互惠互利的&#xff0c;但公司也需要意识到第三方甚至第四方供应商带来的潜在风险&#xff0c;并考虑整个供应…

VSCode使用MinGW编译器,配置C/C++环境

目录 一、安装VSCode 二、安装MinGW编译器 1、配置环境变量 2、测试配置是否成功 三、配置VSCode 1、安装所需扩展 2、新建代码存放文件夹 3、添加配置文件 4、配置文件内容 &#xff08;1&#xff09;c_cpp_properties.json &#xff08;2&#xff09;launch.json …

基于Java SSM框架实现线上教学平台系统项目【项目源码+论文说明】计算机毕业设计

基于java的SSM框架实现线上教学平台演示 摘要 在社会快速发展的影响下&#xff0c;使线上教学平台的管理和运营比过去十年更加理性化。依照这一现实为基础&#xff0c;设计一个快捷而又方便的网上线上教学平台系统是一项十分重要并且有价值的事情。对于传统的线上教学平台控制…

走进shell

Linux系统启动时&#xff0c;会自动创建多个虚拟控制台。虚拟控制台是运行在Linux系统内存中的终端会话。 打开Linux控制台Terminal使用tty命令查看当前使用的虚拟控制台。 注&#xff1a;tty 表示电传打字机(teletypewriter) $ tty /dev/pts/0表示当前使用的是/dev/pts/0 虚拟…

(1)(1.13) SiK无线电高级配置(五)

文章目录 前言 10 可用频率范围 11 DUTY_CYCLE 设置 12 低延迟模式 13 先听后说 (LBT) 14 升级无线电固件 15 MAVLink协议说明 前言 本文提供 SiK 遥测无线电(SiK Telemetry Radio)的高级配置信息。它面向"高级用户"和希望更好地了解无线电如何运行的用户。 1…

C#基础:通过QQ邮件发送验证码到指定邮箱

一、控制台程序 using System; using System.Net; using System.Net.Mail;public class EmailSender {public void SendEmail(string toAddress, string subject, string body){// 设置发件人邮箱地址以及授权码string fromAddress "xxxxxqq.com";string password …

频率阈图像滤波

介绍 频率阈图像滤波是一种在频域中进行图像处理的方法&#xff0c;它基于图像的频率分布来实现滤波效果。具体步骤如下&#xff1a; 将原始图像转换到频域&#xff1a;使用快速傅里叶变换&#xff08;FFT&#xff09;将图像从空间域转换到频域。对频域图像应用频率阈滤波器&a…