如何保证消息的可靠性+延迟队列(TTL+死信队列+延迟队列)

目录

1.如何保证消息的可靠性

1.1.消息的可靠投递

confirm机制

return机制

1.2.如何保证消息在队列中不丢失

1.3.确保消息能可靠的被消费掉

2.延迟队列

2.1.TTL

2.2.死信队列

2.3.延迟队列

3.如何防止消费者重复消费消息


1.如何保证消息的可靠性

1.1.消息的可靠投递

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢?特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢?

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

  • confirm 确认模式
  • return 退回模式
  • 消息从 producer 到 exchange 则会返回一个 confirmCallback 。
  • 消息从 exchange-->queue 投递失败则会返回一个 returnCallback 。

默认rabbitmq不开启上面两种模式

我们将利用这两个 callback 控制消息的可靠性投递

confirm和return的实现  

  1. 设置ConnectionFactory的publisher-confirm-type: correlated开启 确认模式。

  2. 使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。

  3. 设置ConnectionFactory的publisher-returns="true" 开启 退回模式。

  4. 使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后执行回调函数returnedMessage。

confirm机制

演示:4.springboot整合RabbitMQ

(1).配置文件中开启confirm

#开启confirm确认机制
spring.rabbitmq.publisher-confirm-type=correlated

(2)设置rabbitTemplate的confirmCallback回调函数

    /*** 使用confirm机制:* (1)需要开启confirm机制。-----配置文件中加入:spring.rabbitmq.publisher-confirm-type=correlated* (2)为rabbitTemplate指定setConfirmCallback回调函数*/@Testvoid test001() {//只能保证消息从生产者到交换机的可靠性rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {//触发该方法if (ack == false) {System.out.println("未来根据项目的需要,完成相应的操作");}}});rabbitTemplate.convertAndSend("Topics-exchange", "lazy.aaa", "Hello RabbitMQ...");}

return机制

(1).配置文件中开启return

#开启return机制
spring.rabbitmq.publisher-returns=true

(2)设置rabbitTemplate的return回调函数

    /*** return机制:* (1)开启return机制---配置文件加入:spring.rabbitmq.publisher-returns=true* (2)为rabbitTemplate设置return的回调函数*/@Testvoid test002() {//只有当消息无法从交换机到队列时才会触发rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {//为交换机到队列分发消息失败时会触发System.out.println("replyCode====" + replyCode);System.out.println("replyText====" + replyText);}});rabbitTemplate.convertAndSend("Topics-exchange", "lazy.aaa", "Hello RabbitMQ...");}

1.2.如何保证消息在队列中不丢失

(1)设置队列为持久化

(2)设置消息的持久化

1.3.确保消息能可靠的被消费掉

ACK确认机制

多个消费者同时收取消息,收取消息到一半,突然某个消费者挂掉,要保证此条消息不丢失,就需要acknowledgement机制,就是消费者消费完要通知服务端,服务端才将数据删除

这样就解决了,即使一个消费者出了问题,没有同步消息给服务端,还有其他的消费端去消费,保证了消息不丢的case。

ACK的实现

ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。

有三种确认方式:

  • 自动确认:acknowledge="none"
  • 手动确认:acknowledge="manual"
  • 根据异常情况确认:acknowledge="auto"(这种方式使用麻烦,并且不常用)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息队列中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。

如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

消费端:

(1).修改消费端----手动确认消息

#修改消费端----手动确认消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual

(2).修改代码

  

package com.wqg.listener;import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.amqp.core.Message;import java.io.IOException;/*** @ fileName:MyListener* @ description:* @ author:wqg* @ createTime:2023/7/12 18:57*/
@Component
public class MyListener {//basicAck:确认消息----rabbit服务端删除//basicNack:服务继续发送消息@RabbitListener(queues = {"Topics-queue002"})//queues:表示你监听的队列名public void h(Message message , Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();//把监听到的消息封装到Message类对象中byte[] body = message.getBody();String s = new String(body);System.out.println("消息内容==="+s);try {//int a = 10/0; //模拟宕机System.out.println("核心业务的处理~~~");/***long deliveryTag: 消息的标注* boolean multiple:是否把该消息之前未确认的消息一起确认掉*/channel.basicAck(deliveryTag,true);//确认消息} catch (Exception e) {/*** long deliveryTag; boolean multiple;* boolean requeue:是否要求rabbitmq服务重新发送该消息*/channel.basicNack(deliveryTag,true,true);}}
}

总结: 如何保证消息的可靠性?

  • 保证消息的可靠性投递: confirm机制和return机制
  • 队列中:---持久化
  • 使用ack机制保证消费者的可靠性消费。

2.延迟队列

2.1.TTL

TTL 全称 Time To Live(存活时间/过期时间)。

当消息到达存活时间后,还没有被消费,会被自动清除。

RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

演示:

使用图形化界面创建

 生产者测试

 

 结果

 

队列设置了过期时间而消息也设置了过期时间-----按照时间短的执行 

小结:

  • 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
  • 设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。
  • 如果两者都进行了设置,以时间短的为准。

2.2.死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

什么样的消息会成为死信消息:

  • 队列消息长度到达限制
  • 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false
  • 原队列存在消息过期设置,消息到达超时时间未被消费

队列绑定死信交换机:

演示:

使用图形化界面创建:

 

 

 

 生产者测试

 结果

 

2.3.延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

需求:

  1. 下单后,30分钟未支付,取消订单,回滚库存。

  2. 新用户注册成功7天后,发送短信问候。

实现方式:

  1. 定时器:性能差---每隔一段时间要进行数据库查询。

  2. 延迟队列

通过消息队列完成延迟队列的功能:

  • 在RabbitMQ中并未提供延迟队列功能。
  • 但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

 

演示:

队列为空

 

创建springboot项目

配置文件


#rabbitmq的配置
spring.rabbitmq.host=192.168.75.129
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/

(1). 引入依赖

<dependencies><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies>

(2).创建OrderController.java模拟订单

package com.wqg.controller;import com.alibaba.fastjson.JSON;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.HashMap;
import java.util.UUID;/*** @ fileName:OrderController* @ description:订单* @ author:wqg* @ createTime:2023/7/13 16:24*/
@RestController
@RequestMapping("/order")
public class OrderController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/saveOrder")public String save(Integer pid, Integer num) {//生成一个订单号String orderId = UUID.randomUUID().toString().replace("-", "");System.out.println("下单成功,订单号为===" + orderId);HashMap<Object, Object> map = new HashMap<>();map.put("orderId", orderId);map.put("pid", pid);map.put("num", num);rabbitTemplate.convertAndSend("pt_exchange", "qy165.aaa", JSON.toJSONString(map));return "下单成功";}
}

(3).创建MyListener.java模拟监听

package com.wqg.listener;import com.alibaba.fastjson.JSON;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.HashMap;/*** @ fileName:MyListener* @ description:* @ author:wqg* @ createTime:2023/7/13 16:35*/
@Component
public class MyListener {@RabbitListener(queues = {"dead_queue"})public void hello(Message message){byte[] body = message.getBody();String s = new String(body);HashMap hashMap = JSON.parseObject(s, HashMap.class);System.out.println("message==="+hashMap);//取消订单System.out.println("取消订单号为==="+hashMap.get("orderId"));}
}

测试

控制台

 

 

3.如何防止消费者重复消费消息

消息的幂等性---无论操作几次结果都是一样。

  • 生成全局id,存入redis或者数据库,在消费者消费消息之前,查询一下该消息是否有消费过。
  • 如果该消息已经消费过,则告诉mq消息已经消费,将该消息丢弃(手动ack)。
  • 如果没有消费过,将该消息进行消费并将消费记录写进redis或者数据库中。

 简单描述一下需求,如果订单完成之后,需要为用户累加积分,又需要保证积分不会重复累加。那么再mq消费消息之前,先去数据库查询该消息是否已经消费,如果已经消费那么直接丢弃消息。 

演示:

生产者

import com.alibaba.fastjson.JSONObject;
import com.xiaojie.score.entity.Score;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;import java.util.UUID;/*** @author * @version 1.0* @description:发送积分消息的生产者* @date*/
@Component
@Slf4j
public class ScoreProducer implements RabbitTemplate.ConfirmCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;//定义交换机private static final String SCORE_EXCHANGE = "ykq_score_exchaneg";//定义路由键private static final String SCORE_ROUTINNGKEY = "score.add";/*** @description: 订单完成* @param:* @return: java.lang.String* @author xiaojie* @date: */public String completeOrder() {String orderId = UUID.randomUUID().toString();System.out.println("订单已完成");//发送积分通知Score score = new Score();score.setScore(100);score.setOrderId(orderId);String jsonMSg = JSONObject.toJSONString(score);sendScoreMsg(jsonMSg, orderId);return orderId;}/*** @description: 发送积分消息* @param:* @param: message* @param: orderId* @return: void* @author * @date:*/@Asyncpublic void sendScoreMsg(String jsonMSg, String orderId) {this.rabbitTemplate.setConfirmCallback(this);rabbitTemplate.convertAndSend(SCORE_EXCHANGE, SCORE_ROUTINNGKEY, jsonMSg, message -> {//设置消息的id为唯一message.getMessageProperties().setMessageId(orderId);return message;});}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String s) {if (ack) {log.info(">>>>>>>>消息发送成功:correlationData:{},ack:{},s:{}", correlationData, ack, s);} else {log.info(">>>>>>>消息发送失败{}", ack);}}
}

消费者

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.xiaojie.score.entity.Score;
import com.xiaojie.score.mapper.ScoreMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Map;/*** @author * @version 1.0* @description: 积分的消费者* @date */
@Component
@Slf4j
public class ScoreConsumer {@Autowiredprivate ScoreMapper scoreMapper;@RabbitListener(queues = {"ykq_score_queue"})public void onMessage(Message message, @Headers Map<String, Object> headers, Channel channel) throws IOException {String orderId = message.getMessageProperties().getMessageId();if (StringUtils.isBlank(orderId)) {return;}log.info(">>>>>>>>消息id是:{}", orderId);String msg = new String(message.getBody());Score score = JSONObject.parseObject(msg, Score.class);if (score == null) {return;}//执行前去数据库查询,是否存在该数据,存在说明已经消费成功,不存在就去添加数据,添加成功丢弃消息Score dbScore = scoreMapper.selectByOrderId(orderId);if (dbScore != null) {//证明已经消费消息,告诉mq已经消费,丢弃消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);return;}Integer result = scoreMapper.save(score);if (result > 0) {//积分已经累加,删除消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);return;} else {log.info("消费失败,采取相应的人工补偿");} }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1363.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ajax详解

文章目录 1. 概述1.1 Ajax工作原理1.2 Ajax的作用1.3 同步异步 2. 原生Ajax3. Axios3.1 Axios的基本使用3.2 Axios快速入门3.3 请求方法的别名 1. 概述 Ajax 是 “Asynchronous JavaScript and XML”&#xff08;异步 JavaScript 和 XML&#xff09;的缩写。它是一种在无需重新…

RabbitMQ知识掌握 【进阶篇】

一、如何保证消息的可靠性 &#x1f349; 1.保证消息的可靠性投递 &#x1f95d; 在生产环境中由于一些不明原因&#xff0c;导致 rabbitmq 重启&#xff0c;在 RabbitMQ 重启期间生产者消息投递失败&#xff0c;导致消息丢失&#xff0c;需要手动处理和恢复。于是&#xff0…

微服务Gateway网关(自动定位/自定义过滤器/解决跨域)+nginx反向代理gateway集群

目录 Gateway网关 1.0.为什么需要网关&#xff1f; 1.1.如何使用gateway网关 1.2.网关从注册中心拉取服务 1.3.gateway自动定位 1.4.gateway常见的断言 1.5.gateway内置的过滤器 1.6.自定义过滤器-全局过滤器 1.7.解决跨域问题 2.nginx反向代理gateway集群 2.1.配置…

什么是 TCP 和 UDP?Java 中如何实现 TCP 和 UDP 协议

在计算机网络中&#xff0c;TCP&#xff08;传输控制协议&#xff09;和UDP&#xff08;用户数据报协议&#xff09;是两种最常用的传输层协议。它们都用于在网络上传输数据&#xff0c;但是它们之间有很多不同之处。本文将介绍TCP和UDP的基本概念&#xff0c;以及在Java中如何…

ubuntu20.04配置vscode

下载&#xff1a; https://az764295.vo.msecnd.net/stable/660393deaaa6d1996740ff4880f1bad43768c814/code_1.80.0-1688479026_amd64.debhttps://az764295.vo.msecnd.net/stable/660393deaaa6d1996740ff4880f1bad43768c814/code_1.80.0-1688479026_amd64.deb 安装&#xff1a…

Ubuntu 放弃了战斗向微软投降

导读这几天看到 Ubuntu 放弃 Unity 和 Mir 开发&#xff0c;转向 Gnome 作为默认桌面环境的新闻&#xff0c;作为一个Linux十几年的老兵和Linux桌面的开发者&#xff0c;内心颇感良多。Ubuntu 做为全世界Linux界的桌面先驱者和创新者&#xff0c;突然宣布放弃自己多年开发的Uni…

回首2023上半年:成长、思考、感恩

文章目录 每日一句正能量前言一、目标达成情况总结二、工作和学习成果总结三、下半年规划总结四、个人想法 后记附录 每日一句正能量 做一个向日葵族&#xff0c;面对阳光&#xff0c;不自艾自怜&#xff0c;每天活出最灿烂的自己。曾经拥有的&#xff0c;不要忘记。不能得到的…

day52

思维导图 比较指令结果的条件码 练习 汇编实现1-100的累加 .text .global _strat _start: mov r0,#0mov r1,#0 add_fun:add r0,r0,#1cmp r0,#100addls r1,r1,r0bls add_fun .end

Vue 项目路由、自定义指令、api方法自动引入资源(require.context使用)

前端项目&#xff08;当前我以Vue项目为例&#xff09;当我们把api挂载在main上后 // 将api挂载到vue的原型上 import api from /api Vue.prototype.$apiapi在src下会有一个api文件夹&#xff0c;结构如下&#xff1a; 通常情况下&#xff0c;api文件夹的index.js文件我们通常…

ChatGPT 最佳实践指南之:使用外部工具

Use external tools 使用外部工具 Compensate for the weaknesses of GPTs by feeding them the outputs of other tools. For example, a text retrieval system can tell GPTs about relevant documents. A code execution engine can help GPTs do math and run code. If a …

8.postgresql--Update join 和 Delete using

Update join Update join用于基于另一张表更新表数据&#xff0c;语法如下&#xff1a; UPDATE t1 SET t1.c1 new_value FROM t2 WHERE t1.c2 t2.c2;CREATE TABLE product_segment (id SERIAL PRIMARY KEY,segment VARCHAR NOT NULL,discount NUMERIC (4, 2) );INSERT INTO…

基于C/S架构工作原理序号工作步骤和理论的区别

基于C/S架构工作原理序号工作步骤和理论的区别 SSH 概念 对称加密linux 系统加密&#xff0c;就是加密和揭秘都是使用同一套密钥。 非对称加密有两个密钥&#xff1a;“私钥”和“公钥”。私钥加密后的密文&#xff0c;只能通过对应的公钥进行揭秘。而通过私钥推理出公钥的…

不满足于RPC,详解Dubbo的服务调用链路

系列文章目录 【收藏向】从用法到源码&#xff0c;一篇文章让你精通Dubbo的SPI机制 面试Dubbo &#xff0c;却问我和Springcloud有什么区别&#xff1f; 超简单&#xff0c;手把手教你搭建Dubbo工程&#xff08;内附源码&#xff09; Dubbo最核心功能——服务暴露的配置、使用…

数据可视化——用python绘制简单的折线图

文章目录 前言JSON使用 pyecharts 模块绘制折线图下载 pyecharts 模块使用 pyecharts 模块绘制简单的折线图添加配置选项 前言 前面我们已经学习了python的基础语法和面向对象&#xff0c;那么接下来我们将学习python编程语言的过人之处——数据的可视化之折线图。 JSON 说到…

C/C++内存泄漏原因分析与应对方法

内存泄漏 一、内存泄漏的危害&#xff1a; 内存泄漏会导致当前应用程序消耗更多的内存&#xff0c;使得其他应用程序可用的内存更少了。 如果有个进程可用的内存不够&#xff0c;就会触发Linux操作系统的直接/后台内存回收&#xff08;即将一些内存页的数据写到磁盘里&#…

springboot服务端接口公网远程调试,并实现HTTP服务监听

文章目录 前言1. 本地环境搭建1.1 环境参数1.2 搭建springboot服务项目 2. 内网穿透2.1 安装配置cpolar内网穿透2.1.1 windows系统2.1.2 linux系统 2.2 创建隧道映射本地端口2.3 测试公网地址 3. 固定公网地址3.1 保留一个二级子域名3.2 配置二级子域名3.2 测试使用固定公网地址…

Argo CD 入门扫盲使用

目录 一、什么是 argo cd 二、为什么使用 argo cd 三、argo cd 架构图 四、Argo CD 使用 1、安装 Argo CD 2、安装 Argo CD CLI 3、发布 Argo CD 服务 4、获取 Argo CD 密码 5、准备 Git 仓库 6、创建 Argo CD App 7、版本升级 8、版本回滚 一、什么是 argo cd A…

数据结构(王道)——线性表的存储结构之循环表

一、循环单链表 定义&#xff1a; 循环单链表代码实现 创建并初始化、判断循环单链表是否为空、判断结点p是否为循环单链表的表尾结点的代码操作。 二、循环双链表 定义&#xff1a; 循环双链表代码实现 创建并初始化、判断循环双链表是否为空、判断结点p是否为循环双链表的…

JVM重点整理

一、虚拟机架构图 二、类加载过程 类加载器的作用&#xff1a;负责把class文件加载到内存中 类加载过程&#xff1a; 加载&#xff1a; 通过类的全限定名获取此类的二进制字节流文件的编码结构---->运行时的内存结构内存中生成一个class对象 链接&#xff1a; 验证&#x…

智能电表远程抄表系统原理

智能电表远程抄表系统是现代智能电网建设的重要组成部分&#xff0c;它利用物联网技术实现电表数据的远程采集、传输和处理&#xff0c;提高了电力公司的抄表效率&#xff0c;同时也为用户提供了更加便捷、准确的用电服务。本文将从远程智能电表抄表系统的工作原理、特点、应用…