Spark Structured Streaming 入门

无限增长的表格

Structured Streaming 的关键思想是将持续不断的数据当做一个不断追加的表

将输入的流数据当做一张 “输入表”。把每一条到达的数据作为输入表的新的一行来追加。

image

在输入表上执行的查询将会生成 “结果表”。每个触发间隔(trigger interval)(如: 1s),新的行追加到输入表,最终更新结果表。无论何时更新结果表,我们都希望将更改的结果行 output 到外部存储/接收器(external sink)。

image

基于“无限增长的表格”的编程模型的 Structured Streaming 的 word count例子:

val spark = SparkSession.builder().master("...").getOrCreate()  // 创建一个 SparkSession 程序入口

val lines = spark.readStream.textFile("some_dir")  // 将 some_dir 里的内容创建为 Dataset/DataFrame;即 input table
val words = lines.flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()    // 对 "value" 列做 count,得到多行二列的 Dataset/DataFrame;即 result table

val query = wordCounts.writeStream                 // 打算写出 wordCounts 这个 Dataset/DataFrame
  .outputMode("complete")                          // 打算写出 wordCounts 的全量数据
  .format("console")                               // 打算写出到控制台
  .start()                                         // 新起一个线程开始真正不停写出

query.awaitTermination() 
image
  • Structured Streaming 也是先纯定义、再触发执行的模式,即
    • 前面大部分代码是 纯定义Dataset/DataFrame 的产生、变换和写出
    • 后面位置再真正 start一个新线程,去触发执行之前的定义
  • 在新的执行线程里我们需要持续地去发现新数据,进而持续地查询最新计算结果至写出
    • 这个过程叫做 continous query持续查询)

写入模式

Complete

整个更新的结果表将被写入外部存储。由存储连接器(storage connector)决定如何处理整个表的写入。

image

Append

只有结果表中自上次触发后附加的新行将被写入外部存储。这仅适用于不期望更改结果表中现有行的查询。

image

Append 的语义将保证,一旦输出了某条 key,未来就不会再输出同一个 key。

所以,在上图 12:10 这个批次直接输出 12:00-12:10|cat|1, 12:05-12:15|cat|1 将是错误的,因为在 12:20 将结果更新为了 12:00-12:10|cat|2,但是 Append 模式下却不会再次输出 12:00-12:10|cat|2,因为前面输出过了同一条 key 12:00-12:10|cat 的结果12:00-12:10|cat|1

为了解决这个问题,在 Append 模式下,Structured Streaming 需要知道,某一条 key 的结果什么时候不会再更新了。当确认结果不会再更新的时候,就可以将结果进行输出。

Update

只有自上次触发后结果表中更新的行将被写入外部存储(自 Spark 2.1.1 起可用)。 请注意,这与完全模式不同,因为此模式仅输出自上次触发以来更改的行。如果查询不包含聚合操作,它将等同于附加模式。

image

如上图所示,在 Update 模式中,只有本执行批次 State 中被更新了的条目会被输出:

  • 在 12:10 这个执行批次,State 中全部 2 条都是新增的(因而也都是被更新了的),所以输出全部 2 条;
  • 在 12:20 这个执行批次,State 中 2 条是被更新了的、 4 条都是新增的(因而也都是被更新了的),所以输出全部 6 条;
  • 在 12:30 这个执行批次,State 中 4 条是被更新了的,所以输出 4 条。这些需要特别注意的一点是,如 Append 模式一样,本执行批次中由于(通过 watermark 机制)确认 12:00-12:10 这个 window 不会再被更新,因而将其从 State 中去除,但没有因此产生输出。

StreamExecution:持续查询的驱动引擎

初始状态

前文刚解析的先定义好 Dataset/DataFrame 的产生、变换和写出,再启动 StreamExection 去持续查询。这些 Dataset/DataFrame 的产生、变换和写出的信息就对应保存在 StreamExecution 非常重要的 3 个成员变量中:

  • sources: streaming data 的产生端(比如 kafka 等)
  • logicalPlan: DataFrame/Dataset 的一系列变换(即计算逻辑)
  • sink: 最终结果写出的接收端(比如 file system 等)

StreamExection 另外的重要成员变量是:

  • currentBatchId: 当前执行的 id
  • batchCommitLog: 已经成功处理过的批次有哪些
  • offsetLog, availableOffsets, committedOffsets: 当前执行需要处理的 source data 的 meta 信息
  • offsetSeqMetadata: 当前执行的 watermark 信息(event time 相关)等
image

持续查询

image

一次执行的过程如上图;这里有 6 个关键步骤:

  1. StreamExecution 通过 Source.getOffset() 获取最新的 offsets,即最新的数据进度;
  2. StreamExecution 将 offsets 等写入到 offsetLog 里
    • 这里的 offsetLog 是一个持久化的 WAL (Write-Ahead-Log),是将来可用作故障恢复用
  3. StreamExecution 构造本次执行的 LogicalPlan
    • (3a) 将预先定义好的逻辑(即 StreamExecution 里的 logicalPlan 成员变量)制作一个副本出来
    • (3b) 给定刚刚取到的 offsets,通过 Source.getBatch(offsets) 获取本执行新收到的数据的 Dataset/DataFrame 表示,并替换到 (3a) 中的副本里
    • 经过 (3a), (3b) 两步,构造完成的 LogicalPlan 就是针对本执行新收到的数据的 Dataset/DataFrame 变换(即整个处理逻辑)了
  4. 触发对本次执行的 LogicalPlan 的优化,得到 IncrementalExecution
    • 逻辑计划的优化:通过 Catalyst 优化器完成
    • 物理计划的生成与选择:结果是可以直接用于执行的 RDD DAG
    • 逻辑计划、优化的逻辑计划、物理计划、及最后结果 RDD DAG,合并起来就是 IncrementalExecution
  5. 将表示计算结果的 Dataset/DataFrame (包含 IncrementalExecution) 交给 Sink,即调用 Sink.add(ds/df)
  6. 计算完成后的 commit
    • (6a) 通过 Source.commit() 告知 Source 数据已经完整处理结束;Source 可按需完成数据的 garbage-collection
    • (6b) 将本次执行的批次 id 写入到 batchCommitLog 里

持续查询(增量)

image

Structured Streaming 在编程模型上暴露给用户的是,每次持续查询看做面对全量数据(而不仅仅是本次执行信收到的数据),所以每次执行的结果是针对全量数据进行计算的结果。

但是在实际执行过程中,由于全量数据会越攒越多,那么每次对全量数据进行计算的代价和消耗会越来越大。

Structured Streaming 的做法是:

  • 引入全局范围、高可用的 StateStore
  • 转全量为增量,即在每次执行时:
    • 先从 StateStore 里 restore 出上次执行后的状态
    • 然后加入本执行的新数据,再进行计算
    • 如果有状态改变,将把改变的状态重新 save 到 StateStore 里
  • 为了在 Dataset/DataFrame 框架里完成对 StateStore 的 restore 和 save 操作,引入两个新的物理计划节点 —— StateStoreRestoreExec 和 StateStoreSaveExec

所以 Structured Streaming 在编程模型上暴露给用户的是,每次持续查询看做面对全量数据,但在具体实现上转换为增量的持续查询。

故障恢复

通过上文知道存储 source offsets 的 offsetLog,和存储计算状态的 StateStore,是全局高可用的。仍然采用前面的示意图,offsetLog 和 StateStore 被特殊标识为紫色,代表高可用。

image

由于 exectutor 节点的故障可由 Spark 框架本身很好的 handle,不引起可用性问题,我们本节的故障恢复只讨论 driver 故障恢复。

如果在某个执行过程中发生 driver 故障,那么重新起来的 StreamExecution:

  • 读取 WAL offsetlog 恢复出最新的 offsets 等;相当于取代正常流程里的 (1)(2) 步
  • 读取 batchCommitLog 决定是否需要重做最近一个批次
  • 如果需要,那么重做 (3a), (3b), (4), (5), (6a), (6b) 步
    • 这里第 (5) 步需要分两种情况讨论
      • (i) 如果上次执行在 (5) *结束前即失效*,那么本次执行里 sink 应该完整写出计算结果
      • (ii) 如果上次执行在 (5) *结束后才失效*,那么本次执行里 sink 可以重新写出计算结果(覆盖上次结果),也可以跳过写出计算结果(因为上次执行已经完整写出过计算结果了)

这样即可保证每次执行的计算结果,在 sink 这个层面,是 *不重不丢* 的 —— 即使中间发生过 1 次或以上的失效和恢复。

参考

Structured Streaming 实现思路与实现概述
Structured Streaming Programming Guide

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,372评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,368评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,415评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,157评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,171评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,125评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,028评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,887评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,310评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,533评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,690评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,411评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,004评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,812评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,693评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,577评论 2 353

推荐阅读更多精彩内容