目录
一、实现消息可靠性投递
1.1、消息生产者端确认机制
1.2、备份交换机
1.3、消费端确认机制
二、消费端限流设置
三、消息超时设置
3.1、从队列设置全局超时时间
3.2、设置消息本身超时时间
四、死信
4.1、消费端拒绝接收消息
4.1.1、创建死信交换机与队列
4.1.2、创建常规交换机与队列
4.2、消息数量超过队列容纳限度
五、延迟队列
5.1、使用死信队列实现
5.2、使用插件实现
5.3、创建交换机与队列
六、事务消息
七、优先级队列
一、实现消息可靠性投递
1.1、消息生产者端确认机制
修改yml
spring:rabbitmq:host: 192.168.200.110port: 5672username: guestpassword: 123456virtual-host: / publisher-confirm-type: correlated #交换机确认publisher-returns: true #队列确认
@Configuration
@Slf4j
public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void initRabbitTemplate(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}//消息发送到交换机成功或失败都会调用@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info("correlationData:" + correlationData);log.info("ack:" + ack);log.info("cause:" + cause);}//发送到队列失败调用@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info("消息主体:" + new String(returnedMessage.getMessage().getBody()));log.info("应答码:" + returnedMessage.getReplyCode());log.info("描述:" + returnedMessage.getReplyText());log.info("使用交换机:" + returnedMessage.getExchange());log.info("消息使用的路由键:" + returnedMessage.getRoutingKey());}
}
@SpringBootTest
public class MQTest {public static final String EXCHANGE_DIRECT = "exchange.direct.order";public static final String ROUTING_KEY = "order";@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test1(){rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"你好,美羊羊");}
}
1.2、备份交换机
创建备份交换机
创建绑定队列
将原交换机与备份交换机绑定
1.3、消费端确认机制
修改yml
spring:rabbitmq:host: 192.168.200.110port: 5672username: guestpassword: 123456virtual-host: /listener:simple:acknowledge-mode: manual #手动确认
@RabbitListener(queues = {QUEUE_NAME})public void getMessage(String date, Message message, Channel channel) throws IOException {//获取当前deliveryTagIDlong deliveryTag = message.getMessageProperties().getDeliveryTag();try {log.info(" "+1 / 0);//成功返回ACK信息channel.basicAck(deliveryTag,false);log.info("接收消息为:" + date);} catch (Exception e) {//获取消息是否重复投递Boolean redelivered = message.getMessageProperties().getRedelivered();//失败返回NACK信息if (redelivered){//long var1,// boolean var3,// boolean var4 控制消息是否重新放回队列channel.basicNack(deliveryTag,false,false);}else {channel.basicNack(deliveryTag,false,true);}throw new RuntimeException(e);}}
}
二、消费端限流设置
只需要修改yml
spring:rabbitmq:host: 192.168.200.110port: 5672username: guestpassword: 123456virtual-host: /listener:simple:acknowledge-mode: manual #手动确认prefetch: 1 #设置每次从队列中读取消息数
三、消息超时设置
3.1、从队列设置全局超时时间
3.2、设置消息本身超时时间
@Testpublic void test4(){//创建消息后置处理器MessagePostProcessor messagePostProcessor = message -> {//设置过期时间,单位毫秒message.getMessageProperties().setExpiration("10000");return message;};rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"a",messagePostProcessor);}
四、死信
4.1、消费端拒绝接收消息
4.1.1、创建死信交换机与队列
正常创建绑定即可
4.1.2、创建常规交换机与队列
创建常规队列注意事项
4.2、消息数量超过队列容纳限度
@Testpublic void test5(){for (int i = 1; i <=20 ; i++) {rabbitTemplate.convertAndSend(EXCHANGE_DIRECT_NORMAL,ROUTING_KEY_NORMAL,"a"+i);}}
五、延迟队列
5.1、使用死信队列实现
5.2、使用插件实现
docker inspect rabbitmq
下载的插件放入source后的目录
进入容器内部
docker exec -it rabbitmq /bin/bash
启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
退出容器
exit
重启容器
docker restart rabbitmq
5.3、创建交换机与队列
队列正常创建,无需参数设置
测试代码:
public static final String EXCHANGE_DIRECT_DELAY = "exchange.delay";public static final String ROUTING_KEY_DELAY = "delay";
@Testpublic void test6(){//创建消息后置处理器MessagePostProcessor messagePostProcessor = message -> {//设置过期时间,单位毫秒//必须安装启动延迟插件设置才生效message.getMessageProperties().setHeader("x-delay","10000");return message;};rabbitTemplate.convertAndSend(EXCHANGE_DIRECT_DELAY,ROUTING_KEY_DELAY,"你好,插件" + new SimpleDateFormat("HH:mm:ss").format(new Date()),messagePostProcessor);}
public static final String QUEUE_NAME_DELAY = "queue.delay";
@RabbitListener(queues = {QUEUE_NAME_DELAY})public void getMessageDelay(String date, Message message, Channel channel) throws Exception {//获取当前deliveryTagIDlong deliveryTag = message.getMessageProperties().getDeliveryTag();//成功返回ACK信息channel.basicAck(deliveryTag,false);log.info("接收消息为:" + date);log.info("当前时间为:" + new SimpleDateFormat("HH:mm:ss").format(new Date()));}
六、事务消息
在Java配置类进行设置
@Beanpublic RabbitTransactionManager transactionManager(CachingConnectionFactory cachingConnectionFactory){return new RabbitTransactionManager(cachingConnectionFactory);}@Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory){RabbitTemplate rabbitTemplate1 = new RabbitTemplate(cachingConnectionFactory);rabbitTemplate1.setChannelTransacted(true);return rabbitTemplate1;}
@Test@Transactional@Rollback(value = false)public void test7(){rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"异常前");int var = 3 / 0;rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"异常后");}
七、优先级队列
创建交换机与队列
@Testpublic void test8(){//创建消息后置处理器MessagePostProcessor messagePostProcessor = message -> {message.getMessageProperties().setPriority(3);return message;};rabbitTemplate.convertAndSend("exchange.priority","priority","第3级",messagePostProcessor);}