核心方法 getResponseWithInterceptorChain()
/**
 * Assembles the interceptor stack for this call and runs [originalRequest] through it.
 *
 * Interceptors are appended in this order: the client's application interceptors,
 * [RetryAndFollowUpInterceptor], [BridgeInterceptor], [CacheInterceptor], [ConnectInterceptor],
 * the client's network interceptors (skipped when `forWebSocket` is true), and finally
 * [CallServerInterceptor].
 *
 * @return the response returned by the head of the chain.
 * @throws IOException if the call was canceled after the chain returned; an [IOException] raised
 *     by the chain is passed through `noMoreExchanges` and its result is thrown.
 */
internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
        interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)

    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    // Tracks whether the catch branch already called noMoreExchanges(), so that the finally
    // branch does not call it a second time.
    var calledNoMoreExchanges = false
    try {
        val response = chain.proceed(originalRequest)
        if (isCanceled()) {
            response.closeQuietly()
            throw IOException("Canceled")
        }
        return response
    } catch (e: IOException) {
        calledNoMoreExchanges = true
        throw noMoreExchanges(e) as Throwable
    } finally {
        if (!calledNoMoreExchanges) {
            noMoreExchanges(null)
        }
    }
}
val interceptors = mutableListOf<Interceptor>() //三个模型 interceptors += client.interceptors interceptors += RetryAndFollowUpInterceptor(client) interceptors += BridgeInterceptor(client.cookieJar) interceptors += CacheInterceptor(client.cache) interceptors += ConnectInterceptor if (!forWebSocket) {interceptors += client.networkInterceptors } //一个模型 interceptors += CallServerInterceptor(forWebSocket)
添加网络事件拦截器 Interceptor
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
创建Chain
var calledNoMoreExchanges = false
try {
链式执行
//originalRequest 还没有执行的request
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
/**
 * Hands [request] to the interceptor at the current `index`, giving it a copy of this chain that
 * points at the following interceptor.
 *
 * When an `exchange` is present, the checks below require that the request keeps the same host
 * and port and that `proceed()` is called exactly once per interceptor.
 *
 * @param request the request to pass to the next interceptor.
 * @return the non-null response produced by that interceptor; its body must be non-null.
 * @throws IOException declared for failures raised further down the chain.
 * @throws IllegalStateException if one of the `check` invariants is violated.
 * @throws NullPointerException if the interceptor returns null.
 */
@Throws(IOException::class)
override fun proceed(request: Request): Response {
    check(index < interceptors.size)

    calls++

    if (exchange != null) {
        check(exchange.finder.sameHostAndPort(request.url)) {
            "network interceptor ${interceptors[index - 1]} must retain the same host and port"
        }
        check(calls == 1) {
            "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
        }
    }

    // Call the next interceptor in the chain.
    val next = copy(index = index + 1, request = request)
    val interceptor = interceptors[index]

    @Suppress("USELESS_ELVIS")
    val response = interceptor.intercept(next)
        ?: throw NullPointerException("interceptor $interceptor returned null")

    if (exchange != null) {
        // The last interceptor has nothing after it, so it is exempt from the proceed() count.
        check(index + 1 >= interceptors.size || next.calls == 1) {
            "network interceptor $interceptor must call proceed() exactly once"
        }
    }

    check(response.body != null) { "interceptor $interceptor returned a response with no body" }

    return response
}
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
拦截器开始拦截
拦截器调用 proceed 时 index 会 +1,依次执行后面的拦截器;正向执行完后会按相反顺序逐层返回
RetryAndFollowUpInterceptor:
/**
 * Interceptor that retries a request after a recoverable failure and issues follow-up requests
 * when `followUpRequest` returns one.
 *
 * Only [intercept] is shown in this excerpt; `recover`, `followUpRequest` and `MAX_FOLLOW_UPS`
 * are defined elsewhere in the class.
 */
class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {

    /**
     * Runs the request in a loop until a response needs no follow-up, the follow-up cannot be
     * replayed (one-shot body), or an unrecoverable failure is thrown.
     *
     * @param chain the chain to proceed on; must be a [RealInterceptorChain].
     * @return the final response, with any prior response attached (without its body).
     * @throws IOException if the call is canceled, a failure is not recoverable, or more than
     *     `MAX_FOLLOW_UPS` follow-ups are attempted ([ProtocolException]).
     */
    @Throws(IOException::class)
    override fun intercept(chain: Interceptor.Chain): Response {
        val realChain = chain as RealInterceptorChain
        var request = chain.request
        val call = realChain.call
        var followUpCount = 0
        var priorResponse: Response? = null
        var newExchangeFinder = true
        var recoveredFailures = listOf<IOException>()
        while (true) {
            call.enterNetworkInterceptorExchange(request, newExchangeFinder)

            var response: Response
            var closeActiveExchange = true
            try {
                if (call.isCanceled()) {
                    throw IOException("Canceled")
                }

                try {
                    response = realChain.proceed(request)
                    newExchangeFinder = true
                } catch (e: RouteException) {
                    // The attempt to connect via a route failed. The request will not have been sent.
                    if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
                        throw e.firstConnectException.withSuppressed(recoveredFailures)
                    } else {
                        recoveredFailures += e.firstConnectException
                    }
                    newExchangeFinder = false
                    continue
                } catch (e: IOException) {
                    // An attempt to communicate with a server failed. The request may have been sent.
                    if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
                        throw e.withSuppressed(recoveredFailures)
                    } else {
                        recoveredFailures += e
                    }
                    newExchangeFinder = false
                    continue
                }

                // Attach the prior response if it exists. Such responses never have a body.
                if (priorResponse != null) {
                    response = response.newBuilder()
                        .priorResponse(
                            priorResponse.newBuilder()
                                .body(null)
                                .build()
                        )
                        .build()
                }

                val exchange = call.interceptorScopedExchange
                val followUp = followUpRequest(response, exchange)

                if (followUp == null) {
                    if (exchange != null && exchange.isDuplex) {
                        call.timeoutEarlyExit()
                    }
                    closeActiveExchange = false
                    return response
                }

                val followUpBody = followUp.body
                if (followUpBody != null && followUpBody.isOneShot()) {
                    closeActiveExchange = false
                    return response
                }

                response.body?.closeQuietly()

                if (++followUpCount > MAX_FOLLOW_UPS) {
                    throw ProtocolException("Too many follow-up requests: $followUpCount")
                }

                request = followUp
                priorResponse = response
            } finally {
                call.exitNetworkInterceptorExchange(closeActiveExchange)
            }
        }
    }
}
错误重试和重定向Follow
while (true)
call.enterNetworkInterceptorExchange(request, newExchangeFinder) 连接前的准备工作,call = RealCall,创建一个ExchangeFinder, 寻找可用的http / ssl 等一个可用连接
RouteException:某条连接线路失败 ---> 重试;recover 会判断 Client 的 retryOnConnectionFailure
isRecoverable:判断是否为可以恢复的错误,前提是 retryOnConnectionFailure 为 true
request = followUp 执行下一次请求
若没有出错或者重定向 则返回下一个链式拦截器
BridgeInterceptor:
requestBuilder.header("Content-Type", contentType.toString())}val contentLength = body.contentLength()if (contentLength != -1L) {requestBuilder.header("Content-Length", contentLength.toString())requestBuilder.removeHeader("Transfer-Encoding")} else {requestBuilder.header("Transfer-Encoding", "chunked")requestBuilder.removeHeader("Content-Length")}}if (userRequest.header("Host") == null) {requestBuilder.header("Host", userRequest.url.toHostHeader())}if (userRequest.header("Connection") == null) {requestBuilder.header("Connection", "Keep-Alive")}// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing// the transfer stream.var transparentGzip = falseif (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {transparentGzip = truerequestBuilder.header("Accept-Encoding", "gzip")}
添加head / body /host等
压缩类型:Accept-Encoding = gzip,自动压缩和解压
CacheInterceptor
override fun intercept(chain: Interceptor.Chain): Response {val call = chain.call()val cacheCandidate = cache?.get(chain.request())val now = System.currentTimeMillis()val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()val networkRequest = strategy.networkRequestval cacheResponse = strategy.cacheResponsecache?.trackResponse(strategy)val listener = (call as? RealCall)?.eventListener ?: EventListener.NONEif (cacheCandidate != null && cacheResponse == null) {// The cache candidate wasn't applicable. Close it.cacheCandidate.body?.closeQuietly()}// If we're forbidden from using the network and the cache is insufficient, fail.if (networkRequest == null && cacheResponse == null) {return Response.Builder().request(chain.request()).protocol(Protocol.HTTP_1_1).code(HTTP_GATEWAY_TIMEOUT).message("Unsatisfiable Request (only-if-cached)").body(EMPTY_RESPONSE).sentRequestAtMillis(-1L).receivedResponseAtMillis(System.currentTimeMillis()).build().also {listener.satisfactionFailure(call, it)}}// If we don't need the network, we're done.if (networkRequest == null) {return cacheResponse!!.newBuilder().cacheResponse(stripBody(cacheResponse)).build().also {listener.cacheHit(call, it)}}if (cacheResponse != null) {listener.cacheConditionalHit(call, cacheResponse)} else if (cache != null) {listener.cacheMiss(call)}var networkResponse: Response? = nulltry {networkResponse = chain.proceed(networkRequest)} finally {// If we're crashing on I/O or otherwise, don't leak the cache body.if (networkResponse == null && cacheCandidate != null) {cacheCandidate.body?.closeQuietly()}}
先查找有没有缓存 有的话直接返回 没有就继续请求
核心类:CacheStrategy
for (i in 0 until headers.size) {val fieldName = headers.name(i)val value = headers.value(i)when {fieldName.equals("Date", ignoreCase = true) -> {servedDate = value.toHttpDateOrNull()servedDateString = value}fieldName.equals("Expires", ignoreCase = true) -> {expires = value.toHttpDateOrNull()}fieldName.equals("Last-Modified", ignoreCase = true) -> {lastModified = value.toHttpDateOrNull()lastModifiedString = value}fieldName.equals("ETag", ignoreCase = true) -> {etag = value}fieldName.equals("Age", ignoreCase = true) -> {ageSeconds = value.toNonNegativeInt(-1)}}}
when {etag != null -> {conditionName = "If-None-Match"conditionValue = etag}lastModified != null -> {conditionName = "If-Modified-Since"conditionValue = lastModifiedString}servedDate != null -> {conditionName = "If-Modified-Since"conditionValue = servedDateString}else -> return CacheStrategy(request, null) // No condition! Make a regular request.}
先拿到可用的 CacheStrategy 有则返回没有就继续请求 然后put缓存
ConnectInterceptor
请求的核心类
/**
 * Initializes an exchange for the call and continues the chain with that exchange attached.
 *
 * @param chain the chain to proceed on; must be a [RealInterceptorChain].
 * @return the response from the rest of the chain.
 * @throws IOException if initializing the exchange or proceeding fails.
 */
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.call.initExchange(chain)
    val connectedChain = realChain.copy(exchange = exchange)
    return connectedChain.proceed(realChain.request)
}
/**
 * Finds a codec through the call's exchange finder and wraps it in a new [Exchange], which is
 * stored on the call as both `interceptorScopedExchange` and `exchange`.
 *
 * @param chain the chain passed to the exchange finder.
 * @return the newly created exchange.
 * @throws IllegalStateException if no more exchanges are expected ("released") or a request or
 *     response body is still open.
 * @throws IOException if the call was canceled.
 */
internal fun initExchange(chain: RealInterceptorChain): Exchange {
    synchronized(this) {
        check(expectMoreExchanges) { "released" }
        check(!responseBodyOpen)
        check(!requestBodyOpen)
    }

    val exchangeFinder = this.exchangeFinder!!
    val codec = exchangeFinder.find(client, chain)
    val result = Exchange(this, eventListener, exchangeFinder, codec)
    this.interceptorScopedExchange = result
    this.exchange = result
    synchronized(this) {
        this.requestBodyOpen = true
        this.responseBodyOpen = true
    }

    if (canceled) throw IOException("Canceled")
    return result
}
initExchange 初始化
val codec = exchangeFinder.find(client, chain)------> 编码 解码 coder & decoder
find = ExchangeFinder 的 find 函数
find源码:
/**
 * Finds a healthy connection for the request and returns a codec created on it.
 *
 * Extensive health checks are requested for every method except `GET`.
 *
 * @param client supplies the ping interval and the retry-on-connection-failure setting.
 * @param chain supplies the connect/read/write timeouts and the request.
 * @return a codec created by the chosen connection.
 * @throws RouteException on failure; a plain [IOException] is wrapped in one. The failure is
 *     passed to `trackFailure` before being thrown.
 */
fun find(
    client: OkHttpClient,
    chain: RealInterceptorChain
): ExchangeCodec {
    try {
        val resultConnection = findHealthyConnection(
            connectTimeout = chain.connectTimeoutMillis,
            readTimeout = chain.readTimeoutMillis,
            writeTimeout = chain.writeTimeoutMillis,
            pingIntervalMillis = client.pingIntervalMillis,
            connectionRetryEnabled = client.retryOnConnectionFailure,
            doExtensiveHealthChecks = chain.request.method != "GET"
        )
        return resultConnection.newCodec(client, chain)
    } catch (e: RouteException) {
        trackFailure(e.lastConnectException)
        throw e
    } catch (e: IOException) {
        trackFailure(e)
        throw RouteException(e)
    }
}
找到可用的连接,源码:
/**
 * Repeatedly calls `findConnection` until it yields a connection that passes `isHealthy`.
 *
 * An unhealthy candidate is marked with `noNewExchanges()`. The loop then continues only while
 * there is still a route, route selection or route selector entry left to try.
 *
 * @param doExtensiveHealthChecks forwarded to `isHealthy`.
 * @return a connection for which `isHealthy` returned true.
 * @throws IOException with message "exhausted all routes" when nothing is left to try.
 */
private fun findHealthyConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean,
    doExtensiveHealthChecks: Boolean
): RealConnection {
    while (true) {
        val candidate = findConnection(
            connectTimeout = connectTimeout,
            readTimeout = readTimeout,
            writeTimeout = writeTimeout,
            pingIntervalMillis = pingIntervalMillis,
            connectionRetryEnabled = connectionRetryEnabled
        )

        // Confirm that the connection is good.
        if (candidate.isHealthy(doExtensiveHealthChecks)) {
            return candidate
        }

        // If it isn't, take it out of the pool.
        candidate.noNewExchanges()

        // Make sure we have some routes left to try. One example where we may exhaust all the routes
        // would happen if we made a new connection and it immediately is detected as unhealthy.
        if (nextRouteToTry != null) continue

        val routesLeft = routeSelection?.hasNext() ?: true
        if (routesLeft) continue

        val routesSelectionLeft = routeSelector?.hasNext() ?: true
        if (routesSelectionLeft) continue

        throw IOException("exhausted all routes")
    }
}
val candidate = findConnection > if (candidate.isHealthy(doExtensiveHealthChecks)) ( 先拿到一个可用连接 再判断健康与否 ,如果为false 就再重新取
源码:
/**
 * Returns a connection to host the call, trying these sources in order:
 *
 * 1. the connection already held by the call, if it still accepts exchanges and matches the
 *    host and port;
 * 2. the connection pool, matched by address only;
 * 3. the connection pool again, matched with the routes of a newly computed route selection;
 * 4. a newly connected [RealConnection];
 * 5. a multiplexed pooled connection found after connecting, in which case the new connection's
 *    socket is closed and its route is remembered in `nextRouteToTry`.
 *
 * @return the connection acquired by the call.
 * @throws IOException if the call is canceled or connecting fails.
 */
@Throws(IOException::class)
private fun findConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
): RealConnection {
    if (call.isCanceled()) throw IOException("Canceled")

    // Attempt to reuse the connection from the call.
    val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
    if (callConnection != null) {
        var toClose: Socket? = null
        synchronized(callConnection) {
            if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
                toClose = call.releaseConnectionNoEvents()
            }
        }

        // If the call's connection wasn't released, reuse it. We don't call connectionAcquired() here
        // because we already acquired it.
        if (call.connection != null) {
            check(toClose == null)
            return callConnection
        }

        // The call's connection was released.
        toClose?.closeQuietly()
        eventListener.connectionReleased(call, callConnection)
    }

    // We need a new connection. Give it fresh stats.
    refusedStreamCount = 0
    connectionShutdownCount = 0
    otherFailureCount = 0

    // Attempt to get a connection from the pool.
    if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
        val result = call.connection!!
        eventListener.connectionAcquired(call, result)
        return result
    }

    // Nothing in the pool. Figure out what route we'll try next.
    val routes: List<Route>?
    val route: Route
    if (nextRouteToTry != null) {
        // Use a route from a preceding coalesced connection.
        routes = null
        route = nextRouteToTry!!
        nextRouteToTry = null
    } else if (routeSelection != null && routeSelection!!.hasNext()) {
        // Use a route from an existing route selection.
        routes = null
        route = routeSelection!!.next()
    } else {
        // Compute a new route selection. This is a blocking operation!
        var localRouteSelector = routeSelector
        if (localRouteSelector == null) {
            localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
            this.routeSelector = localRouteSelector
        }
        val localRouteSelection = localRouteSelector.next()
        routeSelection = localRouteSelection
        routes = localRouteSelection.routes

        if (call.isCanceled()) throw IOException("Canceled")

        // Now that we have a set of IP addresses, make another attempt at getting a connection from
        // the pool. We have a better chance of matching thanks to connection coalescing.
        if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
            val result = call.connection!!
            eventListener.connectionAcquired(call, result)
            return result
        }

        route = localRouteSelection.next()
    }

    // Connect. Tell the call about the connecting call so async cancels work.
    val newConnection = RealConnection(connectionPool, route)
    call.connectionToCancel = newConnection
    try {
        newConnection.connect(
            connectTimeout,
            readTimeout,
            writeTimeout,
            pingIntervalMillis,
            connectionRetryEnabled,
            call,
            eventListener
        )
    } finally {
        call.connectionToCancel = null
    }
    call.client.routeDatabase.connected(newConnection.route())

    // If we raced another call connecting to this host, coalesce the connections. This makes for 3
    // different lookups in the connection pool!
    if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
        val result = call.connection!!
        nextRouteToTry = route
        newConnection.socket().closeQuietly()
        eventListener.connectionAcquired(call, result)
        return result
    }

    synchronized(newConnection) {
        connectionPool.put(newConnection)
        call.acquireConnectionNoEvents(newConnection)
    }

    eventListener.connectionAcquired(call, newConnection)
    return newConnection
}
通过五种方式拿到可用连接:
if (call.isCanceled()) throw IOException("Canceled")
val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()! 判断call里面是否有一个已经建立的连接,如果没有就继续往下执行获取连接
refusedStreamCount = 0 connectionShutdownCount = 0 otherFailureCount = 0// Attempt to get a connection from the pool. if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {val result = call.connection!!eventListener.connectionAcquired(call, result)return result }
尝试获取连接 callAcquirePooledConnection
/**
 * Scans the pooled connections and lets [call] acquire the first one that is eligible for
 * [address].
 *
 * @param address the address the call wants to reach.
 * @param call the call that acquires the connection.
 * @param routes candidate routes forwarded to `isEligible`; may be null.
 * @param requireMultiplexed when true, connections that are not multiplexed are skipped.
 * @return true if a connection was acquired, false if none matched.
 */
fun callAcquirePooledConnection(
    address: Address,
    call: RealCall,
    routes: List<Route>?,
    requireMultiplexed: Boolean
): Boolean {
    for (connection in connections) {
        synchronized(connection) {
            // `return@synchronized` leaves only the lambda, so the loop moves on to the next
            // pooled connection.
            if (requireMultiplexed && !connection.isMultiplexed) return@synchronized
            if (!connection.isEligible(address, routes)) return@synchronized
            call.acquireConnectionNoEvents(connection)
            return true
        }
    }
    return false
}
对所有连接池里进行循环
if (!connection.isEligible(address, routes)) return@synchronized 是否可用 ,false 则继续取下一个 直到取到可用连接
/**
 * Assigns [connection] to this call and registers the call in the connection's `calls` list.
 *
 * The caller must hold the connection's lock (asserted via `assertThreadHoldsLock`).
 *
 * @param connection the connection to acquire.
 * @throws IllegalStateException if this call already holds a connection.
 */
fun acquireConnectionNoEvents(connection: RealConnection) {
    connection.assertThreadHoldsLock()

    check(this.connection == null)
    this.connection = connection
    connection.calls.add(CallReference(this, callStackTrace))
}
this.connection = connection 复制connection
然后创建编码解码器 return resultConnection.newCodec(client, chain)
val result = Exchange(this, eventListener, exchangeFinder, codec)
通过函数判断是否可用
/**
 * Returns true if this connection can carry an exchange to [address].
 *
 * A connection is rejected when it is at its allocation limit, accepts no new exchanges, or
 * differs in any non-host address field. An exact host match is accepted immediately. Otherwise
 * the four numbered coalescing requirements below must all hold.
 *
 * @param address the address the caller wants to reach.
 * @param routes candidate routes for [address]; null disables coalescing.
 * @return whether the connection is eligible.
 */
internal fun isEligible(address: Address, routes: List<Route>?): Boolean {
    assertThreadHoldsLock()

    // If this connection is not accepting new exchanges, we're done.
    if (calls.size >= allocationLimit || noNewExchanges) return false

    // If the non-host fields of the address don't overlap, we're done.
    if (!this.route.address.equalsNonHost(address)) return false

    // If the host exactly matches, we're done: this connection can carry the address.
    if (address.url.host == this.route().address.url.host) {
        return true // This connection is a perfect match.
    }

    // At this point we don't have a hostname match. But we still be able to carry the request if
    // our connection coalescing requirements are met. See also:
    // https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding
    // https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/

    // 1. This connection must be HTTP/2.
    if (http2Connection == null) return false

    // 2. The routes must share an IP address.
    if (routes == null || !routeMatchesAny(routes)) return false

    // 3. This connection's server certificate's must cover the new host.
    if (address.hostnameVerifier !== OkHostnameVerifier) return false
    if (!supportsUrl(address.url)) return false

    // 4. Certificate pinning must match the host.
    try {
        address.certificatePinner!!.check(address.url.host, handshake()!!.peerCertificates)
    } catch (_: SSLPeerUnverifiedException) {
        return false
    }

    return true // The caller's address can be carried by this connection.
}
if (calls.size >= allocationLimit || noNewExchanges) return false 当前请求的数量是否超过限制 http2 以下为1个
// If the non-host fields of the address don't overlap, we're done.
if (!this.route.address.equalsNonHost(address)) return false
// If the host exactly matches, we're done: this connection can carry the address.
if (address.url.host == this.route().address.url.host) {
return true // This connection is a perfect match.
}
/**
 * Compares every address field except the URL host: DNS, proxy authenticator, protocols,
 * connection specs, proxy selector, proxy, SSL socket factory, hostname verifier, certificate
 * pinner and URL port.
 *
 * @param that the address to compare with.
 * @return true if all of those fields are equal.
 */
internal fun equalsNonHost(that: Address): Boolean {
    return this.dns == that.dns &&
        this.proxyAuthenticator == that.proxyAuthenticator &&
        this.protocols == that.protocols &&
        this.connectionSpecs == that.connectionSpecs &&
        this.proxySelector == that.proxySelector &&
        this.proxy == that.proxy &&
        this.sslSocketFactory == that.sslSocketFactory &&
        this.hostnameVerifier == that.hostnameVerifier &&
        this.certificatePinner == that.certificatePinner &&
        this.url.port == that.url.port
}
判断是否为同一连接:代理、主机名验证器等非 host 字段必须一致
http2 判断只要IP地址相同 但是必须判断证书是否相同 就复用连接
判断是否为同一个路径 url
如果没有取到可用连接,则往下执行在获取一次: if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
相比上次多了 routes 参数:Selection 来自 RouteSelector,一个 RouteSelector 包含多个 Selection,一个 Selection 包含多个 Route;最后一个 boolean 参数 requireMultiplexed 表示是否只接受多路复用的连接
supportsUrl 函数判断:主机名和端口必须一样,或者符合其他要求
nextRouteToTry = selectRoute ,当连接建立,若有连接已经建立,会缓存当前的,因为如果网络不稳定或者已经建立的无法连接,还会需要当前建立的,基于多路复用
address.dns.lookup 解析地址
Address 关键参数 uriHost / uriPort 也就是主机名和端口
call 取消 则抛出异常
RouteSelector
httpTunnel 代理https 的一种方式
rawSocket 是实际 connect 的 TCP 连接
RealConnection
connectTls 连接tls
handshake 验证:
handshake = Handshake(unverifiedHandshake.tlsVersion, unverifiedHandshake.cipherSuite,unverifiedHandshake.localCertificates) {certificatePinner.certificateChainCleaner!!.clean(unverifiedHandshake.peerCertificates,address.url.host) }
tls版本 以及其他信息
rawSocket = TCP 连接;socket:HTTPS 时 = SSL socket
isHealthy 是否为健康连接 :判断socket 是否关闭, http2的心跳验证
查找一个可用且健康的连接 findHealthyConnection
newCodec 如果是http2 返回http2的编码解码 否则返回http1的,writeUtf-8 ,http1 明文传输,http2通过stream
ExChange ---ConnectChain
CallServerInterceptor---- 发送请求和读取结果,通过stream 和 io ,然后缓存,通过gzip 解压缩,
Stetho:Facebook 的网络调试库