KAFKA 消费端代码示例

kafka 消费端代码:

public static void main(String[] args) {
    String recordStrFormat = "offset = %d, key = %s, value = %s\n";
    Properties props = new Properties();
    props.put("bootstrap.servers", "spidercdh-01:9092");
    props.put("group.id", "default");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", 1000);
    props.put("session.timeout.ms", 30000);
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    //test test2 为topic的名字
    consumer.subscribe(Arrays.asList("test","test2"));
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format(recordStrFormat, record.offset(), record.key(), record.value()));
            }
        }
    } finally {
        if (null != consumer)
            consumer.close();
    }
}}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容