前言
之前看了好多开源库的源码,奈何工作原因,每次看都是零零碎碎的一段段的源码,没有一个大局观。最近抽空把一些源码重新翻看了一遍,理理大致流程,并做相关的记录,以便以后查看时能一眼就回忆起来。
okhttp使用流程
使用时,无非就是先创建一个okhttpclient的实例,然后再通过client创建Call,再使用call.enqueue()方法执行异步请求或者call.execute()执行同步请求,代码大概就是下面的样子,但凡点进来看到这个文章的,基本上都使用过okhttp,这里不多做赘述。
OkHttpClient okHttpClient = new OkHttpClient.Builder().build();//注释1
Call call = okHttpClient.newCall(new Request());//注释2
//异步
call.enqueue(new Callback() {//注释3
@Override
public void onFailure(@NotNull Call call, @NotNull IOException e) {
}
@Override
public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {
}
});
// call.execute();//同步
看源码
从使用的入口我们点进去查看源码,注释1中使用build模式创建了一个okhttpclient,可以在build中进行个性化设置,比如超时时间,代理,缓存等。
注释2中newCall创建了一个继承自Call的RealCall,源码如下:
/**
* Prepares the {@code request} to be executed at some point in the future.
*/
@Override
public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
通过Realcall的一个静态方法newRealCall来创建RealCall的实例。
在注释3中RealCall会执行enqueue来进行异步请求,源码如下:
@Override
public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));//注释1
}
这个方法中被synchronized包裹的代码是为了确保这个方法只执行一次,如果同一个RealCall实例试图第二次调用这个方法时,就会报错IllegalStateException("Already Executed");,我们需要看的重点是注释1这一行这一行里面使用了okhttpclient的分发器,这个分发器的作用就是将我们的各种Call放到runningCall和readyCall的队列中。
- runningCall:正在运行的队列
- readyCall:就绪(准备运行的)的队列
就此引出两个问题:
- 一个call过来什么时候才应该放到runningCall队列,又是什么时候放到readyCall队列呢?
- 还有就是在readyCall的队列要什么时候才能运行呢?
我们继续往下追踪
client.dispatcher().enqueue(new AsyncCall(responseCallback))执行这个方法后会调用下面的方法
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {//注释1
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
注释1这一行是分拣call实例的核心,上面有两个条件,第一个runningAsyncCalls.size() < maxRequests 表示在runningCall队列中排队的call要小于maxRequests (默认为64);第二个条件runningCallsForHost(call) < maxRequestsPerHost 表示访问相同主机的call的数量要小于maxRequestsPerHost(默认为5),如果同时满足这两个条件就直接放到runningCall中执行,如果不满足就放到readyCall中就绪。
runningCallsForHost 源码如下
/**
* Returns the number of running calls that share a host with {@code call}.
*/
private int runningCallsForHost(AsyncCall call) {
int result = 0;
for (AsyncCall c : runningAsyncCalls) {//遍历整个runningCall队列
if (c.get().forWebSocket) continue;
if (c.host().equals(call.host())) result++;//如果发现和当前call访问相同的主机,就将数量++
}
return result;
}
这样第一个问题就解决了
在放入runningCall队列后,会执行executorService().execute(call); 这里就是获取/创建线程池,并执行runnable(call),其中executorService()方法如下:
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher",
false));
}
return executorService;
}
就是创建了一个线程池,这个线程池是一个高并发的写法,保证了最高的吞吐量。这里大概讲一下,线程池的构造函数中有几个参数第一个是核心线程数,第二个是最大线程数,第三个是空闲线程存活的最长时间,第四个是第三个参数的单位,第五个是队列(一会儿讲这个队列是干嘛的),第六个是线程池的创建工厂。当一个任务需要放到线程池中执行时,首先会放在核心线程中,核心线程数满了以后会放在队列中,队列满了以后,会放在最大线程数中,如果最大也满了,就直接执行拒绝策略。当一个任务(runnable)放到了okhttp线程池中时,首先发现核心线程数为0,那就直接放队列中,这时候发现队列是SynchronousQueue这么个玩意,我们直到SynchronousQueue这个队列是压根没有容器的,然后就找空闲线程啊,一找发现,哎!没有空闲线程,那就重开一个线程来跑这个runnable。
这么写有一个好处,就是假如我们的某个runnable1是一个需要很久才能返回结果,说极端点就是直接一个死循环,永远执行不完,这时候呢下一个runnable2进来了,发现空闲线程没有,核心线程数为0,队列也是一个没容器的玩意儿,那怎么办?就直接new Thread出一个线程自己跑自己的。我们反过来说,如果说我们的队列是有容器的,假如队列的容量为1,runnable1一开始在队列中,然后被线程执行,换句话说,现在队列为空,但是线程已经在跑runnable1的死循环了,你再来一个runnable2也是只能塞到队列中,默默地看runnable1在线程中疯狂死循环,永远也等不到线程空闲的时候。唯一能拯救runnable1的只有再来一个runnable3,当runnable3进入线程池后,发现runnable1在疯狂死循环,runnable2就像小鸟一样被强制囚禁在笼子里,观看runnable1的疯狂死循环,runnable3既进不了笼子,也没有核心线程给他跑,于是线程池就给它(runnable3)新建了一个线程3,等runnable3跑完了以后,线程3就处于空闲,那线程池能让这个线程3闲着吗?当然不行,那线程池就去找啊,找啊,发现runnable2还在队列中没有被执行,于是就吧runnable2交给了线程3来执行,这个时候runnable2才终于被执行了。
那么call它具体是做了什么呢?我们看看它究竟做了啥
刚才讲到这里client.dispatcher().enqueue(new AsyncCall(responseCallback));我们进去看看AsyncCall到底是个啥
final class AsyncCall extends NamedRunnable {
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override
public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
AsyncCall--继承了-->NamedRunnable --继承了-->Runnable,而在NamedRunnable的run方法中又执行了 execute();,所以说,线程池要执行AsyncCall ,那肯定是直接执行AsyncCall 的execute方法,不然你这么写也没意义啊,我们带着猜测再回源码中打探一番
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
@Override
protected void execute() {//注释1
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {//注释2
client.dispatcher().finished(this);
}
}
}
发现果不其然,在注释1中确实是实现了execute方法,然后在方法内就是一通try catch,但是我们注意一下注释2的这个地方,也就是说,我不管你是请求成功还是失败,你肯定得执行注释2的内容,这个你跑不了,我们再次跟进一下代码,发现Dispatcher.java中有这几个重载的finish方法
//方法1
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
/**
* Used by {@code Call#execute} to signal completion.
*/
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}
//方法3
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();//注释1
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
很明显, client.dispatcher().finished(this);是直接执行了方法1,然后再执行方法3,其中要关注注释1中的这一行,这一行执行的方法如下
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
其中前面的两个return就不多讲了,后面有个for循环,这里面首先遍历了readyCall的队列,发现如果说runningCall队列中有位置被腾出来了,就将readyCall队列中的任务拿出来放到runningCall队列中执行,如果没有那就继续遍历。在这个方法中分别判断了runningCall队列是否是最大,readyCall队列是否为空,遍历循环中又判断了runningCall队列是不是大于了最大值,不得不说到底是大公司的程序员,代码写的十分严谨👍。
到这里就解释了第二个问题:在readyCall的队列要什么时候才能运行呢?
现在我们再看回AsynCall的execute()方法:
@Override
protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();//注释1
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
点进注释1中的这个方法
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors()); //注释0
interceptors.add(retryAndFollowUpInterceptor);//注释1
interceptors.add(new BridgeInterceptor(client.cookieJar()));/注释2
interceptors.add(new CacheInterceptor(client.internalCache()));/注释3
interceptors.add(new ConnectInterceptor(client));/注释4
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());/注释5
}
interceptors.add(new CallServerInterceptor(forWebSocket));/注释6
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
这个方法就是各种okhttp的拦截器
注释0中是用户自定义的拦截器,注释1~6是系统加的拦截器,网络访问的时候,都是靠这些拦截器来实现的。下一篇文章将会详细说明这些拦截器的作用