目录
- 1. RabbitMQ入门
- 1-1. 同步调用
- 1-2. 异步调用
- 1-3. MQ技术选型
- 1-4. RabbitMQ介绍
- 消息模式
- 1-5. SpringAMQP
- Basic Queue
- Work Queue
- Fanout Exchange
- Direct Exchange
- Topic Exchange
- 消息转换器
1. RabbitMQ入门
1-1. 同步调用
优势:
- 时效性强,等待到结果后才返回。
问题:
- 扩展性差
- 性能下降
- 级联失败问题
1-2. 异步调用
异步调用方式其实就是基于消息通知的方式,一般包含三个角色:
- 消息发送者:投递消息的人,就是原来的调用方
- 消息代理:管理、暂存、转发消息,你可以把它理解成微信服务器
- 消息接收者:接收和处理消息的人,就是原来的服务提供方
异调用的优势是什么?
- 耦合度低,拓展性强
- 异步调用,无需等待,性能好
- 故障隔离,下游服务故障不影响上游业务
- 缓存消息,流量削峰填谷
异步调用的问题是什么?
- 不能立即得到调用结果,时效性差
- 不确定下游业务执行是否成功
- 业务安全依赖于Broker(消息代理)的可靠性
1-3. MQ技术选型
MQ(MessageQueue), 中文是消息队列,字面来看就是存放消息的队列。也就是异步调用中的Broker。
1-4. RabbitMQ介绍
RabbitMQ的整体架构及核心概念:
virtual-host
:虚拟主机,起到数据隔离的作用publisher
:消息发送者consumer
:消息的消费者queue
:队列,存储消息exchange
:交换机,负责路由消息,不能存储消息
消息模式
消息队列的消息分为二类传输模型:点对点模型、发布 /订阅模型
RabbitMQ在此基础上进行细化,提供了5种消息模型:
- 1、2(点对点模型)
- 3、4、5(发布/订阅模型)
1-5. SpringAMQP
AMQP:是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
Spring AMQP:基于AMQP协议定义的一套API,提供了模板来发送和接收消息,模板底层是基于RabbitMQ
封装。
Basic Queue
生产者
① 在application.yml中添加MQ配置
spring:rabbitmq:host: 192.168.136.131 # 主机名port: 5672 # 端口virtual-host: itheima # 虚拟主机username: root # 用户名password: root # 密码
② 注入RabbitTemplate ,并利用RabbitTemplate实现消息发送
@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;//发送简单消息@Testpublic void testSimpleQueue() {//参数一: 队列名称(次队列需要是提前创建好的) 参数二: 消息内容rabbitTemplate.convertAndSend("p2p", "hello,spring amqp!");}
}
消费者
① 在application.yml中添加MQ配置
spring:rabbitmq:host: 192.168.136.131 # 主机名port: 5672 # 端口virtual-host: itheima # 虚拟主机username: root # 用户名password: root # 密码
② 创建com.itheima.listener.SpringRabbitListener
@Component
public class SpringRabbitListener {//简单类型消息@RabbitListener(queues = "p2p")//声明监听的队列名称public void listenSimpleQueueMessage(String msg) {System.out.println("消费者接收到消息:【" + msg + "】");}
}
Work Queue
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用work
模型,多个消费者共同处理消息处理,速度就能大大提高了。
生产者
这次我们循环发送,模拟大量消息堆积现象。
在publisher服务中的SpringAmqpTest类中添加一个测试方法:
//批量发送消息@Testpublic void testWorkQueue() {for (int i = 1; i <= 50; i++) {rabbitTemplate.convertAndSend("p2p", "message_" + i);}}
消费者
要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法:
//使用下面两个方法来接收p2p队列中的消息@RabbitListener(queues = "p2p")public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】");Thread.sleep(20);}@RabbitListener(queues = "p2p")public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】");Thread.sleep(200);}
先启动ConsumerApplication后(消费者),再执行publisher服务中刚刚编写的发送测试方法testWorkQueue(提供者)。
可以看到消费者1很快完成了自己的25条消息。消费者2却在缓慢的处理自己的25条消息。
也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。这样显然是有问题的。
能者多劳
在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:
spring:rabbitmq:listener:simple:prefetch: 1 # 消费者一次处理一条消息,处理完毕后再从MQ中获取
小结
Work模型的使用:
- 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
- 通过设置
prefetch
来控制消费者预取的消息数量 - 缺点:如果有多个模块监听,都需要这条消息,但是Work模型的消息只会被一个模块的消费者处理
Fanout Exchange
Fanout,在MQ中可以理解为广播模式。
声明队列和交换机
我们习惯在消费者一方来创建交换机和队列
@Configuration
public class FanoutConfiguration {// 声明交换机@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanout.exchange");}// 声明第1个队列@Beanpublic Queue fanoutQueue1() {return new Queue("fanout.queue1");}// 队列1绑定交换机@Beanpublic Binding bindingQueue1(FanoutExchange fanoutExchange, Queue fanoutQueue1) {return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}// 声明第2个队列@Beanpublic Queue fanoutQueue2() {return new Queue("fanout.queue2");}// 队列2绑定交换机@Beanpublic Binding bindingQueue2(FanoutExchange fanoutExchange, Queue fanoutQueue2) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
消费者
//使用下面两个方法来测试Fanout类型的消息
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
生产者
// 发送fanout消息
@Test
public void testFanoutExchange() throws Exception {//参数一: 交换机名称 参数二:暂时没用 参数三: 消息内容rabbitTemplate.convertAndSend("fanout.exchange", "", "hello,everyone!");
}
小结
交换机的作用是什么?
- 接收publisher发送的消息
- FanoutExchange的会将消息路由到每个绑定的队列
- 不能缓存消息,路由失败,消息丢失
Direct Exchange
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange
注解声明队列和交换机(消费者)
基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:
//使用下面两个方法来测试Direct类型的消息
// direct监听器1
@RabbitListener(bindings =@QueueBinding( // 完成队列绑定交换机value = @Queue("direct.queue1"), // 创建队列exchange = @Exchange(value = "direct.exchange",type = ExchangeTypes.DIRECT), // 创建交换机并绑定队列key ={"vip","base"} // 指定routing key
))
public void listenDirectQueue1(String msg){System.out.println("direct消费者1接收消息:【" + msg + "】");
}// direct监听器2
@RabbitListener(bindings =@QueueBinding(value = @Queue("direct.queue2"),exchange = @Exchange(value = "direct.exchange",type = ExchangeTypes.DIRECT),key ={"base"}
))
public void listenDirectQueue2(String msg){System.err.println("direct消费者2接收消息:【" + msg + "】");
}
生产者
// 发送direct消息
@Test
public void testSendDirect() throws Exception {//参数一: 交换机名称 参数二:routingKey 参数三: 消息内容rabbitTemplate.convertAndSend("direct.exchange", "vip", "hello,everyone!");
}
小结
描述下Direct交换机与Fanout交换机的差异?
- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据
RoutingKey
判断路由给哪个队列
Topic Exchange
Topic类型的类型Exchange与Direct
相比,可以让队列在绑定Routing key
的时候使用通配符!
Routingkey
一般都是有一个或多个单词组成,多个单词之间以”.
”分割,例如: china.news
通配符规则:
#
:代指0个或多个单词
*
:代指一个单词
注解声明队列和交换机(消费者)
//使用下面两个方法来测试Topic类型的消息@RabbitListener(bindings = @QueueBinding(value = @Queue("topic.queue1"),exchange = @Exchange(value = "topic.exchange", type = ExchangeTypes.TOPIC),key = "china.#"))public void listenTopicQueue1(String msg) {System.out.println("消费者1接收到Topic消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue("topic.queue2"),exchange = @Exchange(value = "topic.exchange", type = ExchangeTypes.TOPIC),key = "#.news"))public void listenTopicQueue2(String msg) {System.out.println("消费者2接收到Topic消息:【" + msg + "】");}
生产者
// 发送topic消息
@Test
public void testTopicExchange() throws Exception {//参数一: 交换机名称 参数二:routingKey 参数三: 消息内容rabbitTemplate.convertAndSend("topic.exchange", "china.news", "喜报!孙悟空大战孙行者,胜!!!");
}
小结
描述下Direct交换机与Topic交换机的差异?
- Topic交换机接收的消息RoutingKey可以是多个单词,以
.
分割 - Topic交换机与队列绑定时的bindingKey可以指定通配符
#
:代表0个或多个词*
:代表1个词
消息转换器
在SpringAMQP的发送方法中,Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
- 数据体积过大
- 可读性差
配置JSON转换器
显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON
方式来做序列化和反序列化。
① 在父工程中引入依赖
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.9.10</version>
</dependency>
② 在publisher和consumer两个服务启动类中添加json转换器
@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}