OkHttp 源码解析(二):建立连接

简介

上一篇文章(OkHttp源码解析(一):基本流程)介绍了 OkHttp 的基本流程,包括 Request 的创建、Dispatcher 对 Request 的调度以及 Interceptor 的使用。OkHttp 中默认会添加 RetryAndFollowUpInterceptor、BridgeInterceptor、CacheInterceptor、ConnectInterceptor 以及 CallServerInterceptor 这几个拦截器。本文主要看一下 RetryAndFollupInterceptor 并引出建立连接相关的分析。

RetryAndFollowUpInterceptor

Interceptor 最主要的代码都在 intercept 中,下面是 RetryAndFollowUpInterceptor#intercept 中的部分代码:

@Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();

    streamAllocation = new StreamAllocation(
        client.connectionPool(), createAddress(request.url()), callStackTrace);  // 1

    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
      if (canceled) {
        streamAllocation.release();  // 2
        throw new IOException("Canceled");
      }
    ...
    response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null); // 3
    ...
}

上面注释 1 处创建了一个 StreamAllocation 对象,注释 2 处 调用了其 release 方法,注释 3 处则把这个对象传给了下一个 Interceptor。StreamAlloction 这个类很重要,下面就看一下它的用途。

StreamAlloction

StreamAllocation 从名字上看是流分配器,其实它是统筹管理了几样东西,注释写的非常清楚:

 /**
 * This class coordinates the relationship between three entities:
 *
 * <ul>
 *     <li><strong>Connections:</strong> physical socket connections to remote servers. These are
 *         potentially slow to establish so it is necessary to be able to cancel a connection
 *         currently being connected.
 *     <li><strong>Streams:</strong> logical HTTP request/response pairs that are layered on
 *         connections. Each connection has its own allocation limit, which defines how many
 *         concurrent streams that connection can carry. HTTP/1.x connections can carry 1 stream
 *         at a time, HTTP/2 typically carry multiple.
 *     <li><strong>Calls:</strong> a logical sequence of streams, typically an initial request and
 *         its follow up requests. We prefer to keep all streams of a single call on the same
 *         connection for better behavior and locality.
 * </ul>

简单来说, StreamAllocation 协调了 3 样东西:

  • Connections : 物理的 socket 连接
  • Streams:逻辑上的 HTTP request/response 对。每个 Connection 有个变量 allocationLimit ,用于定义可以承载的并发的 streams 的数量。HTTP/1.x 的 Connection 一次只能有一个 stream, HTTP/2 一般可以有多个。
  • CallsStreams 的序列。一个初始的 request 可能还会有后续的 request(如重定向)。OkHttp 倾向于让一个 call 所有的 streams 运行在同一个 connection 上。

StreamAllocation 提供了一些 API 来释放以上的资源对象。 在 RetryAndFollowUpInterceptor 中创建的 StreamAllocation 对象下一个用到的地方是 ConnectInterceptor,其 intercept 代码如下:

@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
}

在上面的代码中, streamAllocation 创建了 httpCodec 以及 connection 对象。 httpCodec 即是上面所说的 Streams,而 connection 则是上面的 ConnectionConnection 是一个接口,它的唯一实现类是 RealConnection

newStream

StreamAllocation 中的 newStream 方法用于寻找新的 RealConnection 以及 HttpCodec,代码如下:

public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
    int connectTimeout = client.connectTimeoutMillis();
    int readTimeout = client.readTimeoutMillis();
    int writeTimeout = client.writeTimeoutMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
      HttpCodec resultCodec = resultConnection.newCodec(client, this);

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
  }

newStream 中,通过 findHealthyConnection 找到可用的 Connection ,并用这个 Connection 生成一个 HttpCodec 对象。 findHealthyConnection 是找到一个健康的连接,代码如下:

private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
      throws IOException {
    while (true) {
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          connectionRetryEnabled);

      // If this is a brand new connection, we can skip the extensive health checks.
      synchronized (connectionPool) {
      // successCount == 0 表示还未使用过,则可以使用
        if (candidate.successCount == 0) {
          return candidate;
        }
      }

      // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
      // isn't, take it out of the pool and start again.
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        noNewStreams();
        continue;
      }

      return candidate;
    }
 }

public boolean isHealthy(boolean doExtensiveChecks) {
    if (socket.isClosed() || socket.isInputShutdown() || socket.isOutputShutdown()) {
      return false;
    }
    ... // 省略 Http2 代码
    return true;
  }

在一个无限循环中,通过 findConnection 寻找一个 connection,并判断是否可用,首先如果没有使用过的肯定是健康的可直接返回,否则调用 isHealthy,主要就是判断 socket 是否关闭。这里的 socket 是在 findConnection 中赋值的,再看看 findConnection 的代码:

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      boolean connectionRetryEnabled) throws IOException {
    Route selectedRoute;
    synchronized (connectionPool) {
      if (released) throw new IllegalStateException("released");
      if (codec != null) throw new IllegalStateException("codec != null");
      if (canceled) throw new IOException("Canceled");

      // Attempt to use an already-allocated connection.
      RealConnection allocatedConnection = this.connection;
      if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
        return allocatedConnection;
      }

      // Attempt to get a connection from the pool.
      // 1. 从 ConnectionPool 取得 connection
      Internal.instance.get(connectionPool, address, this, null);
      if (connection != null) {
        return connection;
      }

      selectedRoute = route;
    }

    // If we need a route, make one. This is a blocking operation.
    if (selectedRoute == null) {
      selectedRoute = routeSelector.next();
    }

    RealConnection result;
    synchronized (connectionPool) {
      if (canceled) throw new IOException("Canceled");

      // Now that we have an IP address, make another attempt at getting a connection from the pool.
      // 2. 有了 ip 地址后再从 connectionpool中取一次
      // This could match due to connection coalescing.
      Internal.instance.get(connectionPool, address, this, selectedRoute);
      if (connection != null) return connection;

      // Create a connection and assign it to this allocation immediately. This makes it possible
      // for an asynchronous cancel() to interrupt the handshake we're about to do.
      route = selectedRoute;
      refusedStreamCount = 0;
      // 3. ConnectionPool 中没有,新创建一个
      result = new RealConnection(connectionPool, selectedRoute);
      // 3. 将 StreamAllocation 加入到 `RealConnection` 中的一个队列中
      acquire(result);
    }

    // Do TCP + TLS handshakes. This is a blocking operation.
    // 4. 建立连接,在其中创建 socket
    result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
    routeDatabase().connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      // Pool the connection.
      // 5. 将新创建的 connection 放到 ConnectionPool 中 
      Internal.instance.put(connectionPool, result);

      // If another multiplexed connection to the same address was created concurrently, then
      // release this connection and acquire that one.
      if (result.isMultiplexed()) {
        socket = Internal.instance.deduplicate(connectionPool, address, this);
        result = connection;
      }
    }
    closeQuietly(socket);

    return result;
  }

上面 Connection 的创建大体是以下几个步骤:

  1. 调用 Intenal.get 方法从 ConnectionPool 中获取一个 Connection,主要根据 url 的 host 判断,相关代码在 ConnectionPool 中。
  2. 如果没有并且又获取了 IP 地址,则再获取一次。
  3. 如果 ConnectionPool 中没有, 则新创建一个 RealConnection,并调用 acquireStreamAllocation 中加入 RealConnection 中的一个队列中。
  4. 调用 RealConnection#connect 方法建立连接,在内部会创建 Socket
  5. 将新创建的 Connection 加入到 ConnectionPool 中。

获取到了 Connection 之后,再创建一个 HttpCodec 对象。

public HttpCodec newCodec(
      OkHttpClient client, StreamAllocation streamAllocation) throws SocketException {
    if (http2Connection != null) {
      return new Http2Codec(client, streamAllocation, http2Connection);
    } else {
      socket.setSoTimeout(client.readTimeoutMillis());
      source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
      sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
      return new Http1Codec(client, streamAllocation, source, sink);
    }
}

根据是 Http1 还是 Http2 创建对应的 HttpCodec, 其中的 socket 是在 RealConnection 中的 connect 方法创建的。下面具体看看RealConnection

RealConnection

RealConnection 封装的是底层的 Socket 连接,内部必然有一个 Socket 对象,下面是 RealConnection 内部的变量:

public final class RealConnection extends Http2Connection.Listener implements Connection {
  private static final String NPE_THROW_WITH_NULL = "throw with null exception";
  private final ConnectionPool connectionPool;
  private final Route route;

  // The fields below are initialized by connect() and never reassigned.

  /** The low-level TCP socket. */
  private Socket rawSocket;

  /**
   * The application layer socket. Either an {@link SSLSocket} layered over {@link #rawSocket}, or
   * {@link #rawSocket} itself if this connection does not use SSL.
   */
  private Socket socket;
  private Handshake handshake;
  private Protocol protocol;
  private Http2Connection http2Connection;
  private BufferedSource source;
  private BufferedSink sink;

  // The fields below track connection state and are guarded by connectionPool.

  /** If true, no new streams can be created on this connection. Once true this is always true. */
  public boolean noNewStreams;

  public int successCount;

  /**
   * The maximum number of concurrent streams that can be carried by this connection. If {@code
   * allocations.size() < allocationLimit} then new streams can be created on this connection.
   */
  public int allocationLimit = 1;

  /** Current streams carried by this connection. */
  public final List<Reference<StreamAllocation>> allocations = new ArrayList<>();

  /** Nanotime timestamp when {@code allocations.size()} reached zero. */
  public long idleAtNanos = Long.MAX_VALUE;
  ...
}
  • Route 表示的是与服务端建立的路径,其实内部封装了 AddressAddress 则是封装了请求的 URL。
  • rawSocket 对象代表底层的连接,还有一个 socket 是用于 Https, 对于普通的 Http 请求来说,这两个对象是一样的。 sourcesink 则是利用 Okio 封装 socket 得到的输入输出流。(如果想了解 Okio 的原理,可以参考我之前的文章:Okio 源码解析(一):数据读取流程
  • noNewStream 对象用于标识这个 Connection 不能再用于 Http 请求了,一旦设置为 true, 则不会再变。
  • allocationLimit 指的是这个 Connection 最多能同时承载几个 Http 流,对于 Http/1 来说只能是一个。
  • allocations 是一个 List 对象,里面保存着正在使用这个 ConnectionStreamAllocation 的弱引用,当 StreamAllocation 调用 acquire 时,便会将其弱引用加入这个 List,调用 release 则是移除引用。allocations 为空说明此 Connection 为闲置, ConnectionPool 利用这些信息来决定是否关闭这个连接。

connect

RealConnection 用于建立连接,里面有相应的 connect 方法:

public void connect(
      int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled) {
    ...
    while (true) {
      try {
        if (route.requiresTunnel()) {
          connectTunnel(connectTimeout, readTimeout, writeTimeout);
        } else {
          // 创建socket,建立连接
          connectSocket(connectTimeout, readTimeout);
        }
        // 建立
        establishProtocol(connectionSpecSelector);
        break;
      }
    ...
}

private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
    Proxy proxy = route.proxy();
    Address address = route.address();
    // 创建 socket
    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
        ? address.socketFactory().createSocket()
        : new Socket(proxy);

    rawSocket.setSoTimeout(readTimeout);
    try {
      // 建立连接,相当于调用 socket 的 connect 方法
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
      ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
      ce.initCause(e);
      throw ce;
    }
    
    try {
      // 获取输入输出流
      source = Okio.buffer(Okio.source(rawSocket));
      sink = Okio.buffer(Okio.sink(rawSocket));
    } catch (NullPointerException npe) {
      if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
        throw new IOException(npe);
      }
    }
}

如果不是 Https, 则调用 connectSocket,在内部创建 rawSocket 对象,设置超时时间。紧接着 Platform.get().connectSocket 根据不同的平台调用相应的 connect 方法,这样 rawSocket 就连接到服务端了。然后是用 Okio 封装 rawSocket 的输入输出流,这里的输入输出流最终是交给 HttpCodec 进行 Http 报文的写入都读取。通过以上步骤,就实现了 Http 请求的连接。

总结

本文从 RetryAndFollowupIntercept 中创建 StreamAllocation 对象,到 Connection 中创建 RealConnectionHttpCodec,分析了 OkHttp 建立连接的基本过程。可以看出, OkHttp 中的连接由
RealConnection 封装,Http 流的输入输出由 HttpCodec 操作,而 StreamAllocation 则统筹管理这些资源。在连接的寻找与创建过程,有个关键的东西是 ConnectionPool, 即连接池。它负责管理所有的 Connection,OkHttp 利用这个连接池进行 Connection 的重用以提高网络请求的效率。本文并没有详细分析 ConnectionPool ,相关内容可以参见下一篇: OkHttp源码解析(三):连接池

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,451评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,172评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,782评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,709评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,733评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,578评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,320评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,241评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,686评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,878评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,992评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,715评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,336评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,912评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,040评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,173评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,947评论 2 355

推荐阅读更多精彩内容