目录
初识MQ
同步通讯和异步通讯编辑
同步通讯
同步调用存在的问题
总结
同步调用优点:
同步调用的问题:
异步通讯
事件驱动优势
总结
什么是MQ
RabbitMQ快速入门
RabbitMQ概述和安装
RabbitMQ结构和概念编辑
总结
常见消息模型
不同消息模型
HelloWorld案例
案例:完成官方Demo中Hello world案例
总结
SpringAMQP
什么是AMQP
什么是SprngAMQP
Basic Queue 简单队列模型
案例:利用SpringAMQP实现Helloworld中的基础消息队列功能
发送消息
总结
接收消息
总结
Work Queue 工作队列模型编辑
案例:模拟Work Queue,实现一个队列绑定多个消费者
总结
发布,订阅模型-Fanout编辑
案例:利用SpringAMQP演示FanoutExchange的使用
总结
发布,订阅模型-Direct
案例:利用SpringAMQP演示DirectExchange的使用
总结
发布,订阅模型-Topic
案例:利用SpringAMQP演示TopicExchange的使用
总结
消息转换器
测试发送Object类型消息
修改序列化(推荐JSON序列化)
发送消息
接送消息
总结
初识MQ
同步通讯和异步通讯
同步通讯
微服务间基于Feign的调用就属于同步方式,存在一些问题
同步调用存在的问题
-
耦合度高:每次加入新需求,都要修改原来的代码
-
性能下降:调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用时间之和
-
浪费资源:调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下回极度浪费系统资源
-
级联失败:如果服务提供者出现问题,所有调用方都会跟着出问题,如同多米诺骨牌一样,迅速导致镇整个微服务群故障
总结
同步调用优点:
-
时效性较强,可以立即得到结果
同步调用的问题:
-
耦合度高
-
性能和吞吐能力下降
-
有额外的资源消耗
-
有级联失败问题
异步通讯
异步调用常见实现就是事件驱动模式
事件驱动优势
优势一:服务解耦
优势二:新能提示,吞吐量提高
优势三:服务没有强依赖,不担心级联失败问题
优势四:流浪削峰
总结
异步通信的优点:
-
耦合度低
-
吞吐量提示
-
故障隔离
-
流量削峰
异步通讯的缺点:
-
依赖于Broker的可靠性,安全性,吞吐能力
-
架构复杂了,业务没有明显的流程线,不好追踪管理
什么是MQ
MQ(MessageQueue),中文解释是消息队列,字面来看就是存放消息的队列,也就是时间驱动架构中的Broker。
RabbitMQ快速入门
RabbitMQ概述和安装
RabbitMQ是基于Erlang语言开发的开源的消息中间件,官网地址:RabbitMQ: easy to use, flexible messaging and streaming | RabbitMQ
安装RabbitMQ,参考课前资料
Docker运行RabbitMQ:
docker run \-e RABBITMQ_DEFAULT_USER=itcast \-e RABBITMQ_DEFAULT_PASS=123321 \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management
输入端口,输入密码,进入RabbitMQ
RabbitMQ结构和概念
总结
RabbitMQ中的几个概念:
-
channel:操作MQ的工具
-
exchange:路由消息到队列中
-
queue:缓存消息
-
cirtual host:虚拟主机,是对queue,exchange等资源的逻辑分组
常见消息模型
MQ的官方文档中给出了5个MQ的Demo示例,对应了几种不同的用法:
-
基本消息队列(BasicQueue)
-
工作消息队列(WorkQueue)
-
发布订阅(Publish,Subscribe),又根据交换机类型不同分为三种:
-
Fanout Exchange:广播
-
Diect Exchange:路由
-
Topic Exchange:主题
-
不同消息模型
HelloWorld案例
官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:
-
publisher:消息发布者,将消息发送到队列queue
-
queue:消息队列,负责接收并缓存消息
-
consumer:订阅列队,处理队列中的消息
案例:完成官方Demo中Hello world案例
实现步骤:
-
导入课前资料中的demo工程
-
运行publisher服务中测试类publisherTest中的测试方法testSendMessage()
-
查看RabbitMQ控制台的消息
-
启动consumer服务,查看是否能接收消息
总结
基本消息队列的消息发送流程:
-
创建connection
-
创建channel
-
利用channel声明队列
-
利用channel向队列发送消息
基本消息队列的消息接收流程:
-
创建connection
-
创建channel
-
利用channel声明队列
-
定义consumer的消费行为handleDelivery()
-
利用channel将消费者与队列绑定
SpringAMQP
什么是AMQP
Advanced Message Queuing Protocol,适用于在应用程序或之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
什么是SprngAMQP
Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中Spring-AMPQ是基础抽象,Spring-rabbit是底层的默认实现
SpringAmqp的官方地址:Spring AMQP
Basic Queue 简单队列模型
案例:利用SpringAMQP实现Helloworld中的基础消息队列功能
发送消息
流程如下:
-
在父工程中引入spring-adqp的依赖
<!-- AMQP依赖,包含RabbitMQ--> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
-
在publisher服务中编写application.yml,添加mq连接信息:
spring:rabbitmq:host: ip #(填写自己的ip)port: 5672username: itcastpassword: 123321virtual-host: /
-
在publisher服务中新建一个测试类,编写测试方法:
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { @Autowiredprivate RabbitTemplate rabbitTemplate; @Testpublic void testSendMessage2SimpleQueue(){String queueName = "simple.queue";String message = "Hello World!spring amqp!!";rabbitTemplate.convertAndSend(queueName, message);} }
-
-
在consumer服务中编写消费逻辑,绑定simple.queue这个队列
String queueName = "simple.queue";String message = "Hello World!spring amqp!!";rabbitTemplate.convertAndSend(queueName, message);
总结
什么是AMQP?
应用间消息通信的一种协议,与语言和平台无关。
SpringAMQP如何发送消息?
-
引入amqp的starter依赖
-
配置RabbitMQ地址
-
利用RabbitTemplate的convertAndSend方法
接收消息
在consumer中编写消费逻辑,监听simple.queue
-
在consumer服务中编写application.yml,添加mq连接信息:
spring:rabbitmq:host: ipport: 5672username: itcastpassword: 123321virtual-host: /
-
在consumer服务中新建一个类,编写消费逻辑:
@Component public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg){System.out.println("消费者接收到simple.queue的消息:{"+msg+"}");} }
总结
SpringAMQP如何接收消息?
-
映入amqp的starter依赖
-
配置RabbitMQ地址
-
定义类,添加@Component注解
-
类中声明方法,添加@RabbitListener注解,方法参数就时消息
Work Queue 工作队列模型
案例:模拟Work Queue,实现一个队列绑定多个消费者
基本思路如下:
-
在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue
@Test public void testSendMessage2WorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "Hello World!spring amqp!!__";for (int i = 1; i < 50; i++) {rabbitTemplate.convertAndSend(queueName, message+i);Thread.sleep(20);} }
-
在consumer服务中定义两个消息监听者,都监听simple.queue队列
@RabbitListener(queues = "simple.queue") public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到simple.queue的消息:{"+msg+"}"+ LocalTime.now());Thread.sleep(20); } @RabbitListener(queues = "simple.queue") public void listenWorkQueue2(String msg) throws InterruptedException {System.out.println("消费者2接收到simple.queue的消息:{"+msg+"}"+ LocalTime.now());Thread.sleep(50); }
-
消费者 1 每秒处理50条消息,消费者 2 每秒处理10条消息
总结
Work模型的使用:
-
多个消费者绑定到一个队列,同一条信息只会被一个消费者处理
-
通过设置prefetch来控制消费者预取的消息数量
发布,订阅模型-Fanout
发布订阅与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入exchange(交换机)。
案例:利用SpringAMQP演示FanoutExchange的使用
实现思路
-
在consumer服务中,利用代码声明队列,交换机,将两者绑定
在consumer服务声明Exchang,Queue,Bingding
SpringAMQP提供了交换机,队列,绑定关系的API,例如:
@Configuration public class FanoutConfig {//itcast.fanout(交换机)@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}//itcast.queue1(队列一)@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}//绑定队列1到交换机@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}//itcast.queue2(队列二)@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}//绑定队列2到交换机@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}
-
在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
@RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg){System.out.println("消费者2接收到fanout.queue1的消息:{"+msg+"}"); }@RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg){System.out.println("消费者2接收到fanout.queue2的消息:{"+msg+"}"); }
-
在publisher中编写测试方法,向itcast.fanout发送消息
@Testpublic void testSendFanoutExchange(){//交换机名称String exchangeName = "itcast.fanout";//消息String message = "hello,every one!";//发送消息,参数分别是:交换机名称,RoutingKey(暂时为空),消息rabbitTemplate.convertAndSend(exchangeName, "", message);}
总结
交换机的作用是什么?
-
接收publisher发送的消息
-
将消息按照规则路由到与之绑定的队列
-
不能缓存消息,路由失败,消息丢失
-
FanoutExchange的会将消息路由到每个绑定的队列
声明队列,交换机,绑定关系的Bean是什么?
-
Queue
-
FanoutExchange
-
Binding
发布,订阅模型-Direct
发布订阅-DirectExchange
Direct Exchange会将接收到的消息根据规则路由指定的Queue,因此称为路由模式(routes)。
案例:利用SpringAMQP演示DirectExchange的使用
实现思路如下:
实现思路如下:
-
利用@RabbitListener声明Exchange,Queue,RoutingKey
-
在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
-
并利用@RabbitListener声明Exchange,Queue,RoutingKey
-
-
在consumer服务中,编写两个消费则方法,分别监听direct.queue1和direct.queue2
@RabbitListener(bindings = @QueueBinding(//队列value = @Queue(name = "direct.queue1"),//交换机exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),//邦定机置key = {"red","blue"} )) public void listenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:{"+msg+"}"); }@RabbitListener(bindings = @QueueBinding(//队列value = @Queue(name = "direct.queue2"),//交换机exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),//邦定机置key = {"red","yellow"} )) public void listenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息:{"+msg+"}"); }
-
在publisher中编写测试方法,向itcast.direct发送消息
@Test public void testSendDirectExchange(){//交换机名称String exchangeName = "itcast.direct";//消息String message = "hello,blue one!";//发送消息,参数分别是:交换机名称,RoutingKey("red"),消息rabbitTemplate.convertAndSend(exchangeName, "red", message); }
总结
描述下Direct交换机与Fanout交换机的差异?
-
Fanout交换机将消息路由给每一个与之绑定的队列
-
Direct交换机根据RoutingKey判断路由给哪个队列
-
如果多个队列具有相同的RoutingKey,则与Fanout功能类似
基于@RabbitListener注解声明队列和交换机有哪些常见的注解?
-
@Queue
-
@Exchange
发布,订阅模型-Topic
发布订阅-TopicExchange
TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单纯的列表,并且以 . 分割。
例如:
china.news:代表有中国新闻消息;
china.weather:代表中国的天气消息;
japan.news:代表日本新闻;
japan.weather:代表日本的天气消息;
案例:利用SpringAMQP演示TopicExchange的使用
实现思路如下:
-
利用@RabbitListener声明Exchange,Queue,RoutingKey
-
在consumer服务中,编写两个消费则方法,分别监听topic.queue1和topic.queue2
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),key = "china.#" )) public void listenTopicQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息:{"+msg+"}"); }@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),key = "#.news" )) public void listenTopicQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息:{"+msg+"}"); }
-
在publisher中编写测试方法,向itcast.topic发送消息
@Test public void testSendTopicExchange(){//交换机名称String exchangeName = "itcast.topic";//消息String message = "中国的新闻!";//发送消息,参数分别是:交换机名称,RoutingKey,消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message); }
总结
描述下Direct交换机与Topic交换机的差异?
自己总结
消息转换器
测试发送Object类型消息
说明:在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。(也就是是对象会序列化为字节)
修改序列化(推荐JSON序列化)
发送消息
-
我们在publisher服务引入依赖
<!--rabbitmq使用json序列化--> <dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId> </dependency>
-
我们在publisher服务声明MessageConverter:
@Bean public MessageConverter messageConverter(){return new Jackson2JsonMessageConverter(); }
-
发送消息
@Test public void testSendObjectQueue(){Map<String,Object> msg = new HashMap<>();msg.put("name","留言");msg.put("age",21);rabbitTemplate.convertAndSend("object.queue",msg); }
接送消息
-
我们在consumer服务引入Jackson依赖:
<!--rabbitmq使用json序列化--> <dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId> </dependency>
-
我们在consumer服务定义MessageConverter:
@Bean public MessageConverter messageConverter(){return new Jackson2JsonMessageConverter(); }
-
然后定义一个消费者,监听object.queue队列并消费信息:
@RabbitListener(queues = "object.queue") public void listenObjectQueue(Map<String,Object> msg){System.out.println("消费者接收到object.queue的消息:{"+msg+"}"); }
总结
SpringAMQP中消息的序列化和反序列化是怎么实现的?
-
利用MessageConverter实现的,默认是JDK的序列化
-
注意发送方与接收方必须使用相同的MessageConverter