RocketMQ重复消费的解决方案::分布式锁直击面试!

文章目录

  • 场景分析
    • 方法的幂等
    • 分布式锁
      • Redis实现分布式锁
        • 抢锁的设计思路
    • 分布式锁案例
  • 直击面试
    • rocketmq什么时候重复消费
    • 消息丢失的问题
      • 消息在哪里丢失
      • 发送端确保发送成功并且配合失败的业务处理
      • 消费端确保消息不丢失
      • rocketmq 主从+同步刷盘

场景分析

在这里插入图片描述
分布式系统架构中,队列是分布式的,生产端是分布式集群,消费端也是分布式集群.

相当于有多个消费端同时监听队列,同时减库存,写入订单.

面试题:如何处理消息重复消费的问题. 重复消费大部分场景,需要解决的.

引入2个概念来解决: 幂等的业务方法,和消息的分布式锁.

方法的幂等

结论: 一个方法的一次业务逻辑调用和N次调用的结果是一致的,我们称这种方法就是幂等.

一旦重复消费,一定要把消费的业务逻辑方法(orderAdd)设计成幂等的.

  • 幂等的方法
    • GET方法: 查询方法,天生幂等.
    • DELETE方法: 删除方法,天生幂等.
    • PUT方法: 修改,并不是天生幂等,需要设计
      • 减少库存:
        • update stock_tbl set stock=stock-#{stock} where id=#{id}(不是幂等)
        • select * from stock_log where order_id=#{orderId}(查询日志,判断是否已经见过库存了),没有数据 update stock_tbl set stock=stock-#{stock} where id=#{id},insert into stock_log (字段) values (订单id,商品减库存信息) (这样设计就幂等了,依然有问题)
    • POST方法: 新增,并不是天生幂等,需要设计
      • 新增订单: insert into order_tbl (order_id,order_item_id,count,user_id) values (各种属性);如果使用唯一属性校验,作用在order_id order_sn(编号).同一张订单,这个字段值是相同(幂等满足,没做幂等不满足)
  • 当前orderAdd方法设计幂等的解决思路(之一)
    • 使用订单id或者订单编号,**userId+商品id(**这个只满足当前我们的案例特点,不满足实际场景.)查询订单,如果已经存在了,库存不减少,订单不增了,购物车不用删除了
@Override
public void orderAdd(OrderAddDTO orderAddDTO) {//幂等设计思路: 利用userId和commodityCode 查询,如果已经存在了订单,方法直接执行结束//如果结果不存在,减库存,生单,删除购物车int count=orderMapper.selectExists(orderAddDTO);if (count>0){log.debug("订单已经新增了");return;}StockReduceCountDTO countDTO=new StockReduceCountDTO();countDTO.setCommodityCode(orderAddDTO.getCommodityCode());countDTO.setReduceCount(orderAddDTO.getCount());// 利用Dubbo调用stock模块减少库存的业务逻辑层方法实现功能stockService.reduceCommodityCount(countDTO);// 2.从购物车中删除用户选中的商品(调用Cart模块删除购物车中商品的方法)// 利用dubbo调用cart模块删除购物车中商品的方法实现功能Order order=new Order();BeanUtils.copyProperties(orderAddDTO,order);// 下面执行新增 假设insert是幂等的.orderMapper.insertOrder(order);log.info("新增订单信息为:{}",order);cartService.cartDelete(orderAddDTO);
}@Select("select count(id) from " +
"order_tbl where user_id=#{userId} and commodity_code=#{commodityCode}")
int selectExists(OrderAddDTO orderAddDTO);

分布式锁

当前分布式消费架构
在这里插入图片描述
即使,将方法设计成幂等,这个架构中,消息重复消费

,满足线程安全问题的所有因素

  • 并发/多线程
  • 写操作
  • 共享数据

只要解决其中一点,线程安全问题就消失了.

并发多线程–>串行

写操作–> 避免写(不能满足当前案例,必须写)

共享数据–>个体数据(不能满足,重复消费,重复订单是前提)

分布式线程安全问题的解决方案—分布式锁

错误思路: 引入synchronized同步锁,不能解决分布式场景下,多个进程的并发线程安全问题.

概念: 分布式场景下,多进程,多线程并发的抢锁机制. 抢到资源锁,执行业务逻辑,抢不到等待或者放弃执行.能够避免对同一个资源出现并发多线程操作的解决方案.

和synchronized的区别在于 synchronizeds本地锁.管理一个进程中的多线程,分布式锁是管理多个进程中的多线程.

分布式锁当前落地方案: redis setnx命令

Redis实现分布式锁

抢锁的设计思路

  • 目标: 多线程执行业务之前,先判断执行权限,抢锁,抢到锁的才能执行业务,抢不到的不执行.(当前案例中,抢锁,然后执行的业务逻辑是:orderAdd)
    在这里插入图片描述
    抢锁如何执行?: setnx key “”

  • key值如何设计?: 需要结合业务,设计key值(redis中最主要的功能,都关系到key值的设计),抢锁的逻辑中,满足是业务数据,满足重复消费的重复数据.就可以实现这个key值的设计. 消息Id是重复的.

  • 当前业务流程设计缺陷: 如果有一个消费者抢到锁了,执行了业务方法.执行完成后,没有释放锁的机制.如果引入等待重抢的机制,由于抢到锁的没有释放,会导致死锁.
    在这里插入图片描述
    释放锁的逻辑引入
    在这里插入图片描述
    上述整改的流程中避免了死锁问题,但是存在删除失败导致死锁的问题.
    在这里插入图片描述
    所以,要保证del释放没有成功,在redis也一定不会长期保存.
    在这里插入图片描述
    兜底的解决死锁问题.基本不会出现死锁了.
    在这里插入图片描述为了解决误删除的问题,抢锁的时候setnx key value值设计成一个随机数.
    在这里插入图片描述
    随机数两个消费,多个消费者生成相同的可能性极低.

分布式锁案例

@Component
@RocketMQMessageListener(topic = "business-order-topic",consumerGroup = "${rocketmq.consumer.group}",selectorExpression = "orderAdd")
@Slf4j
public class OrderAddConsumerListener implements RocketMQListener<MessageExt> {@Autowiredprivate IOrderService orderService;@Autowiredprivate StringRedisTemplate redisTemplate;@Overridepublic void onMessage(MessageExt msg) {//拿到底层消息对象的bodybyte[] body = msg.getBody();//尝试先解析成stringString orderJson=new String(body, StandardCharsets.UTF_8);System.out.println(orderJson);OrderAddDTO orderAddDTO=JSON.toJavaObject(JSON.parseObject(orderJson),OrderAddDTO.class);System.out.println(orderAddDTO);//1.生成锁的key值,生成当前这把锁的随机数//准备锁keyString msgKeyLock="msg:order:add:"+msg.getMsgId();//准备随机数 4 6 8位String randCode=new Random().nextInt(9000)+1000+"";ValueOperations<String, String> stringOps = redisTemplate.opsForValue();try{//补充消息消费的抢锁机制//2.抢锁 setnx msgKeyLock randCode expire 10sBoolean tryLockSuccess = stringOps.setIfAbsent(msgKeyLock, randCode, 10, TimeUnit.SECONDS);//3.判断 抢锁成功还是失败if(!tryLockSuccess){//3.2 失败了 可以等待5秒重新抢锁,也可以直接结束//尝试这里使用while编写等待5秒重新抢的逻辑log.info("有别人抢锁了,msgKey:{},value:{}",msgKeyLock,randCode);return;}//3.1 成功了 执行orderAddorderService.orderAdd(orderAddDTO);}catch (CoolSharkServiceException e){//业务异常,说明订单新增业务性失败,比如库存没了log.error("库存减少失败,库存触底了:{},异常信息:{}",orderAddDTO,e.getMessage());}finally {//释放锁 读以下锁的value值,等于当前生成value才释放String s = stringOps.get(msgKeyLock);if (s!=null && s.equals(randCode)){//del msgKeyLockredisTemplate.delete(msgKeyLock);}}}
}

直击面试

rocketmq什么时候重复消费

  • 在broker做扩展的时候,消息队列的消息,做扩展的时候,原本存储在原队列的消息,会进行rebalance重平衡.
  • 消费开始阶段
    在这里插入图片描述消费者consumer1 所在group1 绑定队列,push消费模式,使得消费者接受到了queue1 queue2的6条消息.消费过程,成功执行,即将返回确认.
    在这里插入图片描述
    总结:
  • 消费者并发消费的逻辑,同一组消费者绑定分布式队列,推送批量的消息
  • 在某个消费者还没有来得及消费,或者没来得及返回确认给rocketmq,队列发生了扩容缩容
  • rocketmq会对队列中所有的消息做rebalance重平衡(消息重新分配给不同队列),消费者绑定也充平衡
  • 导致已经推送的但是未返回确认的消息,被发送给不同消费者多次.

消息丢失的问题

rocketmq kafka rabbitmq activemq都是队列.只要谈到其中一个.

  1. 重复消费的问题(方法必须设计成幂等,一旦设计成幂等,可能造成线程安全隐患,所以引入分布式锁)
  2. 消息丢失如何处理.

面试题:消息丢失如何处理.

消息在哪里丢失

  • 发送没成功,没有解决不成功的业务逻辑
  • rocketmq保存的时候,断电,宕机,丢失消息(运行的时候,消息存储在内存)
  • 消费端丢失消息(没有成功处理消息,就直接返回success,并不是所有的消费逻辑都是先消费,再确认的,如果关注的是消费速度,不关注成功或者是否丢失,就可以这样处理)

发送端确保发送成功并且配合失败的业务处理

同步发送,接收发送结果,SEND_OK才结束.

客户端代码底层都有默认重试(retry 3 times).发送重试都失败了.

处理发送失败的逻辑.

  1. 发送到备用/失败的队列
  2. 记录日志,将消息来源,目标和消息内容,详细记录,等待监控系统,维护人员来直接处理

消费端确保消息不丢失

一定是先消息费,在确认,消费失败,返回失败(rocketmq消费点位保持原有位置不变,同一个消费者组,会重新拿到消息)

rocketmq 主从+同步刷盘

在这里插入图片描述
同步刷盘(消息数据可靠性保证): 如果持久化内存消息数据到磁盘失败,发送结果没有成功.

异步刷盘: 只要内存接收到了生产端的消息数据,数据是否持久化到磁盘,都会给生产端发送成功接收信息.

主从的双机热备: broker可以配置主从,考虑数据可靠性,和性能,一般主master做同步刷盘,slave做异步刷盘.(都同步刷盘,100%保证消息只要到达rocketmq就不会丢失,但是性能不能保证.)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/13404.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

css实现有缺口的border

css实现有缺口的border 1.问题回溯2.css实现有缺口的border 1.问题回溯 通常会有那种两个div都有border重叠在一起就会有种加粗的效果。 div1,div2,div3都有个1px的border&#xff0c;箭头标记的地方是没有处理解决的&#xff0c;很明显看着是有加粗效果的。其实这种感觉把di…

【Java从入门到大牛】集合进阶上篇

&#x1f525; 本文由 程序喵正在路上 原创&#xff0c;CSDN首发&#xff01; &#x1f496; 系列专栏&#xff1a;Java从入门到大牛 &#x1f320; 首发时间&#xff1a;2023年7月29日 &#x1f98b; 欢迎关注&#x1f5b1;点赞&#x1f44d;收藏&#x1f31f;留言&#x1f43…

IntelliJ IDEA流行的构建工具——Gradle

IntelliJ IDEA&#xff0c;是java编程语言开发的集成环境。IntelliJ在业界被公认为最好的java开发工具&#xff0c;尤其在智能代码助手、代码自动提示、重构、JavaEE支持、各类版本工具(git、svn等)、JUnit、CVS整合、代码分析、 创新的GUI设计等方面的功能可以说是超常的。 如…

基于java SpringBoot和HTML的博客系统

随着网络技术渗透到社会生活的各个方面&#xff0c;传统的交流方式也面临着变化。互联网是一个非常重要的方向。基于Web技术的网络考试系统可以在全球范围内使用互联网&#xff0c;可以在本地或异地进行通信&#xff0c;大大提高了通信和交换的灵活性。在当今高速发展的互联网时…

【达哥讲网络】第3集:数据交换的垫基石——二层交换原理

专业的网络工程师在进行网络设计时&#xff0c;会事先规划好不同业务数据的转发路径&#xff0c;一方面是为了满足用户应用需求&#xff0c;另一方面是为了提高数据转发效率、充分利用各设备/各链路的硬件或带宽资源。在进行网络故障排除时&#xff0c;理顺各路数据的转发路径也…

AI For Engineers 线上参会指南

AI For Engineers 线上参会指南 欢迎您报名参加 AI For Engineers&#xff1a;工程师 AI 全球会议&#xff0c;为了让各位参会者参会体验更佳&#xff0c;更好地利用本次会议收获更多。Altair 特别为各位准备了线上参会指南&#xff0c;一起来看看吧~ 会议时间&#xff1a;20…

掌握Python的X篇_12_如何使用VS Code调试Python程序

本篇将会介绍如何使用VS Code调试Python程序。 文章目录 1. 什么是调试2. 断点3. 如何启动调试4. 监视窗口5. 单步 1. 什么是调试 我们可以利用VS Code对Python代码进行调试。所谓调试&#xff0c;大家可以理解成有能力将程序进行 “慢动作播放”让我们有机会看到程序一步一步…

flutter minio

背景 前端 经常需要上传文件 图片 视频等等 到后端服务器&#xff0c; 如果到自己服务器 一般会有安全隐患。也不方便管理这些文件。如果要想使用一些骚操作 比如 按照前端请求生成不同分辨率的图片&#xff0c;那就有点不太方便了。 这里介绍以下 minio&#xff0c;&#xff0…

flutter开发实战-父子Widget组件调用方法

flutter开发实战-父子Widget组件调用方法 在最近开发中遇到了需要父组件调用子组件方法&#xff0c;子组件调用父组件的方法。这里记录一下方案。 一、使用GlobalKey 父组件使用globalKey.currentState调用子组件具体方法&#xff0c;子组件通过方法回调callback方法调用父组…

PHP-Mysql图书管理系统--【白嫖项目】

强撸项目系列总目录在000集 PHP要怎么学–【思维导图知识范围】 文章目录 本系列校训本项目使用技术 首页phpStudy 设置导数据库后台的管理界面数据库表结构项目目录如图&#xff1a;代码部分&#xff1a;主页的head 配套资源作业&#xff1a; 本系列校训 用免费公开视频&am…

LeetCode32.Longest-Valid-Parentheses<最长有效括号>

题目&#xff1a; 思路&#xff1a; 遍历括号.遇到右括号然后前一个是左括号 那就res2,然后重定位 i 的值 并且长度减少2; 但是问题在于无法判断最长的括号.只能得到string内的全部括号长度. 错误代码: 写过一题类似的,那题是找括号数.记得是使用的栈,但是死活写不出来. 看完…

Dubbo 指定调用固定ip+port dubbo调用指定服务 dubbo调用不随机 dubbo自定义调用服务 dubbo点对点通信 dubbo指定ip

1. 在写分布式im时nami-im: 分布式im, 集群 zookeeper netty kafka nacos rpc主要为gate&#xff08;长连接服务&#xff09; logic &#xff08;业务&#xff09; lsb &#xff08;负载均衡&#xff09;store&#xff08;存储&#xff09; - Gitee.com&#xff0c;需要指定某一…

当机器人变硬核:探索深度学习中的时间序列预测

收藏自&#xff1a;Wed, 15 Sep 2021 10:32:56 UTC 摘要&#xff1a;时间序列预测是机器学习和深度学习领域的一个重要应用&#xff0c;它可以用于预测未来趋势、分析数据模式和做出决策。本文将介绍一些基本概念和常用方法&#xff0c;并结合具体的案例&#xff0c;展示如何使…

React Native 0.72 版本,带来诸多更新

经过漫长的等待,React Native 终于迎来了0.72 版本,此处版本带来了Metro重要的功能更新、性能优化、开发人员体验的改进以及其他一些重要的变化。我们可以从下面的链接中获取此次版本更新的内容:0.72更新内容 一、Metro 新功能 众所周知,Metro 是 React Native 默认的 Jav…

idea插件开发-自定义语言4-Syntax Highlighter

SyntaxHighlighter用于指定应如何突出显示特定范围的文本&#xff0c;ColorSettingPage可以定义颜色。 一、Syntax Highter 1、文本属性键&#xfeff; TextAttributesKey用于指定应如何突出显示特定范围的文本。不同类型的数据比如关键字、数字、字符串等如果要突出显示都需…

代码-【5 二叉树非递归后序遍历,找指定结点的父节点】

二叉树T按二叉链表存储&#xff0c;求指定结点q的父节点&#xff1a;

【Ubuntu系统18.04虚拟机ros下实现darknet_ros(YOLO V3)检测问题解析最全】

原本打算在搭载Ubuntu18.04的智能小车上面运行使用darknet_ros 包来进行yolov3的检测&#xff0c;但是运行过程中遇到了不少问题&#xff0c;从头到尾部的运行包括遇到的解决方法以及对应的文章一并列出&#xff0c;免得到处查找。 首先是在ROS下实现darknet_ros(YOLO V3)检测…

浅谈自动化测试

谈谈那些实习测试工程师应该掌握的基础知识&#xff08;一&#xff09;_什么时候才能变强的博客-CSDN博客https://blog.csdn.net/qq_17496235/article/details/131839453谈谈那些实习测试工程师应该掌握的基础知识&#xff08;二&#xff09;_什么时候才能变强的博客-CSDN博客h…

使用克拉默法则进行三点定圆(二维)

目录 1.二维圆2.python代码3.计算结果 本文由CSDN点云侠原创&#xff0c;爬虫网站请自重。 1.二维圆 已知不共线的三个点&#xff0c;设其坐标为 ( x 1 , y 1 ) (x_1,y_1) (x1​,y1​)、 ( x 2 , y 2 ) (x_2,y_2) (x2​,y2​)、 ( x 3 , y 3 ) (x_3,y_3) (x3​,y3​)&#xf…

FSM:Full Surround Monodepth from Multiple Cameras

参考代码&#xff1a;None 介绍 深度估计任务作为基础环境感知任务&#xff0c;在基础上构建的3D感知才能更加准确&#xff0c;并且泛化能力更强。单目的自监督深度估计已经有MonoDepth、ManyDepth这些经典深度估计模型了&#xff0c;而这篇文章是对多目自监督深度估计进行探…