🔭 嗨,您好 👋 我是 vnjohn,在互联网企业担任后端开发,CSDN 优质创作者
📖 推荐专栏:Spring、MySQL、Nacos、Java,后续其他专栏会持续优化更新迭代
🌲文章所在专栏:RocketMQ
🤔 我当前正在学习微服务领域、云原生领域、消息中间件等架构、原理知识
💬 向我询问任何您想要的东西,ID:vnjohn
🔥觉得博主文章写的还 OK,能够帮助到您的,感谢三连支持博客🙏
😄 代词: vnjohn
⚡ 有趣的事实:音乐、跑步、电影、游戏
目录
- 前言
- new
- BrokerOuterAPI
- MQClientInstance
- NettyRemotingClient
- initialize
- NettyRemotingServer
- start
- NettyRemotingServer
- NettyRemotingClient
- 总结
前言
RocketMQ 专栏篇:
从零开始:手把手搭建 RocketMQ 单节点、集群节点实例
保护数据完整性:探索 RocketMQ 分布式事务消息的力量
RocketMQ 分布式事务消息实战指南:确保数据一致性的关键设计
RocketMQ 生产者源码分析:DefaultMQProducer、DefaultMQProducerImpl
RocketMQ MQClientInstance、生产者实例启动源码分析
RocketMQ 投递消息方式以及消息体结构分析:Message、MessageQueueSelector
RocketMQ DefaultMQProducer#send 方法源码解析:生产者投递消息(一)
RocketMQ DefaultMQProducer#send 方法源码解析:生产者投递消息(二)
RocketMQ 通信机制底层数据结构及源码解析
上篇文章【RocketMQ 通信机制底层数据结构及源码解析 】主要介绍了 RocketMQ 中底层的网络通信机制涉及到的数据结构以及线程模型通信,未做过多源码的介绍,这篇文章主要围绕着一块的源码解读.
new
在 Broker 服务端创建 BrokerController 时,会实例化 BrokerController,在里面会传递 NettyServerConfig、NettyClientConfig,如下:
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
nettyServerConfig.setListenPort(10911);
nettyServerConfig.setUseEpollNativeSelector(true);
// .....
final BrokerController controller = new BrokerController(brokerConfig,nettyServerConfig,nettyClientConfig,messageStoreConfig);
Netty Server Boss 默认绑定的端口:10911
BrokerOuterAPI
在实例化 BrokerController 时,会先将 NettyRemotingClient 先创建好,它主要用来与其他 Broker 之间进行相互通信的,比如:当通过命令在某台 Broker 创建一个 Topic,会通过当前 Broker 组装好信息,发送给其他 Broker 进行 Topic 路由信息进行传递,以便于其他 Broker 都得知该 Topic 信息,进行消息的接收.
public BrokerController(final BrokerConfig brokerConfig,final NettyServerConfig nettyServerConfig,final NettyClientConfig nettyClientConfig,final MessageStoreConfig messageStoreConfig) {this.brokerConfig = brokerConfig;this.nettyServerConfig = nettyServerConfig;// ....this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {this.remotingClient = new NettyRemotingClient(nettyClientConfig);this.remotingClient.registerRPCHook(rpcHook);
}
MQClientInstance
在生产者、消费者启动时,通过 MQClientManager#getOrCreateMQClientInstance会创建 MQClientInstance 实例,会将 NettyClientConfig 绑定好
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {this.nettyClientConfig = new NettyClientConfig();this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());// 客户端远程调用的处理器,接受来自 Broker 请求并做出响应this.clientRemotingProcessor = new ClientRemotingProcessor(this);// MQ 客户端 API 发起请求的类this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
}
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,final ClientRemotingProcessor clientRemotingProcessor,RPCHook rpcHook, final ClientConfig clientConfig) {this.clientConfig = clientConfig;// RocketMQ 网络模型的核心类this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);this.clientRemotingProcessor = clientRemotingProcessor;this.remotingClient.registerRPCHook(rpcHook); this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);// 消费组数量发生变化,触发重平衡this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);// 消费者客户端重置偏移量this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);// 获取消费者状态this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);// 获取消费者运行的信息this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);// 消费消息this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);// 回复消息this.remotingClient.registerProcessor(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, this.clientRemotingProcessor, null);
}
NettyRemotingClient
NettyRemotingClient 充当 RocketMQ 网络通信模型下的客户端,生产者、消费者、Broker 都持有对它的引用进行使用,它整体的实例化过程源码如下:
public NettyRemotingClient(final NettyClientConfig nettyClientConfig,final ChannelEventListener channelEventListener) {// 单向发送信号量数、异步发送信号量数 = 65535super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());this.nettyClientConfig = nettyClientConfig;this.channelEventListener = channelEventListener;int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();if (publicThreadNums <= 0) {publicThreadNums = 4;}// 使用公共线程池处理来自客户端的各种 Processor,最低线程数为 4、最大线程数为 CPU 核数this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());}});this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));}});if (nettyClientConfig.isUseTLS()) {try {sslContext = TlsHelper.buildSslContext(true);log.info("SSL enabled for client");} catch (IOException e) {log.error("Failed to create SSLContext", e);} catch (CertificateException e) {log.error("Failed to create SSLContext", e);throw new RuntimeException("Failed to create SSLContext", e);}}
}
在其实例化时,提供了一个内部局部变量为 Bootstrap
private final Bootstrap bootstrap = new Bootstrap();
initialize
在实例化 BrokerController 期间,只是会将 Netty 服务端,给设置好,不做任何处理
NettyRemotingServer
调用 BrokerController#initialize 初始化方法时,会实例化 NettyRemotingServer
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,final ChannelEventListener channelEventListener) {super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());this.serverBootstrap = new ServerBootstrap();this.nettyServerConfig = nettyServerConfig;this.channelEventListener = channelEventListener;int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();if (publicThreadNums <= 0) {publicThreadNums = 4;}// Processor 公共处理的线程池,当未指定 Executor 时this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());}});// 默认都是创建 EpollEventLoopGroupif (useEpoll()) {this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});} else {this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});}loadSslContext();
}
- 创建 Semaphore Oneway 信号量:256,Semaphore Async 信号量:64
- 创建 Processor 公共处理的线程池,当 Processor 未指定 Executor 时,分配给这个 Executor 进行处理,公共的业务线程池
- 创建 1 个线程数的 EpollEventLoopGroup,Reactor 主线程
- 创建 3 个线程数的 EpollEventLoopGroup,Reactor 线程池
通过 useEpoll 方法来判别 EpollEventLoopGroup 还是 NioEventLoopGroup
private boolean useEpoll() {// OS 类型:Windows、Linuxreturn RemotingUtil.isLinuxPlatform()// 通过 NettyServerConfig.setUseEpollNativeSelector 方法设置是否开启 Epoll Selector 模型&& nettyServerConfig.isUseEpollNativeSelector()&& Epoll.isAvailable();
}
在实例化 BrokerController 时已经设置 useEpollNativeSelector 变量为 true.
start
NettyRemotingServer
通过 BrokerController#start 方法再调用 NettyRemotingServer#start 方法启动 Netty Server 服务端
public void start() {this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});prepareSharableHandlers();/*1. SslHandler:SSL安全套接字协议2. ⬇3. FileRegionEncoder:文件区域采用 Zero-Copy SendFile 编码传输4. ⬇5. NettyEncoder:编码器6. ⬇7. NettyDecoder:解码器8. ⬇9. IdleStateHandler:空闲检查10. ⬇11. NettyConnectManageHandler:网络连接管理12. ⬇13. NettyServerHandler:服务端请求处理器*/ServerBootstrap childHandler =this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)// 在 TCP 协议中,当服务器端接收到客户端的连接请求时,会创建一个连接队列来存储这些请求,然后依次处理// ChannelOption.SO_BACKLOG 参数就是用来设置这个连接队列的大小.option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())// 默认情况下,TCP 连接在 TIME_WAIT 状态时,不能立即被重用,必须等待一段时间才能重用// 通过给套接字配置可重用属性,告诉操作系统内核,这样的 TCP 连接可以复用 TIME_WAIT 状态的连接.option(ChannelOption.SO_REUSEADDR, true)// 用于开启或者关闭保活探测,默认情况下是关闭的// 当 SO_KEEPALIVE 开启时,可以保持连接检测对方主机是否崩溃,避免(服务器)永远阻塞于 TCP 连接的输入.option(ChannelOption.SO_KEEPALIVE, false)// TCP_NODELAY 是禁用Nagle算法,即数据包立即发送出去// 如果要求高实时性,有数据发送时就马上发送,就将该选项设置为 true 关闭 Nagle 算法// 如果要减少发送次数减少网络交互,就设置为 false 等累积一定大小后再发送。默认为 false.childOption(ChannelOption.TCP_NODELAY, true)// 绑定本地端口 Broker:10911、NameSrv:9876.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});// 设置发送缓冲区大小if (nettyServerConfig.getServerSocketSndBufSize() > 0) {log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize());childHandler.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize());}// 设置接收缓冲区大小if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {log.info("server set SO_RCVBUF to {}", nettyServerConfig.getServerSocketRcvBufSize());childHandler.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize());}// 设置写缓冲区大小if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && nettyServerConfig.getWriteBufferHighWaterMark() > 0) {log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark());childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()));}// 设置是否开启池化 ByteBufAllocator,采用默认的 PooledByteBufAllocatorif (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {ChannelFuture sync = this.serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();this.port = addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);}if (this.channelEventListener != null) {this.nettyEventExecutor.start();}this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);
}
启动 NettyRemotingServer 流程如下:
- 创建 DefaultEventExecutorGroup Worker 线程池,默认线程数量:8,线程名 prefix:NettyServerCodecThread_
- 通过 ServerBootstrap 指定好分组:Reactor 主线程、Reactor 线程池
- 创建 EpollServerSocketChannel,ServerSocketChannel 实现类
- 设置服务端参数,如下表
- 调用 io.netty.bootstrap.AbstractBootstrap#bind 方法创建一个 EpollServerSocketChannel,并且绑定好地址、端口
- 启动 NettyEventExecutor,它是一个单独的线程,用来接收来自 Netty 客户端空闲、关闭、连接、异常事件并进行监听回调处理.
- 创建一个 Timer 定时器,每隔 3 秒扫描哪些超时等待的客户端请求,并对它们进行处理,响应超时等待回调请求返回给客户端
参数名 | 参数值 | 参数描述 |
---|---|---|
ChannelOption.SO_BACKLOG | 1024 | 当服务器端接收到客户端的连接请求时,会创建一个连接队列来存储这些请求 |
ChannelOption.SO_REUSEADDR | true | 通过给套接字配置可重用属性,告诉操作系统内核,这样的 TCP 连接可以复用 TIME_WAIT 状态的连接 |
ChannelOption.SO_KEEPALIVE | false | 用于开启或者关闭保活探测,默认情况下是关闭的 |
ChannelOption.TCP_NODELAY | true | 如果要求高实时性,有数据发送时就马上发送,就将该选项设置为 true 关闭 Nagle 算法 如果要减少发送次数减少网络交互,就设置为 false 等累积一定大小后再发送,默认为 false |
ChannelOption.SO_SNDBUF | 0 | 设置发送缓冲区大小 |
ChannelOption.SO_RCVBUF | 0 | 设置接收缓冲区大小 |
ChannelOption.WRITE_BUFFER_WATER_MARK | 0 | 设置写缓冲区大小 |
ChannelOption.ALLOCATOR | PooledByteBufAllocator.DEFAULT | 优先分配直接内存 |
Broker 服务端会在初始化阶段,通过调用 BrokerController#registerProcessor 方法注册,请求 -> Processor 处理器之间的映射关系,将其写入到 NettyRemotingAbstract#processorTable 集合中,当接收来自客户端请求时,代表输入由 Netty 最后一个处理器:NettyRemotingServer.NettyServerHandler 接收处理,执行其内部的 channelRead0 方法处理消息收到的请求,根据请求体 RequestCommand 携带的 code,从 processorTable 集合中找到 Pair 组合「Processor,Executor」等待 Broker 处理完成之后,再执行客户端的回调方法,返回给客户端具体的请求结果.
NettyRemotingClient
在执行 BrokerController#start 时,同时会将 BrokerOuterAPI 启动,也就是启动 NettyRemotingClient
在执行 DefaultMQProducer#start、DefaultMQPushConsumerImpl#start 方法时,同时会将 MQClientAPIImpl 也启动,也就是启动 NettyRemotingClient
所以,从 Broker、生产者、消费者角度作为客户端,它们使用的都是同一个类 NettyRemotingClient 逻辑作为 Netty 客户端使用,以下是其启动时具体的源码:
public void start() {this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(// 默认线程数为 4nettyClientConfig.getClientWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());}});// 后续会发起请求时会通过 eventLoopGroupWorker 去建立 Socket 连接与服务端之间进行读、写交互,NioSocketChannel 代表的就是非阻塞的 SocketChannelBootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)// 数据包组装为更大的帧然后进行发送.option(ChannelOption.TCP_NODELAY, true)// 定时发送探测包来探测连接的对端是否存活.option(ChannelOption.SO_KEEPALIVE, false).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();if (nettyClientConfig.isUseTLS()) {if (null != sslContext) {pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));log.info("Prepend SSL handler");} else {log.warn("Connections are insecure as SSLContext is null!");}}// DefaultEventExecutorGroup 用来执行以下五个 ChannelHandlerpipeline.addLast(defaultEventExecutorGroup,// 编码 -> 处理请求new NettyEncoder(),// 解码 -> 处理响应new NettyDecoder(),new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),new NettyConnectManageHandler(),// 远程调用->请求、响应处理器new NettyClientHandler());}});// 操作系统客户端发送缓冲区的大小if (nettyClientConfig.getClientSocketSndBufSize() > 0) {log.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());}// 操作系统客户端接收缓冲区的大小if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {log.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());}if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) {log.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark());handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()));}// Timer 定时执行哪些请求过期的事件,每隔 3 秒this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingClient.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);// 生产者、消费者客户端一般为空,在 nameserver 与 Broker 交互时会使用到,做一些连接、关闭、异常、死亡状态的回调处理if (this.channelEventListener != null) {this.nettyEventExecutor.start();}
}
启动 NettyRemotingClient 流程如下:
- 创建 DefaultEventExecutorGroup Worker 线程池,用于向客户端发起写事件、接收读事件的处理
- 通过 group 绑定 Worker 主线程,创建 NioSocketChannel 非阻塞 SocketChannel
- 设置相关的客户端参数,如下表
- 设置客户端请求、响应时要执行的处理器逻辑,主要是:编码-NettyEncoder、解码-NettyDecoder、请求_响应处理器-NettyClientHandler
- 创建一个 Timer 定时器,每隔 3 秒扫描哪些超时等待的请求,并对它们进行处理,响应超时等待回调请求返回给业务调用方
参数名 | 参数值 | 参数描述 |
---|---|---|
ChannelOption.TCP_NODELAY | true | 如果要求高实时性,有数据发送时就马上发送,就将该选项设置为 true 关闭 Nagle 算法 如果要减少发送次数减少网络交互,就设置为 false 等累积一定大小后再发送,默认为 false |
ChannelOption.SO_KEEPALIVE | false | 用于开启或者关闭保活探测,默认情况下是关闭的 |
CONNECT_TIMEOUT_MILLIS | 3000 | 连接超时时长 3 秒,在规定时间内未处理完成返回 Timeout 异常 |
ChannelOption.SO_SNDBUF | 0 | 客户端发送缓冲区的大小 |
ChannelOption.SO_RCVBUF | 0 | 客户端接收缓冲区的大小 |
ChannelOption.WRITE_BUFFER_WATER_MARK | 0 | 设置写缓冲区大小 |
在作为客户端角度,只有当每次发起投递消息、消费消息请求时,才会创建与服务端之间的 Channel 通道,核心方法 NettyRemotingClient#createChannel 内部调用 Bootstrap#connect(java.net.SocketAddress) 建立与服务端之间的连接,然后再发起请求,请求的内容以及协议已经在本节专栏的上一篇博文讲到过了.
总结
该篇文章主要介绍在 RocketMQ remoting 底层通信模块中的 NettyRemotingServer、NettyRemotingClient 实例化、初始化、启动时源码的分析,在 BrokerController 实例化会优先构建好 Netty 客户端实例,在其初始化阶段会构建好 Netty 服务端实例,而在生产者、消费者侧,是在实例化 MQClientInstance 实例时会将 Netty 客户端实例也构建好,同时在 Broker、生产者、消费者启动时,会将对应的 Netty 服务端、客户端都一并启动,比编写文章不易,希望对您有帮助,能够喜欢~
博文放在 RocketMQ 专栏里,欢迎订阅,会持续更新!
如果觉得博文不错,关注我 vnjohn,后续会有更多实战、源码、架构干货分享!
推荐专栏:Spring、MySQL,订阅一波不再迷路
大家的「关注❤️ + 点赞👍 + 收藏⭐」就是我创作的最大动力!谢谢大家的支持,我们下文见!