安装
在安装kafka之前需要保证本机是安装好JDK和ZK的。关于JDK和ZK的安装之前的文章是有写的。其实安装kafka只需要在官网下载二进制的压缩包解压就可以了。
http://kafka.apache.org/downloads
选择第一个二进制文件下载即可。
修改server.properties
log.dirs=D:/kafka_2.12-2.5.0/kafka-log
启动kafka服务
首先启动zk服务(之前的文章有写如何启动),然后启动kafka服务。
.\bin\windows\kafka-server-start.bat .\config\server.properties
启动起来后不要关闭这个窗口。
创建主题
首先创建一个主题
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo
查看已创建的主题
.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
创建发送方
在kafka安装路径下执行
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic demo
创建消费方
在kafka安装路径下执行
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic demo --from-beginning
最后发送消息
java 实现
依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
发送方
/**
 * Sends one demo record (key "kafka-demo", value "hello kafka ...") to the
 * "demo" topic on the broker at localhost:9092.
 *
 * <p>The send is asynchronous. On success the completion callback prints the
 * topic, partition and offset the record was written to; on failure it reports
 * the exception instead of dropping it. The producer is always closed before
 * this method returns.
 */
public void send(){
    Properties properties = new Properties();
    // Serializer used for record keys.
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // Retry count for sends.
    properties.put(ProducerConfig.RETRIES_CONFIG,10);
    // Serializer used for record values.
    properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
    // Address of the broker (cluster) to connect to.
    properties.put("bootstrap.servers","localhost:9092");

    ProducerRecord<String,String> record = new ProducerRecord<>("demo","kafka-demo","hello kafka ...");

    // try-with-resources closes the producer on every path, including when
    // send() throws. Construction failures still propagate to the caller, as before.
    try (KafkaProducer<String,String> producer = new KafkaProducer<>(properties)) {
        try {
            producer.send(record, (recordMetadata, e) -> {
                if (e != null) {
                    // Previously a failed send was silently ignored; surface it.
                    System.err.println("send failed : " + e.getMessage());
                    e.printStackTrace();
                    return;
                }
                System.out.println("topic : "+recordMetadata.topic());
                System.out.println("partition : "+recordMetadata.partition());
                System.out.println("offset : "+recordMetadata.offset());
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
        // Synchronous alternative: block on the returned Future instead of
        // passing a callback (get() throws if the send failed):
        //   RecordMetadata recordMetadata = producer.send(record).get();
        //   System.out.println("topic : "+recordMetadata.topic());
        //   System.out.println("partition : "+recordMetadata.partition());
        //   System.out.println("offset : "+recordMetadata.offset());
    }
}
接收方
/**
 * Subscribes to the "demo" topic as a member of consumer group "group.one"
 * on the broker at 127.0.0.1:9092 and prints every record it receives.
 *
 * <p>Polls in an endless loop, so this method never returns normally. If
 * polling or printing throws, the consumer is closed before the exception
 * propagates to the caller.
 */
public static void getMsg(){
    Properties props = new Properties();
    // Required settings: broker address, deserializers and consumer group.
    props.put("bootstrap.servers", "127.0.0.1:9092");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("group.id", "group.one");
    // Commit offsets automatically, once every second.
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    // Offset reset policy; the original value carried a stray trailing space.
    props.put("auto.offset.reset","earliest");
    props.put("client.id", "zy_client_id");

    // try-with-resources ensures the consumer is closed if the loop is left
    // by an exception.
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Collections.singletonList("demo"));
        while(true) {
            // Fetch the next batch of records, waiting at most 100 ms.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach(record ->
                System.out.printf("topic = %s ,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(),
                    record.offset(), record.key(), record.value()));
        }
    }
}