文章目录
- 分支事务注册-客户端
- 分支事务服务端的执行
分支事务注册-客户端
上一篇我们已经完成了全局事务启动与开启流程的源码分析,这一篇接着来看分支事务的注册。
分支事务的入口在PreparedStatementProxy#executeUpdate,我们就从这里开始看。
/**
 * Proxy around a JDBC {@link PreparedStatement} that routes every execution
 * through {@code ExecuteTemplate} instead of calling the target statement directly.
 */
public class PreparedStatementProxy extends AbstractPreparedStatementProxy
    implements PreparedStatement, ParametersHolder {

    /**
     * Creates the proxy by handing all three arguments to the superclass constructor.
     *
     * @param connectionProxy the proxied connection this statement belongs to
     * @param targetStatement the real prepared statement being wrapped
     * @param targetSQL       the SQL text of the statement
     * @throws SQLException if the superclass constructor fails
     */
    public PreparedStatementProxy(AbstractConnectionProxy connectionProxy,
                                  PreparedStatement targetStatement,
                                  String targetSQL) throws SQLException {
        super(connectionProxy, targetStatement, targetSQL);
    }

    /**
     * @return the {@code parameters} map held by this proxy
     */
    public Map<Integer, ArrayList<Object>> getParameters() {
        return parameters;
    }

    /**
     * Runs {@code execute()} on the target statement via {@code ExecuteTemplate}.
     */
    public boolean execute() throws SQLException {
        return (Boolean) ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
    }

    /**
     * Runs {@code executeQuery()} on the target statement via {@code ExecuteTemplate}.
     */
    public ResultSet executeQuery() throws SQLException {
        return (ResultSet) ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery());
    }

    /**
     * Runs {@code executeUpdate()} on the target statement via {@code ExecuteTemplate}.
     * This is the core entry point of the branch transaction flow.
     */
    public int executeUpdate() throws SQLException {
        return (Integer) ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate());
    }
}
ExecuteTemplate#execute会先判断当前业务SQL的类型,再根据类型选择不同的执行器。
public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers, StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException {//判断是否是全局锁,并且是否是AT模式if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {//否 执行普通的SQLreturn statementCallback.execute(statementProxy.getTargetStatement(), args);} else {//获取数据库类型String dbType = statementProxy.getConnectionProxy().getDbType();if (CollectionUtils.isEmpty(sqlRecognizers)) {sqlRecognizers = SQLVisitorFactory.get(statementProxy.getTargetSQL(), dbType);}Object executor;if (CollectionUtils.isEmpty(sqlRecognizers)) {executor = new PlainExecutor(statementProxy, statementCallback);} else if (sqlRecognizers.size() == 1) {SQLRecognizer sqlRecognizer = (SQLRecognizer)sqlRecognizers.get(0);label44://根据不同的SQL类型选择不同的执行器switch(sqlRecognizer.getSQLType()) {case INSERT:executor = (Executor)EnhancedServiceLoader.load(InsertExecutor.class, dbType, new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class}, new Object[]{statementProxy, statementCallback, sqlRecognizer});break;case UPDATE:executor = new UpdateExecutor(statementProxy, statementCallback, sqlRecognizer);break;case DELETE:executor = new DeleteExecutor(statementProxy, statementCallback, sqlRecognizer);break;case SELECT_FOR_UPDATE:executor = new SelectForUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);break;case INSERT_ON_DUPLICATE_UPDATE:byte var8 = -1;switch(dbType.hashCode()) {case 104382626:if (dbType.equals("mysql")) {var8 = 0;}break;case 839186932:if (dbType.equals("mariadb")) {var8 = 1;}}switch(var8) {case 0:case 1:executor = new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);break label44;default:throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");}default:executor = new PlainExecutor(statementProxy, statementCallback);}} else {executor = new 
MultiExecutor(statementProxy, statementCallback, sqlRecognizers);}try {//核心入口执行T rs = ((Executor)executor).execute(args);return rs;} catch (Throwable var9) {Throwable ex = var9;if (!(var9 instanceof SQLException)) {ex = new SQLException(var9);}throw (SQLException)ex;}}}
execute()执行
public T execute(Object... args) throws Throwable {//获取事务的xidString xid = RootContext.getXID();if (xid != null) {//绑定xidthis.statementProxy.getConnectionProxy().bind(xid);}//将事务绑定上全局锁this.statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());return this.doExecute(args);}
protected T executeAutoCommitTrue(Object[] args) throws Throwable {ConnectionProxy connectionProxy = this.statementProxy.getConnectionProxy();Object var3;try {//将事务改为手动提交connectionProxy.changeAutoCommit();var3 = (new AbstractDMLBaseExecutor.LockRetryPolicy(connectionProxy)).execute(() -> {//执行非自动提交T result = this.executeAutoCommitFalse(args);connectionProxy.commit();return result;});} catch (Exception var7) {LOGGER.error("execute executeAutoCommitTrue error:{}", var7.getMessage(), var7);if (!AbstractDMLBaseExecutor.LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) { //报错将事务进行回滚connectionProxy.getTargetConnection().rollback();}throw var7;} finally {connectionProxy.getContext().reset();//将自动提交置为trueconnectionProxy.setAutoCommit(true);}return var3;}
protected T executeAutoCommitFalse(Object[] args) throws Exception {if (!"mysql".equalsIgnoreCase(this.getDbType()) && this.isMultiPk()) {throw new NotSupportYetException("multi pk only support mysql!");} else {//设置前置镜像TableRecords beforeImage = this.beforeImage();//执行业务SqlT result = this.statementCallback.execute(this.statementProxy.getTargetStatement(), args);int updateCount = this.statementProxy.getUpdateCount();if (updateCount > 0) {//执行后置镜像TableRecords afterImage = this.afterImage(beforeImage);this.prepareUndoLog(beforeImage, afterImage);}return result;}}
执行提交
/**
 * Commits through the lock retry policy.
 *
 * <p>On an {@link SQLException} the connection is rolled back first, but only when a
 * target connection exists, auto-commit is off, and the context does not report
 * that auto-commit was changed. Any other exception is wrapped in an
 * {@link SQLException}.
 *
 * @throws SQLException if the commit fails
 */
public void commit() throws SQLException {
    try {
        lockRetryPolicy.execute(() -> {
            doCommit();
            return null;
        });
    } catch (SQLException sqlFailure) {
        boolean rollbackNeeded = targetConnection != null
            && !getAutoCommit()
            && !getContext().isAutoCommitChanged();
        if (rollbackNeeded) {
            rollback();
        }
        throw sqlFailure;
    } catch (Exception other) {
        throw new SQLException(other);
    }
}
执行事务的提交
private void doCommit() throws SQLException {if (this.context.inGlobalTransaction()) {//如果是全局事务就执行此方法this.processGlobalTransactionCommit();} else if (this.context.isGlobalLockRequire()) {//执行全局锁的事务提交this.processLocalCommitWithGlobalLocks();} else {//其他this.targetConnection.commit();}}
组装请求数据,发送后端
/**
 * Builds a {@code BranchRegisterRequest} and sends it synchronously through
 * {@code RmNettyRemotingClient}.
 *
 * @param branchType      branch type placed on the request
 * @param resourceId      resource id placed on the request
 * @param clientId        not copied onto the request in this method
 * @param xid             global transaction id placed on the request
 * @param applicationData application data placed on the request
 * @param lockKeys        lock keys placed on the request
 * @return the branch id carried by the response
 * @throws TransactionException when the response reports failure, the call times
 *                              out, or a runtime exception occurs
 */
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                           String applicationData, String lockKeys) throws TransactionException {
    try {
        BranchRegisterRequest registerRequest = new BranchRegisterRequest();
        registerRequest.setXid(xid);
        registerRequest.setLockKey(lockKeys);
        registerRequest.setResourceId(resourceId);
        registerRequest.setBranchType(branchType);
        registerRequest.setApplicationData(applicationData);

        BranchRegisterResponse registerResponse =
            (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(registerRequest);

        if (registerResponse.getResultCode() == ResultCode.Failed) {
            throw new RmTransactionException(registerResponse.getTransactionExceptionCode(),
                String.format("Response[ %s ]", registerResponse.getMsg()));
        }
        return registerResponse.getBranchId();
    } catch (TimeoutException timeout) {
        throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", timeout);
    } catch (RuntimeException runtimeFailure) {
        throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", runtimeFailure);
    }
}
分支事务服务端的执行
我们首先看一下服务端分支事务注册的入口DefaultCoordinator#onRequest。
@Overridepublic AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {//判断请求是否来自于事务的客户端if (!(request instanceof AbstractTransactionRequestToTC)) {throw new IllegalArgumentException();}AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;transactionRequest.setTCInboundHandler(this);return transactionRequest.handle(context);}
我们的核心分支事务注册代码
@Overridepublic BranchRegisterResponse handle(BranchRegisterRequest request, final RpcContext rpcContext) {//创建返回的实例BranchRegisterResponse response = new BranchRegisterResponse();exceptionHandleTemplate(new AbstractCallback<BranchRegisterRequest, BranchRegisterResponse>() {@Overridepublic void execute(BranchRegisterRequest request, BranchRegisterResponse response)throws TransactionException {try {//核心分支注册实例doBranchRegister(request, response, rpcContext);} catch (StoreException e) {throw new TransactionException(TransactionExceptionCode.FailedStore, String.format("branch register request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);}}}, request, response);return response;}
@Overridepublic Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,String applicationData, String lockKeys) throws TransactionException {// key1:获取GlobalSessionGlobalSession globalSession = assertGlobalSessionNotNull(xid, false);return SessionHolder.lockAndExecute(globalSession, () -> {// 检查事务状态globalSessionStatusCheck(globalSession);globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());//key2: 创建分支会话BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,applicationData, lockKeys, clientId);MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));//key3:增加分支事务锁branchSessionLock(globalSession, branchSession);try {//key4: 全局会话添加分支会话globalSession.addBranch(branchSession);} catch (RuntimeException ex) {// key5: 出现异常释放锁branchSessionUnlock(branchSession);throw new BranchTransactionException(FailedToAddBranch, String.format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),branchSession.getBranchId()), ex);}if (LOGGER.isInfoEnabled()) {LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);}// key6: 返回分支会话的分支IDreturn branchSession.getBranchId();});}
创建封装分支事务的信息
/**
 * Creates a {@code BranchSession} populated from the global session and the
 * given branch attributes, with a branch id from {@code UUIDGenerator}.
 *
 * @param globalSession   source of the xid and transaction id
 * @param branchType      branch type to set
 * @param resourceId      resource id to set
 * @param applicationData application data to set
 * @param lockKeys        lock key to set
 * @param clientId        client id to set
 * @return the populated branch session
 */
public static BranchSession newBranchByGlobal(GlobalSession globalSession, BranchType branchType,
                                              String resourceId, String applicationData,
                                              String lockKeys, String clientId) {
    BranchSession created = new BranchSession();

    // Identity inherited from the global session.
    created.setXid(globalSession.getXid());
    created.setTransactionId(globalSession.getTransactionId());

    // Identity of the branch itself.
    created.setBranchId(UUIDGenerator.generateUUID());
    created.setBranchType(branchType);

    // Caller-supplied attributes.
    created.setResourceId(resourceId);
    created.setLockKey(lockKeys);
    created.setClientId(clientId);
    created.setApplicationData(applicationData);

    return created;
}
分支事务加锁
protected void branchSessionLock(GlobalSession globalSession, BranchSession branchSession)throws TransactionException {//分支事务的参数校验String applicationData = branchSession.getApplicationData();boolean autoCommit = true;boolean skipCheckLock = false;if (StringUtils.isNotBlank(applicationData)) {if (objectMapper == null) {objectMapper = new ObjectMapper();}try {Map<String, Object> data = objectMapper.readValue(applicationData, HashMap.class);Object clientAutoCommit = data.get(AUTO_COMMIT);if (clientAutoCommit != null && !(boolean)clientAutoCommit) {autoCommit = (boolean)clientAutoCommit;}Object clientSkipCheckLock = data.get(SKIP_CHECK_LOCK);if (clientSkipCheckLock instanceof Boolean) {skipCheckLock = (boolean)clientSkipCheckLock;}} catch (IOException e) {LOGGER.error("failed to get application data: {}", e.getMessage(), e);}}try {// 增加分支锁,如果返回false加锁失败我们直接抛出异常if (!branchSession.lock(autoCommit, skipCheckLock)) {throw new BranchTransactionException(LockKeyConflict,String.format("Global lock acquire failed xid = %s branchId = %s", globalSession.getXid(),branchSession.getBranchId()));}} catch (StoreException e) {if (e.getCause() instanceof BranchTransactionException) {throw new BranchTransactionException(((BranchTransactionException)e.getCause()).getCode(),String.format("Global lock acquire failed xid = %s branchId = %s", globalSession.getXid(),branchSession.getBranchId()));}throw e;}}
public boolean lock(boolean autoCommit, boolean skipCheckLock) throws TransactionException {// 必须还AT事务if (this.getBranchType().equals(BranchType.AT)) {return LockerManagerFactory.getLockManager().acquireLock(this, autoCommit, skipCheckLock);}return true;}
public boolean acquireLock(BranchSession branchSession, boolean autoCommit, boolean skipCheckLock) throws TransactionException {if (branchSession == null) {throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");}// 获取分支锁的keyString lockKey = branchSession.getLockKey();if (StringUtils.isNullOrEmpty(lockKey)) {// no lockreturn true;}// get locks of branch// 行锁收集List<RowLock> locks = collectRowLocks(branchSession);if (CollectionUtils.isEmpty(locks)) {// no lockreturn true;}// 进行存储return getLocker(branchSession).acquireLock(locks, autoCommit, skipCheckLock);}
添加分支事务
private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException {// 将事务信息写入数据库if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) {if (LogOperation.GLOBAL_ADD.equals(logOperation)) {throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,"Fail to store global session");} else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,"Fail to update global session");} else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,"Fail to remove global session");} else if (LogOperation.BRANCH_ADD.equals(logOperation)) {throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,"Fail to store branch session");} else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,"Fail to update branch session");} else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,"Fail to remove branch session");} else {throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,"Unknown LogOperation:" + logOperation.name());}}}
如果发生异常,则释放分支锁
@Overridepublic boolean unlock() throws TransactionException {if (this.getBranchType() == BranchType.AT) {// 释放锁return LockerManagerFactory.getLockManager().releaseLock(this);}return true;}
public boolean releaseLock(BranchSession branchSession) throws TransactionException {if (branchSession == null) {throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");}List<RowLock> locks = collectRowLocks(branchSession);try {// 释放锁return getLocker(branchSession).releaseLock(locks);} catch (Exception t) {LOGGER.error("unLock error, branchSession:{}", branchSession, t);return false;}}
public boolean releaseLock(List<RowLock> locks) {if (CollectionUtils.isEmpty(locks)) {// no lockreturn true;}try {return lockStore.unLock(convertToLockDO(locks));} catch (StoreException e) {throw e;} catch (Exception t) {LOGGER.error("unLock error, locks:{}", CollectionUtils.toString(locks), t);return false;}}
将数据库的锁删除
/**
 * SQL template that deletes lock rows for one XID: the table name and the
 * primary-key condition are placeholders, and the XID is a {@code ?} parameter.
 */
private static final String BATCH_DELETE_LOCK_SQL =
    "delete from " + LOCK_TABLE_PLACE_HOLD
        + " where " + ServerTableColumnsName.LOCK_TABLE_XID
        + " = ? and (" + LOCK_TABLE_PK_WHERE_CONDITION_PLACE_HOLD
        + ") ";