目录
- 1. 初识RabbitMQ
- 2. AMQP
- 3.RabbitMQ的极速入门
- 4. Exchange(交换机)详解
- 4.1 Direct Exchange
- 4.2 Topic Exchange
- 4.3 Fanout Exchange
- 5. Message 消息
1. 初识RabbitMQ
RabbitMQ 是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用 Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的
RabbitMQ的优点:
- 开源、性能优秀、稳定性保障
- 提供可靠性消息投递模式(confirm)、返回模式(return)
- 与SpringAMQP完美的整合、API丰富
- 集群模式丰富,表达式配置,HA模式,镜像队列模型
- 保证数据不丢失的前提下做到高可靠性、可用性
RabbitMQ官网
RabbitMQ的整体架构:
RabbitMQ的消息流转:
2. AMQP
AMQP全称: Advanced Message Queuing Protocol
AMQP翻译: 高级消息队列协议
AMQP定义: 是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计
AMQP核心概念:
- Server:又称Broker,接受客户端的连接,实现AMQP实体服务
- Connection:连接,应用程序与Broker的网络连接
- Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务
- Message:消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body则是消息体的内容
- Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由。同一个Virtual Host里面不能有相同名称的Exchange或Queue
- Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列
- Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key
- Routing key:一个路由规则,虚拟机可用它确定如何路由一个特定消息
- Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者
3.RabbitMQ的极速入门
后台启动: ./rabbitmq start &
关闭: ./rabbitmqctl stop
节点状态: ./rabbitmqctl status
管控台: http://ip:15672
RabbitMQ生产消费快速入门:
环境: springboot+jdk1.7+rabbitmq3.6.5 (Maven依赖配置)
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.9.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.6.5</version></dependency></dependencies>
public class Procuder {public static void main(String[] args) throws Exception {//1.创建一个ConnectionFactory 并进行配置ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.通过连接工厂创建连接Connection connection = connectionFactory.newConnection();//3.通过Connection 创建一个 ChannelChannel channel = connection.createChannel();/*** basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)* exchange:指定交换机 不指定 则默认 (AMQP default交换机) 通过routingkey进行匹配 * props 消息属性* body 消息体*///4.通过Channel发送数据for(int i = 0; i < 5; i++){System.out.println("生产消息:" + i);String msg = "Hello RabbitMQ" + i;channel.basicPublish("", "test", null, msg.getBytes());}//5.记得关闭相关的连接channel.close();connection.close();}
}
public class Consumer {public static void main(String[] args) throws Exception{//1.创建一个ConnectionFactory 并进行配置ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.通过连接工厂创建连接Connection connection = connectionFactory.newConnection();//3.通过Connection 创建一个 ChannelChannel channel = connection.createChannel();//4. 声明创建一个队列String queueName = "test";/*** durable 是否持久化* exclusive 独占的 相当于加了一把锁*/channel.queueDeclare(queueName,true,false,false,null);//5.创建消费者QueueingConsumer queueingConsumer = new QueueingConsumer(channel);//6.设置channel/*** ACK: 当一条消息从生产端发到消费端,消费端接收到消息后会马上回送一个ACK信息给broker,告诉它这条消息收到了* autoack: * true 自动签收 当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除。* false 手动签收 当消费者收到消息在合适的时候来显示的进行确认,说我已经接收到了该消息了,RabbitMQ可以从队列中删除该消息了* */channel.basicConsume(queueName, true, queueingConsumer);//7.获取消息while(true){Delivery delivery = queueingConsumer.nextDelivery();String msg = new String(delivery.getBody());System.err.println("消费端:" + msg);//Envelope envelope = delivery.getEnvelope();}}
}
4. Exchange(交换机)详解
Exchange: 接收消息,并根据路由键转发消息所绑定的队列
交换机属性:
- Name: 交换机名称
- Type: 交换机类型 diect、topic、fanout、headers
- Durability: 是否需要持久化,true为持久化
- AutoDelete: 当最后一个绑定到Exchange的队列删除后,自动删除该Exchange
- Internal: 当前Exchange是否用于RabbitMQ内部使用,默认为false (百分之99的情况默认为false 除非对Erlang语言较了解,做一些扩展)
- Arguments: 扩展参数, 用于扩展AMQP协议可自定化使用
4.1 Direct Exchange
所有发送到Direct Exchange的消息被转发到RouteKey指定的Queue
注意:Direct模式可以使用RabbitMQ自带的Exchange: default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RoutingKey必须完全匹配才会被队列接收,否则该消息会被抛弃
public class ProducerDirectExchange {public static void main(String[] args) throws Exception {//1.创建ConnectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");//2.创建ConnectionConnection connection = connectionFactory.newConnection();//3.创建ChannelChannel channel = connection.createChannel();//4.声明String exchangeName = "test_direct_exchange";String routingKey = "test.direct";//5.发送String msg = "Hello World RabbitMQ4 Direct Exchange Message";channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());}
}
public class ConsumerDirectExchange {public static void main(String[] args) throws Exception{ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);connectionFactory.setAutomaticRecoveryEnabled(true);connectionFactory.setNetworkRecoveryInterval(3000);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//声明String exchangeName = "test_direct_exchange";String exchangeType = "direct";String queueName = "test_direct_queue";String routingKey = "test.direct";//表示声明了一个交换机channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);//表示声明了一个队列channel.queueDeclare(queueName,false,false,false,null);//建立一个绑定关系channel.queueBind(queueName, exchangeName, routingKey);//durable 是否持久化消息QueueingConsumer consumer = new QueueingConsumer(channel);//参数:队列名称,是否自动ACK,Consumerchannel.basicConsume(queueName, true, consumer);//循环获取消息while(true){//获取消息,如果没有消息,这一步将会一直阻塞Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println("收到消息:" + msg);}}
}
4.2 Topic Exchange
所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上
Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic
注意:可以使用通配符进行匹配
符号 # 匹配一个或多个词
符号 * 匹配不多不少一个词
例如: "log.#" 能够匹配到 “log.info.oa”
"log.*" 只会匹配到 "log.err"
public class ProducerTopicExchange {public static void main(String[] args) throws Exception {//1.创建ConnectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.创建ConnectionConnection connection = connectionFactory.newConnection();//3.创建ChannelChannel channel = connection.createChannel();//4.声明String exchangeName = "test_topic_exchange";String routingKey1 = "user.save";String routingKey2 = "user.update";String routingKey3 = "user.delete.abc";//5.发送String msg = "Hello World RabbitMQ4 Direct Exchange Message";channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());}
}
public class ConsumerTopicExchange {public static void main(String[] args) throws Exception{ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);connectionFactory.setAutomaticRecoveryEnabled(true);connectionFactory.setNetworkRecoveryInterval(3000);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//声明String exchangeName = "test_topic_exchange";String exchangeType = "topic";String queueName = "test_topic_queue";String routingKey = "user.#";//表示声明了一个交换机channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);//表示声明了一个队列channel.queueDeclare(queueName,false,false,false,null);//建立一个绑定关系channel.queueBind(queueName, exchangeName, routingKey);//durable 是否持久化消息QueueingConsumer consumer = new QueueingConsumer(channel);//参数:队列名称,是否自动ACK,Consumerchannel.basicConsume(queueName, true, consumer);//循环获取消息while(true){//获取消息,如果没有消息,这一步将会一直阻塞Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println("收到消息:" + msg);}}
}
4.3 Fanout Exchange
不处理路由键,只需要简单的将队列绑定到交换机上
发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
所以Fanout交换机转发消息是最快的
public class ProducerFanoutExchange {public static void main(String[] args) throws Exception {//1.创建ConnectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.创建ConnectionConnection connection = connectionFactory.newConnection();//3.创建ChannelChannel channel = connection.createChannel();//4.声明String exchangeName = "test_fanout_exchange";//5.发送for(int i = 0; i < 10 ; i++){String msg = "Hello World RabbitMQ4 Direct Exchange Message";channel.basicPublish(exchangeName, "", null, msg.getBytes());}channel.close();connection.close();}
}
public class ConsumerFanoutExchange {public static void main(String[] args) throws Exception{ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);connectionFactory.setAutomaticRecoveryEnabled(true);connectionFactory.setNetworkRecoveryInterval(3000);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//声明String exchangeName = "test_fanout_exchange";String exchangeType = "fanout";String queueName = "test_topic_queue";//无需指定路由key String routingKey = "";//表示声明了一个交换机channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);//表示声明了一个队列channel.queueDeclare(queueName,false,false,false,null);//建立一个绑定关系channel.queueBind(queueName, exchangeName, routingKey);//durable 是否持久化消息QueueingConsumer consumer = new QueueingConsumer(channel);//参数:队列名称,是否自动ACK,Consumerchannel.basicConsume(queueName, true, consumer);//循环获取消息while(true){//获取消息,如果没有消息,这一步将会一直阻塞Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println("收到消息:" + msg);}}
}
5. Message 消息
服务器与应用程序之间传递的数据,本质上就是一段数据,由Properties和Body组成
常用属性:delivery mode、headers (自定义属性)
其他属性:content_type、content_encoding、priority、expiration
消息的properties属性用法示例:
public class Procuder {public static void main(String[] args) throws Exception {//1.创建一个ConnectionFactory 并进行配置ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.通过连接工厂创建连接Connection connection = connectionFactory.newConnection();//3.通过Connection 创建一个 ChannelChannel channel = connection.createChannel();Map<String,Object> headers = new HashMap<>();headers.put("my1", "111");headers.put("my2", "222");//10秒不消费 消息过期移除消息队列AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().deliveryMode(2).contentEncoding("utf-8").expiration("10000").headers(headers).build();//4.通过Channel发送数据for(int i = 0; i < 5; i++){System.out.println("生产消息:" + i);String msg = "Hello RabbitMQ" + i;channel.basicPublish("", "test", properties, msg.getBytes());}//5.记得关闭相关的连接channel.close();connection.close();}
}
public class Consumer {public static void main(String[] args) throws Exception{//1.创建一个ConnectionFactory 并进行配置ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.通过连接工厂创建连接Connection connection = connectionFactory.newConnection();//3.通过Connection 创建一个 ChannelChannel channel = connection.createChannel();//4. 声明创建一个队列String queueName = "test";channel.queueDeclare(queueName,true,false,false,null);//5.创建消费者QueueingConsumer queueingConsumer = new QueueingConsumer(channel);//6.设置channelchannel.basicConsume(queueName, true, queueingConsumer);//7.获取消息while(true){Delivery delivery = queueingConsumer.nextDelivery();String msg = new String(delivery.getBody());System.err.println("消费端:" + msg);Map<String, Object> headers = delivery.getProperties().getHeaders();System.err.println("headers value:" + headers.get("my1"));}}
}