spark 0.2版本与0.1版本相比较,RDD增加了groupBy算子,并丰富了PariRDD的算子(reduceByKey、combineByKey、groupByKey、join等),为此代码主要增加了与shuffle相关的几个模块:
- DAGScheduler
- Dependency
- Job
- Shuffle
- ShuffledRDD
- ShuffleFetcher
- Stage
- Aggregator
- Partitioner
- ShuffleFetcher
执行过程:
pairRDD.reduceByKey(func)
pairRDD在执行reduceByKey操作后生成shuffledRDD,包含以下属性:
- dependencies:由一个ShuffleDependency对象组成,包含shuffleId、父rdd、aggregator和partitioner等属性,aggregator指明如何对一个分区内数据进行聚合和如何对不同分区之间的结果进行聚合,partitioner指明如何对数据进行分区;
- splits:由多个shuffledRDDSplit对象组成;
- compute函数;
spark的任务由行动操作触发执行,由sc、dagScheduler提交job,首先生成final stage,其中stage包含四个属性:stage id、当前stage最下游的rdd、下游stage最上游rdd的shuffleDependency、当前stage的parent stages;
接着submitStage(finalStage),首先判断final stage是否有未执行的parent stage,若有则通过递归找到最上游的stage,将该stage放入running队列,然后提交其生成shuffleMapTasks,并将这些task加入该stage对应的pending task列表中,将其子stage进入wating队列中。当该stage的所有ShuffleMapTask执行完后,将该stage标注成available,然后提交其它最上游stage或者其子stage的tasks。当所有的ResultTask执行结束后,spark的job执行结束。
pair_rdd1.join(pair_rdd2)首先把两个rdd union成一个rdd,再groupByKey,再对一个key的seq(a1,a2,b1,b2,b3)结果进行组合,生成(a1,b1) (a1,b2) (a1,b3) (a2,b1) (a2,b2) (a2,b3)。
groupByKey的底层基于combineByKey实现;
action算子调用SparkContext的runJob,SparkContext再调用DAGScheduler的runJob来提交任务,首先生成finalStage,