什么是MQ?
MQ 是消息队列(Message Queue)的缩写,它是一种应用程序间异步通信的技术。消息队列允许应用程序或服务间通过发送消息来交换数据,而不是直接调用对方,从而实现解耦、异步处理和负载均衡等目的。
简单来说,消息队列就像是一个邮局,应用程序就像是寄信人和收信人。一个应用程序(寄信人)发送消息(信件)到消息队列(邮局),另一个应用程序(收信人)从队列中取出消息进行处理。这个过程可以是同步的,也可以是完全异步的,意味着发送者和接收者不必同时在线,他们通过消息队列中转消息。
消息队列的主要优势包括:
- 解耦:发送者和接收者只依赖于消息队列,而不是直接依赖于对方,降低系统间的耦合度。
- 异步处理:发送者可以快速发送消息并继续处理其他任务,无需等待接收者的响应。
- 负载均衡:通过调整消费者的数量来处理不同的负载。
- 缓冲:在高负载情况下,消息队列可以作为缓冲,防止系统因请求过多而崩溃。
- 可扩展性:系统组件可以独立地扩展,提高系统整体的扩展性和弹性。
消息队列广泛应用于微服务架构、分布式系统、大数据处理流程等领域,常见的消息队列实现包括 Kafka、RabbitMQ、ActiveMQ 等。
RabbitMQ
一、安装
以下是基于docker的安装步骤
使用下面的命令即可:
docker run \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=123456 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network networkname # 网络名\-d \rabbitmq:3.8-management
可以看到在安装命令中有两个映射的端口:
-
15672:RabbitMQ提供的管理控制台的端口
-
5672:RabbitMQ的消息发送处理接口
安装完成后,我们访问 http://虚拟机ip:15672即可看到管理控制台。首次访问需要登录,默认的用户名和密码在命令中已经指定了。
二、基本构造
其中包含几个概念:
-
publisher
:生产者,也就是发送消息的一方 -
consumer
:消费者,也就是消费消息的一方 -
queue
:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理,可通过控制台的Queue选项卡进行管理 -
exchange
:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列,可通过控制台的Exchange选项卡进行管理 -
virtual host
:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue,通过控制台的Admin选项卡进行管理
SpringAMQP
一、概述
我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于RabbitMQ
采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ
交互。并且RabbitMQ
官方也提供了各种不同语言的客户端。
但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。
SpringAmqp的官方地址:Spring AMQP
SpringAMQP提供了三个功能:
-
自动声明队列、交换机及其绑定关系
-
基于注解的监听器模式,异步接收消息
-
封装了RabbitTemplate工具,用于发送消息
二、使用
1、引入依赖
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2、修改配置
spring:rabbitmq:host: 192.168.88.130 # 你的虚拟机IPport: 5672 # 端口virtual-host: / # 虚拟主机username: admin # 用户名password: 123456 # 密码
3、发送消息示例:
private RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, spring amqp!";// 发送消息rabbitTemplate.convertAndSend(queueName, message);}
4、接收消息示例:
@Component
public class SpringRabbitListener {// 利用RabbitListener来声明要监听的队列信息// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。// 可以看到方法体中接收的就是消息体的内容@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消费者接收到消息:【" + msg + "】");}
}
WorkQueues:任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
5、交换机类型
在之前的两个测试案例中,都没有交换机,生产者直接发送消息到队列。而一旦引入交换机,消息发送的模式会有很大变化:
可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:
-
Publisher:生产者,不再发送消息到队列中,而是发给交换机
-
Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
-
Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
-
Consumer:消费者,与以前一样,订阅队列,没有变化
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
交换机的类型有四种:
-
Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
-
Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
-
Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
-
Headers:头匹配,基于MQ的消息头匹配,用的较少。
Fanout交换机
Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。
在广播模式下,消息发送流程是这样的:
-
创建一个名为
hmall.fanout
的交换机,类型是Fanout
-
创建两个队列
fanout.queue1
和fanout.queue2
,绑定到交换机hmall.fanout
-
在
consumer
服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2 -
在publisher中编写测试方法,向
test.fanout
发送消息
说白了,就是发送给Fanout交换机的消息,Fanout交换机会将消息转发给所有绑定它的队列(广播)
发送示例:
@Test
public void testFanoutExchange() {// 交换机名称String exchangeName = "test.fanout";// 消息String message = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, "", message);
}
接收示例:
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
Direct交换机
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
-
声明一个名为
hmall.direct
的交换机 -
声明队列
direct.queue1
,绑定hmall.direct
,bindingKey
为blud
和red
-
声明队列
direct.queue2
,绑定hmall.direct
,bindingKey
为yellow
和red
-
在
consumer
服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2 -
在publisher中编写测试方法,向
hmall.direct
发送消息
说白了,就是在绑定队列与交换机时添加了一个RoutingKey
(路由key),相当于令牌, 利用这个RoutingKey做身份验证,将消息发送给需要的消费者
发送示例:
@Test
public void testSendDirectExchange() {// 交换机名称String exchangeName = "test.direct";// 消息String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
接收示例:
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
Topic交换机
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。
只不过Topic
类型Exchange
可以让队列在绑定BindingKey
的时候使用通配符!
通配符规则:
-
#
:匹配一个或多个词 -
*
:匹配不多不少恰好1个词
举例:
-
item.#
:能够匹配item.spu.insert
或者item.spu
-
item.*
:只能匹配item.spu
图示:
假如此时publisher发送的消息使用的RoutingKey
共有四种:
-
china.news
代表有中国的新闻消息; -
china.weather
代表中国的天气消息; -
japan.news
则代表日本新闻 -
japan.weather
代表日本的天气消息;
解释:
-
topic.queue1
:绑定的是china.#
,凡是以china.
开头的routing key
都会被匹配到,包括:-
china.news
-
china.weather
-
-
topic.queue2
:绑定的是#.news
,凡是以.news
结尾的routing key
都会被匹配。包括:-
china.news
-
japan.news
-
至于收发消息,与DIrect交换机一致
声明队列和交换机
可以在控制台通过图形化界面手动声明(很简单不赘述)
也可以直接在程序中声明,SpringAMQP提供了一个Queue类,用来创建队列:
示例:
/*** 声明交换机* @return Direct类型交换机 */@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange("hmall.direct").build(); //FanoutExchange topicExchange}/*** 声明队列*/@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}
用注解声明:
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}
配置JSON转换器
默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
-
数据体积过大
-
有安全漏洞
-
可读性差
显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
首先在publisher
和consumer
两个服务中都引入依赖:
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
注意,如果项目中引入了spring-boot-starter-web
依赖,则无需再次引入Jackson
依赖。
配置消息转换器,在publisher
和consumer
两个服务的启动类中添加一个Bean即可:
@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}
消息转换器中添加的messageId可以便于我们将来做幂等性判断。