从一例状态引发的性能问题谈Flink状态序列化

前言

好久不见(鞠躬

最近处在转型期,每天忙到飞起,关注具体技术细节的精力自然就比较少了(上一篇许下的周更承诺也食言了 = =)。上周帮助他人快速解决了一个因误用Flink状态类型引发的性能问题,在这里做个quick notes,并简要介绍一下Flink状态序列化方面的基础知识。

问题及排查

上游部门同事反馈,一个计算逻辑并不复杂的多流join DataStream API作业频繁发生消费积压、checkpoint失败(现场截图已丢失)。作业拓扑如下图所示。

为了脱敏所以缩得很小 = =

按大状态作业的pattern对集群参数进行调优,未果。

通过Flink Web UI定位到问题点位于拓扑中倒数第二个算子,部分sub-task checkpoint总是过不去。观察Metrics面板,发现有少量数据倾斜,而上下游反压度量值全部为0。

经过持续观察,存在倾斜的sub-task数据量最多只比其他sub-task多出10%~15%,按照常理不应引起如此严重的性能问题。遂找到对应的TaskManager pod打印火焰图,结果如下。

可见RocksDB状态读写的耗时极长,大部分时间花在了Kryo序列化上,说明状态内存储了Flink序列化框架原生不支持的对象。直接让相关研发同学show me the code,真相大白:

private transient MapState<String, HashSet<String>> state1;
private transient MapState<String, HashSet<String>> state2;
private transient ValueState<Map<String, String>> state3;

Flink序列化框架内并没有针对HashSet的序列化器,自然会fallback到Kryo。即使这些Set并不算大,状态操作的开销也会急剧上升。当然,ValueState<Map<String, String>>用法也是错误的,应改成MapState<String, String>

最快的临时解决方法很简单:把所有状态内用到的HashSet全部改成Map<String, Boolean>,同样可以去重。虽然并不优雅,但因为有了原生MapSerializer支持,效率大幅提升。下面简要介绍Flink的状态序列化。

TypeSerializer

在我们创建状态句柄所需的描述符StateDescriptor时,要指定状态数据的类型,如:

ValueStateDescriptor<Integer> stateDesc = new ValueStateDescriptor<>("myState", Integer.class);
ValueState<Integer> state = this.getRuntimeContext().getState(stateDesc);

与此同时,也就指定了对应数据类型的Serializer。我们知道,TypeSerializer是Flink Runtime序列化机制的底层抽象,状态数据的序列化也不例外。以处理Map类型的MapSerializer为例,代码如下,比较清晰。

@Internal
public final class MapSerializer<K, V> extends TypeSerializer<Map<K, V>> {

    private static final long serialVersionUID = -6885593032367050078L;

    /** The serializer for the keys in the map */
    private final TypeSerializer<K> keySerializer;

    /** The serializer for the values in the map */
    private final TypeSerializer<V> valueSerializer;

    /**
     * Creates a map serializer that uses the given serializers to serialize the key-value pairs in
     * the map.
     *
     * @param keySerializer The serializer for the keys in the map
     * @param valueSerializer The serializer for the values in the map
     */
    public MapSerializer(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer) {
        this.keySerializer =
                Preconditions.checkNotNull(keySerializer, "The key serializer cannot be null");
        this.valueSerializer =
                Preconditions.checkNotNull(valueSerializer, "The value serializer cannot be null.");
    }

    // ------------------------------------------------------------------------
    //  MapSerializer specific properties
    // ------------------------------------------------------------------------

    public TypeSerializer<K> getKeySerializer() {
        return keySerializer;
    }

    public TypeSerializer<V> getValueSerializer() {
        return valueSerializer;
    }

    // ------------------------------------------------------------------------
    //  Type Serializer implementation
    // ------------------------------------------------------------------------

    @Override
    public boolean isImmutableType() {
        return false;
    }

    @Override
    public TypeSerializer<Map<K, V>> duplicate() {
        TypeSerializer<K> duplicateKeySerializer = keySerializer.duplicate();
        TypeSerializer<V> duplicateValueSerializer = valueSerializer.duplicate();

        return (duplicateKeySerializer == keySerializer)
                        && (duplicateValueSerializer == valueSerializer)
                ? this
                : new MapSerializer<>(duplicateKeySerializer, duplicateValueSerializer);
    }

    @Override
    public Map<K, V> createInstance() {
        return new HashMap<>();
    }

    @Override
    public Map<K, V> copy(Map<K, V> from) {
        Map<K, V> newMap = new HashMap<>(from.size());

        for (Map.Entry<K, V> entry : from.entrySet()) {
            K newKey = keySerializer.copy(entry.getKey());
            V newValue = entry.getValue() == null ? null : valueSerializer.copy(entry.getValue());

            newMap.put(newKey, newValue);
        }

        return newMap;
    }

    @Override
    public Map<K, V> copy(Map<K, V> from, Map<K, V> reuse) {
        return copy(from);
    }

    @Override
    public int getLength() {
        return -1; // var length
    }

    @Override
    public void serialize(Map<K, V> map, DataOutputView target) throws IOException {
        final int size = map.size();
        target.writeInt(size);

        for (Map.Entry<K, V> entry : map.entrySet()) {
            keySerializer.serialize(entry.getKey(), target);

            if (entry.getValue() == null) {
                target.writeBoolean(true);
            } else {
                target.writeBoolean(false);
                valueSerializer.serialize(entry.getValue(), target);
            }
        }
    }

    @Override
    public Map<K, V> deserialize(DataInputView source) throws IOException {
        final int size = source.readInt();

        final Map<K, V> map = new HashMap<>(size);
        for (int i = 0; i < size; ++i) {
            K key = keySerializer.deserialize(source);

            boolean isNull = source.readBoolean();
            V value = isNull ? null : valueSerializer.deserialize(source);

            map.put(key, value);
        }

        return map;
    }

    @Override
    public Map<K, V> deserialize(Map<K, V> reuse, DataInputView source) throws IOException {
        return deserialize(source);
    }

    @Override
    public void copy(DataInputView source, DataOutputView target) throws IOException {
        final int size = source.readInt();
        target.writeInt(size);

        for (int i = 0; i < size; ++i) {
            keySerializer.copy(source, target);

            boolean isNull = source.readBoolean();
            target.writeBoolean(isNull);

            if (!isNull) {
                valueSerializer.copy(source, target);
            }
        }
    }

    @Override
    public boolean equals(Object obj) {
        return obj == this
                || (obj != null
                        && obj.getClass() == getClass()
                        && keySerializer.equals(((MapSerializer<?, ?>) obj).getKeySerializer())
                        && valueSerializer.equals(
                                ((MapSerializer<?, ?>) obj).getValueSerializer()));
    }

    @Override
    public int hashCode() {
        return keySerializer.hashCode() * 31 + valueSerializer.hashCode();
    }

    // --------------------------------------------------------------------------------------------
    // Serializer configuration snapshotting
    // --------------------------------------------------------------------------------------------

    @Override
    public TypeSerializerSnapshot<Map<K, V>> snapshotConfiguration() {
        return new MapSerializerSnapshot<>(this);
    }
}

总结:

  • 序列化和反序列化本质上都是对MemorySegment的操作,通过DataOutputView写出二进制数据,通过DataInputView读入二进制数据;
  • 对于复合数据类型,也应嵌套定义并调用内部元素类型的TypeSerializer
  • 必须要有对应的TypeSerializerSnapshot。该组件定义了TypeSerializer本身及其所包含的元数据(即state schema)的序列化方式,这些信息会存储在快照中。可见,通过TypeSerializerSnapshot可以判断状态恢复时数据的兼容性,是Flink实现state schema evolution特性的关键所在。

TypeSerializerSnapshot

TypeSerializerSnapshot接口有以下几个重要的方法。注释写得很清晰,不再废话了(实际是因为懒而且累 = =

    /**
     * Returns the version of the current snapshot's written binary format.
     *
     * @return the version of the current snapshot's written binary format.
     */
    int getCurrentVersion();

    /**
     * Writes the serializer snapshot to the provided {@link DataOutputView}. The current version of
     * the written serializer snapshot's binary format is specified by the {@link
     * #getCurrentVersion()} method.
     *
     * @param out the {@link DataOutputView} to write the snapshot to.
     * @throws IOException Thrown if the snapshot data could not be written.
     * @see #writeVersionedSnapshot(DataOutputView, TypeSerializerSnapshot)
     */
    void writeSnapshot(DataOutputView out) throws IOException;

    /**
     * Reads the serializer snapshot from the provided {@link DataInputView}. The version of the
     * binary format that the serializer snapshot was written with is provided. This version can be
     * used to determine how the serializer snapshot should be read.
     *
     * @param readVersion version of the serializer snapshot's written binary format
     * @param in the {@link DataInputView} to read the snapshot from.
     * @param userCodeClassLoader the user code classloader
     * @throws IOException Thrown if the snapshot data could be read or parsed.
     * @see #readVersionedSnapshot(DataInputView, ClassLoader)
     */
    void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
            throws IOException;

    /**
     * Recreates a serializer instance from this snapshot. The returned serializer can be safely
     * used to read data written by the prior serializer (i.e., the serializer that created this
     * snapshot).
     *
     * @return a serializer instance restored from this serializer snapshot.
     */
    TypeSerializer<T> restoreSerializer();

    /**
     * Checks a new serializer's compatibility to read data written by the prior serializer.
     *
     * <p>When a checkpoint/savepoint is restored, this method checks whether the serialization
     * format of the data in the checkpoint/savepoint is compatible for the format of the serializer
     * used by the program that restores the checkpoint/savepoint. The outcome can be that the
     * serialization format is compatible, that the program's serializer needs to reconfigure itself
     * (meaning to incorporate some information from the TypeSerializerSnapshot to be compatible),
     * that the format is outright incompatible, or that a migration needed. In the latter case, the
     * TypeSerializerSnapshot produces a serializer to deserialize the data, and the restoring
     * program's serializer re-serializes the data, thus converting the format during the restore
     * operation.
     *
     * @param newSerializer the new serializer to check.
     * @return the serializer compatibility result.
     */
    TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
            TypeSerializer<T> newSerializer);

特别注意,在状态恢复时,state schema的兼容性判断结果TypeSerializerSchemaCompatibility有4种:

  • COMPATIBLE_AS_IS:兼容,可以直接使用新Serializer;
  • COMPATIBLE_AFTER_MIGRATION:兼容,但需要用快照中的旧Serializer反序列化一遍数据,再将数据用新Serializer重新序列化。最常见的场景如状态POJO中增加或删除字段,详情可以参考PojoSerializerSnapshot类的相关代码;
  • COMPATIBLE_WITH_RECONFIGURED_SERIALIZER:兼容,但需要将新Serializer重新配置之后再使用。此类场景不太常见,举例如状态POJO的类继承关系发生变化;
  • INCOMPATIBLE:不兼容,无法恢复。例如,更改POJO中的一个简单类型字段的type(e.g. String → Integer),由于负责处理简单数据类型的SimpleTypeSerializerSnapshot不支持此类更改,就会抛出异常:
    @Override
    public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
            TypeSerializer<T> newSerializer) {

        return newSerializer.getClass() == serializerSupplier.get().getClass()
                ? TypeSerializerSchemaCompatibility.compatibleAsIs()
                : TypeSerializerSchemaCompatibility.incompatible();
    }

显然,对于复合类型(如List、Map),需要先判断外部容器Serializer的兼容性,再判断嵌套Serializer的兼容性。详情可以参考Flink内部专门为此定义的CompositeTypeSerializerSnapshot抽象类,该类比较复杂,在此按下不表。

The End

在一些特殊的场景下,我们需要自定义Serializers来实现更好的状态序列化(例如用RoaringBitmap代替Set在状态中进行高效的去重),今天时间已经很晚,暂时不给出具体实现了。关于自定义状态序列化器的更多细节,请看官参见官方文档<<Custom Serialization for Managed State>>一章。

晚安晚安。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,039评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,223评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,916评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,009评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,030评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,011评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,934评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,754评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,202评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,433评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,590评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,321评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,917评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,568评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,738评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,583评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,482评论 2 352

推荐阅读更多精彩内容