Kafka连接管理

private static Cache<Long, KafkaClient> clientCache = CacheBuilder.newBuilder()
        .maximumSize(80).expireAfterAccess(10, TimeUnit.MINUTES)
        .removalListener(new RemovalListener<Long, KafkaClient>() {

    @Override
    public void onRemoval(RemovalNotification<Long, KafkaClient> arg0) {
        KafkaClient client = arg0.getValue();
        client.close();
        LOG.info("{} close kafka connection[{}];reason:{}", Thread.currentThread().getId(), arg0.getKey(), arg0.getCause());
    }
}).build();

采用Guava缓存,若kafka链接对象在缓存中10分钟内没有读写,则直接过期;可以不用采用连接池的方式去实现链接管理;

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容