Seata流程源码梳理下篇-TC

我们上篇简单梳理了下TM、RM的一些流程(离现在过得挺久的了,这篇我们这篇来梳理下TC的内容。

TC (Transaction Coordinator) - 事务协调者

维护全局和分支事务的状态,驱动全局事务提交或回滚。

TM (Transaction Manager) - 事务管理器

定义全局事务的范围:开始全局事务、提交或回滚全局事务。

RM (Resource Manager) - 资源管理器

管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

一、TC的定义

TC (Transaction Coordinator) - 事务协调者:维护全局和分支事务的状态,驱动全局事务提交或回滚

​ 这个是官网给出的定义,就是说其实全局事务的协调。具体来说例如:当某个TM(事务管理器)需要开启事务的时候,就需要一个协调者来驱动全局事务或提交,或回滚,这个概念我们开始的时候很容易与TM混淆。我们看到这两个一个是开始,一个是驱动,具体通过demo来说。

1、一些基本逻辑信息

用户购买商品的业务逻辑。整个业务逻辑由3个微服务提供支持:

  • 仓储服务(Storage):对给定的商品扣除仓储数量。
  • 订单服务(Order):根据采购需求创建订单。
  • 帐户服务(Account):从用户帐户中扣除余额。
  • 业务服务(Business):发起整个业务逻辑。

在这里插入图片描述
在这里插入图片描述

这个官网给出的例子,就是Business服务发起本次业务逻辑,也就是通过@GlobalTransactional来开始全局事务,然后后面的也就是TM的一些逻辑处理,这个我们上篇有说过,就是去调TC表示自己要开始全局事务,然后TC就会生成全局事务的XID,之后其调其他的分支事务Stock、Order的话就带上这个全局事务XID。然后这里TC就是一个协调者的角色,其他的TM、RM都想TC注册。例如如果后面如果@GlobalTransactional方法正常执行的话,就事务提交,其就向TC发送消息,然后TC去协调分支事务的提交。

​ 下面我们就来具体看下在TC中这些注册、提交、回滚等的操作。

二、TC的一些逻辑处理

1、前置内容

​ 首先在TC这边,关于对应请求的处理,入口都是RemotingProcessor接口的具体实现

在这里插入图片描述

​ 其的子类:

在这里插入图片描述

2、RM通道注册

​ RM(资源管理器)注册。我们上面有介绍过,Seata会代理服务关于数据库的操作。然后其数据源初始化的时候,就会向TC注册。

在这里插入图片描述

但我们需要注意,这个目前还只是channel的通道连接,不是具体的RM资源注册,具体的RM事务资源注册是在全局事务开始然后操作数据库的时候。

​ 对应到TC这边就是RegRmProcessor在处理,这个主要是保存Channel通道,与Netty相关的内容,目前还没具体研究,但影响不大。先不深入这个。

在这里插入图片描述

3、TM通道注册

在这里插入图片描述

​ 这个与前面的RM是类似的。

4、ServerOnRequestProcessor

​ 然后这个是一个核心的处理类,其用来处理各种类型的信息。

在这里插入图片描述

​ 我们可以看到,像分支注册、全局事务的开始、回滚都是这个Process处理的。

public class ServerOnRequestProcessor implements RemotingProcessor {private static final Logger LOGGER = LoggerFactory.getLogger(ServerOnRequestProcessor.class);private RemotingServer remotingServer;private TransactionMessageHandler transactionMessageHandler;public ServerOnRequestProcessor(RemotingServer remotingServer, TransactionMessageHandler transactionMessageHandler) {this.remotingServer = remotingServer;this.transactionMessageHandler = transactionMessageHandler;}@Overridepublic void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {if (ChannelManager.isRegistered(ctx.channel())) {onRequestMessage(ctx, rpcMessage);} else {........if (LOGGER.isInfoEnabled()) {LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));}}}private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {Object message = rpcMessage.getBody();RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());...........if (message instanceof MergedWarpMessage) {AbstractResultMessage[] results = new AbstractResultMessage[((MergedWarpMessage) message).msgs.size()];for (int i = 0; i < results.length; i++) {final AbstractMessage subMessage = ((MergedWarpMessage) message).msgs.get(i);results[i] = transactionMessageHandler.onRequest(subMessage, rpcContext);}MergeResultMessage resultMessage = new MergeResultMessage();resultMessage.setMsgs(results);remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);} else {// the single send request messagefinal AbstractMessage msg = (AbstractMessage) message;AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);}}}

​ 然后处理的核心就是DefaultCoordinator,上面的AbstractRMHandler是RM的处理

在这里插入图片描述

5、DefaultCoordinator

​ 这里的主要用来处理TC的逻辑就是TCInboundHandler接口:

在这里插入图片描述

​ 通过这些Request我们也能找到其主要是用来处理全局事务的开启、分支事务的注册、全局事务的提交、回滚、状态这些。下面我们就来主要看下看下GlobalBeginRequestBranchRegisterRequestGlobalCommitRequest的处理。

1、GlobalBeginRequest的处理

在这里插入图片描述

在这里插入图片描述

​ 通过这里我们可以找到,在GlobalBegin开启全局事务的主要处理就是生成全局事务Xid返回。

在这里插入图片描述

public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName,int timeout) {GlobalSession session = new GlobalSession(applicationId, txServiceGroup, txName, timeout);return session;
}
    public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout) {this.transactionId = UUIDGenerator.generateUUID();this.status = GlobalStatus.Begin;this.applicationId = applicationId;this.transactionServiceGroup = transactionServiceGroup;this.transactionName = transactionName;this.timeout = timeout;this.xid = XID.generateXID(transactionId);}

这里是全局事务的session,然后再通过session.begin来进行对应处理,例如将全局事务session保存到数据库:

在这里插入图片描述

在这里插入图片描述

​ 我们可以看到这里有多种处理方式、例如保存到文件、redis这些。目前我们是保存到数据库中,处理的就是DataBaseSessionManager

public class DataBaseSessionManager extends AbstractSessionManagerimplements Initialize {............@Overridepublic void addGlobalSession(GlobalSession session) throws TransactionException {if (StringUtils.isBlank(taskName)) {boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);if (!ret) {throw new StoreException("addGlobalSession failed.");}} else {boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);if (!ret) {throw new StoreException("addGlobalSession failed.");}}}

​ 这里之后就是DataBaseTransactionStoreManager处理入库逻辑

在这里插入图片描述

在这里插入图片描述

2、BranchRegisterRequest分支事务注册的处理

​ 再之后我们来看下分支事务的注册处理逻辑。分支事务的注册就是每个RM对应开始操作DB的时候(这个RM相关的内容上篇有提到过),向TC发起分支事务注册,代码逻辑就是放在seata代理的jdbc的ConnectionProxy

在这里插入图片描述

在这里插入图片描述

然后TC这边就会处理这个请求,可以看到这里就是生成并返回分支事务ID

在这里插入图片描述

在这里插入图片描述

同时会将当前分支事务保存到数据库中

在这里插入图片描述

3、GlobalCommitRequest全局事务提交的处理

​ 我们上篇有提到,再TM发起全局事务后,如果各个分支都执行成功后,其会向TC发起全局事务的提交。

对于我们官方给到的demo。我们看下关于事务相关的表的数据。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
然后各个分支都执行完成后,由TM来提交全局事务

在这里插入图片描述

在这里插入图片描述

​ 在TM提交后,我们来看下TC的处理:

在这里插入图片描述

![外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传](https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=F%3A%5CMarkDown-File%5C%E7%9F%A5%E8%AF%86%E7%82%B9%E8%B5%84%E6%96%99%5CSeata%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90-TC.assets%5C在这里插入图片描述
&pos_id=img-hmn2nU2b-1695536657221)

​ 这里我们主要需要注意SessionHelper.forEach(globalSession.getSortedBranches(), branchSession对于每个branchSession的循环调用,也就是每个分支事务的session,在这里就会循环,然后调用每个分支进行分支事务的本地提交,也就是RM本地提交的处理。

在这里插入图片描述

然后RM的处理主要就是RMHandlerAT这个类在处理,因为我们当前是AT模式

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

然后ResourceManager就是DataSourceManager

在这里插入图片描述

​ 这里主要是添加到异步队列中。

在这里插入图片描述

然后主要处理就是删除数据库的undo记录,就我们全局事务成功了,不需要使用undo来回滚本地事务了。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/86424.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

将本地项目上传至Github详解

目录 1 前言2 本地代码上传2.1 命令行方法2.2 图形界面法2.3 结果 1 前言 GitHub是一个面向开源及私有软件项目的托管平台&#xff0c;因为只支持Git作为唯一的版本库格式进行托管&#xff0c;故名GitHub 。开发者常常将github作为代码管理平台&#xff0c;方便代码存储、版本…

基于SpringBoot的的师生健康信息管理系统

目录 前言 一、技术栈 二、系统功能介绍 管理员功能模块 学生功能模块 教师功能模块 三、核心代码 1、登录模块 2、文件上传模块 3、代码封装 前言 随着移动应用技术的发展&#xff0c;越来越多的用户借助于移动手机、电脑完成生活中的事务&#xff0c;许多的传统行业也…

超级详细 SQL 优化大全

1、MySQL的基本架构 1&#xff09;MySQL的基础架构图 左边的client可以看成是客户端&#xff0c;客户端有很多&#xff0c;像我们经常你使用的CMD黑窗口&#xff0c;像我们经常用于学习的WorkBench&#xff0c;像企业经常使用的Navicat工具&#xff0c;它们都是一个客户端。右…

北工大汇编题——分支程序设计

题目要求 信息检素程序设计&#xff1a;在数据区&#xff0c;有9个不同的信息&#xff0c;编号 0-8&#xff0c;每个信息包括20 个字符。从键盘接收 0-8 之间的一个编号&#xff0c;然后再屏幕上显示出相应编号的信息内容&#xff0c;按“q”键退出 完整代码 DATAS SEGMENTn0…

Day 03 python学习笔记

位运算 基于二进制的运算&#xff08;计算机的底层基于位运算&#xff09; 计算机最小单位&#xff1a;bit (比特/位/二进制) 1byte&#xff08;字节&#xff09; 8bit &#xff08; 0000 0000&#xff09; &&#xff1a;与 &#xff08;全真为真&#xff0c;一假则…

项目开发过程中遇到了什么困难?

1.需求最初是什么样的&#xff1f; 2.如何挖掘的需求&#xff0c;挖掘后真实的需求是什么样的&#xff1f; 3.我们做了那些调查&#xff1f; 4.我们给出了那些方案&#xff0c;优缺点是什么&#xff1f; 5.根据实际情况&#xff0c;老板的期望&#xff0c;最终我们选择的什…

Leetcode 01-算法入门与数组-③数组排序

LeetCode 01-算法入门与数组-③数组排序 一. 冒泡排序 1. 冒泡排序算法思想 冒泡排序&#xff08;Bubble Sort&#xff09;基本思想&#xff1a; 经过多次迭代&#xff0c;通过相邻元素之间的比较与交换&#xff0c;使值较小的元素逐步从后面移到前面&#xff0c;值较大的元素…

SAP PO运维(一):系统概览异常处理

打开SAP PIPO Netweaver Administration界面,系统概览下显示异常: 参考SAP note: 2577844 - AS Java Monitoring and Logging parametrization best practice service/protectedwebmethods = SDEFAULT -GetVersionInfo -GetAccessPointList -ListLogFiles -ReadLogFile -Para…

为什么选择Spring Cloud

Spring Cloud与Netflix Netflix是一家做视频网站的公司&#xff0c;之所以要说一下这个公司是因为Spring Cloud在发展之初&#xff0c;Netflix做了很大的贡献。包括服务注册中心Eureka、服务调用Ribbon、Feign&#xff0c;服务容错限流Hystrix、服务网关Zuul等众多组件都是Net…

Linux下ThinkPHP5实现定时器任务 - 结合crontab

实例一&#xff1a; 1.在/application/command创建要配置的PHP类文件&#xff0c;需要继承Command类&#xff0c;并重写configure和execute两个方法&#xff0c;例如: <?php namespace app\command; use think\console\Command; use think\console\Input; use think\cons…

FatFS文件系统在MCU上的应用

FatFS文件系统是单片机领域有名的一个文件系统&#xff0c;由于它的轻量级和兼容性&#xff0c;备受MCU开发者青睐。 在实现如U盘文件读写&#xff0c;SD卡的文件读写等工作时&#xff0c;我们往往需要一个文件系统来支持我们的工作。特别在一些MCU应用中&#xff0c;文件系统…

PPPoE配置

实验需求 配置IP地址使用PPPOE拨号上网设置路由让直播业务部和营销部都可以访问外网 实验拓扑 实验步骤 配置 R1地址池 电信链路&#xff1a; [Huawei]undo info-center enable Info: Information center is disabled. [Huawei]sysname r1 [r1]ip pool zhibo  //配置…

【沐风老师】3DMAX翻转折叠动画插件FoldFx使用方法详解

3DMAX翻转折叠动画插件FoldFx使用方法详解 3DMAX翻转折叠动画插件FoldFx,是3dMax运动图形工具,用于创建多边形折叠动画。用户几乎有无限的可能性,因为动画的每个方面都是可控的。 【适用版本】 适用于3dMax版本:2010及更新版本(推荐3dMax2016及更高版本)。 【安装方法】…

Go 围炉札记

文章目录 一、Go 安装 一、Go 安装 VScode下配置Go语言开发环境【2023最新】 基础篇&#xff1a;新手使用vs code新建go项目 vscode里安装Go插件和配置Go环境 Documentation Golang 配置代理 Go命令详解 一文详解Go语言常用命令 Go 语言教程 熬夜整理&#xff0c;最全的Go语…

大数据-玩转数据-Flink SQL编程

一、概念 1.1 Apache Flink 两种关系型 API Apache Flink 有两种关系型 API 来做流批统一处理&#xff1a;Table API 和 SQL。 Table API 是用于 Scala 和 Java 语言的查询API&#xff0c;它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。 Flink SQL 是…

CSS 学习笔记(基础)

用来控制网页表现的语言&#xff0c;CSS&#xff08;Cascading Style Sheet&#xff09;&#xff1a;层叠样式表。然后我们继续看看 W3C 标准&#xff1a; 结构&#xff1a;HTML表现&#xff1a;CSS行为&#xff1a;JavaScript CSS导入方式、选择器&属性 由于网页的框架…

linux进程杀不死

项目场景&#xff1a; 虚拟机 问题描述 linux进程杀不死 无反应 原因分析&#xff1a; 进程僵死zombie 解决方案&#xff1a; 进proc或者find命令找到进程所在地址 cat status查看进程杀死子进程

linux系统中mysql 连接出现“too many connections”问题解决办法

问题内容&#xff1a; 原因: mysql配置参数中设定的并发连接数太少或者系统繁忙导致连接数被占满。连接数超过了 MySQL 设置的值&#xff0c; 与 max_connections 和 wait timeout 都有关&#xff0c;wait_timeout 的值越大&#xff0c;连接的空闲等待就越长&#xff0c; 这样就…

Linux忘记密码

在虚拟机安装了centOS7&#xff0c;但是忘记了root密码&#xff0c;登录的时候发现登录不上了&#xff0c;然后重新修改密码。 1、重启虚拟机 2、进入到该页面之后&#xff0c;选中第一个&#xff08;高亮显示即为选中&#xff09;选项&#xff0c;然后按下键盘的“E”键 3…

mybatis日志体系

title: “java日志体系” createTime: 2021-12-08T12:19:5708:00 updateTime: 2021-12-08T12:19:5708:00 draft: false author: “ggball” tags: [“mybatis”] categories: [“java”] description: “java日志体系” java日志体系 常用日志框架 Log4j&#xff1a;Apache …