一、RabbitMQ下载并使用插件
1、查看RabbitMQ插件的文件路径
docker inspect rabbitmq
找到Mounts下面Name:rabbitmq_plugin的Source即为插件路径
使用 cd 进入到该目录
2、下载插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
3、使用插件
1、进入到容器内部
docker exec -it rabbitmq /bin/bash
2、启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
3、查看插件是否使用成功
rabbitmq-plugins list
4、重启容器
docker restart rabbitmq
二、使用延时队列
1、生产者
@Testpublic void test01(){//创建消息后置处理器对象MessagePostProcessor postProcessor = message -> {//设置消息过期时间/毫秒message.getMessageProperties().setHeader("x-delay","10000");return message;};rabbitTemplate.convertAndSend("exchange.test.delay","routing.key.test.delay","hello delay message by plug" + new SimpleDateFormat("HH:mm:ss").format(new Date()),postProcessor);}
2、消费者
@RabbitListener(queues={"queue.test.delay"})public void lisenter03(String data, Message message, Channel channel) throws IOException {log.info("接收到 : " + data);log.info("当前时间 : " + new SimpleDateFormat("HH:mm:ss").format(new Date()));channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
三、其他(消息可靠性-生产者确认)
1、配置application.yml
spring:rabbitmq:host: localhostport: 5672username: adminpassword: 123456virtual-host: /publisher-confirm-type: correlated #交换机确认publisher-returns: true #队列确认
2、编写配置类
@Component
@Slf4j
public class RabbitMqConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;//构造后执行@PostConstructpublic void InitRabbit(){rabbitTemplate.setReturnsCallback(this);rabbitTemplate.setConfirmCallback(this);}//发送到交换机执行的回调函数@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info("confirm 回调 data : " + correlationData);log.info("confirm 回调 ack : " + ack);log.info("confirm 回调 cause : " + cause);}//发送到队列失败才执行的回调函数//使用延时队列时成功也会执行@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info("returnedMessage 回调 消息 : " + returnedMessage.getMessage().getBody());log.info("returnedMessage 回调 状态码 : " + returnedMessage.getReplyCode());log.info("returnedMessage 回调 描述 : " + returnedMessage.getReplyText());log.info("returnedMessage 回调 交换机 : " + returnedMessage.getExchange());log.info("returnedMessage 回调 路由键 : " + returnedMessage.getRoutingKey());}
}
四、延时队列的其他实现思路
可以通过配置消息的过期时间和死信队列,消费者监听死信队列,同样可以实现延时消息的效果