Okhttp的源码分析
Okhttp的线程池和高并发
Okhttp链接池的使用
Okhttp的缓存机制
Okhttp的责任链模式
责任链模式
建议安装目录插件食用
采用责任链模式,意思是用来处理相关事务责任的一条执行链,链上拥有若干节点,,如果某个节点处理完了就可以根据实际业务需求传递给下一个节点继续处理或者返回处理完毕。举一个百科上的例子:
假天数如果是半天到1天,可能直接项目经理批准即可;
如果是1到3天的假期,需要项目主管批准;
如果是3天到30天,则需要部门经理审批;
Okhttp责任链模式浅析
而在okhttp中,一共有五个拦截器,其分别为
- 重试拦截器 (RetryAndFollowUpInterceptor)
- 基础的拦截器(BridgeInterceptor)
- 缓存拦截器 (CacheInterceptor)
- 连接的拦截器(ConnectInterceptor)
- CallServerInterceptor
其工作的流程图大体如下所示:
源码分析
重试拦截器 (RetryAndFollowUpInterceptor)
-
首先看一下最重要的intercept方法:其实现了网络请求失败后,在一些必要的条件下,会重新进行网络请求 主要做了
- 从chain中获取request,实例化StreamAllocation
- 进入while循环,将请求传递给下一个拦截器,等待response
- 当遇到RouteException和IOException的时候会在抛出异常后从新再proceed(向拦截器传递该request)
- 当((RouteException || IOException)&&ecover==false)的时候才会跳出死循环,放弃继续传递
- 下面的几个 if则是判断何时重传,并设计了重传的计数器
@Override public Response intercept(Chain chain) throws IOException { Request request = chain.request(); streamAllocation = new StreamAllocation( client.connectionPool(), createAddress(request.url()), callStackTrace); int followUpCount = 0; Response priorResponse = null; while (true) { if (canceled) { streamAllocation.release(); throw new IOException("Canceled"); } Response response = null; boolean releaseConnection = true; try { response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null); releaseConnection = false; } catch (RouteException e) { // The attempt to connect via a route failed. The request will not have been sent. if (!recover(e.getLastConnectException(), false, request)) { throw e.getLastConnectException(); } releaseConnection = false; continue; } catch (IOException e) { // An attempt to communicate with a server failed. The request may have been sent. boolean requestSendStarted = !(e instanceof ConnectionShutdownException); if (!recover(e, requestSendStarted, request)) throw e; releaseConnection = false; continue; } finally { // We're throwing an unchecked exception. Release any resources. if (releaseConnection) { streamAllocation.streamFailed(null); streamAllocation.release(); } } // Attach the prior response if it exists. Such responses never have a body. if (priorResponse != null) { response = response.newBuilder() .priorResponse(priorResponse.newBuilder() .body(null) .build()) .build(); } Request followUp = followUpRequest(response); if (followUp == null) { if (!forWebSocket) { streamAllocation.release(); } return response; } closeQuietly(response.body()); if (++followUpCount > MAX_FOLLOW_UPS) { streamAllocation.release(); throw new ProtocolException("Too many follow-up requests: " + followUpCount); } if (followUp.body() instanceof UnrepeatableRequestBody) { streamAllocation.release(); throw new HttpRetryException("Cannot retry streamed HTTP body", response.code()); } if (!sameConnection(response, followUp.url())) { streamAllocation.release(); streamAllocation = new StreamAllocation( client.connectionPool(), createAddress(followUp.url()), callStackTrace); } else if (streamAllocation.codec() != null) { throw new IllegalStateException("Closing the body of " + response + " didn't close its backing stream. Bad interceptor?"); } request = followUp; priorResponse = response; } }
-
okhttp通过followUpRequest方法实现了响应状态码的功能,包括重定向等,接下来详细看下其重定向功能的实现
- 获取重定向地址 location
- 重新创建和拼接请求新的Request
- 重新将Request进行封装,之后发送网络请求并返回
private Request followUpRequest(Response userResponse) throws IOException { if (userResponse == null) throw new IllegalStateException(); Connection connection = streamAllocation.connection(); Route route = connection != null ? connection.route() : null; int responseCode = userResponse.code(); final String method = userResponse.request().method(); switch (responseCode) { case HTTP_PROXY_AUTH: Proxy selectedProxy = route != null ? route.proxy() : client.proxy(); if (selectedProxy.type() != Proxy.Type.HTTP) { throw new ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy"); } return client.proxyAuthenticator().authenticate(route, userResponse); case HTTP_UNAUTHORIZED: return client.authenticator().authenticate(route, userResponse); case HTTP_PERM_REDIRECT: case HTTP_TEMP_REDIRECT: // "If the 307 or 308 status code is received in response to a request other than GET // or HEAD, the user agent MUST NOT automatically redirect the request" if (!method.equals("GET") && !method.equals("HEAD")) { return null; } // fall-through case HTTP_MULT_CHOICE: case HTTP_MOVED_PERM: case HTTP_MOVED_TEMP: case HTTP_SEE_OTHER: // Does the client allow redirects? if (!client.followRedirects()) return null; String location = userResponse.header("Location"); if (location == null) return null; HttpUrl url = userResponse.request().url().resolve(location); // Don't follow redirects to unsupported protocols. if (url == null) return null; // If configured, don't follow redirects between SSL and non-SSL. boolean sameScheme = url.scheme().equals(userResponse.request().url().scheme()); if (!sameScheme && !client.followSslRedirects()) return null; // Most redirects don't include a request body. Request.Builder requestBuilder = userResponse.request().newBuilder(); if (HttpMethod.permitsRequestBody(method)) { final boolean maintainBody = HttpMethod.redirectsWithBody(method); if (HttpMethod.redirectsToGet(method)) { requestBuilder.method("GET", null); } else { RequestBody requestBody = maintainBody ? userResponse.request().body() : null; requestBuilder.method(method, requestBody); } if (!maintainBody) { requestBuilder.removeHeader("Transfer-Encoding"); requestBuilder.removeHeader("Content-Length"); requestBuilder.removeHeader("Content-Type"); } } // When redirecting across hosts, drop all authentication headers. This // is potentially annoying to the application layer since they have no // way to retain them. if (!sameConnection(userResponse, url)) { requestBuilder.removeHeader("Authorization"); } return requestBuilder.url(url).build(); case HTTP_CLIENT_TIMEOUT: // 408's are rare in practice, but some servers like HAProxy use this response code. The // spec says that we may repeat the request without modifications. Modern browsers also // repeat the request (even non-idempotent ones.) if (userResponse.request().body() instanceof UnrepeatableRequestBody) { return null; } return userResponse.request(); default: return null; } }
基础的拦截器(BridgeInterceptor)
正如名字一样,其对Network Request配置基本的网络信息,
- 设置Content-Type
- /设置Host
- 设置Connection头(User-Agent、Cookie、Accept-Encoding)
- 判断服务器是否支持gzip压缩格式,如果支持则交给kio压缩
public final class BridgeInterceptor implements Interceptor {
private final CookieJar cookieJar;
public BridgeInterceptor(CookieJar cookieJar) {
this.cookieJar = cookieJar;
}
@Override public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
RequestBody body = userRequest.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}
long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}
Response networkResponse = chain.proceed(requestBuilder.build());
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
}
return responseBuilder.build();
}
/** Returns a 'Cookie' HTTP request header with all cookies, like {@code a=b; c=d}. */
private String cookieHeader(List<Cookie> cookies) {
StringBuilder cookieHeader = new StringBuilder();
for (int i = 0, size = cookies.size(); i < size; i++) {
if (i > 0) {
cookieHeader.append("; ");
}
Cookie cookie = cookies.get(i);
cookieHeader.append(cookie.name()).append('=').append(cookie.value());
}
return cookieHeader.toString();
}
}
缓存拦截器 (CacheInterceptor)
- 重点在于Intercept方法中,其中的机制 上篇博客有说道,亦可以直接查看代码的注释
public final class CacheInterceptor implements Interceptor {
final InternalCache cache;
public CacheInterceptor(InternalCache cache) {
this.cache = cache;
}
Response intercept(Chain chain) throws IOException {
//如果配置了缓存:优先从缓存中读取Response
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;
long now = System.currentTimeMillis();
//缓存策略,该策略通过某种规则来判断缓存是否有效
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
。。。。
//如果根据缓存策略strategy禁止使用网络,并且缓存无效,直接返回空的Response
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
。。。
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)//空的body
。。。
.build();
}
//如果根据缓存策略strategy禁止使用网络,且有缓存则直接使用缓存
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
//需要网络
Response networkResponse = null;
try {//执行下一个拦截器,发起网路请求
networkResponse = chain.proceed(networkRequest);
} finally {
。。。
}
//本地有缓存,
if (cacheResponse != null) {
//并且服务器返回304状态码(说明缓存还没过期或服务器资源没修改)
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
//使用缓存数据
Response response = cacheResponse.newBuilder()
。。。
.build();
。。。。
//返回缓存
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
//如果网络资源已经修改:使用网络响应返回的最新数据
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
//将最新的数据缓存起来
if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}
。。。。
//返回最新的数据
return response;
}
private static Response stripBody(Response response) {
return response != null && response.body() != null
? response.newBuilder().body(null).build()
: response;
}
/**
* Returns a new source that writes bytes to {@code cacheRequest} as they are read by the source
* consumer. This is careful to discard bytes left over when the stream is closed; otherwise we
* may never exhaust the source stream and therefore not complete the cached response.
*/
private Response cacheWritingResponse(final CacheRequest cacheRequest, Response response)
throws IOException {
// Some apps return a null body; for compatibility we treat that like a null cache request.
if (cacheRequest == null) return response;
Sink cacheBodyUnbuffered = cacheRequest.body();
if (cacheBodyUnbuffered == null) return response;
final BufferedSource source = response.body().source();
final BufferedSink cacheBody = Okio.buffer(cacheBodyUnbuffered);
Source cacheWritingSource = new Source() {
boolean cacheRequestClosed;
@Override public long read(Buffer sink, long byteCount) throws IOException {
long bytesRead;
try {
bytesRead = source.read(sink, byteCount);
} catch (IOException e) {
if (!cacheRequestClosed) {
cacheRequestClosed = true;
cacheRequest.abort(); // Failed to write a complete cache response.
}
throw e;
}
if (bytesRead == -1) {
if (!cacheRequestClosed) {
cacheRequestClosed = true;
cacheBody.close(); // The cache response is complete!
}
return -1;
}
sink.copyTo(cacheBody.buffer(), sink.size() - bytesRead, bytesRead);
cacheBody.emitCompleteSegments();
return bytesRead;
}
@Override public Timeout timeout() {
return source.timeout();
}
@Override public void close() throws IOException {
if (!cacheRequestClosed
&& !discard(this, HttpCodec.DISCARD_STREAM_TIMEOUT_MILLIS, MILLISECONDS)) {
cacheRequestClosed = true;
cacheRequest.abort();
}
source.close();
}
};
return response.newBuilder()
.body(new RealResponseBody(response.headers(), Okio.buffer(cacheWritingSource)))
.build();
}
/** Combines cached headers with a network headers as defined by RFC 2616, 13.5.3\. */
private static Headers combine(Headers cachedHeaders, Headers networkHeaders) {
Headers.Builder result = new Headers.Builder();
for (int i = 0, size = cachedHeaders.size(); i < size; i++) {
String fieldName = cachedHeaders.name(i);
String value = cachedHeaders.value(i);
if ("Warning".equalsIgnoreCase(fieldName) && value.startsWith("1")) {
continue; // Drop 100-level freshness warnings.
}
if (!isEndToEnd(fieldName) || networkHeaders.get(fieldName) == null) {
Internal.instance.addLenient(result, fieldName, value);
}
}
for (int i = 0, size = networkHeaders.size(); i < size; i++) {
String fieldName = networkHeaders.name(i);
if ("Content-Length".equalsIgnoreCase(fieldName)) {
continue; // Ignore content-length headers of validating responses.
}
if (isEndToEnd(fieldName)) {
Internal.instance.addLenient(result, fieldName, networkHeaders.value(i));
}
}
return result.build();
}
/**
* Returns true if {@code fieldName} is an end-to-end HTTP header, as defined by RFC 2616,
* 13.5.1.
*/
static boolean isEndToEnd(String fieldName) {
return !"Connection".equalsIgnoreCase(fieldName)
&& !"Keep-Alive".equalsIgnoreCase(fieldName)
&& !"Proxy-Authenticate".equalsIgnoreCase(fieldName)
&& !"Proxy-Authorization".equalsIgnoreCase(fieldName)
&& !"TE".equalsIgnoreCase(fieldName)
&& !"Trailers".equalsIgnoreCase(fieldName)
&& !"Transfer-Encoding".equalsIgnoreCase(fieldName)
&& !"Upgrade".equalsIgnoreCase(fieldName);
}
}
连接的拦截器(ConnectInterceptor)
打开了与服务器的链接,正式开启了网络请求(打开了socket链接)
public final class ConnectInterceptor implements Interceptor {
public final OkHttpClient client;
public ConnectInterceptor(OkHttpClient client) {
this.client = client;
}
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
//从拦截器链里得到StreamAllocation对象
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
//获取realConnetion
RealConnection connection = streamAllocation.connection();
//执行下一个拦截器
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
}
CallServerInterceptor
作为okhttp的最后一个拦截器,他的主要作用是向服务器发送请求, 以及返回从服务器的到的response对象供客户端使用
同样的首先看一下最关键的Intercept方法,其中有如下几个关键点
- 调用writeRequestHeaders,作用为将http发送的网络请求构建成服务器能够接受的形式 如 "query":"{\r\n user(login: \"" 省略。 其中writeRequestHeaders 又进一步调用了writeRequest,实现了OKio的Sink
- 检测是否有请求body部分则是负责解析post,put等需要请求体的方法。如果服务器允许发送ReqeustBody,则调用sink类和ReqeustBody的writeTo方法发送请求体
- 读取服务器响应然后构建Response对象,首先构建请求builder对象, 其次通过ResopnseBuilder来创建Response对象
public Response intercept(Chain chain) throws IOException {
// 省略部分代码
// 获取HttpCodec
HttpCodec httpCodec = realChain.httpStream();
// 省略部分代码
Request request = realChain.request();
//向服务器发送请求
httpCodec.writeRequestHeaders(request);
Response.Builder responseBuilder = null;
// 检测是否有请求body
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
//构建responseBuilder对象
responseBuilder = httpCodec.readResponseHeaders(true);
}
//如果服务器允许发送请求body发送
if (responseBuilder == null) {
Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
} else if (!connection.isMultiplexed()) {
//省略部分代码
}
}
//结束请求
httpCodec.finishRequest();
//构建请求buidder对象
if (responseBuilder == null) {
responseBuilder = httpCodec.readResponseHeaders(false);
}
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
int code = response.code();
if (forWebSocket && code == 101) {
//省略部分代码
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
//省略部分代码
return response;
}
- 可以看到,最后最后调用了openResponseBody(response),该方法定义在Http2Codec,跟踪进去,源码如下,主要做的工作为将Socket的输入流InputStream对象交给OkIo的Source对象,进而封装成RealResponseBody
@Override public ResponseBody openResponseBody(Response response) throws IOException {
Source source = new StreamFinishingSource(stream.getSource());
return new RealResponseBody(response.headers(), Okio.buffer(source));
}
- 数据返回到客户端的时候,直接调用ResponseBody .
string方法即可,至此我们便通过使用okhttp成功从网络获取到了相关的数据
public final String string() throws IOException {
BufferedSource source = source();
try {
Charset charset = Util.bomAwareCharset(source, charset());
return source.readString(charset);
} finally {
Util.closeQuietly(source);
}
}
拦截器在网络请求中的调用
拦截器的调用流程大体可以如下图概括
[图片上传失败...(image-19cf46-1605337548472)]
- 在realcall类里面的execute函数执行调用getResponseWithInterceptorChain
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
- getResponseWithInterceptorChain可以清楚地看到五个拦截器的添加
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);
return chain.proceed(originalRequest);
}
- 在每一条链的Intercept方法里面都调用了RealInterceptorChain.proceed方法,保证了拦截器链可以正常的运行。
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
- 在最后的拦截链里面返回了拦截链,代表着五个拦截器的执行结束
return realChain.proceed(request, streamAllocation, httpCodec, connection);