视频:
MQ基础-01.RabbitMQ课程介绍_哔哩哔哩_bilibilihttps://www.bilibili.com/video/BV1mN4y1Z7t9?p=1&vd_source=d0ea58f1127eed138a4ba5421c577eb1
一、RabbitMQ简介
1.同步调用
优势:时效性强,等待结果后才返回
劣势:拓展性差,性能下降,级联失败问题
2.异步调用
异步调用就是基于消息通知的方式,一般含有三个角色
(1)消息发送者:投递消息的人,原来的调用方
(2)消息代理:管理、暂存、转发消息,可以理解微信服务器
(3)消息接受者:接收和处理消息的人,原来服务提供方
Broker是消息代理
二.RabbitMQ的安装
RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:RabbitMQ: One broker to queue them all | RabbitMQhttps://www.rabbitmq.com/
三.RabbitMQ入门
(1)登录RabbitMQ后添加队列
(2)交换机先绑定队列名字
(3)交换机发送消息给队列
队列可以查看接收到的消息
四、数据隔离
将Virtual host切换为/
(1)新建一个用户
(2)为用户创建virtual host
(3)测试不同virtual host直接数据隔离现象,通过修改virtual host即可
五、Java客户端
(1)入门示例
1.引入依赖
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2.配置RabbitMQ服务端信息(消费者和生产者都需要配置)
spring:rabbitmq:virtual-host: /hamllport: 5672host: 192.168.92.136username: hmallpassword: 123
3.消息发送方
package cn.itcast.mq.helloworld;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid testSendMessageQueue(){String queueName = "simple.queue";String msg = "Hello,amqp";rabbitTemplate.convertAndSend(queueName,msg);}}
4.消息接收方(不断接收消息)
package cn.itcast.mq.listeners;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class MqListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg){System.out.println("消费者收到消息:"+msg);}
}
(2)消费者消息推送限制
(3)Fanout交换机
1.创建hmall.fanout交换机,绑定fanout.queue1和fanout.queue2
2.消息发送方
@Testvoid testSendFanout(){String exchangeName = "hmall.fanout";String msg = "Hello,everyone!";rabbitTemplate.convertAndSend(exchangeName,null,msg);}
3.消息接收方
@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue(String msg) throws InterruptedException {System.out.println("消费者1收到消息:"+msg);}@RabbitListener(queues = "fanout.queue2")public void listenFanout2Queue(String msg) throws InterruptedException {System.err.println("消费者2收到消息:....."+msg);}
(4)Direct交换机
注意:Direct交换机绑定队列时配置Routing Key
如下图所示:
绑定queue1要配置blue和red的Routing Key,而绑定queue2要配置yellow和red的Routing Key
1.创建hmall.direct交换机,绑定direct.queue1和direct.queue2
2.消息发送方
@Testvoid testSendDirect(){String exchangeName = "hmall.direct";String msg = "Hello,every Direct!";rabbitTemplate.convertAndSend(exchangeName,"blue",msg);}
3.消息接收方
@RabbitListener(queues = "direct.queue1")public void listenDirectQueue(String msg) throws InterruptedException {System.out.println("消费者1收到消息:"+msg);}@RabbitListener(queues = "direct.queue2")public void listenDirect2Queue(String msg) throws InterruptedException {System.err.println("消费者2收到消息:....."+msg);}
(5)Topic交换机
1.Topic交换机绑定队列
注意:Topic交换机绑定队列时配置Routing Key
2. 消息发送者
@Testvoid testSendTopic(){String exchangeName = "hmall.topic";String msg = "Hello,every Topic!";rabbitTemplate.convertAndSend(exchangeName,"china.hello",msg);}
3.消息接收方
@RabbitListener(queues = "topic.queue1")public void listenTopicQueue(String msg){System.out.println("消费者1收到消息:"+msg);}@RabbitListener(queues = "topic.queue2")public void listenTopicQueue2(String msg){System.err.println("消费者2收到消息:....."+msg);}
(6)声明队列和交换机方式一
两种创建交换机、队列、和绑定队列的方式
package cn.itcast.mq.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {@Beanpublic FanoutExchange fanoutExchange(){
// ExchangeBuilder.fanoutExchange("hmall.fanout").build();return new FanoutExchange("hmall.fanout1");}@Beanpublic Queue fanoutQueue3(){
// QueueBuilder.durable("fanout.queue1").build();return new Queue("fanout.queue3");}@Beanpublic Binding fanoutBinding3(Queue fanoutQueue3,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);}@Beanpublic Queue fanoutQueue4(){return new Queue("fanout.queue4");}@Beanpublic Binding fanoutBinding4(){return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());}
}
(7)声明队列和交换机方式二
示例代码:
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1",durable = "true"),exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void listenDirectQueue(String msg) throws InterruptedException {System.out.println("消费者1收到消息:"+msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "direct.queue2",durable = "true"),exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void listenDirect2Queue(String msg) throws InterruptedException {System.err.println("消费者2收到消息:....."+msg);}
(8)消息转换器
1.添加一个队列,名为object.queue
2.编写单元测试,向队列中发送一条消息,消息的类型为Map
@Testvoid testSendObject(){Map<String, Object> msg = new HashMap<>(2);msg.put("name","Jack");msg.put("age",21);rabbitTemplate.convertAndSend("object.queue",msg);}
3.打开控制台,发现发送来的消息是一串乱码,解决方式如下:
3.1引入依赖:
<!-- Jackson--><dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId></dependency>
3.2配置MessageConverter
@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}