简介
目前在HTTP协议请求库中,OKHttp应当是非常火的,使用也非常的简单。网上有很多文章写了关于OkHttp的特点以及使用,而本章主要带领大家阅读OkHttp的源码,让大家对OKhttp的工作原理有所了解,当然源码的代码量是非常大的,这里我只是抓住主线和重点部分,至于细节部分,大家随着我抛出的线来跟进基本是没什么问题的。这篇文章要干嘛,引用一句话:
read the fucking source code
目录:
- OkHttp介绍
- 粗绘请求流程
- RealCall方法execute
- getResponseWithInterceptorChain调用链
- RetryAndFollowUpInterceptor
- ConnectInterceptor获取连接
- CallServerInterceptor网络请求
- RealConnection
- StreamAllocation
- HttpCodec(Http1Codec)
- 同步/异步请求
OkHttp介绍:
特点:
- 支持连接同一地址的连接共享同一个socket(前提服务端支持)
- 支持Socket连接池,减少请求延迟
- 使用拦截器模式,将流程拆分
- 透明的GZIP压缩
粗绘请求流程
注意:这里我选择OkHttp源码版本是 3.8.0。为了方便大家能够和文章同步,最好保持版本一致,我看过老版本和新的版本还是有点不同的。
官网给出的示例
OkHttpClient client = new OkHttpClient();
String run(String url) throws IOException {
Request request = new Request.Builder()
.url(url)
.build();
Response response = client.newCall(request).execute();
return response.body().string();
}
我们就从这里入口,来一步一步的跟进。
- 首先是创建一个OkHttpClient,Http请求工厂,也就是只要需要发Http请求,那都得找他。内部当然后很多的成员变量和方法,这里我们先不做介绍,等用到时再解释。
- 我们继续看
client.newCall(request)
。找到源码
@Override public Call newCall(Request request) {
return new RealCall(this, request, false /* for web socket */);
}
很简单,创建了一个RealCall,这里我就称为一个请求。Request不说大家能理解,里面封装了各种请求的信息。创建过程也很简单,做一些成员变量赋值和初始化。
RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
final EventListener.Factory eventListenerFactory = client.eventListenerFactory();
this.client = client;
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
// TODO(jwilson): this is unsafe publication and not threadsafe.
this.eventListener = eventListenerFactory.create(this);
}
这里注意retryAndFollowUpInterceptor;
变量,后面会用到。
- 调用了RealCall的
execute()
方法并返回Response结果。
RealCall方法execute
前面我们知道了大致的请求流程,下面我们重点看
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} finally {
client.dispatcher().finished(this);
}
}
- 首先我们发现try的前后调用了Dispatcher的方法:
client.dispatcher().executed(this);
client.dispatcher().finished(this);
分别是将Call加入到Dispatcher中的同步队列中,结束后,移除队列。
- 调用getResponseWithInterceptorChain获取Response。
接下来我们就重点看getResponseWithInterceptorChain方法
getResponseWithInterceptorChain调用链
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);
return chain.proceed(originalRequest);
}
这里的代码就很关键了,设计的很巧妙。可能有点绕,这里我讲一下几个关键的类。
Chain
- 获取Request
- 执行proceed
RealInterceptorChain
Chain的实现
- 包含了完成请求需要的类,包括StreamAllocation、HttpCodec、RealConnection、Request等。这里必要重要的就是可以实现了Chain的
request()
来获取Request。 - 控制Interceptor的调用,调用Interceptor的拦截方法intercept后,就封装下一个RealInterceptorChain并指定index。声明下一个将要被调用的Interceptor。这部分逻辑主要在proceed方法中。我们看核心代码
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(
interceptors, streamAllocation, httpCodec, connection, index + 1, request);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
首先会获取当前index的Interceptor。然后执行对应的intercept
方法。同时出入的参数是新创建的RealInterceptorChain。而新创建的RealInterceptorChain对应的index+1。如果执行新创建的RealInterceptorChain的proceed方法,那么interceptors的第index+1个Interceptor的intercept会被执行。依次循环下去。
总结: RealInterceptorChain就是对请求中个中重要对象的封装,执行Interceptor的intercept
的调用,确定下一个RealInterceptorChain。保证所有的Interceptor依次执行intercept
。
Interceptor
前面讲到了RealInterceptorChain会执行Interceptor的intercept方法,同时传入下一个RealInterceptorChain。那么intercept方法究竟做了什么事呢,因为Interceptor的实现很多,这里我们挑一个系统的实现类看看,比如:BridgeInterceptor,这个代码虽然长,但逻辑想对简单
@Override public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
RequestBody body = userRequest.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}
long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}
Response networkResponse = chain.proceed(requestBuilder.build());
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
}
return responseBuilder.build();
}
里面逻辑我们现在可能还看不懂,我们看中间最核心的一句话,。Response networkResponse = chain.proceed(requestBuilder.build());
有没有觉得顿时豁然开朗。realChain是传入的参数,而执行proceed方法,就又回到了前面我们讲RealInterceptorChain
的流程。那前后RealInterceptorChain有什么区别呢?那就是index在不断的增加,同时对应的Interceptor也就不同。
那么Interceptor有什么用呢?
我们刚才只关注了中间的chain.proceed(requestBuilder.build());
。而在此前后我们可以做很多的逻辑操作了,比如:
对Request进行一些请求头的判断,处理和完善。对Response进行一些处理,如在有gzip的情况下数据的处理等。
总结:Interceptor这里我称之为拦截器。Okhttp将请求的流程,从封装请求头,获取连接,发请求数据,读请求数据等等。拆分成一个个Interceptor。每一个Interceptor有着自己单一的功能,而下层的Interceptor为上层的Interceptor服务,有没有觉得有点像我们的网络TCP/IP的模型,哈哈。其实这种思想让我们的请求变的更加清晰,并且扩展性很好。每一层也就是Interceptor可以有自己的实现。同时我们可以定义自己的Interceptor。 而Interceptor的顺序执行就由RealInterceptorChain
完成。
到这里我们就讲了整个请求的大体执行框架和模式。这部分一定要好好的理解,方便后面的学习。
RetryAndFollowUpInterceptor
这个拦截器用来做重连接和重定向的。其中逻辑有以下:
创建StreamAllocation
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(request.url()), callStackTrace);
int followUpCount = 0;
Response priorResponse = null;
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
Response response = null;
boolean releaseConnection = true;
try {
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
...省略剩余代码
看到了吧new StreamAllocation了吧。这里第一个疑惑解决,StreamAllocation的创建地方。这里还要多讲一个地方就是构造参数ConnectionPool。我们看到是从OkHttpClient传了的。而在OkHttpclient创建时候创建了ConnectionPool。
public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory {
...省略
public static final class Builder {
public Builder() {
...省略
connectionPool = new ConnectionPool();
...省略
后面用到ConnectionPool大家就别再疑惑了。
创建StreamAllocation在这里,那当然释放也是在这类里:
失败重连接
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
Response response = null;
boolean releaseConnection = true;
try {
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.getLastConnectException(), false, request)) {
throw e.getLastConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
// We're throwing an unchecked exception. Release any resources.
if (releaseConnection) {
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}
我们看到response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
是在一个无限循环中,如果出现异常,并且满足重连接,就会再次调用。
重定向
Request followUp = followUpRequest(response);
if (followUp == null) {
if (!forWebSocket) {
streamAllocation.release();
}
return response;
}
ConnectInterceptor获取连接
前面我们讲了Interceptor的执行流程。而getResponseWithInterceptorChain方法中添加了一些给定的Interceptor。如:RetryAndFollowUpInterceptor(这个是在创建RealCall时候创建的,前面有提醒大家注意)、BridgeInterceptor(上一节有讲到,主要做一些请求头和响应数据的处理)、CacheInterceptor(看名称知道,处理缓存)、ConnectInterceptor、CallServerInterceptor。按上一节的流程,这些Interceptor会依次被调用。这里我们要重点看最后两个,首先是ConnectInterceptor。
通过名称我们知道主要做连接处理。我们看下源码:
public final class ConnectInterceptor implements Interceptor {
public final OkHttpClient client;
public ConnectInterceptor(OkHttpClient client) {
this.client = client;
}
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
}
估计大家都喜欢看这种源码,代码量很少,简洁。我们主要看中间三行代码:
获取StreamAllocation
StreamAllocation streamAllocation = realChain.streamAllocation();
而streamAllocation是RealInterceptorChain的成员变量,在构造方法中赋值。这里我们就往前找,往直前的Interceport找,看谁构造RealInterceptorChain传递了StreamAllocation。经过我们的一个个查找,在RetryAndFollowUpInterceptor的intercept方法中找到:
获取HttpCodec
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
获取RealConnection
RealConnection connection = streamAllocation.connection();
HttpCodec和RealConnection这两玩意干啥的,就现在这几行代码我们也看不出来,那就先不管。继续看CallServerInterceptor
CallServerInterceptor网络请求
这个就厉害了。看名称,请求服务。那就是最核心的地方了。撸上代码:
/** This is the last interceptor in the chain. It makes a network call to the server. */
public final class CallServerInterceptor implements Interceptor {
private final boolean forWebSocket;
public CallServerInterceptor(boolean forWebSocket) {
this.forWebSocket = forWebSocket;
}
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
HttpCodec httpCodec = realChain.httpStream();
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();
long sentRequestMillis = System.currentTimeMillis();
httpCodec.writeRequestHeaders(request);
Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return what
// we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
responseBuilder = httpCodec.readResponseHeaders(true);
}
if (responseBuilder == null) {
// Write the request body if the "Expect: 100-continue" expectation was met.
Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
} else if (!connection.isMultiplexed()) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection from
// being reused. Otherwise we're still obligated to transmit the request body to leave the
// connection in a consistent state.
streamAllocation.noNewStreams();
}
}
httpCodec.finishRequest();
if (responseBuilder == null) {
responseBuilder = httpCodec.readResponseHeaders(false);
}
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
int code = response.code();
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
return response;
}
}
1. 从RealInterceptorChain获取相关对象
RealInterceptorChain realChain = (RealInterceptorChain) chain;
HttpCodec httpCodec = realChain.httpStream();
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();
这些变量在上一节中已经介绍,这里只是获取一下。
2. 发送请求头数据
httpCodec.writeRequestHeaders(request);
3. 发送请求体
这里会先判断请求方法以及是否有请求体数据。如果有则发送。
4. 读取响应头
responseBuilder = httpCodec.readResponseHeaders(false);
5. 封装相应内容
int code = response.code();
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
6. 判断Connection头部信息是否是close
如果请求头或响应头的Connection值为close。则标识改Connection为noNewStreams。标识不会有新的流。
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}
至于noNewStreams是用来控制,当前的连接是否能再次被使用,我们后面会讲到。
前面我们讲了CallServerInterceptor的请求流程,但里面有很多的类我们还不清楚是怎么来的,以及干啥用的。接下来我们就讲核心类的用法以及创建和使用流程。
RealConnection
这个类主要负责进行Socket的操作(连接),获取Socket的输入输出流并封装。
建立Socket连接(connect)
while (true) {
try {
if (route.requiresTunnel()) {
connectTunnel(connectTimeout, readTimeout, writeTimeout);
} else {
connectSocket(connectTimeout, readTimeout);
}
establishProtocol(connectionSpecSelector);
break;
} catch (IOException e) {
closeQuietly(socket);
closeQuietly(rawSocket);
socket = null;
rawSocket = null;
source = null;
sink = null;
handshake = null;
protocol = null;
http2Connection = null;
if (routeException == null) {
routeException = new RouteException(e);
} else {
routeException.addConnectException(e);
}
if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
throw routeException;
}
}
}
我们先简单的看,直接连接socket的方式,大家可以看到是一个无限循环,知道连接成功,或者指定的相关异常抛出则跳出循环。具体哪些异常可以看connectionSpecSelector.connectionFailed(e)
内部的实现。
接下来我们就具体看连接socket的方法connectSocket(connectTimeout, readTimeout)
private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);
rawSocket.setSoTimeout(readTimeout);
try {
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
ce.initCause(e);
throw ce;
}
// The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
// More details:
// https://github.com/square/okhttp/issues/3245
// https://android-review.googlesource.com/#/c/271775/
try {
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
} catch (NullPointerException npe) {
if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
throw new IOException(npe);
}
}
}
逻辑很清晰,就是建立socket连接,然后封装输入输出流。
- 建立socket连接
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
进行连接,我们查看实现:
public void connectSocket(Socket socket, InetSocketAddress address,
int connectTimeout) throws IOException {
socket.connect(address, connectTimeout);
}
这就到了我们熟悉的Socket连接了。
- 封装输入输出流
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
获取并封装Socket输入输出流
我们看一下Okio.source方法:
/**
* Returns a source that reads from {@code socket}. Prefer this over {@link
* #source(InputStream)} because this method honors timeouts. When the socket
* read times out, the socket is asynchronously closed by a watchdog thread.
*/
public static Source source(Socket socket) throws IOException {
if (socket == null) throw new IllegalArgumentException("socket == null");
AsyncTimeout timeout = timeout(socket);
Source source = source(socket.getInputStream(), timeout);
return timeout.source(source);
}
还得继续扒:看source的重载方法:
private static Source source(final InputStream in, final Timeout timeout) {
if (in == null) throw new IllegalArgumentException("in == null");
if (timeout == null) throw new IllegalArgumentException("timeout == null");
return new Source() {
@Override public long read(Buffer sink, long byteCount) throws IOException {
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (byteCount == 0) return 0;
try {
timeout.throwIfReached();
Segment tail = sink.writableSegment(1);
int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
if (bytesRead == -1) return -1;
tail.limit += bytesRead;
sink.size += bytesRead;
return bytesRead;
} catch (AssertionError e) {
if (isAndroidGetsocknameError(e)) throw new IOException(e);
throw e;
}
}
@Override public void close() throws IOException {
in.close();
}
@Override public Timeout timeout() {
return timeout;
}
@Override public String toString() {
return "source(" + in + ")";
}
};
}
来了,这下清晰了,对输入流做了包装。既然是输入,就只有read方法而么有write方法。而读的逻辑就是讲InputStream的数据存放到Buffer中。
到这里Okio.source(rawSocket)
我们清楚了,把输入流封装成Source。read方法将数据读入到Buffer中。我们接下来继续看Okio.buffer(Okio.source(rawSocket))
外面这个方法:
public static BufferedSource buffer(Source source) {
return new RealBufferedSource(source);
}
有点粗暴,就是new了个RealBufferedSource。而它又是啥玩意呢,它实现了BufferedSource。这分明就是个装饰模式嘛。在原有Source的基础上,多了一些方法。如:readInt、skip等等。那还有啥用呢,我们看read方法:
@Override public long read(Buffer sink, long byteCount) throws IOException {
if (sink == null) throw new IllegalArgumentException("sink == null");
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (closed) throw new IllegalStateException("closed");
if (buffer.size == 0) {
long read = source.read(buffer, Segment.SIZE);
if (read == -1) return -1;
}
long toRead = Math.min(byteCount, buffer.size);
return buffer.read(sink, toRead);
}
这里进行了方法的覆写,数据先读到Buffer里,然后再写到sink里。
至于sink = Okio.buffer(Okio.sink(rawSocket));
我就不讲了,模式一样。只是一个负责读,一个负责写。
StreamAllocation
协调Connections、Streams、Calls之间的关系。包括控制RealConnection的创建,释放,状态的管理。
一个RealConnection中可以包含多个StreamAllocation,默认为1个。
findHealthyConnection
这个方法就比较重要了。找到一个健康可用的RealConnection,通过阅读这个类,我们可以把上面说的几个类的关系搞清楚。先源码:
/**
* Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
* until a healthy connection is found.
*/
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
throws IOException {
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
}
首选进入一个死循环,直到获取一个健康的可用的RealConnection或者有异常抛出。
findConnection》》》》》》》》》》》》》》》》Start
第一步调用到findConnection方法,我们查看该方法
/**
* Returns a connection to host a new stream. This prefers the existing connection if it exists,
* then the pool, finally building a new connection.
*/
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException {
Route selectedRoute;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");
// Attempt to use an already-allocated connection.
RealConnection allocatedConnection = this.connection;
if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
return allocatedConnection;
}
// Attempt to get a connection from the pool.
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
return connection;
}
selectedRoute = route;
}
// If we need a route, make one. This is a blocking operation.
if (selectedRoute == null) {
selectedRoute = routeSelector.next();
}
RealConnection result;
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
// Now that we have an IP address, make another attempt at getting a connection from the pool.
// This could match due to connection coalescing.
Internal.instance.get(connectionPool, address, this, selectedRoute);
if (connection != null) return connection;
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);
acquire(result);
}
// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
// Pool the connection.
Internal.instance.put(connectionPool, result);
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
return result;
}
先判断当前的
RealConnection allocatedConnection = this.connection;
判断当前连接是否已存在,如果存在且没有标记noNewStreams,则直接返回该连接-
到连接池中寻找匹配的连接
Internal.instance.get(connectionPool, address, this, null);
。这里Internal.instance是一个抽象类中的静态变量,那在哪里实现的呢。我们看到OkHttpClient类。类中第三个static关键字就是instance的实现static { Internal.instance = new Internal() { @Override public void addLenient(Headers.Builder builder, String line) { builder.addLenient(line); } @Override public RealConnection get(ConnectionPool pool, Address address, StreamAllocation streamAllocation, Route route) { return pool.get(address, streamAllocation, route); } ...省略代码 }
这里我们就知道其实最终调用到了ConnectionPool的get方法。我们查看
/** * Returns a recycled connection to {@code address}, or null if no such connection exists. The * route is null if the address has not yet been routed. */ @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) { assert (Thread.holdsLock(this)); for (RealConnection connection : connections) { if (connection.isEligible(address, route)) { streamAllocation.acquire(connection); return connection; } } return null; }
内部就是循环遍历connections,找到匹配的Connection。至于如何判断,大家查看方法的实现即可。如果找到则直接返回,否则进入下一步
前面在调用ConnectionPool.get方法时候Route参数为空,这一步就是获取一个Route然后再次查找。如果成功就返回
-
经过上面三个步骤后,说明已经没有可用的Connection。那么就得创建一个,
result = new RealConnection(connectionPool, selectedRoute); acquire(result);
-
创建完后调用
acquire
,这个是干啥的呢public void acquire(RealConnection connection) { assert (Thread.holdsLock(connectionPool)); if (this.connection != null) throw new IllegalStateException(); this.connection = connection; connection.allocations.add(new StreamAllocationReference(this, callStackTrace)); }
把当前的StreamAllocation添加到RealConnection。这和我们前面说到的一个RealConnection可能对应多个StreamAllocation。
-
开始进行Socket连接
result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled); routeDatabase().connected(result.route());
-
将RealConnection添加到connectionPool中
Internal.instance.put(connectionPool, result);
到这里findHealthyConnection执行完毕,结果获取一个可用的RealConnection。
findConnection《《《《《《《《《《《《《《《《《《《《End
继续findHealthyConnection的代码:
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
这里就是检查RealConnection是否正常可用。也就是做个体检isHealthy
,如果发现返回False,那么标记这个RealConnection的noNewStreams为true。此变量标记为true后,代码后面就不要从使用这个RealConnection。何以得知呢?
看到前面第2步,从ConnectionPool调用get方法寻找合适的RealConnection,有一句判断,前面我们没有讲,这里我跟踪一下:
@Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (connection.isEligible(address, route)) {
streamAllocation.acquire(connection);
return connection;
}
}
return null;
}
看到connection.isEligible(address, route)
这句话,我们进入:
public boolean isEligible(Address address, @Nullable Route route) {
// If this connection is not accepting new streams, we're done.
if (allocations.size() >= allocationLimit || noNewStreams) return false;
// If the non-host fields of the address don't overlap, we're done.
if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;
.../省略代码
看到noNewStreams了吧, 现在知道他的用处了吧。还有allocations.size() >= allocationLimit
,控制一个RealConnection可以被多少个StreamAllocation持有,这下都清楚了吧。
HttpCodec(Http1Codec)
Http请求和响应的编解码抽象HttpCodec
这是一个接口,定义了编解码的抽象方法。
public interface HttpCodec {
/**
* The timeout to use while discarding a stream of input data. Since this is used for connection
* reuse, this timeout should be significantly less than the time it takes to establish a new
* connection.
*/
int DISCARD_STREAM_TIMEOUT_MILLIS = 100;
/** Returns an output stream where the request body can be streamed. */
Sink createRequestBody(Request request, long contentLength);
/** This should update the HTTP engine's sentRequestMillis field. */
void writeRequestHeaders(Request request) throws IOException;
/** Flush the request to the underlying socket. */
void flushRequest() throws IOException;
/** Flush the request to the underlying socket and signal no more bytes will be transmitted. */
void finishRequest() throws IOException;
/**
* Parses bytes of a response header from an HTTP transport.
*
* @param expectContinue true to return null if this is an intermediate response with a "100"
* response code. Otherwise this method never returns null.
*/
Response.Builder readResponseHeaders(boolean expectContinue) throws IOException;
/** Returns a stream that reads the response body. */
ResponseBody openResponseBody(Response response) throws IOException;
/**
* Cancel this stream. Resources held by this stream will be cleaned up, though not synchronously.
* That may happen later by the connection pool thread.
*/
void cancel();
}
主要就是针对Request和Response的处理。将我们传入的请求Request编码成Http的协议请求,将响应解码成Response。
针对HTTP/1.1的实现Http1Codec
前面讲了HttpCodec的抽象方法。这里就是实现,Http协议也有多个版本,也就对应不同的实现。这里我们就看现在常用的Http/1.1。
而Http1Codec的创建在ConnectInterceptor中。
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
我们继续跟踪到StreamAllocation的newStream方法
HttpCodec resultCodec = resultConnection.newCodec(client, this);
继续进入:
public HttpCodec newCodec(
OkHttpClient client, StreamAllocation streamAllocation) throws SocketException {
if (http2Connection != null) {
return new Http2Codec(client, streamAllocation, http2Connection);
} else {
socket.setSoTimeout(client.readTimeoutMillis());
source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
return new Http1Codec(client, streamAllocation, source, sink);
}
}
两个参数我们已经很熟悉了。重点看到了new Http1Codec(client, streamAllocation, source, sink)
,在这里创建了Http1Codec。而传递的参数source、sink前面我们已经介绍了。在连接Socket后进行输入输出的封装。
Http1Codec核心方法实现
我们在介绍CallServerInterceptor的intercept方法时候,只是粗略的讲了下流程。这里我们将一下和Http1Codec相关的方法。
-
发送请求头
httpCodec.writeRequestHeaders(request);
看到实现
/** * Prepares the HTTP headers and sends them to the server. * * <p>For streaming requests with a body, headers must be prepared <strong>before</strong> the * output stream has been written to. Otherwise the body would need to be buffered! * * <p>For non-streaming requests with a body, headers must be prepared <strong>after</strong> the * output stream has been written to and closed. This ensures that the {@code Content-Length} * header field receives the proper value. */ @Override public void writeRequestHeaders(Request request) throws IOException { String requestLine = RequestLine.get( request, streamAllocation.connection().route().proxy().type()); writeRequest(request.headers(), requestLine); }
requestLine就是HTTP的起始行,内部大家可以自己查看。然后看到
writeRequest
方法:
/** Returns bytes of a request header for sending on an HTTP transport. */
public void writeRequest(Headers headers, String requestLine) throws IOException {
if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
sink.writeUtf8(requestLine).writeUtf8("\r\n");
for (int i = 0, size = headers.size(); i < size; i++) {
sink.writeUtf8(headers.name(i))
.writeUtf8(": ")
.writeUtf8(headers.value(i))
.writeUtf8("\r\n");
}
sink.writeUtf8("\r\n");
state = STATE_OPEN_REQUEST_BODY;
}
逻辑就很简单了,遍历Headers,将请求头写入到sink中。
- 发送请求体
if (responseBuilder == null) {
// Write the request body if the "Expect: 100-continue" expectation was met.
Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
} else if (!connection.isMultiplexed()) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection from
// being reused. Otherwise we're still obligated to transmit the request body to leave the
// connection in a consistent state.
streamAllocation.noNewStreams();
}
这里会先判断,如果后请求体的话就发送。看到createRequestBody
方法。
@Override public Sink createRequestBody(Request request, long contentLength) {
if ("chunked".equalsIgnoreCase(request.header("Transfer-Encoding"))) {
// Stream a request body of unknown length.
return newChunkedSink();
}
if (contentLength != -1) {
// Stream a request body of a known length.
return newFixedLengthSink(contentLength);
}
throw new IllegalStateException(
"Cannot stream a request body without chunked encoding or a known content length!");
}
确定发送的请求数据大小是否确定,然后返回对应的Sink实现。接下来看到request.body().writeTo(bufferedRequestBody);
的实现。我们看到request.body()返回是RequestBody,一个抽象类,定义请求体的方法。看到自带的实现有FormBody、MultipartBody。我们挑一个看FormBody。看到writeTo
方法:
@Override public void writeTo(BufferedSink sink) throws IOException {
writeOrCountBytes(sink, false);
}
继续看:
/**
* Either writes this request to {@code sink} or measures its content length. We have one method
* do double-duty to make sure the counting and content are consistent, particularly when it comes
* to awkward operations like measuring the encoded length of header strings, or the
* length-in-digits of an encoded integer.
*/
private long writeOrCountBytes(@Nullable BufferedSink sink, boolean countBytes) {
long byteCount = 0L;
Buffer buffer;
if (countBytes) {
buffer = new Buffer();
} else {
buffer = sink.buffer();
}
for (int i = 0, size = encodedNames.size(); i < size; i++) {
if (i > 0) buffer.writeByte('&');
buffer.writeUtf8(encodedNames.get(i));
buffer.writeByte('=');
buffer.writeUtf8(encodedValues.get(i));
}
if (countBytes) {
byteCount = buffer.size();
buffer.clear();
}
return byteCount;
}
这里就清晰了,将请求体(Form表单)遍历的写出。
- 读取响应头
if (responseBuilder == null) {
responseBuilder = httpCodec.readResponseHeaders(false);
}
进入到实现:
@Override public Response.Builder readResponseHeaders(boolean expectContinue) throws IOException {
if (state != STATE_OPEN_REQUEST_BODY && state != STATE_READ_RESPONSE_HEADERS) {
throw new IllegalStateException("state: " + state);
}
try {
StatusLine statusLine = StatusLine.parse(source.readUtf8LineStrict());
Response.Builder responseBuilder = new Response.Builder()
.protocol(statusLine.protocol)
.code(statusLine.code)
.message(statusLine.message)
.headers(readHeaders());
if (expectContinue && statusLine.code == HTTP_CONTINUE) {
return null;
}
state = STATE_OPEN_RESPONSE_BODY;
return responseBuilder;
} catch (EOFException e) {
// Provide more context if the server ends the stream before sending a response.
IOException exception = new IOException("unexpected end of stream on " + streamAllocation);
exception.initCause(e);
throw exception;
}
}
首先读取起始行statusLine
。我们进入到实现source.readUtf8LineStrict()
看怎么读取的
long newline = indexOf((byte) '\n', 0, scanLength);
if (scanLength < Long.MAX_VALUE
&& request(scanLength) && buffer.getByte(scanLength - 1) == '\r'
&& request(scanLength + 1) && buffer.getByte(scanLength) == '\n') {
return buffer.readUtf8Line(scanLength); // The line was 'limit' UTF-8 bytes followed by \r\n.
}
这里先时候去到\n
的位置。然后看到,起始行的结束是否是\r\n
。最后读取并返回。StatusLine.parse就是解析得到Http的Method、Code、Message。下面读取响应头:
Response.Builder responseBuilder = new Response.Builder()
.protocol(statusLine.protocol)
.code(statusLine.code)
.message(statusLine.message)
.headers(readHeaders());
先将起始行封装到responseBuilder中,然后readHeaders()读取响应头。
/** Reads headers or trailers. */
public Headers readHeaders() throws IOException {
Headers.Builder headers = new Headers.Builder();
// parse the result headers until the first blank line
for (String line; (line = source.readUtf8LineStrict()).length() != 0; ) {
Internal.instance.addLenient(headers, line);
}
return headers.build();
}
这里就是一行行读取响应头,然后添加到Headers中。细节大家跟踪到方法内部查看即可。
- 封装响应体
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
先判断响应头,101啥意思呢:服务器将遵从客户的请求转换到另外一种协议(HTTP 1.1新)。所以就不用管响应体了。我们重点看到httpCodec.openResponseBody(response)
@Override public ResponseBody openResponseBody(Response response) throws IOException {
Source source = getTransferStream(response);
return new RealResponseBody(response.headers(), Okio.buffer(source));
}
两个步骤:
- 第一步getTransferStream(response)
private Source getTransferStream(Response response) throws IOException {
if (!HttpHeaders.hasBody(response)) {
return newFixedLengthSource(0);
}
if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
return newChunkedSource(response.request().url());
}
long contentLength = HttpHeaders.contentLength(response);
if (contentLength != -1) {
return newFixedLengthSource(contentLength);
}
// Wrap the input stream from the connection (rather than just returning
// "socketIn" directly here), so that we can control its use after the
// reference escapes.
return newUnknownLengthSource();
}
主要判断响应头Transfer-Encoding
来确定响应体的数据大小是否确定,如果是chunked
则是分块传输,则没有Content-Length。否则可以确定响应体大小。然后返回不同的Source实现。
- 第二步
new RealResponseBody(response.headers(), Okio.buffer(source)
public final class RealResponseBody extends ResponseBody {
private final Headers headers;
private final BufferedSource source;
public RealResponseBody(Headers headers, BufferedSource source) {
this.headers = headers;
this.source = source;
}
@Override public MediaType contentType() {
String contentType = headers.get("Content-Type");
return contentType != null ? MediaType.parse(contentType) : null;
}
@Override public long contentLength() {
return HttpHeaders.contentLength(headers);
}
@Override public BufferedSource source() {
return source;
}
}
这个就是做了一个封装,没什么别的逻辑。
到此为止整个Http的发送和响应就介绍完毕了。
同步/异步请求
先来一张流程图:
文章的开始我们发送http请求直接使用的同步请求
Response response = client.newCall(request).execute();
这样比较粗暴,我们还需要开启线程。如此OkHttp当然也就提供了异步调用方法。
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
}
});
调用也是非常的方便的。整个请求OkHttp会帮我们开启线程,并完成Http请求。接下来我们就分析这块的流程。
newCall方法就不介绍了,我们看到enqueue方法。
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
- 先判断是否已经executed。防止多次调用
- 将任务加入到队里中
client.dispatcher().enqueue(new AsyncCall(responseCallback));
我们进入到enqueue方法。
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
- 先判断当前正在请求的数量是否大于最大请求数,同一个主机的请求是否超过限制
- 如果超过限制,则将AsyncCall任务放到readyAsyncCalls(准备任务)队列中。
- 如果没有超过限制,加入到runningAsyncCalls(运行)队列中,并直接调度执行。
这里我们先看几个变量runningAsyncCalls、readyAsyncCalls:
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
这个写的很清楚,异步就绪队列、异步运行队列,同步运行队列。runningSyncCalls这个在前面同步调用的时候有涉及。剩下两个变量就在这里体现。整体的思路就是,异步调用先判断请求数量是否超限,如果没有直接交给线程池执行;超限就先放到准备队列中。
我们在看到executorService().execute(call);
进入到executorService()方法:
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
这里很明显了一个单例的实现,方法上加上了synchronized关键字。ThreadPoolExecutor是创建一个线程池。
这里有朋友要问了,当请求数量超限制我们只看到了把任务放到准备队列中,那啥时候被调用呢?这里大家先别着急,后面会讲到。
到此我们已经把client.dispatcher().enqueue(new AsyncCall(responseCallback));
enqueue这个方法干了什么事讲清楚了。这里还设计一个类AsyncCall。我们看看:
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
光看这里看不出啥门道,我们得看看父类NamedRunnable
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
嘿!就是个Runnable,run里就做了两个事
- 设置线程名称
- 进行execute()调用
然后我们回到AsyncCall看到execute的实现:
Response response = getResponseWithInterceptorChain();
有没有觉得很眼熟,这个不就是我们前面讲同步调用的时候,通过这个方法完成的请求。
请求成功进行CallBack回调
responseCallback.onResponse(RealCall.this, response)
失败或者发生异常也回调
responseCallback.onFailure(RealCall.this, e);
上面我们就把异步调用的发起和回调讲清楚了,前面我们还有个问题就是准备队列的任务啥时候被执行。
准备就绪队列任务的调度
我们还是看到AsyncCall的execute方法,真正的执行调用时在这个方法中,我们看到最后的finally块
finally {
client.dispatcher().finished(this);
}
跟踪到Dispatcher的finish方法
/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
看到重载的方法:
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
这里先把call从队列中移除。然后判断promoteCalls,这里我们知道是true,所以重点看到if (promoteCalls) promoteCalls();
这段代码:
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
- 还是先判断正在请求的数量是否超限制
- 遍历readyAsyncCalls,找到符合条件的请求(同一个主机的请求数量是否超限制)。如果找到就从readyAsyncCalls中移除,然后加入到runningAsyncCalls。然后通过线程池获进行调度执行。
前面我们将到同步调用的时候,RealCall的execute()方法的开始有client.dispatcher().executed(this);
方法的结束finally调用了client.dispatcher().finished(this);
。然后调用了Dispatcher的finished(runningSyncCalls, call, false);
方法。这里和异步调用的区别就是最后一个参数为false。
到这里整个同步调用和异步调用我们就串联起来了。