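Keyed DataStream
To use keyed state, first partition a DataStream with the keyBy(KeySelector) method. A KeySelector takes a record as input and returns that record's key. Both the records and their state are partitioned by this key.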
// some ordinary POJO
public class WC {
public String word;
public int count;
public String getWord() { return word; }
}
DataStream<WC> words = // [...]
KeyedStream<WC, String> keyed = words
.keyBy(WC::getWord);
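The plain-Java analogy below shows what keyBy guarantees logically: every record with the same key lands in the same group. It has no Flink dependency, and `KeyByAnalogy` and `groupByKey` are hypothetical names. Flink also distributes keys across parallel instances (via key groups), which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical analogy of keyBy grouping; not Flink code.
final class KeyByAnalogy {

    // Minimal stand-in for the WC POJO above.
    static final class WC {
        final String word;
        final int count;
        WC(String word, int count) { this.word = word; this.count = count; }
        String getWord() { return word; }
    }

    // Groups records by the key the selector returns, keeping first-seen key order.
    static <T, K> Map<K, List<T>> groupByKey(List<T> records, Function<T, K> keySelector) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T record : records) {
            K key = keySelector.apply(record);
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<WC> words = Arrays.asList(new WC("flink", 1), new WC("state", 1), new WC("flink", 1));
        Map<String, List<WC>> keyed = groupByKey(words, WC::getWord);
        for (Map.Entry<String, List<WC>> e : keyed.entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue().size());
        }
        // prints "flink -> 2" then "state -> 1"
    }
}
```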
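Keyed State
Keyed state is accessed through the keyed state interfaces. It is always scoped to the key of the record currently being processed. The available state types include (but are not limited to):
- ValueState<T>: holds a single value that can be updated and retrieved
- ListState<T>: holds a list of elements; supports appending elements, updating (replacing) the list, and retrieving it
- ReducingState<T>: holds a single value that aggregates all values added to the state; add() combines each new element with the current value using the ReduceFunction you define
- AggregatingState<IN, OUT>: holds a single aggregate of all values added to the state; unlike ReducingState, the aggregate type may differ from the type of the added elements
- MapState<UK, UV>: holds a map of key-value mappings; supports putting, retrieving, and removing entries, and iterating over them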
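ReducingState and AggregatingState differ mainly in their types. A reduce function combines two values of the same type. An aggregate function keeps an accumulator and can produce a result of a different type. The single-key plain-Java sketch below models that difference. `ToyReducingState` and `ToyAggregatingState` are hypothetical classes, not Flink's interfaces; Flink's real versions receive a ReduceFunction or AggregateFunction through their state descriptors.

```java
import java.util.function.BinaryOperator;

// Hypothetical single-key models of two state types; not Flink code.
final class StateTypesSketch {

    // Like ReducingState<T>: inputs and the stored result share type T.
    static final class ToyReducingState<T> {
        private final BinaryOperator<T> reducer;
        private T value; // null until the first add, like an empty state
        ToyReducingState(BinaryOperator<T> reducer) { this.reducer = reducer; }
        void add(T input) { value = (value == null) ? input : reducer.apply(value, input); }
        T get() { return value; }
    }

    // Like AggregatingState<IN, OUT> with IN = long and OUT = Double (the average).
    // The accumulator is {count, sum}, which is neither the input nor the output type.
    static final class ToyAggregatingState {
        private long count;
        private long sum;
        void add(long input) { count++; sum += input; }
        Double get() { return count == 0 ? null : (double) sum / count; }
    }

    public static void main(String[] args) {
        ToyReducingState<Long> total = new ToyReducingState<>(Long::sum);
        ToyAggregatingState average = new ToyAggregatingState();
        for (long v : new long[] {3L, 5L, 7L}) {
            total.add(v);
            average.add(v);
        }
        System.out.println(total.get());   // 15
        System.out.println(average.get()); // 5.0
    }
}
```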
State handles are obtained through the RuntimeContext, so keyed state can only be accessed in rich functions.
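import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {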
/**
* The ValueState handle. The first field is the count, the second field a running sum.
*/
private transient ValueState<Tuple2<Long, Long>> sum;
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
// access the state value
Tuple2<Long, Long> currentSum = sum.value();
// update the count
currentSum.f0 += 1;
// add the second field of the input value
currentSum.f1 += input.f1;
// update the state
sum.update(currentSum);
// if the count reaches 2, emit the average and clear the state
if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
sum = getRuntimeContext().getState(descriptor);
}
}
// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
.keyBy(value -> value.f0)
.flatMap(new CountWindowAverage())
.print();
// the printed output will be (1,4) and (1,5)
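The claimed output can be checked without a Flink cluster by replaying CountWindowAverage's per-key logic in plain Java. In this sketch a HashMap stands in for the keyed ValueState; that is an illustrative assumption, since Flink scopes the state to the current key automatically. `CountWindowAverageReplay` is a hypothetical name. The average uses long division, so a sum of 11 over 2 records gives 5.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical replay of CountWindowAverage's logic; not Flink code.
final class CountWindowAverageReplay {

    // Inputs are {key, value} pairs; returns the emitted {key, average} pairs.
    static List<long[]> replay(long[][] inputs) {
        // key -> {count, runningSum}, standing in for the keyed ValueState
        Map<Long, long[]> stateByKey = new HashMap<>();
        List<long[]> out = new ArrayList<>();
        for (long[] input : inputs) {
            // {0, 0} mirrors the descriptor's default value Tuple2.of(0L, 0L)
            long[] current = stateByKey.getOrDefault(input[0], new long[] {0L, 0L});
            current[0] += 1;        // update the count
            current[1] += input[1]; // add the second field of the input
            stateByKey.put(input[0], current);
            if (current[0] >= 2) {
                out.add(new long[] {input[0], current[1] / current[0]});
                stateByKey.remove(input[0]); // like sum.clear()
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[][] inputs = {{1L, 3L}, {1L, 5L}, {1L, 7L}, {1L, 4L}, {1L, 2L}};
        for (long[] r : replay(inputs)) {
            System.out.println("(" + r[0] + "," + r[1] + ")");
        }
        // prints (1,4) then (1,5); the last element (1,2) stays buffered in state
    }
}
```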
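State Time-To-Live (TTL)
StateTtlConfig assigns a time-to-live to keyed state of any type. Once the TTL has elapsed, the state value is treated as expired and is eventually removed. TTL is tracked per state entry: in collection types such as ListState and MapState, each list element or map entry expires independently.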
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
Parameters:
The update type controls when the TTL timestamp is refreshed:
- StateTtlConfig.UpdateType.OnCreateAndWrite - only on creation and write access
- StateTtlConfig.UpdateType.OnReadAndWrite - also on read access
The state visibility controls whether a read returns an expired value that has not been cleaned up yet:
- StateTtlConfig.StateVisibility.NeverReturnExpired - expired value is never returned
- StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - returned if still available
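A manual clock makes the interplay of update type and visibility concrete. The sketch below is a hypothetical single-value model: `TtlValueModel` and its enums only echo the StateTtlConfig option names and are not Flink code. It assumes three things. An entry counts as expired once `now >= lastTimestamp + ttl`. Writes always refresh the timestamp. Reading an expired entry removes it, so ReturnExpiredIfNotCleanedUp returns the stale value at most once.

```java
// Hypothetical model of one TTL-enabled value; not Flink code.
final class TtlValueModel<V> {
    enum UpdateType { ON_CREATE_AND_WRITE, ON_READ_AND_WRITE }
    enum Visibility { NEVER_RETURN_EXPIRED, RETURN_EXPIRED_IF_NOT_CLEANED_UP }

    private final long ttlMillis;
    private final UpdateType updateType;
    private final Visibility visibility;
    private long now;           // manual processing-time clock
    private V value;            // null means "nothing stored"
    private long lastTimestamp; // time of the last TTL refresh

    TtlValueModel(long ttlMillis, UpdateType updateType, Visibility visibility) {
        this.ttlMillis = ttlMillis;
        this.updateType = updateType;
        this.visibility = visibility;
    }

    void setTime(long nowMillis) { now = nowMillis; }

    // Writes always refresh the TTL, under both update types.
    void update(V newValue) {
        value = newValue;
        lastTimestamp = now;
    }

    V value() {
        if (value == null) return null;
        if (now >= lastTimestamp + ttlMillis) {
            V expired = value;
            value = null; // assumption: the read itself removes the expired entry
            return visibility == Visibility.RETURN_EXPIRED_IF_NOT_CLEANED_UP ? expired : null;
        }
        if (updateType == UpdateType.ON_READ_AND_WRITE) {
            lastTimestamp = now; // reads refresh the TTL only with OnReadAndWrite
        }
        return value;
    }

    public static void main(String[] args) {
        TtlValueModel<String> writeOnly =
            new TtlValueModel<>(1000, UpdateType.ON_CREATE_AND_WRITE, Visibility.NEVER_RETURN_EXPIRED);
        writeOnly.setTime(0);
        writeOnly.update("v");
        writeOnly.setTime(800);
        System.out.println(writeOnly.value()); // v (the read does not refresh the TTL)
        writeOnly.setTime(1200);
        System.out.println(writeOnly.value()); // null (expired at t=1000)

        TtlValueModel<String> readToo =
            new TtlValueModel<>(1000, UpdateType.ON_READ_AND_WRITE, Visibility.NEVER_RETURN_EXPIRED);
        readToo.setTime(0);
        readToo.update("v");
        readToo.setTime(800);
        readToo.value();                     // refreshes the TTL to t=800
        readToo.setTime(1200);
        System.out.println(readToo.value()); // v (now expires at t=1800)
    }
}
```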
Notes:
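- The state backend stores the timestamp of the last modification together with the user value, so enabling TTL increases state storage consumption.
- Currently, only TTL based on processing time is supported.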
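Cleanup of Expired State
By default, an expired value is removed when it is read (for example by ValueState#value()). Expired values are also garbage-collected periodically in the background if the configured state backend supports it. Background cleanup can be disabled: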
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.disableCleanupInBackground()
.build();
Currently, the heap state backend uses incremental cleanup for background cleanup, while the RocksDB state backend uses its compaction filter.
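- Cleanup in full snapshot
Cleanup can also be activated when a full state snapshot is taken, which reduces the snapshot's size. The local state is not cleaned up. However, expired entries are not included when restoring from that snapshot. This option does not apply to incremental checkpointing with the RocksDB state backend.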
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupFullSnapshot()
.build();
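cleanupFullSnapshot can be pictured as a filter applied while the snapshot is written: expired entries are left out of the snapshot, and the live local state is left untouched. The model below is hypothetical plain Java; `FullSnapshotCleanupModel` is not a Flink class. It stores a last-write timestamp next to each value and assumes an entry is expired once `now >= timestamp + ttl`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of cleanup in full snapshot; not Flink code.
final class FullSnapshotCleanupModel {

    static final class Entry {
        final String value;
        final long timestamp; // last write time, stored alongside the user value
        Entry(String value, long timestamp) { this.value = value; this.timestamp = timestamp; }
    }

    final Map<String, Entry> localState = new HashMap<>();
    private final long ttlMillis;

    FullSnapshotCleanupModel(long ttlMillis) { this.ttlMillis = ttlMillis; }

    void put(String key, String value, long now) { localState.put(key, new Entry(value, now)); }

    // Copies only unexpired entries into the snapshot; local state keeps everything.
    Map<String, Entry> fullSnapshot(long now) {
        Map<String, Entry> snapshot = new HashMap<>();
        for (Map.Entry<String, Entry> e : localState.entrySet()) {
            if (now < e.getValue().timestamp + ttlMillis) {
                snapshot.put(e.getKey(), e.getValue());
            }
        }
        return snapshot;
    }

    public static void main(String[] args) {
        FullSnapshotCleanupModel state = new FullSnapshotCleanupModel(1000);
        state.put("old", "a", 0);
        state.put("fresh", "b", 900);
        Map<String, Entry> snapshot = state.fullSnapshot(1500);
        System.out.println(snapshot.keySet());       // [fresh]
        System.out.println(state.localState.size()); // 2: local state is not cleaned
    }
}
```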
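- Incremental cleanup
Cleanup runs in small steps triggered by state access and, optionally, by record processing. The state backend keeps a lazy global iterator over all state entries. Each time incremental cleanup is triggered, the iterator advances, and the entries it passes are checked and removed if they have expired.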
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupIncrementally(10, true)
.build();
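The first argument is the number of state entries checked in each cleanup step; a step is triggered on every state access. The second argument specifies whether every processed record should also trigger a cleanup step. The default for the heap backend is (5, false): 5 entries are checked per state access, and record processing does not trigger cleanup.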
Notes:
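- If the state is never accessed and no records are processed, expired state persists.
- Time spent on incremental cleanup increases record-processing latency.
- Incremental cleanup is currently implemented only for the heap state backend; setting it for RocksDB has no effect.
- If the heap state backend is used with synchronous snapshotting, the global iterator keeps a copy of all keys while iterating, because its implementation does not support concurrent modification. This increases memory consumption. Asynchronous snapshotting does not have this problem.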
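The lazy global iterator can be sketched as a cursor that walks over all entries a few at a time and wraps around at the end. The plain-Java model below is only an illustration, not Flink's heap backend; `IncrementalCleanupModel` and its methods are hypothetical. Each state access checks up to `cleanupSize` entries, and `runCleanupForEveryRecord` adds a step per processed record. To keep the trace short, writes are not counted as accesses here. The first trace also shows the first note: expired entries stay until something triggers a step.

```java
import java.util.TreeMap;

// Hypothetical model of incremental TTL cleanup; not Flink code.
final class IncrementalCleanupModel {
    private final TreeMap<String, Long> entries = new TreeMap<>(); // key -> last write time
    private final long ttlMillis;
    private final int cleanupSize;
    private final boolean runCleanupForEveryRecord;
    private String cursor; // last key visited by the "global iterator"; null = start over

    IncrementalCleanupModel(long ttlMillis, int cleanupSize, boolean runCleanupForEveryRecord) {
        this.ttlMillis = ttlMillis;
        this.cleanupSize = cleanupSize;
        this.runCleanupForEveryRecord = runCleanupForEveryRecord;
    }

    void put(String key, long now) { entries.put(key, now); }

    void onStateAccess(long now) { cleanupStep(now); }

    void onRecordProcessed(long now) {
        if (runCleanupForEveryRecord) cleanupStep(now);
    }

    int size() { return entries.size(); }

    // One step: advance the cursor over up to cleanupSize entries, removing expired ones.
    private void cleanupStep(long now) {
        int checked = 0;
        while (checked < cleanupSize && !entries.isEmpty()) {
            String next = (cursor == null) ? entries.firstKey() : entries.higherKey(cursor);
            if (next == null) { // reached the end: the next step restarts from the first key
                cursor = null;
                break;
            }
            if (now >= entries.get(next) + ttlMillis) {
                entries.remove(next);
            }
            cursor = next;
            checked++;
        }
    }

    public static void main(String[] args) {
        IncrementalCleanupModel state = new IncrementalCleanupModel(1000, 2, false);
        for (String k : new String[] {"a", "b", "c", "d"}) state.put(k, 0);
        state.onRecordProcessed(2000);   // no effect: per-record cleanup is disabled
        System.out.println(state.size()); // 4 (all expired, none removed yet)
        state.onStateAccess(2000);
        System.out.println(state.size()); // 2 (a and b checked and removed)
        state.onStateAccess(2000);
        System.out.println(state.size()); // 0
    }
}
```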
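- Cleanup during RocksDB compaction
RocksDB periodically runs asynchronous compactions that merge state updates and reduce storage. Flink's compaction filter checks the expiration timestamp of entries with TTL and drops the expired ones during compaction.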
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupInRocksdbCompactFilter(1000)
.build();
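To check expiration, the compaction filter must query the current timestamp from Flink; it does so each time it has processed a configurable number of state entries. Querying more often can speed up cleanup but slows down compaction, because each query is a JNI call from native code. With the default background cleanup for RocksDB, the current timestamp is queried every time 1000 entries have been processed.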
Operator State
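The Kafka connector is a good example of operator state: each parallel instance of the Kafka consumer keeps a map of topic partitions and offsets as its operator state. Operator state can be redistributed among parallel operator instances when the parallelism changes.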
Broadcast State
Broadcast state is a special type of operator state. It supports use cases where the records of one stream need to be broadcast to all downstream tasks.
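Using Operator State
To use operator state, a stateful function implements the CheckpointedFunction interface.
CheckpointedFunction gives access to non-keyed state with different redistribution schemes. It requires two methods: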
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
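snapshotState() is called whenever a checkpoint is taken. initializeState() is called when the user-defined function is initialized, both the first time and on recovery from an earlier checkpoint.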
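Currently, operator state is list-style: the state is a list of serializable objects that are independent of each other, so they can be redistributed when the job is rescaled.
Redistribution schemes:
- Even-split redistribution: each operator instance returns a list of state elements, and the complete state is the concatenation of all lists. On restore or redistribution, the list is split into as many sublists as there are parallel operator instances, and each instance gets one sublist.
- Union redistribution: each operator instance returns a list of state elements, and the complete state is the concatenation of all lists. On restore or redistribution, every operator instance gets the complete list.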
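The two schemes differ only in how the combined list is handed out after rescaling. The plain-Java sketch below uses a hypothetical `Redistribution` class, not Flink's repartitioner. It concatenates the per-instance lists, then either splits them into contiguous chunks whose sizes differ by at most one, or gives every new instance the whole list. Flink's exact element-to-instance assignment for even-split may differ from these contiguous chunks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of operator-state redistribution; not Flink code.
final class Redistribution {

    // The complete state is the concatenation of all old instances' lists.
    static <T> List<T> merge(List<List<T>> oldInstances) {
        List<T> all = new ArrayList<>();
        for (List<T> part : oldInstances) all.addAll(part);
        return all;
    }

    // Even-split: each new instance gets one contiguous sublist.
    static <T> List<List<T>> evenSplit(List<List<T>> oldInstances, int newParallelism) {
        List<T> all = merge(oldInstances);
        List<List<T>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    // Union: every new instance receives the complete list.
    static <T> List<List<T>> union(List<List<T>> oldInstances, int newParallelism) {
        List<T> all = merge(oldInstances);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) result.add(new ArrayList<>(all));
        return result;
    }

    public static void main(String[] args) {
        // Two old instances (e.g. holding partition offsets) rescaled to three.
        List<List<String>> old = Arrays.asList(
            Arrays.asList("p0", "p1", "p2"),
            Arrays.asList("p3", "p4"));
        System.out.println(evenSplit(old, 3)); // [[p0, p1], [p2, p3], [p4]]
        System.out.println(union(old, 3));     // three copies of [p0, p1, p2, p3, p4]
    }
}
```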
An example of even-split redistribution (list state obtained with getListState()):
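import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
public class BufferingSink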
implements SinkFunction<Tuple2<String, Integer>>,
CheckpointedFunction {
private final int threshold;
private transient ListState<Tuple2<String, Integer>> checkpointedState;
private List<Tuple2<String, Integer>> bufferedElements;
public BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
for (Tuple2<String, Integer> element : bufferedElements) {
checkpointedState.add(element);
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor<>(
"buffered-elements",
TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
if (context.isRestored()) {
for (Tuple2<String, Integer> element : checkpointedState.get()) {
bufferedElements.add(element);
}
}
}
}
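Without the Flink interfaces, BufferingSink's fault-tolerance contract is simple. Whatever sits in `bufferedElements` at checkpoint time is copied into list state, and a restored instance starts with that list. The plain-Java model below replays that cycle. `BufferModel`, `snapshot()` and `restore()` are hypothetical names, a plain `List` stands in for the `ListState`, and the `flushed` list stands in for the external sink.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of BufferingSink's snapshot/restore cycle; not Flink code.
final class BufferModel {
    private final int threshold;
    private final List<String> bufferedElements = new ArrayList<>();
    final List<String> flushed = new ArrayList<>(); // stands in for the external sink

    BufferModel(int threshold) { this.threshold = threshold; }

    // Like invoke(): buffer, and flush once the threshold is reached.
    void invoke(String value) {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            flushed.addAll(bufferedElements); // "send it to the sink"
            bufferedElements.clear();
        }
    }

    // Like snapshotState(): the checkpointed list becomes a copy of the current buffer.
    List<String> snapshot() { return new ArrayList<>(bufferedElements); }

    // Like initializeState() with isRestored() == true: refill the buffer from the checkpoint.
    static BufferModel restore(int threshold, List<String> checkpointed) {
        BufferModel restored = new BufferModel(threshold);
        restored.bufferedElements.addAll(checkpointed);
        return restored;
    }

    int bufferedCount() { return bufferedElements.size(); }

    public static void main(String[] args) {
        BufferModel sink = new BufferModel(3);
        for (String v : new String[] {"a", "b", "c", "d", "e"}) sink.invoke(v);
        List<String> checkpoint = sink.snapshot();  // [d, e]; a, b, c were already flushed
        BufferModel recovered = BufferModel.restore(3, checkpoint);
        recovered.invoke("f");                      // the buffer reaches the threshold again
        System.out.println(sink.flushed);           // [a, b, c]
        System.out.println(recovered.flushed);      // [d, e, f]
    }
}
```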
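Stateful Source Functions
Stateful sources need extra care. For exactly-once semantics on failure and recovery, state updates and output emission must happen atomically. To achieve this, the source acquires the checkpoint lock from the source context: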
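import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
public class CounterSource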
extends RichParallelSourceFunction<Long>
implements CheckpointedFunction {
/** current offset for exactly once semantics */
private Long offset = 0L;
/** flag for job cancellation */
private volatile boolean isRunning = true;
/** Our state object. */
private ListState<Long> state;
@Override
public void run(SourceContext<Long> ctx) {
final Object lock = ctx.getCheckpointLock();
while (isRunning) {
// output and state update are atomic
synchronized (lock) {
ctx.collect(offset);
offset += 1;
}
}
}
@Override
public void cancel() {
isRunning = false;
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
state = context.getOperatorStateStore().getListState(new ListStateDescriptor<>(
"state",
LongSerializer.INSTANCE));
// restore any state that we might already have to our fields, initialize state
// is also called in case of restore.
for (Long l : state.get()) {
offset = l;
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
state.clear();
state.add(offset);
}
}
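The lock matters because a snapshot must never see an offset that disagrees with what has actually been emitted. The plain-Java sketch below imitates this with two threads sharing one lock object. `LockedEmitter` and `runAndCheck` are hypothetical names. The `emitted` list stands in for `ctx.collect`, and the checking thread plays the role of the checkpoint, which Flink takes while holding the same checkpoint lock. The loop is bounded instead of cancellable, to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of CounterSource's locking discipline; not Flink code.
final class LockedEmitter {
    private final Object lock = new Object();             // stands in for ctx.getCheckpointLock()
    private final List<Long> emitted = new ArrayList<>(); // stands in for ctx.collect(...)
    private long offset = 0L;                             // only changed while holding the lock

    // Emitter loop: output and offset update happen together under the lock.
    void run(long maxRecords) {
        for (long i = 0; i < maxRecords; i++) {
            synchronized (lock) {
                emitted.add(offset);
                offset += 1;
            }
        }
    }

    // "Checkpoint": under the lock, the offset must equal the number of emitted records.
    boolean snapshotIsConsistent() {
        synchronized (lock) {
            return offset == emitted.size();
        }
    }

    // Runs the emitter on its own thread while repeatedly snapshotting from this thread.
    static boolean runAndCheck(long maxRecords) {
        LockedEmitter source = new LockedEmitter();
        Thread emitter = new Thread(() -> source.run(maxRecords));
        emitter.start();
        boolean consistent = true;
        while (emitter.isAlive()) {
            consistent &= source.snapshotIsConsistent();
        }
        try {
            emitter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return consistent && source.snapshotIsConsistent() && source.offset == maxRecords;
    }

    public static void main(String[] args) {
        System.out.println("all snapshots consistent: " + runAndCheck(100_000));
    }
}
```

Removing the `synchronized` blocks would let a snapshot land between `emitted.add` and `offset += 1`. That inconsistency is what the checkpoint lock in CounterSource rules out.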