Driver 发送 Task 到 Executor
SparkContext初始化完毕后,执行用户编写代码
SparkPi中调用RDD的reduce,reduce中
调用 SparkContext.runJob 方法提交任务,SparkContext.runJob方法调用DAGScheduler.runJob方法
DAGScheduler中,根据rdd的Dependency生成stage,stage分为ShuffleMapStage和ResultStage两种类型,根据stage类型生成对应的task,分别是ShuffleMapTask、ResultTask,最后调用 TaskScheduler 的 submitTasks提交任务,submitTasks 是接口方法,最终实现是在 TaskSchedulerImpl 中实现。
TaskSchedulerImpl 方法中最终调用 backend.reviveOffers()
,backend 的子类为 CoarseGrainedSchedulerBackend
。其实现了 reviveOffers 方法,最终执行 launchTasks(taskDescs)
查看 launchTasks(taskDescs)
如下:
//从 executorDataMap 中取 executorData,executorData 中保存了 Executor的连接方式 RpcEndpointRef
val executorData = executorDataMap(task.executorId)
executorData.freeCores -= scheduler.CPUS_PER_TASK
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
executorDataMap中保存了Executor的连接方式,关于Executor如何注册到executorDataMap中,参考Spark 任务调度之创建Executor。
Executor接收Task:
Worker节点的CoarseGrainedExecutorBackend进程接收Driver(其实是 TaskScheduler,DAGScheduler、TaskScheduler 都是通过 SparkContext启动的,用户的jar包也就是 用户写的程序 都是属于 Driver)发送的task,交给Executor对象处理,如下
至此从RDD的action开始,至Executor对象接收任务的流程就结束了。
整理流程大致如下 :
Executor 执行 task 并返回结果
Executor的launchTask方法将收到的信息封装为TaskRunner对象,TaskRunner继承自Runnable,Executor使用线程池threadPool调度TaskRunner.
下来查看TaskRunner中run方法对应的逻辑,我将其分为:反序列化 task、运行 task、发送 result,三部分。
反序列化 task:
如上图注释,反序列化得到Task对象。
运行 task:
调用Task的run方法执行计算,Task是抽象类,其实现类有两个,ShuffleMapTask和ResultTask,分别对应shuffle和非shuffle任务。
Task的run方法调用其runTask方法执行task,我们以Task的子类ResultTask为例(ShuffleMapTask相比ResultTask多了一个步骤,使用ShuffleWriter将结果写到本地),如下:
为了说明上图中的func,我们以RDD的map方法为例,如下
至此,task的计算就完成了,task的run方法返回计算结果。
发送 result
最后调用CoarseGrainedExecutorBackend的statusUpdate方法返回result给Driver。
在 CoarseGrainedSchedulerBackend.scala 中的
class DriverEndpoint
中接收消息并处理。
从Executor接收任务,到发送结果给Driver的流程,如下 :
- 上图①所示路径,执行task任务。
- 上图②所示路径,将执行结果返回给Driver,后续Driver调用TaskScheduler处理返回结果,不再介绍。