rabbitmq如何保证消息不被重复消费_如何保证消息不被重复消费

一. 重复消息

为什么会出现消息重复?消息重复的原因有两个:1.生产时消息重复,2.消费时消息重复。

1.1 生产时消息重复

由于生产者发送消息给MQ,在MQ确认的时候出现了网络波动,生产者没有收到确认,实际上MQ已经接收到了消息。这时候生产者就会重新发送一遍这条消息。

生产者中如果消息未被确认,或确认失败,我们可以使用定时任务+(redis/db)来进行消息重试。

@Component@Slf4Jpublic class SendMessage {    @Autowired    private MessageService messageService;    @Autowired    private RabbitTemplate rabbitTemplate;    // 最大投递次数    private static final int MAX_TRY_COUNT = 3;    /**     * 每30s拉取投递失败的消息, 重新投递     */    @Scheduled(cron = "0/30 * * * * ?")    public void resend() {        log.info("开始执行定时任务(重新投递消息)");        List msgLogs = messageService.selectTimeoutMsg();        msgLogs.forEach(msgLog -> {            String msgId = msgLog.getMsgId();            if (msgLog.getTryCount() >= MAX_TRY_COUNT) {                messageService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_FAIL);                log.info("超过最大重试次数, 消息投递失败, msgId: {}", msgId);            } else {                messageService.updateTryCount(msgId, msgLog.getNextTryTime());// 投递次数+1                CorrelationData correlationData = new CorrelationData(msgId);                rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), MessageHelper.objToMsg(msgLog.getMsg()), correlationData);// 重新投递                log.info("第 " + (msgLog.getTryCount() + 1) + " 次重新投递消息");            }        });        log.info("定时任务执行结束(重新投递消息)");    }}

1.2 消费时消息重复

消费者消费成功后,再给MQ确认的时候出现了网络波动,MQ没有接收到确认,为了保证消息被消费,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。

修改消费者,模拟异常

@RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))public void receive(String message, @Headers Map headers, Channel channel) throws Exception{    System.out.println("重试"+System.currentTimeMillis());    System.out.println(message);    int i = 1 / 0;}

配置yml重试策略

spring:  rabbitmq:    listener:      simple:        retry:          enabled: true # 开启消费者进行重试          max-attempts: 5 # 最大重试次数          initial-interval: 3000 # 重试时间间隔

由于重复消息是由于网络原因造成的,因此不可避免重复消息。但是我们需要保证消息的幂等性。

二. 如何保证消息幂等性

让每个消息携带一个全局的唯一ID,即可保证消息的幂等性,具体消费过程为:

  1. 消费者获取到消息后先根据id去查询redis/db是否存在该消息
  2. 如果不存在,则正常消费,消费完毕后写入redis/db
  3. 如果存在,则证明消息被消费过,直接丢弃。
0fa44ba3a7275bdb65404260cc562d29.png

生产者

@PostMapping("/send")public void sendMessage(){    JSONObject jsonObject = new JSONObject();    jsonObject.put("message","Java旅途");    String json = jsonObject.toJSONString();    Message message = MessageBuilder.withBody(json.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setMessageId(UUID.randomUUID()+"").build();    amqpTemplate.convertAndSend("javatrip",message);}

消费者

@Component@RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))public class Consumer {    @RabbitHandler    public void receiveMessage(Message message) throws Exception {        Jedis jedis = new Jedis("localhost", 6379);        String messageId = message.getMessageProperties().getMessageId();        String msg = new String(message.getBody(),"UTF-8");        System.out.println("接收到的消息为:"+msg+"==消息id为:"+messageId);        String messageIdRedis = jedis.get("messageId");        if(messageId == messageIdRedis){            return;        }        JSONObject jsonObject = JSONObject.parseObject(msg);        String email = jsonObject.getString("message");        jedis.set("messageId",messageId);    }}

如果需要存入db的话,可以直接将这个ID设为消息的主键,下次如果获取到重复消息进行消费时,由于数据库主键的唯一性,则会直接抛出异常。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/468837.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JavaScript RegExp 对象

// 1 创建正则表达式两种方式 // 1.1 用new // 第1个参数模式是必须的 第2个模式修饰符参数可选 // i:忽略大小写 g:全局匹配 m:多行匹配 var box new RegExp(Box); var box new RegExp(Box, gim);// 1.2 用字面量的形式 // 两个反斜杠是正则表达式的字面量表示形成 var box …

Git 分布式版本控制工具 06在IDEA中使用Git:获取Git仓库+本地仓库操作+远程仓库操作+创建/查看/切换/推送/合并分支操作

在IDEA中使用Git7. 在IDEA中使用Git7.1 在IDEA中配置Git7.2 获取Git仓库7.2.1 本地初始化仓库7.2.2 从远程仓库克隆7.3 Git忽略文件7.4 本地仓库操作7.4.1 将文件加入暂存区7.4.2 将暂存区文件提交到版本库7.4.3 查看日志7.5 远程仓库操作7.5.1 查看远程仓库7.5.2 添加远程仓库…

linux 内核宏container_of剖析

1、前面说的我在好几年前读linux 驱动代码的时候看到这个宏,百度了好久,知道怎么用了,但是对实现过程和原理还是一知半解。container_of宏 在linux内核代码里面使用次数非常非常多,对于喜欢linux编程的同学来说,了解其…

目录树 删除 数据结构_数据结构:B树和B+树的插入、删除图文详解

B树1.1B树的定义B树也称B-树,它是一颗多路平衡查找树。我们描述一颗B树时需要指定它的阶数,阶数表示了一个结点最多有多少个孩子结点,一般用字母m表示阶数。当m取2时,就是我们常见的二叉搜索树。一颗m阶的B树定义如下:1)每个结点最…

MMdetection框架速成系列 第01部分:学习路线图与步骤+优先学习的两个目标检测模型代码+loss计算流程+遇到问题如何求助+Anaconda3下的安装教程(mmdet+mmdet3d)

mmdetection 学习目录1 mmdetection 学习建议1.1 mmdetection 学习的第一件事1.2 mmdetection学习路线图1.2.1 优先看的两个库1.2.2 阅读代码1.2.3 代码学习步骤1.2.4 建议优先学习的两个目标检测模型代码1.2.5 loss计算流程的攻坚克难1.3 遇到问题如何求助2 Anaconda3下的安装…

分页查询千万级数据慢

mysql查询千万级数据越来越慢优化: 1.分表:(固定某个表存多少数量的数据:例如:一张表存100w的数据量); 2.优化sql和建立适合的索引(复合索引); 3.使用redis缓存。(redis存一份ID.然后mysql存一份ID每次插入删除的时候同步即可。查询的时候只需要从redis里面找出适合的10个ID,然…

python爬取天气数据山东_Python的学习《山东省各城市天气爬取》

Pythonscrapy爬取山东各城市天气预报1、在命令提示符环境使用pip install scrapy命令安装Python扩展库scrapy,详见Python使用Scrapy爬虫框架爬取天涯社区小说“大宗师”全文2、使用下图中的命令创建爬虫项目3、进入爬虫项目文件夹,执行下面的命令创建爬虫…

Data-structures-and-algorithms-interview-questions-and-their-solutions

https://techiedelight.quora.com/500-Data-structures-and-algorithms-interview-questions-and-their-solutions转载于:https://www.cnblogs.com/zengkefu/p/6724312.html

机器人循迹小车资料

前言 我记得在大学的时候,参加电子比赛,我们有一个题目是平衡小车项目,那个对基础要求还是比较高的,总结了一些平衡车机器的资料,希望对大家有帮助。 正文 关注公众号,回复【机器人资料】获取

单片机实现环形队列_稀疏数组和队列(二)

队列的介绍队列以一种先入先出(FIFO)的线性表,还有一种先入后出的线性表(FILO)叫做栈。教科书上有明确的定义与描述。类似于现实中排队时的队列(队尾进,队头出),队列只在线性表两端进行操作,插入元素的一端称为表尾,删…

draw graph

http://www.icl.pku.edu.cn/member/yujs/bsdfiles/html/mpost.htmlUNIX下绘图面面观http://www.tug.org/metapost.htmlhttp://www.python-course.eu/finite_state_machine.phphttp://www.hahack.com/tools/pgftikz-resources/http://www.bubuko.com/infodetail-2002658.htmlhtt…

漫画-Linux中断子系统综述

1、中断引发的面试教训2、什么是中断?中断: (英语:Interrupt)指当出现需要时,CPU暂时停止当前程序的执行转而执行处理新情况的程序和执行过程。即在程序运行过程中,系统出现了一个必须由CPU立即…

创业究竟有多难?

前几天跟我的一个前同事Y聊天,Y跟我说他们的项目终于拿到了500万的融资,还给我们几个关系好的发了他签约时的照片,照片中的他充满了喜悦与期待,我们好几个关系不错的都知道这些年的他都经历了什么,也许用坎坷来形容都不…

SQL强化(二) 在Oracle 中写代码

一 : 关于查询中的转换 -- 字符串转换 一 : decode 函数 转换 SELECT DECODE ( PROTYPE.PRO_TYPE_DATE, L, 长, m, 短, 默认值 )FROM PROTYPE -- 字符串转换 二 : case 转换 SELECT T1.PRO_TYPE_ID, T1.PRO_TYPE_NAME, CASE T1.PRO_TYPE_DATEWHEN S THEN 短WHEN L THEN 长EL…

vue 非es6 写法怎么按须加载_Vue源码必学指南:flow(语法检查)以及rollup(模板打包)...

点击上方蓝色字关注我们~一、前言虽然 Vue3 已经公开了代码,但是Vue3.0还处于开发阶段,直接上手使用Typescript是不合适的 , 对于前端的老手是不错的选择, 但是如果没有研究源码经验的开发者还是建议使用完善, 成熟的源码进行入手. 而 Vue 2.x 中使用的 flow 是一个…

漫画|创业到底有多难?

我有一群同事,我们一起经历了一个产品从无到有的过程,从开始的斗志满满到最后跟老板的不欢而散,其中的辛酸苦楚也许只能我们自己能体味,在这过程中,我们共同经历过的事情,有快乐的,悲伤的&#…

【LeetCode】053. Maximum Subarray

题目: Find the contiguous subarray within an array (containing at least one number) which has the largest sum. For example, given the array [-2,1,-3,4,-1,2,1,-5,4],the contiguous subarray [4,-1,2,1] has the largest sum 6. 题解: 遍历所…

python怎么数据归一化_基于数据归一化以及Python实现方式

数据归一化:数据的标准化是将数据按比例缩放,使之落入一个小的特定区间,去除数据的单位限制,将其转化为无量纲的纯数值,便于不同单位或量级的指标能够进行比较和加权。为什么要做归一化:1)加快梯度下降求最…

Android面试总结经

自上周怒辞职以后,就開始苦逼的各种面试生涯,生活全然靠私活来接济,时有时没有,真难。还能快乐的玩耍吗。最多一天面试了5家,哎感觉都是不急招人,各种等待通知。好不easy等来一家。还克扣了薪资&#xff0c…

生成step文件_利用opencv给彦女王生成一副蒙太奇画像

大家好呀,前两天烈阳天道1上映了,不知道大家看没看呢,里面还有一小段彦穿越虫洞与猴哥相遇的画面,彦女王啊啊啊~~所以我去网上爬了二百来张我大学的风景画,然后找了以前存的彦女王的图片,生成了一幅蒙太奇画…