okhttp的内部实现通过一个责任链模式完成,将网络请求的各个阶段封装到各个链条中,实现了各层的解耦。
文内源码基于okhttp最新版本4.10.0
我们从发起一个简单的请求开始
OkHttpClient httpClient = new OkHttpClient();
String url = "https://www.baidu.com/";
Request getRequest = new Request.Builder()
.url(url)
.get()
.build();
Call call = httpClient.newCall(getRequest);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
}
});
分析源码之前,先看一张详细的流程图,通过这种图来看下内部的逻辑是如何流动的:
创建一个Okhttp请求
OkHttpClient实例化可以直接创建,也可以使用Builder建造者模式进行配置build()即可。Request的创建也是使用建造者模式,这里就不在赘述。
接着我们看HttpClient的newCall方法:
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
可见HttpClient的newCall方法获得Call实际是RealCall。RealCall就是准备执行的请求,是对接口Call的实现。其内部持有OkHttpClient实例、Request实例。
再回头看RealCall实现的接口Call:
// 已准备要执行的请求。由于表示单个请求/响应对(流),因此无法执行两次
actual interface Call : Cloneable {
actual fun request(): Request
//同步请求,会阻塞
@Throws(IOException::class)
fun execute(): Response
//异步请求
actual fun enqueue(responseCallback: Callback)
//取消请求,已经完成的不能取消。
actual fun cancel()
actual fun isExecuted(): Boolean
actual fun isCanceled(): Boolean
fun timeout(): Timeout
public actual override fun clone(): Call
actual fun interface Factory {
actual fun newCall(request: Request): Call
}
}
主要是定义请求的执行动作和状态。RealCall对Call的具体实现,在后面执行流程中说明。
请求如何被调度
执行分为同步和异步,这里先从同步请求开始,即RealCall的execute方法:
# RealCall
override fun execute(): Response {
check(executed.compareAndSet(false, true)) { "Already Executed" }
timeout.enter()//超时计时开始
callStart()//回调请求监听器的请求开始
try {
client.dispatcher.executed(this)//放入队列
return getResponseWithInterceptorChain()// 执行请求获取结果
} finally {
client.dispatcher.finished(this)//请求结束
}
}
首先判断如果已经执行,就会抛出异常。这就是一个请求只能执行一次的原因。然后回调请求监听器的请求开始。然后调用Client的调度器Dispathcher的executed方法:
# Dispathcher
@Synchronized internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
}
很简单,请求放入一个双端队列runningSyncCalls中,表示正在执行的同步请求。
然后返回了getResponseWithInterceptorChain()的结果Response,可以猜到,同步请求真真的请求流程就是getResponseWithInterceptorChain方法中。最后请求结束,会走Dispatcher的finished(calls: Deque<T>, call: T)
方法:
# Dispathcher
//结束 异步请求
internal fun finished(call: AsyncCall) {
call.callsPerHost.decrementAndGet()
finished(runningAsyncCalls, call)
}
//结束 同步请求
internal fun finished(call: RealCall) {
finished(runningSyncCalls, call)
}
//异步、同步的结束都会走到这里:从running中移除,并调用promoteAndExecute
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
//从队列中移除
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}
val isRunning = promoteAndExecute()
if (!isRunning && idleCallback != null) {
idleCallback.run()
}
}
从队列中移除Call,然后执行了promoteAndExecute(),这里为了执行流程的完整性,先不跟进去了,后面会讲到。
到这,我们知道了,同步请求走的是getResponseWithInterceptorChain()方法。
我们在来看异步请求,即RealCall的enqueue方法:
# RealCall
override fun enqueue(responseCallback: Callback) {
check(executed.compareAndSet(false, true)) { "Already Executed" }
callStart()//回调请求监听器的请求开始
client.dispatcher.enqueue(AsyncCall(responseCallback))//请求调度
}
同样先判断是否已请求过,回调请求开始。然后调用Dispatcher的enqueue方法,参数接受的是AsyncCall,AsyncCall继承Runnable(老版本是是继承NamedRunnable,NamedRunnable实现Runnable)。可以想到它是会在线程和线程池执行run方法的。
我们先继续去看Dispatcher的enqueue方法:
# Dispatcher
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
//存入等待执行的队列
readyAsyncCalls.add(call)
// 相同host的请求,共用一个 调用计数
if (!call.call.forWebSocket) {
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
promoteAndExecute()
}
//从runningAsyncCalls或者readyAsyncCalls中找到相同host的请求
private fun findExistingCallWithHost(host: String): AsyncCall? {
for (existingCall in runningAsyncCalls) {
if (existingCall.host == host) return existingCall
}
for (existingCall in readyAsyncCalls) {
if (existingCall.host == host) return existingCall
}
return null
}
先把请求放入双端队列readyAsyncCalls中,表示等待执行的异步请求。为啥是等待执行呢?先留一个疑问。接着从正在执行的请求runningAsyncCalls或等待执行的请求readyAsyncCalls中找到是相同host的请求,把callsPerHost重用给当前请求。callsPerHost看名字感觉像是拥有相同host的请求数量,并且注意到类型是AtomicInteger,声明如下:
# RealCall$AsyncCall
@Volatile var callsPerHost = AtomicInteger(0)
所以,相同host的请求是共享callsPerHost的,为了后面判断host并发做准备。
继续看,接着调用了promoteAndExecute(),前面看的finish方法也有调用,这里可以跟进看看了:
# Dispatcher
//调度的核心方法:在控制异步并发的策略基础上,使用线程池执行异步请求
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
if (runningAsyncCalls.size >= this.maxRequests) break //最大并发数64
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue //相同Host最大数5
i.remove()//从等待队列中移除
asyncCall.callsPerHost.incrementAndGet()//Host并发数+1
executableCalls.add(asyncCall)//加入可执行请求的集合
runningAsyncCalls.add(asyncCall)//加入正在执行的异步请求队列
}
isRunning = runningCallsCount() > 0//正在执行的异步/同步请求数 >0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)//可执行的请求
}
return isRunning
}
@Synchronized fun runningCallsCount(): Int = runningAsyncCalls.size + runningSyncCalls.size
遍历readyAsyncCalls,先进行两个检查:正在执行的异步请求runningAsyncCalls数量大于最大并发请求数64就break或者相同host请求的数量大于5就continue。如果检查都通过,就从等待队列中移除,callsPerHost自增1,放入可执行的集合executableCalls,并添加到队列runningAsyncCalls中,表示正在执行的异步请求。这里就解释了一步请求等待队列的意义了,就是为了控制最大并发数的缓冲:异步请求并发数达到64、相同host的异步请求达到5,都要放入等待队列。
这里插入介绍一下双端队列,OkHttp同时用了三个双端队列存储这些请求:
# Dispatcher
//异步任务等待队列
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
//异步任务队列
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
//同步任务队列
private val runningSyncCalls = ArrayDeque<RealCall>()
为什么要使用双端队列?很简单因为网络请求执行顺序跟排队一样,讲究先来后到,新来的请求放队尾,执行请求从对头部取。
说到这LinkedList表示不服,我们知道LinkedList同样也实现了Deque接口,内部是用链表实现的双端队列,那为什么不用LinkedList呢?
实际上这与readyAsyncCalls向runningAsyncCalls转换有关,当执行完一个请求或调用enqueue方法入队新的请求时,会对readyAsyncCalls进行一次遍历,将那些符合条件的等待请求转移到runningAsyncCalls队列中并交给线程池执行。尽管二者都能完成这项任务,但是由于链表的数据结构致使元素离散的分布在内存的各个位置,CPU缓存无法带来太多的便利,另外在垃圾回收时,使用数组结构的效率要优于链表。
回到主题,遍历完后把executableCalls中的请求都走asyncCall.executeOn方法:
# RealCall$AsyncCall
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
executorService.execute(this)//在线程池执行asyncCall
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)//回调失败
} finally {
if (!success) {
client.dispatcher.finished(this) //执行发生异常,结束
}
}
}
上面代码中可以看到将AsyncCall放入线程池中执行,那么Okhttp的线程池再什么地方呢?
# Dispatcher
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
这不是一个newCachedThreadPool吗?没错,除了最后一个threadFactory参数之外与newCachedThreadPool一毛一样,只不过是设置了线程名字而已,用于排查问题。
阻塞队列用的SynchronousQueue,它的特点是不存储数据,当添加一个元素时,必须等待一个消费线程取出它,否则一直阻塞,如果当前有空闲线程则直接在这个空闲线程执行,如果没有则新启动一个线程执行任务。通常用于需要快速响应任务的场景,在网络请求要求低延迟的大背景下比较合适。
executeOn方法很简单:使用类似CachedThreadPool的线程池执行请求RealCall。如果执行失败,也会调用Dispatcher的finished(calls: Deque<T>, call: T) 方法。
来看下AsyncCall的run方法:
# RealCall$AsyncCall
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()//超时计时开始
try {
val response = getResponseWithInterceptorChain()//执行请求获取结果
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)//回调结果
} catch (e: IOException) {
if (signalledCallback) {
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)//回调失败
}
throw t
} finally {
client.dispatcher.finished(this)//请求结束
}
}
}
我们发现,这里和 同步请求 就很像了,同样是调用getResponseWithInterceptorChain()方法来获取结果Response,不同点是使用responseCallback把结果回调出去,最后请求结束也是调用了dispatcher的finish方法。
另外,前面说过,finish方法中也调用了promoteAndExecute()方法,说明 同步/异步 请求 结束后 也会重新调度当前的异步请求。
好了,到这里我们把 调度流程 梳理下:
发起 同步 请求后,RealCall使用Dispatcher存入runningSyncCalls,然后使用getResponseWithInterceptorChain()获取结果,最后调用Dispatcher的finish方法结束请求。
发起 异步 请求后,RealCall使用Dispatcher存入readyAsyncCalls,获得host并发数,使用promoteAndExecute()方法 在 控制异步并发 的策略基础上,使用 线程池 执行异步请求(并发控制有包括 最大并发数64、host最大并发数5)。异步请求的执行 也是使用getResponseWithInterceptorChain(),获得结果后回调出去。最后调用Dispatcher的finish方法结束请求。
Dispatcher:调度器,主要是异步请求的并发控制、把异步请求放入线程池执行,实现方法是promoteAndExecute()。 promoteAndExecute()有两处调用:添加异步请求时、同步/异步请求 结束时。
如何执行请求(重点)
通过上面分析指导,无论同步还是异步请求,最终执行都是在RealCall的getResponseWithInterceptorChain()方法,只不过异步请求需要先通过Dispatcher进行并发控制和线程池处理。那么就来看看getResponseWithInterceptorChain():
#RealCall
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors //使用者配置的 应用拦截器,最先拦截
interceptors += RetryAndFollowUpInterceptor(client)//重试跟进拦截器
interceptors += BridgeInterceptor(client.cookieJar)//桥拦截器
interceptors += CacheInterceptor(client.cache)//缓存拦截器
interceptors += ConnectInterceptor//连接拦截器
if (!forWebSocket) {
interceptors += client.networkInterceptors//使用者配置的网络拦截器
}
interceptors += CallServerInterceptor(forWebSocket)//请求服务拦截器
//拦截器链
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest)//链 开始执行
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
首先是把
- 应用拦截器(外部配置)client.interceptors()
- 重试跟进拦截器RetryAndFollowUpInterceptor
- 桥拦截器BridgeInterceptor
- 缓存拦截器CacheInterceptor
- 连接拦截器ConnectInterceptor
- 网络拦截器(外部配置)client.networkInterceptors()
- 请求服务拦截器CallServerInterceptor
依次添加到集合interceptors中。然后使用interceptors、originalRequest等穿件了拦截器链RealInterceptorChain实例,最后用proceed方法获取到请求的结果Response。
在使用OkHttp过程中,自定义配置的拦截器实际就是应用拦截器:client.interceptors(),是最早被添加到interceptors中。那么到底 拦截器是个啥呢?chain.proceed是如何获取到结果的呢?不着急,我们先看看Interceptor类:
# Interceptor
fun interface Interceptor {
@Throws(IOException::class)
fun intercept(chain: Chain): Response
companion object {
inline operator fun invoke(crossinline block: (chain: Chain) -> Response): Interceptor =
Interceptor { block(it) }
}
//拦截器链
interface Chain {
fun request(): Request
//Chain的核心方法
@Throws(IOException::class)
fun proceed(request: Request): Response
//返回请求执行的连接, 仅网络拦截器可用,应用拦截器就是null。
fun connection(): Connection?
fun call(): Call
fun connectTimeoutMillis(): Int
fun withConnectTimeout(timeout: Int, unit: TimeUnit): Chain
fun readTimeoutMillis(): Int
fun withReadTimeout(timeout: Int, unit: TimeUnit): Chain
fun writeTimeoutMillis(): Int
fun withWriteTimeout(timeout: Int, unit: TimeUnit): Chain
}
}
Interceptor是个接口类,只有一个intercept方法,参数是Chain对象。再注意到 内部接口类Chain -- 拦截器链,有个proceed方法,参数是Request对象,返回值是Response,那么这个方法的实现就是请求的处理过程了。Chain的唯一实现类就是RealInterceptorChain,负责把所有拦截器串联起来,proceed方法就是串联的操作。
上述一系列的拦截器都是Interceptor的实现类,这里先贴出自定义的应用拦截器(其他拦截器的实现暂不去跟进):
new Interceptor() {
@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
String url = request.url().toString();
Log.i(TAG, "intercept: proceed start: url"+ url+ ", at "+System.currentTimeMillis());
Response response = chain.proceed(request);
ResponseBody body = response.body();
Log.i(TAG, "intercept: proceed end: url"+ url+ ", at "+System.currentTimeMillis());
return response;
}
}
在intercept方法中我们调用chain.proceed方法获取了结果 并在前后打印了一些日志,那这个Chain实例是哪来的呢?intercept方法啥时候被调用的呢?— — 我们再回头看getResponseWithInterceptorChain方法,所有拦截器都被传入RealInterceptorChain,可以猜想到,必定是RealInterceptorChain的proceed方法内部调用了拦截器的intercept方法。 那么就来看看吧:
# RealInterceptorChain
@Throws(IOException::class)
override fun proceed(request: Request): Response {
check(index < interceptors.size)
calls++
if (exchange != null) {
check(exchange.finder.routePlanner.sameHostAndPort(request.url)) {
"network interceptor ${interceptors[index - 1]} must retain the same host and port"
}
check(calls == 1) {
"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
}
}
// Call the next interceptor in the chain.
val next = copy(index = index + 1, request = request)
val interceptor = interceptors[index]
@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
if (exchange != null) {
check(index + 1 >= interceptors.size || next.calls == 1) {
"network interceptor $interceptor must call proceed() exactly once"
}
}
return response
}
在实例化RealInterceptorChain时 index赋值是0,exchange是null,所以前面三个if都没走进去。然后获取了第一个拦截器,也就是我们配置的应用拦截器,调用了它的interceptor方法,并返回和校验了结果。这里证实了我们猜想。同时注意到,调用 应用拦截器的interceptor方法传入的参数:拦截器链实例next,next就是把index + 1而已,其他参数和当前实例是一样的。也就是说 在我们的应用拦截器中调用的是 next的proceed方法。
进一步,next的proceed方法中 同样会获取interceptors的index=1的拦截器,即RetryAndFollowUpInterceptor实例,然后调用其interceptor方法,参数是index+1即index=2的chain。跟进RetryAndFollowUpInterceptor的代码发现,interceptor方法内部也是有调用chain的proceed方法。这样就会依次传递下去,直到最后一个拦截器CallServerInterceptor。
实际上 除了最后一个拦截器CallServerInterceptor之外,所有拦截器的interceptor方法都调用了 传入 chain的proceed方法。每个拦截器在chain的proceed方法 前后 处理了自己负责的工作。例如我们的应用拦截器,在chain的proceed方法前 打印了request信息的日志,chain的proceed方法获取结果 之后 打印了response信息的日志。每个拦截器interceptor方法在 调用chain的proceed方法时 都是为了获取下一个拦截器处理的response,然后返回给上一个拦截器。
逻辑总结如下图:
这就是 okhttp执行流程的核心了,整体流程如下:
现在来总结下:
- 拦截器链:把原始请求 request 依次 传入到 每个拦截器。拦截器 处理后 把response 反向 依次 回传。
- 拦截器:可以对request进行处理,然后调用index+1的拦截器链proceed方法 获取下一个拦截器处理的结果,接着自己也可以处理这个结果,即: 处理request、chain.proceed、处理response。
拦截器 | 作用 |
---|---|
应用拦截器 | 处理原始请求和最终的响应:可以添加自定义header、通用参数、参数加密、网关接入等等。 |
RetryAndFollowUpInterceptor | 处理错误重试和重定向 |
BridgeInterceptor | 应用层和网络层的桥接拦截器,主要工作是为请求添加cookie、添加固定的header,比如Host、Content-Length、Content-Type、User-Agent等等,然后保存响应结果的cookie,如果响应使用gzip压缩过,则还需要进行解压。 |
CacheInterceptor | 缓存拦截器,获取缓存、更新缓存。如果命中缓存则不会发起网络请求。 |
ConnectInterceptor | 连接拦截器,内部会维护一个连接池,负责连接复用、创建连接(三次握手等等)、释放连接以及创建连接上的socket流。 |
网络拦截器 | 用户自定义拦截器,通常用于监控网络层的数据传输。 |
CallServerInterceptor | 请求拦截器,在前置准备工作完成后,真正发起网络请求,进行IO读写。 |
addInterceptor与addNetworkInterceptor的区别:
二者通常的叫法为应用拦截器和网络拦截器,从整个责任链路来看,应用拦截器是最先执行的拦截器,也就是用户自己设置request属性后的原始请求,而网络拦截器位于ConnectInterceptor和CallServerInterceptor之间,此时网络链路已经准备好,只等待发送请求数据。
- 首先,应用拦截器在RetryAndFollowUpInterceptor和CacheInterceptor之前,所以一旦发生错误重试或者网络重定向,网络拦截器可能执行多次,因为相当于进行了二次请求,但是应用拦截器永远只会触发一次。另外如果在CacheInterceptor中命中了缓存就不需要走网络请求了,因此会存在短路网络拦截器的情况。
- 其次,除了CallServerInterceptor,每个拦截器都应该至少调用一次realChain.proceed方法。实际上在应用拦截器这层可以多次调用proceed方法(本地异常重试)或者不调用proceed方法(中断),但是网络拦截器这层连接已经准备好,可且仅可调用一次proceed方法。
- 最后,从使用场景看,应用拦截器因为只会调用一次,通常用于统计客户端的网络请求发起情况;而网络拦截器一次调用代表了一定会发起一次网络通信,因此通常可用于统计网络链路上传输的数据。
参考: