RabbitMQ安装及使用笔记
RabbitMQ是一个开源的消息代理软件,它实现了高级消息队列协议(AMQP),用于在分布式系统中进行消息传递。
1.安装
利用docker load命令加载mq镜像 docker load -i mq.tar
基于Docker来安装RabbitMQ,使用下面的命令。
docker run \-e RABBITMQ_DEFAULT_USER=dong \-e RABBITMQ_DEFAULT_PASS=dong97 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network dong-net \-d \rabbitmq:3.8-management
安装完成后,我们访问 http://192.168.196.1:15672即可看到管理控制台。首次访问需要登录,默认的用户名和密码在配置文件中已经指定了。
2.RabbitMQ对应的架构如图
其中包含几个概念:
- publisher:生产者,也就是发送消息的一方
- consumer:消费者,也就是消费消息的一方
- queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
- exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
- virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue
3.Demo工程
配置SpringAMQP相关的依赖
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
为了方便测试,在控制台新建一个队列:simple.queue
3.1.1消息发送
配置MQ地址,在publisher服务的application.yml中添加配置:
spring:rabbitmq:host: 192.168.196.1 # 你的虚拟机IPport: 5672 # 端口virtual-host: /mall # 虚拟主机username: mall # 用户名password: 123 # 密码
3.1.2消息发送
然后在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, spring amqp!";// 发送消息rabbitTemplate.convertAndSend(queueName, message);}
}
3.2.1消息接收
配置MQ地址,在consumer服务的application.yml中添加配置:
spring:rabbitmq:host: 192.168.196.1 # 你的虚拟机IPport: 5672 # 端口virtual-host: /mall # 虚拟主机username: mall # 用户名password: 123 # 密码
3.2.2消息接收
在consumer服务的com.dong.consumer.listener包中新建一个类SpringRabbitListener,代码如下
@Component
public class SpringRabbitListener {// 利用RabbitListener来声明要监听的队列信息// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。// 可以看到方法体中接收的就是消息体的内容@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消费者接收到消息:【" + msg + "】");}
}
4.WorkQueues模型
Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。
为了方便测试,在控制台新建一个队列:work.queue
4.1消息发送
循环发送,模拟大量消息堆积现象。
在publisher服务中的SpringAmqpTest类中添加一个测试方法:
/*** workQueue* 向队列中不停发送消息,模拟消息堆积。*/
@Test
public void testWorkQueue() throws InterruptedException {// 队列名称String queueName = "work.queue";// 消息String message = "hello, message_";for (int i = 0; i < 50; i++) {// 发送消息,每20毫秒发送一次,相当于每秒发送50条消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}
4.2消息接收
模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法:
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);
}@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);
}
注意到这两消费者,都设置了Thead.sleep,模拟任务耗时:
- 消费者1 sleep了20毫秒,相当于每秒钟处理50个消息
- 消费者2 sleep了200毫秒,相当于每秒处理5个消息
4.3能者多劳
消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。
在spring中有一个简单的配置,可以解决这个问题。修改consumer服务的application.yml文件
spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息