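Netty's underlying communication is asynchronous, so how do the synchronous calls we normally make turn that asynchronous layer into a synchronous result?
Dubbo supports several remote RPC protocols and network frameworks; this walkthrough uses the default dubbo protocol with Netty as the entry point.
Tip: when debugging, set the timeout to a longer value so that the call does not expire while you step through the code.
Invoking and sending the message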
1. DubboInvoker
This class is important: the client side has no concrete implementation of the service interface, and every call goes through a proxy. DubboInvoker is the class that ultimately does the work. Its core method is:
    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        // Cast the Invocation to RpcInvocation
        RpcInvocation inv = (RpcInvocation) invocation;
        // Get the method name
        final String methodName = RpcUtils.getMethodName(invocation);
        // Set the path and version as attachments
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

        ExchangeClient currentClient;
        // Get the list of available exchange clients
        List<? extends ExchangeClient> exchangeClients = clientsProvider.getClients();
        if (exchangeClients.size() == 1) {
            // Only one client: use it directly
            currentClient = exchangeClients.get(0);
        } else {
            // Several clients: pick one by taking the counter modulo the list size
            currentClient = exchangeClients.get(index.getAndIncrement() % exchangeClients.size());
        }
        // Record the current client's local address in the RpcContext
        RpcContext.getServiceContext().setLocalAddress(currentClient.getLocalAddress());
        try {
            // Check whether this is a one-way call
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            // Calculate the timeout
            int timeout = RpcUtils.calculateTimeout(getUrl(), invocation, methodName, DEFAULT_TIMEOUT);
            if (timeout <= 0) {
                // Timeout is less than or equal to 0: return a default async result
                return AsyncRpcResult.newDefaultAsyncResult(
                        new RpcException(
                                RpcException.TIMEOUT_TERMINATE,
                                "No time left for making the following call: " + invocation.getServiceName() + "."
                                        + RpcUtils.getMethodName(invocation) + ", terminate directly."),
                        invocation);
            }
            // Set the timeout as an attachment
            invocation.setAttachment(TIMEOUT_KEY, String.valueOf(timeout));
            // Get the payload size
            Integer payload = getUrl().getParameter(PAYLOAD, Integer.class);
            // Create the Request object
            Request request = new Request();
            if (payload != null) {
                request.setPayload(payload);
            }
            request.setData(inv);
            request.setVersion(Version.getProtocolVersion());
            if (isOneway) {
                // One-way call: just send the request
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                request.setTwoWay(false);
                currentClient.send(request, isSent);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                // Two-way call
                request.setTwoWay(true);
                // Get the callback executor
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                CompletableFuture<AppResponse> appResponseFuture =
                        currentClient.request(request, timeout, executor).thenApply(AppResponse.class::cast);
                // Save the compatible Future
                if (setFutureWhenSync || ((RpcInvocation) invocation).getInvokeMode() != InvokeMode.SYNC) {
                    FutureContext.getContext().setCompatibleFuture(appResponseFuture);
                }
                // Return the async result
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                result.setExecutor(executor);
                return result;
            }
        } catch (TimeoutException e) {
            // Handle the timeout
            throw new RpcException(
                    RpcException.TIMEOUT_EXCEPTION,
                    "Invoke remote method timeout. method: " + RpcUtils.getMethodName(invocation) + ", provider: "
                            + getUrl() + ", cause: " + e.getMessage(),
                    e);
        } catch (RemotingException e) {
            // Handle remoting failures
            String remoteExpMsg = "Failed to invoke remote method: " + RpcUtils.getMethodName(invocation)
                    + ", provider: " + getUrl() + ", cause: " + e.getMessage();
            if (e.getCause() instanceof IOException && e.getCause().getCause() instanceof SerializationException) {
                throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, remoteExpMsg, e);
            } else {
                throw new RpcException(RpcException.NETWORK_EXCEPTION, remoteExpMsg, e);
            }
        }
    }
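The client selection in doInvoke is a plain round-robin over the client list. A minimal sketch of that idea follows; RoundRobinSelector is a hypothetical helper, not a Dubbo class, and it uses Math.floorMod (a choice made for this sketch, not taken from the source) so the index stays non-negative if the counter ever overflows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of Dubbo: returns one element per call in round-robin order.
final class RoundRobinSelector<T> {
    private final List<T> items;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinSelector(List<T> items) {
        if (items.isEmpty()) {
            throw new IllegalArgumentException("items must not be empty");
        }
        this.items = new ArrayList<>(items);
    }

    T next() {
        if (items.size() == 1) {
            // Single element: no counter needed.
            return items.get(0);
        }
        // floorMod keeps the index in [0, size) even when the counter has wrapped to a negative value.
        return items.get(Math.floorMod(index.getAndIncrement(), items.size()));
    }
}
```

Calling next() repeatedly on a selector built from three clients cycles through them in order and then starts again from the first.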
Look at this line in particular:
    CompletableFuture<AppResponse> appResponseFuture =
            currentClient.request(request, timeout, executor).thenApply(AppResponse.class::cast);
It calls org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeClient#request(java.lang.Object, int, java.util.concurrent.ExecutorService):
    @Override
    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor)
            throws RemotingException {
        return channel.request(request, timeout, executor);
    }
That in turn enters org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int, java.util.concurrent.ExecutorService):
    @Override
    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor)
            throws RemotingException {
        if (closed) {
            throw new RemotingException(
                    this.getLocalAddress(),
                    null,
                    "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        Request req;
        if (request instanceof Request) {
            req = (Request) request;
        } else {
            // create request.
            req = new Request();
            req.setVersion(Version.getProtocolVersion());
            req.setTwoWay(true);
            req.setData(request);
        }
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }
2. DefaultFuture
The request method above creates a DefaultFuture, and DefaultFuture extends CompletableFuture<Object>.
Step into the newFuture method called on this line:
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);

    public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
        final DefaultFuture future = new DefaultFuture(channel, request, timeout);
        future.setExecutor(executor);
        // timeout check
        timeoutCheck(future);
        return future;
    }
Continue into new DefaultFuture(channel, request, timeout):
    private DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
        // put into waiting map.
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }
Key point: FUTURES.put(id, this); CHANNELS.put(id, channel);
The newly created future is stored in a map keyed by the request id:
    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
DefaultFuture is the key to turning the asynchronous call into a synchronous one.
When Netty receives response data, the id stored here is used to look up the matching DefaultFuture.
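The id-to-future map can be illustrated with a small stand-alone sketch. PendingRequests, register and onResponse are hypothetical names chosen for this example; only the pattern (park a future under the request id, remove and complete it when the response with the same id arrives) is taken from the code above.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry, not Dubbo's DefaultFuture: correlates responses to requests by id.
final class PendingRequests {
    private final Map<Long, CompletableFuture<Object>> futures = new ConcurrentHashMap<>();

    // Called by the sending thread before the request goes out.
    CompletableFuture<Object> register(long requestId) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        futures.put(requestId, future);
        return future;
    }

    // Called by the I/O thread when a response arrives.
    // Returns false when no one is waiting (for example, the request already timed out).
    boolean onResponse(long requestId, Object result) {
        CompletableFuture<Object> future = futures.remove(requestId);
        if (future == null) {
            return false;
        }
        future.complete(result);
        return true;
    }

    int pendingCount() {
        return futures.size();
    }
}
```

Because the entry is removed before the future is completed, a duplicate or late response for the same id finds nothing in the map and is simply reported as unmatched.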
Back in org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int, java.util.concurrent.ExecutorService), look at channel.send(req);
It enters:
org.apache.dubbo.remoting.transport.AbstractPeer#send
and then continues into org.apache.dubbo.remoting.transport.netty4.NettyChannel#send:
    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        // whether the channel is closed
        super.send(message, sent);

        boolean success = true;
        int timeout = 0;
        try {
            Object outputMessage = message;
            if (!encodeInIOThread) {
                ByteBuf buf = channel.alloc().buffer();
                ChannelBuffer buffer = new NettyBackedChannelBuffer(buf);
                codec.encode(this, buffer, message);
                outputMessage = buf;
            }
            ChannelFuture future = writeQueue.enqueue(outputMessage).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!(message instanceof Request)) {
                        return;
                    }
                    ChannelHandler handler = getChannelHandler();
                    if (future.isSuccess()) {
                        handler.sent(NettyChannel.this, message);
                    } else {
                        Throwable t = future.cause();
                        if (t == null) {
                            return;
                        }
                        Response response = buildErrorResponse((Request) message, t);
                        handler.received(NettyChannel.this, response);
                    }
                }
            });

            if (sent) {
                // wait timeout ms
                timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            Throwable cause = future.cause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            removeChannelIfDisconnected(channel);
            throw new RemotingException(
                    this,
                    "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to "
                            + getRemoteAddress() + ", cause: " + e.getMessage(),
                    e);
        }
        if (!success) {
            throw new RemotingException(
                    this,
                    "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to "
                            + getRemoteAddress() + "in timeout(" + timeout + "ms) limit");
        }
    }
Now look at this line:
    handler.sent(NettyChannel.this, message);
From here the call passes through the following methods in order:
org.apache.dubbo.remoting.transport.AbstractPeer#sent
org.apache.dubbo.remoting.transport.AbstractChannelHandlerDelegate#sent
org.apache.dubbo.remoting.exchange.support.header.HeartbeatHandler#sent
org.apache.dubbo.remoting.transport.dispatcher.WrappedChannelHandler#sent
org.apache.dubbo.remoting.transport.AbstractChannelHandlerDelegate#sent
org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#sent
At this point the message has been sent. HeaderExchangeHandler has another important method, received:
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        if (message instanceof Request) {
            // handle request.
            Request request = (Request) message;
            if (request.isEvent()) {
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) {
                    handleRequest(exchangeChannel, request);
                } else {
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) {
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            if (isClientSide(channel)) {
                Exception e = new Exception("Dubbo client can not supported string message: " + message
                        + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(TRANSPORT_UNSUPPORTED_MESSAGE, "", "", e.getMessage(), e);
            } else {
                String echo = handler.telnet(channel, (String) message);
                if (StringUtils.isNotEmpty(echo)) {
                    channel.send(echo);
                }
            }
        } else {
            handler.received(exchangeChannel, message);
        }
    }
Look at this line: handleResponse(channel, (Response) message);
It eventually reaches org.apache.dubbo.remoting.exchange.support.DefaultFuture#received(org.apache.dubbo.remoting.Channel, org.apache.dubbo.remoting.exchange.Response, boolean):
    public static void received(Channel channel, Response response, boolean timeout) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                Timeout t = future.timeoutCheckTask;
                if (!timeout) {
                    // decrease Time
                    t.cancel();
                }
                future.doReceived(response);
                shutdownExecutorIfNeeded(future);
            } else {
                logger.warn(
                        PROTOCOL_TIMEOUT_SERVER,
                        "",
                        "",
                        "The timeout response finally returned at "
                                + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                                + ", response status is " + response.getStatus()
                                + (channel == null
                                        ? ""
                                        : ", channel: " + channel.getLocalAddress() + " -> "
                                                + channel.getRemoteAddress())
                                + ", please check provider side for detailed result.");
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }
which then calls org.apache.dubbo.remoting.exchange.support.DefaultFuture#doReceived:
    private void doReceived(Response res) {
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        }
        if (res.getStatus() == Response.OK) {
            this.complete(res.getResult());
        } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            this.completeExceptionally(
                    new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
        } else if (res.getStatus() == Response.SERIALIZATION_ERROR) {
            this.completeExceptionally(new SerializationException(res.getErrorMessage()));
        } else {
            this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
        }
    }
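Look at this line: this.complete(res.getResult());
It relies on the complete() method of CompletableFuture, which manually completes a future and sets its result. Once complete() has stored a value in the CompletableFuture, every caller waiting on that future receives that value.
Notes:
Once complete() is called on a pending CompletableFuture, the future becomes completed immediately, and any computation that would otherwise have supplied its value can no longer change the result. If the future is already completed, calling complete() again has no effect.
If the future has already completed exceptionally, calling complete() has no effect either. To finish a pending future with an error instead of a value, use completeExceptionally(Throwable ex).
If several threads call complete() at the same time, only the first one succeeds in setting the result; the other calls are ignored.
A call to CompletableFuture.get() blocks until a result is available, and it returns as soon as complete(T value) has been executed.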
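These rules can be checked with the JDK alone. The sketch below is illustrative only: CompleteDemo and its method names are made up for this example, and the 50 ms sleep simply simulates a response arriving on another thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Illustrative demo of CompletableFuture.complete(); not Dubbo code.
final class CompleteDemo {

    // The caller blocks in get() until another thread calls complete().
    static String blockUntilCompleted() {
        CompletableFuture<String> future = new CompletableFuture<>();
        Thread ioThread = new Thread(() -> {
            try {
                Thread.sleep(50); // simulated network delay
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            future.complete("response");
        });
        ioThread.start();
        try {
            return future.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // Only the first completion wins; complete() reports whether it changed the state.
    static boolean[] completeTwice(CompletableFuture<String> future) {
        boolean first = future.complete("first");
        boolean second = future.complete("second");
        return new boolean[] {first, second};
    }
}
```

complete() and completeExceptionally() both return a boolean that tells the caller whether the call actually moved the future into the completed state, which is how the losing caller in a race can detect that it was ignored.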
The next question is: when is received in HeaderExchangeHandler called?
It is called when Netty reads a response from the channel, starting at org.apache.dubbo.remoting.transport.netty4.NettyClientHandler#channelRead,
which then enters org.apache.dubbo.remoting.transport.AbstractPeer#received.
See the figure below:
and finally reaches org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received.
Now go back to org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke.
This method is called by org.apache.dubbo.rpc.protocol.AbstractInvoker#invoke:
    @Override
    public Result invoke(Invocation inv) throws RpcException {
        // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
        if (isDestroyed()) {
            logger.warn(
                    PROTOCOL_FAILED_REQUEST,
                    "",
                    "",
                    "Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                            + ", dubbo version is " + Version.getVersion()
                            + ", this invoker should not be used any longer");
        }

        RpcInvocation invocation = (RpcInvocation) inv;

        // prepare rpc invocation
        prepareInvocation(invocation);

        // do invoke rpc invocation and return async result
        AsyncRpcResult asyncResult = doInvokeAndReturn(invocation);

        // wait rpc result if sync
        waitForResultIfSync(asyncResult, invocation);

        return asyncResult;
    }
The call waitForResultIfSync(asyncResult, invocation); is where the synchronous, blocking wait happens.
Inside that method, asyncResult.get(timeout, TimeUnit.MILLISECONDS); enters org.apache.dubbo.rpc.AsyncRpcResult#get(long, java.util.concurrent.TimeUnit):
    @Override
    public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        if (executor instanceof ThreadlessExecutor) {
            ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
            try {
                while (!responseFuture.isDone() && !threadlessExecutor.isShutdown()) {
                    long restTime = deadline - System.nanoTime();
                    if (restTime > 0) {
                        threadlessExecutor.waitAndDrain(deadline);
                    } else {
                        throw new TimeoutException(
                                "Timeout after " + unit.toMillis(timeout) + "ms waiting for result.");
                    }
                }
            } finally {
                threadlessExecutor.shutdown();
            }
        }

        long restTime = deadline - System.nanoTime();
        if (!responseFuture.isDone() && restTime < 0) {
            throw new TimeoutException("Timeout after " + unit.toMillis(timeout) + "ms waiting for result.");
        }
        return responseFuture.get(restTime, TimeUnit.NANOSECONDS);
    }
The last line, responseFuture.get(restTime, TimeUnit.NANOSECONDS), calls java.util.concurrent.CompletableFuture#get(long, java.util.concurrent.TimeUnit),
which blocks synchronously until the result is available.
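The loop around waitAndDrain suggests that, with a ThreadlessExecutor, the waiting thread itself runs the queued callback tasks until the future is done or the deadline passes. The following sketch shows that idea under stated assumptions: CallerRunsQueue and waitAndGet are hypothetical names, the queue-based design is a guess at the general technique, and none of it is Dubbo's actual ThreadlessExecutor implementation.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in: tasks are only queued, and the thread waiting for the result runs them.
final class CallerRunsQueue implements Executor {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    @Override
    public void execute(Runnable task) {
        queue.add(task);
    }

    // Runs queued tasks on the calling thread until the future is done or the deadline passes.
    <T> T waitAndGet(CompletableFuture<T> future, long timeout, TimeUnit unit) throws TimeoutException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (!future.isDone()) {
            long rest = deadline - System.nanoTime();
            if (rest <= 0) {
                throw new TimeoutException("Timeout after " + unit.toMillis(timeout) + "ms waiting for result.");
            }
            try {
                Runnable task = queue.poll(rest, TimeUnit.NANOSECONDS);
                if (task != null) {
                    task.run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return future.join();
    }

    // Demo: a simulated I/O thread enqueues the completion; the caller ends up executing it.
    static boolean callbackRanOnCaller() {
        CallerRunsQueue executor = new CallerRunsQueue();
        CompletableFuture<Thread> future = new CompletableFuture<>();
        new Thread(() -> executor.execute(() -> future.complete(Thread.currentThread()))).start();
        try {
            return executor.waitAndGet(future, 5, TimeUnit.SECONDS) == Thread.currentThread();
        } catch (TimeoutException e) {
            return false;
        }
    }
}
```

In this sketch the thread that would otherwise sit idle in a blocking get() does the response handling itself, which avoids a hand-off to a separate callback thread pool.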
Note that org.apache.dubbo.rpc.protocol.AbstractInvoker#invoke returns an AsyncRpcResult. So where is the actual result taken out of the AsyncRpcResult?
That happens in org.apache.dubbo.rpc.proxy.InvocationUtil#invoke, in the expression
    invoker.invoke(rpcInvocation).recreate()
Step into recreate(), that is, org.apache.dubbo.rpc.AsyncRpcResult#recreate:
    @Override
    public Object recreate() throws Throwable {
        RpcInvocation rpcInvocation = (RpcInvocation) invocation;
        if (InvokeMode.FUTURE == rpcInvocation.getInvokeMode()) {
            return RpcContext.getClientAttachment().getFuture();
        } else if (InvokeMode.ASYNC == rpcInvocation.getInvokeMode()) {
            return createDefaultValue(invocation).recreate();
        }

        return getAppResponse().recreate();
    }
First look at org.apache.dubbo.rpc.AsyncRpcResult#getAppResponse:
    public Result getAppResponse() {
        try {
            if (responseFuture.isDone()) {
                return responseFuture.get();
            }
        } catch (Exception e) {
            // This should not happen in normal request process;
            logger.error(
                    PROXY_ERROR_ASYNC_RESPONSE,
                    "",
                    "",
                    "Got exception when trying to fetch the underlying result from AsyncRpcResult.");
            throw new RpcException(e);
        }

        return createDefaultValue(invocation);
    }
responseFuture.get() returns the value that was passed to complete(res.getResult()) in org.apache.dubbo.remoting.exchange.support.DefaultFuture#doReceived,
which is an AppResponse.
Then, back in getAppResponse().recreate();, the call enters org.apache.dubbo.rpc.AppResponse#recreate:
    @Override
    public Object recreate() throws Throwable {
        if (exception != null) {
            // fix issue#619
            try {
                Object stackTrace = exception.getStackTrace();
                if (stackTrace == null) {
                    exception.setStackTrace(new StackTraceElement[0]);
                }
            } catch (Exception e) {
                // ignore
            }
            if (Dubbo2CompactUtils.isEnabled()
                    && Dubbo2RpcExceptionUtils.isRpcExceptionClassLoaded()
                    && (exception instanceof RpcException)
                    && !Dubbo2RpcExceptionUtils.getRpcExceptionClass().isAssignableFrom(exception.getClass())) {
                RpcException recreated = Dubbo2RpcExceptionUtils.newRpcException(
                        ((RpcException) exception).getCode(), exception.getMessage(), exception.getCause());
                if (recreated != null) {
                    recreated.setStackTrace(exception.getStackTrace());
                    throw recreated;
                }
            }
            throw exception;
        }
        return result;
    }
This yields the result the caller was waiting for, or rethrows the remote exception.
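Stripped of the compatibility handling, recreate() is a value-or-exception holder: it throws the stored exception if there is one, otherwise it returns the stored value. A minimal sketch of that shape, using a hypothetical Outcome class that is not part of Dubbo:

```java
// Hypothetical holder, loosely modeled on the value-or-exception shape of recreate().
final class Outcome {
    private final Object value;
    private final Throwable exception;

    Outcome(Object value, Throwable exception) {
        this.value = value;
        this.exception = exception;
    }

    // Rethrows the stored exception on the caller's thread, or hands back the value.
    Object recreate() throws Throwable {
        if (exception != null) {
            throw exception;
        }
        return value;
    }
}
```

This is why a remote failure surfaces to the caller as a normal Java exception thrown from the proxied method call.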
Finally, a few words on the task that checks whether a request has timed out.
3. Timeout check task
org.apache.dubbo.common.resource.GlobalResourceInitializer
Look at this field in the DefaultFuture class:
    private static final GlobalResourceInitializer<Timer> TIME_OUT_TIMER = new GlobalResourceInitializer<>(
            () -> new HashedWheelTimer(new NamedThreadFactory("dubbo-future-timeout", true), 30, TimeUnit.MILLISECONDS),
            DefaultFuture::destroy);
The new HashedWheelTimer call ends up in org.apache.dubbo.common.timer.HashedWheelTimer#HashedWheelTimer(java.util.concurrent.ThreadFactory, long, java.util.concurrent.TimeUnit, int, long):
    public HashedWheelTimer(
            ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, long maxPendingTimeouts) {

        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        if (tickDuration <= 0) {
            throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
        }
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }

        // Normalize ticksPerWheel to power of two and initialize the wheel.
        wheel = createWheel(ticksPerWheel);
        mask = wheel.length - 1;

        // Convert tickDuration to nanos.
        this.tickDuration = unit.toNanos(tickDuration);

        // Prevent overflow.
        if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
            throw new IllegalArgumentException(String.format(
                    "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                    tickDuration, Long.MAX_VALUE / wheel.length));
        }
        workerThread = threadFactory.newThread(worker);

        this.maxPendingTimeouts = maxPendingTimeouts;

        if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT
                && WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
            reportTooManyInstances();
        }
    }
Note this line: workerThread = threadFactory.newThread(worker);
where worker is: Worker worker = new Worker();
Next, look at org.apache.dubbo.remoting.exchange.support.DefaultFuture#timeoutCheck in DefaultFuture:
    private static void timeoutCheck(DefaultFuture future) {
        TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
        future.timeoutCheckTask = TIME_OUT_TIMER.get().newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
    }
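The pattern in timeoutCheck, together with the t.cancel() call in DefaultFuture#received, is: schedule a timeout task when the future is created, and cancel it if the response arrives first. Below is a minimal sketch of that pattern. It deliberately swaps the hashed wheel timer for a JDK ScheduledExecutorService, and TimeoutGuard and withTimeout are hypothetical names, so treat it as an illustration of the idea rather than Dubbo's implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: fails a pending future after a delay unless it completes first.
final class TimeoutGuard {
    // Stand-in for the hashed wheel timer: one daemon scheduler thread shared by all futures.
    private static final ScheduledExecutorService TIMER = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "timeout-guard");
        t.setDaemon(true);
        return t;
    });

    static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeoutMillis) {
        ScheduledFuture<?> task = TIMER.schedule(() -> {
            // No effect if the future was already completed by a response.
            future.completeExceptionally(new TimeoutException("no response within " + timeoutMillis + "ms"));
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        // Cancel the timeout task as soon as the future completes for any reason.
        future.whenComplete((value, error) -> task.cancel(false));
        return future;
    }
}
```

A future that is completed before the delay elapses keeps its value, while one that is never completed fails with a TimeoutException, so a caller blocked on it is always released.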
The newTimeout method it calls:
    @Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                    + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                    + "timeouts (" + maxPendingTimeouts + ")");
        }

        start();

        // Add the timeout to the timeout queue which will be processed on the next tick.
        // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

        // Guard against overflow.
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }
Then look at the start() method it calls:
    public void start() {
        switch (WORKER_STATE_UPDATER.get(this)) {
            case WORKER_STATE_INIT:
                if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                    workerThread.start();
                }
                break;
            case WORKER_STATE_STARTED:
                break;
            case WORKER_STATE_SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid WorkerState");
        }

        // Wait until the startTime is initialized by the worker.
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // Ignore - it will be ready very soon.
            }
        }
    }
The relevant call is workerThread.start();
Worker is a Runnable, so starting the thread invokes its run() method:
org.apache.dubbo.common.timer.HashedWheelTimer.Worker#run
    @Override
    public void run() {
        // Initialize the startTime.
        startTime = System.nanoTime();
        if (startTime == 0) {
            // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
            startTime = 1;
        }

        // Notify the other threads waiting for the initialization at start().
        startTimeInitialized.countDown();

        do {
            final long deadline = waitForNextTick();
            if (deadline > 0) {
                int idx = (int) (tick & mask);
                processCancelledTasks();
                HashedWheelBucket bucket = wheel[idx];
                transferTimeoutsToBuckets();
                bucket.expireTimeouts(deadline);
                tick++;
            }
        } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

        // Fill the unprocessedTimeouts so we can return them from stop() method.
        for (HashedWheelBucket bucket : wheel) {
            bucket.clearTimeouts(unprocessedTimeouts);
        }
        for (; ; ) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                break;
            }
            if (!timeout.isCancelled()) {
                unprocessedTimeouts.add(timeout);
            }
        }
        processCancelledTasks();
    }
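The expression tick & mask in run() works because the wheel size is normalized to a power of two, so masking is equivalent to taking the tick modulo the wheel size. The sketch below shows how a hashed wheel typically maps a deadline to a bucket and a number of full revolutions. WheelMath and its methods are hypothetical, and the bucket and rounds formulas are an assumption about the general technique, since transferTimeoutsToBuckets is not shown in this walkthrough.

```java
// Hypothetical helper illustrating hashed-wheel arithmetic; not Dubbo's HashedWheelTimer code.
final class WheelMath {

    // Rounds up to the next power of two so that (tick & mask) can replace (tick % size).
    static int normalizeTicksPerWheel(int ticksPerWheel) {
        int normalized = 1;
        while (normalized < ticksPerWheel) {
            normalized <<= 1;
        }
        return normalized;
    }

    // Bucket in which a timeout with the given deadline (relative to timer start) is placed.
    static int bucketIndex(long deadlineNanos, long tickDurationNanos, int wheelSize) {
        long ticks = deadlineNanos / tickDurationNanos;
        return (int) (ticks & (wheelSize - 1));
    }

    // Full wheel revolutions that must pass before the timeout is due.
    static long remainingRounds(long deadlineNanos, long tickDurationNanos, long currentTick, int wheelSize) {
        long ticks = deadlineNanos / tickDurationNanos;
        return (ticks - currentTick) / wheelSize;
    }
}
```

With a 30 ms tick and, as an assumed example, a wheel of 512 slots, a 1 s deadline falls on tick 33 and is due within the first revolution, while a 20 s deadline falls on tick 666, lands in bucket 154, and waits one full revolution first.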