文章目录
- 1. 前言
- 2. 思路
- 3. 消息发送
- 4. 消息接收
- 4.1 能者多劳
- 总结
1. 前言
上一篇文章,实现了用 SpringBoot
实现RabbitMQ
的简单队列, 篇文章 操作 用SpringBoot
实现RabbitMQ
的WorkQueue
(SpringAMQP 实现WorkQueue)
Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用 work 模型,多个消费者共同处理消息处理,速度就能大大提高了。
注意:不一定是两个消费者。
2. 思路
代码实现思路:
1.在 publisher 服务中定义测试方法,产生50条消息(每隔20ms发送一条),发送到 test2024.simple.queue
2.在 consumer 服务中定义两个消息监听者,都监听simple.queue队列
3.消费者1处理50条消息(每隔20ms处理一条),消费者2处理50条消息(每隔100ms处理一条)
3. 消息发送
发送消息前先把 服务接收 的服务停止调.
循环 50 次, 每次休息 20 ms , 发送消息到指定队列
@Test
public void test01() throws InterruptedException {// 声明队列名称String queueName = "test2024.simple.queue";String message = "work_";for (int i = 1; i <= 50; i++) {// 发送消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}
发送完消息后, 我们看到消息整整齐齐的在 队列里躺着.
4. 消息接收
来连个监听器监听指定的队列, 但是两个监听队列的消费时间不一样.
@Component
public class SpringRabbitListener {// 监听指定队列,Spring只要接收到该队列的消息就会接收消息@RabbitListener(queues = "test2024.simple.queue")public void rabbitListener1(String message){System.out.println("1号接收器-接收到消息:" + message);;}// 监听指定队列,Spring只要接收到该队列的消息就会接收消息@RabbitListener(queues = "test2024.simple.queue")public void rabbitListener2(String message){System.out.println("2号接收器-接收到消息:" + message);;}}
此时启动消费者服务,然后再启动发送消息.
可以看到 1 号接收器很快消费了 25 条, 然后 2 号接收器缓慢的完成自己的… 1 号接收器没有给 2 号接收器帮忙. 那么咋生产环境中 ,就会造成服务闲置的情况下不能及时消费消息.
说明:阐述上述原因是因为队列平均分配给每个消费者,即使当前消费者没有消费完,队列也会将消息分配给消费者。然后消费者一个一个消息消费,即使消费很快的消费者,消费完毕,而消费很慢的消费者一直在消费。这样很不合理。应该是哪个消费者消费快应该多消费。哪个消费者消费慢应该少消费。
4.1 能者多劳
在 Spring 中有一个简单的配置叫预取 prefetch
,可以解决这个问题。我们修改 consumer 服务的 application.yml 文件,添加配置:
listener:simple:prefetch: 1 # 消费者每次最多只能预取一条消息,当消费完这条消息后,才能获取下一个消息,这样做的好处是消费能力强的消费者,处理的消息就会更多===》能者多劳
做了如上配置后,启动消费者服务,再次发送消息:
我们看到消费能力强的1号接收器完成了更多的工作,这样就达到了能者多劳.
总结
Work模型的使用:
- 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
- 通过设置
prefetch
来控制消费者预取的消息数量