okhttp是Android攻城狮必须掌握的网络库,很多其他开源库用到也是它,第一篇介绍okhttp原理最合适不过。
review keyword:请求分发、责任链
okhttp的使用
作为网络库,okhttp的基本功能就是发出request请求,得到response响应。OkHttpClient的构造方法,三种形式,使用构造者模式,里面有几十个参数,重要参数后面逐渐会讲到。
OkHttpClient okHttpClient1 = new OkHttpClient();
OkHttpClient okHttpClient2 = new OkHttpClient.Builder().build();
OkHttpClient okHttpClient3 = okHttpClient2.newBuilder().build();
同步请求和异步请求的例子:
private static void sync(OkHttpClient client, String url) {
Request request = new Request.Builder().url(url).build();
Response response = null;
try {
response = client.newCall(request).execute();
} catch (IOException e) {
e.printStackTrace();
}
try {
System.out.print(response.body().string());
} catch (IOException e) {
e.printStackTrace();
}
}
private static void async(OkHttpClient client, String url) {
Request request = new Request.Builder().url(url).build();
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
System.out.println("onFailure");
}
@Override
public void onResponse(Call call, Response response) throws IOException {
System.out.println("onResponse:" + response.body().string());
}
});
}
使用很简单不多说,速度进入源码,先放张okhttp结构图,涉及到的重要类都在上面了。
请求封装Call
请求request需要封装到Call对象,调用的是OkHttpClinet的newCall方法。Call定义为接口,具体实现类区分同步和异步:
- 同步请求:RealCall
- 异步请求:AsyncCall
异步请求需要在线程池里执行,所以AsyncCall继承了Runnable。Call提供了cancel方法,所以网络请求是可以中止的。
封装request到Call后,就交由Dispatcher执行请求分发。
请求分发Dispatcher
同步请求直接执行,异步请求提交到线程池里执行,Dispatcher里线程池的构建参数如下:
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
- 线程数量范围:0到无穷大;
- 空闲线程的回收时间是60秒;
- 阻塞队列是SynchronousQueue,不存放元素;
- 自定义线程工厂:名称直接硬编码,非守护。
没有新鲜的,线程池参数应该要烂熟于胸。
Dispatcher里有三个Deque,存放什么名称写得很清楚。
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
- readyAsyncCalls:待执行的异步请求
- runningAsyncCalls:正在执行的异步请求
- runningSyncCalls:正在执行的同步请求
同步请求调用execute方法,异步请求调用enqueue方法,本质就是将call放入对应的队列。
同步请求
@Override
public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
}
RealCall的execute方法,里面是一些状态判断和监听,最重要的是调用Dispatcher的executed:
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
很简单地,将Call加入runningSyncCalls。每个OkHttpClient对象只创建一个Dispatcher,所以操作队列时,需要同步。
具体的执行过程调用getResponseWithInterceptorChain,后文很快会说。当Call执行完成得到response时,在finally里调用Dispatcher的finished。
异步请求
@Override
public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
异步请求的enqueue,调用Dispatcher同名的enqueue,这时候传入的Call为AsyncCall。
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
到达Dispatcher进行分发,enqueue也是操作Call入队。请求的数量有限制(maxRequests=64 && maxRequestsPerHost=5),范围内加入runningAsyncCalls并提交线程池执行;否则加入readyAsyncCalls等待。
@Override
protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
//省略callback处理部分
} catch (IOException e) {
//
} finally {
client.dispatcher().finished(this);
}
}
具体执行异步请求在execute方法,callback里的onResponse和onFailure很简单,代码略掉。execute核心同样调用getResponseWithInterceptorChain得到response,最后也是调用Dispatcher的finished。
请求完成后
请求执行完成的善后是Dispatcher.finished,功能是将完成的Call移出队列。
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
如果是异步请求,需要增加调用promoteCalls,看看readyAsyncCalls里的Call能不能放入runningAsyncCalls参与执行。这里用synchronized锁住Dispatcher,和“生产者-消费者”有点像。(回忆wait/notify的写法)
看到个扩展点,当Dispatcher里没有Call可执行时,可以设置一个idleCallback跑一些东西。
拦截器Interceptor
重头戏是方法getResponseWithInterceptorChain。
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
getResponseWithInterceptorChain首先组建拦截器列表,包括okhttp自带的拦截器还有用户自定义的拦截器。这是责任链设计模式,每个request都需要按序通过拦截器,最终发出到服务器得到response,再反序依次通过拦截器。
- RetryAndFollowUpInterceptor
- BridgeInterceptor
- CacheInterceptor
- ConnectInterceptor
- CallServerInterceptor
okhttp的核心实现就是这几个拦截器,后面会逐个分析它们的功能。
补充说明自定义拦截器,有两种选择,application interceptors和network interceptors,区别在wiki写得很清楚,图直接搬过来。
okhttp支持WebSocket,在代码里看到如果是WebSocket,则不支持network interceptors。
WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。
定义完拦截器后,由RealInterceptorChain将拦截器串联,调用proceed方法,传入originalRequest。
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
//...
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
//...
return response;
}
proceed的核心是创建下一个RealInterceptorChain,并传入index+1,表示获取下一个拦截器,然后执行当前拦截器的intercept,最后返回response。
不得不说,拦截器的设计非常美,每一层都各司其职,互不相干,但又配合着处理request和response,最终完成http整个流程。
连接和流
RealInterceptorChain的proceed有四个入参:
Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,RealConnection connection
RealConnection封装了Socket,是请求发送的通道;HttpCodec描述了request和response的输入输出,用的是okio;StreamAllocation是管理连接和流的桥梁。为了复用连接,okhttp使用ConnectionPool对连接进行管理。
对上面几个类的介绍,另外放在okhttp 3.10连接复用原理。
RetryAndFollowUpInterceptor
真的开始跟着请求过拦截器了。RetryAndFollowUpInterceptor是请求通过的第一个自带拦截器,负责处理请求失败的重试和服务器的重定向。
intercept的代码比较长,我们分开几部分来看。
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
第一步就是创建StreamAllocation。
while(true){
//1、请求是否取消
//2、request交给下一个拦截器,得到response
//3、是否重定向
}
然后进入无限循环,逻辑很明确,检查response是否是重定向,是的话一直循环请求新url。当然,重定向次数有限制,最大到MAX_FOLLOW_UPS=20。
mark1
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
首先判断canceled状态,哪里改变状态的呢?回看RealCall的cancel。
@Override
public void cancel() {
retryAndFollowUpInterceptor.cancel();
}
只有一句话,执行retryAndFollowUpInterceptor的cancel。
public void cancel() {
canceled = true;
StreamAllocation streamAllocation = this.streamAllocation;
if (streamAllocation != null) streamAllocation.cancel();
}
设置当前canceled=true,停止retryAndFollowUpInterceptor的无限循环,同时调用StreamAllocation的cancel,里面继续调用连接和流的cancel,将能停的东西都停了。
mark2
Response response;
boolean releaseConnection = true;
try {
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
throw e.getLastConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
// We’re throwing an unchecked exception. Release any resources.
if (releaseConnection) {
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}
这部分将请求传递到下一个拦截器,并捕获处理各种网络请求异常。失败的原因很多,里面的异常处理调用recover方法判断是否能够重试。recover里检查配置、协议、exception类型。只要能重试,会保留连接。
mark3
//...
Request followUp = followUpRequest(response, streamAllocation.route());
if (followUp == null) {
if (!forWebSocket) {
streamAllocation.release();
}
return response;
}
//...
if (!sameConnection(response, followUp.url())) {
streamAllocation.release();
streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(followUp.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
} else if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}
request = followUp;
priorResponse = response;
这部分最重要的是调用followUpRequest检查是否重定向,followUpRequest里是swtich-case一系列http状态码,对我们学习各个状态码的处理是不可多得的资料。
最后还需要判断重定向的目标是否sameConnection,不是的话需要重新创建StreamAllocation。
BridgeInterceptor
使用request时,用户一般只会传入method和url。http header才不止这么少参数,填充默认参数、处理cookie、gzip等是BridgeInterceptor的工作。
CacheInterceptor
okhttp对http的缓存策略全部在CacheInterceptor中完成,另见okhttp 3.10缓存原理。request到达CacheInterceptor时,如果缓存命中,直接返回缓存response。当下层response返回到CacheInterceptor时,它可以将结果缓存起来。
ConnectInterceptor
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
ConnectInterceptor的代码很短,核心功能是获取能用的流和连接,也就是HttpCodec和RealConnection,streamAllocation.newStream对应这两步:
public HttpCodec newStream(...) {
//调用findHealthyConnection得到RealConnection
//调用RealConnection.newCodec得到HttpCodec
}
private RealConnection findHealthyConnection(...) throws IOException {
while (true) {
//调用findConnection获取RealConnection
}
}
findHealthyConnection,顾名思义找到一个能用的连接,里面是个无限循环调用findConnection,直到获得RealConnection。
连接可能是复用旧有的,也可能是新建立的,findConnection方法比较长,分析下来就是对应这两步:
- 从ConnectionPool中取出连接复用;
- 创建新连接,放回ConnectionPool管理。
连接过程是tcp握手、ssl握手等协议过程,不细说。
得到连接后,就可以创建流。在RealCollection中,newCodec在当前连接上创建一个流,http1使用了okio的source和sink读写数据。
private BufferedSource source;
private BufferedSink sink;
public HttpCodec newCodec(OkHttpClient client, Interceptor.Chain chain,
StreamAllocation streamAllocation) throws SocketException {
if (http2Connection != null) {
return new Http2Codec(client, chain, streamAllocation, http2Connection);
} else {
socket.setSoTimeout(chain.readTimeoutMillis());
source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
return new Http1Codec(client, streamAllocation, source, sink);
}
}
至此,请求发送需要的连接和流准备完毕。
CallServerInterceptor
最后执行的拦截器CallServerInterceptor,核心是通过okio在socket上写入request,读取response。
后记
okhttp是大师的作品,赞。
我和大师的距离就差那么一点,就像考北大差那么一点分(100分),哭笑。