为什么要有Kafka?
在目前流量越来越大的时代,很多时候我们的服务器资源是没有利用到的,而是在某一瞬间才利用到这个资源,而这一瞬间正是服务器流量的高峰期,而如果没有先弄多几台服务器加强集群的性能,可能就在那一刻就挂了。
Kafka就是将这些流量进行一个排队缓存,一个一个来,相当于一个保安维护好整个秩序。
什么是kafka?
Kafka是由LinkedIn使用Scala开发的,而后很多Api使用Java来写,并捐给了Apache并开源。
Kafka是一个分布式消息队列,为了处理实时数据提供一个统一、高通量、低等待的平台,通常在流式计算中Kafka用来缓存数据,Storm消费Kafka的数据进行计算。
Kafka对消息保存时是根据Topic(主题)进行归类的,发送消息者称为Producer(生产者),消息接收者称为Consumer(消费者),此外Kafka集群中有多个Kafka实例组成,就是多个服务器,每个实例称为broker(经纪人)。
Kafka集群时依赖Zookeeper来保存一些元数据信息,从而保证系统的可用性。
组成
Broker:一台Kafka服务器就是一个broker。
一个集群可以由多个broker组成。
一个broker可以容纳多个Topic。
Partition(分区):为了实现扩展性,一个非常大的Topic可以分布到多个Broker上,一个topic可以分到多个partition。
Kafka只保证按照一个partition中的顺序将消息有序的发送给consumer,不保证一个Topic(整体,多个partition)的顺序,单分区有序,不保证整体分区数据有序。
Kafka消息发送时,都会发送到一个Topic,其本质是一个目录,而topic是由一些partition logs(分区日志)组成的。
Topic相当于管道,topic可以有多个分区。
每个Partition中的消息都是有序的,生成的消息不断的追加到partition log上,其中的每一个消息都赋予了一个唯一的offset的值。
请一边看图,一边阅读。
分区的原因在于,方便集群扩展,每个partition可以通过调整以适应他所在是机器,而一个topic又可以有多个partition组成,因此整个集群就可以适应任意数据的大小了。
另外就是提高并发,可以以partition为单位的读写。
分区的原则:1.指定了partition,则直接使用
2.为指定partition,但指定了key,对key的value进行hash出一个partition。
3.partition和key都未指定,使用轮询选出一个partition。
Replication(副本):同一个Partition可能会有有个副本(对应server.properties配置中的default.replication.factory=N),没有副本的情况下,一旦broker挂了,其上所有分区数据都不可被消费了,同时producer也不能将数据存于其上的分区,引入副本后同一个分区可能有个副本,而此时需要再这些副本直接选出一个Leader,producer和Consumer只能这个leader做交互,其他的副本作为follower从leader中复制数据
Offset:kafka存储文件都是按照offset.kafka来命名的,用offset做名称的好处在于方便查找,如果是要找1024的位置,只要找到1024.kafka的文件即可,offset就是偏移量,相当于指针。
Producer:消息生产者,就是向Kafka broker发送消息的客户端。
Consumer:消息消费者,就是从Kafka broker取消息的客户端。
Topic:就是一个队列,每一个队列要自定名称来区分。
一个Topic可以有多个Partition。
Consumer Group(消费者组):这是Kafka用来实现topic消息的广播(发给所有consumer)和单播(发给其中一个)的,一个Topic可以有多个Consumer Group。
Topic的消息会到所有的Consumer
Group,但每个分区只会把消息发送到该Consumer Group中的一个Consumer。
用Consumer Group可以将Consumer进行自由分组,而不需要每次发送消息到不同的Topic。
就是由一个或多个消费者组成的一个组,共同消费一个topic,每个分区在同一时间只能有一个组中的一个消费者去读取,但是有多个组的话,可以同时去一个分区消费。
假设有三个消费者组成的Group,有一个消费者读取Topic中的一个分区,另外两个分区各读取一个分区的数据,在这种情况下,消费者可以通过水平扩展的方式,同时读取大量的消息,另外如果有一个消费者失败了,那么组的其他成员会自动的负载均衡读取之前失败的消费者读取的分区。
某消费者读取某分区也叫做某消费者是这个分区的拥有者。
存储策略
无论消息是否被消费,kafka都会保留所有消息,有两种策略删除旧数据。
1.基于时间:log.retention.hours=168
2.基于大小:log.retention.bytes=1073741824
需要注意,因为kafka读取特定消息的时间复杂度为O(1),也就是和文件大小无关无关,所以这里删除过期文件和提升Kafka性能无关。
优点
解耦:允许独立的扩展或修改两边的处理进程,只要确保他们遵守同样的接口约束。
冗余:消息队列把数据持久化直到他们已经被完全处理,通过这一方式可以规避了数据丢失风险,许多消息队列所采用的”插入-获取-删除”范式中,在一个消息从队列删除之前,需要你的处理系统明确指出该消息已经被完全处理完毕,从而确保你的数据被完全的保存直到你使用完毕,就是维护数据在写入之前的一个缓存,避免丢失。
扩展性:消息队列解耦了处理过程,所以增大消息入队和处理频率是很容易的,处理过程中只要额外增加,只做维护,要多少取多少。
灵活性&峰值处理能力:在访问量急剧增加的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见,如果以处理这样的流量峰值访问为标准来投入资源随时待命无疑是巨大的浪费,使用消息队列能够让关键组件顶住突发的访问能力,而不会因为突发的超负载的请求而完全崩溃,就是数据先到消息队列做缓存,压力不会一下子堆积,如果处理压力和承接压力过大,就很容易发生崩溃了。
可恢复性:系统的一部分组件失效时,不会影响到整个系统,消息队列降低了进程直接的耦合度,所以即使一个处理消息的进程挂了,加入队列中的消息依然可以在系统恢复后被处理。
顺序保证:在大多数业务场景下,数据的处理顺序是非常重要的,大部分的消息队列本来就是排序的,并且能够保证数据会按照特定的顺序来处理(kafka保证一个partition(分区)内的消息有序性)。
缓冲:有助于控制和优化数据流经过的系统速度,解决生产消息和消费消息的速度不一致问题。
异步通信:很多时候,用户并不想也不需要立即处理消息,消息队列提供了异步处理的机制,允许用户把一个消息加入队列,但并不处理他,想向队列中放多少消息就放多少,然后在需要的时候就去处理他们。
写流程
Producer采用推(push)模式将消息发布到broker,每条消息都被追加(append)到分区(partition)中,属于顺序写磁盘,顺序写磁盘比随机写内存要高,保证kafka吞吐量。
存储方式:物理上Topic分成一个或多个partition,每个partition物理上对应一个文件夹,改文件夹存储该partition的所有消息和索引文件,可以通过查看logs文件夹内的内容
Kafka常用命令
启动服务器:bin/kafka-server-start.shconfig/server.properties &
关闭服务器:bin/kafka-server-stop.sh
查看服务器的topic:bin/kafka-topics.sh--list --zookeeper hadoop-senior00-levi.com:2181
创建topic:bin/kafka-topics.sh--create --zookeeper hadoop-senior00-levi.com:2181 --replication-factor 3--partitions 1 --topic first
删除topic(在server.properties设置delete.topic.enable=true才会删除,否则是逻辑删除):bin/kafka-topics.sh --delete --zookeeperhadoop-senior00-levi.com:2181 --topic first
发送消息,生产者:kafka-console-producer.sh --ker-listhadoop-senior00-levi.com:9092 --topic first
消费消息,消费者(在生产者界面输入内容后回车):bin/kafka-console-consumer.sh --zookeeperhadoop-senior00-levi.com:2181 --from-beginning --topic first
查看topic详情:bin/kafka-topics.sh--topic first --describe --zookeeper hadoop-senior00-levi.com:2181
高级Api和低级Api
拦截器
拦截器是在0.1版本引入的,主要实现clients端的定制化控制逻辑。
拦截器可能是运行多个线程,因此具体实现用户需要自行保证线程安全,如果指定了多个则producer将按照指定顺序调用。
生产者拦截器JavaApi接口:ProducerInterceptor
消费者拦截器JavaApi接口:ConsumerInterceptor
Java API实现
Consumer
public static void main(String[] args) {
Propertiesprops = new Properties();
//服务器,指定要消费的服务器地址
props.put("bootstrap.servers", "hadoop-senior00-levi.com:9092");
//组ID
props.put("group.id", "test");
//自动提交,自动进行offset偏移量
props.put("enable.auto.commit", true);
//自动处理间隔1秒
props.put("auto.commit.interval.ms", 1000);
//key序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//value序列化
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//客户端
KafkaConsumerconsumer = new KafkaConsumer<>(props);
//订阅topic,可变参数,可订阅多个topic
consumer.subscribe(Arrays.asList("first","second"));
while (true) {
//100ms消费一次服务端的推送过来的数据
ConsumerRecordsrecords = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("偏移量:" + record.offset() + " key:" + record.key() + " Value:" + record.value());
}
}
}
拦截器1
public class TimeInterceptor implementsProducerInterceptor<String, String> {
/*
*初始化
* @Title configure
* @param configs
* @date 2018年12月20日 上午10:22:51
* @overridden @seeorg.apache.kafka.common.Configurable#configure(java.util.Map)
*/
@Override
public void configure(Map<String, ?> configs) {
}
/*
*发送前的数据处理
* @Title onSend
* @param record
* @return
* @date 2018年12月20日 上午10:35:57
* @overridden @see org.apache.kafka.clients.producer.ProducerInterceptor#onSend(org.apache.kafka.clients.producer.ProducerRecord)
*/
@Override
public ProducerRecord<String, String>
onSend(ProducerRecord<String, String> record) {
return new ProducerRecord<String, String>(record.topic(), record.partition(), record.timestamp(), record.key(), System.currentTimeMillis() +" interceptor: " + record.value(), record.headers());
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
}
拦截器2
public class ContentInterceptor implementsProducerInterceptor<String, String>{
/*
*初始化
* @Title configure
* @param configs
* @date 2018年12月20日 上午10:25:40
* @overridden @seeorg.apache.kafka.common.Configurable#configure(java.util.Map)
*/
@Override
public void configure(Map<String, ?> configs) {
// TODO Auto-generated method stub
}
/*
*发送前数据处理
* @Title onSend
* @param record
* @return
* @date 2018年12月20日 上午10:25:48
* @overridden @seeorg.apache.kafka.clients.producer.ProducerInterceptor#onSend(org.apache.kafka.clients.producer.ProducerRecord)
*/
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String,
String> record) {
return new ProducerRecord<String, String>(record.topic(), record.partition(), record.timestamp(), record.key(),record.value(), record.headers());
}
int successCount = 0;
int failCount = 0;
/*
*发送之后,服务器响应后的处理
*
*/
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if(null == exception) {
successCount++;
}else {
failCount++;
}
}
@Override
public void close() {
System.out.println("服务器关闭,看下成功了多少,失败了多少!");
System.out.println("Success:" + successCount + " --- Fail:" + failCount);
}
}
自定义分区
public class ConsumerPartition implementsPartitioner {
/*
*配置方法,初始化调用
* @Title configure
* @param configs
* @date 2018年12月20日 上午9:57:39
* @overridden @seeorg.apache.kafka.common.Configurable#configure(java.util.Map)
*/
@Override
public void configure(Map<String, ?> configs) {
// TODO Auto-generated method stub
}
/*
*定制分区
* @Title partition
* @param topic
* @param key
* @param keyBytes
* @param value
* @param valueBytes
* @param cluster
* @return
* @date 2018年12月20日 上午9:57:49
* @overridden @seeorg.apache.kafka.clients.producer.Partitioner#partition(java.lang.String,java.lang.Object, byte[], java.lang.Object, byte[],org.apache.kafka.common.Cluster)
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
System.out.println("即将进入分区的数据:" + new String(keyBytes) + " - " + new String(valueBytes));
return 0;
}
/*
*关闭处理
* @Title close
* @date 2018年12月20日 上午9:57:56
* @overridden @see org.apache.kafka.clients.producer.Partitioner#close()
*/
@Override
public void close() {
// TODO Auto-generated method stub
}
}
消费者
public class CallBackProducer {
public static void main(String[] args) {
//配置信息,记得hosts要配置
Propertiesprops = new Properties();
props.put("bootstrap.servers", "hadoop-senior00-levi.com:9092");
//等待所有响应
props.put("acks", "all");
//重试
props.put("retries", 0);
//缓存大小
props.put("batch.size", 16384);
//间隔时间
props.put("linger.ms", 1);
//内存缓存
props.put("buffer.memory", 33554432);
//数据的key序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//数据的value序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//自定义分区
props.put("partitioner.class", "com.levi.kafka.producer.ConsumerPartition");
//配置拦截器
Listlist = new ArrayList<>();
list.add("com.levi.kafka.interceptor.TimeInterceptor");
list.add("com.levi.kafka.interceptor.ContentInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, list);
//生产者
Producerproducer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
//第一个参数:topic名称 第二个参数:key 第三个参数:value
producer.send(new ProducerRecord<String,
String>("second", Integer.toString(i), "==== "+ Integer.toString(i)),new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.println("此次消息发送返回的偏移量:" + metadata.offset() + " 数据到了哪个分区:" + metadata.partition());
}
});
}
producer.close();
}
}
Kafka Stream
KafkaStream是Apache Kafka开源项目的一个组成部分,一个流式处理的库,并非框架,这就意味着这是一个非常轻量级的库,用于在Kafka构建高可用分布式,拓展和容错的应用程序。
特点
1.高扩展性(拓扑结构)、弹性、容错性
2.轻量级,不需要专门的集群,是一个库并非框架,框架的话有1000个功能只用到100个,库就不一样,专门干某件事的,只有干这件事的功能。
3.完成集成和兼容kafka0.10.0版本
4.实时性,毫秒级延迟,流式处理,窗口允许乱序数据,允许迟到数据,数据来了就干活,就处理。
为什么存在?
有了Storm,有了Spark
Stream等流式框架,但是这些都是框架,对于小的流式业务场景并不需要使用到那么重量级的东西。
而且如果本身就使用Kafka作为消息队列,那么直接使用Kafka Stream即可,而Kafka本身也支持数据持久化,因此Kafka Stream提供滚动部署和滚动升级,并可以在线动态调整并行度。(看写流程)
案例
Application
public class Application {
public static void main(String[] args) {
//配置信息
Propertiesproperties = new Properties();
//定义这个流的ID
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "log");
//服务器端
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop-senior00-levi.com:9092");
//流配置完成,创建流对象
StreamsConfigstreamsConfig = new StreamsConfig(properties);
//创建拓扑,拓扑只是流处理里面的概念意义,把所有的这个处理、流向都绘画成一张图。
TopologyBuildertBuilder = new TopologyBuilder();
//开始构建网络拓扑
//构建输入源
tBuilder.addSource("MySourceName-Test", "first")
//处理这个输入源的类
.addProcessor("MyProcessName-Test2", new ProcessorSupplier<byte[], byte[]>() {
@Override
public Processor<byte[], byte[]> get() {
//返回日志处理类
return new LogProcessor();
}
//这个处理的数据哪里来?来自上一个MySourceName - Test
},"MySourceName-Test")
//输出到哪个Topic,并且这个数据来源于哪里
.addSink("MySinkName-Test3", "second", "MyProcessName-Test2");
//构建输出流
KafkaStreamskStreams = new KafkaStreams(tBuilder, streamsConfig);
kStreams.start();
//在服务器启动,生产者first,消费者second,生产者输入111>>>222
}
}
LogProcessor
public class LogProcessor implementsProcessor<byte[], byte[]> {
private ProcessorContext context;
/*
*初始化
*/
@Override
public void init(ProcessorContext context) {
//拿到这个内容类,可以给处理方法方法使用
this.context = context;
}
/*
*处理
* @Title process
* @param key
* @param value
* @date 2018年12月20日 下午5:05:06
* @overridden @seeorg.apache.kafka.streams.processor.Processor#process(java.lang.Object,java.lang.Object)
*/
@Override
public void process(byte[] key, byte[] value) {
Stringstring = new String(value);
// System.out.println(string);
//写出数据
if(string.contains(">>>")) {
this.context.forward("log".getBytes(), string.split(">>>")[1].getBytes());
}
}
@Override
public void punctuate(long timestamp) {
}
@Override
public void close() {
}
}