kafka学习总结

kafka概述

基于2.0

kafka架构图:


整个生成消费里分四块Producer、Borker,Consumer和zk。

1、Producer,负责生产数据

2、Consumer,负责消费数据,Consumer可以分为多个组,组里可以包含一个或多个Consumer,组内只能消费同一条消息一次,不同的组可以重复消费同一条消息。

3、ZooKeeper,注册中心,会记录集群信息

4、Borker,一个独立的Kafka节点,多个broker组成kafka集群。

kafka中有两个重要的概念,topic和分区

topic代表一个主题,相当于一个消息队列,一个topic可以分为多个分区

不同的分区保存的消息是不同的,消息在分区内部是有序的,对于topic来说是无序的(Topic有一个分区除外)每个分区会有多个副本,其中只有一个是leader副本

消息在发送的时候会指明要发送到哪一个分区,默认是轮训分区进行添加

分区可以看作是一个可以追加的日志文件,每条消息被追加到分区的时候都有一个offset,通过offset和分区号就可以找对应的消息

对于分区的名词解释:AR:所有副本,ISR:与leader副本保持一致的副本,包括leader副本,HW:高水位,表示消费者可以拉取到的最高偏移量,LEO:表示下一条代写入的位置,LAG:消费者没有消费的记录数

 HW和LEO的作用: 生产者发送了一条消息,broker接收到后,其他副本还没有来的及同步,此时是消费者是不能消费的,这个时候LEO会比HW大。


消息生命周期过程:

producer生产消息,指明要发送的topic和分区,等待消息发送成功

kafka收到消息存储到指定分区,副本进行消息的同步,返回添加结果,producer收到发送结果

Consumer拉取消息进行消费,拉取的时候会指明topic和分区,已经要拉取的offset等值

拉取后进行,消费消息消费,消费完成后进行offset偏移量的提交

日志清理在一定的周期条件内将消息清除

Producer介绍

Producer客户端一共有两个重大版本,Java版本和Scala版本(Old Producer),Kafka 0.9版本正式使用Java版本的producer替换了原Scala版本的producer,下面都是基于Java版本介绍

消息发送

消息发送的,

两种API发送消息的方式:

Future send(ProducerRecord record)(相当于send(record,null))

Future send(ProducerRecord record, Callback callback)

对于客户端来说三种处理方式,直接发送,同步获取结果,异步获取结果。

ProducerRecord属性:

private final Stringtopic; //要发送的topic

private final Integerpartition; //要发送的分区

private final K key; //消息的key(对分区和消息清理都会有作用)

private final V value; //消息体

private final Longtimestamp; //时间戳,分CreateTime和LogAppendTime

消息发送成功判断的三种配置:直接发送,leader已经接收,所有ISR副本都已经同步成功,通过acks来配置,分别对应,0,1,-1(all)

序列化器

序列化作用是将发送的数据转成字节码在网络间传输

默认为StringSerialize,自定义序列化通过实现,org.apache.kafka.common.serialization.Serializer接口

序列化必须和消费者的反序列化一样,否则可能会导致消费出错,所以自定义序列化必须在consumer中自定义反序列化

分区器

分区器的作用是指明消息要发往的分区

当ProducerRecord中已经指明分区号的时候就不需要分区器

默认的分区器为DefaultPartitioner,实现的分区规则为:

    1、如果消息中的key不为空,则对分区的key进行hash(MurmurHash2算法)来计算分区号,相同的Key会被写入同一个分区

    2、如果Key为空就以轮训的方式发送主题内各个分区

    注意:在分区数一旦添加的话,Key与分区之间的映射就可能有问题。

自定义分区器的实现需要实现:org.apache.kafka.clients.producer.Partitioner接口

拦截器

自定义拦截器需要实现org.apache.kafka.clients.producer.ProducerInterceptor接口

//发送前执行

public ProducerRecord onSend(ProducerRecord record);

//发送成功或失败是执行,优先于Callback

public void onAcknowledgement(RecordMetadata metadata, Exception exception);

//关闭后执行

public void close();

可以配置多个拦截器,配置的时候用逗号隔开,拦截器的执行顺序会按照配置顺序来执行

消息累加器

消息累加器相当于一个Map<partationId,Queue<ProducerBatch>>的数据结构,用来暂存要发送的消息。ProducerBatch是producerRecord的集合,当有新的发送消息进来时:

首先根据分区号找到对应队列,获取队列中的最后一个ProducerBatch

将ProducerRecorder插入ProducerBatch中

如果ProducerBatch已经达到最大容量,则新增一个ProducerBatch将ProducerBatch放入队尾,并把数据插入新创建的Batch中。

当ProducerBatch达到最大容量的时候Sender线程会进行发送

Sender线程

Sender线程是将消息发送到Broker的线程

从消息累加器中拿到消息,将Map<partationId,Queue<ProducerBatch>>转为Map<Node,List<ProducerBatch>>的形式之后转成<Node,Request>

将发送的Request记录到inFlightRequests中,Map<Node,Request>,记录的是已经发出但是还没有收到响应的请求。

生产流程


RecordAccumulator(双端队列):消息累加器,默认为32M,通过buffer.memory来配置,消息量过大时会阻塞,阻塞时间长度为max.block.ms配置,默认为60s

ProducerBatch:消息的集合、批次,会包含一条甚至多条ProducerRecorder,batch.size的配置会影响ProducerBatch大小。

retries(默认为0)和retry.backoff.ms(默认为100)重试次数和重试间隔时长,(max.in.flight.requests.per.connection参数配置为 1可以解决重发乱序问题,但是会影响性能)

lingers.ms指定producerBatch等待producerRecord加入的时间。

重要的参数

acks:发送成功的衡量标准,默认为1(Leader写入成功即可)

max.request.size:客户端发送消息的最大值,默认为1M(broker中有message.max.bytes参数的配置,需要同步配置,不能比max.request.size小)

retries(默认为0)和retry.backoff.ms(默认为100)重试次数和重试间隔时长

lingers.ms指定producerBatch等待producerRecord加入的时间默认为0

max.in.flight.requests.per.connection客户端和broker的链接数,默认为5.

consumer 介绍

消费者组


消费者组中会有多个消费者构成,组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)

consumer group下可以有一个消费者或多个消费者

消费者组通过group.id(字符串),来唯一标识一个consumer group

consumer group下订阅的topic下的每个分区只能分配给某个group下的一个consumer

组内消费者共同分到不同的分区进行消费。一个分区只能有组内的一个消费者订阅,但可以被不同消费组中的消费者同时订阅。

消费者分区分配策略

两个主题,topic0,topic1都有三个分区,t0p0、t0p1、t0p2、t1p0、t1p1、t1p2两个消费者同时订阅

分区分配策略有三种:

RangeAssignor分配策略(默认策略)

按照消费者总数和分区总数进行整除获得一个跨度,来进行分区分配

即:

C1:t0p0、t0p1、t1p0、t1p1

C2:t0p2、t1p2

RoundRobinAssignor分配策略

通过轮训的方式将分区进行分配

即:

C1:t0p0、t0p2、t1p1

C2:t0p1、t1p0、t1p2

StickAssignor分配策略

两个原则

1、分区尽可能均匀     2、分配尽可能与上次分配相同

C1:t0p0、t0p2、t1p1

C2:t0p1、t1p0、t1p2

在重新分配是会与RoundRobinAssignor不一样,如:

C0:t0p0、t1p1、t3p0

C1:t0p1、t2p0、t3p1

C2:t1p0、t2p1

当C1离开后变为:

C0:t0p0、t1p1、t3p0、t2p0

C2:t1p0、t2p1、t0p1、t3p1

自定义分区策略,需要实现org.apache.kafka.clients.consumer.internals.PartitionAssignor接口

分区策略通过,partition.assignment.strategy来配置。

消费者订阅/取消订阅

订阅API为四种:

subscribe(Collection<String> topics) //主题

subscribe(Collection<String> topics, ConsumerRebalanceListener listener) //主题,重平衡监听器

subscribe(Pattern pattern, ConsumerRebalanceListener listener)//正则,重平衡监听器

subscribe(Pattern pattern)//正则

assign(TopicPartition)//指明订阅主题中的分区

取消订阅为两种:

unsubscribe()和subscribe(new ArrayList())

消费者消息拉取API

consumer.poll(Duration timeout);

拉取消息的时候,客户端在发送请求时会告诉broker要哪个分区的哪个offset之后的数据

拉取消息后数据会经过反序列化,反序列器必须和生产者的序列化器相对应,还会经过消费者拦截器,配置见下

拉取数据有可以有最大拉取条数,和最大拉取的数据量大小控制,具体配置参数见下。

拉取到的消息对象说明:

private final Stringtopic; //topic

private final int partition; //分区

private final long offset; //偏移量

private final long timestamp; //时间戳

private final TimestampTypetimestampType; 时间戳类型,分CreateTime和LogAppendTime

private final long checksum; //校验值

private final int serializedKeySize; //序列化key长度

private final int serializedValueSize; //序列化value长度

private final K key; //key的值

private final V value;//value的值

消费者提交offset

位移提交分为三种,自动提交(通过配置参数来控制,不推荐),手动同步提交,手动异步提交

api有四种,为:

//手动同步提交

consumer.commitSync();

//手动异步提交

consumer.commitAsync();

consumer.commitAsync(OffsetCommitCallback callback);

consumer.commitAsync(Map<TopicPartiton,OffsetAndMetadata> offsets, OffsetCommitCallback callback)

重要参数

fetach.max.bytes:一次拉取的最大数据量,默认为50M

fetch.min.bytes/fetch.max.wait.ms:拉取最小数据量和拉取最大等待时间,默认为1B和500ms

max.poll.records:最大拉取的消息数

auto.offset.reset:开始消费位置,latest(默认,重最新开始拉取),earliest,none

auto.commit.interval.ms:默认自动提交位移的周期,默认5000

partition.assignment.stategy:默认分区策略

interceptor.class:客户端拦截器

主题和分区

主题就是消息的归类(队列),主题细分可以分为一个或多个分区,分个分区可以有一个或多个副本,每一个副本多对应一个日志文件,每个日志文件可以分为多个日志分段,每个日志分段可以分为索引文件,日志存储文件和快照文件

主题

主题创建kafka-topic.sh来创建主题,指明分区和副本数量,例如:

./bin/kafka-topics.sh --create --zookeeper 172.16.14.100:2181 --topic test2 --partitions 10 --replication-factor 20

当broker端参数配置为auto.create.topics.enable为true时,当生产者生产和消费者消费时,如果主题不存在,会自动创建主题。

创建主题的副本数不能大于broker数,分区数可以大于

主题命名时,不能用__开头,__开头的默认为内部主题

查看、修改和删除主题都可以使用kafka-topics.sh来操作

分区

分区副本的存储主要有三个文件,日志文件,位移索引文件和时间索引文件

同一个副本下的文件,如:

00000000000000000000.log,00000000000000000000.index,00000000000000000000.timeindex

00000000000000000155.log,00000000000000000155.index,00000000000000000155.timeindex

命名规则:名称固定为20位数字,由0和日志中的第一条偏移量来命名。log结尾表示日志文件,index结尾则为offset索引文件,timeindex为时间索引文件,索引文件都为稀疏索引,只有当消息写入一定量的时候才会增加索引文件记录。由。log.index.interval.bytes来配置,默认为4KB

offset索引文件:每个索引项由8个字节组成,relativeOffset和position,relativeOffset代表的是相对的偏移量。存储格式为:

0000 0006 0000 009c(r0ffset= 6,position=156)

0000 000e 0000 01cb(r0ffset=14,position=495)

0000 0016 0000 02fa(r0ffset=22,position=656)

时间索引文件:与offset索引文件类似。长度为12字节,分为两个部分,timestamp和relativeOffset,查找时需要从offset索引文件中查找position

日志片段拆分条件:

1、日志分段超过log.segment.bytes配置的值默认为1G

2、当消息中的最大时间戳大于log.roll.ms(hours)时,默认为7天

3、偏移量或所有文件大于log.index.size.max.bytes,默认为10M

4、消息追加的偏移量大于Integer.MAX_VALUE

位移主题

__consumer_offsets主题为内部主题中会保存各个消费者提交的位移,通过group.id的hash来得到具体要存放到哪个分区。默认创建50个分区,3个副本,通过offset.topic.num.partitions和offsets.topic.replication.factor来配置。

日志清理

日志清理有两种策略:日志删除和日志压缩,通过log.cleanup.policy来设置清理策略。默认问delete,可以设置为compact,日志清理可以控制到主题级别,通过配置不同的参数

日志清理由一个单独的日志清理线程来执行,默认为5分种扫描一次,通过log.retention.check.interval.ms来配置

日志删除有两种策略:

1、基于时间,通过log.retention.ms来配置。默认为7天

2、基于日志大小,通过log.retention.bytes来配置(所有文件的总大小,非单个日志分段的大小,单个日志文件通过,log.segment.bytes来配置)

3、基于日志起始偏移量,判断下一日志文件的偏移量是否大于baseOffset

日志压缩:对于有相同key的不同value值的值保存最新的value

配置参考:https://kafka.apache.org/20/documentation.html#configuration

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,755评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,369评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,799评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,910评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,096评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,159评论 3 411
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,917评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,360评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,673评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,814评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,509评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,156评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,123评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,641评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,728评论 2 351

推荐阅读更多精彩内容