[Seata Source Code Study] Part 6: Global Transaction Commit and Rollback
Global Transaction Commit
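After the TM has made its RPC calls to the RMs, if no exception was thrown, it sends a request to the TC to commit the global transaction. The entry point is:
io.seata.tm.api.TransactionalTemplate#execute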
public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. Get transactionInfo
    // get the TransactionInfo that wraps the attributes of the @GlobalTransactional annotation
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
    // GlobalTransactionContext is the global transaction context, used to create a new transaction or get the current one
    // GlobalTransactionContext.getCurrent -> RootContext.getXID -> ContextCore.get
    // ContextCore is an interface with two implementations in Seata, FastThreadLocalContextCore and
    // ThreadLocalContextCore; both store the XID in a ThreadLocal
    GlobalTransaction tx = GlobalTransactionContext.getCurrent();

    // 1.2 Handle the transaction propagation.
    // get the propagation behavior of the current transaction
    Propagation propagation = txInfo.getPropagation();
    // holds the XID of the suspended transaction
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        // handle the transaction propagation
        switch (propagation) {
            // NOT_SUPPORTED: run without a transaction by calling methodInvocation.proceed();
            // if this interceptor is not the last in the chain, the next interceptor's invoke is called,
            // otherwise the target method runs directly
            case NOT_SUPPORTED:
                // If transaction is existing, suspend it.
                // if a global transaction exists, suspend it
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                }
                // Execute without transaction and return.
                // chain of responsibility: continue down the interceptor chain
                return business.execute();
            case REQUIRES_NEW:
                // If transaction is existing, suspend it, and then begin new transaction.
                // if a transaction exists, suspend it and create a new one
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                    tx = GlobalTransactionContext.createNew();
                }
                // Continue and execute with new transaction
                break;
            case SUPPORTS:
                // If transaction is not existing, execute without transaction.
                // if no transaction exists, skip this transaction interceptor, run the rest of the chain and return
                if (notExistingTransaction(tx)) {
                    return business.execute();
                }
                // Continue and execute with new transaction
                break;
            case REQUIRED:
                // If current transaction is existing, execute with current transaction,
                // else continue and execute with new transaction.
                break;
            case NEVER:
                // If transaction is existing, throw exception.
                // throw if a transaction exists
                if (existingTransaction(tx)) {
                    throw new TransactionException(
                        String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                            , tx.getXid()));
                } else {
                    // Execute without transaction and return.
                    return business.execute();
                }
            case MANDATORY:
                // If transaction is not existing, throw exception.
                // a transaction is required; throw if there is none
                if (notExistingTransaction(tx)) {
                    throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                }
                // Continue and execute with current transaction.
                break;
            default:
                throw new TransactionException("Not Supported Propagation:" + propagation);
        }

        // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
        // if the context holds no transaction, this call starts one: it acts as the TM with role Launcher
        if (tx == null) {
            tx = GlobalTransactionContext.createNew();
        }

        // set current tx config to holder
        // save the current global lock config into a ThreadLocal
        GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

        try {
            // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
            // else do nothing. Of course, the hooks will still be triggered.
            // run the before/after-begin hooks
            // role Participant (RM): throw if RootContext holds no XID
            // role Launcher (TM): throw if RootContext already holds an XID; otherwise send a synchronous
            // GlobalBeginRequest to the TC through TmNettyRemotingClient and bind the returned XID to RootContext
            beginTransaction(txInfo, tx);

            Object rs;
            try {
                // Do Your Business
                // run the rest of the interceptor chain
                rs = business.execute();
            } catch (Throwable ex) {
                // 3. The needed business exception to rollback.
                // check whether the exception matches the rollback rules (by default Throwable and all its subclasses)
                // run the before/after-rollback hooks
                // role Launcher (TM): send a synchronous GlobalRollbackRequest to the TC through TmNettyRemotingClient
                // and record the status returned by the TC
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }

            // 4. everything is fine, commit.
            // no exception was thrown: run the before/after-commit hooks
            // role Launcher (TM): send a synchronous GlobalCommitRequest to the TC through TmNettyRemotingClient
            // to commit the global transaction, and record the status returned by the TC
            commitTransaction(tx);

            return rs;
        } finally {
            //5. clear
            // restore the previous global lock config
            resumeGlobalLockConfig(previousConfig);
            // run the after-completion hooks
            triggerAfterCompletion();
            // remove the transaction hooks bound to the current thread
            cleanUp();
        }
    } finally {
        // If the transaction is suspended, resume it.
        // once this transaction is done, resume the suspended one:
        // rebind the XID held by suspendedResourcesHolder to RootContext
        if (suspendedResourcesHolder != null) {
            tx.resume(suspendedResourcesHolder);
        }
    }
}
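The propagation switch above boils down to a small decision table. The following is a minimal sketch of that table, not Seata code: the Propagation and Action enums and PropagationSketch.decide are hypothetical names, and the `existing` flag stands for "RootContext already holds an XID for this thread".

```java
// Hypothetical model of the propagation switch in TransactionalTemplate#execute.
// Propagation and Action are illustrative enums, not Seata's own types.
enum Propagation { REQUIRED, REQUIRES_NEW, NOT_SUPPORTED, SUPPORTS, NEVER, MANDATORY }

enum Action {
    RUN_WITHOUT_TX,              // business.execute() with no global transaction
    SUSPEND_THEN_RUN_WITHOUT_TX, // tx.suspend(), then business.execute()
    JOIN_EXISTING,               // keep the current XID; this service acts as a Participant
    BEGIN_NEW,                   // createNew(); this service acts as the Launcher (TM)
    SUSPEND_THEN_BEGIN_NEW,      // tx.suspend(), then createNew()
    THROW                        // the real code throws TransactionException
}

final class PropagationSketch {
    private PropagationSketch() { }

    // existing == true when a global transaction is already bound to the thread.
    static Action decide(Propagation propagation, boolean existing) {
        switch (propagation) {
            case NOT_SUPPORTED:
                return existing ? Action.SUSPEND_THEN_RUN_WITHOUT_TX : Action.RUN_WITHOUT_TX;
            case REQUIRES_NEW:
                return existing ? Action.SUSPEND_THEN_BEGIN_NEW : Action.BEGIN_NEW;
            case SUPPORTS:
                return existing ? Action.JOIN_EXISTING : Action.RUN_WITHOUT_TX;
            case REQUIRED:
                return existing ? Action.JOIN_EXISTING : Action.BEGIN_NEW;
            case NEVER:
                return existing ? Action.THROW : Action.RUN_WITHOUT_TX;
            case MANDATORY:
                return existing ? Action.JOIN_EXISTING : Action.THROW;
            default:
                throw new IllegalArgumentException("Not Supported Propagation:" + propagation);
        }
    }

    public static void main(String[] args) {
        for (Propagation p : Propagation.values()) {
            System.out.println(p + ": with tx -> " + decide(p, true) + ", without tx -> " + decide(p, false));
        }
    }
}
```

Note that REQUIRES_NEW without an existing transaction and REQUIRED without one end up in the same place: the `tx == null` check at step 1.3 creates a new Launcher transaction.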
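Before/After Commit Hooks
Seata reserves two hook methods, one called before and one after the global commit, which you can extend to fit your business needs in production.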
io.seata.tm.api.TransactionalTemplate#commitTransaction
private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
    try {
        // hook executed before the global commit
        triggerBeforeCommit();
        // commit the global transaction
        tx.commit();
        // hook executed after the global commit
        triggerAfterCommit();
    } catch (TransactionException txe) {
        // 4.1 Failed to commit
        throw new TransactionalExecutor.ExecutionException(tx, txe,
            TransactionalExecutor.Code.CommitFailure);
    }
}
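To make the hook order concrete, here is a minimal sketch under stated assumptions: CommitHook, HookSketch, and run are hypothetical names, not Seata's API. In Seata itself the extension point is, as far as I know, the TransactionHook interface registered through TransactionHookManager.registerHook; check the version you use before relying on it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical hook interface; not Seata's TransactionHook.
interface CommitHook {
    default void beforeCommit() { }
    default void afterCommit() { }
}

final class HookSketch {
    private final List<CommitHook> hooks = new ArrayList<>();

    void register(CommitHook hook) {
        hooks.add(hook);
    }

    // Same order as commitTransaction: before-hooks, the commit, then after-hooks.
    // If the commit throws, the after-hooks never run.
    void commitTransaction(Runnable commit) {
        hooks.forEach(CommitHook::beforeCommit);
        commit.run();
        hooks.forEach(CommitHook::afterCommit);
    }

    // Records the order in which the hooks and the commit run.
    static List<String> run(boolean commitFails) {
        List<String> log = new ArrayList<>();
        HookSketch template = new HookSketch();
        template.register(new CommitHook() {
            @Override public void beforeCommit() { log.add("before"); }
            @Override public void afterCommit() { log.add("after"); }
        });
        try {
            template.commitTransaction(() -> {
                if (commitFails) {
                    throw new IllegalStateException("commit failed");
                }
                log.add("commit");
            });
        } catch (IllegalStateException e) {
            log.add("failed");
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [before, commit, after]
        System.out.println(run(true));  // [before, failed]
    }
}
```

As in the original method, a failed commit skips the after-commit hook, so that hook is only a reliable place for work that should happen on success.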
TM Commits the Global Transaction
io.seata.tm.api.DefaultGlobalTransaction#commit
public void commit() throws TransactionException {
    // only the TM (Launcher) may commit the global transaction
    if (role == GlobalTransactionRole.Participant) {
        // Participant has no responsibility of committing
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
        }
        return;
    }
    // the XID must not be null
    assertXIDNotNull();
    // a failed global commit is retried 5 times by default
    int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
    try {
        while (retry > 0) {
            try {
                retry--;
                // the TransactionManager performs the actual commit
                status = transactionManager.commit(xid);
                break;
            } catch (Throwable ex) {
                LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}",
                    this.getXid(), retry, ex.getMessage());
                if (retry == 0) {
                    throw new TransactionException("Failed to report global commit", ex);
                }
            }
        }
    } finally {
        if (xid.equals(RootContext.getXID())) {
            // suspend the transaction: unbind the current XID and wrap it in a SuspendedResourcesHolder
            suspend();
        }
    }
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("[{}] commit status: {}", xid, status);
    }
}
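By default, a failed global commit is retried 5 times. Seata delegates transaction operations to the TransactionManager interface, which has separate implementation classes on the TC, TM, and RM sides, as shown in the figure below.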
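The retry countdown in DefaultGlobalTransaction#commit can be modeled in isolation. This is a sketch, not Seata code: CommitRetrySketch.commitWithRetry is a hypothetical helper that takes the commit call as a Supplier, and the default of 5 is the value stated above.

```java
import java.util.function.Supplier;

// Hypothetical model of the countdown loop in DefaultGlobalTransaction#commit.
final class CommitRetrySketch {
    // Used when the configured count is <= 0 (5, per the text above).
    static final int DEFAULT_TM_COMMIT_RETRY_COUNT = 5;

    static <T> T commitWithRetry(int configuredRetry, Supplier<T> commit) {
        int retry = configuredRetry <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : configuredRetry;
        while (true) {
            try {
                retry--;
                return commit.get(); // stands in for transactionManager.commit(xid)
            } catch (RuntimeException ex) {
                System.err.println("Failed to report global commit, Retry Countdown: " + retry);
                if (retry == 0) {
                    throw new IllegalStateException("Failed to report global commit", ex);
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        String status = commitWithRetry(0, () -> {
            if (++attempts[0] < 3) {
                throw new RuntimeException("TC unreachable");
            }
            return "Committed";
        });
        System.out.println(status + " after " + attempts[0] + " attempts"); // Committed after 3 attempts
    }
}
```

The retry only covers failures to reach the TC; once the TC has answered, its returned status is taken as-is.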
io.seata.tm.DefaultTransactionManager#commit
public GlobalStatus commit(String xid) throws TransactionException {
    GlobalCommitRequest globalCommit = new GlobalCommitRequest();
    // bind the global transaction XID
    globalCommit.setXid(xid);
    // send a GlobalCommitRequest message to the TC (synchronous call)
    GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
    return response.getGlobalStatus();
}
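TC Handles the Global Commit
When the TC receives the message, ServerHandler calls processMessage, which uses the message type to look up the matching RemotingProcessor (registered together with its executor as a Pair). A GlobalCommitRequest message is matched to ServerOnRequestProcessor, which in turn hands the message to a TransactionMessageHandler.
TransactionMessageHandler is also an interface; on the TC side the implementation is DefaultCoordinator. The message is upcast to AbstractTransactionRequestToTC, and the handle method of the concrete request subclass is invoked.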
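The dispatch path (message type, then processor, then handler, then request.handle) is essentially double dispatch. A minimal sketch, in which every type (MessageType, InboundHandler, TxRequest, GlobalCommit, GlobalRollback, DispatchSketch) is hypothetical and far simpler than Seata's RemotingProcessor and AbstractTransactionRequestToTC:

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Function;

enum MessageType { GLOBAL_COMMIT, GLOBAL_ROLLBACK }

// Plays the role of the TC-side inbound handler: one overload per request type.
interface InboundHandler {
    String handle(GlobalCommit request);
    String handle(GlobalRollback request);
}

abstract class TxRequest {
    abstract MessageType type();
    // Double dispatch: each subclass calls the handler overload for its own static type.
    abstract String handle(InboundHandler handler);
}

final class GlobalCommit extends TxRequest {
    final String xid;
    GlobalCommit(String xid) { this.xid = xid; }
    MessageType type() { return MessageType.GLOBAL_COMMIT; }
    String handle(InboundHandler handler) { return handler.handle(this); }
}

final class GlobalRollback extends TxRequest {
    final String xid;
    GlobalRollback(String xid) { this.xid = xid; }
    MessageType type() { return MessageType.GLOBAL_ROLLBACK; }
    String handle(InboundHandler handler) { return handler.handle(this); }
}

final class DispatchSketch {
    private final Map<MessageType, Function<TxRequest, String>> processors = new EnumMap<>(MessageType.class);

    DispatchSketch(InboundHandler handler) {
        // Like ServerOnRequestProcessor: one processor serves several request types
        // and lets the request pick the matching handler overload.
        Function<TxRequest, String> onRequest = request -> request.handle(handler);
        processors.put(MessageType.GLOBAL_COMMIT, onRequest);
        processors.put(MessageType.GLOBAL_ROLLBACK, onRequest);
    }

    String process(TxRequest request) {
        Function<TxRequest, String> processor = processors.get(request.type());
        if (processor == null) {
            throw new IllegalArgumentException("no processor for " + request.type());
        }
        return processor.apply(request);
    }

    static InboundHandler demoHandler() {
        return new InboundHandler() {
            @Override public String handle(GlobalCommit r) { return "commit " + r.xid; }
            @Override public String handle(GlobalRollback r) { return "rollback " + r.xid; }
        };
    }

    public static void main(String[] args) {
        DispatchSketch tc = new DispatchSketch(demoHandler());
        System.out.println(tc.process(new GlobalCommit("xid-1")));   // commit xid-1
        System.out.println(tc.process(new GlobalRollback("xid-1"))); // rollback xid-1
    }
}
```

The benefit of this shape is that adding a new request type only needs a new subclass and a new handler overload; the processor itself does not change.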
io.seata.core.protocol.transaction.GlobalCommitRequest#handle
public AbstractTransactionResponse handle(RpcContext rpcContext) {
    return handler.handle(this, rpcContext);
}
io.seata.server.AbstractTCInboundHandler#handle(io.seata.core.protocol.transaction.GlobalCommitRequest, io.seata.core.rpc.RpcContext)
public GlobalCommitResponse handle(GlobalCommitRequest request, final RpcContext rpcContext) {
    GlobalCommitResponse response = new GlobalCommitResponse();
    response.setGlobalStatus(GlobalStatus.Committing);
    exceptionHandleTemplate(new AbstractCallback<GlobalCommitRequest, GlobalCommitResponse>() {
        @Override
        public void execute(GlobalCommitRequest request, GlobalCommitResponse response)
            throws TransactionException {
            try {
                // this is where the global commit actually starts
                doGlobalCommit(request, response, rpcContext);
            } catch (StoreException e) {
                throw new TransactionException(TransactionExceptionCode.FailedStore,
                    String.format("global commit request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()),
                    e);
            }
        }

        @Override
        public void onTransactionException(GlobalCommitRequest request, GlobalCommitResponse response,
            TransactionException tex) {
            super.onTransactionException(request, response, tex);
            checkTransactionStatus(request, response);
        }

        @Override
        public void onException(GlobalCommitRequest request, GlobalCommitResponse response, Exception rex) {
            super.onException(request, response, rex);
            checkTransactionStatus(request, response);
        }
    }, request, response);
    return response;
}
io.seata.server.coordinator.DefaultCoordinator#doGlobalCommit
protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
    throws TransactionException {
    MDC.put(RootContext.MDC_KEY_XID, request.getXid());
    response.setGlobalStatus(core.commit(request.getXid()));
}
io.seata.server.coordinator.DefaultCore#commit
@Override
public GlobalStatus commit(String xid) throws TransactionException {
    // find the GlobalSession for this XID in the session store
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    // not found: the global transaction has already finished
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    // just lock changeStatus

    boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
        if (globalSession.getStatus() == GlobalStatus.Begin) {
            // Highlight: Firstly, close the session, then no more branch can be registered.
            // in AT mode this also releases the global locks
            globalSession.closeAndClean();
            // AT mode: the commit can be done asynchronously
            if (globalSession.canBeCommittedAsync()) {
                // set the status to AsyncCommitting and add the session to the
                // ASYNC_COMMITTING_SESSION_MANAGER held by SessionHolder
                globalSession.asyncCommit();
                MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);
                return false;
            } else {
                globalSession.changeGlobalStatus(GlobalStatus.Committing);
                return true;
            }
        }
        return false;
    });

    if (shouldCommit) {
        boolean success = doGlobalCommit(globalSession, false);
        //If successful and all remaining branches can be committed asynchronously, do async commit.
        if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
            globalSession.asyncCommit();
            return GlobalStatus.Committed;
        } else {
            return globalSession.getStatus();
        }
    } else {
        return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
    }
}
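DefaultCore#commit first looks up the GlobalSession by XID in the session store. If the session is still in the Begin state, it closes the session so that no more branches can register and, in AT mode, releases the global locks. If all branches can be committed asynchronously (AT mode), it marks the global transaction as GlobalStatus.AsyncCommitting and immediately reports Committed to the TM; otherwise it changes the status to Committing and commits the branches synchronously through doGlobalCommit.
When the Seata server starts, it initializes DefaultCoordinator, which starts a scheduled thread that runs handleAsyncCommitting every second to process asynchronous global commits: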
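The decision DefaultCore#commit makes can be reduced to a few lines. This is a simplified sketch, not Seata's code: GlobalStatus here is a hypothetical subset of the real enum, Session stands in for GlobalSession, a synchronized block stands in for SessionHolder.lockAndExecute, and every synchronous failure is modeled as CommitRetrying.

```java
import java.util.function.BooleanSupplier;

// Hypothetical subset of the global statuses used by this sketch.
enum GlobalStatus { Begin, Committing, AsyncCommitting, CommitRetrying, Committed, Finished }

final class CommitDecisionSketch {
    // Hypothetical stand-in for GlobalSession.
    static final class Session {
        GlobalStatus status = GlobalStatus.Begin;
        final boolean canBeCommittedAsync; // true when every branch is an AT branch

        Session(boolean canBeCommittedAsync) {
            this.canBeCommittedAsync = canBeCommittedAsync;
        }
    }

    // syncCommit stands in for doGlobalCommit(session, false) and reports whether every branch committed.
    static GlobalStatus commit(Session session, BooleanSupplier syncCommit) {
        if (session == null) {
            return GlobalStatus.Finished; // session already gone: transaction finished
        }
        boolean shouldCommit = false;
        synchronized (session) { // stands in for SessionHolder.lockAndExecute
            if (session.status == GlobalStatus.Begin) {
                if (session.canBeCommittedAsync) {
                    session.status = GlobalStatus.AsyncCommitting; // left for the background task
                } else {
                    session.status = GlobalStatus.Committing;
                    shouldCommit = true;
                }
            }
        }
        if (shouldCommit) {
            // Simplification: any failure is treated as "queued for retry".
            session.status = syncCommit.getAsBoolean() ? GlobalStatus.Committed : GlobalStatus.CommitRetrying;
            return session.status;
        }
        // The TM already sees Committed while the branches are cleaned up asynchronously.
        return session.status == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : session.status;
    }

    public static void main(String[] args) {
        System.out.println(commit(new Session(true), () -> true));   // Committed
        System.out.println(commit(new Session(false), () -> false)); // CommitRetrying
    }
}
```

This is why an AT global commit returns quickly to the TM: the expensive part (deleting undo_log rows on every RM) happens after the reply.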
asyncCommitting.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
protected void handleAsyncCommitting() {
    // condition used to query sessions from the session store
    SessionCondition sessionCondition = new SessionCondition(GlobalStatus.AsyncCommitting);
    // find all global transactions whose status is GlobalStatus.AsyncCommitting
    Collection<GlobalSession> asyncCommittingSessions =
        SessionHolder.getAsyncCommittingSessionManager().findGlobalSessions(sessionCondition);
    if (CollectionUtils.isEmpty(asyncCommittingSessions)) {
        return;
    }
    SessionHelper.forEach(asyncCommittingSessions, asyncCommittingSession -> {
        try {
            asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
            // delegate to DefaultCore.doGlobalCommit
            core.doGlobalCommit(asyncCommittingSession, true);
        } catch (TransactionException ex) {
            LOGGER.error("Failed to async committing [{}] {} {}", asyncCommittingSession.getXid(), ex.getCode(), ex.getMessage(), ex);
        }
    });
}
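A minimal sketch of such a background task with a plain ScheduledExecutorService. AsyncCommitPoller, the string statuses, and the Predicate that stands in for core.doGlobalCommit(session, true) are all hypothetical; the real task is also wrapped in SessionHolder.distributedLockAndExecute, which is left out here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Hypothetical model of DefaultCoordinator's async-commit task.
final class AsyncCommitPoller {
    // xid -> status, standing in for the session store.
    final Map<String, String> sessions = new ConcurrentHashMap<>();
    // Stands in for core.doGlobalCommit(session, true); true once every branch is committed.
    private final Predicate<String> commitAllBranches;

    AsyncCommitPoller(Predicate<String> commitAllBranches) {
        this.commitAllBranches = commitAllBranches;
    }

    // One tick: process every session whose status is AsyncCommitting.
    void handleAsyncCommitting() {
        sessions.forEach((xid, status) -> {
            if (!"AsyncCommitting".equals(status)) {
                return;
            }
            try {
                if (commitAllBranches.test(xid)) {
                    sessions.remove(xid); // global transaction ended: session deleted
                }
            } catch (RuntimeException ex) {
                // Logged only; the session stays and is retried on the next tick.
                System.err.println("Failed to async committing [" + xid + "] " + ex.getMessage());
            }
        });
    }

    // Equivalent of scheduleAtFixedRate(..., 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS).
    ScheduledExecutorService start(long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::handleAsyncCommitting, 0, periodMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncCommitPoller poller = new AsyncCommitPoller(xid -> true);
        poller.sessions.put("xid-1", "AsyncCommitting");
        ScheduledExecutorService scheduler = poller.start(1000);
        Thread.sleep(200);
        scheduler.shutdown();
        System.out.println(poller.sessions);
    }
}
```

Catching exceptions inside the task matters: a ScheduledExecutorService silently stops rescheduling a periodic task once it throws.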
io.seata.server.coordinator.DefaultCore#doGlobalCommit
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
    boolean success = true;
    // start committing event
    MetricsPublisher.postSessionDoingEvent(globalSession, retrying);

    // the current transaction is in Saga mode
    if (globalSession.isSaga()) {
        success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
    } else {
        // iterate over all branch transactions
        Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
            // if not retrying, skip the canBeCommittedAsync branches
            if (!retrying && branchSession.canBeCommittedAsync()) {
                return CONTINUE;
            }

            BranchStatus currentStatus = branchSession.getStatus();
            if (currentStatus == BranchStatus.PhaseOne_Failed) {
                SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                return CONTINUE;
            }
            try {
                // strategy pattern: each branch type is handled by its own Core
                BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
                if (isXaerNotaTimeout(globalSession, branchStatus)) {
                    LOGGER.info("Commit branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                    branchStatus = BranchStatus.PhaseTwo_Committed;
                }
                switch (branchStatus) {
                    // branch committed: remove it from the global transaction
                    case PhaseTwo_Committed:
                        SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                        return CONTINUE;
                    case PhaseTwo_CommitFailed_Unretryable:
                        //not at branch
                        SessionHelper.endCommitFailed(globalSession, retrying);
                        LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
                        return false;

                    default:
                        if (!retrying) {
                            globalSession.queueToRetryCommit();
                            return false;
                        }
                        if (globalSession.canBeCommittedAsync()) {
                            LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",
                                branchSession.getBranchId(), branchStatus);
                            return CONTINUE;
                        } else {
                            LOGGER.error(
                                "Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
                            return false;
                        }
                }
            } catch (Exception ex) {
                StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
                    new String[] {branchSession.toString()});
                if (!retrying) {
                    globalSession.queueToRetryCommit();
                    throw new TransactionException(ex);
                }
            }
            return CONTINUE;
        });
        // Return if the result is not null
        if (result != null) {
            return result;
        }
        //If has branch and not all remaining branches can be committed asynchronously,
        //do print log and return false
        if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
            LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
            return false;
        }
        if (!retrying) {
            //contains not AT branch
            globalSession.setStatus(GlobalStatus.Committed);
        }
    }
    // if it succeeds and there is no branch, retrying=true is the asynchronous state when retrying. EndCommitted is
    // executed to improve concurrency performance, and the global transaction ends..
    if (success && globalSession.getBranchSessions().isEmpty()) {
        SessionHelper.endCommitted(globalSession, retrying);
        LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
    }
    return success;
}
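When committing the global transaction, the TC iterates over all branch transactions associated with it and sends each a BranchCommitRequest message; once the RM has committed a branch, that branch is removed from the global transaction. After every branch has committed successfully, the GlobalSession is deleted from the session store (the database, in db mode).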
io.seata.server.coordinator.AbstractCore#branchCommit
public BranchStatus branchCommit(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
    try {
        BranchCommitRequest request = new BranchCommitRequest();
        request.setXid(branchSession.getXid());
        request.setBranchId(branchSession.getBranchId());
        request.setResourceId(branchSession.getResourceId());
        request.setApplicationData(branchSession.getApplicationData());
        request.setBranchType(branchSession.getBranchType());
        return branchCommitSend(request, globalSession, branchSession);
    } catch (IOException | TimeoutException e) {
        throw new BranchTransactionException(FailedToSendBranchCommitRequest,
            String.format("Send branch commit failed, xid = %s branchId = %s", branchSession.getXid(),
                branchSession.getBranchId()), e);
    }
}
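The branch loop can be sketched without any networking. This is a simplified model under stated assumptions: BranchStatus is a hypothetical subset of Seata's enum, Branch stands in for BranchSession, and the branchCommit function stands in for the BranchCommitRequest round trip. It ignores the async-commit shortcuts of the real doGlobalCommit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical subset of branch statuses, used only by this sketch.
enum BranchStatus {
    Registered, PhaseOne_Failed, PhaseTwo_Committed,
    PhaseTwo_CommitFailed_Retryable, PhaseTwo_CommitFailed_Unretryable
}

final class BranchCommitLoopSketch {
    // Hypothetical stand-in for BranchSession.
    static final class Branch {
        final long branchId;
        final BranchStatus status;

        Branch(long branchId, BranchStatus status) {
            this.branchId = branchId;
            this.status = status;
        }
    }

    // branchCommit stands in for the BranchCommitRequest round trip to the RM.
    // Returns true when no branch is left, i.e. the global session can be ended and deleted.
    static boolean doGlobalCommit(List<Branch> branches, Function<Branch, BranchStatus> branchCommit) {
        Iterator<Branch> it = branches.iterator();
        while (it.hasNext()) {
            Branch branch = it.next();
            if (branch.status == BranchStatus.PhaseOne_Failed) {
                it.remove(); // phase one failed: nothing to commit
                continue;
            }
            if (branchCommit.apply(branch) == BranchStatus.PhaseTwo_Committed) {
                it.remove(); // committed branches are removed from the global session
            } else {
                // Unretryable -> CommitFailed; otherwise queued for retry. Either way, stop this pass.
                return false;
            }
        }
        return branches.isEmpty();
    }

    public static void main(String[] args) {
        List<Branch> branches = new ArrayList<>(Arrays.asList(
            new Branch(1L, BranchStatus.Registered),
            new Branch(2L, BranchStatus.PhaseOne_Failed)));
        System.out.println(doGlobalCommit(branches, b -> BranchStatus.PhaseTwo_Committed)); // true
    }
}
```

Because committed branches are removed as they succeed, a retry only has to revisit the branches that are still left.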
RM Commits the Branch Transaction
When the TC sends a BranchCommitRequest message, the RM receives it and hands it to RmBranchCommitProcessor.
io.seata.core.rpc.netty.RmNettyRemotingClient#registerProcessor
// branch transaction commit
RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);
io.seata.core.rpc.processor.client.RmBranchCommitProcessor#process
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    // address of the TC
    String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
    Object msg = rpcMessage.getBody();
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("rm client handle branch commit process:" + msg);
    }
    // handle the branch commit
    handleBranchCommit(rpcMessage, remoteAddress, (BranchCommitRequest) msg);
}
io.seata.core.rpc.processor.client.RmBranchCommitProcessor#handleBranchCommit
private void handleBranchCommit(RpcMessage request, String serverAddress, BranchCommitRequest branchCommitRequest) {
    BranchCommitResponse resultMessage;
    resultMessage = (BranchCommitResponse) handler.onRequest(branchCommitRequest, null);
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("branch commit result:" + resultMessage);
    }
    try {
        this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);
    } catch (Throwable throwable) {
        LOGGER.error("branch commit error: {}", throwable.getMessage(), throwable);
    }
}
io.seata.rm.AbstractRMHandler#onRequest
public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
    if (!(request instanceof AbstractTransactionRequestToRM)) {
        throw new IllegalArgumentException();
    }
    AbstractTransactionRequestToRM transactionRequest = (AbstractTransactionRequestToRM) request;
    transactionRequest.setRMInboundMessageHandler(this);

    return transactionRequest.handle(context);
}
io.seata.core.protocol.transaction.BranchCommitRequest#handle
public AbstractTransactionResponse handle(RpcContext rpcContext) {
    return handler.handle(this);
}
io.seata.rm.AbstractRMHandler#handle(io.seata.core.protocol.transaction.BranchCommitRequest)
public BranchCommitResponse handle(BranchCommitRequest request) {
    BranchCommitResponse response = new BranchCommitResponse();
    // this is where the branch commit is actually processed
    exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
        @Override
        public void execute(BranchCommitRequest request, BranchCommitResponse response)
            throws TransactionException {
            doBranchCommit(request, response);
        }
    }, request, response);
    return response;
}
io.seata.rm.AbstractRMHandler#doBranchCommit
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
    throws TransactionException {
    String xid = request.getXid();
    long branchId = request.getBranchId();
    String resourceId = request.getResourceId();
    String applicationData = request.getApplicationData();
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
    }
    // in AT mode this removes the branch's undo_log records from storage
    BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
        applicationData);
    response.setXid(xid);
    response.setBranchId(branchId);
    response.setBranchStatus(status);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch commit result: " + status);
    }
}
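To sum up the global commit: the TC releases the global locks and, in AT mode, marks the global transaction as AsyncCommitting. It then iterates over all branch transactions bound to the global transaction and sends each a BranchCommitRequest message; the RM deletes the undo_log records and returns the branch status to the TC. Once every branch has deleted its undo_log records successfully, the TC marks the global transaction as Committed and deletes it from the session store.
Global Transaction Rollback
TM Sends the Global Rollback Request
If an exception is thrown while the TM runs its business method (including the RPC calls to the RMs), the TM catches it and calls completeTransactionAfterThrowing: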
io.seata.tm.api.TransactionalTemplate#completeTransactionAfterThrowing
private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
    //roll back
    if (txInfo != null && txInfo.rollbackOn(originalException)) {
        try {
            // roll back the global transaction
            rollbackTransaction(tx, originalException);
        } catch (TransactionException txe) {
            // Failed to rollback
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.RollbackFailure, originalException);
        }
    } else {
        // not roll back on this exception, so commit
        commitTransaction(tx);
    }
}
If the exception matches the rollback rules, the TM then sends a GlobalRollbackRequest message to the TC:
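completeTransactionAfterThrowing hinges on txInfo.rollbackOn. The sketch below shows one plausible way such rules resolve: the rule whose exception class is nearest in the thrown exception's class hierarchy wins. Rule and RollbackRuleSketch are hypothetical, and in particular the behavior when rules exist but none matches is an assumption of this sketch, not a description of Seata's TransactionInfo.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical rule resolution, loosely modeled on rollbackFor / noRollbackFor.
final class RollbackRuleSketch {
    static final class Rule {
        final Class<? extends Throwable> type;
        final boolean rollback; // false means "noRollbackFor"

        Rule(Class<? extends Throwable> type, boolean rollback) {
            this.type = type;
            this.rollback = rollback;
        }

        // Superclass hops from the thrown class up to `type`, or -1 if `type` is not an ancestor.
        int depth(Throwable ex) {
            int depth = 0;
            for (Class<?> c = ex.getClass(); c != null; c = c.getSuperclass(), depth++) {
                if (c == type) {
                    return depth;
                }
            }
            return -1;
        }
    }

    static boolean rollbackOn(List<Rule> rules, Throwable ex) {
        if (rules.isEmpty()) {
            return true; // no rules configured: roll back on any Throwable
        }
        Rule winner = null;
        int nearest = Integer.MAX_VALUE;
        for (Rule rule : rules) {
            int d = rule.depth(ex);
            if (d >= 0 && d < nearest) { // the most specific matching rule wins
                nearest = d;
                winner = rule;
            }
        }
        return winner == null || winner.rollback; // assumption: no matching rule -> roll back
    }

    public static void main(String[] args) {
        List<Rule> rules = Arrays.asList(
            new Rule(RuntimeException.class, true),
            new Rule(IllegalArgumentException.class, false));
        System.out.println(rollbackOn(rules, new IllegalStateException())); // true  -> rollbackTransaction
        System.out.println(rollbackOn(rules, new NumberFormatException())); // false -> commitTransaction
    }
}
```

NumberFormatException extends IllegalArgumentException, so the noRollbackFor rule is one hop away and beats the RuntimeException rule two hops away.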
io.seata.tm.DefaultTransactionManager#rollback
public GlobalStatus rollback(String xid) throws TransactionException {
    GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
    globalRollback.setXid(xid);
    GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
    return response.getGlobalStatus();
}
TC Handles the Global Rollback Request
io.seata.server.AbstractTCInboundHandler#handle(io.seata.core.protocol.transaction.GlobalRollbackRequest, io.seata.core.rpc.RpcContext)
public GlobalRollbackResponse handle(GlobalRollbackRequest request, final RpcContext rpcContext) {
    GlobalRollbackResponse response = new GlobalRollbackResponse();
    response.setGlobalStatus(GlobalStatus.Rollbacking);
    exceptionHandleTemplate(new AbstractCallback<GlobalRollbackRequest, GlobalRollbackResponse>() {
        @Override
        public void execute(GlobalRollbackRequest request, GlobalRollbackResponse response)
            throws TransactionException {
            try {
                doGlobalRollback(request, response, rpcContext);
            } catch (StoreException e) {
                throw new TransactionException(TransactionExceptionCode.FailedStore, String
                    .format("global rollback request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()),
                    e);
            }
        }

        @Override
        public void onTransactionException(GlobalRollbackRequest request, GlobalRollbackResponse response,
            TransactionException tex) {
            super.onTransactionException(request, response, tex);
            // may be appears StoreException outer layer method catch
            checkTransactionStatus(request, response);
        }

        @Override
        public void onException(GlobalRollbackRequest request, GlobalRollbackResponse response, Exception rex) {
            super.onException(request, response, rex);
            // may be appears StoreException outer layer method catch
            checkTransactionStatus(request, response);
        }
    }, request, response);
    return response;
}
io.seata.server.coordinator.DefaultCore#rollback
public GlobalStatus rollback(String xid) throws TransactionException {
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    // just lock changeStatus
    boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {
        globalSession.close(); // Highlight: Firstly, close the session, then no more branch can be registered.
        if (globalSession.getStatus() == GlobalStatus.Begin) {
            // mark the global transaction as Rollbacking
            globalSession.changeGlobalStatus(GlobalStatus.Rollbacking);
            return true;
        }
        return false;
    });
    if (!shouldRollBack) {
        return globalSession.getStatus();
    }

    boolean rollbackSuccess = doGlobalRollback(globalSession, false);
    return rollbackSuccess ? GlobalStatus.Rollbacked : globalSession.getStatus();
}
io.seata.server.coordinator.DefaultCore#doGlobalRollback
public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
    boolean success = true;
    // start rollback event
    MetricsPublisher.postSessionDoingEvent(globalSession, retrying);

    if (globalSession.isSaga()) {
        success = getCore(BranchType.SAGA).doGlobalRollback(globalSession, retrying);
    } else {
        // iterate over the branch transactions (in reverse order)
        Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {
            BranchStatus currentBranchStatus = branchSession.getStatus();
            if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
                SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                return CONTINUE;
            }
            try {
                // roll back the branch transaction
                BranchStatus branchStatus = branchRollback(globalSession, branchSession);
                if (isXaerNotaTimeout(globalSession, branchStatus)) {
                    LOGGER.info("Rollback branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                    branchStatus = BranchStatus.PhaseTwo_Rollbacked;
                }
                switch (branchStatus) {
                    // rolled back: remove the branch from the session store
                    case PhaseTwo_Rollbacked:
                        SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                        LOGGER.info("Rollback branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                        return CONTINUE;
                    // unretryable failure: mark the global transaction as rollback-failed and end it
                    case PhaseTwo_RollbackFailed_Unretryable:
                        SessionHelper.endRollbackFailed(globalSession, retrying);
                        LOGGER.info("Rollback branch transaction fail and stop retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                        return false;
                    default:
                        LOGGER.info("Rollback branch transaction fail and will retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                        if (!retrying) {
                            globalSession.queueToRetryRollback();
                        }
                        return false;
                }
            } catch (Exception ex) {
                StackTraceLogger.error(LOGGER, ex,
                    "Rollback branch transaction exception, xid = {} branchId = {} exception = {}",
                    new String[] {globalSession.getXid(), String.valueOf(branchSession.getBranchId()), ex.getMessage()});
                if (!retrying) {
                    globalSession.queueToRetryRollback();
                }
                throw new TransactionException(ex);
            }
        });
        // Return if the result is not null
        if (result != null) {
            return result;
        }
    }

    // In db mode, lock and branch data residual problems may occur.
    // Therefore, execution needs to be delayed here and cannot be executed synchronously.
    if (success) {
        // global rollback succeeded: release the global locks, mark the global transaction as Rollbacked,
        // then delete the global transaction information
        SessionHelper.endRollbacked(globalSession, retrying);
        LOGGER.info("Rollback global transaction successfully, xid = {}.", globalSession.getXid());
    }
    return success;
}
io.seata.server.coordinator.AbstractCore#branchRollback
public BranchStatus branchRollback(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
    try {
        BranchRollbackRequest request = new BranchRollbackRequest();
        request.setXid(branchSession.getXid());
        request.setBranchId(branchSession.getBranchId());
        request.setResourceId(branchSession.getResourceId());
        request.setApplicationData(branchSession.getApplicationData());
        request.setBranchType(branchSession.getBranchType());
        return branchRollbackSend(request, globalSession, branchSession);
    } catch (IOException | TimeoutException e) {
        throw new BranchTransactionException(FailedToSendBranchRollbackRequest,
            String.format("Send branch rollback failed, xid = %s branchId = %s",
                branchSession.getXid(), branchSession.getBranchId()), e);
    }
}
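Similar to the global commit, a global rollback iterates over all branch transactions (in reverse order, via getReverseSortedBranches) and rolls each one back; once a branch has been rolled back, its information is deleted from the session store. After every branch has been rolled back, the global locks are released and the global transaction information is deleted.
Next, let's look at how a branch transaction is rolled back.
RM Handles the Branch Rollback
When the TC sends a BranchRollbackRequest message, the RM hands it to RmBranchRollbackProcessor: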
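A minimal sketch of the reverse-order loop, assuming branch ids are kept in registration order and a hypothetical LongPredicate stands in for branchRollback; ReverseRollbackSketch is not a Seata class. Undoing the newest branch first mirrors how a stack of changes is unwound.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.LongPredicate;

// Hypothetical model of the branch loop in DefaultCore#doGlobalRollback.
final class ReverseRollbackSketch {
    // branchIds are in registration order; rollbackBranch stands in for branchRollback(...) and reports success.
    static boolean doGlobalRollback(List<Long> branchIds, LongPredicate rollbackBranch) {
        for (int i = branchIds.size() - 1; i >= 0; i--) { // newest branch first
            long branchId = branchIds.get(i);
            if (!rollbackBranch.test(branchId)) {
                return false; // stop; the session is queued to retry the rollback later
            }
            branchIds.remove(i); // rolled back: remove the branch from the session
        }
        return true; // every branch undone: release locks, mark Rollbacked, delete the session
    }

    public static void main(String[] args) {
        List<Long> order = new ArrayList<>();
        List<Long> branches = new ArrayList<>(Arrays.asList(1L, 2L, 3L));
        System.out.println(doGlobalRollback(branches, id -> order.add(id)) + " " + order); // true [3, 2, 1]
    }
}
```

On a partial failure the already rolled-back branches are gone from the list, so the retry resumes with the branches that are still pending.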
RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
io.seata.core.rpc.processor.client.RmBranchRollbackProcessor#handleBranchRollback
private void handleBranchRollback(RpcMessage request, String serverAddress, BranchRollbackRequest branchRollbackRequest) {
    BranchRollbackResponse resultMessage;
    resultMessage = (BranchRollbackResponse) handler.onRequest(branchRollbackRequest, null);
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("branch rollback result:" + resultMessage);
    }
    try {
        this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);
    } catch (Throwable throwable) {
        LOGGER.error("send response error: {}", throwable.getMessage(), throwable);
    }
}
io.seata.rm.AbstractRMHandler#handle(io.seata.core.protocol.transaction.BranchRollbackRequest)
public BranchRollbackResponse handle(BranchRollbackRequest request) {
    BranchRollbackResponse response = new BranchRollbackResponse();
    exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {
        @Override
        public void execute(BranchRollbackRequest request, BranchRollbackResponse response)
            throws TransactionException {
            doBranchRollback(request, response);
        }
    }, request, response);
    return response;
}
io.seata.rm.AbstractRMHandler#doBranchRollback
protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
    throws TransactionException {
    String xid = request.getXid();
    long branchId = request.getBranchId();
    String resourceId = request.getResourceId();
    String applicationData = request.getApplicationData();
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
    }
    // in AT mode the resource manager is DataSourceManager
    BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
        applicationData);
    response.setXid(xid);
    response.setBranchId(branchId);
    response.setBranchStatus(status);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch Rollbacked result: " + status);
    }
}
io.seata.rm.datasource.DataSourceManager#branchRollback
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
    String applicationData) throws TransactionException {
    DataSourceProxy dataSourceProxy = get(resourceId);
    if (dataSourceProxy == null) {
        throw new ShouldNeverHappenException(String.format("resource: %s not found", resourceId));
    }
    try {
        // roll back using the undo_log
        UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
    } catch (TransactionException te) {
        StackTraceLogger.info(LOGGER, te,
            "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
            new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
        if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
            return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
        } else {
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
    }
    return BranchStatus.PhaseTwo_Rollbacked;
}
io.seata.rm.datasource.undo.AbstractUndoLogManager#undo
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
    Connection conn = null;
    ResultSet rs = null;
    PreparedStatement selectPST = null;
    boolean originalAutoCommit = true;

    for (; ; ) {
        try {
            conn = dataSourceProxy.getPlainConnection();

            // The entire undo process should run in a local transaction.
            if (originalAutoCommit = conn.getAutoCommit()) {
                conn.setAutoCommit(false);
            }

            // Find UNDO LOG
            // look up the undo log by global transaction xid and branch id
            selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
            selectPST.setLong(1, branchId);
            selectPST.setString(2, xid);
            rs = selectPST.executeQuery();

            // whether an undo log exists
            boolean exists = false;
            while (rs.next()) {
                exists = true;

                // It is possible that the server repeatedly sends a rollback request to roll back
                // the same branch transaction to multiple processes,
                // ensuring that only the undo_log in the normal state is processed.
                int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
                if (!canUndo(state)) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
                    }
                    return;
                }

                String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
                // parse the undo log context
                Map<String, String> context = parseContext(contextString);
                byte[] rollbackInfo = getRollbackInfo(rs);

                String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                    : UndoLogParserFactory.getInstance(serializer);
                // deserialize into a BranchUndoLog object
                BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);

                try {
                    // put serializer name to local
                    setCurrentSerializer(parser.getName());
                    List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                    if (sqlUndoLogs.size() > 1) {
                        Collections.reverse(sqlUndoLogs);
                    }
                    for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                        TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
                            conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                        sqlUndoLog.setTableMeta(tableMeta);
                        AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                            dataSourceProxy.getDbType(), sqlUndoLog);
                        // execute the undo log
                        undoExecutor.executeOn(conn);
                    }
                } finally {
                    // remove serializer name
                    removeCurrentSerializer();
                }
            }

            // If undo_log exists, it means that the branch transaction has completed the first phase,
            // we can directly roll back and clean the undo_log
            // Otherwise, it indicates that there is an exception in the branch transaction,
            // causing undo_log not to be written to the database.
            // For example, the business processing timeout, the global transaction is the initiator rolls back.
            // To ensure data consistency, we can insert an undo_log with GlobalFinished state
            // to prevent the local transaction of the first phase of other programs from being correctly submitted.
            // See https://github.com/seata/seata/issues/489

            if (exists) {
                // delete the undo log once it has been applied
                deleteUndoLog(xid, branchId, conn);
                // commit the local transaction
                conn.commit();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId,
                        State.GlobalFinished.name());
                }
            } else {
                insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                conn.commit();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId,
                        State.GlobalFinished.name());
                }
            }

            return;
        } catch (SQLIntegrityConstraintViolationException e) {
            // Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);
            }
        } catch (Throwable e) {
            if (conn != null) {
                try {
                    conn.rollback();
                } catch (SQLException rollbackEx) {
                    LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
                }
            }
            throw new BranchTransactionException(BranchRollbackFailed_Retriable, String.format(
                "Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,
                branchId, e.getMessage()), e);
        } finally {
            try {
                if (rs != null) {
                    rs.close();
                }
                if (selectPST != null) {
                    selectPST.close();
                }
                if (conn != null) {
                    if (originalAutoCommit) {
                        conn.setAutoCommit(true);
                    }
                    conn.close();
                }
            } catch (SQLException closeEx) {
                LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
            }
        }
    }
}
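Using the xid and the branch id, undo reads the rollback log from the undo_log table in the business database, parses and deserializes it, and executes it. If no undo_log row exists yet, it inserts one in the GlobalFinished state so that a late phase-one commit of that branch cannot succeed.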
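The control flow of undo can be modeled with an in-memory map standing in for the undo_log table. This is a hypothetical sketch (UndoSketch, UndoLog, and the string undo entries are illustrative) that covers the three cases: a Normal row is applied in reverse statement order and deleted, a non-Normal row is ignored, and a missing row is replaced by a GlobalFinished marker.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of AbstractUndoLogManager#undo; the map stands in for the undo_log table.
final class UndoSketch {
    enum State { Normal, GlobalFinished }

    static final class UndoLog {
        final State state;
        final List<String> sqlUndoLogs; // undo entries in the order the business SQL ran

        UndoLog(State state, List<String> sqlUndoLogs) {
            this.state = state;
            this.sqlUndoLogs = sqlUndoLogs;
        }
    }

    final Map<String, UndoLog> undoLogTable = new HashMap<>(); // key: xid + ":" + branchId
    final List<String> executed = new ArrayList<>();           // undo statements applied, in order

    void undo(String xid, long branchId) {
        String key = xid + ":" + branchId;
        UndoLog log = undoLogTable.get(key);
        if (log == null) {
            // Phase one has not written its undo_log yet: leave a GlobalFinished marker so that
            // the late phase-one insert hits the unique key and its local transaction fails.
            undoLogTable.put(key, new UndoLog(State.GlobalFinished, Collections.emptyList()));
            return;
        }
        if (log.state != State.Normal) {
            return; // duplicate rollback request or marker row: ignore
        }
        List<String> reversed = new ArrayList<>(log.sqlUndoLogs);
        Collections.reverse(reversed); // undo the last statement first
        executed.addAll(reversed);
        undoLogTable.remove(key); // deleteUndoLog, committed together with the undo SQL
    }

    public static void main(String[] args) {
        UndoSketch db = new UndoSketch();
        db.undoLogTable.put("xid-1:10", new UndoLog(State.Normal, Arrays.asList("undo-stmt-1", "undo-stmt-2")));
        db.undo("xid-1", 10L);
        db.undo("xid-1", 11L); // no undo_log yet for branch 11
        System.out.println(db.executed);                           // [undo-stmt-2, undo-stmt-1]
        System.out.println(db.undoLogTable.get("xid-1:11").state); // GlobalFinished
    }
}
```

In the real method the undo SQL, the delete (or the marker insert), and the commit all run in one local JDBC transaction, which is what makes the marker trick safe.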
io.seata.rm.datasource.undo.AbstractUndoExecutor#executeOn
public void executeOn(Connection conn) throws SQLException {
    // compare the before image, the after image, and the current data:
    // if before image == after image, the data did not change and no undo is needed
    // if after image != current data, the row is either already restored or dirty (see below)
    if (IS_UNDO_DATA_VALIDATION_ENABLE && !dataValidationAndGoOn(conn)) {
        return;
    }
    PreparedStatement undoPST = null;
    try {
        // build the undo SQL template
        String undoSQL = buildUndoSQL();
        undoPST = conn.prepareStatement(undoSQL);
        // get the rows to undo (the before image for UPDATE/DELETE, the after image for INSERT)
        TableRecords undoRows = getUndoRows();
        // iterate over every row to undo
        for (Row undoRow : undoRows.getRows()) {
            ArrayList<Field> undoValues = new ArrayList<>();
            List<Field> pkValueList = getOrderedPkList(undoRows, undoRow, getDbType(conn));
            for (Field field : undoRow.getFields()) {
                if (field.getKeyType() != KeyType.PRIMARY_KEY) {
                    undoValues.add(field);
                }
            }
            // bind the values to produce the actual undo SQL
            undoPrepare(undoPST, undoValues, pkValueList);
            // execute the undo SQL
            undoPST.executeUpdate();
        }
    } catch (Exception ex) {
        if (ex instanceof SQLException) {
            throw (SQLException) ex;
        } else {
            throw new SQLException(ex);
        }
    } finally {
        //important for oracle
        IOUtil.close(undoPST);
    }
}
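Before the undo SQL is executed (when undo data validation is enabled), the executor compares the before image, the after image, and the current data:
// If before image == after image: the data did not change, so no undo is needed.
// If after image != current data: if the current data already equals the before image, the row has already been restored and the undo is skipped; otherwise the row was modified outside the global transaction (dirty data), and the undo is aborted with an exception instead of overwriting it.
Only when the after image equals the current data (and differs from the before image) does the executor build and run the undo SQL, completing the branch rollback.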
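The three-way comparison reduces to a small decision function. In this sketch, images are modeled as lists of column-to-value maps compared with equals, whereas Seata compares TableRecords with its own utility code; UndoValidationSketch, Decision, and oneRow are hypothetical names.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical model of the image comparison that runs before the undo SQL.
final class UndoValidationSketch {
    enum Decision { SKIP_NO_CHANGE, SKIP_ALREADY_UNDONE, DIRTY, UNDO }

    // One-row image helper: column -> value.
    static List<Map<String, Object>> oneRow(String column, Object value) {
        return Collections.singletonList(Collections.singletonMap(column, value));
    }

    static Decision validate(List<Map<String, Object>> before,
                             List<Map<String, Object>> after,
                             List<Map<String, Object>> current) {
        if (Objects.equals(before, after)) {
            return Decision.SKIP_NO_CHANGE;       // the business SQL changed nothing
        }
        if (Objects.equals(after, current)) {
            return Decision.UNDO;                 // nobody touched the row since phase one
        }
        if (Objects.equals(before, current)) {
            return Decision.SKIP_ALREADY_UNDONE;  // the row is already back to its before image
        }
        return Decision.DIRTY;                    // modified outside the global transaction: do not overwrite
    }

    public static void main(String[] args) {
        System.out.println(validate(oneRow("amount", 100), oneRow("amount", 80), oneRow("amount", 80))); // UNDO
        System.out.println(validate(oneRow("amount", 100), oneRow("amount", 80), oneRow("amount", 50))); // DIRTY
    }
}
```

The DIRTY case is the one that typically needs manual intervention, since no automatic choice between the two conflicting writes is safe.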