简介
Spark Streaming是核心Spark API的扩展,可对实时数据流进行可扩展,高吞吐量,容错处理。实时流可以有许多数据来源(例如Kafka,Flume,Kinesis或TCP套接字)等,并可以使用高级功能(如map,reduce,join和window)组成的复杂算法来处理数据。经过处理后的数据可以写入到文件系统、数据库、实时仪表盘等。在Spark内部,Spark Streaming接收实时输入流,并将其分成若干批,这些批次被送进Spark Engine中处理,最后按批次产生最后的结果。Spark Streaming的处理管道示意图如下:
背景知识
什么是流处理
在正式介绍Spark Streaming之前,我们先来介绍一下什么叫做数据流。
流处理是连续处理新到来的数据以更新计算结果的行为。在流处理中,输入数据是无边界的,没有预定的开始或结束。它是一系列到达流处理系统的事件(例如,信用卡交易,单击网站动作,或从物联网I o T传感器读取的数据),用户应用程序对此事件流可以执行各种查询操作(例如,跟踪每种事件类型的发生次数,或将这些事件按照某时间窗口聚合)。应用程序在运行时将输出多个版本的结果,或者在某外部系统 (如键值存储)中持续保持最新的聚合结果。
当然,我们可以将流处理(stream processing)与批处理(batch processing)进行比较,批处理是在固定大小输入数据集上进行计算的。通常,这可能是数据仓库中的大规模数据集,其包含来自应用程序的所有历史事件(例如,过去一个月的所有网站访问记录或传感器记录的 数据)。批处理也可以进行查询计算,与流处理差不多,但只计算一次结果。
流处理的应用场景
流处理系统主要有以下6个应用场景:
- 通知和警报
- 实时报告
- 增量ETL(Extract-Transform-Load)
最常见的流处理应用程序之一是减少公司在信息检索时必须忍受的延迟时间,简而言之,“把批处理任务用流处理方式执行”。 - 实时数据更新来提供实时服务
- 实时决策
流处理系统的实时决策包括分析新的输入,根据业务逻辑自动作出决策来应对新数据。 - 在线机器学习
结合用户的实时流数据和历史数据来进行在线实时推断。
流处理的优点
在大多数情况下,批处理更容易理解、更容易调试、也更容易编写应用程序。此外,批量处理数据也使得数据处理的吞吐量大大高于许多流处理系统。然而,流处理对于以下两种情况非常必要。
-
流处理可以降低系统延迟时间
当你的应用程序需要快速响应时间(在分钟、秒或毫秒级别上),你需要一个可以将状态保存在内存中的流处理系统,以获得更好的性能。 -
自动增量计算,高效更新结果
流处理系统可以记住以前计算的状态,只计算新数据。而对于传统批处理系统来说,必须加载所有时间段的数据才可以做到,并且还要添加额外的代码来实现相应功能。
Spark Streaming
Spark Streaming是用来处理实时数据流数据的,它是Spark Core API的一个非常有用的扩展。Spark Streaming可以对实时数据流进行高吞吐量、包含容错的流式处理。
Spark Streaming提供了一个高级抽象对象,名为discretized stream或者也叫DStream,用于代表连续的数据流。DStreams可以从输入数据源比如Kafka, Flume, and Kinesis等中创建得到,也可以通过对已有的DStream进行高阶操作得到。DStream代表的其实是一系列的RDDs。
Spark Streaming特性
Spark 流处理主要有以下6个特点:
- 可扩展性
Spark Streaming可以很容易地拓展到成千上百个节点上 - 速度
Spark Streaming实现了低延迟 - 容错能力
Spark 能快读地从错误中恢复 - 易集成到Spark体系
Spark集成了批处理和实时处理功能 - 商业分析
Spark Streaming可以用在商业数据分析中,用来追踪用户行为数据
Spark Streaming工作流程
Spark Streaming工作流程分为四个阶段。第一个是从各种数据源中获取流数据。这些数据源可以是流式数据源比如Akka, Kafka, Flume, AWS或者Parquet这样的实时数据流;也包括HBase, MySQL, PostgreSQL, Elastic Search, Mongo DB 以及 Cassandra这些用于静态或者批量流的数据源。一旦得到了数据流,Spakr可以在其之上使用Spark MLib API来进行机器学习算法处理,也可以使用Spark SQL来执行相关操作。最终,这些流的输出可以被存储在各种类型的数据存储系统中,比如HBase, Cassandra, MemSQL, Kafka, Elastic Search, HDFS 以及本地文件系统。Spark Streaming 基础组件
1. Streaming Context
Streaming Context用来消耗数据流。通过对它注册一个输入数据流,可以产生一个Receiver对象。这是使用Spark Streaming流处理功能的入口。Spark对包括Twitter、Akka和ZeroMQ等一些列数据源提供了默认的数据流读取实现。一个StreamContext对象可以从SparkContext对象构造出来。SparkContext对象表示与Spark集群的连接关系,可以用来在该集群上创建RDD、累加器和广播变量。
可以通过以下两种方式来创建一个StreamingContext对象。
- 通过已有的SparkContext对象来创建:
import org.apache.spark._
import org.apache.spark.streaming._
var ssc = new StreamingContext(sc,Seconds(1)) //这个处理间隔时间要根据具体业务来设定
- 通过SparkCconf对象来创建:
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName("SparkStreaming").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(1)) //这个处理间隔时间要根据具体业务来设定
在定义好StreamingContext对象之后,可以做以下事情:
- 通过创建输入DStreams来定义输入源
- 通过对DSteams应用Transformation和Action操作来定义流相关操作
- 使用streamingContext.start()来开启数据接收以及处理
- 使用streamingContext.awaitTermination()来等待数据处理停止(主动停止或者错误发生)
- 使用StreamContext.stop()来手动停止数据处理
注意事项:
- 一旦一个StreamingContext被启动了,就无法再继续向它添加新的计算规则了
- 一旦一个StreamingContext被启动了,它就无法再被重新启动
- 同一个时间段,只能有一个StreamContext存在与JVM中
- 对StreamingContext执行stop操作,同样也适用于SparkContext。如果只想停止StreamContext,那么请使用stopSparkContext
- 一个SparkContext可以被多次使用来创建多个StreamingContext,只要之前的StreamingContext已经被停止了。
2. DStream
Discretized Stream 或者 DStream 是Spark Streaming提供的基础抽象对象。它代表了连续的数据流,可以是从数据源中获取到的数据流,也可以是通过对输入流进行Transformation操作之后得到的处理后的数据流。在Spark内部,DStream代表的是一个RDD序列。DStream中的每个RDD包含了数据流按照指定时间间隔分割后的某一段。如下图所示:对DStream的Transformation操作与RDD类似,并且在普通RDD上的Transformation操作大多数在DStream上也可用。
这里再补充一些Transformation操作:
transform操作
Transform允许在DStream 上执行任意的 RDD-to-RDD 函数,即使这些函数并没有在 DStream 的 API 中暴露出来,通过该函数可以方便的扩展 Spark API。该函数每一批次调度一次。其实也就是对 DStream 中的 RDD 应用转换。join操作
两个流之间的 join 需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是 对当前批次的两个流中各自的 RDD 进行 join,与两个 RDD的join效果相同。除此之外,也可以同样对DStream使用leftOuterJoin, rightOuterJoin, fullOuterJoin等操作。-
UpdateStateByKey操作
UpdateStateByKey 原语用于记录历史记录,有时,我们需要在 DStream中跨批次维护状态(例 如流计算中累加 wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。UpdateStateByKey允许开发者保存任意信息,并且根据新的数据对信息进行持续更新。要想使用这个功能,分为以下两步:- 定义状态: 状态可以是任意类型的
- 定义状态更新函数: 定义一个函数,它来决定如何根据历史数据和从输入流中得到的新数据来对状态进行更新
在每个批次中,Spark都会对所有的key应用状态更新函数,不管这些key在一个批次中是否有数据到来。如果更新函数返回None,那么这个key-value将会被删除。
注意:使用 updateStateByKey 需要对检查点目录进行配置,因为这个操作会使用检查点来保存状态。
-
Window操作
Spark Streaming也提供了windowed操作,它允许开发者在一个滑动窗口上应用Transformation操作。示意图如下:
- 窗口长度: 即窗口的持续时间,即窗口包含的时间单位,上图中是3
- 滑动间隔时间: 即执行窗口操作的周期,即窗口每滑动多少个时间单位就执行一次窗口操作,上图中是2
注意:
这两个参数必须是源DStream批处理周期的整数倍。
关于Window操作还有以下方法:
(1)window(windowLength, slideInterval): 基于对源 DStream 窗化的批次进行计算返回一个新的 Dstream
(2)countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数
(3)reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流
(4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 当在一个(K,V)对的 DStream 上调用此函数,会返回一个新(K,V)对 DStream,此处通过对滑动窗口中批次数据使用 reduce 函数来整合每个 key 的 value 值。
3. Input DStream
Input DStream是代表从原始数据流源头得到的数据流。每个Input DStream都有一个Receiver对象与之搭配,Receiver对象负责从源头获取数据流然后保存在Spark内存中用于处理。Spark Streaming的内置数据源有两种分类,如下:
- 基础源:这些数据源直接在SparkContext中可用,比如内置的文件系统和Socket连接
- 高级数据源:比如像 Kafka, Flume, Kinesis等,这些数据源需要通过单独的工具程序类获取
4. Output DStream
输出操作允许将DStream的数据写入到外部的系统中,比如数据库,文件系统。与RDD中的惰性求值类似,OutPut操作执行的时候,才会触发所有应用在DStream上的Transformation操作的实际执行。输出操作一般有:
- print()
在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试. - saveAsTextFiles(prefix, [suffix])
以 text 文件形式存储这个 DStream 的内容。每一批次的存储文件名基于参数中的 prefix 和 suffix。 - saveAsObjectFiles(prefix, [suffix])
以 Java 对象序列化的方式将 Stream 中的数据保存为SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]". - saveAsHadoopFiles(prefix, [suffix])
将 Stream 中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]" - foreachRDD(func)
这是最通用的输出操作,即将函数 func 用于产生于 stream 的每一个RDD。其中参数传入的函数 func 应该实现将每一个 RDD 中数据推送到外部系统,如将RDD 存入文件或者通过网络将其写入数据库。
5. Caching
DStream可以允许开发者在内存中缓存或者持久化流数据,如果DStream中的数据会被多次计算的话,这通常是很有用的一种做法。可以通过调用DStream的persist()方法来实现缓存操作。对于从网络中接收到的数据流,比如Kafka,Flume,Socket等,默认的数据持久化级别是将数据复制两份,然后存储到两个节点中以实现容错机制。
6. CheckPoint
一个流程序必须24/7全天候运行,因此必须能够抵抗与应用程序逻辑无关的故障,比如系统错误、JVM崩溃等。为了实现这种功能,Spark Streaming需要保存足够的信息到容错存储系统中,以便能够从故障中恢复。CheckPoint有两种类型:
- Metadata checkpointing
- Data checkpointing
简单描述一下,Metadata checkpointing 主要是用于从驱动错误等中恢复;对于某些有状态的Transformation操作,如果期间出现了错误,可以使用Data checkpointing从错误中恢复。
如果应用程序有以下要求,那么必须使用Checkpointing技术:
- 使用了有状态的Transformation操作
比如使用了updateStateByKey或者reduceByKeyAndWindow等操作,那么就必须提供checkpoint的存储路径,以便允许定期执行RDD checkpointing操作,从而保存相应的信息。 - 需要从应用程序的驱动错误中恢复
对于普通的流式程序,即没有执行带状态的Transformation操作,那么无需打开checkpointing操作。
通过在容错的、可靠的文件系统上设置一个检查点目录即可启动Spark的CheckPointing功能,即可以保存检查点的信息,以便令程序可以从错误中恢复。可以通过执行streamingContext.checkpoint(checkpointDirectory)来实现这个功能。如果应用程序要使用带状态的Transformation操作,这一步是必须的。创建Checkpointing的示意代码如下:
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()
如果checkpointDirectory目录存在,那么context会从checkpoint数据中重建;如果目录不存在,那么函数functionToCreateContext会被调用以创建一个新的context对象,并且设置DStream。