Streaming -- State & Fault Tolerance -- Checkpointing

Flink中的每个函数和运算符都可以是有状态的(有关详细信息,请参见使用状态)。 有状态功能在处理单个元素/事件的过程中存储数据,使状态成为任何类型的更复杂操作的关键构建块。

为了使状态容错,Flink需要对状态进行检查点。检查点允许Flink恢复流中的状态和位置,从而为应用程序提供与无故障执行相同的语义。

documentation on streaming fault tolerance详细描述了Flink流容错机制背后的技术。

Prerequisites

Flink的检查点机制与流和状态的持久存储交互。一般来说,它需要:

  • 可以在一定时间内重播记录的持续的(或长久的)数据源。这类源的例子有持久的消息队列(例如,Apache Kafka, RabbitMQ, Amazon Kinesis,谷歌PubSub)或文件系统(例如,HDFS, S3, GFS, NFS, Ceph)。
  • 状态的持久存储,通常是分布式文件系统(例如,HDFS、S3、GFS、NFS、Ceph)
Enabling and Configuring Checkpointing

默认情况下,检查点是禁用的。要启用检查点,在StreamExecutionEnvironment上调用enableCheckpointing(n),其中n是检查点间隔(以毫秒为单位)。

检查点的其他参数包括:

  • 精确一次vs.至少一次:您可以选择将模式传递给enableCheckpointing(n)方法,以便在两个保证级别之间进行选择。对大多数应用程序来说,精确一次是最好的。对于某些超低延迟(始终只有几毫秒)的应用程序,至少需要一次。

  • 检查点超时:如果正在进行的检查点在此之前没有完成,则在此之后中止的时间。

  • 检查点之间的最小时间:为了确保流应用程序在检查点之间取得一定的进展,可以定义检查点之间需要经过多少时间。例如,如果将该值设置为5000,则下一个检查点将在上一个检查点完成后不早于5秒启动,无论检查点持续时间和间隔如何。注意,这意味着检查点间隔永远不会小于此参数。
    通过定义检查点之间的时间通常比定义检查点间隔更容易配置应用程序,因为检查点之间的时间不受检查点有时花费的时间超过平均时间这一事实的影响(例如,如果目标存储系统暂时变慢)。

  • 并发检查点的数量:默认情况下,当一个检查点仍在运行时,系统不会触发另一个检查点。这确保拓扑不会在检查点上花费太多时间,也不会在处理流方面取得进步。允许多个重叠的检查点是可能的,这对于具有一定处理延迟(例如,因为函数调用了需要一些时间来响应的外部服务)但仍然希望执行非常频繁的检查点(100毫秒)以在失败时很少重新处理的管道来说是很有趣的。
    当定义了检查点之间的最短时间时,不能使用此选项。

  • 外部化检查点:可以配置定期检查点,以在外部持久保存。外部化的检查点将其元数据写入持久存储,并且在作业失败时不会自动清除。这样,如果作业失败,您的周围就会有一个检查点来恢复。在 deployment notes on externalized checkpoints中有更多细节。

  • 检查点错误的失败/继续任务:如果执行任务的检查点过程时发生错误,这将确定任务是否失败。 这是默认行为。 或者,禁用此选项后,任务将简单地将检查点拒绝给检查点协调器并继续运行。

  • 宁愿使用检查点进行恢复:这决定了一个作业是否将回退到最新的检查点,即使有更多的最近保存点可用来减少恢复时间。

val env = StreamExecutionEnvironment.getExecutionEnvironment()

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000)

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig.setCheckpointTimeout(60000)

// prevent the tasks from failing if an error happens in their checkpointing, the checkpoint will just be declined.
env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

Related Config Options

可以通过conf / flink-conf.yaml设置更多的参数和/或默认值(请参阅configuration以获取完整指南):

Key Default Type Description
state.backend (none) String 用于存储和检查点状态的状态后端。
state.backend.async true Boolean 选项状态后端是否应在可能且可配置的情况下使用异步快照方法。一些状态后端可能不支持异步快照,或者只支持异步快照,并忽略此选项。
state.backend.fs.memory-threshold 1024 Integer 状态数据文件的最小大小。所有小于此值的状态块都内联存储在根检查点元数据文件中。
state.backend.fs.write-buffer-size 4096 Integer 写到文件系统的检查点流的写缓冲区的默认大小。实际的写缓冲区大小被确定为这个选项和选项'state.backend.fs.memory-threshold'的最大值。
state.backend.incremental false Boolean 选项状态后端是否应该创建增量检查点(如果可能)。对于增量检查点,只存储与前一个检查点的差异,而不是完整的检查点状态。一些状态后端可能不支持增量检查点并忽略此选项。
state.backend.local-recovery false Boolean 此选项配置此状态后端的本地恢复。默认情况下,本地恢复是停用的。本地恢复目前只覆盖键控状态后端。目前,MemoryStateBackend不支持本地恢复并忽略此选项。
state.checkpoints.dir (none) String 用于在Flink支持的文件系统中存储检查点的数据文件和元数据的默认目录。 必须从所有参与的进程/节点(即所有TaskManager和JobManager)访问存储路径。
state.checkpoints.num-retained 1 Integer 要保留的已完成检查点的最大数量。
state.savepoints.dir (none) String 保存点的默认目录。由将保存点写入文件系统的状态后端使用(MemoryStateBackend, FsStateBackend, RocksDBStateBackend)。
taskmanager.state.local.root-dirs (none) String 定义根目录的配置参数,用于存储用于本地恢复的基于文件的状态。本地恢复目前只覆盖键控状态后端。目前,MemoryStateBackend不支持本地恢复并忽略此选项

Selecting a State Backend

Flink的checkpointing mechanism在计时器和有状态操作符中存储所有状态的一致快照,包括连接器、窗口和任何用户定义的状态。检查点存储的位置(例如,JobManager内存、文件系统、数据库)取决于配置的状态后端

默认情况下,状态保存在taskmanager的内存中,检查点存储在JobManager的内存中。为了正确地持久化大状态,Flink支持在其他状态后端存储和检查点状态的各种方法。状态后端可以通过StreamExecutionEnvironment.setStateBackend()配置。

有关可用状态后端以及作业范围和集群范围配置选项的详细信息,请参阅状态后端。

State Checkpoints in Iterative Jobs

Flink目前仅为没有迭代的作业提供处理保证。在迭代作业上启用检查点会导致异常。为了在迭代程序上强制检查点,用户需要在启用检查点时设置一个特殊的标志:env.enableCheckpointing(interval,CheckpointingMode.EXACTLY_ONCE,force = true)。

请注意,在失败期间,循环边缘中的运行记录(以及与它们相关联的状态更改)将丢失。

Restart Strategies

Flink支持不同的重新启动策略,这些策略控制在出现故障时如何重新启动作业。有关更多信息,请参见 Restart Strategies

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345