消息持久化和缓存
Kafka高度依赖文件系统来存储和缓存消息。一般的人都认为“磁盘是缓慢的”,这使得人们对“持久化结构提供具有竞争性的性能”这样的结论持有怀疑态度。实际上,磁盘比人们预想的快很多也慢很多,这取决于它们如何被使用;一个好的磁盘结构设计可以使之跟网络速度一样快。
一个有关磁盘性能的关键事实是:磁盘驱动器的吞吐量跟寻道延迟是相背离的。结果就是:在一个6 7200rpm SATA RAID-5 的磁盘阵列上线性写的速度大概是300M/秒,但是随机写的速度只有50K/秒,两者相差将近10000倍。线性读写在大多数应用场景下是可以预测的,因此,操作系统利用read-ahead和write-behind技术来从大的数据块中预取数据,或者将多个逻辑上的写操作组合成一个大写物理写操作中,对磁盘的线性读在有些情况下可以比内存的随机访问要快一些。消息读取
Kafka在读方面使用了sendfile这个高级系统函数,也即zero-copy技术,感兴趣的同学可以去阅读IBM的文章。 这项技术通过减少系统拷贝次数,极大地提高了数据传输的效率。为了理解sendfile的影响,需要理解一般的将数据从文件传到套接字的路径:
1)操作系统将数据从磁盘读到内核空间的页缓存中
2)应用将数据从内核空间读到用户空间的缓存中
3)应用将数据写回内存空间的套接字缓存中
4)操作系统将数据从套接字缓存写到网卡缓存中,以便将数据经网络发出
这样做明显是低效的,这里有四次拷贝,两次系统调用。如果使用sendfile,再次拷贝可以被避免:允许操作系统将数据直接从页缓存发送到网络上。所以在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是需要的。
我们期望一个主题上有多个消费者是一种常见的应用场景。利用上述的零拷贝,数据只被拷贝到页缓存一次,然后就可以在每次消费时被重得利用,而不需要将数据存在内存中,然后在每次读的时候拷贝到内核空间中。这使得消息消费速度可以达到网络连接的速度。端到端的批量压缩
在许多场景下,瓶颈实际上不是CPU而是网络。这在需要在多个数据中心之间发送消息的数据流水线的情况下更是如此。当然,用户可以不需要Kafka的支持而发送压缩后的消息,但是这会导致非常差的压缩率。高效的压缩需要将多个消息一块儿压缩而不是对每一个消息进行压缩。理想情况下,这可以在端到端的情况下实现,数据会先被压缩,然后被生产者发送,并且在服务端也是保持压缩状态,只有在最终的消费者端才会被解压缩。
Kafka通过递归消息集合来支持这一点。一批消息可以放在一起被压缩,然后以这种形式发给服务器。这批消息会被递送到相同的消费者那里,并且保持压缩的形式,直到它到达目的地,Kafka支持GZIP和Snappy压缩协议。消费状态由消费者自己维护
在Kafka中,消费者负责记录状态信息(偏移量),也就是已经消费到哪个位置了。准确地说,消费者库将他们的状态信息写到zookeeper中。但是,将状态数据写到另一个地方——处理结果所存放的数据中心——可能会更好。打个比方,消费者可能只需要简单地将一些合计值写到中心化的事务型OLTP数据库中。在这种情况下,消费者可以将状态信息写到同一个事务中。这解决了分布式一致性问题——通过去除分布式部分。类似的技巧可以用在一些非事务型的系统中。一个搜索系统可以将消费者状态存放在索引块中。尽管这不提供持久性保证,但这意味着索引可以和消费者状态保持同步:如果一个没有刷新的索引块在一次故障中丢失了,那么这些索引可以从最近的检查点偏移处开始重新消费。同样的,在并行加载数据到Hadoop时,可以利用类似的技巧。每个mapper在map 任务的最后将偏移量写到HDFS中。这样的话,如果一个加载任务失败了,每个mapper可以简单地从存储在HDFS中的偏移量处重启消费。
这个决定有另外一个好处。消费者可以重新消费已经消费过的数据。这违反了队列的性质,但是这样可以使多个消费者一起来消费。打个比方,如果一段消费者代码出bug了,在发现bug之间这个消费者又消费了一堆数据,那个在bug修复之后,消费者可以从指定的位置重新消费。自动的生产者负载均衡
Kafka支持消息生产者在客户端的负载均衡,或者利用专有的负载均衡器来均衡TCP连接。一个专用的四层均衡器通过将TCP连接均衡到Kafka的broker上来工作。在这种配置下,所有的来自同一个生产者的消息被发送到一个borker上,这种做法的优点是,一个生产者只需要一个TCP连接,而不需要与zookeeper的连接。缺点是负载均衡只能在TCP连接的层面上来做,因此,它有可能不是均衡得非常好(如果一些生产者比其他生产者生产更多的消息,给每个broker分配相同的TCP连接不一定会使每个broker得到相同的消息)。
基于zookeeper的客户端的负载均衡可以解决这个问题。它允许生产者动态地发现新的broker,并且在每个请求上进行负载均衡。同样的,它允许生产者根据一些键将数据分开,而不是随机分,这可以增加与消费者的粘性(比如,根据用用户id来化分数据的消费)。这个特性被称为“语义化分”,下文会详述。
这种基于zookeeper的负载均衡如下所述。zookeeper watchers注册以下一些事件:
1)一个新的broker启动
2)一个broker关闭
3)一个新的主题注册进来
4)一个borker注册一个已经存在的主题
在内部,生产者维护一个与borker的弹性连接池。这个连接池通过zookeeper watchers的回调函数来保持更新以便与所有存活的broker建立或保持连接。当一个生产者对某一个主题的请求上来时,一个主题的分区被分区器提取到。连接池中的一个连接被用来将数据发送到前面所选的那个broker分区中。拉还是推?
Kafka采用的策略是:生产者把数据推到borker上,而消费者主动去broker上拉数据。最近的一些系统包括flume和scribe,都是broker将数据推给消费者,这有可能会存在一个问题,如果推的速度过快,消费者会被淹没。而在Kafka中不会出现这样的问题,因为消费者是主动去borker上拉数据的。异步发送
异步的非阻塞发送对于扩展消息系统是基本的。在Kafka中,生产者提供一个选项用来使用生产请求的异步分派(producer.type=async)。这允许将生产请求缓存在一个内存队列中,然后在被一个时间间隔或者预先设定的batch大小触发时发送出去。由于数据是从异构的机器上以不同的速率发布的,这种异步的缓存机制可以生成统一的通往broker的traffic, 从而使得网络资源得到充分利用,同时也提高吞吐量。Replication & Leader election
Kafka从0.8开始提供partition级别的replication,replication的数量可在$KAFKA_HOME/config/server.properties
中配置。
default.replication.factor = 1
该 Replication与leader election配合提供了自动的failover机制。replication对Kafka的吞吐率是有一定影响的,但极大的增强了可用性。默认情况下,Kafka的replication数量为1。 每个partition都有一个唯一的leader,所有的读写操作都在leader上完成,leader批量从leader上pull数据。一般情况下partition的数量大于等于broker的数量,并且所有partition的leader均匀分布在broker上。follower上的日志和其leader上的完全一样。
和大部分分布式系统一样,Kakfa处理失败需要明确定义一个broker是否alive。对于Kafka而言,Kafka存活包含两个条件,一是它必须维护与Zookeeper的session(这个通过Zookeeper的heartbeat机制来实现)。二是follower必须能够及时将leader的writing复制过来,不能“落后太多”。
leader会track“in sync”的node list。如果一个follower宕机,或者落后太多,leader将把它从”in sync” list中移除。这里所描述的“落后太多”指follower复制的消息落后于leader后的条数超过预定值,该值可在 $KAFKA_HOME/config/server.properties
中配置
#If a replica falls more than this many messages behind the leader, the leader will remove the follower from ISR and treat it as deadreplica.lag.max.messages=4000#If a follower hasn't sent any fetch requests for this window of time, the leader will remove the follower from ISR (in-sync replicas) and treat it as deadreplica.lag.time.max.ms=10000
需要说明的是,Kafka只解决”fail/recover”,不处理“Byzantine”(“拜占庭”)问题。
一条消息只有被“in sync” list里的所有follower都从leader复制过去才会被认为已提交。这样就避免了部分数据被写进了leader,还没来得及被任何follower复制就宕机了,而造成数据丢失(consumer无法消费这些数据)。而对于producer而言,它可以选择是否等待消息commit,这可以通过 request.required.acks
来设置。这种机制确保了只要“in sync” list有一个或以上的flollower,一条被commit的消息就不会丢失。
这里的复制机制即不是同步复制,也不是单纯的异步复制。事实上,同步复制要求“活着的”follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率(高吞吐率是Kafka非常重要的一个特性)。而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入log就被认为已经commit,这种情况下如果follwer都落后于leader,而leader突然宕机,则会丢失数据。而Kafka的这种使用“in sync” list的方式则很好的均衡了确保数据不丢失以及吞吐率。follower可以批量的从leader复制数据,这样极大的提高复制性能(批量写磁盘),极大减少了follower与leader的差距(前文有说到,只要follower落后leader不太远,则被认为在“in sync” list里)。
上文说明了Kafka是如何做replication的,另外一个很重要的问题是当leader宕机了,怎样在follower中选举出新的leader。因为follower可能落后许多或者crash了,所以必须确保选择“最新”的follower作为新的leader。一个基本的原则就是,如果leader不在了,新的leader必须拥有原来的leader commit的所有消息。这就需要作一个折衷,如果leader在标明一条消息被commit前等待更多的follower确认,那在它die之后就有更多的follower可以作为新的leader,但这也会造成吞吐率的下降。
一种非常常用的选举leader的方式是“majority 灵秀”(“少数服从多数”),但Kafka并未采用这种方式。这种模式下,如果我们有2f+1个replica(包含leader和follower),那在commit之前必须保证有f+1个replica复制完消息,为了保证正确选出新的leader,fail的replica不能超过f个。因为在剩下的任意f+1个replica里,至少有一个replica包含有最新的所有消息。这种方式有个很大的优势,系统的latency只取决于最快的几台server,也就是说,如果replication factor是3,那latency就取决于最快的那个follower而非最慢那个。majority vote也有一些劣势,为了保证leader election的正常进行,它所能容忍的fail的follower个数比较少。如果要容忍1个follower挂掉,必须要有3个以上的replica,如果要容忍2个follower挂掉,必须要有5个以上的replica。也就是说,在生产环境下为了保证较高的容错程度,必须要有大量的replica,而大量的replica又会在大数据量下导致性能的急剧下降。这就是这种算法更多用在 Zookeeper 这种共享集群配置的系统中而很少在需要存储大量数据的系统中使用的原因。例如HDFS的HA feature是基于 majority-vote-based journal ,但是它的数据存储并没有使用这种expensive的方式。
实际上,leader election算法非常多,比如Zookeper的 Zab , Raft 和 Viewstamped Replication 。而Kafka所使用的leader election算法更像微软的 PacificA 算法。
Kafka在Zookeeper中动态维护了一个ISR(in-sync replicas) set,这个set里的所有replica都跟上了leader,只有ISR里的成员才有被选为leader的可能。在这种模式下,对于f+1个replica,一个Kafka topic能在保证不丢失已经ommit的消息的前提下容忍f个replica的失败。在大多数使用场景中,这种模式是非常有利的。事实上,为了容忍f个replica的失败,majority vote和ISR在commit前需要等待的replica数量是一样的,但是ISR需要的总的replica的个数几乎是majority vote的一半。
虽然majority vote与ISR相比有不需等待最慢的server这一优势,但是Kafka作者认为Kafka可以通过producer选择是否被commit阻塞来改善这一问题,并且节省下来的replica和磁盘使得ISR模式仍然值得。
上文提到,在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某一个partition的所有replica都挂了,就无法保证数据不丢失了。这种情况下有两种可行的方案:
等待ISR中的任一个replica“活”过来,并且选它作为leader
选择第一个“活”过来的replica(不一定是ISR中的)作为leader
这就需要在可用性和一致性当中作出一个简单的平衡。如果一定要等待ISR中的replica“活”过来,那不可用的时间就可能会相对较长。而且如果ISR中的所有replica都无法“活”过来了,或者数据都丢失了,这个partition将永远不可用。选择第一个“活”过来的replica作为leader,而这个replica不是ISR中的replica,那即使它并不保证已经包含了所有已commit的消息,它也会成为leader而作为consumer的数据源(前文有说明,所有读写都由leader完成)。Kafka0.8.*使用了第二种方式。根据Kafka的文档,在以后的版本中,Kafka支持用户通过配置选择这两种方式中的一种,从而根据不同的使用场景选择高可用性还是强一致性。
上文说明了一个parition的replication过程,然尔Kafka集群需要管理成百上千个partition,Kafka通过round-robin的方式来平衡partition从而避免大量partition集中在了少数几个节点上。同时Kafka也需要平衡leader的分布,尽可能的让所有partition的leader均匀分布在不同broker上。另一方面,优化leadership election的过程也是很重要的,毕竟这段时间相应的partition处于不可用状态。一种简单的实现是暂停宕机的broker上的所有partition,并为之选举leader。实际上,Kafka选举一个broker作为controller,这个controller通过watch Zookeeper检测所有的broker failure,并负责为所有受影响的parition选举leader,再将相应的leader调整命令发送至受影响的broker,过程如下图所示。
这样做的好处是,可以批量的通知leadership的变化,从而使得选举过程成本更低,尤其对大量的partition而言。如果controller失败了,幸存的所有broker都会尝试在Zookeeper中创建/controller->{this broker id},如果创建成功(只可能有一个创建成功),则该broker会成为controller,若创建不成功,则该broker会等待新controller的命令。