newCall 实际上是创建了一个 RealCall 有三个参数:OkHttpClient(通用配置,超时时间等) Request(Http请求所用到的条件,url等) 布尔变量forWebSocket(webSocket是一种应用层的交互方式,可双向交互,一般用不到,除非需要频繁刷新数据,股票等。)
private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {this.client = client;this.originalRequest = originalRequest;this.forWebSocket = forWebSocket;}
Request会被进行多次封装(所以构造函数里对象被命名为originRequest)
在进行newCall().enqueue()
,实际就是RealCall
的enqueue()
@Override public void enqueue(Callback responseCallback) {synchronized (this) {if (executed) throw new IllegalStateException("Already Executed");executed = true;}//1transmitter.callStart();//2 关键client.dispatcher().enqueue(new AsyncCall(responseCallback));}
分别看1和2,主要看2
public void callStart() {//跟踪程序错误this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()");//eventListener是一个监听器,连接的接入和关闭,对程序进行监听eventListener.callStart(call);}
client.dispatcher()
返回一个Dispatcher
类对象,即线程调度器,然后用这个去进行异步操作,代入参数为一个AsyncCall
void enqueue(AsyncCall call) {synchronized (this) {//1readyAsyncCalls.add(call);// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to// the same host.//2if (!call.get().forWebSocket) {AsyncCall existingCall = findExistingCallWithHost(call.host());if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);}}//3promoteAndExecute();}
- 1的
readyAsyncCalls
是一个ArrayDeque<AsyncCall>
,存放 准备要执行但还没有执行,然后会在3的promoteAndExecute()
中执行
private boolean promoteAndExecute() {assert (!Thread.holdsLock(this));List<AsyncCall> executableCalls = new ArrayList<>();boolean isRunning;synchronized (this) {for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {AsyncCall asyncCall = i.next();if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.i.remove();asyncCall.callsPerHost().incrementAndGet();executableCalls.add(asyncCall);runningAsyncCalls.add(asyncCall);}isRunning = runningCallsCount() > 0;}for (int i = 0, size = executableCalls.size(); i < size; i++) {AsyncCall asyncCall = executableCalls.get(i);asyncCall.executeOn(executorService());}return isRunning;}
promoteAndExecute
会挑选那些不会导致超负载的call(不超过AsyncCall对应的maxRequest),放进executableCalls
和runningAsyncCalls
,然后去执行,就是去遍历executableCalls
然后执行。
分别执行就是把调用每一个asyncCall 的 executeOn()
:
void executeOn(ExecutorService executorService) {assert (!Thread.holdsLock(client.dispatcher()));boolean success = false;try {executorService.execute(this);success = true;} catch (RejectedExecutionException e) {InterruptedIOException ioException = new InterruptedIOException("executor rejected");ioException.initCause(e);transmitter.noMoreExchanges(ioException);responseCallback.onFailure(RealCall.this, ioException);} finally {if (!success) {client.dispatcher().finished(this); // This call is no longer running!}}}
核心只有一行:
executorService.execute(this);
这里就已经切换线程了,执行的都是传入的 executorService.对象 的execute()
方法,都会在后台执行
@Override protected void execute() {boolean signalledCallback = false; // 标记回调是否已触发transmitter.timeoutEnter(); // 进入超时处理逻辑try {Response response = getResponseWithInterceptorChain(); // 调用拦截器链获取responsesignalledCallback = true; // 标记回调已触发responseCallback.onResponse(RealCall.this, response); // 调用response的回调函数} catch (IOException e) {if (signalledCallback) {// 不要重复触发回调!Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);} else {responseCallback.onFailure(RealCall.this, e);}} catch (Throwable t) {cancel(); // 取消请求if (!signalledCallback) {IOException canceledException = new IOException("canceled due to " + t);canceledException.addSuppressed(t);responseCallback.onFailure(RealCall.this, canceledException); // 调用失败回调函数}throw t;} finally {client.dispatcher().finished(this); // 请求结束,将请求移出调度队列}}
其中回调函数就是当初我们在应用层所定义的Callback里边定义的onFailure()
和 onResponse()
,然后如果出现异常,会进行调用相应的方法。