Seata TC端协调全局事务

 1、Seata server注册器

//来自RM分支事务注册
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
//开启全局事务
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
//提交全局事务
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
//回滚全局事务
super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
//RM分支事务提交
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
//RM分支事务回滚
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);

2、DefaultCoordinator协调者

TC开启全局事务
io.seata.server.coordinator.DefaultCoordinator#doGlobalBegin
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,timeout);MDC.put(RootContext.MDC_KEY_XID, session.getXid());session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());session.begin();// transaction start eventeventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));return session.getXid();}
TC提交全局事务
io.seata.server.coordinator.DefaultCore#doGlobalCommit   @Overridepublic boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {boolean success = true;// start committing eventeventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),globalSession.getBeginTime(), null, globalSession.getStatus()));if (globalSession.isSaga()) {success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);} else {Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {// if not retrying, skip the canBeCommittedAsync branchesif (!retrying && branchSession.canBeCommittedAsync()) {return CONTINUE;}BranchStatus currentStatus = branchSession.getStatus();if (currentStatus == BranchStatus.PhaseOne_Failed) {globalSession.removeBranch(branchSession);return CONTINUE;}try {BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);switch (branchStatus) {case PhaseTwo_Committed:globalSession.removeBranch(branchSession);return CONTINUE;case PhaseTwo_CommitFailed_Unretryable:if (globalSession.canBeCommittedAsync()) {LOGGER.error("Committing branch transaction[{}], status: PhaseTwo_CommitFailed_Unretryable, please check the business log.", branchSession.getBranchId());return CONTINUE;} else {SessionHelper.endCommitFailed(globalSession);LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());return false;}default:if (!retrying) {globalSession.queueToRetryCommit();return false;}if (globalSession.canBeCommittedAsync()) {LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",branchSession.getBranchId(), branchStatus);return CONTINUE;} else {LOGGER.error("Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());return false;}}} catch (Exception ex) {StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",new String[] {branchSession.toString()});if (!retrying) {globalSession.queueToRetryCommit();throw new TransactionException(ex);}}return CONTINUE;});// Return if the result is not nullif (result != null) {return result;}//If has branch and not all remaining branches can be committed asynchronously,//do print log and return falseif (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());return false;}}// if it succeeds and there is no branch, retrying=true is the asynchronous state when retrying. EndCommitted is executed to improve concurrency performance, and the global transaction ends..if (success && globalSession.getBranchSessions().isEmpty() && retrying) {SessionHelper.endCommitted(globalSession);// committed eventeventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),globalSession.getBeginTime(), System.currentTimeMillis(), globalSession.getStatus()));LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());}return success;}@Overridepublic GlobalStatus commit(String xid) throws TransactionException {GlobalSession globalSession = SessionHolder.findGlobalSession(xid);if (globalSession == null) {return GlobalStatus.Finished;}globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());// just lock changeStatusboolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {// Highlight: Firstly, close the session, then no more branch can be registered.globalSession.closeAndClean();if (globalSession.getStatus() == GlobalStatus.Begin) {if (globalSession.canBeCommittedAsync()) {globalSession.asyncCommit();return false;} else {globalSession.changeStatus(GlobalStatus.Committing);return true;}}return false;});if (shouldCommit) {boolean success = doGlobalCommit(globalSession, false);//If successful and all remaining branches can be committed asynchronously, do async commit.if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {globalSession.asyncCommit();return GlobalStatus.Committed;} else {return globalSession.getStatus();}} else {return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();}}
TC回滚全局事务
io.seata.server.coordinator.DefaultCoordinator#doGlobalRollback@Overridepublic boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {boolean success = true;// start rollback eventeventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(),GlobalTransactionEvent.ROLE_TC, globalSession.getTransactionName(),globalSession.getApplicationId(),globalSession.getTransactionServiceGroup(), globalSession.getBeginTime(),null, globalSession.getStatus()));if (globalSession.isSaga()) {success = getCore(BranchType.SAGA).doGlobalRollback(globalSession, retrying);} else {Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {BranchStatus currentBranchStatus = branchSession.getStatus();if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {globalSession.removeBranch(branchSession);return CONTINUE;}try {BranchStatus branchStatus = branchRollback(globalSession, branchSession);switch (branchStatus) {case PhaseTwo_Rollbacked:globalSession.removeBranch(branchSession);LOGGER.info("Rollback branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());return CONTINUE;case PhaseTwo_RollbackFailed_Unretryable:SessionHelper.endRollbackFailed(globalSession);LOGGER.info("Rollback branch transaction fail and stop retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());return false;default:LOGGER.info("Rollback branch transaction fail and will retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());if (!retrying) {globalSession.queueToRetryRollback();}return false;}} catch (Exception ex) {StackTraceLogger.error(LOGGER, ex,"Rollback branch transaction exception, xid = {} branchId = {} exception = {}",new String[] {globalSession.getXid(), String.valueOf(branchSession.getBranchId()), ex.getMessage()});if (!retrying) {globalSession.queueToRetryRollback();}throw new TransactionException(ex);}});// Return if the result is not nullif (result != null) {return result;}// In db mode, there is a problem of inconsistent data in multiple copies, resulting in new branch// transaction registration when rolling back.// 1. New branch transaction and rollback branch transaction have no data association// 2. New branch transaction has data association with rollback branch transaction// The second query can solve the first problem, and if it is the second problem, it may cause a rollback// failure due to data changes.GlobalSession globalSessionTwice = SessionHolder.findGlobalSession(globalSession.getXid());if (globalSessionTwice != null && globalSessionTwice.hasBranch()) {LOGGER.info("Rollbacking global transaction is NOT done, xid = {}.", globalSession.getXid());return false;}}if (success) {SessionHelper.endRollbacked(globalSession);// rollbacked eventeventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(),GlobalTransactionEvent.ROLE_TC, globalSession.getTransactionName(),globalSession.getApplicationId(),globalSession.getTransactionServiceGroup(),globalSession.getBeginTime(), System.currentTimeMillis(),globalSession.getStatus()));LOGGER.info("Rollback global transaction successfully, xid = {}.", globalSession.getXid());}return success;}@Overridepublic GlobalStatus rollback(String xid) throws TransactionException {GlobalSession globalSession = SessionHolder.findGlobalSession(xid);if (globalSession == null) {return GlobalStatus.Finished;}globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());// just lock changeStatusboolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {globalSession.close(); // Highlight: Firstly, close the session, then no more branch can be registered.if (globalSession.getStatus() == GlobalStatus.Begin) {globalSession.changeStatus(GlobalStatus.Rollbacking);return true;}return false;});if (!shouldRollBack) {return globalSession.getStatus();}doGlobalRollback(globalSession, false);return globalSession.getStatus();}

RM注册分支事务到TC

io.seata.server.coordinator.DefaultCoordinator#doBranchRegister    
@Overrideprotected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response,RpcContext rpcContext) throws TransactionException {MDC.put(RootContext.MDC_KEY_XID, request.getXid());response.setBranchId(core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(),request.getXid(), request.getApplicationData(), request.getLockKey()));}

RM提交分支事务到TC

RM回滚分支事务到TC

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/620758.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

crackmapexec工具详解

下载地址:https://github.com/Porchetta-Industries/CrackMapExec wiki:https://www.crackmapexec.wiki/ 1.安装(MAC) 1.1.python3.9 pipx 安装(运行软件有警告,推荐 python3.11 pipx 安装) …

记录下载安装rabbitmq(Linux) 并整合springboot--详细版(全)

下载rabbitmq(Linux): erlang压缩包: https://share.weiyun.com/TGhfV8eZ rabbitMq-server压缩包: https://share.weiyun.com/ZXbUwWHD (因为RabbitMQ采用 Erlang 实现的工业级的消息队列(MQ)服务器&#…

五、带登录窗体的demo

做了一个简单的带登录窗体的demo,有用户名和密码不能为空的验证,原理是在main.cpp的主函数入口处: 1、将默认的MainWindow主窗体注释。 2、新建一个formlogin登录窗体,在主函数中先运行登录窗体。 3、在登录窗体中引用MainWind…

杨中科 .NETCORE EFCORE 第一部分 基本使用

一 、什么是EF Core 什么是ORM 1、说明: 本课程需要你有数据库、SOL等基础知识。 2、ORM: ObjectRelational Mapping。让开发者用对象操作的形式操作关系数据库 比如插入: User user new User(Name"admin"Password"123”; orm.Save(user);比如查询: Book b…

mac上部署单体hbase

1. 简介 HBase 是一个开源的、分布式的、版本化的典型非关系型数据库。它是 Google BigTable 的开源实现,并且是 Apache 基金会的 Hadoop 项目的一部分1。HBase 在 Hadoop Distributed File System (HDFS) 上运行,作为一个列式存储非关系数据库管理系统…

Flutter之运行错误:this and base files have different roots

运行时报错: this and base files have different roots: E:\Demolpro\waqu\build\flutter-plugin-_android_lifecycle and C:\Users\78535\AppData\Local\Pub\Cache\hosted\pub.dev\flutter_pulgin_android_lifecycle-2.0.17\android 如图: 这种情况…

【JavaSE】P33~P113 方法,重载,数组,对象,构造器,this关键字,数组和对象的内存图,JavaBean

练习 1 方法方法定义及调用JShell简单使用(要求Java9以上)方法语句流程控制及注意事项选择循环 方法的重载 2 数组三种初始化方式数组在内存中的存储内存图 3 对象对象内存图Getter/Setter快捷写法this 关键字构造方法JavaBean的四个标准对象数组 二、编…

【数据结构】八大排序之计数排序算法

🦄个人主页:修修修也 🎏所属专栏:数据结构 ⚙️操作环境:Visual Studio 2022 目录 一.计数排序简介及思想 二.计数排序代码实现 三.计数排序复杂度分析 📌时间复杂度 📌空间复杂度 结语 一.计数排序简介及思想 计数排序(Cou…

Gitlab集成openLDAP统一认证登录

vim /etc/gitlab/gitlab.rb, 可以配置很多个server,因此与sssd服务一样可以配置多个ldap作为高可用 gitlab-ctl reconfiguregitlab-rake gitlab:ldap:checkgitlab-ctl restart gitlab-rake gitlab:ldap:check Checking LDAP ...LDAP: ... Server: ldapm…

发起人自选-钉钉审批

场景描述 配置一个审批流程,在某些审批节点,不能确定谁具体来审批,所以需要手工选择一个人或者多个人保证流程能得以顺利通过。有些审批流程的做法是,上一个节点来选择指定的人,而钉钉的做法是发起人来指定。 钉钉设…

【Maven】007-Maven 工程的继承和聚合关系

【Maven】007-Maven 工程的继承和聚合关系 文章目录 【Maven】007-Maven 工程的继承和聚合关系一、Maven 工程的继承关系1、继承的概念2、继承的作用3、继承的语法4、父工程统一管理依赖版本父工程声明依赖版本子工程继承以来版本 二、Maven 工程的聚合关系1、聚合的概念2、聚合…

【信号与系统】【北京航空航天大学】实验一、信号的MATLAB表示及信号运算

一、实验目的 1、初步掌握 MATLAB 仿真软件的使用; 2、学习使用 MATLAB 产生基本时域信号,并绘制信号波形; 3、学习利用 MATLAB 实现信号的基本运算; 4、利用 MATLAB 分析常用的连续时域信号。 二、实验内容 1、 生成连续信号 …

jQuery圆形轮播自动切换图文

jQuery圆形轮播自动切换图文 注意这里用到了swiper插件&#xff0c;记得引入swiper.js和swiper.css swiper官网 这里面用到的swiper版本是Swiper 4.4.2 不同版本有些写法会不同&#xff0c;可对照官方文档进行调整 效果展示 jquery圆形轮播自动切换文字 html代码片段 <li…

【LangChain学习之旅】—(6) 提示工程(下):用思维链和思维树提升模型思考质量

【LangChain学习之旅】—&#xff08;6&#xff09; 提示工程&#xff08;下&#xff09;&#xff1a;用思维链和思维树提升模型思考质量 什么是 Chain of ThoughtFew-Shot CoTZero-Shot CoTChain of Thought 实战CoT 的模板设计程序的完整框架Tree of Thought总结 Reference&a…

优雅草蜻蜓API大数据服务中心v1.0.4更新-加入蓝奏云直链解析·每日Bing·字数统计·今日油价·历史上的今天等接口

2024年1月13日优雅草蜻蜓API大数据服务中心v1.0.4更新-加入蓝奏云直链解析每日Bing字数统计今日油价历史上的今天等接口 优雅草api服务-大数据中心自12月29日推出以来截止2024年1月13日累计被调用次数为413次&#xff0c;共收录23个接口&#xff0c;截止前一日2024年1月12日当…

《向量数据库指南》让「引用」为 RAG 机器人回答增加可信度

在之前的文章中&#xff0c;我们已经介绍了如何用 Milvus 向量数据库以及 LlamaIndex 搭建基础的聊天机器人《Chat Towards Data Science &#xff5c;如何用个人数据知识库构建 RAG 聊天机器人&#xff1f;》《书接上回&#xff0c;如何用 LlamaIndex 搭建聊天机器人&#xff…

pyqt5 pyinstaller 打包 QThread QLable QscrollArea 滑动 红果短剧

废话 不多说&#xff0c;直接上代码&#xff01;&#xff01;&#xff01; UI.py self.scrollArea QtWidgets.QScrollArea(self.centralwidget)self.scrollArea.setGeometry(QtCore.QRect(20, 130, 541, 511))self.scrollArea.setWidgetResizable(True)self.scrollArea.setOb…

vue2、vue3里面去掉访问地址中路由‘#‘号--nginx配置

需求 我们这里分享一下关于Vue2和Vue3里面如何去掉浏览器路由里面#号的问题&#xff0c;以及nginx的配置。 去掉#号问题之前我们先讨论一下html中的hash模式和history模式。 html中的hash模式 HTML的hash模式指的是URL中的锚点部分&#xff08;#后面的内容&#xff09;被用…

通信入门系列——微积分中极限、连续、导数、微分、积分

本节目录 一、极限 1、数列极限 2、函数极限 二、连续 三、导数 四、微分 五、积分本节内容 一、极限 1、数列极限 数列极限&#xff1a;设{xn}为一个实数列&#xff0c;A为一个定数。若对任意给定的ε>0&#xff0c;总存在正整数N,使得当n>N时&#xff0c;有|xn-A|<…

linux搭建SRS服务器

linux搭建SRS服务器 文章目录 linux搭建SRS服务器SRS说明实验说明搭建步骤推流步骤查看web端服务器拉流步骤final SRS说明 SRS&#xff08;simple Rtmp Server&#xff09;,是一个简单高效的实时视频服务器&#xff0c;支持RTMP/WebRTC/HLS/HTTP-FLV/SRT, 是国人自己开发的一款…