【SpringCloud-Seata源码分析3】

文章目录

  • 事务的提交
    • 客户端提交流程
    • 服务端提交流程
    • 客户端删除undo_log
  • 事务回滚
    • 客户端事务回滚
    • 服务端回滚事务

事务的提交

前面两篇我们分析了seata的TC初始化和TM,RM初始化,并且事务准备阶段源码及业务Sql执行,下面我们分析事务的提交源码。

客户端提交流程

在这里插入图片描述
这个主要是从 TransactionalTemplate#execute方法中

try {//准备阶段this.beginTransaction(txInfo, tx);Object rs;Object ex;try {//业务sql的执行rs = business.execute();} catch (Throwable var17) {ex = var17;this.completeTransactionAfterThrowing(txInfo, tx, var17);throw var17;}//事务的提交this.commitTransaction(tx);ex = rs;return ex;} finally {this.resumeGlobalLockConfig(previousConfig);this.triggerAfterCompletion();this.cleanUp();}
    private void commitTransaction(GlobalTransaction tx) throws ExecutionException {try {//前置镜像this.triggerBeforeCommit();//提交tx.commit();//后置镜像this.triggerAfterCommit();} catch (TransactionException var3) {throw new ExecutionException(tx, var3, Code.CommitFailure);}}
public void commit() throws TransactionException {//判断当前的角色是否是Launcherif (this.role == GlobalTransactionRole.Participant) {if (LOGGER.isDebugEnabled()) {LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", this.xid);}} else {//判断xid是否为nullthis.assertXIDNotNull();//重试次数,默认为5次可以自定定义重试次数int retry = COMMIT_RETRY_COUNT <= 0 ? 5 : COMMIT_RETRY_COUNT;try {while(retry > 0) {try {--retry;//循环请求后台,并且返回状态this.status = this.transactionManager.commit(this.xid);break;} catch (Throwable var6) {LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", new Object[]{this.getXid(), retry, var6.getMessage()});if (retry == 0) {throw new TransactionException("Failed to report global commit", var6);}}}} finally {if (this.xid.equals(RootContext.getXID())) {this.suspend();}}if (LOGGER.isInfoEnabled()) {LOGGER.info("[{}] commit status: {}", this.xid, this.status);}}}
    public GlobalStatus commit(String xid) throws TransactionException {//组装请求的参数GlobalCommitRequest globalCommit = new GlobalCommitRequest();globalCommit.setXid(xid);//netty请求seata后端GlobalCommitResponse response = (GlobalCommitResponse)this.syncCall(globalCommit);return response.getGlobalStatus();}
   private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {try {return (AbstractTransactionResponse)TmNettyRemotingClient.getInstance().sendSyncRequest(request);} catch (TimeoutException var3) {throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", var3);}}
}

服务端提交流程

在这里插入图片描述seata后台服务时对事务进行的异步提交,首先来分析一下我们后端服务的入口DefaultCoordinator#doglobalCommit方法

   protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)throws TransactionException {//存储线程池的xid方便看日志MDC.put(RootContext.MDC_KEY_XID, request.getXid());//核心方法response.setGlobalStatus(core.commit(request.getXid()));}
 public GlobalStatus commit(String xid) throws TransactionException {//key1:查询全局事务 如果事务被清理,则直接返回完成GlobalSession globalSession = SessionHolder.findGlobalSession(xid);if (globalSession == null) {return GlobalStatus.Finished;}//添加监听器globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());// just lock changeStatusboolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {if (globalSession.getStatus() == GlobalStatus.Begin) {// Highlight: Firstly, close the session, then no more branch can be registered.//释放全局锁globalSession.closeAndClean();// key2: 异步提交//判断是否可以进行异步提交,是AT模式if (globalSession.canBeCommittedAsync()) {//异步提交globalSession.asyncCommit();MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);return false;} else {//更改全局事务状态为commitingglobalSession.changeGlobalStatus(GlobalStatus.Committing);return true;}}return false;});//如果可以提交,则执行shouldCommitif (shouldCommit) {boolean success = doGlobalCommit(globalSession, false);//If successful and all remaining branches can be committed asynchronously, do async commit.if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {globalSession.asyncCommit();return GlobalStatus.Committed;} else {return globalSession.getStatus();}} else {return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();}}

AT模式将事务状态改为异步提交AsyncCommitting,然后定时运行线程池1s轮训运行一次

    public void asyncCommit() throws TransactionException {this.addSessionLifecycleListener(SessionHolder.getAsyncCommittingSessionManager());// 设置一个状态异步提交·this.setStatus(GlobalStatus.AsyncCommitting);//  异步增加全局session信息SessionHolder.getAsyncCommittingSessionManager().addGlobalSession(this);}

异步定时任务DefaultCoordinator#init方法中定时开启

    public void init() {retryRollbacking.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(RETRY_ROLLBACKING, this::handleRetryRollbacking), 0,ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);retryCommitting.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(RETRY_COMMITTING, this::handleRetryCommitting), 0,COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);//我们事务异步提交都在这里执行asyncCommitting.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);timeoutCheck.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(TX_TIMEOUT_CHECK, this::timeoutCheck), 0,TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);undoLogDelete.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete),UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);}

获取所有的一步提交状态的事务,并加上排他锁

    public static boolean distributedLockAndExecute(String key, NoArgsFunc func) {boolean lock = false;try {//核心方法,请求加锁if (lock = acquireDistributedLock(key)) {func.call();}} catch (Exception e) {LOGGER.info("Exception running function with key = {}", key, e);} finally {if (lock) {try {SessionHolder.releaseDistributedLock(key);} catch (Exception ex) {LOGGER.warn("release distribute lock failure, message = {}", ex.getMessage(), ex);}}}return lock;}
public boolean acquireLock(DistributedLockDO distributedLockDO) {if (demotion) {return true;}Connection connection = null;boolean originalAutoCommit = false;try {connection = distributedLockDataSource.getConnection();originalAutoCommit = connection.getAutoCommit();connection.setAutoCommit(false);//这个是核心,查询出所有的异步提交的事务,并且查询的sql上加上排他锁DistributedLockDO distributedLockDOFromDB = getDistributedLockDO(connection, distributedLockDO.getLockKey());//如果查询出来的数据为nullif (null == distributedLockDOFromDB) {//执行插入的sql "INSERT INTO " + DISTRIBUTED_LOCK_TABLE_PLACE_HOLD + "(" + ALL_COLUMNS + ") VALUES (?, ?, ?)";boolean ret = insertDistribute(connection, distributedLockDO);//提交事务connection.commit();return ret;}//判断过期时间如果大于等于当前的系统时间if (distributedLockDOFromDB.getExpireTime() >= System.currentTimeMillis()) {LOGGER.debug("the distribute lock for key :{} is holding by :{}, acquire lock failure.",distributedLockDO.getLockKey(), distributedLockDOFromDB.getLockValue());//事务提交connection.commit();return false;}//执行更新sql"UPDATE " + "distributed_lock_table" + " SET "+ ServerTableColumnsName.DISTRIBUTED_LOCK_VALUE + "=?, " + ServerTableColumnsName.DISTRIBUTED_LOCK_EXPIRE + "=?"+ " WHERE " + ServerTableColumnsName.DISTRIBUTED_LOCK_KEY + "=?";boolean ret = updateDistributedLock(connection, distributedLockDO);//事务提交connection.commit();return ret;} catch (SQLException ex) {LOGGER.error("execute acquire lock failure, key is: {}", distributedLockDO.getLockKey(), ex);try {if (connection != null) {//回滚connection.rollback();}} catch (SQLException e) {LOGGER.warn("rollback fail because of {}", e.getMessage(), e);}return false;} finally {try {if (originalAutoCommit) {connection.setAutoCommit(true);}IOUtil.close(connection);} catch (SQLException ignore) { }}}

加上分布式事务锁之后,我们需要执行handleAsyncCommitting

protected void handleAsyncCommitting() {SessionCondition sessionCondition = new SessionCondition(GlobalStatus.AsyncCommitting);// key1:获取所有GlobalSession,并且是事务为AsyncCommiting状态Collection<GlobalSession> asyncCommittingSessions =SessionHolder.getAsyncCommittingSessionManager().findGlobalSessions(sessionCondition);if (CollectionUtils.isEmpty(asyncCommittingSessions)) {return;}// key2:遍历SessionHelper.forEach(asyncCommittingSessions, asyncCommittingSession -> {try {asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());// 进行处理core.doGlobalCommit(asyncCommittingSession, true);} catch (TransactionException ex) {LOGGER.error("Failed to async committing [{}] {} {}", asyncCommittingSession.getXid(), ex.getCode(), ex.getMessage(), ex);}});}

事务的核心执行流程

@Overridepublic boolean  doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {boolean success = true;// start committing eventMetricsPublisher.postSessionDoingEvent(globalSession, retrying);if (globalSession.isSaga()) {success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);} else {// key1: 获取分支事务Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {// if not retrying, skip the canBeCommittedAsync branchesif (!retrying && branchSession.canBeCommittedAsync()) {return CONTINUE;}BranchStatus currentStatus = branchSession.getStatus();// key2: 当前事务状态为PhaseOne_Failed,则删除分支事务,释放全局锁if (currentStatus == BranchStatus.PhaseOne_Failed) {SessionHelper.removeBranch(globalSession, branchSession, !retrying);return CONTINUE;}try {//key3:发送rpc删除undolog日志BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);if (isXaerNotaTimeout(globalSession,branchStatus)) {LOGGER.info("Commit branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());branchStatus = BranchStatus.PhaseTwo_Committed;}switch (branchStatus) {case PhaseTwo_Committed://key4: 删除分支事务信息,释放全局锁SessionHelper.removeBranch(globalSession, branchSession, !retrying);return CONTINUE;case PhaseTwo_CommitFailed_Unretryable://远程删除失败则报错,如果不是异步提交,修改状态CommitFailedSessionHelper.endCommitFailed(globalSession, retrying);LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());return false;default:if (!retrying) {globalSession.queueToRetryCommit();return false;}if (globalSession.canBeCommittedAsync()) {LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",branchSession.getBranchId(), branchStatus);return CONTINUE;} else {LOGGER.error("Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());return false;}}} catch (Exception ex) {StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",new String[] {branchSession.toString()});if (!retrying) {globalSession.queueToRetryCommit();throw new TransactionException(ex);}}return CONTINUE;});// Return if the result is not nullif (result != null) {return result;}//If has branch and not all remaining branches can be committed asynchronously,//do print log and return falseif (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());return false;}if (!retrying) {//contains not AT branchglobalSession.setStatus(GlobalStatus.Committed);}}// if it succeeds and there is no branch, retrying=true is the asynchronous state when retrying. EndCommitted is// executed to improve concurrency performance, and the global transaction ends..if (success && globalSession.getBranchSessions().isEmpty()) {// key4: 删除全局事务信息SessionHelper.endCommitted(globalSession, retrying);LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());}return success;}

客户端删除undo_log

在这里插入图片描述
我们进入客户端DataSourceManager#branchcommit方法

//这个属于核心,在类的构造方法中,会初始化一个定时任务private final AsyncWorker asyncWorker = new AsyncWorker(this);
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {return this.asyncWorker.branchCommit(xid, branchId, resourceId);}
    public BranchStatus branchCommit(String xid, long branchId, String resourceId) {AsyncWorker.Phase2Context context = new AsyncWorker.Phase2Context(xid, branchId, resourceId);//将事务加入队列this.addToCommitQueue(context);return BranchStatus.PhaseTwo_Committed;}

AsyncWorker的构造方法

    public AsyncWorker(DataSourceManager dataSourceManager) {this.dataSourceManager = dataSourceManager;LOGGER.info("Async Commit Buffer Limit: {}", ASYNC_COMMIT_BUFFER_LIMIT);//初始化一个队列this.commitQueue = new LinkedBlockingQueue(ASYNC_COMMIT_BUFFER_LIMIT);ThreadFactory threadFactory = new NamedThreadFactory("AsyncWorker", 2, true);this.scheduledExecutor = new ScheduledThreadPoolExecutor(2, threadFactory);//每一秒就轮训执行方法doBranchCommitSafelythis.scheduledExecutor.scheduleAtFixedRate(this::doBranchCommitSafely, 10L, 1000L, TimeUnit.MILLISECONDS);}
    private void doBranchCommit() {//判断队列是否为空if (!this.commitQueue.isEmpty()) {List<AsyncWorker.Phase2Context> allContexts = new LinkedList();//将队列中的消息添加到新的list中this.commitQueue.drainTo(allContexts);//校验和处理数据,并将其封装为mapMap<String, List<AsyncWorker.Phase2Context>> groupedContexts = this.groupedByResourceId(allContexts);groupedContexts.forEach(this::dealWithGroupedContexts);}}
 private void dealWithGroupedContexts(String resourceId, List<AsyncWorker.Phase2Context> contexts) {//判断resourceId是否为空if (StringUtils.isBlank(resourceId)) {LOGGER.warn("resourceId is empty and will skip.");} else {DataSourceProxy dataSourceProxy = this.dataSourceManager.get(resourceId);if (dataSourceProxy == null) {LOGGER.warn("failed to find resource for {} and requeue", resourceId);//添加到提交队列this.addAllToCommitQueue(contexts);} else {Connection conn = null;try {conn = dataSourceProxy.getPlainConnection();//获取undolog事务管理器UndoLogManager undoLogManager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType());List<List<AsyncWorker.Phase2Context>> splitByLimit = Lists.partition(contexts, 1000);Iterator var7 = splitByLimit.iterator();//循环遍历while(var7.hasNext()) {List<AsyncWorker.Phase2Context> partition = (List)var7.next();//删除undologr日志this.deleteUndoLog(conn, undoLogManager, partition);}} catch (SQLException var12) {this.addAllToCommitQueue(contexts);LOGGER.error("failed to get connection for async committing on {} and requeue", resourceId, var12);} finally {IOUtil.close(conn);}}}}
    private void deleteUndoLog(final Connection conn, UndoLogManager undoLogManager, List<AsyncWorker.Phase2Context> contexts) {Set<String> xids = new LinkedHashSet(contexts.size());Set<Long> branchIds = new LinkedHashSet(contexts.size());contexts.forEach((context) -> {xids.add(context.xid);branchIds.add(context.branchId);});try {//批量删除undoLogManager.batchDeleteUndoLog(xids, branchIds, conn);if (!conn.getAutoCommit()) {conn.commit();}} catch (SQLException var9) {LOGGER.error("Failed to batch delete undo log", var9);try {conn.rollback();this.addAllToCommitQueue(contexts);} catch (SQLException var8) {LOGGER.error("Failed to rollback JDBC resource after deleting undo log failed", var8);}}}

事务回滚

客户端事务回滚

在这里插入图片描述
客户端事务主要从AbstractTransactionRequest的实现类中去找到branchTrancetionRollback类

public abstract class AbstractTransactionRequest extends AbstractMessage {public AbstractTransactionRequest() {}public abstract AbstractTransactionResponse handle(RpcContext rpcContext);
}

这个类是rollBack的核心类

public class BranchRollbackRequest extends AbstractBranchEndRequest {public BranchRollbackRequest() {}public short getTypeCode() {return 5;}public AbstractTransactionResponse handle(RpcContext rpcContext) {return this.handler.handle(this);}
}
    protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException {//封装请求参数String xid = request.getXid();long branchId = request.getBranchId();String resourceId = request.getResourceId();String applicationData = request.getApplicationData();if (LOGGER.isInfoEnabled()) {LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);}BranchStatus status = this.getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId, applicationData);//封装响应数据response.setXid(xid);response.setBranchId(branchId);response.setBranchStatus(status);if (LOGGER.isInfoEnabled()) {LOGGER.info("Branch Rollbacked result: " + status);}}
    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {DataSourceProxy dataSourceProxy = this.get(resourceId);if (dataSourceProxy == null) {throw new ShouldNeverHappenException(String.format("resource: %s not found", resourceId));} else {try {//核心的执行方法UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);} catch (TransactionException var9) {StackTraceLogger.info(LOGGER, var9, "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]", new Object[]{branchType, xid, branchId, resourceId, applicationData, var9.getMessage()});if (var9.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;}return BranchStatus.PhaseTwo_RollbackFailed_Retryable;}//返回二次提交回滚状态return BranchStatus.PhaseTwo_Rollbacked;}}
 public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {Connection conn = null;ResultSet rs = null;PreparedStatement selectPST = null;boolean originalAutoCommit = true;while(true) {try {conn = dataSourceProxy.getPlainConnection();if (originalAutoCommit = conn.getAutoCommit()) {conn.setAutoCommit(false);}//查询出来事务的undolog      SELECT_UNDO_LOG_SQL = "SELECT * FROM " + UNDO_LOG_TABLE_NAME + " WHERE " + "branch_id" + " = ? AND " + "xid" + " = ? FOR UPDATE";selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);selectPST.setLong(1, branchId);selectPST.setString(2, xid);rs = selectPST.executeQuery();boolean exists = false;while(rs.next()) {exists = true;int state = rs.getInt("log_status");if (!canUndo(state)) {if (LOGGER.isInfoEnabled()) {LOGGER.info("xid {} branch {}, ignore {} undo_log", new Object[]{xid, branchId, state});}return;}String contextString = rs.getString("context");Map<String, String> context = this.parseContext(contextString);//封装undolog,然后执行sql会进行回滚byte[] rollbackInfo = this.getRollbackInfo(rs);String serializer = context == null ? null : (String)context.get("serializer");UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance() : UndoLogParserFactory.getInstance(serializer);BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);try {setCurrentSerializer(parser.getName());List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();if (sqlUndoLogs.size() > 1) {Collections.reverse(sqlUndoLogs);}Iterator var18 = sqlUndoLogs.iterator();while(var18.hasNext()) {SQLUndoLog sqlUndoLog = (SQLUndoLog)var18.next();TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());sqlUndoLog.setTableMeta(tableMeta);AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(), sqlUndoLog);//执行对应的回滚操作undoExecutor.executeOn(conn);}} finally {removeCurrentSerializer();}}if (exists) {//删除undologthis.deleteUndoLog(xid, branchId, conn);conn.commit();if (LOGGER.isInfoEnabled()) {LOGGER.info("xid {} branch {}, undo_log deleted with {}", new Object[]{xid, branchId, AbstractUndoLogManager.State.GlobalFinished.name()});break;}} else {this.insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);conn.commit();if (LOGGER.isInfoEnabled()) {LOGGER.info("xid {} branch {}, undo_log added with {}", new Object[]{xid, branchId, AbstractUndoLogManager.State.GlobalFinished.name()});break;}}return;} catch (SQLIntegrityConstraintViolationException var43) {if (LOGGER.isInfoEnabled()) {LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);}} catch (Throwable var44) {if (conn != null) {try {conn.rollback();} catch (SQLException var41) {LOGGER.warn("Failed to close JDBC resource while undo ... ", var41);}}throw new BranchTransactionException(TransactionExceptionCode.BranchRollbackFailed_Retriable, String.format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid, branchId, var44.getMessage()), var44);} finally {try {if (rs != null) {rs.close();}if (selectPST != null) {selectPST.close();}if (conn != null) {if (originalAutoCommit) {conn.setAutoCommit(true);}conn.close();}} catch (SQLException var40) {LOGGER.warn("Failed to close JDBC resource while undo ... ", var40);}}}}

服务端回滚事务

在这里插入图片描述
服务端的事务回滚是从GlobalRollbackRequest方法进入的

public class GlobalRollbackRequest extends AbstractGlobalEndRequest {@Overridepublic short getTypeCode() {return MessageType.TYPE_GLOBAL_ROLLBACK;}@Overridepublic AbstractTransactionResponse handle(RpcContext rpcContext) {//这里是核心return handler.handle(this, rpcContext);}
}
 @Overridepublic GlobalRollbackResponse handle(GlobalRollbackRequest request, final RpcContext rpcContext) {//组装响应的数据GlobalRollbackResponse response = new GlobalRollbackResponse();response.setGlobalStatus(GlobalStatus.Rollbacking);exceptionHandleTemplate(new AbstractCallback<GlobalRollbackRequest, GlobalRollbackResponse>() {@Overridepublic void execute(GlobalRollbackRequest request, GlobalRollbackResponse response)throws TransactionException {try {//核心的执行类doGlobalRollback(request, response, rpcContext);} catch (StoreException e) {throw new TransactionException(TransactionExceptionCode.FailedStore, String.format("global rollback request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);}}@Overridepublic void onTransactionException(GlobalRollbackRequest request, GlobalRollbackResponse response,TransactionException tex) {super.onTransactionException(request, response, tex);// may be appears StoreException outer layer method catchcheckTransactionStatus(request, response);}@Overridepublic void onException(GlobalRollbackRequest request, GlobalRollbackResponse response, Exception rex) {super.onException(request, response, rex);// may be appears StoreException outer layer method catchcheckTransactionStatus(request, response);}}, request, response);return response;}
  public GlobalStatus rollback(String xid) throws TransactionException {//key1:事务回滚,先通过全局事xid找到全局事务和分支事务//1.先通过全局事务XID找到去global_table查询到全局事务对象//2.通过查询出来的xid对象去查询branch_talbe是否有分支事务,最后将这两个事务对象封装到GlobalSession中//这里传入的true代表全局事务和分支事务都查询出来,全局事务是一条数据,分支事务可能会多条是一个listGlobalSession globalSession = SessionHolder.findGlobalSession(xid);if (globalSession == null) {return GlobalStatus.Finished;}globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());// just lock changeStatusboolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {//设置本事务的active状态为falseglobalSession.close(); // Highlight: Firstly, close the session, then no more branch can be registered.//如果目前的全局事务状态为Begin开始状态,那么修改全局事务的状态为回滚中,修改数据表global_table的status的值为Rollbackingif (globalSession.getStatus() == GlobalStatus.Begin) {globalSession.changeGlobalStatus(GlobalStatus.Rollbacking);return true;}return false;});if (!shouldRollBack) {return globalSession.getStatus();}//key2:真正的回滚逻辑boolean rollbackSuccess = doGlobalRollback(globalSession, false);//返回回滚的逻辑return rollbackSuccess ? GlobalStatus.Rollbacked : globalSession.getStatus();}
@Overridepublic boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {boolean success = true;// start rollback eventMetricsPublisher.postSessionDoingEvent(globalSession, retrying);if (globalSession.isSaga()) {success = getCore(BranchType.SAGA).doGlobalRollback(globalSession, retrying);} else {//globalSession.getReverseSortedBranches()得到的是所有分支Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {BranchStatus currentBranchStatus = branchSession.getStatus();//如果分支事务是在一阶段失败的,调用removeBranch,释放锁if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {SessionHelper.removeBranch(globalSession, branchSession, !retrying);return CONTINUE;}try {//branchRollback回滚分支事务,这里是server端,所以这里的分支事务的回滚是调用的远程进行回滚的//而远程回滚就是使用的undo log日志表来回滚的BranchStatus branchStatus = branchRollback(globalSession, branchSession);if (isXaerNotaTimeout(globalSession, branchStatus)) {LOGGER.info("Rollback branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());branchStatus = BranchStatus.PhaseTwo_Rollbacked;}switch (branchStatus) {//PhaseTwo_Rollbacked表示远程回滚成功case PhaseTwo_Rollbacked:// 二阶段回滚,删除分支事务信息//删除分支事务信息,就是删除branch_table和释放锁//1.释放锁,删除lock_table中的行锁信息;//2.删除分支事务,branch_tableSessionHelper.removeBranch(globalSession, branchSession, !retrying);LOGGER.info("Rollback branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());return CONTINUE;case PhaseTwo_RollbackFailed_Unretryable://远程回滚失败,修改全局事务状态为RollbackFailedSessionHelper.endRollbackFailed(globalSession, retrying);LOGGER.info("Rollback branch transaction fail and stop retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());return false;default:LOGGER.info("Rollback branch transaction fail and will retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());if (!retrying) {globalSession.queueToRetryRollback();}return false;}} catch (Exception ex) {StackTraceLogger.error(LOGGER, ex,"Rollback branch transaction exception, xid = {} branchId = {} exception = {}",new String[] {globalSession.getXid(), String.valueOf(branchSession.getBranchId()), ex.getMessage()});if (!retrying) {globalSession.queueToRetryRollback();}throw new TransactionException(ex);}});// Return if the result is not nullif (result != null) {return result;}}// In db mode, lock and branch data residual problems may occur.// Therefore, execution needs to be delayed here and cannot be executed synchronously.if (success) {// 删除全局事务数据SessionHelper.endRollbacked(globalSession, retrying);LOGGER.info("Rollback global transaction successfully, xid = {}.", globalSession.getXid());}return success;}

起一个定时任务删除全局事务。

 public void init() {retryRollbacking.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(RETRY_ROLLBACKING, this::handleRetryRollbacking), 0,ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);retryCommitting.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(RETRY_COMMITTING, this::handleRetryCommitting), 0,COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);asyncCommitting.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);timeoutCheck.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(TX_TIMEOUT_CHECK, this::timeoutCheck), 0,TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);undoLogDelete.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete),UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);}
  */protected void handleRetryRollbacking() {SessionCondition sessionCondition = new SessionCondition(rollbackingStatuses);sessionCondition.setLazyLoadBranch(true);Collection<GlobalSession> rollbackingSessions =SessionHolder.getRetryRollbackingSessionManager().findGlobalSessions(sessionCondition);if (CollectionUtils.isEmpty(rollbackingSessions)) {return;}long now = System.currentTimeMillis();SessionHelper.forEach(rollbackingSessions, rollbackingSession -> {try {// prevent repeated rollback// !rollbackingSession.isDeadSession() 判断回滚的全局事务必须超时,对应时间可以进入看if (rollbackingSession.getStatus().equals(GlobalStatus.Rollbacking)&& !rollbackingSession.isDeadSession()) {// The function of this 'return' is 'continue'.return;}if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT.toMillis(), rollbackingSession.getBeginTime())) {if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE) {rollbackingSession.clean();}// Prevent thread safety issuesSessionHolder.getRetryRollbackingSessionManager().removeGlobalSession(rollbackingSession);LOGGER.error("Global transaction rollback retry timeout and has removed [{}]", rollbackingSession.getXid());SessionHelper.endRollbackFailed(rollbackingSession, true);// rollback retry timeout eventMetricsPublisher.postSessionDoneEvent(rollbackingSession, GlobalStatus.RollbackRetryTimeout, true, false);//The function of this 'return' is 'continue'.return;}rollbackingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());core.doGlobalRollback(rollbackingSession, true);} catch (TransactionException ex) {LOGGER.info("Failed to retry rollbacking [{}] {} {}", rollbackingSession.getXid(), ex.getCode(), ex.getMessage());}});}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/860031.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何应对 Android 面试官 -> MVVM 实战一个新闻客户端 (中)

前言 本章我们基于重构的方式进行一个 MVVM 的实战&#xff0c;我们将一个新闻列表的普通实现&#xff0c;一步一步的改造成 MVVM 的架构模式&#xff0c;一共分为上中下三个章节&#xff0c;本章继续上一章&#xff0c;开始中篇的讲解&#xff1b; 控件化 我们本章向控件化进…

2024年华东杯B题数学建模论文:基于车辆运动学转弯模型的自动驾驶规划问题

摘要 随着自动驾驶技术的发展&#xff0c;车辆转弯问题成为关键挑战。本文针对自动驾驶车辆在转弯过程中的数学建模、路径规划及避障策略进行了深入研究&#xff0c;旨在提升自动驾驶车辆的行驶安全性与效率。 针对问题1&#xff0c;对于四轮前轮驱动车辆的转弯问题&#xff0c…

【C++LeetCode】【热题100】两数相加【中等】-不同效率的题解【1】

题目&#xff1a; 暴力方法&#xff1a; /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : val(0), next(nullptr) {}* ListNode(int x) : val(x), next(nullptr) {}* ListNode(int x, ListNo…

常见硬件工程师面试题(二)

大家好&#xff0c;我是山羊君Goat。 对于硬件工程师&#xff0c;学习的东西主要和电路硬件相关&#xff0c;所以在硬件工程师的面试中&#xff0c;对于经验是十分看重的&#xff0c;像PCB设计&#xff0c;电路设计原理&#xff0c;模拟电路&#xff0c;数字电路等等相关的知识…

ps基础入门

1.基础 1.1新建文件 1.2创建指定形状 1.4移动工具 1.41移动画布中的任意元素 1.42移动画布 1.43修改画布大小 1.44修改图像大小 1.5框选工具 1.6矩形工具 1.7图层 1.71图层颜色修改 1.72…

Spring事务介绍、Spring集成MyBatis

目录 1.Spring的事务1.1 什么是事务&#xff1f;1.2 事务的特性&#xff08;ACID&#xff09;1.3 Spring 事务实现方式有哪些&#xff1f;1.4 Spring事务管理接口介绍1.4.1 PlatformTransactionManager:事务管理接口1.4.2 TransactionDefinition:事务属性事务管理器接口1.4.3 T…

《昇思25天学习打卡营第1天|ghqt》

参与这个类活动&#xff0c;我会坚持完成它的。目前MindSpore文档里面的内容还看的不是很懂&#xff0c;希望自己在能不断进步。 第一天学到的内容—— 昇腾应用使能&#xff1a;华为各大产品线基于MindSpore提供的AI平台或服务能力MindSpore&#xff1a;支持端、边、云独立的…

HarmonyOS开发 弹窗组件

1.HarmonyOS开发 弹窗组件 弹窗是移动应用中常见的一种用户界面元素&#xff0c;常用于显示一些重要的信息、提示用户进行操作或收集用户输入。ArkTS提供了多种内置的弹窗供开发者使用&#xff0c;除此之外还支持自定义弹窗&#xff0c;来满足各种不同的需求。 1.1. 示例 1.…

STM32G070休眠例程-STOP模式

一、简介 主控是STM32G070&#xff0c;在低功耗休眠模式时采用Stop0模式&#xff0c;通过外部中断唤醒&#xff0c;唤醒之后&#xff0c;即可开启对应的功能输出&#xff0c;另外程序中设计有看门狗8S溢出&#xff0c;这个采用RTC定时6S周期唤醒去喂狗&#xff0c;RTC唤醒喂狗的…

在线样机生成器,制作精美的电脑手机壁纸图片展示

在线样机生成器&#xff0c;可以制作精美的电脑手机壁纸图片展示。在线样机生成器支持不同的模型如浏览器、手机、笔记本电脑、手表等结合使用&#xff0c;帮助用户快速生成样机展示图片。下面小编就来和大家分享一款免费的在线样机生成器-壁纸样机生成器。 壁纸样机生成器是一…

观测云「可观测性解决方案」荣耀登入华为云官网

继成功上架华为云云商店联营商品后&#xff0c;「观测未来可观测性解决方案」已进一步正式登陆华为云官网&#xff0c;标志着双方合作的深化与拓展。这一全新上架的解决方案是观测云技术实力的集大成之作&#xff0c;为企业提供了一个全面升级的数字化监控观测服务。 观测云&am…

LeetCode 算法:二叉树的直径 c++

原题链接&#x1f517;&#xff1a;二叉树的直径 难度&#xff1a;简单⭐️ 题目 给你一棵二叉树的根节点&#xff0c;返回该树的 直径 。 二叉树的 直径 是指树中任意两个节点之间最长路径的 长度 。这条路径可能经过也可能不经过根节点 root 。 两节点之间路径的 长度 由…

【后端】Nginx+lua+OpenResty高性能实践

文章目录 9. HTTPS安全认证9.1 证书9.2 证书获取方式9.3 自签证书-openssl工具9.4 Nginx配置HTTPS 10. websocket转发配置 【后端&网络&大数据&数据库目录贴】 9. HTTPS安全认证 http协议问题&#xff1a; 明文传输&#xff0c;有被第三方截取到数据信息的风险 &a…

Java代码操作MySQL数据库——JDBC编程

本篇会加入个人的所谓鱼式疯言 ❤️❤️❤️鱼式疯言:❤️❤️❤️此疯言非彼疯言 而是理解过并总结出来通俗易懂的大白话, 小编会尽可能的在每个概念后插入鱼式疯言,帮助大家理解的. &#x1f92d;&#x1f92d;&#x1f92d;可能说的不是那么严谨.但小编初心是能让更多人…

LangChain入门学习笔记(六)—— Model I/O之Output Parsers

当大模型产生输出返回后&#xff0c;它的内容更像是一段平铺的文字没有结构。在传给下游节点处理时可能并不能符合输入要求&#xff0c;LangChain提供了一套机制使得模型返回的内容可以按照开发者定义的那样结构化。 在官网文档中可以看到LangChain提供了丰富的输出解析器&…

二叉树-左叶子之和(easy)

目录 一、问题描述 二、解题思路 三、代码实现 四、刷题链接 一、问题描述 二、解题思路 此题属于树遍历的简单题&#xff0c;用递归深度遍历的方式&#xff0c;当遇到左叶子结点(在递归函数中加上一个判断当前结点是左结点还是右结点的标记位)&#xff0c;此时加上当前结点…

数字图像处理实验报告小论文(Matlab语言)

1.课题分析 在当今信息化社会&#xff0c;图像处理技术已成为众多领域不可或缺的一部分&#xff0c;从医学影像分析到安防监控&#xff0c;再到日常生活中的图片美化&#xff0c;图像处理技术都发挥着至关重要的作用。本次课题主要聚焦于图像灰度处理、图像小波变换和图像分割这…

Python基础系列教程:从零开始学习Python

Python有很多功能强大的机器学习和大数据分析包&#xff0c;适合对大数据和人工智能感兴趣的同学学习。要想了解一门语言&#xff0c;首先需要了解它的语法。本文将介绍Python的一些基础语法&#xff0c;包括数据类型、变量类型、条件控制、循环结构等内容。废话少说&#xff0…

第二十四节:带你梳理Vue2 : Vue具名插槽/作用域插槽/v-slot指令

1. 具名插槽 1.1 没有使用具名插槽的问题 有的时候我们在使用子组件时,在子组件模板上不同的位置插入不同的内容, 只有一个插槽显然没法满足我们的需求,看示例: 需求如下: 子组件是一篇文章的结构父组件在调用子组件是给文章插入标题,正文,时间信息 示例代码如下: <di…

【强化学习的数学原理】课程笔记--1(基本概念,贝尔曼公式)

目录 基本概念State, Action, State transitionPolicy, Reward, Trajectory, Discount ReturnEpisodeMarkov decision process 贝尔曼公式推导确定形式的贝尔曼公式推导一般形式的贝尔曼公式State ValueAction Value 一些例子贝尔曼公式的 Matric-vector form贝尔曼公式的解析解…