MQ最终一致性理论与实践

MQ最终一致性理论与实践

原理

分布式事务无论是2PC&3PC还是TCC,基本都遵守XA协议的思想,但全局事务方案并发性较差;

最终一致性方案指的是将最有可能出错的业务以本地事务的方式完成后,采用不断重试的方式(不限于消息系统)来促使同一个分布式事务中的其他关联业务全部完成,不遵从XA协议。相关理论可以参考并学习 分布式事务 | 凤凰架构 | 可靠事件队列

需求

创建订单(order-service), 同时扣减库存(repo-service)

非事务型消息队列

非事务型消息队列

order-service 本地事务

  1. 在t_order表添加订单记录
  2. 在transaction_log 添加对应的扣减库存消息

repo-service 本地事务

  1. 检查本次扣减库存操作是否已经执行过 && 是否可以扣减库存
  2. 执行扣减库存
  3. 写判重表
  4. 向MQ 发送消费完成 ACK

repo-service重复收到消息的原因,一是生产者重复生产,二是中间件重传。为了实现业务的幂等性,repo-service 中维护了一张判重表

  1. order-service后台任务会把消息表中的消息发送给MQ,成功后则删除消息表中的消息。如网络超时则会重新发送消息直到MQ响应成功ACK。这样可能会导致消息的重复,需要repo-service做去重操作。
  2. MQ向repo-service推送消息时,repo-service处理消费完成后会向MQ进行ACK响应,但如果ACK响应发送网络超时则也会出现消费重复消费的情况,需要repo-service做去重操作。

RocketMQ事务型消息队列

RocketMQ事务型消息队列

存在的问题

  1. producer发送失败
  2. producer.send()返回消息异常
  3. 本地事务执行,如果异常,此时如何解决

解决方案

  1. 如果发送失败,那么调用端直接抛出异常,后续不会执行。✅
  2. 如果producer.send没有返回send_ok,则不会执行executeLocal方法,后续会执行check回查,一直查不到信息,最后会回滚消息。✅
  3. rocketMQ的client在事务消息中的bug 下文分析 ❌

如实现方案类似@Transactional add(serviceA→producer.sendTransactionMessage()→saveTransaction(中置half)这一流程,如果在本地事务执行过程中,saveTransaction出现异常,当前操作不会成功,但是由于exception会被RocketMQ捕获,且不会继续抛出,因此异常不会被事务方法add感知,导致serivceA执行成功,但是本地事务不成功。然后执行回查时,会回滚消息,后续的serviceB不会消费消息。

RocketMQ 源码解析

try {sendResult = this.send(msg); //同步发送消息
} catch (Exception e) {throw new MQClientException("send message Exception", e);
}LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;//判断sendResult类型
switch (sendResult.getSendStatus()) {case SEND_OK: {try {if (sendResult.getTransactionId() != null) {msg.putUserProperty("__transactionId__", sendResult.getTransactionId());}String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (null != transactionId && !"".equals(transactionId)) {msg.setTransactionId(transactionId);}if (null != localTransactionExecuter) {localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);} else if (transactionListener != null) {log.debug("Used new transaction API");//执行本地事务localTransactionState = transactionListener.executeLocalTransaction(msg, arg);}if (null == localTransactionState) {localTransactionState = LocalTransactionState.UNKNOW;}if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {log.info("executeLocalTransactionBranch return {}", localTransactionState);log.info(msg.toString());}} catch (Throwable e) {//在executeLocalTransaction执行中,如果抛出异常,会被catch掉,但是没有重新throw,因此不会被调用方感知log.info("executeLocalTransactionBranch exception", e);log.info(msg.toString());localException = e;}}break;case FLUSH_DISK_TIMEOUT:case FLUSH_SLAVE_TIMEOUT:case SLAVE_NOT_AVAILABLE:localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;break;default:break;
}

RocketMQ最终一致性如何正确开发

producer端

发送half后置

当前half后置发送的开发,订单服务+事务消息落库+producer.sendHalf都是在一个事务中,注意去判断发送消息返回是否为send_ok。
在这里插入图片描述

  1. 订单数据或者事务消息有异常,由于在同一个事务中,因此事务rollback ✅
  2. 如果half消息发送异常,外层事务方法可以感知,因此事务rollback ✅
  3. 对sendResult的发送结果判断事务是否发送成功,如果发送结果不是send_ok,那么需要抛出异常,此时执行事务rollback ✅

发送half中置

订单和producer.sendHalf在同一个事务方法中,事务消息持久化在executeLocalTransaction方法中。
在这里插入图片描述

  1. 如果下单异常,那么事务rollback,则send不会执行 ✅
  2. 如果half发送失败,事务rollback ✅
  3. 如果send_ok,那么下单操作完成,但是executeLocalTransaction执行失败
    1. 如果不抛出异常,localTransactionState=RollBack时,订单的事务方法感知不到异常,导致订单落库,事务消息存储失败 ❌
    2. 如果抛出异常,会被rocketmq捕获,也感知不到异常 ❌

发送half前置

把业务方法和事务持久化的操作,统一放在executeLocalTransaction方法中
在这里插入图片描述

  1. producer.sendHalf() 异常或者状态不为send_ok,那么抛出异常,本地事务不执行 ✅
  2. 如果本地事务抛出异常,事务Rollback。抛出的异常被rocketmq捕获,broker不会得到事务状态和启动本地回查。✅

结论

对于half前后置都可以保证事务的最终一致性,但是对于把所有的事务执行放在executeLocalTransaction中执行,略微有些问题

  1. 如果业务方法耗时,执行executeLocalTransaction的执行时间过长,可以会增加不必要的回查;而half消息后置,把业务方法先执行,那么会减少不必要的事务回查。
  2. 会添加将object转化为java-bean的代码。

consumer端

  1. consumer端消费失败而去执行回滚的话,需要付出更多的代价,而且还会引发其他系统回退导致的新问题。
  2. consumer端会返回reconsume_later,并重发消息,且默认重试16次,直到消费成功。如果失败,则人工介入处理。

解决方案

  1. 设置消息重试次数,如果达到指定次数,就发邮件或者短信通知人工介入;
  2. 等待消息重试次数超过默认的16次,进入死信队列,然后程序监听对应的私信队列主题,通知人工介入或者在rocketMQ控制台查看处理。

实战(代码样例)

欢迎star https://github.com/WeiXiao-Hyy/mq-eventual-consistency

参考资料

  • MQ最终一致性事务 - 文章分类 - LBJboy - 博客园
  • SpringCloud Alibaba微服务实战三十二 - 集成RocketMQ实现分布式事务
  • 基于RocketMQ分布式事务 - 完整示例 - 掘金

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/692591.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

代码随想录算法训练营(贪心6)| 738.单调递增的数字 968.监控二叉树

738.单调递增的数字 题目链接/文章讲解 968.监控二叉树 (可以跳过) leetcode题目链接 本题是贪心和二叉树的一个结合,比较难,一刷大家就跳过吧。 题目链接/文章讲解 class Solution { private:int result;int traversal(TreeN…

视频接入协议之MIPI

MIPI(Mobile Industry Processor Interface)是一种用于移动设备的串行接口标准,旨在提供高速、低功耗、低成本的接口解决方案。MIPI联盟是一个全球性的组织,致力于开发、推广和管理MIPI标准。 MIPI接口包括了多种协议和规范&…

人工智能之数学基础【梯度下降法】

梯度下降法是求解无约束优化问题的一种简单而有效的优化方法,是一种利用目标函数的Taylor展开构造搜索方向的方法。 思想 梯度下降法三要素:出发点、下降方向、下降步长。用梯度下降法求解优化问题的基本思想可以类比为一个下山的过程,可微分的函数代表一座山,目标就是找…

K8S临时小结

k8s是什么?能解决什么问题? k8s是容器管理平台,一套复杂的开源系统 如何更好的维护pod,k8s第二大要素(pod控制器) k8s的很多对容器(pod)管理的高级特性,都是基于控制器…

4.【架构师成长之路】职场新人:如何快速变得专业(上)

文章目录 导言一、快速变得熟练1、研发类工具2、运维类工具3、泛文档类工具 二、能够系统化思考1、提升思考全面性2、提升内容逻辑性 三、最佳实践本文总结说明 导言 前三篇文章我们讲了在校期间及临近毕业时,你需要做一些怎样的准备。而这些准备本身不仅仅是为了毕…

【算法】树状数组

文章目录 一、基本概念二、核心操作三、常见应用 一、基本概念 树状数组用于动态维护一段区间,操作的时间复杂度为 O ( l o g n ) O(logn) O(logn) 定义: t [ i ] [ i − l o w b i t ( i ) 1 , i ] t[i] [i - lowbit(i) 1, i] t[i][i−lowbit(i)…

harmony 鸿蒙系统学习 安装ohpm报错 ohpm install failed

一. 安装配置 DevEco Studio 安装包时报错 execute ohpm install failed. Install task failed: ArkTS 3.2.12.5. Install ArkTS dependencies failed. 解决办法 找原因,首先,我的电脑中之前安装过node,也许是因为这个。(其实…

Git 使用教程

一、Git的认识 1.1版本控制 什么是“版本控制”?我为什么要关心它呢? 版本控制是一种记录一个或若干文件内容变化,以便将来查阅特定版本修订情况的系统。 a) 还原:如果你是程序开发者,在新写一个促销活动的java文…

Linux常见基本指令

本文将详细的介绍Linux中各常见指令的用法,并且在每个指令都有使用样例。一共有以下指令: 1. man指令 2.目录基础指令:2.1 pwd指令、2.2 ls指令、2.3 cd指令 3.文件创建与删除:3.1 touch指令、3.2 mkdir指令、3.3 rmdir 指令 &…

Rabbitmq入门与应用(二)-RabbitMQ工作模型

RabbitMQ工作模型 RabbitMQ Tutorials — RabbitMQ Broker RabbitMQ服务。 Connection 生产者或是服务者都需要与Broker建立的TCP连接。 Channel 保持的TCP长连接里面去创建和释放Channel,从而减少资源的消耗。其中Channel是相互隔离的,不能共享。 Queu…

PHP如何利用post与get方式传值接收数据

目录 一、POST传值1. 使用curl库发送 POST 请求:2. 使用file_get_contents()函数发送 POST 请求:3. 使用stream_socket_client()函数发送 POST 请求:4. 利用from表单提交数据: 二、GET传值1. 使用http_build_query()函数构建 URL …

Java IO:同步阻塞和装饰器模式详解

前言 大家好,我是chowley,今天来介绍一下Java IO中的两个重要概念——同步阻塞和装饰器模式。 同步阻塞 在计算机编程中,同步阻塞(Synchronous Blocking)指的是在进行某个操作时,当前线程会被阻塞&#…

代码随想录算法训练营|day36

第八章 贪心算法 435.无重叠区间763.划分字母区间56.合并区间代码随想录文章详解总结 435.无重叠区间 对右边界升序排序,类似用最少数箭射爆气球,遍历区间,若当前区间与前一个区间不重合,更新区间边界end为当前区间右边界&#x…

【ansible】自动化运维ansible之playbook剧本编写与运行

目录 一、ansible剧本playbook的组成 二、palybook的基础应用: 实操1:通过palybooks完成nginx的安装 第一种:通过yum安装nginx 第二种:通过编译安装nginx 实操2:playbook定义、引用变量​​​​​​​ 实操3:通过…

有哪几种行为会导致服务器被入侵

导致服务器被入侵的行为有很多种,以下是一些常见的行为: 系统漏洞:服务器操作系统或软件存在漏洞,攻击者可以通过利用这些漏洞获取系统权限,从而入侵服务器。 弱口令:服务器的账号密码过于简单或者未及时更…

C#泛型及其应用:获取并显示员工信信息

目录 一、关于泛型 1.泛型定义 2.泛型与非泛型的区别 3.泛型的应用 (1)泛型类: (2)泛型方法: (3)泛型委托: (4)泛型接口: &a…

通过conda安装cudatoolikit和cudnn

通过conda安装cudatoolikit和cudnn 安装cudatoolkit安装cudnn安装cudatoolkit-dev 安装cudatoolkit conda install cudatoolkit11.3 -c https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/ 安装cudnn conda install cudnn8.5 -c https://mirrors.tuna.tsinghua.edu.…

ECMAScript 6+ 新特性 ( 二 )

2.12. class类 ES6 提供了更接近传统语言的写法,引入了 Class(类)这个概念,作为对象的模板。通过 class 关键字,可以定义类。 ES6 的 class 可以看作只是一个语法糖,它的绝大部分功能ES5 都可以做到&…

五种多目标优化算法(MOGWO、MOJS、NSWOA、MOPSO、MOAHA)性能对比(提供MATLAB代码)

一、5种多目标优化算法简介 1.1MOGWO 1.2MOJS 1.3NSWOA 1.4MOPSO 1.5MOAHA 二、5种多目标优化算法性能对比 为了测试5种算法的性能将其求解9个多目标测试函数(zdt1、zdt2 、zdt3、 zdt4、 zdt6 、Schaffer、 Kursawe 、Viennet2、 Viennet3)&#xff0…

安装python开发包管理环境miniconda

Python 管理 —— Conda Python 环境管理的价值在于将同一个 Python 版本的不同需求分开,比如:项目 A 和 项目 B 都需要 Python 3.10.11 这个版本,都用到了 requests 包,但是项目 A 需要 requests 2.1,而项目 B 需要 …