RabbitMQ如何做到不丢不重

目录

MQTT协议

如何保证消息100%不丢失

生产端可靠性投递

​编辑

RabbitMQ的Broker端投

(1)消息持久化

(2)设置集群镜像模式

(3)消息补偿机制

消费端

ACK机制改为手动

总结


MQTT协议

先来说下MQTT协议中的3种语义,这个非常重要。

在MQTT协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:

At most once:至多一次。消息在传递时,最多会被送达一次。也就是说,没什么消息可靠性保证,允许丢消息。
At least once:至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级 这个服务质量标准不仅适用于MQTT,对所有的消息队列都是适用的。现在常用的绝大部分消息队列提供的服务质量都是 At least once,包括RocketMQ、RabbitMQ和Kafka都是这样。也就是说,消息队列很难保证消息不重复。

At least once+幂等消费=Exactly once

如何保证消息100%不丢失

消息从生产端到消费端消费要经过3个步骤:

  1. 生产端发送消息到RabbitMQ;
  2. RabbitMQ发送消息到消费端;
  3. 消费端消费这条消息;  

所以要保证消息不丢,就得从三个方面入手,分别是生产端、RabbitMQ的Broker端、消费端。三个都保证不丢失,才能保证100%不丢。

生产端可靠性投递

事务机制:

// 设置channel开启事务
rabbitTemplate.setChannelTransacted(true);@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){return new RabbitTransactionManager(connectionFactory);}@Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager")
public void publishMessage(String message) throws Exception {rabbitTemplate.setMandatory(true);rabbitTemplate.convertAndSend("java",message);}

confirm消息确认机制:

# 开启发送确认
spring.rabbitmq.publisher-confirm-type=correlated
# 开启发送失败回退
spring.rabbitmq.publisher-returns=true	     
@Configuration
@Slf4j
public class RabbitMQConfig {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void enableConfirmCallback() {//confirm 监听,当消息成功发到交换机 ack = true,没有发送到交换机 ack = false//correlationData 可在发送时指定消息唯一 idrabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if(!ack){//记录日志、落库定时任务扫描重发,同时对开发人员进行通知}});//当消息成功发送到交换机没有路由到队列触发此监听rabbitTemplate.setReturnsCallback(returned -> {//记录日志、落库、同时对开发人员进行通知});}
}

一般不推荐事务的模式,因为是同步的会影响性能,所以都会采用异步回调的confirm模式。

RabbitMQ的Broker端投

说三点:

(1)要保证rabbitMQ不丢失消息,那么就需要开启rabbitMQ的持久化机制,即把消息持久化到硬盘上,这样即使rabbitMQ挂掉在重启后仍然可以从硬盘读取消息;

(2)如果rabbitMQ单点故障怎么办,这种情况倒不会造成消息丢失,这里就要提到rabbitMQ的3种安装模式,单机模式、普通集群模式、镜像集群模式,这里要保证rabbitMQ的高可用就要配合HAPROXY做镜像集群模式

(3)如果硬盘坏掉怎么保证消息不丢失

(1)消息持久化

RabbitMQ 的消息默认存放在内存上面,如果不特别声明设置,消息不会持久化保存到硬盘上面的,如果节点重启或者意外crash掉,消息就会丢失。

所以就要对消息进行持久化处理。如何持久化,下面具体说明下:

要想做到消息持久化,必须满足以下三个条件,缺一不可。

1) Exchange 设置持久化

2)Queue 设置持久化

3)Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息

(2)设置集群镜像模式

我们先来介绍下RabbitMQ三种部署模式:

1)单节点模式:最简单的情况,非集群模式,节点挂了,消息就不能用了。业务可能瘫痪,只能等待。

2)普通模式:消息只会存在与当前节点中,并不会同步到其他节点,当前节点宕机,有影响的业务会瘫痪,只能等待节点恢复重启可用(必须持久化消息情况下)。

3)镜像模式:消息会同步到其他节点上,可以设置同步的节点个数,但吞吐量会下降。属于RabbitMQ的HA方案

为什么设置镜像模式集群,因为队列的内容仅仅存在某一个节点上面,不会存在所有节点上面,所有节点仅仅存放消息结构和元数据。

如果想解决上面途中问题,保证消息不丢失,需要采用HA 镜像模式队列。

下面介绍下三种HA策略模式

1)同步至所有的

2)同步最多N个机器

3)只同步至符合指定名称的nodes

命令处理HA策略模版:rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

1)为每个以“rock.wechat”开头的队列设置所有节点的镜像,并且设置为自动同步模式 rabbitmqctl set_policy ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}' rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

2)为每个以“rock.wechat.”开头的队列设置两个节点的镜像,并且设置为自动同步模式 rabbitmqctl set_policy -p rock ha-exacly "^rock.wechat" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

3)为每个以“node.”开头的队列分配指定的节点做镜像 rabbitmqctl set_policy ha-nodes "^nodes." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'

但是:HA 镜像队列有一个很大的缺点就是:系统的吞吐量会有所下降。

(3)消息补偿机制

为什么还要消息补偿机制呢?难道消息还会丢失,没错,系统是在一个复杂的环境,不要想的太简单了,虽然以上的三种方案,基本可以保证消息的高可用不丢失的问题,

但是作为有追求的程序员来讲,要绝对保证我的系统的稳定性,有一种危机意识。

比如:持久化的消息,保存到硬盘过程中,当前队列节点挂了,存储节点硬盘又坏了,消息丢了,怎么办?

1)生产端首先将业务数据以及消息数据入库,需要在同一个事务中,消息数据入库失败,则整体回滚

2)根据消息表中消息状态,失败则进行消息补偿措施,重新发送消息处理。

消费端
ACK机制改为手动

RabbitMQ的自动ack机制默认在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完。
我们需要进行手动消费

#开启手动ACK,消费消息的时候,就必须发送ack确认,不然消息永远还在队列中
spring.rabbitmq.listener.simple.acknowledge-mode=manual

 basicNack 方法的第三个参数代表是否重回队列,通常代码的报错并不会因为重试就能解决,所以可能这种情况:继续被消费,继续报错,重回队列,继续被消费…死循环。
一定要有重发消息次数的限制,或者干脆不入队,发送到Redis进行下记录也行。一般就不会再次入队了,而是记录并通知开发人员,进行手动处理

@RabbitHandler@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))public void process(String msg, Message message, Channel channel) {long tag = message.getMessageProperties().getDeliveryTag();Action action = Action.SUCCESS;try {System.out.println("消费者RabbitDemoConsumer从RabbitMQ服务端消费消息:" + msg);if ("bad".equals(msg)) {throw new IllegalArgumentException("测试:抛出可重回队列的异常");}if ("error".equals(msg)) {throw new Exception("测试:抛出无需重回队列的异常");}} catch (IllegalArgumentException e1) {e1.printStackTrace();//根据异常的类型判断,设置action是可重试的,还是无需重试的action = Action.RETRY;} catch (Exception e2) {//打印异常e2.printStackTrace();//根据异常的类型判断,设置action是可重试的,还是无需重试的action = Action.REJECT;} finally {try {if (action == Action.SUCCESS) {//multiple 表示是否批量处理。true表示批量ack处理小于tag的所有消息。false则处理当前消息channel.basicAck(tag, false);} else if (action == Action.RETRY) {//Nack,拒绝策略,消息重回队列channel.basicNack(tag, false, true);} else {//Nack,拒绝策略,并且从队列中删除channel.basicNack(tag, false, false);}channel.close();} catch (Exception e) {e.printStackTrace();}}}
}
总结

如果需要保证消息在整条链路中不丢失,那就需要生产端、mq自身与消费端共同去保障。

生产端:对生产的消息进行状态标记,开启confirm机制,依据mq的响应来更新消息状态,使用定时任务重新投递超时的消息,多次投递失败进行报警。

mq自身:开启持久化,并在落盘后再进行ack。如果是镜像部署模式,需要在同步到多个副本之后再进行ack。

消费端:开启手动ack模式,在业务处理完成后再进行ack,并且需要保证幂等。

通过以上的处理,理论上不存在消息丢失的情况,但是系统的吞吐量以及性能有所下降。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/359124.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【LESS系列】简介和使用

LESS —— 一个CSS预编译框架,它在CSS的语法基础之上,引入了变量、Mixin(混入)、运算以及函数等功能,大大简化了CSS的编写,并且降低了CSS的维护成本,就像它的名称所说的那样,LESS可以…

掌握 React 与 React Native

今天面试被人问到React 与 React Native ,废话不多说,直接上干货。 React Native官网:https://reactnative.cn/ React Native App 页面布局直接通过 HTML 和 CSS 的前端基础技术进行布局与开发,大大降低了学习成本。 你如果早就在心里有开发一个自己 App 的想法,而刚…

cpu序列号唯一吗_怎么看电脑硬件是不是新的 有什么软件能检测吗?

怎么看电脑硬件是不是全新的很多人比较关心的问题,毕竟现在很多奸商为了最求利润什么招式都用的出来,其中依旧重新,依次充好是最常见的伎俩了。 怎么看电脑硬件是不是新的 有什么软件能检测吗?这个时候大家可能就会想了&#xff0…

扬州大学计算机考研难考吗,扬州大学(专业学位)计算机技术考研难吗

考研真题资料优惠价原价选择很多考生在准备扬州大学(专业学位)计算机技术考研难吗?是考研报考的时候都会产生这样的疑问:这个专业的研究生好吗?适合我吗?对我以后的人生和职业会有帮助吗?考生在准备扬州大学(专业学位)…

21世纪的设计模式:适配器模式

这是我的演讲“ 21世纪的设计模式”的第三部分。 适配器模式桥接世界。 在一个世界中,我们有一个概念的界面。 在另一个世界,我们有不同的界面。 这两个接口有不同的用途,但有时我们需要进行转移。 在一个编写良好的世界中,我们可…

在百度搜索页添加公司总部的客服电话

先看一下收录标准 1.登录百度数据开放平台:http://open.baidu.com/data/ms/nav/pc/ 注意事项: 2.进到信息提交的页面,先填写相应的资质,后提交信息即可,按照给出来的示例,去填写相对应的信息就可以了

理解Linux系统中的load average(图文版)

本文转自:http://heipark.iteye.com/blog/1340384 一、什么是load average? linux系统中的Load对当前CPU工作量的度量 (WikiPedia: the system load is a measure of the amount of work that a computer system is doing)。也有简单的说是进程队列的长度…

click点击后鼠标移去就失效怎么实现_鼠标右键失灵怎么办,你知道原因吗?

在上网时,有时会遇到按右键没有反应的情况。一开始还以为是中了病毒,但是重启之后,又恢复正常了,这到底是怎么回事呢?如果是在我电脑上操作电脑软件导致的鼠标右键失灵可能以下原因造成的。一:系统繁忙&…

计算机考研379分,考研379分报考南开大学被刷,是调剂还是二战?师姐建议非常肯定...

原标题:考研379分报考南开大学被刷,是调剂还是二战?师姐建议非常肯定随着考研国家线公布,34所自主划线的985大学也加快了复试的脚步,目前很多大学已经开始了复试工作,比如清华大学,南开大学等。…

微信小程序源码下载链接

参考链接备用:https://www.cnblogs.com/ytkah/p/9003620.html 微信小程序的火热程度大家都有所了解,也有很多牛人写了不错的小程序,今天ytkah就整理一些github上的小程序开源项目,源码可以直接下载来用,感兴趣的朋友赶…

开源源码合集

微信小程序的火热程度大家都有所了解,也有很多牛人写了不错的小程序,今天Benson就整理一些github上的小程序开源项目,源码可以直接下载来用,感兴趣的朋友赶紧去看看吧!仿豆瓣电影微信小程序 https://github.com/zc…

2019广西对口计算机分数线,2019广西本科第一批投档分数线出炉,网友:我差一点考上清华大学...

2019广西本科第一批投档分数线出炉,网友:我差一点考上清华大学……就在昨天,7月13号,广西招生考试院公布了“2019年普通高校招生本科第一批最低投档分数线”。不知道,参加第一批志愿填报的考生们,你们过线了…

Android 4.4及以上系统下应用的状态栏颜色渐变效果的实现

上一篇转载的博文里讲到了怎么开启状态栏透明的效果,不过如果在有ActionBar的情况下,会出现状态栏透明而ActionBar横亘在状态栏和内容之间的丑陋情况,如下图: 通过百度之后,发现了GitHub上有个项目可以实现从ActionBar…

学习Spring-Cloud –基础结构和配置

我有机会与Spring-Cloud一起创建了一组云就绪微服务的样本集,Spring-Cloud如何使不同的基础架构组件和服务很好地协同工作给我留下了深刻的印象。 我习惯于基于基于Netflix OSS的堆栈创建微服务,通常在Netflix堆栈中, Eureka被认为是微服务进…

【APICloud系列|6】使用APICloud接入客服系统美洽获取Appkey配置

1.使用超级管理员(没有注册的提前注册登录一下)登录美洽:https://app.meiqia.com/ 2.找到设置——SDK 3.点击添加APP配置 4.上传

架构(三层架构)、框架(MVC)、设计模式三者异同点

前言: 本博客主要针对架构、框架和设计模式三者的差别、还有三层和MVC的差别进行讨论、对于这三者一点都不了解的、请点在维基和百度百科上补补课、这里就不发链接了 软件架构(software architecture) 软件的架构是系统的一个草图、阐述了各个…

轩逸车联网功能怎么用_北斗已建设完成,那“北斗导航”怎么用?“短报文功能”怎么用?...

6月23号长征三号乙运载火箭将北斗导航系统(BDS)最后一颗卫星,也就是第55颗卫星成功发射升空,这是我国全球导航系统的收官之作,整个导航系统建设耗时20年。中国北斗(BDS)是继美国(GPS)、俄罗斯(GLONASS)第三大成熟的全球定位导航系统&#xff…

nba2k19登陆显示你与服务器,NBA2K19无法登陆了?NBA2K19连不上服务器怎么办?[图]...

NBA2K19是一款很不错的篮球类游戏,喜欢篮球的朋友一定不要错过了!大家都知道这款游戏可以联网也可以单机,但是有朋友反映NBA2K19连不上服务器,这是什么原因呢?NBA2K19连不上服务器:1、卸载其他NBA2K学习版内…

深入进货单-价格跟踪----宇然电脑公司管理软件

细节决定软件是不是一款好的管理软件,也验证软件是否成熟的标致.E2在细节上考虑周到. 在进货时,我们常常会想知道历史的进货过程.在E2下非常简单. 转载于:https://www.cnblogs.com/do_easy/p/4365236.html

转-Android Studio *.jar 与 *.aar 的生成与*.aar导入项目方法

主要讲解Android Studio中生成aar文件以及本地方式使用aar文件的方法。 在Android Studio中对一个自己库进行生成操作时将会同时生成*.jar与*.aar文件。 分别存储位置: *.jar:库/build/intermediates/bundles/debug(release)/classes.jar *.aar&#xff…