OkHttp源码解析

okHttp概述

上一篇文章讲到retrofit网络请求框架,其实retrofit内部并没有真正的实现网络请求,它内部将网络请求封装成了Call,并将网络请求转角给okhttp去执行。okhttp是square开源的一个网络库,将每一个网络请求都封装成一个call,然后利用call去执行网络请求。

下面是官方给的例子:


OkHttpClient client = new OkHttpClient();

String run(String url) throws IOException {

  Request request = new Request.Builder()

      .url(url)

      .build();

  Response response = client.newCall(request).execute();

  return response.body().string();

}

okHttp源码阅读

okhttp的使用就像上面的例子一样这么简单,我们看看其内部的实现

调用流程

构造Client

首先用builder模式创建一个okhttpClient实例,里面有一个非常重要的成员dispatcher,顾名思义,主要是用来做okHttp的网络请求分发的。后续我们会再详细介绍。每一个client在创建时,都会直接new出一个dispatcher。


public Builder() {

      dispatcher = new Dispatcher();



    }

构造Request

okHttp的网络请求表示为Request,里面包含了一个请求所需要的参数。


    //请求url

    HttpUrl url;

    //请求方式

    String method;

    //请求头

    Headers.Builder headers;

    //请求body

    RequestBody body;

    //请求tag

    Object tag;

对于okHttp请求的直接触发单位并不是request,而是在request的基础上继续包装了一层,包装为Call。

构造Call

call是一个接口,供我们外部调用,主要有以下几个比较重要的方法


//获取当次请求的Request

Request request();

//同步执行入口

Response execute() throws IOException;

//异步执行入口

void enqueue(Callback responseCallback);

//取消入口

void cancel();

我们实际使用的是其实现类RealCall,我们直接通过client提供的工厂方法,传入request,并返回RealCall。


private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {

    this.client = client;

    this.originalRequest = originalRequest;

    this.forWebSocket = forWebSocket;

    this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);

  }

请求的执行

同步请求 execute


@Override public Response execute() throws IOException {

    synchronized (this) {

      if (executed) throw new IllegalStateException("Already Executed");

      executed = true;

    }

    captureCallStackTrace();

    try {

      //添加到dispatcher的同步队列中

      client.dispatcher().executed(this);

      //通过拦截器链执行请求

      Response result = getResponseWithInterceptorChain();

      if (result == null) throw new IOException("Canceled");

      return result;

    } finally {

        //从dispatcher正在运行队列中中移除

      client.dispatcher().finished(this);

    }

  }

通过局部变量executed来标识一个call只能执行一次,需要用symchronized来保证多线程同步。

然后通过dispatcher的请求处理,其实仅仅是将这个call添加到维护的正在运行的队列中,然后立马通过拦截器链发起请求。


synchronized void executed(RealCall call) {

    runningSyncCalls.add(call);

  }

后面通过拦截器链执行网络请求。

异步请求

异步请求和同步请求非常类似。


@Override public void enqueue(Callback responseCallback) {

    synchronized (this) {

      if (executed) throw new IllegalStateException("Already Executed");

      executed = true;

    }

    captureCallStackTrace();

    eventListener.callStart(this);

    client.dispatcher().enqueue(new AsyncCall(responseCallback));

  }

enqueue仅仅调用了dispatcher的enqueue,需要将RealCall转化为AsyncCall,也就是异步请求Call,其实就是一个Runnable。


final class AsyncCall extends NamedRunnable {

    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {

      super("OkHttp %s", redactedUrl());

      this.responseCallback = responseCallback;

    }

Dispatcher的分发机制

不管同步还是异步请求,okHttp最终都会将请求传递给dispatcher。

dispatcher内部维护三个队列,一个线程池。


private @Nullable ExecutorService executorService;

  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

对于同步请求,会直接将请求添加到运行队列中,并马上执行请求,而异步请求可能需要先将请求添加到就绪异步队列中,等待dispatcher的调度唤起。


synchronized void enqueue(AsyncCall call) {

    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {

      runningAsyncCalls.add(call);

      executorService().execute(call);

    } else {

      readyAsyncCalls.add(call);

    }

  }

如果当前正在运行的网络请求�数小于设置最大值,那么表示这个请求可以立马发起,则需要将这个call添加到运行的异步队列中,并且调用线程池执行这个请求。否则需要奖这个请求添加到等待队列中。当线程池执行时,其实是调用了这个runnable的run方法,


@Override public final void run() {

    String oldName = Thread.currentThread().getName();

    Thread.currentThread().setName(name);

    try {

      execute();

    } finally {

      Thread.currentThread().setName(oldName);

    }

  }

其实内部就是执行excute方法


  @Override protected void execute() {

      boolean signalledCallback = false;

      try {

        Response response = getResponseWithInterceptorChain(null);

        ....

        responseCallback.onFailure()

        responseCallback.onResponse()

        ....

      } finally {

        client.dispatcher().finished(this);

      }

    }

在run方法执行了execute方法之后,同样是先调用了拦截器链发起网络请求,然后在网络请求回来之后,回调callback,所以,可见这个callBack是在子线程里面做的回调。最后同样是调用finished关闭这个请求。


private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {

    int runningCallsCount;

    Runnable idleCallback;

    synchronized (this) {

    //从异步执行队列中移除

      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");

      //唤起下一个call的调用

      if (promoteCalls) promoteCalls();

      //重新计算当前网络请求数

      runningCallsCount = runningCallsCount();

      idleCallback = this.idleCallback;

    }

    if (runningCallsCount == 0 && idleCallback != null) {

      idleCallback.run();

    }

  }

如果是异步执行的call的finish操作,那么它还多了一个非常重要的步骤,通过promoteCalls来唤起下一个call请求发起。


private void promoteCalls() {

    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.

    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {

      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {

        i.remove();

        runningAsyncCalls.add(call);

        executorService().execute(call);

      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.

    }

  }

首先会判断当前正在请求数是否大于设置的最大请求,然后判断等待队列是否为空,在非空的情况下,从就绪队列中移除首个call,并开始执行。

拦截器链->获取response

上面的同步或者异步请求,最终都会调用到getResponseWithinterceptorChain这个方法上。


Response getResponseWithInterceptorChain(DiskCacheListener diskCacheListener) throws IOException {

    // Build a full stack of interceptors.

    List<Interceptor> interceptors = new ArrayList<>();

    interceptors.addAll(client.interceptors());

    interceptors.add(retryAndFollowUpInterceptor);

    interceptors.add(new BridgeInterceptor(client.cookieJar()));

    interceptors.add(new CacheInterceptor(client.internalCache(), diskCacheListener));

    interceptors.add(new ConnectInterceptor(client));

    if (!forWebSocket) {

      interceptors.addAll(client.networkInterceptors());

    }

    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,

        originalRequest, this, eventListener, client.connectTimeoutMillis(),

        client.readTimeoutMillis(), client.writeTimeoutMillis());

    return chain.proceed(originalRequest);

  }

首先会构造拦截器链,其实就是一个list,将一些需要的拦截器按序添加到list中,然后构造Chain,最终调用proceed开始执行。

链式调用

okHttp这个拦截器的遍历的实现其实是非常巧妙的。Chain有一个index表示当前执行的哪一个拦截器。


//第一次执行

Interceptor.Chain chain = new RealInterceptorChain(.., 0, ...);

    return chain.proceed(originalRequest);

proceed方法


// Call the next interceptor in the chain.

RealInterceptorChain next = new RealInterceptorChain(index + 1);

Interceptor interceptor = interceptors.get(index);

Response response = interceptor.intercept(next);

在proceed中首先会构造出下一个Chain,只需要把index+1即可,然后拿出当前的拦截器,并且执行其intercept方法。


@Override public Response intercept(Chain chain) throws IOException {

....

Response networkResponse = chain.proceed(requestBuilder.build());

....

}

在intercept中,再次调用了下一个链的proceed方法。依次执行,形成了一个链式调用。

拦截器

RetryAndFollow

网络请求失败时的重连和重定向的拦截器

Bridge

桥接拦截器,主要是用来增加请求头,cookied, userAgent。

Cache

主要是负责okHttp的缓存。okHttp有一个缓存策略。

CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();

    Request networkRequest = strategy.networkRequest;

    Response cacheResponse = strategy.cacheResponse;

networkRequest表示当前请求,cacheResponse表示是否有该请求对应的缓存response。


// If we're forbidden from using the network and the cache is insufficient, fail.

    if (networkRequest == null && cacheResponse == null) {

      return new Response.Builder()

          .request(chain.request())

          .protocol(Protocol.HTTP_1_1)

          .code(504)

          .message("Unsatisfiable Request (only-if-cached)")

          .body(Util.EMPTY_RESPONSE)

          .sentRequestAtMillis(-1L)

          .receivedResponseAtMillis(System.currentTimeMillis())

          .build();

    }

    // If we don't need the network, we're done.

    if (networkRequest == null) {

      return cacheResponse.newBuilder()

          .cacheResponse(stripBody(cacheResponse))

          .build();

    }

如果当前发起的请求为null,而且没有缓存的response,那么直接返回错误。

如果发起的请求不为null,而且存在缓存,那么直接用缓存,提前返回。否则正常发起网络请求。

Connect

ConnectIntercept主要用建立网络链接。


public Response intercept(Chain chain) throws IOException {

    RealInterceptorChain realChain = (RealInterceptorChain) chain;

    Request request = realChain.request();

    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.

    boolean doExtensiveHealthChecks = !request.method().equals("GET");

    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);

    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);

  }

  • 首先会从获取出链中拿出stramALlocation。StreamAlloction是okHttp用来辅助管理的一个类,主要是管理流、连接、和call之间的关系。是流和连接之间的component,它会为call请求去寻找连接并建立一个流。这个流是在第一个retryAndFollowIntercept中创建的,最后传递到connectIntercept。

  • 然后通过newStream方法找到一个合适的socket链接。并返回对应的socket操作封装类HttpCodeC,newStream会从连接池中去寻找合适的连接,如果没有可复用的连接,将会创建出新连接。


RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,

          writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);

      HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);

CallServer

CallServerIntercept是真正发起网络请求的类,通过上一个拦截器处理完后的HttpCodec进行网络的读写。内部封装了okio进行数据读写


...

//写入请求头

httpCodec.writeRequestHeaders(request);

//通过okIo写入请求请求体

request.body().writeTo(bufferedRequestBody);

然后通过okio进行response读取


if (responseBuilder == null) {

      realChain.eventListener().responseHeadersStart(realChain.call());

      responseBuilder = httpCodec.readResponseHeaders(false);

    }

    Response response = responseBuilder

        .request(request)

        .handshake(streamAllocation.connection().handshake())

        .sentRequestAtMillis(sentRequestMillis)

        .receivedResponseAtMillis(System.currentTimeMillis())

        .build();

okHttp连接池的处理

put

当没有找到可以复用的连接时,需要创建新的连接,并将连接放入连接池中,


void put(RealConnection connection) {

    assert (Thread.holdsLock(this));

    if (!cleanupRunning) {

      cleanupRunning = true;

      executor.execute(cleanupRunnable);

    }

    connections.add(connection);

  }

每次添加新的连接,需要看是否需要清理连接池。这个处理操作是通过线程池来处理的。通过下面几个步骤来判断一个连接是否需要被回收

  • 计数,看一个connect是否被多个streamAllocation所引用。Connecttion内部存储了一个StreamAllocation的弱引用列表list,当一个网络请求结束时,将会关闭stremAllocation,并且回收

for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {

        RealConnection connection = i.next();

        // If the connection is in use, keep searching.

        if (pruneAndGetAllocationCount(connection, now) > 0) {

          inUseConnectionCount++;

          continue;

        }

        idleConnectionCount++;

        }

  • 如果空闲数大于最大空闲数,那么回收

if (longestIdleDurationNs >= this.keepAliveDurationNs

          || idleConnectionCount > this.maxIdleConnections) {

        connections.remove(longestIdleConnection);

      }

get连接


Internal.instance.get(connectionPool, address, this, null);

通过address来从连接池中获取需要复用的连接。


  @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {

    assert (Thread.holdsLock(this));

    for (RealConnection connection : connections) {

      if (connection.isEligible(address, route)) {

        streamAllocation.acquire(connection);

        return connection;

      }

    }

    return null;

  }

首先会判断连接池中是否包含了这个请求地址和路由对应的连接,如果有,那么在这个connection中添加streamAllocation的引用并直接返回,

socket

socket连接的建立,是在StreamAllocation中建立的。当我们没找到可以复用的连接,那么将会创建一个新的连接。


result = new RealConnection(connectionPool, selectedRoute);

//为connect添加streamAllocation的引用

acquire(result);

在创建完connection之后,需要建立socket连接


  /** Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket. */

  private void connectSocket(int connectTimeout, int readTimeout, Call call,

      EventListener eventListener) throws IOException {

    Proxy proxy = route.proxy();

    Address address = route.address();

    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP

        ? address.socketFactory().createSocket()

        : new Socket(proxy);

    eventListener.connectStart(call, route.socketAddress(), proxy);

    rawSocket.setSoTimeout(readTimeout);

    try {

      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);

    } catch (ConnectException e) {

      ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());

      ce.initCause(e);

      throw ce;

    }

    // The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0

    // More details:

    // https://github.com/square/okhttp/issues/3245

    // https://android-review.googlesource.com/#/c/271775/

    try {

      source = Okio.buffer(Okio.source(rawSocket));

      sink = Okio.buffer(Okio.sink(rawSocket));

    } catch (NullPointerException npe) {

      if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {

        throw new IOException(npe);

      }

    }

  }

上面的就是socket连接的建立过程,大体可以分为以下几步:

  • 创建socket套接字:

rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP

        ? address.socketFactory().createSocket()

        : new Socket(proxy);

  • 连接远端Socket

Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);

  • 读写流,okIo封装

//read

source = Okio.buffer(Okio.source(rawSocket));

//write

sink = Okio.buffer(Okio.sink(rawSocket));

这个是客户端的socket创建,服务端的socket创建需要多一个监听

总结

其实okHttp的利用了java的职责链模式,每一个拦截器都有属于自己功能。下面看看一张自己绘制的okhttp访问的流程。

[图片上传失败...(image-2de00c-1541683717380)]

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,193评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,306评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,130评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,110评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,118评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,085评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,007评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,844评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,283评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,508评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,395评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,985评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,630评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,797评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,653评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,553评论 2 352

推荐阅读更多精彩内容