This article analyzes the following version:
```xml
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.4.1</version>
</dependency>
```
All code below has been trimmed where necessary.
Request execution flow
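When we call HttpClient's execute() to access an endpoint, the call is passed along a chain of responsibility (a chain of ClientExecChain implementations) until it reaches MainClientExec.execute():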
```java
public CloseableHttpResponse execute(
        final HttpRoute route,
        final HttpRequestWrapper request,
        final HttpClientContext context,
        final HttpExecutionAware execAware) throws IOException, HttpException {
    final AuthState proxyAuthState = context.getProxyAuthState();
    // 1. Get a connection from the pool
    Object userToken = context.getUserToken();
    final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);
    final RequestConfig config = context.getRequestConfig();
    final HttpClientConnection managedConn;
    // The ConnectionRequestTimeout setting is applied here
    final int timeout = config.getConnectionRequestTimeout();
    managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
    context.setAttribute(HttpCoreContext.HTTP_CONNECTION, managedConn);
    // Second setting: check whether the pooled connection is stale
    if (config.isStaleConnectionCheckEnabled()) {
        // validate connection
        if (managedConn.isOpen()) {
            this.log.debug("Stale connection check");
            if (managedConn.isStale()) {
                this.log.debug("Stale connection detected");
                managedConn.close();
            }
        }
    }
    final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);
    try {
        HttpResponse response;
        for (int execCount = 1;; execCount++) {
            if (!managedConn.isOpen()) { // no socket bound yet
                // We already hold a connection object; now it has to be bound to a socket
                this.log.debug("Opening connection " + route);
                // Opens the TCP connection and binds the socket to managedConn
                establishRoute(proxyAuthState, managedConn, route, request, context);
            }
            // Before talking to the server, apply the socket timeout (SO_TIMEOUT)
            final int socketTimeout = config.getSocketTimeout();
            if (socketTimeout >= 0) {
                managedConn.setSocketTimeout(socketTimeout);
            }
            // 2. Actually send the request and read the response
            response = requestExecutor.execute(request, managedConn, context);
            // The connection is in or can be brought to a re-usable state.
            if (reuseStrategy.keepAlive(response, context)) {
                final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
                connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);
                // This determines what releaseConnection() does later
                connHolder.markReusable();
            } else {
                connHolder.markNonReusable();
            }
            // (authentication retries trimmed; without a retry the loop ends here)
            break;
        }
        // check for entity, release connection if possible
        final HttpEntity entity = response.getEntity();
        if (entity == null || !entity.isStreaming()) {
            // connection not needed and (assumed to be) in re-usable state
            connHolder.releaseConnection();
            return new HttpResponseProxy(response, null);
        } else {
            return new HttpResponseProxy(response, connHolder);
        }
    } catch (...) {
        ...
    }
}
```
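In short, the flow above is:
- Get a connection from the connection pool (with various validity checks);
- Send the request over that connection;
- Based on the returned response, set some parameters, such as keep-alive;
- Release the connection and return the response. If the response has a streaming entity, the connection stays leased until the entity has been consumed or the response closed.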
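To make the last step concrete, here is a toy Java model of the entity check at the end of execute(). All names (ReleaseTimingSketch, Holder, ResponseProxy, finish) are hypothetical stand-ins for ConnectionHolder and HttpResponseProxy, not HttpClient's API. The sketch only models *when* the connection stops being leased, not whether the pool reuses it afterwards.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model with hypothetical names (not HttpClient's classes): it shows when the
// connection leaves the "leased" state, not whether it is reused afterwards.
class ReleaseTimingSketch {
    /** Stand-in for ConnectionHolder: the connection is released at most once. */
    static final class Holder {
        private final List<String> log;
        private boolean released;

        Holder(List<String> log) {
            this.log = log;
        }

        void releaseConnection() {
            if (released) {
                return;
            }
            released = true;
            log.add("released");
        }
    }

    /** Stand-in for HttpResponseProxy: closing it releases a still-held connection. */
    static final class ResponseProxy implements AutoCloseable {
        private final Holder holder; // null if the connection was already released

        ResponseProxy(Holder holder) {
            this.holder = holder;
        }

        @Override
        public void close() {
            if (holder != null) {
                holder.releaseConnection();
            }
        }
    }

    /** Mirrors the "check for entity" branch at the end of execute(). */
    static ResponseProxy finish(boolean streamingEntity, List<String> log) {
        Holder holder = new Holder(log);
        if (!streamingEntity) {
            holder.releaseConnection();   // nothing left to read: release right away
            return new ResponseProxy(null);
        }
        return new ResponseProxy(holder); // the caller must consume/close the response
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        try (ResponseProxy response = finish(true, log)) {
            System.out.println("while reading: " + log);
        }
        System.out.println("after close:   " + log);
    }
}
```

Running main prints `while reading: []` and then `after close:   [released]`: with a streaming entity, the connection stays leased until the caller closes the response, which is why responses should always be closed (or their entities consumed).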
Getting a connection from the pool
Most readers have probably used pooling before, for example thread pools or JDBC connection pools (DataSource). Let's look at how HttpClient's connection pool works.
```java
// PoolingHttpClientConnectionManager.java
public ConnectionRequest requestConnection(
        final HttpRoute route,
        final Object state) {
    // The real work happens here
    final Future<CPoolEntry> future = this.pool.lease(route, state, null);
    return new ConnectionRequest() {
        @Override
        public boolean cancel() {
            return future.cancel(true);
        }

        @Override
        public HttpClientConnection get(
                final long timeout,
                final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
            return leaseConnection(future, timeout, tunit);
        }
    };
}
```
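The value returned by lease() is actually a custom Future whose get() calls getPoolEntryBlocking(). Before looking at that code, it helps to explain what the collections it uses are for, as shown in the figure below: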
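The HttpClient pool (let's call it HPool) maintains multiple route-specific pools (let's call them UPool). Each route (roughly, each target host and port; loosely, each URL) has its own pool, and the connections of different colors in the figure can be read as connections created for different URLs. The collections mean:
- leased: all connections currently lent out;
- available: idle connections that can be reused;
- connection pool: the pool for each route (URL);
- pending: the queue of waiting threads;
In the code, allocatedCount (RouteSpecificPool.getAllocatedCount()) is the sum of leased and available.
A UPool has essentially the same structure as the HPool, but holds only the connections of its own route; whenever a thread requests a connection, the lookup happens in one specific UPool. The leased, available and pending collections in the HPool are global views, used for bookkeeping and for enforcing the total limit (maxTotal).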
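A minimal sketch of a UPool, under the simplifying assumption that an entry is just an id plus a state object. RoutePoolSketch and its members are hypothetical, loosely modelled on RouteSpecificPool's getFree(), add(), free() and getAllocatedCount(). It shows the invariant allocatedCount = leased + available and the lookup order of getFree(): an idle entry with a matching state first, then one with a null state. Putting released entries at the head of available is my reading of the source and is marked as an assumption in the code.

```java
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Objects;

// Hypothetical, simplified model of a UPool (RouteSpecificPool): every entry is in
// exactly one of `available` (idle) or `leased` (lent out).
class RoutePoolSketch {
    static final class Entry {
        final String id;
        Object state; // e.g. a user token; null for connections without state

        Entry(String id) {
            this.id = id;
        }
    }

    final LinkedList<Entry> available = new LinkedList<>();
    final LinkedList<Entry> leased = new LinkedList<>();

    /** allocatedCount = leased + available */
    int getAllocatedCount() {
        return available.size() + leased.size();
    }

    /** Prefer an idle entry whose state matches; otherwise fall back to one with a null state. */
    Entry getFree(Object state) {
        if (state != null) {
            Entry match = take(state);
            if (match != null) {
                return match;
            }
        }
        return take(null);
    }

    // Moves the first idle entry whose state equals `state` from available to leased.
    private Entry take(Object state) {
        Iterator<Entry> it = available.iterator();
        while (it.hasNext()) {
            Entry e = it.next();
            if (Objects.equals(state, e.state)) {
                it.remove();
                leased.add(e);
                return e;
            }
        }
        return null;
    }

    /** A newly created connection starts out leased. */
    Entry add(String id) {
        Entry e = new Entry(id);
        leased.add(e);
        return e;
    }

    /** Returns an entry; a non-reusable one simply leaves the pool. */
    void free(Entry e, boolean reusable) {
        if (leased.remove(e) && reusable) {
            available.addFirst(e); // assumption: most recently released goes to the head
        }
    }
}
```

The fallback to a null-state entry is why a request with an unknown user token can still reuse an anonymous connection, while a connection bound to another user's state is skipped.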
When the connection limit (per route or in total) has been reached, the current thread is put into pending and waits to be woken up.
With this design in mind, the code below is easy to follow.
```java
// AbstractConnPool.java (httpcore)
private E getPoolEntryBlocking(
        final T route, final Object state,
        final long timeout, final TimeUnit tunit,
        final Future<E> future) throws IOException, InterruptedException, TimeoutException {
    Date deadline = null;
    if (timeout > 0) {
        deadline = new Date(System.currentTimeMillis() + tunit.toMillis(timeout));
    }
    this.lock.lock();
    try {
        // Locate the UPool for this route
        final RouteSpecificPool<T, C, E> pool = getPool(route);
        E entry;
        for (;;) { // outer loop (loop 1)
            for (;;) { // inner loop (loop 2): repeat until a non-expired connection is taken from the UPool
                entry = pool.getFree(state);
                // ---------- body of RouteSpecificPool.getFree(), shown inline ----------
                public E getFree(final Object state) {
                    if (!this.available.isEmpty()) { // there are idle connections
                        if (state != null) { // state relates to authentication; ignore it for now
                            final Iterator<E> it = this.available.iterator();
                            while (it.hasNext()) {
                                final E entry = it.next();
                                if (state.equals(entry.getState())) {
                                    it.remove();
                                    this.leased.add(entry);
                                    return entry;
                                }
                            }
                        }
                        final Iterator<E> it = this.available.iterator();
                        while (it.hasNext()) {
                            final E entry = it.next();
                            if (entry.getState() == null) {
                                it.remove();            // remove it from the UPool's available
                                this.leased.add(entry); // add it to the UPool's leased
                                return entry;
                            }
                        }
                    }
                    // No usable idle connection: the caller will try to create one, or wait
                    return null;
                }
                // ---------- end of getFree() ----------
                if (entry == null) { // nothing borrowed
                    break;
                }
                if (entry.isExpired(System.currentTimeMillis())) {
                    entry.close();
                }
                if (entry.isClosed()) {
                    // The connection (its underlying socket) is closed: remove it from the HPool's
                    // available and drop it from the UPool, so it leaves the pool entirely
                    this.available.remove(entry);
                    pool.free(entry, false);
                } else {
                    break;
                }
            } // end of loop 2
            if (entry != null) { // a connection was borrowed above
                // Mirror the move in the HPool (global bookkeeping)
                this.available.remove(entry);
                this.leased.add(entry);
                // hook method
                onReuse(entry);
                return entry;
            }
            // No valid connection was found, so one has to be created.
            // If the UPool is already at (or above) maxPerRoute, first close some of its
            // idle connections to make room
            final int maxPerRoute = getMax(route);
            // Shrink the pool prior to allocating a new connection
            final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
            if (excess > 0) {
                for (int i = 0; i < excess; i++) {
                    final E lastUsed = pool.getLastUsed();
                    if (lastUsed == null) {
                        break;
                    }
                    lastUsed.close();
                    this.available.remove(lastUsed);
                    pool.remove(lastUsed);
                }
            }
            // A new connection may only be created while the UPool is below its limit
            if (pool.getAllocatedCount() < maxPerRoute) {
                final int totalUsed = this.leased.size();
                final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
                // ... and the HPool-wide limit (maxTotal) must allow it too
                if (freeCapacity > 0) {
                    final int totalAvailable = this.available.size();
                    // Idle connections (of any route) already use up the remaining capacity,
                    // i.e. leased + available >= maxTotal. To make room for this route, evict
                    // the idle connection at the tail of the HPool's available list
                    if (totalAvailable > freeCapacity - 1) {
                        if (!this.available.isEmpty()) {
                            final E lastUsed = this.available.removeLast();
                            lastUsed.close();
                            final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                            otherpool.remove(lastUsed);
                        }
                    }
                    // There is room now: create the connection for the current route
                    final C conn = this.connFactory.create(route);
                    // Put it into the leased lists of both the UPool and the HPool
                    entry = pool.add(conn);
                    this.leased.add(entry);
                    return entry;
                }
            }
            // The per-route or total limit has been reached: no new connection can be created
            boolean success = false;
            // Each waiting thread is represented by its own future
            try {
                if (future.isCancelled()) {
                    throw new InterruptedException("Operation interrupted");
                }
                // Join the pending queues
                pool.queue(future);
                this.pending.add(future);
                if (deadline != null) {
                    // This is where ConnectionRequestTimeout finally takes effect:
                    // the thread parks until the deadline. Possible outcomes:
                    // 1. it sleeps until the deadline: awaitUntil() returns false;
                    // 2. it is signalled before the deadline: awaitUntil() returns true. This is a
                    //    hopeful sign that a connection may have become available. The signal
                    //    comes from either a) releaseConnection() or b) cancel() of a request;
                    // 3. it is interrupted: awaitUntil() throws InterruptedException, success
                    //    stays false, and the exception propagates after the finally block.
                    success = this.condition.awaitUntil(deadline);
                } else {
                    this.condition.await();
                    success = true;
                }
                // After being signalled or timing out, check whether this request was cancelled.
                // Unlike FutureTask.cancel(true), which interrupts the running thread,
                // cancel() here only flips a flag (and wakes up the waiters).
                if (future.isCancelled()) {
                    throw new InterruptedException("Operation interrupted");
                }
            } finally {
                // In case of 'success', we were woken up by the
                // connection pool and should now have a connection
                // waiting for us, or else we're shutting down.
                // Just continue in the loop, both cases are checked.
                pool.unqueue(future);
                this.pending.remove(future);
            }
            if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
                // Timed out: the deadline passed without a signal, so give up and throw below.
                // Otherwise (signalled, or a spurious wakeup) go round loop 1 again.
                break; // leave loop 1
            }
        } // end of loop 1
        throw new TimeoutException("Timeout waiting for connection");
    } finally {
        this.lock.unlock();
    }
}
```
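The control flow above (reuse an idle connection, otherwise create one while under the limit, otherwise wait on a Condition until the deadline) can be reproduced with plain java.util.concurrent. BlockingPoolSketch is a hypothetical single-route pool with a single limit `max`, so it leaves out maxPerRoute vs. maxTotal, eviction and cancellation; it is a sketch of the waiting pattern, not the library's code.

```java
import java.util.ArrayDeque;
import java.util.Date;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical single-route pool mimicking the control flow of getPoolEntryBlocking():
// reuse an idle connection -> create one under the limit -> wait until the deadline.
// Connections are plain int ids.
class BlockingPoolSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private final ArrayDeque<Integer> available = new ArrayDeque<>();
    private final int max;
    private int leased;
    private int nextId;

    BlockingPoolSketch(int max) {
        this.max = max;
    }

    /** timeoutMs <= 0 means waiting without a deadline, as in the original. */
    int lease(long timeoutMs) throws InterruptedException, TimeoutException {
        Date deadline = timeoutMs > 0 ? new Date(System.currentTimeMillis() + timeoutMs) : null;
        lock.lock();
        try {
            for (;;) {
                Integer idle = available.pollFirst();
                if (idle != null) {                     // reuse an idle connection
                    leased++;
                    return idle;
                }
                if (leased + available.size() < max) {  // below the limit: "create" one
                    leased++;
                    return nextId++;
                }
                boolean success;                        // full: wait for a release
                if (deadline != null) {
                    success = condition.awaitUntil(deadline); // false once the deadline has passed
                } else {
                    condition.await();
                    success = true;
                }
                if (!success && deadline != null && deadline.getTime() <= System.currentTimeMillis()) {
                    throw new TimeoutException("Timeout waiting for connection");
                }
                // signalled (or a spurious wakeup): loop and look again
            }
        } finally {
            lock.unlock();
        }
    }

    /** Returns a connection to the pool and wakes up waiting lessees. */
    void release(int conn) {
        lock.lock();
        try {
            leased--;
            available.addFirst(conn);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
```

With max = 1, a second lease(50) throws TimeoutException after about 50 ms, while a lease that is already waiting when another thread calls release() wakes up and gets the released connection. Re-checking inside the loop after every wakeup is what makes spurious wakeups harmless.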
Releasing a connection
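As mentioned above, when the response has no streaming entity, the connection is released before the response is returned to the caller (otherwise it is released once the entity has been consumed or the response closed). Let's look at the release process.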
```java
// ConnectionHolder.java
public void releaseConnection() {
    // A connection may only be released once, hence the lock
    synchronized (this.managedConn) {
        if (this.released) {
            return;
        }
        this.released = true;
        // As noted above, `reusable` decides how the connection is released
        if (this.reusable) {
            // Releasing a reusable connection just moves it from leased to available in the pool.
            // The response may carry a header such as: Keep-Alive: timeout=5, max=100
            // validDuration is the keep-alive duration derived from the server's response;
            // with the default keep-alive strategy it is -1 if the server sent none
            this.manager.releaseConnection(this.managedConn,
                    this.state, this.validDuration, this.tunit);
        } else {
            try {
                // A real close: the underlying socket is closed as well
                this.managedConn.close();
                this.log.debug("Connection discarded");
            } catch (final IOException ex) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug(ex.getMessage(), ex);
                }
            } finally {
                this.manager.releaseConnection(
                        this.managedConn, null, 0, TimeUnit.MILLISECONDS);
            }
        }
    }
}
```
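Releasing a connection to the pool means moving it from leased to available, but there is more to it. A connection in available does not stay valid forever: TCP is full-duplex, and whether a connection is still usable depends on the current state of both ends, so the client should manage the connection's lifetime according to the server's state. For example, if the server's response indicates a keep-alive of 10s, the connection must not stay in available for longer than 10s; after that, the server may already have closed it.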
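The keep-alive timeout the server announces (e.g. `Keep-Alive: timeout=5, max=100`) is given in seconds. The sketch below is a stdlib-only approximation of what, as far as I know, DefaultConnectionKeepAliveStrategy does with it: take the `timeout` parameter, convert it to milliseconds, and return -1 when there is none. KeepAliveSketch.keepAliveMillis is a hypothetical helper working on the raw header value; the real strategy works on parsed header elements and may differ in edge cases such as quoting.

```java
// Hypothetical helper approximating DefaultConnectionKeepAliveStrategy on a raw header value.
class KeepAliveSketch {
    /** Returns the keep-alive duration in milliseconds, or -1 if no usable timeout is given. */
    static long keepAliveMillis(String keepAliveHeaderValue) {
        if (keepAliveHeaderValue == null) {
            return -1;
        }
        for (String param : keepAliveHeaderValue.split(",")) {
            String[] nameValue = param.trim().split("=", 2);
            if (nameValue.length == 2 && nameValue[0].trim().equalsIgnoreCase("timeout")) {
                try {
                    return Long.parseLong(nameValue[1].trim()) * 1000; // seconds -> milliseconds
                } catch (NumberFormatException ignore) {
                    // malformed value: keep looking, as if the parameter were absent
                }
            }
        }
        return -1;
    }
}
```

For example, keepAliveMillis("timeout=5, max=100") returns 5000, and keepAliveMillis("max=100") returns -1, which releaseConnection() in the manager then logs as "kept alive indefinitely".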
```java
// PoolingHttpClientConnectionManager.java
public void releaseConnection(
        final HttpClientConnection managedConn,
        final Object state,
        final long keepalive, final TimeUnit tunit) {
    Args.notNull(managedConn, "Managed connection");
    synchronized (managedConn) {
        final CPoolEntry entry = CPoolProxy.detach(managedConn);
        if (entry == null) {
            return;
        }
        final ManagedHttpClientConnection conn = entry.getConnection();
        try {
            if (conn.isOpen()) {
                final TimeUnit effectiveUnit = tunit != null ? tunit : TimeUnit.MILLISECONDS;
                entry.setState(state);
                // The connection may live until the moment it goes back into available plus keepalive;
                // if that is later than the validityDeadline set when it was created, the latter wins
                entry.updateExpiry(keepalive, effectiveUnit);
                if (this.log.isDebugEnabled()) {
                    final String s;
                    if (keepalive > 0) {
                        s = "for " + (double) effectiveUnit.toMillis(keepalive) / 1000 + " seconds";
                    } else {
                        s = "indefinitely";
                    }
                    this.log.debug("Connection " + format(entry) + " can be kept alive " + s);
                }
            }
        } finally {
            // Move the connection from leased to available (or drop it if it is no longer open);
            // both the HPool and the UPool are updated, and waiting threads are woken up
            this.pool.release(entry, conn.isOpen() && entry.isRouteComplete());
            if (this.log.isDebugEnabled()) {
                this.log.debug("Connection released: " + format(entry) + formatStats(entry.getRoute()));
            }
        }
    }
}
```
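The expiry arithmetic performed by updateExpiry() can be written as a pure function. ExpirySketch is a hypothetical stand-alone version, assuming (as the comments above describe) that a non-positive keepalive means the server set no limit and that validityDeadline caps the result; isExpired mirrors the comparison used when sweeping expired connections (now >= expiry).

```java
// Hypothetical stand-alone version of the expiry arithmetic; all times in milliseconds.
class ExpirySketch {
    /** expiry = min(release time + keepalive, validityDeadline); keepalive <= 0 means "no limit". */
    static long newExpiry(long releaseTimeMs, long keepAliveMs, long validityDeadline) {
        long fromKeepAlive = keepAliveMs > 0 ? releaseTimeMs + keepAliveMs : Long.MAX_VALUE;
        return Math.min(fromKeepAlive, validityDeadline);
    }

    /** An entry counts as expired once the current time reaches its expiry. */
    static boolean isExpired(long nowMs, long expiry) {
        return nowMs >= expiry;
    }
}
```

For instance, newExpiry(1_000, 5_000, Long.MAX_VALUE) is 6_000, while with validityDeadline = 3_000 the result is capped at 3_000, and with keepalive = -1 and no time-to-live the connection never expires (Long.MAX_VALUE).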
Closing connections
Besides connections that cannot be reused, connections that have been idle too long or have expired must also be closed:
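```java
// This method takes parameters, so the application can choose values that suit its situation
public void closeIdleConnections(final long idleTimeout, final TimeUnit tunit) {
    if (this.log.isDebugEnabled()) {
        this.log.debug("Closing connections idle longer than " + idleTimeout + " " + tunit);
    }
    this.pool.closeIdle(idleTimeout, tunit);
}

// This method takes no parameters, so which connections count as expired?
// As the previous section showed, on release the expiry deadline is set from the server's
// keep-alive; if there is none, it falls back to validityDeadline, which is
// Long.MAX_VALUE unless a time-to-live was configured
public void closeExpiredConnections() {
    this.log.debug("Closing expired connections");
    this.pool.closeExpired();
}
```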
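In the code shown, nothing calls these two methods automatically, so something in the application has to. A common pattern is a background thread that sweeps periodically. The sketch uses ScheduledExecutorService; PoolMaintenance is a hypothetical interface standing in for the two connection-manager methods above, and the period and idle timeout are placeholder values for the application to choose.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch only: PoolMaintenance is a hypothetical stand-in for the two
// connection-manager methods closeExpiredConnections() and closeIdleConnections().
class EvictorSketch {
    interface PoolMaintenance {
        void closeExpiredConnections();

        void closeIdleConnections(long idleTimeout, TimeUnit tunit);
    }

    /** Runs both sweeps every periodMs on a daemon thread; call shutdown() on the result to stop. */
    static ScheduledExecutorService start(PoolMaintenance pool, long periodMs, long idleTimeoutMs) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "connection-evictor");
            t.setDaemon(true);
            return t;
        });
        ses.scheduleWithFixedDelay(() -> {
            try {
                pool.closeExpiredConnections();
                pool.closeIdleConnections(idleTimeoutMs, TimeUnit.MILLISECONDS);
            } catch (RuntimeException e) {
                // swallow so that one failing sweep does not cancel all later runs
            }
        }, periodMs, periodMs, TimeUnit.MILLISECONDS);
        return ses;
    }
}
```

In a real application, the PoolMaintenance implementation would simply delegate to the connection manager. The try/catch matters because ScheduledExecutorService suppresses all subsequent executions of a periodic task once one execution throws.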
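idle: closeIdleConnections(idleTimeout) closes pooled connections whose `updated` timestamp lies at least idleTimeout in the past. `updated` starts as the creation time and is refreshed every time the connection is released, so the idle period is measured from the last time the connection was returned to the pool. For example, if a connection is released at 10:00 and idleTimeout is 60s, it may be closed after 10:01 unless it is borrowed again first.
expire: this needs a deadline (`expiry`), which is recomputed on every release as min(release time + keepalive, validityDeadline), where validityDeadline = creation time + timeToLive is fixed when the connection is created. For example, a connection created at 10:00 with timeToLive = 30min (validityDeadline = 10:30) is released at 10:00 with keepalive = 6min, so expiry = 10:06; if it is not used again, it is expired after 10:06 and should be closed. If it is borrowed again at 10:04 and released at 10:10 with keepalive still 6min, expiry becomes 10:16. A release at 10:28 gives min(10:34, 10:30) = 10:30.
In many cases the response carries no keep-alive and no timeToLive is configured, so expiry is Long.MAX_VALUE; then only the idle check can close connections that are no longer needed.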
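The two sweeps can be modelled over a list of idle entries. SweepSketch and its Entry type are hypothetical; times are plain longs (minutes after 10:00 in the usage below). The comparisons follow the description above: the idle sweep compares `updated` with now minus the idle time, the expiry sweep compares now with `expiry`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the idle and expiry sweeps over the idle ("available") entries.
class SweepSketch {
    static final class Entry {
        final String id;
        final long updated; // last release time (creation time if never released)
        final long expiry;  // min(last release + keepalive, validityDeadline)
        boolean closed;

        Entry(String id, long updated, long expiry) {
            this.id = id;
            this.updated = updated;
            this.expiry = expiry;
        }
    }

    /** Like closeIdle(): close entries whose last update is at least idleTime ago. */
    static List<String> closeIdle(List<Entry> available, long idleTime, long now) {
        long deadline = now - Math.max(idleTime, 0);
        List<String> closed = new ArrayList<>();
        for (Entry e : available) {
            if (!e.closed && e.updated <= deadline) {
                e.closed = true;
                closed.add(e.id);
            }
        }
        return closed;
    }

    /** Like closeExpired(): close entries whose expiry has been reached. */
    static List<String> closeExpired(List<Entry> available, long now) {
        List<String> closed = new ArrayList<>();
        for (Entry e : available) {
            if (!e.closed && now >= e.expiry) {
                e.closed = true;
                closed.add(e.id);
            }
        }
        return closed;
    }
}
```

Take entry a released at 10:00 with keepalive 6 min (expiry 6), entry b released at 10:05 (expiry 11), and entry c released at 10:00 with no keepalive and no TTL (expiry Long.MAX_VALUE). At 10:07, closeExpired closes only a, and a following closeIdle with an idle time of 5 minutes closes only c: c can never expire, so only the idle check can reclaim it.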
Next, the timestamps involved:
```java
// PoolEntry constructor: runs when a connection is first created
public PoolEntry(final String id, final T route, final C conn,
        final long timeToLive, final TimeUnit tunit) {
    super();
    Args.notNull(route, "Route");
    Args.notNull(conn, "Connection");
    Args.notNull(tunit, "Time unit");
    this.id = id;
    this.route = route;
    this.conn = conn;
    this.created = System.currentTimeMillis();
    // Starts as the creation time and is refreshed by updateExpiry() on every release;
    // used for the idle check
    this.updated = this.created;
    if (timeToLive > 0) { // passed in via HttpClientBuilder.setConnectionTimeToLive()
        final long deadline = this.created + tunit.toMillis(timeToLive);
        // If the above overflows then default to Long.MAX_VALUE
        this.validityDeadline = deadline > 0 ? deadline : Long.MAX_VALUE;
    } else {
        this.validityDeadline = Long.MAX_VALUE;
    }
    this.expiry = this.validityDeadline; // initial expiry deadline
}
```
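The validityDeadline computation, including its overflow guard, in isolation. TtlSketch.validityDeadline is a hypothetical helper that takes the time-to-live already converted to milliseconds.

```java
// Hypothetical helper reproducing the validityDeadline rule; all times in milliseconds.
class TtlSketch {
    static long validityDeadline(long createdMs, long timeToLiveMs) {
        if (timeToLiveMs <= 0) {
            return Long.MAX_VALUE;                       // no time-to-live configured
        }
        long deadline = createdMs + timeToLiveMs;
        return deadline > 0 ? deadline : Long.MAX_VALUE; // sum overflowed: treat as "never"
    }
}
```

validityDeadline(1_000, 500) is 1_500; with timeToLive = Long.MAX_VALUE the sum wraps around to a negative number, and the guard turns it into Long.MAX_VALUE.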
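Both ways of closing connections above are time-based: once a point in time is reached, the outdated connections are closed. Now suppose idleTimeout is set to 10 days and no expiry deadline is set (so it is Long.MAX_VALUE). What goes wrong? The server will not keep the connection open for 10 days; it will close its side fairly soon, leaving the pooled connection half-closed, and the client may still lease this broken connection. To address this, validateAfterInactivity was introduced (default 5s).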
```java
// Inside the Future returned by lease(): get() ends up in this loop
for (;;) {
    final E leasedEntry = getPoolEntryBlocking(route, state, timeout, tunit, this);
    // A connection taken from the pool may need to be validated
    if (validateAfterInactivity > 0) {
        // Validate only if the entry has been inactive for at least validateAfterInactivity since
        // it was last updated (created or released); e.g. with 5s, a connection released at
        // 10:00:00 is validated when it is leased after 10:00:05
        if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
            if (!validate(leasedEntry)) {
                // validate() calls the connection's isStale()
                // ---------- body of isStale(), shown inline ----------
                public boolean isStale() {
                    if (!isOpen()) { // no socket bound, or the socket is closed
                        return true;
                    }
                    try {
                        // Try to read with a 1 ms socket timeout
                        final int bytesRead = fillInputBuffer(1);
                        // -1 means end of stream: the server has closed its side
                        return bytesRead < 0;
                    } catch (final SocketTimeoutException ex) {
                        // No data within 1 ms is the normal state of a healthy idle connection,
                        // so it is not treated as stale. Note that this check cannot detect a
                        // peer that vanished without sending FIN or RST.
                        return false;
                    } catch (final IOException ex) {
                        return true;
                    }
                }
                // ---------- end of isStale() ----------
                leasedEntry.close();
                release(leasedEntry, false);
                continue;
            }
        }
    }
    entryRef.set(leasedEntry);
    done.set(true);
    onLease(leasedEntry);
    if (callback != null) {
        callback.completed(leasedEntry);
    }
    return leasedEntry;
}
```
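The decision rule of isStale() and the validateAfterInactivity condition can be tried out with plain java.net sockets over loopback. StaleCheckSketch is hypothetical: it reads from the socket stream directly, so unlike HttpClient's buffered connection it would consume a byte of unsolicited data. The 1 ms probe timeout assumes, as I read httpcore's BHttpConnectionBase, that the argument of fillInputBuffer(1) is a read timeout in milliseconds.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Hypothetical sketch of the stale check and the validateAfterInactivity rule.
class StaleCheckSketch {
    /** Same decision rule as isStale(): probe the socket with a 1 ms read timeout. */
    static boolean isStale(Socket socket) {
        if (socket.isClosed() || !socket.isConnected()) {
            return true;                  // no usable socket
        }
        int oldTimeout = 0;
        try {
            oldTimeout = socket.getSoTimeout();
            socket.setSoTimeout(1);
            InputStream in = socket.getInputStream();
            return in.read() < 0;         // -1: the server closed its side (FIN)
        } catch (SocketTimeoutException e) {
            return false;                 // nothing to read: normal for a healthy idle connection
        } catch (IOException e) {
            return true;                  // e.g. connection reset
        } finally {
            try {
                socket.setSoTimeout(oldTimeout);
            } catch (IOException ignored) {
                // the socket is already unusable
            }
        }
    }

    /** The validateAfterInactivity rule: only probe entries idle for at least that long. */
    static boolean needsValidation(long updated, long validateAfterInactivityMs, long now) {
        return validateAfterInactivityMs > 0 && updated + validateAfterInactivityMs <= now;
    }

    /** Loopback demo: returns {stale while the server side is open, stale after it closed}. */
    static boolean[] loopbackDemo() {
        InetAddress lo = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, lo);
             Socket client = new Socket(lo, server.getLocalPort());
             Socket serverSide = server.accept()) {
            boolean staleWhileOpen = isStale(client);
            serverSide.close();           // the server sends FIN, as after its keep-alive timeout
            Thread.sleep(100);            // give the FIN time to arrive
            boolean staleAfterClose = isStale(client);
            return new boolean[] {staleWhileOpen, staleAfterClose};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = loopbackDemo();
        System.out.println("stale while open: " + r[0] + ", after server close: " + r[1]);
    }
}
```

loopbackDemo() is expected to return {false, true}: an idle but healthy connection is not stale, and once the server closes its side, the next probe reads -1.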
Finally, this article is a bit long; if you find anything wrong, corrections are welcome.