OkHttp4的源码分析(三)

1、Interceptor

InterceptorsOkHttp整个框架的核心,包含了请求监控、请求重写、调用重试等机制。它主要使用责任链模式,解决请求与请求处理之间的耦合。

1.1 责任链模式

将接收对象放入链中,按照链中顺序让多个对象处理请求。请求者不用知道具体是由谁处理。解决请求与接收之间的耦合,提高灵活性。 责任链负责对请求参数的解析,所有的扩展都是针对链中节点进行扩展。

各个拦截器之间的工作关系
OkHttp链式流程.jpg

PS:RetryAndFollowUpInterceptor前面一个拦截器是用户自定义的,networkinterceptor也是自定义。用于拦截被最纯净的request,和response

1.2 Interceptor内部

 fun intercept(chain: Chain): Response

  companion object {
    /**
     * Constructs an interceptor for a lambda. This compact syntax is most useful for inline
     * interceptors.
     *
     * ```
     * val interceptor = Interceptor { chain: Interceptor.Chain ->
     *     chain.proceed(chain.request())
     * }
     * ```
     */
    inline operator fun invoke(crossinline block: (chain: Chain) -> Response): Interceptor =
      Interceptor { block(it) }
  }

定义了一个intercept方法用于实现拦截器的具体业务

  interface Chain {
    fun request(): Request

    @Throws(IOException::class)
    fun proceed(request: Request): Response

    /**
     * Returns the connection the request will be executed on. This is only available in the chains
     * of network interceptors; for application interceptors this is always null.
     */
    fun connection(): Connection?

    fun call(): Call

    fun connectTimeoutMillis(): Int

    fun withConnectTimeout(timeout: Int, unit: TimeUnit): Chain

    fun readTimeoutMillis(): Int

    fun withReadTimeout(timeout: Int, unit: TimeUnit): Chain

    fun writeTimeoutMillis(): Int

    fun withWriteTimeout(timeout: Int, unit: TimeUnit): Chain
  }

定义了一个链的接口,用来让整个拦截器链运行起来。

2、getResponseWithInterceptorChain()

上一章我们说到,无论是同步请求,还是异步请求,最后的Response都会通过getResponseWithInterceptorChain()来获取

 internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)

    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try {
      val response = chain.proceed(originalRequest)
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
  }

首先,定义了一个interceptors集合,并却依次将RetryAndFollowUpInterceptorBridgeInterceptorCacheInterceptorConnectInterceptor放入。然后创建了一个chain:RealInterceptorChain,设置其索引归零,初始请求,连接时间参数等等。

 val response = chain.proceed(originalRequest)

由chain的proceed()方法,开始第一个拦截器的业务、

2.1 RetryAndFollowUpInterceptor

“重试和重定向拦截器”,看名字可以看出,该拦截器的主要是负责请求失败的时候重新尝试的。但是该拦截器还有一个很关键的前置工作

 call.enterNetworkInterceptorExchange(request, newExchangeFinder)

request是原始的请求,newExchangeFinder是一个Boolean参数,从最开始的业务进去,这个Boolean类型是true的。

  fun enterNetworkInterceptorExchange(request: Request, newExchangeFinder: Boolean) {
    check(interceptorScopedExchange == null)

    synchronized(this) {
      check(!responseBodyOpen) {
        "cannot make a new request because the previous response is still open: " +
            "please call response.close()"
      }
      check(!requestBodyOpen)
    }

    if (newExchangeFinder) {
      this.exchangeFinder = ExchangeFinder(
          connectionPool,
          createAddress(request.url),
          this,
          eventListener
      )
    }
  }

也就是说,RetryAndFollowUpInterceptor前置工作,在exchangeFinder 为空的前提下,会先创建一个exchangeFinder 。这个是进行连接的准备工作。

 while (true) {
      call.enterNetworkInterceptorExchange(request, newExchangeFinder)

      var response: Response
      var closeActiveExchange = true
      try {
        if (call.isCanceled()) {
          throw IOException("Canceled")
        }

        try {
          response = realChain.proceed(request)
          newExchangeFinder = true
        } catch (e: RouteException) {
          // The attempt to connect via a route failed. The request will not have been sent.
          if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
            throw e.firstConnectException.withSuppressed(recoveredFailures)
          } else {
            recoveredFailures += e.firstConnectException
          }
          newExchangeFinder = false
          continue
        } catch (e: IOException) {
          // An attempt to communicate with a server failed. The request may have been sent.
          if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
            throw e.withSuppressed(recoveredFailures)
          } else {
            recoveredFailures += e
          }
          newExchangeFinder = false
          continue
        }

        // Attach the prior response if it exists. Such responses never have a body.
        if (priorResponse != null) {
          response = response.newBuilder()
              .priorResponse(priorResponse.newBuilder()
                  .body(null)
                  .build())
              .build()
        }

        val exchange = call.interceptorScopedExchange
        val followUp = followUpRequest(response, exchange)

        if (followUp == null) {
          if (exchange != null && exchange.isDuplex) {
            call.timeoutEarlyExit()
          }
          closeActiveExchange = false
          return response
        }

        val followUpBody = followUp.body
        if (followUpBody != null && followUpBody.isOneShot()) {
          closeActiveExchange = false
          return response
        }

        response.body?.closeQuietly()

        if (++followUpCount > MAX_FOLLOW_UPS) {
          throw ProtocolException("Too many follow-up requests: $followUpCount")
        }

        request = followUp
        priorResponse = response
      } finally {
        call.exitNetworkInterceptorExchange(closeActiveExchange)
      }
    }

可以看到,在 response = realChain.proceed(request) 之后,Interceptor对错误检查,通过recover()的方法,把不能重连的错误筛选出来,再确定能过重连或者重定向的时候,通过while循环,重新开始责任链的业务。

2.2 BridgeInterceptor

这个Interceptor主要负责对请求信息的加工,比如加"Content-Length"啊,"Host"啊等等,后面收到response同样是对这些信息处理,去除多余的文本。这里就不对这个拦截器做过多的解释。

2.3 CacheInterceptor

同样的,这个Interceptor是负责做缓存处理的,代码相对独立,以后如果有时间再更新。

2.4 ConnectInterceptor

ConnectInterceptor,真正的发起连接的拦截器。

object ConnectInterceptor : Interceptor {
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.call.initExchange(chain)
    val connectedChain = realChain.copy(exchange = exchange)
    return connectedChain.proceed(realChain.request)
  }
}

代码看起来很简单,但是一点都不简单。

  internal fun initExchange(chain: RealInterceptorChain): Exchange {
    synchronized(this) {
      check(expectMoreExchanges) { "released" }
      check(!responseBodyOpen)
      check(!requestBodyOpen)
    }

    val exchangeFinder = this.exchangeFinder!!
    val codec = exchangeFinder.find(client, chain)
    val result = Exchange(this, eventListener, exchangeFinder, codec)
    this.interceptorScopedExchange = result
    this.exchange = result
    synchronized(this) {
      this.requestBodyOpen = true
      this.responseBodyOpen = true
    }

    if (canceled) throw IOException("Canceled")
    return result
  }

  fun acquireConnectionNoEvents(connection: RealConnection) {
    connection.assertThreadHoldsLock()

    check(this.connection == null)
    this.connection = connection
    connection.calls.add(CallReference(this, callStackTrace))
  }

通过initExchange(chain)方法 创建一个带连接的codec(coder & decoder编码解码器),然后通过Exchange()进行流操作。PS:codec为读写工具,Exchange为读写管理员

 fun find(
    client: OkHttpClient,
    chain: RealInterceptorChain
  ): ExchangeCodec {
    try {
      val resultConnection = findHealthyConnection(
          connectTimeout = chain.connectTimeoutMillis,
          readTimeout = chain.readTimeoutMillis,
          writeTimeout = chain.writeTimeoutMillis,
          pingIntervalMillis = client.pingIntervalMillis,
          connectionRetryEnabled = client.retryOnConnectionFailure,
          doExtensiveHealthChecks = chain.request.method != "GET"
      )
      return resultConnection.newCodec(client, chain)
    } catch (e: RouteException) {
      trackFailure(e.lastConnectException)
      throw e
    } catch (e: IOException) {
      trackFailure(e)
      throw RouteException(e)
    }

创建一个可用的Codec有两步,一通过findHealthyConnection()寻找一个可用连接,二通过newCodec()创建一个可用的Codec。(也就是到底是通过http1还是http2来读写)

  val candidate = findConnection(
          connectTimeout = connectTimeout,
          readTimeout = readTimeout,
          writeTimeout = writeTimeout,
          pingIntervalMillis = pingIntervalMillis,
          connectionRetryEnabled = connectionRetryEnabled
      )

findHealthyConnection()通过findConnection()寻找连接,然后检查连接是否健康,如果不健康就再取,健康就跳出while循环

2.4.1 findConnection()

  // 我们需要一个新的连接。给它标记新的状态。
    refusedStreamCount = 0
    connectionShutdownCount = 0
    otherFailureCount = 0

    // 尝试从池中获取连接。
    if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
      val result = call.connection!!
      eventListener.connectionAcquired(call, result)
      return result

OkHttp是有连接池的。所有优先考虑从连接池中取资源,避免浪费。通过
callAcquirePooledConnection()尝试取连接

 fun callAcquirePooledConnection(
    address: Address,
    call: RealCall,
    routes: List<Route>?,
    requireMultiplexed: Boolean
  ): Boolean {
    for (connection in connections) {
      synchronized(connection) {
        if (requireMultiplexed && !connection.isMultiplexed) return@synchronized
        if (!connection.isEligible(address, routes)) return@synchronized
        call.acquireConnectionNoEvents(connection)
        return true
      }
    }
    return false
  }

第一个if判断是否可以多路复用,第一次从池中获取,requireMultiplexed为false
第二个if判断连接是否可以的。 call.acquireConnectionNoEvents(connection)将连接给到当前call。isEligible()会比对地址端口等等,会拿http2连接合并的连接,比对证书等等。

如果第一次从池中没有拿到那就再拿一次

connectionPool.callAcquirePooledConnection(address, call, routes, false)

注意,这次的请求是加上了routes的属性,这里可以拿到更加广泛的连接,比如http2的连接合并过的连接,这里是之前提到的isEligible()的功能之一,这里必须是有routes,才会尝试拿http2的连接合并过的连接,详细业务可以看isEligible()内。

如果这次也没有拿到连接,这时候findConnection()会尝试建立连接。

    val newConnection = RealConnection(connectionPool, route)
    call.connectionToCancel = newConnection
    try {
      newConnection.connect(
          connectTimeout,
          readTimeout,
          writeTimeout,
          pingIntervalMillis,
          connectionRetryEnabled,
          call,
          eventListener
      )
    } finally {
      call.connectionToCancel = null
    }

注意,此次创建connect后不会直接建立连接!而是会再一次callAcquirePooledConnection() 因为存在一种极端情况,同时访问同一域名下得不同地址,如 https://xxxx.com/1https://xxxx.com/2,如果同时建立两个连接会造成资源浪费,所以,这种情况下,谁先创建好connect并却放入连接池的,就用谁的连接。同时此次的callAcquirePooledConnection()是只拿多路复用的连接。

以上的操作,都是在call没有connection的情况下发生的。还有一种可能,call已经有connection的,经历重定向,或者重现,又来到了这里。这个时候

  val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
    if (callConnection != null) {
      var toClose: Socket? = null
      synchronized(callConnection) {
        if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
          toClose = call.releaseConnectionNoEvents()
        }
      }

如果connection 不适合这次新请求,那就丢弃连接

 if (call.connection != null) {
        check(toClose == null)
        return callConnection
      }

如果connection 适合这次新请求

综上所述,整个连接所需要的东西都在ExchangeFinder里面做处理,所以这个时候回归RetryAndFollowUpInterceptor可以发现,如果在做重连时,newExchangeFinder的值为false,直接复用上次可用的exchangeFinder。

connect()

看完了OKHTTP的连接策略,我们再来看连接本身的的业务

   newConnection.connect(
          connectTimeout,
          readTimeout,
          writeTimeout,
          pingIntervalMillis,
          connectionRetryEnabled,
          call,
          eventListener
      )

可以看的到connect()方法,提供了Tunnel()(http代理https)方法,connectSocket()方法建立TCP连接,establishProtocol()方法建立HTTP连接,connectTls()建立TLS连接、如果要做不校验证书的HTTPS功能,主要就是修改这里的逻辑。

2.5 ConnectInterceptor

CallServerInterceptor最后一个拦截器,实际工作就是操控你的ExChange进行对读写工作,如果有response,就从最后一个拦截器开始往第一个拦截器走,并却做拦截器的后置工作。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 226,097评论 6 523
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 97,198评论 3 410
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 173,602评论 0 370
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 61,750评论 1 304
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 70,684评论 6 404
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 54,151评论 1 317
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 42,349评论 3 433
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 41,430评论 0 282
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 47,986评论 1 328
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 39,969评论 3 351
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 42,071评论 1 359
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 37,667评论 5 352
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 43,358评论 3 342
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 33,757评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 34,944评论 1 278
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 50,684评论 3 384
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 47,123评论 2 368

推荐阅读更多精彩内容