一、Spark Streaming的数据源
对于SparkStreaming中处理的数据无论是通过内部接口获取,还是通过Kafka、Flume、以及TCP Socket等外部数据源,对于这些数据的处理,这些整个过程的数据均抽象于DStream,如下图所示:
不同的ReceiverInputDStream包含不同的流数据接收器Receiver(内部类),这些接收器继承于Receiver。在StreamingContext启动过程中,ReceiverTracker会把流数据接收器Receiver分发到Executor上,在每个Executor上由ReceiverSupervisor启动对应的Receiver。在Spark1.4及以前的版本中根据N个Receiver实例,在StreamingContext中创建一个作业,该作业包含N个任务,其创建结构如下图所示:
创建过程如下:
1、先遍历ReceiverInputDStream,通过其getReceiver获取需要启动的N个Receiver实例,然后把这些实例作为N份数据,在StreamingContext创建一个RDD实例,该实例分为N个partition,每个partition对应包含一个Receiver数据(即Receiver实例)。
2、在这里把Receiver所进行的计算定义为func函数,该函数以Receiver实例作为参数构建ReceiverSupervisorImpl实例supervisor,构造完毕后使用新线程启动该supervisor并阻塞该线程:
3、把ReceiverTracker尽可能地按照Receiver的首选位置分发到集群并启动,启动完毕后Receiver会处于阻塞状态,持续不断的接入流数据。
该Receiver分发方式在长时间的运行过程中,如果出现某个任务失败,则Spark会重新发送该任务到其他Executor进行重跑,但由于该分发过程属于随机分发,无法实现集群的负载均衡,可能会出现某Worker节点运行多个任务,而某些Worker节点却是空闲。而当该任务的失败次数超过规定的上限,会导致Receiver无法启动,针对这些问题,Spark1.5以及以后的版本,在StreamingContext中根据N个Receiver实例创建N个作业,各个作业中只包含一个任务,并加入了可插拔的Receiver分发策略,其结构如下图所示:
这样在SparkStreaming中每个Receiver都有一个作业来分发(该作业纸包含一个任务),而且对于这仅有的一个任务只有在第一次启动时,才尝试启动Receiver。如果该任务失败了,则不再尝试启动Receiver,对应的作业设置为完成状态,此时ReceiverTracker会新生成一个作业,在其他Executor尝试启动,直到成功,这样Receiver就不会受到任务失败上限而无法启动。通过这种方式,SparkStreaming中所有的Receiver总是保持活性。
可拔插的Receiver分发策略在ReceiverSchedulingPolicy 类定义,在Receiver分发之前会收集所有的InputDStream包含的所有Receiver实例和Executor,然后调用该类中scheduleReceiver方法计算每个Receiver对应的Executor。在该方法中以轮询调度方式进行分配,首先对存在首选位置的Receiver进行处理,尽可能把Receiver运行在首选位置机器进行Receiver个数最少的Executor中,接着对于没有首选位置的Receiver,则优先分配到运行Receiver个数最少的Executor中,分配完后返回调度好的Executor列表。源码如下(对该函数的功能描述):
参考内容
1、《图解Spark核心技术与案例实战》