一、基础知识
1.SparkStreaming简介
SparkStreaming是流式处理框架,是Spark API的扩展,支持可扩展、高吞吐量、容错的实时数据流处理,实时数据的来源可以是:Kafka, Flume, Twitter, ZeroMQ或者TCP sockets,并且可以使用高级功能的复杂算子来处理流数据。例如:map,reduce,join,window 。最终,处理后的数据可以存放在文件系统,数据库等,方便实时展现。
2.SparkStreaming与Storm的区别
- Storm是纯实时的流式处理框架,SparkStreaming是准实时的处理框架(微批处理),可以通过控制间隔的时间做到实时处理。因为微批处理,SparkStreaming的吞吐量比Storm要高。
- Storm 的事务机制要比SparkStreaming的要完善。
- Storm支持动态资源调度。(spark1.2开始和之后也支持)
- SparkStreaming擅长复杂的业务处理,Storm不擅长复杂的业务处理,擅长简单的汇总型计算。
3.SparkStreaming接收数据原理
二、SparkStreaming代码
代码示例:WordCountFromSocket.scala
代码注意事项:
- 启动socket server 服务器:nc –lk 9999
- receiver模式下接受数据,local的模拟线程必须大于等于2,一个线程用来receiver用来接受数据,另一个线程用来执行job。
- Durations时间设置就是我们能接收的延迟度。这个需要根据集群的资源情况以及任务的执行情况来调节。
- 创建JavaStreamingContext有两种方式(SparkConf,SparkContext)
- 所有的代码逻辑完成后要有一个output operation类算子。
- JavaStreamingContext.start() Streaming框架启动后不能再次添加业务逻辑。
- JavaStreamingContext.stop() 无参的stop方法将SparkContext一同关闭,stop(false),不会关闭SparkContext。
- JavaStreamingContext.stop()停止之后不能再调用start。
三、SparkStreaming算子操作
1.foreachRDD
output operation类算子,必须对抽取出来的RDD执行action类算子,代码才能执行。
2.transform
transformation类算子,可以通过transform算子,拿到Dstream中的RDD,对RDD使用RDD的算子操作,但是最后要返回RDD,返回的RDD又被封装到Dstream中。
【代码示例:TransformBlackList.scala】
3.updateStateByKey
transformation类算子
(1)为SparkStreaming中每一个Key维护一份state状态,state类型可以是任意类型的,可以是一个自定义的对象,更新函数也可以是自定义的。
(2)通过更新函数对该key的状态不断更新,对于每个新的batch而言,SparkStreaming会在使用updateStateByKey的时候为已经存在的key进行state的状态更新。
使用到updateStateByKey要开启checkpoint机制和功能。
(3)多久会将内存中的数据写入到磁盘一份?
如果batchInterval设置的时间小于10秒,那么10秒写入磁盘一份。如果batchInterval设置的时间大于10秒,那么就会batchInterval时间间隔写入磁盘一份。
【代码示例:UpdateStateByKey.scala】
4.saveAsTextFiles
output operation类算子,将结果保存为文件
【代码示例:CopyFileToDirectory.scala,SaveAsTextFile.scala】
四、窗口操作
业务场景:每隔5s查看过去10s的数据
代码示例:WindowOperator.scala
假设每隔5s 1个batch,上图中窗口长度为15s,窗口滑动间隔10s。
注意:窗口长度和滑动间隔必须是batchInterval的整数倍,如果不是整数倍会检测报错。
为避免任务堆积,需要优化window窗口操作:每次拿到总和减去出去的batch,加上新进的batch。
上图的batchInterval = 1s (每隔1s生成一个batch),窗口长度5s,滑动间隔1s。
优化后的window操作要保存状态所以要设置checkpoint路径,没有优化的window操作可以不设置checkpoint路径。
五、Driver HA(Standalone或者Mesos)
因为SparkStreaming是7*24小时运行,Driver只是一个简单的进程,有可能挂掉,所以实现Driver的HA就有必要(如果使用的Client模式就无法实现Driver HA ,这里针对的是cluster模式)。Yarn平台cluster模式提交任务,AM(AplicationMaster)相当于Driver,如果挂掉会自动启动AM。这里所说的DriverHA针对的是Spark standalone和Mesos资源调度的情况下。实现Driver的高可用有两个步骤:
第一:提交任务层面,在提交任务的时候加上选项 --supervise,当Driver挂掉的时候会自动重启Driver。
第二:代码层面,使用JavaStreamingContext.getOrCreate(checkpoint路径,JavaStreamingContextFactory)
Driver中元数据包括:
1.创建应用程序的配置信息。
2.DStream的操作逻辑。
3.job中没有完成的批次数据,也就是job的执行进度。
注意:getOrCreate适用于代码逻辑不变,spark会记录下offset,可恢复数据。代码逻辑变了只能清空checkpoint目录才可执行新逻辑,解决此问题只能手动管理offset
【代码示例:SparkStreamingDriverHA.scala】