kafka概述
基于2.0
kafka架构图:
整个生成消费里分四块Producer、Borker,Consumer和zk。
1、Producer,负责生产数据
2、Consumer,负责消费数据,Consumer可以分为多个组,组里可以包含一个或多个Consumer,组内只能消费同一条消息一次,不同的组可以重复消费同一条消息。
3、ZooKeeper,注册中心,会记录集群信息
4、Borker,一个独立的Kafka节点,多个broker组成kafka集群。
kafka中有两个重要的概念,topic和分区
topic代表一个主题,相当于一个消息队列,一个topic可以分为多个分区
不同的分区保存的消息是不同的,消息在分区内部是有序的,对于topic来说是无序的(Topic有一个分区除外)每个分区会有多个副本,其中只有一个是leader副本
消息在发送的时候会指明要发送到哪一个分区,默认是轮训分区进行添加
分区可以看作是一个可以追加的日志文件,每条消息被追加到分区的时候都有一个offset,通过offset和分区号就可以找对应的消息
对于分区的名词解释:AR:所有副本,ISR:与leader副本保持一致的副本,包括leader副本,HW:高水位,表示消费者可以拉取到的最高偏移量,LEO:表示下一条代写入的位置,LAG:消费者没有消费的记录数
HW和LEO的作用: 生产者发送了一条消息,broker接收到后,其他副本还没有来的及同步,此时是消费者是不能消费的,这个时候LEO会比HW大。
消息生命周期过程:
producer生产消息,指明要发送的topic和分区,等待消息发送成功
kafka收到消息存储到指定分区,副本进行消息的同步,返回添加结果,producer收到发送结果
Consumer拉取消息进行消费,拉取的时候会指明topic和分区,已经要拉取的offset等值
拉取后进行,消费消息消费,消费完成后进行offset偏移量的提交
日志清理在一定的周期条件内将消息清除
Producer介绍
Producer客户端一共有两个重大版本,Java版本和Scala版本(Old Producer),Kafka 0.9版本正式使用Java版本的producer替换了原Scala版本的producer,下面都是基于Java版本介绍
消息发送
消息发送的,
两种API发送消息的方式:
Future send(ProducerRecord record)(相当于send(record,null))
Future send(ProducerRecord record, Callback callback)
对于客户端来说三种处理方式,直接发送,同步获取结果,异步获取结果。
ProducerRecord属性:
private final Stringtopic; //要发送的topic
private final Integerpartition; //要发送的分区
private final K key; //消息的key(对分区和消息清理都会有作用)
private final V value; //消息体
private final Longtimestamp; //时间戳,分CreateTime和LogAppendTime
消息发送成功判断的三种配置:直接发送,leader已经接收,所有ISR副本都已经同步成功,通过acks来配置,分别对应,0,1,-1(all)
序列化器
序列化作用是将发送的数据转成字节码在网络间传输
默认为StringSerialize,自定义序列化通过实现,org.apache.kafka.common.serialization.Serializer接口
序列化必须和消费者的反序列化一样,否则可能会导致消费出错,所以自定义序列化必须在consumer中自定义反序列化
分区器
分区器的作用是指明消息要发往的分区
当ProducerRecord中已经指明分区号的时候就不需要分区器
默认的分区器为DefaultPartitioner,实现的分区规则为:
1、如果消息中的key不为空,则对分区的key进行hash(MurmurHash2算法)来计算分区号,相同的Key会被写入同一个分区
2、如果Key为空就以轮训的方式发送主题内各个分区
注意:在分区数一旦添加的话,Key与分区之间的映射就可能有问题。
自定义分区器的实现需要实现:org.apache.kafka.clients.producer.Partitioner接口
拦截器
自定义拦截器需要实现org.apache.kafka.clients.producer.ProducerInterceptor接口
//发送前执行
public ProducerRecord onSend(ProducerRecord record);
//发送成功或失败是执行,优先于Callback
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
//关闭后执行
public void close();
可以配置多个拦截器,配置的时候用逗号隔开,拦截器的执行顺序会按照配置顺序来执行
消息累加器
消息累加器相当于一个Map<partationId,Queue<ProducerBatch>>的数据结构,用来暂存要发送的消息。ProducerBatch是producerRecord的集合,当有新的发送消息进来时:
首先根据分区号找到对应队列,获取队列中的最后一个ProducerBatch
将ProducerRecorder插入ProducerBatch中
如果ProducerBatch已经达到最大容量,则新增一个ProducerBatch将ProducerBatch放入队尾,并把数据插入新创建的Batch中。
当ProducerBatch达到最大容量的时候Sender线程会进行发送
Sender线程
Sender线程是将消息发送到Broker的线程
从消息累加器中拿到消息,将Map<partationId,Queue<ProducerBatch>>转为Map<Node,List<ProducerBatch>>的形式之后转成<Node,Request>
将发送的Request记录到inFlightRequests中,Map<Node,Request>,记录的是已经发出但是还没有收到响应的请求。
生产流程
RecordAccumulator(双端队列):消息累加器,默认为32M,通过buffer.memory来配置,消息量过大时会阻塞,阻塞时间长度为max.block.ms配置,默认为60s
ProducerBatch:消息的集合、批次,会包含一条甚至多条ProducerRecorder,batch.size的配置会影响ProducerBatch大小。
retries(默认为0)和retry.backoff.ms(默认为100)重试次数和重试间隔时长,(max.in.flight.requests.per.connection参数配置为 1可以解决重发乱序问题,但是会影响性能)
lingers.ms指定producerBatch等待producerRecord加入的时间。
重要的参数
acks:发送成功的衡量标准,默认为1(Leader写入成功即可)
max.request.size:客户端发送消息的最大值,默认为1M(broker中有message.max.bytes参数的配置,需要同步配置,不能比max.request.size小)
retries(默认为0)和retry.backoff.ms(默认为100)重试次数和重试间隔时长
lingers.ms指定producerBatch等待producerRecord加入的时间默认为0
max.in.flight.requests.per.connection客户端和broker的链接数,默认为5.
consumer 介绍
消费者组
消费者组中会有多个消费者构成,组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)
consumer group下可以有一个消费者或多个消费者
消费者组通过group.id(字符串),来唯一标识一个consumer group
consumer group下订阅的topic下的每个分区只能分配给某个group下的一个consumer
组内消费者共同分到不同的分区进行消费。一个分区只能有组内的一个消费者订阅,但可以被不同消费组中的消费者同时订阅。
消费者分区分配策略
两个主题,topic0,topic1都有三个分区,t0p0、t0p1、t0p2、t1p0、t1p1、t1p2两个消费者同时订阅
分区分配策略有三种:
RangeAssignor分配策略(默认策略)
按照消费者总数和分区总数进行整除获得一个跨度,来进行分区分配
即:
C1:t0p0、t0p1、t1p0、t1p1
C2:t0p2、t1p2
RoundRobinAssignor分配策略
通过轮训的方式将分区进行分配
即:
C1:t0p0、t0p2、t1p1
C2:t0p1、t1p0、t1p2
StickAssignor分配策略
两个原则
1、分区尽可能均匀 2、分配尽可能与上次分配相同
C1:t0p0、t0p2、t1p1
C2:t0p1、t1p0、t1p2
在重新分配是会与RoundRobinAssignor不一样,如:
C0:t0p0、t1p1、t3p0
C1:t0p1、t2p0、t3p1
C2:t1p0、t2p1
当C1离开后变为:
C0:t0p0、t1p1、t3p0、t2p0
C2:t1p0、t2p1、t0p1、t3p1
自定义分区策略,需要实现org.apache.kafka.clients.consumer.internals.PartitionAssignor接口
分区策略通过,partition.assignment.strategy来配置。
消费者订阅/取消订阅
订阅API为四种:
subscribe(Collection<String> topics) //主题
subscribe(Collection<String> topics, ConsumerRebalanceListener listener) //主题,重平衡监听器
subscribe(Pattern pattern, ConsumerRebalanceListener listener)//正则,重平衡监听器
subscribe(Pattern pattern)//正则
assign(TopicPartition)//指明订阅主题中的分区
取消订阅为两种:
unsubscribe()和subscribe(new ArrayList())
消费者消息拉取API
consumer.poll(Duration timeout);
拉取消息的时候,客户端在发送请求时会告诉broker要哪个分区的哪个offset之后的数据
拉取消息后数据会经过反序列化,反序列器必须和生产者的序列化器相对应,还会经过消费者拦截器,配置见下
拉取数据有可以有最大拉取条数,和最大拉取的数据量大小控制,具体配置参数见下。
拉取到的消息对象说明:
private final Stringtopic; //topic
private final int partition; //分区
private final long offset; //偏移量
private final long timestamp; //时间戳
private final TimestampTypetimestampType; 时间戳类型,分CreateTime和LogAppendTime
private final long checksum; //校验值
private final int serializedKeySize; //序列化key长度
private final int serializedValueSize; //序列化value长度
private final K key; //key的值
private final V value;//value的值
消费者提交offset
位移提交分为三种,自动提交(通过配置参数来控制,不推荐),手动同步提交,手动异步提交
api有四种,为:
//手动同步提交
consumer.commitSync();
//手动异步提交
consumer.commitAsync();
consumer.commitAsync(OffsetCommitCallback callback);
consumer.commitAsync(Map<TopicPartiton,OffsetAndMetadata> offsets, OffsetCommitCallback callback)
重要参数
fetach.max.bytes:一次拉取的最大数据量,默认为50M
fetch.min.bytes/fetch.max.wait.ms:拉取最小数据量和拉取最大等待时间,默认为1B和500ms
max.poll.records:最大拉取的消息数
auto.offset.reset:开始消费位置,latest(默认,重最新开始拉取),earliest,none
auto.commit.interval.ms:默认自动提交位移的周期,默认5000
partition.assignment.stategy:默认分区策略
interceptor.class:客户端拦截器
主题和分区
主题就是消息的归类(队列),主题细分可以分为一个或多个分区,分个分区可以有一个或多个副本,每一个副本多对应一个日志文件,每个日志文件可以分为多个日志分段,每个日志分段可以分为索引文件,日志存储文件和快照文件
主题
主题创建kafka-topic.sh来创建主题,指明分区和副本数量,例如:
./bin/kafka-topics.sh --create --zookeeper 172.16.14.100:2181 --topic test2 --partitions 10 --replication-factor 20
当broker端参数配置为auto.create.topics.enable为true时,当生产者生产和消费者消费时,如果主题不存在,会自动创建主题。
创建主题的副本数不能大于broker数,分区数可以大于
主题命名时,不能用__开头,__开头的默认为内部主题
查看、修改和删除主题都可以使用kafka-topics.sh来操作
分区
分区副本的存储主要有三个文件,日志文件,位移索引文件和时间索引文件
同一个副本下的文件,如:
00000000000000000000.log,00000000000000000000.index,00000000000000000000.timeindex
00000000000000000155.log,00000000000000000155.index,00000000000000000155.timeindex
命名规则:名称固定为20位数字,由0和日志中的第一条偏移量来命名。log结尾表示日志文件,index结尾则为offset索引文件,timeindex为时间索引文件,索引文件都为稀疏索引,只有当消息写入一定量的时候才会增加索引文件记录。由。log.index.interval.bytes来配置,默认为4KB
offset索引文件:每个索引项由8个字节组成,relativeOffset和position,relativeOffset代表的是相对的偏移量。存储格式为:
0000 0006 0000 009c(r0ffset= 6,position=156)
0000 000e 0000 01cb(r0ffset=14,position=495)
0000 0016 0000 02fa(r0ffset=22,position=656)
时间索引文件:与offset索引文件类似。长度为12字节,分为两个部分,timestamp和relativeOffset,查找时需要从offset索引文件中查找position
日志片段拆分条件:
1、日志分段超过log.segment.bytes配置的值默认为1G
2、当消息中的最大时间戳大于log.roll.ms(hours)时,默认为7天
3、偏移量或所有文件大于log.index.size.max.bytes,默认为10M
4、消息追加的偏移量大于Integer.MAX_VALUE
位移主题
__consumer_offsets主题为内部主题中会保存各个消费者提交的位移,通过group.id的hash来得到具体要存放到哪个分区。默认创建50个分区,3个副本,通过offset.topic.num.partitions和offsets.topic.replication.factor来配置。
日志清理
日志清理有两种策略:日志删除和日志压缩,通过log.cleanup.policy来设置清理策略。默认问delete,可以设置为compact,日志清理可以控制到主题级别,通过配置不同的参数
日志清理由一个单独的日志清理线程来执行,默认为5分种扫描一次,通过log.retention.check.interval.ms来配置
日志删除有两种策略:
1、基于时间,通过log.retention.ms来配置。默认为7天
2、基于日志大小,通过log.retention.bytes来配置(所有文件的总大小,非单个日志分段的大小,单个日志文件通过,log.segment.bytes来配置)
3、基于日志起始偏移量,判断下一日志文件的偏移量是否大于baseOffset
日志压缩:对于有相同key的不同value值的值保存最新的value
附
配置参考:https://kafka.apache.org/20/documentation.html#configuration