结合seata和2PC,简单聊聊seata源码

当前代码分析基于seata1.6.1

整体描述

整体代码流程可以描述为

  1. TM开启全局事务,会调用TC来获取XID。
  2. TC在接收到通知后,会生成XID,然后会将当前全局事务保存到global_table表中,并且返回XID。
  3. 在获取到XID后,会执行业务逻辑。
  4. 执行业务逻辑的时候,如果发生了增删改,则会对增删改语句做增强。
  5. 获取前置镜像数据---执行sql,不提交事务--获取后置镜像---准备undoLog---作为RM向TC提交事务分支---生成undo_log日志---提交本地事务,注意,在这里,本地事务已经提交了。只是有undo_log可用于回滚。
  6. TC接收RM端提交的分支事务,存储到brand_table中。
  7. 当全局分支事务都执行完成,TM会向TC提起全局事务提交的请求。
  8. TC接收到请求后,删除全局事务和分支事务(global_table 和 brand_table)。
  9. TC 通知RM,删除 undo log 日志。

源码解析

系统启动初始化

主要完成两个事情:初始化 TM和RM客户端;创建方法拦截器

在客户端中,核心配置类是SeataAutoConfiguration,在这个类中初始化了一个核心的扫描器GlobalTransactionScanner。

GlobalTransactionScanner 全局事务扫描器,实现了InitializingBean接口,如果继承了该接口,spring会在完成DI之后,调用afterPropertiesSet方法,在该方法中完成了对TM客户端和RM客户端的创建,代码如下

@Override
public void afterPropertiesSet() {if (disableGlobalTransaction) {if (LOGGER.isInfoEnabled()) {LOGGER.info("Global transaction is disabled.");}ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,(ConfigurationChangeListener)this);return;}if (initialized.compareAndSet(false, true)) {//创建客户端的方法initClient();}
}private void initClient() {...其他校验//创建TM客户端并且初始化TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);...//创建RM客户端并且初始化RMClient.init(applicationId, txServiceGroup);...}

同时,GlobalTransactionScanner 继承了AbstractAutoProxyCreator 抽象类,在类完成初始化之后,会调用父类的 postProcessAfterInitialization方法,在父类的方法中,会调用该类重写的一个wrapIfNecessary方法。

wrapIfNecessary 方法会生成一个 GlobalTransactionalInterceptor 全局事务拦截器。

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {// do checkersif (!doCheckers(bean, beanName)) {return bean;}try {synchronized (PROXYED_SET) {if (PROXYED_SET.contains(beanName)) {return bean;}interceptor = null;//check TCC proxyif (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {...} else {...//生成一个全局事务处理的拦截器,if (globalTransactionalInterceptor == null) {globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,(ConfigurationChangeListener)globalTransactionalInterceptor);}interceptor = globalTransactionalInterceptor;}...} catch (Exception exx) {throw new RuntimeException(exx);}}

TM端开启全局事务

开启全局事务

GlobalTransactionalInterceptor中的逻辑

忽略中间调用过程,最终会走到io.seata.tm.api.TransactionalTemplate#execute

在该类中,核心代码如下

public Object execute(TransactionalExecutor business) throws Throwable {...try {...try {//开启全局事务beginTransaction(txInfo, tx);Object rs;try {//进入业务代码,执行业务逻辑rs = business.execute();} catch (Throwable ex) {//当出现业务逻辑异常进行回滚completeTransactionAfterThrowing(txInfo, tx, ex);throw ex;}//所有分支事务无异常,提交全局事务commitTransaction(tx, txInfo);return rs;} finally {...}} finally {...}
}

开启全局事务的方法就在beginTransaction中,继续往下会去TC中获取一个XID,就是全局事务id

TC端接收全局事务请求后

记录全局事务

在server层的代码中,全局事务的入口方法为 io.seata.server.coordinator.DefaultCoordinator#doGlobalBegin

在该方法中会去调用core.begin方法,进入xid获取流程

@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)throws TransactionException {//开始获取xidresponse.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),request.getTransactionName(), request.getTimeout()));if (LOGGER.isInfoEnabled()) {LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());}
}

忽略其他流程,关注核心,其调用往下的链路为

core.begin--session.begin--lifecycleListener.onBegin--找到子类方法--this.addGlobalSession--找子类方法

在这里,可以找到三个实现类,分别是代表了数据库、文件和redis实现,文件是默认实现,其他几种都需要进行配置,我们针对数据库实现进行描述。

方法路径为:io.seata.server.storage.db.session.DataBaseSessionManager#addGlobalSession

然后调用方法transactionStoreManager.writeSession,transactionStoreManager是一个接口,同样有三种实现

在writeSession方法中,会插入全局事务表数据,代码如下

/*** 插入全局事务表* 表为:global_table* @param globalTransactionDO the global transaction do* @return*/
@Override
public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);Connection conn = null;PreparedStatement ps = null;try {int index = 1;conn = logStoreDataSource.getConnection();conn.setAutoCommit(true);ps = conn.prepareStatement(sql);//插入xidps.setString(index++, globalTransactionDO.getXid());//插入事务idps.setLong(index++, globalTransactionDO.getTransactionId());//插入事务状态,begin  = 1ps.setInt(index++, globalTransactionDO.getStatus());//插入应用id,一般是服务名ps.setString(index++, globalTransactionDO.getApplicationId());//插入事务组ps.setString(index++, globalTransactionDO.getTransactionServiceGroup());String transactionName = globalTransactionDO.getTransactionName();transactionName = transactionName.length() > transactionNameColumnSize ?transactionName.substring(0, transactionNameColumnSize) :transactionName;//插入事务名称ps.setString(index++, transactionName);//插入超时时间ps.setInt(index++, globalTransactionDO.getTimeout());//插入事务开始时间ps.setLong(index++, globalTransactionDO.getBeginTime());ps.setString(index++, globalTransactionDO.getApplicationData());return ps.executeUpdate() > 0;} catch (SQLException e) {throw new StoreException(e);} finally {IOUtil.close(ps, conn);}
}

RM执行业务代码,并且提交事务

代理数据源,生成undo log,并且通知TC

在seata中,需要配置数据源代理,这个代理会在执行增删改查的时候,对操作进行增强

这里核心需要关注的方法是io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#doExecute

@Override
public T doExecute(Object... args) throws Throwable {AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();//一般在初始状态下,这个autoCommit是trueif (connectionProxy.getAutoCommit()) {return executeAutoCommitTrue(args);} else {return executeAutoCommitFalse(args);}
}

然后会调用 executeAutoCommitTrue 方法,该方法主要做了几个事情,分别是:获取前置镜像和后置镜像,并且制作undo_log;执行目标sql和插入undo_log;作为RM和TC进行交互,提交分支事务;以及提交事务。代码如下

executeAutoCommitTrue 方法代码

 protected T executeAutoCommitTrue(Object[] args) throws Throwable {ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();try {//设置提交方式为手动提交connectionProxy.changeAutoCommit();return new LockRetryPolicy(connectionProxy).execute(() -> {//执行sql,并且准备前置镜像和后置镜像T result = executeAutoCommitFalse(args);//提交本地事务(内部会和RM进行交互)connectionProxy.commit();return result;});} catch (Exception e) {...异常处理} finally {connectionProxy.getContext().reset();connectionProxy.setAutoCommit(true);}
}

executeAutoCommitFalse方法代码

protected T executeAutoCommitFalse(Object[] args) throws Exception {//获取前置镜像TableRecords beforeImage = beforeImage();//执行目标sql,注意,这边执行完后,事务是未提交的T result = statementCallback.execute(statementProxy.getTargetStatement(), args);//获取后置镜像TableRecords afterImage = afterImage(beforeImage);//准备undo_logprepareUndoLog(beforeImage, afterImage);return result;
}

processGlobalTransactionCommit 方法,该方法就是connectionProxy.commit()最终指向的方法

 private void processGlobalTransactionCommit() throws SQLException {try {//作为RM,向TC发起请求,注册分支事务,会插入数据到TC的mysql表中register();} catch (TransactionException e) {recognizeLockKeyConflictException(e, context.buildLockKeys());}try {//生成undo_log日志,用于事务回滚UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);//提交undo_log 回滚日志和本地事务,事务在这里已经提交了targetConnection.commit();} catch (Throwable ex) {...异常处理}...其他处理
}

TM提交全局事务

进行提交就是向TC发起请求,相关代码如下

@Override
public void commit() throws TransactionException {//判断当前角色,只有TM才能执行if (role == GlobalTransactionRole.Participant) {...return;}//XID不能为空assertXIDNotNull();int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;try {//可重试的执行,最多可执行5次while (retry > 0) {try {retry--;//向tc发起调用status = transactionManager.commit(xid);break;} catch (Throwable ex) {...}}} finally {...}...
}

TC处理全局事务

TC在接收到提交请求后,会由方法 io.seata.server.coordinator.DefaultCoordinator#doGlobalCommit 进行处理。

    protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)throws TransactionException {MDC.put(RootContext.MDC_KEY_XID, request.getXid());//设置状态为异步提交状态response.setGlobalStatus(core.commit(request.getXid()));}

在AT模式下,事务的提交为异步的方式

public GlobalStatus commit(String xid) throws TransactionException {//获取全局session,不同模式获取方式不同,如果是db,则会从数据库获取GlobalSession globalSession = SessionHolder.findGlobalSession(xid);if (globalSession == null) {//如果获取不到session,返回已完成状态,一般在调用超时的时候会发生,这样也可以保证幂等return GlobalStatus.Finished;}...boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {if (globalSession.getStatus() == GlobalStatus.Begin) {// Highlight: Firstly, close the session, then no more branch can be registered.globalSession.closeAndClean();//判断是否可以异步提交,AT模式下可以异步提交if (globalSession.canBeCommittedAsync()) {//AT模式下异步事务提交globalSession.asyncCommit();MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);return false;} else {...}}return false;});...
}

最终将事务状态设置为异步提交

    public void asyncCommit() throws TransactionException {this.addSessionLifecycleListener(SessionHolder.getAsyncCommittingSessionManager());//设置事务状态为异步提交,这里在设置为异步提交后就不管了this.setStatus(GlobalStatus.AsyncCommitting);SessionHolder.getAsyncCommittingSessionManager().addGlobalSession(this);}

TC异步执行全局事务commit

核心逻辑为 io.seata.server.coordinator.DefaultCoordinator#init

该方法会异步的去进行处理,每秒执行一次

    /*** Init.*/public void init() {...//异步处理的部分//会从global中,每次取100条进行处理,并且删除这100条数据,然后遍历brand_table,根据global_table取删除//操作完了之后,向RM进行通知,进行undo_log的删除asyncCommitting.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);...}

未完待续...

以下是经过注释的源码地址:seata: Seata 是一款开源的分布式事务解决方案,提供高性能和简单易用的分布式事务服务 - Gitee.com

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/55195.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

selenium的IDE插件进行录制和回放并导出为python/java脚本(10)

Selenium IDE&#xff1a;Selenium Suite下的开源Web自动化测试工具&#xff0c;是Firefox或者chrome的一个插件&#xff0c;具有记录和回放功能&#xff0c;无需编程即可创建测试用例&#xff0c;并且可以将用例直接导出为可用的python/java等编程语言的脚本。 我们以chrome浏…

Vue3嵌套导航相对路径问题

有如下的页面设计&#xff0c;页面上方第一次导航&#xff0c;两个菜单&#xff0c;首页和新闻 点击新闻&#xff0c;内容里面嵌套一个左侧和右侧&#xff0c;左侧有4条新闻&#xff0c;点击某一条新闻&#xff0c;右侧显示详情 代码如下&#xff1a; ​ File Path: d:\hello\…

自感式压力传感器结构设计

自感式压力传感器的结构如图2-35 和图 2-36所示&#xff0c;分为变隙式、变面积式和螺管式三种&#xff0c;每种均由线网、铁心和衔铁三部分组成。 图2-35 自感式压力传感器的结构 1-线圈 2-铁心 3-衔铁 图2-36 螺管式 1-线图 2-铁心 3一衔铁 自感式压力传感器按磁路变化可…

QT的核心机制 对话框资源

案例 1、键盘按下w&#xff0c;s&#xff0c;a&#xff0c;d键分别为标签向上&#xff0c;下&#xff0c;左&#xff0c;右移动 鼠标按下获取本地坐标&#xff0c;全局坐标 鼠标双击获取本地坐标&#xff0c;全局坐标 鼠标移动获取本地坐标&#xff0c;全局坐标 让鼠标跟踪…

Midjourney零基础学习

Midjourney学习笔记TOP04 Midjourney的各种参数设置 Midjourney的用户操作界面没有醒目的工具栏、属性栏&#xff0c;所有的操作都是通过调用各种指令和参数进行的。 【MJ Version】 Midjourney在2023年3月份就已经更新到了V5版本&#xff0c;V5版本除了画质有所提升外&#…

interwirelessac9560感叹号,电脑无法连接wifi,无法搜索到wifi

interwirelessac9560感叹号 电脑无法连接wifi&#xff0c;无法搜索到wifi 原因 这可能是wifl模块出现了问题。 解决方案 1、winx 打开&#xff0c;选择【设备管理器】 2、选择网络适配器 右键打开wireless-AC&#xff0c;选择【卸载设备】。 3、关机2分钟后&#xff0c…

SpringBoot智慧外贸平台

专业团队&#xff0c;咨询就送开题报告&#xff0c;欢迎大家私信留言&#xff0c;联系方式在文章底部 摘 要 网络的广泛应用给生活带来了十分的便利。所以把智慧外贸管理与现在网络相结合&#xff0c;利用java技术建设智慧外贸平台&#xff0c;实现智慧外贸的信息化。则对于进…

数据结构-5.9.树的存储结构

一.树的逻辑结构&#xff1a; 二.双亲表示法(顺序存储)&#xff1a; 1.树中除了根结点外每一颗树中的任意一个结点都只有一个父结点(双亲结点)&#xff1b; 2.结点包括结点数据和指针&#xff1b; 3.上述图片中右边的顺序存储解析&#xff1a;比如A结点左边的0&#xff0c;就…

ASML业绩暴雷,股价一度跌超16%

KlipC报道&#xff1a;当地时间10月15日&#xff0c;阿斯麦&#xff08;ASML&#xff09;原定于周三公布的三季度业绩报告由于技术原因被短暂地提前公布&#xff0c;业绩报告显示&#xff0c;阿斯麦第三季度总净销售额75亿欧元&#xff0c;毛利率50.8%&#xff0c;净利润21亿欧…

社招高频面试题

1.单例模式 面试突击50&#xff1a;单例模式有几种写法&#xff1f; 2.Mybatis缓存机制 MyBatis的一、二级缓存查询关系 一级缓存是SqlSession级别&#xff0c;不能跨SqlSession共享&#xff0c;默认开启。 二级缓存是基于mapper namespace级别的&#xff0c;可以跨SqlSessi…

Scala入门基础(10)高级函数

一.什么是高阶函数 二.map函数 三.foreach函数 四.filter函数 五.flatten函数 正文&#xff1a; 一.什么是高阶函数 高阶函数&#xff1a;是一个特殊的函数&#xff0c;特殊之处在于&#xff1a;它指使用其他函数作为参数或返回值 &#xff08;演示&#xff09; 二.map函…

SpringSecurity(一)——认证实现

一、初步理解 SpringSecurity的原理其实就是一个过滤器链&#xff0c;内部包含了提供各种功能的过滤器。 当前系统中SpringSecurity过滤器链中有哪些过滤器及它们的顺序。 核心过滤器&#xff1a; &#xff08;认证&#xff09;UsernamePasswordAuthenticationFilter:负责处理…

python yolov8半自动标注

首先标注一部分图片&#xff0c;进行训练&#xff0c;生成模型&#xff0c;标注文件为xml方便后面统一做处理。 1、标注数据&#xff08;文件为xml, 转为txt用于训练&#xff0c;保留xml标签文件&#xff09; 2、模型训练&#xff08;训练配置、训练代码、&#xff09; 3、使用…

极狐GitLab 发布安全补丁版本 17.4.1、17.3.4、17.2.8

GitLab 是一个全球知名的一体化 DevOps 平台&#xff0c;很多人都通过私有化部署 GitLab 来进行源代码托管。极狐GitLab 是 GitLab 在中国的发行版&#xff0c;专门为中国程序员服务。可以一键式部署极狐GitLab。 学习极狐GitLab 的相关资料&#xff1a; 极狐GitLab 官网极狐…

[已解决]DockerTarBuilder永久解决镜像docker拉取异常问题

前阵子发现阿里云的docker加速镜像失效了&#xff08;甚至连nginx都拉取不了&#xff09;&#xff0c;重新换了并且加多了网络上比较常用的dokcer加速源&#xff0c;可以解决一部分问题&#xff0c;但仍然有一些镜像的某个版本或一些比较冷的镜像就是拉取不了&#xff0c;原因未…

『网络游戏』数据库表格转储【25】

避免勿删数据库表格&#xff0c;可以将表格存储 放到桌面即可 现在将表格删除后点击 浏览桌面表格保存即可 修改客户端脚本&#xff1a;NetSvc.cs 目的是在数据库更新异常时弹出提示以便修改 本章结束

进程间通信、无名管道、有名管道

一、进程 1.1 进程间通信的概念 线程通信通过全局变量即可。 进程间通信是相互独立的&#xff0c;但是所有进程都共用一份内核空间&#xff0c;所以进程和进程之间的通信可以通过内核去进行。 1.2 进程间通信方式 共7种: 传统的进程间通信方式&#xff1a; 无名管道有名管道…

VSCode 查看 Git 的历史记录的三种技巧

前言 在我们日常开发工作过程中&#xff0c;可能经常会看到一些离谱的历史代码&#xff0c;或者当项目发生线上事故时&#xff0c;如何快速定位是谁提交的代码导致的&#xff1f; 作为前端开发者&#xff0c;VSCode 是目前最为流行的代码编辑工具&#xff0c;也是日常最常打开…

OPC UA与PostgreSQL如何实现无缝连接?

随着工业4.0的推进&#xff0c;数据交换和集成在智能制造中扮演着越来越重要的角色。OPC UA能够实现设备与设备、设备与系统之间的高效数据交换。而PostgreSQL则是一种强大的开源关系型数据库管理系统&#xff0c;广泛应用于数据存储和管理。如何将OPC UA与PostgreSQL结合起来&…

python pip安装requirements.txt依赖与国内镜像

python pip安装requirements.txt依赖与国内镜像 如果网络通畅&#xff0c;直接pip安装依赖&#xff1a; pip install -r requirements.txt 如果需要国内的镜像&#xff0c;可以考虑使用阿里的&#xff0c;在后面加上&#xff1a; -i http://mirrors.aliyun.com/pypi/simple --…