Working with State

原文链接


Keyed State and Operator State

在Flink中有两种基本类型的状态:Keyed State and Operator State。

Keyed State

Keyed State总是和keys相关,并且只能用于KeyedStream上的函数和操作。
你可以将Keyed State认为是已经被分段或分区的Operator State,每个key都有且仅有一个state-partition。每个keyed-state逻辑上绑定到一个唯一的<parallel-operator-instance, key>组合上,并且由于每个key“属于”keyed operator的一个并行实例,所以我们可以简单的认为是<operator,key>。
Keyed State进一步被组织到所谓的Key Groups中。Key Groups是Flink能够重新分配keyed State的原子单元。Key Groups的数量等于定义的最大并行度。在一个keyed operator的并行实例执行期间,它与一个或多个Key Groups配合工作。

Operator State

对于Operator State(或者non-keyed state),每个operator state绑定到一个并行operator实例上。在Flink中,Kafka Connector是一个使用Operator State的很好的例子。每个并行Kafka消费者实例维护一个主题分区和偏移的map作为它的Operator State。
当并行度被修改时,Operator State接口支持在并行operator实例上重新分配状态。进行这种重新分配可以有不同的方案。

Raw and Managed State

Keyed StateOperator State 有两种形式: managedraw
Managed State表示数据结构由Flink runtime控制,例如内部哈希表,或者RocksDB。例如,“ValueState”,“ListState”等等。Flink的runtime层编码State并将其写入checkpoint中。
Raw State是operator保存在它的数据结构中的state。当进行checkpoint时,它只写入字节序列到checkpoint中。Flink并不知道状态的数据结构,并且只能看到raw字节。
所有的数据流函数都可以使用managed state,但是raw state接口只可以在操作符的实现类中使用。推荐使用managed state(而不是raw state),因为使用managed state,当并行度变化时,Flink可以自动的重新分布状态,并且可以做更好的内存管理。
注意 如果你的managed state需要自定义序列化逻辑,请参见managed state的自定义序列化以确保未来的兼容性。Flink默认的序列化不需要特殊处理。

使用Managed Keyed State

managed keyed state接口提供了对当前输入元素的key的不同类型的状态的访问。这意味着这种类型的状态只能在KeyedStream中使用,它可以通过stream.keyBy(...)创建。
现在,我们首先看下不同类型的状态,然后展示如何在程序中使用它们。可用的状态原语是:

  • ValueState<T>:它会保存一个可以被更新和查询的值(限于上面提到的输入元素的key,因此操作看到的每个key可能都是同一个值)。可是使用update(T) 和 T value() 更新和查询值。
  • ListState<T>: 它保存了一个元素列表。你可以添加元素和检索Iterable来获取所有当前存储的元素。添加元素使用add(T)方法,获取Iterable使用Iterable<T> get()方法。
  • ReducingState<T>: 它保存了一个聚合了所有添加到这个状态的值的结果。接口和ListState相同,但是使用add(T)方法本质是使用指定ReduceFunction的聚合行为。
  • AggregatingState<IN, OUT>: 它保存了一个聚合了所有添加到这个状态的值的结果。与ReducingState想反,聚合类型可能不同于添加到状态的元素的类型。接口和ListState相同,但是使用add(IN)添加的元素通过使用指定的AggregateFunction进行聚合。
  • FoldingState<T, ACC>:它保存了一个聚合了所有添加到这个状态的值的结果。与ReducingState想反,聚合类型可能不同于添加到状态的元素的类型。接口和ListState相同,但是使用add(IN)添加的元素通过使用指定的FoldFunction折叠进行聚合。
  • MapState<UK, UV>:它保存了一个映射列表。你可以将key-value对放入状态中,并通过Iterable检索所有当前存储的映射关系。使用put(UK, UV) 或 putAll(Map<UK, UV>)添加映射关系。使用get(UK)获取key相关的value。分别使用entries(), keys() 和 values() 获取映射关系,key和value的视图。
    所有类型的状态都有一个clear()方法,它清除当前活跃key(即输入元素的key)的状态。
    注意 FoldingState 和 FoldingStateDescriptor在Flink1.4中已经被废弃,并且可能在将来完全删除。请使用AggregatingState和 AggregatingStateDescriptor替代。
    首先需要记住的是这些状态对象只能用来与状态进行交互。状态不一定存储在内存中,但是可能存储在磁盘或者其他地方。第二个需要记住的是,从状态获取的值依赖于输入元素的key。因此如果包含不同的key,那么在你的用户函数中的一个调用获得的值和另一个调用获得值可能不同。
    为了获得状态句柄,必须创建一个StateDescriptor。它维护了状态的名称(稍后将看到,你可以创建多个状态,并且他们必须有唯一的名称,以便你可以引用它们),状态维护的值的类型,和可能用户指定的function,例如ReduceFunction。根据你想要查询的状态的类型,你可以创建ValueStateDescriptor,ListStateDescriptor,ReducingStateDescriptor,FoldingStateDescriptor或MapStateDescriptor。
    使用RuntimeContext访问状态,因此它只有在rich function中才可以使用。rich function的相关信息请看这里,但是我们也很快会看到一个示例。RichFunction中,RuntimeContext有这些访问状态的方法:
  • ValueState<T> getState(ValueStateDescriptor<T>)
  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
  • ListState<T> getListState(ListStateDescriptor<T>)
  • AggregatingState<IN, OUT> getAggregatingState(AggregatingState<IN, OUT>)
  • FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

这是一个显示了所有部分如何组合在一起的FlatMapFunction示例:

public class CountWindowAverage extends 
RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
    .keyBy(0)
    .flatMap(new CountWindowAverage())
    .print();

// the printed output will be (1,4) and (1,5)

这个例子实现了一个计数窗口。我们以元组的第一个属性为key(在示例中都有相同的key 1)。该函数存储计数器和一个累加和到ValueState中。一旦计数器达到2,它会发出平均值并且清空状态以便重新从0开始。注意,如果我们在元组的第一个属性中有不同的值,那么将为每个不同的输入key保留不同的状态值。

State in the Scala DataStream API

除了上面描述的接口,Scala API在KeyedStream上为使用单个ValueState的有状态的map() 或 flatMap() 函数提供了快捷方式。用户函数在Option中获取ValueState的当前值,并且必须返回一个更新后的值,该值将用于更新状态。

val stream: DataStream[(String, Int)] = ...

val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })

Using Managed Operator State

为了使用managed operator state,有状态的函数可以实现更通用的CheckpointedFunction接口,或者ListCheckpointed<T extends Serializable>接口。

CheckpointedFunction

CheckpointedFunction接口提供了访问具备不同的重新分配策略的非keyed状态。它需要方式的实现:

void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;

每当要执行checkpoint时,都会调用snapshotState()方法。对应的 initializeState()在每次用户定义的函数初始化时调用,即函数第一次初始化或者函数从较早的checkpoint恢复时。因此initializeState()不仅是不同类型的状态初始化的地方,也是包含恢复逻辑的地方。
目前,支持列表风格的managed操作符状态。状态期望是一个可序列化对象的列表,每个元素都是独立的,因此可以在弹性扩容时重新分配。换句话说,这些对象是非keyed状态可重新分配的最佳粒度。根据状态访问方法,定义了下属重新分配方案:

  • Even-split redistribution: 每个操作符返回一个状态元素列表。完整的状态逻辑上是所有列表的连接。在恢复/重新分配时,列表被均匀的分成操作符并行度数量相同的子列表。每个操作符获得一个子列表,它可以是空的,或者包含一个或多个元素。例如,如果操作符的并行度为1,checkpoint包含元素element1和element2,当并行度增加到2时,element1可能分配到操作符实例0中,而element2分配到操作符实例1中。

  • Union redistribution:每个操作符返回一个状态元素列表。完整的状态逻辑上是所有列表的连接。在恢复/重新分配时,每个操作符获得状态元素的完整列表。
    下面有一个有状态的SinkFunction示例,它使用CheckpointedFunction来缓存将发送到外部世界的元素。它展示了基本的均匀在分配列表状态:

    public class BufferingSink
          implements SinkFunction<Tuple2<String, Integer>>,
                 CheckpointedFunction {
    
        private final int threshold;
    
        private transient ListState<Tuple2<String, Integer>> checkpointedState;
    
        private List<Tuple2<String, Integer>> bufferedElements;
    
        public BufferingSink(int threshold) {
            this.threshold = threshold;
            this.bufferedElements = new ArrayList<>();
        }
    
        @Override
        public void invoke(Tuple2<String, Integer> value) throws Exception {
            bufferedElements.add(value);
            if (bufferedElements.size() == threshold) {
                for (Tuple2<String, Integer> element: bufferedElements) {
                    // send it to the sink
                }
                bufferedElements.clear();
            }
        }
    
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            checkpointedState.clear();
            for (Tuple2<String, Integer> element : bufferedElements) {
                checkpointedState.add(element);
            }
        }
    
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Tuple2<String, Integer>> descriptor =
                new ListStateDescriptor<>(
                    "buffered-elements",
                    TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
    
            checkpointedState = context.getOperatorStateStore().getListState(descriptor);
    
            if (context.isRestored()) {
                for (Tuple2<String, Integer> element : checkpointedState.get()) {
                    bufferedElements.add(element);
                }
            }
        }
    }
    

initializeState方法接受FunctionInitializationContext作为参数。它用来初始化非keyed状态“容器”。上面是ListState类型的容器,当进行checkpoint时非keyed状态的对象存储在ListState中。
注意状态是如何初始化的,类似于keyed状态,有一个包含状态的名称和状态所持有的状态的信息的StateDescriptor:

ListStateDescriptor<Tuple2<String, Integer>> descriptor =
    new ListStateDescriptor<>(
        "buffered-elements",
        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));

checkpointedState = context.getOperatorStateStore().getListState(descriptor);

状态访问方法的命名约定包含它的状态结构的重新分配的模式。例如,在恢复时使用Union redistribution方案的list state,通过使用getUnionListState(descriptor)方法访问状态。如果方法名不包含重新分配模式,例如getListState(descriptor),它意味着重新分配方案使用基本的even-split redistribution。
初始化容器后,我们使用context的isRestored()方法来检查我们是否正在从故障中恢复。如果是true,也就是正在恢复中,则应用恢复逻辑。
就像BufferingSink代码中所示,在状态初始化时恢复的ListState保存在一个类变量中,以便snapshotState()中使用。ListState清除所有前一个checkpoint包含的所有对象,然后填充我们想要checkpoint的新对象。
另外,keyed状态也能在 initializeState() 方法中初始化。这通过使用提供的FunctionInitializationContext实现。

ListCheckpointed

ListCheckpointed接口是CheckpointedFunction的限制更严的变体,它只支持恢复时使用even-split redistribution方案的列表风格的状态。它也要求实现两个方法:

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

void restoreState(List<T> state) throws Exception;

在snapshotState()上操作符应该返回一个checkpoint的对象列表,并且恢复时restoreState必须处理这样一个列表。如果状态是不可分割的,你可以在snapshotState()上总是返回Collections.singletonList(MY_STATE)。

Stateful Source Functions

有状态的Source相比其它操作符需要关注多一点。为了保证状态和输出集合的更新是原子的(精确一次语义在故障/恢复时要求),用户要求从Source的context中获取锁。

public static class CounterSource
        extends RichParallelSourceFunction<Long>
        implements ListCheckpointed<Long> {

    /**  current offset for exactly once semantics */
    private Long offset;

    /** flag for job cancellation */
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // output and state update are atomic
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
        return Collections.singletonList(offset);
    }

    @Override
    public void restoreState(List<Long> state) {
        for (Long s : state)
            offset = s;
    }
}

一些操作符当一个checkpoint被Flink完全确认时可能需要与外部世界通信。在这种情况下见org.apache.flink.runtime.state.CheckpointListener接口。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,817评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,329评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,354评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,498评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,600评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,829评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,979评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,722评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,189评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,519评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,654评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,329评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,940评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,762评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,993评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,382评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,543评论 2 349

推荐阅读更多精彩内容