Spring 事务源码分析

前言:

我们知道 Spring 声明式事务是通过 AOP 来实现的,日常项目开发中我们只需要使用 @Transactional 注解就可以实现声明式事务,那你知道通过 @Transactional 注解怎样实现事务的吗?本篇我们将从源码来分析 Spring 声明式事务的执行流程。

Spring 相关知识储备:

深入理解 Spring IOC 底层实现机制(refresh 方法源码分析)

Spring 源码之 BeanDefinition 加载分析

深入理解 Spring Bean 生命周期(附源码分析)

深入理解 Spring 循环依赖之三级缓存(附源码分析)

深入理解 Spring AOP 源码分析(附源码分析)

Spring 事务理解

Spring 声明式事务的原理?

Spring 声明式事务是借助于 Spring AOP 来实现的,在方法开始之前开启事务,在方法执行之后提交或回滚事务,Spring 是通过 TransactionInterceptor 来实现的,TransactionInterceptor 实现了 MethodInterceptor 接口,即通过 AOP 环绕增强的方式实现 Spring 事务。

TransactionInterceptor 类源码分析:

TransactionInterceptor 类主要是通过 invoke 方法调用了父类 TransactionAspectSupport #invokeWithinTransaction 方法来管理事务。

public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {//无参构造方法public TransactionInterceptor() {}//有参构造方法public TransactionInterceptor(PlatformTransactionManager ptm, Properties attributes) {this.setTransactionManager(ptm);this.setTransactionAttributes(attributes);}//有参构造方法public TransactionInterceptor(PlatformTransactionManager ptm, TransactionAttributeSource tas) {this.setTransactionManager(ptm);this.setTransactionAttributeSource(tas);}//核心方法 通过 AOP 拦截 Spring 声明式事务@Nullablepublic Object invoke(final MethodInvocation invocation) throws Throwable {//获取 targetClass 也就是被代理的类的 ClassClass<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;//调用 TransactionAspectSupport 的 invokeWithinTransaction 方法return this.invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {@Nullablepublic Object proceedWithInvocation() throws Throwable {//执行目标方法return invocation.proceed();}public Object getTarget() {return invocation.getThis();}public Object[] getArguments() {return invocation.getArguments();}});}......
}

TransactionAspectSupport 类

TransactionAspectSupport 类是 Spring 管理事务的基础类,支持声明式事务和编程式事务的管理,TransactionAspectSupport 通过模板方法来制定了事务的流程,通过策略模式来实际管理事务,针对声明式事务和编程式事务有不同的事务管理器,如下:

  • PlatformTransactionManager:声明式事务管理器。
  • ReactiveTransactionManager:编程式事务管理器。

TransactionAspectSupport 管理事务的流程简图(声明式事务)

TransactionAspectSupport 管理事务的流程非常清晰简单,依次是获取事务源、确定事务管理器、创建事务、执行业务方法、提交或者回滚事务,跟我们了解的事务流程完全一致。

在这里插入图片描述
了解了 TransactionAspectSupport 管理事务的流程,我们分析一下源码看看是不是这么回事。

TransactionAspectSupport#invokeWithinTransaction 方法源码分析

TransactionAspectSupport#invokeWithinTransaction 方法逻辑非常清晰,首先就是获取一个事务源,然后获取事务管理器,判断是那种类型的事务,然后就开始创建事务、执行业务方法、执行事务提交或者回滚。

//TransactionAspectSupport#invokeWithinTransaction 
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, TransactionAspectSupport.InvocationCallback invocation) throws Throwable {//获取事务属性源TransactionAttributeSource tas = this.getTransactionAttributeSource();//获取事务属性TransactionAttribute txAttr = tas != null ? tas.getTransactionAttribute(method, targetClass) : null;//根据事务属性获取事务管理器TransactionManager tm = this.determineTransactionManager(txAttr);//this.reactiveAdapterRegistry != null:编程式适配器注册表不为空//tm instanceof ReactiveTransactionManager:是否是编程式事务管理器if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {TransactionAspectSupport.ReactiveTransactionSupport txSupport = (TransactionAspectSupport.ReactiveTransactionSupport)this.transactionSupportCache.computeIfAbsent(method, (key) -> {if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && TransactionAspectSupport.KotlinDelegate.isSuspend(method)) {throw new TransactionUsageException("Unsupported annotated transaction on suspending function detected: " + method + ". Use TransactionalOperator.transactional extensions instead.");} else {ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());if (adapter == null) {throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " + method.getReturnType());} else {return new TransactionAspectSupport.ReactiveTransactionSupport(adapter);}}});return txSupport.invokeWithinTransaction(method, targetClass, invocation, txAttr, (ReactiveTransactionManager)tm);} else {//返回声明式事务管理器PlatformTransactionManager ptm = this.asPlatformTransactionManager(tm);//切点名称 类名+方法名称String joinpointIdentification = this.methodIdentification(method, targetClass, txAttr);Object retVal;if (txAttr != null && ptm instanceof CallbackPreferringPlatformTransactionManager) {//CallbackPreferringPlatformTransactionManager 表示是编程式是事务 我们重点分析 申明式事务TransactionAspectSupport.ThrowableHolder throwableHolder = new TransactionAspectSupport.ThrowableHolder();try {retVal = ((CallbackPreferringPlatformTransactionManager)ptm).execute(txAttr, (statusx) -> {TransactionAspectSupport.TransactionInfo txInfo = this.prepareTransactionInfo(ptm, txAttr, joinpointIdentification, statusx);Object var9;try {Object retVal = invocation.proceedWithInvocation();if (vavrPresent && TransactionAspectSupport.VavrDelegate.isVavrTry(retVal)) {retVal = TransactionAspectSupport.VavrDelegate.evaluateTryFailure(retVal, txAttr, statusx);}var9 = retVal;return var9;} catch (Throwable var13) {if (txAttr.rollbackOn(var13)) {if (var13 instanceof RuntimeException) {throw (RuntimeException)var13;}throw new TransactionAspectSupport.ThrowableHolderException(var13);}throwableHolder.throwable = var13;var9 = null;} finally {this.cleanupTransactionInfo(txInfo);}return var9;});if (throwableHolder.throwable != null) {throw throwableHolder.throwable;} else {return retVal;}} catch (TransactionAspectSupport.ThrowableHolderException var20) {throw var20.getCause();} catch (TransactionSystemException var21) {if (throwableHolder.throwable != null) {this.logger.error("Application exception overridden by commit exception", throwableHolder.throwable);var21.initApplicationException(throwableHolder.throwable);}throw var21;} catch (Throwable var22) {if (throwableHolder.throwable != null) {this.logger.error("Application exception overridden by commit exception", throwableHolder.throwable);}throw var22;}} else {//不是编程式事务 也就是 声明式事务 //创建一个事务 TransactionAspectSupport.TransactionInfo txInfo = this.createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);try {//通过回调执行目标方法 也就是执行业务逻辑retVal = invocation.proceedWithInvocation();} catch (Throwable var18) {//在方法出现异常的情况下完成事务 也就是异常回滚this.completeTransactionAfterThrowing(txInfo, var18);throw var18;} finally {//清除缓存中的事务信息this.cleanupTransactionInfo(txInfo);}if (vavrPresent && TransactionAspectSupport.VavrDelegate.isVavrTry(retVal)) {TransactionStatus status = txInfo.getTransactionStatus();if (status != null && txAttr != null) {retVal = TransactionAspectSupport.VavrDelegate.evaluateTryFailure(retVal, txAttr, status);}}//提交事务this.commitTransactionAfterReturning(txInfo);return retVal;}}
}

AbstractFallbackTransactionAttributeSource#getTransactionAttribute 方法源码分析

我们知道 AbstractFallbackTransactionAttributeSource#getTransactionAttribute 方法的作用就是获取事务属性,整个流程就是先进行简单判断是否是 Object.class,,,,

//AbstractFallbackTransactionAttributeSource#getTransactionAttribute
//获取事务属性
@Nullable
public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) {//如果类是 Objeect 直接返回空 没有获取到四五属性if (method.getDeclaringClass() == Object.class) {return null;} else {//从缓存中查找Object cacheKey = this.getCacheKey(method, targetClass);//从 attributeCache 缓存中查找 Spring 初始化的时候 Bean后置处理器 会识别到 @Transactional注解 的类和方法 将事务属性放入缓存attributeCache 中 并创建 AOP 代理对象TransactionAttribute cached = (TransactionAttribute)this.attributeCache.get(cacheKey);if (cached != null) {//找到了 //cached == NULL_TRANSACTION_ATTRIBUTE : 也就是默认的的事务属性 其实就是空return cached == NULL_TRANSACTION_ATTRIBUTE ? null : cached;} else {//缓存中没有//去解析方法 和 代理类TransactionAttribute txAttr = this.computeTransactionAttribute(method, targetClass);if (txAttr == null) {//解析结果还是为 null 表示这个方法不需要事务 存入缓存 下次好用this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);} else {//不为空 获取方法的全名 类名+方法名String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);if (txAttr instanceof DefaultTransactionAttribute) {//是 DefaultTransactionAttributeDefaultTransactionAttribute dta = (DefaultTransactionAttribute)txAttr;//设置事务属性描述dta.setDescriptor(methodIdentification);dta.resolveAttributeStrings(this.embeddedValueResolver);}if (this.logger.isTraceEnabled()) {this.logger.trace("Adding transactional method '" + methodIdentification + "' with attribute: " + txAttr);}//加入事务属性缓存this.attributeCache.put(cacheKey, txAttr);}return txAttr;}}
}

AbstractFallbackTransactionAttributeSource#computeTransactionAttribute 源码分析

AbstractFallbackTransactionAttributeSource#computeTransactionAttribute 方法的作用就是解析方法或者类上的 @Transactional 注解,然后返回事务属性,我们接着分析一下解析 @Transactional 注解的源代码,也就是 this.findTransactionAttribute(specificMethod) 这行代码。

//AbstractFallbackTransactionAttributeSource#computeTransactionAttribute
//解析方法和代理类
@Nullable
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {//allowPublicMethodsOnly:是否只允许公共的 public 方法 默认是 true//获取方法修饰符 判断是否是 publicif (this.allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {//即使是有 @Transactional 注解方法 如果不是 public 事务不生效return null;} else {//method:接口方法//通过接口方法和代理类 获取到具体的实现类方法Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);//在目标方法上找事务属性  真正解析 @Transactional 注解的方法TransactionAttribute txAttr = this.findTransactionAttribute(specificMethod);if (txAttr != null) {//找到直接返回return txAttr;} else {//在目标方法所属类上找事务属性txAttr = this.findTransactionAttribute(specificMethod.getDeclaringClass());if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {//事务属性不为空 且是用户级别的方法  返回事务属性return txAttr;} else {//目标方法所属类上 也没有找到事务属性 一般不会到这里来if (specificMethod != method) {//接口方法和实现类方法不一致//去接口方法上找事务属性txAttr = this.findTransactionAttribute(method);if (txAttr != null) {//找到事务属性返回return txAttr;}//接口方法的类上去找事务属性txAttr = this.findTransactionAttribute(method.getDeclaringClass());if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {//找到返回return txAttr;}}return null;}}}
}

AnnotationTransactionAttributeSource#findTransactionAttribute 源码分析

AnnotationTransactionAttributeSource#findTransactionAttribute 方法就是用事务注解解析器解析注解得到事务属性信息,方法本身没有什么逻辑,只是调用了 SpringTransactionAnnotationParser#parseTransactionAnnotation 方法。

//AnnotationTransactionAttributeSource#findTransactionAttribute(java.lang.reflect.Method)
//查找事务属性
@Nullable
protected TransactionAttribute findTransactionAttribute(Method method) {return this.determineTransactionAttribute(method);
}//AnnotationTransactionAttributeSource#determineTransactionAttribute
//确定事务属性
@Nullable
protected TransactionAttribute determineTransactionAttribute(AnnotatedElement element) {//事务注解解析器 迭代遍历Iterator var2 = this.annotationParsers.iterator();TransactionAttribute attr;do {if (!var2.hasNext()) {return null;}//得到事务注解解析器TransactionAnnotationParser parser = (TransactionAnnotationParser)var2.next();//解析器解析注解元素 得到事务属性attr = parser.parseTransactionAnnotation(element);} while(attr == null);return attr;
}

SpringTransactionAnnotationParser#parseTransactionAnnotation 源码分析

SpringTransactionAnnotationParser#parseTransactionAnnotation 方法真正解析 @Transactional 注解的方法,方法中的相关属性十分熟悉了,比如传播属性、隔离级别、超时时间、是否只读等,然后封装成 TransactionAttribute 返回。

//SpringTransactionAnnotationParser#parseTransactionAnnotation
//解析事务注解属性
protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {//基础规则的事务属性  是 TransactionAttribute 的实现类RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();//传播属性Propagation propagation = (Propagation)attributes.getEnum("propagation");rbta.setPropagationBehavior(propagation.value());//隔离级别Isolation isolation = (Isolation)attributes.getEnum("isolation");rbta.setIsolationLevel(isolation.value());//超时时间rbta.setTimeout(attributes.getNumber("timeout").intValue());String timeoutString = attributes.getString("timeoutString");Assert.isTrue(!StringUtils.hasText(timeoutString) || rbta.getTimeout() < 0, "Specify 'timeout' or 'timeoutString', not both");rbta.setTimeoutString(timeoutString);//是否只读rbta.setReadOnly(attributes.getBoolean("readOnly"));//限定符 也就是每一个@Transactional注解的value属性rbta.setQualifier(attributes.getString("value"));//设置标签rbta.setLabels(Arrays.asList(attributes.getStringArray("label")));List<RollbackRuleAttribute> rollbackRules = new ArrayList();//回滚的异常Class[] var7 = attributes.getClassArray("rollbackFor");int var8 = var7.length;int var9;Class rbRule;for(var9 = 0; var9 < var8; ++var9) {rbRule = var7[var9];rollbackRules.add(new RollbackRuleAttribute(rbRule));}String[] var11 = attributes.getStringArray("rollbackForClassName");var8 = var11.length;String rbRule;for(var9 = 0; var9 < var8; ++var9) {rbRule = var11[var9];rollbackRules.add(new RollbackRuleAttribute(rbRule));}//不回滚的异常var7 = attributes.getClassArray("noRollbackFor");var8 = var7.length;for(var9 = 0; var9 < var8; ++var9) {rbRule = var7[var9];rollbackRules.add(new NoRollbackRuleAttribute(rbRule));}var11 = attributes.getStringArray("noRollbackForClassName");var8 = var11.length;for(var9 = 0; var9 < var8; ++var9) {rbRule = var11[var9];rollbackRules.add(new NoRollbackRuleAttribute(rbRule));}//设置回滚规则rbta.setRollbackRules(rollbackRules);return rbta;
}

TransactionAspectSupport#determineTransactionManager 源码分析

TransactionAspectSupport#determineTransactionManager 方法就是通过寻找事务管理器,先通过事务属性寻找事务管理器,也就是 @Transactional 注解属性 transactionManager 指定的事务管理器,如果找不到就通过 transactionManagerBeanName 寻找事务管理器,如果再找不到寻找默认的事务管理器并加入缓存中。

//TransactionAspectSupport#determineTransactionManager
@Nullable
protected TransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {if (txAttr != null && this.beanFactory != null) {//事务属性不为空 beanFactory 也不会为空String qualifier = txAttr.getQualifier();//获取限定符 也就是每一个@Transactional注解的value属性或者transactionManager属性if (StringUtils.hasText(qualifier)) {//设置了限定符的 从 beanFactory 中获取事务管理器return this.determineQualifiedTransactionManager(this.beanFactory, qualifier);} else if (StringUtils.hasText(this.transactionManagerBeanName)) {//如果设置了 transactionManagerBeanName 属性  通过 transactionManagerBeanName 查找指定的事务管理器return this.determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName);} else {//都不是 直接获取默认事务管理器TransactionManager defaultTransactionManager = this.getTransactionManager();if (defaultTransactionManager == null) {//默认事务管理器为空 尝试从缓存中获取defaultTransactionManager = (TransactionManager)this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY);if (defaultTransactionManager == null) {//缓存中获取的事务管理器还是为空 从容器 beanFactory 中获取类型为 TransactionManager 的事务管理器defaultTransactionManager = (TransactionManager)this.beanFactory.getBean(TransactionManager.class);//加入缓存this.transactionManagerCache.putIfAbsent(DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager);}}//返回事务管理器return defaultTransactionManager;}} else {//返回事务管理器return this.getTransactionManager();}
}

TransactionAspectSupport#createTransactionIfNecessary 源码分析

TransactionAspectSupport#createTransactionIfNecessary 方法主要作用就是创建一个事务,并将事务信息封装成一个 TransactionInfo 绑定到当前线程中。

//TransactionAspectSupport#createTransactionIfNecessary
//创建事务
protected TransactionAspectSupport.TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {//如果事务没有自定名称 使用切对面作为事务名称  也就是 类名+方法名if (txAttr != null && ((TransactionAttribute)txAttr).getName() == null) {txAttr = new DelegatingTransactionAttribute((TransactionAttribute)txAttr) {public String getName() {return joinpointIdentification;}};}//事务状态TransactionStatus status = null;if (txAttr != null) {if (tm != null) {//通过事务管理器创建一个事务 并返回事务状态 重点方法status = tm.getTransaction((TransactionDefinition)txAttr);} else if (this.logger.isDebugEnabled()) {this.logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured");}}//将事务信息封装成 TransactionInfo 并绑定到当前线程中return this.prepareTransactionInfo(tm, (TransactionAttribute)txAttr, joinpointIdentification, status);
}//TransactionAspectSupport#prepareTransactionInfo
//准备事务信息  其实就是将事务信息封装成为 TransactionInfo 并绑定到当前线程中
protected TransactionAspectSupport.TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, String joinpointIdentification, @Nullable TransactionStatus status) {//将事务信息封装成为 TransactionInfoTransactionAspectSupport.TransactionInfo txInfo = new TransactionAspectSupport.TransactionInfo(tm, txAttr, joinpointIdentification);if (txAttr != null) {if (this.logger.isTraceEnabled()) {this.logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");}//设置事务状态txInfo.newTransactionStatus(status);} else if (this.logger.isTraceEnabled()) {this.logger.trace("No need to create transaction for [" + joinpointIdentification + "]: This method is not transactional.");}//事务绑定到当前线程txInfo.bindToThread();return txInfo;
}

AbstractPlatformTransactionManager#getTransaction 源码分析

AbstractPlatformTransactionManager#getTransaction 方法会创建一个事务,会先判断是否存在事务、已经事务针对不同的事务传播机制做出不同的处理,核心方法是创建一个新的事务 this.startTransaction(def, transaction, debugEnabled, suspendedResources)。

//org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction
//获取一个事务 其实应该说是创建一个事务
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {//获取事务属性TransactionDefinition def = definition != null ? definition : TransactionDefinition.withDefaults();//获取事务对象 例如 DataSourceTransactionManagerObject transaction = this.doGetTransaction();boolean debugEnabled = this.logger.isDebugEnabled();//判断是否已经存在了事务 if (this.isExistingTransaction(transaction)) {//有数据库连接 且是活跃的 表示已经存在了事务 要根据事务传播机制处理return this.handleExistingTransaction(def, transaction, debugEnabled);} else if (def.getTimeout() < -1) {//设置的超时时间小于-1 表示超时时间设置错误  抛出异常throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());} else if (def.getPropagationBehavior() == 2) {//事务传播机制为 2 直接抛出异常 2 对应的是 PROPAGATION_MANDATORY 如果当前存在事务,则加入该事务,如果当前没有事务,则抛出异常throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");} else if (def.getPropagationBehavior() != 0 && def.getPropagationBehavior() != 3 && def.getPropagationBehavior() != 6) {//def.getPropagationBehavior() != 0:不是 PROPAGATION_REQUIRED 一定要在事务中运行//def.getPropagationBehavior() != 3:不是 PROPAGATION_REQUIRES_NEW 总是创建一个新的事事务运行//def.getPropagationBehavior() != 6:不是 PROPAGATION_NESTED 有没有事务都创建一个新的事务//其实就是非事务运行if (def.getIsolationLevel() != -1 && this.logger.isWarnEnabled()) {this.logger.warn("Custom isolation level specified but no actual transaction initiated; isolation level will effectively be ignored: " + def);}//会创建一个空事物 没有事务的提交以及回滚 但是会把数据库连接绑定到当前线程上boolean newSynchronization = this.getTransactionSynchronization() == 0;return this.prepareTransactionStatus(def, (Object)null, true, newSynchronization, debugEnabled, (Object)null);} else {//非以上情况 就是在事务中运行//如果存在事务 就挂起事务AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources = this.suspend((Object)null);if (debugEnabled) {this.logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);}try {//开启一个新的事务 核心方法return this.startTransaction(def, transaction, debugEnabled, suspendedResources);} catch (Error | RuntimeException var7) {//唤醒挂起的事务this.resume((Object)null, suspendedResources);throw var7;}}
}

AbstractPlatformTransactionManager#startTransaction 源码分析

AbstractPlatformTransactionManager#startTransaction 方法会真正的去开启一个新的事务,并返回事务状态,核心方法是 this.doBegin(transaction, definition),Spring 源码一如既往 do 才是真正干活的方法,doBegin 方法会真正的开启一个新事务。

//org.springframework.transaction.support.AbstractPlatformTransactionManager#startTransaction
//开启一个新的事物
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled, @Nullable AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources) {//是否开启新同步  transactionSynchronization = 0 所以默认是需要开启新同步boolean newSynchronization = this.getTransactionSynchronization() != 2;//根据事务属性组装一个事务状态DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);//开启事务 获取数据连接并绑定到当前线程上 重点方法this.doBegin(transaction, definition);//准备同步  也就是激活同步 设置一些状态this.prepareSynchronization(status, definition);//返回事务状态return status;
}//org.springframework.transaction.support.AbstractPlatformTransactionManager#newTransactionStatus
protected DefaultTransactionStatus newTransactionStatus(TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {//newSynchronization 是否开启新的同步 默认 true//当前线程没有活跃的事务boolean actualNewSynchronization = newSynchronization && !TransactionSynchronizationManager.isSynchronizationActive();//创建一=一个新的 DefaultTransactionStatus 对象 开启新开启的事务 DefaultTransactionStatus 是 TransactionStatus 的默认实现return new DefaultTransactionStatus(transaction, newTransaction, actualNewSynchronization, definition.isReadOnly(), debug, suspendedResources);
}//org.springframework.transaction.support.DefaultTransactionStatus#DefaultTransactionStatus
public DefaultTransactionStatus(@Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean readOnly, boolean debug, @Nullable Object suspendedResources) {this.transaction = transaction;this.newTransaction = newTransaction;this.newSynchronization = newSynchronization;this.readOnly = readOnly;this.debug = debug;this.suspendedResources = suspendedResources;
}//org.springframework.transaction.support.AbstractPlatformTransactionManager#prepareSynchronization
//准备同步  也就是激活同步 设置一些状态
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {//是否是新的同步if (status.isNewSynchronization()) {//设置当前事务是激活状态TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());//设置当前事务隔离级别TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(definition.getIsolationLevel() != -1 ? definition.getIsolationLevel() : null);//设置当前事务是否只读TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());//设置当前事务名称TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());//初始化 初始化了一个 synchronizationsTransactionSynchronizationManager.initSynchronization();}}

DataSourceTransactionManager#doBegin 源码分析

DataSourceTransactionManager#doBegin 方法是真正开启事务的方法,主要是获取事务连接、设置事务隔离级别、设置非自动提交,使用 TransactionSynchronizationManager 将数据库连接绑定到当前线程上,以及发生异常释放数据库连接。

//org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin
//真正的开启一个事务 Spring 源码一如既往 do 才是真正干活的方法
protected void doBegin(Object transaction, TransactionDefinition definition) {//转为 DataSourceTransactionObject DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;//数据库连接Connection con = null;try {//!txObject.hasConnectionHolder():没有数据库连接//txObject.getConnectionHolder().isSynchronizedWithTransaction():txObject 是否激活了事务if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {//从数据库中获取一个连接Connection newCon = this.obtainDataSource().getConnection();if (this.logger.isDebugEnabled()) {this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");}//将新的数据库连接设置到 txObject 中 txObject.setConnectionHolder(new ConnectionHolder(newCon), true);}//获取连接的持有者 并设置与事务同步txObject.getConnectionHolder().setSynchronizedWithTransaction(true);//再次获取数据库连接con = txObject.getConnectionHolder().getConnection();//获取事务的隔离级别Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);//设置事务的隔离级别txObject.setPreviousIsolationLevel(previousIsolationLevel);//设置只读txObject.setReadOnly(definition.isReadOnly());//获取事务的提交方法 一般默认是自动提交if (con.getAutoCommit()) {//设置为手动提交txObject.setMustRestoreAutoCommit(true);if (this.logger.isDebugEnabled()) {this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit");}//自动提交设置为 falsecon.setAutoCommit(false);}//事务只读属性的处理this.prepareTransactionalConnection(con, definition);//设置当前事务是活跃的txObject.getConnectionHolder().setTransactionActive(true);//获取事务超时时间int timeout = this.determineTimeout(definition);if (timeout != -1) {//超时时间合法 就设置给当前事务txObject.getConnectionHolder().setTimeoutInSeconds(timeout);}if (txObject.isNewConnectionHolder()) {//是否是一个新的事务持有者//将事务绑定到事务同步管理器上 这里使用了 ThreadLocalTransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder());}} catch (Throwable var7) {if (txObject.isNewConnectionHolder()) {//出现异常后  释放连接DataSourceUtils.releaseConnection(con, this.obtainDataSource());//设置当前持有为nulltxObject.setConnectionHolder((ConnectionHolder)null, false);}throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", var7);}
}

DataSourceUtils#prepareConnectionForTransaction 源码分析

DataSourceUtils#prepareConnectionForTransaction 方法的作用就是设置数据库连接的只读属性,修改当前连接的隔离级别。

//org.springframework.jdbc.datasource.DataSourceUtils#prepareConnectionForTransaction
//获取事务隔离级别
@Nullable
public static Integer prepareConnectionForTransaction(Connection con, @Nullable TransactionDefinition definition) throws SQLException {Assert.notNull(con, "No Connection specified");//事务定义不为空 且事务是只读if (definition != null && definition.isReadOnly()) {try {if (logger.isDebugEnabled()) {logger.debug("Setting JDBC Connection [" + con + "] read-only");}//设置当前数据库连接为只读con.setReadOnly(true);} catch (RuntimeException | SQLException var4) {for(Object exToCheck = var4; exToCheck != null; exToCheck = ((Throwable)exToCheck).getCause()) {if (exToCheck.getClass().getSimpleName().contains("Timeout")) {throw var4;}}logger.debug("Could not set JDBC Connection read-only", var4);}}//事务隔离级别Integer previousIsolationLevel = null;if (definition != null && definition.getIsolationLevel() != -1) {if (logger.isDebugEnabled()) {logger.debug("Changing isolation level of JDBC Connection [" + con + "] to " + definition.getIsolationLevel());}//获取连接的隔离级别int currentIsolation = con.getTransactionIsolation();if (currentIsolation != definition.getIsolationLevel()) {//连接的隔离级别不等于手动设置的隔离级别previousIsolationLevel = currentIsolation;//将连接的隔离级别设置为手动设置的隔离级别con.setTransactionIsolation(definition.getIsolationLevel());}}return previousIsolationLevel;
}

DataSourceTransactionManager#prepareTransactionalConnection 源码分析

DataSourceTransactionManager#prepareTransactionalConnection 方法主要对只读事务进行处理,这里主要是对 oracle 数据库,oracle不支持 con.setReadOnly(true) 语法,因此要支持语句。

//DataSourceTransactionManager#prepareTransactionalConnection
//设置事务只读属性
protected void prepareTransactionalConnection(Connection con, TransactionDefinition definition) throws SQLException {if (this.isEnforceReadOnly() && definition.isReadOnly()) {//如果 this.isEnforceReadOnly() 为true 且事务只读属性为 true//创建一个 StatementStatement stmt = con.createStatement();Throwable var4 = null;try {//执行只读语句stmt.executeUpdate("SET TRANSACTION READ ONLY");} catch (Throwable var13) {var4 = var13;throw var13;} finally {if (stmt != null) {if (var4 != null) {try {//关闭连接stmt.close();} catch (Throwable var12) {var4.addSuppressed(var12);}} else {//关闭连接stmt.close();}}}}}

TransactionSynchronizationManager#bindResource 源码分析

将事务绑定到事务管理器中,这里使用了 ThreadLocal,resources 是一个 ThreadLocal。

//TransactionSynchronizationManager#bindResource
//将事务绑定到事务管理器中,这里使用了 ThreadLocal,resources 是一个 ThreadLocal。
public static void bindResource(Object key, Object value) throws IllegalStateException {Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);Assert.notNull(value, "Value must not be null");//获取当前线程的 mapMap<Object, Object> map = (Map)resources.get();if (map == null) {//为空创建一个map = new HashMap();//并设置给当前现场resources.set(map);}//将 actualKey  value 设置到 ThreadLocal 中 actualKey 是  DataSource value 是 ConnectionHolderObject oldValue = ((Map)map).put(actualKey, value);if (oldValue instanceof ResourceHolder && ((ResourceHolder)oldValue).isVoid()) {oldValue = null;}if (oldValue != null) {//已经有绑定到当前线程的 key 抛出异常throw new IllegalStateException("Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");} else {if (logger.isTraceEnabled()) {logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" + Thread.currentThread().getName() + "]");}}
}

AbstractPlatformTransactionManager#handleExistingTransaction 源码分析

AbstractPlatformTransactionManager#handleExistingTransaction 方法主要针对嵌套事务进行处理,首先还是判断事务的传播行为,验证是否可以进行嵌套事务,然后根据不同的传播行为进行不同的处理,具体如下:

  • PROPAGATION_NEVER(5):要求以非事务运行,肯定是不允许在嵌套事务中发生的,直接抛出异常。
  • PROPAGATION_NOT_SUPPORTS(5):要求以非事务运行,如果存在事务就挂起,就挂起外围事务,开启一个新的空事务。
  • PROPAGATION_REQUIRES_NEW(3):要求总以新事务运行,如果存在事务就挂起,就挂起外围事务,开启一个新的事务。
  • PROPAGATION_NESTED(6):父子嵌套事务,父子事务的核心是子事务不会影响父事务,因此当前事务保存了一个回滚点,子事务回滚不会影响到父事务也就是外层事务,但父事务回滚会回滚子事务。
  • PROPAGATION_REQUIRED(0)、PROPAGATION_SUPPORTS(1)、PROPAGATION_SUPPORTS(2):这几个传播行为的时候,都是内层事务加入外层事务,并没有新的事务产生,是真正的嵌套事务。
//org.springframework.transaction.support.AbstractPlatformTransactionManager#handleExistingTransaction
//嵌套事务的处理逻辑
private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException {if (definition.getPropagationBehavior() == 5) {//5:PROPAGATION_NEVER 以非事务运行throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation 'never'");} else {AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources;if (definition.getPropagationBehavior() == 4) {//4:PROPAGATION_NOT_SUPPORTS 以非事务方式运行 当前存在事务 就挂起if (debugEnabled) {this.logger.debug("Suspending current transaction");}//挂起事务suspendedResources = this.suspend(transaction);boolean newSynchronization = this.getTransactionSynchronization() == 0;//开启一个新的空事务return this.prepareTransactionStatus(definition, (Object)null, false, newSynchronization, debugEnabled, suspendedResources);} else if (definition.getPropagationBehavior() == 3) {//3:PROPAGATION_REQUIRES_NEW 总是开启一个新的事务运行if (debugEnabled) {this.logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]");}//挂起外层事务suspendedResources = this.suspend(transaction);try {//建立一个新的事务return this.startTransaction(definition, transaction, debugEnabled, suspendedResources);} catch (Error | RuntimeException var6) {this.resumeAfterBeginException(transaction, suspendedResources, var6);throw var6;}} else if (definition.getPropagationBehavior() == 6) {//6:PROPAGATION_NESTED 父子嵌套事务if (!this.isNestedTransactionAllowed()) {//不允许嵌套事务 抛出异常 默认是允许嵌套事务的throw new NestedTransactionNotSupportedException("Transaction manager does not allow nested transactions by default - specify 'nestedTransactionAllowed' property with value 'true'");} else {if (debugEnabled) {this.logger.debug("Creating nested transaction with name [" + definition.getName() + "]");}if (this.useSavepointForNestedTransaction()) {//创建嵌套事务DefaultTransactionStatus status = this.prepareTransactionStatus(definition, transaction, false, false, debugEnabled, (Object)null);//创建一个保存点 这个事务在回滚的时候 只会回滚到这个保存点status.createAndHoldSavepoint();return status;} else {return this.startTransaction(definition, transaction, debugEnabled, (AbstractPlatformTransactionManager.SuspendedResourcesHolder)null);}}} else {//来到这里是一下几种情况 才是真正的嵌套事务//0:PROPAGATION_REQUIRED 一定要以事务的方式运行 内外层事务绑定//1:PROPAGATION_SUPPORTS 当前存在事务 则加入事务 不存在事务就以非事务运行 //2:PROPAGATION_MANDATORY 当前存在事务 则加入事务 不存在事务抛出异常if (debugEnabled) {this.logger.debug("Participating in existing transaction");}if (this.isValidateExistingTransaction()) {if (definition.getIsolationLevel() != -1) {//获取外层事务隔离级别Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {//外事务隔离级别为空 或者 当前事务隔离级别与外层事务隔离级别不一致 抛出异常Constants isoConstants = DefaultTransactionDefinition.constants;throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, "ISOLATION_") : "(unknown)"));}}if (!definition.isReadOnly() && TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {//当前事务非只读 外层事务是只读  抛出异常throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is");}}//判断是否是新的的事务同步 因为是嵌套事务 加入到外围事务 所以不是新事务boolean newSynchronization = this.getTransactionSynchronization() != 2;//return this.prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, (Object)null);}}
}

AbstractPlatformTransactionManager#suspend 源码分析

AbstractPlatformTransactionManager#suspend 方法就是挂起事务,对当前线程的一些事务信息清空,并把当前外层的事务信息封装成一个 SuspendedResourcesHolder 返回,这样可以保证当前事务运行完毕后,挂起的事务可以继续运行。

//挂起事务
//org.springframework.transaction.support.AbstractPlatformTransactionManager#suspend
@Nullable
protected final AbstractPlatformTransactionManager.SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {if (TransactionSynchronizationManager.isSynchronizationActive()) {//当前线程的事务处于活跃状态//解除绑定在当前线程上的同步事务List suspendedSynchronizations = this.doSuspendSynchronization();try {//需要挂起的数据库连接Object suspendedResources = null;if (transaction != null) {//解绑当前线程的数据库连接suspendedResources = this.doSuspend(transaction);}//获取当前线程上绑定的事务名称String name = TransactionSynchronizationManager.getCurrentTransactionName();//清空当前线程上绑定的事务名称TransactionSynchronizationManager.setCurrentTransactionName((String)null);//获取当前线程绑定的事务只读属性boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();//设置当前线程的事务只读属性为 falseTransactionSynchronizationManager.setCurrentTransactionReadOnly(false);//获取当前线程绑定的事务隔离级别Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();//设置当前线程绑定的事务隔离级别为空TransactionSynchronizationManager.setCurrentTransactionIsolationLevel((Integer)null);//获取当前线程绑定的事务获取状态boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();//设置当前线程的活跃状态为 falseTransactionSynchronizationManager.setActualTransactionActive(false);//封装为 SuspendedResourcesHolderreturn new AbstractPlatformTransactionManager.SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);} catch (Error | RuntimeException var8) {//异常恢复this.doResumeSynchronization(suspendedSynchronizations);throw var8;}} else if (transaction != null) {//事务没有激活 也需要挂起Object suspendedResources = this.doSuspend(transaction);return new AbstractPlatformTransactionManager.SuspendedResourcesHolder(suspendedResources);} else {//都不满足 返回 nullreturn null;}
}

TransactionAspectSupport#completeTransactionAfterThrowing 源码分析

TransactionAspectSupport#completeTransactionAfterThrowing 方法主要是对出现异常情况后进行判断,到底是需要会滚还是需要提交事务,并不是所有的异常都需要回滚。

//异常回滚
//org.springframework.transaction.interceptor.TransactionAspectSupport#completeTransactionAfterThrowing
protected void completeTransactionAfterThrowing(@Nullable TransactionAspectSupport.TransactionInfo txInfo, Throwable ex) {if (txInfo != null && txInfo.getTransactionStatus() != null) {//事务信息不为空  且事务状态不为空if (this.logger.isTraceEnabled()) {this.logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "] after exception: " + ex);}if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {//事务属性不为空 且回滚异常不为空 try {//会滚事务txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());} catch (TransactionSystemException var6) {this.logger.error("Application exception overridden by rollback exception", ex);var6.initApplicationException(ex);throw var6;} catch (Error | RuntimeException var7) {this.logger.error("Application exception overridden by rollback exception", ex);throw var7;}} else {try {//虽然异常还是提交事务txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());} catch (TransactionSystemException var4) {this.logger.error("Application exception overridden by commit exception", ex);var4.initApplicationException(ex);throw var4;} catch (Error | RuntimeException var5) {this.logger.error("Application exception overridden by commit exception", ex);throw var5;}}}}

TransactionSynchronization 源码分析

TransactionSynchronization 状态这几个值在后续的代码中用到,这里简单解释一下 0、1、2 各种代表什么状态。

public interface TransactionSynchronization extends Flushable {//提交事务状态int STATUS_COMMITTED = 0;//回滚事务状态int STATUS_ROLLED_BACK = 1;//未知事务状态int STATUS_UNKNOWN = 2;..........
}

AbstractPlatformTransactionManager#rollback 源码分析

AbstractPlatformTransactionManager#rollback 方法完成了事务的回滚,回滚时候也会根据不同情况执行不同的回滚策略,例如全局回滚、只会滚内部事务、回滚到指定的保存点。

//事务回滚
//org.springframework.transaction.support.AbstractPlatformTransactionManager#rollbackpublic final void rollback(TransactionStatus status) throws TransactionException {if (status.isCompleted()) {//事务状态已经是完成状态了 就不能进行会滚了 抛出异常throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");} else {//默认的事务状态DefaultTransactionStatus defStatus = (DefaultTransactionStatus)status;//真正的会滚方法this.processRollback(defStatus, false);}
}//真正事务回滚方法
//org.springframework.transaction.support.AbstractPlatformTransactionManager#processRollback
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {try {//意外回滚 默认falseboolean unexpectedRollback = unexpected;try {//完成时触发  其实就是解绑事务资源 resoure 也就是从当前线程的 ThreadLocal 中移出this.triggerBeforeCompletion(status);//事务是否有保存点if (status.hasSavepoint()) {if (status.isDebug()) {this.logger.debug("Rolling back transaction to savepoint");}//如果有保存点  就只会滚到指定的保存点  其实就是 PROPAGATION_NESTED status.rollbackToHeldSavepoint();} else if (status.isNewTransaction()) {//是新的事务 if (status.isDebug()) {this.logger.debug("Initiating transaction rollback");}//直接会滚 也就是直接获取当前线程上的数据库连接  进行会滚this.doRollback(status);} else {//进入这里表名既没有保存点  也不是新的事务  就只能是嵌套事务 也就是一下几种情况//0:PROPAGATION_REQUIRED 一定要以事务的方式运行 内外层事务绑定//1:PROPAGATION_SUPPORTS 当前存在事务 则加入事务 不存在事务就以非事务运行 //2:PROPAGATION_MANDATORY 当前存在事务 则加入事务 不存在事务抛出异常if (status.hasTransaction()) {//有事务if (!status.isLocalRollbackOnly() && !this.isGlobalRollbackOnParticipationFailure()) {//status.isLocalRollbackOnly():仅本地回滚  默认是false//isGlobalRollbackOnParticipationFailure():全局回滚  默认是true//进入这里表示修改了全局回滚属性 也就是内部事务回滚不影响外部事务  所以没有任何操作if (status.isDebug()) {this.logger.debug("Participating transaction failed - letting transaction originator decide on rollback");}} else {if (status.isDebug()) {this.logger.debug("Participating transaction failed - marking existing transaction as rollback-only");}//这里就是表示内部事务回滚 外部事务也会会回滚this.doSetRollbackOnly(status);}} else {this.logger.debug("Should roll back transaction but cannot - no transaction available");}//默认值是 falseif (!this.isFailEarlyOnGlobalRollbackOnly()) {//unexpectedRollback 赋值为 false  不过一开始也赋值为 false 了unexpectedRollback = false;}}} catch (Error | RuntimeException var8) {//异常时触发操作this.triggerAfterCompletion(status, 2);throw var8;}//完成后触发操作this.triggerAfterCompletion(status, 1);if (unexpectedRollback) {throw new UnexpectedRollbackException("Transaction rolled back because it has been marked as rollback-only");}} finally {//事务完成后 完成一些清理工作this.cleanupAfterCompletion(status);}}

AbstractPlatformTransactionManager#triggerBeforeCompletion 源码分析

AbstractPlatformTransactionManager#triggerBeforeCompletion 事务完成前操作,主要是操作 SqlSession,把相关信息从当前线程解绑,关闭 SqlSession 等,移除当前线程的 ThreadLoacl 中的 SqlSession。

//org.springframework.transaction.support.AbstractPlatformTransactionManager#triggerBeforeCompletion
//完成前触发
protected final void triggerBeforeCompletion(DefaultTransactionStatus status) {if (status.isNewSynchronization()) {//如果存在事务同步if (status.isDebug()) {this.logger.trace("Triggering beforeCompletion synchronization");}//调用 TransactionSynchronizationUtils 的方法TransactionSynchronizationUtils.triggerBeforeCompletion();}}//org.springframework.transaction.support.TransactionSynchronizationUtils#triggerBeforeCompletion
//完成前触发操作
public static void triggerBeforeCompletion() {//迭代遍历 绑定到当前线程中的 TransactionSynchronization Iterator var0 = TransactionSynchronizationManager.getSynchronizations().iterator();while(var0.hasNext()) {TransactionSynchronization synchronization = (TransactionSynchronization)var0.next();try {//调用 beforeCompletion 方法synchronization.beforeCompletion();} catch (Throwable var3) {logger.error("TransactionSynchronization.beforeCompletion threw exception", var3);}}}//org.springframework.transaction.support.AbstractTransactionStatus#rollbackToHeldSavepoint
//回滚到指定的保存点
public void rollbackToHeldSavepoint() throws TransactionException {//获取保存点Object savepoint = this.getSavepoint();if (savepoint == null) {//保存点为空 抛出异常throw new TransactionUsageException("Cannot roll back to savepoint - no savepoint associated with current transaction");} else {//会滚到指定的保存点this.getSavepointManager().rollbackToSavepoint(savepoint);//释放保存点this.getSavepointManager().releaseSavepoint(savepoint);//设置保存点为空this.setSavepoint((Object)null);}
}//org.springframework.jdbc.datasource.JdbcTransactionObjectSupport#rollbackToSavepoint
//会滚到指定保存点 真正的回滚方法
public void rollbackToSavepoint(Object savepoint) throws TransactionException {//获取连接持有者ConnectionHolder conHolder = this.getConnectionHolderForSavepoint();try {//获取连接调用 rollback 方法回滚到保存点conHolder.getConnection().rollback((Savepoint)savepoint);//重置回滚 rollbackOnly 为falseconHolder.resetRollbackOnly();} catch (Throwable var4) {//回滚异常throw new TransactionSystemException("Could not roll back to JDBC savepoint", var4);}
}//org.springframework.jdbc.datasource.JdbcTransactionObjectSupport#releaseSavepoint
//释放保存点
public void releaseSavepoint(Object savepoint) throws TransactionException {//获取连接的持有者ConnectionHolder conHolder = this.getConnectionHolderForSavepoint();try {//获取连接调用 releaseSavepoint 方法 释放保存点conHolder.getConnection().releaseSavepoint((Savepoint)savepoint);} catch (Throwable var4) {logger.debug("Could not explicitly release JDBC savepoint", var4);}}

CciLocalTransactionManager#doSetRollbackOnly 源码分析

CciLocalTransactionManager#doSetRollbackOnly 仅设置回滚,并没有真正的回滚,这里是嵌套事务且事务传播级别为 PROPAGATION_REQUIRED、PROPAGATION_SUPPORTS、PROPAGATION_MANDATORY 才会执行,这里只设置回滚标志 rollbackOnly 为 true,到统一提交的时候再确定是否回滚。

//org.springframework.jca.cci.connection.CciLocalTransactionManager#doSetRollbackOnly
//仅设置回滚 内部事务回滚 外部事务也会回滚
protected void doSetRollbackOnly(DefaultTransactionStatus status) {CciLocalTransactionManager.CciLocalTransactionObject txObject = (CciLocalTransactionManager.CciLocalTransactionObject)status.getTransaction();if (status.isDebug()) {this.logger.debug("Setting CCI local transaction [" + txObject.getConnectionHolder().getConnection() + "] rollback-only");}//只是设置 rollbackOnly 属性为 true 并没有真正的回滚txObject.getConnectionHolder().setRollbackOnly();
}public void setRollbackOnly() {this.rollbackOnly = true;
}

AbstractPlatformTransactionManager#triggerAfterCompletion 源码分析

AbstractPlatformTransactionManager#triggerAfterCompletion 方法的作用主要是清空当前线程 ThreadLocal 中的事务同步集合 TransactionSynchronization。

//org.springframework.transaction.support.AbstractPlatformTransactionManager#triggerAfterCompletion
//完成后操作
private void triggerAfterCompletion(DefaultTransactionStatus status, int completionStatus) {if (status.isNewSynchronization()) {//获取目前注册的 TransactionSynchronization 集合List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();//获取完毕后 清空TransactionSynchronizationManager.clearSynchronization();if (status.hasTransaction() && !status.isNewTransaction()) {//有事务 且不是新事物if (!synchronizations.isEmpty()) {//synchronizations 不为空 注册一个 afterCompletion 回调this.registerAfterCompletionWithExistingTransaction(status.getTransaction(), synchronizations);}} else {if (status.isDebug()) {this.logger.trace("Triggering afterCompletion synchronization");}//没有事务 或者是新事物  执行  afterCompletion 回调this.invokeAfterCompletion(synchronizations, completionStatus);}}}//org.springframework.transaction.support.AbstractPlatformTransactionManager#invokeAfterCompletion
//回调 afterCompletion 方法
protected final void invokeAfterCompletion(List<TransactionSynchronization> synchronizations, int completionStatus) {TransactionSynchronizationUtils.invokeAfterCompletion(synchronizations, completionStatus);
}//org.springframework.transaction.support.TransactionSynchronizationUtils#invokeAfterCompletion
//回调 afterCompletion 
public static void invokeAfterCompletion(@Nullable List<TransactionSynchronization> synchronizations, int completionStatus) {if (synchronizations != null) {//事务同步不为空 迭代遍历调用Iterator var2 = synchronizations.iterator();while(var2.hasNext()) {TransactionSynchronization synchronization = (TransactionSynchronization)var2.next();try {//调用 afterCompletion 方法synchronization.afterCompletion(completionStatus);} catch (Throwable var5) {logger.error("TransactionSynchronization.afterCompletion threw exception", var5);}}}}

AbstractPlatformTransactionManager#cleanupAfterCompletion 源码分析

AbstractPlatformTransactionManager#cleanupAfterCompletion 方法主要是对回滚完成后做一些善后工作,设置当前事务状态已经完成,如果是新事物,将会清除绑定在当前线程的事务信息,并判断是否有挂起的事务,如果有会恢复挂起的事务,这里主要是嵌套事务会有这种场景。

//org.springframework.transaction.support.AbstractPlatformTransactionManager#cleanupAfterCompletion
//事务完成后清理工作
private void cleanupAfterCompletion(DefaultTransactionStatus status) {//事务状态设置为已完成status.setCompleted();//是否是新事务同步if (status.isNewSynchronization()) {//如果是 将当前线程的事务同步管理器清空TransactionSynchronizationManager.clear();}//是否是新事物if (status.isNewTransaction()) {//do 开头的方法 真正执行清理的方法this.doCleanupAfterCompletion(status.getTransaction());}//是否有挂起的事务if (status.getSuspendedResources() != null) {//有if (status.isDebug()) {this.logger.debug("Resuming suspended transaction after completion of inner transaction");}Object transaction = status.hasTransaction() ? status.getTransaction() : null;//恢复挂起的事务this.resume(transaction, (AbstractPlatformTransactionManager.SuspendedResourcesHolder)status.getSuspendedResources());}}//org.springframework.transaction.support.TransactionSynchronizationManager#clear
//就是把当前线程绑定的事务各种信息清除  其实就是各个 ThreadLocal 的remove
public static void clear() {//清除事务同步synchronizations.remove();//清除事务名称currentTransactionName.remove();//清除事务只读状态currentTransactionReadOnly.remove();//清除事务隔离级别currentTransactionIsolationLevel.remove();//清除事务的活跃状态actualTransactionActive.remove();
}//org.springframework.transaction.support.AbstractPlatformTransactionManager#resume
//恢复挂起的事务 将事务信息绑定到当前线程
protected final void resume(@Nullable Object transaction, @Nullable AbstractPlatformTransactionManager.SuspendedResourcesHolder resourcesHolder) throws TransactionException {//资源持有者是否为空if (resourcesHolder != null) {//获取挂起的事务资源Object suspendedResources = resourcesHolder.suspendedResources;if (suspendedResources != null) {//挂起的事务资源不为空 恢复挂起的事务this.doResume(transaction, suspendedResources);}//获取被挂起的事务同步List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;if (suspendedSynchronizations != null) {//设置事务的活跃状态TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);//设置事务的隔离级别TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);//设置事务的只读属性TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);//设置实物名称TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);this.doResumeSynchronization(suspendedSynchronizations);}}}//org.springframework.transaction.support.AbstractPlatformTransactionManager#doResumeSynchronization
//恢复事务同步
private void doResumeSynchronization(List<TransactionSynchronization> suspendedSynchronizations) {//事务同步管理器初始化  空的 LinkedHashSetTransactionSynchronizationManager.initSynchronization();//迭代遍历挂起的事务同步Iterator var2 = suspendedSynchronizations.iterator();while(var2.hasNext()) {TransactionSynchronization synchronization = (TransactionSynchronization)var2.next();//解绑当前线程 ThreadLoacl 的 resources 的 SqlSessionsynchronization.resume();//注册事务同步TransactionSynchronizationManager.registerSynchronization(synchronization);}}

TransactionAspectSupport#cleanupTransactionInfo 源码分析

TransactionAspectSupport#cleanupTransactionInfo 方法逻辑很简单,就是清除当前绑定的事务信息,再设置回老的事务信息。//org.springframework.transaction.interceptor.TransactionAspectSupport#cleanupTransactionInfo
//清理事物信息
protected void cleanupTransactionInfo(@Nullable TransactionAspectSupport.TransactionInfo txInfo) {if (txInfo != null) {txInfo.restoreThreadLocalStatus();}}//org.springframework.transaction.interceptor.TransactionAspectSupport.TransactionInfo#restoreThreadLocalStatus
//恢复线程本地状态
private void restoreThreadLocalStatus() {//将当前线程的 transactionInfoHolder 设置为老的 oldTransactionInfoTransactionAspectSupport.transactionInfoHolder.set(this.oldTransactionInfo);
}

TransactionAspectSupport#commitTransactionAfterReturning 源码分析

TransactionAspectSupport#commitTransactionAfterReturning 方法主要作用就是提交事务,同样真正提交事务之前对各种状态做了校验,同时在事务提交过程中同样会由事务回滚操作,比如发生各种异常的时候,提交完成后也会对进行 clean 操作,最终的提交是调用 Connection.commit()方法。

//org.springframework.transaction.interceptor.TransactionAspectSupport#commitTransactionAfterReturning
//业务方法执行成功后 提交事务
protected void commitTransactionAfterReturning(@Nullable TransactionAspectSupport.TransactionInfo txInfo) {if (txInfo != null && txInfo.getTransactionStatus() != null) {//事务信息不为空 且事务状态不为空if (this.logger.isTraceEnabled()) {this.logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");}//通过事务管理器提交事务txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());}}//org.springframework.transaction.support.AbstractPlatformTransactionManager#commit
public final void commit(TransactionStatus status) throws TransactionException {//事务是否已完成if (status.isCompleted()) {//事务已完成 直接抛异常throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");} else {//获取事务状态DefaultTransactionStatus defStatus = (DefaultTransactionStatus)status;if (defStatus.isLocalRollbackOnly()) {//事务设置了要回滚if (defStatus.isDebug()) {this.logger.debug("Transactional code has requested rollback");}//执行回滚 回滚时候分析了该方法this.processRollback(defStatus, false);} else if (!this.shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {//shouldCommitOnGlobalRollbackOnly():是否已全局方式对标记为回滚的事务调用 commit 源码默认返回 false//isGlobalRollbackOnly:当前事务被设置为回滚if (defStatus.isDebug()) {this.logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");}//执行回滚 回滚时候分析了该方法this.processRollback(defStatus, true);} else {//都不满足 执行正在的提交this.processCommit(defStatus);}}
}//org.springframework.transaction.support.AbstractPlatformTransactionManager#processCommit
//提交事务
private void processCommit(DefaultTransactionStatus status) throws TransactionException {try {//调用完成标志boolean beforeCompletionInvoked = false;try {//意外回滚标志boolean unexpectedRollback = false;//提交前准备 源码空实现 可以自己扩展this.prepareForCommit(status);//提交前触发this.triggerBeforeCommit(status);//完成前触发  之前分析过 就是解绑当前线程的资源this.triggerBeforeCompletion(status);//调用完成标志 赋值为 truebeforeCompletionInvoked = true;//是否有保存点if (status.hasSavepoint()) {if (status.isDebug()) {this.logger.debug("Releasing transaction savepoint");}//获取保存点的回滚方式 是否全局回滚unexpectedRollback = status.isGlobalRollbackOnly();//释放保存点status.releaseHeldSavepoint();} else if (status.isNewTransaction()) {//没有保存点if (status.isDebug()) {this.logger.debug("Initiating transaction commit");}//获取是否全局回滚方式unexpectedRollback = status.isGlobalRollbackOnly();//提交事务 调用 Connection.commit() 方法提交事务this.doCommit(status);} else if (this.isFailEarlyOnGlobalRollbackOnly()) {//仅在早期失败时回滚unexpectedRollback = status.isGlobalRollbackOnly();}if (unexpectedRollback) {//为 ture  抛出异常  会被 catchthrow new UnexpectedRollbackException("Transaction silently rolled back because it has been marked as rollback-only");}} catch (UnexpectedRollbackException var17) {//unexpectedRollback 为 true 时候会进来这里//完成后触发 状态1 表示提交this.triggerAfterCompletion(status, 1);throw var17;} catch (TransactionException var18) {//doCommit 异常时候进入这里if (this.isRollbackOnCommitFailure()) {//提交异常时回滚this.doRollbackOnCommitException(status, var18);} else {//完成后触发 状态2  表示位置this.triggerAfterCompletion(status, 2);}throw var18;} catch (Error | RuntimeException var19) {//其他异常进入这里if (!beforeCompletionInvoked) {this.triggerBeforeCompletion(status);}//提交异常时候回滚this.doRollbackOnCommitException(status, var19);throw var19;}try {//提交后触发this.triggerAfterCommit(status);} finally {//提交完成后触发 0 表示提交this.triggerAfterCompletion(status, 0);}} finally {//最终的清理工作 清理事务的信息 上面分析过this.cleanupAfterCompletion(status);}}

总结:阅读了 Sping 事务源码之后,我们熟悉了事务的执行流程,也可以理解 Sping 事务的传播行为的作用,通过源码我们可以知道 PROPAGATION_NESTED 隔离级别并不是真正的新开了一个事务,只是自己新建了保存点,并没有新建事务,因为自己新建了事务保存点,所以自己提交回滚都不会影响外层事务,同样因为使用的是同一个事务,外层事务回滚内层事务也会回滚,这些知识不通过阅读源码是很难知道的,希望可以帮助到有需要的小伙伴。

欢迎提出建议及对错误的地方指出纠正。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/14222.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

STM32定时器四大功能之定时器编码接口

1什么是编码器接口&#xff1f; 编码器接口接受编码器的正交信号&#xff0c;根据编码器产生的正交信号脉冲控制CNT的自增和自减&#xff0c;从而指示编码器的旋转方向和旋转速度。 每个高级定时器和通用定时器都有一个编码器接口&#xff0c;同时正交编码器产生的正交信号分…

Redis 的持久化(真的好细)

前言 Redis 是一个内存数据库&#xff0c;把数据存储在内存中&#xff0c;而内存中的数据是不持久的&#xff0c;要想数据持久就得将数据存储到硬盘中&#xff0c;而 Redis 相比于 Mysql 这样的关系型数据库最大的优势就在于将数据存储在内存中从而效率更高&#xff0c;速度更快…

docker 安装RabbitMQ-web版本

直接拉去web版本 docker pull rabbitmq:management启动命令 设置用户名 admin 密码123456 docker run -dit --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USERadmin -e RABBITMQ_DEFAULT_PASS123456 rabbitmq:management访问地址 http://127.0.0.1:…

GeoScene产品学习视频收集

1、易智瑞运营的极思课堂https://www.geosceneonline.cn/learn/library 2、历年易智瑞技术公开课视频资料 链接&#xff1a;技术公开课-易智瑞信息技术有限公司&#xff0c;GIS/地理信息系统&#xff0c;空间分析-制图-位置智能-地图 3、一些关于GeoScene系列产品和技术操作的视…

二进制部署k8s集群 部署高可用master节点

目录 本次部署的环境 一、master02 节点部署 二、负载均衡部署 安装nginx服务 部署keepalive服务 修改node节点上的配置文件 在master节点上创建pod 三、部署 Dashboard 二进制部署k8s集群部署的步骤总结 &#xff08;1&#xff09;k8s的数据存储中中心的搭建 etcd &…

Apache Log4j Server 反序列化命令执行漏洞(CVE-2017-5645)

漏洞复现环境搭建请参考 http://t.csdnimg.cn/MxmId 漏洞版本 Apache Log4j 2.8.2之前的2.x版本 漏洞验证 &#xff08;1&#xff09;开放端口4712 漏洞利用 &#xff08;1&#xff09;ysoserial工具获取 wget https://github.com/frohoff/ysoserial/releases/download/v0…

Flink DataStream API 基础算子(一)

一、介绍 官网 DataStream API 得名于特殊的 DataStream 类&#xff0c;该类用于表示 Flink 程序中的数据集合。你可以认为 它们是可以包含重复项的不可变数据集合。这些数据可以是有界&#xff08;有限&#xff09;的&#xff0c;也可以是无界&#xff08;无限&#xff09;的…

spring启动后自动退出了

在项目中启动spring框架的application&#xff0c;但是还未等到接口访问它就自己退出了&#xff0c;运行截图如下所示&#xff1a; 解决办法&#xff1a; 将build.gradle文件里的依赖修改一下。我原先的依赖是&#xff1a; org.springframework:spring-web:5.3.10 现修改为 …

2024 电工杯高校数学建模竞赛(B题)| 平衡膳食食谱 |建模秘籍文章代码思路大全

铛铛&#xff01;小秘籍来咯&#xff01; 小秘籍团队独辟蹊径&#xff0c;运用负载均衡&#xff0c;多目标规划等强大工具&#xff0c;构建了这一题的详细解答哦&#xff01; 为大家量身打造创新解决方案。小秘籍团队&#xff0c;始终引领着建模问题求解的风潮。 抓紧小秘籍&am…

肯尼亚大坝决堤反思:强化大坝安全监测的必要性

一、背景介绍 近日&#xff0c;肯尼亚发生了一起严重的大坝决堤事件。当地时间4月29日&#xff0c;肯尼亚内罗毕以北的一座大坝决堤&#xff0c;冲毁房屋和车辆。当地官员称&#xff0c;事故遇难人数已升至71人。这起事件再次提醒我们&#xff0c;大坝安全无小事&#xff0c;监…

正点原子[第二期]Linux之ARM(MX6U)裸机篇学习笔记-23.1,2 讲 I2C驱动

前言&#xff1a; 本文是根据哔哩哔哩网站上“正点原子[第二期]Linux之ARM&#xff08;MX6U&#xff09;裸机篇”视频的学习笔记&#xff0c;在这里会记录下正点原子 I.MX6ULL 开发板的配套视频教程所作的实验和学习笔记内容。本文大量引用了正点原子教学视频和链接中的内容。…

了解区块链基础设施,共同构建安全且强大的Sui网络

区块链基础设施的范畴很广&#xff0c;但其核心是那些直接与网络互动的计算机。这些实体通常被称为节点&#xff0c;分为不同的类型&#xff0c;例如维护完整区块链副本的全节点&#xff0c;以及作为共识决定者的验证节点。除了这两种类型之外&#xff0c;还有其他类型的节点&a…

【oracle的安装记录】

oracle安装记录 一、下载以后&#xff0c;解压到同一路径下面 二、双击可执行安装文件&#xff0c;等待文件加载 三、双击以后&#xff0c;弹出信息 四、提示该窗口&#xff0c;点击【是】即可 五、未填写配置安全更新信息 六、弹出小窗口&#xff0c;选择【是】 七、安装选项…

SQLI-labs-第二十四关

目录 1、登录界面 2、注册界面 3、修改密码界面 知识点&#xff1a;二次注入 思路&#xff1a; 这一关有几个页面可以给我们输入&#xff0c;一个登录界面&#xff0c;一个注册页面&#xff0c;一个修改密码界面 1、登录界面 首先我们登录界面看看 登录后出现一个修改密码…

Ubuntu 搭建SRT协议 环境

1.官网clone源码 GitHub - Haivision/srt: Secure, Reliable, Transport 打不开的话国内gitee 不是最新的 https://gitee.com/smartavs/srt.git 下下来之后 cd 到srt目录 需要安装cmake openssl等依赖 我的环境已经有了 mkdir build && cd build cmake .. -…

最有效的企业数据防泄漏手段 | 数据泄漏防护系统推荐

随意信息安全意识不断提高&#xff0c;企业纷纷寻求高效的数据防泄漏手段。在众多解决方案中&#xff0c;这五款软件各具特色&#xff0c;但它们的共同目标都是确保企业数据的安全性和保密性。 接下来&#xff0c;我们将逐一介绍这五款软件的特点和优势。 1、Ping 32 Ping32…

前端项目使用docker编译发版和gitlab-cicd发版方式

项目目录 app/ ├── container/ │ ├── init.sh │ ├── nginx.conf.template ├── src/ ├── .gitlab-ci.yml └── deploy.sh └── Dockerfile └── Makefilecontainer目录是放nginx的配置文件&#xff0c;给nginx镜像使用 .gitlab-ci.yml和Makefile是c…

阿里云 EMR Serverless Spark 版开启免费公测

阿里云 EMR Serverless Spark 版是一款云原生&#xff0c;专为大规模数据处理和分析而设计的全托管 Serverless 产品。它为企业提供了一站式的数据平台服务&#xff0c;包括任务开发、调试、调度和运维等&#xff0c;极大地简化了数据处理的全生命周期工作流程。使用 EMR Serve…

LayUI使用(一)点击树组件的右边空白区域也可响应事件

前提&#xff1a; 如下&#xff0c;希望能够点击右边的空白区域也能够响应&#xff0c;而不仅仅是点击文本才响应 分析流程 一开始问了chatgpt&#xff0c;但它给的方案太麻烦了&#xff0c;而且还有错误&#xff0c;因此自己上手F12进入调试模式&#xff0c;点击查看最终渲…

文件外发审核是数据防泄漏的重要手段,那该怎么落地?

企业在日常经营中&#xff0c;无可避免地会产生文件外发的需求&#xff0c;文件发送对象包括但不限于合作方、供应商、客户、公关媒体、慈善组织等等&#xff0c;不一而足。而由于外发的对象不同&#xff0c;所涉及的文件类型也多种多样&#xff1a; 商业合作合同&#xff1a;…