文章目录
- 启动seata客户端
- 1.导入依赖
- 2.自动装配
- 发送请求的核心方法
- 客户端开启事务的核心流程
- 服务端分布式事务的处理机制
启动seata客户端
1.导入依赖
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.12.RELEASE</version></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><!-- 选择SpringCloud和Alibaba版本的时候,一定要参考官网的建议,否则会有问题 --><spring-cloud.version>Hoxton.SR12</spring-cloud.version><spring-cloud-alibaba.version>2.2.9.RELEASE</spring-cloud-alibaba.version></properties><dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>${spring-cloud-alibaba.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><!-- nacos服务注册与发现 --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--引入seata--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
客户端启动流程图
2.自动装配
自动装配的核心类
在SeataAutoConfiguration我们找到对应注入的类GlobalTransactionScanner,通过名称我们应该推算出,他应该是对应@GlobalTransaction进行扫描,然后注入到容器
我们先看下GlobalTransactionScanner类继承的类
创建的代理类
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {if (!this.doCheckers(bean, beanName)) {return bean;} else {try {synchronized(PROXYED_SET) {//如果代理已经存在直接返回bean对象if (PROXYED_SET.contains(beanName)) {return bean;} else {this.interceptor = null;//判断是否是TCC模式if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, this.applicationContext)) {TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), this.applicationContext);this.interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));ConfigurationCache.addConfigListener("service.disableGlobalTransaction", new ConfigurationChangeListener[]{(ConfigurationChangeListener)this.interceptor});} else {Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);//判断是否是被注解@globalTransactional或者@GlobalLock代理的类if (!this.existsAnnotation(new Class[]{serviceInterface}) && !this.existsAnnotation(interfacesIfJdk)) {return bean;}//创建globalTransactionalInterceptor拦截器if (this.globalTransactionalInterceptor == null) {this.globalTransactionalInterceptor = new GlobalTransactionalInterceptor(this.failureHandlerHook);ConfigurationCache.addConfigListener("service.disableGlobalTransaction", new ConfigurationChangeListener[]{(ConfigurationChangeListener)this.globalTransactionalInterceptor});}this.interceptor = this.globalTransactionalInterceptor;}LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", new Object[]{bean.getClass().getName(), beanName, this.interceptor.getClass().getName()});if (!AopUtils.isAopProxy(bean)) {bean = super.wrapIfNecessary(bean, beanName, cacheKey);} else {AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);//获取拦截器Advisor[] advisor = this.buildAdvisors(beanName, this.getAdvicesAndAdvisorsForBean((Class)null, (String)null, (TargetSource)null));Advisor[] var8 = advisor;int var9 = advisor.length;for(int var10 = 0; var10 < var9; ++var10) {Advisor avr = var8[var10];int pos = this.findAddSeataAdvisorPosition(advised, avr);advised.addAdvisor(pos, avr);}}PROXYED_SET.add(beanName);return bean;}}} catch (Exception var14) {throw new RuntimeException(var14);}}}
事务管理器TM,资源管理器RM初始化实在实例化之后进行这个是在GlobalTransactionScanner
继承的InitializingBean的afterPropertiesSet方法中实现InitializingBean接口的使用
实现InitializingBean接口的bean,在Spring容器初始化并设置所有bean属性后,会调用其afterPropertiesSet()方法。这通常用于在bean的属性全部设置完毕后需要进行的一些自定义初始化工作,例如验证属性或建立资源连接。
public void afterPropertiesSet() {//判断是否开启事务if (this.disableGlobalTransaction) {if (LOGGER.isInfoEnabled()) {LOGGER.info("Global transaction is disabled.");}//添加监听器 ConfigurationCache.addConfigListener("service.disableGlobalTransaction", new ConfigurationChangeListener[]{this});} else {//CAS将初始化置换成Trueif (this.initialized.compareAndSet(false, true)) {//初始化客户端this.initClient();}}}
初始化客户端的代码
private void initClient() {if (LOGGER.isInfoEnabled()) {LOGGER.info("Initializing Global Transaction Clients ... ");}if ("my_test_tx_group".equals(this.txServiceGroup)) {LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, please change your default configuration as soon as possible and we don't recommend you to use default tx-service-group's value provided by seata", "my_test_tx_group", "default_tx_group");}if (!StringUtils.isNullOrEmpty(this.applicationId) && !StringUtils.isNullOrEmpty(this.txServiceGroup)) {//初始化TM客户端TMClient.init(this.applicationId, this.txServiceGroup, accessKey, secretKey);if (LOGGER.isInfoEnabled()) {LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", this.applicationId, this.txServiceGroup);}//初始化RM客户端RMClient.init(this.applicationId, this.txServiceGroup);if (LOGGER.isInfoEnabled()) {LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", this.applicationId, this.txServiceGroup);}if (LOGGER.isInfoEnabled()) {LOGGER.info("Global Transaction Clients are initialized. ");}this.registerSpringShutdownHook();} else {throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", this.applicationId, this.txServiceGroup));}}
TM和RM底层都是用的Netty进行的通讯
TM的初始化
public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);tmNettyRemotingClient.init();}
RM的初始化
public static void init(String applicationId, String transactionServiceGroup) {RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());rmNettyRemotingClient.init();}
发送请求的核心方法
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {//利用AOP获取代理类Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;//获取代理的方法Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);//判断此方法是否被@GlobalTranscation修饰GlobalTransactional globalTransactionalAnnotation = (GlobalTransactional)this.getAnnotation(method, targetClass, GlobalTransactional.class);//判断此方法是否被注解@GlobalLock修饰GlobalLock globalLockAnnotation = (GlobalLock)this.getAnnotation(method, targetClass, GlobalLock.class);boolean localDisable = this.disable || degradeCheck && degradeNum >= degradeCheckAllowTimes;if (!localDisable) {if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {AspectTransactional transactional;if (globalTransactionalAnnotation != null) {transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(), globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(), globalTransactionalAnnotation.rollbackForClassName(), globalTransactionalAnnotation.noRollbackFor(), globalTransactionalAnnotation.noRollbackForClassName(), globalTransactionalAnnotation.propagation(), globalTransactionalAnnotation.lockRetryInterval(), globalTransactionalAnnotation.lockRetryTimes());} else {transactional = this.aspectTransactional;}//被@GlobalTranstional修饰的方法进入此方法return this.handleGlobalTransaction(methodInvocation, transactional);}//被@GlobalLock修饰的方法进入此类if (globalLockAnnotation != null) {return this.handleGlobalLock(methodInvocation, globalLockAnnotation);}}}return methodInvocation.proceed();}
事务的核心方法TransactionalTemplate的excute方法
public Object execute(TransactionalExecutor business) throws Throwable {//获取事务信息TransactionInfo txInfo = business.getTransactionInfo();//如果为空直接抛异常if (txInfo == null) {throw new ShouldNeverHappenException("transactionInfo does not exist");} else {//创建或者获取全局事务标志xidGlobalTransaction tx = GlobalTransactionContext.getCurrent();//获取事务的传播机制Propagation propagation = txInfo.getPropagation();SuspendedResourcesHolder suspendedResourcesHolder = null;try {Object var6;switch(propagation) {case NOT_SUPPORTED:if (this.existingTransaction(tx)) {suspendedResourcesHolder = tx.suspend();}var6 = business.execute();return var6;case REQUIRES_NEW:if (this.existingTransaction(tx)) {suspendedResourcesHolder = tx.suspend();tx = GlobalTransactionContext.createNew();}break;case SUPPORTS:if (this.notExistingTransaction(tx)) {var6 = business.execute();return var6;}case REQUIRED:break;case NEVER:if (this.existingTransaction(tx)) {throw new TransactionException(String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s", tx.getXid()));}var6 = business.execute();return var6;case MANDATORY:if (this.notExistingTransaction(tx)) {throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");}break;default:throw new TransactionException("Not Supported Propagation:" + propagation);}//如果事务的xid为空,则重新创建一个新的if (tx == null) {tx = GlobalTransactionContext.createNew();}//获取全局锁配置GlobalLockConfig previousConfig = this.replaceGlobalLockConfig(txInfo);try {//开启全局事务this.beginTransaction(txInfo, tx);Object rs;Object ex;try {rs = business.execute();} catch (Throwable var17) {ex = var17;this.completeTransactionAfterThrowing(txInfo, tx, var17);throw var17;}this.commitTransaction(tx);ex = rs;return ex;} finally {this.resumeGlobalLockConfig(previousConfig);this.triggerAfterCompletion();this.cleanUp();}} finally {if (suspendedResourcesHolder != null) {tx.resume(suspendedResourcesHolder);}}}}
客户端开启事务的核心流程
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws ExecutionException {try {//执行开启事务前的操作this.triggerBeforeBegin();//开启事务tx.begin(txInfo.getTimeOut(), txInfo.getName());//执行开启事务后的操作this.triggerAfterBegin();} catch (TransactionException var4) {throw new ExecutionException(tx, var4, Code.BeginFailure);}}
public void begin(int timeout, String name) throws TransactionException {//判断当前的角色是不是Launcherif (this.role != GlobalTransactionRole.Launcher) {//不是判断当前的xid是否为nullthis.assertXIDNotNull();if (LOGGER.isDebugEnabled()) {LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", this.xid);}} else {//不是判断当前的xid是否为nullthis.assertXIDNull();String currentXid = RootContext.getXID();if (currentXid != null) {throw new IllegalStateException("Global transaction already exists, can't begin a new global transaction, currentXid = " + currentXid);} else {//开启事务,请求后台服务,返回xidthis.xid = this.transactionManager.begin((String)null, (String)null, name, timeout);//将事务状态置为开启this.status = GlobalStatus.Begin;//绑定xidRootContext.bind(this.xid);if (LOGGER.isInfoEnabled()) {LOGGER.info("Begin new global transaction [{}]", this.xid);}}}}
服务端分布式事务的处理机制
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)throws TransactionException {/*** 调用core.begin开启事务,并持久化* 给response设置全局事务编号XID*/response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),request.getTransactionName(), request.getTimeout()));if (LOGGER.isInfoEnabled()) {LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());}}
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {//key1:创建全局会话,这里面已经创建了全局事务IdGlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,timeout);MDC.put(RootContext.MDC_KEY_XID, session.getXid());// 添加事务生命周期监听器session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());//2、 开启事务,这里有事务持久话的一个机制session.begin();// transaction start event// 发送事务开启事件MetricsPublisher.postSessionDoingEvent(session, false);// 返回全局事务IDreturn session.getXid();}
@Overridepublic void begin() throws TransactionException {this.status = GlobalStatus.Begin;this.beginTime = System.currentTimeMillis();this.active = true;for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {// 在这里处理lifecycleListener.onBegin(this);}}
private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException {// 将事务信息写入数据库if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) {if (LogOperation.GLOBAL_ADD.equals(logOperation)) {throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,"Fail to store global session");} else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,"Fail to update global session");} else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,"Fail to remove global session");} else if (LogOperation.BRANCH_ADD.equals(logOperation)) {throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,"Fail to store branch session");} else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,"Fail to update branch session");} else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,"Fail to remove branch session");} else {throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,"Unknown LogOperation:" + logOperation.name());}}}