Flink状态的缩放(rescale)与键组(Key Group)设计

前言

在之前那篇讲解Flink Timer的文章里,我曾经用三言两语简单解释了Key Group和KeyGroupRange的概念。实际上,Key Group是Flink状态机制中的一个重要设计,值得专门探究一下。本文先介绍Flink状态的理念,再经由状态——主要是Keyed State——的缩放(rescale)引出KeyGroup的细节。

再认识Flink状态

自从开始写关于Flink的东西以来,“状态”这个词被提过不下百次,却从来没有统一的定义。Flink官方博客中给出的一种定义如下:

When it comes to stateful stream processing, state comprises of the information that an application or stream processing engine will remember across events and streams as more realtime (unbounded) and/or offline (bounded) data flow through the system.

根据这句话,状态就是流处理过程中需要“记住”的那些数据的快照。而这些数据既可以包括业务数据,也可以包括元数据(例如Kafka Consumer的offset)。以最常用也是最可靠的RocksDB状态后端为例,状态数据的流动可以抽象为3层,如下图所示。

用户代码产生的状态实时地存储在本地文件中,并且随着Checkpoint的周期异步地同步到远端的可靠分布式文件系统(如HDFS)。这样就保证了100%本地性,各个Sub-Task只需要负责自己所属的那部分状态,不需要通过网络互相传输状态数据,也不需要频繁地读写HDFS,减少了开销。在Flink作业重启时,从HDFS取回状态数据到本地,即可恢复现场。

我们已经知道Flink的状态分为两类:Keyed State和Operator State。前者与每个键相关联,后者与每个算子的并行实例(即Sub-Task)相关联。下面来看看Keyed State的缩放。

Keyed State的缩放

所谓缩放,在Flink中就是指改变算子的并行度。Flink是不支持动态改变并行度的,必须先停止作业,修改并行度之后再从Savepoint恢复。如果没有状态,那么不管scale-in还是scale-out都非常简单,只要做好数据流的重新分配就行,如下图的例子所示。

可是如果考虑状态的话,就没有那么简单了:并行度改变之后,HDFS里的状态数据该按何种规则取回给新作业里的各个Sub-Task?下图示出了这种困局。

按照最naive的思路考虑,Flink中的key是按照hash(key) % parallelism的规则分配到各个Sub-Task上去的,那么我们可以在缩放完成后,根据新分配的key集合从HDFS直接取回对应的Keyed State数据。下图示出并行度从3增加到4后,Keyed State中各个key的重新分配。

在Checkpoint发生时,状态数据是顺序写入文件系统的。但从上图可以看出,从状态恢复时是随机读的,效率非常低下。并且缩放之后各Sub-Task处理的key有可能大多都不是缩放之前的那些key,无形中降低了本地性。为了解决这两个问题,在FLINK-3755对Keyed State专门引入了Key Group,下面具体看看。

引入Key Group

如果看官有仔细读Flink官方文档的话,可能对这个概念已经不陌生了,原话抄录如下:

Keyed State is further organized into so-called Key Groups. Key Groups are the atomic unit by which Flink can redistribute Keyed State; there are exactly as many Key Groups as the defined maximum parallelism. During execution each parallel instance of a keyed operator works with the keys for one or more Key Groups.

翻译一下,Key Group是Keyed State分配的原子单位,且Flink作业内Key Group的数量与最大并行度相同,也就是说Key Group的索引位于[0, maxParallelism - 1]的区间内。每个Sub-Task都会处理一个到多个Key Group,在源码中,以KeyGroupRange数据结构来表示。

KeyGroupRange的逻辑相对简单,部分源码如下。注意startKeyGroup和endKeyGroup实际上指的是Key Group的索引,并且是闭区间。

public class KeyGroupRange implements KeyGroupsList, Serializable {
    private static final long serialVersionUID = 4869121477592070607L;
    public static final KeyGroupRange EMPTY_KEY_GROUP_RANGE = new KeyGroupRange();

    private final int startKeyGroup;
    private final int endKeyGroup;

    private KeyGroupRange() {
        this.startKeyGroup = 0;
        this.endKeyGroup = -1;
    }

    public KeyGroupRange(int startKeyGroup, int endKeyGroup) {
        this.startKeyGroup = startKeyGroup;
        this.endKeyGroup = endKeyGroup;
    }

    @Override
    public boolean contains(int keyGroup) {
        return keyGroup >= startKeyGroup && keyGroup <= endKeyGroup;
    }

    public KeyGroupRange getIntersection(KeyGroupRange other) {
        int start = Math.max(startKeyGroup, other.startKeyGroup);
        int end = Math.min(endKeyGroup, other.endKeyGroup);
        return start <= end ? new KeyGroupRange(start, end) : EMPTY_KEY_GROUP_RANGE;
    }

    public int getNumberOfKeyGroups() {
        return 1 + endKeyGroup - startKeyGroup;
    }

    public int getStartKeyGroup() {
        return startKeyGroup;
    }

    public int getEndKeyGroup() {
        return endKeyGroup;
    }

    @Override
    public int getKeyGroupId(int idx) {
        if (idx < 0 || idx > getNumberOfKeyGroups()) {
            throw new IndexOutOfBoundsException("Key group index out of bounds: " + idx);
        }
        return startKeyGroup + idx;
    }

    public static KeyGroupRange of(int startKeyGroup, int endKeyGroup) {
        return startKeyGroup <= endKeyGroup ? new KeyGroupRange(startKeyGroup, endKeyGroup) : EMPTY_KEY_GROUP_RANGE;
    }
}

我们还有两个问题需要解决:

  • 如何决定一个key该分配到哪个Key Group中?
  • 如何决定一个Sub-Task该处理哪些Key Group(即对应的KeyGroupRange)?

第一个问题,相关方法位于KeyGroupRangeAssignment类:

    public static int assignToKeyGroup(Object key, int maxParallelism) {
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
    }

    public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
        return MathUtils.murmurHash(keyHash) % maxParallelism;
    }

可见是对key进行两重哈希(一次取hashCode,一次做MurmurHash)之后,再对最大并行度取余,得到Key Group的索引。

第二个问题,仍然在上述类中的computeKeyGroupRangeForOperatorIndex()方法,源码如下。

    public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
        int maxParallelism,
        int parallelism,
        int operatorIndex) {

        checkParallelismPreconditions(parallelism);
        checkParallelismPreconditions(maxParallelism);

        Preconditions.checkArgument(maxParallelism >= parallelism,
            "Maximum parallelism must not be smaller than parallelism.");

        int start = ((operatorIndex * maxParallelism + parallelism - 1) / parallelism);
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new KeyGroupRange(start, end);
    }

可见是由并行度、最大并行度和算子实例(即Sub-Task)的ID共同决定的。根据Key Group的逻辑,上一节中Keyed State重分配的场景就会变成下图所示(设最大并行度为10)。

很明显,将Key Group作为Keyed State的基本分配单元之后,上文所述本地性差和随机读的问题都部分得到了解决。当然还要注意,最大并行度对Key Group分配的影响是显而易见的,因此不要随意修改最大并行度的值。Flink内部确定默认最大并行度的逻辑如下代码所示。

    public static int computeDefaultMaxParallelism(int operatorParallelism) {
        checkParallelismPreconditions(operatorParallelism);
        return Math.min(
                Math.max(
                        MathUtils.roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)),
                        DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
                UPPER_BOUND_MAX_PARALLELISM);
    }

其中,下限值DEFAULT_LOWER_BOUND_MAX_PARALLELISM为128,上限值UPPER_BOUND_MAX_PARALLELISM为32768。

The End

准备开启远程办公模式。希望疫情快点好转。

民那晚安。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,657评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,889评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,057评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,509评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,562评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,443评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,251评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,129评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,561评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,779评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,902评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,621评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,220评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,838评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,971评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,025评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,843评论 2 354