Flink 中的状态
- 算子状态 (Operator State)
- 键控状态 (Keded State)
- 状态后端 (State Nackends)
- 由一个任务维护,用来计算耨个结果的所有数据,都属于这个任务的状态
- 可以认为状态是一个本地变量
算子状态
- 算子状态的作用作用于当前算子任务,同一个子任务所有数据,都可以访问到相同的状态 ,状态对于同一个子任务是共享的
- 算子状态不能由另一个子任务访问
列表状态 - list state
将状态定义为一组数据的列表
联合列表状态 - union list state
也是将状态定义为列表,与列表状态不同的是,发生故障时或从保存点启动,如何恢复
广播状态
当前所有分区状态全都一样
下面做一个代码示例。使用 map 算子统计输入数据的数量
package com.lxs.flink.realtime.state;
import com.lxs.utils.KafkaUtils;
import javafx.collections.ListChangeListener;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.Collections;
import java.util.List;
/**
* User: lixinsong
* Date: 2021/1/20
* Description:
*/
public class OperatorStateTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Tuple2<String, Long>> dataStream = env.addSource(KafkaUtils.initKafka("lxs")).map(new MapFunction<String, Tuple2<String, Long >>() {
@Override
public Tuple2<String, Long> map(String s) throws Exception {
String[] arr = s.split(",");
return Tuple2.of(arr[1], Long.parseLong(arr[2]));
}
});
// 定义一个有状态的 map操作 ,统计当前当前分区的数据个数
SingleOutputStreamOperator<Integer> map = dataStream.map(new MyCountMapper());
map.print("test aa");
env.execute(" test");
}
public static class MyCountMapper implements MapFunction<Tuple2<String, Long>, Integer>, ListCheckpointed<Integer> {
// 定义一个本地变量,作为算子状态
private Integer count = 0;
@Override
public Integer map(Tuple2<String, Long> s) throws Exception {
count ++;
return count;
}
@Override
public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
// 对状态做一个快照
return Collections.singletonList(count);
}
@Override
public void restoreState(List<Integer> state) throws Exception {
// 恢复数据
for (Integer s : state) {
count += s;
}
}
}
}
此段代码比较简单,不做过多描述,需要注意,实现了 ListCheckpointed
的接口是为了,保存状态和恢复状态的
键控状态
- 键控状态是根据输入数据流中定义的键来维护和访问的
- Flink 为每个 key 维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态
- 当任务处理一条数据时,他会自动将状态的访问范围限定为当前数据的 key
值状态 (Vlaue state)
将状态表示为单个的值
代码示例, 依然是最简单的wordCount, 这次在map中实现
package com.lxs.flink.realtime.state;
import com.lxs.utils.KafkaUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
/**
* User: lixinsong
* Date: 2021/1/21
* Description:
*/
public class KeyedStateTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 输入数据 word Count 示例
DataStream<Tuple2<String, Long>> dataStream = env.addSource(KafkaUtils.initKafka("lxs")).flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {
String[] arr = s.split(",");
for(String a : arr) {
collector.collect(Tuple2.of(a, 1L));
}
}
});
// 定义一个有状态的 map操作 ,统计当前当前课程的购买次数
DataStream<Tuple2<String, Long>> map = dataStream.keyBy(0).map(new MyCountMapper1());
map.print("test aa");
env.execute(" test");
}
public static class MyCountMapper1 extends RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
private ValueState<Integer> keyCountState;
@Override
public void open(Configuration parameters) throws Exception {
keyCountState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("key-count", Integer.class));
}
@Override
public void close() throws Exception {
super.close();
}
@Override
public Tuple2<String, Long> map(Tuple2<String, Long> s) throws Exception {
Integer count = keyCountState.value(); // 获取这个key的state
if (Objects.isNull(count)) {
count = 0;
}
count ++; // 次数 + 1
keyCountState.update(count); // 更新状态
return Tuple2.of(s.f0, (long)count);
}
}
}
测试结果
test aa> (li,1)
test aa> (xin,1)
test aa> (xin,2)
test aa> (song,1)
这次我们的wordcount 没有使用类似 sum()
,这样的聚合算子,只是在计算的时候,记录了每个key的中间值,每次累加,主要注意,这里一定要继承 RichMapFunction
, 只有这样才可以使用 运行时上下文
列表状态
将状态表示为一组数据的列表
有时候我们需要一个列表来储存状态,代码其实和上面的值类型相似,不在说具体场景,介绍下代码如何使用
public static class MyCountMapper1 extends RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
// 值类型状态
// private ValueState<Integer> keyCountState;
// 列表类型
private ListState<Integer> listState;
@Override
public void open(Configuration parameters) throws Exception {
// keyCountState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("key-count", Integer.class));
listState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("my-list", Integer.class));
}
@Override
public void close() throws Exception {
super.close();
}
@Override
public Tuple2<String, Long> map(Tuple2<String, Long> s) throws Exception {
// Integer count = keyCountState.value(); // 获取这个key的state
// if (Objects.isNull(count)) {
// count = 0;
// }
// count ++; // 次数 + 1
// keyCountState.update(count); // 更新状态
// return Tuple2.of(s.f0, (long)count);
Iterable<Integer> integers = listState.get(); // 获取状态
listState.add(1); // 追加操作
listState.update(Lists.newArrayList()); // 更新操作
return null;
}
}
此段代码只是演示,没有实际意义
映射状态
将状态表示为一组 key -value 对
其实与上面的类似,不在介绍
聚合状态 (Reducing state & Aggregating state)
将状态表示为一个用于聚合操作的列表 ,不在详细介绍
状态后端 (State Backends)
状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就是 状态后端
状态后端主要负责: 本地的状态管理,将检查点(checkpoint)的状态写入远程存储
状态后端类型
MemoryStateBackend
内存级的状态后端,会将键控状态作为内存中的对象进行管理
FsStateBackend
将 checkpoint
存到远侧还能管的持久化文件系统,而本地状态和 MemoryStateBackend
一致
RocksDBStatebackend
将所有状态序列化后,存入本地的RocksDB 中存储
编程使用
在初始化话环境是,手动设定
env.setStateBackend(new MemoryStateBackend())