Preface

This article assumes some familiarity with Kafka.

Let's start with a few questions:
- How do you configure multi-threaded consumption?
- How do you manage offsets manually?
- How do you name the consumer threads?
1. quick start

Let's get a quick taste first. Environment:

springBootVersion=1.5.17.RELEASE
jdk=1.8.0_161

```groovy
// spring-kafka version 2.2.0.RELEASE
compile("org.springframework.kafka:spring-kafka:2.2.0.RELEASE")
```
Configure a KafkaTemplate and the consumer:
```java
package com.dobest.bfas.collection.consumer;

import com.dobest.analytics.utils.IPUtil;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

/**
 * @author: hujiansong
 */
@Configuration
public class KafkaTemplateConfig {

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>(16);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, "3");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 200);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        // factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>(16);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test.consumer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, IPUtil.getHostName() + "-h5-consumer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}
```
With that in place, it works out of the box:
```java
import java.util.UUID;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class SinkMessageListener implements CommandLineRunner {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // topics accepts multiple topics; ideally use one listener per topic
    @KafkaListener(id = "test-consumer", topics = {"realtime.analytics_h5_source"})
    public void listen(ConsumerRecord<String, String> cr) throws Exception {
        log.debug(Thread.currentThread().getName() + " {}", cr.value());
    }

    @Override
    public void run(String... args) throws Exception {
        log.info("start producing 100 messages");
        for (int i = 0; i < 100; i++) {
            kafkaTemplate.send("realtime.analytics_h5_source", UUID.randomUUID().toString(), "i" + i);
        }
    }
}
```
Answering the questions

- How do you configure multi-threaded consumption?

As you can see in the listener container factory, you can specify

```java
factory.setConcurrency(3);
```

One line of code enables multi-threaded consumption: the factory spins up 3 listener containers, each with its own consumer thread. (Concurrency beyond the topic's partition count just leaves the extra consumers idle.)
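As a toy illustration (my own simplified model, not spring-kafka's actual implementation, which delegates to Kafka's partition assignor), `setConcurrency(n)` effectively creates n child containers and distributes the topic's partitions among them:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConcurrencySketch {
    // Toy model: spread `partitions` partitions round-robin across
    // `concurrency` containers. Real spring-kafka relies on the consumer
    // group's partition assignment, but the effect is similar.
    public static Map<Integer, List<Integer>> distribute(int partitions, int concurrency) {
        Map<Integer, List<Integer>> assignment = new HashMap<>();
        for (int c = 0; c < concurrency; c++) {
            assignment.put(c, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            assignment.get(p % concurrency).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 6 partitions, concurrency = 3 -> each container consumes 2 partitions
        System.out.println(distribute(6, 3));
    }
}
```

This also makes it clear why concurrency higher than the partition count buys nothing: some containers end up with an empty partition list.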
- How do you manage offsets manually?

First disable auto-commit:

```java
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
```

Then set the ack mode on the listener container factory:

```java
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
```
The available AckMode values are:

- RECORD: commit the offset after each record is processed by the listener
- BATCH: commit after all the records returned by a poll() have been processed (the default)
- TIME: commit once the configured ackTime has elapsed
- COUNT: commit once the configured ackCount of records has been processed
- COUNT_TIME: commit when either the TIME or the COUNT condition is met
- MANUAL: the listener must call Acknowledgment.acknowledge(); the commits themselves are then performed like BATCH
- MANUAL_IMMEDIATE: the offset is committed immediately when Acknowledgment.acknowledge() is called
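To make the COUNT_TIME semantics concrete, here is a stdlib-only sketch (my own illustration, not spring-kafka code) of a committer that fires when either the record-count threshold or the elapsed-time threshold is reached:

```java
public class CountTimeAck {
    private final int ackCount;
    private final long ackTimeMs;
    private int pending = 0;
    private long windowStart;

    public CountTimeAck(int ackCount, long ackTimeMs, long nowMs) {
        this.ackCount = ackCount;
        this.ackTimeMs = ackTimeMs;
        this.windowStart = nowMs;
    }

    // Returns true when the offset should be committed: either `ackCount`
    // records have accumulated OR `ackTimeMs` has elapsed since the last
    // commit. Note the OR -- whichever condition trips first wins.
    public boolean onRecord(long nowMs) {
        pending++;
        if (pending >= ackCount || nowMs - windowStart >= ackTimeMs) {
            pending = 0;
            windowStart = nowMs;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CountTimeAck ack = new CountTimeAck(3, 5000, 0);
        System.out.println(ack.onRecord(10));   // false: 1 record, 10 ms
        System.out.println(ack.onRecord(20));   // false: 2 records, 20 ms
        System.out.println(ack.onRecord(30));   // true: count threshold hit
        System.out.println(ack.onRecord(6000)); // true: time threshold hit
    }
}
```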
The count and time thresholds can both be set through the factory's container properties, for example (the values below are illustrative):

```java
factory.getContainerProperties().setAckCount(100);  // commit every 100 records
factory.getContainerProperties().setAckTime(5000);  // or every 5 seconds
```
Once that is configured, the listener can acknowledge in its callback:

```java
@KafkaListener(id = "test-consumer", topics = {"realtime.analytics_h5_source"})
public void listen(ConsumerRecord<String, String> cr, Acknowledgment acknowledgment) throws Exception {
    log.debug(Thread.currentThread().getName() + " {}", cr.value());
    // note: unlike the quick start, this method takes an extra Acknowledgment parameter
    acknowledgment.acknowledge();
}
```
If you run into the exception

```
java.lang.IllegalStateException: Consumer cannot be configured for auto commit for ackMode MANUAL_IMMEDIATE
```

turn off auto-commit in the consumer configuration:

```java
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
```

With that, offsets are committed manually.
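The difference between MANUAL and MANUAL_IMMEDIATE can also be sketched in plain Java (again a toy illustration of the semantics, not the real container internals): MANUAL queues acknowledgments and commits them together at the end of the poll loop, while MANUAL_IMMEDIATE commits as soon as acknowledge() is called:

```java
import java.util.ArrayList;
import java.util.List;

public class ManualAckSketch {
    public final List<Long> committed = new ArrayList<>(); // offsets committed to the broker
    private final List<Long> queued = new ArrayList<>();
    private final boolean immediate;

    public ManualAckSketch(boolean immediate) {
        this.immediate = immediate;
    }

    // Called by the listener, like Acknowledgment.acknowledge()
    public void acknowledge(long offset) {
        if (immediate) {
            committed.add(offset); // MANUAL_IMMEDIATE: commit right away
        } else {
            queued.add(offset);    // MANUAL: defer until the poll loop ends
        }
    }

    // Called by the container after processing one poll() batch
    public void endOfPoll() {
        committed.addAll(queued);
        queued.clear();
    }
}
```

In practice the distinction matters mostly for latency: MANUAL_IMMEDIATE issues more (smaller) commits, MANUAL batches them.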
- How do you name the consumer threads?

Everyone knows how valuable self-explanatory names are, and spring-kafka lets you influence the thread names used for concurrent consumption.

```java
@KafkaListener(topics = {"realtime.analytics_h5_source"})
public void listen(ConsumerRecord<String, String> cr) throws Exception {
    // print the current thread name
    log.debug(Thread.currentThread().getName() + " {}", cr.value());
}
```

Without an id, @KafkaListener falls back to a generated endpoint id, so the thread names come out like org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1.

With an id, the thread names are derived from it (e.g. test-consumer-0-C-1):

```java
@KafkaListener(id = "test-consumer", topics = {"realtime.analytics_h5_source"})
public void listen(ConsumerRecord<String, String> cr) throws Exception {
    log.debug(Thread.currentThread().getName() + " {}", cr.value());
}
```
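The naming convention can be sketched as a string template. Treat the exact format as version-dependent; this is an approximation of what the logs show, not an API guarantee:

```java
public class ListenerThreadName {
    // Approximation of the consumer thread name spring-kafka produces:
    // "<listenerId>-<containerIndex>-C-<threadNumber>". Without an explicit
    // id on @KafkaListener, a generated endpoint id such as
    // "org.springframework.kafka.KafkaListenerEndpointContainer#0" is used.
    public static String threadName(String listenerId, int containerIndex) {
        String id = (listenerId != null)
                ? listenerId
                : "org.springframework.kafka.KafkaListenerEndpointContainer#0";
        return id + "-" + containerIndex + "-C-1";
    }

    public static void main(String[] args) {
        System.out.println(threadName("test-consumer", 0)); // with an explicit id
        System.out.println(threadName(null, 2));            // generated endpoint id
    }
}
```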
Summary

Steps to produce with KafkaTemplate:

- configure producerConfigs
- configure a producerFactory
- configure the KafkaTemplate

Steps to consume with @KafkaListener:

- configure consumerConfigs
- configure a consumerFactory
- configure the kafkaListenerContainerFactory

The kafkaListenerContainerFactory is also where you set the concurrency and the offset commit mode.