private static Cache<Long, KafkaClient> clientCache = CacheBuilder.newBuilder()
.maximumSize(80).expireAfterAccess(10, TimeUnit.MINUTES)
.removalListener(new RemovalListener<Long, KafkaClient>() {
@Override
public void onRemoval(RemovalNotification<Long, KafkaClient> arg0) {
KafkaClient client = arg0.getValue();
client.close();
LOG.info("{} close kafka connection[{}];reason:{}", Thread.currentThread().getId(), arg0.getKey(), arg0.getCause());
}
}).build();
采用Guava缓存,若kafka链接对象在缓存中10分钟内没有读写,则直接过期;可以不用采用连接池的方式去实现链接管理;