Kafka

分布式系统中,系统由多个子系统组成,数据需要在子系统中高性能、低延迟的流转。Kafka是"发布-订阅"消息系统,是一个分布式的、分区的日志服务,用于处理活跃的流式数据

特点

  1. 高吞吐量:同时为发布和订阅提供高吞吐量,读速度为100MB/s,写速度为50MB/s
  2. 持久化:消息持久化在磁盘上,同时replication防止数据丢失
  3. 分布式:所有的producer、broker和consumer都支持多个
  4. 消息处理状态由consumer端使用offset维护,而不是由broker维护

Kafka中Zookeeper的作用

Zookeeper是1个分布式系统中的协调服务,主要用于集群中不同节点间的相互通信,解决节点一致性问题

Kafka中,Zookeeper用于存储Consumer Group信息和偏移量offset,分区再平衡,实时监控节点信息,Broker leader选举,Producer和Consumer用Zookeeper来发现Broker列表
(Producer和Consumer需要知道哪些Broker是可用的,没有zk,每个Producer和Consumer客户端在生产和消费之前都需要检查Broker是否可用,效率太低)
Zookeeper不保存消息,消息保存在Broker上。从Kafka0.9开始,Consumer group和offset信息不保存在zk上,也保存在Broker上

  1. Broker

(1)Broker启动时,向zk注册。在zk中,Broker是1个临时节点znode,当Broker与zk断开连接时,znode被删除
/brokers/ids/brokerid

(2)Broker启动时,向zk注册持有的topic和partition,是1个临时节点
/brokers/topics/topic_name/partition_index

(3)集群中的Broker在启动时,会进行leader选举。Broker会尝试在zk上创建/controller临时节点,成功创建的Broker会成为集群的leader,其它Broker在leader确定时,会收到"节点已存在"的消息,并在leader创建的/controller临时节点上设置Watch。当leader挂掉后,leader创建的临时节点会被zk删除,其它Broker会收到Watch通知,并尝试创建/controller临时节点,创建成功的Broker成为新的leader

(4)Broker leader负责Partition leader的选举
当1个分区的leader所在的Broker挂掉时,Broker会遍历分区的所有副本,在副本中选择1个新的分区leader,简单说,就是在isr列表中选择第1个副本作为新的分区leader。选出新的分区leader后,Broker leader向所有Broker通知这1变动,新的分区leader开始处理读写请求,分区follower开始同步新的分区leader的消息

  1. Producer

使用zk发现Broker列表,和Topic下的Partition的leader建立连接

  1. Consumer

发现Broker列表,和Topic下的Partition的leader建立连接,注册Consumer信息

Producer和Consumer需要知道哪些Broker是可用的,没有zk,每个Producer和Consumer客户端在生产和消费之前都需要检查Broker是否可用,效率太低

相关概念

  1. 生产者Producer,向Kafka集群发送消息
  2. 消费者Consumer,与Kafka集群中的Broker实例建立长连接,不断拉取消息,然后进行处理
  3. 主题Topic,通过对消息指定主题,来对消息分类

(1)1个主题Topic被认为是1类消息,每个Topic被分为多个分区Partition,Partition在存储层面是1个增量的文件,发布到partition的消息会被追加到文件的末尾,每条消息在文件中的位置称为偏移量offset,是1long型的数字,唯一标记1条消息

(2)创建分区时,需要指定分区数partition和复制系数(默认都是1)
分区数越大,吞吐量越大,需要的资源也就越多,Kafka集群在接收到Producer生产的消息时,会根据均衡策略(轮询,Key-Hash,Random随机)将消息存储到不同的partition。通过partition存储消息,既可以存储更多的数据(partition分布在多个Broker上,避免文件达到单机磁盘上限),也可以提高读写效率(多个Broker处理读写操作的效率,肯定大于1个Broker)
复制系数的目的是冗余备份,分为leader和follower,leader负责处理所有的读写请求,follower会定期同步leader的数据,当leader挂掉,Kafka会选择1个follower成为leader

创建Topic时,Kafka集群会决定如何将partition及其副本分配到Broker上。规则是
(a)不同Partition的leader分配到不同的Broker上,相同Partition的leader和follower分配到不同的Broker上
例如,有5个Broker,Toipc有10个Partition,复制系数为3,则需要在5个Broker上分配30个副本
(a)随机选择1个Broker,假如是Broker4,将Partition1的leader分配给Broker4,之后使用轮询的方式将其他leader分配给Broker,Partition2的leader分配给Broker5,Partition3的leader分配给Broker1
(b)分配完Partition leader,会分配follower,需要保证1个Partition的leader和follower分配在不同的Broker

(3)被消费的消息不是立刻被删除,文件会根据Broker的配置,保留一段时间后才删除。默认为7天

  1. 偏移量offset,Consumer持有的元数据,offset是Consumer当前消费消息在Kafka文件中的位置

分组消费

Consumer需要做1些高延迟的操作,例如数据写入DB、使用数据进行耗时计算。此时,单个Consumer无法跟上Producer生产数据的速度,使用分组消费模式,增加Consumer的个数,提高处理能力

  1. 分区再平衡 Rebalance
    分组消费模式下,1个Consumer只能消费1个Topic下的1个分区,Consumer和Partition是有对应关系的,当出现1些情况,例如
    (1)Consumer加入Consumer Group
    (2)Consumer退出Consumer Group或取消订阅
    (3)Topic下的partition个数增加
    Consumer Group中的协调者Coordinator使用分区再平衡机制,来调整Consumer和Partition的对应关系

  2. Coordinator

Kafka0.9之后,Consumer的offset不再存放在zk上,而是存在Broker上,存放的位置是使用Math.abs()对groupId.hashCode()取绝对值,再对offsets.topic.num.partitions(配置文件中的offset分区总数,默认50)取余

Kafka0.9后,每个Consumer Group都有1个coordinator,用来管理Consumer Group和offset。coordinator就是当前Consumer Group存放offset的这个partition的leader所在的Broker。coordinator 负责与Consumer Group中的所有Consumer进行协调通信
(1)定期发送心跳,监控Consumer存活情况
(2)Consumer离开Consumer Group时,会告诉coordinator
(3)将parititon的分配情况,通知组内所有Consumer

Kafka提供了2套API,High-Level Consumer API和SimpleConsumer API
High-Level Consumer API提供了对Consumer Group各种操作的封装,在消费消息时,不需要关注offset的提交,会自动提交;若使用多线程
(1)如果Consumer线程>partition,会有线程收不到消息
(2)如果Consumer线程<partition,会有线程收到多个partition的消息
(3)如果1个线程消费多个partition,无法保证收到消息的顺序,而1个partition中的消息时有序的

如果Consumer希望从头开始消费Partition的全量数据
(1)使用新的Consumer Group,即"group.id"
(2)指定"auto.offset.reset"为earliest

[KafkaConsumer is not safe for multi-threaded access]

SimpleConsumer API可以多Kafka进行更精确的控制,但是需要自己编写代码,处理offset的提交,leader的变更等

生产者

消息发送流程:
(1)创建KafkaProducer对象,初始化组件,例如缓冲区,发送消息线程
(2)创建ProducerRecord消息对象,设置消息Topic,key和value,将key和value序列化为byte[]
(3)分区器为消息选择合适的分区partition,默认使用轮询,可实现Partitioner接口的partition方法自定义分区
(4)消息会根据partition发送到不同的暂存区暂存
(5)后台消息发送线程Sender,从暂存区中获取消息,发送给Broker
(6)如果消息成功发送到Broker,返回RecordMetadata对象,包含消息目标分区,offset和时间戳;如果发生失败,且设置了重试次数,进行重试,否则返回失败

  1. 创建配置对象Properties
    (1)Broker地址 bootstrap.servers (localhost:9101)
    (2)消息写入成功策略 acks all (0,Producer不等待确认;1,等待分区leader写入成功;all,等待isr所有副本写入成功)
    (3)重试次数 retries 0
    (4)缓存大小 batch.size 默认16kb,Producer会将消息缓存,当消息达到一定大小时,才一起发送
    (5)每次发送消息时,延迟一定时间 linger.ms 默认0
    (6)Producer可用内存大小 buffer.memory 默认32M
    (7)Key和Value的序列化方式

  2. 创建KafkaProducer对象
    Producer<String,String> producer = new KafkaProducer<>(props);

  3. 使用KafkaProducer对象的send(),发送1个代表消息的ProducerRecord对象。send()是异步的,返回1个放在Future中的RecordMetadata对象,如果使用Future.get()获取RecordMetadata对象,会阻塞直到有结果返回;可以在send()上设置回调方法Callback实现无阻塞

消费者

  1. 创建配置对象Properties
    (1)Broker地址 bootstrap.servers
    (2)组id group.id
    (3)开启自动提交 enable.auto.commit true
    (4)offset重置方式 auto.offset.reset 默认latest
    earliest 之前提交过offset,从offset开始消费;未提交过,从头开始消费
    latest 之前提交过offset,从offset开始消费;未提交过,等待分区产生新的消息,从新消息开始消费
    none 之前提交过offset,从offset开始消费;Consumer Group涉及的分区,有1个未提交过offset,报错
    (5)自动提交频率 auto.commit.interval.ms 默认5s
    (6)Key和Value的序列化方式

  2. 创建KafkaConsumer对象,订阅主题
    KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("sonia"));

  3. Kafka High-Level Consumer API在KafkaConsumer对象的poll()中封装了
    (1)Consumer Group管理,调用poll()时,获得coordinator和分区partition分布情况
    (2)获得分区重平衡后,Consumer和Partition的对应信息
    (3)向Broker发送心跳,否则超时Broker会认为Consumer死亡,Consumer消费的partition会被分配给其他的Consumer
    (4)获得消息
    使用时,将poll()放在while(true)循环中,不断执行poll(),不断的从Broker pull消息;同时向Broker发送心跳,维持与Broker之间的连接,实时感知分区再平衡后的Consumer与Partition的关系,并作出调整

[提交offset的问题]

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,347评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,435评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,509评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,611评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,837评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,987评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,730评论 0 267
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,194评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,525评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,664评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,334评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,944评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,764评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,997评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,389评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,554评论 2 349

推荐阅读更多精彩内容