【Seata源码学习 】篇六 全局事务提交与回滚

【Seata源码学习 】篇六 全局事务提交与回滚

全局事务提交

TM在RPC远程调用RM后,如果没有出现异常,将向TC发送提交全局事务请求io.seata.tm.api.TransactionalTemplate#execute

public Object execute(TransactionalExecutor business) throws Throwable {// 1. Get transactionInfo//获取@GlobalTransation注解的属性封装的TransactionInfoTransactionInfo txInfo = business.getTransactionInfo();if (txInfo == null) {throw new ShouldNeverHappenException("transactionInfo does not exist");}// 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.// GlobalTransactionContext 全局事务上下文对象 用于创建一个新事务,或者获取当前事务// GlobalTransactionContext.getCurrent - > RootContext.getXID -> ContextCore.get// ContextCore 是一个接口 seata有两个实现  FastThreadLocalContextCore  ThreadLocalContextCore 都是基于ThreadLocal存储XIDGlobalTransaction tx = GlobalTransactionContext.getCurrent();// 1.2 Handle the transaction propagation.// 获取当前事务的传播行为Propagation propagation = txInfo.getPropagation();// 存储被挂起的事务XIDSuspendedResourcesHolder suspendedResourcesHolder = null;try {//处理事务的传播行为switch (propagation) {//如果当前事务的传播行为是 NOT_SUPPORTED 则以非事务的方式执行调用methodInvocation.proceed()// 如果当前拦截器不为拦截链的最后一个,则将获取下一个拦截器执行invoke方法,如果是最后一个,则直接执行目标方法case NOT_SUPPORTED:// If transaction is existing, suspend it.//如果当前存在全局事务,则挂起当前事务if (existingTransaction(tx)) {suspendedResourcesHolder = tx.suspend();}// Execute without transaction and return.// 责任链模式 继续执行拦截器链return business.execute();case REQUIRES_NEW:// If transaction is existing, suspend it, and then begin new transaction.// 如果当前存在事务 则挂起当前事务 并创建一个新的事务if (existingTransaction(tx)) {suspendedResourcesHolder = tx.suspend();tx = GlobalTransactionContext.createNew();}// Continue and execute with new transactionbreak;case SUPPORTS:// If transaction is not existing, execute without transaction.// 如果不存在事务 则跳过当前事务拦截器 执行拦截器链并返回if (notExistingTransaction(tx)) {return business.execute();}// Continue and execute with new transactionbreak;case REQUIRED:// If current transaction is existing, execute with current transaction,// else continue and execute with new transaction.break;case NEVER:// If transaction is existing, throw exception.// 有事务抛出异常if (existingTransaction(tx)) {throw new TransactionException(String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s", tx.getXid()));} else {// Execute without transaction and return.return business.execute();}case MANDATORY:// If transaction is not existing, throw exception.// 要求必须有事务,没事务抛出异常if (notExistingTransaction(tx)) {throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");}// Continue and execute with current transaction.break;default:throw new TransactionException("Not Supported Propagation:" + propagation);}// 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.// 如果当前的事务上下文中不存在事务 那么此次事务发起为 TM 角色为 Launcherif (tx == null) {tx = GlobalTransactionContext.createNew();}// set current tx config to holder// 记录当前的全局锁配置,存放到 ThreadLocalGlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);try {// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,//    else do nothing. Of course, the hooks will still be triggered.// 执行全局事务开启的前后置钩子方法// 如果当前事务的角色是 Participant 也就是 RM ,判断当前事务上下文RootContext是否存在XID,如果不存在,抛出异常// 如果当前事务的角色是 launcher 也就是 TM ,判断当前事务上下文RootContext是否存在XID,如果存在,抛出异常// 如果不存在,则通过TmNettyRemotingClient  向TC发送一个 GlobalReportRequest 同步消息,并获取TC返回的XID,绑定到RootContextbeginTransaction(txInfo, tx);Object rs;try {// Do Your Business// 执行执行拦截器链路rs = business.execute();} catch (Throwable ex) {// 3. The needed business exception to rollback.// 如果抛出异常,判断异常是否在指定的范围中(默认为Throwable类及其子类)// 执行异常回滚的前后钩子方法// 如果当前事务的角色是 launcher 也就是 TM ,通过TmNettyRemotingClient  向TC发送一个 GlobalRollbackRequest 同步消息// 并记录TC返回的当前事务状态StatuscompleteTransactionAfterThrowing(txInfo, tx, ex);throw ex;}// 4. everything is fine, commit.//  如果方法执行过程中没有出现异常//  执行事务提交的前后置方法//  如果当前事务的角色是 launcher 也就是 TM ,通过TmNettyRemotingClient  向TC发送一个 GlobalCommitRequest 同步消息 提交全局事务// 并记录TC返回的当前事务状态StatuscommitTransaction(tx);return rs;} finally {//5. clear// 恢复以前的全局锁配置resumeGlobalLockConfig(previousConfig);// 执行整个事务完成的前后置方法triggerAfterCompletion();// 移除当前绑定的事务钩子对象cleanUp();}} finally {// If the transaction is suspended, resume it.// 当前事务执行完毕后,恢复挂起的事务,// 获取suspendedResourcesHolder关联的xid,由RootContext重新绑定if (suspendedResourcesHolder != null) {tx.resume(suspendedResourcesHolder);}}}

事务提交前后钩子方法执行

在全局事务提交前后,seata给我们预留了两个钩子方法,可以根据实际生产中的业务需要进行扩展

io.seata.tm.api.TransactionalTemplate#commitTransaction

    private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {try {//全局事务提交前钩子方法triggerBeforeCommit();//全局事务提交tx.commit();//全局事务提交后钩子方法triggerAfterCommit();} catch (TransactionException txe) {// 4.1 Failed to committhrow new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.CommitFailure);}}

TM提交全局事务

io.seata.tm.api.DefaultGlobalTransaction#commit

public void commit() throws TransactionException {// 全局事务提交必须是 TM发起的if (role == GlobalTransactionRole.Participant) {// Participant has no responsibility of committingif (LOGGER.isDebugEnabled()) {LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);}return;}//XID不能为空assertXIDNotNull();// 全局事务提交失败默认重试5次int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;try {while (retry > 0) {try {retry--;//由 TransactionManager 对事务进行提交操作 status = transactionManager.commit(xid);break;} catch (Throwable ex) {LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());if (retry == 0) {throw new TransactionException("Failed to report global commit", ex);}}}} finally {if (xid.equals(RootContext.getXID())) {//挂起事务 -》 将当前的XID解绑,将XID封装到SuspendedResourcesHolder中suspend();}}if (LOGGER.isInfoEnabled()) {LOGGER.info("[{}] commit status: {}", xid, status);}}

全局事务提交失败默认会重试5次,seata对事务的操作都是交由TransactionManager接口处理,TC、TM、RM分别有不同的实现类,具体如下图所示

image-20231201164837073

io.seata.tm.DefaultTransactionManager#commit

    public GlobalStatus commit(String xid) throws TransactionException {GlobalCommitRequest globalCommit = new GlobalCommitRequest();//绑定全局事务XIDglobalCommit.setXid(xid);//向TC发送GlobalCommitRequest消息GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);return response.getGlobalStatus();}

TC处理全局事务提交

TC接收到消息后由ServerHandler调用processMessage处理,将根据消息的类型从pair中匹配合适的RemotingProcessor处理器; GlobalCommitResponse 消息将匹配ServerOnRequestProcessor处理器,ServerOnRequestProcessor进而又将消息由TransactionMessageHandler进行处理。

TransactionMessageHandler 也是一个接口,TC将使用的是DefaultCoordinator实现类,最终将消息向上转型为AbstractTransactionRequestToTC,调用不同消息子类的handle进行处理。

io.seata.core.protocol.transaction.GlobalCommitRequest#handle

    public AbstractTransactionResponse handle(RpcContext rpcContext) {return handler.handle(this, rpcContext);}

io.seata.server.AbstractTCInboundHandler#handle(io.seata.core.protocol.transaction.GlobalCommitRequest, io.seata.core.rpc.RpcContext)

public GlobalCommitResponse handle(GlobalCommitRequest request, final RpcContext rpcContext) {GlobalCommitResponse response = new GlobalCommitResponse();response.setGlobalStatus(GlobalStatus.Committing);exceptionHandleTemplate(new AbstractCallback<GlobalCommitRequest, GlobalCommitResponse>() {@Overridepublic void execute(GlobalCommitRequest request, GlobalCommitResponse response)throws TransactionException {try {//真正开始执行全局事务提交doGlobalCommit(request, response, rpcContext);} catch (StoreException e) {throw new TransactionException(TransactionExceptionCode.FailedStore,String.format("global commit request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()),e);}}@Overridepublic void onTransactionException(GlobalCommitRequest request, GlobalCommitResponse response,TransactionException tex) {super.onTransactionException(request, response, tex);checkTransactionStatus(request, response);}@Overridepublic void onException(GlobalCommitRequest request, GlobalCommitResponse response, Exception rex) {super.onException(request, response, rex);checkTransactionStatus(request, response);}}, request, response);return response;}

io.seata.server.coordinator.DefaultCoordinator#doGlobalCommit

    protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)throws TransactionException {MDC.put(RootContext.MDC_KEY_XID, request.getXid());response.setGlobalStatus(core.commit(request.getXid()));}

io.seata.server.coordinator.DefaultCore#commit

  @Overridepublic GlobalStatus commit(String xid) throws TransactionException {// 根据XID 从存储介质中找到对应的GlobalSessionGlobalSession globalSession = SessionHolder.findGlobalSession(xid);// 如果没找到 则全局事务已结束if (globalSession == null) {return GlobalStatus.Finished;}globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());// just lock changeStatusboolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {if (globalSession.getStatus() == GlobalStatus.Begin) {// Highlight: Firstly, close the session, then no more branch can be registered.// AT模式释放全局锁globalSession.closeAndClean();// 如果是AT模式 ,可以异步提交if (globalSession.canBeCommittedAsync()) {// 将当前全局事务状态改为 异步提交中 并且将当前全局事务会话加入到 ASYNC_COMMITTING_SESSION_MANAGER sessionHolder中globalSession.asyncCommit();MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);return false;} else {globalSession.changeGlobalStatus(GlobalStatus.Committing);return true;}}return false;});if (shouldCommit) {boolean success = doGlobalCommit(globalSession, false);//If successful and all remaining branches can be committed asynchronously, do async commit.if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {globalSession.asyncCommit();return GlobalStatus.Committed;} else {return globalSession.getStatus();}} else {return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();}}

DefaultCore 首先根据XID从存储介质中找到GlobalSession,AT模式下释放全局锁,标记当前全局事务状态为GlobalStatus.AsyncCommitting

seata 服务端在启动时会初始化 DefaultCoordinator,DefaultCoordinator会启动周期线程每隔一秒钟执行handleAsyncCommitting处理异步全局事务提交

     asyncCommitting.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
protected void handleAsyncCommitting() {// 用于从存储介质中查找满足条件的sessionSessionCondition sessionCondition = new SessionCondition(GlobalStatus.AsyncCommitting);// 查找所有状态为 GlobalStatus.AsyncCommitting 的全局事务Collection<GlobalSession> asyncCommittingSessions =SessionHolder.getAsyncCommittingSessionManager().findGlobalSessions(sessionCondition);if (CollectionUtils.isEmpty(asyncCommittingSessions)) {return;}SessionHelper.forEach(asyncCommittingSessions, asyncCommittingSession -> {try {asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());// 调用 DefaultCore.doGlobalCommit 处理core.doGlobalCommit(asyncCommittingSession, true);} catch (TransactionException ex) {LOGGER.error("Failed to async committing [{}] {} {}", asyncCommittingSession.getXid(), ex.getCode(), ex.getMessage(), ex);}});}

io.seata.server.coordinator.DefaultCore#doGlobalCommit

public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {boolean success = true;// start committing eventMetricsPublisher.postSessionDoingEvent(globalSession, retrying);//当前事务未AT模式if (globalSession.isSaga()) {success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);} else {// 遍历所有的分支事务Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {// if not retrying, skip the canBeCommittedAsync branchesif (!retrying && branchSession.canBeCommittedAsync()) {return CONTINUE;}BranchStatus currentStatus = branchSession.getStatus();if (currentStatus == BranchStatus.PhaseOne_Failed) {SessionHelper.removeBranch(globalSession, branchSession, !retrying);return CONTINUE;}try {// 策略模式 不同的分支事务类型交由不同的 Core 处理BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);if (isXaerNotaTimeout(globalSession,branchStatus)) {LOGGER.info("Commit branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());branchStatus = BranchStatus.PhaseTwo_Committed;}switch (branchStatus) {//分支事务提交成功  将当前分支事务从全局事务中移除case PhaseTwo_Committed:SessionHelper.removeBranch(globalSession, branchSession, !retrying);return CONTINUE;case PhaseTwo_CommitFailed_Unretryable://not at branchSessionHelper.endCommitFailed(globalSession, retrying);LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());return false;default:if (!retrying) {globalSession.queueToRetryCommit();return false;}if (globalSession.canBeCommittedAsync()) {LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",branchSession.getBranchId(), branchStatus);return CONTINUE;} else {LOGGER.error("Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());return false;}}} catch (Exception ex) {StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",new String[] {branchSession.toString()});if (!retrying) {globalSession.queueToRetryCommit();throw new TransactionException(ex);}}return CONTINUE;});// Return if the result is not nullif (result != null) {return result;}//If has branch and not all remaining branches can be committed asynchronously,//do print log and return falseif (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());return false;}if (!retrying) {//contains not AT branchglobalSession.setStatus(GlobalStatus.Committed);}}// if it succeeds and there is no branch, retrying=true is the asynchronous state when retrying. EndCommitted is// executed to improve concurrency performance, and the global transaction ends..if (success && globalSession.getBranchSessions().isEmpty()) {SessionHelper.endCommitted(globalSession, retrying);LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());}return success;}

全局事务在提交时,会遍历所有与之关联的分支事务ID,向RM发送BranchCommitRequest 消息,RM分支事务提交成功后将分支事务从全局事务中删除。当所有分支事务全部执行成功后,将GlobalSession信息从数据库中删除

    public BranchStatus branchCommit(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {try {BranchCommitRequest request = new BranchCommitRequest();request.setXid(branchSession.getXid());request.setBranchId(branchSession.getBranchId());request.setResourceId(branchSession.getResourceId());request.setApplicationData(branchSession.getApplicationData());request.setBranchType(branchSession.getBranchType());return branchCommitSend(request, globalSession, branchSession);} catch (IOException | TimeoutException e) {throw new BranchTransactionException(FailedToSendBranchCommitRequest,String.format("Send branch commit failed, xid = %s branchId = %s", branchSession.getXid(),branchSession.getBranchId()), e);}}

RM提交分支事务

TC发送BranchCommitRequest消息后,RM接收到并将消息交由RmBranchCommitProcessor处理

io.seata.core.rpc.netty.RmNettyRemotingClient#registerProcessor

       // 分支事务提交RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);

io.seata.core.rpc.processor.client.RmBranchCommitProcessor#process

   public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {//获取TC的地址String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());Object msg = rpcMessage.getBody();if (LOGGER.isInfoEnabled()) {LOGGER.info("rm client handle branch commit process:" + msg);}//处理分支事务提交handleBranchCommit(rpcMessage, remoteAddress, (BranchCommitRequest) msg);}

io.seata.core.rpc.processor.client.RmBranchCommitProcessor#handleBranchCommit

    private void handleBranchCommit(RpcMessage request, String serverAddress, BranchCommitRequest branchCommitRequest) {BranchCommitResponse resultMessage;resultMessage = (BranchCommitResponse) handler.onRequest(branchCommitRequest, null);if (LOGGER.isDebugEnabled()) {LOGGER.debug("branch commit result:" + resultMessage);}try {this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);} catch (Throwable throwable) {LOGGER.error("branch commit error: {}", throwable.getMessage(), throwable);}}

io.seata.rm.AbstractRMHandler#onRequest

    public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {if (!(request instanceof AbstractTransactionRequestToRM)) {throw new IllegalArgumentException();}AbstractTransactionRequestToRM transactionRequest = (AbstractTransactionRequestToRM)request;transactionRequest.setRMInboundMessageHandler(this);return transactionRequest.handle(context);}

io.seata.core.protocol.transaction.BranchCommitRequest#handle

    public AbstractTransactionResponse handle(RpcContext rpcContext) {return handler.handle(this);}

io.seata.rm.AbstractRMHandler#handle(io.seata.core.protocol.transaction.BranchCommitRequest)

    public BranchCommitResponse handle(BranchCommitRequest request) {BranchCommitResponse response = new BranchCommitResponse();//真正开始处理分支事务提交exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {@Overridepublic void execute(BranchCommitRequest request, BranchCommitResponse response)throws TransactionException {doBranchCommit(request, response);}}, request, response);return response;}

io.seata.rm.AbstractRMHandler#doBranchCommit

protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)throws TransactionException {String xid = request.getXid();long branchId = request.getBranchId();String resourceId = request.getResourceId();String applicationData = request.getApplicationData();if (LOGGER.isInfoEnabled()) {LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);}// AT模式下 将从存储介质中删除Undo.log日志BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,applicationData);response.setXid(xid);response.setBranchId(branchId);response.setBranchStatus(status);if (LOGGER.isInfoEnabled()) {LOGGER.info("Branch commit result: " + status);}}

全局事务提交时,TC会释放全局锁,如果为AT模式将标记全局事务状态为异步提交中,然后遍历所有与当前全局事务绑定的分支事务,向RM发送BranchCommitRequest消息,RM随后删除undo.log日志,并向TC返回状态。待所有的分支事务全部删除undo.log日志成功后,TC将标记全局事务状态为Committed 并 从存储介质中删除全局事务信息

全局事务回滚

TM提交全局事务回滚请求

TM在RPC远程调用RM后,如果出现异常,获取自身业务方法抛出异常,将执行completeTransactionAfterThrowing方法

io.seata.tm.api.TransactionalTemplate#completeTransactionAfterThrowing

    private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {//roll backif (txInfo != null && txInfo.rollbackOn(originalException)) {try {//全局事务回滚rollbackTransaction(tx, originalException);} catch (TransactionException txe) {// Failed to rollbackthrow new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.RollbackFailure, originalException);}} else {// not roll back on this exception, so commitcommitTransaction(tx);}}

接着向TC发送GlobalRollbackRequest消息

io.seata.tm.DefaultTransactionManager#rollback

    public GlobalStatus rollback(String xid) throws TransactionException {GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();globalRollback.setXid(xid);GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);return response.getGlobalStatus();}

TC处理全局事务回滚请求

io.seata.server.AbstractTCInboundHandler#handle(io.seata.core.protocol.transaction.GlobalRollbackRequest, io.seata.core.rpc.RpcContext)

 public GlobalRollbackResponse handle(GlobalRollbackRequest request, final RpcContext rpcContext) {GlobalRollbackResponse response = new GlobalRollbackResponse();response.setGlobalStatus(GlobalStatus.Rollbacking);exceptionHandleTemplate(new AbstractCallback<GlobalRollbackRequest, GlobalRollbackResponse>() {@Overridepublic void execute(GlobalRollbackRequest request, GlobalRollbackResponse response)throws TransactionException {try {doGlobalRollback(request, response, rpcContext);} catch (StoreException e) {throw new TransactionException(TransactionExceptionCode.FailedStore, String.format("global rollback request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);}}@Overridepublic void onTransactionException(GlobalRollbackRequest request, GlobalRollbackResponse response,TransactionException tex) {super.onTransactionException(request, response, tex);// may be appears StoreException outer layer method catchcheckTransactionStatus(request, response);}@Overridepublic void onException(GlobalRollbackRequest request, GlobalRollbackResponse response, Exception rex) {super.onException(request, response, rex);// may be appears StoreException outer layer method catchcheckTransactionStatus(request, response);}}, request, response);return response;}

io.seata.server.coordinator.DefaultCore#rollback

 public GlobalStatus rollback(String xid) throws TransactionException {GlobalSession globalSession = SessionHolder.findGlobalSession(xid);if (globalSession == null) {return GlobalStatus.Finished;}globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());// just lock changeStatusboolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {globalSession.close(); // Highlight: Firstly, close the session, then no more branch can be registered.if (globalSession.getStatus() == GlobalStatus.Begin) {// 标记全局事务状态为 RollbackingglobalSession.changeGlobalStatus(GlobalStatus.Rollbacking);return true;}return false;});if (!shouldRollBack) {return globalSession.getStatus();}boolean rollbackSuccess = doGlobalRollback(globalSession, false);return rollbackSuccess ? GlobalStatus.Rollbacked : globalSession.getStatus();}

io.seata.server.coordinator.DefaultCore#doGlobalRollback

 public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {boolean success = true;// start rollback eventMetricsPublisher.postSessionDoingEvent(globalSession, retrying);if (globalSession.isSaga()) {success = getCore(BranchType.SAGA).doGlobalRollback(globalSession, retrying);} else {//遍历分支事务Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {BranchStatus currentBranchStatus = branchSession.getStatus();if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {SessionHelper.removeBranch(globalSession, branchSession, !retrying);return CONTINUE;}try {//分支事务回滚操作BranchStatus branchStatus = branchRollback(globalSession, branchSession);if (isXaerNotaTimeout(globalSession, branchStatus)) {LOGGER.info("Rollback branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());branchStatus = BranchStatus.PhaseTwo_Rollbacked;}switch (branchStatus) {//回滚成功则将分支事务从存储介质中删除case PhaseTwo_Rollbacked:SessionHelper.removeBranch(globalSession, branchSession, !retrying);LOGGER.info("Rollback branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());return CONTINUE;//回滚失败则标记全局锁回滚失败,并释放全局锁case PhaseTwo_RollbackFailed_Unretryable:SessionHelper.endRollbackFailed(globalSession, retrying);LOGGER.info("Rollback branch transaction fail and stop retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());return false;default:LOGGER.info("Rollback branch transaction fail and will retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());if (!retrying) {globalSession.queueToRetryRollback();}return false;}} catch (Exception ex) {StackTraceLogger.error(LOGGER, ex,"Rollback branch transaction exception, xid = {} branchId = {} exception = {}",new String[] {globalSession.getXid(), String.valueOf(branchSession.getBranchId()), ex.getMessage()});if (!retrying) {globalSession.queueToRetryRollback();}throw new TransactionException(ex);}});// Return if the result is not nullif (result != null) {return result;}}// In db mode, lock and branch data residual problems may occur.// Therefore, execution needs to be delayed here and cannot be executed synchronously.if (success) {// 全局事务回滚成功,将全局锁释并标记全局锁状态为 Rollbacked 随后删除全局事务信息SessionHelper.endRollbacked(globalSession, retrying);LOGGER.info("Rollback global transaction successfully, xid = {}.", globalSession.getXid());}return success;}

io.seata.server.coordinator.AbstractCore#branchRollback

    public BranchStatus branchRollback(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {try {BranchRollbackRequest request = new BranchRollbackRequest();request.setXid(branchSession.getXid());request.setBranchId(branchSession.getBranchId());request.setResourceId(branchSession.getResourceId());request.setApplicationData(branchSession.getApplicationData());request.setBranchType(branchSession.getBranchType());return branchRollbackSend(request, globalSession, branchSession);} catch (IOException | TimeoutException e) {throw new BranchTransactionException(FailedToSendBranchRollbackRequest,String.format("Send branch rollback failed, xid = %s branchId = %s",branchSession.getXid(), branchSession.getBranchId()), e);}}

与全局事务提交类似,全局事务回滚时也是遍历所有的分支事务,随后进行分支事务回滚操作,分支事务回滚成功后就将分支事务的信息从存储介质中删除;待所有的分支事务全部回滚后,就将全局锁释放,并删除全局锁信息。

接下来我们看下分支事务回滚操作

RM处理分支事务回滚

TC向RM发送BranchRollbackRequest消息后,RM将消息交由 RmBranchRollbackProcessor 进行处理

        RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);

io.seata.core.rpc.processor.client.RmBranchRollbackProcessor#handleBranchRollback

    private void handleBranchRollback(RpcMessage request, String serverAddress, BranchRollbackRequest branchRollbackRequest) {BranchRollbackResponse resultMessage;resultMessage = (BranchRollbackResponse) handler.onRequest(branchRollbackRequest, null);if (LOGGER.isDebugEnabled()) {LOGGER.debug("branch rollback result:" + resultMessage);}try {this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);} catch (Throwable throwable) {LOGGER.error("send response error: {}", throwable.getMessage(), throwable);}}

io.seata.rm.AbstractRMHandler#handle(io.seata.core.protocol.transaction.BranchRollbackRequest)

    public BranchRollbackResponse handle(BranchRollbackRequest request) {BranchRollbackResponse response = new BranchRollbackResponse();exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {@Overridepublic void execute(BranchRollbackRequest request, BranchRollbackResponse response)throws TransactionException {doBranchRollback(request, response);}}, request, response);return response;}

io.seata.rm.AbstractRMHandler#doBranchRollback

    protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)throws TransactionException {String xid = request.getXid();long branchId = request.getBranchId();String resourceId = request.getResourceId();String applicationData = request.getApplicationData();if (LOGGER.isInfoEnabled()) {LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);}//AT模式 使用的是 DataSourceManagerBranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,applicationData);response.setXid(xid);response.setBranchId(branchId);response.setBranchStatus(status);if (LOGGER.isInfoEnabled()) {LOGGER.info("Branch Rollbacked result: " + status);}}

io.seata.rm.datasource.DataSourceManager#branchRollback

    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,String applicationData) throws TransactionException {DataSourceProxy dataSourceProxy = get(resourceId);if (dataSourceProxy == null) {throw new ShouldNeverHappenException(String.format("resource: %s not found",resourceId));}try {//通过undo.log日志进行回滚UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);} catch (TransactionException te) {StackTraceLogger.info(LOGGER, te,"branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;} else {return BranchStatus.PhaseTwo_RollbackFailed_Retryable;}}return BranchStatus.PhaseTwo_Rollbacked;}

io.seata.rm.datasource.undo.AbstractUndoLogManager#undo

public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {Connection conn = null;ResultSet rs = null;PreparedStatement selectPST = null;boolean originalAutoCommit = true;for (; ; ) {try {conn = dataSourceProxy.getPlainConnection();// The entire undo process should run in a local transaction.if (originalAutoCommit = conn.getAutoCommit()) {conn.setAutoCommit(false);}// Find UNDO LOG//根据全局事务xid 和分支事务id查找回滚日志selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);selectPST.setLong(1, branchId);selectPST.setString(2, xid);rs = selectPST.executeQuery();//存在回滚日志boolean exists = false;while (rs.next()) {exists = true;// It is possible that the server repeatedly sends a rollback request to roll back// the same branch transaction to multiple processes,// ensuring that only the undo_log in the normal state is processed.int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);if (!canUndo(state)) {if (LOGGER.isInfoEnabled()) {LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);}return;}String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);//解析回滚日志Map<String, String> context = parseContext(contextString);byte[] rollbackInfo = getRollbackInfo(rs);String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance(): UndoLogParserFactory.getInstance(serializer);//反序列化成BranchUndoLog对象BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);try {// put serializer name to localsetCurrentSerializer(parser.getName());List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();if (sqlUndoLogs.size() > 1) {Collections.reverse(sqlUndoLogs);}for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());sqlUndoLog.setTableMeta(tableMeta);AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(), sqlUndoLog);//执行回滚日志undoExecutor.executeOn(conn);}} finally {// remove serializer nameremoveCurrentSerializer();}}// If undo_log exists, it means that the branch transaction has completed the first phase,// we can directly roll back and clean the undo_log// Otherwise, it indicates that there is an exception in the branch transaction,// causing undo_log not to be written to the database.// For example, the business processing timeout, the global transaction is the initiator rolls back.// To ensure data consistency, we can insert an undo_log with GlobalFinished state// to prevent the local transaction of the first phase of other programs from being correctly submitted.// See https://github.com/seata/seata/issues/489if (exists) {//回滚日志执行成功后删除deleteUndoLog(xid, branchId, conn);//提交本地事务conn.commit();if (LOGGER.isInfoEnabled()) {LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId,State.GlobalFinished.name());}} else {insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);conn.commit();if (LOGGER.isInfoEnabled()) {LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId,State.GlobalFinished.name());}}return;} catch (SQLIntegrityConstraintViolationException e) {// Possible undo_log has been inserted into the database by other processes, retrying rollback undo_logif (LOGGER.isInfoEnabled()) {LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);}} catch (Throwable e) {if (conn != null) {try {conn.rollback();} catch (SQLException rollbackEx) {LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);}}throw new BranchTransactionException(BranchRollbackFailed_Retriable, String.format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,branchId, e.getMessage()), e);} finally {try {if (rs != null) {rs.close();}if (selectPST != null) {selectPST.close();}if (conn != null) {if (originalAutoCommit) {conn.setAutoCommit(true);}conn.close();}} catch (SQLException closeEx) {LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);}}}}

通过xid和分支事务ID,从业务库中的undo.log表中获取回滚日志,经过解析和反序列化后执行

io.seata.rm.datasource.undo.AbstractUndoExecutor#executeOn

public void executeOn(Connection conn) throws SQLException {// 前后镜像数据 与当前数据三者比较// 如果前镜像数据 = 后镜像数据 无需回滚 数据没有变化// 如果后镜像数据 != 当前数据 不能回滚,因为产生了脏数据if (IS_UNDO_DATA_VALIDATION_ENABLE && !dataValidationAndGoOn(conn)) {return;}PreparedStatement undoPST = null;try {// 构建回滚SQL模版String undoSQL = buildUndoSQL();undoPST = conn.prepareStatement(undoSQL);//获取数据修改前的镜像数据TableRecords undoRows = getUndoRows();//遍历所有修改前的行数据for (Row undoRow : undoRows.getRows()) {ArrayList<Field> undoValues = new ArrayList<>();List<Field> pkValueList = getOrderedPkList(undoRows, undoRow, getDbType(conn));for (Field field : undoRow.getFields()) {if (field.getKeyType() != KeyType.PRIMARY_KEY) {undoValues.add(field);}}//拼装成真正的回滚SQLundoPrepare(undoPST, undoValues, pkValueList);//执行回滚SQLundoPST.executeUpdate();}} catch (Exception ex) {if (ex instanceof SQLException) {throw (SQLException) ex;} else {throw new SQLException(ex);}}finally {//important for oracleIOUtil.close(undoPST);}}

在执行回滚SQL前,会将前后镜像数据与当前数据进行比较

    // 如果前镜像数据 = 后镜像数据 无需回滚 数据没有变化// 如果后镜像数据 != 当前数据 不能回滚,因为产生了脏数据

如果后镜像数据等于当前数据 且 不等于前镜像数据,那么将创建回滚SQL并执行,完成分支事务回滚操作

总结图

seata全局事务提交和回滚

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/195793.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【离散数学】——期末刷题题库(集合)

&#x1f383;个人专栏&#xff1a; &#x1f42c; 算法设计与分析&#xff1a;算法设计与分析_IT闫的博客-CSDN博客 &#x1f433;Java基础&#xff1a;Java基础_IT闫的博客-CSDN博客 &#x1f40b;c语言&#xff1a;c语言_IT闫的博客-CSDN博客 &#x1f41f;MySQL&#xff1a…

【FPGA】Verilog:二进制并行加法器 | 超前进位 | 实现 4 位二进制并行加法器和减法器 | MSI/LSI 运算电路

Ⅰ. 前置知识 0x00 并行加法器和减法器 如果我们要对 4 位加法器和减法器进行关于二进制并行运算功能&#xff0c;可以通过将加法器和减法器以 N 个并行连接的方式&#xff0c;创建一个执行 N 位加法和减法运算的电路。 4 位二进制并行加法器 4 位二进制并行减法器 换…

内存是如何工作的

一、什么是内存 从外观上辨识&#xff0c;它就是内存条&#xff1b;从硬件上讲&#xff0c;它叫RAM&#xff0c;翻译过来叫随机存储器。英文全称&#xff1a;Random Access Memory。它也叫主存&#xff0c;是与CPU直接交换数据的内部存储器。其特点是读写速度快&#xff0c;不…

java开发之个微机器人的实现

简要描述&#xff1a; 二次登录 请求URL&#xff1a; http://域名地址/secondLogin 请求方式&#xff1a; POST 请求头Headers&#xff1a; Content-Type&#xff1a;application/jsonAuthorization&#xff1a;login接口返回 参数&#xff1a; 参数名必选类型说明wcId…

【每日一题】从二叉搜索树到更大和树

文章目录 Tag题目来源题目解读解题思路方法一&#xff1a;中序遍历的反序方法二&#xff1a;后缀数组 写在最后 Tag 【中序遍历】【二叉树】【2023-12-04】 题目来源 1038. 从二叉搜索树到更大和树 题目解读 在二叉搜索树中&#xff0c;将每一个节点的值替换成树中大于等于该…

根文件系统lib库添加与初步测试

一. 简介 我们在编译 busybox源码时&#xff0c;选择的是动态编译&#xff0c;所以&#xff0c;制作生成的 根文件系统中/bin或 /sbin目录下软件运行时会调用到一些库文件的。库文件就是交叉编译器的库文件。 前面我们编译 busybox源码时&#xff0c;选择动态编译&#xff0…

NPS内网穿透教程

1.简介 nps是一款轻量级、高性能、功能强大的内网穿透代理服务器。目前支持tcp、udp流量转发&#xff0c;可支持任何tcp、udp上层协议&#xff08;访问内网网站、本地支付接口调试、ssh访问、远程桌面&#xff0c;内网dns解析等等……&#xff09;&#xff0c;此外还支持内网ht…

安卓1.0明显是基于linux内核开发的,安卓1.0是不是linux套壳?

安卓1.0明显是基于linux内核开发的&#xff0c;安卓1.0是不是linux套壳&#xff1f; 在开始前我有一些资料&#xff0c;是我根据自己从业十年经验&#xff0c;熬夜搞了几个通宵&#xff0c;精心整理了一份「安卓开发资料从专业入门到高级教程工具包」&#xff0c;点个关注&…

大数据集群增加数据盘,平衡数据盘HDFS Disk Balancer

大数据集群增加数据盘&#xff0c;平衡数据盘HDFS Disk Balancer 官网&#xff1a;https://hadoop.apache.org/docs/r3.3.6/hadoop-project-dist/hadoop-hdfs/HDFSDiskbalancer.html hdfs diskbalancer -execute /system/diskbalancer/nodename.plan.jsonhdfs diskbalancer -q…

IDEA2023找不到 Allow parallel run

我的idea版本&#xff1a;2023.1.4 第一步&#xff1a;点击Edit Configrations 第二步&#xff1a;点击Modify options 第三步&#xff1a;勾选Allow multiple instances 最后点击Apply应用一下 ok,问题解决&#xff01;

SSM项目实战-登录验证成功并路由到首页面,Vue3+Vite+Axios+Element-Plus技术

1、util/request.js import axios from "axios";let request axios.create({baseURL: "http://localhost:8080",timeout: 50000 });export default request 2、api/sysUser.js import request from "../util/request.js";export const login (…

Mysql日志

文章目录 1. 日志类型2. bin log2.1 写入机制2.2 binlog与redolog对比2.3 两阶段提交 3. 中继日志 1. 日志类型 这 6 类日志分别为&#xff1a; 慢查询日志&#xff1a; 记录所有执行时间超过long_query_time的所有查询&#xff0c;方便我们对查询进行优化。 通用查询日志&am…

在sCrypt网站上铭刻Ordinals

sCrypt发布了一个新的Ordinals铭刻工具&#xff0c;连接Panda Wallet后即可使用。你可以观看我们录制的视频教程&#xff0c;获得更多细节。 铭刻工具同时支持BSV主网&#xff08;mainnet&#xff09;和测试网&#xff08;testnet&#xff09;&#xff0c;你可以在我们的官方网…

手写VUE后台管理系统8 - 配置404NotFound路由

设置404页面 配置路由404页面 配置路由 这里配置了两个路由&#xff0c;一个是主页&#xff0c;另外一个则匹配任意路由显示为404页面。因为只配置了两个路由&#xff0c;如果路径没有匹配到主页&#xff0c;则会被自动导向到404页面&#xff0c;这样就可以实现整站统一的404页…

「Linux」使用C语言制作简易Shell

&#x1f4bb;文章目录 &#x1f4c4;前言简易shell实现shell的概念系统环境变量shell的结构定义内建命令完整代码 &#x1f4d3;总结 &#x1f4c4;前言 对于很多学习后端的同学来讲&#xff0c;学习了C语言&#xff0c;发现除了能写出那个经典的“hello world”以外&#xff…

142873-41-4脂质过氧化抑制剂1-星戈瑞

142873-41-4脂质过氧化抑制剂1 英文名称&#xff1a;Lipid peroxidation inhibitor 1 中文名称&#xff1a;脂质过氧化抑制剂 化学名称&#xff1a;2,4,6,7-四甲基-2-[(4-苯基哌啶-1-基)甲基]-3H-1-苯并呋喃-5-胺 CAS&#xff1a;142873-41-4 外观&#xff1a;固体粉末 分…

D2822ML 用于便携式录音机和收音机作音频功率放大器。采用 DIP8 SOP8 封装形式

D2822ML 用于便携式录音机和收音机作音频功率放大器。采用 DIP8 SOP8 封装形式 特点: 电源电压降到 1.8V 时仍能正常工作交越失真小 静态电流小可作桥式或立体声式功放应用外围元件少通道分离度高 开机和关机无冲击噪声软限幅

RT-Thread 内存管理

在计算机系统中&#xff0c;通常存储空间可以分为两种&#xff1a;内部存储空间和外部存储空间。 内部存储空间通常访问速度比较快&#xff0c;能够按照变量地址随机访问&#xff0c;也就是我们通常所说的RAM&#xff08;随机存储器&#xff09;&#xff0c;可以把它理解为电脑…

微信公众号端在线客服系统源码 聊天记录云端实时保存 附带完整的搭建教程

随着社交媒体的普及&#xff0c;越来越多的用户通过微信公众号与企业进行沟通。因此&#xff0c;开发一款基于微信公众号的在线客服系统&#xff0c;可以帮助企业更好地服务用户&#xff0c;提高客户满意度。同时&#xff0c;为了解决聊天记录的存储和管理问题&#xff0c;我们…

如何看待华为宣称“纯鸿蒙”OS将不再兼容安卓应用 APK彻底再见?

如何看待华为宣称“纯鸿蒙”OS将不再兼容安卓应用 APK彻底再见&#xff1f; 在开始前我有一些资料&#xff0c;是我根据自己从业十年经验&#xff0c;熬夜搞了几个通宵&#xff0c;精心整理了一份「安卓开发资料从专业入门到高级教程工具包」&#xff0c;点个关注&#xff0c;…