RabbitMQ三、springboot整合rabbitmq(消息可靠性、高级特性)

一、springboot整合RabbitMQ(jdk17)(创建两个项目,一个生产者项目,一个消费者项目)

  1. 上面使用原生JAVA操作RabbitMQ较为繁琐,很多的代码都是重复书写的,使用springboot可以简化代码的编写。

生产者项目

在这里插入图片描述

第一步:创建springboot工程,然后引入rabbitmq的依赖

<!-- RabbitMQ起步依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

第二步:编写配置文件

spring:rabbitmq:host: 192.168.70.130  # 虚拟机的地址port: 5672username: adminpassword: adminvirtual-host: /#日志格式
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

第三步:编写RabbitMQ的配置类

@Configuration
public class RabbitmqConfig1 {private final String EXCHANGE_NAME = "boot_exchange";private final String QUEUE_NAME = "boot_queue";private final String ROUTE_NAME = "boot_route";//创建交换机@Bean(EXCHANGE_NAME)public Exchange getExchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//创建队列@Bean(QUEUE_NAME)public Queue getQueue(){return new Queue(QUEUE_NAME);}//交换机和队列绑定@Beanpublic Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTE_NAME).noargs();}
}

第四步:编写发送消息测试类

//编写发送消息测试类
@SpringBootTest
public class RabbitmqTest {// 注入RabbitTemplate工具类@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage(){/*** 发送消息* 参数1:交换机* 参数2:路由key* 参数3:要发送的消息*/rabbitTemplate.convertAndSend("boot_exchange","boot_route","你好我有一个毛衫");System.out.println("发送消息成功");}
}

消费者项目

在这里插入图片描述

第一步:创建springboot工程,然后引入rabbitmq的依赖

<!-- RabbitMQ起步依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

第二步:编写配置文件

spring:rabbitmq:host: 192.168.70.130  # 虚拟机的地址port: 5672username: adminpassword: adminvirtual-host: /#日志格式
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

第三步:编写消费者,监听队列

@Component
public class Consumer1 {/*** 监听队列* @param message* queues表示监听的队列的名称*/@RabbitListener(queues = "boot_queue")public void listener(String message){System.out.println("接受到消息 = " + message);}
}

4、rabbitmq的消息可靠性

  1. RabbitMQ消息投递的路径为:
    生产者--->交换机--->队列--->消费者

  2. 在RabbitMQ工作的过程中,每个环节消息都可能传递失败,那么RabbitMQ是如何监听消息是否成功投递的呢?

      1. 确认模式(confirm):可以监听消息是否从生产者成功传递到交换机
      1. 退回模式(return):可以监听消息是否从交换机成功传递到队列
      1. 消费者消息确认(Consumer Ack):可以监听消费者是否成功处理消息。

【一】rabbitmq的消息可靠性——确认模式

  1. 确认模式(confirm):可以监听消息是否从生产者成功传递到交换机
  2. 创建一个新的生产者项目,导入mq(上面的第一步操作)依赖进行开发:(也可以在原来的基础上修改信息)
    • 代码组成和上面的生产者项目是一样的,也是三步内容。
第一步:修改配置文件

只是添加了一句代码
在这里插入图片描述

spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: / # 表示使用默认的virtual-host#开启确认模式publisher-confirm-type: correlated#????
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
第二步:在生产者的配置类创建交换机和队列(RabbitMQ的配置类)
@Configuration
public class RabbitmqConfig2Confirm {public final String EXCHANGE_NAME = "confirm_exchange";public final String QUEUE_NAME = "confirm_queue";public final String ROUTING_NAME = "confirm_routing";//    创建交换机@Bean(EXCHANGE_NAME)public Exchange exchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//    创建队列@Bean(QUEUE_NAME)public Queue queue(){return QueueBuilder.durable(QUEUE_NAME).build();}
//    创建交换机和队列绑定@Beanpublic Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTING_NAME).noargs();}
}
第三步:编写测试类发生消息:生产者定义确认模式的回调方法(springboot的测试类,能够加载到第二步的配置类)
 @Testvoid testConfirm() {//回调确认rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/**** @param correlationData 配置信息* @param b 是否成功,true 是 ,false 否* @param s 失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {if (b){System.out.println("发送成功");}else{System.out.println("发送失败,原因:"+s);}}});//发送消息/*** 发送消息* 参数1:交换机* 参数2:路由key* 参数3:要发送的消息*/rabbitTemplate.convertAndSend("confirm_exchange","confirm_routing","send message...confirm");}

由于rabbitmq的confirm确认模式是确认消息是否从生产者成功传递到交换机的,所以就没必要写消费者进行信息的消费了

  • 当我们执行测试类的时候,先执行rabbitTemplate.convertAndSend(“confirm_exchange”,“confirm_routing”,“send message…confirm”);,无论消息是否成功发送都会调用 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback()方法,如果发送成功则执行if语句的代码,如果发送失败则调用else语句的代码。
    • 根据执行的是if或者else的语句,就能判断消息是否成功传递到交换机了。

【二】rabbitmq的消息可靠性——退回模式

  1. 退回模式(return):可以监听消息是否从交换机成功传递到队列
  2. 创建一个新的生产者项目,导入mq(上面的第一步操作)依赖进行开发:(也可以在原来的基础上修改信息)
    • 代码组成和上面的生产者项目是一样的,也是三步内容。
第一步:修改配置文件

只是添加了一句
在这里插入图片描述

# rabbitmq???
spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#开启确认模式publisher-confirm-type: correlated#开始回退模式publisher-returns: true#????
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
第二步:编写配置类(RabbitMQ的配置类)
@Configuration
public class RabbitmqConfig3Return {public final String EXCHANGE_NAME = "return_exchange";public final String QUEUE_NAME = "return_queue";public final String ROUTING_NAME = "return_routing";
//    创建交换机@Bean(EXCHANGE_NAME)public Exchange exchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//    创建队列@Bean(QUEUE_NAME)public Queue queue(){return QueueBuilder.durable(QUEUE_NAME).build();}//    创建交换机和队列绑定@Beanpublic Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTING_NAME).noargs();}
}
第三步:编写测试类发生消息:生产者定义退回模式的回调方法(springboot的测试类,能够加载到第二步的配置类)
@Testvoid testReturnSendMessage(){
//        调用回退模式的回调方法,只有失败才会回调,成功不会回调哦
// 失败后将失败信息封装到参数中rabbitTemplate.setReturnsCallback(returned ->{Message message = returned.getMessage();System.out.println("消息对象:"+message);System.out.println("错误码:"+returned.getReplyCode());System.out.println("错误信息:"+returned.getReplyText());System.out.println("交换机:"+returned.getExchange());System.out.println("路由键:"+returned.getRoutingKey());});//        发送消息/*** 发送消息* 参数1:交换机* 参数2:路由key* 参数3:要发送的消息*/rabbitTemplate.convertAndSend("return_exchange","return_routing","send message...return");}

由于rabbitmq的return回退模式是确认消息是否从交换机成功传递到队列的,还没有传递到消费者,所以就没必要写消费者进行信息的消费了

  • 当我们执行测试类的时候,先执行rabbitTemplate.convertAndSend(“return_exchange”,“return_routing”,“send message…return”);,如果消息成功发送到队列上则不会调用 rabbitTemplate.setReturnsCallback方法,如果发送步成功则调用回调方法rabbitTemplate.setReturnsCallback,
    • 根据运行结果就可以知道在传递消息到队列上的时候哪里发生错误了

在这里插入图片描述

【三】rabbitmq的消息可靠性——消费者消息确认(Consumer Ack)

  1. 在RabbitMQ中,消费者接收到消息后会向队列发送确认签收的消息,只有确认签收的消息才会被移除队列。这种机制称为消费者消息确认(Consumer Acknowledge,简称Ack)
    • 类似快递员派送快递也需要我们签收,否则一直存在于快递公司的系统中。
  2. 消费者消息确认(Consumer Acknowledge,简称Ack)分为自动确认手动确认
    • 自动确认指消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除
    • 但是在实际开发中,收到消息后可能业务处理出现异常,那么消息就会丢失。此时需要设置手动签收,即在业务处理成功后再通知签收消息,如果出现异常,则拒签消息,让消息依然保留在队列当中。

● 自动确认:spring.rabbitmq.listener.simple.acknowledge=“none”
● 手动确认:spring.rabbitmq.listener.simple.acknowledge=“manual”

  1. 创建一个新的生产者项目和新的消费者项目,导入mq(上面的第一步操作)依赖进行开发:(也可以在原来的基础上修改信息)
    • 代码组成和上面的生产者项目是一样的,也是三步内容。
生产者项目:第一步:修改配置文件

不用修改

# rabbitmq???
spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#开启确认模式publisher-confirm-type: correlated#开始回退模式publisher-returns: true#????
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
生产者项目:第二步:编写配置类(RabbitMQ的配置类)
@Configuration
public class RabbitmqConfig4ACK {public final String EXCHANGE_NAME = "ack_exchange";public final String QUEUE_NAME = "ack_queue";public final String ROUTING_NAME = "ack_routing";
//    创建交换机@Bean(EXCHANGE_NAME)public Exchange exchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//    创建队列@Bean(QUEUE_NAME)public Queue queue(){return QueueBuilder.durable(QUEUE_NAME).build();}//    创建交换机和队列绑定@Beanpublic Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTING_NAME).noargs();}
}
生产者项目:第三步:编写测试类发生消息:(springboot的测试类,能够加载到第二步的配置类)
 @Testvoid testAck(){//        发送消息rabbitTemplate.convertAndSend("ack_exchange","ack_routing","send message...ack");}
消费者项目(自动确认):第一步:修改配置文件
  • 消费者消息确认——自动确认的配置文件
spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#开动手动签收listener:simple:acknowledge-mode: none   # 默认就是自动确认
#????
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

在这里插入图片描述

  • 自动签收模式就是:消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除。当我们拿到消息的时候,业务出现异常了,所以无法正确处理消息,导致消息丢失了。
消费者项目(自动确认):第二步:编写消费者类,监听队列
  • 自动确认的消费者类
@Component
public class AckConsumer {
//    自动签收@RabbitListener(queues = "ack_queue")public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException, IOException {
//        获取消息String s = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println(s);
//        TODO,处理事务
//        故意出错int i= 1/0;}}
消费者项目(手动确认):第一步:修改配置文件
  • 消费者消息确认——手动确认的配置文件
spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#开动手动签收listener:simple:acknowledge-mode: manual  
#????
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
消费者项目(手动确认):第二步:编写消费者类,监听队列
  • 手动确认
@Component
public class AckConsumer {//    手动签收@RabbitListener(queues = "ack_queue")public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException, IOException {
// 消息投递序号,消息每次投递该值都会+1long deliveryTag = message.getMessageProperties().getDeliveryTag();try {
//            int i = 1/0; //模拟处理消息出现bugSystem.out.println("成功接受到消息:"+message);// 签收消息/*** 参数1:消息投递序号* 参数2:是否一次可以签收多条消息*/channel.basicAck(deliveryTag,true);}catch (Exception e){System.out.println("消息消费失败!");Thread.sleep(2000);// 拒签消息/*** 参数1:消息投递序号* 参数2:是否一次可以拒签多条消息* 参数3:拒签后消息是否重回队列*/channel.basicNack(deliveryTag,true,true);}}
}

在这里插入图片描述

在这里插入图片描述

  • 手动签收模式就是:如果出现异常,则拒签消息,让消息依然保留在队列当中。方便下次请求能够请求到这次因为异常而没有接收到的消息。

【四】RabbitMQ高级特性——消费端限流

在这里插入图片描述

  • 前面说过MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。
  • 使用【三】rabbitmq的消息可靠性——消费者消息确认(Consumer Ack)的项目,消费者使用手动确认模式的代码即可(但是要修改配置文件)
第一步:先在生产者项目中,发送多个消息
@Testpublic void testLimitSendBatch() {// 发送十条消息for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("ack_exchange", "ack_routing", "这是第"+i+"条消息");}}
第二步:修改消费者项目的配置文件

最主要就是配置文件的修改:
在这里插入图片描述

spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#开动手动签收listener:simple:acknowledge-mode: manual  #none是默认的prefetch: 5  # 每次消费者从队列拉取的消息数量(限制)#????
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
第三步:重新编写消费者类
@Component
public class ConsumerLimit {
//    手动签收@RabbitListener(queues = "limit_queue")public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException, IOException {
//        获取消息String s = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println(s);//        模拟业务处理Thread.sleep(3000);long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 手动签收channel.basicAck(deliveryTag,true);}}
  • 其实就是修改了消费者项目的配置文件,添加一条配置信息,限制消费者消息的拉取速度。
    在这里插入图片描述

【五】RabbitMQ高级特性——利用限流实现不公平分发

  1. 在RabbitMQ中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1处理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1有很大一部分时间处于空闲状态。此时可以采用不公平分发,即谁处理的快,谁处理的消息多。
  • 在【四】RabbitMQ高级特性——消费端限流的基础上,修改一消费者项目的配置文件,然后在消费者类中多写几个监听消息的方法(或者多写几个消费者类)。
第一步:修改消费者项目的配置文件

最主要就是配置文件的修改:
在这里插入图片描述

spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#开动手动签收listener:simple:acknowledge-mode: manual  #none是默认的prefetch: 1  #  消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发#????
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
第二步:修改消费者类,编写多个监听方法
@Component
public class ConsumerUnfair {
//  消费者1@RabbitListener(queues = "ack_queue")public void listenMessage1(Message message, Channel channel) throws IOException, InterruptedException, IOException {
//        获取消息String s = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println("消费者1"+s);Thread.sleep(3000);long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 手动签收channel.basicAck(deliveryTag,true);}//    消费者2@RabbitListener(queues = "ack_queue")public void listenMessage2(Message message, Channel channel) throws IOException, InterruptedException, IOException {
//        获取消息String s = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println("消费者2"+s);Thread.sleep(1000);long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 手动签收channel.basicAck(deliveryTag,true);}// .......监听方法
}
  • 最主要的就是消费者项目的配置文件的修改: 配置消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发。

【六】RabbitMQ高级特性——消息存活时间

  1. RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL),当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。
  • 使用【三】rabbitmq的消息可靠性——消费者消息确认(Consumer Ack)的项目,消费者使用手动确认模式的代码
第一步:修改生产者项目的配置类

在这里插入图片描述

@Configuration
public class RabbitmqConfig7ttl {public final String EXCHANGE_NAME = "ack_exchange";public final String QUEUE_NAME = "ack_queue";public final String ROUTING_NAME = "ack_routing";
//    创建交换机@Bean(EXCHANGE_NAME)public Exchange exchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//    创建队列@Bean(QUEUE_NAME)public Queue queue(){return QueueBuilder.durable(QUEUE_NAME)
//                设置队列的超时的时间,单位是毫秒.ttl(10000).build();}//    创建交换机和队列绑定@Beanpublic Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTING_NAME).noargs();}
}
第二步:修改生产者项目的测试类

设置单条消息存活时间
在这里插入图片描述

 @Testpublic void testTtlSendBatch() {// 发送十条消息for (int i = 0; i < 100; i++) {if (i%5 == 0) {//设置消息属性MessageProperties messageProperties = new MessageProperties();//设置存活时间messageProperties.setExpiration("10000");// 创建消息对象(可以配置消息的一些配置)Message message = new Message(("这是第" + i + "条消息").getBytes(StandardCharsets.UTF_8), messageProperties);// 发送消息rabbitTemplate.convertAndSend("ack_exchange", "ack_routing", message);}else {rabbitTemplate.convertAndSend("ack_exchange", "ack_routing", "这是第" + i + "条消息");}}}
    1. 如果设置了单条消息的存活时间,也设置了队列的存活时间,以时间的为准。
    1. 消息过期后,并不会马上移除消息,只有消息消费到队列顶端时,才会移除该消息

【七】RabbitMQ高级特性——优先级队列

  1. 假设在电商系统中有一个订单催付的场景,即客户在一段时间内未付款会给用户推送一条短信提醒,但是系统中分为大型商家和小型商家。比如像苹果,小米这样大商家一年能给我们创造很大的利润,所以在订单量大时,他们的订单必须得到优先处理,此时就需要为不同的消息设置不同的优先级,此时我们要使用优先级队列。
  • 使用【三】rabbitmq的消息可靠性——消费者消息确认(Consumer Ack)的项目,消费者使用手动确认模式的代码
第一步:修改生产者项目的配置类

在这里插入图片描述

@Configuration
public class RabbitmqConfig8Priority {public final String EXCHANGE_NAME = "priority_exchange";public final String QUEUE_NAME = "priority_queue";public final String ROUTING_NAME = "priority_routing";
//    创建交换机@Bean(EXCHANGE_NAME)public Exchange exchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//    创建队列@Bean(QUEUE_NAME)public Queue queue(){return QueueBuilder.durable(QUEUE_NAME)
//                设置队列的优先级,值越大优先级越高,一般不超过10.maxPriority(10).build();}//    创建交换机和队列绑定@Beanpublic Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTING_NAME).noargs();}
}
第二步:修改生产者项目的测试
 @Testpublic void testPrioritySendBatch() {// 发送十条消息for (int i = 0; i < 100; i++) {if (i%5 == 0) {//设置消息属性MessageProperties messageProperties = new MessageProperties();
//             设置优先级messageProperties.setPriority(9);// 创建消息对象(可以配置消息的一些配置)Message message = new Message(("这是第" + i + "条消息").getBytes(StandardCharsets.UTF_8), messageProperties);// 发送消息rabbitTemplate.convertAndSend("priority_exchange", "priority_routing", message);}else {rabbitTemplate.convertAndSend("priority_exchange", "priority_routing", "这是第" + i + "条消息");}}}
  • 设置了消息的优先级,那么消费者项目在消费消息的时候就会优先消费等级高的消息。

【八】RabbitMQ高级特性——死信队列

  1. 在MQ中,当消息成为死信(Dead message)后,消息中间件可以将其从当前队列发送到另一个队列中,当前队列就是死信队列。而在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死信交换机(Dead Letter Exchange,简称DLX)。死信交换机和死信队列和普通的没有区别。
    在这里插入图片描述
  2. 消息成为死信的情况:
      1. 队列消息长度到达限制。
      1. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
      1. 消息到达存活时间未被消费。
生产者项目:第一步:修改配置文件
# rabbitmq???
spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#开启确认模式publisher-confirm-type: correlated#开始回退模式publisher-returns: true#????
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
生产者项目:第二步:编写配置类(RabbitMQ的配置类)
@Configuration
public class RabbitmqConfig9Dead {//    死信private final String DEAD_EXCHANGE = "dead_exchange";private final String DEAD_QUEUE = "dead_queue";private final String DEAD_ROUTING = "dead_routing";// 死信交换机@Bean(DEAD_EXCHANGE)public Exchange deadExchange(){return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).durable(true).build();}// 死信队列@Bean(DEAD_QUEUE)public Queue deadQueue(){return QueueBuilder.durable(DEAD_QUEUE).build();}// 死信交换机绑定死信队列@Beanpublic Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange,@Qualifier(DEAD_QUEUE)Queue queue){return BindingBuilder.bind(queue).to(exchange).with(DEAD_ROUTING).noargs();}// 普通private final String NORMAL_EXCHANGE = "normal_exchange";private final String NORMAL_QUEUE = "normal_queue";private final String NORMAL_ROUTING = "normal_routing";// 普通交换机@Bean(NORMAL_EXCHANGE)public Exchange normalExchange(){return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).durable(true).build();}// 普通队列@Bean(NORMAL_QUEUE)public Queue normalQueue(){return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE) // 绑定死信交换机.deadLetterRoutingKey(DEAD_ROUTING) // 死信队列路由关键字.ttl(10000) // 消息存活10s.maxLength(10) // 队列最大长度为10.build();}// 普通交换机绑定普通队列@Beanpublic Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange exchange,@Qualifier(NORMAL_QUEUE)Queue queue){return BindingBuilder.bind(queue).to(exchange).with(NORMAL_ROUTING).noargs();}
}
生产者项目:第三步:编写测试类发生消息:(springboot的测试类,能够加载到第二步的配置类)
@Test
public void testDlx(){// 存活时间过期后变成死信//     rabbitTemplate.convertAndSend("normal_exchange","normal_routing","测试死信");// 超过队列长度后变成死信//     for (int i = 0; i < 20; i++) {//       rabbitTemplate.convertAndSend("normal_exchange","normal_routing","测试死信");//     }// 消息拒签但不返回原队列后变成死信rabbitTemplate.convertAndSend("normal_exchange","normal_routing","测试死信");
}
消费者项目(手动确认):第一步:修改配置文件
  • 消费者消息确认——手动确认的配置文件
spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#开动手动签收listener:simple:acknowledge-mode: manual  
#????
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
消费者项目(手动确认):第二步:编写消费者类,监听队列
  • 手动确认
@Component
public class ConsumerDead {@RabbitListener(queues = "normal_queue")public void listenMessage1(Message message, Channel channel) throws IOException, InterruptedException, IOException {
//        获取消息String s = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println("消费者1"+s);Thread.sleep(500);long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 拒绝签收channel.basicNack(deliveryTag,true,false);}
  • 死信队列小结
      1. 死信交换机和死信队列和普通的没有区别
      1. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
      1. 消息成为死信的三种情况:
        1. 队列消息长度到达限制;
        1. 消费者拒接消费消息,并且不重回队列;
        1. 原队列存在消息过期设置,消息到达超时时间未被消费;

【九】RabbitMQ高级特性——延迟队列

  1. 延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
    • 例如:
        1. 下单后,30分钟未支付,取消订单,回滚库存。
        1. 新用户注册成功7天后,发送短信问候。
        • 实现方式:
            1. 定时器
            1. 延迟队列
              在这里插入图片描述
  • RabbitMQ中并未提供延迟队列功能,我们可以使用死信队列实现延迟队列的效果。
    在这里插入图片描述
    1. 延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。
    1. RabbitMQ没有提供延迟队列功能,但是可以使用 : TTL + DLX 来实现延迟队列效果。
      在这里插入图片描述
第一步:创建springboot项目并添加依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId>
</dependency>
第二步:编写配置文件
spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#开动手动签收listener:simple:acknowledge-mode: manual
# ????
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
第三步:编写配置类
@Configuration
public class RabbitMQConfig {private final String DEAD_EXCHANGE = "order_expire_exchange";private final String DEAD_QUEUE = "order_expire_queue";private final String DEAD_ROUTING = "order_expire_routing";private final String ORDER_EXCHANGE = "order_exchange";private final String ORDER_QUEUE = "order_queue";private final String ORDER_ROUTING = "order_routing";// 死信交换机@Bean(DEAD_EXCHANGE)public Exchange deadExchange(){return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).durable(true).build();}// 死信队列@Bean(DEAD_QUEUE)public Queue deadQueue(){return QueueBuilder.durable(DEAD_QUEUE).build();}// 死信交换机绑定死信队列@Beanpublic Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange, @Qualifier(DEAD_QUEUE)Queue queue){return BindingBuilder.bind(queue).to(exchange).with(DEAD_ROUTING).noargs();}// 普通交换机@Bean(ORDER_EXCHANGE)public Exchange normalExchange(){return ExchangeBuilder.topicExchange(ORDER_EXCHANGE).durable(true).build();}// 普通队列@Bean(ORDER_QUEUE)public Queue normalQueue(){return QueueBuilder.durable(ORDER_QUEUE).deadLetterExchange(DEAD_EXCHANGE) // 绑定死信交换机.deadLetterRoutingKey(DEAD_ROUTING) // 死信队列路由关键字.ttl(10000) // 消息存活10s(模拟30min超时).build();}// 普通交换机绑定普通队列@Beanpublic Binding bindNormalQueue(@Qualifier(ORDER_EXCHANGE) Exchange exchange,@Qualifier(ORDER_QUEUE)Queue queue){return BindingBuilder.bind(queue).to(exchange).with(ORDER_ROUTING).noargs();}
}
第四步:创建控制器,完成下单功能
@RestController
public class OrderController {//注入MQ@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/addOrder")public String addOrder(){//生成订单号String orderNumber = "2030061812251234";//在service层完成订单逻辑//将订单号发送到订单mq,30分钟过期进入死信队列,死信队列消费查询订单支付状态,做对应处理rabbitTemplate.convertAndSend("order_exchange","order_routing",orderNumber);return "下单成功! 您的订单号为 :"+orderNumber;}
}
第五步:创建消费者,监听消息
@Component
public class ListenerOrder {//监听订单过期队列@RabbitListener(queues = "order_expire_queue")public void orderListener(String orderId){System.out.println("orderId = " + orderId);//根据订单id查询订单状态是否支付/*** 监听死信队列的类,回去30min超时订单号,根据订单号查询订单的支付状态* 支付:走下一步流程* 未支付:关闭订单,库存回滚*/}
}
手动签收模式的结果
  • 在手动签收模式的时候,当我们启动项目,访问订单功能时,立刻生成了一个队列消息
    在这里插入图片描述
  • 然后因为是手动签收模式,所以在消息的存活时间过去了之后,成为了死信消息,所以被消息被拒收了,但是还存在队列中。
    在这里插入图片描述
自动签收模式的结果
spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#开动自动签收listener:simple:acknowledge-mode: none   # 默认的
# ????
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
  • 在自动签收模式的时候,当我们启动项目,访问订单功能时,立刻生成了一个队列消息
    在这里插入图片描述
  • 因为是自动签收的,所以消息过了存活时间之后就没了(自动确认指消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除)
    在这里插入图片描述

RabbitMQ一、RabbitMQ的介绍与安装(docker)

RabbitMQ二、RabbitMQ的六种模式

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/21080.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue3集成Phaser-飞机大战游戏(设计与源码)

文章目录 引言项目初始化游戏设计和结构游戏程序实现Vue页面嵌入PhaserPreloader 场景加载游戏场景功能实现功能类定义Boom爆炸类Bullet子弹类Enemy敌军类Player玩家类End游戏结束类 总结 更多相关内容可查看 引言 飞机大战&#xff08;也被称为射击游戏或空战游戏&#xff09…

轻松上手MYSQL:优化MySQL慢查询,让数据库起飞

​&#x1f308; 个人主页&#xff1a;danci_ &#x1f525; 系列专栏&#xff1a;《设计模式》《MYSQL应用》 &#x1f4aa;&#x1f3fb; 制定明确可量化的目标&#xff0c;坚持默默的做事。 ✨欢迎加入探索MYSQL慢查询之旅✨ &#x1f44b; 大家好&#xff01;我是你们的…

如何优雅简洁的使用YOLOv8

如何优雅简洁的使用YOLOv8 目录训练调用代码如何一键训练多个yamlexport模型测试多个yaml是否运行正常predict本文提供了 如何优雅简洁的使用YOLOv8 🗝️YOLOv8实战宝典--星级指南:从入门到精通,您不可错过的技巧   -- 聚焦于YOLO的 最新版本, 对颈部网络改进、添加局…

Crosslink-NX器件应用连载(11): 图像(数据)远程传输

作者&#xff1a;Hello&#xff0c;Panda 大家下午好&#xff0c;晚上好。这里分享一个Lattice Crosslink-NX器件实现图像或数据&#xff08;卫星数据、雷达数据、ToF传感器数据等&#xff09;远程传输的案例&#xff08;因为所描述的内容颇杂&#xff0c;晒图不好晒&#xff…

文件批量改后缀名,轻松实现TXT到DOCX格式转换,高效管理您的文件库!

文件处理与管理已成为我们日常生活和工作中不可或缺的一环。然而&#xff0c;面对海量的文件&#xff0c;如何高效地进行格式转换和管理&#xff0c;却成为了一道难题。今天&#xff0c;我们将为您揭晓一个神奇的解决方案——文件批量改后缀名功能&#xff0c;让您轻松实现TXT到…

【docker】docker的安装

如果之前安装了旧版本的docker我们需要进行卸载&#xff1a; 卸载之前的旧版本 卸载 # 卸载旧版本 sudo apt-get remove docker docker-engine docker.io containerd runc # 卸载历史版本 apt-get purge docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker…

linux文件共享之samba

1.介绍 Samba是一个开源文件共享服务&#xff0c;可以使linux与windows之间进行文件共享&#xff0c;可以根据不同人员调整共享设置以及权限管理。 2.安装 一个命令就OK了&#xff1a;yum install -y samba [rootansible01 ~]# yum install -y samba 已加载插件&#xff1a;l…

Python爬虫之简单学习BeautifulSoup库,学习获取的对象常用方法,实战豆瓣Top250

BeautifulSoup是一个非常流行的Python库&#xff0c;广泛应用于网络爬虫开发中&#xff0c;用于解析HTML和XML文档&#xff0c;以便于从中提取所需数据。它是进行网页内容抓取和数据挖掘的强大工具。 功能特性 易于使用: 提供简洁的API&#xff0c;使得即使是对网页结构不熟悉…

【一刷《剑指Offer》】面试题 31:连续子数组的最大和

牛客对应题目链接&#xff1a;连续子数组最大和_牛客题霸_牛客网 (nowcoder.com) 力扣对应题目链接&#xff1a;53. 最大子数组和 - 力扣&#xff08;LeetCode&#xff09; 核心考点 &#xff1a;简单动归问题。 一、《剑指Offer》对应内容 二、分析题目 1、贪心 从前往后迭…

关于Posix标准接口和Nuttx操作系统

基本介绍 主要参考&#xff1a; Linux 系统中的 POSIX 接口详细介绍_linux posix-CSDN博客 POSIX&#xff08;Portable Operating System Interface&#xff0c;可移植操作系统接口&#xff09;是由 IEEE&#xff08;Institute of Electrical and Electronics Engineers&#x…

大模型对齐方法笔记四:针对领域问答来进行知识对齐方法KnowPAT

KnowPAT KnowPAT(Knowledgeable Preference AlignmenT) 出自2023年11月的论文《Knowledgeable Preference Alignment for LLMs in Domain-specific Question Answering》&#xff0c;主要针对领域问答来进行知识对齐。 在领域问答有两个挑战&#xff1a;希望输出满足用户的要…

Notepad++ 常用

File Edit search view Encoding Language Settings Tools Macro Run Plugins Window 文件 编辑 搜索 视图 编码 语言 设置 工具 宏 运行 插件 窗口 快捷方式 定位行 &#xff1a;CTRL g查找&#xff1a; CTRL F替换&am…

小白也能看得懂的基于HTML+CSS+JS实现的五子棋小游戏

五子棋是一种起源于中国的传统棋类游戏&#xff0c;具有悠久的历史。 基本规则 棋盘&#xff1a; 五子棋通常在一个 15x15 的棋盘上进行&#xff0c;但也可以在更大的棋盘上进行。棋盘上的每个交叉点称为一个“点”。 棋子&#xff1a; 五子棋使用黑白两色的棋子。两名玩家分别…

【竞技宝】欧冠:多特抢开局失败,皇马展示顶级防守反击

本赛季欧冠决赛结束,皇马在上半场被压制的情况下,2比0击败多特蒙德夺得队史第15座欧冠冠军奖杯。比赛中多特蒙德已经展现出了不俗的状态,可是面对老辣的皇马他们还是败下阵来,皇马用顶级的防守反击给多特上了一课。通过这场比赛,相信球迷们也清楚当今足坛硬实力不可或缺。 在许…

7-18 对象关系映射(orm_name)---PTA实验C++

一、题目描述 一开始看到对象关系映射&#xff0c;其实我是拒绝的。这三个词凑一块&#xff0c;能是给C初学者的题吗&#xff1f; 再仔细读需求&#xff0c;才发现在课设项目已经用过这功能。Object Relational Mapping&#xff08;ORM&#xff09;就是面向对象&#xff08;O…

《平渊》· 柒 —— 大道至简?真传一句话,假传万卷书!

《平渊》 柒 "真传一句话, 假传万卷书" 对于 "大道至简"&#xff0c;不少专家可能会说出一大堆乱七八糟的名词, 比如这样&#xff1a; 所谓 "大道" 即支撑天地运转的 "系统自动力"&#xff0c;更具体地来说&#xff0c;即是天地人以…

快手游戏《无尽梦回》官宣开测:热血动作肉鸽来袭

易采游戏网最新消息&#xff1a;5月30日11:00&#xff0c;快手自研的梦境主题动作冒险手游《无尽梦回》正式宣布开启测试。此次测试名为“肉鸽进化实验”&#xff0c;旨在测试多角色技能交会的玩法。游戏将开放32人同局竞技&#xff0c;让玩家在激烈的战斗中角逐出唯一的胜利者…

HTML如何让文字底部线条不紧贴在文字下面(既在内容下方又超出内容区域)

hello&#xff0c;大家好&#xff0c;星途星途今天给大家带来的内容是如何让文字底部线条不紧贴在文字下面。 话不多说&#xff0c;先上效果图 简单来说就是padding和margin的区别。 在网页设计中&#xff0c;有时我们想要给某个元素添加一个装饰性的线条&#xff0c;比如底部…

过滤器、监听器、拦截器的区别

过滤器、监听器、拦截器的区别 过滤器&#xff08;filter&#xff09;、监听器&#xff08;Listener&#xff09;是JavaWeb的三大组件。而拦截器&#xff08;Interceptor&#xff09;是Spring框架中的。 我们主要是要分清除过滤器和拦截器的区别&#xff1a; 实现原理&#…

overleaf 写参考文献引用

目录 1、 新建.bib 文件 2、导入引用 3、在文档中引用参考文献 4、生成参考文献列表 1、 新建.bib 文件 在Overleaf项目中&#xff0c;你可以选择导入现有的 .bib 文件或在项目中创建一个新的 .bib 文件来管理你的参考文献。 导入.bib 文件&#xff1a; 在项目文件树中点击…