Apache之HttpClient

本文基于下述版本进行分析

<dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.4.1</version>
</dependency>

下述所有代码进行了必要的删减

发送请求流程

当我们要访问一个接口执行HttpClientexecute()的方法时,会运用责任链模式走到MainClientExecexecute()中;

public CloseableHttpResponse execute(
            final HttpRoute route,
            final HttpRequestWrapper request,
            final HttpClientContext context,
            final HttpExecutionAware execAware) throws IOException, HttpException {
        //1. 从池中获取连接
        Object userToken = context.getUserToken();
        final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);
        
        final RequestConfig config = context.getRequestConfig();
        final HttpClientConnection managedConn;
        
        //ConnectionRequestTimeout配置用在这里
        final int timeout = config.getConnectionRequestTimeout();
        managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
        context.setAttribute(HttpCoreContext.HTTP_CONNECTION, managedConn);

        //第二个配置:检查connection的有效性
        if (config.isStaleConnectionCheckEnabled()) {
            // validate connection
            if (managedConn.isOpen()) {
                this.log.debug("Stale connection check");
                if (managedConn.isStale()) {
                    this.log.debug("Stale connection detected");
                    managedConn.close();
                }
            }
        }

        final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);
        try {
            HttpResponse response;
            for (int execCount = 1;; execCount++) {
                if (!managedConn.isOpen()) {//没有绑定socket
                    //上面已经获取了connection,这里就要把这个connection和一个socket绑定了
                    this.log.debug("Opening connection " + route);
                   //这里会创建tcp/ip连接,并把socket绑定到managedConn上
                   establishRoute(proxyAuthState, managedConn, route, request, context);
                }
                //在真正和服务器交互之前,还要设置好socketTimeOut
                final int timeout = config.getSocketTimeout();
                if (timeout >= 0) {
                    managedConn.setSocketTimeout(timeout);
                }
                
                //2. 真正发送数据
                response = requestExecutor.execute(request, managedConn, context);

                // The connection is in or can be brought to a re-usable state.
                if (reuseStrategy.keepAlive(response, context)) {
                    final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
                    connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);
                    //这个会影响releaseConnection()的行为
                    connHolder.markReusable(); 
                } else {
                    connHolder.markNonReusable();
                }          
            }
            // check for entity, release connection if possible
            final HttpEntity entity = response.getEntity();
            if (entity == null || !entity.isStreaming()) {
                // connection not needed and (assumed to be) in re-usable state
                connHolder.releaseConnection();
                return new HttpResponseProxy(response, null);
            } else {
                return new HttpResponseProxy(response, connHolder);
            }
        } catch (...) {
           ...
        } 
}

大概总结一下上述流程:

  1. connectionPool中获取connection(还有各种验证);
  2. 使用这个connection发送数据;
  3. 根据返回的response,设置一些参数,比如keepAlive;
  4. 释放这个连接并返回response中的数据;

池中获取连接

池化技术相信很多人都使用过,比如ThreadPool,JDBCPool(DataSource)等。接下来看一下HttpConnectionPool的工作原理。

//  PoolingHttpClientConnectionManager.java
public ConnectionRequest requestConnection(
            final HttpRoute route,
            final Object state) {
        //这里是真正干活的
        final Future<CPoolEntry> future = this.pool.lease(route, state, null);
        return new ConnectionRequest() {

            @Override
            public boolean cancel() {
                return future.cancel(true);
            }

            @Override
            public HttpClientConnection get(
                    final long timeout,
                    final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
                return leaseConnection(future, timeout, tunit);
            }
        };
 }

lease()的返回值实际是自定义的一个Future,其实现的get()中调用了getPoolEntryBlocking(),在研究具体的代码之前,需要先说明一下代码中几个集合的作用,便于理解,如下图:

HttpClientPool.png

HttpClientPool(姑且称之为HPool吧)中维护了多个pool(specific pool,姑且称之为UPool吧), 一个url会对应一个pool,不同颜色的connection可以理解为访问不同的url创建的;其中的collection的含义如下:

  • leased: 总的借出去的connection;
  • available:可用的connection;
  • connection pool:url对应的pool;
  • pending:等待的线程队列;
    在程序中,leased和available实际的和为allocatedCount。
SpecificPool.png

UPool的结构和HPool基本一致,只是这里面的connection才是真正被使用的,每次当有线程来获取connection的时候,会到一个具体的UPool中来查找connection。HPool中维护的leased、available和pending是用来统计的;

当连接池里的connection超出限制时,当前线程就会被放入pending中等待被唤醒;

了解了上述的设计,读下面的代码就轻而易举了。

 private E getPoolEntryBlocking(
            final T route, final Object state,
            final long timeout, final TimeUnit tunit,
            final Future<E> future) throws IOException, InterruptedException, TimeoutException {

        Date deadline = null;
        if (timeout > 0) {
            deadline = new Date (System.currentTimeMillis() + tunit.toMillis(timeout));
        }
        this.lock.lock();
        try {
            //定位 UPool
            final RouteSpecificPool<T, C, E> pool = getPool(route);
            E entry;
            for (;;) {//死循环-1
                for (;;) { //死循环-2:循环直到从UPool中获取一个没有过期的connection
                    entry = pool.getFree(state);
                    /////////////////////////////////////////////////////////// getFree()方法体
                    public E getFree(final Object state) {
                            if (!this.available.isEmpty()) {//有可用的connection
                                  if (state != null) { //state与认证有关,先忽略
                                        final Iterator<E> it = this.available.iterator();
                                        while (it.hasNext()) {
                                              final E entry = it.next();
                                              if (state.equals(entry.getState())) {
                                                    it.remove();
                                                    this.leased.add(entry);
                                                    return entry;
                                              }
                                         }
                                   }
                                  final Iterator<E> it = this.available.iterator();
                                  while (it.hasNext()) {
                                        final E entry = it.next();
                                        if (entry.getState() == null) {
                                              it.remove(); //UPool的available中删掉这个connection
                                              this.leased.add(entry);//UPool的leased中添加这个connection
                                              return entry;
                                        }
                                   }
                           }
                            //走到这里说明没有可用的connection,下文一定会创建
                            return null;
                      }
                    ///////////////////////////////////////////////////////////
                    if (entry == null) {//没有借到connection
                        break;
                    }
                    if (entry.isExpired(System.currentTimeMillis())) {
                        entry.close();
                    }
                    if (entry.isClosed()) {
                        //这个connection关闭了(这里是底层socket的关闭),也把HPool中available和leased中保存的删掉,池里彻底没有这个connection了
                        this.available.remove(entry);
                        pool.free(entry, false);
                    } else {
                        break;
                    }
                }//死循环-2结束
                if (entry != null) {//上面借到了connection
                    //HPool中做相应的处理以作统计用
                    this.available.remove(entry);
                    this.leased.add(entry);
                    //钩子方法
                    onReuse(entry);
                    return entry;
                }

                // 走到这里说明没有获取到有效的connection,需要创建
                // 创建前先压缩一下UPool,把暂时空闲的connection删掉,腾出地儿
                final int maxPerRoute = getMax(route);
                // Shrink the pool prior to allocating a new connection
                final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
                if (excess > 0) {
                    for (int i = 0; i < excess; i++) {
                        final E lastUsed = pool.getLastUsed();
                        if (lastUsed == null) {
                            break;
                        }
                        lastUsed.close();
                        this.available.remove(lastUsed);
                        pool.remove(lastUsed);
                    }
                }
                //UPool中的connection量没到最大值才能新建
                if (pool.getAllocatedCount() < maxPerRoute) {
                    final int totalUsed = this.leased.size();
                    final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
                    //也需要满足HPool对connection数量总的限制
                    if (freeCapacity > 0) {
                        final int totalAvailable = this.available.size();
                        // HPool中,总的可用的connection很多,几乎没有使用
                        // 为了让当前的url可以新创建一个connection,随机删除一个可用的connection
                        if (totalAvailable > freeCapacity - 1) {
                            if (!this.available.isEmpty()) {
                                final E lastUsed = this.available.removeLast();
                                lastUsed.close();
                                final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                                otherpool.remove(lastUsed);
                            }
                        }
                        //已经删除了一个没有使用的connection把地儿挪了出来,接着创建当前url的connection
                        final C conn = this.connFactory.create(route);
                        //放入HPool和UPool的leased中
                        entry = pool.add(conn);
                        this.leased.add(entry);
                        return entry;
                    }
                }
                //走到这里说明pool已经满了,不能创建新的connection
                boolean success = false;
                //一个线程对应一个future
                try {
                    if (future.isCancelled()) {
                        throw new InterruptedException("Operation interrupted");
                    }
                    //放入pending队列中
                    pool.queue(future);
                    this.pending.add(future);
                    if (deadline != null) {
                        //ConnectionRequestTimeout的设置最终会在这里起作用
                        //当前线程park了直到deadline这个时间点
                        //1. 线程一直park到deadline,返回false;
                        //2. 还没到deadline,被signal了,返回true;
                        //这是一个相对积极的信号,说明可能存在可用的connection。
                        //那么谁来调用signal呢?有两种可能:a. releaseConnection();b. 当前的获取操作被cancel()
                        //3. 被中断了,success也是false,直接走入finally;
                        success = this.condition.awaitUntil(deadline);
                    } else {
                        this.condition.await();
                        success = true;
                    }
                    // park被signal或睡到自然醒后,判断当前获取connection的操作是否被cancel
                    // 这里的cancel和FutureTask的cancel还不太一样。FutureTask的cancel是直接对线程进行interrupt(),这里只是对一个变量的值进行了改变;
                    if (future.isCancelled()) {
                        throw new InterruptedException("Operation interrupted");
                    }
                } finally {
                    // In case of 'success', we were woken up by the
                    // connection pool and should now have a connection
                    // waiting for us, or else we're shutting down.
                    // Just continue in the loop, both cases are checked.
                    pool.unqueue(future);
                    this.pending.remove(future);
                }
               
                if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
                    //这里说明这个线程在deadline之前被中断了,或者等到醒来都没有新的connection可用
                    break;//跳出死循环-1
                }
            } //死循环-1 结束
            throw new TimeoutException("Timeout waiting for connection");
        } finally {
            this.lock.unlock();
        }
    }

释放连接

在上文中提到,在response返回给客户端之前会释放连接,接下来我们看一下释放的过程。

// ConnectionHolder.java
public void releaseConnection() {
        //一个connection只能释放一次,因此要加锁
        synchronized (this.managedConn) {
            if (this.released) {
                return;
            }
            this.released = true;
            //上文说过,reuseable会影响释放的过程
            if (this.reusable) {
                //可重复使用的connection,其实就是把Pool中leased里的connection挪到available中的过程
                //response的http头可能是这个样子:   Keep-Alive: timeout=5, max=100
                //这里的validDuration实际上是服务端返回的keep-alive的时间,若没有,就为-1
                this.manager.releaseConnection(this.managedConn,
                        this.state, this.validDuration, this.tunit);
            } else {
                try {  
                    //这里是真正的关闭,意味着socket也已经关闭
                    this.managedConn.close();
                    log.debug("Connection discarded");
                } catch (final IOException ex) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug(ex.getMessage(), ex);
                    }
                } finally {
                    this.manager.releaseConnection(
                            this.managedConn, null, 0, TimeUnit.MILLISECONDS);
                }
            }
        }
    }

池中释放就是把leased的connection挪到available中,但除了这个动作,还要有别的地方需要注意。available中可用connection并不是永远都有效的,因为tcp/ip协议是全双工方式工作,一个connection是否有效,要根据双方的时时状态来更新connection的生命周期。实际工作中,客户端一般要随服务端的状态来改变。比如服务端返回值中显示keepalive为10s,那么当这个connection在available中的存活时间也不能超过10s,否则就有问题。

// PoolingHttpClientConnectionManager.java
public void releaseConnection(
            final HttpClientConnection managedConn,
            final Object state,
            final long keepalive, final TimeUnit tunit) {
        Args.notNull(managedConn, "Managed connection");
        synchronized (managedConn) {
            final CPoolEntry entry = CPoolProxy.detach(managedConn);
            if (entry == null) {
                return;
            }
            final ManagedHttpClientConnection conn = entry.getConnection();
            try {
                if (conn.isOpen()) {
                    final TimeUnit effectiveUnit = tunit != null ? tunit : TimeUnit.MILLISECONDS;
                    entry.setState(state);
                    // 存活的最后时间点是 放入available的那一刻向后推keepalive;
                    // 当然,如果这个时间点在我们初始化时设置的最后时间点之后,还是以设置的值为准
                    entry.updateExpiry(keepalive, effectiveUnit);
                    if (this.log.isDebugEnabled()) {
                        final String s;
                        if (keepalive > 0) {
                            s = "for " + (double) effectiveUnit.toMillis(keepalive) / 1000 + " seconds";
                        } else {
                            s = "indefinitely";
                        }
                        this.log.debug("Connection " + format(entry) + " can be kept alive " + s);
                    }
                }
            } finally {
                // 这里就是connection从leased到available的挪动
                // HPool和UPool都要进行挪动的操作并唤醒等待的线程
                this.pool.release(entry, conn.isOpen() && entry.isRouteComplete());
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Connection released: " + format(entry) + formatStats(entry.getRoute()));
                }
            }
        }
}

关闭连接

除了不能重复使用的connection需要关闭外,一些超时无用的connection也要关闭

// 这个方法可以传入参数,可以由业务方根据实际情况设定值
public void closeIdleConnections(final long idleTimeout, final TimeUnit tunit) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Closing connections idle longer than " + idleTimeout + " " + tunit);
        }
        this.pool.closeIdle(idleTimeout, tunit);
}

 // 这个方法没有参数,那么哪些算是expired的呢?
 // 由上节我们知道,在释放连接的时候,会根据服务端的keepalive(没有的话,也有默认值) 设置expired的deadline;
public void closeExpiredConnections() {
        this.log.debug("Closing expired connections");
        this.pool.closeExpired();
}

idle: 从connection创建的时间点开始的idleTimeout时间范围,是一个绝对的时间范围;比如一个connection是10:00创建,idleTimeout设为60s,那么10:01以后这个connection就得关闭;

expire:expire需要一个deadline,这个deadline每次release的时候都会更新,值为release的时间点 + keepalive(或validityDeadline),是一个相对的时间范围;比如一个connection最后一次release的时间点是10:00,keepalive=6min,validityDeadline=5min,那么deadline=10:05,如果这个connection再没有使用过,则过了10:05,就算是过期的connection,应该被关闭; 如果在10:04的时候又被借出去使用了,release的时间是10:10,keepalive还是为6min,那么过了10:15,这个connection就应关闭了;

很多情况response的keepalive和validityDeadline都没有值,那么这个时候deadline就是Long.MAX_VALUE了,这个时候只能通过idle的值来关闭不需要的connection了;

下面再说明一下几个时间点

// 首次创建connection
public PoolEntry(final String id, final T route, final C conn,
            final long timeToLive, final TimeUnit tunit) {
        super();
        Args.notNull(route, "Route");
        Args.notNull(conn, "Connection");
        Args.notNull(tunit, "Time unit");
        this.id = id;
        this.route = route;
        this.conn = conn;
        this.created = System.currentTimeMillis();
        this.updated = this.created; //这个就是connection被创建的时间,会用于idle的判断
        if (timeToLive > 0) { //这个值通过HttpClientBuilder.setConnectionTimeToLive()传入
            final long deadline = this.created + tunit.toMillis(timeToLive);
            // If the above overflows then default to Long.MAX_VALUE
            this.validityDeadline = deadline > 0 ? deadline : Long.MAX_VALUE;
        } else {
            this.validityDeadline = Long.MAX_VALUE;
        }
        this.expiry = this.validityDeadline; //默认的expire deadline
    }

上述两种关闭connection的方式都是从时间入手,到了一个时间点,过期的connection都干掉。现在假如把connection的idleTimeout设为10天,expired的deadline没有设置,即为Long.MAX_VALUE,这个时候池里面的connection会有什么问题?服务器端的connection不会保留10天这么久,很快就会断掉,那么此时池里的connection实际上就是半双工状态了,这个不正常的connection会被客户端获取到。为了解决这个问题,引入了validateAfterInactivity(默认5s)

for (;;) {
    final E leasedEntry = getPoolEntryBlocking(route, state, timeout, tunit, this);
    //池中获取的connection要验证
    if (validateAfterInactivity > 0)  {
        //比如10:00创建的connection,那么10:05后就要验证了
        if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
            if (!validate(leasedEntry)) {
                //validate调用的是connection的isStale()
                //////////////////////////////////////////////////////////////
                public boolean isStale() {
                    if (!isOpen()) { //没有绑定socket 或 socket关闭
                        return true;
                    }
                  
                    try {
                        //其实socket没读到数据也不能说明该socket无效
                        //这里我觉得是一种较悲观的处理,宁可错杀一千,不可放过一个
                        final int bytesRead = fillInputBuffer(1);
                        return bytesRead < 0;
                    } catch (final SocketTimeoutException ex) {
                        //这里要注意,SocketTimeoutException不能说明这个connection无效
                        return false; //上面的if无法进入,这个connection可能有问题
                    } catch (final IOException ex) {
                        return true;
                    }
            }
                //////////////////////////////////////////////////////////////
                leasedEntry.close();
                release(leasedEntry, false);
                continue;
            }
        }
    }
    entryRef.set(leasedEntry);
    done.set(true);
    onLease(leasedEntry);
    if (callback != null) {
        callback.completed(leasedEntry);
    }
    return leasedEntry;
}

最后,本文有点长,如果读者觉得有哪里不对的地方,欢迎批评指正。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,376评论 6 491
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,126评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,966评论 0 347
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,432评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,519评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,792评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,933评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,701评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,143评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,488评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,626评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,292评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,896评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,742评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,977评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,324评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,494评论 2 348

推荐阅读更多精彩内容

  • HttpClient整理资料 1、httpClient HttpClient是Apache中的一个开源的项目。它实...
    小白豆豆5阅读 29,842评论 5 38
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,629评论 18 139
  • 第一章 Nginx简介 Nginx是什么 没有听过Nginx?那么一定听过它的“同行”Apache吧!Ngi...
    JokerW阅读 32,649评论 24 1,002
  • 昨天和朋友一起去看了郭敬明和落落的《悲伤逆流成河》的电影版,说出来你们可能不信,电影的最后几幕情景,我哭...
    月羽九九阅读 421评论 1 2
  • 昨天晚上和我的一个妹聊天,得知她最近跑回我出生的那个小县城实习了。两个多月前,她还在贵阳实习,无意中问了一下她毕业...
    326公路阅读 207评论 0 0