okhttp3.6.0源码分析4——连接池

okhttp3.6.0源码分析系列文章整体内容如下:

前言

本章关注的是连接复用。
在http1.0之前的协议每一次请求都进行tcp三次握手与四次分手,假如用户访问一个复杂的网络,从发出请求,到得到响应要进行多次串行的http请求,就会造成很严重的延时。因此在http采用了一种叫keep-alive的机制。它可以在传输数据后仍保持连接,当客户端想要进行数据传输的时候可以重用上一个连接,而不需要重新创建新的连接。


1 连接池

1.1 初始化

在我们通过调用Okhttp.Build的默认构造器来使用okhttp默认配置的时候,在Build的默认构造器中会构建一个ConnectionPool:

connectionPool = new ConnectionPool();

在调用ConnectionPool的构造器之前,会先初始化ConnectionPool的静态变量和实例变量,先看一下它比较重要的属性:

private final Deque<RealConnection> connections = new ArrayDeque<>(); //双向队列,双端队列同时具有队列和栈性质,经常在缓存中被使用,里面维护了RealConnection也就是socket物理连接的包装。
private final int maxIdleConnections; //连接池最大空闲连接数量
private final long keepAliveDurationNs; //空闲连接最大存活时间
final RouteDatabase routeDatabase = new RouteDatabase(); //它用来记录连接失败的Route的黑名单,当连接失败的时候就会把失败的线路加进去
boolean cleanupRunning; //标记连接池回收器任务状态,默认是false

进入ConnectPool内部:

public ConnectionPool() {
   // 连接池最多包含5个连接(空闲+工作),空闲连接的最大存活时间是5分钟。
    this(5, 5, TimeUnit.MINUTES);
  }

  public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.maxIdleConnections = maxIdleConnections;
    this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);

    // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
    if (keepAliveDuration <= 0) {
      throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
    }
  }

会构建一个最多包含5个连接,控线连接最大存活时间为5分钟。

1.2 连接池第一次被使用

在ConnectInterceptor中我们会构建一个stream,也就是HttpCodec的实例。这时候会执行streamAllocation.newStream(client, doExtensiveHealthChecks);方法。

public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
    int connectTimeout = client.connectTimeoutMillis();
    int readTimeout = client.readTimeoutMillis();
    int writeTimeout = client.writeTimeoutMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
      HttpCodec resultCodec = resultConnection.newCodec(client, this);

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
  }

可以看出来它会先调用findHealthyConnection获取一个可用的connection,具体逻辑是由findConnection实现的。

    Route selectedRoute;
    synchronized (connectionPool) {
      if (released) throw new IllegalStateException("released");
      if (codec != null) throw new IllegalStateException("codec != null");
      if (canceled) throw new IOException("Canceled");

      // Attempt to use an already-allocated connection.
      RealConnection allocatedConnection = this.connection;
      if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
        return allocatedConnection;
      }

      // Attempt to get a connection from the pool.
      Internal.instance.get(connectionPool, address, this);
      if (connection != null) {
        return connection;
      }

      selectedRoute = route;
    }

    // If we need a route, make one. This is a blocking operation.
    if (selectedRoute == null) {
      selectedRoute = routeSelector.next();
    }

    // Create a connection and assign it to this allocation immediately. This makes it possible for
    // an asynchronous cancel() to interrupt the handshake we're about to do.
    RealConnection result;
    synchronized (connectionPool) {
      route = selectedRoute;
      refusedStreamCount = 0;
      result = new RealConnection(connectionPool, selectedRoute);
      acquire(result);
      if (canceled) throw new IOException("Canceled");
    }

    // Do TCP + TLS handshakes. This is a blocking operation.
    result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
    routeDatabase().connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      // Pool the connection.
      Internal.instance.put(connectionPool, result);

      // If another multiplexed connection to the same address was created concurrently, then
      // release this connection and acquire that one.
      if (result.isMultiplexed()) {
        socket = Internal.instance.deduplicate(connectionPool, address, this);
        result = connection;
      }
    }
    closeQuietly(socket);

    return result;
  1. 首先会找一个和streamAllocation绑定的并且可以传输stream的connection,找到了就直接返回这个connection
  2. 如果没有找到,会从连接池里面取出来一个可用的connection
  3. 如过连接池里面没有可用的connection,那么就会创建一个新的connection,并把它放到连接池里面。

1.3 往连接池添加connection

void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }

如果connection回收器没有在运行,则启动回收器,然后将connection添加到connections 队列即连接池。

1.4 从连接池取connection

根据上面操作连接池的顺序,我们先看怎么从连接池取connection。
findConnection里面调用的Internal.instance.get(connectionPool, address, this);它的具体实现是ConnectionPool里面的RealConnection get(Address address, StreamAllocation streamAllocation)方法。

  RealConnection get(Address address, StreamAllocation streamAllocation) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.isEligible(address)) {
        streamAllocation.acquire(connection);
        return connection;
      }
    }
    return null;
  }
  1. 遍历连接池,找到一个可以用的connection
  2. 如果这个connection是可用的,那就增加Connection中的List<WeakReference<StreamAllocation>>大小。List中Allocation的数量也就是物理socket被引用的计数(Refference Count),如果计数为0的话,说明此连接没有被使用,是空闲的
  3. 返回connection

1.5 connection重用

1.6 清空连接池
清空连接池里面空闲的socket

public void evictAll() {
    List<RealConnection> evictedConnections = new ArrayList<>();
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();
        if (connection.allocations.isEmpty()) {
          connection.noNewStreams = true;
          evictedConnections.add(connection);
          i.remove();
        }
      }
    }

    for (RealConnection connection : evictedConnections) {
      closeQuietly(connection.socket());
    }
  }

遍历线程池里面所有的connection,对于没有传输stream的connection

  1. 那么就将这个connection标记为不能再传输stream。
  2. 将这个connection添加到evictedConnections这个队列里面。
  3. 删除这个connection的引用从原来的连接池里面删除掉

1.6 Connection自动回收

  void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }

在第一次往连接池里面添加connection的时候,会检查cleanupRunning的状态,因为默认是false,所以会将cleanupRunning状态设置为true,同时会将cleanupRunnable提交到一个线程池里面。
下面看一下cleanupRunnable 的实现

private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };

上面的代码做的事情是定时执行自动回收逻辑。执行connection回收逻辑的方法是cleanup(System.nanoTime());的源码如下:

long cleanup(long now) {
    int inUseConnectionCount = 0;
    int idleConnectionCount = 0;
    RealConnection longestIdleConnection = null;
    long longestIdleDurationNs = Long.MIN_VALUE;

    // Find either a connection to evict, or the time that the next eviction is due.
    synchronized (this) {
      //遍历连接池,找出空闲时间最长的那个空闲连接
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();

        // If the connection is in use, keep searching.
        if (pruneAndGetAllocationCount(connection, now) > 0) {
          inUseConnectionCount++;
          continue;
        }
        ///并统计空闲连接数
        idleConnectionCount++;

        // 空闲线程空闲时间
        long idleDurationNs = now - connection.idleAtNanos;
        //大于最长空闲时间
        if (idleDurationNs > longestIdleDurationNs) {
          longestIdleDurationNs = idleDurationNs;
          longestIdleConnection = connection;
        }
      }
      //如果最大空闲时间大于5分钟或者最大空闲线程数大于5
      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {
        // 将该连接从连接队列删除
        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {
        // 空闲数目大于0,则返回下次执行清理程序的时间为最大存活时间与最大空闲时间之差
        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {
        //如果在使用中的连接数目大于0,并且没有空闲连接数目则5分钟之后在执行清理逻辑
        return keepAliveDurationNs;
      } else {
        // 如果连接池里面没有连接,则将定时清理标志置为false,表示会退出自动清理逻辑
        cleanupRunning = false;
        return -1;
      }
    }
    
    //关闭一个最长时间空闲socket
    closeQuietly(longestIdleConnection.socket());

    // 表示会立即执行清理逻辑
    return 0;
  }

遍历连接池

  1. 如果该连接被引用的次数为0,则表示它是空闲连接,并统计空闲连接数,并记录空闲线程空闲时间
  2. 如果最大空闲时间大于5分钟或者最大空闲线程数大于5,将该连接从连接队列删除,并再次立即执行清理逻辑
  3. 空闲数目大于0,则返回下次执行清理程序的时间为最大存活时间与最大空闲时间之差
  4. 如果在使用中的连接数目大于0,并且没有空闲连接数目则5分钟之后在执行清理逻辑
  5. 如果连接池里面没有连接,则将定时清理标志置为false,表示会退出自动清理逻辑

下面看一下pruneAndGetAllocationCount(connection, now)的源码:

private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    List<Reference<StreamAllocation>> references = connection.allocations;
    for (int i = 0; i < references.size(); ) {
      Reference<StreamAllocation> reference = references.get(i);

      if (reference.get() != null) {
        i++;
        continue;
      }

      // We've discovered a leaked allocation. This is an application bug.
      StreamAllocation.StreamAllocationReference streamAllocRef =
          (StreamAllocation.StreamAllocationReference) reference;
      String message = "A connection to " + connection.route().address().url()
          + " was leaked. Did you forget to close a response body?";
      Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);

      references.remove(i);
      connection.noNewStreams = true;

      // If this was the last allocation, the connection is eligible for immediate eviction.
      if (references.isEmpty()) {
        connection.idleAtNanos = now - keepAliveDurationNs;
        return 0;
      }
    }

    return references.size();
  }
  1. 记录connection被引用次数
  2. 如果分配的stream为null,则把这个connection的空闲时间设置为当前时间与5分钟之差,会优先被清除掉。

遍历准备清空掉的connection的集合,将里面的socket调用socket.close();全部关闭掉
参考:
Android网络编程(八)源码解析OkHttp中篇[复用连接池]

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,542评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,596评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,021评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,682评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,792评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,985评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,107评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,845评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,299评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,612评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,747评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,441评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,072评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,828评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,069评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,545评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,658评论 2 350