【Seata源码学习 】 篇二 TM与RM初始化过程

【Seata源码学习 】 篇二 TM与RM初始化过程

1.GlobalTransactionScanner 初始化

GlobalTransactionScanner 实现了InitializingBean 接口,在初始化后将执行自定义的初始化方法

io.seata.spring.annotation.GlobalTransactionScanner#afterPropertiesSet

   @Overridepublic void afterPropertiesSet() {//是否禁用了全局事务if (disableGlobalTransaction) {if (LOGGER.isInfoEnabled()) {LOGGER.info("Global transaction is disabled.");}return;}//初始化客户端initClient();}

io.seata.spring.annotation.GlobalTransactionScanner#initClient

private void initClient() {if (LOGGER.isInfoEnabled()) {LOGGER.info("Initializing Global Transaction Clients ... ");}if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) {LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, " +"please change your default configuration as soon as possible " +"and we don't recommend you to use default tx-service-group's value provided by seata",DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP);}if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));}//init TM//初始化事务管理器TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);if (LOGGER.isInfoEnabled()) {LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);}//init RM//初始化资源管理器RMClient.init(applicationId, txServiceGroup);if (LOGGER.isInfoEnabled()) {LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);}if (LOGGER.isInfoEnabled()) {LOGGER.info("Global Transaction Clients are initialized. ");}//注册应用上下文关闭回调方法registerSpringShutdownHook();}

image-20231113213409054

2. 初始化事务管理器 TM

流程图

image-20231114222639955

实例化 TmNettyRemotingClient

io.seata.tm.TMClient#init(java.lang.String, java.lang.String, java.lang.String, java.lang.String)

 public static void init(String applicationId, String transactionServiceGroup) {//TM进行netty网络通信的客户端// applicationId 当前应用id  transactionServiceGroup 事务分组名称TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);tmNettyRemotingClient.init();}

首先看下获取实例的方法

io.seata.core.rpc.netty.TmNettyRemotingClient#getInstance(java.lang.String, java.lang.String)

public static TmNettyRemotingClient getInstance(String applicationId, String transactionServiceGroup) {TmNettyRemotingClient tmNettyRemotingClient = getInstance();tmNettyRemotingClient.setApplicationId(applicationId);tmNettyRemotingClient.setTransactionServiceGroup(transactionServiceGroup);return tmNettyRemotingClient;}

io.seata.core.rpc.netty.TmNettyRemotingClient#getInstance()

    public static TmNettyRemotingClient getInstance() {//双检锁,保证只有一个实例if (instance == null) {synchronized (TmNettyRemotingClient.class) {if (instance == null) {//netty的配置NettyClientConfig nettyClientConfig = new NettyClientConfig();//消息处理线程池//核心线程和最大线程都是16 没有非核心线程//有界的阻塞队列 容量为200final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),KEEP_ALIVE_TIME, TimeUnit.SECONDS,new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),nettyClientConfig.getClientWorkerThreads()),RejectedPolicies.runsOldestTaskPolicy());//创建实例instance = new TmNettyRemotingClient(nettyClientConfig, null, messageExecutor);}}}return instance;}

io.seata.core.rpc.netty.TmNettyRemotingClient#TmNettyRemotingClient

private TmNettyRemotingClient(NettyClientConfig nettyClientConfig,EventExecutorGroup eventExecutorGroup,ThreadPoolExecutor messageExecutor) {//调用父类构造器super(nettyClientConfig, eventExecutorGroup, messageExecutor, NettyPoolKey.TransactionRole.TMROLE);//基于SPI机制加载鉴权签名组件 AuthSignerthis.signer = EnhancedServiceLoader.load(AuthSigner.class);// set enableClientBatchSendRequest// 是否开启了批量发送请求对配置。默认 falsethis.enableClientBatchSendRequest = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.ENABLE_TM_CLIENT_BATCH_SEND_REQUEST,DefaultValues.DEFAULT_ENABLE_TM_CLIENT_BATCH_SEND_REQUEST);//监听配置是否有变化ConfigurationCache.addConfigListener(ConfigurationKeys.ENABLE_TM_CLIENT_BATCH_SEND_REQUEST, new ConfigurationChangeListener() {@Overridepublic void onChangeEvent(ConfigurationChangeEvent event) {String dataId = event.getDataId();String newValue = event.getNewValue();if (ConfigurationKeys.ENABLE_TM_CLIENT_BATCH_SEND_REQUEST.equals(dataId) && StringUtils.isNotBlank(newValue)) {enableClientBatchSendRequest = Boolean.parseBoolean(newValue);}}});}

实例化 AbstractNettyRemotingClient

io.seata.core.rpc.netty.AbstractNettyRemotingClient#AbstractNettyRemotingClient

 public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup,ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) {//调用父类构造器 用于处理消息的线程池super(messageExecutor);//当前事务角色this.transactionRole = transactionRole;//创建 NettyClientBootstrap 实例 clientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, transactionRole);//消息处理器  clientBootstrap.setChannelHandlers(new ClientHandler());//channel管理器clientChannelManager = new NettyClientChannelManager(new NettyPoolableFactory(this, clientBootstrap), getPoolKeyFunction(), nettyClientConfig);}

实例化 AbstractNettyRemoting

io.seata.core.rpc.netty.AbstractNettyRemoting#AbstractNettyRemoting

  public AbstractNettyRemoting(ThreadPoolExecutor messageExecutor) {//设置处理消息的线程池this.messageExecutor = messageExecutor;}

初始化 TmNettyRemotingClient

回到

io.seata.tm.TMClient#init(java.lang.String, java.lang.String, java.lang.String, java.lang.String)

创建完成 TmNettyRemotingClient 实例后,调用init方法

public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);tmNettyRemotingClient.init();}
    @Overridepublic void init() {// registry processor//注册请求处理器registerProcessor();if (initialized.compareAndSet(false, true)) {//调用父类初始化方法 // 1. 定时重连// 2. 超时检测super.init();//如果事务分组不为空if (io.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {//通过channel管理器建立链接getClientChannelManager().reconnect(transactionServiceGroup);}}}

io.seata.core.rpc.netty.TmNettyRemotingClient#registerProcessor

   private void registerProcessor() {//根据不同的消息类型,使用不同的消息处理器//两个处理器 一种对消息进行处理//还有一种是处理心跳// 1.registry TC response processorClientOnResponseProcessor onResponseProcessor =new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());//注册就是将 消息处理器与线程池封装成一对pair,然后在进一步封装成map,map对key为消息类型,value为封装对pairsuper.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);// 2.registry heartbeat message processorClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);}
 @Overridepublic void registerProcessor(int requestCode, RemotingProcessor processor, ExecutorService executor) {Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);this.processorTable.put(requestCode, pair);}

初始化 AbstractNettyRemotingClient

io.seata.core.rpc.netty.AbstractNettyRemotingClient#init

    @Overridepublic void init() {//周期线程池 第一次在60秒后通过连接管理器重新建立链接,之后每10秒重新建立一次链接timerExecutor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {clientChannelManager.reconnect(getTransactionServiceGroup());}}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);if (this.isEnableClientBatchSendRequest()) {mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,MAX_MERGE_SEND_THREAD,KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));mergeSendExecutorService.submit(new MergedSendRunnable());}//启动一个周期线程池,每3秒检查一次请求是否超时super.init();//启动netty客户端clientBootstrap.start();}

io.seata.core.rpc.netty.NettyClientBootstrap#start

启动过程中一共设置了4个消息处理器

  1. IdleStateHandler 处理心跳
  2. ProtocolV1Decoder 消息解码
  3. ProtocolV1Encoder 消息编码
  4. ClientHandler 消息处理
 @Overridepublic void start() {if (this.defaultEventExecutorGroup == null) {this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()),nettyClientConfig.getClientWorkerThreads()));}this.bootstrap.group(this.eventLoopGroupWorker).channel(nettyClientConfig.getClientChannelClazz()).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()).option(ChannelOption.SO_RCVBUF,nettyClientConfig.getClientSocketRcvBufSize());if (nettyClientConfig.enableNative()) {if (PlatformDependent.isOsx()) {if (LOGGER.isInfoEnabled()) {LOGGER.info("client run on macOS");}} else {bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED).option(EpollChannelOption.TCP_QUICKACK, true);}}bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),nettyClientConfig.getChannelMaxWriteIdleSeconds(),nettyClientConfig.getChannelMaxAllIdleSeconds())).addLast(new ProtocolV1Decoder()).addLast(new ProtocolV1Encoder());if (channelHandlers != null) {addChannelPipelineLast(ch, channelHandlers);}}});if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {LOGGER.info("NettyClientBootstrap has started");}}

io.seata.core.rpc.netty.AbstractNettyRemotingClient.ClientHandler

  @Overridepublic void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {if (!(msg instanceof RpcMessage)) {return;}processMessage(ctx, (RpcMessage) msg);}

io.seata.core.rpc.netty.AbstractNettyRemoting#processMessage

protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {if (LOGGER.isDebugEnabled()) {LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));}Object body = rpcMessage.getBody();//顶层接口 MessageTypeAwareif (body instanceof MessageTypeAware) {MessageTypeAware messageTypeAware = (MessageTypeAware) body;//根据消息的类型获取不同的RemotingProcessor进行处理final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());if (pair != null) {//如果线程池不为空,使用线程池执行 前面封装pair时,线程池都是nullif (pair.getSecond() != null) {try {pair.getSecond().execute(() -> {try {pair.getFirst().process(ctx, rpcMessage);} catch (Throwable th) {LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);} finally {MDC.clear();}});} catch (RejectedExecutionException e) {LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),"thread pool is full, current max pool size is " + messageExecutor.getActiveCount());if (allowDumpStack) {String name = ManagementFactory.getRuntimeMXBean().getName();String pid = name.split("@")[0];long idx = System.currentTimeMillis();try {String jstackFile = idx + ".log";LOGGER.info("jstack command will dump to " + jstackFile);Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));} catch (IOException exx) {LOGGER.error(exx.getMessage());}allowDumpStack = false;}}} else {//如果消息处理器对应的线程池是空的,则直接处理try {pair.getFirst().process(ctx, rpcMessage);} catch (Throwable th) {LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);}}} else {LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());}} else {LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);}}

初始化 AbstractNettyRemoting

io.seata.core.rpc.netty.AbstractNettyRemoting#init

public void init() {//每3秒检查一次请求是否超时timerExecutor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {MessageFuture future = entry.getValue();if (future.isTimeout()) {futures.remove(entry.getKey());RpcMessage rpcMessage = future.getRequestMessage();future.setResultMessage(new TimeoutException(String.format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString())));if (LOGGER.isDebugEnabled()) {LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());}}}nowMills = System.currentTimeMillis();}}, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);}

TM客户端与 TC 建立连接

io.seata.core.rpc.netty.NettyClientChannelManager#reconnect

void reconnect(String transactionServiceGroup) {List<String> availList = null;try {//根据事务分组名称找seata服务端地址列表 默认根据File配置映射关系查找//tx-service-group 事务分组名//vgroup-mapping.事务分组名=分组seata服务列表名//seata.service.grouplist.分组seata服务列表名=seata服务地址availList = getAvailServerList(transactionServiceGroup);} catch (Exception e) {LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);return;}if (CollectionUtils.isEmpty(availList)) {RegistryService registryService = RegistryFactory.getInstance();String clusterName = registryService.getServiceGroup(transactionServiceGroup);//如果找不到任何seata server 服务配置列表,抛出异常if (StringUtils.isBlank(clusterName)) {LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct",ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX,transactionServiceGroup);return;}if (!(registryService instanceof FileRegistryServiceImpl)) {LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);}return;}Set<String> channelAddress = new HashSet<>(availList.size());try {for (String serverAddress : availList) {try {//与所有seata server建立长连接acquireChannel(serverAddress);channelAddress.add(serverAddress);} catch (Exception e) {LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(),serverAddress, e.getMessage(), e);}}} finally {if (CollectionUtils.isNotEmpty(channelAddress)) {List<InetSocketAddress> aliveAddress = new ArrayList<>(channelAddress.size());for (String address : channelAddress) {String[] array = address.split(":");aliveAddress.add(new InetSocketAddress(array[0], Integer.parseInt(array[1])));}RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, aliveAddress);} else {RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, Collections.emptyList());}}}

3.初始化资源管理器 RM

io.seata.rm.RMClient#init

    public static void init(String applicationId, String transactionServiceGroup) {//获取RmNettyRemotingClient实例RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);//设置资源管理器rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());//设置资源事务管理器rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());//初始化RmNettyRemotingClientrmNettyRemotingClient.init();}

实例化 RmNettyRemotingClient

io.seata.core.rpc.netty.RmNettyRemotingClient#getInstance(java.lang.String, java.lang.String)

  public static RmNettyRemotingClient getInstance(String applicationId, String transactionServiceGroup) {//获取实例,并创建消息处理线程池RmNettyRemotingClient rmNettyRemotingClient = getInstance();//设置应用idrmNettyRemotingClient.setApplicationId(applicationId);//设置事务分组名rmNettyRemotingClient.setTransactionServiceGroup(transactionServiceGroup);return rmNettyRemotingClient;}

io.seata.core.rpc.netty.RmNettyRemotingClient#getInstance()

public static RmNettyRemotingClient getInstance() {//双检锁创建实例 保证单例if (instance == null) {synchronized (RmNettyRemotingClient.class) {if (instance == null) {//netty客户端配置NettyClientConfig nettyClientConfig = new NettyClientConfig();//消息处理线程池final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),new NamedThreadFactory(nettyClientConfig.getRmDispatchThreadPrefix(),nettyClientConfig.getClientWorkerThreads()), new ThreadPoolExecutor.CallerRunsPolicy());instance = new RmNettyRemotingClient(nettyClientConfig, null, messageExecutor);}}}return instance;}

RmNettyRemotingClient 及其父类的实例化过程都与TM是一致的,我们可以看下继承关系图

截屏2023-11-16 21.15.23

真正有区别的地方在于TM客户端初始化的过程与RM客户端初始化的过程

初始化 RmNettyRemotingClient

//获取RmNettyRemotingClient实例RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);//设置资源管理器rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());//设置资源事务管理器rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());//初始化RmNettyRemotingClientrmNettyRemotingClient.init();

io.seata.core.rpc.netty.RmNettyRemotingClient#init

    public void init() {// 注册消息处理器registerProcessor();if (initialized.compareAndSet(false, true)) {super.init();// Found one or more resources that were registered before initialization// 与TC建立连接前 会先判断资源是否存在if (resourceManager != null&& !resourceManager.getManagedResources().isEmpty()&& StringUtils.isNotBlank(transactionServiceGroup)) {getClientChannelManager().reconnect(transactionServiceGroup);}}}

io.seata.core.rpc.netty.RmNettyRemotingClient#registerProcessor

private void registerProcessor() {//注册类五种不同的消息处理器// 1.registry rm client handle branch commit processor// 分支事务提交RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);// 2.registry rm client handle branch rollback processor// 分支事务回滚RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);// 3.registry rm handler undo log processor// 回滚日志处理RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);// 4.registry TC response processor// TC响应处理ClientOnResponseProcessor onResponseProcessor =new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);// 5.registry heartbeat message processor// 心跳信息处理ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);}

以上就是TM和RM实例化的过程,至于不同的消息处理器的实现我们放到后面去看

4. TM和RM初始化总结

截屏2023-11-16 21.51.27

两者其实过程是一致的,TM客户端对象TMClient主要是实例并初始化TmNettyRemotingClient,RM客户端对象RMClient主要是实例并初始化RmNettyRemotingClient。两者类继承关系如下所示

截屏2023-11-16 21.15.23

NettyRemotingClient 对象主要是在初始化方法中消息处理器并与TC服务端建立长连接,TM与RM注册的消息处理器是不同的,并且RM在与TC建立连接前会先判断数据库资源是否存在。TmNettyRemotingClient与RmNettyRemotingClient都将共同的方法放到抽象父类 AbstractNettyRemotingClient 中 。

父类 AbstractNettyRemotingClient 封装了原生的Netty信息,用于创建Netty客户端对象,并在初始化方法中启动一个周期线程去定期重新发起连接请求

AbstractNettyRemotingClient 的父类 AbstractNettyRemoting 主要是在执行初始化方法时启动 一个周期线程池,每隔3秒检测一次发送的消息集合中是否有消息超时,默认的超时时间为30秒

io.seata.common.DefaultValues#DEFAULT_RPC_RM_REQUEST_TIMEOUT

    long DEFAULT_RPC_RM_REQUEST_TIMEOUT = Duration.ofSeconds(30).toMillis();

我们可以通过设置 transport.rpcRmRequestTimeout (毫秒)去改变这个默认的值

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/144679.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

关于Chrome中F12调试Console输入多行

在chrome 浏览器中使用console调试的时&#xff0c;如果想在console中输入多行代码&#xff0c;需要进行换行。 这时我们可以使用 [ Shift Enter ] 。也叫&#xff1a; 软回车。

数据分析 - 离散概率分布的运用

期望公式 期望的方差 期望的标准差

操作系统 day10(调度的概念、层次、七状态模型)

调度的概念 调度的层次 作业调度&#xff08;高级调度&#xff09; 进程调度&#xff08;低级调度&#xff09; 内存调度&#xff08;中级调度&#xff09; 挂起态与七状态模型 三层调度的联系和对比

使用docker部署ELK日志框架-Elasticsearch

一、ELK知识了解 1-ELK组件 工作原理&#xff1a; &#xff08;1&#xff09;在所有需要收集日志的服务器上部署Logstash&#xff1b;或者先将日志进行集中化管理在日志服务器上&#xff0c;在日志服务器上部署 Logstash。 &#xff08;2&#xff09;Logstash 收集日志&#…

WPF程序给按钮增加不同状态的图片

首先我们在资源里添加几个图片&#xff0c;Up&#xff0c;Over和Down状态。 然后我们创建一个Style。默认我们的背景设置成Up 然后在Triggers里添加代码&#xff0c;当Property&#xff1a;IsMouseOver为True的时候更换成Over&#xff1b;当Property&#xff1a;IsPressed为Tr…

AR人脸道具SDK,打造极致用户体验

为了满足企业在AR领域的应用需求&#xff0c;美摄科技推出了一款领先的AR人脸道具SDK&#xff0c;旨在帮助企业快速、高效地开发出具有丰富玩法体验的AR应用&#xff0c;从而提升企业的竞争力和市场份额。 一、丰富的AR人脸道具&#xff0c;满足多样化需求 美摄科技AR人脸道具…

JAVA安全之Shrio550-721漏洞原理及复现

前言 关于shrio漏洞&#xff0c;网上有很多博文讲解&#xff0c;这些博文对漏洞的解释似乎有一套约定俗成的说辞&#xff0c;让人云里来云里去&#xff0c;都没有对漏洞产生的原因深入地去探究..... 本文从现象到本质&#xff0c;旨在解释清楚Shrio漏洞是怎么回事&#xff01…

Opencv!!在树莓派上安装Opencv!

一、更新树莓派系统 sudo apt-get update sudo apt-get upgrade二、安装python-opencv sudo apt-get install libopencv-dev sudo apt-get install python3-opencv三、查看是否安装成功 按以下命令顺序执行&#xff1a; python import cv2 cv2.__version__如果出现版本号&a…

x3daudio1_7.dll错误:解决方法和丢失原因及作用

x3daudio1_7.dll是Windows操作系统中的一个动态链接库&#xff08;DLL&#xff09;文件&#xff0c;主要作用是为DirectX音频提供支持。DirectX是微软推出的一套多媒体应用程序开发接口&#xff0c;广泛应用于游戏、多媒体制作等领域。x3daudio1_7.dll文件包含了许多与三维音频…

2.5 Windows驱动开发:DRIVER_OBJECT对象结构

在Windows内核中&#xff0c;每个设备驱动程序都需要一个DRIVER_OBJECT对象&#xff0c;该对象由系统创建并传递给驱动程序的DriverEntry函数。驱动程序使用此对象来注册与设备对象和其他系统对象的交互&#xff0c;并在操作系统需要与驱动程序进行交互时使用此对象。DRIVER_OB…

C语言ZZULIOJ1148:组合三位数之一

题目描述 把1、2、3、4、5、6、7、8、9组合成3个3位数&#xff0c;要求每个数字仅使用一次&#xff0c;使每个3位数均为完全平方数。按从小到大的顺序输出这三个三位数。 输入:无 输出:按从小到大的顺序输出这三个三位数&#xff0c;由空格隔开。输出占一行。 提示 若一个数能表…

FBI:皇家勒索软件要求350名受害者支付2.75亿美元

导语 最近&#xff0c;FBI和CISA联合发布的一份通告中透露&#xff0c;自2022年9月以来&#xff0c;皇家勒索软件&#xff08;Royal ransomware&#xff09;已经入侵了全球至少350家组织的网络。这次更新的通告还指出&#xff0c;这个勒索软件团伙的赎金要求已经超过了2.75亿美…

GDPU 商务英语 [初入职场](持续更新……)

&#x1f468;‍&#x1f3eb; 商务英语&#xff08;初入职场电子书PDF&#xff09;提取码&#xff1a;t9ri Unit 1 Job-seeking ✨ 单词 recruitment n. 招聘physical adj. 有形的;物质的profitability n. 盈利launch vt. 将(新产品等)投放市场budget n. 预算account for 占…

【入门篇】1.3 redis客户端之 jedis 高级使用示例

文章目录 0.前言1. 发布和订阅消息2. 事务操作3. 管道操作4. jedis 支持哨兵模式5. jedis 支持集群模式5. 参考链接 0.前言 Jedis是Redis的Java客户端&#xff0c;它支持所有的Redis原生命令&#xff0c;使用方便&#xff0c;且可以与Java项目无缝集成。 该库的最新版本支持Re…

场景图形管理-多视图与相机(3)

在OSG中多视图的管理是通过osgViewer::CompositeViewer类来实现的。该类负责多个视图的管理及同步工作&#xff0c;继承自osgViewer;:ViewerBase类&#xff0c;继承关系图如图8-13所示 图8-13 osgViewer::CompositeViewer 的继承关系图 在前面已经讲到&#xff0c;osgViewer:Vi…

Win通过WSL配置安装Redis

一共分为如下几步&#xff1a; 安装WSL发行版&#xff0c;如Ubuntu安装Redis配置Redis与WSL WSL安装 这里有微软官方的文档&#xff1a;https://learn.microsoft.com/zh-cn/windows/wsl/install 但我不建议零基础的这么做。很容易输完一些命令之后&#xff0c;把环境弄得乱七…

私域电商:实体商家想通过异业联盟引流,应该怎么做?

​异业联盟引流是一种有效的营销策略&#xff0c;通过与不同行业的企业或品牌合作&#xff0c;共同推广产品或服务&#xff0c;扩大品牌影响力和用户群体。以下是异业联盟引流的一些详细过程&#xff1a; ​选择合作联盟&#xff1a; 首先&#xff0c;需要选择与自己企业或品…

Mybatis-Plus最新教程

目录 原理&#xff1a;MybatisPlus通过扫描实体类&#xff0c;并基于反射获取实体类信息作为数据库信息。 ​编辑1.添加依赖 2.常用注解 3.常见配置&#xff1a; 4.条件构造器 5.QueryWrapper 6.UpdateWrapper 7.LambdaQueryWrapper:避免硬编码 8.自定义SQL 9.Iservic…

【kafka】windows安装启动

1.zookeeper的安装与启动 快速打开window powershell&#xff1a; windowx&#xff0c;选 2.kafka下载 —注意kafka和zookeeper需要版本匹配 安装路径 注意&#xff0c;kafka安装目录不能有空格。文件下载到&#xff1a; D:\Program_Files\kafka_2.12-3.6.0新建logs文件 修改c…

nginx服务器

nginx反向代理 nginx 反向代理的好处&#xff1a; 提高访问速度 因为nginx本身可以进行缓存&#xff0c;如果访问的同一接口&#xff0c;并且做了数据缓存&#xff0c; nginx就直接可把数据返回&#xff0c;不需要真正地访问服务端&#xff0c;从而提高访问速度。 进行负载均衡…