以下内容主要基于Spark2.1.0版本的Spark Streaming内容学习得到。
还是先把Maven的依赖加入进去:
Overview
Spark Streaming是core Spark API的一个扩展,对实时数据流进行scalable, high-throughput, fault-tolerant流处理。数据可以从多个来源得到,比如:Kafka,Flume,Kinesis或者TCP socket,并提供高级别的函数诸如map,reduce,join和window这样复合的算法。最终处理后的数据可以通过文件系统、数据库和实时dashboards输出。事实上还支持Spark的机器学习和图形处理算法在数据流上。
内部是按照如下方式处理的。Spark Streaming接收实时输入数据并将数据分成多个batches,然后Spark engine会产生最终的结果batches流。
Spark Streaming提供一个高级别的抽象概念:discretized stream(DStream),来代表一个连续的数据流。DStream既可以从Kafka、Flume和Kinesis输入数据流中创建,也可以从其他DStream上通过高级别的操作产生。在内部DStream是由一连串的RDDs来表示。
A Quick Example
在进一步学习Spark Streaming细节之前,可以先通过一个简单的例子看下大致的处理过程。下面这个例子就是从在TCP socket上监听数据服务器的文本数据并统计word个数。
下面逐步分解Example中的各段语句的作用。
执行过程需要建立两个终端,其中一个运行Netcat作为数据服务器,另一个正常使用spark-submit来提交Maven package出来的jar。
基础概念
Linking
在Maven中加入Spark-Streaming的依赖就不再赘述了。不过如果在代码中使用Kafka、Flume或者Kinesis则需要额外加入他们的artifact到pom.xml的依赖中。最新的版本在Maven repository中查找。
初始化StreamingContext
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaStreamingContext ssc = new JavaStreamingContext(conf,newDuration(1000));
这个初始化类似上面example中的代码,不过在集群使用中更倾向于在spark-submit的命令行中输入具体的master:Spark、Mesos或者YARN等。也就是master = args[0]之类。需要主要的是JavaStreamingContext在内部会创建一个JavaSparkContext,可以通过ssc.sparkContext访问。
batch的时长间隔是一个必须的配置入参。具体的细节可以在下面的调优章节中看到。
还可以通过已经创建的JavaSparkContext获得:
JavaSparkContext sc=...//existing JavaSparkContext
JavaStreamingContext ssc= new JavaStreamingContext(sc,Durations.seconds(1));
在创建好StreamingContext之后接下来要做的内容与example的相仿:
1、定义一个输入源——DStream
2、定义对DStream的计算:通过transformation输出DStream
3、启动接收数据并处理:streamingContext.start()
4、等待处理过程的停止信号:streamingContext.awaitTermination()
5、代码中控制停止:streamingContext.stop()
注意事项:
1、一旦Streaming context被启动,那么就不能修改或者添加任何新的DStream的transformation。
2、一旦Streaming context被停止,将不能被重启。
3、同一时刻在一个JVM上只能有一个StreamingContext在运行。
4、StreamingContext.stop()同时也停止了SparkContext。如果不想把SparkContext停止,需要在stop方法中设置可选参数ssc.stop(false);
5、SparkContext可以多次创建StreamingContext,只要在创建新的之前老的StreamingContext已经被停止,而且没有停止SparkContext。
Discretized Streams (DStreams)
DStream是一连串的RDD,其中每个RDD都包含一个时间间隔中的数据。DStream既是输入的数据流,也是transformation后的处理过的数据流。下图是DStream的表示图示:
其实对DStream的transformation操作就是对具体RDD的操作,比如前面那个example中的情况:
Input DStreams and Receivers
每一个输入的DStream(除了file stream)都与一个Receiver对象相关联。Receiver是从源接收数据并存入内存中。如果想在Application中并行接收多个Stream,那么就需要创建多个input DStream,这也将同时创建多个Receiver来接收多个数据stream。不过需要注意的一点是需要有足够的core来处理这些数据。
注意事项:
1、当在本地运行Spark Streaming程序,不要使用"local"/"local[1]"作为master参数。因为这表示只有本地的一个线程在运行这个任务,这时这个线程会运行input DStream所基于的Receiver,就没有额外的线程资源来进行数据的处理了。所以需要设置master参数为local[n],其中n > Receiver的个数。具体的如何设置master的参数参考Spark Properties
2、同样在集群中运行Streaming任务也需要设置core的个数大于Receiver的个数。不然系统就只会接收数据而不能处理数据了。
Basic Sources
在前面的例子中,我们看到通过使用ssc.socketTextStream(...)来创建一个DStream接收从TCP socket来的数据。其实除了socket,StreamingContext API还可以将文件作为数据源来创建DStream。
File Stream
为了能从任何文件系统中读取数据使用HDFS API(HDFS、S3、NFS等),DStream的创建方式为:
streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);
Spark Streaming将会监视目录dataDirectory并且处理任何新在这个目录中创建的文件(不支持子目录)。需要注意的是:
1、文件必须是统一的格式
2、文件必须是mv到这个文件目录下的(move或者rename)
3、一旦mv过去文件不能修改,所以如果一个文件被连续的修改,那么新加的内容将不会读取出来。
对于简单的text文件,有一种简单的创建DStream的方法:streamingContext.textFileStream(dataDirectory). file stream不需要运行一个Receiver,所以也不需要分配core资源。
Streams based on Custom Receivers
DStream还可以创建处理用户自定义的Receiver接收到的数据流。详见Custom Receiver Guide
Queue of RDDs as a Stream
为了用测试数据测试Spark Streaming,还可以基于一个队列的RDD创建DStream,通过方法streamingContext.queueStream(queueOfRDDs)。其中每一个RDD被认为是DStream的一个batch数据。
Advanced Sources
Kafka:Spark Streaming 2.1.0 is compatible with Kafka broker versions 0.8.2.1 or higher. See the Kafka Integration Guide for more details.
Flume:Spark Streaming 2.1.0 is compatible with Flume 1.6.0. See the Flume Integration Guide for more details.
Kinesis:Spark Streaming 2.1.0 is compatible with Kinesis Client Library 1.2.1. See the Kinesis Integration Guide for more details.
Custom Sources
Input DStreams同样可以创建为用户自定义的数据源。所有需要做的事情就是实现一个自定义的Receiver用以从用户的源中接收数据并推给Spark。详见Custom Receiver Guide
Receiver的可靠性
基于可靠性有两种数据源一种是可靠的,另一种是不可靠的。可靠的是指诸如Kafka或者Flume这种有ACK反馈的,不可靠的则没有。
可靠的Receiver:会在接收到数据后给源发送ACK
不可靠的Receiver:不会给源发送ACK
Transformations on DStreams
类似于RDD,DStream也可以通过transformation对输入的DStream进行处理。下面列出了常用的一些:
上面一些transformation将会重点介绍:
流计算往往是7*24小时不间断的,所以需要中间保存一些状态。
transform主要是对一些RDD支持的Transformation而DStream中没有支持的做扩展。也类似与对DStream的每个RDD进行操作。
Window Operations
如下图所示,Spark Streaming提供了一个窗口计算,允许在滑动窗口内的数据进行Transformation。
任何一个window operation都需要下面两个参数:
window length - 窗长
sliding interval - 窗滑动的时间间隔
还是以上面那个数单词的case举例,假如现在有个需求是每隔10s就统计下最近30s时间内的单词数目。那么代码将写为:
还有其他一些使用窗的Transformation:
Join Operations
最后需要重点说下在Spark Streaming中做多个不同类型的join是how easily
Stream-stream joins
stream之间可以非常简单的join在一起
两个window stream相join
Stream-dataset joins
下面展示下一个window stream如何和一个Dataset进行join
DStream的输出
output操作类似与RDD的action操作,只有output才会真正执行之前的Transformation操作。
Design Patterns for using foreachRDD
主要讲到实际运行时类的序列化和反序列化的设计,以及错误的使用会导致的一些问题。暂时不细说了。
DataFrame and SQL Operations
还可以在流数据中轻松使用DataFrame和SQL。首先必须要使用StreamingContext的SparkContext来创建一个SparkSession。下面就是一个将之前的word count的例子改成使用DataFrame和SQL的方式。每个RDD都被转成DataFrame,注册成一个临时的表并使用SQL。
下面是具体的代码示例:
其中
MLlib Operations
还可以轻松使用MLlib的机器学习算法。首先存在流机器学习算法(比如:Streaming Linear Regression或者Streaming KMeans等),这些算法可以同时学习流数据产生模型并把模型应用到流数据当中。除了这些算法,还有更多的算法可以从offline的历史数据中学习得到模型,然后将这些模型应用到流数据中。具体的可以看指导文档:MLlib