上一期介绍了kafka的基本信息和简单使用。这一期介绍kafka的详细信息。
Kafka的Producer
1、消息和数据的生产者,向 Kafka 的 topic 发布消息。
2、Producer将消息发布到指定的Topic中,Producer可以指定将此消息归属于哪个partition,如果不指定,kafka会基于"round-robin"的方式,将消息存放到partition中去.
3、异步发送,批量发送可以很有效的提高发送效率。Kafka producer的异步发送模式允许进行批量发送,先将消息缓存在内存中,然后一次请求批量发送出去。
Kafka的Consumer
1、消息和数据的消费者,订阅 topics 并处理其订阅的消息。
2、每个consumer属于一个consumer group,每个group中可以有多个consumer,发送到Topic的消息,只会被订阅此Topic的每个group中的一个consumer消费。
3、在 kafka中,我们可以认为一个group是一个"订阅"者,一个Topic中的每个partition只会被一个"订阅者"中的一个consumer消费,不过一个 consumer可以消费多个partitions中的消息。
4、consumer group(包含多个consumer)对一个topic进行消费,不同的consumer group之间独立订阅。
Kafka的Broker
1、Broker:缓存代理,Kafka 集群中的一台或多台服务器统称为 broker。
2、为了减少磁盘写入的次数,broker会将消息暂时buffer起来(segment),当消息的个数达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数。
3、Broker不保存订阅者的状态,由订阅者自己保存(低版本kafka由zookeeper保存,新版本中broker保存订阅者的状态)。
4、kafka采用基于时间的SLA(服务水平保证),消息保存一定时间(通常为7天)后会被删除。
5、消息订阅者可以rewind back到任意位置重新进行消费,当订阅者故障时,可以选择最小的offset(id)进行重新读取消费消息。
Kafka的Message
1、Message消息:是通信的基本单位,每个 producer 可以向一个 topic(主题)发布一些消息。
2、Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Message。
Kafka的Partitions
1、kafka基于文件存储.通过分区,可以将日志内容分散到多个server上,来避免文件尺寸达到单机磁盘的上限,每个partiton都会被当前server(kafka实例)保存。
2、可以将一个topic切分多任意多个partitions,来消息保存/消费的效率。
3、越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力。
Kafka的持久化
1、kafka采用日志形式来持久化,添加数据时,直接追加到日志文件上面。
2、一个Topic可以认为是一类消息,每个topic将被分成多partition(区),每个partition在存储层面是append log文件。
3、Logs文件根据broker中的配置要求,保留一定时间后删除来释放磁盘空间。
4、为数据文件建索引:稀疏存储,每隔一定字节的数据建立一条索引。
Kafka的分布式架构
Kafka的分布式架构,producer、broker(kafka)和consumer都可以有多个。Producer,consumer实现Kafka注册的接口,数据从producer发送到broker,broker承担一个中间缓存和分发的作用。broker分发注册到系统中的consumer。broker的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。
kafka-manager集群管理器
kafka-manager是yahoo的开源项目,通过Kafka Manager用户能够更容易地发现集群中哪些主题或者分区分布不均匀,同时能够管理多个集群,能够更容易地检查集群的状态,能够创建主题,执行首选的副本选择,能够基于集群当前的状态生成分区分配,并基于生成的分配执行分区的重分配,此外,Kafka Manager还是一个非常好的可以快速查看集群状态的工具。项目地址:https://github.com/yahoo/kafka-manager。
Kafka集群配置
KafkaUtil文件:
package com.lqq.demo2;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
public class KafkaUtil {
public final static String topic = "Kaffka_demo2";
public final static String bootstrap_servers = "localhost:9092,localhost:9093,localhost:9094";
public final static String group_id = "group_demo2";
public final static String key_serializer = "org.apache.kafka.common.serialization.StringSerializer";
public final static String value_serializer = "org.apache.kafka.common.serialization.StringSerializer";
public final static String key_deserializer = "org.apache.kafka.common.serialization.StringDeserializer";
public final static String value_deserializer = "org.apache.kafka.common.serialization.StringDeserializer";
private static KafkaProducer<String, String> kp;
private static KafkaConsumer<String, String> kc;
public static KafkaProducer<String, String> getProducer() {
if (kp == null) {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrap_servers);
props.put("acks", "all");
props.put("client.id", "demo2_producer");
props.put("retries", 1);
props.put("batch.size", 16384);
props.put("key.serializer", key_serializer);
props.put("value.serializer", value_serializer);
kp = new KafkaProducer<String, String>(props);
}
return kp;
}
public static KafkaConsumer<String, String> getConsumer() {
if (kc == null) {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrap_servers);
props.put("group.id", group_id);
props.put("client.id", "demo2_consumer");
props.put("heartbeat.interval.ms", "200");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", key_deserializer);
props.put("value.deserializer", value_deserializer);
kc = new KafkaConsumer<String, String>(props);
}
return kc;
}
}
生产者类:
package com.lqq.demo2;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KProducer{
private static AtomicInteger msgNo = new AtomicInteger(1);
public static class ProducerThread implements Runnable{
Producer<String, String> producer=KafkaUtil.getProducer();
String topic;
public ProducerThread(String topic){
this.topic=topic;
}
@Override
public void run() {
while (true) {
int no=msgNo.getAndIncrement();
String data = new String("hello kafka message " + no);
String key = String.valueOf(no);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, data);
producer.send(record, new SendCallback());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
ExecutorService producerPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
producerPool.execute(new ProducerThread(KafkaUtil.topic));
}
producerPool.shutdown();
}
}
KConsumer类:
package com.lqq.demo2;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
public class KConsumer{
private final String topic;
private final Consumer<String, String> consumer=KafkaUtil.getConsumer();
public KConsumer(String topic) {
this.topic = topic;
}
public void consume() {
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println("Consumer record offset=" + record.offset() + " key=" + record.key() + " value="
+ record.value());
}
}
}
public void close(){
consumer.close();
}
public static void main(String[] args) {
KConsumer consumer=new KConsumer(KafkaUtil.topic);
consumer.consume();
consumer.close();
}
}