概览:
MQ基本概念
RabbitMQ入门
基本工作模
1.MQ是什么?
MQ:Message Queue, 存储消息的中间件,是消息发送过程中的暂存容器,主要用于解决分布式系统进程间的通信。
分布式系统通信的两种方式:直接远程调用、借助第三方间接通信
为什么要使用消息中间件?
如有一个电商交易的场景,用户下单之后调用库存系统减库存,然后调用物流系统进行发货,如果刚开始交易,库存,物流都是属于一个系统,那么他们之间就是接口调用。但是随着系统的发展,各个模块业务越来越庞大、业务逻辑越来越复杂,这个时候就必然要做服务化和业务拆分。这个时候就需要考虑这些系统之间是如何交互的。首先想到的就是RPC(Remote Procedure Call),但是随着系统的发展,可能一笔交易后序需要调用几十个接口位于不同系统的接口,比如短信服务、邮件服务等等,这个时候就需要消息中间件来解决问题了。消息中间件最突出的特点就是提供数据传输的可靠性和高效性,主要解决分布式的系统数据传输需求
摘自:为什么使用消息中间件_为什么用消息中间件-CSDN博客
使用MQ的优势:
(1)应用解耦:提高系统容错性和可维护性
整个系统耦合,会导致系统容错性低、可扩展性低、可维护性低。解耦之后,一个系统挂了,其它系统不会有问题。容错性高、可扩展性高、可维护性高。
(2)异步提速:提升用户体验和系统吞吐量.
如果一个订单系统同步执行:则有: 订单进数据库20ms+调用子系统一300ms+调用子系统二300ms+调用子系统三300ms=920ms,用户等待920ms
如果采用异步模式:则有 订单进数据库20ms+消息发到MQ5ms=25ms,用户等待25ms
(3)削峰填谷:提高系统稳定性
请求瞬间增多每秒5000个,但是子系统每秒只能处理1000请求。
削峰指的是:提高子系统的稳定性。加入中间件后,所有请求进入消息队列。帮子系统处理高并发的请求量。
填谷指的是:大量的消息积压在MQ里。子系统每秒从MQ拉取1000个进行处理,直到处理完所有积压的消息。
使用MQ的劣势:
(1)系统可用性降低
引入的第三方插件|依赖越多,系统稳定性越差。
如果MQ宕机,业务功能就会收到影响。需要额外的工作来确保MQ高可用性。
(2)系统复杂性提高
没有MQ时:系统间同步远程调用
引入MQ时:通过MQ异步调用
引发的问题:如何保证消息不被重复消费?如何处理消息丢失?如何保证消息传递的顺序性。
(3)一致性问题
消息:A——>MQ——>B|C|D
如果BC处理成功,D失败了,如何保证数据一致性。
使用MQ应满足什么条件?
- 生产者不需要消费者的反馈。(消费者消费消息后返回值为空,这才能使异步调用成为可能)
- 容许短暂的不一致
- 用了有效果:解耦、提速、削峰等,超过引入MQ的管理成本
常见的MQ产品
2.RabbitMQ概述
RabbitMQ简介:
2007年,Rabbit技术公司 基于AMQP标准开发 Rabbit MQ1.0
使用Erlang语言(一种专门为高并发、分布式系统开发的语言|电信领域使用广泛)
AMQP协议
- Advanced Message Queuing Protocol高级消息队列协议
- 网络协议,应用层协议的一个开放标准,为面向消息的中间件设计。
- 2006年,AMQP规范发布(类比HTTP)
- 基于此协议的客户端与消息中间件之间传递消息,不受客户端/中间件产品、开发语言等限制。
RabbitMQ的组成:
Broker | 中间件,用于接收发信息的用用,如:RabbitMQ Server |
Virtual host | 虚拟机,处于多租户和安全因素设计的,把AMQP基本组件划分到一个虚拟的分组中,类似于网络中的namespace 。 应用场景:用户隔离 多个不同用户使用同一个RabbitMQ时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等。 |
Connection | 链接,publisher/consumer和broker之间建立TCP链接。 |
Channel | connection内部的逻辑链接。作为一个轻量级Connection,极大减少了操作系统建立TCP链接的开销。 存在原因: 如果每次访问MQ都建立链接,消息量大的时候建立TCP链接,开销非常大,效率也低。 所以使用channel内部逻辑链接,如果应用支持多线程,通常每个thread独占一个channel通信。 AMQP method中: 包含channel id帮助客户端和message broker识别channel 所以channel之间是完全隔离的。 |
Exchange | 交换机,message到达broker的第一站,根据分发规则,匹配查询表中的routing key, 分发消息到queue。 常见的类型: direct: point to point topic: publisher-subscribe fanout: multicast |
Queue | 消息队列,消息最终被送到这里等待被消费。 |
Binding | 绑定,exchange 和 queue之间的虚拟连接,binding中包含routing key. Binding信息被保存在exchange的查询表中,是message分发的依据。 |
RabbitMQ的工作模式:
6种,包含:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics主题模式、RPC远程调用模式。
RabbitMQ官网:RabbitMQ Tutorials — RabbitMQ
补充:
JMS:java消息服务(Java Message Service)应用程序接口,是java平台关于面向消息的中间件API
JMS是Java EE 规范的一种,类比JDBC.
很多消息中间件都实现了JMS规范,如:ActiveMQ。RabbitMQ没有提供JMS的实现,但开源社区有。
3.RabbitMQ的简单实现
RabbitMQ的安装:
1.安装RabbitMQ软件:
windows环境下安装RabbitMQ(超详细)_windows安装rabbitmq-CSDN博客
java依赖:
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version>
</dependency>
2.开启管理界面及配置:
- 默认端口号:5672
- 图形化界面地址: http://127.0.0.1:15672
- 登陆名: guest 密码:guest
- 配置文件:
3.启动服务及基础配置:
RabbitMQ简单模式:
RabbitMQ生产者:
package org.example.producer;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class SimpleProducer {public void publishMessage() throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory=new ConnectionFactory();//2.设置参数factory.setHost("127.0.0.1");//ip 默认值localhostfactory.setPort(5672);//端口 默认值5672factory.setVirtualHost("/itcast");//虚拟机 默认值/factory.setUsername("heima");factory.setPassword("heima");//3.创建链接 ConnectionConnection connection=factory.newConnection();//4.创建ChannelChannel channel=connection.createChannel();//5.创建队列Queue/*queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)queue: 队列名称durable: 是否持久化,是的话,mq重启后,数据还在。exclusive:是否独占,只能有一个消费者监听队列当connection关闭时,是否删除队列autoDelete: 是否自动删除,当没有consumer时,自动删除。arguments: 参数*/channel.queueDeclare("hello_world",true,false,false,null);//6.发送消息/*basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) texchange:交换机名称,简单模式下交换机会使用默认的“”routingKey: 路由名称props: 配置消息body: 发送消息数据*/String body="hello rabbitMQ!";channel.basicPublish("","hello_world",null,body.getBytes());//7.释放资源channel.close();connection.close();}
}
生产者生产一个消息:
RabbitMQ消费者:
package org.example.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class SimpleConsumer {public static void main(String[] args) throws IOException, TimeoutException {SimpleConsumer producer=new SimpleConsumer();producer.consumerMessage();}public void consumerMessage() throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory=new ConnectionFactory();//2.设置参数factory.setHost("127.0.0.1");//ip 默认值localhostfactory.setPort(5672);//端口 默认值5672factory.setVirtualHost("/");//虚拟机 默认值/factory.setUsername("guest");factory.setPassword("guest");//3.创建链接 ConnectionConnection connection=factory.newConnection();//4.创建ChannelChannel channel=connection.createChannel();//5.创建队列Queue/*queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)queue: 队列名称durable: 是否持久化,是的话,mq重启后,数据还在。exclusive:是否独占,只能有一个消费者监听队列当connection关闭时,是否删除队列autoDelete: 是否自动删除,当没有consumer时,自动删除。arguments: 参数*/channel.queueDeclare("hello_world",true,false,false,null);//6.接收消息/*basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) texchange:交换机名称,简单模式下交换机会使用默认的“”routingKey: 路由名称props: 配置消息body: 发送消息数据*/Consumer consumer = new DefaultConsumer(channel){/*此处重写该方法是为了打印回调结果回调方法,收到回调方法后,会自动执行该方法consumerTag: 标识envelope: 获取一些信息,交换机,路由keyproperties: 配置信息body: 数据*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("Exchange:"+envelope.getExchange());System.out.println("RoutingKey:"+envelope.getRoutingKey());System.out.println("properties:"+properties);System.out.println("body:"+new String(body));}};channel.basicConsume("hello_world",true,consumer);//7.不需要释放资源,因为要监听队列}
}
总结:
P:生产者,也就是要发送消息的程序。
C: 消费者,消息的接收者,监听队列等待消息到来。
Queue: 消息队列,类似邮箱,可以缓存消息,生产者向其中投递消息,消费者从中取出消息。
4.5种基本工作模式
(1)简单模式 Hello World:
一个生产者,一个队列,一个消费者,不需要设置交换机(使用默认交换机)
(2)工作队列模式Work Queue:
特点: 一个生产者,一个队列,多个消费者(竞争关系),不需要设置交换机(使用默认交换机),同一个消息,对消费者来说是竞争关系,只有一个消费者能消费。
应用场景:对于任务过重或任务较多的情况使用工作队列可以提高任务处理速度。
例如:短信服务部署多个,只需要有一个节点发送成功即可。
发送端:发布多条消息
//创建新的队列channel.queueDeclare("work_queues",true,false,false,null);
//发多条消息String body="hello rabbitMQ!";for(int i=0;i<10;i++){channel.basicPublish("","hello_world",null,(i+"-----"+body).getBytes());}
接收端:多个消费者竞争消费
创建两个消费者consumer1和consumer2,先启动两个消费者,再启动生产者生产消息,观察到两个消费者的消费过程。
消费者需要修改的地方:监听新的队列
channel.basicConsume("work_queues",true,consumer);
consumer1:
consumer2:
(3)发布Publish/subscribe:
需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当消息发送到交换机后,交换机将消息发送到绑定队列。
订阅模式多了一个交换机概念Exchange,且过程略有变化:
P: 消息生产者,消息发送给交换机
C:消息的接收者,监听消息队列,等待消息到来
Queue: 消息队列,接收消息,缓存消息。
Exchange: 交换机,只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失。
发送端:引入交换机,并对交换机和队列进行绑定
创建交换机、两个队列、绑定交换机和队列;发送消息并释放资源
其中交换机类型:BuiltinExchangeType枚举
关键代码:
//4.创建ChannelChannel channel=connection.createChannel();//定义交换机/*exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException {exchange: 交换机名称type: 交换机类型DIRECT("direct"),:定向FANOUT("fanout"),:扇形(广播),发送消息到每一个消费者TOPIC("topic"),:通配符的方式HEADERS("headers");:参数匹配durable: 是否持久化autoDelete: 自动删除internal: 内部使用,一般falsearguments: 参数*/String exchangeName="test_fanout";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);//5.创建队列QueueString queueName1="test_fanout_queue1";String queueName2="test_fanout_queue2";channel.queueDeclare(queueName1,true,false,false,null);channel.queueDeclare(queueName2,true,false,false,null);//绑定交换机/*queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)queue:队列名称exchange:交换机名称routingKey:路由键,绑定规则如果交换机的类型为fanout,routingKey设置为“”arguments:参数*/channel.queueBind(queueName1,exchangeName,"");channel.queueBind(queueName2,exchangeName,"");//6.发送消息String body="日志信息: 张三调用了FindAll方法...日志级别:info...";channel.basicPublish(exchangeName,"",null,body.getBytes());//7.释放资源channel.close();connection.close();
接收端:创建两个消费者,分别监听两个队列
consumer1监听queue1,consumer2监听queue2
//Consumer1:
channel.basicConsume("test_fanout_queue1",true,consumer);//Consumer2:
channel.basicConsume("test_fanout_queue2",true,consumer);
启动生产者,消息生产正常,启动消费者1,启动消费者2,两个消费者都收到了消息。
consumer1:
consumer2:
(4)路由模式Routing:
需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key, 当发送消息到交换机后,交换机会根据routing key 将消息发送到对应的队列。
队列与交换机的绑定,不能是任意绑定,而是根据routing key绑定队列,消息根据绑定来决定分发到哪个队列中。
发送端:交换机绑定队列时,指定路由模式
注意:交换者的类型为:BuiltinExchangeType.TOPIC
关键代码:
//4.创建ChannelChannel channel=connection.createChannel();String exchangeName="test_topic";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);//5.创建队列QueueString queueName1="test_routing_queue1";String queueName2="test_routing_queue2";channel.queueDeclare(queueName1,true,false,false,null);channel.queueDeclare(queueName2,true,false,false,null);//绑定交换机/*queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)queue:队列名称exchange:交换机名称routingKey:路由键,绑定规则如果交换机的类型为fanout,routingKey设置为“”arguments:参数*/channel.queueBind(queueName1,exchangeName,"error");channel.queueBind(queueName2,exchangeName,"info");channel.queueBind(queueName2,exchangeName,"error");channel.queueBind(queueName2,exchangeName,"warning");//6.发送消息String body="日志信息: 张三调用了FindAll方法...日志级别:info...";channel.basicPublish(exchangeName,"info",null,("info:\t"+body).getBytes());channel.basicPublish(exchangeName,"error",null,("error:\t"+body).getBytes());channel.basicPublish(exchangeName,"warning",null,("warning:\t"+body).getBytes());
接收端:创建Consumer1类,Consumer2类。
consumer1 监听队列1, consumer2监听队列2
//consumer1
channel.basicConsume("test_routing_queue1",true,consumer);//consumer2
channel.basicConsume("test_routing_queue2",true,consumer);
启动生产者生产正常,启动消费者1,启动消费者2
consumer1:
consumer2:
(5)通配符模式Topic:
需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key, 当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列。
应用场景:如像根据日志级别监听某个子系统 系统名.error 消息,并入库
发送者:
指定路由方式,但是路由方式以通配符匹配的形式存在: #匹配一个 *匹配多个
channel.queueBind(queueName1,exchangeName,"#.error");channel.queueBind(queueName2,exchangeName,"order.*");channel.queueBind(queueName2,exchangeName,"*.*");//6.发送消息String body="日志信息: 张三调用了FindAll方法...日志级别:info...";channel.basicPublish(exchangeName,"order.info",null,("order.info:\t"+body).getBytes());channel.basicPublish(exchangeName,"order.error",null,("order.error:\t"+body).getBytes());channel.basicPublish(exchangeName,"A.error",null,("A.error:\t"+body).getBytes());
接收端:创建Consumer1类,Consumer2类。
consumer1监听队列1,consumer2监听队列2
//Consumer1
channel.basicConsume("test_topic_queue1",true,consumer);//Consumer2
channel.basicConsume("test_topic_queue2",true,consumer);
consumer1:
consumer2: