[root@hk bin]# ./kafka-topics.sh --list --zookeeper localhost 2181
__consumer_offsets
zhisheng
__consumer_offsets是kafka创建的,用来记录消费者的消费偏移量
默认会有50个_consumer_offsets-i,唯一的作用是用于判断消费者消费偏移量的系统主题,是系统默认创建的
每个consumer_offsets的结构:
zhisheng自定义主题的分区
默认主题的名称是主题名字-i(i>=0)
分区的属性:
- 每个分区都是一个有序的、不可变的消息序列,后续新来的消息会源源不断地、持续地追加到分区的后面,这相当于一种结构化的提交日志(类似于Git的提交日志)。
- 分区中的每一条消息都会被分配一个连续的id值(即offset),该值用于唯一标识分区中的每一条消息。
分区的作用:
- 分区中的消息数据是存储在日志文件中的,而且同一个分区中的消息数据是按照发送顺序严格有序的,分区在逻辑概念上对应一个日志,当生产者将消息写入到分区中时,实际上是写到分区对应的日志中,而日志可以看作是一种逻辑上的概念,它对应于磁盘上的一个目录,一个日志文件由多个Segment(段)来构成,每个Segment对应于一个索引文件与一个日志文件。
- 借助于分区,我们可以实现Kafka Server的水平扩展,对于一台机器来说,无论是物理机还是虚拟机,其运行能力总归是有上限的。当一台机器到达其能力上限时就无法扩展了,即垂直扩展能力总是受到硬件制约的,通过使用分区,我们可以将一个主题中的消息分散到不同的Kafka Server上(这里需要使用Kafka集群),这样当机器能力不足时,我们只要添加机器就可以了,在新的机器上创建新的分区,这样理论上就可以实现无限的水平扩展能力。
- 分区还可以实现并行处理能力,向一个主题所发送的消息会发送给这个主题所拥有的不同的分区中,这样消息就可以实现并行发送与处理,由多个分区来接收所发送的消息。
创建主题:
[root@hk bin]# ./kafka-topics.sh --zookeeper localhost 2181 --create --topic Topic-1 --replication-factor 1 --partitions 3
Created topic Topic-1.
显示主题:
[root@hk bin]# ./kafka-topics.sh --list --zookeeper localhost 2181
Topic-1
__consumer_offsets
zhisheng
生产者-生产消息:
[root@hk bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic Topic-1
1
2>
>>3
>4
>5
>6
>7
>8
>9
>10
>hello
>welcome
>kafka message
消费者-消费消息:
[root@hk bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Topic-1 //这个命令不会消费历史消息
^CProcessed a total of 0 messages
[root@hk bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic Topic-1//会从开始生产出开始消费
2
5
8
3
6
9
1
4
7
10
hello
welcome
kafka message
上述的消息顺序是无序的,因为存在多个分区。
Segment: /'segm(ə)nt/ 段:
一个分区(Partition)是由一系列有序的、不可变的消息所构成的,一个Partition中的消息数量可能会非常多,因此当然不能将所有的消息都保存到一个文件当中,因此,类似于log4j中的rolling log (卷动日志),当Partition中的消息数量增长到一定程度之后,消息会进行切割,新的消息会被写到一个新的文件当中,当新的文件增长到一定程度后,新的消息会被写到另一个文件当中,以此类推,这一个个新的数据文件我们就称之为Segment(段)。
因此,一个Partition在物理上是由一个或者多个Segment所构成的,每个Segment中则会保存真实的消息数据。
观察一下正式环境Kafka的分区内容:
存在多个.log文件,
Topic -Partition: 1:N ,Partition-Segment: 1:N
关于Partition与Segment之间的关系:
- 每个Partition都相当于一个大型的文件被分配到多个大小相等的Segment数据文件中,每个Segment中的消息数量未必相等(这与消息的大小有着紧密的联系,不同的消息所占据的磁盘空间显然是不一样的),这个特点使得老的Segment文件可以很容易就被删除掉,有助于提升磁盘的利用效率。
- 每个Partition只需要支持顺序读写即可,Segment文件的生命周期是由Kafka Server的配置参数决定的,比如说server.properties文件中的参数项:
The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
默认是7天,表示7天后删除老的消息文件
关于分区中的文件的含义及作用(4个):
00000000000000042610.index:它是Segment文件的索引文件,它与.log文件时成对出现的,后缀.index就表示这是个索引文件。
00000000000000042610.log:它是Segment文件的数据文件,用于存储实际的数据,是二进制格式的文件,方便压缩。Segment文件的命名规则是Partition全局的第1个Segment从0开始,后续每个Segment文件名为上一个Segment文件中最后一个消息的offset值,没有数字则使用0填充(20位),由于这里主题的消息较少,因此只有一个数据文件。
00000000000000042610.timeindex:该文件是一个基于消息日期的索引文件,主要用途是在一些根据日期或是时间来寻找消息的场景下使用,此外在基于时间的日志rolling或是基于时间的日志保留策略等情况下也会使用,实际上,该文件在Kafka较新的版本中才增加的,老版本的Kafka Server中是没有该文件的,它是对.index文件的一个有益补充,.index文件时基于偏移量的索引文件,而.timeindex是基于时间戳的索引文件。
leader-epoch-checkpoint:是leader的一个缓存文件,实际上,它是与Kafka的HW(High Water)与LEO(Log End Offset)相关的一个重要文件。
其实还有一个.snapshot:
00000000000000042610.snapshot:
If snapshot files are deleted, during start up method log.loadSegmentFiles(), all messages in the partition will have to be read to recreate the snapshot even if log and index files are present. This will increase the time to load partition.
For contents of snapshot file, please refer writeSnapshot() in ProducerStateManager.writeSnapshot()
Parameter log.dir defines where topics (ie, data) is stored (supplemental for log.dirs property).
提供了对于一个时间点的数据备份
A snapshot basically gives you a copy of your data at one point in time.
In a situation like yours, instead of waiting for a response, you could:
change the log.dirs path, restart everything and see how it goes;
backup the snapshots, saving them in a different location, delete them all from the previous one and see how it goes.
After that you're meant to be able to start up Kafka.