1 处理请求的过程概述
(1)消费端发起TCP连接后,服务提供方的NettyServer的connected方法将被调用;
(2)因为Netty默认的线程模型为All,因此AllChannelHandler类把接收到的所有消息(包括请求事件、响应事件、连接事件、断开事件,心跳事件等)包装成ChannelEventRunnable任务,并将其投递到线程池中;
(3)接着执行线程池中的任务,并最终将调用DubboProtocol的connected方法。
2 处理请求的实现细节
2.1 NettyServer的connected方法被调用
消费端发起TCP连接后,服务提供方的NettyServer的connected方法将被调用。connected方法为NettyServer父类AbstractServer的connected方法。
其中的依次调用关系为:AbstractServer的connected()->AbstractPeer的connected()->ChannelHandler的connected()。具体实现如下所示。
(1)AbstractServer的connected()
public void connected(Channel ch) throws RemotingException {// If the server has entered the shutdown process, reject any new connectionif (this.isClosing() || this.isClosed()) {logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");ch.close();return;}if (accepts > 0 && getChannelsSize()> accepts) {logger.error(INTERNAL_ERROR, "unknown error in remoting module", "", "Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);ch.close();return;}super.connected(ch);}
(2)AbstractPeer的connected()
private final ChannelHandler handler;public void connected(Channel ch) throws RemotingException {if (closed) {return;}handler.connected(ch);}
2.2 消息被投递到线程池中
调用ChannelHandler的connected()时,因为Netty默认的线程模型为All,因此AllChannelHandler类(ChannelHandler的子类)把接收到的所有消息包装成ChannelEventRunnable任务,并将其投递到线程池中。具体实现如下所示。
public class AllChannelHandler extends WrappedChannelHandler {public AllChannelHandler(ChannelHandler handler, URL url) {super(handler, url);}@Overridepublic void connected(Channel channel) throws RemotingException {ExecutorService executor = getSharedExecutorService();try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));} catch (Throwable t) {throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);}}@Overridepublic void disconnected(Channel channel) throws RemotingException {ExecutorService executor = getSharedExecutorService();try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));} catch (Throwable t) {throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);}}@Overridepublic void received(Channel channel, Object message) throws RemotingException {ExecutorService executor = getPreferredExecutorService(message);try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {if(message instanceof Request && t instanceof RejectedExecutionException){sendFeedback(channel, (Request) message, t);return;}throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}}@Overridepublic void caught(Channel channel, Throwable exception) throws RemotingException {ExecutorService executor = getSharedExecutorService();try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));} catch (Throwable t) {throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);}}
}
2.3 执行线程池中的任务
2.3.1 ChannelEventRunnable的run方法
执行线程池中的任务时,将执行ChannelEventRunnable的run方法,其实现细节具体如下所示。
public void run() {InternalThreadLocalMap internalThreadLocalMap = InternalThreadLocalMap.getAndRemove();try {if (state == ChannelState.RECEIVED) {try {handler.received(channel, message);} catch (Exception e) {logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is " + message, e);}} else {switch (state) {case CONNECTED:try {handler.connected(channel);} catch (Exception e) {logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);}break;case DISCONNECTED:try {handler.disconnected(channel);} catch (Exception e) {logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);}break;case SENT:try {handler.sent(channel, message);} catch (Exception e) {logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is " + message, e);}break;case CAUGHT:try {handler.caught(channel, exception);} catch (Exception e) {logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is: " + message + ", exception is " + exception, e);}break;default:logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "unknown state: " + state + ", message is " + message);}}} finally {InternalThreadLocalMap.set(internalThreadLocalMap);}}
2.3.2 执行connected方法
执行handler.connected(channel)时,将调用HeaderExchangeHandler#connected方法,具体实现如下所示。
public void connected(Channel channel) throws RemotingException {ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);handler.connected(exchangeChannel);channel.setAttribute(Constants.CHANNEL_SHUTDOWN_TIMEOUT_KEY,ConfigurationUtils.getServerShutdownTimeout(channel.getUrl().getOrDefaultApplicationModel()));}
接着在执行handler.connected(exchangeChannel)时,将调用DubboProtocol#connected方法,实现如下所示。
public void connected(Channel channel) throws RemotingException {invoke(channel, ON_CONNECT_KEY);
}private void invoke(Channel channel, String methodKey) {Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);if (invocation != null) {try {if (Boolean.TRUE.toString().equals(invocation.getAttachment(STUB_EVENT_KEY))) {tryToGetStubService(channel, invocation);}received(channel, invocation);} catch (Throwable t) {logger.warn(PROTOCOL_FAILED_REFER_INVOKER, "", "", "Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);}}
}public void received(Channel channel, Object message) throws RemotingException {if (message instanceof Invocation) {reply((ExchangeChannel) channel, message);} else {super.received(channel, message);}
}
执行接口请求最终将调用DubboProtocol#reply方法,具体实现如下所示。
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {if (!(message instanceof Invocation)) {throw new RemotingException(channel, "Unsupported request: "+ (message == null ? null : (message.getClass().getName() + ": " + message))+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());}Invocation inv = (Invocation) message;// 1、获取调用方法对应的InvokerInvoker<?> invoker = inv.getInvoker() == null ? getInvoker(channel, inv) : inv.getInvoker();// switch TCCLif (invoker.getUrl().getServiceModel() != null) {Thread.currentThread().setContextClassLoader(invoker.getUrl().getServiceModel().getClassLoader());}// need to consider backward-compatibility if it's a callbackif (Boolean.TRUE.toString().equals(inv.getObjectAttachmentWithoutConvert(IS_CALLBACK_SERVICE_INVOKE))) {String methodsStr = invoker.getUrl().getParameters().get("methods");boolean hasMethod = false;if (methodsStr == null || !methodsStr.contains(",")) {hasMethod = inv.getMethodName().equals(methodsStr);} else {String[] methods = methodsStr.split(",");for (String method : methods) {if (inv.getMethodName().equals(method)) {hasMethod = true;break;}}}if (!hasMethod) {logger.warn(PROTOCOL_FAILED_REFER_INVOKER, "", "", new IllegalStateException("The methodName " + inv.getMethodName()+ " not found in callback service interface ,invoke will be ignored."+ " please update the api interface. url is:"+ invoker.getUrl()) + " ,invocation is :" + inv);return null;}}// 2、获取上下文对象,并设置对端地址RpcContext.getServiceContext().setRemoteAddress(channel.getRemoteAddress());// 3、执行invoker调用链Result result = invoker.invoke(inv);// 4、返回结果return result.thenApply(Function.identity());
}