Rabbitmq消息不丢失

目录

  • 一、消息不丢失
    • 1.消息确认
    • 2.消息确认业务封装
      • 2.1 发送确认消息测试
      • 2.2 消息发送失败,设置重发机制

一、消息不丢失

消息的不丢失,在MQ角度考虑,一般有三种途径:
1,生产者不丢数据
2,MQ服务器不丢数据
3,消费者不丢数据
保证消息不丢失有两种实现方式:
1,开启事务模式
2,消息确认模式
说明:开启事务会大幅降低消息发送及接收效率,使用的相对较少,因此我们生产环境一般都采取消息确认模式,以下我们只是讲解消息确认模式

1.消息确认

消息持久化
如果希望RabbitMQ重启之后消息不丢失,那么需要对以下3种实体均配置持久化
Exchange
声明exchange时设置持久化(durable = true)并且不自动删除(autoDelete = false)
Queue
声明queue时设置持久化(durable = true)并且不自动删除(autoDelete = false)
message
发送消息时通过设置deliveryMode=2持久化消息

处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。那么如何持久化呢,其实也很容易,就下面两步:
1、将queue的持久化标识durable设置为true,则代表是一个持久的队列
2、发送消息的时候将deliveryMode=2
这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据

发送确认
有时,业务处理成功,消息也发了,但是我们并不知道消息是否成功到达了rabbitmq,如果由于网络等原因导致业务成功而消息发送失败,那么发送方将出现不一致的问题,此时可以使用rabbitmq的发送确认功能,即要求rabbitmq显式告知我们消息是否已成功发送。

手动消费确认
有时,消息被正确投递到消费方,但是消费方处理失败,那么便会出现消费方的不一致问题。比如:订单已创建的消息发送到用户积分子系统中用于增加用户积分,但是积分消费方处理却都失败了,用户就会问:我购买了东西为什么积分并没有增加呢?
要解决这个问题,需要引入消费方确认,即只有消息被成功处理之后才告知rabbitmq以ack,否则告知rabbitmq以nack

2.消息确认业务封装

service-mq修改配置
开启rabbitmq消息确认配置,在common的配置文件中都已经配置好了!

spring:rabbitmq:host: 192.168.121.140port: 5672username: adminpassword: adminpublisher-confirms-type: correlated  #交换机的确认publisher-returns: true  #队列的确认listener:simple:acknowledge-mode: manual #默认情况下消息消费者是自动确认消息的,如果要手动确认消息则需要修改确认模式为manualprefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发

搭建rabbit-util模块
由于消息队列是公共模块,我们把mq的相关业务封装到该模块,其他service微服务模块都可能使用,因此我们把他封装到一个单独的模块,需要使用mq的模块直接引用该模块即可
搭建方式如:
pom.xml

    <dependencies><!--rabbitmq消息队列--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><!--rabbitmq 协议--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-amqp</artifactId></dependency></dependencies>

4.2.4 封装发送端消息确认

/*** @Description 消息发送确认* <p>* ConfirmCallback  只确认消息是否正确到达 Exchange 中* ReturnCallback   消息没有正确到达队列时触发回调,如果正确到达队列不执行* <p>* 1. 如果消息没有到exchange,则confirm回调,ack=false* 2. 如果消息到达exchange,则confirm回调,ack=true* 3. exchange到queue成功,则不回调return* 4. exchange到queue失败,则回调return* */
@Component
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;// 修饰一个非静态的void()方法,在服务器加载Servlet的时候运行,并且只会被服务器执行一次在构造函数之后执行,init()方法之前执行。@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);            //指定 ConfirmCallbackrabbitTemplate.setReturnCallback(this);             //指定 ReturnCallback}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("消息发送成功:" + JSON.toJSONString(correlationData));} else {log.info("消息发送失败:" + cause + " 数据:" + JSON.toJSONString(correlationData));}}@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {// 反序列化对象输出System.out.println("消息主体: " + new String(message.getBody()));System.out.println("应答码: " + replyCode);System.out.println("描述:" + replyText);System.out.println("消息使用的交换器 exchange : " + exchange);System.out.println("消息使用的路由键 routing : " + routingKey);}}

封装消息发送

@Service
public class RabbitService {@Autowiredprivate RabbitTemplate rabbitTemplate;/***  发送消息* @param exchange 交换机* @param routingKey 路由键* @param message 消息*/public boolean sendMessage(String exchange, String routingKey, Object message) {rabbitTemplate.convertAndSend(exchange, routingKey, message);return true;}}

2.1 发送确认消息测试

消息发送端

@RestController
@RequestMapping("/mq")
public class MqController {@Autowiredprivate RabbitService rabbitService;/*** 消息发送*///http://localhost:8282/mq/sendConfirm@GetMapping("sendConfirm")public Result sendConfirm() {rabbitService.sendMessage("exchange.confirm", "routing.confirm", "来人了,开始接客吧!");return Result.ok();}
}

消息接收端

@Component
public class ConfirmReceiver {@SneakyThrows
@RabbitListener(bindings=@QueueBinding(value = @Queue(value = "queue.confirm",autoDelete = "false"),exchange = @Exchange(value = "exchange.confirm",autoDelete = "true"),key = {"routing.confirm"}))
public void process(Message message, Channel channel){System.out.println("RabbitListener:"+new String(message.getBody()));// false 确认一个消息,true 批量确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
}

测试:http://localhost:8282/mq/sendConfirm

2.2 消息发送失败,设置重发机制

实现思路:借助redis来实现重发机制
模块中添加依赖

<!-- redis -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency><!-- spring2.X集成redis所需common-pool2-->
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId>
</dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId>
</dependency>

自定义一个实体类来接收消息

@Data
public class GmallCorrelationData extends CorrelationData {//  消息主体private Object message;//  交换机private String exchange;//  路由键private String routingKey;//  重试次数private int retryCount = 0;//  消息类型  是否是延迟消息private boolean isDelay = false;//  延迟时间private int delayTime = 10;
}

修改发送方法

//  封装一个发送消息的方法
public Boolean sendMsg(String exchange,String routingKey, Object msg){//  将发送的消息 赋值到 自定义的实体类GmallCorrelationData gmallCorrelationData = new GmallCorrelationData();//  声明一个correlationId的变量String correlationId = UUID.randomUUID().toString().replaceAll("-","");gmallCorrelationData.setId(correlationId);gmallCorrelationData.setExchange(exchange);gmallCorrelationData.setRoutingKey(routingKey);gmallCorrelationData.setMessage(msg);//  发送消息的时候,将这个gmallCorrelationData 对象放入缓存。redisTemplate.opsForValue().set(correlationId, JSON.toJSONString(gmallCorrelationData),10, TimeUnit.MINUTES);//  调用发送消息方法//this.rabbitTemplate.convertAndSend(exchange,routingKey,msg);this.rabbitTemplate.convertAndSend(exchange,routingKey,msg,gmallCorrelationData);//  默认返回truereturn true;
}发送失败调用重发方法  MQProducerAckConfig 类中修改
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {//  ack = true 说明消息正确发送到了交换机if (ack){System.out.println("哥们你来了.");log.info("消息发送到了交换机");}else {//  消息没有到交换机log.info("消息没发送到交换机");//  调用重试发送方法this.retrySendMsg(correlationData);}
}@Override
public void returnedMessage(Message message, int code, String codeText, String exchange, String routingKey) {System.out.println("消息主体: " + new String(message.getBody()));System.out.println("应答码: " + code);System.out.println("描述:" + codeText);System.out.println("消息使用的交换器 exchange : " + exchange);System.out.println("消息使用的路由键 routing : " + routingKey);//  获取这个CorrelationData对象的Id  spring_returned_message_correlationString correlationDataId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");//  因为在发送消息的时候,已经将数据存储到缓存,通过 correlationDataId 来获取缓存的数据String strJson = (String) this.redisTemplate.opsForValue().get(correlationDataId);//  消息没有到队列的时候,则会调用重试发送方法GmallCorrelationData gmallCorrelationData = JSON.parseObject(strJson,GmallCorrelationData.class);//  调用方法  gmallCorrelationData 这对象中,至少的有,交换机,路由键,消息等内容.this.retrySendMsg(gmallCorrelationData);
}/*** 重试发送方法* @param correlationData   父类对象  它下面还有个子类对象 GmallCorrelationData*/
private void retrySendMsg(CorrelationData correlationData) {//  数据类型转换  统一转换为子类处理GmallCorrelationData gmallCorrelationData = (GmallCorrelationData) correlationData;//  获取到重试次数 初始值 0int retryCount = gmallCorrelationData.getRetryCount();//  判断if (retryCount>=3){//  不需要重试了log.error("重试次数已到,发送消息失败:"+JSON.toJSONString(gmallCorrelationData));} else {//  变量更新retryCount+=1;//  重新赋值重试次数 第一次重试 0->1 1->2 2->3gmallCorrelationData.setRetryCount(retryCount);System.out.println("重试次数:\t"+retryCount);//  更新缓存中的数据this.redisTemplate.opsForValue().set(gmallCorrelationData.getId(),JSON.toJSONString(gmallCorrelationData),10, TimeUnit.MINUTES);//  调用发送消息方法 表示发送普通消息  发送消息的时候,不能调用 new RabbitService().sendMsg() 这个方法this.rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(),gmallCorrelationData.getRoutingKey(),gmallCorrelationData.getMessage(),gmallCorrelationData);}
}

测试:只需修改(错误信息)
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/40151.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

设计HTML5列表和超链接

在网页中&#xff0c;大部分信息都是列表结构&#xff0c;如菜单栏、图文列表、分类导航、新闻列表、栏目列表等。HTML5定义了一套列表标签&#xff0c;通过列表结构实现对网页信息的合理排版。另外&#xff0c;网页中还包含大量超链接&#xff0c;通过它实现网页、位置的跳转&…

小程序CSS button按钮自定义高度之后不居中

问题&#xff1a; 按钮设置高度后不居中 <view><button class"btn1" size"">Save</button> </view> page {font-size: 30rpx; }.btn1 {margin-top: 100rpx;height: 190rpx;background: linear-gradient(90deg, #FF8A06, #FF571…

Wi-Fi 安全在学校中的重要性

Wi-Fi 是教育机构的基础设施&#xff0c;从在线家庭作业门户到虚拟教师会议&#xff0c;应有尽有。大多数 K-12 管理员对自己的 Wi-Fi 网络的安全性充满信心&#xff0c;并认为他们现有的网络安全措施已经足够。 不幸的是&#xff0c;这种信心往往是错误的。Wi-Fi 安全虽然经常…

【数据结构OJ题】链表中倒数第k个结点

原题链接&#xff1a;https://www.nowcoder.com/practice/529d3ae5a407492994ad2a246518148a?tpId13&&tqId11167&rp2&ru/activity/oj&qru/ta/coding-interviews/question-ranking 目录 1. 题目描述 2. 思路分析 3. 代码实现 1. 题目描述 2. 思路分析 …

VectorStyler for Mac: 让你的创意无限绽放的全新设计工具

VectorStyler for Mac是一款专为Mac用户打造的矢量设计工具&#xff0c;它结合了功能强大的矢量编辑器和创意无限的样式编辑器&#xff0c;让你的创意无限绽放。 VectorStyler for Mac拥有直观简洁的用户界面&#xff0c;让你能够轻松上手。它提供了丰富的矢量绘图工具&#x…

JavaWeb博客项目--推荐算法--完整代码及思路

基于用户的协同过滤算法&#xff08;UserCF&#xff09; 因为我写的是博客项目&#xff0c;博客数量可能比用户数量还多 所以选择基于用户的协同过滤算法 重要思想 当要向用户u进行推荐时&#xff0c;我们先找出与用户u最相似的几个用户&#xff0c;再从这几个用户的喜欢的物…

数据可视化和数字孪生相互促进的关系

数据可视化和数字孪生是当今数字化时代中备受关注的两大领域&#xff0c;它们在不同层面和领域为我们提供了深入洞察和智能决策的机会&#xff0c;随着两种技术的不断融合发展&#xff0c;很多人会将他们联系在一起&#xff0c;本文就带大家浅谈一下二者之间相爱相杀的关系。 …

Springboot集成ip2region离线IP地名映射-修订版

title: Springboot集成ip2region离线IP地名映射 date: 2020-12-16 11:15:34 categories: springboot description: Springboot集成ip2region离线IP地名映射 1. 背景2. 集成 2.1. 步骤2.2. 样例2.3. 响应实例DataBlock2.4. 响应实例RegionAddress 3. 打开浏览器4. 源码地址&…

OpenShift 4 - 基于 MinIO 安装 Red Hat Quay 镜像仓库

《OpenShift / RHEL / DevSecOps 汇总目录》 说明&#xff1a;本文已经在 OpenShift 4.13 Quay 3.9 的环境中验证 本文适合在单机 OpenShift 环境安装 Red Hat Quay 镜像仓库。 另外《OpenShift 4 - 安装 ODF 并部署红帽 Quay (1 Worker)》也可以在单节点部署。 而《OpenShif…

前后端分离------后端创建笔记(11)用户删除

B站视频&#xff1a;30-用户删除&结束语_哔哩哔哩_bilibili 1、现在我们要做一个删除的功能 1.1 首先做一个删除的功能接口&#xff0c;第一步先来到后端&#xff0c;做一个删除的接口 2、删除我们用Delete请求 3、方法名我给他改一下 3.1这里给他调一下删除方法&#xf…

在一小时内构建您的深度学习应用程序

一、说明 我已经做了将近十年的数据分析。有时&#xff0c;我使用机器学习技术从数据中获取见解&#xff0c;并且我习惯于使用经典 ML。 虽然我已经通过了神经网络和深度学习的一些MOOC&#xff0c;但我从未在我的工作中使用过它们&#xff0c;这个领域对我来说似乎很有挑战性。…

智能数据建模软件DTEmpower 2023R2新版本功能介绍

DTEmpower是由天洑软件自主研发的一款通用的智能数据建模软件&#xff0c;致力于帮助工程师及工科专业学生&#xff0c;利用工业领域中的仿真、试验、测量等各类数据进行挖掘分析&#xff0c;建立高质量的数据模型&#xff0c;实现快速设计评估、实时仿真预测、系统参数预警、设…

机器学习深度学习——自注意力和位置编码(数学推导+代码实现)

&#x1f468;‍&#x1f393;作者简介&#xff1a;一位即将上大四&#xff0c;正专攻机器学习的保研er &#x1f30c;上期文章&#xff1a;机器学习&&深度学习——注意力分数&#xff08;详细数学推导代码实现&#xff09; &#x1f4da;订阅专栏&#xff1a;机器学习…

Cat(2):下载与安装

1 github源码下载 要安装CAT&#xff0c;首先需要从github上下载最新版本的源码。 官方给出的建议如下&#xff1a; 注意cat的3.0代码分支更新都发布在master上&#xff0c;包括最新文档也都是这个分支注意文档请用最新master里面的代码文档作为标准&#xff0c;一些开源网站…

MySQL— 基础语法大全及操作演示!!!(上)

MySQL—— 基础语法大全及操作演示&#xff08;上&#xff09; 一、MySQL概述1.1 、数据库相关概念1.1.1 MySQL启动和停止 1.2 、MySQL 客户端连接1.3 、数据模型 二、SQL2.1、SQL通用语法2.2、SQL分类2.3、DDL2.3.1 DDL — 数据库操作2.3.1 DDL — 表操作 2.4、DML2.4.1 DML—…

等保案例 5

用户简介 四川省人民代表大会常务委员会&#xff0c;作为省人民代表大会地常设机关&#xff0c;随着政府部门信息化程度地提高&#xff0c;对信息系统地依赖程度越来越高&#xff0c;同时由于网络安全形势日益严峻、新型攻击层出不穷&#xff0c;单位信息化所面临地各种风险也…

途乐证券-宁德时代发力超充赛道,高压快充概念强势拉升,泰永长征涨停

高压快充概念17日盘中强势拉升&#xff0c;到发稿&#xff0c;泰永长征涨停&#xff0c;万祥科技涨超9%&#xff0c;英可瑞涨逾8%&#xff0c;迦南智能涨超4%。 消息面上&#xff0c;8月16日&#xff0c;宁德时代举行线下新品发布会&#xff0c;正式发布全球首款磷酸铁锂4C超充…

Spark第二课RDD的详解

1.前言 RDD JAVA中的IO 1.小知识点穿插 1. 装饰者设计模式 装饰者设计模式:本身功能不变,扩展功能. 举例&#xff1a; 数据流的读取 一层一层的包装&#xff0c;进而将功能进行进一步的扩展 2.sleep和wait的区别 本质区别是字体不一样,sleep斜体,wait正常 斜体是静态方法…

经过几天的乱搞,已经搞出来第一次stm32点灯程序

看吧那个灯泡已经亮了 stm32跟51不同的地方是这里引脚一组16个&#xff0c;如PA0,PA1,PA2,,,,,,PA15 51一组8个 例如P00,P01,P02,,,,P07