2、RocketMQ源码分析(二)

RocketMQ的底层通信模块remoting

remoting是RocketMQ的底层通信模块,RocketMQ底层通讯是使用Netty来实现的。本文通过对remoting源码进行分析,来说明remoting如何实现高性能通信的。

二、Remoting 通信模块结构
remoting 的网络通信是基于 Netty 实现,模块中类的继承关系如下:

在这里插入图片描述
可见通信的类继承自类RemotingService,RemotingService的定义如下:

RemotingService是RPC 远程服务基础类。主要定义所有的远程服务类的基础方法:

public interface RemotingService {// 服务启动void start();// 服务停止void shutdown();//注册RPC钩子函数void registerRPCHook(RPCHook rpcHook);
}

抽象出服务端与客户端都需要的三个方法,其中注册hook是为了能够在远程通讯之后或调用之前执行用户自定义的逻辑。

RemotingServer/RemotingClient
远程服务器/客户端基础接口,两者中的方法基本类似,故这里重点介绍一下 RemotingServer,定位 RPC 远程操作的相关“业务方法”。

public interface RemotingServer extends RemotingService {/***  注册处理器* @param requestCode   请求码* @param processor     处理器* @param executor      线程池*    这三者是绑定关系:*       根据请求的code  找到处理对应请求的处理器与线程池 并完成业务处理。*/void registerProcessor(final int requestCode, final NettyRequestProcessor processor,final ExecutorService executor);/***  注册缺省处理器* @param processor  缺省处理器* @param executor   线程池*/void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);// 获取服务端口int localListenPort();/***  根据 请求码 获取 处理器和线程池* @param requestCode  请求码* @return*/Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);/***  同步调用* @param channel   通信通道* @param request   业务请求对象* @param timeoutMillis   超时时间* @return  响应结果封装*/RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,RemotingTimeoutException;/***  异步调用* @param channel  通信通道* @param request  业务请求对象* @param timeoutMillis  超时时间* @param invokeCallback  响应结果回调对象*/void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,final InvokeCallback invokeCallback) throws InterruptedException,RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;/***  单向调用 (不关注返回结果)* @param channel   通信通道* @param request   业务请求对象* @param timeoutMillis  超时时间*/void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,RemotingSendRequestException;
}

从上面的代码可以看出,RemotingServer的主要功能是注册请求协议处理器、请求调用方法。

NettyRemotingServer::服务端的实现类,实现了 RemotingServer 接口,继承 NettyRemotingAbstract 抽象类


public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);// Netty服务端启动器private final ServerBootstrap serverBootstrap;// worker组private final EventLoopGroup eventLoopGroupSelector;// boss组 private final EventLoopGroup eventLoopGroupBoss;// Netty服务端配置信息类private final NettyServerConfig nettyServerConfig;// 公共线程池   (在注册协议处理器的时候,若未给处理器指定线程池,那么就是用该公共线程池)private final ExecutorService publicExecutor;//  Netty Channel 特殊状态监听器private final ChannelEventListener  channelEventListener;// 定时器  (功能: 扫描 responseTable表,将过期的responseFuture移除)private final Timer timer = new Timer("ServerHouseKeepingService", true);// 用于在pipeline指定handler中 执行任务的线程池private DefaultEventExecutorGroup defaultEventExecutorGroup;// 服务端绑定的端口private int port = 0;private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";private static final String TLS_HANDLER_NAME = "sslHandler";private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";// 用于处理 SSL 握手连接的处理器private HandshakeHandler handshakeHandler;// 协议编码 处理器private NettyEncoder encoder;// 连接管理 处理器private NettyConnectManageHandler connectionManageHandler;// 核心业务 处理器private NettyServerHandler serverHandler;// 参数1: nettyServerConfig  Netty服务端配置信息// 参数2: channelEventListener  channel特殊状态监听器public NettyRemotingServer(final NettyServerConfig nettyServerConfig,final ChannelEventListener channelEventListener) {// 调用父类  就是通过 Semaphore 设置请求并发限制// 1. 设置 单行请求的并发限制// 2. 设置 异步请求的并发限制super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());this.serverBootstrap = new ServerBootstrap();this.nettyServerConfig = nettyServerConfig;this.channelEventListener = channelEventListener;// 创建公共线程池 publicExecutor   线程数量为:4int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();if (publicThreadNums <= 0) {publicThreadNums = 4;}this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());}});// 下面就是根据操作系统平台来选择创建 bossGroup 和 workGroup的逻辑if (useEpoll()) {this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});} else {this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});}// 加载SSL连接的相关方法 (不在本篇的分析范围内)loadSslContext();}
}

NettyRemotingServer当中重要的参数:

父类的属性 semaphoreOneway , **semaphoreAsync ** 用来控制请求并发量的
serverBootstrap Netty服务器启动器
nettyServerConfig Netty服务器配置信息
channelEventListener Netty Channel状态监听器
eventLoopGroupSelector worker组
eventLoopGroupBoss boss组

NettyRemotingServer的启动

  // 启动Netty 服务器@Overridepublic void start() {// Netty pipeline中的指定 handler 采用该线程池执行this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});// 初始化 处理器 handler// 1. handshakeHandler  SSL连接// 2. encoder  编码器// 3. connectionManageHandler 连接管理器处理器// 4. serverHandler 核心业务处理器prepareSharableHandlers();// 下面就是 Netty 创建服务端启动器的固定流程 ServerBootstrap childHandler =// 配置服务端 启动对象// 配置工作组 boss 和 worker 组this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)// 设置服务端ServerSocketChannel 类型.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)// 设置服务端ch选项.option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_KEEPALIVE, false)// 设置客户端ch选项.childOption(ChannelOption.TCP_NODELAY, true)// 设置 接收缓冲区 和 发送缓冲区的 大小.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()).childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())// 设置服务器端口.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {// 初始化 客户端ch pipeline 的逻辑, 同时指定了线程池为 defaultEventExecutorGroupch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {// 客户端开启 内存池,使用的内存池 是 PooledByteBufAllocator.DEFAULTchildHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {//  服务器 绑定端口ChannelFuture sync = this.serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();this.port = addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);}// 条件成立: channel状态监听器不为空, 则创建 网络异常事件执行器if (this.channelEventListener != null) {this.nettyEventExecutor.start();}// 提交定时任务,每一秒 执行一次// 扫描 responseTable 表, 将过期的 responseFuture 移除this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);}

上述代码 基本上就是 模板Netty创建服务端的代码,主要做了如下几件事:

启动Netty服务器
开启 channel状态监听线程
开启 扫描 responseFuture 的定时任务
在这里插入图片描述

通过这个结构图可以看出,RocketMQ 在 Netty 原生的多线程 Reactor 模型上做了一系列的扩展和优化,使用多个线程池来处理数据

1、一个 Reactor 主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP 网络连接请求,建立好连接,创建 SocketChannel,并注册到 selector 上。RocketMQ 的源码中会自动根据 OS 的类型选择 NIO 和 Epoll,也可以通过参数配置,然后监听真正的网络数据。
2、拿到网络数据后,再丢给 Worker 线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),
3、在真正执行业务逻辑之前需要进行 SSL 验证、编解码、空闲检查、网络连接管理,这些工作交给 defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为 8 )去做。
4、而处理业务操作放在业务线程池中执行,根据 RomotingCommand 的业务请求码 code 去 processorTable 这个本地缓存变量中找到对应的 processor,然后封装成 task 任务后,提交给对应的业务 processor 处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)。从入口到业务逻辑的几个步骤中线程池一直再增加,这跟每一步逻辑复杂性相关,越复杂,需要的并发通道越宽。

NettyRemotingAbstract:抽象类NettyRemotingAbstract是NettyRemotingServer的父类,主要定义了请求并发量、控制响应对象和各种请求处理函数。

public abstract class NettyRemotingAbstract {// 控制 单向请求的 并发量protected final Semaphore semaphoreOneway;// 控制 异步请求的 并发量protected final Semaphore semaphoreAsync;// 响应对象映射表  (key: opaque  value:responseFuture)protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =new ConcurrentHashMap<Integer, ResponseFuture>(256);// 请求处理器映射表 (key: requestCode  value:(processor,executor)  )protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);// Netty事件监听线程池protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();// 默认的请求处理器对  包含(processor,executor) protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;// SSL相关protected volatile SslContext sslContext;// 扩展钩子protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();
}

结合RocketMQ源码分析

01初始化流程
在Broker创建的时候会去初始化NettyRemotingServer类,调用其构造方法。

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,final ChannelEventListener channelEventListener) {super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());this.serverBootstrap = new ServerBootstrap();this.nettyServerConfig = nettyServerConfig;this.channelEventListener = channelEventListener;int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();if (publicThreadNums <= 0) {publicThreadNums = 4;}// 初始化用于接收客户端的TCP连接this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());}});// 根据配置设置NIO还是Epoll来作为Selector线程池// 默认在Linux环境为true,windows环境下都是NIO,相对来说Linux更快。if (useEpoll()) {this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});} else {this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});}// 加载SSLloadSslContext();
}

首先会初始化ServerBootstrap,NettyServerConfig等相关参数,同时会判断是在哪个操作系统,如果是在Linux平台则会使用EpollEventLoopGroup作为线程池,如果不是则会使用NioEventLoopGroup作为线程池。

02启动流程
在Broker作为服务端启动与NameServer作为服务端启动的时候都会来初始化,启动一个Netty的服务端进行相关的通讯。


@Override
public void start() {// 默认的处理线程池组,用于处理后面多个NettyHandle的操作this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});prepareSharableHandlers();// 正常的一种Netty的服务端的启动逻辑// 其中包括解码器,编码器,心跳管理器,连接管理器,消息类型处理分发ServerBootstrap childHandler =this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog()).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_KEEPALIVE, false).childOption(ChannelOption.TCP_NODELAY, true).localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});if (nettyServerConfig.getServerSocketSndBufSize() > 0) {log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize());childHandler.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize());}if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {log.info("server set SO_RCVBUF to {}", nettyServerConfig.getServerSocketRcvBufSize());childHandler.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize());}if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && nettyServerConfig.getWriteBufferHighWaterMark() > 0) {log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark());childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()));}if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {ChannelFuture sync = this.serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();this.port = addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);}if (this.channelEventListener != null) {this.nettyEventExecutor.start();}// 定时扫描responseTable,获取返回结果,并处理超时业务。this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {// 超时请求扫描NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);
}

初始化线程池组处理多个NettyHandle操作。
初始化ServerBootStrap,设置tcp参数,编解码器,心跳处理器,连接管理器,请求分发。
定时扫描ResponseTable,获取返回结果并处理超时业务。

NettyEncoder类
将RemotingCommand对象序列化为ByteBuffer对象。根据serializerType的不同,Header会编码为JSON或者二进制。

 @Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)throws Exception {try {remotingCommand.fastEncodeHeader(out);byte[] body = remotingCommand.getBody();if (body != null) {out.writeBytes(body);}} catch (Exception e) {log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);if (remotingCommand != null) {log.error(remotingCommand.toString());}RemotingUtil.closeChannel(ctx.channel());}
}

将RemotingCommand对象编码成ByteBuf进行数据的传递,先将Header部分进行编码,然后将body追加到ByteBuf中,Header的编码相对复杂因为在传递过程中我们得让被传递方知道该ByteBuf如何进行拆解,同时我们也要防止各种不正常的情况方法,这是就需要我们规定一些内容,这些都会在Netty里面去详讲,这里就提及一下。(第一个字节表示编解码类型)

NettyDecoder类
NettyEncoder继承自LengthFieldBasedFrameDecoder,主要有用于解码入站数据流,并将数据流解码为RemotingCommand对象。

@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {ByteBuf frame = null;try {frame = (ByteBuf) super.decode(ctx, in);if (null == frame) {return null;}return RemotingCommand.decode(frame);} catch (Exception e) {log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);RemotingUtil.closeChannel(ctx.channel());} finally {if (null != frame) {frame.release();}}return null;
}

首先会调用LengthFieldBasedFrameDecoder.decoder方法进行首次解码,然后调用NettyDecoder.decoder进行二次解码,完成Header与body的解码并转换成RemotingCommand对象。而之所以需要两次第一个就是为了解出对应的长度,即数据流中前4个字节的值表示有效数据域的长度,除开前4个字节外的内容都是有效数据域的内容,不存在偏移量。

IdleStateHandler类
进行心跳检查类,客户端与服务端之间需要保持长连接则需要通过一个心跳机制来保证有效性,当其中一者处于宕机或者网络延迟的情况下判断是否有效,如果无效及时进行释放资源。(该类三个参数,第一个为读空闲时间,第二个为写空闲时间,第三个为读或写空闲时间,如果在规定时间内没有触发对应事件就会触发定时任务的执行)

NettyConnectManageHandler类
用于监听pipeline中入站/出站的事件,主要进行日志记录等操作。

NettyServerHandler类
核心业务处理器,处理的时候会去调用channelRead0进行相关逻辑,最终会去执行processMessageReceived方法。


public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {final RemotingCommand cmd = msg;if (cmd != null) {switch (cmd.getType()) {// 处理请求的过程case REQUEST_COMMAND:processRequestCommand(ctx, cmd);break;case RESPONSE_COMMAND:// 处理响应的过程processResponseCommand(ctx, cmd);break;default:break;}}
}

TYPE类型为REQUEST_COMMAND表示请求消息,则调用processRequestCommand方法进行处理。

public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {// 从processorTable中找到对应的Processor,如果不存在则使用defaultRequestProcessor处理器final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;final int opaque = cmd.getOpaque();if (pair != null) {Runnable run = new Runnable() {@Overridepublic void run() {try {// 获取nameServerString remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());doBeforeRpcHooks(remoteAddr, cmd);final RemotingResponseCallback callback = new RemotingResponseCallback() {@Override// 服务端处理完请求之后调用public void callback(RemotingCommand response) {doAfterRpcHooks(remoteAddr, cmd, response);// 不是单向if (!cmd.isOnewayRPC()) {if (response != null) {response.setOpaque(opaque);response.markResponseType();response.setSerializeTypeCurrentRPC(cmd.getSerializeTypeCurrentRPC());try {// 往客户端输出结果ctx.writeAndFlush(response);} catch (Throwable e) {log.error("process request over, but response failed", e);log.error(cmd.toString());log.error(response.toString());}} else {}}}};// 如果是异步处理的Processif (pair.getObject1() instanceof AsyncNettyRequestProcessor) {AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();processor.asyncProcessRequest(ctx, cmd, callback);} else {// 不是异步知道拿到Processor进行相关处理NettyRequestProcessor processor = pair.getObject1();RemotingCommand response = processor.processRequest(ctx, cmd);callback.callback(response);}} catch (Throwable e) {log.error("process request exception", e);log.error(cmd.toString());if (!cmd.isOnewayRPC()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,RemotingHelper.exceptionSimpleDesc(e));response.setOpaque(opaque);ctx.writeAndFlush(response);}}}};if (pair.getObject1().rejectRequest()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,"[REJECTREQUEST]system busy, start flow control for a while");response.setOpaque(opaque);ctx.writeAndFlush(response);return;}try {final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);pair.getObject2().submit(requestTask);} catch (RejectedExecutionException e) {if ((System.currentTimeMillis() % 10000) == 0) {log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())+ ", too many requests and system thread pool busy, RejectedExecutionException "+ pair.getObject2().toString()+ " request code: " + cmd.getCode());}if (!cmd.isOnewayRPC()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,"[OVERLOAD]system busy, start flow control for a while");response.setOpaque(opaque);ctx.writeAndFlush(response);}}} else {String error = " request type " + cmd.getCode() + " not supported";final RemotingCommand response =RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);response.setOpaque(opaque);ctx.writeAndFlush(response);log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);}
}

首先从processorTable获取对应的Processor,而processorTable的内容都是通过启动Broker或者是NameServer进行存入的,存储的结构为map以Code作为key以Processor作为值,当我们使用的时候只需要根据code查询即可,如果没有找到就直接使用默认的defaultRequestProcessor处理器。
如果获取到的Processor为异步的则调用异步处理请求,如果不是则直接调用Processor对应的processRequest方法进行处理。
TYPE类型为RESPONSE_COMMAND表示响应消息,则调用processResponseCommand进行处理。


public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {final int opaque = cmd.getOpaque();// 从map缓存获取正在进行的其中一个请求final ResponseFuture responseFuture = responseTable.get(opaque);if (responseFuture != null) {responseFuture.setResponseCommand(cmd);// 移除本次请求responseTable.remove(opaque);// 执行回调if (responseFuture.getInvokeCallback() != null) {executeInvokeCallback(responseFuture);} else {responseFuture.putResponse(cmd);responseFuture.release();}} else {log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));log.warn(cmd.toString());}
}

超时请求扫描
在每次请求时将ResponseFuture放入Map中,通过定时扫描来判断记录时间与超时时间来比较判断是否超时。

public void scanResponseTable() {final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();while (it.hasNext()) {Entry<Integer, ResponseFuture> next = it.next();ResponseFuture rep = next.getValue();// 当前时间大于请求开始时间+超时等待时间+1秒,则任务超时了if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {rep.release();it.remove();rfList.add(rep);log.warn("remove timeout request, " + rep);}}// 执行被移除 ResponseFuture对象的executeInvokeCallback方法  for (ResponseFuture rf : rfList) {try {executeInvokeCallback(rf);} catch (Throwable e) {log.warn("scanResponseTable, operationComplete Exception", e);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/192023.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

文章解读与仿真程序复现思路——电网技术EI\CSCD\北大核心《市场环境下运行的光热电站子系统容量优化配比研究》

这个标题涉及到对市场环境下运行的光热电站子系统进行容量优化配比的研究。让我们逐步解读&#xff1a; 市场环境下运行的光热电站&#xff1a; 这指的是光热电站在实际市场环境中的运行&#xff0c;可能包括了市场相关的经济、政策、竞争等因素。 子系统&#xff1a; 光热电站…

十六进制数列求和

高精度数组的集大成 做的时候在和高中同学叙叙旧&#xff0c;差点寄掉 代码如下&#xff1a; #include<stdio.h> void expand(int len); const char hexadecimal[17] "0123456789ABCDEF"; int result[20], mid[20], l_result[100];int main(void) {char tm…

你好!二分查找【JAVA】

1.初次相识 二分查找又称折半查找&#xff0c;是一种在有序数组中查找特定元素的算法。二分查找的基本思想是&#xff1a;通过不断地二分数组的中间元素&#xff0c;缩小查找区间&#xff0c;直到找到目标元素或者确定目标元素不存在为止。 二分查找的时间复杂度为O(logn)&…

docker配置redis主从、哨兵集群

搭建redis主从 准备工作 在/usr/local/software/redis/文件夹下建立如下的文件夹、文件 rootlocalhost redis]# mkdir -p 6379/conf 6379/data 6379/log [rootlocalhost redis]# mkdir -p 6380/conf 6380/data 6380/log [rootlocalhost redis]# mkdir -p 6381/conf 6381/…

如何创建一个vue工程

1.打开vue安装网址&#xff1a;安装 | Vue CLI (vuejs.org) 2.创建一个项目文件夹 3.复制地址 4.打开cmd&#xff0c;进入这个地址 5.复制粘贴vue网页的安装命令 npm install -g vue/cli 6.创建vue工程 vue create vue这里可以通过上下键来进行选择。选最后一个选项按回车。 …

制作一个RISC-V的操作系统-环境搭建

文章目录 前言环境搭配 前言 由于之前的操作系统反馈难度太大&#xff0c;所以准备从这个RISC-V操作系统出发&#xff0c;以后知识层面更加深入再去完善。 环境搭配 按照依赖项 $ sudo apt update $ sudo apt install build-essential gcc make perl dkms git gcc-riscv64-…

三轴加速度计LIS2DW12开发(2)----基于中断信号获取加速度数据

三轴加速度计LIS2DW12开发.2--轮基于中断信号获取加速度数据 概述视频教学样品申请生成STM32CUBEMX串口配置IIC配置CS和SA0设置INT1设置串口重定向参考程序初始换管脚获取ID复位操作BDU设置开启INT1中断设置传感器的量程配置过滤器链配置电源模式设置输出数据速率中断判断加速…

Mac卸载、安装Python

卸载 说明 对于删除 Python&#xff0c;我们首先要知道其具体都安装了什么&#xff0c;实际上&#xff0c;在安装 Python 时&#xff0c;其自动生成&#xff1a; Python framework&#xff0c;即 Python 框架&#xff1b;Python 应用目录&#xff1b;指向 Python 的连接。 …

mazing是什么软件?为什么选择iMazing

说起iOS设备管理工具&#xff0c;可能大家还有点陌生&#xff0c;其实就是Apple公司开发的移动设备&#xff0c;因其的操作系统是独特的iOS系统&#xff0c;所以又叫iOS设备。比如大家都在用的iPhone手机&#xff0c;就是这样类型的一个设备。 mazing是什么软件? iMazing是一…

【Linux】Ubuntu添加root用户

在Ubuntu中&#xff0c;默认情况下是禁用了root用户的登录。如果仍然想要启用root用户&#xff0c;并设置root用户的密码&#xff0c;应按照以下步骤进行操作&#xff1a; 一、输入sudo passwd root设置root用户密码 二、切换root用户 sudo -i su root 这两条命令均可却换至…

前端入门(四)Ajax、Promise异步、Axios通信、vue-router路由、组件库

文章目录 AjaxAjax特点 Promise 异步编程&#xff08;缺&#xff09;Promise基本使用状态 - PromiseState结果 - PromiseResult AxiosVue中使用AxiosAxios请求方式getpostput和patchdelete并发请求 Vue路由 - vue-router单页面Web应用&#xff08;single page web application&…

Android HCI日志分析案例2

案例1--蓝牙音箱电量用完后&#xff0c;配对一直失败&#xff0c;提示PIN码不正确 基于MTK平台&#xff0c;通过MTKLogger开启保存HCI日志 问题定位分析 Android日志查看logcat 搜索到关键log 01-20 10:07:55.403760 978 1075 V bt_stack: [VERBOSE2:btm_inq.cc(2032)] …

Stream

什么是Stream&#xff1f; 也叫Stream流&#xff0c;是jdk8开始新增的一套API&#xff0c;可以用来操作集合或者数组的数据 优势&#xff1a;Stream流大量的结合了Lambda的语法风格来编程&#xff0c;提供了一种更加强大&#xff0c;更加简单的方式操作集合或数组中的数据&am…

详解Spring工厂是如何获取Aop中的代理对象的

&#x1f609;&#x1f609; 学习交流群&#xff1a; ✅✅1&#xff1a;这是孙哥suns给大家的福利&#xff01; ✨✨2&#xff1a;我们免费分享Netty、Dubbo、k8s、Mybatis、Spring...应用和源码级别的视频资料 &#x1f96d;&#x1f96d;3&#xff1a;QQ群&#xff1a;583783…

Ext4文件系统解析(一)

1、前言 熟悉Linux操作系统的都应该或多或少的了解或者使用过Ext4文件系统。 接下来&#xff0c;会简单介绍Ext4文件系统的一些特性和工作原理。 2、常用概念 在介绍Ext文件系统之前&#xff0c;先简单描述一些相关概念。 块(Block)&#xff1a;Ext文件系统存储分配的基本单…

【探索Linux】—— 强大的命令行工具 P.19(多线程 | 线程的概念 | 线程控制 | 分离线程)

阅读导航 引言一、 Linux线程概念1. 什么是线程2. 线程的概念3. 线程与进程的区别4. 线程异常 二、Linux线程控制1. POSIX线程库2. 创建线程 pthread_create() 函数&#xff08;1&#xff09;头文件&#xff08;2&#xff09;函数原型&#xff08;3&#xff09;参数解释&#x…

Spring MVC学习随笔-控制器(Controller)开发详解:控制器跳转与作用域(一)

学习视频&#xff1a;孙哥说SpringMVC&#xff1a;结合Thymeleaf&#xff0c;重塑你的MVC世界&#xff01;&#xff5c;前所未有的Web开发探索之旅 第五章、SpringMVC控制器开发详解 三 5.1 核心要点 3.流程跳转 5.2 JavaWeb中流程跳转的核心回顾 5.2.1 JavaWeb中流程跳转的核…

网络入门---网络编程初步认识和实践

目录标题 前言准备工作udpserver.hpp成员变量构造函数初始化函数(socket,bind)start函数(recvfrom) udpServer.ccudpClient.hpp构造函数初始化函数run函数(sendto) udpClient.cc测试 前言 在上一篇文章中我们初步的认识了端口号的作用&#xff0c;ip地址和MAC地址在网络通信时…

QT 中 QProgressDialog 进度条窗口 备查

基础API //两个构造函数 QProgressDialog::QProgressDialog(QWidget *parent nullptr, Qt::WindowFlags f Qt::WindowFlags());QProgressDialog::QProgressDialog(const QString &labelText, const QString &cancelButtonText, int minimum, int maximum, QWidget *…

面试 Java 基础八股文十问十答第三期

面试 Java 基础八股文十问十答第三期 作者&#xff1a;程序员小白条&#xff0c;个人博客 ⭐点赞⭐收藏⭐不迷路&#xff01;⭐ 21.说下Java8的Stream流的常用方法 答: forEach遍历、find、match进行匹配reduce进行归约&#xff0c;比如求和&#xff0c;乘&#xff0c;除聚合…