基于注解实现去重表消息防止重复消费

基于注解实现去重表消息防止重复消费

1. 背景/问题

在分布式系统中,消息队列(如RocketMQ、Kafka)的 消息重复消费 是常见问题,主要原因包括:

  • 网络抖动:生产者或消费者因网络不稳定触发消息重发。
  • 消费者超时:消费者处理时间过长,消息队列误判为失败并重新投递。
  • 集群故障转移:消费者宕机后,未完成的消息会被其他节点重新拉取。

重复消费带来的问题

  • 业务逻辑多次执行(如重复扣款、重复生成订单)。
  • 数据一致性被破坏(如库存超卖、积分累加错误)。
  • 系统资源浪费,影响性能和稳定性。

为了避免这种情况发生,需要在客户端实现一些机制来确保消息不会被重复消费,例如记录消费者已经处理的消息 ID、使用分布式锁来控制消费进程的唯一性等。这些机制能够保证消息被成功处理,同时也能够提高系统的可靠性和稳定性。

2. 什么是幂等性

幂等性 是指对同一操作的多次执行所产生的影响与一次执行的影响相同。

  • 消息消费场景:无论消息被消费多少次,最终结果应与消费一次一致。
  • 实现目标:通过幂等设计,确保业务逻辑的重复执行不会产生副作用。

3. 幂等设计

核心思路

  1. 幂等标识:为每条消息生成唯一标识(如业务ID + 消息ID),记录其处理状态。
  2. 状态管理:通过数据库或Redis维护幂等标识的状态(如“消费中”“已消费”)。
  3. 过期时间:防止因系统崩溃导致状态长期滞留,需设置合理的超时时间(如10分钟)。
[消费者接收消息]  │  ▼  
[解析消息,生成唯一幂等标识]  │  ▼  
[查询幂等标识状态]  │  
┌───────┴───────┐  
│ 存在且已消费  │           [返回成功,丢弃消息]  
└───────┬───────┘  │  
┌───────┴───────┐  
│ 存在且消费中  │           [延迟消费,等待重试]  
└───────┬───────┘  │  
┌───────┴───────┐  
│   不存在      │  
└───────┬───────┘  │  
[设置幂等标识为“消费中”,并设置过期时间]  │  ▼  
[执行业务逻辑]  │  ▼  
[业务执行成功?]  │  
┌───────┴───────┐  
│     是        │           [更新标识为“已消费”]  
│               │           [删除或保留标识]  
└───────┬───────┘  │  
┌───────┴───────┐  
│     否        │           [删除标识,允许重试]  
└───────┬───────┘  │  ▼  
[流程结束]  

4.抽象通用幂等组件

消息防重复消费幂等组件是通用的通常会提取出来也可供其他模块/服务 使用

4.1自定义幂等注解

提供了一种通用的幂等注解,并通过 SpEL 的形式生成去重表全局唯一 Key

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface NoMQDuplicateConsume {/*** 设置防重令牌 Key 前缀*/String keyPrefix() default "";/*** 通过 SpEL 表达式生成的唯一 Key*/String key();/*** 设置防重令牌 Key 过期时间,单位秒,默认 1 小时*/long keyTimeout() default 3600L;
}

4.2. 定义幂等枚举

幂等需要设置两个状态,消费中和已消费,创建对应的枚举

@RequiredArgsConstructor
public enum IdempotentMQConsumeStatusEnum {/*** 消费中*/CONSUMING("0"),/*** 已消费*/CONSUMED("1");@Getterprivate final String code;/*** 如果消费状态等于消费中,返回失败** @param consumeStatus 消费状态* @return 是否消费失败*/public static boolean isError(String consumeStatus) {return Objects.equals(CONSUMING.code, consumeStatus);}
}

4.3.通过 AOP 的方式进行增强注解

如果说方法上加了注解,会被这段 AOP 代码以环绕增强方式执行

@Slf4j
@Aspect
@RequiredArgsConstructor
public final class NoMQDuplicateConsumeAspect {private final StringRedisTemplate stringRedisTemplate;private static final String LUA_SCRIPT = """local key = KEYS[1]local value = ARGV[1]local expire_time_ms = ARGV[2]return redis.call('SET', key, value, 'NX', 'GET', 'PX', expire_time_ms)""";/*** 增强方法标记 {@link NoMQDuplicateConsume} 注解逻辑*/@Around("@annotation(com.nageoffer.onecoupon.framework.idempotent.NoMQDuplicateConsume)")public Object noMQRepeatConsume(ProceedingJoinPoint joinPoint) throws Throwable {NoMQDuplicateConsume noMQDuplicateConsume = getNoMQDuplicateConsumeAnnotation(joinPoint);String uniqueKey = noMQDuplicateConsume.keyPrefix() + SpELUtil.parseKey(noMQDuplicateConsume.key(), ((MethodSignature) joinPoint.getSignature()).getMethod(), joinPoint.getArgs());String absentAndGet = stringRedisTemplate.execute(RedisScript.of(LUA_SCRIPT, String.class),List.of(uniqueKey),IdempotentMQConsumeStatusEnum.CONSUMING.getCode(),String.valueOf(TimeUnit.SECONDS.toMillis(noMQDuplicateConsume.keyTimeout())));// 如果不为空证明已经有if (Objects.nonNull(absentAndGet)) {boolean errorFlag = IdempotentMQConsumeStatusEnum.isError(absentAndGet);log.warn("[{}] MQ repeated consumption, {}.", uniqueKey, errorFlag ? "Wait for the client to delay consumption" : "Status is completed");if (errorFlag) {throw new ServiceException(String.format("消息消费者幂等异常,幂等标识:%s", uniqueKey));}return null;}Object result;try {// 执行标记了消息队列防重复消费注解的方法原逻辑result = joinPoint.proceed();// 设置防重令牌 Key 过期时间,单位秒stringRedisTemplate.opsForValue().set(uniqueKey, IdempotentMQConsumeStatusEnum.CONSUMED.getCode(), noMQDuplicateConsume.keyTimeout(), TimeUnit.SECONDS);} catch (Throwable ex) {// 删除幂等 Key,让消息队列消费者重试逻辑进行重新消费stringRedisTemplate.delete(uniqueKey);throw ex;}return result;}/*** @return 返回自定义防重复消费注解*/public static NoMQDuplicateConsume getNoMQDuplicateConsumeAnnotation(ProceedingJoinPoint joinPoint) throws NoSuchMethodException {MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();Method targetMethod = joinPoint.getTarget().getClass().getDeclaredMethod(methodSignature.getName(), methodSignature.getMethod().getParameterTypes());return targetMethod.getAnnotation(NoMQDuplicateConsume.class);}

lua脚本解释

local key = KEYS[1] # 第一个 Key,即幂等唯一标识 uniqueKey
local value = ARGV[1] # 第一个参数,即初始化幂等消费状态,为消费中
local expire_time_ms = ARGV[2] # 第二个参数,即幂等 Key 过期时间return redis.call('SET', key, value, 'NX', 'GET', 'PX', expire_time_ms)

该脚本的主要作用是:在 Redis 中尝试以 NX 方式设置一个键,即如果键不存在,则设置新值,并返回设置之前的旧值,同时为该键设置过期时间(以毫秒为单位)。

获取到 Redis 里面的 Key 值后,可能会有三个流程执行:

absentAndGet 为空:代表消息是第一次到达,执行完 LUA 脚本后,会在 Redis 设置 Key 的 Value 值为 0,消费中状态。

absentAndGet 为 0:代表已经有相同消息到达并且还没有处理完,会通过抛异常的形式让 RocketMQ 重试。

absentAndGet 为 1:代表已经有相同消息消费完成,返回空表示不执行任何处理。

4.4.注册为 Spring Bean

另外可以看看另一篇基于分布式锁注解防重复提交

https://blog.csdn.net/sjsjsbbsbsn/article/details/145131305?spm=1001.2014.3001.5501

public class IdempotentConfiguration {/*** 防止消息队列消费者重复消费消息切面控制器*/@Beanpublic NoMQDuplicateConsumeAspect noMQDuplicateConsumeAspect(StringRedisTemplate stringRedisTemplate) {return new NoMQDuplicateConsumeAspect(stringRedisTemplate);}
}

4.5EL工具类

public class SpELUtil {/*** 校验并返回实际使用的 spEL 表达式** @param spEl spEL 表达式* @return 实际使用的 spEL 表达式*/public static Object parseKey(String spEl, Method method, Object[] contextObj) {List<String> spELFlag = ListUtil.of("#", "T(");Optional<String> optional = spELFlag.stream().filter(spEl::contains).findFirst();if (optional.isPresent()) {return parse(spEl, method, contextObj);}return spEl;}/*** 转换参数为字符串** @param spEl       spEl 表达式* @param contextObj 上下文对象* @return 解析的字符串值*/public static Object parse(String spEl, Method method, Object[] contextObj) {DefaultParameterNameDiscoverer discoverer = new DefaultParameterNameDiscoverer();ExpressionParser parser = new SpelExpressionParser();Expression exp = parser.parseExpression(spEl);String[] params = discoverer.getParameterNames(method);StandardEvaluationContext context = new StandardEvaluationContext();if (ArrayUtil.isNotEmpty(params)) {for (int len = 0; len < params.length; len++) {context.setVariable(params[len], contextObj[len]);}}return exp.getValue(context);}
}

5.实战使用

使用天机学堂项目来进行实战

5.1写入common模块

在这里插入图片描述

5.2使用

在这里插入图片描述

直接加上注解就可以

但是实际上这里不存在幂等问题,因为userId和courseId设置了唯一索引,所以这里不存在幂等性,不需要加上幂等注解

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/67777.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Biotin sulfo-N-hydroxysuccinimide ester ;生物素磺基-N-羟基琥珀酰亚胺酯;生物素衍生物;190598-55-1

一、生物素及其衍生物的概述 生物素衍生物是指在生物素&#xff08;Vitamin H或B7&#xff09;分子基础上进行化学修饰得到的衍生化合物。这些衍生化合物在生物医学研究、临床诊断和药物开发等领域有着广泛的应用。 生物素&#xff08;Biotin&#xff09;是一种水溶性维生素&a…

Ubuntu如何安装redis服务?

环境&#xff1a; Ubuntu22.04 WSL2 问题描述&#xff1a; 如何安装redis服务&#xff1f; 解决方案&#xff1a; 1.在 Linux 上&#xff08;如 Ubuntu/Debian&#xff09;安装 1.通过包管理工具安装 Redis 服务器&#xff1a; sudo apt update sudo apt install redis…

Datawhale组队学习笔记task2——leetcode面试题

文章目录 写在前面Day5题目1.0112.路径总和解答2.0113路径总和II解答3.0101.对称二叉树解答 Day6题目1.0124.二叉树中的最大路径和解答2.0199.二叉树的右视图解答3.0226.翻转二叉树解答 Day7题目1.0105.从前序与中序遍历序列构造二叉树解答2.0098.验证二叉搜索树解答3.0110.平衡…

Flask简介与安装以及实现一个糕点店的简单流程

目录 1. Flask简介 1.1 Flask的核心特点 1.2 Flask的基本结构 1.3 Flask的常见用法 1.3.1 创建Flask应用 1.3.2 路由和视图函数 1.3.3 动态URL参数 1.3.4 使用模板 1.4 Flask的优点 1.5 总结 2. Flask 环境创建 2.1 创建虚拟环境 2.2 激活虚拟环境 1.3 安装Flask…

RFID系统安全认证协议及防碰撞算法研究(RFID Security)

目录 1.摘要 2.引言 3.前人研究成果 3.1 RFID系统协议模型 3.2 RFID系统安全认证协议分类 3.3 RFID安全认证协议及其研究 3.3.1 超轻量级安全认证协议及其研究 3.3.2 轻量级安全认证协议及其研究 3.3.2 中量级安全认证协议及其研究 3.3.3 重量级安全认证协议及其研究…

Docker 实现MySQL 主从复制

一、拉取镜像 docker pull mysql:5.7相关命令&#xff1a; 查看镜像&#xff1a;docker images 二、启动镜像 启动mysql01、02容器&#xff1a; docker run -d -p 3310:3306 -v /root/mysql/node-1/config:/etc/mysql/ -v /root/mysql/node-1/data:/var/lib/mysql -e MYS…

Spring MVC:设置响应

目录 引言 1. 返回静态页面 1.1 Spring 默认扫描路径 1.2 RestController 1.2.1 Controller > 返回页面 1.2.2 ResponseBody 2. 返回 HTML 2.1 RequestMapping 2.1.1 produces(修改响应的 Content-Type) 2.1.2 其他属性 3. 返回 JSON 4. 设置状态码 4.1 HttpSer…

基于python+Django+mysql鲜花水果销售商城网站系统设计与实现

博主介绍&#xff1a;黄菊华老师《Vue.js入门与商城开发实战》《微信小程序商城开发》图书作者&#xff0c;CSDN博客专家&#xff0c;在线教育专家&#xff0c;CSDN钻石讲师&#xff1b;专注大学生毕业设计教育、辅导。 所有项目都配有从入门到精通的基础知识视频课程&#xff…

提示词的艺术----AI Prompt撰写指南(个人用)

提示词的艺术 写在前面 制定提示词就像是和朋友聊天一样&#xff0c;要求我们能够清楚地表达问题。通过这个过程&#xff0c;一方面要不断练习提高自己地表达能力&#xff0c;另一方面还要锻炼自己使用更准确精炼的语言提出问题的能力。 什么样的提示词有用&#xff1f; 有…

Spring Boot自动配置原理:如何实现零配置启动

引言 在现代软件开发中&#xff0c;Spring 框架已经成为 Java 开发领域不可或缺的一部分。而 Spring Boot 的出现&#xff0c;更是为 Spring 应用的开发带来了革命性的变化。Spring Boot 的核心优势之一就是它的“自动配置”能力&#xff0c;它极大地简化了 Spring 应用的配置…

大模型GUI系列论文阅读 DAY2续2:《使用指令微调基础模型的多模态网页导航》

摘要 自主网页导航的进展一直受到以下因素的阻碍&#xff1a; 依赖于数十亿次的探索性交互&#xff08;通常采用在线强化学习&#xff09;&#xff0c;依赖于特定领域的模型设计&#xff0c;难以利用丰富的跨领域数据进行泛化。 在本研究中&#xff0c;我们探讨了基于视觉-语…

在视频汇聚平台EasyNVR平台中使用RTSP拉流的具体步骤

之前有用户反馈&#xff0c;在EasyNVR平台中添加Pull时使用海康设备的RTSP流地址无法播放。经过研发的优化及一系列严谨的验证流程&#xff0c;我们已确认优化后的EasyNVR平台&#xff0c;通过Pull方式添加海康设备的RTSP流已经能够正常播放。以下是具体的操作步骤&#xff1a;…

Debezium日常分享系列之:对于从Oracle数据库进行快照的性能优化

Debezium日常分享系列之&#xff1a;对于从Oracle数据库进行快照的性能优化 源数据库Kafka Connect监控测试结果 源数据库 Oracle 19c&#xff0c;本地&#xff0c;CDB数据库主机的I/O带宽为6 GB/s&#xff0c;由此主机上运行的所有数据库共享临时表空间由42个文件组成&#x…

C++书籍 第一部分专业C++程序设计概述

1&#xff0c;必不可少的“hello world” #include<iostream>int main(int argc, char** argv) {std::cout << "hello world" << std::endl;return 0; } 这个是一个极其简单的程序&#xff0c;虽然没有多大简直&#xff0c;但是可以体现c程序格式方…

VIVADO ILA IP进阶使用之任意设置ILA的采样频率

VIVADO ILA IP进阶使用之任意设置ILA的采样频率 VIVADO ILA IP和VIO IP结合使用任意设置ILA的采样频率 目录 前言 一、VIO IP的配置 二、ILA IP的配置 三、测试代码 四、测试结果 总结 前言 VIVADO中编写完程序上板测试时经常会用到viavdo自带的ILA逻辑分析仪IP核&#x…

spring @EnableAspectJAutoProxy @Aspect的使用和源码流程

目录 测试代码EnableAspectJAutoProxyAspectJAutoProxyRegistrarAnnotationAwareAspectJAutoProxyCreatororg.springframework.context.support.AbstractApplicationContext#registerBeanPostProcessors 实例化AnnotationAwareAspectJAutoProxyCreator bean "a"的代理…

【BUUCTF】[GXYCTF2019]BabySQli

进入页面如下 尝试万能密码注入 显示这个&#xff08;qyq&#xff09; 用burp suite抓包试试 发现注释处是某种编码像是base编码格式 MMZFM422K5HDASKDN5TVU3SKOZRFGQRRMMZFM6KJJBSG6WSYJJWESSCWPJNFQSTVLFLTC3CJIQYGOSTZKJ2VSVZRNRFHOPJ5 可以使用下面这个网页在线工具很方便…

重生之我在异世界学编程之算法与数据结构:深入堆篇

大家好&#xff0c;这里是小编的博客频道 小编的博客&#xff1a;就爱学编程 很高兴在CSDN这个大家庭与大家相识&#xff0c;希望能在这里与大家共同进步&#xff0c;共同收获更好的自己&#xff01;&#xff01;&#xff01; 本文目录 正文一、堆的基本概念二、堆的存储表示三…

《自动驾驶与机器人中的SLAM技术》ch8:基于预积分和图优化的紧耦合 LIO 系统

目录 1 预积分 LIO 系统的经验 2 预积分图优化的顶点 3 预积分图优化的边 3.1 NDT 残差边&#xff08;观测值维度为 3 维的单元边&#xff09; 4 基于预积分和图优化 LIO 系统的实现 4.1 IMU 静止初始化 4.2 使用预积分预测 4.3 使用 IMU 预测位姿进行运动补偿 4.4 位姿配准部…

软件测试—— 接口测试(HTTP和HTTPS)

软件测试—— 接口测试&#xff08;HTTP和HTTPS&#xff09; HTTP请求方法GET特点使用场景URL结构URL组成部分URL编码总结 POST特点使用场景请求结构示例 请求标头和响应标头请求标头&#xff08;Request Headers&#xff09;示例请求标头 响应标头&#xff08;Response Header…