结合seata和2PC,简单聊聊seata源码

当前代码分析基于seata1.6.1

整体描述

整体代码流程可以描述为

  1. TM开启全局事务,会调用TC来获取XID。
  2. TC在接收到通知后,会生成XID,然后会将当前全局事务保存到global_table表中,并且返回XID。
  3. 在获取到XID后,会执行业务逻辑。
  4. 执行业务逻辑的时候,如果发生了增删改,则会对增删改语句做增强。
  5. 获取前置镜像数据---执行sql,不提交事务--获取后置镜像---准备undoLog---作为RM向TC提交事务分支---生成undo_log日志---提交本地事务,注意,在这里,本地事务已经提交了。只是有undo_log可用于回滚。
  6. TC接收RM端提交的分支事务,存储到brand_table中。
  7. 当全局分支事务都执行完成,TM会向TC提起全局事务提交的请求。
  8. TC接收到请求后,删除全局事务和分支事务(global_table 和 brand_table)。
  9. TC 通知RM,删除 undo log 日志。

源码解析

系统启动初始化

主要完成两个事情:初始化 TM和RM客户端;创建方法拦截器

在客户端中,核心配置类是SeataAutoConfiguration,在这个类中初始化了一个核心的扫描器GlobalTransactionScanner。

GlobalTransactionScanner 全局事务扫描器,实现了InitializingBean接口,如果继承了该接口,spring会在完成DI之后,调用afterPropertiesSet方法,在该方法中完成了对TM客户端和RM客户端的创建,代码如下

@Override
public void afterPropertiesSet() {if (disableGlobalTransaction) {if (LOGGER.isInfoEnabled()) {LOGGER.info("Global transaction is disabled.");}ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,(ConfigurationChangeListener)this);return;}if (initialized.compareAndSet(false, true)) {//创建客户端的方法initClient();}
}private void initClient() {...其他校验//创建TM客户端并且初始化TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);...//创建RM客户端并且初始化RMClient.init(applicationId, txServiceGroup);...}

同时,GlobalTransactionScanner 继承了AbstractAutoProxyCreator 抽象类,在类完成初始化之后,会调用父类的 postProcessAfterInitialization方法,在父类的方法中,会调用该类重写的一个wrapIfNecessary方法。

wrapIfNecessary 方法会生成一个 GlobalTransactionalInterceptor 全局事务拦截器。

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {// do checkersif (!doCheckers(bean, beanName)) {return bean;}try {synchronized (PROXYED_SET) {if (PROXYED_SET.contains(beanName)) {return bean;}interceptor = null;//check TCC proxyif (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {...} else {...//生成一个全局事务处理的拦截器,if (globalTransactionalInterceptor == null) {globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,(ConfigurationChangeListener)globalTransactionalInterceptor);}interceptor = globalTransactionalInterceptor;}...} catch (Exception exx) {throw new RuntimeException(exx);}}

TM端开启全局事务

开启全局事务

GlobalTransactionalInterceptor中的逻辑

忽略中间调用过程,最终会走到io.seata.tm.api.TransactionalTemplate#execute

在该类中,核心代码如下

public Object execute(TransactionalExecutor business) throws Throwable {...try {...try {//开启全局事务beginTransaction(txInfo, tx);Object rs;try {//进入业务代码,执行业务逻辑rs = business.execute();} catch (Throwable ex) {//当出现业务逻辑异常进行回滚completeTransactionAfterThrowing(txInfo, tx, ex);throw ex;}//所有分支事务无异常,提交全局事务commitTransaction(tx, txInfo);return rs;} finally {...}} finally {...}
}

开启全局事务的方法就在beginTransaction中,继续往下会去TC中获取一个XID,就是全局事务id

TC端接收全局事务请求后

记录全局事务

在server层的代码中,全局事务的入口方法为 io.seata.server.coordinator.DefaultCoordinator#doGlobalBegin

在该方法中会去调用core.begin方法,进入xid获取流程

@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)throws TransactionException {//开始获取xidresponse.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),request.getTransactionName(), request.getTimeout()));if (LOGGER.isInfoEnabled()) {LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());}
}

忽略其他流程,关注核心,其调用往下的链路为

core.begin--session.begin--lifecycleListener.onBegin--找到子类方法--this.addGlobalSession--找子类方法

在这里,可以找到三个实现类,分别是代表了数据库、文件和redis实现,文件是默认实现,其他几种都需要进行配置,我们针对数据库实现进行描述。

方法路径为:io.seata.server.storage.db.session.DataBaseSessionManager#addGlobalSession

然后调用方法transactionStoreManager.writeSession,transactionStoreManager是一个接口,同样有三种实现

在writeSession方法中,会插入全局事务表数据,代码如下

/*** 插入全局事务表* 表为:global_table* @param globalTransactionDO the global transaction do* @return*/
@Override
public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);Connection conn = null;PreparedStatement ps = null;try {int index = 1;conn = logStoreDataSource.getConnection();conn.setAutoCommit(true);ps = conn.prepareStatement(sql);//插入xidps.setString(index++, globalTransactionDO.getXid());//插入事务idps.setLong(index++, globalTransactionDO.getTransactionId());//插入事务状态,begin  = 1ps.setInt(index++, globalTransactionDO.getStatus());//插入应用id,一般是服务名ps.setString(index++, globalTransactionDO.getApplicationId());//插入事务组ps.setString(index++, globalTransactionDO.getTransactionServiceGroup());String transactionName = globalTransactionDO.getTransactionName();transactionName = transactionName.length() > transactionNameColumnSize ?transactionName.substring(0, transactionNameColumnSize) :transactionName;//插入事务名称ps.setString(index++, transactionName);//插入超时时间ps.setInt(index++, globalTransactionDO.getTimeout());//插入事务开始时间ps.setLong(index++, globalTransactionDO.getBeginTime());ps.setString(index++, globalTransactionDO.getApplicationData());return ps.executeUpdate() > 0;} catch (SQLException e) {throw new StoreException(e);} finally {IOUtil.close(ps, conn);}
}

RM执行业务代码,并且提交事务

代理数据源,生成undo log,并且通知TC

在seata中,需要配置数据源代理,这个代理会在执行增删改查的时候,对操作进行增强

这里核心需要关注的方法是io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#doExecute

@Override
public T doExecute(Object... args) throws Throwable {AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();//一般在初始状态下,这个autoCommit是trueif (connectionProxy.getAutoCommit()) {return executeAutoCommitTrue(args);} else {return executeAutoCommitFalse(args);}
}

然后会调用 executeAutoCommitTrue 方法,该方法主要做了几个事情,分别是:获取前置镜像和后置镜像,并且制作undo_log;执行目标sql和插入undo_log;作为RM和TC进行交互,提交分支事务;以及提交事务。代码如下

executeAutoCommitTrue 方法代码

 protected T executeAutoCommitTrue(Object[] args) throws Throwable {ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();try {//设置提交方式为手动提交connectionProxy.changeAutoCommit();return new LockRetryPolicy(connectionProxy).execute(() -> {//执行sql,并且准备前置镜像和后置镜像T result = executeAutoCommitFalse(args);//提交本地事务(内部会和RM进行交互)connectionProxy.commit();return result;});} catch (Exception e) {...异常处理} finally {connectionProxy.getContext().reset();connectionProxy.setAutoCommit(true);}
}

executeAutoCommitFalse方法代码

protected T executeAutoCommitFalse(Object[] args) throws Exception {//获取前置镜像TableRecords beforeImage = beforeImage();//执行目标sql,注意,这边执行完后,事务是未提交的T result = statementCallback.execute(statementProxy.getTargetStatement(), args);//获取后置镜像TableRecords afterImage = afterImage(beforeImage);//准备undo_logprepareUndoLog(beforeImage, afterImage);return result;
}

processGlobalTransactionCommit 方法,该方法就是connectionProxy.commit()最终指向的方法

 private void processGlobalTransactionCommit() throws SQLException {try {//作为RM,向TC发起请求,注册分支事务,会插入数据到TC的mysql表中register();} catch (TransactionException e) {recognizeLockKeyConflictException(e, context.buildLockKeys());}try {//生成undo_log日志,用于事务回滚UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);//提交undo_log 回滚日志和本地事务,事务在这里已经提交了targetConnection.commit();} catch (Throwable ex) {...异常处理}...其他处理
}

TM提交全局事务

进行提交就是向TC发起请求,相关代码如下

@Override
public void commit() throws TransactionException {//判断当前角色,只有TM才能执行if (role == GlobalTransactionRole.Participant) {...return;}//XID不能为空assertXIDNotNull();int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;try {//可重试的执行,最多可执行5次while (retry > 0) {try {retry--;//向tc发起调用status = transactionManager.commit(xid);break;} catch (Throwable ex) {...}}} finally {...}...
}

TC处理全局事务

TC在接收到提交请求后,会由方法 io.seata.server.coordinator.DefaultCoordinator#doGlobalCommit 进行处理。

    protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)throws TransactionException {MDC.put(RootContext.MDC_KEY_XID, request.getXid());//设置状态为异步提交状态response.setGlobalStatus(core.commit(request.getXid()));}

在AT模式下,事务的提交为异步的方式

public GlobalStatus commit(String xid) throws TransactionException {//获取全局session,不同模式获取方式不同,如果是db,则会从数据库获取GlobalSession globalSession = SessionHolder.findGlobalSession(xid);if (globalSession == null) {//如果获取不到session,返回已完成状态,一般在调用超时的时候会发生,这样也可以保证幂等return GlobalStatus.Finished;}...boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {if (globalSession.getStatus() == GlobalStatus.Begin) {// Highlight: Firstly, close the session, then no more branch can be registered.globalSession.closeAndClean();//判断是否可以异步提交,AT模式下可以异步提交if (globalSession.canBeCommittedAsync()) {//AT模式下异步事务提交globalSession.asyncCommit();MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);return false;} else {...}}return false;});...
}

最终将事务状态设置为异步提交

    public void asyncCommit() throws TransactionException {this.addSessionLifecycleListener(SessionHolder.getAsyncCommittingSessionManager());//设置事务状态为异步提交,这里在设置为异步提交后就不管了this.setStatus(GlobalStatus.AsyncCommitting);SessionHolder.getAsyncCommittingSessionManager().addGlobalSession(this);}

TC异步执行全局事务commit

核心逻辑为 io.seata.server.coordinator.DefaultCoordinator#init

该方法会异步的去进行处理,每秒执行一次

    /*** Init.*/public void init() {...//异步处理的部分//会从global中,每次取100条进行处理,并且删除这100条数据,然后遍历brand_table,根据global_table取删除//操作完了之后,向RM进行通知,进行undo_log的删除asyncCommitting.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);...}

未完待续...

以下是经过注释的源码地址:seata: Seata 是一款开源的分布式事务解决方案,提供高性能和简单易用的分布式事务服务 - Gitee.com

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/55195.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据结构与算法JavaScript描述练习------第11章图和图算法

1. 编写一个程序&#xff0c;测试广度优先和深度优先这两种图搜索算法哪一种速度更快。请使用不 同大小的图来测试你的程序。 function Graph(v) {this.vertices v;this.edges 0;this.adj [];this.marked [];this.edgeTo [];for (var i 0; i < this.vertices; i) {thi…

MATLAB中sscanf函数用法

目录 语法 说明 示例 将字符向量转换为数值 转换文本和调整输出数组大小 统计在文本中找到的元素数目 显示错误消息 返回最后一个扫描位置 匹配指定的字符 sscanf函数的功能是从字符串读取格式化数据。 语法 A sscanf(str,formatSpec) A sscanf(str,formatSpec,si…

selenium的IDE插件进行录制和回放并导出为python/java脚本(10)

Selenium IDE&#xff1a;Selenium Suite下的开源Web自动化测试工具&#xff0c;是Firefox或者chrome的一个插件&#xff0c;具有记录和回放功能&#xff0c;无需编程即可创建测试用例&#xff0c;并且可以将用例直接导出为可用的python/java等编程语言的脚本。 我们以chrome浏…

Android 自适应

一开始项目使用的是第三方框架 GitHub - JessYanCoding/AndroidAutoSize: &#x1f525; A low-cost Android screen adaptation solution (今日头条屏幕适配方案终极版&#xff0c;一个极低成本的 Android 屏幕适配方案). 但是会偶现&#xff0c;断电重启第一次&#xff0c;…

Flutter-发现局域网中的设备

前言 现在有一个需求&#xff1a;要能够获取到局域网中的遮阳帘设备。通过搜索发现flutter_mdns_plugin可以满足这个需求 Pub&#xff1a;flutter_mdns_plugin | Flutter package GitHub&#xff1a;https://github.com/terrabythia/flutter_mdns_plugin MDNS服务类型 要根据…

Vue3嵌套导航相对路径问题

有如下的页面设计&#xff0c;页面上方第一次导航&#xff0c;两个菜单&#xff0c;首页和新闻 点击新闻&#xff0c;内容里面嵌套一个左侧和右侧&#xff0c;左侧有4条新闻&#xff0c;点击某一条新闻&#xff0c;右侧显示详情 代码如下&#xff1a; ​ File Path: d:\hello\…

自感式压力传感器结构设计

自感式压力传感器的结构如图2-35 和图 2-36所示&#xff0c;分为变隙式、变面积式和螺管式三种&#xff0c;每种均由线网、铁心和衔铁三部分组成。 图2-35 自感式压力传感器的结构 1-线圈 2-铁心 3-衔铁 图2-36 螺管式 1-线图 2-铁心 3一衔铁 自感式压力传感器按磁路变化可…

QT的核心机制 对话框资源

案例 1、键盘按下w&#xff0c;s&#xff0c;a&#xff0c;d键分别为标签向上&#xff0c;下&#xff0c;左&#xff0c;右移动 鼠标按下获取本地坐标&#xff0c;全局坐标 鼠标双击获取本地坐标&#xff0c;全局坐标 鼠标移动获取本地坐标&#xff0c;全局坐标 让鼠标跟踪…

Midjourney零基础学习

Midjourney学习笔记TOP04 Midjourney的各种参数设置 Midjourney的用户操作界面没有醒目的工具栏、属性栏&#xff0c;所有的操作都是通过调用各种指令和参数进行的。 【MJ Version】 Midjourney在2023年3月份就已经更新到了V5版本&#xff0c;V5版本除了画质有所提升外&#…

interwirelessac9560感叹号,电脑无法连接wifi,无法搜索到wifi

interwirelessac9560感叹号 电脑无法连接wifi&#xff0c;无法搜索到wifi 原因 这可能是wifl模块出现了问题。 解决方案 1、winx 打开&#xff0c;选择【设备管理器】 2、选择网络适配器 右键打开wireless-AC&#xff0c;选择【卸载设备】。 3、关机2分钟后&#xff0c…

SpringBoot智慧外贸平台

专业团队&#xff0c;咨询就送开题报告&#xff0c;欢迎大家私信留言&#xff0c;联系方式在文章底部 摘 要 网络的广泛应用给生活带来了十分的便利。所以把智慧外贸管理与现在网络相结合&#xff0c;利用java技术建设智慧外贸平台&#xff0c;实现智慧外贸的信息化。则对于进…

数据结构-5.9.树的存储结构

一.树的逻辑结构&#xff1a; 二.双亲表示法(顺序存储)&#xff1a; 1.树中除了根结点外每一颗树中的任意一个结点都只有一个父结点(双亲结点)&#xff1b; 2.结点包括结点数据和指针&#xff1b; 3.上述图片中右边的顺序存储解析&#xff1a;比如A结点左边的0&#xff0c;就…

ASML业绩暴雷,股价一度跌超16%

KlipC报道&#xff1a;当地时间10月15日&#xff0c;阿斯麦&#xff08;ASML&#xff09;原定于周三公布的三季度业绩报告由于技术原因被短暂地提前公布&#xff0c;业绩报告显示&#xff0c;阿斯麦第三季度总净销售额75亿欧元&#xff0c;毛利率50.8%&#xff0c;净利润21亿欧…

社招高频面试题

1.单例模式 面试突击50&#xff1a;单例模式有几种写法&#xff1f; 2.Mybatis缓存机制 MyBatis的一、二级缓存查询关系 一级缓存是SqlSession级别&#xff0c;不能跨SqlSession共享&#xff0c;默认开启。 二级缓存是基于mapper namespace级别的&#xff0c;可以跨SqlSessi…

PetaLinux工程的常用命令——petalinux-create

petalinux-create&#xff1a;此命令创建新的PetaLinux项目或组件。 注&#xff1a;有些命令我没用过&#xff0c;瞎翻译有可能会翻译错了&#xff0c;像是和fpgamanager相关的部分。 用法: petalinux-create [options] <-t|--type <TYPE> <-n|--name <COMPONEN…

LeetCode:3039.进行操作使字符串为空(模拟 Java)

目录 3039.进行操作使字符串为空 题目描述&#xff1a; 实现代码与解析&#xff1a; 模拟 原理思路&#xff1a; 3039.进行操作使字符串为空 题目描述&#xff1a; 给你一个字符串 s 。 请你进行以下操作直到 s 为 空 &#xff1a; 每次操作 依次 遍历 a 到 z&#xff…

Mongodb 获取集合(collection)的统计信息

在MongoDB中&#xff0c;获取指定集合&#xff08;collection&#xff09;的统计信息可以通过执行collStats命令来实现。这个命令提供了关于集合的详细信息&#xff0c;包括&#xff1a; 集合的大小索引的大小和数量文档的数量存储空间的使用情况各种统计数据&#xff0c;如平…

[linux 驱动]gpio子系统详解与实战

目录 1 描述 1.1 文件节点操作 gpio 引脚 1.2 gpio 引脚计算 2 结构体 2.1 gpio_desc 2.2 gpio_device 2.3 gpio_chip 3 相关函数 3.1 goio 申请释放 3.1.1 gpio_request 3.1.2 gpio_free 3.2 gpio 输入输出设置 3.2.1 gpio_direction_input 3.2.2 gpio_direction…

filecoin filspark 检索

安装 boost 1、安装 YugabyteDB2、boostd-data 运行3、初始化 boostd4、运行 boostd5、运行 booster-http6、需要公网映射端口6.1 Libp2p 公网映射本地端口24001 发布矿工6.2 Graphql 公网映射本地端口8080 web界面6.3 IndexProvider.HttpPublisher 公网映射本地端口6700 http发…

微信小程序路由跳转的区别及其常见的使用场景

在微信小程序中&#xff0c;页面路由跳转的实现有几种常用方式&#xff0c;不同的跳转方式适用于不同的使用场景。下面是几种跳转方法的区别及其在实际项目中的应用场景。 1. wx.navigateTo 简介&#xff1a;保留当前页面并跳转到指定页面&#xff0c;最多保留10个页面的历史记…