实时流式处理系统是7*24小时运行的,同时可以从各种同时可以从各种各样的系统错误中恢复,在设计之处,Spark Streaing就支持driver和worker节点的错误恢复。
worker容错:
streaming构建在spark之上,spark在集群的worker上设计了容错性,streaming的worker容错机制和spark的是一样的,Spark Streaming应用的高可用性要求应用的driver进程也要有容错性,它是应用的主要进程用于协调所有的worker节点,因为用户应用的计算模式是可变的导致driver的容错性非常棘手,Spark Streaming会对实时流中的每一批数据进行运行同样的Spark计算,这样就可以定期的保存应用的状态到一个可靠的存储中,driver重启的时候恢复这些状态。
driver容错:依赖WAL
WAL(write ahead logs)使用在文件系统和数据库中用于数据操作的持久性,先把数据写到一个持久化的日志中,然后对数据做操作,如果操作过程中系统挂了,恢复的时候可以重新读取日志文件再次进行操作。
启动WAL需要做如下的配置
1:给streamingContext设置checkpoint的目录,该目录必须是HADOOP支持的文件系统,用来保存WAL和做Streaming的checkpoint
2:spark.streaming.receiver.writeAheadLog.enable 设置为true
当WAL被启动了以后,所有的接收器接收的数据可以很稳定的恢复,推荐的内存备份可以关闭了(给输入流设置合适的持久化级别),因为WAL保存在可容错的文件系统上,数据已经备份了。此外,如果想要恢复缓冲的数据,必须使用支持应答的数据源(flume,kafka,kinses)。 当数据存储到日志以后那些支持应答接收器可以向数据源确认。内置的flume和kafka接收器已经实现了这些功能。最后,值得注意的是WAL开启了以后会减少Spark Streaming处理数据的吞吐,因为所有接收的数据会被写到到容错的文件系统上,这样文件系统的吞吐和网络带宽将成为瓶颈。可以通过添加更多接收器或使用更好的软件。
实现细节
下面讲解下WAL的工作原理。过一下Spark Streaming的架构
当一个Spark Streaming应用启动了(例如driver启动), 相应的StreamingContext使用SparkContet去启动receiver,receiver是一个长时间执行的作业,这些接收器接收并保存这些数据到Spark的executor进程的内存中,这些数据的生命周期如下图所示
1:蓝色的箭头表示接收的数据,接收器把数据流打包成块,存储在executor的内存中,如果开启了WAL,将会把数据写入到存在容错文件系统的日志文件中
2:青色的箭头表示提醒driver, 接收到的数据块的元信息发送给driver中的StreamingContext, 这些元数据包括:executor内存中数据块的引用ID和日志文件中数据块的偏移信息
3:红色箭头表示处理数据,每一个批处理间隔,StreamingContext使用块信息用来生成RDD和jobs. SparkContext执行这些job用于处理executor内存中的数据块
4:黄色箭头表示checkpoint这些计算,以便于恢复。流式处理会周期的被checkpoint到文件中
当一个失败的driver重启以后,恢复流程如下
1:黄色的箭头用于恢复计算,checkpointed的信息是用于重启driver,重新构造上下文和重启所有的receiver
2: 青色箭头恢复块元数据信息,所有的块信息对已恢复计算很重要
3:重新生成未完成的job(红色箭头),会使用到2恢复的元数据信息
4:读取保存在日志中的块(蓝色箭头),当job重新执行的时候,块数据将会直接从日志中读取,
5:重发没有确认的数据(紫色的箭头)。缓冲的数据没有写到WAL中去将会被重新发送。