RabbitMQ(二)

二、高级特性、应用问题以及集群搭建

高级特性

1.消息的可靠性投递

在使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
rabbitMQ整个消息投递的路径为:
producer -> rabbitMQ broker -> exchange -> queue -> consumer

  • confirm确认模式
    confirm确认模式是再producer传递给exchange过程中控制消息的模式,当消息成功的从producer传递到了exchange,那么则会返回一个 confirmCallBack() 回调函数
  • return 退回模式
    return退回模式是指消息从exchange传递给queue过程中消息传递失败,则会返回一个returnCallBack() 回调函数

1.1 confirm确认模式的代码编写:

因为确认模式是producer到exchange,所以代码和配置修改应该写在生产者的模块中。
一步:开启确认模式

新版本的rabbitmq弃用了publish-confirms:true,可以改用
publisher-confirm-type: correlated实现同样的效果

spring:rabbitmq:password: heimausername: heimaport: 5673virtual-host: itcasthost: 1.12.244.105#开启确认模式publisher-confirm-type: correlated

二步:编写confirmCallBack()函数
回调函数confirm()的返回值在发送消息成功时ack为true,但是我遇到一个问题,就是消息发送成功了,在队列中也能看到,但是返回值ack为false,

clean channel shutdown;

这是因为convertAncSend()方法结束后rabbitMQ的资源也就关闭了,所以就算成功了,回调函数返回值也是false;所以我们在后面强制睡眠200ms,让资源晚点关闭,这样的话得到的ack就是true了

package com.rabbitmq.springboot_mqproducer;import com.rabbitmq.springboot_mqproducer.rabbitMQconfig.MQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
class SpringbootMqProducerApplicationTests {@ResourceRabbitTemplate rabbitTemplate;@Testvoid contextLoads() throws InterruptedException {rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/**** @param correlationData 相关的配置信息* @param b 消息是否发送成功* @param s 消息发送失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("confirm方法被执行了");System.out.println(b);if(b){System.out.println("消息从producer -> exchange成功");System.out.println("失败原因:" + s);}else{System.out.println("消息从producer -> exchange失败");System.out.println("失败原因:" + s);}}});rabbitTemplate.convertAndSend(MQConfig.EXCHANGE_NAME,"test.hello","测试springboot整合交换机");Thread.sleep(200);}
}

结果:
在这里插入图片描述

1.2 return回退模式的代码编写

一步:开启回退模式

spring:rabbitmq:password: heimausername: heimaport: 5673virtual-host: itcasthost: 1.12.244.105#开启确认模式publisher-confirm-type: correlated#开启回退模式publisher-returns: true

二步:编写returnCallBack()函数
三步:设置exchange处理消息的模式
setMandatory为true,如果消息没有到队列queue,则返回消息给发送方
setMandatory为false,如果消息没有到队列queue,则丢弃消息(默认)

package com.rabbitmq.springboot_mqproducer;import com.rabbitmq.springboot_mqproducer.rabbitMQconfig.MQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.propertyeditors.StringArrayPropertyEditor;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
class SpringbootMqProducerApplicationTests {@ResourceRabbitTemplate rabbitTemplate;@Testvoid contextLoads() throws InterruptedException {//编写confirm回调函数rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/**** @param correlationData 相关的配置信息* @param b 消息是否发送成功* @param s 消息发送失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("confirm方法被执行了");System.out.println(b);if(b){System.out.println("消息从producer -> exchange成功");System.out.println("失败原因:" + s);}else{//消息发送失败,需要做一些处理System.out.println("消息从producer -> exchange失败");System.out.println("失败原因:" + s);}}});//编写return回调函数rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println("return回退模式回调函数执行了");System.out.println("消息:"+returnedMessage.getMessage());System.out.println("exchange:"+returnedMessage.getExchange());System.out.println("replyCode:"+returnedMessage.getReplyCode());System.out.println("replyText:"+returnedMessage.getReplyText());System.out.println("routingKey:"+returnedMessage.getRoutingKey());}});//设置回退模式中,exchange处理消息的方式/*当将mandatory设置为false(默认值),如果RabbitMQ无法将消息路由,消息将会被静默丢弃,生产者不会收到通知。当设置mandatory为true时,意味着消息被视为"mandatory",如果在发布消息时RabbitMQ无法将消息路由到任何队列(例如由于没有匹配的队列与指定的路由键),则代理将通过调用ReturnListener回调的returnedMessage方法将消息返回给生产者(发布者)。生产者可以根据需要适当地处理这个返回的消息,例如记录日志或执行某些恢复操作。*/rabbitTemplate.setMandatory(true);//TODO 这里把routingKey写错,是为了让交换机找不到queue,从而触发returnCallBack()函数rabbitTemplate.convertAndSend(MQConfig.EXCHANGE_NAME,"testtttt.hello","测试springboot整合交换机");Thread.sleep(200);}}

消息的可靠投递小结:

  • 设置配置publisher-confirm-type: correlated开启确认模式
  • 使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true, 则发送成功,如果为false,则发送失败,需要处理。
  • 设置ConnectionFactory的publisher-returns="true"开肩退回模式。
  • 使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。
  • 在RabbitMQ中也提供了事务机制,但是性能较差,此处不做讲解。
    使用channel下列方法,完成事务控制:
    txSelect(),用于将当前channel设置成transaction模式
    txCommit(),用于提交事务
    txRollback(),用于回滚事务

2.Consumer Ack

ack指Acknowledge,确认。表示消费端收到消息后的确认方式。
有三种确认方式:

  • 自动确认:acknowledge=“none”
  • 手动确认:acknowledge=“manual”
  • 根据异常情况确认:acknowledge=“auto”(这种方式很麻烦,不做讲解)

其中自动确认是指,当消息一旦被Consumer接收到, 则自动确认收到,并将相应message从RabbitMQ的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck() 手动签收,如果出现异常,则调用channel.basicNack() 方法,让其自动重新发送消息。

代码编写:
发送消息的生产者端代码不用变,只需要能够发送消息就行
消费者端:
一步:编写yml配置文件

spring:rabbitmq:username: heimapassword: heimavirtual-host: itcasthost: 1.12.244.105port: 5673#设置消息为手动签收listener:simple:acknowledge-mode: manual #消费者端确认模式:none自动确认 manual手动确认 auto通过抛出异常的类型,来做响应的处理concurrency: 1 #当前监听的数量max-concurrency: 5 #最大监听数量retry:enabled: true #是否支持重试max-attempts: 4 #最大重试次数,默认为3

二步:编写消费者代码
消费者端创建一个listener并实现ChannelAwareMessageListener接口(其实也可以不实现该接口,只要 @RabbitListener 标记的方法,或者 @RabbitListener 标记的类+ @RabbitHandler 标记的方法的参数列表有[com.rabbitmq.client.Channel]和[org.springframework.amqp.core.Message]两个参数,都可以)

package com.rabbit.springboot_mqconsumer;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;/*** @author Watching* * @date 2023/7/19* * Describe:*/
@Component
public class RabbitMQListener implements ChannelAwareMessageListener {
//    @RabbitListener(queues = {"boot_topic_queue"})//填写队列名称,可以以字符串数组的方式监听多个队列
//    public void listener(Message message){
//        System.out.println("message:"+message);
//    }/*** 使用ChannelAwareMessageListener监听器接口中的onMessage()方法来充当消费者,如果上面注释的方法与当前方法同时存在,一条消息只会被消费一次。不会被两个方法都消费** @param message* @param channel* @throws Exception Consumer ACK机制:*                   1.设置手动签收。acknowledge= "manual”*                   2.让监听器类实现ChannelAwareMessageListener接口*                   3.如果消息成功处理,则调用channel的basicAck()签收*                   4.如果消息处理失败,则调用channel的basicNack( )拒绝签收,broker重新发送给consumer*/@RabbitListener(queues = "boot_topic_queue" )@Overridepublic void onMessage(Message message, Channel channel) throws Exception {try{//1.接收消息System.out.println("message:" + message);System.out.println("channel:" + channel);//2.处理业务逻辑System.out.println("模拟处理业务逻辑......");//3.手动签收/*void basicAck(long deliveryTag, boolean multiple) throws IOException;deliveryTag:当消费者接收到一条消息后,RabbitMQ 会为该消息分配一个唯一的 DeliveryTag。这个 DeliveryTag 是一个64位的长整型数值,并且只在该 Channel 内唯一,即相同 Channel 下的 DeliveryTag 不会重复。multiple:当 multiple 设置为 false 时,表示只确认当前指定的 deliveryTag 对应的一条消息。也就是说,只确认指定的单个消息已经成功被处理或处理失败。当 multiple 设置为 true 时,表示确认当前指定的 deliveryTag 及其之前所有未确认的消息(在同一个 Channel 下)。也就是说,会一次性确认多条消息的处理状态,将 deliveryTag 小于或等于指定 deliveryTag 的所有消息都确认处理了。这种批量确认的机制有助于提高消息的处理效率,特别是当消费者处理多条消息时,可以通过一次性确认多条消息的方式来减少网络开销和消费者端的负担。在使用 channel.basicAck(deliveryTag, multiple) 和 channel.basicNack(deliveryTag, multiple, requeue) 方法时,可以根据实际场景来选择是单条确认还是批量确认,以满足不同的业务需求。*/channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);System.out.println("完成手动签收");}catch(Exception e){//4.出现异常,拒绝签收/*deliveryTag:一个唯一标识消息的64位长整型数值,用于确认消息的消费状态。multiple:一个布尔类型的参数,用于决定是否批量处理多条消息。若设置为 true,则会否定当前指定 deliveryTag 及其之前的所有未确认消息;若设置为 false,则只否定当前指定 deliveryTag 对应的一条消息。requeue:一个布尔类型的参数,表示是否将消息重新放回队列。若设置为 true,则消息会被重新入队列,RabbitMQ 会再次将它发送给消费者;若设置为 false,则消息会被直接丢弃,不会重新放回队列。*/System.out.println("代码逻辑出现异常,拒收");channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);}}
}

只需要两步,就可以实现Consumer ack,下面我们来测试一下:
首先是正常运行的代码的结果:(业务逻辑代码无异常)
在这里插入图片描述
生产者端是用的前面测试boot整合的代码
在这里插入图片描述
然后我们来测试业务逻辑代码出错的情况,我们在业务逻辑代码处添加一个除数不能为0的异常
在这里插入图片描述
再次运行代码,一直在重试,一直再报错
在这里插入图片描述

消息的可靠性总结

1.持久化:

  • exchange要持久化
  • queue要持久化
  • message要持久化

2.生产方确认Confirm(在后续文章中会讲解如何在回调函数中进行具体的处理
3.消费方确认Ack
4. Broker高可用(集群搭建

3.消费端限流

在A系统中,每秒最多只能处理1000条请求,如果在一秒钟只能瞬间有5000条请求打入A系统,那么A系统就会崩溃,所以我们在A系统中加入一个MQ中间件,让5000个请求先发送到MQ,然后A系统再分批次的从MQ中拉取1000条请求,这样A系统就避免了崩溃的情况。
这也是我们常说的MQ的削峰功能
在这里插入图片描述
设置MQ消费限流很简单,只需要设置两个属性:

  • 确认模式设置为手动确认(在上面的Ack我们已经讲过)
  • 设置prefetch属性,prefetch = n,n就是每次从MQ中获取消息的数量
    在这里插入图片描述
    其余的消费端代码和生产者端代码不用修改。
    当设置了消费端限流后,如果从MQ中取出1条消息,消费者端没有进行确认,那么消费者端将不会再从MQ中取消息,直到消息被确认。

4.TTL

TTL全称Time To Live(存活时间/过期时间)。
当消息到达存活时间后,还没有被消费,会被自动清除。
RabbitMQ可以对消息设置过期时间,也可以对整个队列Queue设置过期时间。
举一个例子:
生活中我们在购买商品的时候会下订单,系统会提示我们要在30分钟之内付款,否则订单将会被取消。
在这里插入图片描述

Ⅰ、先在控制台模拟上面的情况

①创建一个交换机
在这里插入图片描述
②创建一个队列
在这里插入图片描述
③进入交换机exchange_ttl和队列queue_ttl进行绑定
在这里插入图片描述
④消息的发布
在这里插入图片描述
⑤在消息队列中查看
将鼠标放上ttl,就可以看到设置的时间,等时间一过,这条消息就会被自动清除。
在这里插入图片描述

Ⅱ、代码实现队列过期,和消息过期

①创建交换机,队列,以及绑定关系

package com.rabbitmq.springboot_mqproducer.rabbitMQconfig;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** @author Watching* * @date 2023/7/18* * Describe:*/
@Configuration
public class MQConfig {public static final String QUEUE_TTL_NAME = "queue_ttl";public static final String EXCHANGE_TTL_NAME = "exchange_ttl";/*
创建队列,测试ttl特性*/@Bean("test_queue_ttl")public Queue ttlQueue() {Map<String,Object> arguments = new HashMap<>();arguments.put("x-message-ttl",10000);//消息过期的时间arguments.put("x-expires",100000);//队列过期的时间//设置队列的ttl时间return QueueBuilder.durable(QUEUE_TTL_NAME).withArguments(arguments).build();//参数的属性可以在控制台上查看}/*
创建一个交换机测试队列ttl特性*/@Bean("test_exchange_ttl")public Exchange ttlExchange() {return ExchangeBuilder.topicExchange(EXCHANGE_TTL_NAME).durable(true).build();}/*绑定ttl交换机和队列*/@Beanpublic Binding ttlBinding(@Qualifier("test_exchange_ttl") Exchange exchange, @Qualifier("test_queue_ttl") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("ttl.#").noargs();}
}

在创建队列时,我们指定了x-message-ttl,使队列中的所有消息都是一个固定的时间过期
我们还可以在发送消息时,指定每条消息的过期时间。
只需要在发送方法convertAndSend()方法中添加一个消息后处理参数即可

/*MessagePostProcessor 是 Spring AMQP 中的一个接口,用于对消息进行后处理。通过实现该接口,你可以在发送消息之前对消息进行一些自定义处理,例如添加自定义的消息头、修改消息内容等。*/MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//1.设置消息属性message.getMessageProperties().setExpiration("5000");//5000ms过期//2.返回该消息return message;}};@Testvoid testSend() throws InterruptedException {for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend(MQConfig.EXCHANGE_TTL_NAME, "ttl.hello", "测试ttl"+i,messagePostProcessor);}Thread.sleep(200);}

小细节:
①当队列设置了x-expires和x-messgae-ttl,消息过期时间以短的为准
②当队列设置了x-messgae-ttl,且发送消息时通过消息后处理也设置了过期时间,那么消息过期时间也以短的为准。
③当十条消息中只有一条消息设置了过期时间,这条消息过期后,只有处于队列顶端,即即将被消费时,才会对这条消息是否过期做判断。

5.死信队列

5.1 概念

死信队列,英文缩写: DLX ,Dead Letter Exchange (死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。(死信队列为什么英文翻译过来使死信交换机呢?因为交换机概念只有在RabbitMQ中才有,其它MQ中间件只有队列概念,所以习惯叫死信队列,而RabbitMQ中存在交换机概念,所以叫死信交换机。)
在这里插入图片描述
在这里我们需要理解的问题有:
①消息什么时候成为死信?

  • 队列长度达到限制,比如队列最多容纳10条消息,当第11条消息进入时,这条消息就成为了死信消息。
  • 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
  • 原队列存在消息过期设置,消息到达超时时间却并未被消费

以上三种,满足一条即为死信消息

②队列如何绑定死信交换机?
队列设置参数:x-dead-letter-exchangex-dead-letter-routing-key
x-dead-letter-exchange:死信交换机的名称
x-dead-letter-routing-key:消息发送时指定的routingKey

在这里插入图片描述

5.2 代码实现死信队列

创建死信队列:

  • 1.声明正常的队列(test_queue_dLx)和交换机(test_exchange_dlx)
  • 2.声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
  • 3.正常队列绑定死信交换机,正常队列绑定死信队列不需要创建Binding Bean,只需要在正常队列创建时设置参数就可以
    – 设置两个参数:
    x-dead-letter-exchange:死信交换机名称
    x-dead-letter-routing-key:发送给死信交换机的routingkey

设置正常队列中的消息的过期时间x-message-ttl
设置正常队列的长度限制x-max-length

package com.rabbitmq.springboot_mqproducer.rabbitMQconfig;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** @author Watching* * @date 2023/7/18* * Describe:*/
@Configuration
public class MQConfig {/*** 测试死信队列*//*创建普通交换机和普通队列*/@Bean("test_exchange_dlx")public Exchange testDlxExchange() {return ExchangeBuilder.topicExchange("test_exchange_dlx").durable(true).build();}@Bean("test_queue_dlx")public Queue testDlxQueue() {Map<String,Object> map = new HashMap<>();//x-dead-letter-exchange:死信交换机名称map.put("x-dead-letter-exchange","exchange_dlx");//x-dead-letter-routing-key:发送给死信交换机的routingkeymap.put("x-dead-letter-routing-key","dlx.hehe");//这个routingkey只需要满足死信交换机的路由规则就可以//设置正常队列中的消息的过期时间ttlmap.put("x-message-ttl",10000);//设置正常队列的长度限制max-lengthmap.put("x-max_length",10);return QueueBuilder.durable("test_queue_dlx").withArguments(map).build();}@Beanpublic Binding binding1(@Qualifier("test_exchange_dlx") Exchange exchange,@Qualifier("test_queue_dlx")Queue queue){return BindingBuilder.bind(queue).to(exchange).with("test.dlx.#").noargs();}/*创建死信交换机和死信队列*/@Bean("exchange_dlx")public Exchange dlxExchange() {return ExchangeBuilder.topicExchange("exchange_dlx").durable(true).build();}@Bean("queue_dlx")public Queue dlxQueue() {return QueueBuilder.durable("queue_dlx").build();}@Beanpublic Binding binding2(@Qualifier("exchange_dlx") Exchange exchange,@Qualifier("queue_dlx")Queue queue){return BindingBuilder.bind(queue).to(exchange).with("dlx.#").noargs();}/*绑定普通队列和死信交换机,并不需要写一个Binding,只需要在普通队列中添加参数就行*/
}

发送消息测试死信消息:
1.过期时间
2.长度限制
3.消息拒收

    @Testvoid testDlx() {//1.过期时间
//        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hello","测试消息超出过期时间变成死信");//2.超出队列消息数量限制
//        for (int i = 0; i < 20; i++) {
//            rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.hello", "测试消息超出队列数量限制变成死信");
//        }//3.消费端拒收rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hello","测试消息被拒收变成死信");}

死信队列小结:
1.死信交换机,死信队列和普通交换机,普通队列没有区别.
2.当消息成为死信后,如果该队列绑定了死信交换机,则消息会被重新路由到死信队列中
3.消息成为死信的三种情况

  • 消息在队列中到达超时时间并未被消费
  • 消息在消费者端被拒收,且设置了不重回队列
  • 队列长度存在限制,消息数量超出了限制

6.延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
需求:
1.下单后,30分钟未支付,取消订单,回滚库存。
2.新用户注册成功7天后,发送短信问候。
实现方式:
1.定时器
2.延迟队列
订单系统将订单放入延迟队列种,30分钟后取出,去库存系统中判断订单是否已经支付,再进行后续的支付或者未支付操作
在这里插入图片描述
但是!
RabbitMQ官方没有提供延迟队列,所以我们需要使用ttl+死信队列构成延迟队列
普通队列设置为30min中过期,过期后消息路由到死信队列,库存系统从死信队列中取消息,这样就形成了一个延迟队列
在这里插入图片描述

代码实现延迟队列

1.定义正常交换机(order_exchange)和队列(order_queue),同时绑定
2.定义死信交换机(order_exchange_dlx) 和队列(order_queue_dlx),同时绑定
3.绑定正常队列和死信交换机,设置正常队列过期时间为10秒

    /*** 测试延迟队列*//*1.定义正常交换机(order_exchange)和队列(order_queue)*/@Bean("orderQueue")public Queue orderQueue(){//3.正常队列绑定死信交换机Map<String,Object> map = new HashMap<>();map.put("x-dead-letter-exchange","order_exchange_dlx");map.put("x-dead-letter-routing-key","dlx.order.hehe");//设置正常队列的消息过期时间map.put("x-message-ttl",10000);return QueueBuilder.durable("order_queue").withArguments(map).build();}@Bean("orderExchange")public Exchange orderExchange(){return ExchangeBuilder.topicExchange("order_exchange").build();}@Beanpublic Binding orderBinding(@Qualifier("orderQueue")Queue queue,@Qualifier("orderExchange")Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();}/*2.定义死信交换机(order_exchange_dlx) 和队列(order_queue_dlx)*/@Bean("orderQueueDlx")public Queue orderQueueDlx(){return QueueBuilder.durable("order_queue_dlx").build();}@Bean("orderExchangeDlx")public Exchange orderExchangeDlx(){return ExchangeBuilder.topicExchange("order_exchange_dlx").build();}@Beanpublic Binding orderBindingDlx(@Qualifier("orderQueueDlx")Queue queue,@Qualifier("orderExchangeDlx")Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("dlx.order.#").noargs();}

4.创建生产者发送消息

    /*** 测试延迟队列*/@Testvoid testDelay() throws InterruptedException {rabbitTemplate.convertAndSend("order_exchange","order.test","测试延迟队列");for (int i = 10;i > 0;i--){System.out.println(i+"...");Thread.sleep(1000);}}

5.创建消费者

package com.rabbit.springboot_mqconsumer;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;/*** @author Watching* * @date 2023/8/2* * Describe:*/
@Component
public class OrderListener implements ChannelAwareMessageListener {@RabbitListener(queues = "order_queue_dlx")//监听死信队列@Overridepublic void onMessage(Message message, Channel channel) throws Exception {try{//1.接收messageSystem.out.println("message:"+message);//2.处理业务逻辑System.out.println("处理业务逻辑");System.out.println("根据订单id在数据库中查询订单状态");System.out.println("判断订单是否支付成功");System.out.println("未支付,回滚库存,取消订单");//3.手动签收channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}catch (Exception e){//4.业务出错,拒绝签收channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);//业务出错,拒签后要将这条消息重新放回死信队列}}
}

延迟队列小结:
1.延迟队列指消息进入队列后,可以被延迟一定时间,再进行消费。
2. RabbitMQ没有提供延迟队列功能,但是可以使用: TTL + DLX来实现延迟队列效果。

应用问题

1.消息补偿

消息补偿机制

2.幂等性保障

幂等性保障

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/25571.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于PHP+vue的网上订餐系统的设计与开发_769b9

快速发展的社会中&#xff0c;人们的生活水平都在提高&#xff0c;生活节奏也在逐渐加快。为了节省时间和提高工作效率&#xff0c;越来越多的人选择利用互联网进行线上打理各种事务&#xff0c;通过线上管理订餐也就相继涌现。与此同时&#xff0c;人们开始接受方便的生活方式…

汽车维修保养记录查询API:实现车辆健康状况一手掌握

在当今的数字化世界中&#xff0c;汽车维修保养记录的查询和管理变得前所未有地简单和便捷。通过API&#xff0c;我们可以轻松地获取车辆的维修和保养记录&#xff0c;从而实现对手中车辆健康状况的实时掌握。 API&#xff08;应用程序接口&#xff09;是进行数据交换和通信的标…

【LeetCode 75】第二十一题(1207)独一无二的出现次数

目录 题目: 示例: 分析: 代码运行结果: 题目: 示例: 分析: 用两个unordered_map来分别存放每个数字的出现次数和出现的次数这个数,有点绕,比如说有给的数组有两个1,那么第一个map存放的是(1,2),表示1这个数子出现了两次,而第二个map存放的是(2,true),表示有出现次数为2的数…

mysql 笔记(一)-mysql的架构原理

mysql体系结构 mysql Server 架构自顶向下大致可以分为网络连接层,服务层,存储引擎和系统文件层.体系架构图如下: 网络连接层提供与mysql服务器建立的支持.常见的java.c.python/.net ,它们通过各自API技术与mysql建立连接. 服务层是Mysql Server 的核心,主要包含系统管理和控…

Redis 基础

1.定义 Redis 是一个高性能的key-value数据库&#xff0c;key是字符串类型。 2.核心特点&#xff1a; 单进程&#xff1a; Redis的服务器程序采用的是单进程模型来处理客户端的请求。对读写时间的响 应是通过对epoll函数的包装来做到的。 3.数据类型&#xff1a; 键的类型…

hcip——期中小试

要求&#xff1a; 1、该拓扑为公司网络&#xff0c;其中包括公司总部、公司分部以及公司骨干网&#xff0c;不包含运营商公网部分。 2 、设备名称均使用拓扑上名称改名&#xff0c;并且区分大小写。 3 、整张拓扑均使用私网地址进行配置。 4 、整张网络中&#xff0c;运行 O…

CNN成长路:从AlexNet到EfficientNet(02)

一、说明 在~10年的深度学习中&#xff0c;进步是多么迅速&#xff01;早在 2012 年&#xff0c;Alexnet 在 ImageNet 上的准确率就达到了 63.3% 的 Top-1。现在&#xff0c;我们超过90%的EfficientNet架构和师生训练&#xff08;teacher-student&#xff09;。 二、第一阶段 …

Android性能优化—Apk瘦身优化

随着业务迭代&#xff0c;apk体积逐渐变大。项目中积累的无用资源&#xff0c;未压缩的图片资源等&#xff0c;都为apk带来了不必要的体积 增加。而APK 的大小会影响应用加载速度、使用的内存量以及消耗的电量。在讨论如何缩减应用的大小之前&#xff0c;有必要了解下应用 APK …

【超细节】Vue3组件事件怎么声明,defineEmits与emit

目录 前言 一、基本语法 1. 子组件触发 2. 父组件监听 二、 事件参数 1. 传值 2. 接收值 三、 事件校验 四、注意事项 前言 组件事件是 Vue 组件之间进行通信的一种方式。它允许一个组件触发一个自定义事件&#xff0c;并且其他组件可以监听并响应这个事件。 一、基本…

electron+vue+ts窗口间通信

文章目录 一. 目的二.逻辑分析三. 代码示例 "types/node": "^20.3.1","vitejs/plugin-vue": "^4.1.0","vueuse/electron": "^10.2.1","electron": "^25.2.0","electron-packager":…

docker部署jenkins且jenkins中使用docker去部署项目

docker部署jenkins且jenkins中使用docker去部署项目 1、确定版本 2.346.1是最后一个支持jdk8的 2、编写docker-compose.yml并执行 在这个目录中新增data文件夹&#xff0c;注意data是用来跟docker中的文件进行映射的 docker-compose.yml version: "3.1" service…

day38 滑动窗口

1. 滑动窗口 应用场景&#xff1a; 满足xxx条件&#xff08;计算结果、出现次数、同时包含&#xff09; 关键词&#xff1a;最长最短子串无重复等等 1&#xff09;最长 左右指针在起始点&#xff0c;R 向右依次滑动循环&#xff1b; 如果&#xff1a; 窗内元素满足条件&#x…

【Groups】50 Matplotlib Visualizations, Python实现,源码可复现

详情请参考博客: Top 50 matplotlib Visualizations 因编译更新问题&#xff0c;本文将稍作更改&#xff0c;以便能够顺利运行。 1 Dendrogram 树状图根据给定的距离度量将相似的点组合在一起&#xff0c;并根据点的相似性将它们组织成树状的链接。 新建文件Dendrogram.py: …

session-cookies 三个缓存 localStorage、sessionStorage、Cookies。

session-cookies session-cookies is localStorage、sessionStorage、Cookies。session-cookies This plugin is used to summarize the browser’s three caches localStorage, sessionStorage, Cookies.The plugin is designed to be quick and easy to use. Below is a sum…

鸿鹄工程项目管理系统em Spring Cloud+Spring Boot+前后端分离构建工程项目管理系统 em

​ Java版工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离 功能清单如下&#xff1a; 首页 工作台&#xff1a;待办工作、消息通知、预警信息&#xff0c;点击可进入相应的列表 项目进度图表&#xff1a;选择&#xff08;总体或单个&#xff09;项目…

计算机网络-性能指标

计算机网络-性能指标 文章目录 计算机网络-性能指标简介速率比特速率 带宽吞吐量时延时延计算 时延带宽积往返时间网络利用率丢包率总结 简介 性能指标可以从不同的方面来度量计算机网络的性能 常用的计算机网络的性能指标有以下8个 速率带宽吞吐量时延时延带宽积往返时间利…

C++ 学习系列 1 -- 左值、右值与万能引用

1. 何为左值&#xff1f;何为右值&#xff1f; 简单的说&#xff0c;左值可以放在等号的左边&#xff0c;右值可以放在等号的右边。 左值可以取地址&#xff0c;右值不能取地址。 1.1 左值举例&#xff1a; 变量、函数或数据成员返回左值引用的表达式 如 x、x 1、cout <…

ARCGIS地理配准出现的问题

第一种。已有省级行政区矢量数据&#xff0c;在网上随便找一个相同省级行政区图片&#xff0c;利用地理配准工具给图片添加坐标信息。 依次添加省级行政区选择矢量数据、浙江省图片。 此时&#xff0c;图层默认的坐标系与第一个加载进来的省级行政区选择矢量数据的坐标系一致…

Python(三)

诚信像一面镜子&#xff0c;一旦打破&#xff0c;你的人格就会出现裂痕。 存在短路的情景 谢谢观看 Python(三)

一百四十三、Linux——Linux的CentOS 7系统语言由中文改成英文

一、目的 之前安装CentOS 7系统的时候把语言设置成中文&#xff0c;结果Linux文件夹命名出现中文乱码的问题&#xff0c;于是决定把Linux系统语言由中文改成英文 二、实施步骤 &#xff08;一&#xff09;到etc目录下&#xff0c;找到配置文件locale.conf # cd /etc/ # ls…