前言
对于一个优秀的系统来说,各项指标的监控必不可少,特别是在容易出现性能瓶颈和发生故障的地方,能够准确清晰获取当发生异常时的信息,对于线上问题定位和排查提供有力的支撑。最近在使用 Apache Httpclient 时,经常出现问题。最后通过运用监控去合理分析资源利用情况,达到最优解。
PoolingHttpClientConnectionManager
PoolingHttpClientConnectionManager 是一个 Http 连接池管理器,用来服务于多线程时并发获取连接的请求。每个路由(IP)将池化不大于 defaultMaxPerRoute 参数的连接。那么根据 PoolingHttpClientConnectionManager 提供的 API ,我们只需要调用其相关 API 就可以实现,不需要其他的如通过 AOP 拦截实现。
PoolState
该对象是用来接收真正的连接池中数据而提供的,如在 ConnPoolControl 中定义的方法,通过调用 getTotalStats、getStats 来获取某一时刻连接池中的数据指标,PoolingHttpClientConnectionManager 实现了 ConnPoolControl 接口,因而提供了该能力
PoolStats getTotalStats();
PoolStats getStats(final T route);
getTotalStats
获取连接池中所有路由的连接池状况
public PoolStats getTotalStats() {
return this.pool.getTotalStats();
}
getRoutes
获取连接池中所有的路由
public Set<HttpRoute> getRoutes() {
return this.pool.getRoutes();
}
getStats
获取指定路由的连接池的状态
@Override
public PoolStats getStats(final HttpRoute route) {
return this.pool.getStats(route);
}
因此这里可以获取以下指标
- 当前正在使用的连接数大小
- 当前闲置的连接数大小
- 当前正在等待连接的线程数大小
- 当前总的连接数大小
完整代码
@Bean
public PoolingHttpClientConnectionManager poolingHttpClientConnectionManager() {
PoolingHttpClientConnectionManager manager = new PoolingHttpClientConnectionManager(
RegistryBuilder.<ConnectionSocketFactory>create()
.register("http", PlainConnectionSocketFactory.getSocketFactory())
.build(),
ManagedHttpClientConnectionFactory.INSTANCE,
null,
null,
-1,
TimeUnit.MILLISECONDS);
manager.setMaxTotal(400);
manager.setDefaultMaxPerRoute(150);
return manager;
}
@Bean
public CloseableHttpClient closeableHttpClient() {
return HttpClientBuilder.create()
.evictExpiredConnections()
.setConnectionManager(poolingHttpClientConnectionManager())
.evictIdleConnections(74, TimeUnit.SECONDS)
.build();
}
@Scheduled(fixedDelay = 30000, initialDelay = 30000)
public void httpPoolStats() {
// 获取每个路由的状态
Set<HttpRoute> routes = poolingHttpClientConnectionManager.getRoutes();
routes.forEach(e -> {
PoolStats stats = poolingHttpClientConnectionManager.getStats(e);
System.out.println("Per route:" + routes.toString() + stats.toString());
});
// 获取所有路由的连接池状态
PoolStats totalStats = poolingHttpClientConnectionManager.getTotalStats();
System.out.println("Total status:" + totalStats.toString());
}
执行结果
Per route:[{}->http://127.0.0.1:9001][leased: 1; pending: 0; available: 0; max: 150]
Total status:[leased: 1; pending: 0; available: 0; max: 400]
注意
每个从 PoolingHttpClientConnectionManager 获取连接池状态的 API 调用都加锁,因此不要太频繁的调用,以免阻塞真正获取复用连接的请求
HttpConnectionFactory
Http Connection 连接的管理工厂,用来创建 Http Connection 连接,通过创建的 Http Connection 连接,可以根据暴露出的方法来获取当前连接的状态。
在 PoolingHttpClientConnectionManager 中的 create 方法可以看到,当创建一个新的连接时,将会调用 http connection 管理工厂 connFactory.create(route, config) 方法,
默认 connFactory 的实现是 ManagedHttpClientConnectionFactory,该类用来创建连接管理器 LoggingManagedHttpClientConnection ,使用日志来记录每次请求的详细情况
@Override
public ManagedHttpClientConnection create(final HttpRoute route) throws IOException {
ConnectionConfig config = null;
if (route.getProxyHost() != null) {
config = this.configData.getConnectionConfig(route.getProxyHost());
}
if (config == null) {
config = this.configData.getConnectionConfig(route.getTargetHost());
}
if (config == null) {
config = this.configData.getDefaultConnectionConfig();
}
if (config == null) {
config = ConnectionConfig.DEFAULT;
}
return this.connFactory.create(route, config);
}
实际上,我们可以继承 ManagedHttpClientConnectionFactory 类, 重写它的 create 方法,将其替换为业务上需要的指标监控即可
@Override
public ManagedHttpClientConnection create(final HttpRoute route, final ConnectionConfig config) {
final ConnectionConfig cconfig = config != null ? config : ConnectionConfig.DEFAULT;
CharsetDecoder charDecoder = null;
CharsetEncoder charEncoder = null;
final Charset charset = cconfig.getCharset();
final CodingErrorAction malformedInputAction = cconfig.getMalformedInputAction() != null ?
cconfig.getMalformedInputAction() : CodingErrorAction.REPORT;
final CodingErrorAction unmappableInputAction = cconfig.getUnmappableInputAction() != null ?
cconfig.getUnmappableInputAction() : CodingErrorAction.REPORT;
if (charset != null) {
charDecoder = charset.newDecoder();
charDecoder.onMalformedInput(malformedInputAction);
charDecoder.onUnmappableCharacter(unmappableInputAction);
charEncoder = charset.newEncoder();
charEncoder.onMalformedInput(malformedInputAction);
charEncoder.onUnmappableCharacter(unmappableInputAction);
}
final String id = "http-outgoing-" + Long.toString(COUNTER.getAndIncrement());
return new LoggingManagedHttpClientConnection(
id,
log,
headerLog,
wireLog,
cconfig.getBufferSize(),
cconfig.getFragmentSizeHint(),
charDecoder,
charEncoder,
cconfig.getMessageConstraints(),
incomingContentStrategy,
outgoingContentStrategy,
requestWriterFactory,
responseParserFactory);
}
关于如何实现,可以参考 LoggingManagedHttpClientConnection 中重写 DefaultManagedHttpClientConnection 之后的方法,如
关闭连接时
@Override
public void close() throws IOException {
//TODO
}
设置超时时间时(每次请求都会调用)
@Override
public void setSocketTimeout(final int timeout) {
//TODO
super.setSocketTimeout(timeout);
}
强制关闭该连接时
@Override
public void shutdown() throws IOException {
//TODO
super.shutdown();
}
获取 inputstream 流时,这里可以实现 InputStream 的 read() 方法,可以对网络数据流进行详细的监控
@Override
protected InputStream getSocketInputStream(final Socket socket) throws IOException {
InputStream in = super.getSocketInputStream(socket);
//TODO
return in;
}
获取 outPutStream 流时,同上
@Override
protected OutputStream getSocketOutputStream(final Socket socket) throws IOException {
OutputStream out = super.getSocketOutputStream(socket);
//TODO
return out;
}
收到 HttpResponse 时
@Override
protected void onResponseReceived(final HttpResponse response) {
//TODO
}
提交请求时
@Override
protected void onRequestSubmitted(final HttpRequest request) {
//TODO
}
HttpConnectionMetrics 是 BHttpConnectionBase 提供的 http 请求记录器,记录了 http 请求和响应各部分数量,参考:HttpConnectionMetrics
HttpConnectionMetrics metrics = super.getMetrics();
完整代码参考
连接工厂
public class SimpleLogConnectionFactory extends ManagedHttpClientConnectionFactory {
private static final AtomicLong COUNTER = new AtomicLong();
public static final SimpleLogConnectionFactory INSTANCE = new SimpleLogConnectionFactory();
private final HttpMessageWriterFactory<HttpRequest> requestWriterFactory;
private final HttpMessageParserFactory<HttpResponse> responseParserFactory;
private final ContentLengthStrategy incomingContentStrategy;
private final ContentLengthStrategy outgoingContentStrategy;
public SimpleLogConnectionFactory(
final HttpMessageWriterFactory<HttpRequest> requestWriterFactory,
final HttpMessageParserFactory<HttpResponse> responseParserFactory,
final ContentLengthStrategy incomingContentStrategy,
final ContentLengthStrategy outgoingContentStrategy) {
super();
this.requestWriterFactory = requestWriterFactory != null ? requestWriterFactory :
DefaultHttpRequestWriterFactory.INSTANCE;
this.responseParserFactory = responseParserFactory != null ? responseParserFactory :
DefaultHttpResponseParserFactory.INSTANCE;
this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy :
LaxContentLengthStrategy.INSTANCE;
this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
StrictContentLengthStrategy.INSTANCE;
}
public SimpleLogConnectionFactory(
final HttpMessageWriterFactory<HttpRequest> requestWriterFactory,
final HttpMessageParserFactory<HttpResponse> responseParserFactory) {
this(requestWriterFactory, responseParserFactory, null, null);
}
public SimpleLogConnectionFactory(
final HttpMessageParserFactory<HttpResponse> responseParserFactory) {
this(null, responseParserFactory);
}
public SimpleLogConnectionFactory() {
this(null, null);
}
@Override
public ManagedHttpClientConnection create(final HttpRoute route, final ConnectionConfig config) {
final ConnectionConfig cconfig = config != null ? config : ConnectionConfig.DEFAULT;
CharsetDecoder charDecoder = null;
CharsetEncoder charEncoder = null;
final Charset charset = cconfig.getCharset();
final CodingErrorAction malformedInputAction = cconfig.getMalformedInputAction() != null ?
cconfig.getMalformedInputAction() : CodingErrorAction.REPORT;
final CodingErrorAction unmappableInputAction = cconfig.getUnmappableInputAction() != null ?
cconfig.getUnmappableInputAction() : CodingErrorAction.REPORT;
if (charset != null) {
charDecoder = charset.newDecoder();
charDecoder.onMalformedInput(malformedInputAction);
charDecoder.onUnmappableCharacter(unmappableInputAction);
charEncoder = charset.newEncoder();
charEncoder.onMalformedInput(malformedInputAction);
charEncoder.onUnmappableCharacter(unmappableInputAction);
}
return new HttpRequestTimeoutLog(
COUNTER.getAndIncrement(),
cconfig.getBufferSize(),
cconfig.getFragmentSizeHint(),
charDecoder,
charEncoder,
cconfig.getMessageConstraints(),
incomingContentStrategy,
outgoingContentStrategy,
requestWriterFactory,
responseParserFactory);
}
日志记录
@Slf4j
public class HttpRequestTimeoutLog extends DefaultManagedHttpClientConnection {
/**
* 每次请求的开始时间,单位毫秒
*/
private final long callStartMill;
private StringBuilder sbLog;
private HttpConnectionMetrics metrics;
private String callStringId;
private long callId;
private long lastTime;
public HttpRequestTimeoutLog(
final long id,
final int bufferSize,
final int fragmentSizeHint,
final CharsetDecoder charDecoder,
final CharsetEncoder charEncoder,
final MessageConstraints constraints,
final ContentLengthStrategy incomingContentStrategy,
final ContentLengthStrategy outgoingContentStrategy,
final HttpMessageWriterFactory<HttpRequest> requestWriterFactory,
final HttpMessageParserFactory<HttpResponse> responseParserFactory) {
super("http-outgoing-" + id, bufferSize, fragmentSizeHint, charDecoder, charEncoder,
constraints, incomingContentStrategy, outgoingContentStrategy,
requestWriterFactory, responseParserFactory);
metrics = super.getMetrics();
callStartMill = System.currentTimeMillis();
callStringId = "http-outgoing-" + id;
callId = id;
sbLog = new StringBuilder();
}
private void recordEventLog(String name) {
long elapseMills = System.currentTimeMillis() - lastTime;
sbLog.append(name).append("=").append(elapseMills).append(" ms;");
if ("responseEnd".equals(name)) {
//打印出每个步骤的时间点
if (elapseMills > 400) {
log.warn(sbLog.toString());
}
sbLog.setLength(0);
}
}
@Override
public void setSocketTimeout(int timeout) {
if (timeout != 0) {
sbLog.append("callStringId=").append(callStringId).append(";remote-host=")
.append(super.getRemoteAddress()).append(";");
lastTime = System.currentTimeMillis();
recordEventLog("delay request callStart");
}
super.setSocketTimeout(timeout);
}
@Override
public void close() throws IOException {
log.info(
"{} now close connection,alive time={} ms,requestCount={},responseCount={},"
+ "receivedBytesCount={},sentBytesCount={}",
callStringId,
System.currentTimeMillis() - callStartMill,
metrics.getRequestCount(),
metrics.getResponseCount(),
metrics.getReceivedBytesCount(),
metrics.getSentBytesCount());
super.close();
}
@Override
public void shutdown() throws IOException {
log.info("{} now shutdown connection,alive time={} ms", callStringId,
System.currentTimeMillis() - callStartMill);
super.shutdown();
}
@Override
protected void onRequestSubmitted(final HttpRequest request) {
recordEventLog("requestEnd");
//连接时间超过最大超时时间,打印远程服务机器
long costTime = System.currentTimeMillis() - lastTime;
if (costTime >= Constant.DEFAULT_TIMEOUT) {
log.warn("requestEnd cost time={} ms,uri={}", costTime,
((HttpRequestWrapper) request).getURI());
}
}
@Override
protected void onResponseReceived(final HttpResponse response) {
recordEventLog("responseEnd");
}
}
全文完