【Seata源码学习 】篇三 seata客户端全局事务开启、提交与回滚

1.GlobalTransactionalInterceptor 对事务方法对增强

我们已经知道 GlobalTransactionScanner 会给bean的类或方法上面标注有@GlobalTransactional 注解 和 @GlobalLock的 添加一个 advisor (DefaultPointcutAdvisor ,advisor = 绑定了PointCut 的 advise)

而此处的 DefaultPointcutAdvisor 的 advice 为 GlobalTransactionalInterceptor,PointCut 为 Pointcut.TRUE(匹配所有方法)

然后由 DynamicAdvisedInterceptor 回调函数 获取当前 bean对应的 AdvisedSupport , 从中遍历所有的 advisor,然后再获取 此 advisor 绑定的 PointCut 对目标方法进行匹配 ,如果满足,则添加到 拦截器链路中,后续递归调用

接下来我们看下 GlobalTransactionalInterceptor 对目标方法做了哪些增强

io.seata.spring.annotation.GlobalTransactionalInterceptor#invoke

 public Object invoke(final MethodInvocation methodInvocation) throws Throwable {Class<?> targetClass =methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);// Method.getDeclaringClass 获取方法声明的类 如果是静态方法,则返回方法所在类;如果是实例方法,则返回方法所在类的超类//!specificMethod.getDeclaringClass().equals(Object.class) 排除Object方法if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);//默认包装生成的DefaultPointcutAdvisor 拦截所有的方法// public DefaultPointcutAdvisor(Advice advice) {//		this(Pointcut.TRUE, advice);//	}//因此在此处对方法进行过滤  只处理标注了@GlobalTransactional 和 @GlobalLock 注解的方法final GlobalTransactional globalTransactionalAnnotation =getAnnotation(method, targetClass, GlobalTransactional.class);final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);if (!localDisable) {if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {//根据@GlobalTransactional 注解的属性封装事务切面信息AspectTransactional transactional;if (globalTransactionalAnnotation != null) {transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),globalTransactionalAnnotation.rollbackForClassName(),globalTransactionalAnnotation.noRollbackFor(),globalTransactionalAnnotation.noRollbackForClassName(),globalTransactionalAnnotation.propagation(),globalTransactionalAnnotation.lockRetryInterval(),globalTransactionalAnnotation.lockRetryTimes());} else {transactional = this.aspectTransactional;}//处理全局事务return handleGlobalTransaction(methodInvocation, transactional);} else if (globalLockAnnotation != null) {//处理全局锁return handleGlobalLock(methodInvocation, globalLockAnnotation);}}}return methodInvocation.proceed();}

2.全局事务执行器 TransactionalExecutor 的匿名实现

io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalTransaction

  Object handleGlobalTransaction(final MethodInvocation methodInvocation,final AspectTransactional aspectTransactional) throws Throwable {boolean succeed = true;try {return transactionalTemplate.execute(new TransactionalExecutor() {@Overridepublic Object execute() throws Throwable {// 责任链模式 继续执行链路上的其他拦截器方法,如果已经执行到最后一个// 则直接执行目标方法return methodInvocation.proceed();}public String name() {String name = aspectTransactional.getName();if (!StringUtils.isNullOrEmpty(name)) {return name;}return formatMethod(methodInvocation.getMethod());}@Overridepublic TransactionInfo getTransactionInfo() {//注解解析// reset the value of timeoutint timeout = aspectTransactional.getTimeoutMills();if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {timeout = defaultGlobalTransactionTimeout;}TransactionInfo transactionInfo = new TransactionInfo();transactionInfo.setTimeOut(timeout);transactionInfo.setName(name());transactionInfo.setPropagation(aspectTransactional.getPropagation());transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval());transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes());Set<RollbackRule> rollbackRules = new LinkedHashSet<>();for (Class<?> rbRule : aspectTransactional.getRollbackFor()) {rollbackRules.add(new RollbackRule(rbRule));}for (String rbRule : aspectTransactional.getRollbackForClassName()) {rollbackRules.add(new RollbackRule(rbRule));}for (Class<?> rbRule : aspectTransactional.getNoRollbackFor()) {rollbackRules.add(new NoRollbackRule(rbRule));}for (String rbRule : aspectTransactional.getNoRollbackForClassName()) {rollbackRules.add(new NoRollbackRule(rbRule));}transactionInfo.setRollbackRules(rollbackRules);return transactionInfo;}});} catch (TransactionalExecutor.ExecutionException e) {TransactionalExecutor.Code code = e.getCode();//根据异常类型执行不同的钩子方法switch (code) {case RollbackDone:throw e.getOriginalException();case BeginFailure:succeed = false;failureHandler.onBeginFailure(e.getTransaction(), e.getCause());throw e.getCause();case CommitFailure:succeed = false;failureHandler.onCommitFailure(e.getTransaction(), e.getCause());throw e.getCause();case RollbackFailure:failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());throw e.getOriginalException();case RollbackRetrying:failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());throw e.getOriginalException();default:throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));}} finally {if (degradeCheck) {EVENT_BUS.post(new DegradeCheckEvent(succeed));}}}

将事务参数信息封装到TransactionInfo对象中,如事务传播行为,超时时间,超时重试次数,异常回滚类型等等,

如果出现异常,则根据异常类型分别执行不同的钩子方法

并且会继续回调拦截器链路上的其他拦截器方法

3.TransactionalTemplate 全局事务执行过程的模版方法

io.seata.tm.api.TransactionalTemplate#execute

public Object execute(TransactionalExecutor business) throws Throwable {// 1. Get transactionInfo//获取@GlobalTransation注解的属性封装的TransactionInfoTransactionInfo txInfo = business.getTransactionInfo();if (txInfo == null) {throw new ShouldNeverHappenException("transactionInfo does not exist");}// 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.// GlobalTransactionContext 全局事务上下文对象 用于创建一个新事务,或者获取当前事务// GlobalTransactionContext.getCurrent - > RootContext.getXID -> ContextCore.get// ContextCore 是一个接口 seata有两个实现  FastThreadLocalContextCore  ThreadLocalContextCore 都是基于ThreadLocal存储XIDGlobalTransaction tx = GlobalTransactionContext.getCurrent();// 1.2 Handle the transaction propagation.// 获取当前事务的传播行为Propagation propagation = txInfo.getPropagation();// 用于存储被挂起的事务XIDSuspendedResourcesHolder suspendedResourcesHolder = null;try {//处理事务的传播行为switch (propagation) {//如果当前事务的传播行为是 NOT_SUPPORTED 则以非事务的方式执行调用methodInvocation.proceed()// 如果当前拦截器不为拦截链的最后一个,则将获取下一个拦截器执行invoke方法,如果是最后一个,则直接执行目标方法case NOT_SUPPORTED:// If transaction is existing, suspend it.//如果当前存在全局事务,则挂起当前事务if (existingTransaction(tx)) {suspendedResourcesHolder = tx.suspend();}// Execute without transaction and return.// 继续执行拦截器链return business.execute();case REQUIRES_NEW:// If transaction is existing, suspend it, and then begin new transaction.// 如果当前存在事务 则挂起当前事务 并创建一个新的事务if (existingTransaction(tx)) {suspendedResourcesHolder = tx.suspend();tx = GlobalTransactionContext.createNew();}// Continue and execute with new transactionbreak;case SUPPORTS:// If transaction is not existing, execute without transaction.// 如果不存在事务 则跳过当前事务拦截器 执行拦截器链并返回if (notExistingTransaction(tx)) {return business.execute();}// Continue and execute with new transactionbreak;case REQUIRED:// If current transaction is existing, execute with current transaction,// else continue and execute with new transaction.break;case NEVER:// If transaction is existing, throw exception.// 有事务抛出异常if (existingTransaction(tx)) {throw new TransactionException(String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s", tx.getXid()));} else {// Execute without transaction and return.return business.execute();}case MANDATORY:// If transaction is not existing, throw exception.// 要求必须有事务,没事务抛出异常if (notExistingTransaction(tx)) {throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");}// Continue and execute with current transaction.break;default:throw new TransactionException("Not Supported Propagation:" + propagation);}// 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.// 如果当前的事务上下文中不存在事务,实例化默认全局事务对象 且此次事务发起为 TM 角色为 Launcherif (tx == null) {tx = GlobalTransactionContext.createNew();}// set current tx config to holder// 记录当前的全局锁配置,存放到 ThreadLocalGlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);try {// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,//    else do nothing. Of course, the hooks will still be triggered.// 执行全局事务开启的前后置钩子方法// 如果当前事务的角色是 Participant 也就是 RM ,判断当前事务上下文RootContext是否存在XID,如果不存在,抛出异常// 如果当前事务的角色是 launcher 也就是 TM ,判断当前事务上下文RootContext是否存在XID,如果存在,抛出异常// 如果不存在,则通过TmNettyRemotingClient  向TC发送一个 GlobalBeginRequest 同步消息,并获取TC返回的XID,绑定到RootContextbeginTransaction(txInfo, tx);Object rs;try {// Do Your Business// 执行执行拦截器链路rs = business.execute();} catch (Throwable ex) {// 3. The needed business exception to rollback.// 如果抛出异常,判断异常是否在指定的范围中(默认为Throwable类及其子类)// 执行异常回滚的前后钩子方法// 如果当前事务的角色是 launcher 也就是 TM ,通过TmNettyRemotingClient  向TC发送一个 GlobalRollbackRequest 同步消息// 并记录TC返回的当前事务状态StatuscompleteTransactionAfterThrowing(txInfo, tx, ex);throw ex;}// 4. everything is fine, commit.//  如果方法执行过程中没有出现异常//  执行事务提交的前后置方法//  如果当前事务的角色是 launcher 也就是 TM ,通过TmNettyRemotingClient  向TC发送一个 GlobalCommitRequest 同步消息// 并记录TC返回的当前事务状态StatuscommitTransaction(tx);return rs;} finally {//5. clear// 恢复以前的全局锁配置resumeGlobalLockConfig(previousConfig);// 执行整个事务完成的前后置方法triggerAfterCompletion();// 移除当前绑定的事务钩子对象cleanUp();}} finally {// If the transaction is suspended, resume it.// 当前事务执行完毕后,恢复挂起的事务,// 获取suspendedResourcesHolder关联的xid,由RootContext重新绑定if (suspendedResourcesHolder != null) {tx.resume(suspendedResourcesHolder);}}}

判断RootContext中是否绑定了XID,如果没有绑定,说明当前不存在事务返回null,如果有绑定XID,则返回默认的GlobalTransaction实现,记录当前全局事务的状态为beging,且为事务的参与者participate。接下来根据全局事务不同的传播行为,进一步判断需不需要挂起当前的全局事务,或者跳过事务处理,如果当前的传播行为要求有一个事务,而当前不存在全局事务(GlobalTransaction对象为null),则无参实例化GlobalTransaction,默认为事务的发起者luancher,事务状态未知

接着执行不同的钩子方法,且都是由事务发起者luancher使用 TmNettyRemotingClient 与 TC 通信,发送GlobalReportRequest消息,如果链路执行顺利,则发送GlobalCommitRequest消息,如果出现异常,发送GlobalRollbackRequest

4.总结与流程图

seata客户端模版方法执行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/150926.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

工业基础类IFC—材质和纹理

在我们的 IFC技术交流群&#xff08;788206534&#xff09;里&#xff0c;经常会有人提问“如何学习 IFC文档或者其开发”的问题。对于这个问题&#xff0c;我一直没有机会做一个完整的回答&#xff0c;这次我认真回忆了自己关于IFC的学习经历&#xff0c;在此与大家分享。一是…

Wireshark 截取指定端口海量包分析

有个应用要分析一下协议&#xff0c;但是8939&#xff0c;8940传输一下子几个G的数据&#xff0c;但是要分析的端口8939实际上只有几个MB最多&#xff0c;如果用wireshark有界面的程序一截取就会卡死&#xff0c;于是使用命令行方式&#xff0c;截取指定端口的 tshark -i &quo…

Flume的安装部署及常见问题解决

1.安装地址 &#xff08;1&#xff09; Flume官网地址&#xff1a;http://flume.apache.org/ &#xff08;2&#xff09;文档查看地址&#xff1a;http://flume.apache.org/FlumeUserGuide.html &#xff08;3&#xff09;下载地址&#xff1a;http://archive.apache.org/dist…

基于Qt QList和QMap容器类示例

## QList<T> QList<T>容器是一个数组列表,特点如下: 1.大多数情况下可以用QList。像prepend()、append()和insert()这种操作,通常QList比QVector快的多。这是因为QList是基于index标签存储它的元素项在内存中(虽然内存不连续,这点与STL的list 是一样的),比…

【机器学习基础】K-Means聚类算法

&#x1f680;个人主页&#xff1a;为梦而生~ 关注我一起学习吧&#xff01; &#x1f4a1;专栏&#xff1a;机器学习 欢迎订阅&#xff01;相对完整的机器学习基础教学&#xff01; ⭐特别提醒&#xff1a;针对机器学习&#xff0c;特别开始专栏&#xff1a;机器学习python实战…

linux镜像的下载,系统下载(个人使用)

文章目录 一、系统之家二、国内镜像源三、Centos官网四、安装成功截图五、镜像类型的区别参考文档 一、系统之家 系统之家官网 二、国内镜像源 下载镜像地址&#xff1a; 1、官网地址&#xff1a;https://www.centos.org/ 2、阿里镜像站&#xff1a;https://mirrors.aliyu…

一文读懂:testcafe框架和页面元素交互

一、互动要求 使用 TestCafe 与元素进行交互操作&#xff0c;元素需满足以下条件&#xff1a;☟ 元素在 body 页面窗口或 iframe 窗口的元素内。如果某个元素在视口之外&#xff0c;则 TestCafe 通过滚动可以滚动到元素可见。 元素是可见的&#xff0c;具有以下属性&#…

实力认证|易知微上榜中国信息通信研究院数字孪生城市产业图谱!

近期&#xff0c;中国通信院就数字孪生技术在城市层面的广泛应用&#xff0c;根据数字孪生产业框架&#xff0c;结合产业发展动态和企业综合实力评估&#xff0c;选取了核心产业、关联产业和辐射产业等各领域业务代表性较强的企业&#xff08;机构&#xff09;&#xff0c;形成…

Flink(六)【DataFrame 转换算子(下)】

前言 今天学习剩下的转换算子&#xff1a;分区、分流、合流。 每天出来自学是一件孤独又充实的事情&#xff0c;希望多年以后回望自己的大学生活&#xff0c;不会因为自己的懒惰与懈怠而悔恨。 回答之所以起到了作用&#xff0c;原因是他们自己很努力。 …

FPGA系列:1、FPGA/verilog源代码保护:基于Quartus13.1平台保护verilog源码发给第三方但不泄露源码

catlog 需求具体步骤工程描述去掉相关调试文件切换顶层模块并导出相应模块为网表文件切换回原顶层模块并添加相应保护模块的qxp文件再次编译工程 参考&#xff1a; 需求 有时需要将源码交付给第三方&#xff0c;但是源码中部分模块涉及到的核心代码无法暴漏给第三方。因此&…

2023年【高处安装、维护、拆除】模拟考试题及高处安装、维护、拆除模拟考试题库

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2023年【高处安装、维护、拆除】模拟考试题及高处安装、维护、拆除模拟考试题库&#xff0c;包含高处安装、维护、拆除模拟考试题答案和解析及高处安装、维护、拆除模拟考试题库练习。安全生产模拟考试一点通结合国家…

C语言之qsort()函数的模拟实现

C语言之qsort()函数的模拟实现 文章目录 C语言之qsort()函数的模拟实现1. 简介2. 冒泡排序3. 对冒泡排序进行改造4. 改造部分4.1 保留部分的冒泡排序4.2 比较部分4.3 交换部分 5. bubble_sort2完整代码6. 使用bubble_sort2来排序整型数组7. 使用bubble_sort2来排序结构体数组7.…

golang学习笔记——接口interfaces

文章目录 Go 语言接口例子空接口空接口的定义空接口的应用空接口作为函数的参数空接口作为map的值 类型断言接口值 类型断言例子001类型断言例子002类型断言例子003巩固练习 Go 语言接口 接口&#xff08;interface&#xff09;定义了一个对象的行为规范&#xff0c;只定义规范…

基于java web个人财务管理系统

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用JSP技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…

stable-diffusion-webui之webui.py

主要就是webui的启动这块&#xff0c;需要初始化的地方&#xff0c;东西还是挺多的。

8年资深测试,自动化测试常见问题总结,惊险避坑...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、自动化测试简介…

2023.11.18 Hadoop之 YARN

1.简介 Apache Hadoop YARN &#xff08;Yet Another Resource Negotiator&#xff0c;另一种资源协调者&#xff09;是一种新的 Hadoop 资源管理器&#xff0c;它是一个通用资源管理系统和调度平台&#xff0c;可为上层应用提供统一的资源管理和调度。支持多个数据处理框架&…

《轻购优品》新零售玩法:消费积分认购+众筹新玩法

《轻购优品》新零售玩法&#xff1a;消费积分认购众筹新玩法 引言&#xff1a;2023年开年已来&#xff0c;政府的工作报告提出“把恢复和扩大消费摆在优先位置”&#xff0c;并且把2023年定位为“消费提振年”&#xff0c;以“全年乐享全年盛惠”为主题多地政府共同发力&#x…

Altium Designer 相同模块的布局布线复用-AD

1、利用交互式布线&#xff0c;将两个相同模块的元器件在PCB上分块显示。 在原理图中&#xff0c;框选某一模块电路、按快捷键 TS 切换到PCB编辑界面、工具>器件摆放>在矩形区域内排列&#xff08;可将模块中的器件都集中放置到矩形框内&#xff09;。2、为模块电路添加 …

YOLOv8改进 | EIoU、SIoU、WIoU、DIoU、FocusIoU等二十余种损失函数

一、本文介绍 这篇文章介绍了YOLOv8的重大改进&#xff0c;特别是在损失函数方面的创新。它不仅包括了多种IoU损失函数的改进和变体&#xff0c;如SIoU、WIoU、GIoU、DIoU、EIOU、CIoU&#xff0c;还融合了“Focus”思想&#xff0c;创造了一系列新的损失函数。这些组合形式的…