【测试开发】Mq消息重复如何测试?

本篇文章主要讲述重复消费的原因,以及如何去测试这个场景,最后也会告诉大家,目前互联网项目关于如何避免重复消费的解决方案。

Mq为什么会有重复消费的问题?

Mq 常见的缺点之一就是消息重复消费问题,产生这种问题的原因是什么呢?有以下几点:

工作流程

Mq消息重复如何测试?

1、producer 生成数据,发送到broker集群,当遇到网络抖动超时,可能会重复发送。

为了保证数据的可靠性一般都会配置重试机制如下:

rocketmq:producer:group: sanyouProducer#发送消息超过5秒未接收到broker返回的成功消息send-message-timeout: 5000#重试最大次数retry-times-when-send-failed: 2max-message-size: 4194304name-server: 172.30.34.10:9876;172.30.35.37:9876;172.30.35.30:9876#发送消息超时时长,意思是超过5秒钟未收到broker返回的发送成功的消息,#producer会重复发送,但并不是一直发送,会根据retry-times-when-send-failed次数,#最多重试多少次

极端情况下,网络出现抖动,生产者超过设置的时间未收到broker返回的成功消息,会重新发送消息。

2、消费者宕机,未提交offset给broker

由上图可知,broker接收到producer 发送的消息后,会把消息发送给消费者,一般情况下,消费者消费完一条数据,会提交一个offset给到broker,告诉它,这条消息我消费了,但是,极端情况下,消费者消费一条消息成功,提交offset之前,宕机了或者网络抖动超时了,broker未收到offset,就认为这条消息没人消费,当消费者重启服务器或网络恢复,那么broker还会发送这条消息给消费者重新消费。

3、业务上的bug,可能会导致重复消费。

生产者producer的上游系统,突然出现了bug,导致重复调用生产者所在服务的接口,生产者收到请求后,继续发送消息给broker。

当然了,重复消费的原因有很多,以上只是常见的几种原因,那怎么去测试呢?

怎么测试重复消费场景?

假如有这么一个场景,采购员在采购系统的前端页面进行采购单下单操作,下单成功后,采购系统这边会保留一份采购单数据,然后发送一条mq给到wms 仓库系统,那么生产者就是采购系统,消费者就是wms仓库系统,wms消费到采购单的消息,落入数据库wms_purchase表中,为了简化,我只设计了三个字段。

建表ddl:

CREATE TABLE `wms_purchase` (`id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT '仓库采购单id',`purchase_id` bigint(20) NOT NULL COMMENT '采购单id',`purchase_name` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=237 DEFAULT CHARSET=utf8;

怎么测试呢?很简单,我们只要编写生产者工具,在工具里加个循环,尽量循环多次,如下:

@RestController
@RequestMapping("/mq")
public class ProducerController {// 自动注入 RocketMQTemplate模板类,用于生产消息@Autowiredprivate RocketMQTemplate mqTemplate;// 模拟生产者重复消费问题,前提是数据库没有唯一索引,并且项目未做幂等性校验@RequestMapping("/send")public String testSend(@RequestBody WmsPurchaseDto params) {try {for (int i = 0; i <100 ; i++) {mqTemplate.convertAndSend("fourbrothertopic", params);}return "success";} catch (Exception e) {e.printStackTrace();return "fail";}}

解读:

requestmapping对外暴露一个web接口,地址是localhost:8080/demo/mq/send,
post请求,参数是json格式,类似
{
    "purchaseId": "256465",
    "purchaseName": "测试"
}

这种形式,然后起个for循环,循环调用convertAndSend方法,发送同样的消息,最终结果如下图:

Mq消息重复如何测试?

这里模拟producer重复发送的场景,前提是数据库没有对采购单id做唯一索引,并且项目未做幂等性校验。数据库里出现很多采购单id一样的数据,业务上这是不允许的。

假如说,项目出现了这么一种bug,开发那边是怎么修复的呢?

Mq如何保证幂等性?

分享几种解决方案的具体代码demo:

1、数据库unique key(表里不允许重复列出现)来保证幂等性。

很简单,我们只要在wms_purchase里,对purchaseId添加唯一索引即可,提示:在添加唯一索引之前,需清理完表里的数据。

也可以使用ddl语句:

ALTER TABLE `wms_purchase` ADD UNIQUE ( `purchaseId` ) 

代码不变,调用以下接口:

localhost:8080/demo/mq/send post请求
{"purchaseId": "256465","purchaseName": "测试"
}

得到以下结果:

Mq消息重复如何测试?

上图中,循环生产同一条采购单数据,但是右边表中只出现了一条采购单id是256465的数据,说明添加唯一索引确实保证了幂等性,但是代码里却出现大量类似Duplicate entry '256465' for key 'uniqe_key_purchaseid' 日志,是因为触发了数据设置的唯一索引,

由于触发了唯一索引,导致消费者未提交offset给broker,那么broker会认为这条消息未被消费,后续会持续不断地推送消息给消费者,也就意味着会持续不断地报错。

另外这种持续无效的请求数据库会占用数据库的连接资源,在高并发的场景下,会严重拖垮系统响应效率。

虽然保证了幂等性,但是日志里总是报错,太不讲究、也不雅观,那怎么解决呢?

2、数据库unique key+redis 来保证幂等性。

如截图:

Mq消息重复如何测试?

通俗的理解就是,消费者在进行数据库落库操作之前,会判断redis是有这条采购单数据,如果有就直接放过这条消息不做处理,没有这条数据,那就进行落库操作,但在落库之前还要进一步判断数据库是否有这条采购单数据,没有那就进行落库,落库成功,再把采购单的id当做key,采购单数据当做value set 进redis缓存里,设置一定的过期时间。

redis基于内存,操作数据特别快,在进行落库之前查询redis,可以避免很多无效的请求数据库,但是为啥要设置过期时间?因为redis的内存资源有限,并且很宝贵,所以我们希望设置的数据能在一段时间内定期失效,即使失效,也没关系,还有数据库的唯一索引兜底。

这样就很好的保证了幂等性,也避免了大量的日志报错。伪代码如下:

@Component
//mq的监听器,指定topic是TopicTest,消费者组consumerGroupTest
@RocketMQMessageListener(topic = "fourbrothertopic", consumerGroup = "consumerGroupTest")
@Slf4j
public class ConsumeController implements RocketMQListener {@Autowiredprivate WmsPurchaseMapper wmsPurchaseMapper;@Autowiredprivate RedisTemplate redisTemplate;@Overridepublic void onMessage(String message) {log.info("------- Consumer: {}", message);//将message消息映射成WmsPurchase实体WmsPurchase wmsPurchase = JSONObject.parseObject(message, WmsPurchase.class);//首先判断redis里面是否有这条采购单数据,通过PurchaseId查询,有数据,则直接放过不做处理if (redisTemplate.opsForValue().get(wmsPurchase.getPurchaseId().toString())==null){//然后再使用PurchaseId查询数据库,有数据,则直接放过不做处理if (null == wmsPurchaseMapper.selectByPurchaseId(wmsPurchase.getPurchaseId())){//数据库没有数据,就进行插入操作,if (wmsPurchaseMapper.insert(wmsPurchase)>0){//插入成功就把purchaseid塞进redis里,过期时间是72小时redisTemplate.opsForValue().set(wmsPurchase.getPurchaseId(),wmsPurchase.toString(),72, TimeUnit.HOURS);}}else {//能走到这个判断分支,说明缓存里的采购单数据已经失效,如果还有消息重复消费//那就再放入缓存一次,72h过期redisTemplate.opsForValue().set(wmsPurchase.getPurchaseId(),wmsPurchase.toString(),72, TimeUnit.HOURS);log.info("数据库已保留该数据");// 触发重复消费告警机制}}else {log.info("缓存已保留该数据");// 触发重复消费告警机制}}
}

思路很简单,如代码中注释。当然这种方法也有缺点,就是过于依赖redis,有些系统没有使用redis组件,那么还得维护一套redis组件,并且还得保证redis集群高可用。那项目只有mysql,能不能依靠数据库去维护保证幂等性呢?当然可以!

3、还有一种方法叫去重表+唯一索引,顾名思义就是另外维护一张表,记录已经消费的采购单数据,其实和上述方法差不多,上述方法查询缓存,取重表查询数据库取重表。

伪代码 如下:

@Component
//mq的监听器,指定topic是TopicTest,消费者组consumerGroupTest
@RocketMQMessageListener(topic = "fourbrothertopic", consumerGroup = "consumerGroupTest")
@Slf4j
public class ConsumeController implements RocketMQListener {@Autowiredprivate WmsPurchaseMapper wmsPurchaseMapper;@Autowiredprivate UniquePurchaseMapper uniquePurchaseMapper;@Autowiredprivate RedisTemplate redisTemplate;@SneakyThrows@Overridepublic void onMessage(String message) {log.info("------- Consumer: {}", message);//将message消息映射成WmsPurchase实体WmsPurchase wmsPurchase = JSONObject.parseObject(message, WmsPurchase.class);log.info("映射后实体消息"+ JSON.toJSONString(wmsPurchase));if (uniquePurchaseMapper.selectByPurchaseId(wmsPurchase.getPurchaseId().intValue())  == null){if (null == wmsPurchaseMapper.selectByPurchaseId(wmsPurchase.getPurchaseId())){//数据库没有数据,就进行插入操作,if (wmsPurchaseMapper.insert(wmsPurchase)>0){//插入成功就把purchaseid塞进unique_purchaseUniquePurchase  uniquePurchase =   new UniquePurchase();uniquePurchase.setPurchaseId(wmsPurchase.getPurchaseId().intValue());log.info("插入取重表消息:"+ JSON.toJSONString(uniquePurchase));uniquePurchaseMapper.insert(uniquePurchase);}}else {log.info("数据库已保留该数据");//自动触发告警机制}}else {log.info("取重表已有这条采购单数据");}}

代码已上传至gitee,感兴趣可以自行阅读。

上述方式在查询取重表时,并发不安全,极端情况下还是会触发唯一索引错误,比如说,消费者要消费大量消息(线程),执行上述代码,A线程执行完23行,挂起了,cpu把执行权给了B线程,B执行到25行并插入成功,那么这时A线程被唤起,也执行到了23行,结果触发了唯一索引错误。那怎么避免呢?

我们可以让所有线程别并发执行,串行执行,那就用到redis的分布式锁技术。

4、分布式锁+uniquekey

伪代码如下

@Component
//mq的监听器,指定topic是TopicTest,消费者组consumerGroupTest
@RocketMQMessageListener(topic = "fourbrothertopic", consumerGroup = "consumerGroupTest")
@Slf4j
public class ConsumeController implements RocketMQListener {@Autowiredprivate WmsPurchaseMapper wmsPurchaseMapper;@Autowiredprivate RedissonClient redisson;@Autowiredprivate UniquePurchaseMapper uniquePurchaseMapper;@Autowiredprivate RedisTemplate redisTemplate;@SneakyThrows@Overridepublic void onMessage(String message) {log.info("------- Consumer: {}", message);//将message消息映射成WmsPurchase实体WmsPurchase wmsPurchase = JSONObject.parseObject(message, WmsPurchase.class);
// 注入redisson
// 获取锁对象RLock lock = redisson.getLock("lockName");try {// 1. 最常见的使用方法//lock.lock();// 2. 支持过期解锁功能,10秒钟以后自动解锁, 无需调用unlock方法手动解锁//lock.lock(10, TimeUnit.SECONDS);// 3. 尝试加锁,最多等待2秒,上锁以后8秒自动解锁boolean res = lock.tryLock();if (res) { //成功//然后再使用PurchaseId查询数据库,有数据,则直接放过不做处理if (null == wmsPurchaseMapper.selectByPurchaseId(wmsPurchase.getPurchaseId())){//数据库没有数据,就进行插入操作,if (wmsPurchaseMapper.insert(wmsPurchase)>0){//插入成功就把purchaseid塞进redis里,过期时间是72小时redisTemplate.opsForValue().set(wmsPurchase.getPurchaseId().toString(),wmsPurchase.toString(),1, TimeUnit.HOURS);}}else {redisTemplate.opsForValue().set(wmsPurchase.getPurchaseId().toString(),wmsPurchase.toString(),1, TimeUnit.HOURS);log.info("数据库已保留该数据");//自动触发告警机制}}} catch (Exception e) {e.printStackTrace();} finally {//释放锁RLock lockName = redisson.getLock("lockName");if (lockName.isLocked()) {if (lockName.isHeldByCurrentThread()) {lockName.unlock();}}}
}

这种也是比较常见的一种,缺点也很明显,在高并发,大请求量的场景下,所有线程串行执行,处理效率势必会降低。当然了,技术没有好坏,只有合不合适。如果你的项目并发量一般,可以尝试使用上述方法。

具体代码demo已上传至gitee平台,地址如下:

https://gitee.com/lv1792017548/rocketmq-demo.git

总结

本文主要分享了如何测试mq消息队列重复性消费,以及避免重复消费常见的解决方案。

最后感谢每一个认真阅读我文章的人,礼尚往来总是要有的,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走:

在这里插入图片描述

这些资料,对于【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴上万个测试工程师们走过最艰难的路程,希望也能帮助到你!   

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/72639.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VMware 安装 黑群晖7.1.1-42962 DS918+

本例的用的文件 1、ARPL 1.0beat 引导文件 vmdk格式&#xff1a; https://download.csdn.net/download/mshxuyi/88309308 2、DS918_42962.pat&#xff1a;https://download.csdn.net/download/mshxuyi/88309383 一、引导文件 1、创建一个虚拟机 2、下一步&#xff0c;选稍后…

Linux图形栈入门概念

Mesa在图形栈中的位置 游戏引擎&#xff1a; 游戏引擎指的是一种软件框架&#xff0c;通过编程和各种工具&#xff0c;帮助开发者设计、构建和运行视频游戏。它相当于一个虚拟的世界创造工具&#xff0c;提供了各种功能模块和资源&#xff0c;如渲染引擎、物理引擎(碰撞检测、重…

HTTP代理只能代理HTTP协议吗?

HTTP代理是一种代理服务器&#xff0c;它可以充当客户端和服务器之间的中介&#xff0c;以帮助客户端访问服务器上的资源。但是&#xff0c;HTTP代理并不仅仅只能代理HTTP协议。 HTTP代理可以代理的协议 除了HTTP协议之外&#xff0c;HTTP代理还可以代理其他协议&#xff0c;例…

Apache实现weblogic集群配置

安装apache&#xff0c;安装相对稳定的版本。如果安装后测试能否正常启动&#xff0c;可以通过访问http://localhost/进行测试。安装Weblogic&#xff0c;参见文档将bea安装目录 weblogic81/server/bin 下的 mod_wl_20.so 文件copy到 apache安装目录下Apache2/modules/目录下A…

C++ 浅拷贝和深拷贝

目录 1. 浅拷贝 2. 深拷贝 1. 浅拷贝 浅拷贝只是拷贝一个指针&#xff0c;并没有新开辟一个地址&#xff0c;拷贝的指针和原来的指针指向同一块地址&#xff0c;如果原来的指针所指向的资源释放了&#xff0c;那么再释放浅拷贝的指针的资源就会出现错误 对一个已知对象进行拷贝…

【深入解析spring cloud gateway】06 gateway源码简要分析

上一节做了一个很简单的示例&#xff0c;微服务通过注册到eureka上&#xff0c;然后网关通过服务发现访问到对应的微服务。本节将简单地对整个gateway请求转发过程做一个简单的分析。 一、核心流程 主要流程&#xff1a; Gateway Client向 Spring Cloud Gateway 发送请求请求…

探索Apache Hive:融合专业性、趣味性和吸引力的数据库操作奇幻之旅

文章目录 版权声明一 数据库操作二 Hive数据表操作2.1 表操作语法和数据类型2.2 Hive表分类2.3 内部表Vs外部表2.4 内部表操作2.4.1 创建内部表2.4.2 其他创建内部表的形式2.4.3 数据分隔符2.4.4 自定义分隔符2.4.5 删除内部表 2.5 外部表操作2.5.1 创建外部表2.5.2 操作演示2.…

代码生成商业化一些思考

代码生成解决方案 生成项目代码有3大类的解决思路&#xff1a; 1.从底到上的生成&#xff0c;部分代码生成生成一行代码或者一个方法种一小块代码生成&#xff0c;ide插件代码生成基本这种思路 2.大语言模型作为软件工程不同角色agent&#xff0c;用户给出idea每个agent自动…

BFS练习1

BFS练习1 - 题目 - Daimayuan Online Judge 问题描述&#xff1a; 刚开始吓一跳&#xff0c;以为有什么更简单的呢&#xff0c;因为每一次都要走一次bfs&#xff0c;看了数据范围后&#xff0c;感觉跑一次bfs进行记录即可。 代码&#xff1a; void solve() {int a,k; cin>…

超详细的 pytest 教程(一)使用入门篇

前言 pytest到目前为止还没有翻译的比较好全面的使用文档&#xff0c;很多英文不太好的小伙伴&#xff0c;在学习时看英文文档还是很吃力。本来去年就计划写pytest详细的使用文档的&#xff0c;由于时间关系一直搁置&#xff0c;直到今天才开始写。本文是第一篇&#xff0c;主…

leetcode 1609.奇偶树

⭐️ 题目描述 &#x1f31f; leetcode链接&#xff1a;奇偶树 思路&#xff1a; 树的层序遍历&#xff0c;用队列辅助。用一个变量记录当前是多少层&#xff0c;以及当前层的节点个数&#xff0c;依次遍历&#xff0c;因为需要判断当前层是否严格递增或递减&#xff0c;如果正…

数据结构——带头双向循环链表

数据结构——带头双向循环链表 一、带头双向循环链表的定义二、带头双向循环链表的实现2.1初始化创建带头双向循环链表的节点2.2申请新节点2.3节点的初始化2.4带头双向循环链表的尾插2.5带头双向循环链表的头插2.6判空函数2.7带头双向循环链表的打印函数2.8带头双向循环链表的尾…

软件生命周期及流程【软件测试】

软件的生命周期 软件生命周期是软件开始研制到最终被废弃不用所经历的各个阶段。 瀑布型生命周期模型 规定了它们自上而下、相互衔接的固定次序&#xff0c;如同瀑布流水&#xff0c;逐级下落&#xff0c;具有顺序性和依赖性。每个阶段规定文档并需进行评审。 特点&#xff…

SpringMVC:从入门到精通

一、SpringMVC是什么 SpringMVC是Spring提供的一个强大而灵活的web框架&#xff0c;借助于注解&#xff0c;Spring MVC提供了几乎是POJO的开发模式【POJO是指简单Java对象&#xff08;Plain Old Java Objects、pure old java object 或者 plain ordinary java object&#xff0…

redis 5.0.x 部署

PS&#xff1a;对于使用者来说&#xff0c;Redis5.0和4.0都是一样的&#xff0c;但是redis 4.0的集群部署需要额外安装ruby的东西&#xff0c;5.0中则集成到了redis-cli&#xff0c;部署起来更方便 1.1 安装Redis 本章基于CentOS 7.9.2009编写而成&#xff0c;由于Linux发行版…

【网络知识点】三次握手和四次挥手

文章目录 一、三次握手二、四次挥手 一、三次握手 三次握手的原理如下&#xff1a; 客户端向服务器发送一个SYN&#xff08;同步&#xff09;包&#xff0c;其中包含一个随机生成的初始序列号&#xff08;ISN&#xff09;。 服务器收到SYN包后&#xff0c;会发送一个SYNACK&…

QT设计一个小闹钟

设置一个闹钟&#xff0c;左侧窗口显示当前时间&#xff0c;右侧设置时间&#xff0c;以及控制闹钟的开关&#xff0c;下方显示闹钟响时的提示语。当按启动按钮时&#xff0c;设置时间与闹钟提示语均不可再改变。当点击停止时&#xff0c;关闭闹钟并重新启用设置时间与闹钟提示…

微服务-kubernetes安装

文章目录 一、前言二、kubernetes2.1、Kubernetes (K8S) 是什么2.1.1、主要特性&#xff1a;2.2.2、传统部署方式&#xff1a;2.2.3、虚拟机部署2.2.4容器部署2.2.5什么时候需要 Kubernetes2.2.6、Kubernetes 集群架构 三、kubernetes安装3.1、主节点需要组件3.1.1、设置对应主…

【Eclipse】Project interpreter not specified 新建项目时,错误提示,已解决

目录 0.环境 1&#xff09;问题截图&#xff1a; 2&#xff09;错误发生原因&#xff1a; 1.解决思路 2.具体步骤 0.环境 windows 11 64位&#xff0c;Eclipse 2021-06 1&#xff09;问题截图&#xff1a; 2&#xff09;错误发生原因&#xff1a; 由于我手欠&#xff0c;将…

HTTPS 之fiddler抓包--jmeter请求

一、浅谈HTTPS 我们都知道HTTP并非是安全传输&#xff0c;在HTTPS基础上使用SSL协议进行加密构成的HTTPS协议是相对安全的。目前越来越多的企业选择使用HTTPS协议与用户进行通信&#xff0c;如百度、谷歌等。HTTPS在传输数据之前需要客户端&#xff08;浏览器&#xff09;与服…