【SpringCloud-Seata源码分析2】

文章目录

  • 分支事务注册-客户端
  • 分支事务服务端的执行

分支事务注册-客户端

第一篇我们将全局事务启动,以及开启源码分析完成了,现在我们需要看一下分支事务注册。
在这里插入图片描述
我们分支事务的开始需要从PreparedStatementProxy#executeUpdate中去看。

public class PreparedStatementProxy extends AbstractPreparedStatementProxy implements PreparedStatement, ParametersHolder {public Map<Integer, ArrayList<Object>> getParameters() {return this.parameters;}public PreparedStatementProxy(AbstractConnectionProxy connectionProxy, PreparedStatement targetStatement, String targetSQL) throws SQLException {super(connectionProxy, targetStatement, targetSQL);}public boolean execute() throws SQLException {return (Boolean)ExecuteTemplate.execute(this, (statement, args) -> {return statement.execute();}, new Object[0]);}public ResultSet executeQuery() throws SQLException {return (ResultSet)ExecuteTemplate.execute(this, (statement, args) -> {return statement.executeQuery();}, new Object[0]);}
//这个是分支事务的核心入口public int executeUpdate() throws SQLException {return (Integer)ExecuteTemplate.execute(this, (statement, args) -> {return statement.executeUpdate();}, new Object[0]);}
}

判断出当前的业务Sql是什么类型,我们需要选择不同的执行器。

public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers, StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException {//判断是否是全局锁,并且是否是AT模式if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {//否 执行普通的SQLreturn statementCallback.execute(statementProxy.getTargetStatement(), args);} else {//获取数据库类型String dbType = statementProxy.getConnectionProxy().getDbType();if (CollectionUtils.isEmpty(sqlRecognizers)) {sqlRecognizers = SQLVisitorFactory.get(statementProxy.getTargetSQL(), dbType);}Object executor;if (CollectionUtils.isEmpty(sqlRecognizers)) {executor = new PlainExecutor(statementProxy, statementCallback);} else if (sqlRecognizers.size() == 1) {SQLRecognizer sqlRecognizer = (SQLRecognizer)sqlRecognizers.get(0);label44://根据不同的SQL类型选择不同的执行器switch(sqlRecognizer.getSQLType()) {case INSERT:executor = (Executor)EnhancedServiceLoader.load(InsertExecutor.class, dbType, new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class}, new Object[]{statementProxy, statementCallback, sqlRecognizer});break;case UPDATE:executor = new UpdateExecutor(statementProxy, statementCallback, sqlRecognizer);break;case DELETE:executor = new DeleteExecutor(statementProxy, statementCallback, sqlRecognizer);break;case SELECT_FOR_UPDATE:executor = new SelectForUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);break;case INSERT_ON_DUPLICATE_UPDATE:byte var8 = -1;switch(dbType.hashCode()) {case 104382626:if (dbType.equals("mysql")) {var8 = 0;}break;case 839186932:if (dbType.equals("mariadb")) {var8 = 1;}}switch(var8) {case 0:case 1:executor = new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);break label44;default:throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");}default:executor = new PlainExecutor(statementProxy, statementCallback);}} else {executor = new MultiExecutor(statementProxy, statementCallback, sqlRecognizers);}try {//核心入口执行T rs = ((Executor)executor).execute(args);return rs;} catch (Throwable var9) {Throwable ex = var9;if (!(var9 instanceof SQLException)) {ex = new SQLException(var9);}throw (SQLException)ex;}}}

excute()执行

    public T execute(Object... args) throws Throwable {//获取事务的xidString xid = RootContext.getXID();if (xid != null) {//绑定xidthis.statementProxy.getConnectionProxy().bind(xid);}//将事务绑定上全局锁this.statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());return this.doExecute(args);}
    protected T executeAutoCommitTrue(Object[] args) throws Throwable {ConnectionProxy connectionProxy = this.statementProxy.getConnectionProxy();Object var3;try {//将事务改为手动提交connectionProxy.changeAutoCommit();var3 = (new AbstractDMLBaseExecutor.LockRetryPolicy(connectionProxy)).execute(() -> {//执行非自动提交T result = this.executeAutoCommitFalse(args);connectionProxy.commit();return result;});} catch (Exception var7) {LOGGER.error("execute executeAutoCommitTrue error:{}", var7.getMessage(), var7);if (!AbstractDMLBaseExecutor.LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {			//报错将事务进行回滚connectionProxy.getTargetConnection().rollback();}throw var7;} finally {connectionProxy.getContext().reset();//将自动提交置为trueconnectionProxy.setAutoCommit(true);}return var3;}
protected T executeAutoCommitFalse(Object[] args) throws Exception {if (!"mysql".equalsIgnoreCase(this.getDbType()) && this.isMultiPk()) {throw new NotSupportYetException("multi pk only support mysql!");} else {//设置前置镜像TableRecords beforeImage = this.beforeImage();//执行业务SqlT result = this.statementCallback.execute(this.statementProxy.getTargetStatement(), args);int updateCount = this.statementProxy.getUpdateCount();if (updateCount > 0) {//执行后置镜像TableRecords afterImage = this.afterImage(beforeImage);this.prepareUndoLog(beforeImage, afterImage);}return result;}}

执行提交

    public void commit() throws SQLException {try {this.lockRetryPolicy.execute(() -> {this.doCommit();return null;});} catch (SQLException var2) {if (this.targetConnection != null && !this.getAutoCommit() && !this.getContext().isAutoCommitChanged()) {this.rollback();}throw var2;} catch (Exception var3) {throw new SQLException(var3);}}

执行事务的提交

   private void doCommit() throws SQLException {if (this.context.inGlobalTransaction()) {//如果是全局事务就执行此方法this.processGlobalTransactionCommit();} else if (this.context.isGlobalLockRequire()) {//执行全局锁的事务提交this.processLocalCommitWithGlobalLocks();} else {//其他this.targetConnection.commit();}}

组装请求数据,发送后端

  public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {try {BranchRegisterRequest request = new BranchRegisterRequest();request.setXid(xid);request.setLockKey(lockKeys);request.setResourceId(resourceId);request.setBranchType(branchType);request.setApplicationData(applicationData);BranchRegisterResponse response = (BranchRegisterResponse)RmNettyRemotingClient.getInstance().sendSyncRequest(request);if (response.getResultCode() == ResultCode.Failed) {throw new RmTransactionException(response.getTransactionExceptionCode(), String.format("Response[ %s ]", response.getMsg()));} else {return response.getBranchId();}} catch (TimeoutException var9) {throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", var9);} catch (RuntimeException var10) {throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", var10);}}

分支事务服务端的执行

在这里插入图片描述
我们首先看一下分支事务服务端注册的入口DefaultCoordinator#

    @Overridepublic AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {//判断请求是否来自于事务的客户端if (!(request instanceof AbstractTransactionRequestToTC)) {throw new IllegalArgumentException();}AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;transactionRequest.setTCInboundHandler(this);return transactionRequest.handle(context);}

我们的核心分支事务注册代码

    @Overridepublic BranchRegisterResponse handle(BranchRegisterRequest request, final RpcContext rpcContext) {//创建返回的实例BranchRegisterResponse response = new BranchRegisterResponse();exceptionHandleTemplate(new AbstractCallback<BranchRegisterRequest, BranchRegisterResponse>() {@Overridepublic void execute(BranchRegisterRequest request, BranchRegisterResponse response)throws TransactionException {try {//核心分支注册实例doBranchRegister(request, response, rpcContext);} catch (StoreException e) {throw new TransactionException(TransactionExceptionCode.FailedStore, String.format("branch register request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);}}}, request, response);return response;}
 @Overridepublic Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,String applicationData, String lockKeys) throws TransactionException {// key1:获取GlobalSessionGlobalSession globalSession = assertGlobalSessionNotNull(xid, false);return SessionHolder.lockAndExecute(globalSession, () -> {// 检查事务状态globalSessionStatusCheck(globalSession);globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());//key2: 创建分支会话BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,applicationData, lockKeys, clientId);MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));//key3:增加分支事务锁branchSessionLock(globalSession, branchSession);try {//key4: 全局会话添加分支会话globalSession.addBranch(branchSession);} catch (RuntimeException ex) {// key5: 出现异常释放锁branchSessionUnlock(branchSession);throw new BranchTransactionException(FailedToAddBranch, String.format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),branchSession.getBranchId()), ex);}if (LOGGER.isInfoEnabled()) {LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);}// key6: 返回分支会话的分支IDreturn branchSession.getBranchId();});}

创建封装分支事务的信息

    public static BranchSession newBranchByGlobal(GlobalSession globalSession, BranchType branchType, String resourceId,String applicationData, String lockKeys, String clientId) {BranchSession branchSession = new BranchSession();branchSession.setXid(globalSession.getXid());branchSession.setTransactionId(globalSession.getTransactionId());branchSession.setBranchId(UUIDGenerator.generateUUID());branchSession.setBranchType(branchType);branchSession.setResourceId(resourceId);branchSession.setLockKey(lockKeys);branchSession.setClientId(clientId);branchSession.setApplicationData(applicationData);return branchSession;}

分支事务加锁

protected void branchSessionLock(GlobalSession globalSession, BranchSession branchSession)throws TransactionException {//分支事务的参数校验String applicationData = branchSession.getApplicationData();boolean autoCommit = true;boolean skipCheckLock = false;if (StringUtils.isNotBlank(applicationData)) {if (objectMapper == null) {objectMapper = new ObjectMapper();}try {Map<String, Object> data = objectMapper.readValue(applicationData, HashMap.class);Object clientAutoCommit = data.get(AUTO_COMMIT);if (clientAutoCommit != null && !(boolean)clientAutoCommit) {autoCommit = (boolean)clientAutoCommit;}Object clientSkipCheckLock = data.get(SKIP_CHECK_LOCK);if (clientSkipCheckLock instanceof Boolean) {skipCheckLock = (boolean)clientSkipCheckLock;}} catch (IOException e) {LOGGER.error("failed to get application data: {}", e.getMessage(), e);}}try {// 增加分支锁,如果返回false加锁失败我们直接抛出异常if (!branchSession.lock(autoCommit, skipCheckLock)) {throw new BranchTransactionException(LockKeyConflict,String.format("Global lock acquire failed xid = %s branchId = %s", globalSession.getXid(),branchSession.getBranchId()));}} catch (StoreException e) {if (e.getCause() instanceof BranchTransactionException) {throw new BranchTransactionException(((BranchTransactionException)e.getCause()).getCode(),String.format("Global lock acquire failed xid = %s branchId = %s", globalSession.getXid(),branchSession.getBranchId()));}throw e;}}
    public boolean lock(boolean autoCommit, boolean skipCheckLock) throws TransactionException {// 必须还AT事务if (this.getBranchType().equals(BranchType.AT)) {return LockerManagerFactory.getLockManager().acquireLock(this, autoCommit, skipCheckLock);}return true;}
    public boolean acquireLock(BranchSession branchSession, boolean autoCommit, boolean skipCheckLock) throws TransactionException {if (branchSession == null) {throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");}// 获取分支锁的keyString lockKey = branchSession.getLockKey();if (StringUtils.isNullOrEmpty(lockKey)) {// no lockreturn true;}// get locks of branch// 行锁收集List<RowLock> locks = collectRowLocks(branchSession);if (CollectionUtils.isEmpty(locks)) {// no lockreturn true;}// 进行存储return getLocker(branchSession).acquireLock(locks, autoCommit, skipCheckLock);}

添加分支事务

 private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException {// 将事务信息写入数据库if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) {if (LogOperation.GLOBAL_ADD.equals(logOperation)) {throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,"Fail to store global session");} else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,"Fail to update global session");} else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,"Fail to remove global session");} else if (LogOperation.BRANCH_ADD.equals(logOperation)) {throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,"Fail to store branch session");} else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,"Fail to update branch session");} else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,"Fail to remove branch session");} else {throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,"Unknown LogOperation:" + logOperation.name());}}}

如果发生异常

    @Overridepublic boolean unlock() throws TransactionException {if (this.getBranchType() == BranchType.AT) {// 释放锁return LockerManagerFactory.getLockManager().releaseLock(this);}return true;}
    public boolean releaseLock(BranchSession branchSession) throws TransactionException {if (branchSession == null) {throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");}List<RowLock> locks = collectRowLocks(branchSession);try {// 释放锁return getLocker(branchSession).releaseLock(locks);} catch (Exception t) {LOGGER.error("unLock error, branchSession:{}", branchSession, t);return false;}}
  public boolean releaseLock(List<RowLock> locks) {if (CollectionUtils.isEmpty(locks)) {// no lockreturn true;}try {return lockStore.unLock(convertToLockDO(locks));} catch (StoreException e) {throw e;} catch (Exception t) {LOGGER.error("unLock error, locks:{}", CollectionUtils.toString(locks), t);return false;}}

将数据库的锁删除

private static final String BATCH_DELETE_LOCK_SQL = "delete from " + LOCK_TABLE_PLACE_HOLD+ " where " + ServerTableColumnsName.LOCK_TABLE_XID + " = ? and (" + LOCK_TABLE_PK_WHERE_CONDITION_PLACE_HOLD + ") ";

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/32488.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

智能物流系统堪比帝王宠信妃子,我给你类比说明一下……

导语 大家好&#xff0c;我是社长&#xff0c;老K。专注分享智能制造和智能仓储物流等内容。 新书《智能物流系统构成与技术实践》人俱乐部 让我们将智能物流系统种涉及出库入库作业完整链条的“货到人”拣选系统的工作流程与古代帝王宠信翻牌妃子的过程进行一个有趣的类比&…

【vue3|第13期】深入了解Vue3生命周期:管理组件的诞生、成长与消亡

日期&#xff1a;2024年6月22日 作者&#xff1a;Commas 签名&#xff1a;(ง •_•)ง 积跬步以致千里,积小流以成江海…… 注释&#xff1a;如果您觉得有所帮助&#xff0c;帮忙点个赞&#xff0c;也可以关注我&#xff0c;我们一起成长&#xff1b;如果有不对的地方&#xf…

神经科学原理精解【1】

文章目录 神经系统组成神经系统两类细胞脑组织基本结构参考资料 神经系统组成 神经系统由中驱神经系统和外围神经系统组成。中驱神经系统包括脑和脊髓。脑的主要功能是大脑、小脑和脑干。外围神经系统由位于脑和脊髓之外的神经和神经细胞组成。 神经系统两类细胞 神经元 感知…

基于深度学习的边缘检测

基于深度学习的边缘检测 边缘检测是计算机视觉中的一项基本任务&#xff0c;旨在识别图像中像素值变化显著的区域&#xff0c;即边缘。传统的边缘检测算法&#xff08;如Sobel、Canny等&#xff09;通过滤波器和梯度运算来检测边缘&#xff0c;而基于深度学习的方法则通过训练…

【SSM】医疗健康平台-管理端-检查组管理

技能目标 掌握新增检查组功能的实现 掌握查询检查组功能的实现 掌握编辑检查组功能的实现 掌握删除检查组功能的实现 体检的检查项种类繁多&#xff0c;为了方便管理和快速筛选出类别相同的检查项&#xff0c;医疗健康将类别相同的检查项放到同一个检查组中进行管理&#…

VMware ESXi 主机的健康检查常用命令

使用root登录esxi 主机&#xff0c;然后运行下面的一些命令&#xff0c;可以对ESXi的健康状态有个基本了解&#xff1a; 检查主机资源利用情况&#xff1a; esxtop: 实时查看主机资源使用情况&#xff0c;包括CPU、内存、磁盘和网络。esxcli vm process list: 列出当前在主机上…

【CV炼丹师勇闯力扣训练营 Day8】

CV炼丹师勇闯力扣训练营 代码随想录算法训练营第8天 ● 344.反转字符串 ● 541. 反转字符串II ● 卡码网&#xff1a;54.替换数字 一、344 反转字符串 编写一个函数&#xff0c;其作用是将输入的字符串反转过来。输入字符串以字符数组 s 的形式给出。 不要给另外的数组分配额…

# Kafka_深入探秘者(1):初识 kafka

Kafka_深入探秘者&#xff08;1&#xff09;&#xff1a;初识 kafka 一、kafka 特性 1、Kafka &#xff1a;最初是由 Linkedln 公司采用 Scala 语言开发的一个多分区、多副本并且基于 ZooKeeper 协调的分布式消息系统&#xff0c;现在已经捐献给了 Apache 基金会。目前 Kafka…

Python基础教程(三十一):pyecharts模块

💝💝💝首先,欢迎各位来到我的博客,很高兴能够在这里和您见面!希望您在这里不仅可以有所收获,同时也能感受到一份轻松欢乐的氛围,祝你生活愉快! 💝💝💝如有需要请大家订阅我的专栏【Python系列】哟!我会定期更新相关系列的文章 💝💝💝关注!关注!!请…

代码随想录leetcode200题之单调栈

目录 1 介绍2 训练3 参考 1 介绍 本博客用来记录代码随想录leetcode200题之单调栈相关题目。 2 训练 题目1&#xff1a;739. 每日温度 解题思路&#xff1a;单调栈模型–找到数组中下一个更大数。从右到左遍历&#xff0c;保留更大值&#xff0c;因此是一个单调递减的栈。 …

MyBatis-Plus入门教程(一)

MyBatis-Plus 是一个 MyBatis 的增强工具&#xff0c;在 MyBatis 的基础上为其提供了许多便捷功能&#xff0c;使开发者能够更快速、高效地进行数据库操作。 MyBatis-Plus 简介 1. 什么是 MyBatis-Plus&#xff1f; MyBatis-Plus&#xff08;简称 MP&#xff09;是一个 MyBa…

google-自我插件

1. Bitwarden 密码管理器 2. React Developer Tools 3. Vue.js devtools 4. YouTube™ 双字幕 5. 沉浸式翻译 - 网页翻译插件 6. FeHelper(前端助手) 7. IDM Integration Module 8. 待续…

如何使用kimi智能助手:您的智能生活小助手

Kimi智能助手是一款功能强大的AI工具&#xff0c;旨在帮助用户提高工作效率和生活品质。下面小编将详细介绍如何使用Kimi智能助手&#xff0c;涵盖其主要功能以及一些实用技巧。 一、Kimi智能助手的主要功能 多语言对话能力&#xff1a;Kimi擅长中文和英文的对话&#xff0c;可…

探索计算机视觉(人工智能重要分支)的发展与应用

引言 在当今快速发展的科技时代&#xff0c;计算机视觉作为人工智能领域的重要分支&#xff0c;正日益成为各行各业不可或缺的关键技术。从简单的图像处理到复杂的智能系统&#xff0c;计算机视觉的发展不仅改变了我们看待世界的方式&#xff0c;也深刻影响着工业、医疗、交通等…

Windows安装配置jdk和maven

他妈的远程连接不上公司电脑&#xff0c;只能在家重新配置一遍&#xff0c;在此记录一下后端环境全部配置 Windows安装配置JDK 1.8一、下载 JDK 1.8二、配置环境变量三、验证安装 Windows安装配置Maven 3.8.8一、下载安装 Maven并配置环境变量二、设置仓库镜像及本地仓库三、测…

2024最新版Python 3.12.4安装使用指南

2024最新版Python 3.12.4安装使用指南 2024最新版Python 3.12.4安装使用指南0. Python的受欢迎程度1. 安装最新版Python 3.12.42. 验证Python 3.12.4版本3. 验证Python功能4. 使用IDLE交互式开发模式5. 安装Python扩展库相关阅读&#xff1a; By Jackson 2024最新版Python 3.12…

11.1JavaEE——Spring MVC的核心类和注解(一)DispatcherServlet

一、DispatcherServlet作用 DispatcherServlet是Spring MVC的核心类&#xff0c;也是Spring MVC的流程控制中心&#xff0c;也称为Spring MVC的前端控制器&#xff0c;它可以拦截客户端的请求。拦截客户端请求之后&#xff0c;DispatcherServlet会根据具体规则将请求交给其他组…

java基于ssm+jsp 汽车在线销售系统

1 前台功能模块 网站首页 网页首页汽车在线销售系统模块如下&#xff1a;首页、汽车信息、新闻资讯、留言反馈、我的收藏管理等功能图1 图1网页首页 网页前台车辆信息效果图如图2所示 图2 车辆信息界面图 2 管理员功能模块 管理员输入个人的账号、密码登录系统&#xff0c…

压力测试

1.什么是压力测试 压力测试考察当前软硬件环境下系统所能承受的最大负荷并帮助找出系统瓶颈所在。压测都是为了系统在线上的处理能力和稳定性维持在一个标准范围内&#xff0c;做到心中有数 使用压力测试&#xff0c;我们有希望找到很多种用其他测试方法更难发现的错误&#…

基于matlab的K-means聚类图像分割

1 原理 K-means聚类算法在图像分割中的应用是基于一种无监督的学习方法&#xff0c;它将图像中的像素点或特征区域划分为K个不同的簇或类别。以下是K-means聚类算法用于图像分割的原理&#xff0c;包括步骤和公式&#xff1a; 1.1 原理概述 选择簇的数量(K)&#xff1a; 首先…