前言:
我们知道 Spring 声明式事务是通过 AOP 来实现的,日常项目开发中我们只需要使用 @Transactional 注解就可以实现声明式事务,那你知道通过 @Transactional 注解怎样实现事务的吗?本篇我们将从源码来分析 Spring 声明式事务的执行流程。
Spring 相关知识储备:
深入理解 Spring IOC 底层实现机制(refresh 方法源码分析)
Spring 源码之 BeanDefinition 加载分析
深入理解 Spring Bean 生命周期(附源码分析)
深入理解 Spring 循环依赖之三级缓存(附源码分析)
深入理解 Spring AOP 源码分析(附源码分析)
Spring 事务理解
Spring 声明式事务的原理?
Spring 声明式事务是借助于 Spring AOP 来实现的,在方法开始之前开启事务,在方法执行之后提交或回滚事务,Spring 是通过 TransactionInterceptor 来实现的,TransactionInterceptor 实现了 MethodInterceptor 接口,即通过 AOP 环绕增强的方式实现 Spring 事务。
TransactionInterceptor 类源码分析:
TransactionInterceptor 类主要是通过 invoke 方法调用了父类 TransactionAspectSupport #invokeWithinTransaction 方法来管理事务。
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {//无参构造方法public TransactionInterceptor() {}//有参构造方法public TransactionInterceptor(PlatformTransactionManager ptm, Properties attributes) {this.setTransactionManager(ptm);this.setTransactionAttributes(attributes);}//有参构造方法public TransactionInterceptor(PlatformTransactionManager ptm, TransactionAttributeSource tas) {this.setTransactionManager(ptm);this.setTransactionAttributeSource(tas);}//核心方法 通过 AOP 拦截 Spring 声明式事务@Nullablepublic Object invoke(final MethodInvocation invocation) throws Throwable {//获取 targetClass 也就是被代理的类的 ClassClass<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;//调用 TransactionAspectSupport 的 invokeWithinTransaction 方法return this.invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {@Nullablepublic Object proceedWithInvocation() throws Throwable {//执行目标方法return invocation.proceed();}public Object getTarget() {return invocation.getThis();}public Object[] getArguments() {return invocation.getArguments();}});}......
}
TransactionAspectSupport 类
TransactionAspectSupport 类是 Spring 管理事务的基础类,支持声明式事务和编程式事务的管理,TransactionAspectSupport 通过模板方法来制定了事务的流程,通过策略模式来实际管理事务,针对声明式事务和编程式事务有不同的事务管理器,如下:
- PlatformTransactionManager:声明式事务管理器。
- ReactiveTransactionManager:编程式事务管理器。
TransactionAspectSupport 管理事务的流程简图(声明式事务)
TransactionAspectSupport 管理事务的流程非常清晰简单,依次是获取事务源、确定事务管理器、创建事务、执行业务方法、提交或者回滚事务,跟我们了解的事务流程完全一致。
了解了 TransactionAspectSupport 管理事务的流程,我们分析一下源码看看是不是这么回事。
TransactionAspectSupport#invokeWithinTransaction 方法源码分析
TransactionAspectSupport#invokeWithinTransaction 方法逻辑非常清晰,首先就是获取一个事务源,然后获取事务管理器,判断是那种类型的事务,然后就开始创建事务、执行业务方法、执行事务提交或者回滚。
//TransactionAspectSupport#invokeWithinTransaction
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, TransactionAspectSupport.InvocationCallback invocation) throws Throwable {//获取事务属性源TransactionAttributeSource tas = this.getTransactionAttributeSource();//获取事务属性TransactionAttribute txAttr = tas != null ? tas.getTransactionAttribute(method, targetClass) : null;//根据事务属性获取事务管理器TransactionManager tm = this.determineTransactionManager(txAttr);//this.reactiveAdapterRegistry != null:编程式适配器注册表不为空//tm instanceof ReactiveTransactionManager:是否是编程式事务管理器if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {TransactionAspectSupport.ReactiveTransactionSupport txSupport = (TransactionAspectSupport.ReactiveTransactionSupport)this.transactionSupportCache.computeIfAbsent(method, (key) -> {if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && TransactionAspectSupport.KotlinDelegate.isSuspend(method)) {throw new TransactionUsageException("Unsupported annotated transaction on suspending function detected: " + method + ". Use TransactionalOperator.transactional extensions instead.");} else {ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());if (adapter == null) {throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " + method.getReturnType());} else {return new TransactionAspectSupport.ReactiveTransactionSupport(adapter);}}});return txSupport.invokeWithinTransaction(method, targetClass, invocation, txAttr, (ReactiveTransactionManager)tm);} else {//返回声明式事务管理器PlatformTransactionManager ptm = this.asPlatformTransactionManager(tm);//切点名称 类名+方法名称String joinpointIdentification = this.methodIdentification(method, targetClass, txAttr);Object retVal;if (txAttr != null && ptm instanceof CallbackPreferringPlatformTransactionManager) {//CallbackPreferringPlatformTransactionManager 表示是编程式是事务 我们重点分析 申明式事务TransactionAspectSupport.ThrowableHolder throwableHolder = new TransactionAspectSupport.ThrowableHolder();try {retVal = ((CallbackPreferringPlatformTransactionManager)ptm).execute(txAttr, (statusx) -> {TransactionAspectSupport.TransactionInfo txInfo = this.prepareTransactionInfo(ptm, txAttr, joinpointIdentification, statusx);Object var9;try {Object retVal = invocation.proceedWithInvocation();if (vavrPresent && TransactionAspectSupport.VavrDelegate.isVavrTry(retVal)) {retVal = TransactionAspectSupport.VavrDelegate.evaluateTryFailure(retVal, txAttr, statusx);}var9 = retVal;return var9;} catch (Throwable var13) {if (txAttr.rollbackOn(var13)) {if (var13 instanceof RuntimeException) {throw (RuntimeException)var13;}throw new TransactionAspectSupport.ThrowableHolderException(var13);}throwableHolder.throwable = var13;var9 = null;} finally {this.cleanupTransactionInfo(txInfo);}return var9;});if (throwableHolder.throwable != null) {throw throwableHolder.throwable;} else {return retVal;}} catch (TransactionAspectSupport.ThrowableHolderException var20) {throw var20.getCause();} catch (TransactionSystemException var21) {if (throwableHolder.throwable != null) {this.logger.error("Application exception overridden by commit exception", throwableHolder.throwable);var21.initApplicationException(throwableHolder.throwable);}throw var21;} catch (Throwable var22) {if (throwableHolder.throwable != null) {this.logger.error("Application exception overridden by commit exception", throwableHolder.throwable);}throw var22;}} else {//不是编程式事务 也就是 声明式事务 //创建一个事务 TransactionAspectSupport.TransactionInfo txInfo = this.createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);try {//通过回调执行目标方法 也就是执行业务逻辑retVal = invocation.proceedWithInvocation();} catch (Throwable var18) {//在方法出现异常的情况下完成事务 也就是异常回滚this.completeTransactionAfterThrowing(txInfo, var18);throw var18;} finally {//清除缓存中的事务信息this.cleanupTransactionInfo(txInfo);}if (vavrPresent && TransactionAspectSupport.VavrDelegate.isVavrTry(retVal)) {TransactionStatus status = txInfo.getTransactionStatus();if (status != null && txAttr != null) {retVal = TransactionAspectSupport.VavrDelegate.evaluateTryFailure(retVal, txAttr, status);}}//提交事务this.commitTransactionAfterReturning(txInfo);return retVal;}}
}
AbstractFallbackTransactionAttributeSource#getTransactionAttribute 方法源码分析
我们知道 AbstractFallbackTransactionAttributeSource#getTransactionAttribute 方法的作用就是获取事务属性,整个流程就是先进行简单判断是否是 Object.class,,,,
//AbstractFallbackTransactionAttributeSource#getTransactionAttribute
//获取事务属性
@Nullable
public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) {//如果类是 Objeect 直接返回空 没有获取到四五属性if (method.getDeclaringClass() == Object.class) {return null;} else {//从缓存中查找Object cacheKey = this.getCacheKey(method, targetClass);//从 attributeCache 缓存中查找 Spring 初始化的时候 Bean后置处理器 会识别到 @Transactional注解 的类和方法 将事务属性放入缓存attributeCache 中 并创建 AOP 代理对象TransactionAttribute cached = (TransactionAttribute)this.attributeCache.get(cacheKey);if (cached != null) {//找到了 //cached == NULL_TRANSACTION_ATTRIBUTE : 也就是默认的的事务属性 其实就是空return cached == NULL_TRANSACTION_ATTRIBUTE ? null : cached;} else {//缓存中没有//去解析方法 和 代理类TransactionAttribute txAttr = this.computeTransactionAttribute(method, targetClass);if (txAttr == null) {//解析结果还是为 null 表示这个方法不需要事务 存入缓存 下次好用this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);} else {//不为空 获取方法的全名 类名+方法名String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);if (txAttr instanceof DefaultTransactionAttribute) {//是 DefaultTransactionAttributeDefaultTransactionAttribute dta = (DefaultTransactionAttribute)txAttr;//设置事务属性描述dta.setDescriptor(methodIdentification);dta.resolveAttributeStrings(this.embeddedValueResolver);}if (this.logger.isTraceEnabled()) {this.logger.trace("Adding transactional method '" + methodIdentification + "' with attribute: " + txAttr);}//加入事务属性缓存this.attributeCache.put(cacheKey, txAttr);}return txAttr;}}
}
AbstractFallbackTransactionAttributeSource#computeTransactionAttribute 源码分析
AbstractFallbackTransactionAttributeSource#computeTransactionAttribute 方法的作用就是解析方法或者类上的 @Transactional 注解,然后返回事务属性,我们接着分析一下解析 @Transactional 注解的源代码,也就是 this.findTransactionAttribute(specificMethod) 这行代码。
//AbstractFallbackTransactionAttributeSource#computeTransactionAttribute
//解析方法和代理类
@Nullable
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {//allowPublicMethodsOnly:是否只允许公共的 public 方法 默认是 true//获取方法修饰符 判断是否是 publicif (this.allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {//即使是有 @Transactional 注解方法 如果不是 public 事务不生效return null;} else {//method:接口方法//通过接口方法和代理类 获取到具体的实现类方法Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);//在目标方法上找事务属性 真正解析 @Transactional 注解的方法TransactionAttribute txAttr = this.findTransactionAttribute(specificMethod);if (txAttr != null) {//找到直接返回return txAttr;} else {//在目标方法所属类上找事务属性txAttr = this.findTransactionAttribute(specificMethod.getDeclaringClass());if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {//事务属性不为空 且是用户级别的方法 返回事务属性return txAttr;} else {//目标方法所属类上 也没有找到事务属性 一般不会到这里来if (specificMethod != method) {//接口方法和实现类方法不一致//去接口方法上找事务属性txAttr = this.findTransactionAttribute(method);if (txAttr != null) {//找到事务属性返回return txAttr;}//接口方法的类上去找事务属性txAttr = this.findTransactionAttribute(method.getDeclaringClass());if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {//找到返回return txAttr;}}return null;}}}
}
AnnotationTransactionAttributeSource#findTransactionAttribute 源码分析
AnnotationTransactionAttributeSource#findTransactionAttribute 方法就是用事务注解解析器解析注解得到事务属性信息,方法本身没有什么逻辑,只是调用了 SpringTransactionAnnotationParser#parseTransactionAnnotation 方法。
//AnnotationTransactionAttributeSource#findTransactionAttribute(java.lang.reflect.Method)
//查找事务属性
@Nullable
protected TransactionAttribute findTransactionAttribute(Method method) {return this.determineTransactionAttribute(method);
}//AnnotationTransactionAttributeSource#determineTransactionAttribute
//确定事务属性
@Nullable
protected TransactionAttribute determineTransactionAttribute(AnnotatedElement element) {//事务注解解析器 迭代遍历Iterator var2 = this.annotationParsers.iterator();TransactionAttribute attr;do {if (!var2.hasNext()) {return null;}//得到事务注解解析器TransactionAnnotationParser parser = (TransactionAnnotationParser)var2.next();//解析器解析注解元素 得到事务属性attr = parser.parseTransactionAnnotation(element);} while(attr == null);return attr;
}
SpringTransactionAnnotationParser#parseTransactionAnnotation 源码分析
SpringTransactionAnnotationParser#parseTransactionAnnotation 方法真正解析 @Transactional 注解的方法,方法中的相关属性十分熟悉了,比如传播属性、隔离级别、超时时间、是否只读等,然后封装成 TransactionAttribute 返回。
//SpringTransactionAnnotationParser#parseTransactionAnnotation
//解析事务注解属性
protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {//基础规则的事务属性 是 TransactionAttribute 的实现类RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();//传播属性Propagation propagation = (Propagation)attributes.getEnum("propagation");rbta.setPropagationBehavior(propagation.value());//隔离级别Isolation isolation = (Isolation)attributes.getEnum("isolation");rbta.setIsolationLevel(isolation.value());//超时时间rbta.setTimeout(attributes.getNumber("timeout").intValue());String timeoutString = attributes.getString("timeoutString");Assert.isTrue(!StringUtils.hasText(timeoutString) || rbta.getTimeout() < 0, "Specify 'timeout' or 'timeoutString', not both");rbta.setTimeoutString(timeoutString);//是否只读rbta.setReadOnly(attributes.getBoolean("readOnly"));//限定符 也就是每一个@Transactional注解的value属性rbta.setQualifier(attributes.getString("value"));//设置标签rbta.setLabels(Arrays.asList(attributes.getStringArray("label")));List<RollbackRuleAttribute> rollbackRules = new ArrayList();//回滚的异常Class[] var7 = attributes.getClassArray("rollbackFor");int var8 = var7.length;int var9;Class rbRule;for(var9 = 0; var9 < var8; ++var9) {rbRule = var7[var9];rollbackRules.add(new RollbackRuleAttribute(rbRule));}String[] var11 = attributes.getStringArray("rollbackForClassName");var8 = var11.length;String rbRule;for(var9 = 0; var9 < var8; ++var9) {rbRule = var11[var9];rollbackRules.add(new RollbackRuleAttribute(rbRule));}//不回滚的异常var7 = attributes.getClassArray("noRollbackFor");var8 = var7.length;for(var9 = 0; var9 < var8; ++var9) {rbRule = var7[var9];rollbackRules.add(new NoRollbackRuleAttribute(rbRule));}var11 = attributes.getStringArray("noRollbackForClassName");var8 = var11.length;for(var9 = 0; var9 < var8; ++var9) {rbRule = var11[var9];rollbackRules.add(new NoRollbackRuleAttribute(rbRule));}//设置回滚规则rbta.setRollbackRules(rollbackRules);return rbta;
}
TransactionAspectSupport#determineTransactionManager 源码分析
TransactionAspectSupport#determineTransactionManager 方法就是通过寻找事务管理器,先通过事务属性寻找事务管理器,也就是 @Transactional 注解属性 transactionManager 指定的事务管理器,如果找不到就通过 transactionManagerBeanName 寻找事务管理器,如果再找不到寻找默认的事务管理器并加入缓存中。
//TransactionAspectSupport#determineTransactionManager
@Nullable
protected TransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {if (txAttr != null && this.beanFactory != null) {//事务属性不为空 beanFactory 也不会为空String qualifier = txAttr.getQualifier();//获取限定符 也就是每一个@Transactional注解的value属性或者transactionManager属性if (StringUtils.hasText(qualifier)) {//设置了限定符的 从 beanFactory 中获取事务管理器return this.determineQualifiedTransactionManager(this.beanFactory, qualifier);} else if (StringUtils.hasText(this.transactionManagerBeanName)) {//如果设置了 transactionManagerBeanName 属性 通过 transactionManagerBeanName 查找指定的事务管理器return this.determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName);} else {//都不是 直接获取默认事务管理器TransactionManager defaultTransactionManager = this.getTransactionManager();if (defaultTransactionManager == null) {//默认事务管理器为空 尝试从缓存中获取defaultTransactionManager = (TransactionManager)this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY);if (defaultTransactionManager == null) {//缓存中获取的事务管理器还是为空 从容器 beanFactory 中获取类型为 TransactionManager 的事务管理器defaultTransactionManager = (TransactionManager)this.beanFactory.getBean(TransactionManager.class);//加入缓存this.transactionManagerCache.putIfAbsent(DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager);}}//返回事务管理器return defaultTransactionManager;}} else {//返回事务管理器return this.getTransactionManager();}
}
TransactionAspectSupport#createTransactionIfNecessary 源码分析
TransactionAspectSupport#createTransactionIfNecessary 方法主要作用就是创建一个事务,并将事务信息封装成一个 TransactionInfo 绑定到当前线程中。
//TransactionAspectSupport#createTransactionIfNecessary
//创建事务
protected TransactionAspectSupport.TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {//如果事务没有自定名称 使用切对面作为事务名称 也就是 类名+方法名if (txAttr != null && ((TransactionAttribute)txAttr).getName() == null) {txAttr = new DelegatingTransactionAttribute((TransactionAttribute)txAttr) {public String getName() {return joinpointIdentification;}};}//事务状态TransactionStatus status = null;if (txAttr != null) {if (tm != null) {//通过事务管理器创建一个事务 并返回事务状态 重点方法status = tm.getTransaction((TransactionDefinition)txAttr);} else if (this.logger.isDebugEnabled()) {this.logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured");}}//将事务信息封装成 TransactionInfo 并绑定到当前线程中return this.prepareTransactionInfo(tm, (TransactionAttribute)txAttr, joinpointIdentification, status);
}//TransactionAspectSupport#prepareTransactionInfo
//准备事务信息 其实就是将事务信息封装成为 TransactionInfo 并绑定到当前线程中
protected TransactionAspectSupport.TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, String joinpointIdentification, @Nullable TransactionStatus status) {//将事务信息封装成为 TransactionInfoTransactionAspectSupport.TransactionInfo txInfo = new TransactionAspectSupport.TransactionInfo(tm, txAttr, joinpointIdentification);if (txAttr != null) {if (this.logger.isTraceEnabled()) {this.logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");}//设置事务状态txInfo.newTransactionStatus(status);} else if (this.logger.isTraceEnabled()) {this.logger.trace("No need to create transaction for [" + joinpointIdentification + "]: This method is not transactional.");}//事务绑定到当前线程txInfo.bindToThread();return txInfo;
}
AbstractPlatformTransactionManager#getTransaction 源码分析
AbstractPlatformTransactionManager#getTransaction 方法会创建一个事务,会先判断是否存在事务、已经事务针对不同的事务传播机制做出不同的处理,核心方法是创建一个新的事务 this.startTransaction(def, transaction, debugEnabled, suspendedResources)。
//org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction
//获取一个事务 其实应该说是创建一个事务
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {//获取事务属性TransactionDefinition def = definition != null ? definition : TransactionDefinition.withDefaults();//获取事务对象 例如 DataSourceTransactionManagerObject transaction = this.doGetTransaction();boolean debugEnabled = this.logger.isDebugEnabled();//判断是否已经存在了事务 if (this.isExistingTransaction(transaction)) {//有数据库连接 且是活跃的 表示已经存在了事务 要根据事务传播机制处理return this.handleExistingTransaction(def, transaction, debugEnabled);} else if (def.getTimeout() < -1) {//设置的超时时间小于-1 表示超时时间设置错误 抛出异常throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());} else if (def.getPropagationBehavior() == 2) {//事务传播机制为 2 直接抛出异常 2 对应的是 PROPAGATION_MANDATORY 如果当前存在事务,则加入该事务,如果当前没有事务,则抛出异常throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");} else if (def.getPropagationBehavior() != 0 && def.getPropagationBehavior() != 3 && def.getPropagationBehavior() != 6) {//def.getPropagationBehavior() != 0:不是 PROPAGATION_REQUIRED 一定要在事务中运行//def.getPropagationBehavior() != 3:不是 PROPAGATION_REQUIRES_NEW 总是创建一个新的事事务运行//def.getPropagationBehavior() != 6:不是 PROPAGATION_NESTED 有没有事务都创建一个新的事务//其实就是非事务运行if (def.getIsolationLevel() != -1 && this.logger.isWarnEnabled()) {this.logger.warn("Custom isolation level specified but no actual transaction initiated; isolation level will effectively be ignored: " + def);}//会创建一个空事物 没有事务的提交以及回滚 但是会把数据库连接绑定到当前线程上boolean newSynchronization = this.getTransactionSynchronization() == 0;return this.prepareTransactionStatus(def, (Object)null, true, newSynchronization, debugEnabled, (Object)null);} else {//非以上情况 就是在事务中运行//如果存在事务 就挂起事务AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources = this.suspend((Object)null);if (debugEnabled) {this.logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);}try {//开启一个新的事务 核心方法return this.startTransaction(def, transaction, debugEnabled, suspendedResources);} catch (Error | RuntimeException var7) {//唤醒挂起的事务this.resume((Object)null, suspendedResources);throw var7;}}
}
AbstractPlatformTransactionManager#startTransaction 源码分析
AbstractPlatformTransactionManager#startTransaction 方法会真正的去开启一个新的事务,并返回事务状态,核心方法是 this.doBegin(transaction, definition),Spring 源码一如既往 do 才是真正干活的方法,doBegin 方法会真正的开启一个新事务。
//org.springframework.transaction.support.AbstractPlatformTransactionManager#startTransaction
//开启一个新的事物
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled, @Nullable AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources) {//是否开启新同步 transactionSynchronization = 0 所以默认是需要开启新同步boolean newSynchronization = this.getTransactionSynchronization() != 2;//根据事务属性组装一个事务状态DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);//开启事务 获取数据连接并绑定到当前线程上 重点方法this.doBegin(transaction, definition);//准备同步 也就是激活同步 设置一些状态this.prepareSynchronization(status, definition);//返回事务状态return status;
}//org.springframework.transaction.support.AbstractPlatformTransactionManager#newTransactionStatus
protected DefaultTransactionStatus newTransactionStatus(TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {//newSynchronization 是否开启新的同步 默认 true//当前线程没有活跃的事务boolean actualNewSynchronization = newSynchronization && !TransactionSynchronizationManager.isSynchronizationActive();//创建一=一个新的 DefaultTransactionStatus 对象 开启新开启的事务 DefaultTransactionStatus 是 TransactionStatus 的默认实现return new DefaultTransactionStatus(transaction, newTransaction, actualNewSynchronization, definition.isReadOnly(), debug, suspendedResources);
}//org.springframework.transaction.support.DefaultTransactionStatus#DefaultTransactionStatus
public DefaultTransactionStatus(@Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean readOnly, boolean debug, @Nullable Object suspendedResources) {this.transaction = transaction;this.newTransaction = newTransaction;this.newSynchronization = newSynchronization;this.readOnly = readOnly;this.debug = debug;this.suspendedResources = suspendedResources;
}//org.springframework.transaction.support.AbstractPlatformTransactionManager#prepareSynchronization
//准备同步 也就是激活同步 设置一些状态
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {//是否是新的同步if (status.isNewSynchronization()) {//设置当前事务是激活状态TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());//设置当前事务隔离级别TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(definition.getIsolationLevel() != -1 ? definition.getIsolationLevel() : null);//设置当前事务是否只读TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());//设置当前事务名称TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());//初始化 初始化了一个 synchronizationsTransactionSynchronizationManager.initSynchronization();}}
DataSourceTransactionManager#doBegin 源码分析
DataSourceTransactionManager#doBegin 方法是真正开启事务的方法,主要是获取事务连接、设置事务隔离级别、设置非自动提交,使用 TransactionSynchronizationManager 将数据库连接绑定到当前线程上,以及发生异常释放数据库连接。
//org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin
//真正的开启一个事务 Spring 源码一如既往 do 才是真正干活的方法
protected void doBegin(Object transaction, TransactionDefinition definition) {//转为 DataSourceTransactionObject DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;//数据库连接Connection con = null;try {//!txObject.hasConnectionHolder():没有数据库连接//txObject.getConnectionHolder().isSynchronizedWithTransaction():txObject 是否激活了事务if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {//从数据库中获取一个连接Connection newCon = this.obtainDataSource().getConnection();if (this.logger.isDebugEnabled()) {this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");}//将新的数据库连接设置到 txObject 中 txObject.setConnectionHolder(new ConnectionHolder(newCon), true);}//获取连接的持有者 并设置与事务同步txObject.getConnectionHolder().setSynchronizedWithTransaction(true);//再次获取数据库连接con = txObject.getConnectionHolder().getConnection();//获取事务的隔离级别Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);//设置事务的隔离级别txObject.setPreviousIsolationLevel(previousIsolationLevel);//设置只读txObject.setReadOnly(definition.isReadOnly());//获取事务的提交方法 一般默认是自动提交if (con.getAutoCommit()) {//设置为手动提交txObject.setMustRestoreAutoCommit(true);if (this.logger.isDebugEnabled()) {this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit");}//自动提交设置为 falsecon.setAutoCommit(false);}//事务只读属性的处理this.prepareTransactionalConnection(con, definition);//设置当前事务是活跃的txObject.getConnectionHolder().setTransactionActive(true);//获取事务超时时间int timeout = this.determineTimeout(definition);if (timeout != -1) {//超时时间合法 就设置给当前事务txObject.getConnectionHolder().setTimeoutInSeconds(timeout);}if (txObject.isNewConnectionHolder()) {//是否是一个新的事务持有者//将事务绑定到事务同步管理器上 这里使用了 ThreadLocalTransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder());}} catch (Throwable var7) {if (txObject.isNewConnectionHolder()) {//出现异常后 释放连接DataSourceUtils.releaseConnection(con, this.obtainDataSource());//设置当前持有为nulltxObject.setConnectionHolder((ConnectionHolder)null, false);}throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", var7);}
}
DataSourceUtils#prepareConnectionForTransaction 源码分析
DataSourceUtils#prepareConnectionForTransaction 方法的作用就是设置数据库连接的只读属性,修改当前连接的隔离级别。
//org.springframework.jdbc.datasource.DataSourceUtils#prepareConnectionForTransaction
//获取事务隔离级别
@Nullable
public static Integer prepareConnectionForTransaction(Connection con, @Nullable TransactionDefinition definition) throws SQLException {Assert.notNull(con, "No Connection specified");//事务定义不为空 且事务是只读if (definition != null && definition.isReadOnly()) {try {if (logger.isDebugEnabled()) {logger.debug("Setting JDBC Connection [" + con + "] read-only");}//设置当前数据库连接为只读con.setReadOnly(true);} catch (RuntimeException | SQLException var4) {for(Object exToCheck = var4; exToCheck != null; exToCheck = ((Throwable)exToCheck).getCause()) {if (exToCheck.getClass().getSimpleName().contains("Timeout")) {throw var4;}}logger.debug("Could not set JDBC Connection read-only", var4);}}//事务隔离级别Integer previousIsolationLevel = null;if (definition != null && definition.getIsolationLevel() != -1) {if (logger.isDebugEnabled()) {logger.debug("Changing isolation level of JDBC Connection [" + con + "] to " + definition.getIsolationLevel());}//获取连接的隔离级别int currentIsolation = con.getTransactionIsolation();if (currentIsolation != definition.getIsolationLevel()) {//连接的隔离级别不等于手动设置的隔离级别previousIsolationLevel = currentIsolation;//将连接的隔离级别设置为手动设置的隔离级别con.setTransactionIsolation(definition.getIsolationLevel());}}return previousIsolationLevel;
}
DataSourceTransactionManager#prepareTransactionalConnection 源码分析
DataSourceTransactionManager#prepareTransactionalConnection 方法主要对只读事务进行处理,这里主要是对 oracle 数据库,oracle不支持 con.setReadOnly(true) 语法,因此要支持语句。
//DataSourceTransactionManager#prepareTransactionalConnection
//设置事务只读属性
protected void prepareTransactionalConnection(Connection con, TransactionDefinition definition) throws SQLException {if (this.isEnforceReadOnly() && definition.isReadOnly()) {//如果 this.isEnforceReadOnly() 为true 且事务只读属性为 true//创建一个 StatementStatement stmt = con.createStatement();Throwable var4 = null;try {//执行只读语句stmt.executeUpdate("SET TRANSACTION READ ONLY");} catch (Throwable var13) {var4 = var13;throw var13;} finally {if (stmt != null) {if (var4 != null) {try {//关闭连接stmt.close();} catch (Throwable var12) {var4.addSuppressed(var12);}} else {//关闭连接stmt.close();}}}}}
TransactionSynchronizationManager#bindResource 源码分析
将事务绑定到事务管理器中,这里使用了 ThreadLocal,resources 是一个 ThreadLocal。
//TransactionSynchronizationManager#bindResource
//将事务绑定到事务管理器中,这里使用了 ThreadLocal,resources 是一个 ThreadLocal。
public static void bindResource(Object key, Object value) throws IllegalStateException {Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);Assert.notNull(value, "Value must not be null");//获取当前线程的 mapMap<Object, Object> map = (Map)resources.get();if (map == null) {//为空创建一个map = new HashMap();//并设置给当前现场resources.set(map);}//将 actualKey value 设置到 ThreadLocal 中 actualKey 是 DataSource value 是 ConnectionHolderObject oldValue = ((Map)map).put(actualKey, value);if (oldValue instanceof ResourceHolder && ((ResourceHolder)oldValue).isVoid()) {oldValue = null;}if (oldValue != null) {//已经有绑定到当前线程的 key 抛出异常throw new IllegalStateException("Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");} else {if (logger.isTraceEnabled()) {logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" + Thread.currentThread().getName() + "]");}}
}
AbstractPlatformTransactionManager#handleExistingTransaction 源码分析
AbstractPlatformTransactionManager#handleExistingTransaction 方法主要针对嵌套事务进行处理,首先还是判断事务的传播行为,验证是否可以进行嵌套事务,然后根据不同的传播行为进行不同的处理,具体如下:
- PROPAGATION_NEVER(5):要求以非事务运行,肯定是不允许在嵌套事务中发生的,直接抛出异常。
- PROPAGATION_NOT_SUPPORTS(5):要求以非事务运行,如果存在事务就挂起,就挂起外围事务,开启一个新的空事务。
- PROPAGATION_REQUIRES_NEW(3):要求总以新事务运行,如果存在事务就挂起,就挂起外围事务,开启一个新的事务。
- PROPAGATION_NESTED(6):父子嵌套事务,父子事务的核心是子事务不会影响父事务,因此当前事务保存了一个回滚点,子事务回滚不会影响到父事务也就是外层事务,但父事务回滚会回滚子事务。
- PROPAGATION_REQUIRED(0)、PROPAGATION_SUPPORTS(1)、PROPAGATION_SUPPORTS(2):这几个传播行为的时候,都是内层事务加入外层事务,并没有新的事务产生,是真正的嵌套事务。
//org.springframework.transaction.support.AbstractPlatformTransactionManager#handleExistingTransaction
//嵌套事务的处理逻辑
private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException {if (definition.getPropagationBehavior() == 5) {//5:PROPAGATION_NEVER 以非事务运行throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation 'never'");} else {AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources;if (definition.getPropagationBehavior() == 4) {//4:PROPAGATION_NOT_SUPPORTS 以非事务方式运行 当前存在事务 就挂起if (debugEnabled) {this.logger.debug("Suspending current transaction");}//挂起事务suspendedResources = this.suspend(transaction);boolean newSynchronization = this.getTransactionSynchronization() == 0;//开启一个新的空事务return this.prepareTransactionStatus(definition, (Object)null, false, newSynchronization, debugEnabled, suspendedResources);} else if (definition.getPropagationBehavior() == 3) {//3:PROPAGATION_REQUIRES_NEW 总是开启一个新的事务运行if (debugEnabled) {this.logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]");}//挂起外层事务suspendedResources = this.suspend(transaction);try {//建立一个新的事务return this.startTransaction(definition, transaction, debugEnabled, suspendedResources);} catch (Error | RuntimeException var6) {this.resumeAfterBeginException(transaction, suspendedResources, var6);throw var6;}} else if (definition.getPropagationBehavior() == 6) {//6:PROPAGATION_NESTED 父子嵌套事务if (!this.isNestedTransactionAllowed()) {//不允许嵌套事务 抛出异常 默认是允许嵌套事务的throw new NestedTransactionNotSupportedException("Transaction manager does not allow nested transactions by default - specify 'nestedTransactionAllowed' property with value 'true'");} else {if (debugEnabled) {this.logger.debug("Creating nested transaction with name [" + definition.getName() + "]");}if (this.useSavepointForNestedTransaction()) {//创建嵌套事务DefaultTransactionStatus status = this.prepareTransactionStatus(definition, transaction, false, false, debugEnabled, (Object)null);//创建一个保存点 这个事务在回滚的时候 只会回滚到这个保存点status.createAndHoldSavepoint();return status;} else {return this.startTransaction(definition, transaction, debugEnabled, (AbstractPlatformTransactionManager.SuspendedResourcesHolder)null);}}} else {//来到这里是一下几种情况 才是真正的嵌套事务//0:PROPAGATION_REQUIRED 一定要以事务的方式运行 内外层事务绑定//1:PROPAGATION_SUPPORTS 当前存在事务 则加入事务 不存在事务就以非事务运行 //2:PROPAGATION_MANDATORY 当前存在事务 则加入事务 不存在事务抛出异常if (debugEnabled) {this.logger.debug("Participating in existing transaction");}if (this.isValidateExistingTransaction()) {if (definition.getIsolationLevel() != -1) {//获取外层事务隔离级别Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {//外事务隔离级别为空 或者 当前事务隔离级别与外层事务隔离级别不一致 抛出异常Constants isoConstants = DefaultTransactionDefinition.constants;throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, "ISOLATION_") : "(unknown)"));}}if (!definition.isReadOnly() && TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {//当前事务非只读 外层事务是只读 抛出异常throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is");}}//判断是否是新的的事务同步 因为是嵌套事务 加入到外围事务 所以不是新事务boolean newSynchronization = this.getTransactionSynchronization() != 2;//return this.prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, (Object)null);}}
}
AbstractPlatformTransactionManager#suspend 源码分析
AbstractPlatformTransactionManager#suspend 方法就是挂起事务,对当前线程的一些事务信息清空,并把当前外层的事务信息封装成一个 SuspendedResourcesHolder 返回,这样可以保证当前事务运行完毕后,挂起的事务可以继续运行。
//挂起事务
//org.springframework.transaction.support.AbstractPlatformTransactionManager#suspend
@Nullable
protected final AbstractPlatformTransactionManager.SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {if (TransactionSynchronizationManager.isSynchronizationActive()) {//当前线程的事务处于活跃状态//解除绑定在当前线程上的同步事务List suspendedSynchronizations = this.doSuspendSynchronization();try {//需要挂起的数据库连接Object suspendedResources = null;if (transaction != null) {//解绑当前线程的数据库连接suspendedResources = this.doSuspend(transaction);}//获取当前线程上绑定的事务名称String name = TransactionSynchronizationManager.getCurrentTransactionName();//清空当前线程上绑定的事务名称TransactionSynchronizationManager.setCurrentTransactionName((String)null);//获取当前线程绑定的事务只读属性boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();//设置当前线程的事务只读属性为 falseTransactionSynchronizationManager.setCurrentTransactionReadOnly(false);//获取当前线程绑定的事务隔离级别Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();//设置当前线程绑定的事务隔离级别为空TransactionSynchronizationManager.setCurrentTransactionIsolationLevel((Integer)null);//获取当前线程绑定的事务获取状态boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();//设置当前线程的活跃状态为 falseTransactionSynchronizationManager.setActualTransactionActive(false);//封装为 SuspendedResourcesHolderreturn new AbstractPlatformTransactionManager.SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);} catch (Error | RuntimeException var8) {//异常恢复this.doResumeSynchronization(suspendedSynchronizations);throw var8;}} else if (transaction != null) {//事务没有激活 也需要挂起Object suspendedResources = this.doSuspend(transaction);return new AbstractPlatformTransactionManager.SuspendedResourcesHolder(suspendedResources);} else {//都不满足 返回 nullreturn null;}
}
TransactionAspectSupport#completeTransactionAfterThrowing 源码分析
TransactionAspectSupport#completeTransactionAfterThrowing 方法主要是对出现异常情况后进行判断,到底是需要会滚还是需要提交事务,并不是所有的异常都需要回滚。
//异常回滚
//org.springframework.transaction.interceptor.TransactionAspectSupport#completeTransactionAfterThrowing
protected void completeTransactionAfterThrowing(@Nullable TransactionAspectSupport.TransactionInfo txInfo, Throwable ex) {if (txInfo != null && txInfo.getTransactionStatus() != null) {//事务信息不为空 且事务状态不为空if (this.logger.isTraceEnabled()) {this.logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "] after exception: " + ex);}if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {//事务属性不为空 且回滚异常不为空 try {//会滚事务txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());} catch (TransactionSystemException var6) {this.logger.error("Application exception overridden by rollback exception", ex);var6.initApplicationException(ex);throw var6;} catch (Error | RuntimeException var7) {this.logger.error("Application exception overridden by rollback exception", ex);throw var7;}} else {try {//虽然异常还是提交事务txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());} catch (TransactionSystemException var4) {this.logger.error("Application exception overridden by commit exception", ex);var4.initApplicationException(ex);throw var4;} catch (Error | RuntimeException var5) {this.logger.error("Application exception overridden by commit exception", ex);throw var5;}}}}
TransactionSynchronization 源码分析
TransactionSynchronization 状态这几个值在后续的代码中用到,这里简单解释一下 0、1、2 各种代表什么状态。
public interface TransactionSynchronization extends Flushable {//提交事务状态int STATUS_COMMITTED = 0;//回滚事务状态int STATUS_ROLLED_BACK = 1;//未知事务状态int STATUS_UNKNOWN = 2;..........
}
AbstractPlatformTransactionManager#rollback 源码分析
AbstractPlatformTransactionManager#rollback 方法完成了事务的回滚,回滚时候也会根据不同情况执行不同的回滚策略,例如全局回滚、只会滚内部事务、回滚到指定的保存点。
//事务回滚
//org.springframework.transaction.support.AbstractPlatformTransactionManager#rollbackpublic final void rollback(TransactionStatus status) throws TransactionException {if (status.isCompleted()) {//事务状态已经是完成状态了 就不能进行会滚了 抛出异常throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");} else {//默认的事务状态DefaultTransactionStatus defStatus = (DefaultTransactionStatus)status;//真正的会滚方法this.processRollback(defStatus, false);}
}//真正事务回滚方法
//org.springframework.transaction.support.AbstractPlatformTransactionManager#processRollback
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {try {//意外回滚 默认falseboolean unexpectedRollback = unexpected;try {//完成时触发 其实就是解绑事务资源 resoure 也就是从当前线程的 ThreadLocal 中移出this.triggerBeforeCompletion(status);//事务是否有保存点if (status.hasSavepoint()) {if (status.isDebug()) {this.logger.debug("Rolling back transaction to savepoint");}//如果有保存点 就只会滚到指定的保存点 其实就是 PROPAGATION_NESTED status.rollbackToHeldSavepoint();} else if (status.isNewTransaction()) {//是新的事务 if (status.isDebug()) {this.logger.debug("Initiating transaction rollback");}//直接会滚 也就是直接获取当前线程上的数据库连接 进行会滚this.doRollback(status);} else {//进入这里表名既没有保存点 也不是新的事务 就只能是嵌套事务 也就是一下几种情况//0:PROPAGATION_REQUIRED 一定要以事务的方式运行 内外层事务绑定//1:PROPAGATION_SUPPORTS 当前存在事务 则加入事务 不存在事务就以非事务运行 //2:PROPAGATION_MANDATORY 当前存在事务 则加入事务 不存在事务抛出异常if (status.hasTransaction()) {//有事务if (!status.isLocalRollbackOnly() && !this.isGlobalRollbackOnParticipationFailure()) {//status.isLocalRollbackOnly():仅本地回滚 默认是false//isGlobalRollbackOnParticipationFailure():全局回滚 默认是true//进入这里表示修改了全局回滚属性 也就是内部事务回滚不影响外部事务 所以没有任何操作if (status.isDebug()) {this.logger.debug("Participating transaction failed - letting transaction originator decide on rollback");}} else {if (status.isDebug()) {this.logger.debug("Participating transaction failed - marking existing transaction as rollback-only");}//这里就是表示内部事务回滚 外部事务也会会回滚this.doSetRollbackOnly(status);}} else {this.logger.debug("Should roll back transaction but cannot - no transaction available");}//默认值是 falseif (!this.isFailEarlyOnGlobalRollbackOnly()) {//unexpectedRollback 赋值为 false 不过一开始也赋值为 false 了unexpectedRollback = false;}}} catch (Error | RuntimeException var8) {//异常时触发操作this.triggerAfterCompletion(status, 2);throw var8;}//完成后触发操作this.triggerAfterCompletion(status, 1);if (unexpectedRollback) {throw new UnexpectedRollbackException("Transaction rolled back because it has been marked as rollback-only");}} finally {//事务完成后 完成一些清理工作this.cleanupAfterCompletion(status);}}
AbstractPlatformTransactionManager#triggerBeforeCompletion 源码分析
AbstractPlatformTransactionManager#triggerBeforeCompletion 事务完成前操作,主要是操作 SqlSession,把相关信息从当前线程解绑,关闭 SqlSession 等,移除当前线程的 ThreadLoacl 中的 SqlSession。
//org.springframework.transaction.support.AbstractPlatformTransactionManager#triggerBeforeCompletion
//完成前触发
protected final void triggerBeforeCompletion(DefaultTransactionStatus status) {if (status.isNewSynchronization()) {//如果存在事务同步if (status.isDebug()) {this.logger.trace("Triggering beforeCompletion synchronization");}//调用 TransactionSynchronizationUtils 的方法TransactionSynchronizationUtils.triggerBeforeCompletion();}}//org.springframework.transaction.support.TransactionSynchronizationUtils#triggerBeforeCompletion
//完成前触发操作
public static void triggerBeforeCompletion() {//迭代遍历 绑定到当前线程中的 TransactionSynchronization Iterator var0 = TransactionSynchronizationManager.getSynchronizations().iterator();while(var0.hasNext()) {TransactionSynchronization synchronization = (TransactionSynchronization)var0.next();try {//调用 beforeCompletion 方法synchronization.beforeCompletion();} catch (Throwable var3) {logger.error("TransactionSynchronization.beforeCompletion threw exception", var3);}}}//org.springframework.transaction.support.AbstractTransactionStatus#rollbackToHeldSavepoint
//回滚到指定的保存点
public void rollbackToHeldSavepoint() throws TransactionException {//获取保存点Object savepoint = this.getSavepoint();if (savepoint == null) {//保存点为空 抛出异常throw new TransactionUsageException("Cannot roll back to savepoint - no savepoint associated with current transaction");} else {//会滚到指定的保存点this.getSavepointManager().rollbackToSavepoint(savepoint);//释放保存点this.getSavepointManager().releaseSavepoint(savepoint);//设置保存点为空this.setSavepoint((Object)null);}
}//org.springframework.jdbc.datasource.JdbcTransactionObjectSupport#rollbackToSavepoint
//会滚到指定保存点 真正的回滚方法
public void rollbackToSavepoint(Object savepoint) throws TransactionException {//获取连接持有者ConnectionHolder conHolder = this.getConnectionHolderForSavepoint();try {//获取连接调用 rollback 方法回滚到保存点conHolder.getConnection().rollback((Savepoint)savepoint);//重置回滚 rollbackOnly 为falseconHolder.resetRollbackOnly();} catch (Throwable var4) {//回滚异常throw new TransactionSystemException("Could not roll back to JDBC savepoint", var4);}
}//org.springframework.jdbc.datasource.JdbcTransactionObjectSupport#releaseSavepoint
//释放保存点
public void releaseSavepoint(Object savepoint) throws TransactionException {//获取连接的持有者ConnectionHolder conHolder = this.getConnectionHolderForSavepoint();try {//获取连接调用 releaseSavepoint 方法 释放保存点conHolder.getConnection().releaseSavepoint((Savepoint)savepoint);} catch (Throwable var4) {logger.debug("Could not explicitly release JDBC savepoint", var4);}}
CciLocalTransactionManager#doSetRollbackOnly 源码分析
CciLocalTransactionManager#doSetRollbackOnly 仅设置回滚,并没有真正的回滚,这里是嵌套事务且事务传播级别为 PROPAGATION_REQUIRED、PROPAGATION_SUPPORTS、PROPAGATION_MANDATORY 才会执行,这里只设置回滚标志 rollbackOnly 为 true,到统一提交的时候再确定是否回滚。
//org.springframework.jca.cci.connection.CciLocalTransactionManager#doSetRollbackOnly
//仅设置回滚 内部事务回滚 外部事务也会回滚
protected void doSetRollbackOnly(DefaultTransactionStatus status) {CciLocalTransactionManager.CciLocalTransactionObject txObject = (CciLocalTransactionManager.CciLocalTransactionObject)status.getTransaction();if (status.isDebug()) {this.logger.debug("Setting CCI local transaction [" + txObject.getConnectionHolder().getConnection() + "] rollback-only");}//只是设置 rollbackOnly 属性为 true 并没有真正的回滚txObject.getConnectionHolder().setRollbackOnly();
}public void setRollbackOnly() {this.rollbackOnly = true;
}
AbstractPlatformTransactionManager#triggerAfterCompletion 源码分析
AbstractPlatformTransactionManager#triggerAfterCompletion 方法的作用主要是清空当前线程 ThreadLocal 中的事务同步集合 TransactionSynchronization。
//org.springframework.transaction.support.AbstractPlatformTransactionManager#triggerAfterCompletion
//完成后操作
private void triggerAfterCompletion(DefaultTransactionStatus status, int completionStatus) {if (status.isNewSynchronization()) {//获取目前注册的 TransactionSynchronization 集合List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();//获取完毕后 清空TransactionSynchronizationManager.clearSynchronization();if (status.hasTransaction() && !status.isNewTransaction()) {//有事务 且不是新事物if (!synchronizations.isEmpty()) {//synchronizations 不为空 注册一个 afterCompletion 回调this.registerAfterCompletionWithExistingTransaction(status.getTransaction(), synchronizations);}} else {if (status.isDebug()) {this.logger.trace("Triggering afterCompletion synchronization");}//没有事务 或者是新事物 执行 afterCompletion 回调this.invokeAfterCompletion(synchronizations, completionStatus);}}}//org.springframework.transaction.support.AbstractPlatformTransactionManager#invokeAfterCompletion
//回调 afterCompletion 方法
protected final void invokeAfterCompletion(List<TransactionSynchronization> synchronizations, int completionStatus) {TransactionSynchronizationUtils.invokeAfterCompletion(synchronizations, completionStatus);
}//org.springframework.transaction.support.TransactionSynchronizationUtils#invokeAfterCompletion
//回调 afterCompletion
public static void invokeAfterCompletion(@Nullable List<TransactionSynchronization> synchronizations, int completionStatus) {if (synchronizations != null) {//事务同步不为空 迭代遍历调用Iterator var2 = synchronizations.iterator();while(var2.hasNext()) {TransactionSynchronization synchronization = (TransactionSynchronization)var2.next();try {//调用 afterCompletion 方法synchronization.afterCompletion(completionStatus);} catch (Throwable var5) {logger.error("TransactionSynchronization.afterCompletion threw exception", var5);}}}}
AbstractPlatformTransactionManager#cleanupAfterCompletion 源码分析
AbstractPlatformTransactionManager#cleanupAfterCompletion 方法主要是对回滚完成后做一些善后工作,设置当前事务状态已经完成,如果是新事物,将会清除绑定在当前线程的事务信息,并判断是否有挂起的事务,如果有会恢复挂起的事务,这里主要是嵌套事务会有这种场景。
//org.springframework.transaction.support.AbstractPlatformTransactionManager#cleanupAfterCompletion
//事务完成后清理工作
private void cleanupAfterCompletion(DefaultTransactionStatus status) {//事务状态设置为已完成status.setCompleted();//是否是新事务同步if (status.isNewSynchronization()) {//如果是 将当前线程的事务同步管理器清空TransactionSynchronizationManager.clear();}//是否是新事物if (status.isNewTransaction()) {//do 开头的方法 真正执行清理的方法this.doCleanupAfterCompletion(status.getTransaction());}//是否有挂起的事务if (status.getSuspendedResources() != null) {//有if (status.isDebug()) {this.logger.debug("Resuming suspended transaction after completion of inner transaction");}Object transaction = status.hasTransaction() ? status.getTransaction() : null;//恢复挂起的事务this.resume(transaction, (AbstractPlatformTransactionManager.SuspendedResourcesHolder)status.getSuspendedResources());}}//org.springframework.transaction.support.TransactionSynchronizationManager#clear
//就是把当前线程绑定的事务各种信息清除 其实就是各个 ThreadLocal 的remove
public static void clear() {//清除事务同步synchronizations.remove();//清除事务名称currentTransactionName.remove();//清除事务只读状态currentTransactionReadOnly.remove();//清除事务隔离级别currentTransactionIsolationLevel.remove();//清除事务的活跃状态actualTransactionActive.remove();
}//org.springframework.transaction.support.AbstractPlatformTransactionManager#resume
//恢复挂起的事务 将事务信息绑定到当前线程
protected final void resume(@Nullable Object transaction, @Nullable AbstractPlatformTransactionManager.SuspendedResourcesHolder resourcesHolder) throws TransactionException {//资源持有者是否为空if (resourcesHolder != null) {//获取挂起的事务资源Object suspendedResources = resourcesHolder.suspendedResources;if (suspendedResources != null) {//挂起的事务资源不为空 恢复挂起的事务this.doResume(transaction, suspendedResources);}//获取被挂起的事务同步List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;if (suspendedSynchronizations != null) {//设置事务的活跃状态TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);//设置事务的隔离级别TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);//设置事务的只读属性TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);//设置实物名称TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);this.doResumeSynchronization(suspendedSynchronizations);}}}//org.springframework.transaction.support.AbstractPlatformTransactionManager#doResumeSynchronization
//恢复事务同步
private void doResumeSynchronization(List<TransactionSynchronization> suspendedSynchronizations) {//事务同步管理器初始化 空的 LinkedHashSetTransactionSynchronizationManager.initSynchronization();//迭代遍历挂起的事务同步Iterator var2 = suspendedSynchronizations.iterator();while(var2.hasNext()) {TransactionSynchronization synchronization = (TransactionSynchronization)var2.next();//解绑当前线程 ThreadLoacl 的 resources 的 SqlSessionsynchronization.resume();//注册事务同步TransactionSynchronizationManager.registerSynchronization(synchronization);}}
TransactionAspectSupport#cleanupTransactionInfo 源码分析
TransactionAspectSupport#cleanupTransactionInfo 方法逻辑很简单,就是清除当前绑定的事务信息,再设置回老的事务信息。//org.springframework.transaction.interceptor.TransactionAspectSupport#cleanupTransactionInfo
//清理事物信息
protected void cleanupTransactionInfo(@Nullable TransactionAspectSupport.TransactionInfo txInfo) {if (txInfo != null) {txInfo.restoreThreadLocalStatus();}}//org.springframework.transaction.interceptor.TransactionAspectSupport.TransactionInfo#restoreThreadLocalStatus
//恢复线程本地状态
private void restoreThreadLocalStatus() {//将当前线程的 transactionInfoHolder 设置为老的 oldTransactionInfoTransactionAspectSupport.transactionInfoHolder.set(this.oldTransactionInfo);
}
TransactionAspectSupport#commitTransactionAfterReturning 源码分析
TransactionAspectSupport#commitTransactionAfterReturning 方法主要作用就是提交事务,同样真正提交事务之前对各种状态做了校验,同时在事务提交过程中同样会由事务回滚操作,比如发生各种异常的时候,提交完成后也会对进行 clean 操作,最终的提交是调用 Connection.commit()方法。
//org.springframework.transaction.interceptor.TransactionAspectSupport#commitTransactionAfterReturning
//业务方法执行成功后 提交事务
protected void commitTransactionAfterReturning(@Nullable TransactionAspectSupport.TransactionInfo txInfo) {if (txInfo != null && txInfo.getTransactionStatus() != null) {//事务信息不为空 且事务状态不为空if (this.logger.isTraceEnabled()) {this.logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");}//通过事务管理器提交事务txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());}}//org.springframework.transaction.support.AbstractPlatformTransactionManager#commit
public final void commit(TransactionStatus status) throws TransactionException {//事务是否已完成if (status.isCompleted()) {//事务已完成 直接抛异常throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");} else {//获取事务状态DefaultTransactionStatus defStatus = (DefaultTransactionStatus)status;if (defStatus.isLocalRollbackOnly()) {//事务设置了要回滚if (defStatus.isDebug()) {this.logger.debug("Transactional code has requested rollback");}//执行回滚 回滚时候分析了该方法this.processRollback(defStatus, false);} else if (!this.shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {//shouldCommitOnGlobalRollbackOnly():是否已全局方式对标记为回滚的事务调用 commit 源码默认返回 false//isGlobalRollbackOnly:当前事务被设置为回滚if (defStatus.isDebug()) {this.logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");}//执行回滚 回滚时候分析了该方法this.processRollback(defStatus, true);} else {//都不满足 执行正在的提交this.processCommit(defStatus);}}
}//org.springframework.transaction.support.AbstractPlatformTransactionManager#processCommit
//提交事务
private void processCommit(DefaultTransactionStatus status) throws TransactionException {try {//调用完成标志boolean beforeCompletionInvoked = false;try {//意外回滚标志boolean unexpectedRollback = false;//提交前准备 源码空实现 可以自己扩展this.prepareForCommit(status);//提交前触发this.triggerBeforeCommit(status);//完成前触发 之前分析过 就是解绑当前线程的资源this.triggerBeforeCompletion(status);//调用完成标志 赋值为 truebeforeCompletionInvoked = true;//是否有保存点if (status.hasSavepoint()) {if (status.isDebug()) {this.logger.debug("Releasing transaction savepoint");}//获取保存点的回滚方式 是否全局回滚unexpectedRollback = status.isGlobalRollbackOnly();//释放保存点status.releaseHeldSavepoint();} else if (status.isNewTransaction()) {//没有保存点if (status.isDebug()) {this.logger.debug("Initiating transaction commit");}//获取是否全局回滚方式unexpectedRollback = status.isGlobalRollbackOnly();//提交事务 调用 Connection.commit() 方法提交事务this.doCommit(status);} else if (this.isFailEarlyOnGlobalRollbackOnly()) {//仅在早期失败时回滚unexpectedRollback = status.isGlobalRollbackOnly();}if (unexpectedRollback) {//为 ture 抛出异常 会被 catchthrow new UnexpectedRollbackException("Transaction silently rolled back because it has been marked as rollback-only");}} catch (UnexpectedRollbackException var17) {//unexpectedRollback 为 true 时候会进来这里//完成后触发 状态1 表示提交this.triggerAfterCompletion(status, 1);throw var17;} catch (TransactionException var18) {//doCommit 异常时候进入这里if (this.isRollbackOnCommitFailure()) {//提交异常时回滚this.doRollbackOnCommitException(status, var18);} else {//完成后触发 状态2 表示位置this.triggerAfterCompletion(status, 2);}throw var18;} catch (Error | RuntimeException var19) {//其他异常进入这里if (!beforeCompletionInvoked) {this.triggerBeforeCompletion(status);}//提交异常时候回滚this.doRollbackOnCommitException(status, var19);throw var19;}try {//提交后触发this.triggerAfterCommit(status);} finally {//提交完成后触发 0 表示提交this.triggerAfterCompletion(status, 0);}} finally {//最终的清理工作 清理事务的信息 上面分析过this.cleanupAfterCompletion(status);}}
总结:阅读了 Sping 事务源码之后,我们熟悉了事务的执行流程,也可以理解 Sping 事务的传播行为的作用,通过源码我们可以知道 PROPAGATION_NESTED 隔离级别并不是真正的新开了一个事务,只是自己新建了保存点,并没有新建事务,因为自己新建了事务保存点,所以自己提交回滚都不会影响外层事务,同样因为使用的是同一个事务,外层事务回滚内层事务也会回滚,这些知识不通过阅读源码是很难知道的,希望可以帮助到有需要的小伙伴。
欢迎提出建议及对错误的地方指出纠正。