【Seata源码学习 】篇四 TM事务管理器是如何开启全局事务

TM发送 单个或批量 消息

以发送GlobalBeginRequest消息为例

TM在执行拦截器链路前将向TC发送GlobalBeginRequest 消息

io.seata.tm.api.DefaultGlobalTransaction#begin(int, java.lang.String)

   @Overridepublic String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {GlobalBeginRequest request = new GlobalBeginRequest();request.setTransactionName(name);request.setTimeout(timeout);GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);if (response.getResultCode() == ResultCode.Failed) {throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());}return response.getXid();}

注意 消息TYPE_CODE 为 MessageType.TYPE_GLOBAL_BEGIN 值为 1

package io.seata.core.protocol.transaction;import io.seata.core.protocol.MessageType;
import io.seata.core.rpc.RpcContext;/*** The type Global begin request.** @author slievrly*/
public class GlobalBeginRequest extends AbstractTransactionRequestToTC {private int timeout = 60000;private String transactionName;/*** Gets timeout.** @return the timeout*/public int getTimeout() {return timeout;}/*** Sets timeout.** @param timeout the timeout*/public void setTimeout(int timeout) {this.timeout = timeout;}/*** Gets transaction name.** @return the transaction name*/public String getTransactionName() {return transactionName;}/*** Sets transaction name.** @param transactionName the transaction name*/public void setTransactionName(String transactionName) {this.transactionName = transactionName;}@Overridepublic short getTypeCode() {return MessageType.TYPE_GLOBAL_BEGIN;}@Overridepublic AbstractTransactionResponse handle(RpcContext rpcContext) {return handler.handle(this, rpcContext);}@Overridepublic String toString() {StringBuilder result = new StringBuilder();result.append("timeout=");result.append(timeout);result.append(",");result.append("transactionName=");result.append(transactionName);return result.toString();}
}

io.seata.tm.DefaultTransactionManager#syncCall

    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {try {return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);} catch (TimeoutException toe) {throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);}}

io.seata.core.rpc.netty.AbstractNettyRemotingClient#sendSyncRequest(java.lang.Object)

@Overridepublic Object sendSyncRequest(Object msg) throws TimeoutException {// 获取seata服务端地址String serverAddress = loadBalance(getTransactionServiceGroup(), msg);// TM RPC服务调用默认超时时间30slong timeoutMillis = this.getRpcRequestTimeout();// 将GlobalBeginRequest 封装为 RpcMessageRpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);// send batch message// put message into basketMap, @see MergedSendRunnable// 是否开启seata客户端批量发送消息 1.5默认关闭if (this.isEnableClientBatchSendRequest()) {// send batch message is sync request, needs to create messageFuture and put it in futures.// 批量发送消息需要将消息封装为 MessageFuture 对象 并添加到 futures Map集合中MessageFuture messageFuture = new MessageFuture();messageFuture.setRequestMessage(rpcMessage);messageFuture.setTimeout(timeoutMillis);futures.put(rpcMessage.getId(), messageFuture);// put message into basketMap// 获取当前服务端地址对应的消息队列BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,key -> new LinkedBlockingQueue<>());// 将当前消息添加到队列中 一般不会添加失败 LinkedBlockingQueue 是无界队列if (!basket.offer(rpcMessage)) {LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",serverAddress, rpcMessage);return null;}if (LOGGER.isDebugEnabled()) {LOGGER.debug("offer message: {}", rpcMessage.getBody());}// 如果当前没有在发送队列消息 给mergeLock对象上锁成功 则唤醒所有等待发送消息的线程// isSending 被volatile 修饰 保证可见性和有序性 但是不保证原子性if (!isSending) {synchronized (mergeLock) {mergeLock.notifyAll();}}try {// MessageFuture 封装了 CompletableFuture 对象,此时会超时阻塞当前线程,超时时间30秒// 等待 CompletableFuture.complete 完成获取结果return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);} catch (Exception exx) {// 如果有异常抛出LOGGER.error("wait response error:{},ip:{},request:{}",exx.getMessage(), serverAddress, rpcMessage.getBody());if (exx instanceof TimeoutException) {throw (TimeoutException) exx;} else {throw new RuntimeException(exx);}}} else {// 如果没有开启客户端批量发送消息 先获取channelChannel channel = clientChannelManager.acquireChannel(serverAddress);// 同步发送消息 并将RPCMessage 封装为 MessageFuture,并设置超时时间 放入 futures Map集合中// 由父类AbstractNettyRemoting的周期线程每隔3秒检查一次消息是否超时// 发送消息时会添加 ChannelFutureListener 监听器,如果消息成功,则调用 CompletableFuture.complete 设置结果,// 并将当前消息id对应的MessageFuture 从futures 中移除return super.sendSync(channel, rpcMessage, timeoutMillis);}}

单个发送消息

如果是发送单个消息,则直接调用AbstractNettyRemoting.sendSync 向TC端发送消息

io.seata.core.rpc.netty.AbstractNettyRemoting#sendSync

protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {if (timeoutMillis <= 0) {throw new FrameworkException("timeout should more than 0ms");}if (channel == null) {LOGGER.warn("sendSync nothing, caused by null channel.");return null;}MessageFuture messageFuture = new MessageFuture();messageFuture.setRequestMessage(rpcMessage);// 设置超时时间 用于检测是否超时 System.currentTimeMillis() - start > timeoutmessageFuture.setTimeout(timeoutMillis);futures.put(rpcMessage.getId(), messageFuture);channelWritableCheck(channel, rpcMessage.getBody());String remoteAddr = ChannelUtil.getAddressFromChannel(channel);doBeforeRpcHooks(remoteAddr, rpcMessage);channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {if (!future.isSuccess()) {//根据消息id从futures中移除,不再进行消息超时检测MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());if (messageFuture1 != null) {//设置结果messageFuture1.setResultMessage(future.cause());}//销毁连接destroyChannel(future.channel());}});try {// 超时阻塞等待获取结果Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);doAfterRpcHooks(remoteAddr, rpcMessage, result);return result;} catch (Exception exx) {LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(),rpcMessage.getBody());if (exx instanceof TimeoutException) {throw (TimeoutException) exx;} else {throw new RuntimeException(exx);}}}

messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS) 会阻塞等待30s获取消息,超时则抛出异常 TimeoutException

批量发送消息

 // send batch message is sync request, needs to create messageFuture and put it in futures.// 批量发送消息需要将消息封装为 MessageFuture 对象 并添加到 futures Map集合中MessageFuture messageFuture = new MessageFuture();messageFuture.setRequestMessage(rpcMessage);messageFuture.setTimeout(timeoutMillis);futures.put(rpcMessage.getId(), messageFuture);// put message into basketMap// 获取当前服务端地址对应的消息队列BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,key -> new LinkedBlockingQueue<>());// 将当前消息添加到队列中 一般不会添加失败 LinkedBlockingQueue 是无界队列if (!basket.offer(rpcMessage)) {LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",serverAddress, rpcMessage);return null;}if (LOGGER.isDebugEnabled()) {LOGGER.debug("offer message: {}", rpcMessage.getBody());}// 如果当前没有在发送队列消息 给mergeLock对象上锁成功 则唤醒所有等待发送消息的线程// isSending 被volatile 修饰 保证可见性和有序性 但是不保证原子性if (!isSending) {synchronized (mergeLock) {mergeLock.notifyAll();}}try {// MessageFuture 封装了 CompletableFuture 对象,此时会超时阻塞当前线程,超时时间30秒// 等待 CompletableFuture.complete 完成获取结果return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);} catch (Exception exx) {// 如果有异常抛出LOGGER.error("wait response error:{},ip:{},request:{}",exx.getMessage(), serverAddress, rpcMessage.getBody());if (exx instanceof TimeoutException) {throw (TimeoutException) exx;} else {throw new RuntimeException(exx);}}

如果批量发送消息,则会将消息放到basketMap 集合中,AbstractNettyRemotingClient会在初始化时,启动最大和核心都是一的单线程池线程池,提交MergedSendRunnable 任务,死循环不断遍历basketMap,获取等待发送的消息队列,最终由io.seata.core.rpc.netty.AbstractNettyRemoting#sendAsync 发送异步消息。需要注意的是,不管是发送同步消息还是异步消息,TM开启事务所属的线程都会因messageFuture.get 超时阻塞,只不过发送和获取返回消息都变成了异步。

public void run() {// 死循环while (true) {//先上锁synchronized (mergeLock) {// 等待 1s 并释放当前锁try {mergeLock.wait(MAX_MERGE_SEND_MILLS);} catch (InterruptedException e) {}}isSending = true;// 遍历Map集合basketMap.forEach((address, basket) -> {if (basket.isEmpty()) {return;}MergedWarpMessage mergeMessage = new MergedWarpMessage();// 弹出同一个seata服务器地址等待发送的所有消息,合并在一块发送while (!basket.isEmpty()) {RpcMessage msg = basket.poll();// 获取消息体 与消息id 封装为 MergedWarpMessagemergeMessage.msgs.add((AbstractMessage) msg.getBody());mergeMessage.msgIds.add(msg.getId());}if (mergeMessage.msgIds.size() > 1) {printMergeMessageLog(mergeMessage);}Channel sendChannel = null;try {// send batch message is sync request, but there is no need to get the return value.// Since the messageFuture has been created before the message is placed in basketMap,// the return value will be obtained in ClientOnResponseProcessor.// 发送批量消息不会在此处阻塞等待消息的返回  将会采用异步的方式 由 ClientOnResponseProcessor 消息处理器获取返回消息sendChannel = clientChannelManager.acquireChannel(address);AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);} catch (FrameworkException e) {if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {destroyChannel(address, sendChannel);}// fast failfor (Integer msgId : mergeMessage.msgIds) {MessageFuture messageFuture = futures.remove(msgId);if (messageFuture != null) {messageFuture.setResultMessage(new RuntimeException(String.format("%s is unreachable", address), e));}}LOGGER.error("client merge call failed: {}", e.getMessage(), e);}});isSending = false;}}

接受异步消息由TM初始化时添加的ClientOnResponseProcessor 进行处理,将会遍历所有合并的消息,根据消息ID将其从futures中移除,并调用 future.setResultMessage 设置结果,此时TM发送消息时的阻塞状态将会被唤醒。

io.seata.core.rpc.processor.client.ClientOnResponseProcessor#process

  @Overridepublic void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {//如果当前消息属于合并发送的消息if (rpcMessage.getBody() instanceof MergeResultMessage) {//获取消息体与消息ID,并将消息id对应的MessageFuture从futures中移除,不再进行超时检测MergeResultMessage results = (MergeResultMessage) rpcMessage.getBody();MergedWarpMessage mergeMessage = (MergedWarpMessage) mergeMsgMap.remove(rpcMessage.getId());//遍历所有的消息for (int i = 0; i < mergeMessage.msgs.size(); i++) {int msgId = mergeMessage.msgIds.get(i);MessageFuture future = futures.remove(msgId);if (future == null) {LOGGER.error("msg: {} is not found in futures, result message: {}", msgId,results.getMsgs()[i]);} else {//在此时设置消息结果 结束阻塞等待future.setResultMessage(results.getMsgs()[i]);}}// 与合并消息的处理是一致的} else if (rpcMessage.getBody() instanceof BatchResultMessage) {try {BatchResultMessage batchResultMessage = (BatchResultMessage) rpcMessage.getBody();for (int i = 0; i < batchResultMessage.getMsgIds().size(); i++) {int msgId = batchResultMessage.getMsgIds().get(i);MessageFuture future = futures.remove(msgId);if (future == null) {LOGGER.error("msg: {} is not found in futures, result message: {}", msgId, batchResultMessage.getResultMessages().get(i));} else {future.setResultMessage(batchResultMessage.getResultMessages().get(i));}}} finally {// In order to be compatible with the old version, in the batch sending of version 1.5.0,// batch messages will also be placed in the local cache of mergeMsgMap,// but version 1.5.0 no longer needs to obtain batch messages from mergeMsgMapmergeMsgMap.clear();}} else {MessageFuture messageFuture = futures.remove(rpcMessage.getId());if (messageFuture != null) {messageFuture.setResultMessage(rpcMessage.getBody());} else {if (rpcMessage.getBody() instanceof AbstractResultMessage) {if (transactionMessageHandler != null) {transactionMessageHandler.onResponse((AbstractResultMessage) rpcMessage.getBody(), null);}}}}}

TC 处理 GlobalBeginRequest 消息

NettyChannel消息处理

io.seata.core.rpc.netty.NettyRemotingServer#init

 @Overridepublic void init() {// registry processorregisterProcessor();if (initialized.compareAndSet(false, true)) {super.init();}}

io.seata.core.rpc.netty.NettyRemotingServer#registerProcessor

private void registerProcessor() {// 1. registry on request message processorServerOnRequestProcessor onRequestProcessor =new ServerOnRequestProcessor(this, getHandler());ShutdownHook.getInstance().addDisposable(onRequestProcessor);super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);//处理GlobalBeginRequest消息  ServerOnRequestProcessorsuper.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);// 2. registry on response message processorServerOnResponseProcessor onResponseProcessor =new ServerOnResponseProcessor(getHandler(), getFutures());super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);// 3. registry rm message processorRegRmProcessor regRmProcessor = new RegRmProcessor(this);super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);// 4. registry tm message processorRegTmProcessor regTmProcessor = new RegTmProcessor(this);super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);// 5. registry heartbeat message processorServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);}	

进一步跟进查看具体如何处理

io.seata.core.rpc.processor.server.ServerOnRequestProcessor#process

    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {// channel是否已经注册if (ChannelManager.isRegistered(ctx.channel())) {onRequestMessage(ctx, rpcMessage);} else {try {if (LOGGER.isInfoEnabled()) {LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());}ctx.disconnect();ctx.close();} catch (Exception exx) {LOGGER.error(exx.getMessage());}if (LOGGER.isInfoEnabled()) {LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));}}}

进入io.seata.core.rpc.processor.server.ServerOnRequestProcessor#onRequestMessage

private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {Object message = rpcMessage.getBody();RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());if (LOGGER.isDebugEnabled()) {LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message,NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());} else {try {BatchLogHandler.INSTANCE.getLogQueue().put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:"+ rpcContext.getTransactionServiceGroup());} catch (InterruptedException e) {LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);}}//GlobalBeginRequest 继承 AbstractTransactionRequest 继承 AbstractMessageif (!(message instanceof AbstractMessage)) {return;}// 合并消息处理// the batch send request messageif (message instanceof MergedWarpMessage) {//是否开启了TC批量响应 默认false  rpcContext 的 version 不为空并且大于等于 1.5.0// 如果满足 则使用 MergedWarpMessage 来处理请求消息if (NettyServerConfig.isEnableTcServerBatchSendResponse() && StringUtils.isNotBlank(rpcContext.getVersion())&& Version.isAboveOrEqualVersion150(rpcContext.getVersion())) {List<AbstractMessage> msgs = ((MergedWarpMessage)message).msgs;List<Integer> msgIds = ((MergedWarpMessage)message).msgIds;//遍历处理for (int i = 0; i < msgs.size(); i++) {AbstractMessage msg = msgs.get(i);int msgId = msgIds.get(i);//是否开启并发处理消息 默认关闭if (PARALLEL_REQUEST_HANDLE) {CompletableFuture.runAsync(() -> handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext));} else {//单个消息处理handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext);}}} else {// results 响应结果集 如果开启了并发处理消息 需要保证线程安全// completableFutures 并发处理消息List<AbstractResultMessage> results = new CopyOnWriteArrayList<>();List<CompletableFuture<Void>> completableFutures = null;for (int i = 0; i < ((MergedWarpMessage)message).msgs.size(); i++) {// 默认关闭 没有开启并发处理消息 如果开启了 则使用 completableFutures 来并发处理消息if (PARALLEL_REQUEST_HANDLE) {if (completableFutures == null) {completableFutures = new ArrayList<>();}int finalI = i;// 并发异步处理消息,并将结果添加到results中completableFutures.add(CompletableFuture.runAsync(() -> {results.add(finalI, handleRequestsByMergedWarpMessage(((MergedWarpMessage)message).msgs.get(finalI), rpcContext));}));} else {// 处理消息并按顺序添加到results集合中results.add(i,handleRequestsByMergedWarpMessage(((MergedWarpMessage)message).msgs.get(i), rpcContext));}}if (CollectionUtils.isNotEmpty(completableFutures)) {try {CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).get();} catch (InterruptedException | ExecutionException e) {LOGGER.error("handle request error: {}", e.getMessage(), e);}}MergeResultMessage resultMessage = new MergeResultMessage();resultMessage.setMsgs(results.toArray(new AbstractResultMessage[0]));remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);}} else {// the single send request messagefinal AbstractMessage msg = (AbstractMessage) message;AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);}}

io.seata.core.rpc.processor.server.ServerOnRequestProcessor#handleRequestsByMergedWarpMessage

private AbstractResultMessage handleRequestsByMergedWarpMessage(AbstractMessage subMessage, RpcContext rpcContext) {return transactionMessageHandler.onRequest(subMessage, rpcContext);
}

io.seata.server.coordinator.DefaultCoordinator#onRequest

public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {if (!(request instanceof AbstractTransactionRequestToTC)) {throw new IllegalArgumentException();}AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;transactionRequest.setTCInboundHandler(this);return transactionRequest.handle(context);}

io.seata.core.protocol.transaction.GlobalBeginRequest#handle

public AbstractTransactionResponse handle(RpcContext rpcContext) {return handler.handle(this, rpcContext);}

io.seata.server.AbstractTCInboundHandler#handle(io.seata.core.protocol.transaction.GlobalBeginRequest, io.seata.core.rpc.RpcContext)

  public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {GlobalBeginResponse response = new GlobalBeginResponse();exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {@Overridepublic void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {try {//真正处理beging消息doGlobalBegin(request, response, rpcContext);} catch (StoreException e) {throw new TransactionException(TransactionExceptionCode.FailedStore,String.format("begin global request failed. xid=%s, msg=%s", response.getXid(), e.getMessage()),e);}}}, request, response);return response;}

获取全局事务XID

io.seata.server.coordinator.DefaultCoordinator#doGlobalBegin

    protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)throws TransactionException {// 通过DefaultCore根据应用ID,事务分组名称,超时时间 创建并开启一个新的事务 返回全局事务XID 放入GlobalBeginResponse中response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),request.getTransactionName(), request.getTimeout()));if (LOGGER.isInfoEnabled()) {LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());}}

全局事务会话持久化

io.seata.server.coordinator.DefaultCore#begin

    @Overridepublic String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {// 创建全局事务对象 由XID.generateXID 生成全局事务XIDGlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,timeout);// 将xid放进ThreadLocal中MDC.put(RootContext.MDC_KEY_XID, session.getXid());
//        SessionHolder.getRootSessionManager() 根据SPI机制去查找
//        SessionHolder 在 io.seata.server.Server.start 启动时初始化,获取当前配置的 会话持久机制模式  ,调用  SessionHolder.init(sessionStoreMode)进行初始化
//        此时我们通过 SessionHolder.getRootSessionManager() 将使用seata的SPI机制去 META-INF/services/ 与  META-INF/seata/ 目录下查找
//        文件名为 io.seata.server.session.SessionManager 的文件 在根据 @LoadLevel注解的name值加载需要的对象
//         例如 如果我们此时的session持久化模式为DB,那么  SessionHolder.getRootSessionManager() 将加载返回 DataBaseSessionManager 对象session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());//开启事务 标记GlobalSession 的事务状态为 1 ,并记录开启时间与激活状态  调用SessionLifecycleListener监听器的onBegin方法session.begin();// transaction start event//发送事务开启事件MetricsPublisher.postSessionDoingEvent(session, false);//返回XIDreturn session.getXid();}

Seata的SPI机制会根据 EnhancedServiceLoader.load(类, 名称) 方法参数一的类的全限定类名,从META-INF/services/ 与 META-INF/seata/ 路径下去匹配,然后根据匹配到的类的全限定类名,定位到具体的类,再根据参数二名称与@LoadLevel注解的name值进行匹配 ,确定要加载的对象,如下所示

image-20231121152441877

加载 DataBaseSessionManager 对象后,添加其到session的生命监听器列表中,在执行session.begin方法时,调用监听器的onBegin方法,进而由父类AbstractSessionManager 执行 onBegin方法,并调用DataBaseSessionManager重写的addGlobalSession方法

io.seata.server.storage.db.session.DataBaseSessionManager#addGlobalSession

    @Overridepublic void addGlobalSession(GlobalSession session) throws TransactionException {if (StringUtils.isBlank(taskName)) {// 将全局事务会话信息写入数据库boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);if (!ret) {throw new StoreException("addGlobalSession failed.");}} else {boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);if (!ret) {throw new StoreException("addGlobalSession failed.");}}}

io.seata.server.storage.db.store.DataBaseTransactionStoreManager#writeSession

    public boolean writeSession(LogOperation logOperation, SessionStorable session) {if (LogOperation.GLOBAL_ADD.equals(logOperation)) {// logStore 封装了 DataSource   根据不同类型的数据源生成相应的insert语句 持久化到数据库中return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));} else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));} else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));} else if (LogOperation.BRANCH_ADD.equals(logOperation)) {return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));} else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));} else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));} else {throw new StoreException("Unknown LogOperation:" + logOperation.name());}}

io.seata.server.storage.db.store.LogStoreDataBaseDAO#insertGlobalTransactionDO

public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {// 生成SQL插入 global_table (默认) 表中,持久化全局事务会话String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);Connection conn = null;PreparedStatement ps = null;try {int index = 1;conn = logStoreDataSource.getConnection();//自动提交conn.setAutoCommit(true);ps = conn.prepareStatement(sql);ps.setString(index++, globalTransactionDO.getXid());ps.setLong(index++, globalTransactionDO.getTransactionId());ps.setInt(index++, globalTransactionDO.getStatus());ps.setString(index++, globalTransactionDO.getApplicationId());ps.setString(index++, globalTransactionDO.getTransactionServiceGroup());String transactionName = globalTransactionDO.getTransactionName();transactionName = transactionName.length() > transactionNameColumnSize ?transactionName.substring(0, transactionNameColumnSize) :transactionName;ps.setString(index++, transactionName);ps.setInt(index++, globalTransactionDO.getTimeout());ps.setLong(index++, globalTransactionDO.getBeginTime());ps.setString(index++, globalTransactionDO.getApplicationData());return ps.executeUpdate() > 0;} catch (SQLException e) {throw new StoreException(e);} finally {IOUtil.close(ps, conn);}}

最终生成的SQL如下所示

insert into global_table(xid, transaction_id, status, application_id, transaction_service_group, transaction_name,timeout, begin_time, application_data, gmt_create, gmt_modified)
values (全局事务XID, 事务id, 事务状态,应用id, 事务分组, 事务名称, 超时时间, 开始时间, 应用数据, now(), now())

数据库中的数据

image-20231121214708246

返回GlobalBeginResponse消息

在创建全局事务并开启后,拿到XID后封装到GlobalBeginResponse中,最终由remotingServer.sendAsyncResponse将GlobalBeginResponse消息返回给TM

TM处理 GlobalBeginResponse 消息

在seata服务端返回 GlobalBeginResponse 消息 后, TM还是由 io.seata.core.rpc.processor.client.ClientOnResponseProcessor#process 处理接收到的消息,通过调用future.setResultMessage 设置消息结果,并恢复阻塞的TM发送GlobalBeginRequest消息的线程,将结果返回

io.seata.tm.api.DefaultGlobalTransaction#begin(int, java.lang.String)

public void begin(int timeout, String name) throws TransactionException {if (role != GlobalTransactionRole.Launcher) {assertXIDNotNull();if (LOGGER.isDebugEnabled()) {LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);}return;}assertXIDNull();String currentXid = RootContext.getXID();if (currentXid != null) {throw new IllegalStateException("Global transaction already exists," +" can't begin a new global transaction, currentXid = " + currentXid);}// 获取到seata服务端返回到XIDxid = transactionManager.begin(null, null, name, timeout);status = GlobalStatus.Begin;//将xid绑定到RootContext中RootContext.bind(xid);if (LOGGER.isInfoEnabled()) {LOGGER.info("Begin new global transaction [{}]", xid);}}

获取到XID后,将XID绑定到RootContext中,至此,全局事务的开启过程也就结束了。

总结流程图

TM开启全局事务过程

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/165165.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

操作系统发展过程--单道批处理系统、多道批处理系统、分时系统、实时系统

一、单道批处理系统 计算机早期&#xff0c;为了能提高利用率&#xff0c;需要尽量保持系统的连续运行&#xff0c;即在处理完一个作业之后&#xff0c;紧接着处理下一个作业&#xff0c;以减少机器的空闲等待时间 1.单道批处理系统的处理过程 为了实现对作业的连续处理&…

51单片机应用从零开始(七)·循环语句(if语句,swtich语句)

51单片机应用从零开始&#xff08;一&#xff09;-CSDN博客 51单片机应用从零开始&#xff08;二&#xff09;-CSDN博客 51单片机应用从零开始&#xff08;三&#xff09;-CSDN博客 51单片机应用从零开始&#xff08;四&#xff09;-CSDN博客 51单片机应用从零开始&#xff08;…

数仓成本下降近一半,StarRocks 存算分离助力云览科技业务出海

成都云览科技有限公司倾力打造了凤凰浏览器&#xff0c;专注于为海外用户提供服务&#xff0c;公司致力于构建一个全球性的数字内容连接入口&#xff0c;为用户带来更为优质、高效、个性化的浏览体验。 作为数据驱动的高科技公司&#xff0c;从数据中挖掘价值一直是公司核心任务…

【Spring进阶系列丨第四篇】学习Spring中的Bean管理(基于xml配置)

前言 在之前的学习中我们知道&#xff0c;容器是一个空间的概念&#xff0c;一般理解为可盛放物体的地方。在Spring容器通常理解为BeanFactory或者ApplicationContext。我们知道spring的IOC容器能够帮我们创建对象&#xff0c;对象交给spring管理之后我们就不用手动去new对象。…

【Docker】从零开始:9.Docker命令:Push推送仓库(Docker Hub,阿里云)

【Docker】从零开始&#xff1a;9.Docker命令:Push推送仓库 知识点1.Docker Push有什么作用&#xff1f;2.Docker仓库有哪几种2.1 公有仓库2.2 第三方仓库2.3 私有仓库2.4 搭建私有仓库的方法有哪几种 3.Docker公有仓库与私有仓库的优缺点对比 Docker Push 命令标准语法操作参数…

openEuler 22.03 LTS x86_64 cephadm 部署ceph18.2.0 未完成 笔记

环境 准备三台虚拟机 10.47.76.94 node-1 10.47.76.95 node-2 10.47.76.96 node-3 下载cephadm [rootnode-1 ~]# yum install cephadm Last metadata expiration check: 0:11:31 ago on Tue 21 Nov 2023 10:00:20 AM CST. Dependencies resolved. Package …

酷开系统 | 酷开科技聚焦价值人群 助力营销增长

2023年&#xff0c;是消费复苏回暖的一年&#xff0c;市场中充溢着大量品牌重启增长的机遇与实例。品牌商期望能够把握住市场趋势&#xff0c;通过营销获得确定性的业绩提升&#xff0c;并在未来收获长期稳定的增长。作为数字媒介的代表之一&#xff0c;OTT大屏营销的属性和价值…

Vue学习之路------指令

Vue指令 vue会根据不同的指令&#xff0c;针对标签实现不同的功能 指令:带有v-前缀的特殊标签属性 1&#xff1a;v-html&#xff1a;指令 <div v-html"msg"></div> 2&#xff1a;v-show 作用&#xff1a;控制元素显示隐藏 语法&#xff1a;v-show&quo…

【SpringMVC】 对请求的不同响应

前言 本文学习如何运用不同的注解来返回不同的响应. 1.返回静态页面Controller 返回index.html页面 Controller 和 RestController的区别 controller 只有加上这个注解,Spring才会帮我们管理这个代码.后续我们访问时才能访问到. RestController 等同于 Controller ResponseBo…

UML建模图文详解教程01——Enterprise Architect的安装与使用

版权声明 本文原创作者&#xff1a;谷哥的小弟作者博客地址&#xff1a;http://blog.csdn.net/lfdfhl Enterprise Architect概述 官方网站&#xff1a;https://www.sparxsystems.cn/products/ea/&#xff1b;图示如下&#xff1a; Enterprise Architect是一个全功能的、基于…

B033-Servlet交互 JSP

目录 ServletServlet的三大职责跳转&#xff1a;请求转发和重定向请求转发重定向汇总请求转发与重定向的区别用请求转发和重定向完善登录 JSP第一个JSP概述注释设置创建JSP文件默认字符编码集 JSP的java代码书写JSP的原理三大指令九大内置对象改造动态web工程进行示例内置对象名…

2.HTML入门

目录 一.HTML介绍 二.HTML常用标签 2.1 标题标签 2.2 段落标签 2.3 超链接标签 2.4 图片标签 2.5 换行与空格 2.6 布局标签 2.7 列表标签 2.8 表单标签 一.HTML介绍 定义&#xff1a;将内容显示在网页&#xff0c;用来描述网页的一种语言&#xff0c;负责网页的架构…

Adiponectin 脂联素 ; T-cadherin +exosome

T-cadherin Adiponectin exosome T-cadherin Adiponectin exosome 代谢综合征中 外泌体、脂肪组织 和 脂联素 的器官间通讯-2019.pdf

C语言之字符串函数

C语言之字符串函数 文章目录 C语言之字符串函数1. strlen的使用和模拟实现1.1 strlen的使用1.2 strlen的模拟实现 2. strcpy的使用和模拟实现2.1 strcpy的使用2.2 strncpy的使用2.3 strcpy的模拟实现 3. strcat的使用和模拟实现3.1 strcat的使用3.2 strncat3.3 strcat的模拟实现…

什么是持续集成的自动化测试?

持续集成的自动化测试 如今互联网软件的开发、测试和发布&#xff0c;已经形成了一套非常标准的流程&#xff0c;最重要的组成部分就是持续集成&#xff08;Continuous integration&#xff0c;简称CI&#xff0c;目前主要的持续集成系统是Jenkins&#xff09;。 那么什么是持…

docker 安装常用环境

一、 安装linux&#xff08;完整&#xff09; 目前为止docker hub 还是被封着&#xff0c;用阿里云、腾讯云镜像找一找版本直接查就行 默认使用latest最新版 #:latest 可以不写 docker pull centos:latest # 拉取后查看 images docker images #给镜像设置标签 # docker tag […

FIB表与快速转发表工作原理

在一张路由表中&#xff0c;当存在多个路由项可同时匹配目的IP地址时&#xff0c;路由查找进程会选择掩码最长的路由项用于转发&#xff0c;即最长匹配原则。因为掩码越长&#xff0c;所处的网段范围就越小&#xff0c;网段的范围越小&#xff0c;就越能快速的定位到PC机的具体…

【分布式】小白看Ring算法 - 03

相关系列 【分布式】NCCL部署与测试 - 01 【分布式】入门级NCCL多机并行实践 - 02 【分布式】小白看Ring算法 - 03 【分布式】大模型分布式训练入门与实践 - 04 概述 NCCL&#xff08;NVIDIA Collective Communications Library&#xff09;是由NVIDIA开发的一种用于多GPU间…

GoLand 2023.2.5(GO语言集成开发工具环境)

GoLand是一款专门为Go语言开发者打造的集成开发环境&#xff08;IDE&#xff09;。它能够提供一系列功能&#xff0c;如代码自动完成、语法高亮、代码格式化、代码重构、代码调试等等&#xff0c;使编写代码更加高效和舒适。 GoLand的特点包括&#xff1a; 1. 智能代码补全&a…

Ubuntu安装CUDA驱动

Ubuntu安装CUDA驱动 前言官网安装确认安装版本安装CUDA Toolkit 前言 CUDA驱动一般指CUDA Toolkit&#xff0c;可通过Nvidia官网下载安装。本文介绍安装方法。 官网 CUDA Toolkit 最新版&#xff1a;CUDA Toolkit Downloads | NVIDIA Developer CUDA Toolkit 最新版文档&…