Flink State 管理与恢复
概述:
Flink是一个默认就有状态的分析引擎,前面的WordCount案例可以做到单词的数量的累加,其实是因为在内存中保证了每个单词的
出现次数,这些数据其实就是状态数据,但是如果一个Task在处理过程挂掉了,那么它在内存中的状态都会丢失,所有的数据都需要重新
计算,从容错和消息处理的语义(At-least-once,和Exactly-once)上来说.Flink引入了State,和CheckPoint
State(状态):
State一般是指一个具体的Task/Operator的状态,State数据默认保存在Java的堆内存中
CheckPoint:
可以理解为CheckPoint是把State数据持久化存储了)则表示一个Flink Job在一个特定时刻的一份全局状态快照,即包含了所有
Task/Operator的状态.
常用State
概述:
Flink有两种常见的State类型,分别是:
1. keyed State(键控状态)
2. Operator State(算子状态)
1. Keyed State(键控状态)
概念:
顾名思义就是基于KeyedStream上的状态,这个的状态是跟特定的key绑定的,KeyedStream流上的每个Key, 都对应一个State,
Flink针对Keyed State提供了以下可以保存State的数据结构
1.** ValueState<T>**
保存一个可以更新和检索的值(如上所述,每个值都能对应到当前的输入数据的key,因此算子接受到的每个Key都可以对应一个值)
这个值可以通过update(T)进行更新,通过T value()进行检索
2. **ListState<T> **
保存一个元素的列表,可以往这个列表中追加数据,并在当前的列表上进行检索.可以通过addd(T)或者addAll(List<T>)进行添加的元素,
通过Iterable<T> get() 获得整个列表,还可以通过 update(List<T>) 覆盖当前的列表.
3. ReducingState<T>
保存一个单值,表示添加到状态的所有值的聚合,接口与ListState类似,但使用add(T)增加元素,会使用提供的ReduceFunction进行结合.
4.AggregatingState<In,Out>:
保留一个单值,表示添加到状态的所有值的聚合.和ReducingState 相反的是,聚合类型可能与添加到状态的元素的类型不同,,
即可欧与ListState类似,但使用add(In)添加的元素会用指定的AggregateFUnction进行聚合.
5. FoldingState<T,ACC>:
保留一个单值,表示添加到状态的所有值的聚合,与ReducingState相反,聚合类型可能与添加到状态的元素类型不同,接口与ListState
类似,但使用add(T)添加的元素会用指定的的FoldFunction折叠成聚合值.)
6. *MapState<UK,UV>*
维护了一个映射列表,你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器,使用put(UK,UV) 或者putAll(Map<Uk,UV>)
添加映射.使用get<UK)检索特定Key,使用entries(),keys()和values()分别检索映射、键和值的可迭代视图.
2. Operator State(算子状态)
概念:
Operator State与Key无关,而是与Operator绑定,整个Operator只对应一个State.
比如Flink中的kafka Connector 就是用了Operator State,
它会在每个Connector实例中,保存该实例消费Topic的所有(partition offset)映射
3. Keyed State 案例
需求:
//案例需求: 统计每个手机的呼叫间隔时间并输出
code:
object StateCallInterval {
// 导入Flink隐式转换
import org.apache.flink.streaming.api.scala._
def main(args: Array[String]): Unit = {
//获取flink实时流处理的环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val data = env.readTextFile("D:\\Workspace\\IdeaProjects\\F1Demo\\src\\FlinkDemo\\functions\\station.log")
.map { line =>
var arr = line.split(",")
StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
}
data.keyBy(_.callInt).flatMap(new CallIntervalFunction()).print()
env.execute("d")
}
//RichFlatMapFunction[In,Out]
class CallIntervalFunction() extends RichFlatMapFunction[StationLog, (String, Long)] {
// 定义一个保存前一条呼叫的数据的状态对象
private var preData: ValueState[StationLog] = _
// 使用富函数会有一个生命周期,在调用flatMap方法之前执行.
override def open(parameters: Configuration): Unit = {
// 获取ValueState描述器
val stateDescriptor = new ValueStateDescriptor[StationLog]("pre", classOf[StationLog])
preData = getRuntimeContext.getState(stateDescriptor)
}
override def flatMap(in: StationLog, collector: Collector[(String, Long)]): Unit = {
// 获取绑定key1的状态值
val pre = preData.value()
if (pre == null) {
//如果状态中没有,则存入
preData.update(in)
} else {
// 如果状态中有值则计算时间间隔
var interval = in.callTime - pre.callTime
//收集器收集输出
collector.collect((in.callInt, interval))
}
}
}
}
2. 简单方法使用FlatMapWithState:
code:
// 2. 简单方法: 调用FlatMapWithState[R,T]算子-R-> return 返回类型, T Stateful状态类型
/**
* 1.通过flatMapWithstate的方式来保存状态 Seq与List都是TraversableOnce类型
* 这里面的状态其实就是通过Option()来维护的,
* 如果之前没有状态保存,那么option就是None,
* 如过之前有保存过状态,那么Option就是Some,可以通过getOrElse(0)获取状态
* -----------------------------------------------------------------------
* def flatMapWithState[R: TypeInformation, S: TypeInformation](
* fun: (T, Option[S]) => (TraversableOnce[R], Option[S])): DataStream[R]
* 源码范型解析:
* >>>>>>>> DataStream[R] -> R就是返回值中的类型
* >>>>>>>> Oprion[S] -> 那么S肯定就是状态的类型
* >>>>>>>> 返回值、状态类型都有了,那么T肯定就是输入数据的类型
*/
.flatMapWithState[(String, Long), StationLog] {
// 如果状态中没有,则存入
case (in: StationLog, None) => (List.empty, Some(in))
// 如果状态中有值则计算时间间隔
case (in: StationLog, pre: Some[StationLog]) =>
val interval = in.callTime - pre.get.callTime
(List((in.callInt, interval)), Some(in))
}.print()
3. 使用MapFlatWithState:
code:
/* def mapWithState[R: TypeInformation, S: TypeInformation](
fun: (T, Option[S]) => (R, Option[S])): DataStream[R] = {*/
.mapWithState[(String, Long), StationLog] {
case (in: StationLog, None) => ((in.callInt, 0), Some(in))
case (in: StationLog, pre: Some[StationLog]) => {
val interval = in.callTime - pre.get.callTime
((in.callInt, interval), Some(in))
}
}.print()
CheckPoint
概述:
当程序出现问题需要恢复State数据的时候,只有程序提供支持才可以实现State的容错,State的容错需要依靠CheckPoint机制,这样才可以
保证Exactly-once这种语义,当时注意,它只能保证Flink系统内的Exactly-ONce,比如Flink内置支持的算子,
针对Source 和Sink组件,如果想要保证Exactly-once的话,则这些组件本身应支持这种语义
1. CheckPoint 原理
概述:
Flink中基于异步轻量级的分布式快照技术提供了CheckPoints容错机制,分布式快照可以将同一时间点/Task/Operator的状态数据全局统一
快照处理,包括前面提到的Keyed State和 Operator State ,Flink会在输入的数据集上间隔性地生成Checkpoint barrieer,
通过栅栏(barrier) 将间隔时间段内的数据划分到相应的Checkpoint中.
2. CheckPoint参数和设置
概述:
默认情况下Flink不开启检查点,用户需要在程序中通过调用方法配置和开启检查点,另外还可以调整其相关参数:
1. CheckPoint开启时间间隔指定
开启检查点并且指定检查点时间间隔为1000mx,根据实际情况自行选择,如果状态比较大,则建议适当增加该值
env.enableCheckpointing(1000)
2. Exactly-once 和 at-least-once语义选择
选择exactly-once语义保证整个应用内端到端的数据一致性,这种情况比较适合于数据要求比较高,不允许出现丢数据或者数据重复,
于此同时,Flink的性能也相对较弱,而at-least-once语义更适合于时延和吞吐量要求非常高但对数据一致性要求不高的场景.
默认的语义是exactly-once模式
env.getCheckpintConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
3. CheckPoint超时时间
超时时间指定了每次Checkpoint执行过程中的上限时间范围,一旦Checkpoin执行时间超过该阈值,FLink将会中端Checkpoint过程,
并按照超时处理,该指标可以通过setCheckpointTimeout方法设定,默认为10minute
env.getCheckpointConfig.setCheckpointTimeout(5000)
4. 检查点之间最小时间间隔:
该参数主要目的是设定两个Checkpoint之间的最小时间间隔,防止出现例如状态数据过大而导致Checkpoint执行时间过长,从而
导致Checkpoint挤压过大,最终Flink应用密集地触发Checkpoint操作,会占用了大量计算资源而影响到整个应用的性能.
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(600)
5. 最大并行执行的检查点数量:
通过SetMaxConcurrentCheckpoints()方法设定能够最大同时执行的Checkpoint数量.
在默认情况下,只有一个检查点可以运行,根据用户指定的数量可以同时出发多个Checkpoint,进而提升Checkpoint整体效果
env.getCheckpoingConfig.setMaxConcurrentCheckpoints(1)
6. 是否删除Checkpoint中保存的数据:
设置为RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
设置为DELETE_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会删除Checkpoint数据只有job执行失败的时候才会保存Checkpoint
env.getCheckpoingConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
7. TolerableCheckpointFailureNumber:
设置可以容忍的检查的失败数,超过这个数量则系统自动关闭和停止任务
env.getCheckpointConfig.setTolearbleCheckpointFailureNumber(1)
保存机制 StateBackend(状态后端)
概述:
默认情况下,State会保存在TaskManager的内存中,Checkpoint会存储在JobManager的内存中. State 和 Checkpoint的存储位置取决于
StateBackend的配置. Flink一共提供了3种StateBackend.
包括基于内存的MemortStateBackend基于文件系统的FsStateBackend,以及基于RockDB作为存储介质的RocksDBSState-Backend.
1. MemoryStateBackend
概述:
基于内存的状态管理具有非常快速和高效特点,但也有很多限制,最主要的是内存容限制,一旦存储的状态数据过多,就会导致系统内存溢出.
从而影响整个应用的正常运行.同时如果机器出现问题,整个主机内存中的状态数据都会丢失,进而无法恢复任务中的状态数据.
因此从数据安全的角度建议用户尽可能地避免在生产环境中使用MemoryStateBackend
env.setStateBackend(new MemoryStateBackend(10*1024*1024))
2. FsStateBackend
概述:
和MemoryStateBackend有所不同,FsStateBackend是基于文件系统的一种状态管理器,这里的文件系统可以是本地文件系统,也可以是HDFS分布式文件系统.
FsStateBackend更适合任务状态非常大的情况,例如应用中含有时间范围非常长的窗口计算,或Key/Value State状态数据量非常大的场景
env.setStateBackend(new FsStateBackend(10*1024*1024))
3. RocksDBStateBackend
概述:
RocksDBStateBackend 是Flink中内置的第三方状态管理器,和前面的状态管理器不同,RocksDBStateBackend需要单独引入相关的
依赖包到工程中
<groupId>org.apache.flink</groupId>
<artifictId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.9.1</version>
差异:
RocksDBStateBackend 采用异步的方式进行状态数据的Snapshot,任务中的状态数据首先被写入本地RockDB中,这样在RockDB仅会存储
正在进行计算的热数据,而需要进行CheckPoint的时候,会吧本地的数据直接复制到远端的FileSystem中.
RocksDBStateBackend在性能上要比FsStateBackend高一些
原因:
主要是因为借助于RocksDB在本地存储了最新热数据,然后通过异步的方式再同步到文件系统中,但RocksDBStateBackend和MemoryStateBackend相比
性能就会较弱一些,RocksDB克服了State受内存限制的缺点,同时又能够持久化到远端文件系统中,推荐生产中使用
env.setStateBackend(new RocksDBStateBackend("Hdfs://hadoop01:9000/checkpoint/cp2")
4. 全局配置 StateBackend
概述;
以上的代码都是单JOb配置状态后端,也可以全局配置状态后端,需要修改flink-conf.yaml配置文件:
state.backend:filesystem
其中:
filesystem 表示使用FsStateBackend
jobmanager 表示使用 MemoryStateBackend
rocksdb 表示使用RocksDBStateBackend
state.checkpoints.dir:hdfs://hadoop01:9000/checkpoints
intro:
默认情况下,如果设置了CheckPoint选项,则Flink只保留最近生成的1个CheckPoint,而当Flink程序失败时,可以通过最近的CheckPoint来
进行恢复,但是,如果希望保留多个CheckPoint,并能根据实际需要恢复其中一个进行恢复,就会更加灵活,
添加如下配置,指定最多可以保存的CheckPoint的个数
state.checkpoint.num-retained:2
5. SavePoint
概述:
Savepoints 是检查点的一种特殊实现,底层实现其实也是使用Checkpoints的机制.
Savepoints是用户已手工命令的方式触发Checkpoint,并将结果持久化到指定的存储路径中,其主要目的是帮助用户在升级和维护集群过程
中保存系统的状态数据,避免因为停机运维或者升级应用,等正常终止应用的操作而导致系统无法恢复到原有的计算状态的情况.从而
无法实现从端到端的Exactly-Once语义保证.
1. 配置Savepoints的存储路径
在flink-conf.yaml 中配置SavePoint存储的位置,设置后,如果要创建指定Job的SavePoint,可以不用在手动执行命令时指定SavePoint的位置
state.savepoints.dir: hdfs:/hadoop101:9000savepoints
2. 在代码中设置算子ID
为了能够在作业的不同版本之间以及Flink的不同版本之间顺利升级,强烈推荐程序员通过手动给算子赋予ID,这些ID将用于确定每一个算子的
状态范围,如果不手动给各算子指定ID,则会由Flink自动给每个算子生成一个ID,强烈建议用户手动设置ID
code:
package com.bjsxt.flink.state
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object TestSavePoints {
def main(args: Array[String]): Unit = {
//1、初始化Flink流计算的环境
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//修改并行度
streamEnv.setParallelism(1) //默认所有算子的并行度为1
//2、导入隐式转换
import org.apache.flink.streaming.api.scala._
//3、读取数据,读取sock流中的数据
val stream: DataStream[String] = streamEnv.socketTextStream("hadoop101",8888) //DataStream ==> spark 中Dstream
.uid("socket001")
//4、转换和处理数据
val result: DataStream[(String, Int)] = stream.flatMap(_.split(" "))
.uid("flatmap001")
.map((_, 1)).setParallelism(2)
.uid("map001")
.keyBy(0)//分组算子 : 0 或者 1 代表下标。前面的DataStream[二元组] , 0代表单词 ,1代表单词出现的次数
.sum(1)
.uid("sum001")
//5、打印结果
result.print("结果").setParallelism(1)
//6、启动流计算程序
streamEnv.execute("wordcount")
}
}
3. 触发SavePoint
4. 从SavePoint 启动 Job / 也可以通过Web UI启动 Job:
root@hadoop101 bin]# ./flink run -s
hdfs://hadoop101:9000/savepoints/savepoint-6ecb8c-e56ccb88576a -c
com.bjsxt.flink.state.TestSavepoints -d /home/Flink-Demo-1.0-SNAPSHOT.jar