MQ高级特性
1.削峰
设置 消费者
测试 添加多条消息
拉取消息 每隔20秒拉取一次 一次拉取五条 然后在20秒内一条一条消费
TTL
Time To Live(存活时间/过期时间)。
当消息到达存活时间后,还没有被消费,会被自动清除。
RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
可以在管理台新建队列、交换机,绑定
1.图形化操作
添加队列
添加交换机
将交换机和对应的队列进行绑定
时间结束 , 消息失效
2.代码实现
配置 生产者
@Configuration public class TopicMqTtlConfig {@Value("${mq.exchange.name}")private String EXCHANGENAME;@Value("${mq.queue.name1}")private String QUEUENAME1;@Value("${mq.queue.name2}")private String QUEUENAME2;// 1// . 交换机@Bean("ex1")public Exchange getExchange(){Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build();return exchange;}// 2。 队列@Bean("queue1")public Queue getQueue1(){Queue queue = QueueBuilder.nonDurable(QUEUENAME1).withArgument("x-message-ttl",30000)//过期时间30秒.withArgument("x-max-length",10)//队列中最多接收10条消息超过10条的部分废弃.build();return queue;}@Bean("queue2")public Queue getQueue2(){Queue queue2 = QueueBuilder.nonDurable(QUEUENAME2).withArgument("x-message-ttl",300000000)//过期时间30秒.build();return queue2;}// 3. 交换机和队列进行绑定@Bean("binding1")public Binding bindQueue1ToExchange(@Qualifier("ex1") Exchange exchange,@Qualifier("queue1") Queue queue){Binding binding1 = BindingBuilder.bind(queue).to(exchange).with("ttl1.*").noargs();return binding1;}@Bean("binding2")public Binding bindQueue2ToExchange(@Qualifier("ex1") Exchange exchange,@Qualifier("queue2") Queue queue){Binding binding2 = BindingBuilder.bind(queue).to(exchange).with("ttl2.#").noargs();return binding2;} }
测试
添加成功 ttl1只接收10条
时间过期
死信队列
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机,因为其他MQ产品中没有交换机的概念),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
比如消息队列的消息过期,如果绑定了死信交换器,那么该消息将发送给死信交换机
消息在什么情况下会成为死信?(面试会问)
1.队列消息长度到最大的限制
最大的长度设置为10当第11条消息进来的时候就会成为死信
2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false(不重新回到队列中)
设置消费者为手动签收的状态
3. 原队列存在消息过期设置,消息到达超时时间未被消费;
队列绑定交换机的方式是什么?
给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
// 1. 交换机 :正常的交换机 死信交换机
// 2.队列 :正常的 死信
//3.绑定 正常ex - 正常的que
正常的que和死信交换机
死信ex-死信queue
2.代码实现
@Configuration public class TopicMqDeadConfig {@Value("${mq1.exchange.name1}")private String EXCHANGENAME;@Value("${mq1.exchange.name2}")private String DEADEXCHANGE;@Value("${mq1.queue.name1}")private String QUEUENAME1;@Value("${mq1.queue.name2}")private String QUEUENAME2;// 声明正常交换机@Bean("ex1")public Exchange getExchange(){Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build();return exchange;}// 正常队列@Bean("queue1")public Queue getQueue1(){Queue queue = QueueBuilder.nonDurable(QUEUENAME1).withArgument("x-message-ttl",30000)//过期时间30秒.withArgument("x-dead-letter-exchange",DEADEXCHANGE).withArgument("x-dead-letter-routing-key","dead.test")//将正常队列与死信交换机,死信队列绑定//.withArgument("x-max-length",10)//队列中最多接收10条消息超过10条的部分废弃.build();return queue;}// 交换机和队列进行绑定@Bean("binding1")public Binding bindQueue1ToExchange(@Qualifier("ex1") Exchange exchange,@Qualifier("queue1") Queue queue){Binding binding1 = BindingBuilder.bind(queue).to(exchange).with("normal.*").noargs();return binding1;}// 声明死信交换机@Bean("ex2")public Exchange getDeadExchange(){Exchange exchange = ExchangeBuilder.topicExchange(DEADEXCHANGE).durable(false).build();return exchange;}//死信队列@Bean("queue2")public Queue getQueue2(){Queue queue2 = QueueBuilder.nonDurable(QUEUENAME2).build();return queue2;}// 死信交换机和死信队列进行绑定@Bean("binding2")public Binding bindQueue2ToExchange(@Qualifier("ex2") Exchange exchange,@Qualifier("queue2") Queue queue){Binding binding2 = BindingBuilder.bind(queue).to(exchange).with("dead.*").noargs();return binding2;}}
测试
如果程序出现错误 拒绝签收
监听正常队列
发送消息 启动测试
总结:
1. 死信交换机和死信队列和普通的没有区别
2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
3. 消息成为死信的三种情况:
1. 队列消息长度到达限制;
2. 消费者拒接消费消息,并且不重回队列;
3. 原队列存在消息过期设置,消息到达超时时间未被消费;
延迟队列
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
需求:
- 1. 下单后,30分钟未支付,取消订单,回滚库存
- 2. 新用户注册成功7天后,发送短信问候。
实现方式:
1. 定时器
2. 死信队列
在RabbitMQ中并未提供延迟队列功能。但是可以使用:TTL+死信队列
组合实现延迟队列的效果。
1.配置
添加依赖
<!--2. rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--nacos 配置中心--><!--配置中心--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId></dependency><!-- application bootstrap --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bootstrap</artifactId></dependency><!-- nacos--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency><dependency><groupId>com.example</groupId><artifactId>sys-comm</artifactId><version>0.0.1-SNAPSHOT</version></dependency>
修改配置
2.代码实现
创建实体类
发送消息 测试
过期后放入死信队列
添加依赖
<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.16</version></dependency>
将json数据转化为对象
获取成功
3.连接数据库
创建表
创建测试类
@RestController @RequestMapping("order") public class OrderController {@Value("${mq1.exchange.name1}")private String EXCHANGENAME;//@Resourceprivate RabbitTemplate rabbitTemplate;@GetMappingpublic Result aaa(TabOrder order){//1. 消息 存放到mq里面String s = JSONUtil.toJsonStr(order);// openfeign -- 数据添加到数据库里面rabbitTemplate.convertAndSend(EXCHANGENAME, "normal.test", s);return Result.success(s);} }
监听normal
import javax.annotation.Resource; @Component public class XiaoFeng implements ChannelAwareMessageListener {@Resourceprivate TabOrderMapper orderMapper;@Override@RabbitListener(queues = "test_queue_normal")public void onMessage(Message message, Channel channel) throws Exception {//Thread.sleep(2000);// 20sbyte[] body = message.getBody();String s = new String(body);System.out.println(s);// 将字符串转化为 对象long deliveryTag = message.getMessageProperties().getDeliveryTag();try{TabOrder order = JSONUtil.toBean(s, TabOrder.class);// 将订单的信息 报讯到数据库里面int insert = orderMapper.insert(order);channel.basicAck(deliveryTag,true); //}catch(Exception e){//long deliveryTag, boolean multiple, boolean requeueSystem.out.println("拒绝签收消息");channel.basicNack(deliveryTag,true,false);// 死信消息}} }
监听dead
@Component public class YanChi implements ChannelAwareMessageListener {@Resourceprivate TabOrderMapper orderMapper;@Override@RabbitListener(queues = "test_queue_dead")public void onMessage(Message message, Channel channel) throws Exception {//Thread.sleep(2000);// 20sbyte[] body = message.getBody();String s = new String(body);System.out.println(s);// 将字符串转化为 对象long deliveryTag = message.getMessageProperties().getDeliveryTag();try{TabOrder order = JSONUtil.toBean(s, TabOrder.class);// order 的状态TabOrder tabOrder = orderMapper.selectById(order.getId());if(tabOrder.getStatus()==1){// 取消tabOrder.setStatus(3);}orderMapper.updateById(tabOrder);channel.basicAck(deliveryTag,true); //}catch(Exception e){//long deliveryTag, boolean multiple, boolean requeueSystem.out.println("拒绝签收消息");channel.basicNack(deliveryTag,true,false);// 死信消息}} }
测试
成功