【SpringCloud-Seata源码分析2】

文章目录

  • 分支事务注册-客户端
  • 分支事务服务端的执行

分支事务注册-客户端

第一篇我们将全局事务启动,以及开启源码分析完成了,现在我们需要看一下分支事务注册。
在这里插入图片描述
我们分支事务的开始需要从PreparedStatementProxy#executeUpdate中去看。

public class PreparedStatementProxy extends AbstractPreparedStatementProxy implements PreparedStatement, ParametersHolder {public Map<Integer, ArrayList<Object>> getParameters() {return this.parameters;}public PreparedStatementProxy(AbstractConnectionProxy connectionProxy, PreparedStatement targetStatement, String targetSQL) throws SQLException {super(connectionProxy, targetStatement, targetSQL);}public boolean execute() throws SQLException {return (Boolean)ExecuteTemplate.execute(this, (statement, args) -> {return statement.execute();}, new Object[0]);}public ResultSet executeQuery() throws SQLException {return (ResultSet)ExecuteTemplate.execute(this, (statement, args) -> {return statement.executeQuery();}, new Object[0]);}
//这个是分支事务的核心入口public int executeUpdate() throws SQLException {return (Integer)ExecuteTemplate.execute(this, (statement, args) -> {return statement.executeUpdate();}, new Object[0]);}
}

判断出当前的业务Sql是什么类型,我们需要选择不同的执行器。

public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers, StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException {//判断是否是全局锁,并且是否是AT模式if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {//否 执行普通的SQLreturn statementCallback.execute(statementProxy.getTargetStatement(), args);} else {//获取数据库类型String dbType = statementProxy.getConnectionProxy().getDbType();if (CollectionUtils.isEmpty(sqlRecognizers)) {sqlRecognizers = SQLVisitorFactory.get(statementProxy.getTargetSQL(), dbType);}Object executor;if (CollectionUtils.isEmpty(sqlRecognizers)) {executor = new PlainExecutor(statementProxy, statementCallback);} else if (sqlRecognizers.size() == 1) {SQLRecognizer sqlRecognizer = (SQLRecognizer)sqlRecognizers.get(0);label44://根据不同的SQL类型选择不同的执行器switch(sqlRecognizer.getSQLType()) {case INSERT:executor = (Executor)EnhancedServiceLoader.load(InsertExecutor.class, dbType, new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class}, new Object[]{statementProxy, statementCallback, sqlRecognizer});break;case UPDATE:executor = new UpdateExecutor(statementProxy, statementCallback, sqlRecognizer);break;case DELETE:executor = new DeleteExecutor(statementProxy, statementCallback, sqlRecognizer);break;case SELECT_FOR_UPDATE:executor = new SelectForUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);break;case INSERT_ON_DUPLICATE_UPDATE:byte var8 = -1;switch(dbType.hashCode()) {case 104382626:if (dbType.equals("mysql")) {var8 = 0;}break;case 839186932:if (dbType.equals("mariadb")) {var8 = 1;}}switch(var8) {case 0:case 1:executor = new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);break label44;default:throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");}default:executor = new PlainExecutor(statementProxy, statementCallback);}} else {executor = new MultiExecutor(statementProxy, statementCallback, sqlRecognizers);}try {//核心入口执行T rs = ((Executor)executor).execute(args);return rs;} catch (Throwable var9) {Throwable ex = var9;if (!(var9 instanceof SQLException)) {ex = new SQLException(var9);}throw (SQLException)ex;}}}

excute()执行

    public T execute(Object... args) throws Throwable {//获取事务的xidString xid = RootContext.getXID();if (xid != null) {//绑定xidthis.statementProxy.getConnectionProxy().bind(xid);}//将事务绑定上全局锁this.statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());return this.doExecute(args);}
    protected T executeAutoCommitTrue(Object[] args) throws Throwable {ConnectionProxy connectionProxy = this.statementProxy.getConnectionProxy();Object var3;try {//将事务改为手动提交connectionProxy.changeAutoCommit();var3 = (new AbstractDMLBaseExecutor.LockRetryPolicy(connectionProxy)).execute(() -> {//执行非自动提交T result = this.executeAutoCommitFalse(args);connectionProxy.commit();return result;});} catch (Exception var7) {LOGGER.error("execute executeAutoCommitTrue error:{}", var7.getMessage(), var7);if (!AbstractDMLBaseExecutor.LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {			//报错将事务进行回滚connectionProxy.getTargetConnection().rollback();}throw var7;} finally {connectionProxy.getContext().reset();//将自动提交置为trueconnectionProxy.setAutoCommit(true);}return var3;}
protected T executeAutoCommitFalse(Object[] args) throws Exception {if (!"mysql".equalsIgnoreCase(this.getDbType()) && this.isMultiPk()) {throw new NotSupportYetException("multi pk only support mysql!");} else {//设置前置镜像TableRecords beforeImage = this.beforeImage();//执行业务SqlT result = this.statementCallback.execute(this.statementProxy.getTargetStatement(), args);int updateCount = this.statementProxy.getUpdateCount();if (updateCount > 0) {//执行后置镜像TableRecords afterImage = this.afterImage(beforeImage);this.prepareUndoLog(beforeImage, afterImage);}return result;}}

执行提交

    public void commit() throws SQLException {try {this.lockRetryPolicy.execute(() -> {this.doCommit();return null;});} catch (SQLException var2) {if (this.targetConnection != null && !this.getAutoCommit() && !this.getContext().isAutoCommitChanged()) {this.rollback();}throw var2;} catch (Exception var3) {throw new SQLException(var3);}}

执行事务的提交

   private void doCommit() throws SQLException {if (this.context.inGlobalTransaction()) {//如果是全局事务就执行此方法this.processGlobalTransactionCommit();} else if (this.context.isGlobalLockRequire()) {//执行全局锁的事务提交this.processLocalCommitWithGlobalLocks();} else {//其他this.targetConnection.commit();}}

组装请求数据,发送后端

  public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {try {BranchRegisterRequest request = new BranchRegisterRequest();request.setXid(xid);request.setLockKey(lockKeys);request.setResourceId(resourceId);request.setBranchType(branchType);request.setApplicationData(applicationData);BranchRegisterResponse response = (BranchRegisterResponse)RmNettyRemotingClient.getInstance().sendSyncRequest(request);if (response.getResultCode() == ResultCode.Failed) {throw new RmTransactionException(response.getTransactionExceptionCode(), String.format("Response[ %s ]", response.getMsg()));} else {return response.getBranchId();}} catch (TimeoutException var9) {throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", var9);} catch (RuntimeException var10) {throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", var10);}}

分支事务服务端的执行

在这里插入图片描述
我们首先看一下分支事务服务端注册的入口DefaultCoordinator#

    @Overridepublic AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {//判断请求是否来自于事务的客户端if (!(request instanceof AbstractTransactionRequestToTC)) {throw new IllegalArgumentException();}AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;transactionRequest.setTCInboundHandler(this);return transactionRequest.handle(context);}

我们的核心分支事务注册代码

    @Overridepublic BranchRegisterResponse handle(BranchRegisterRequest request, final RpcContext rpcContext) {//创建返回的实例BranchRegisterResponse response = new BranchRegisterResponse();exceptionHandleTemplate(new AbstractCallback<BranchRegisterRequest, BranchRegisterResponse>() {@Overridepublic void execute(BranchRegisterRequest request, BranchRegisterResponse response)throws TransactionException {try {//核心分支注册实例doBranchRegister(request, response, rpcContext);} catch (StoreException e) {throw new TransactionException(TransactionExceptionCode.FailedStore, String.format("branch register request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);}}}, request, response);return response;}
 @Overridepublic Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,String applicationData, String lockKeys) throws TransactionException {// key1:获取GlobalSessionGlobalSession globalSession = assertGlobalSessionNotNull(xid, false);return SessionHolder.lockAndExecute(globalSession, () -> {// 检查事务状态globalSessionStatusCheck(globalSession);globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());//key2: 创建分支会话BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,applicationData, lockKeys, clientId);MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));//key3:增加分支事务锁branchSessionLock(globalSession, branchSession);try {//key4: 全局会话添加分支会话globalSession.addBranch(branchSession);} catch (RuntimeException ex) {// key5: 出现异常释放锁branchSessionUnlock(branchSession);throw new BranchTransactionException(FailedToAddBranch, String.format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),branchSession.getBranchId()), ex);}if (LOGGER.isInfoEnabled()) {LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);}// key6: 返回分支会话的分支IDreturn branchSession.getBranchId();});}

创建封装分支事务的信息

    public static BranchSession newBranchByGlobal(GlobalSession globalSession, BranchType branchType, String resourceId,String applicationData, String lockKeys, String clientId) {BranchSession branchSession = new BranchSession();branchSession.setXid(globalSession.getXid());branchSession.setTransactionId(globalSession.getTransactionId());branchSession.setBranchId(UUIDGenerator.generateUUID());branchSession.setBranchType(branchType);branchSession.setResourceId(resourceId);branchSession.setLockKey(lockKeys);branchSession.setClientId(clientId);branchSession.setApplicationData(applicationData);return branchSession;}

分支事务加锁

protected void branchSessionLock(GlobalSession globalSession, BranchSession branchSession)throws TransactionException {//分支事务的参数校验String applicationData = branchSession.getApplicationData();boolean autoCommit = true;boolean skipCheckLock = false;if (StringUtils.isNotBlank(applicationData)) {if (objectMapper == null) {objectMapper = new ObjectMapper();}try {Map<String, Object> data = objectMapper.readValue(applicationData, HashMap.class);Object clientAutoCommit = data.get(AUTO_COMMIT);if (clientAutoCommit != null && !(boolean)clientAutoCommit) {autoCommit = (boolean)clientAutoCommit;}Object clientSkipCheckLock = data.get(SKIP_CHECK_LOCK);if (clientSkipCheckLock instanceof Boolean) {skipCheckLock = (boolean)clientSkipCheckLock;}} catch (IOException e) {LOGGER.error("failed to get application data: {}", e.getMessage(), e);}}try {// 增加分支锁,如果返回false加锁失败我们直接抛出异常if (!branchSession.lock(autoCommit, skipCheckLock)) {throw new BranchTransactionException(LockKeyConflict,String.format("Global lock acquire failed xid = %s branchId = %s", globalSession.getXid(),branchSession.getBranchId()));}} catch (StoreException e) {if (e.getCause() instanceof BranchTransactionException) {throw new BranchTransactionException(((BranchTransactionException)e.getCause()).getCode(),String.format("Global lock acquire failed xid = %s branchId = %s", globalSession.getXid(),branchSession.getBranchId()));}throw e;}}
    public boolean lock(boolean autoCommit, boolean skipCheckLock) throws TransactionException {// 必须还AT事务if (this.getBranchType().equals(BranchType.AT)) {return LockerManagerFactory.getLockManager().acquireLock(this, autoCommit, skipCheckLock);}return true;}
    public boolean acquireLock(BranchSession branchSession, boolean autoCommit, boolean skipCheckLock) throws TransactionException {if (branchSession == null) {throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");}// 获取分支锁的keyString lockKey = branchSession.getLockKey();if (StringUtils.isNullOrEmpty(lockKey)) {// no lockreturn true;}// get locks of branch// 行锁收集List<RowLock> locks = collectRowLocks(branchSession);if (CollectionUtils.isEmpty(locks)) {// no lockreturn true;}// 进行存储return getLocker(branchSession).acquireLock(locks, autoCommit, skipCheckLock);}

添加分支事务

 private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException {// 将事务信息写入数据库if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) {if (LogOperation.GLOBAL_ADD.equals(logOperation)) {throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,"Fail to store global session");} else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,"Fail to update global session");} else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,"Fail to remove global session");} else if (LogOperation.BRANCH_ADD.equals(logOperation)) {throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,"Fail to store branch session");} else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,"Fail to update branch session");} else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,"Fail to remove branch session");} else {throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,"Unknown LogOperation:" + logOperation.name());}}}

如果发生异常

    @Overridepublic boolean unlock() throws TransactionException {if (this.getBranchType() == BranchType.AT) {// 释放锁return LockerManagerFactory.getLockManager().releaseLock(this);}return true;}
    public boolean releaseLock(BranchSession branchSession) throws TransactionException {if (branchSession == null) {throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");}List<RowLock> locks = collectRowLocks(branchSession);try {// 释放锁return getLocker(branchSession).releaseLock(locks);} catch (Exception t) {LOGGER.error("unLock error, branchSession:{}", branchSession, t);return false;}}
  public boolean releaseLock(List<RowLock> locks) {if (CollectionUtils.isEmpty(locks)) {// no lockreturn true;}try {return lockStore.unLock(convertToLockDO(locks));} catch (StoreException e) {throw e;} catch (Exception t) {LOGGER.error("unLock error, locks:{}", CollectionUtils.toString(locks), t);return false;}}

将数据库的锁删除

private static final String BATCH_DELETE_LOCK_SQL = "delete from " + LOCK_TABLE_PLACE_HOLD+ " where " + ServerTableColumnsName.LOCK_TABLE_XID + " = ? and (" + LOCK_TABLE_PK_WHERE_CONDITION_PLACE_HOLD + ") ";

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/32488.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

智能物流系统堪比帝王宠信妃子,我给你类比说明一下……

导语 大家好&#xff0c;我是社长&#xff0c;老K。专注分享智能制造和智能仓储物流等内容。 新书《智能物流系统构成与技术实践》人俱乐部 让我们将智能物流系统种涉及出库入库作业完整链条的“货到人”拣选系统的工作流程与古代帝王宠信翻牌妃子的过程进行一个有趣的类比&…

【vue3|第13期】深入了解Vue3生命周期:管理组件的诞生、成长与消亡

日期&#xff1a;2024年6月22日 作者&#xff1a;Commas 签名&#xff1a;(ง •_•)ง 积跬步以致千里,积小流以成江海…… 注释&#xff1a;如果您觉得有所帮助&#xff0c;帮忙点个赞&#xff0c;也可以关注我&#xff0c;我们一起成长&#xff1b;如果有不对的地方&#xf…

【SSM】医疗健康平台-管理端-检查组管理

技能目标 掌握新增检查组功能的实现 掌握查询检查组功能的实现 掌握编辑检查组功能的实现 掌握删除检查组功能的实现 体检的检查项种类繁多&#xff0c;为了方便管理和快速筛选出类别相同的检查项&#xff0c;医疗健康将类别相同的检查项放到同一个检查组中进行管理&#…

【CV炼丹师勇闯力扣训练营 Day8】

CV炼丹师勇闯力扣训练营 代码随想录算法训练营第8天 ● 344.反转字符串 ● 541. 反转字符串II ● 卡码网&#xff1a;54.替换数字 一、344 反转字符串 编写一个函数&#xff0c;其作用是将输入的字符串反转过来。输入字符串以字符数组 s 的形式给出。 不要给另外的数组分配额…

# Kafka_深入探秘者(1):初识 kafka

Kafka_深入探秘者&#xff08;1&#xff09;&#xff1a;初识 kafka 一、kafka 特性 1、Kafka &#xff1a;最初是由 Linkedln 公司采用 Scala 语言开发的一个多分区、多副本并且基于 ZooKeeper 协调的分布式消息系统&#xff0c;现在已经捐献给了 Apache 基金会。目前 Kafka…

如何使用kimi智能助手:您的智能生活小助手

Kimi智能助手是一款功能强大的AI工具&#xff0c;旨在帮助用户提高工作效率和生活品质。下面小编将详细介绍如何使用Kimi智能助手&#xff0c;涵盖其主要功能以及一些实用技巧。 一、Kimi智能助手的主要功能 多语言对话能力&#xff1a;Kimi擅长中文和英文的对话&#xff0c;可…

探索计算机视觉(人工智能重要分支)的发展与应用

引言 在当今快速发展的科技时代&#xff0c;计算机视觉作为人工智能领域的重要分支&#xff0c;正日益成为各行各业不可或缺的关键技术。从简单的图像处理到复杂的智能系统&#xff0c;计算机视觉的发展不仅改变了我们看待世界的方式&#xff0c;也深刻影响着工业、医疗、交通等…

Windows安装配置jdk和maven

他妈的远程连接不上公司电脑&#xff0c;只能在家重新配置一遍&#xff0c;在此记录一下后端环境全部配置 Windows安装配置JDK 1.8一、下载 JDK 1.8二、配置环境变量三、验证安装 Windows安装配置Maven 3.8.8一、下载安装 Maven并配置环境变量二、设置仓库镜像及本地仓库三、测…

2024最新版Python 3.12.4安装使用指南

2024最新版Python 3.12.4安装使用指南 2024最新版Python 3.12.4安装使用指南0. Python的受欢迎程度1. 安装最新版Python 3.12.42. 验证Python 3.12.4版本3. 验证Python功能4. 使用IDLE交互式开发模式5. 安装Python扩展库相关阅读&#xff1a; By Jackson 2024最新版Python 3.12…

java基于ssm+jsp 汽车在线销售系统

1 前台功能模块 网站首页 网页首页汽车在线销售系统模块如下&#xff1a;首页、汽车信息、新闻资讯、留言反馈、我的收藏管理等功能图1 图1网页首页 网页前台车辆信息效果图如图2所示 图2 车辆信息界面图 2 管理员功能模块 管理员输入个人的账号、密码登录系统&#xff0c…

压力测试

1.什么是压力测试 压力测试考察当前软硬件环境下系统所能承受的最大负荷并帮助找出系统瓶颈所在。压测都是为了系统在线上的处理能力和稳定性维持在一个标准范围内&#xff0c;做到心中有数 使用压力测试&#xff0c;我们有希望找到很多种用其他测试方法更难发现的错误&#…

基于matlab的K-means聚类图像分割

1 原理 K-means聚类算法在图像分割中的应用是基于一种无监督的学习方法&#xff0c;它将图像中的像素点或特征区域划分为K个不同的簇或类别。以下是K-means聚类算法用于图像分割的原理&#xff0c;包括步骤和公式&#xff1a; 1.1 原理概述 选择簇的数量(K)&#xff1a; 首先…

YOLOv9基础 | 实时目标检测新SOTA,手把手带你深度解析yolov9论文!

前言:Hello大家好,我是小哥谈。YOLOv9是Chien-Yao Wang等人提出的YOLO系列的最新版本之一(截止到目前,YOLOv10已发布),于2024年2月21日发布。它是 YOLOv7的改进版本,两者均由Chien-Yao Wang及其同事开发。本节课就以YOLOv9论文为基础带大家深入解析YOLOv9算法。🌈 …

浏览器-服务器架构 (BS架构) 详解

目录 前言1. BS架构概述1.1 BS架构的定义1.2 BS架构的基本原理 2. BS架构的优势2.1 客户端简化2.2 易于更新和维护2.3 跨平台性强2.4 扩展性高 3. BS架构的劣势3.1 网络依赖性强3.2 安全性问题3.3 用户体验局限 4. BS架构的典型应用场景4.1 企业内部应用4.2 电子商务平台4.3 在…

java小代码(1)

代码 &#xff1a; 今日总结到此结束&#xff0c;拜拜&#xff01;

1999-2022年 297个地级市-医院卫生院数量及床位数量(数据收集)

全国297个地级市的医院卫生院数量的稳步增长是医疗事业发展的一个重要标志。政府的持续投入和对医疗设施的改善&#xff0c;不仅提升了医疗服务的硬件水平&#xff0c;也通过引进和培养医疗人才、优化服务流程&#xff0c;提高了医疗服务的整体质量。这些举措极大地增强了人民群…

C语言之详解预处理

前言&#xff1a; 预处理也叫预编译&#xff0c;是编译代码时的第一步&#xff0c;经过预处理后生成一个.i文件&#xff0c;如果不明白编译与链接作用的小伙伴可以先看看博主的上一篇博客—— &#xff0c;不然知识连贯性可能会显得很差哦。 正文目录&#xff1a; 预定义符号#…

font-spider按需生成字体文件

font-spider可以全局安装,也可以单个项目内安装,使用npm run xxxx的形式 npm i font-spider "dev": "font-spider ./*.html" <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name&…

Android测量

最大模式&#xff08;MeasureSpec.AT_MOST&#xff09; 这个也就是父组件&#xff0c;能够给出的最大的空间&#xff0c;当前组件的长或宽最大只能为这么大&#xff0c;当然也可以比这个小。 最高两位是11的时候表示”最大模式”。即MeasureSpec.AT_MOST未指定模式&#xff08;…

1996年-2023年 全国298个地级市-外商直接投资FDI(数据收集)

外商直接投资&#xff08;FDI&#xff09;是一种跨国界的经济活动&#xff0c;它涉及外国投资者在中国境内进行的直接投资行为。这种投资行为不仅包括以货币、实物、技术等形式的资本投入&#xff0c;还可能包括开办独资企业、合资企业、合作企业&#xff0c;以及参与资源开发等…