同步和异步通讯
同步通讯:
需要实时响应,时效性强
耦合度高
每次增加功能都要修改两边的代码
性能下降
需要等待服务提供者的响应,如果调用链过长则每次响应时间需要等待所有调用完成
资源浪费
调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源
级联失败
如果一个服务的提供者出现了问题,所有调用方都会出问题,出现雪崩问题
异步通讯:
在发布方和接收方之间存在中间人(Broker)
发布方只需将消息发布到中间人
接受方只需要从中间人订阅消息
实现解耦
吞吐量提升
无需等待订阅者处理完成,响应快速
故障隔离
服务不存在直接调用,不存在级联失败问题
资源占用问题
调用之间不会阻塞,不会产生无效资源占用问题
解耦
每个服务之间灵活插拔,实现解耦
流量削峰
所有发布事件由Broker直接接受,接受者按照自己的速度从Broker处理事件,实现缓冲
MQ
MessageQueue消息队列
即上述过程中的Broker
几种常见的MQ对比
MQ的基本结构
publisher:发布者
consumer:消费者
exchange:交换机,负责消息路由
queue:队列,存储消息,消息的缓冲区
队列绑定交换机
virtualHost:虚拟主机
channel:表示通道,操作MQ的工具,连接消息发布者和交换机,连接消息接受者和队列
RabbitMQ整体工作流程
发布者发布消息给交换机
交换机将消息路由到与其绑定的队列
消费者监听与其对应的队列获取消息
RabbitMQ消息模型
生产者->(交换机)->队列->消费者
基本消息队列BasicQueue
工作消息队列WorkQueue
多个消费者并发消费队列,消费者之间是竞争关系
发布订阅(Publish,Subscribe)
根据交换机类型不同分为三种
广播
消费者各自拥有
生产者将消息发送到交换机,具体发给哪个队列,生产者无法决定,由交换机决定.
交换机把消息发送给绑定过的所有队列,队列的所有消费者都能拿到消息.实现一条消息被多个消费者消费
生产者->所有消费者
路由
需求不同的消息被不同队列消费,就需要用到Direct类型的Exchange.在Direct模型下,需要指定RoutingKey(路由key).在消息发送方向交换机发送消息时,必须指定消息的路由key
交换机在接受到生产者消息后,将消息递交给routingkey完全匹配的队列
主题
topic类型相当于可以使用通配符匹配routingkey的路由类型
通配符规则
#:匹配一个或者多个词
*:匹配恰好一个词
SpringAMQP
Spring官方基于RabbitMQ提供的一套消息收发的模版工具:SpringAMQP
提供了三个功能:
自动声明队列,交换机以及绑定关系
基于注解的监听器模式,异步接收消息
封装RabbitTemplate工具用于发送消息
引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
实现通信
新建队列->publisher发送->consumer接收
基于rabbitTemplate API进行发送
简单队列通信
发送:
@GetMapping("/simple")public void publishMessage(){String queueName = "simplequeue";String message = "simplequeuemessage";rabbitTemplate.convertAndSend(queueName,message);System.out.println("simple_success");}
接收:
@Component
public class MqListener {@RabbitListener(queues = "simplequeue")public void simpleListener(String msg){System.out.printf("%s 简单队列收到消息:%s", Convert.toStr(LocalDateTime.now()),msg);}
}
工作队列通信
多个消费者监听一个队列,不同消费者因为自身的能力不同对消息处理的时间也不同
如果不进行额外设置的话,会将队列中的消息平均分配给所有消费者
造成处理能力浪费的情况
所以我们可以通过配置
listener:simple:prefetch: 1#每个消费者每次只能取一条
来限制每个消费者预取的数量,实现能者多劳的工作场景
发送:
@GetMapping("/work")public void PublishWorkMessage(){String queueName = this.WORK;for (int i = 0; i < 100; i++) {rabbitTemplate.convertAndSend(queueName,System.currentTimeMillis());}}
接收:
@RabbitListener(queues = "workqueue")public void workListener1(String msg){System.out.printf("%s 工作队列1收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);}@RabbitListener(queues = "workqueue")public void workListener2(String msg){System.out.printf("%s 工作队列2收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);}
交换机
上述两种工作方式都是不包含交换机,消息直接发送到队列的通信方式
我们可以通过引入交换机来实现消息的路由,决定具体要发送到哪个队列
消息通信流程 发布者->交换机->交换机决定的队列->消费者
交换机类型
包含以下四种:
FanOut:广播,将消息传递给所有绑定交换机的队列
Direct:基于RoutingKey发送消息给对应的队列
Topic:通配符订阅,基于通配符RoutingKey发送消息给对应的队列
Headers:头匹配,基于MQ的消息头匹配
FanOut
创建多个队列->创建交换机进行绑定->发布者发布->消费者接收
发布:
@GetMapping("/fanout")public void publishFanoutMessage(){String exchangeName = this.FANOUT;for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend(exchangeName,"",System.currentTimeMillis() + "from:fanout");}}
接收:
@RabbitListener(queues = "cfjg_queue1")public void queue1Listener(String msg){System.out.printf("%s 队列1收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);}@RabbitListener(queues = "cfjg_queue2")public void queue2Listener(String msg){System.out.printf("%s 队列2收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);}
Direct
创建多个队列->创建交换机进行路由key绑定->发布者发布->消费者接收
发布:
@GetMapping("/direct")public void publishDirectMessage(){String exchangeName = this.DIRECT;for (int i = 0; i < 10; i++) {if(i%2 == 0){rabbitTemplate.convertAndSend(exchangeName,this.Queue1,"directTo:queue1--" + i);}else if(i%2 != 0){rabbitTemplate.convertAndSend(exchangeName,this.Queue2,"directTo:queue2--" + i);}if(i%5 == 0){rabbitTemplate.convertAndSend(exchangeName,this.ALL,"directTo:all--" + i);}}}
接收:
@RabbitListener(queues = "cfjg_queue1")public void queue1Listener(String msg){System.out.printf("%s 队列1收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);}@RabbitListener(queues = "cfjg_queue2")public void queue2Listener(String msg){System.out.printf("%s 队列2收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);}
Topic
创建多个队列->创建交换机进行通配符路由key绑定->发布者发布->消费者接收
路由key由多个由.分割的单词组成
绑定时使用#或*通配符进行绑定
#:代表任意多个单词
*.代表任意一个单词
eg:cfjg.test
可以由cfjg.#或者*.test进行匹配
发布:
@GetMapping("/topic")public void publishTopicMessage(){for (int i = 0; i < 100; i++) {String tmp = i % 2 == 0 ? "two" : "one";String routingKey = "cfjg." + tmp;System.out.println(routingKey);rabbitTemplate.convertAndSend(this.TOPIC,routingKey,"topicTo:queue--" + i);}}
接收:
@RabbitListener(queues = "cfjg_queue1")public void queue1Listener(String msg){System.out.printf("%s 队列1收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);}@RabbitListener(queues = "cfjg_queue2")public void queue2Listener(String msg){System.out.printf("%s 队列2收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);}
声明队列和交换机
SpringAMQP提供了一个Queue类用来创建队列
public class Queue extends AbstractDeclarable implements Cloneable {}
提供了一个Exchange接口用来表示不同类型的交换机
SpringAMQP提供了ExchangeBuilder和BindingBuilder来简化创建和绑定队列和交换机的过程
我们可以在消费者中编写一个配置类来对队列和交换机进行声明
@Configuration
public class MqConfig {//声明交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("cfjg.fanout");}//声明队列@Beanpublic Queue fanoutQueue1(){return new Queue("test_queue");}//声明绑定@Beanpublic Binding bingingQueue1(){return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());}
}
direct和topic模式需要每个key都进行一次绑定(同控制台操作)
基于注解声明
Spring提供了基于注解方式进行声明的途径
通过注解可以声明
绑定
队列
交换机
路由key
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "cfjg_queue2"),exchange = @Exchange(name = "cfjg.fanout" , type = ExchangeTypes.FANOUT),key = {"red","blue"}))public void fanoutQueue(String msg){System.out.println(msg);}
消息转换器
在MQ的消息传输中,会先将对象序列化为字节,接收消息时将字节反序列化为Java对象
但是默认的JDK序列化存在以下问题
数据体积过大,
存在安全漏洞,
可读性差
可以使用JSON进行序列化和反序列化
引入JackSon依赖来进行JSON序列化
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
通过在配置类中注册转换器Bean来实现消息发送时的自动序列化
@Beanpublic MessageConverter messageConverter(){return new jackson2JsonMessageConverter();}