Driver容错
- 思想:
- 周期性将Dstream的DAG持久化到文件 系统中,重新启动Driver时重新加载DAG
- 实现:
- 启动Driver自动重启(ClusterManager支持该功能)
- standalone: 提交任务时添加
--supervise
参数
官方文档链接 - yarn:设置
yarn.resourcemanager.am.max-attempts
或者spark.yarn.maxAppAttempts
spark on yarn参数配置 - mesos: 提交任务时添加 --supervise 参数
- standalone: 提交任务时添加
- 设置checkpoint
StreamingContext.setCheckpoint(hdfsDirectory)
- 支持从checkpoint中重启
def createContext(checkpointDirectory: String): StreamingContext = {
val ssc = ???
ssc.checkpoint(checkpointDirectory)
ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDirectory, createContext(checkpointDirectory))
Executor通用容错
- 思路:
- spark 处理RDD失败会通过lineage进行重做保证数据可靠
- 对于reduceByKey等Stateful操作重做的lineage较长的,强制启动checkpoint,减少重做几率
- 实现:
- 启用checkpoint
ssc.setCheckpoint(checkpointDir)
Receiver容错
- 思想:
- 接收到数据后先写日志(WAL)到可靠文件系统中,后才写入实际的RDD。如果后续处理失败则成功写入WAL的数据通过WAL进行恢复,未成功写入WAL的数据通过可回溯的Source进行重放
- 实现:
- 启用checkpoint
ssc.setCheckpoint(checkpointDir)
- 启用WAL
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
对Receiver使用可靠性存储
StoreageLevel.MEMORY_AND_DISK_SER
orStoreageLevel.MEMORY_AND_DISK_SER2
效果:
- 常规的数据流:Receiver提供
AtLeastOne
语义(可能重复)
- 基于KafkaDirect的Receiver提供
ExactlyOne
语义,保证数据不丢不重复
Graceful Stop 优雅停止
- 思想:
- 结束任务时等待处理中的任务结束,并保留当前工作状态到checkpoint中,确保重启任务后能获取到正确的checkpoint
- 等待时间的设置(如何保证所有数据写入checkpoint?, 一般设置成slideWindow的5-10倍)
- 实现: 有如下几种:
- 设置spark.streaming.stopGracefullyOnShutdown为true
- 添加hook
sys.ShutdownHookThread {
println("Gracefully stopping Spark Streaming Application at"+ new Date())
ssc.stop(true, true)
println("Application stopped at"+ new Date())
}
StreamingContext代码链接,graceful stop 逻辑见stop函数
TODO
- Kafka Direct如何保证 Exactly One
- 如何支持window容灾?(个人YY的一个场景和自我理解,不确定是否正确)
window为5分钟,slide为1分钟的任务,执行到第100分钟暂停后,checkpoint中的进度应该是[96,100], 10分钟后启动,此时需要的进度是[106,110],输出数据会出现丢失?
spark-streaming使用的是系统时间,而非数据时间。虽然数据本身是连续的,但是到达spark-streaming的时间相关较久导致无法存入同一window,无法正常计算?