Sink
writeAsText("file:///home/wangxiaotong/result");
Result: a result file holding the output is created on the local file system of the corresponding worker node.
State
Reference: https://zhuanlan.zhihu.com/p/29003852
- Keyed State: available only in functions applied on a KeyedStream, and always scoped to a key. It can be thought of as operator state partitioned by key, with one state partition per key.
- Operator State: available on non-keyed streams; each parallel operator instance holds one instance of the state.
- Managed State: data structures encapsulated by Flink, such as ValueState and ListState; Flink encodes them into the checkpoint.
State in Flink can be classified along two dimensions: whether it is scoped to a key (keyed state vs. operator state) and whether it is managed by Flink (managed state vs. raw state). Keyed state holds state on a KeyedStream; operator state holds state on ordinary non-keyed streams. Managed state is controlled by Flink, while raw state is managed by the application itself, with Flink calling the corresponding interface methods to snapshot and restore it.
Using Managed Keyed State
- ValueState<T>: set the value with update(T); retrieve it with T value().
- ListState<T>: add elements with add(T) or addAll(List<T>); read them with Iterable<T> get(); overwrite the existing list with update(List<T>).
- ReducingState<T>: keeps a single value representing the aggregation; elements added with add(T) are combined into the aggregate by a ReduceFunction.
- AggregatingState<IN, OUT>: keeps a single value representing the aggregation; unlike ReducingState, the aggregated type may differ from the element type. Elements added with add(IN) are combined by an AggregateFunction.
- FoldingState<T, ACC>: deprecated since Flink 1.4; use AggregatingState instead.
- MapState<UK, UV>: stores a map of key-value pairs; add entries with put(UK, UV) or putAll(Map<UK, UV>); read them with get(UK).
- Raw State: the operator's own data structures, written into the checkpoint as a sequence of bytes.
All DataStream functions can use managed state, whereas raw state requires implementing specific operator interfaces. Managed state is recommended.
StateDescriptor: the handle used to access state.
There are ValueStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, FoldingStateDescriptor, and MapStateDescriptor.
Getting state (see the sketch after this list):
- ValueState<T> getState(ValueStateDescriptor<T>)
- ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
- ListState<T> getListState(ListStateDescriptor<T>)
- AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
- FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
- MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
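As an illustration, here is a minimal sketch of obtaining and using one of these handles from the RuntimeContext inside a rich function on a KeyedStream (the class name WordSeenCount and the state name "word-counts" are made up for this sketch):
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
public class WordSeenCount extends RichFlatMapFunction<String, String> {
    private transient MapState<String, Long> counts;
    @Override
    public void open(Configuration parameters) {
        MapStateDescriptor<String, Long> descriptor =
            new MapStateDescriptor<>("word-counts", String.class, Long.class);
        counts = getRuntimeContext().getMapState(descriptor);
    }
    @Override
    public void flatMap(String word, Collector<String> out) throws Exception {
        // read, update, and write back the entry scoped to the current key
        Long current = counts.get(word);
        long updated = (current == null) ? 1L : current + 1;
        counts.put(word, updated);
        out.collect(word + ":" + updated);
    }
}
Usage would be e.g. words.keyBy(w -> w).flatMap(new WordSeenCount()).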
State lifetime: State Time-To-Live (TTL)
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
Cleanup of expired state: by default, an expired value is removed only when it is explicitly read, e.g. by calling ValueState.value(). This means that if expired state is never read, it is not deleted, which can lead to ever-growing state. Additionally, you can activate cleanup at the time of taking the full state snapshot, which reduces its size.
Local state is not cleaned up under the current implementation, but when restoring from a previous snapshot, the restored state will not include the removed expired state. This is configured in StateTtlConfig:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupFullSnapshot()
.build();
Note: this option is not applicable for incremental checkpointing in the RocksDB state backend.
Using Managed Operator State
There are two ways to use it. The first is the CheckpointedFunction interface, which supports both redistribution schemes. The second is the ListCheckpointed<T extends Serializable> interface, which supports only list-style state with even-split redistribution; if the state is not re-partitionable, you can always return Collections.singletonList(MY_STATE). Currently, only list-style state (ListState) is supported.
Redistribution schemes:
- Even-split redistribution: on restore/redistribution, when running with multiple parallel instances, the whole list state is split evenly into sublists, one per instance. The ListCheckpointed interface supports only this scheme.
- Union redistribution: on restore/redistribution, every operator instance receives the complete list state.
The CheckpointedFunction interface can use either scheme, selected via:
checkpointedState = context.getOperatorStateStore().getUnionListState(descriptor);
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
The CheckpointedFunction methods:
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
- initializeState() is called on initialization and on recovery.
- Example:
public class BufferingSink
implements SinkFunction<Tuple2<String, Integer>>,
CheckpointedFunction {
private final int threshold;
private transient ListState<Tuple2<String, Integer>> checkpointedState;
private List<Tuple2<String, Integer>> bufferedElements;
public BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
for (Tuple2<String, Integer> element : bufferedElements) {
checkpointedState.add(element);
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor<>(
"buffered-elements",
TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
if (context.isRestored()) {
for (Tuple2<String, Integer> element : checkpointedState.get()) {
bufferedElements.add(element);
}
}
}
}
During failure recovery, isRestored() tells you whether the state is being restored after a failure; if so, the restore logic is applied.
For keeping a single value of state, see how the offset is stored in Stateful Source Functions (https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html#stateful-source-functions).
Stateful Source Functions
To guarantee exactly-once semantics on failure/recovery, the user needs to use a lock (the source context's checkpoint lock) so that emitting elements and updating state are atomic.
public static class CounterSource
extends RichParallelSourceFunction<Long>
implements ListCheckpointed<Long> {
/** current offset for exactly once semantics */
private Long offset;
/** flag for job cancellation */
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Long> ctx) {
final Object lock = ctx.getCheckpointLock();
while (isRunning) {
// output and state update are atomic
synchronized (lock) {
ctx.collect(offset);
offset += 1;
}
}
}
@Override
public void cancel() {
isRunning = false;
}
@Override
public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
return Collections.singletonList(offset);
}
@Override
public void restoreState(List<Long> state) {
for (Long s : state)
offset = s;
}
}
The Broadcast State Pattern
- it has a map format,
- it is only available to specific operators that have as inputs a broadcasted stream and a non-broadcasted one, and
- such an operator can have multiple broadcast states with different names.
BroadcastProcessFunction and KeyedBroadcastProcessFunction
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}
processElement processes the non-broadcast stream, and processBroadcastElement processes the broadcast stream; a wiring sketch follows the list below.
processBroadcastElement receives a Context (rather than a ReadOnlyContext), which provides the following:
- give access to the broadcast state: ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
- allow to query the timestamp of the element: ctx.timestamp(),
- get the current watermark: ctx.currentWatermark()
- get the current processing time: ctx.currentProcessingTime(), and
- emit elements to side-outputs: ctx.output(OutputTag<X> outputTag, X value).
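To show how these functions are wired up, here is a minimal sketch in the spirit of the documentation's example (the Rule, Item, and Match types, the getColor() accessor, and MyKeyedBroadcastProcessFunction are hypothetical names for this sketch):
// descriptor for the broadcast state to be created on the broadcast side
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
    "RulesBroadcastState",
    BasicTypeInfo.STRING_TYPE_INFO,
    TypeInformation.of(new TypeHint<Rule>() {}));
// broadcast the rule stream
BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);
// connect the keyed (non-broadcast) stream with the broadcast stream
DataStream<Match> output = itemStream
    .keyBy(item -> item.getColor())
    .connect(ruleBroadcastStream)
    .process(new MyKeyedBroadcastProcessFunction());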
Checkpointing
Enabling: call enableCheckpointing(n) on the StreamExecutionEnvironment, where n is the checkpoint interval in milliseconds.
Parameters:
- exactly-once vs. at-least-once: the latter is relevant for certain ultra-low-latency applications.
- checkpoint timeout: a checkpoint in progress is aborted if it does not complete within this time.
- minimum time between checkpoints: the minimum pause after one checkpoint completes before the next may start. For example, with a value of 5000, the next checkpoint starts no sooner than 5 seconds after the previous one completed, regardless of the checkpoint duration and the checkpoint interval.
Note: the checkpoint interval must be set larger than this parameter. Setting this value also implies that the number of concurrent checkpoints is one.
- number of concurrent checkpoints: by default, only one checkpoint can be in progress at a time.
Note: this option cannot be used together with the minimum time between checkpoints.
- externalized checkpoints: periodically persist checkpoints externally. Externalized checkpoints write their metadata out to persistent storage and are not automatically cleaned up when the job fails, so you will have a checkpoint to resume from if your job fails. For setup, see the deployment notes on externalized checkpoints.
- fail/continue task on checkpoint errors: enabled by default; a task that fails during checkpointing is then considered failed as well. If disabled, the task simply declines the checkpoint to the checkpoint coordinator and keeps running.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
Related Config Options
Selecting a State Backend
Flink stores timers, the state of stateful operators, connectors, windows, and user-defined state. By default, state is kept in the TaskManagers' memory and checkpoints are stored in the JobManager's memory.
Configured via StreamExecutionEnvironment.setStateBackend(…).
State Checkpoints in Iterative Jobs
Checkpointing for iterative programs must be forced with a special flag:
env.enableCheckpointing(interval, force = true). Note: on failure, the records in flight within the loop edges, and the state modifications that depend on them, will be lost. A Java sketch follows.
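The force = true call above is Scala-style; in Java, a (deprecated) three-argument overload that takes the checkpointing mode explicitly can be used. A minimal sketch:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// force checkpointing even though the job contains iterations
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE, true);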
Restart Strategies
Restart strategies can be configured via flink-conf.yaml.
If checkpointing is not enabled, the no-restart strategy is used.
If checkpointing is enabled but no restart strategy is configured, the fixed-delay strategy is used by default, with at most Integer.MAX_VALUE restart attempts.
1: Fixed Delay: in the example below, after a failure the job is restarted up to 3 times, with a 10 s delay between attempts.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(10, TimeUnit.SECONDS) // delay
));
The same settings (number of attempts, delay, etc.) can also be configured in flink-conf.yaml, as sketched below.
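A sketch of the equivalent flink-conf.yaml entries (values chosen to match the example above):
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s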
2: Failure Rate: unlike the above, the job ultimately fails once the number of failures within the configured time interval is exceeded.
It can also be configured programmatically:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // max failures per interval
Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
Time.of(10, TimeUnit.SECONDS) // delay
));
3: No Restart
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
4: Fallback Restart Strategy: the cluster-wide default restart strategy (from flink-conf.yaml) is used; this is mainly useful for jobs that enable checkpointing.
State Backends
Available backends:
- MemoryStateBackend
- FsStateBackend
- RocksDBStateBackend
The default is MemoryStateBackend.
MemoryStateBackend
Snapshots can be taken asynchronously; asynchronous snapshots are recommended because they avoid blocking the pipeline, and they are enabled by default. To disable asynchronous snapshots, pass the corresponding constructor flag:
new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
Limitations:
- By default, each individual state is limited to 5 MB; this limit can be increased in the MemoryStateBackend constructor.
- No state can be larger than the akka frame size.
- The aggregate state must fit into the JobManager memory.
Suitable for:
- Local development and debugging.
- Jobs with little state, such as record-at-a-time functions (Map, FlatMap, Filter, …); the Kafka consumer requires very little state.
FsStateBackend
Keeps in-flight data in the TaskManager's memory.
On checkpoint: the state is snapshotted and written to the configured file system and directory (e.g. "hdfs://namenode:40010/flink/checkpoints" or "file:///data/flink/checkpoints"). Minimal metadata is stored in the JobManager's memory. Asynchronous snapshots are enabled by default; to disable them, pass the corresponding constructor flag:
new FsStateBackend(path, false);
Suitable for:
- Jobs with large state, long windows, large key/value states.
- All high-availability setups.
RocksDBStateBackend
Holds in-flight data in a RocksDB database stored in the TaskManager's data directories.
On checkpoint: the whole RocksDB database is checkpointed into the configured file system and directory (e.g. "hdfs://namenode:40010/flink/checkpoints" or "file:///data/flink/checkpoints"). Minimal metadata is stored in the JobManager's memory.
Limitations:
- RocksDB's JNI API is based on byte[], so the maximum supported size per key and per value is 2^31 bytes.
Suitable for:
- Jobs with very large state, long windows, large key/value states.
- All high-availability setups.
Note: the amount of state that can be kept is limited only by the available disk space. This allows much larger state than the FsStateBackend, which keeps state in memory, but the maximum throughput is lower.
It is also the only backend that supports incremental checkpoints; a sketch of enabling them follows.
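A minimal sketch of enabling incremental checkpoints (assuming the flink-statebackend-rocksdb dependency is on the classpath; the constructor declares IOException):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// second argument = true: enable incremental checkpoints
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints", true));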
Configuring a State Backend
- Per-job setting:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
- Or configure the default state backend for all jobs in flink-conf.yaml via the key state.backend. Recognized values are jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend), or the class name of a factory, such as org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory for the RocksDBStateBackend.
For example:
# The backend that will be used to store operator state checkpoints
state.backend: filesystem
# Directory for storing checkpoints
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
This must be configured in flink-conf.yaml; configuring it in FWC.yaml does not work.
Custom Serialization for Managed State
Some special requirements call for custom serialization logic for managed state. If you have no such need and the built-in serialization is sufficient, this part can be skipped; a sketch of passing a custom serializer follows.
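A minimal sketch of registering a custom serializer through the state descriptor (CustomTypeSerializer stands for a hypothetical TypeSerializer implementation):
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
    new ListStateDescriptor<>(
        "state-name",
        new CustomTypeSerializer()); // a TypeSerializer instead of a Class/TypeInformation
checkpointedState = getRuntimeContext().getListState(descriptor);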
Retained Checkpoints
By default, checkpoints are not retained; they are used only to recover a job from failures and are deleted when the job is cancelled. You can, however, configure periodic checkpoints to be retained. Depending on the configuration, these retained checkpoints are not automatically cleaned up when the job fails or is cancelled, so you will have a checkpoint to resume from if your job fails.
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
- ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: retain the checkpoint when the job is cancelled; it must then be deleted manually.
- ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: delete the checkpoint when the job is cancelled; the checkpoint is then available only if the job fails.
Experiments by the Flink team
https://data-artisans.com/blog/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink
State experiment:
https://github.com/StephanEwen/flink-demos/tree/master/streaming-state-machine
Using fault tolerance
"You need checkpointing enabled, and for end-to-end exactly-once guarantees, you need sources that support replay and sinks that are either idempotent or transactional. In your case the first place to start might be to configure a restart strategy."
https://stackoverflow.com/questions/43637190/does-flink-garauntee-tasks-fault-tolerance-in-all-cases
Stateful wordcount
http://sinanbir.com/apache-flink-stateful-streaming-example/
ValueStateDescriptor
Two ways to initialize it:
public void processElement(Tuple2<String, Long> input,
ProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>>.Context ctx,
Collector<Tuple2<String, Long>> out) throws Exception {
Long currentSum = sum.value();
if (currentSum == null) {
currentSum = 0L;
}
currentSum += input.f1;
sum.update(currentSum);
out.collect(new Tuple2<>(input.f0, currentSum));
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Long> descriptor =
new ValueStateDescriptor<>(
"wordsum", // the state name
Long.class); // type information
sum = getRuntimeContext().getState(descriptor);
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Long> descriptor =
new ValueStateDescriptor<>(
"wordsum", // the state name
Long.class, // type information
0L); // default value of the state, if nothing was set
sum = getRuntimeContext().getState(descriptor);
}
Multiple window implementations of wordcount
First: using apply
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.keyBy(0)
.countWindow(11)
.apply (new WindowFunction<Tuple2<String,Integer>, Tuple2<String,Integer>, Tuple, GlobalWindow>() {
public void apply (Tuple tuple,
GlobalWindow window,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
int sum = 0;
for (Tuple2<String, Integer> t: values) {
sum += t.f1;
System.out.println("word"+t.f0);
}
out.collect (new Tuple2<String, Integer>( String.valueOf(window) ,sum));
}
});
Second: using reduce
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.keyBy(0)
.countWindow(11)
.reduce(new ReduceFunction<Tuple2<String,Integer>>() {
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
}
});
Conclusion: apply and process are the general-purpose window functions; reduce and the like are convenience shortcuts for special cases such as sum. Moreover, apply (WindowFunction) is the older version of ProcessWindowFunction: it provides less contextual information and lacks some advanced features, such as per-window keyed state. That interface will be deprecated at some point; process is recommended.
Windows and keys:
"Applies the given window function to each window. The window function is called for each evaluation of the window for each key individually."
Practice (and asking around) confirms that each invocation of the window function processes elements at per-key granularity.
Generating Timestamps / Watermarks
Only relevant when the program runs on event time.
Timestamps and watermarks can be generated in two ways:
- Directly in the source, by calling collectWithTimestamp and emitWatermark. Note that this approach is overridden by the second one if both are used.
- Via a timestamp assigner / watermark generator, usually installed right after the source. This is not strictly required: map or filter operations may come first, but the assigner must be installed before the first event-time-dependent operation (such as a window). When using a Kafka source, Flink allows the assigner to be set inside the consumer, per Kafka partition.
There are two kinds of timestamp assigners, differing in how watermarks are generated:
- With Periodic Watermarks: AssignerWithPeriodicWatermarks assigns timestamps and emits watermarks periodically; the emission interval, in milliseconds, is set via ExecutionConfig.setAutoWatermarkInterval(...). A watermark is emitted only if it is larger than the previous one. The provided BoundedOutOfOrdernessTimestampExtractor is similar to the docs' BoundedOutOfOrdernessGenerator example: it extracts timestamps and generates watermarks according to a configured bound on out-of-orderness.
- With Punctuated Watermarks: AssignerWithPunctuatedWatermarks assigns timestamps and computes a watermark whenever certain marker elements are seen; checkAndGetNextWatermark produces the watermark. See the sketch after this list.
Note: it is possible to generate a watermark on every record, but since every watermark may cause computation downstream, an excessive number of watermarks degrades performance.
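A minimal sketch of a punctuated assigner, following the structure of the documentation's example (MyEvent, getCreationTime(), and hasWatermarkMarker() are hypothetical):
public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {
    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }
    @Override
    public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        // emit a watermark only for marker elements; returning null means "no watermark"
        return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
    }
}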
Pre-defined Timestamp Extractors / Watermark Emitters
- Assigners with ascending timestamps:
If the timestamps within each stream (or, e.g., each Kafka partition) are strictly ascending, the predefined ascending-timestamps assigner can be used directly.
Its watermark is simply the current timestamp: since timestamps are strictly ascending, no earlier timestamps can arrive. See the sketch below.
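A minimal sketch (MyEvent and getCreationTime() are hypothetical):
DataStream<MyEvent> withTimestamps = stream
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
        @Override
        public long extractAscendingTimestamp(MyEvent element) {
            return element.getCreationTime();
        }
    });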
- Assigners allowing a fixed amount of lateness:
Configure a lateness tolerance; elements classified as late are ignored by default and dropped from the computation. See the sketch below.
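A minimal sketch with a 10-second bound on out-of-orderness (MyEvent and getCreationTime() are hypothetical):
DataStream<MyEvent> withTimestamps = stream
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(MyEvent element) {
                return element.getCreationTime();
            }
        });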
Working with window results
What timestamp should the result of a window computation carry?
The docs say:
"This is set to the maximum allowed timestamp of the processed window, which is end timestamp - 1, since the window-end timestamp is exclusive. Note that this is true for both event-time windows and processing-time windows."
That is, the result carries the window's maximum allowed timestamp, end timestamp - 1, for both event-time and processing-time windows. For processing-time windows this has no special implications, but for event-time windows it enables consecutive windowed computations, e.g. the top-K sketched below.
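A minimal sketch of consecutive windowed operations relying on this timestamp convention (the Summer reduce function and TopKWindowFunction are hypothetical placeholders):
DataStream<Integer> resultsPerKey = input
    .keyBy(value -> value % 10) // hypothetical key selector
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .reduce(new Summer());
// the per-key results carry timestamp "window end - 1", so a second window
// of the same size groups exactly the results of the first windowing
DataStream<Integer> globalResults = resultsPerKey
    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
    .process(new TopKWindowFunction());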
REST API
Getting latency:
- List the metrics that can be queried:
http://10.11.6.70:8081/jobs/metrics
- Use a GET request to fetch a specific value:
http://10.11.6.70:8081/jobs/metrics?get=latency.source_id.cbc357ccb763df2852fee8c4fc7d55f2.operator_id.7df19f87deec5680128845fd9a6ca18d.operator_subtask_index.0.latency_median
Parallelism settings
Each operator's parallelism should preferably be <= (number of cores per machine) × (number of machines). See the sketch below.
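A minimal sketch of setting parallelism at the job and operator level (MyMapper is a hypothetical placeholder):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // job-wide default
stream.map(new MyMapper()).setParallelism(2); // per-operator override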