Kafka Partition的消费分配策略

前言

之前的一篇文章中,笔者介绍了Kafka Consumer Group(消费者组)以及Rebalance(重平衡)的概念:

为了使得Consumer易于组织、可扩展以及更好地容错,Kafka将一个或多个Consumer组织为Consumer Group,即消费者组。Consumer Group的唯一标识就是group.id。Group内的所有Consumer共同消费已订阅的各个Topic的所有Partition,并且保证每个Partition只分配给该Group内的唯一一个Consumer……

Rebalance就是一个Consumer Group内的所有Consumer分配消费已订阅的Topic的各Partition的过程。

问题来了:Partition是按照什么规则分配给Group中的各个Consumer的呢?本文就来简单讲解一下Kafka的Partition分配策略,可以通过partition.asssignment.strategy参数进行配置。目前一共有3种:

  • org.apache.kafka.clients.consumer.RangeAssignor
  • org.apache.kafka.clients.consumer.RoundRobinAssignor
  • org.apache.kafka.clients.consumer.StickyAssignor

当然也支持自定义分配策略。下面逐个进行讲解。

RangeAssignor

RangeAssignor是默认的分配策略。

对于每个被订阅的Topic,设Consumer总数为c,Partition总数为p,那么RangeAssignor会根据p / c的结果得出一个区间值r,以及余数值p % c,记为m。将所有Consumer按照预设好的Member ID字典序排序,从第一个Consumer开始顺序分配,前m个Consumer分配连续的(r + 1)个Partition,后(c - m)个Consumer分配连续的r个Partition。也就是说,如果不能均分,那么排在前面的Consumer会被多分配1个Partition。

可见,这种策略是按照范围来尽量平均分配Partition的,所以得名RangeAssignor。下面举一个例子,如图所示。

图中有1个Topic,8个Partition,3个Consumer,最终分配的比例为3:3:2,大致是均匀的。

如果我们再多加两个Topic,每个Topic有2个Partition,分配结果又是如何呢?答案如图所示。

显然,c2完全没有分配到t2中的Partition,因为c0和c1按照字典序排在它前面,已经被优先分配到了。如果推广上述情况,很容易发现该策略无法保证平均分配,造成负载倾斜。当订阅了多个Topic时,尤其需要注意RangeAssignor的适用性。

RoundRobinAssignor

"round-robin"一词的含义是轮询。RoundRobinAssignor策略仍然会将所有Consumer按照预设好的Member ID字典序排序,同时也会将所有Topic中的所有Partition按照字典序排序(注意这点不同),再轮询进行分配。单纯文字描述可能不容易理解,下面还是用单个Topic的情况举例。

多个Topic的情况也类似,按列观察图下方的分配列表就可以看出轮询的风格。

由于分配时是按所有Partition来的,所以即使Topic之间Partition的数量是不平均的,分配结果也是基本平均的,克服了RangeAssignor的缺点。

但是,RoundRobinAssignor也并非十全十美的。由于一个Group内的Consumer可以订阅不同的Topic,所以仍然可能会产生倾斜。考虑一种情况:Topic t0、t1、t2分别有1、2、3个Partition,而Consumer c0订阅了t0,c1订阅了t0~t1,c2订阅了t0~t2,那么分配结果会如下图所示,显然不是最优的。

StickyAssignor

StickyAssignor是在Kafka 0.11版本引入的,它的实现比RangeAssignor和RoundRobinAssignor都要复杂得多(代码算上注释有将近1k行)。按照文档的描述,它的设计目的有二:

  • First, it guarantees an assignment that is as balanced as possible——Partition的分配尽量平均,这也是前两种分配策略努力要达到的目标;
  • Second, it preserved as many existing assignment as possible when a reassignment occurs——当Partition重新分配时,能够尽量保留上一次的分配,即尽量少将已经分配了的Partition分配给其他的Consumer,这也就是"sticky"(粘性)一词的含义。

如果两者发生冲突,则优先保证平均分配。StickyAssignor的流程也比较晦涩难懂,主流程位于其assign()方法中,逻辑简述如下,看官将就读读,也可以自行参考源码。

  1. 获取当前的分配方案currentAssignment。如果为空,说明是初次分配。
  2. 获取每个Partition可能分配给的Consumer的映射表,记为partition2AllPotentialConsumers;获取每个Consumer可能分配到的Partition列表,记为consumer2AllPotentialPartitions。
  3. 遍历各Consumer,如果Consumer还没有任何分配,将其加入currentAssignment集合,但对应的Partition列表仍为空。
  4. 创建currentPartitionConsumer映射表,该映射表用来记录当前哪个Partition已经分配给了哪个Consumer。
  5. 调用sortPartitions()方法对所有Partition进行排序,返回结果为sortedPartitions,分为两种情况:
    • 如果不是初次分配,并且每个Consumer订阅的Topic是相同的,那么就将Consumer按照已分配的Partition数量从高到低排序,将这些Partition按轮询的方式插入sortedPartitions,再将未被分配的Partition插入sortedPartitions;
    • 否则,就将Partition按照可能分配给的Consumer数量(即partition2AllPotentialConsumers)从低到高排序,将这些Partition插入sortedPartitions。
  6. 遍历currentAssignment,将已经分配了的Partition从sortedPartitions中移除,剩下的就是需要分配的Partition,记为unassignedPartitions。
  7. 调用balance()方法对currentAssignment进行平衡(当然也包含初次分配),几个要点如下:
    • 遍历unassignedPartitions,将未分配的Partition依次分配给订阅了对应Topic且拥有Partition最少的Consumer;
    • 通过partition2AllPotentialConsumers检查一个Partition是否可以被多于一个Consumer消费。如是,说明此Partition可以被重分配;
    • 检查Consumer已分配的Partition数是否超过了允许分配的最大数量(即consumer2AllPotentialPartitions的size),并且它分配的Partition是否可以被多于一个Consumer消费。如果以上两个条件都为否,说明该Consumer可以参与重分配,否则将该Consumer排除出去;
    • 根据平衡分数(balance score)进行实际的重分配动作。平衡分数定义为每两个Consumer之间Partition数差值的绝对值,每分配一个Partition都会更新此分数。当达到绝对均衡或者所有Partition都已经被分配时,流程结束。

按照上述流程来解决上一节RoundRobinAssignor倾斜的问题,分配结果如下图所示。

这种分配方式虽然不均衡,但已经是最优的。现假设Consumer c0下线,分配结果就会变成:

这样就保留了5个Partition的原始分配,只需要将t0/p0重新分配给c1即可,并且是均匀的。如果按照RoundRobinAssignor的逻辑做重分配,则是如下图所示。

这样只会保留3个Partition的原始分配,并且分配是不均匀的,可见StickyAssignor确实是更优的策略。

自定义分配策略

自定义分配策略可以通过继承PartitionAssignor接口或者AbstractPartitionAssignor抽象类来实现,后者相对比较简单。下面给出一个在Consumer之间随机分配Partition的示例,部分代码复用了RangeAssignor的实现,简单易懂。

public class RandomAssignor extends AbstractPartitionAssignor {
  @Override
  public String name() {
    return "random";
  }

  @Override
  public Map<String, List<TopicPartition>> assign(
    Map<String, Integer> partitionsPerTopic,
    Map<String, Subscription> subscriptions
  ) {
    Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
    Map<String, List<TopicPartition>> assignment = new HashMap<>();
    for (String memberId : subscriptions.keySet()) {
      assignment.put(memberId, new ArrayList<TopicPartition>());
    }

    for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
      String topic = topicEntry.getKey();
      List<String> consumersForTopic = topicEntry.getValue();

      Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
      if (numPartitionsForTopic == null)
        continue;

      for (TopicPartition partition : partitions(topic, numPartitionsForTopic)) {
        int rand = ThreadLocalRandom.current().nextInt(consumersForTopic.size());
        assignment.get(consumersForTopic.get(rand)).add(partition);
      }
    }

    return assignment;
  }

  private Map<String, List<String>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
    Map<String, List<String>> res = new HashMap<>();
    for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
      String consumerId = subscriptionEntry.getKey();
      for (String topic : subscriptionEntry.getValue().topics())
        put(res, topic, consumerId);
    }
    return res;
  }
}

注意RandomAssignor基本上不能用于生产环境,只是个示例而已。

The End

明天早起搬砖,民那晚安晚安。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,123评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,031评论 2 384
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,723评论 0 345
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,357评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,412评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,760评论 1 289
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,904评论 3 405
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,672评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,118评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,456评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,599评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,264评论 4 328
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,857评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,731评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,956评论 1 264
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,286评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,465评论 2 348