-
Job
spark中对RDD的操作类型分为transformation和action,其中transformation是一种延迟执行的操作,并不会立即执行而是返回一个含有依赖关系的RDD,例如map、filter、sortBy、flatMap等操作,当调用action操作时,** spark会通过DAGScheduler构建一个job的执行拓扑,包括多个stage和task,所有的stage和task构成了这个action触发的job,最后提交到集群中执行。
Job是由stage构成的,spark中的stage只有两种,shuffleMapStage和resultStage。Job划分Stage的依据是shuffle与否(即依赖的类型),当DagScheduler进行DAG执行图构建时,遇到一个shuffle dependency会生成一个shuffle map stage,调用链最后一个shuffle reduce端将生成为result stage 也叫 final stage.
handleJobSubmitted->submitStage(finalStage)->递归调用submitStage,
如果当前提交的stage没有parent stage则直接提交taskSet,否则将当前
stage加入waiting stage列表,每当触发某些事件时
(MapStageSubmitted、TaskCompletion..)都会进行一次 waiting stages 的提交。
-
Task
task是逻辑的具体执行单元,stage由task构成,当提交stage时,DAGScheduler会根据当前stage的类型序列化出不同类型的task并进行broadcast,如果是shuffleMapStage则序列化出ShuffleMapTask,如果是resultStage则序列化出ResultTask,其中task的数量和当前stage所依赖的RDD的partition的数量是一致的,Task作用的数据单元是partition,即每个task只处理一个partition的数据。
task locallity:
partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd,
id))}.toMap
序列化task执行逻辑:
taskBinary=JavaUtils.bufferToArray(
closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
构造taskSet:
taskSet=
partitionsToCompute.map { id =>
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, taskBinary,
part, locs, stage.latestInfo.taskMetrics, properties)
提交task(包括后续的资源申请和管理):
taskScheduler.submitTasks(tasks)
-
RDD
RDD是spark计算模型的基础数据结构。RDD的容错建立在其不可变及数据血缘(RDD会保存依赖关系)的特性上,对RDD的transformation操作会产生新的RDD并在新的RDD中添加父RDD的依赖,因此如果某个RDD的某个分区数据丢失是可以通过重新提交task进行计算恢复的。
几个比较有意思的API:
- sortByKey :该Api会涉及到RangePartitioner的排序算法。
-* treeAggregate:按树形结构对数据进行多次聚合。*
-countApprox:涉及近似count算法。
-
Denpendency
dependency保证了RDD的容错性,spak中RDD依赖分为款依赖和窄依赖。
窄依赖:父RDD的每一个partition至多被子RDD的一个partition使用
宽依赖:多个子RDD的partition会依赖同一个父RDD的partition(父Rdd 的分区会被分割发送给所有的依赖子Rdd,也称为shuffle 依赖。)
wild dependency:class ShuffleDependency
narrow dependency:class OneToOneDependency
class RangeDependency
class PruneDependency
-
shuffle
shuffle是影响spark性能的核心所在,和mapreduce中的shuffle概念类似,在spark2.0中shuffleManager只有sortShuffleManager,并且在满足一定条件下可以使用Serialized sorting 即在tungsten中对其进行的优化。
ShuffleMapTask
SortShuffleManager:
UnsafeShuffleWriter
SortShuffleWriter
BypassMergeSortShuffleWriter
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <:
Product2[Any, Any]]]) :
每个RDD都有一个iterator方法,该方法首先会判断该分区数据是否存在如果不存在则进行计算
-
Tungsten
该项目主要是为了让spark计算模型能更好的利用硬件性能,主要包含三部分:
1、内存管理与二进制处理
2、cache-aware
3、代码生成
Project Tungsten: Bringing Apache Spark Closer to Bare Metal
Project Tungsten (Spark 1.5 Phase 1)