MQ最终一致性理论与实践

MQ最终一致性理论与实践

原理

分布式事务无论是2PC&3PC还是TCC,基本都遵守XA协议的思想,但全局事务方案并发性较差;

最终一致性方案指的是将最有可能出错的业务以本地事务的方式完成后,采用不断重试的方式(不限于消息系统)来促使同一个分布式事务中的其他关联业务全部完成,不遵从XA协议。相关理论可以参考并学习 分布式事务 | 凤凰架构 | 可靠事件队列

需求

创建订单(order-service), 同时扣减库存(repo-service)

非事务型消息队列

非事务型消息队列

order-service 本地事务

  1. 在t_order表添加订单记录
  2. 在transaction_log 添加对应的扣减库存消息

repo-service 本地事务

  1. 检查本次扣减库存操作是否已经执行过 && 是否可以扣减库存
  2. 执行扣减库存
  3. 写判重表
  4. 向MQ 发送消费完成 ACK

repo-service重复收到消息的原因,一是生产者重复生产,二是中间件重传。为了实现业务的幂等性,repo-service 中维护了一张判重表

  1. order-service后台任务会把消息表中的消息发送给MQ,成功后则删除消息表中的消息。如网络超时则会重新发送消息直到MQ响应成功ACK。这样可能会导致消息的重复,需要repo-service做去重操作。
  2. MQ向repo-service推送消息时,repo-service处理消费完成后会向MQ进行ACK响应,但如果ACK响应发送网络超时则也会出现消费重复消费的情况,需要repo-service做去重操作。

RocketMQ事务型消息队列

RocketMQ事务型消息队列

存在的问题

  1. producer发送失败
  2. producer.send()返回消息异常
  3. 本地事务执行,如果异常,此时如何解决

解决方案

  1. 如果发送失败,那么调用端直接抛出异常,后续不会执行。✅
  2. 如果producer.send没有返回send_ok,则不会执行executeLocal方法,后续会执行check回查,一直查不到信息,最后会回滚消息。✅
  3. rocketMQ的client在事务消息中的bug 下文分析 ❌

如实现方案类似@Transactional add(serviceA→producer.sendTransactionMessage()→saveTransaction(中置half)这一流程,如果在本地事务执行过程中,saveTransaction出现异常,当前操作不会成功,但是由于exception会被RocketMQ捕获,且不会继续抛出,因此异常不会被事务方法add感知,导致serivceA执行成功,但是本地事务不成功。然后执行回查时,会回滚消息,后续的serviceB不会消费消息。

RocketMQ 源码解析

try {sendResult = this.send(msg); //同步发送消息
} catch (Exception e) {throw new MQClientException("send message Exception", e);
}LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;//判断sendResult类型
switch (sendResult.getSendStatus()) {case SEND_OK: {try {if (sendResult.getTransactionId() != null) {msg.putUserProperty("__transactionId__", sendResult.getTransactionId());}String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (null != transactionId && !"".equals(transactionId)) {msg.setTransactionId(transactionId);}if (null != localTransactionExecuter) {localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);} else if (transactionListener != null) {log.debug("Used new transaction API");//执行本地事务localTransactionState = transactionListener.executeLocalTransaction(msg, arg);}if (null == localTransactionState) {localTransactionState = LocalTransactionState.UNKNOW;}if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {log.info("executeLocalTransactionBranch return {}", localTransactionState);log.info(msg.toString());}} catch (Throwable e) {//在executeLocalTransaction执行中,如果抛出异常,会被catch掉,但是没有重新throw,因此不会被调用方感知log.info("executeLocalTransactionBranch exception", e);log.info(msg.toString());localException = e;}}break;case FLUSH_DISK_TIMEOUT:case FLUSH_SLAVE_TIMEOUT:case SLAVE_NOT_AVAILABLE:localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;break;default:break;
}

RocketMQ最终一致性如何正确开发

producer端

发送half后置

当前half后置发送的开发,订单服务+事务消息落库+producer.sendHalf都是在一个事务中,注意去判断发送消息返回是否为send_ok。
在这里插入图片描述

  1. 订单数据或者事务消息有异常,由于在同一个事务中,因此事务rollback ✅
  2. 如果half消息发送异常,外层事务方法可以感知,因此事务rollback ✅
  3. 对sendResult的发送结果判断事务是否发送成功,如果发送结果不是send_ok,那么需要抛出异常,此时执行事务rollback ✅

发送half中置

订单和producer.sendHalf在同一个事务方法中,事务消息持久化在executeLocalTransaction方法中。
在这里插入图片描述

  1. 如果下单异常,那么事务rollback,则send不会执行 ✅
  2. 如果half发送失败,事务rollback ✅
  3. 如果send_ok,那么下单操作完成,但是executeLocalTransaction执行失败
    1. 如果不抛出异常,localTransactionState=RollBack时,订单的事务方法感知不到异常,导致订单落库,事务消息存储失败 ❌
    2. 如果抛出异常,会被rocketmq捕获,也感知不到异常 ❌

发送half前置

把业务方法和事务持久化的操作,统一放在executeLocalTransaction方法中
在这里插入图片描述

  1. producer.sendHalf() 异常或者状态不为send_ok,那么抛出异常,本地事务不执行 ✅
  2. 如果本地事务抛出异常,事务Rollback。抛出的异常被rocketmq捕获,broker不会得到事务状态和启动本地回查。✅

结论

对于half前后置都可以保证事务的最终一致性,但是对于把所有的事务执行放在executeLocalTransaction中执行,略微有些问题

  1. 如果业务方法耗时,执行executeLocalTransaction的执行时间过长,可以会增加不必要的回查;而half消息后置,把业务方法先执行,那么会减少不必要的事务回查。
  2. 会添加将object转化为java-bean的代码。

consumer端

  1. consumer端消费失败而去执行回滚的话,需要付出更多的代价,而且还会引发其他系统回退导致的新问题。
  2. consumer端会返回reconsume_later,并重发消息,且默认重试16次,直到消费成功。如果失败,则人工介入处理。

解决方案

  1. 设置消息重试次数,如果达到指定次数,就发邮件或者短信通知人工介入;
  2. 等待消息重试次数超过默认的16次,进入死信队列,然后程序监听对应的私信队列主题,通知人工介入或者在rocketMQ控制台查看处理。

实战(代码样例)

欢迎star https://github.com/WeiXiao-Hyy/mq-eventual-consistency

参考资料

  • MQ最终一致性事务 - 文章分类 - LBJboy - 博客园
  • SpringCloud Alibaba微服务实战三十二 - 集成RocketMQ实现分布式事务
  • 基于RocketMQ分布式事务 - 完整示例 - 掘金

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/692591.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

视频接入协议之MIPI

MIPI(Mobile Industry Processor Interface)是一种用于移动设备的串行接口标准,旨在提供高速、低功耗、低成本的接口解决方案。MIPI联盟是一个全球性的组织,致力于开发、推广和管理MIPI标准。 MIPI接口包括了多种协议和规范&…

K8S临时小结

k8s是什么?能解决什么问题? k8s是容器管理平台,一套复杂的开源系统 如何更好的维护pod,k8s第二大要素(pod控制器) k8s的很多对容器(pod)管理的高级特性,都是基于控制器…

4.【架构师成长之路】职场新人:如何快速变得专业(上)

文章目录 导言一、快速变得熟练1、研发类工具2、运维类工具3、泛文档类工具 二、能够系统化思考1、提升思考全面性2、提升内容逻辑性 三、最佳实践本文总结说明 导言 前三篇文章我们讲了在校期间及临近毕业时,你需要做一些怎样的准备。而这些准备本身不仅仅是为了毕…

harmony 鸿蒙系统学习 安装ohpm报错 ohpm install failed

一. 安装配置 DevEco Studio 安装包时报错 execute ohpm install failed. Install task failed: ArkTS 3.2.12.5. Install ArkTS dependencies failed. 解决办法 找原因,首先,我的电脑中之前安装过node,也许是因为这个。(其实…

Git 使用教程

一、Git的认识 1.1版本控制 什么是“版本控制”?我为什么要关心它呢? 版本控制是一种记录一个或若干文件内容变化,以便将来查阅特定版本修订情况的系统。 a) 还原:如果你是程序开发者,在新写一个促销活动的java文…

Linux常见基本指令

本文将详细的介绍Linux中各常见指令的用法,并且在每个指令都有使用样例。一共有以下指令: 1. man指令 2.目录基础指令:2.1 pwd指令、2.2 ls指令、2.3 cd指令 3.文件创建与删除:3.1 touch指令、3.2 mkdir指令、3.3 rmdir 指令 &…

Rabbitmq入门与应用(二)-RabbitMQ工作模型

RabbitMQ工作模型 RabbitMQ Tutorials — RabbitMQ Broker RabbitMQ服务。 Connection 生产者或是服务者都需要与Broker建立的TCP连接。 Channel 保持的TCP长连接里面去创建和释放Channel,从而减少资源的消耗。其中Channel是相互隔离的,不能共享。 Queu…

【ansible】自动化运维ansible之playbook剧本编写与运行

目录 一、ansible剧本playbook的组成 二、palybook的基础应用: 实操1:通过palybooks完成nginx的安装 第一种:通过yum安装nginx 第二种:通过编译安装nginx 实操2:playbook定义、引用变量​​​​​​​ 实操3:通过…

C#泛型及其应用:获取并显示员工信信息

目录 一、关于泛型 1.泛型定义 2.泛型与非泛型的区别 3.泛型的应用 (1)泛型类: (2)泛型方法: (3)泛型委托: (4)泛型接口: &a…

五种多目标优化算法(MOGWO、MOJS、NSWOA、MOPSO、MOAHA)性能对比(提供MATLAB代码)

一、5种多目标优化算法简介 1.1MOGWO 1.2MOJS 1.3NSWOA 1.4MOPSO 1.5MOAHA 二、5种多目标优化算法性能对比 为了测试5种算法的性能将其求解9个多目标测试函数(zdt1、zdt2 、zdt3、 zdt4、 zdt6 、Schaffer、 Kursawe 、Viennet2、 Viennet3)&#xff0…

leetcode1049:最后一块石头的重量二

解题思路: 把石头堆分割成差不多的两堆,使得两堆差值最小 dp数组的含义: dp[j]:背包容量为j的背包最大重量(价值)为dp[j] dp[j] max(dp[j],dp[j-stones[i]] stones[i]) 初始化:(dp数组的大小根据题目进行定义&a…

网络安全--网鼎杯2018漏洞复现(二次注入)

一、环境:在线测试平台 BUUCTF在线评测 (buuoj.cn) 二、进入界面先尝试万能账号 1or11# 换格式 hais1bux1 11or11# 三、万能的不行那我们就得想注册了,去register.php去看看 注册个账号 发现用户名回显,猜测考点为用户名处二次注入&…

Java 那些诗一般的 数据类型 (1)

本篇会加入个人的所谓‘鱼式疯言’ ❤️❤️❤️鱼式疯言:❤️❤️❤️此疯言非彼疯言 而是理解过并总结出来通俗易懂的大白话, 小编会尽可能的在每个概念后插入鱼式疯言,帮助大家理解的. 🤭🤭🤭可能说的不是那么严谨.但小编初心是能让更多人…

【JavaScript 语法】

JavaScript 语法 ■ JavaScript 是什么■ JavaScript 语法■ JS 注释■ JS 结束符■ JS 输入输出语句■ JS 代码块■ JS 关键词■ JS 值■ JS 字面量 (混合值)■ JS 变量(变量值)■ JS 文本值 (字符串)■ JS 字符串可以是对象 ■ …

JS文本加密方法探究

在前端开发中,有时候我们需要对敏感文本进行简单的加密,以提高安全性。本文将介绍一种基于 JavaScript 实现的文本加密方法,使用了 Base64、Unicode 和 ROT13 编码。 示例代码 function encodeText(text) {// Base64编码var base64Encoded …

MongoDB 权限管理

文章目录 前言1. 权限控制1.1 MongoDB 默认角色1.1.1 读写角色1.1.2 管理角色1.1.3 其他角色1.1.4 超级用户角色 1.2 用户管理1.2.1 查看用户1.2.2 创建新用户1.2.3 调整角色1.2.4 删除用户1.2.4 修改密码 前言 上一篇 《MongoDB 单机安装部署》 文章中,为 MongoDB…

STL常用之vector,list,stack,queue,deque总结与对比

一,vector 1)底层 vector的底层是开辟出来的一块连续空间,类似于数组,每次空间满了之后会根据不同的编译器有不同的扩容倍数。 2)优劣 优点:随机访问效率高,因为地址是连续的,底层…

Linux 驱动开发基础知识——APP 怎么读取按键值(十二)

个人名片: 🦁作者简介:学生 🐯个人主页:妄北y 🐧个人QQ:2061314755 🐻个人邮箱:2061314755qq.com 🦉个人WeChat:Vir2021GKBS 🐼本文由…

前端简单知识复习

1.symbol类型 Symbol 是 ECMAScript 6 中引入的一种新的基本数据类型,它表示独一无二的值。Symbol 值是通过 Symbol() 函数创建的。 Symbol 值具有以下特点: 独一无二性(唯一性):每个通过 Symbol() 函数创建的 Symb…

基于RBAC的权限管理的理论实现和权限管理的实现

权限管理的理论 首先需要两个页面支持,分别是角色管理和员工管理,其中角色管理对应的是角色和权限的配合,员工管理则是将登录的员工账号和员工所处的角色进行对应,即通过新增角色这个概念,让权限和员工并不直接关联&a…