RocketMQ NettyRemotingServer、NettyRemotingClient 实例化、初始化、启动源码解析

在这里插入图片描述

🔭 嗨,您好 👋 我是 vnjohn,在互联网企业担任后端开发,CSDN 优质创作者
📖 推荐专栏:Spring、MySQL、Nacos、Java,后续其他专栏会持续优化更新迭代
🌲文章所在专栏:RocketMQ
🤔 我当前正在学习微服务领域、云原生领域、消息中间件等架构、原理知识
💬 向我询问任何您想要的东西,ID:vnjohn
🔥觉得博主文章写的还 OK,能够帮助到您的,感谢三连支持博客🙏
😄 代词: vnjohn
⚡ 有趣的事实:音乐、跑步、电影、游戏

目录

  • 前言
  • new
    • BrokerOuterAPI
    • MQClientInstance
    • NettyRemotingClient
  • initialize
    • NettyRemotingServer
  • start
    • NettyRemotingServer
    • NettyRemotingClient
  • 总结

前言

RocketMQ 专栏篇:

从零开始:手把手搭建 RocketMQ 单节点、集群节点实例

保护数据完整性:探索 RocketMQ 分布式事务消息的力量

RocketMQ 分布式事务消息实战指南:确保数据一致性的关键设计

RocketMQ 生产者源码分析:DefaultMQProducer、DefaultMQProducerImpl

RocketMQ MQClientInstance、生产者实例启动源码分析

RocketMQ 投递消息方式以及消息体结构分析:Message、MessageQueueSelector

RocketMQ DefaultMQProducer#send 方法源码解析:生产者投递消息(一)
RocketMQ DefaultMQProducer#send 方法源码解析:生产者投递消息(二)
RocketMQ 通信机制底层数据结构及源码解析

上篇文章【RocketMQ 通信机制底层数据结构及源码解析 】主要介绍了 RocketMQ 中底层的网络通信机制涉及到的数据结构以及线程模型通信,未做过多源码的介绍,这篇文章主要围绕着一块的源码解读.

new

在 Broker 服务端创建 BrokerController 时,会实例化 BrokerController,在里面会传递 NettyServerConfig、NettyClientConfig,如下:

final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
nettyServerConfig.setListenPort(10911);
nettyServerConfig.setUseEpollNativeSelector(true);
// .....
final BrokerController controller = new BrokerController(brokerConfig,nettyServerConfig,nettyClientConfig,messageStoreConfig);

Netty Server Boss 默认绑定的端口:10911

BrokerOuterAPI

在实例化 BrokerController 时,会先将 NettyRemotingClient 先创建好,它主要用来与其他 Broker 之间进行相互通信的,比如:当通过命令在某台 Broker 创建一个 Topic,会通过当前 Broker 组装好信息,发送给其他 Broker 进行 Topic 路由信息进行传递,以便于其他 Broker 都得知该 Topic 信息,进行消息的接收.

public BrokerController(final BrokerConfig brokerConfig,final NettyServerConfig nettyServerConfig,final NettyClientConfig nettyClientConfig,final MessageStoreConfig messageStoreConfig) {this.brokerConfig = brokerConfig;this.nettyServerConfig = nettyServerConfig;// ....this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {this.remotingClient = new NettyRemotingClient(nettyClientConfig);this.remotingClient.registerRPCHook(rpcHook);
}

MQClientInstance

在生产者、消费者启动时,通过 MQClientManager#getOrCreateMQClientInstance会创建 MQClientInstance 实例,会将 NettyClientConfig 绑定好

public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {this.nettyClientConfig = new NettyClientConfig();this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());// 客户端远程调用的处理器,接受来自 Broker 请求并做出响应this.clientRemotingProcessor = new ClientRemotingProcessor(this);// MQ 客户端 API 发起请求的类this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
}
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,final ClientRemotingProcessor clientRemotingProcessor,RPCHook rpcHook, final ClientConfig clientConfig) {this.clientConfig = clientConfig;// RocketMQ 网络模型的核心类this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);this.clientRemotingProcessor = clientRemotingProcessor;this.remotingClient.registerRPCHook(rpcHook); 		this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);// 消费组数量发生变化,触发重平衡this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);// 消费者客户端重置偏移量this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);// 获取消费者状态this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);// 获取消费者运行的信息this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);// 消费消息this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);// 回复消息this.remotingClient.registerProcessor(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, this.clientRemotingProcessor, null);
}

NettyRemotingClient

NettyRemotingClient 充当 RocketMQ 网络通信模型下的客户端,生产者、消费者、Broker 都持有对它的引用进行使用,它整体的实例化过程源码如下:

public NettyRemotingClient(final NettyClientConfig nettyClientConfig,final ChannelEventListener channelEventListener) {// 单向发送信号量数、异步发送信号量数 = 65535super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());this.nettyClientConfig = nettyClientConfig;this.channelEventListener = channelEventListener;int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();if (publicThreadNums <= 0) {publicThreadNums = 4;}// 使用公共线程池处理来自客户端的各种 Processor,最低线程数为 4、最大线程数为 CPU 核数this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());}});this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));}});if (nettyClientConfig.isUseTLS()) {try {sslContext = TlsHelper.buildSslContext(true);log.info("SSL enabled for client");} catch (IOException e) {log.error("Failed to create SSLContext", e);} catch (CertificateException e) {log.error("Failed to create SSLContext", e);throw new RuntimeException("Failed to create SSLContext", e);}}
}

在其实例化时,提供了一个内部局部变量为 Bootstrap

 private final Bootstrap bootstrap = new Bootstrap();

initialize

在实例化 BrokerController 期间,只是会将 Netty 服务端,给设置好,不做任何处理

NettyRemotingServer

调用 BrokerController#initialize 初始化方法时,会实例化 NettyRemotingServer

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,final ChannelEventListener channelEventListener) {super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());this.serverBootstrap = new ServerBootstrap();this.nettyServerConfig = nettyServerConfig;this.channelEventListener = channelEventListener;int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();if (publicThreadNums <= 0) {publicThreadNums = 4;}// Processor 公共处理的线程池,当未指定 Executor 时this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());}});// 默认都是创建 EpollEventLoopGroupif (useEpoll()) {this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});} else {this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});}loadSslContext();
}
  1. 创建 Semaphore Oneway 信号量:256,Semaphore Async 信号量:64
  2. 创建 Processor 公共处理的线程池,当 Processor 未指定 Executor 时,分配给这个 Executor 进行处理,公共的业务线程池
  3. 创建 1 个线程数的 EpollEventLoopGroup,Reactor 主线程
  4. 创建 3 个线程数的 EpollEventLoopGroup,Reactor 线程池

通过 useEpoll 方法来判别 EpollEventLoopGroup 还是 NioEventLoopGroup

private boolean useEpoll() {// OS 类型:Windows、Linuxreturn RemotingUtil.isLinuxPlatform()// 通过 NettyServerConfig.setUseEpollNativeSelector 方法设置是否开启 Epoll Selector 模型&& nettyServerConfig.isUseEpollNativeSelector()&& Epoll.isAvailable();
}

在实例化 BrokerController 时已经设置 useEpollNativeSelector 变量为 true.

start

NettyRemotingServer

通过 BrokerController#start 方法再调用 NettyRemotingServer#start 方法启动 Netty Server 服务端

public void start() {this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});prepareSharableHandlers();/*1. SslHandler:SSL安全套接字协议2. ⬇3. FileRegionEncoder:文件区域采用 Zero-Copy SendFile 编码传输4. ⬇5. NettyEncoder:编码器6. ⬇7. NettyDecoder:解码器8. ⬇9. IdleStateHandler:空闲检查10. ⬇11. NettyConnectManageHandler:网络连接管理12. ⬇13. NettyServerHandler:服务端请求处理器*/ServerBootstrap childHandler =this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)// 在 TCP 协议中,当服务器端接收到客户端的连接请求时,会创建一个连接队列来存储这些请求,然后依次处理// ChannelOption.SO_BACKLOG 参数就是用来设置这个连接队列的大小.option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())// 默认情况下,TCP 连接在 TIME_WAIT 状态时,不能立即被重用,必须等待一段时间才能重用// 通过给套接字配置可重用属性,告诉操作系统内核,这样的 TCP 连接可以复用 TIME_WAIT 状态的连接.option(ChannelOption.SO_REUSEADDR, true)// 用于开启或者关闭保活探测,默认情况下是关闭的// 当 SO_KEEPALIVE 开启时,可以保持连接检测对方主机是否崩溃,避免(服务器)永远阻塞于 TCP 连接的输入.option(ChannelOption.SO_KEEPALIVE, false)// TCP_NODELAY 是禁用Nagle算法,即数据包立即发送出去// 如果要求高实时性,有数据发送时就马上发送,就将该选项设置为 true 关闭 Nagle 算法// 如果要减少发送次数减少网络交互,就设置为 false 等累积一定大小后再发送。默认为 false.childOption(ChannelOption.TCP_NODELAY, true)// 绑定本地端口 Broker:10911、NameSrv:9876.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});// 设置发送缓冲区大小if (nettyServerConfig.getServerSocketSndBufSize() > 0) {log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize());childHandler.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize());}// 设置接收缓冲区大小if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {log.info("server set SO_RCVBUF to {}", nettyServerConfig.getServerSocketRcvBufSize());childHandler.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize());}// 设置写缓冲区大小if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && nettyServerConfig.getWriteBufferHighWaterMark() > 0) {log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark());childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()));}// 设置是否开启池化 ByteBufAllocator,采用默认的 PooledByteBufAllocatorif (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {ChannelFuture sync = this.serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();this.port = addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);}if (this.channelEventListener != null) {this.nettyEventExecutor.start();}this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);
}

启动 NettyRemotingServer 流程如下:

  1. 创建 DefaultEventExecutorGroup Worker 线程池,默认线程数量:8,线程名 prefix:NettyServerCodecThread_
  2. 通过 ServerBootstrap 指定好分组:Reactor 主线程、Reactor 线程池
  3. 创建 EpollServerSocketChannel,ServerSocketChannel 实现类
  4. 设置服务端参数,如下表
  5. 调用 io.netty.bootstrap.AbstractBootstrap#bind 方法创建一个 EpollServerSocketChannel,并且绑定好地址、端口
  6. 启动 NettyEventExecutor,它是一个单独的线程,用来接收来自 Netty 客户端空闲、关闭、连接、异常事件并进行监听回调处理.
  7. 创建一个 Timer 定时器,每隔 3 秒扫描哪些超时等待的客户端请求,并对它们进行处理,响应超时等待回调请求返回给客户端
参数名参数值参数描述
ChannelOption.SO_BACKLOG1024当服务器端接收到客户端的连接请求时,会创建一个连接队列来存储这些请求
ChannelOption.SO_REUSEADDRtrue通过给套接字配置可重用属性,告诉操作系统内核,这样的 TCP 连接可以复用 TIME_WAIT 状态的连接
ChannelOption.SO_KEEPALIVEfalse用于开启或者关闭保活探测,默认情况下是关闭的
ChannelOption.TCP_NODELAYtrue如果要求高实时性,有数据发送时就马上发送,就将该选项设置为 true 关闭 Nagle 算法
如果要减少发送次数减少网络交互,就设置为 false 等累积一定大小后再发送,默认为 false
ChannelOption.SO_SNDBUF0设置发送缓冲区大小
ChannelOption.SO_RCVBUF0设置接收缓冲区大小
ChannelOption.WRITE_BUFFER_WATER_MARK0设置写缓冲区大小
ChannelOption.ALLOCATORPooledByteBufAllocator.DEFAULT优先分配直接内存

Broker 服务端会在初始化阶段,通过调用 BrokerController#registerProcessor 方法注册,请求 -> Processor 处理器之间的映射关系,将其写入到 NettyRemotingAbstract#processorTable 集合中,当接收来自客户端请求时,代表输入由 Netty 最后一个处理器:NettyRemotingServer.NettyServerHandler 接收处理,执行其内部的 channelRead0 方法处理消息收到的请求,根据请求体 RequestCommand 携带的 code,从 processorTable 集合中找到 Pair 组合「Processor,Executor」等待 Broker 处理完成之后,再执行客户端的回调方法,返回给客户端具体的请求结果.

NettyRemotingClient

在执行 BrokerController#start 时,同时会将 BrokerOuterAPI 启动,也就是启动 NettyRemotingClient

在执行 DefaultMQProducer#start、DefaultMQPushConsumerImpl#start 方法时,同时会将 MQClientAPIImpl 也启动,也就是启动 NettyRemotingClient

所以,从 Broker、生产者、消费者角度作为客户端,它们使用的都是同一个类 NettyRemotingClient 逻辑作为 Netty 客户端使用,以下是其启动时具体的源码:

public void start() {this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(// 默认线程数为 4nettyClientConfig.getClientWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());}});// 后续会发起请求时会通过 eventLoopGroupWorker 去建立 Socket 连接与服务端之间进行读、写交互,NioSocketChannel 代表的就是非阻塞的 SocketChannelBootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)// 数据包组装为更大的帧然后进行发送.option(ChannelOption.TCP_NODELAY, true)// 定时发送探测包来探测连接的对端是否存活.option(ChannelOption.SO_KEEPALIVE, false).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();if (nettyClientConfig.isUseTLS()) {if (null != sslContext) {pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));log.info("Prepend SSL handler");} else {log.warn("Connections are insecure as SSLContext is null!");}}// DefaultEventExecutorGroup 用来执行以下五个 ChannelHandlerpipeline.addLast(defaultEventExecutorGroup,// 编码 -> 处理请求new NettyEncoder(),// 解码 -> 处理响应new NettyDecoder(),new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),new NettyConnectManageHandler(),// 远程调用->请求、响应处理器new NettyClientHandler());}});// 操作系统客户端发送缓冲区的大小if (nettyClientConfig.getClientSocketSndBufSize() > 0) {log.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());}// 操作系统客户端接收缓冲区的大小if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {log.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());}if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) {log.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark());handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()));}// Timer 定时执行哪些请求过期的事件,每隔 3 秒this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingClient.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);// 生产者、消费者客户端一般为空,在 nameserver 与 Broker 交互时会使用到,做一些连接、关闭、异常、死亡状态的回调处理if (this.channelEventListener != null) {this.nettyEventExecutor.start();}
}

启动 NettyRemotingClient 流程如下:

  1. 创建 DefaultEventExecutorGroup Worker 线程池,用于向客户端发起写事件、接收读事件的处理
  2. 通过 group 绑定 Worker 主线程,创建 NioSocketChannel 非阻塞 SocketChannel
  3. 设置相关的客户端参数,如下表
  4. 设置客户端请求、响应时要执行的处理器逻辑,主要是:编码-NettyEncoder、解码-NettyDecoder、请求_响应处理器-NettyClientHandler
  5. 创建一个 Timer 定时器,每隔 3 秒扫描哪些超时等待的请求,并对它们进行处理,响应超时等待回调请求返回给业务调用方
参数名参数值参数描述
ChannelOption.TCP_NODELAYtrue如果要求高实时性,有数据发送时就马上发送,就将该选项设置为 true 关闭 Nagle 算法
如果要减少发送次数减少网络交互,就设置为 false 等累积一定大小后再发送,默认为 false
ChannelOption.SO_KEEPALIVEfalse用于开启或者关闭保活探测,默认情况下是关闭的
CONNECT_TIMEOUT_MILLIS3000连接超时时长 3 秒,在规定时间内未处理完成返回 Timeout 异常
ChannelOption.SO_SNDBUF0客户端发送缓冲区的大小
ChannelOption.SO_RCVBUF0客户端接收缓冲区的大小
ChannelOption.WRITE_BUFFER_WATER_MARK0设置写缓冲区大小

在作为客户端角度,只有当每次发起投递消息、消费消息请求时,才会创建与服务端之间的 Channel 通道,核心方法 NettyRemotingClient#createChannel 内部调用 Bootstrap#connect(java.net.SocketAddress) 建立与服务端之间的连接,然后再发起请求,请求的内容以及协议已经在本节专栏的上一篇博文讲到过了.

总结

该篇文章主要介绍在 RocketMQ remoting 底层通信模块中的 NettyRemotingServer、NettyRemotingClient 实例化、初始化、启动时源码的分析,在 BrokerController 实例化会优先构建好 Netty 客户端实例,在其初始化阶段会构建好 Netty 服务端实例,而在生产者、消费者侧,是在实例化 MQClientInstance 实例时会将 Netty 客户端实例也构建好,同时在 Broker、生产者、消费者启动时,会将对应的 Netty 服务端、客户端都一并启动,比编写文章不易,希望对您有帮助,能够喜欢~

博文放在 RocketMQ 专栏里,欢迎订阅,会持续更新!

如果觉得博文不错,关注我 vnjohn,后续会有更多实战、源码、架构干货分享!

推荐专栏:Spring、MySQL,订阅一波不再迷路

大家的「关注❤️ + 点赞👍 + 收藏⭐」就是我创作的最大动力!谢谢大家的支持,我们下文见!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/41628.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数学系C++ 类与对象 STL(九)

目录 目录 面向对象&#xff1a;py&#xff0c;c艹&#xff0c;Java都是,但c是面向过程 特征&#xff1a; 对象 内敛成员函数【是啥】&#xff1a; 构造函数和析构函数 构造函数 复制构造函数/拷贝构造函数&#xff1a; 【……】 实参与形参的传递方式&#xff1a;值…

【LeetCode】螺旋矩阵

目录 一、题目二、解法完整代码 一、题目 给你一个 m 行 n 列的矩阵 matrix &#xff0c;请按照 顺时针螺旋顺序 &#xff0c;返回矩阵中的所有元素。 示例 1&#xff1a; 输入&#xff1a;matrix [[1,2,3],[4,5,6],[7,8,9]] 输出&#xff1a;[1,2,3,6,9,8,7,4,5] 示例 2&…

go-redis 封装事件-client封装模型、批量数据处理的导出器设计

一、redis-go的封装实践-client模型 // Copyright 2020 Lingfei Kong <colin404foxmail.com>. All rights reserved. // Use of this source code is governed by a MIT style // license that can be found in the LICENSE file.package storageimport ("context&q…

Halcon 铣刀刀口破损缺陷检测

一 OTSU OTSU&#xff0c;是一种自适应阈值确定的方法,又叫大津法&#xff0c;简称OTSU&#xff0c;是一种基于全局的二值化算法,它是根据图像的灰度特性,将图像分为前景和背景两个部分。当取最佳阈值时&#xff0c;两部分之间的差别应该是最大的&#xff0c;在OTSU算法中所采…

排序 -- 万能测试oj

. - 力扣&#xff08;LeetCode&#xff09; 这道题我们可以使用我们学过的那些常见的排序方法来进行解答 //插入排序 void InsertSort(int* nums, int n) {for (int i 0; i < n-1; i){int end i;int tmp nums[end 1];while (end > 0){if (tmp < nums[end]){nums[…

PyVideoTrans:一款功能全面的视频翻译配音工具!【送源码】

PyVideoTrans是一款功能全面的视频翻译配音工具&#xff0c;专为视频内容创作者设计。它能够将视频中的语言翻译成另一种语言&#xff0c;并自动生成与之匹配的字幕和配音。支持多种语言&#xff0c;包括但不限于中文&#xff08;简繁体&#xff09;、英语、韩语、日语、俄语、…

Wormhole Filters: Caching Your Hash on Persistent Memory——泛读笔记

EuroSys 2024 Paper 论文阅读笔记整理 问题 近似成员关系查询&#xff08;AMQ&#xff09;数据结构可以高效地近似确定元素是否在集合中&#xff0c;例如Bloom滤波器[10]、cuckoo滤波器[23]、quotient滤波器[8]及其变体。但AMQ数据结构的内存消耗随着数据规模的增长而快速增长…

MSPM0G3507——串口0从数据线传输变为IO口传输

默认的跳线帽时这样的&#xff0c;这样时是数据线传输 需要改成这样&#xff0c;即可用IO口进行数据传输

windows系统本地端口被占用的问题

第一步&#xff1a;查找所有运行的端口 按住“WindowsR”组合键&#xff0c;打开命令窗口&#xff0c;输入【cmd】命令&#xff0c;回车。在弹出的窗口中输入 命令【netstat -ano】&#xff0c;再按一下回车键 Win系统端口被占用-查找所有运行的端口 第二步&#xff1a;查看…

opencv_C++学习笔记(入门30讲)

文章目录 1.配置开发环境2.图像读取与显示3.图像色彩空间转换4.图像对象的创建与赋值5.图像像素的读写操作6.图像像素的算数操作7.滚动条-调整图像亮度8.滚动条-调整对比度和亮度9.键盘响应操作10.图像像素的逻辑操作11.图像的通道分离和合并12.图像色彩空间转换13.图像的像素值…

阿里云存储的降本增效与运维

小浩负责公司存储架构层&#xff0c;需要确保存储层不会成为公司业务系统的性能瓶颈&#xff0c;让数据读写达到最佳性能。那么小浩可以从哪些方面着手优化性能呢&#xff1f;他继续求助系统架构师大雷。 小浩&#xff1a;雷哥&#xff0c;PD反馈公司系统最近响应很慢&#xff…

HTTP模块(一)

HTTP服务 本小节主要讲解HTTP服务如何创建服务&#xff0c;查看HTTP请求&响应报文&#xff0c;还有注意事项说明&#xff0c;另外讲解本地环境&Node环境&浏览器之间的链路图示&#xff0c;如何提取HTTP报文字符串&#xff0c;及报错信息查询。 创建HTTP服务端 c…

lspci

【原】Linux之PCIE三种空间解析 PCIe学习笔记——2.PCIe配置空间 PCIE学习&#xff08;2&#xff09;PCIE配置空间详解 开发者分享 | 使用 lspci 和 setpci 调试 PCIe 问题 b : 字节 w&#xff1a;word L&#xff1a; 4byte

LLM - 词表示和语言模型

一. 词的相似度表示 (1): 用一系列与该词相关的词来表示 (2): 把每个词表示一个独立的符号(one hot) (3): 利用该词上下文的词来表示该词 (3): 建立一个低维度的向量空间&#xff0c;用深度学习方法将该词映射到这个空间里(Word Embedding) 二&#xff1a;语言模型 (1): 根…

Redis源码整体结构

一 前言 Redis源码研究为什么先介绍整体结构呢?其实也很简单,作为程序员的,要想对一个项目有快速的认知,对项目整体目录结构有一个清晰认识,有助于我们更好的了解这个系统。 二 目录结构 Redis源码download到本地之后,对应结构如下: 从上面的截图可以看出,Redis源码一…

52-5 内网代理2 - LCX端口转发(不推荐使用LCX)

环境搭建: 本地开3台虚拟机:kali(必须)、windows2012与2008 (可换成其他windows虚拟机) kali - 网络配置成桥接模式 windows2012 - 设置两个网卡,NAT与桥接模式 注意:windows2012要关闭防火墙,要不然其他主机ping不通 关闭防火墙后再开启远程桌面连接 windwos20…

去O化神器 Exbase

随着去O化进程推动&#xff0c;很多旧业务依赖的oracle数据库&#xff0c;都需要实现做数据库的替换&#xff0c;当下能很好兼容Oracle&#xff0c;并实现异构数据库之间转换的工具并不多。这里给大家推荐一个商业工具数据库迁移工具exbase&#xff08;北京海量&#xff09;&am…

昇思MindSpore 25天学习打卡营|day18

DCGAN生成漫画头像 在下面的教程中&#xff0c;我们将通过示例代码说明DCGAN网络如何设置网络、优化器、如何计算损失函数以及如何初始化模型权重。在本教程中&#xff0c;使用的动漫头像数据集共有70,171张动漫头像图片&#xff0c;图片大小均为96*96。 GAN基础原理 这部分原…

想知道你的电脑能不能和如何升级RAM吗?这里有你想要的一些提示

考虑给你的电脑增加更多的RAM,但不确定从哪里开始?本指南涵盖了有关升级Windows PC或笔记本电脑中RAM的所有信息。 你需要升级RAM吗 在深入研究升级RAM的过程之前,评估是否需要升级是至关重要的。你是否经历过系统滞后、频繁的BSOD错误或应用程序和程序突然崩溃?这些症状…

从零开始的python学习生活

pycharm部分好用快捷键 变量名的定义 与之前学习过的语言有所不同的是&#xff0c;python中变量名的定义更加的简洁 such as 整形。浮点型和字符串的定义 money50 haha13.14 gaga"hello"字符串的定义依然是需要加上引号&#xff0c;也不需要写&#xff1b;了 字符…