rabbitmq如何保证消息不被重复消费_如何保证消息不被重复消费

一. 重复消息

为什么会出现消息重复?消息重复的原因有两个:1.生产时消息重复,2.消费时消息重复。

1.1 生产时消息重复

由于生产者发送消息给MQ,在MQ确认的时候出现了网络波动,生产者没有收到确认,实际上MQ已经接收到了消息。这时候生产者就会重新发送一遍这条消息。

生产者中如果消息未被确认,或确认失败,我们可以使用定时任务+(redis/db)来进行消息重试。

@Component@Slf4Jpublic class SendMessage {    @Autowired    private MessageService messageService;    @Autowired    private RabbitTemplate rabbitTemplate;    // 最大投递次数    private static final int MAX_TRY_COUNT = 3;    /**     * 每30s拉取投递失败的消息, 重新投递     */    @Scheduled(cron = "0/30 * * * * ?")    public void resend() {        log.info("开始执行定时任务(重新投递消息)");        List msgLogs = messageService.selectTimeoutMsg();        msgLogs.forEach(msgLog -> {            String msgId = msgLog.getMsgId();            if (msgLog.getTryCount() >= MAX_TRY_COUNT) {                messageService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_FAIL);                log.info("超过最大重试次数, 消息投递失败, msgId: {}", msgId);            } else {                messageService.updateTryCount(msgId, msgLog.getNextTryTime());// 投递次数+1                CorrelationData correlationData = new CorrelationData(msgId);                rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), MessageHelper.objToMsg(msgLog.getMsg()), correlationData);// 重新投递                log.info("第 " + (msgLog.getTryCount() + 1) + " 次重新投递消息");            }        });        log.info("定时任务执行结束(重新投递消息)");    }}

1.2 消费时消息重复

消费者消费成功后,再给MQ确认的时候出现了网络波动,MQ没有接收到确认,为了保证消息被消费,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。

修改消费者,模拟异常

@RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))public void receive(String message, @Headers Map headers, Channel channel) throws Exception{    System.out.println("重试"+System.currentTimeMillis());    System.out.println(message);    int i = 1 / 0;}

配置yml重试策略

spring:  rabbitmq:    listener:      simple:        retry:          enabled: true # 开启消费者进行重试          max-attempts: 5 # 最大重试次数          initial-interval: 3000 # 重试时间间隔

由于重复消息是由于网络原因造成的,因此不可避免重复消息。但是我们需要保证消息的幂等性。

二. 如何保证消息幂等性

让每个消息携带一个全局的唯一ID,即可保证消息的幂等性,具体消费过程为:

  1. 消费者获取到消息后先根据id去查询redis/db是否存在该消息
  2. 如果不存在,则正常消费,消费完毕后写入redis/db
  3. 如果存在,则证明消息被消费过,直接丢弃。
0fa44ba3a7275bdb65404260cc562d29.png

生产者

@PostMapping("/send")public void sendMessage(){    JSONObject jsonObject = new JSONObject();    jsonObject.put("message","Java旅途");    String json = jsonObject.toJSONString();    Message message = MessageBuilder.withBody(json.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setMessageId(UUID.randomUUID()+"").build();    amqpTemplate.convertAndSend("javatrip",message);}

消费者

@Component@RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))public class Consumer {    @RabbitHandler    public void receiveMessage(Message message) throws Exception {        Jedis jedis = new Jedis("localhost", 6379);        String messageId = message.getMessageProperties().getMessageId();        String msg = new String(message.getBody(),"UTF-8");        System.out.println("接收到的消息为:"+msg+"==消息id为:"+messageId);        String messageIdRedis = jedis.get("messageId");        if(messageId == messageIdRedis){            return;        }        JSONObject jsonObject = JSONObject.parseObject(msg);        String email = jsonObject.getString("message");        jedis.set("messageId",messageId);    }}

如果需要存入db的话,可以直接将这个ID设为消息的主键,下次如果获取到重复消息进行消费时,由于数据库主键的唯一性,则会直接抛出异常。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/468837.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Git 分布式版本控制工具 06在IDEA中使用Git:获取Git仓库+本地仓库操作+远程仓库操作+创建/查看/切换/推送/合并分支操作

在IDEA中使用Git7. 在IDEA中使用Git7.1 在IDEA中配置Git7.2 获取Git仓库7.2.1 本地初始化仓库7.2.2 从远程仓库克隆7.3 Git忽略文件7.4 本地仓库操作7.4.1 将文件加入暂存区7.4.2 将暂存区文件提交到版本库7.4.3 查看日志7.5 远程仓库操作7.5.1 查看远程仓库7.5.2 添加远程仓库…

linux 内核宏container_of剖析

1、前面说的我在好几年前读linux 驱动代码的时候看到这个宏,百度了好久,知道怎么用了,但是对实现过程和原理还是一知半解。container_of宏 在linux内核代码里面使用次数非常非常多,对于喜欢linux编程的同学来说,了解其…

目录树 删除 数据结构_数据结构:B树和B+树的插入、删除图文详解

B树1.1B树的定义B树也称B-树,它是一颗多路平衡查找树。我们描述一颗B树时需要指定它的阶数,阶数表示了一个结点最多有多少个孩子结点,一般用字母m表示阶数。当m取2时,就是我们常见的二叉搜索树。一颗m阶的B树定义如下:1)每个结点最…

MMdetection框架速成系列 第01部分:学习路线图与步骤+优先学习的两个目标检测模型代码+loss计算流程+遇到问题如何求助+Anaconda3下的安装教程(mmdet+mmdet3d)

mmdetection 学习目录1 mmdetection 学习建议1.1 mmdetection 学习的第一件事1.2 mmdetection学习路线图1.2.1 优先看的两个库1.2.2 阅读代码1.2.3 代码学习步骤1.2.4 建议优先学习的两个目标检测模型代码1.2.5 loss计算流程的攻坚克难1.3 遇到问题如何求助2 Anaconda3下的安装…

机器人循迹小车资料

前言 我记得在大学的时候,参加电子比赛,我们有一个题目是平衡小车项目,那个对基础要求还是比较高的,总结了一些平衡车机器的资料,希望对大家有帮助。 正文 关注公众号,回复【机器人资料】获取

单片机实现环形队列_稀疏数组和队列(二)

队列的介绍队列以一种先入先出(FIFO)的线性表,还有一种先入后出的线性表(FILO)叫做栈。教科书上有明确的定义与描述。类似于现实中排队时的队列(队尾进,队头出),队列只在线性表两端进行操作,插入元素的一端称为表尾,删…

漫画-Linux中断子系统综述

1、中断引发的面试教训2、什么是中断?中断: (英语:Interrupt)指当出现需要时,CPU暂时停止当前程序的执行转而执行处理新情况的程序和执行过程。即在程序运行过程中,系统出现了一个必须由CPU立即…

SQL强化(二) 在Oracle 中写代码

一 : 关于查询中的转换 -- 字符串转换 一 : decode 函数 转换 SELECT DECODE ( PROTYPE.PRO_TYPE_DATE, L, 长, m, 短, 默认值 )FROM PROTYPE -- 字符串转换 二 : case 转换 SELECT T1.PRO_TYPE_ID, T1.PRO_TYPE_NAME, CASE T1.PRO_TYPE_DATEWHEN S THEN 短WHEN L THEN 长EL…

vue 非es6 写法怎么按须加载_Vue源码必学指南:flow(语法检查)以及rollup(模板打包)...

点击上方蓝色字关注我们~一、前言虽然 Vue3 已经公开了代码,但是Vue3.0还处于开发阶段,直接上手使用Typescript是不合适的 , 对于前端的老手是不错的选择, 但是如果没有研究源码经验的开发者还是建议使用完善, 成熟的源码进行入手. 而 Vue 2.x 中使用的 flow 是一个…

漫画|创业到底有多难?

我有一群同事,我们一起经历了一个产品从无到有的过程,从开始的斗志满满到最后跟老板的不欢而散,其中的辛酸苦楚也许只能我们自己能体味,在这过程中,我们共同经历过的事情,有快乐的,悲伤的&#…

生成step文件_利用opencv给彦女王生成一副蒙太奇画像

大家好呀,前两天烈阳天道1上映了,不知道大家看没看呢,里面还有一小段彦穿越虫洞与猴哥相遇的画面,彦女王啊啊啊~~所以我去网上爬了二百来张我大学的风景画,然后找了以前存的彦女王的图片,生成了一幅蒙太奇画…

浪漫情人节|C语言画心型

1.前言新年第一天上班,先祝大家新年快乐,巧的是,今天刚好又是情人节,所以想了下用C实现画心形符号~过年的时候,跟我表哥去接新娘,实地看了下,如果一个汉字内心没有点浪漫的细胞,很难…

CS190.1x Scalable Machine Learning

这门课是CS100.1x的后续课,看课程名字就知道这门课主要讲机器学习。难度也会比上一门课大一点。如果你对这门课感兴趣,可以看看我这篇博客,如果对PySpark感兴趣,可以看我分析作业的博客。 Course Software Setup 这门课的环境配置…

./4.sh: No such file or directory

sh push到目标板后提示出错 #!/bin/bash echo "ladjfaosdjfoia"头bin/bash 我们要看,sh在哪里 130|rk3399_idpad:/data # which sh /system/bin/sh rk3399_idpad:/data # 所以上面的代码应该写成 #!/system/bin/sh echo "ladjfaosdjfoia"修…

Spring总结四:IOC和DI 注解方式

首先我们要了解注解和xml配置的区别: 作用一样,但是注解写在Bean的上方来代替我们之前在xml文件中所做的bean配置,也就是说我们使用了注解的方式,就不用再xml里面进行配置了,相对来说注解方式更为简便。 IOC获取对象注…

和后台如何对接_业务系统如何对接第三方服务?

在产品工作中,我们时常要对接第三方服务。本文作者从过往的对接项目经历中,提炼的关于业务系统,如何对接第三方服务的方法论,希望能对你有所帮助。随着公司业务的发展,我们有时会遇到,需要在自身业务系统中…

adb 启动某个apk

有时候需要用apk来启动某个apk adb shell am start -n com.android.launcher3/com.android.launcher3.Launcher 具体查看~ /rk3399_7in1/packages/apps/Launcher3/AndroidManifest.xml

Makefile 文件中的:obj-$(CONFIG_TEST) += test.o,这一类的是什么意思?

1、obj-$ $(CONFIG_TEST) 是一个整体,$(bbb)表示引用变量 bbb 比如定义 CONFIG_TESTy $(CONFIG_TEST) 就是 y obj-$(CONFIG_TEST) 就是 obj-y 又比如定义 CONFIG_TESTm $(CONFIG_TEST) 就是 m obj-$(CONFIG_TEST) 就是 obj-m obj-y foo.o 该例子告诉Kbuild在这目…

Kconfig中的“depends on”和“select”

在Kconfig文件中: config Adepends on Bselect C它的含义是:CONFIG_A配置与否,取决于CONFIG_B是否配置。一旦CONFIG_A配置了,CONFIG_C也自动配置了。 参考资料:“select” vs “depends” in kernel Kconfig。 所以去…

数组的合并和升序排列_leetcode 33 搜索旋转排序数组

给你一个升序排列的整数数组 nums ,和一个整数 target 。假设按照升序排序的数组在预先未知的某个点上进行了旋转。(例如,数组 [0,1,2,4,5,6,7] 可能变为 [4,5,6,7,0,1,2] )。请你在数组中搜索 target ,如果数组中存在…