【Seata源码学习 】篇五 注册分支事务

【Seata源码学习 】篇五 分支事务注册

1.远程服务调用绑定XID

回到事务模版方法类TransactionalTemplate中

                beginTransaction(txInfo, tx);Object rs;try {// Do Your Business// 执行执行拦截器链路rs = business.execute();} catch (Throwable ex) {// 3. The needed business exception to rollback.// 如果抛出异常,判断异常是否在指定的范围中(默认为Throwable类及其子类)// 执行异常回滚的前后钩子方法// 如果当前事务的角色是 launcher 也就是 TM ,通过TmNettyRemotingClient  向TC发送一个 GlobalRollbackRequest 同步消息// 并记录TC返回的当前事务状态StatuscompleteTransactionAfterThrowing(txInfo, tx, ex);throw ex;}

beginTransaction 开启全局事务我们已经在上一篇看过了,最终会将TC返回到XID绑定到RootContext中,并且TC会将全局事务会话持久化,通常我们使用DB,那么将会往seata数据库的global_table中插入一条数据。

接下来就要执行拦截器链路,最后一个拦截器执行完毕后如果无异常就要执行目标方法。调用目标方法通常会使用到第三方的RPC框架,例如 Fegin,RestTemplate等等

在引入的spring-cloud-seata包下,基于SpringbootStater机制,将会自动装配四个配置类,我们就看最常用的SeataFeignClientAutoConfiguration

image-20231123095810260

public class SeataFeignClientAutoConfiguration {public SeataFeignClientAutoConfiguration() {}@Bean@Scope("prototype")@ConditionalOnClass(name = {"com.netflix.hystrix.HystrixCommand"})@ConditionalOnProperty(name = {"feign.hystrix.enabled"},havingValue = "true")Feign.Builder feignHystrixBuilder(BeanFactory beanFactory) {return SeataHystrixFeignBuilder.builder(beanFactory);}@Bean@Scope("prototype")@ConditionalOnClass(name = {"com.alibaba.csp.sentinel.SphU"})@ConditionalOnProperty(name = {"feign.sentinel.enabled"},havingValue = "true")Feign.Builder feignSentinelBuilder(BeanFactory beanFactory) {return SeataSentinelFeignBuilder.builder(beanFactory);}@Bean@ConditionalOnMissingBean@Scope("prototype")Feign.Builder feignBuilder(BeanFactory beanFactory) {return SeataFeignBuilder.builder(beanFactory);}
}

实际上无论是否集成hystrix或者sentinel , 最终构建的都是 SeataFeignClient 对象

static Feign.Builder builder(BeanFactory beanFactory) {return SentinelFeign.builder().retryer(Retryer.NEVER_RETRY).client(new SeataFeignClient(beanFactory));}

而SeataFeignClient在发送请求时,会将Request进行修改

com.alibaba.cloud.seata.feign.SeataFeignClient#execute

 public Response execute(Request request, Request.Options options) throws IOException {//修改请求信息Request modifiedRequest = this.getModifyRequest(request);return this.delegate.execute(modifiedRequest, options);}
    private Request getModifyRequest(Request request) {//获取开启全局事务时绑定到RootContext中到XIDString xid = RootContext.getXID();if (StringUtils.isEmpty(xid)) {return request;} else {// 放到请求头中Map<String, Collection<String>> headers = new HashMap(16);headers.putAll(request.headers());List<String> seataXid = new ArrayList();seataXid.add(xid);headers.put("TX_XID", seataXid);// 返回添加了 TX_XID 请求头的新Requestreturn Request.create(request.method(), request.url(), headers, request.body(), request.charset());}}

这里需要注意一点,在发起RPC调用时,需要保证跟开启全局事务的方法在同一个线程中,因为RootContext是将XID存储在ThreadLocal中,每个Thread都绑定了自己的ThreadLocal.ThreadLocalMap,而ThreadLocal.get就是从当前线程中获取ThreadLocalMap,再去获取存储在线程中的变量。

2.数据源代理

image-20231123104326151

public class SeataDataSourceAutoConfiguration {/*** The bean seataAutoDataSourceProxyCreator.*/@Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)@ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {// spring上下文中添加SeataAutoDataSourceProxyCreatorreturn new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());}
}

SeataAutoDataSourceProxyCreator 继承了 AbstractAutoProxyCreator

    private Object[] buildAdvisors(String dataSourceProxyMode) {// 使用 SeataAutoDataSourceProxyAdvice 对目标方法进行拦截Advice advice = new SeataAutoDataSourceProxyAdvice(dataSourceProxyMode);return new Object[]{new DefaultIntroductionAdvisor(advice)};}@Overrideprotected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, TargetSource customTargetSource) {return advisors;}@Overrideprotected boolean shouldSkip(Class<?> beanClass, String beanName) {//排除指定列表的beanif (excludes.contains(beanClass.getName())) {return true;}//对数据源对象进行拦截return SeataProxy.class.isAssignableFrom(beanClass);}protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {// we only care DataSource beanif (!(bean instanceof DataSource)) {return bean;}// when this bean is just a simple DataSource, not SeataDataSourceProxyif (!(bean instanceof SeataDataSourceProxy)) {Object enhancer = super.wrapIfNecessary(bean, beanName, cacheKey);// this mean this bean is either excluded by user or had been proxy beforeif (bean == enhancer) {return bean;}// else, build proxy,  put <origin, proxy> to holder and return enhancerDataSource origin = (DataSource) bean;// 创建代理类 AT模式为 DataSourceProxy  XA模式为 DataSourceProxyXASeataDataSourceProxy proxy = buildProxy(origin, dataSourceProxyMode);// 将 DataSource 与  SeataDataSourceProxy 映射缓存DataSourceProxyHolder.put(origin, proxy);return enhancer;}

io.seata.spring.annotation.datasource.SeataAutoDataSourceProxyAdvice#invoke

    public Object invoke(MethodInvocation invocation) throws Throwable {// check whether current context is expectedif (!inExpectedContext()) {return invocation.proceed();}Method method = invocation.getMethod();String name = method.getName();Class<?>[] parameterTypes = method.getParameterTypes();Method declared;try {declared = DataSource.class.getDeclaredMethod(name, parameterTypes);} catch (NoSuchMethodException e) {// mean this method is not declared by DataSourcereturn invocation.proceed();}// switch invoke instance to its proxyDataSource origin = (DataSource) invocation.getThis();SeataDataSourceProxy proxy = DataSourceProxyHolder.get(origin);Object[] args = invocation.getArguments();// 方法由 SeataDataSourceProxy 代理类执行return declared.invoke(proxy, args);}

获取Connection对象

假设当前为seata的全局事务模式为AT,当原始数据源调用getConnection方法获取连接时,实际上调用的是DataSourceProxy.getConnection方法,拿到的是ConnectionProxy对象

    @Overridepublic ConnectionProxy getConnection() throws SQLException {Connection targetConnection = targetDataSource.getConnection();return new ConnectionProxy(this, targetConnection);}@Overridepublic ConnectionProxy getConnection(String username, String password) throws SQLException {Connection targetConnection = targetDataSource.getConnection(username, password);return new ConnectionProxy(this, targetConnection);}

获取Statement对象

AbstractConnectionProxy

 public Statement createStatement() throws SQLException {Statement targetStatement = getTargetConnection().createStatement();return new StatementProxy(this, targetStatement);}@Overridepublic PreparedStatement prepareStatement(String sql) throws SQLException {String dbType = getDbType();// support oracle 10.2+PreparedStatement targetPreparedStatement = null;if (BranchType.AT == RootContext.getBranchType()) {List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);if (sqlRecognizers != null && sqlRecognizers.size() == 1) {SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);}}}if (targetPreparedStatement == null) {targetPreparedStatement = getTargetConnection().prepareStatement(sql);}return new PreparedStatementProxy(this, targetPreparedStatement, sql);}

而获取的Statement 与 PreparedStatement 分别是 StatementProxy 与 PreparedStatementProxy,而两者都是由ExecuteTemplate来执行sql语句,假设RM此时要执行的业务是更新操作

   public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,StatementProxy<S> statementProxy,StatementCallback<T, S> statementCallback,Object... args) throws SQLException {// 如果当前即不需要全局锁 并且事务类型不为AT模式,那么直接执行原生SQLif (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {// Just work as original statementreturn statementCallback.execute(statementProxy.getTargetStatement(), args);}// 获取数据源类型String dbType = statementProxy.getConnectionProxy().getDbType();if (CollectionUtils.isEmpty(sqlRecognizers)) {sqlRecognizers = SQLVisitorFactory.get(statementProxy.getTargetSQL(),dbType);}Executor<T> executor;if (CollectionUtils.isEmpty(sqlRecognizers)) {executor = new PlainExecutor<>(statementProxy, statementCallback);} else {if (sqlRecognizers.size() == 1) {SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);// 区分当前执行的SQL语句类型switch (sqlRecognizer.getSQLType()) {case INSERT:executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},new Object[]{statementProxy, statementCallback, sqlRecognizer});break;// 假设当前执行的是UPDATAE语句case UPDATE:executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);break;case DELETE:executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);break;case SELECT_FOR_UPDATE:executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);break;case INSERT_ON_DUPLICATE_UPDATE:switch (dbType) {case JdbcConstants.MYSQL:case JdbcConstants.MARIADB:executor =new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);break;default:throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");}break;default:executor = new PlainExecutor<>(statementProxy, statementCallback);break;}} else {executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);}}T rs;try {// 执行器执行SQL语句rs = executor.execute(args);} catch (Throwable ex) {if (!(ex instanceof SQLException)) {// Turn other exception into SQLExceptionex = new SQLException(ex);}throw (SQLException) ex;}return rs;}

UPDATE语句使用的是UpdateExecutor执行器,而UpdateExecutor的类关系图如下

image-20231126101103175

由父类BaseTransactionalExecutor 调用execute方法,将RootContext中的XID绑定到StatementProxy中,并标识是否需要全局锁

   public T execute(Object... args) throws Throwable {String xid = RootContext.getXID();if (xid != null) {statementProxy.getConnectionProxy().bind(xid);}statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());return doExecute(args);}

关闭自动提交事务并执行sql

接着调用子类AbstractDMLBaseExecutor的doExecute方法,主要是关闭自动提交,然后执行 executeAutoCommitFalse 方法

    public T doExecute(Object... args) throws Throwable {AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();// connectionProxy.getAutoCommit() 会判断原始数据源对象是否是自动提交事务 (JDBC默认提交事务)// 但在seata中 即使原始数据源是自动化提交,在 executeAutoCommitTrue 方法中也会关闭自动提交事务if (connectionProxy.getAutoCommit()) {// 将自动提交设置为false,再调用executeAutoCommitFalsereturn executeAutoCommitTrue(args);} else {return executeAutoCommitFalse(args);}}

io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#executeAutoCommitFalse

    protected T executeAutoCommitFalse(Object[] args) throws Exception {// 要求当前数据源是MYSQL, 且不能是多主键if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {throw new NotSupportYetException("multi pk only support mysql!");}// 封装前镜像数据 TableRecords 记录了每一个字段对应的值,且获取的是当前最新的数据TableRecords beforeImage = beforeImage();// 执行sql语句T result = statementCallback.execute(statementProxy.getTargetStatement(), args);int updateCount = statementProxy.getUpdateCount();//修改成功if (updateCount > 0) {//封装后镜像数据TableRecords afterImage = afterImage(beforeImage);//将前后镜像打包成回滚日志 其中包括全局锁信息prepareUndoLog(beforeImage, afterImage);}return result;}

博主使用的seata版本是1.5.2,此版本不支持多主键.

为了保证前镜像获取的数据是最新的,创建SQL查询语句时会拼接 FOR UPDATE ,使用当前读的方式。成功修改数据后,会将修改后的数据也构建成镜像数据,最后由前后镜像一起组成回滚日志。

提交事务

由于我们获取的数据源连接对象时ConnectionProxy,因此提交事务时也时此对象进行提交

io.seata.rm.datasource.ConnectionProxy#commit

    public void commit() throws SQLException {try {//由lockRetryPolicy手动提交事务 支持全局锁获取失败重试机制lockRetryPolicy.execute(() -> {doCommit();return null;});} catch (SQLException e) {if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {// 如果出现SQLException 异常 回滚本地事务,向TC报告,并移除RootContext中的全局事务XIDrollback();}throw e;} catch (Exception e) {throw new SQLException(e);}}

io.seata.rm.datasource.ConnectionProxy#doCommit

    private void doCommit() throws SQLException {//全局事务XID不为空if (context.inGlobalTransaction()) {processGlobalTransactionCommit();//要求全局锁} else if (context.isGlobalLockRequire()) {processLocalCommitWithGlobalLocks();} else {//都不是直接提交targetConnection.commit();}}

io.seata.rm.datasource.ConnectionProxy#processGlobalTransactionCommit

    public void commit() throws SQLException {try {//由lockRetryPolicy手动提交事务 支持全局锁获取失败重试机制// 如果是获取全局锁失败,将会重试lockRetryPolicy.execute(() -> {doCommit();return null;});} catch (SQLException e) {if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {// 如果出现SQLException 异常 回滚本地事务,向TC报告,并移除RootContext中的全局事务XIDrollback();}throw e;} catch (Exception e) {throw new SQLException(e);}}

由lockRetryPolicy手动提交事务,而如果出现全局锁冲突,将会在线程休眠后重试

io.seata.rm.datasource.ConnectionProxy.LockRetryPolicy#execute

        protected <T> T doRetryOnLockConflict(Callable<T> callable) throws Exception {LockRetryController lockRetryController = new LockRetryController();while (true) {try {return callable.call();} catch (LockConflictException lockConflict) {//先回滚本地事务onException(lockConflict);// AbstractDMLBaseExecutor#executeAutoCommitTrue the local lock is releasedif (connection.getContext().isAutoCommitChanged()&& lockConflict.getCode() == TransactionExceptionCode.LockKeyConflictFailFast) {lockConflict.setCode(TransactionExceptionCode.LockKeyConflict);}//休眠10秒收重试lockRetryController.sleep(lockConflict);} catch (Exception e) {onException(e);throw e;}}}

io.seata.rm.datasource.ConnectionProxy#processGlobalTransactionCommit

private void processGlobalTransactionCommit() throws SQLException {try {//在回滚日志已经准备好的情况下 向TC注册分支事务,并将分支事务ID保存到ConnectionContext中register();} catch (TransactionException e) {recognizeLockKeyConflictException(e, context.buildLockKeys());}try {// 将回滚日志刷新到数据库中 等待和本地事务一起提交UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);//本地事务提交targetConnection.commit();} catch (Throwable ex) {LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);report(false);throw new SQLException(ex);}if (IS_REPORT_SUCCESS_ENABLE) {report(true);}context.reset();}

Undo回滚日志将和本地事务一起提交,但在提交前,需要先注册分支事务ID

io.seata.rm.AbstractResourceManager#branchRegister

    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {try {BranchRegisterRequest request = new BranchRegisterRequest();request.setXid(xid);request.setLockKey(lockKeys);request.setResourceId(resourceId);request.setBranchType(branchType);request.setApplicationData(applicationData);BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);if (response.getResultCode() == ResultCode.Failed) {throw new RmTransactionException(response.getTransactionExceptionCode(), String.format("Response[ %s ]", response.getMsg()));}return response.getBranchId();} catch (TimeoutException toe) {throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);} catch (RuntimeException rex) {throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", rex);}}

注册分支事务需要将当前全局事务的XID,全局锁,资源信息,分支事务类型以及应用数据,一起构建成BranchRegisterRequest消息发送给TC,具体如下数据所示

{"branchType": "AT","lockKey": "product_table:1","resourceId": "jdbc:mysql://localhost:3306/seata-test","typeCode": 11,"xid": "xxx.xx.xx.xxx:8091:1378562425205329921"
}

3.TC处理 BranchRegisterRequest 消息

TC在处理 BranchRegisterRequest消息 依然也是 ServerOnRequestProcessor,最终交由AbstractTCInboundHandle进行处理

io.seata.server.AbstractTCInboundHandler#handle(io.seata.core.protocol.transaction.BranchRegisterRequest, io.seata.core.rpc.RpcContext)

 public BranchRegisterResponse handle(BranchRegisterRequest request, final RpcContext rpcContext) {BranchRegisterResponse response = new BranchRegisterResponse();exceptionHandleTemplate(new AbstractCallback<BranchRegisterRequest, BranchRegisterResponse>() {@Overridepublic void execute(BranchRegisterRequest request, BranchRegisterResponse response)throws TransactionException {try {//真正处理分支注册doBranchRegister(request, response, rpcContext);} catch (StoreException e) {throw new TransactionException(TransactionExceptionCode.FailedStore, String.format("branch register request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);}}}, request, response);return response;}

io.seata.server.coordinator.DefaultCoordinator#doBranchRegister

 @Overrideprotected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response,RpcContext rpcContext) throws TransactionException {MDC.put(RootContext.MDC_KEY_XID, request.getXid());// 获取分支IDresponse.setBranchId(core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(),request.getXid(), request.getApplicationData(), request.getLockKey()));}

io.seata.server.coordinator.DefaultCore#branchRegister

 @Overridepublic Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,String applicationData, String lockKeys) throws TransactionException {// 当前事务类型为AT模式,返回 ATCore 在调用 branchRegister 方法return getCore(branchType).branchRegister(branchType, resourceId, clientId, xid,applicationData, lockKeys);}

io.seata.server.coordinator.AbstractCore#branchRegister

 public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,String applicationData, String lockKeys) throws TransactionException {GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);return SessionHolder.lockAndExecute(globalSession, () -> {// 全局事务当前必须是激活且是beging状态 否则抛出异常globalSessionStatusCheck(globalSession);globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());//注册分支事务回话(将分支事务信息绑定上全局事务XID)BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,applicationData, lockKeys, clientId);MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));//加全局锁branchSessionLock(globalSession, branchSession);try {//将分支事务会话添加到全局事务会话到集合中,并向监听器添加onAddBranch事件,将分支事务会话持久到数据库中globalSession.addBranch(branchSession);} catch (RuntimeException ex) {//如果出现异常,从数据库中删除分支事务信息branchSessionUnlock(branchSession);throw new BranchTransactionException(FailedToAddBranch, String.format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),branchSession.getBranchId()), ex);}if (LOGGER.isInfoEnabled()) {LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);}return branchSession.getBranchId();});}

首先会判断当前全局事务的状态,接着创建一个新的BranchSession对象,生成branchId,并绑定上XID;接着就是尝试加全局锁,加锁失败会抛出异常;然后再将分支事务会话添加到全局事务会话到集合中,并向监听器添加onAddBranch事件,将分支事务会话持久到数据库中,并返回分支事务ID。如果此过程出现异常,则会删除分支事务信息。

判断全局事务状态

            // 全局事务当前必须是激活且是beging状态 否则抛出异常globalSessionStatusCheck(globalSession);

io.seata.server.coordinator.AbstractCore#globalSessionStatusCheck

protected void globalSessionStatusCheck(GlobalSession globalSession) throws GlobalTransactionException {//全局事务必须是激活状态if (!globalSession.isActive()) {throw new GlobalTransactionException(GlobalTransactionNotActive, String.format("Could not register branch into global session xid = %s status = %s, cause by globalSession not active",globalSession.getXid(), globalSession.getStatus()));}//全局事务必须是begin状态if (globalSession.getStatus() != GlobalStatus.Begin) {throw new GlobalTransactionException(GlobalTransactionStatusInvalid, String.format("Could not register branch into global session xid = %s status = %s while expecting %s",globalSession.getXid(), globalSession.getStatus(), GlobalStatus.Begin));}}

全局事务当前必须是激活且是beging状态 否则抛出异常

注册分支事务

BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,applicationData, lockKeys, clientId);

io.seata.server.session.SessionHelper#newBranchByGlobal(io.seata.server.session.GlobalSession, io.seata.core.model.BranchType, java.lang.String, java.lang.String, java.lang.String, java.lang.String)

 public static BranchSession newBranchByGlobal(GlobalSession globalSession, BranchType branchType, String resourceId,String applicationData, String lockKeys, String clientId) {//创建新的BranchSession对象BranchSession branchSession = new BranchSession();//分别绑定XID,事务ID,新生成的分支事务id 并设置事务类型与资源信息,锁信息,RM客户端信息(服务名和id)branchSession.setXid(globalSession.getXid());branchSession.setTransactionId(globalSession.getTransactionId());branchSession.setBranchId(UUIDGenerator.generateUUID());branchSession.setBranchType(branchType);branchSession.setResourceId(resourceId);branchSession.setLockKey(lockKeys);branchSession.setClientId(clientId);branchSession.setApplicationData(applicationData);return branchSession;}

最终的分支信息如图所示

image-20231126172153705

添加全局锁

//加全局锁branchSessionLock(globalSession, branchSession);

io.seata.server.transaction.at.ATCore#branchSessionLock

 @Overrideprotected void branchSessionLock(GlobalSession globalSession, BranchSession branchSession)throws TransactionException {String applicationData = branchSession.getApplicationData();boolean autoCommit = true;boolean skipCheckLock = false;//applicationData为空 往下走if (StringUtils.isNotBlank(applicationData)) {if (objectMapper == null) {objectMapper = new ObjectMapper();}try {Map<String, Object> data = objectMapper.readValue(applicationData, HashMap.class);Object clientAutoCommit = data.get(AUTO_COMMIT);if (clientAutoCommit != null && !(boolean)clientAutoCommit) {autoCommit = (boolean)clientAutoCommit;}Object clientSkipCheckLock = data.get(SKIP_CHECK_LOCK);if (clientSkipCheckLock instanceof Boolean) {skipCheckLock = (boolean)clientSkipCheckLock;}} catch (IOException e) {LOGGER.error("failed to get application data: {}", e.getMessage(), e);}}try {//加全局锁if (!branchSession.lock(autoCommit, skipCheckLock)) {throw new BranchTransactionException(LockKeyConflict,String.format("Global lock acquire failed xid = %s branchId = %s", globalSession.getXid(),branchSession.getBranchId()));}} catch (StoreException e) {if (e.getCause() instanceof BranchTransactionException) {throw new BranchTransactionException(((BranchTransactionException)e.getCause()).getCode(),String.format("Global lock acquire failed xid = %s branchId = %s", globalSession.getXid(),branchSession.getBranchId()));}throw e;}}

io.seata.server.session.BranchSession#lock(boolean, boolean)

public boolean lock(boolean autoCommit, boolean skipCheckLock) throws TransactionException {if (this.getBranchType().equals(BranchType.AT)) {//获取锁资源管理器执行加速操作return LockerManagerFactory.getLockManager().acquireLock(this, autoCommit, skipCheckLock);}return true;}

无论是哪种锁资源管理器,其调用acquireLock都是走的父类AbstractLockManager的方法

io.seata.server.lock.AbstractLockManager#acquireLock(io.seata.server.session.BranchSession, boolean, boolean)

 @Overridepublic boolean acquireLock(BranchSession branchSession, boolean autoCommit, boolean skipCheckLock) throws TransactionException {if (branchSession == null) {throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");}// lockKey通常是由 表名 + 主键组成String lockKey = branchSession.getLockKey();if (StringUtils.isNullOrEmpty(lockKey)) {// no lockreturn true;}// get locks of branch// 使用 ; 分割lockKey 获取RowLock集合,RowLock封装了 XID,分支事务ID,表民,主键等信息List<RowLock> locks = collectRowLocks(branchSession);if (CollectionUtils.isEmpty(locks)) {// no lockreturn true;}// 获取枷锁操作对象,调用acquireLock方法return getLocker(branchSession).acquireLock(locks, autoCommit, skipCheckLock);}

我们就看 DataBaseLocker 是怎么加锁的

    public boolean acquireLock(List<RowLock> locks, boolean autoCommit, boolean skipCheckLock) {// 首先RowLock集合对象不能为空if (CollectionUtils.isEmpty(locks)) {// no lockreturn true;}try {return lockStore.acquireLock(convertToLockDO(locks), autoCommit, skipCheckLock);} catch (StoreException e) {throw e;} catch (Exception t) {LOGGER.error("AcquireLock error, locks:{}", CollectionUtils.toString(locks), t);return false;}}

io.seata.server.storage.db.lock.LockStoreDataBaseDAO#acquireLock(java.util.List<io.seata.core.store.LockDO>, boolean, boolean)

 public boolean acquireLock(List<LockDO> lockDOs, boolean autoCommit, boolean skipCheckLock) {Connection conn = null;PreparedStatement ps = null;ResultSet rs = null;Set<String> dbExistedRowKeys = new HashSet<>();boolean originalAutoCommit = true;if (lockDOs.size() > 1) {lockDOs = lockDOs.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList());}try {conn = lockStoreDataSource.getConnection();if (originalAutoCommit = conn.getAutoCommit()) {conn.setAutoCommit(false);}List<LockDO> unrepeatedLockDOs = lockDOs;//check lockif (!skipCheckLock) {boolean canLock = true;//query// select * from lock_table where row_key in ( ? ) order by status desc// RowKey 由数据库资源信息 + 表名 + 主键组成String checkLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getCheckLockableSql(lockTable, lockDOs.size());ps = conn.prepareStatement(checkLockSQL);for (int i = 0; i < lockDOs.size(); i++) {ps.setString(i + 1, lockDOs.get(i).getRowKey());}rs = ps.executeQuery();String currentXID = lockDOs.get(0).getXid();boolean failFast = false;// 如果在lock_tabel中根据  RowKey 找到了记录,需要判断当前xid是否与数据库中的xid是否一致// 如果不一致 加速失败while (rs.next()) {String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID);if (!StringUtils.equals(dbXID, currentXID)) {if (LOGGER.isInfoEnabled()) {String dbPk = rs.getString(ServerTableColumnsName.LOCK_TABLE_PK);String dbTableName = rs.getString(ServerTableColumnsName.LOCK_TABLE_TABLE_NAME);long dbBranchId = rs.getLong(ServerTableColumnsName.LOCK_TABLE_BRANCH_ID);LOGGER.info("Global lock on [{}:{}] is holding by xid {} branchId {}", dbTableName, dbPk, dbXID, dbBranchId);}if (!autoCommit) {int status = rs.getInt(ServerTableColumnsName.LOCK_TABLE_STATUS);if (status == LockStatus.Rollbacking.getCode()) {failFast = true;}}canLock = false;break;}// 将已经存在的全局锁缓冲起来dbExistedRowKeys.add(rs.getString(ServerTableColumnsName.LOCK_TABLE_ROW_KEY));}if (!canLock) {// 加锁失败 回滚保存分支事务信息的sqlconn.rollback();if (failFast) {throw new StoreException(new BranchTransactionException(LockKeyConflictFailFast));}return false;}// If the lock has been exists in db, remove it from the lockDOsif (CollectionUtils.isNotEmpty(dbExistedRowKeys)) {unrepeatedLockDOs = lockDOs.stream().filter(lockDO -> !dbExistedRowKeys.contains(lockDO.getRowKey())).collect(Collectors.toList());}if (CollectionUtils.isEmpty(unrepeatedLockDOs)) {conn.rollback();return true;}}// lockif (unrepeatedLockDOs.size() == 1) {LockDO lockDO = unrepeatedLockDOs.get(0);// 生成sql语句,将全局锁信息插入到lock_table表中if (!doAcquireLock(conn, lockDO)) {if (LOGGER.isInfoEnabled()) {LOGGER.info("Global lock acquire failed, xid {} branchId {} pk {}", lockDO.getXid(), lockDO.getBranchId(), lockDO.getPk());}conn.rollback();return false;}} else {if (!doAcquireLocks(conn, unrepeatedLockDOs)) {if (LOGGER.isInfoEnabled()) {LOGGER.info("Global lock batch acquire failed, xid {} branchId {} pks {}", unrepeatedLockDOs.get(0).getXid(),unrepeatedLockDOs.get(0).getBranchId(), unrepeatedLockDOs.stream().map(lockDO -> lockDO.getPk()).collect(Collectors.toList()));}conn.rollback();return false;}}conn.commit();return true;} catch (SQLException e) {throw new StoreException(e);} finally {IOUtil.close(rs, ps);if (conn != null) {try {if (originalAutoCommit) {conn.setAutoCommit(true);}conn.close();} catch (SQLException e) {}}}}

注册全局锁之前会根据RowKey查询全局锁信息是否已经存在,生成的查询语句如下

select * from lock_table where row_key in ( ? ) order by status desc

RowKey如下所示

jdbc:mysql://localhost:3306/seata-test^^^product_table^^^1

如果locktable表中已经有数据了,则比较数据中XID与当前当前的xid是否一致,如果不一致说明已经被其他全局事务加了锁,则标记当前加锁失败,并抛出异常;如果一致,则将当前的RowKey缓冲到set集合中。最后调用LockStoreDataBaseDAO.doAcquireLock方法将锁信息保存到数据库lock_table表中。

持久化分支信息

成功添加上全局锁后,就要将分支事务会话添加到全局事务会话到集合中,并向监听器添加onAddBranch事件,将分支事务会话持久到数据库中

globalSession.addBranch(branchSession);

io.seata.server.session.GlobalSession#addBranch

    public void addBranch(BranchSession branchSession) throws TransactionException {for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {// 分支事务持久化lifecycleListener.onAddBranch(this, branchSession);}// 将分支事务标记为已注册状态branchSession.setStatus(BranchStatus.Registered);// 将分支事务添加到GlobalSession到List集合中add(branchSession);}

返回分支事务ID

 return branchSession.getBranchId();

最后携带刚生成的分支事务ID返回BranchRegisterResponse消息给到RM

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/173058.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于Java SSM框架+Vue留学生交流互动论坛网站项目【项目源码+论文说明】计算机毕业设计

基于java的SSM框架Vue实现学生交流互动论坛网站演示 摘要 21世纪的今天&#xff0c;随着社会的不断发展与进步&#xff0c;人们对于信息科学化的认识&#xff0c;已由低层次向高层次发展&#xff0c;由原来的感性认识向理性认识提高&#xff0c;管理工作的重要性已逐渐被人们所…

【腾讯云 HAI域探秘】基于高性能应用服务器HAI部署的 ChatGLM2-6B模型,我开发了AI办公助手,公司行政小姐姐用了都说好!

目录 前言 一、腾讯云HAI介绍&#xff1a; 1、即插即用 轻松上手 2、横向对比 青出于蓝 3、多种高性能应用部署场景 二、腾讯云HAI一键部署并使用ChatGLM2-6B快速实现开发者所需的相关API服务 1、登录 高性能应用服务 HAI 控制台 2、点击 新建 选择 AI模型&#xff0c;…

【C/C++】如何不使用 sizeof 求数据类型占用的字节数

实现代码&#xff1a; #include <stdio.h>#define GET_TYPE_SIZE(TYPE) ((char *)(&TYPE 1) - (char *) & TYPE)int main(void) {char a a;short b 0;int c 0;long d 0;long long e 0;float f 0.0;double g 0.0;long double h 0.0;char* i NULL;print…

如何在gitlab上使用hooks

参考链接&#xff1a;gitlab git hooks 1. Git Hook 介绍 与许多其他版本控制系统一样&#xff0c;Git 有一种方法可以在发生某些重要操作时&#xff0c;触发自定义脚本&#xff0c;即 Git Hook&#xff08;Git 钩子&#xff09;。 当我们初始化一个项目之后&#xff0c;.git…

Qt 样式表

QLabel&#xff0c;应用于Widget&#xff1a; .QLabel {background-color:pink; }.QLabel[warnlevel_1] {border:5px solid yellow; }.QLabel[warnlevel_2] {border:5px solid red; } QWidget{background-color:rgb(54,54,54); }QLineEdit{border: 1px solid #ABCDA0; /…

Android设计模式--桥接模式

闻正言&#xff0c;行正道&#xff0c;左右前后皆正人 一&#xff0c;定义 将抽象部分与实现部分分离&#xff0c;使它们都可以独立地进行变化 二&#xff0c;使用场景 从模式的定义中&#xff0c;我们大致可以了解到&#xff0c;这里的桥接的作用其实就是连接抽象部分与实现…

PHP 针对mysql 自动生成数据字典

PHP 针对mysql 自动生成数据字典 确保php 可以正常使用mysqli 扩展 这里还需要注意 数据库密码 如果密码中有特殊字符 如&#xff1a; 首先&#xff0c;我们需要了解MySQL中的特殊字符包括哪些。MySQL中的特殊字符主要包括以下几类&#xff1a; 1. 单引号&#xff08;&a…

嵌入式硬件电路·电平

目录 1. 电平的概念 1.1 高电平 1.2 低电平 2. 电平的使用场景 2.1 高电平使能 2.2 低电平使能 2.3 失能 1. 电平的概念 电平是指电信号电压的大小或高低状态。在数字电子学中&#xff0c;电平有两种状态&#xff0c;高电平和低电平&#xff0c;用来表示二进制中…

ChatGPT初体验:注册、API Key获取与ChatAPI调用详解

自从2022年10月&#xff0c;ChatGPT诞生以后&#xff0c;实际上已经改变了很多&#xff01;其火爆程度简直超乎想象&#xff0c;一周的时间用户过百万&#xff0c;两个月的时间用户过亿。 目前ChatGPT4已经把2023年4月以前的人类的知识都学习到了&#xff0c;在软件工程里面&am…

Matplotlib不规则子图_Python数据分析与可视化

除了网格子图&#xff0c;matplotlib还支持不规则的多行多列子图网格。 plt.GridSpec()对象本事不能直接创建一个图形&#xff0c;他只是 plt.subplot()命令可以识别的简易接口。 这里创建了一个带行列间距的23网格&#xff1a; grid plt.GridSpec(2, 3, wspace0.4, hspace0…

mac VScode 添加PHP debug

在VScode里面添加PHP Debug 插件,根据debug描述内容操作 1: 随意在index里面写个方法,然后用浏览器访问你的hello 方法,正常会进入下边的内容 class IndexController {public function index(){return 您好&#xff01;这是一个[api]示例应用;}public function hello() {phpin…

group by

引入 日常开发中&#xff0c;我们经常会使用到group by。你是否知道group by的工作原理呢&#xff1f;group by和having有什么区别呢&#xff1f;group by的优化思路是怎样的呢&#xff1f;使用group by有哪些需要注意的问题呢&#xff1f; 使用group by的简单例子group by 工…

java学习part12多态

99-面向对象(进阶)-面向对象的特征三&#xff1a;多态性_哔哩哔哩_bilibili 1.多态&#xff08;仅限方法&#xff09; 父类引用指向子类对象。 调用重写的方法&#xff0c;就会执行子类重写的方法。 编译看引用表面类型&#xff0c;执行看实际变量类型。 2.父子同名属性是否…

数组与链表

作者简介&#xff1a;大家好&#xff0c;我是smart哥&#xff0c;前中兴通讯、美团架构师&#xff0c;现某互联网公司CTO 联系qq&#xff1a;184480602&#xff0c;加我进群&#xff0c;大家一起学习&#xff0c;一起进步&#xff0c;一起对抗互联网寒冬 除了HashMap&#xff0…

计算机服务器中了mallox勒索病毒如何处理,mallox勒索病毒解密文件恢复

科技技术的发展推动了企业的生产运营&#xff0c;网络技术的不断应用&#xff0c;极大地方便了企业日常生产生活&#xff0c;但网络毕竟是一把双刃剑&#xff0c;网络安全威胁一直存在&#xff0c;近期&#xff0c;云天数据恢复中心接到很多企业的求助&#xff0c;企业的计算机…

耶鲁博弈论笔记

编辑记录&#xff1a; 1126&#xff1a;开个新坑&#xff0c;耶鲁大学的博弈论课程&#xff0c; 和专业相关不大&#xff0c;纯兴趣&#xff0c;尽量写好一点吧 1. 首先指出博弈论是一种研究策略形式的方法&#xff0c;对于经济学中&#xff0c;完全竞争市场只能被动接受均衡…

Elasticsearch 的使用

一、简介 1.Shard&#xff08;分片&#xff09; 数据分散集群的架构模式&#xff0c;Elasticsearch 将一个 Index&#xff08;索引&#xff09;中的数据切为多个 Shard&#xff08;分片&#xff09;&#xff0c;分布在不同服务器节点上。 默认每个索引会分配5个主分片和1个副本…

C#的函数

可以发现首字母一般为动词 可以看出void是返回空值 这个就不是放回空值了 例如 函数修饰符 1、如上面的实例一样——无修饰符&#xff1a;如果一个参数没有用参数修饰符标记&#xff0c;则认为它将按值进行传递&#xff0c;这将以为着被调用的方法收到原始数据的一份副本。&am…

谈谈Redis的几种经典集群模式

目录 前言 主从复制 哨兵模式 分片集群 前言 Redis集群是一种通过将多个Redis节点连接在一起以实现高可用性、数据分片和负载均衡的技术。它允许Redis在不同节点上同时提供服务&#xff0c;提高整体性能和可靠性。在Redis中提供集群方案总共有三种&#xff1a;主从复制、…

PyQt6把QTDesigner生成的UI文件转成python源码,并运行

锋哥原创的PyQt6视频教程&#xff1a; 2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~共计18条视频&#xff0c;包括&#xff1a;2024版 PyQt6 Python桌面开发 视频教程(无废话版…