【容错篇】WAL在Spark Streaming中的应用

【容错篇】WAL在Spark Streaming中的应用

WAL 即 write ahead log(预写日志),是在 1.2 版本中就添加的特性。作用就是,将数据通过日志的方式写到可靠的存储,比如 HDFS、s3,在 driver 或 worker failure 时可以从在可靠存储上的日志文件恢复数据。WAL 在 driver 端和 executor 端都有应用。我们分别来介绍。

WAL在 driver 端的应用

何时创建

用于写日志的对象 writeAheadLogOption: WriteAheadLog
在 StreamingContext 中的 JobScheduler 中的 ReceiverTracker 的 ReceivedBlockTracker 构造函数中被创建,ReceivedBlockTracker 用于管理已接收到的 blocks 信息。需要注意的是,这里只需要启用 checkpoint 就可以创建该 driver 端的 WAL 管理实例,而不需要将 spark.streaming.receiver.writeAheadLog.enable 设置为 true

参见:揭开Spark Streaming神秘面纱② - ReceiverTracker 与数据导入

写什么、何时写

写什么
首选需要明确的是,ReceivedBlockTracker 通过 WAL 写入 log 文件的内容是3种事件(当然,会进行序列化):

  • case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo);即新增了一个 block 及该 block 的具体信息,包括 streamId、blockId、数据条数等
  • case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks);即为某个 batchTime 分配了哪些 blocks 作为该 batch RDD 的数据源
  • case class BatchCleanupEvent(times: Seq[Time]);即清理了哪些 batchTime 对应的 blocks

    知道了写了什么内容,结合源码,也不难找出是什么时候写了这些内容。需要再次注意的是,写上面这三种事件,也不需要将 spark.streaming.receiver.writeAheadLog.enable 设置为 true

何时写BlockAdditionEvent
揭开Spark Streaming神秘面纱② - ReceiverTracker 与数据导入
一文中,已经介绍过当 Receiver 接收到数据后会调用 ReceiverSupervisor#pushAndReportBlock方法,该方法将 block 数据存储并写一份到日志文件中(即 WAL),之后最终将 block 信息,即 receivedBlockInfo(包括 streamId、batchId、数据条数)传递给 ReceivedBlockTracker.

ReceivedBlockTracker 接收到 receivedBlockInfo 后,将之封装成 BlockAdditionEvent(receivedBlockInfo) 并写入日志(WAL)。

抛开代码调用逻辑不谈,一句话总结的话,就是当 Receiver 接收数据产生新的 block 时,最终会触发产生并写 BlockAdditionEvent

何时写BatchAllocationEvent
揭开Spark Streaming神秘面纱③ - 动态生成 job一文中介绍了 JobGenerator 每隔 batch duration 就会为这个 batch 生成对应的 jobs。在生成 jobs 的时候需要为 RDD 提供数据,这个时候就会触发执行

jobScheduler.receiverTracker.allocateBlocksToBatch(time)

该操作将把所有该 streamId 对应的已接收存储但未分配的 blocks 都分配给该 batch,我们知道,ReceivedBlockTracker 保存着所有的 blocks 信息,所以为某个 batch 分配 blocks 这个分配请求最终会给到 ReceivedBlockTracker,ReceivedBlockTracker 在确认要分配哪些 blocks 之后,会将给某个 batchTime 分配了哪些 blocks 的对应关系封装成 BatchAllocationEvent(batchTime, allocatedBlocks) 并写入日志文件(WAL),这之后才进行真正的分配。

何时写BatchCleanupEvent

从我以前写的一些文章中可以知道,一个 batch 对应的是一个 jobSet,因为在一个 batch 可能会有多个 DStream 执行了多次 output 操作,每个 output 操作都将生成一个 job,这些 job 将组成 jobSet。总共有两种时机会触发将 BatchCleanupEvent 事件写入日志(WAL),我们进行依次介绍

我们先来介绍第一种,废话不多说,直接看具体步骤:

  1. 每当 jobSet 中某一个 job 完成的时候,job 调度器会去检查该 job 对应的 jobSet 中的所有 job 是否均已完成
  2. 若是,会通过 jobGenerator.eventLoop 给自身发送 ClearMetadata 消息
  3. jobGenerator 在接收到该消息后,调用自身clearMetadata方法,clearMetadata方法最终会调用到 ReceiverTracker#cleanupOldBlocksAndBatches,具体cleanupOldBlocksAndBatches方法干了什么稍后分析

另一种时机如下:

  1. JobGenerator在完成 checkpoint 时,会给自身发送一个 ClearCheckpointData 消息
  2. JobGenerator在收到 ClearCheckpointData 消息后,调用 clearCheckpointData 方法
  3. JobGenerator#ClearCheckpointData 方法中,会调用到 ReceiverTracker#drcleanupOldBlocksAndBatches

从上面的两小段分析我们可以知道,当一个 batch 的 jobSet 中的 jobs 都完成的时候和每次 checkpoint操作完成的时候会触发执行 ReceiverTracker#cleanupOldBlocksAndBatches 方法,该方法里做了什么呢?见下图:

上图描述了以上两个时机下,是如何:

  1. 将 batch cleanup 事件写入 WAL 中
  2. 清理过期的 blocks 及 batches 的元数据
  3. 清理过期的 blocks 数据(只有当将 spark.streaming.receiver.writeAheadLog.enable 设置为 true才会执行这一步)

WAL 在 executor 端的应用

Receiver 接收到的数据会源源不断的传递给 ReceiverSupervisor,是否启用 WAL 机制(即是否将 spark.streaming.receiver.writeAheadLog.enable 设置为 true)会影响 ReceiverSupervisor 在存储 block 时的行为:

  • 不启用 WAL:你设置的StorageLevel是什么,就怎么存储。比如MEMORY_ONLY只会在内存中存一份,MEMORY_AND_DISK会在内存和磁盘上各存一份等
  • 启用 WAL:在StorageLevel指定的存储的基础上,写一份到 WAL 中。存储一份在 WAL 上,更不容易丢数据但性能损失也比较大

关于什么时候以及如何清理存储在 WAL 中的过期的数据已在上图中说明

WAL 使用建议

关于是否要启用 WAL,要视具体的业务而定:

  • 若可以接受一定的数据丢失,则不需要启用 WAL,因为对性能影响较大
  • 若完全不能接受数据丢失,那就需要同时启用 checkpoint 和 WAL,checkpoint 保存着执行进度(比如已生成但未完成的 jobs),WAL 中保存着 blocks 及 blocks 元数据(比如保存着未完成的 jobs 对应的 blocks 信息及 block 文件)。同时,这种情况可能要在数据源和 Streaming Application 中联合来保证 exactly once 语义
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,012评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,628评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,653评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,485评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,574评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,590评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,596评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,340评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,794评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,102评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,276评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,940评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,583评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,201评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,441评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,173评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,136评论 2 352

推荐阅读更多精彩内容