也谈Okhttp3

本文基于okhttp3:okhttp:3.10.0

一、简单用法

 val client: OkHttpClient? = OkHttpClient.Builder()
                    .addInterceptor({
                        val response = it.proceed(it.request())
                        val body = it.proceed(it.request()).body()
                        response
                    }).build()
             //创建请求
            val request = Request.Builder()
                    .url("url")
                    .get()
                    .build()
             //执行请求      
            val execute = client!!.newCall(request)
                    .execute()
            if (execute.isSuccessful) {
                //获取结果
                val body = execute.body()
                val result = body!!.string().toString()
            }

okhttpClient调用newCall()方法后会创建一个RealCall对象,进行网络请求时有两种方式供我们选择一种是同步方法execute(),一种是异步方法enqueue()异步方法。我们来看下这两个方法的实现:

1.同步方法:execute()
@Override public Response execute() throws IOException {
synchronized (this) {
  if (executed) throw new IllegalStateException("Already Executed");
  executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
try {
  //将当前请求添加到runningSyncCalls正在请求的队列中
  client.dispatcher().executed(this);
  //执行请求
  Response result = getResponseWithInterceptorChain();
  if (result == null) throw new IOException("Canceled");
  return result;
} catch (IOException e) {
  eventListener.callFailed(this, e);
  throw e;
} finally {
  client.dispatcher().finished(this);
}
}

getResponseWithInterceptorChain()方法在下面(二)中进行分析。

2.异步方法:enqueue()
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
  if (executed) throw new IllegalStateException("Already Executed");
  executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
 }

这里调用了Dispatcher的enqueue()方法:

synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
  runningAsyncCalls.add(call);
  executorService().execute(call);
} else {
  readyAsyncCalls.add(call);
}

}
当正在请求的队列中的数量超过最大请求数量(默认是64)并且不同域名的请求超过最大限制(默认为5)时,该请求加入到待执行列表等着执行;否则加入执行队列中并且执行该任务。这里用了一个线程池来执行任务,默认为:

xecutorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));

最后通过执行AsyncCall的execute方法发起请求,这里和同步方法差不多

@Override protected void execute() {
  boolean signalledCallback = false;
  try {
    Response response = getResponseWithInterceptorChain();
    if (retryAndFollowUpInterceptor.isCanceled()) {
      signalledCallback = true;
      responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
    } else {
      signalledCallback = true;
      responseCallback.onResponse(RealCall.this, response);
    }
  } catch (IOException e) {
    if (signalledCallback) {
      // Do not signal the callback twice!
      Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
    } else {
      eventListener.callFailed(RealCall.this, e);
      responseCallback.onFailure(RealCall.this, e);
    }
  } finally {
    client.dispatcher().finished(this);
  }
}

二、拦截器原理浅析

拦截器是okhttp中的一大亮点,可以随意的添加拦截器然后对请求进行加工及对返回结果进行处理。上面提到的getResponseWithInterceptorChain()就是为请求添加各种拦截器。

1.RealCall.getResponseWithInterceptorChain()
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
//添加自定义拦截器
interceptors.addAll(client.interceptors());
//添加错误重试拦截器
interceptors.add(retryAndFollowUpInterceptor);
//添加请求头拦截器
interceptors.add(new BridgeInterceptor(client.cookieJar()));
//添加缓存拦截器
interceptors.add(new CacheInterceptor(client.internalCache()));
//添加网络连接拦截器
interceptors.add(new ConnectInterceptor(client));
//如果不是webSocket请求则添加网络拦截器
if (!forWebSocket) {
  interceptors.addAll(client.networkInterceptors());
}
//添加执行请求的拦截器
interceptors.add(new CallServerInterceptor(forWebSocket));
//创建拦截器执行对象
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
    originalRequest, this, eventListener, client.connectTimeoutMillis(),
    client.readTimeoutMillis(), client.writeTimeoutMillis());
//执行拦截器proceed方法
return chain.proceed(originalRequest);
}

在这里进行拦截器的添加;拦截器的原理就是递归调用各个拦截器的方法。来看下RealInterceptorChain中的具体实现:

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
  RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
//判断原始请求的url是否发生变化
if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
  throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
      + " must retain the same host and port");
}
// network interceptor只能执行一次请求
if (this.httpCodec != null && calls > 1) {
  throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
      + " must call proceed() exactly once");
}
//获取拦截链中的下一个拦截器
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
    connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
    writeTimeout);
Interceptor interceptor = interceptors.get(index);
//依次调用各个拦截器
Response response = interceptor.intercept(next);

// Confirm that the next interceptor made its required call to chain.proceed().
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
  throw new IllegalStateException("network interceptor " + interceptor
      + " must call proceed() exactly once");
}
if (response == null) {
  throw new NullPointerException("interceptor " + interceptor + " returned null");
}
if (response.body() == null) {
  throw new IllegalStateException(
      "interceptor " + interceptor + " returned a response with no body");
}
return response;
}
2.重试实现RetryAndFollowUpInterceptor拦截器
@Override public Response intercept(Chain chain) throws IOException {
//重试机制 重试次数MAX_FOLLOW_UPS=20
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();

StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
    createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;

int followUpCount = 0;
Response priorResponse = null;
//进入循环当重试次数超过设置的值、返回成功则终止
while (true) {
  if (canceled) {
    streamAllocation.release();
    throw new IOException("Canceled");
  }

  Response response;
  boolean releaseConnection = true;
  try {
    //加工之后将继续执行下一个拦截器
    response = realChain.proceed(request, streamAllocation, null, null);
    releaseConnection = false;
  } catch (RouteException e) {
    // 如果是连接失败,则判断是否需要重试连接
    if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
      throw e.getLastConnectException();
    }
    releaseConnection = false;
    continue;
  } catch (IOException e) {
    // An attempt to communicate with a server failed. The request may have been sent.
    boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
    if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
    releaseConnection = false;
    continue;
  } finally {
    // We're throwing an unchecked exception. Release any resources.
    if (releaseConnection) {
      streamAllocation.streamFailed(null);
      streamAllocation.release();
    }
  }
  if (priorResponse != null) {
    response = response.newBuilder()
        .priorResponse(priorResponse.newBuilder()
                .body(null)
                .build())
        .build();
  }
  //判断请求是否成功,失败则返回request,成功则返回null
  Request followUp = followUpRequest(response, streamAllocation.route());
  //请求成功则结束该循环
  if (followUp == null) {
    if (!forWebSocket) {
      streamAllocation.release();
    }
    return response;
  }
  closeQuietly(response.body());
  //当重试了MAX_FOLLOW_UPS时终止循环
  if (++followUpCount > MAX_FOLLOW_UPS) {
    streamAllocation.release();
    throw new ProtocolException("Too many follow-up requests: " + followUpCount);
  }
 //省略部分代码
  request = followUp;
  priorResponse = response;
}
}
recover() 判断是否需要进行重新连接
 private boolean recover(IOException e, StreamAllocation streamAllocation,
  boolean requestSendStarted, Request userRequest) {
streamAllocation.streamFailed(e);

// 获取设置,如果设置了不自动重新连接则返回false
if (!client.retryOnConnectionFailure()) return false;

// 如果网络请求已经开始,并且body内容只可以发送一次
if (requestSendStarted && userRequest.body() instanceof UnrepeatableRequestBody) return false;

// 判断异常类型,是否要继续尝试。不会重试的类型:协议异常、Socketet异常并且网络情况还没开始,ssl认证异常
if (!isRecoverable(e, requestSendStarted)) return false;

// 已经没有其他可用的路由地址了
if (!streamAllocation.hasMoreRoutes()) return false;

// 如果以上条件都不满足 那么这个请求将会重试
return true;

}

3.缓存实现CacheInterceptor拦截器
@Override public Response intercept(Chain chain) throws IOException {
//判断是否有缓存
Response cacheCandidate = cache != null
    ? cache.get(chain.request())
    : null;
long now = System.currentTimeMillis();
//判断该请求是否可以从缓存中获取
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
if (cache != null) {
  cache.trackResponse(strategy);
}
if (cacheCandidate != null && cacheResponse == null) {
  closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}

// 如果禁止使用网络并且本地没有相应的缓存时,直接返回错误
if (networkRequest == null && cacheResponse == null) {
  return new Response.Builder()
      .request(chain.request())
      .protocol(Protocol.HTTP_1_1)
      .code(504)
      .message("Unsatisfiable Request (only-if-cached)")
      .body(Util.EMPTY_RESPONSE)
      .sentRequestAtMillis(-1L)
      .receivedResponseAtMillis(System.currentTimeMillis())
      .build();
}
//如果不需要网络则直接从缓存中获取返回
if (networkRequest == null) {
  return cacheResponse.newBuilder()
      .cacheResponse(stripBody(cacheResponse))
      .build();
}
Response networkResponse = null;
try {
  networkResponse = chain.proceed(networkRequest);
} finally {
  // If we're crashing on I/O or otherwise, don't leak the cache body.
  if (networkResponse == null && cacheCandidate != null) {
    closeQuietly(cacheCandidate.body());
  }
}
// 更新缓存
if (cacheResponse != null) {
  if (networkResponse.code() == HTTP_NOT_MODIFIED) {
    Response response = cacheResponse.newBuilder()
        .headers(combine(cacheResponse.headers(), networkResponse.headers()))
        .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
        .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();
    networkResponse.body().close();

    // Update the cache after combining headers but before stripping the
    // Content-Encoding header (as performed by initContentStream()).
    cache.trackConditionalCacheHit();
    cache.update(cacheResponse, response);
    return response;
  } else {
    closeQuietly(cacheResponse.body());
  }
}

Response response = networkResponse.newBuilder()
    .cacheResponse(stripBody(cacheResponse))
    .networkResponse(stripBody(networkResponse))
    .build();

if (cache != null) {
  //判断是否需要缓存(当方法为HEAD时则不缓存,当请求结果成功时判断response.cacheControl().noStore() && !request.cacheControl().noStore())
  if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
    // 将缓存写入本地
    CacheRequest cacheRequest = cache.put(response);
    return cacheWritingResponse(cacheRequest, response);
  }
  //如果该请求不在是get请求时则删除本地缓存
  if (HttpMethod.invalidatesCache(networkRequest.method())) {
    try {
      cache.remove(networkRequest);
    } catch (IOException ignored) {
      // The cache cannot be written.
    }
  }
}
return response;
}

okhttp为我们提个一个缓存策略Cache类,设置相应的请求头和相应体来进行缓存,内部使用的是LruCache算法是实现本地缓存的更新。我们这里具体看下使用缓存的条件:

3.1 使用缓存

new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get()

private CacheStrategy getCandidate() {
  // 本地没有相应的缓存,请求网络
  if (cacheResponse == null) {
    return new CacheStrategy(request, null);
  }
 //当请求为https并且TLS handshake为空时,请求网络
  if (request.isHttps() && cacheResponse.handshake() == null) {
    return new CacheStrategy(request, null);
  }
  //当请求不使用缓存时,请求网络
  if (!isCacheable(cacheResponse, request)) {
    return new CacheStrategy(request, null);
  }
    //当请求不使用缓存时,请求网络
  CacheControl requestCaching = request.cacheControl();
  if (requestCaching.noCache() || hasConditions(request)) {
    return new CacheStrategy(request, null);
  }
 //当本地缓存存在且可用时,返回缓存
  CacheControl responseCaching = cacheResponse.cacheControl();
  if (responseCaching.immutable()) {
    return new CacheStrategy(null, cacheResponse);
  }
  //定义缓存策略(设置相应请求头)
  //省略部分代码
}

三、连接复用机制(连接池)

大家知道,一个Http请求除了真正的数据传输,还需要花费一段时间在TCP连接的建立上。如果每次网络请求都要去发起一个TCP连接,那显然会非常影响网络请求速度,尤其是当多数网络请求都是连接同一个远程地址时,反复建立连接显得非常浪费。

而Http 1.1已经提供了Keep-Alive机制,也就是在数据传输完毕后,仍然会保留这条连接一段时间,下次无需再次握手即可进行数据传输。

而在OkHttp内部,也维护了一个Socket连接池,里面存放了5个并发Keep-Alive的Socket连接,每一个连接存活5分钟。当你需要建立连接时,先来池子里查一下,如果有可用的连接,则直接复用即可。

如何维护这个Socket连接池?

1、初始化
在创建OKhttpClient的时候就初始化

//默认连接池中的最大连接数为5、连接空闲的最长时间为5分钟
  public ConnectionPool() {
    this(5, 5, TimeUnit.MINUTES);
  }

  public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.maxIdleConnections = maxIdleConnections;
    this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);

    // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
    if (keepAliveDuration <= 0) {
      throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
    }
  }

2、添加Socket连接

void put(RealConnection connection) {
    //这里断言当前线程拥有ConnectionPool对象锁,因为ConnectionPool对象加锁了所以是线程安全的
    assert (Thread.holdsLock(this));
    //判断当前清除操作是否正在执行
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }

每次进行添加动作时判断该连接池是否正在执行清理工作,如果没有则进行清理操作

3、清理Socket连接池

 private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
         // 当前连接池没有连接时返回并且设置当前没有进行清除连接操作
        if (waitNanos == -1) return;
        //下一次自动执行清除连接操作时间
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };

  //如果某个连接闲置了超过5分钟或当前链接池中超过了5个连接时则清理了连接,如果没有要清理的连接,则返回下一次需要执行清理的时间
  long cleanup(long now) {
       //正在使用的连接数
      int inUseConnectionCount = 0;
      //闲置的连接数
      int idleConnectionCount = 0;
       //最长闲置时间的连接
      RealConnection longestIdleConnection = null;
      //闲置的连接最长闲置时间
      long longestIdleDurationNs = Long.MIN_VALUE;

      // Find either a connection to evict, or the time that the next eviction is due.
      synchronized (this) {
        for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
          RealConnection connection = i.next();

          // 如果当前连接正在使用中
          if (pruneAndGetAllocationCount(connection, now) > 0) {
          //正在使用的连接数+1
            inUseConnectionCount++;
            continue;
          }
          //闲置的连接数+1
          idleConnectionCount++;

          // 寻找出闲置时间最长的连接
          long idleDurationNs = now - connection.idleAtNanos;
          if (idleDurationNs > longestIdleDurationNs) {
            longestIdleDurationNs = idleDurationNs;
            longestIdleConnection = connection;
          }
        }
        //如果某个连接闲置了超过5分钟或当前链接池中超过了5个连接时则清理了连接,
        if (longestIdleDurationNs >= this.keepAliveDurationNs
            || idleConnectionCount > this.maxIdleConnections) {
          // We've found a connection to evict. Remove it from the list, then close it below (outside
          // of the synchronized block).
          connections.remove(longestIdleConnection);
        } else if (idleConnectionCount > 0) {
          // 当闲置连接数不为0时,返回下一次需要执行清理的时间
          return keepAliveDurationNs - longestIdleDurationNs;
        } else if (inUseConnectionCount > 0) {
          // 当连接数量不超过5并且没有闲置的连接时,则返回默认的闲置时间
          return keepAliveDurationNs;
        } else {
          // 当前连接池没有连接时返回-1
          cleanupRunning = false;
          return -1;
        }
      }
      //关闭不用的socket连接
      closeQuietly(longestIdleConnection.socket());

      //返回0时需要在一次的进行判断执行清理操作
      return 0;
    }

总结连接清理机制:当第一个连接添加至连接池中时,清理连接的线程就开始进行清理操作,当连接池中有连接时则该线程会一直运行,在清理过程中会查找出闲置最长时间的连接以及下一次执行清理操作的时间,此时该线程会进入等待状态。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容

  • 这篇文章主要讲 Android 网络请求时所使用到的各个请求库的关系,以及 OkHttp3 的介绍。(如理解有误,...
    小庄bb阅读 1,154评论 0 4
  • 这段时间老李的新公司要更换网络层,知道现在主流网络层的模式是RxJava+Retrofit+OKHttp,所以老李...
    隔壁老李头阅读 32,768评论 51 406
  • 参考资源 官网 国内博客 GitHub官网 鉴于一些关于OKHttp3源码的解析文档过于碎片化,本文系统的,由浅入...
    风骨依存阅读 12,499评论 11 82
  • 这段时间又遇到新问题,四轴在解锁起飞的时候会死机重启,甚至连烧录好的代码都丢失了。 换了MPU,换了场管都没有作用...
    沧海一声笑的DIY宇宙阅读 767评论 0 0
  • 【感悟】 1、有几条不变的铁律 早上刚被闹钟叫醒 总是觉得离不开床。其实只要立即起床,然后也就没有离不开床这一念头...
    i期待阅读 267评论 0 0