RocketMQ的底层通信模块remoting
remoting是RocketMQ的底层通信模块,RocketMQ底层通讯是使用Netty来实现的。本文通过对remoting源码进行分析,来说明remoting如何实现高性能通信的。
二、Remoting 通信模块结构
remoting 的网络通信是基于 Netty 实现,模块中类的继承关系如下:
可见通信的类继承自类RemotingService,RemotingService的定义如下:
RemotingService是RPC 远程服务基础类。主要定义所有的远程服务类的基础方法:
public interface RemotingService {// 服务启动void start();// 服务停止void shutdown();//注册RPC钩子函数void registerRPCHook(RPCHook rpcHook);
}
抽象出服务端与客户端都需要的三个方法,其中注册hook是为了能够在远程通讯之后或调用之前执行用户自定义的逻辑。
RemotingServer/RemotingClient
远程服务器/客户端基础接口,两者中的方法基本类似,故这里重点介绍一下 RemotingServer,定位 RPC 远程操作的相关“业务方法”。
public interface RemotingServer extends RemotingService {/*** 注册处理器* @param requestCode 请求码* @param processor 处理器* @param executor 线程池* 这三者是绑定关系:* 根据请求的code 找到处理对应请求的处理器与线程池 并完成业务处理。*/void registerProcessor(final int requestCode, final NettyRequestProcessor processor,final ExecutorService executor);/*** 注册缺省处理器* @param processor 缺省处理器* @param executor 线程池*/void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);// 获取服务端口int localListenPort();/*** 根据 请求码 获取 处理器和线程池* @param requestCode 请求码* @return*/Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);/*** 同步调用* @param channel 通信通道* @param request 业务请求对象* @param timeoutMillis 超时时间* @return 响应结果封装*/RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,RemotingTimeoutException;/*** 异步调用* @param channel 通信通道* @param request 业务请求对象* @param timeoutMillis 超时时间* @param invokeCallback 响应结果回调对象*/void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,final InvokeCallback invokeCallback) throws InterruptedException,RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;/*** 单向调用 (不关注返回结果)* @param channel 通信通道* @param request 业务请求对象* @param timeoutMillis 超时时间*/void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,RemotingSendRequestException;
}
从上面的代码可以看出,RemotingServer的主要功能是注册请求协议处理器、请求调用方法。
NettyRemotingServer::服务端的实现类,实现了 RemotingServer 接口,继承 NettyRemotingAbstract 抽象类
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);// Netty服务端启动器private final ServerBootstrap serverBootstrap;// worker组private final EventLoopGroup eventLoopGroupSelector;// boss组 private final EventLoopGroup eventLoopGroupBoss;// Netty服务端配置信息类private final NettyServerConfig nettyServerConfig;// 公共线程池 (在注册协议处理器的时候,若未给处理器指定线程池,那么就是用该公共线程池)private final ExecutorService publicExecutor;// Netty Channel 特殊状态监听器private final ChannelEventListener channelEventListener;// 定时器 (功能: 扫描 responseTable表,将过期的responseFuture移除)private final Timer timer = new Timer("ServerHouseKeepingService", true);// 用于在pipeline指定handler中 执行任务的线程池private DefaultEventExecutorGroup defaultEventExecutorGroup;// 服务端绑定的端口private int port = 0;private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";private static final String TLS_HANDLER_NAME = "sslHandler";private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";// 用于处理 SSL 握手连接的处理器private HandshakeHandler handshakeHandler;// 协议编码 处理器private NettyEncoder encoder;// 连接管理 处理器private NettyConnectManageHandler connectionManageHandler;// 核心业务 处理器private NettyServerHandler serverHandler;// 参数1: nettyServerConfig Netty服务端配置信息// 参数2: channelEventListener channel特殊状态监听器public NettyRemotingServer(final NettyServerConfig nettyServerConfig,final ChannelEventListener channelEventListener) {// 调用父类 就是通过 Semaphore 设置请求并发限制// 1. 设置 单行请求的并发限制// 2. 设置 异步请求的并发限制super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());this.serverBootstrap = new ServerBootstrap();this.nettyServerConfig = nettyServerConfig;this.channelEventListener = channelEventListener;// 创建公共线程池 publicExecutor 线程数量为:4int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();if (publicThreadNums <= 0) {publicThreadNums = 4;}this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());}});// 下面就是根据操作系统平台来选择创建 bossGroup 和 workGroup的逻辑if (useEpoll()) {this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});} else {this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});}// 加载SSL连接的相关方法 (不在本篇的分析范围内)loadSslContext();}
}
NettyRemotingServer当中重要的参数:
父类的属性 semaphoreOneway , **semaphoreAsync ** 用来控制请求并发量的
serverBootstrap Netty服务器启动器
nettyServerConfig Netty服务器配置信息
channelEventListener Netty Channel状态监听器
eventLoopGroupSelector worker组
eventLoopGroupBoss boss组
NettyRemotingServer的启动
// 启动Netty 服务器@Overridepublic void start() {// Netty pipeline中的指定 handler 采用该线程池执行this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});// 初始化 处理器 handler// 1. handshakeHandler SSL连接// 2. encoder 编码器// 3. connectionManageHandler 连接管理器处理器// 4. serverHandler 核心业务处理器prepareSharableHandlers();// 下面就是 Netty 创建服务端启动器的固定流程 ServerBootstrap childHandler =// 配置服务端 启动对象// 配置工作组 boss 和 worker 组this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)// 设置服务端ServerSocketChannel 类型.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)// 设置服务端ch选项.option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_KEEPALIVE, false)// 设置客户端ch选项.childOption(ChannelOption.TCP_NODELAY, true)// 设置 接收缓冲区 和 发送缓冲区的 大小.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()).childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())// 设置服务器端口.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {// 初始化 客户端ch pipeline 的逻辑, 同时指定了线程池为 defaultEventExecutorGroupch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {// 客户端开启 内存池,使用的内存池 是 PooledByteBufAllocator.DEFAULTchildHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {// 服务器 绑定端口ChannelFuture sync = this.serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();this.port = addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);}// 条件成立: channel状态监听器不为空, 则创建 网络异常事件执行器if (this.channelEventListener != null) {this.nettyEventExecutor.start();}// 提交定时任务,每一秒 执行一次// 扫描 responseTable 表, 将过期的 responseFuture 移除this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);}
上述代码 基本上就是 模板Netty创建服务端的代码,主要做了如下几件事:
启动Netty服务器
开启 channel状态监听线程
开启 扫描 responseFuture 的定时任务
通过这个结构图可以看出,RocketMQ 在 Netty 原生的多线程 Reactor 模型上做了一系列的扩展和优化,使用多个线程池来处理数据
1、一个 Reactor 主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP 网络连接请求,建立好连接,创建 SocketChannel,并注册到 selector 上。RocketMQ 的源码中会自动根据 OS 的类型选择 NIO 和 Epoll,也可以通过参数配置,然后监听真正的网络数据。
2、拿到网络数据后,再丢给 Worker 线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),
3、在真正执行业务逻辑之前需要进行 SSL 验证、编解码、空闲检查、网络连接管理,这些工作交给 defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为 8 )去做。
4、而处理业务操作放在业务线程池中执行,根据 RomotingCommand 的业务请求码 code 去 processorTable 这个本地缓存变量中找到对应的 processor,然后封装成 task 任务后,提交给对应的业务 processor 处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)。从入口到业务逻辑的几个步骤中线程池一直再增加,这跟每一步逻辑复杂性相关,越复杂,需要的并发通道越宽。
NettyRemotingAbstract:抽象类NettyRemotingAbstract是NettyRemotingServer的父类,主要定义了请求并发量、控制响应对象和各种请求处理函数。
public abstract class NettyRemotingAbstract {// 控制 单向请求的 并发量protected final Semaphore semaphoreOneway;// 控制 异步请求的 并发量protected final Semaphore semaphoreAsync;// 响应对象映射表 (key: opaque value:responseFuture)protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =new ConcurrentHashMap<Integer, ResponseFuture>(256);// 请求处理器映射表 (key: requestCode value:(processor,executor) )protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);// Netty事件监听线程池protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();// 默认的请求处理器对 包含(processor,executor) protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;// SSL相关protected volatile SslContext sslContext;// 扩展钩子protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();
}
结合RocketMQ源码分析
01初始化流程
在Broker创建的时候会去初始化NettyRemotingServer类,调用其构造方法。
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,final ChannelEventListener channelEventListener) {super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());this.serverBootstrap = new ServerBootstrap();this.nettyServerConfig = nettyServerConfig;this.channelEventListener = channelEventListener;int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();if (publicThreadNums <= 0) {publicThreadNums = 4;}// 初始化用于接收客户端的TCP连接this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());}});// 根据配置设置NIO还是Epoll来作为Selector线程池// 默认在Linux环境为true,windows环境下都是NIO,相对来说Linux更快。if (useEpoll()) {this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});} else {this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});}// 加载SSLloadSslContext();
}
首先会初始化ServerBootstrap,NettyServerConfig等相关参数,同时会判断是在哪个操作系统,如果是在Linux平台则会使用EpollEventLoopGroup作为线程池,如果不是则会使用NioEventLoopGroup作为线程池。
02启动流程
在Broker作为服务端启动与NameServer作为服务端启动的时候都会来初始化,启动一个Netty的服务端进行相关的通讯。
@Override
public void start() {// 默认的处理线程池组,用于处理后面多个NettyHandle的操作this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});prepareSharableHandlers();// 正常的一种Netty的服务端的启动逻辑// 其中包括解码器,编码器,心跳管理器,连接管理器,消息类型处理分发ServerBootstrap childHandler =this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog()).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_KEEPALIVE, false).childOption(ChannelOption.TCP_NODELAY, true).localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});if (nettyServerConfig.getServerSocketSndBufSize() > 0) {log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize());childHandler.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize());}if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {log.info("server set SO_RCVBUF to {}", nettyServerConfig.getServerSocketRcvBufSize());childHandler.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize());}if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && nettyServerConfig.getWriteBufferHighWaterMark() > 0) {log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark());childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()));}if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {ChannelFuture sync = this.serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();this.port = addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);}if (this.channelEventListener != null) {this.nettyEventExecutor.start();}// 定时扫描responseTable,获取返回结果,并处理超时业务。this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {// 超时请求扫描NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);
}
初始化线程池组处理多个NettyHandle操作。
初始化ServerBootStrap,设置tcp参数,编解码器,心跳处理器,连接管理器,请求分发。
定时扫描ResponseTable,获取返回结果并处理超时业务。
NettyEncoder类
将RemotingCommand对象序列化为ByteBuffer对象。根据serializerType的不同,Header会编码为JSON或者二进制。
@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)throws Exception {try {remotingCommand.fastEncodeHeader(out);byte[] body = remotingCommand.getBody();if (body != null) {out.writeBytes(body);}} catch (Exception e) {log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);if (remotingCommand != null) {log.error(remotingCommand.toString());}RemotingUtil.closeChannel(ctx.channel());}
}
将RemotingCommand对象编码成ByteBuf进行数据的传递,先将Header部分进行编码,然后将body追加到ByteBuf中,Header的编码相对复杂因为在传递过程中我们得让被传递方知道该ByteBuf如何进行拆解,同时我们也要防止各种不正常的情况方法,这是就需要我们规定一些内容,这些都会在Netty里面去详讲,这里就提及一下。(第一个字节表示编解码类型)
NettyDecoder类
NettyEncoder继承自LengthFieldBasedFrameDecoder,主要有用于解码入站数据流,并将数据流解码为RemotingCommand对象。
@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {ByteBuf frame = null;try {frame = (ByteBuf) super.decode(ctx, in);if (null == frame) {return null;}return RemotingCommand.decode(frame);} catch (Exception e) {log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);RemotingUtil.closeChannel(ctx.channel());} finally {if (null != frame) {frame.release();}}return null;
}
首先会调用LengthFieldBasedFrameDecoder.decoder方法进行首次解码,然后调用NettyDecoder.decoder进行二次解码,完成Header与body的解码并转换成RemotingCommand对象。而之所以需要两次第一个就是为了解出对应的长度,即数据流中前4个字节的值表示有效数据域的长度,除开前4个字节外的内容都是有效数据域的内容,不存在偏移量。
IdleStateHandler类
进行心跳检查类,客户端与服务端之间需要保持长连接则需要通过一个心跳机制来保证有效性,当其中一者处于宕机或者网络延迟的情况下判断是否有效,如果无效及时进行释放资源。(该类三个参数,第一个为读空闲时间,第二个为写空闲时间,第三个为读或写空闲时间,如果在规定时间内没有触发对应事件就会触发定时任务的执行)
NettyConnectManageHandler类
用于监听pipeline中入站/出站的事件,主要进行日志记录等操作。
NettyServerHandler类
核心业务处理器,处理的时候会去调用channelRead0进行相关逻辑,最终会去执行processMessageReceived方法。
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {final RemotingCommand cmd = msg;if (cmd != null) {switch (cmd.getType()) {// 处理请求的过程case REQUEST_COMMAND:processRequestCommand(ctx, cmd);break;case RESPONSE_COMMAND:// 处理响应的过程processResponseCommand(ctx, cmd);break;default:break;}}
}
TYPE类型为REQUEST_COMMAND表示请求消息,则调用processRequestCommand方法进行处理。
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {// 从processorTable中找到对应的Processor,如果不存在则使用defaultRequestProcessor处理器final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;final int opaque = cmd.getOpaque();if (pair != null) {Runnable run = new Runnable() {@Overridepublic void run() {try {// 获取nameServerString remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());doBeforeRpcHooks(remoteAddr, cmd);final RemotingResponseCallback callback = new RemotingResponseCallback() {@Override// 服务端处理完请求之后调用public void callback(RemotingCommand response) {doAfterRpcHooks(remoteAddr, cmd, response);// 不是单向if (!cmd.isOnewayRPC()) {if (response != null) {response.setOpaque(opaque);response.markResponseType();response.setSerializeTypeCurrentRPC(cmd.getSerializeTypeCurrentRPC());try {// 往客户端输出结果ctx.writeAndFlush(response);} catch (Throwable e) {log.error("process request over, but response failed", e);log.error(cmd.toString());log.error(response.toString());}} else {}}}};// 如果是异步处理的Processif (pair.getObject1() instanceof AsyncNettyRequestProcessor) {AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();processor.asyncProcessRequest(ctx, cmd, callback);} else {// 不是异步知道拿到Processor进行相关处理NettyRequestProcessor processor = pair.getObject1();RemotingCommand response = processor.processRequest(ctx, cmd);callback.callback(response);}} catch (Throwable e) {log.error("process request exception", e);log.error(cmd.toString());if (!cmd.isOnewayRPC()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,RemotingHelper.exceptionSimpleDesc(e));response.setOpaque(opaque);ctx.writeAndFlush(response);}}}};if (pair.getObject1().rejectRequest()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,"[REJECTREQUEST]system busy, start flow control for a while");response.setOpaque(opaque);ctx.writeAndFlush(response);return;}try {final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);pair.getObject2().submit(requestTask);} catch (RejectedExecutionException e) {if ((System.currentTimeMillis() % 10000) == 0) {log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())+ ", too many requests and system thread pool busy, RejectedExecutionException "+ pair.getObject2().toString()+ " request code: " + cmd.getCode());}if (!cmd.isOnewayRPC()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,"[OVERLOAD]system busy, start flow control for a while");response.setOpaque(opaque);ctx.writeAndFlush(response);}}} else {String error = " request type " + cmd.getCode() + " not supported";final RemotingCommand response =RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);response.setOpaque(opaque);ctx.writeAndFlush(response);log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);}
}
首先从processorTable获取对应的Processor,而processorTable的内容都是通过启动Broker或者是NameServer进行存入的,存储的结构为map以Code作为key以Processor作为值,当我们使用的时候只需要根据code查询即可,如果没有找到就直接使用默认的defaultRequestProcessor处理器。
如果获取到的Processor为异步的则调用异步处理请求,如果不是则直接调用Processor对应的processRequest方法进行处理。
TYPE类型为RESPONSE_COMMAND表示响应消息,则调用processResponseCommand进行处理。
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {final int opaque = cmd.getOpaque();// 从map缓存获取正在进行的其中一个请求final ResponseFuture responseFuture = responseTable.get(opaque);if (responseFuture != null) {responseFuture.setResponseCommand(cmd);// 移除本次请求responseTable.remove(opaque);// 执行回调if (responseFuture.getInvokeCallback() != null) {executeInvokeCallback(responseFuture);} else {responseFuture.putResponse(cmd);responseFuture.release();}} else {log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));log.warn(cmd.toString());}
}
超时请求扫描
在每次请求时将ResponseFuture放入Map中,通过定时扫描来判断记录时间与超时时间来比较判断是否超时。
public void scanResponseTable() {final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();while (it.hasNext()) {Entry<Integer, ResponseFuture> next = it.next();ResponseFuture rep = next.getValue();// 当前时间大于请求开始时间+超时等待时间+1秒,则任务超时了if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {rep.release();it.remove();rfList.add(rep);log.warn("remove timeout request, " + rep);}}// 执行被移除 ResponseFuture对象的executeInvokeCallback方法 for (ResponseFuture rf : rfList) {try {executeInvokeCallback(rf);} catch (Throwable e) {log.warn("scanResponseTable, operationComplete Exception", e);}}
}