Httpclient4.3+ 连接池监控详细介绍

前言

对于一个优秀的系统来说,各项指标的监控必不可少,特别是在容易出现性能瓶颈和发生故障的地方,能够准确清晰获取当发生异常时的信息,对于线上问题定位和排查提供有力的支撑。最近在使用 Apache Httpclient 时,经常出现问题。最后通过运用监控去合理分析资源利用情况,达到最优解。

PoolingHttpClientConnectionManager

PoolingHttpClientConnectionManager 是一个 Http 连接池管理器,用来服务于多线程时并发获取连接的请求。每个路由(IP)将池化不大于 defaultMaxPerRoute 参数的连接。那么根据 PoolingHttpClientConnectionManager 提供的 API ,我们只需要调用其相关 API 就可以实现,不需要其他的如通过 AOP 拦截实现。

PoolState

该对象是用来接收真正的连接池中数据而提供的,如在 ConnPoolControl 中定义的方法,通过调用 getTotalStats、getStats 来获取某一时刻连接池中的数据指标,PoolingHttpClientConnectionManager 实现了 ConnPoolControl 接口,因而提供了该能力

    PoolStats getTotalStats();

    PoolStats getStats(final T route);

getTotalStats

获取连接池中所有路由的连接池状况

    public PoolStats getTotalStats() {
        return this.pool.getTotalStats();
    }

getRoutes

获取连接池中所有的路由

    public Set<HttpRoute> getRoutes() {
        return this.pool.getRoutes();
    }

getStats

获取指定路由的连接池的状态

    @Override
    public PoolStats getStats(final HttpRoute route) {
        return this.pool.getStats(route);
    }

因此这里可以获取以下指标

  1. 当前正在使用的连接数大小
  2. 当前闲置的连接数大小
  3. 当前正在等待连接的线程数大小
  4. 当前总的连接数大小

完整代码

  @Bean
  public PoolingHttpClientConnectionManager poolingHttpClientConnectionManager() {
    PoolingHttpClientConnectionManager manager = new PoolingHttpClientConnectionManager(
        RegistryBuilder.<ConnectionSocketFactory>create()
            .register("http", PlainConnectionSocketFactory.getSocketFactory())
            .build(),
        ManagedHttpClientConnectionFactory.INSTANCE,
        null,
        null,
        -1,
        TimeUnit.MILLISECONDS);
    manager.setMaxTotal(400);
    manager.setDefaultMaxPerRoute(150);
    return manager;
  }

  @Bean
  public CloseableHttpClient closeableHttpClient() {
    return HttpClientBuilder.create()
        .evictExpiredConnections()
        .setConnectionManager(poolingHttpClientConnectionManager())
        .evictIdleConnections(74, TimeUnit.SECONDS)
        .build();
  }

  @Scheduled(fixedDelay = 30000, initialDelay = 30000)
  public void httpPoolStats() {
    // 获取每个路由的状态
    Set<HttpRoute> routes = poolingHttpClientConnectionManager.getRoutes();
    routes.forEach(e -> {
      PoolStats stats = poolingHttpClientConnectionManager.getStats(e);
      System.out.println("Per route:" + routes.toString() + stats.toString());
    });
    // 获取所有路由的连接池状态
    PoolStats totalStats = poolingHttpClientConnectionManager.getTotalStats();
    System.out.println("Total status:" + totalStats.toString());
  }

执行结果

Per route:[{}->http://127.0.0.1:9001][leased: 1; pending: 0; available: 0; max: 150]
Total status:[leased: 1; pending: 0; available: 0; max: 400]

注意

每个从 PoolingHttpClientConnectionManager 获取连接池状态的 API 调用都加锁,因此不要太频繁的调用,以免阻塞真正获取复用连接的请求

HttpConnectionFactory

Http Connection 连接的管理工厂,用来创建 Http Connection 连接,通过创建的 Http Connection 连接,可以根据暴露出的方法来获取当前连接的状态。
在 PoolingHttpClientConnectionManager 中的 create 方法可以看到,当创建一个新的连接时,将会调用 http connection 管理工厂 connFactory.create(route, config) 方法,
默认 connFactory 的实现是 ManagedHttpClientConnectionFactory,该类用来创建连接管理器 LoggingManagedHttpClientConnection ,使用日志来记录每次请求的详细情况

        @Override
        public ManagedHttpClientConnection create(final HttpRoute route) throws IOException {
            ConnectionConfig config = null;
            if (route.getProxyHost() != null) {
                config = this.configData.getConnectionConfig(route.getProxyHost());
            }
            if (config == null) {
                config = this.configData.getConnectionConfig(route.getTargetHost());
            }
            if (config == null) {
                config = this.configData.getDefaultConnectionConfig();
            }
            if (config == null) {
                config = ConnectionConfig.DEFAULT;
            }
            return this.connFactory.create(route, config);
        }

实际上,我们可以继承 ManagedHttpClientConnectionFactory 类, 重写它的 create 方法,将其替换为业务上需要的指标监控即可

 @Override
    public ManagedHttpClientConnection create(final HttpRoute route, final ConnectionConfig config) {
        final ConnectionConfig cconfig = config != null ? config : ConnectionConfig.DEFAULT;
        CharsetDecoder charDecoder = null;
        CharsetEncoder charEncoder = null;
        final Charset charset = cconfig.getCharset();
        final CodingErrorAction malformedInputAction = cconfig.getMalformedInputAction() != null ?
                cconfig.getMalformedInputAction() : CodingErrorAction.REPORT;
        final CodingErrorAction unmappableInputAction = cconfig.getUnmappableInputAction() != null ?
                cconfig.getUnmappableInputAction() : CodingErrorAction.REPORT;
        if (charset != null) {
            charDecoder = charset.newDecoder();
            charDecoder.onMalformedInput(malformedInputAction);
            charDecoder.onUnmappableCharacter(unmappableInputAction);
            charEncoder = charset.newEncoder();
            charEncoder.onMalformedInput(malformedInputAction);
            charEncoder.onUnmappableCharacter(unmappableInputAction);
        }
        final String id = "http-outgoing-" + Long.toString(COUNTER.getAndIncrement());
        return new LoggingManagedHttpClientConnection(
                id,
                log,
                headerLog,
                wireLog,
                cconfig.getBufferSize(),
                cconfig.getFragmentSizeHint(),
                charDecoder,
                charEncoder,
                cconfig.getMessageConstraints(),
                incomingContentStrategy,
                outgoingContentStrategy,
                requestWriterFactory,
                responseParserFactory);
    }

关于如何实现,可以参考 LoggingManagedHttpClientConnection 中重写 DefaultManagedHttpClientConnection 之后的方法,如

关闭连接时

   @Override
    public void close() throws IOException {
      //TODO
    }

设置超时时间时(每次请求都会调用)

    @Override
    public void setSocketTimeout(final int timeout) {
       //TODO
        super.setSocketTimeout(timeout);
    }

强制关闭该连接时

    @Override
    public void shutdown() throws IOException {
        //TODO
        super.shutdown();
    }

获取 inputstream 流时,这里可以实现 InputStream 的 read() 方法,可以对网络数据流进行详细的监控

    @Override
    protected InputStream getSocketInputStream(final Socket socket) throws IOException {
        InputStream in = super.getSocketInputStream(socket);
        //TODO
        return in;
    }

获取 outPutStream 流时,同上

    @Override
    protected OutputStream getSocketOutputStream(final Socket socket) throws IOException {
        OutputStream out = super.getSocketOutputStream(socket);
        //TODO
        return out;
    }

收到 HttpResponse 时

    @Override
    protected void onResponseReceived(final HttpResponse response) {
       //TODO
    }

提交请求时

    @Override
    protected void onRequestSubmitted(final HttpRequest request) {
        //TODO
    }

HttpConnectionMetrics 是 BHttpConnectionBase 提供的 http 请求记录器,记录了 http 请求和响应各部分数量,参考:HttpConnectionMetrics

HttpConnectionMetrics metrics = super.getMetrics();

完整代码参考

连接工厂

public class SimpleLogConnectionFactory extends ManagedHttpClientConnectionFactory {

  private static final AtomicLong COUNTER = new AtomicLong();
  public static final SimpleLogConnectionFactory INSTANCE = new SimpleLogConnectionFactory();
  private final HttpMessageWriterFactory<HttpRequest> requestWriterFactory;
  private final HttpMessageParserFactory<HttpResponse> responseParserFactory;
  private final ContentLengthStrategy incomingContentStrategy;
  private final ContentLengthStrategy outgoingContentStrategy;

  public SimpleLogConnectionFactory(
      final HttpMessageWriterFactory<HttpRequest> requestWriterFactory,
      final HttpMessageParserFactory<HttpResponse> responseParserFactory,
      final ContentLengthStrategy incomingContentStrategy,
      final ContentLengthStrategy outgoingContentStrategy) {
    super();
    this.requestWriterFactory = requestWriterFactory != null ? requestWriterFactory :
        DefaultHttpRequestWriterFactory.INSTANCE;
    this.responseParserFactory = responseParserFactory != null ? responseParserFactory :
        DefaultHttpResponseParserFactory.INSTANCE;
    this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy :
        LaxContentLengthStrategy.INSTANCE;
    this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
        StrictContentLengthStrategy.INSTANCE;
  }

  public SimpleLogConnectionFactory(
      final HttpMessageWriterFactory<HttpRequest> requestWriterFactory,
      final HttpMessageParserFactory<HttpResponse> responseParserFactory) {
    this(requestWriterFactory, responseParserFactory, null, null);
  }

  public SimpleLogConnectionFactory(
      final HttpMessageParserFactory<HttpResponse> responseParserFactory) {
    this(null, responseParserFactory);
  }

  public SimpleLogConnectionFactory() {
    this(null, null);
  }

  @Override
  public ManagedHttpClientConnection create(final HttpRoute route, final ConnectionConfig config) {
    final ConnectionConfig cconfig = config != null ? config : ConnectionConfig.DEFAULT;
    CharsetDecoder charDecoder = null;
    CharsetEncoder charEncoder = null;
    final Charset charset = cconfig.getCharset();
    final CodingErrorAction malformedInputAction = cconfig.getMalformedInputAction() != null ?
        cconfig.getMalformedInputAction() : CodingErrorAction.REPORT;
    final CodingErrorAction unmappableInputAction = cconfig.getUnmappableInputAction() != null ?
        cconfig.getUnmappableInputAction() : CodingErrorAction.REPORT;
    if (charset != null) {
      charDecoder = charset.newDecoder();
      charDecoder.onMalformedInput(malformedInputAction);
      charDecoder.onUnmappableCharacter(unmappableInputAction);
      charEncoder = charset.newEncoder();
      charEncoder.onMalformedInput(malformedInputAction);
      charEncoder.onUnmappableCharacter(unmappableInputAction);
    }
    return new HttpRequestTimeoutLog(
        COUNTER.getAndIncrement(),
        cconfig.getBufferSize(),
        cconfig.getFragmentSizeHint(),
        charDecoder,
        charEncoder,
        cconfig.getMessageConstraints(),
        incomingContentStrategy,
        outgoingContentStrategy,
        requestWriterFactory,
        responseParserFactory);
  }

日志记录

@Slf4j
public class HttpRequestTimeoutLog extends DefaultManagedHttpClientConnection {

  /**
   * 每次请求的开始时间,单位毫秒
   */
  private final long callStartMill;

  private StringBuilder sbLog;

  private HttpConnectionMetrics metrics;

  private String callStringId;
  private long callId;
  private long lastTime;

  public HttpRequestTimeoutLog(
      final long id,
      final int bufferSize,
      final int fragmentSizeHint,
      final CharsetDecoder charDecoder,
      final CharsetEncoder charEncoder,
      final MessageConstraints constraints,
      final ContentLengthStrategy incomingContentStrategy,
      final ContentLengthStrategy outgoingContentStrategy,
      final HttpMessageWriterFactory<HttpRequest> requestWriterFactory,
      final HttpMessageParserFactory<HttpResponse> responseParserFactory) {
    super("http-outgoing-" + id, bufferSize, fragmentSizeHint, charDecoder, charEncoder,
        constraints, incomingContentStrategy, outgoingContentStrategy,
        requestWriterFactory, responseParserFactory);
    metrics = super.getMetrics();
    callStartMill = System.currentTimeMillis();
    callStringId = "http-outgoing-" + id;
    callId = id;
    sbLog = new StringBuilder();
  }

  private void recordEventLog(String name) {
    long elapseMills = System.currentTimeMillis() - lastTime;
    sbLog.append(name).append("=").append(elapseMills).append(" ms;");
    if ("responseEnd".equals(name)) {
      //打印出每个步骤的时间点
      if (elapseMills > 400) {
        log.warn(sbLog.toString());
      }
      sbLog.setLength(0);
    }
  }

  @Override
  public void setSocketTimeout(int timeout) {
    if (timeout != 0) {
      sbLog.append("callStringId=").append(callStringId).append(";remote-host=")
          .append(super.getRemoteAddress()).append(";");
      lastTime = System.currentTimeMillis();
      recordEventLog("delay request callStart");

    }
    super.setSocketTimeout(timeout);
  }

  @Override
  public void close() throws IOException {

    log.info(
        "{} now close connection,alive time={} ms,requestCount={},responseCount={},"
            + "receivedBytesCount={},sentBytesCount={}",
        callStringId,
        System.currentTimeMillis() - callStartMill,
        metrics.getRequestCount(),
        metrics.getResponseCount(),
        metrics.getReceivedBytesCount(),
        metrics.getSentBytesCount());
    super.close();
  }

  @Override
  public void shutdown() throws IOException {
    log.info("{} now shutdown connection,alive time={} ms", callStringId,
        System.currentTimeMillis() - callStartMill);
    super.shutdown();
  }

  @Override
  protected void onRequestSubmitted(final HttpRequest request) {
    recordEventLog("requestEnd");
    //连接时间超过最大超时时间,打印远程服务机器
    long costTime = System.currentTimeMillis() - lastTime;
    if (costTime >= Constant.DEFAULT_TIMEOUT) {
      log.warn("requestEnd cost time={} ms,uri={}", costTime,
          ((HttpRequestWrapper) request).getURI());
    }
  }

  @Override
  protected void onResponseReceived(final HttpResponse response) {
    recordEventLog("responseEnd");
  }
}  

全文完

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,123评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,031评论 2 384
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,723评论 0 345
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,357评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,412评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,760评论 1 289
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,904评论 3 405
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,672评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,118评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,456评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,599评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,264评论 4 328
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,857评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,731评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,956评论 1 264
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,286评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,465评论 2 348