1、Interceptor
Interceptors是OkHttp整个框架的核心,包含了请求监控、请求重写、调用重试等机制。它主要使用责任链模式,解决请求与请求处理之间的耦合。
1.1 责任链模式
将接收对象放入链中,按照链中顺序让多个对象处理请求。请求者不用知道具体是由谁处理。解决请求与接收之间的耦合,提高灵活性。 责任链负责对请求参数的解析,所有的扩展都是针对链中节点进行扩展。
PS:RetryAndFollowUpInterceptor前面一个拦截器是用户自定义的,networkinterceptor也是自定义。用于拦截被最纯净的request,和response
1.2 Interceptor内部
fun intercept(chain: Chain): Response
companion object {
/**
* Constructs an interceptor for a lambda. This compact syntax is most useful for inline
* interceptors.
*
* ```
* val interceptor = Interceptor { chain: Interceptor.Chain ->
* chain.proceed(chain.request())
* }
* ```
*/
inline operator fun invoke(crossinline block: (chain: Chain) -> Response): Interceptor =
Interceptor { block(it) }
}
定义了一个intercept方法用于实现拦截器的具体业务
interface Chain {
fun request(): Request
@Throws(IOException::class)
fun proceed(request: Request): Response
/**
* Returns the connection the request will be executed on. This is only available in the chains
* of network interceptors; for application interceptors this is always null.
*/
fun connection(): Connection?
fun call(): Call
fun connectTimeoutMillis(): Int
fun withConnectTimeout(timeout: Int, unit: TimeUnit): Chain
fun readTimeoutMillis(): Int
fun withReadTimeout(timeout: Int, unit: TimeUnit): Chain
fun writeTimeoutMillis(): Int
fun withWriteTimeout(timeout: Int, unit: TimeUnit): Chain
}
定义了一个链的接口,用来让整个拦截器链运行起来。
2、getResponseWithInterceptorChain()
上一章我们说到,无论是同步请求,还是异步请求,最后的Response都会通过getResponseWithInterceptorChain()
来获取
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
首先,定义了一个interceptors集合,并却依次将RetryAndFollowUpInterceptor、BridgeInterceptor、CacheInterceptor、ConnectInterceptor放入。然后创建了一个chain:RealInterceptorChain,设置其索引归零,初始请求,连接时间参数等等。
val response = chain.proceed(originalRequest)
由chain的proceed()
方法,开始第一个拦截器的业务、
2.1 RetryAndFollowUpInterceptor
“重试和重定向拦截器”,看名字可以看出,该拦截器的主要是负责请求失败的时候重新尝试的。但是该拦截器还有一个很关键的前置工作
call.enterNetworkInterceptorExchange(request, newExchangeFinder)
request是原始的请求,newExchangeFinder是一个Boolean参数,从最开始的业务进去,这个Boolean类型是true的。
fun enterNetworkInterceptorExchange(request: Request, newExchangeFinder: Boolean) {
check(interceptorScopedExchange == null)
synchronized(this) {
check(!responseBodyOpen) {
"cannot make a new request because the previous response is still open: " +
"please call response.close()"
}
check(!requestBodyOpen)
}
if (newExchangeFinder) {
this.exchangeFinder = ExchangeFinder(
connectionPool,
createAddress(request.url),
this,
eventListener
)
}
}
也就是说,RetryAndFollowUpInterceptor前置工作,在exchangeFinder 为空的前提下,会先创建一个exchangeFinder 。这个是进行连接的准备工作。
while (true) {
call.enterNetworkInterceptorExchange(request, newExchangeFinder)
var response: Response
var closeActiveExchange = true
try {
if (call.isCanceled()) {
throw IOException("Canceled")
}
try {
response = realChain.proceed(request)
newExchangeFinder = true
} catch (e: RouteException) {
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
throw e.firstConnectException.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e.firstConnectException
}
newExchangeFinder = false
continue
} catch (e: IOException) {
// An attempt to communicate with a server failed. The request may have been sent.
if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
throw e.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e
}
newExchangeFinder = false
continue
}
// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build()
}
val exchange = call.interceptorScopedExchange
val followUp = followUpRequest(response, exchange)
if (followUp == null) {
if (exchange != null && exchange.isDuplex) {
call.timeoutEarlyExit()
}
closeActiveExchange = false
return response
}
val followUpBody = followUp.body
if (followUpBody != null && followUpBody.isOneShot()) {
closeActiveExchange = false
return response
}
response.body?.closeQuietly()
if (++followUpCount > MAX_FOLLOW_UPS) {
throw ProtocolException("Too many follow-up requests: $followUpCount")
}
request = followUp
priorResponse = response
} finally {
call.exitNetworkInterceptorExchange(closeActiveExchange)
}
}
可以看到,在 response = realChain.proceed(request) 之后,Interceptor对错误检查,通过recover()
的方法,把不能重连的错误筛选出来,再确定能过重连或者重定向的时候,通过while循环,重新开始责任链的业务。
2.2 BridgeInterceptor
这个Interceptor主要负责对请求信息的加工,比如加"Content-Length"啊,"Host"啊等等,后面收到response同样是对这些信息处理,去除多余的文本。这里就不对这个拦截器做过多的解释。
2.3 CacheInterceptor
同样的,这个Interceptor是负责做缓存处理的,代码相对独立,以后如果有时间再更新。
2.4 ConnectInterceptor
ConnectInterceptor,真正的发起连接的拦截器。
object ConnectInterceptor : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.call.initExchange(chain)
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)
}
}
代码看起来很简单,但是一点都不简单。
internal fun initExchange(chain: RealInterceptorChain): Exchange {
synchronized(this) {
check(expectMoreExchanges) { "released" }
check(!responseBodyOpen)
check(!requestBodyOpen)
}
val exchangeFinder = this.exchangeFinder!!
val codec = exchangeFinder.find(client, chain)
val result = Exchange(this, eventListener, exchangeFinder, codec)
this.interceptorScopedExchange = result
this.exchange = result
synchronized(this) {
this.requestBodyOpen = true
this.responseBodyOpen = true
}
if (canceled) throw IOException("Canceled")
return result
}
fun acquireConnectionNoEvents(connection: RealConnection) {
connection.assertThreadHoldsLock()
check(this.connection == null)
this.connection = connection
connection.calls.add(CallReference(this, callStackTrace))
}
通过initExchange(chain)
方法 创建一个带连接的codec(coder & decoder编码解码器),然后通过Exchange()
进行流操作。PS:codec为读写工具,Exchange为读写管理员
fun find(
client: OkHttpClient,
chain: RealInterceptorChain
): ExchangeCodec {
try {
val resultConnection = findHealthyConnection(
connectTimeout = chain.connectTimeoutMillis,
readTimeout = chain.readTimeoutMillis,
writeTimeout = chain.writeTimeoutMillis,
pingIntervalMillis = client.pingIntervalMillis,
connectionRetryEnabled = client.retryOnConnectionFailure,
doExtensiveHealthChecks = chain.request.method != "GET"
)
return resultConnection.newCodec(client, chain)
} catch (e: RouteException) {
trackFailure(e.lastConnectException)
throw e
} catch (e: IOException) {
trackFailure(e)
throw RouteException(e)
}
创建一个可用的Codec有两步,一通过findHealthyConnection()
寻找一个可用连接,二通过newCodec()
创建一个可用的Codec。(也就是到底是通过http1还是http2来读写)
val candidate = findConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled
)
findHealthyConnection()
通过findConnection()
寻找连接,然后检查连接是否健康,如果不健康就再取,健康就跳出while循环
2.4.1 findConnection()
// 我们需要一个新的连接。给它标记新的状态。
refusedStreamCount = 0
connectionShutdownCount = 0
otherFailureCount = 0
// 尝试从池中获取连接。
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
OkHttp是有连接池的。所有优先考虑从连接池中取资源,避免浪费。通过
callAcquirePooledConnection()
尝试取连接
fun callAcquirePooledConnection(
address: Address,
call: RealCall,
routes: List<Route>?,
requireMultiplexed: Boolean
): Boolean {
for (connection in connections) {
synchronized(connection) {
if (requireMultiplexed && !connection.isMultiplexed) return@synchronized
if (!connection.isEligible(address, routes)) return@synchronized
call.acquireConnectionNoEvents(connection)
return true
}
}
return false
}
第一个if判断是否可以多路复用,第一次从池中获取,requireMultiplexed为false
第二个if判断连接是否可以的。 call.acquireConnectionNoEvents(connection)将连接给到当前call。isEligible()
会比对地址端口等等,会拿http2连接合并的连接,比对证书等等。
如果第一次从池中没有拿到那就再拿一次
connectionPool.callAcquirePooledConnection(address, call, routes, false)
注意,这次的请求是加上了routes的属性,这里可以拿到更加广泛的连接,比如http2的连接合并过的连接,这里是之前提到的isEligible()
的功能之一,这里必须是有routes,才会尝试拿http2的连接合并过的连接,详细业务可以看isEligible()
内。
如果这次也没有拿到连接,这时候findConnection()
会尝试建立连接。
val newConnection = RealConnection(connectionPool, route)
call.connectionToCancel = newConnection
try {
newConnection.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
} finally {
call.connectionToCancel = null
}
注意,此次创建connect后不会直接建立连接!而是会再一次callAcquirePooledConnection()
因为存在一种极端情况,同时访问同一域名下得不同地址,如 https://xxxx.com/1 和 https://xxxx.com/2,如果同时建立两个连接会造成资源浪费,所以,这种情况下,谁先创建好connect并却放入连接池的,就用谁的连接。同时此次的callAcquirePooledConnection()
是只拿多路复用的连接。
以上的操作,都是在call没有connection的情况下发生的。还有一种可能,call已经有connection的,经历重定向,或者重现,又来到了这里。这个时候
val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
if (callConnection != null) {
var toClose: Socket? = null
synchronized(callConnection) {
if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
toClose = call.releaseConnectionNoEvents()
}
}
如果connection 不适合这次新请求,那就丢弃连接
if (call.connection != null) {
check(toClose == null)
return callConnection
}
如果connection 适合这次新请求
综上所述,整个连接所需要的东西都在ExchangeFinder里面做处理,所以这个时候回归RetryAndFollowUpInterceptor可以发现,如果在做重连时,newExchangeFinder的值为false,直接复用上次可用的exchangeFinder。
connect()
看完了OKHTTP的连接策略,我们再来看连接本身的的业务
newConnection.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
可以看的到connect()
方法,提供了Tunnel()
(http代理https)方法,connectSocket()
方法建立TCP连接,establishProtocol()
方法建立HTTP连接,connectTls()
建立TLS连接、如果要做不校验证书的HTTPS功能,主要就是修改这里的逻辑。
2.5 ConnectInterceptor
CallServerInterceptor最后一个拦截器,实际工作就是操控你的ExChange进行对读写工作,如果有response,就从最后一个拦截器开始往第一个拦截器走,并却做拦截器的后置工作。