[TOC]
1 简介
MQ满天飞的时代,kafka也只是其中之一。
Kafka,一个基于分布式
的高吞吐量
的消息发布-订阅系统
。
具有快速
、持久
、可扩展
、处理大量不同消费者
的特性。Kafka 不会关心消息的处理过程及消费者队列。
- 磁盘线性读写,
O(1)
读写性能 - 高吞吐量
- 显式的
分布式
架构
详细的介绍可以看这里:http://blog.cloudera.com/blog/2014/09/apache-kafka-for-beginners/
2 名词解释
- broker
kafka集群中的服务器(一个或多个)被称为broker
。
- topic
顾名思义,topic就是主题的意思。也就是对消息的一个分类。kafka中的每个消息都有一个分类/类别,这个类别就是topic。
不同topic的消息在磁盘上是分开存储的,同时对消费者是透明的。也就是说当你消费消息的时候并不需要知道消息是怎么存储的,也不需要知道存储在哪里。
- partition
可以认为是topic的物理分组,一个topic可以分为一个或多个partition。partition一两句话说不清,详情见后面的小节。
- producer
这个好理解,就是消息的生产者。
- consumer
和producer相对应,consumer就是消息的消费者。通常是各种高级语言写的客户端API,比如Java、Python甚至是JavaScript所写的客户端API。
- consumer group
每个consumer都属于一个消费者组
。不显示地指定消费者组的时候属于默认的消费者组。
- message
消息是通信的基本单位,producer
可以向topic
发布message
。新发布的消息就会广播给订阅了这个主题de consumer。message只能传输给某个group中的某一个consumer。
3 partition
kafka作为一个MQ或者日志系统,他的最终数据存储还是离不开磁盘的。kafka对每个消息做了分类,即有了topic,每个topic当然也是持久化在磁盘上的。当消费完之后太过于陈旧的消息(message/topic)将被删除。
鉴于此,将所有topic都存储在同一个目录里将导致磁盘文件太过于庞大,这样一来,管理不便,如果一个磁盘挂了,将导致所有数据丢失;而且磁盘的空间总是有限的。也就是说一个磁盘持久化所有数据在生成环境是不靠谱的。
所以,kafka可以将不同的topic分布式地存储于不同的磁盘上,这就有了kafka所谓的partition的概念。
partition是一个有序的、不可变的消息队列。每个消息都有一个连续的序列号即offset
。每个partition又分为若干个segment
。
最简单的也就是kafka默认的partition分布存储策略就是hash了。简单理解就是所有可用的broker一次轮流一个一个地存储partition。
这么一搞,最终结果就是:
- 每台可用的broker上的数据逗比总数据要少
- 但是每个消息数据都有冗余
- 一般情况下,一台broker宕机并不影响整个系统(当然如果你只有一台broker那就另说了……)
日志文件的存储位置(partition就在这里了)在server.properties中指定: log.dirs=/Users/hylexus/data/kafka/kafka-logs
。
3.1 单机版partition的磁盘存储
此处先从单机版kafka中partition存储方式说起。
单机版并没有partition带来的好处。
本人此处的设置是kafka数据存储在log.dirs=/Users/hylexus/data/kafka/kafka-logs
。
创建一个测试用的topic名为topic01
$ bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 4 \ # 分区数为4
--topic topic01
查看磁盘上partition的存储
hylexus@hylexusPC kafka-logs $ pwd
/Users/hylexus/data/kafka/kafka-logs
hylexus@hylexusPC kafka-logs $ ll
drwxr-xr-x 5 hylexus staff 170 5 1 15:46 topic01-0
drwxr-xr-x 5 hylexus staff 170 5 1 15:46 topic01-1
drwxr-xr-x 5 hylexus staff 170 5 1 15:46 topic01-2
drwxr-xr-x 5 hylexus staff 170 5 1 15:46 topic01-3
hylexus@hylexusPC kafka-logs $
单机版由于只有一个目录,partition分布较为简单。
3.2 多broker下partition的磁盘存储
此处的多broker是如下形式:
单机上启动三个broker,注册到同一个zookeeper。
创建一个topic名为topic02,用以测试
bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 2 \ # 2份拷贝
--partitions 4 \ # 4个partition
--topic topic02
查看磁盘磁盘上partition存储
hylexus@hylexusPC kafka-logs $ pwd
/Users/hylexus/data/kafka/kafka-logs
hylexus@hylexusPC kafka-logs $ ll
drwxr-xr-x 5 hylexus staff 170 5 1 16:29 topic02-0
drwxr-xr-x 5 hylexus staff 170 5 1 16:29 topic02-2
drwxr-xr-x 5 hylexus staff 170 5 1 16:29 topic02-3
hylexus@hylexusPC kafka-logs-02 $ pwd
/Users/hylexus/data/kafka/kafka-logs-02
hylexus@hylexusPC kafka-logs-02 $ ll
drwxr-xr-x 5 hylexus staff 170 5 1 16:29 topic02-0
drwxr-xr-x 5 hylexus staff 170 5 1 16:29 topic02-1
drwxr-xr-x 5 hylexus staff 170 5 1 16:29 topic02-3
hylexus@hylexusPC kafka-logs-03 $ pwd
/Users/hylexus/data/kafka/kafka-logs-03
hylexus@hylexusPC kafka-logs-03 $ ll
drwxr-xr-x 5 hylexus staff 170 5 1 16:29 topic02-1
drwxr-xr-x 5 hylexus staff 170 5 1 16:29 topic02-2
此时的partition磁盘分布如下:
记broker个数为n,则有:
- 第i个partition分配到第
(i % n)
个broker - 第i个Partition的第j个拷贝分配到第
((i + j) % n)
个broker
3.3 partition磁盘存储总结
- 每个partition为一个目录
- partiton命名规则为
topic名称
-有序序号
- 第一个partiton有序序号从0开始
- 序号最大值为partitions数量减1
- 如果你只是调用kafka提供的客户端程序的话,你没有必要清楚每个partition是怎么分布的,因为你只是调用客户端消费数据而已
3.4 partition中segment磁盘存储
接着上面说的,其实每个partition就相当于一个大型文件(整个消息记录)被分配到多个大小相等的文件中存储。大小相等但是消息个数不一定相等了,这样利于管理,可以快速的删除陈旧的文件,有效地提高了磁盘的利用率。
此处所说的每个大小相等的文件就是segment(partition的再次细分)了。
既然数据是存储在磁盘上的,即便比不上磁盘高效,但也要在一个MQ系统能接受的范围内。
所以,索引就必不可少了,kafka的segment文件分为两个主要部分:index+log
- index主要是索引文件,log才是真正的消息数据。
- segment文件命名时:
- 第一个segment文件名称从零开始,前导零填充至19位
- 其他每个segment文件名为上一个segment文件最后一个message的offset
此处就以单机版的kafka为例:
为测试方便,此处我将log.segment.bytes=1024
调整为1024,以便快速看到效果,生成一些message之后,topic01-0磁盘存储如下:
hylexus@hylexusPC topic01-0 $ pwd
/Users/hylexus/data/kafka/kafka-logs/topic01-0
hylexus@hylexusPC kafka-logs $ ll topic01-0
-rw-r--r-- 1 hylexus staff 0 5 1 19:43 00000000000000000000.index
-rw-r--r-- 1 hylexus staff 990 5 1 19:43 00000000000000000000.log
-rw-r--r-- 1 hylexus staff 0 5 1 19:43 00000000000000000012.index
-rw-r--r-- 1 hylexus staff 990 5 1 19:43 00000000000000000012.log
-rw-r--r-- 1 hylexus staff 0 5 1 19:43 00000000000000000024.index
-rw-r--r-- 1 hylexus staff 998 5 1 19:43 00000000000000000024.log
-rw-r--r-- 1 hylexus staff 0 5 1 19:43 00000000000000000036.index
-rw-r--r-- 1 hylexus staff 994 5 1 19:43 00000000000000000036.log
-rw-r--r-- 1 hylexus staff 0 5 1 19:43 00000000000000000048.index
-rw-r--r-- 1 hylexus staff 1004 5 1 19:43 00000000000000000048.log
3.5 通过offset查找message
就拿上面的segment来说,第一个segment的命名为零(前导零).{index,log}。
另外,上面我将每个segment的大小设置为1024即log.segment.bytes=1024
。
00000000000000000000 offset=0
00000000000000000012 offset=12+1==13
00000000000000000024 offset=24+1==25
……
因此,通过offset查找具体消息步骤如下:
- 二分查找定位到segment
- 在某个具体的segment文件中顺序查找到具体message
- 另外,为效率考虑,index所以文件是直接映射到内存的
例如:查找offset==23的message
- 二分查找到具体segment为00000000000000000012
- 在00000000000000000012内部顺序找到offset=23的message