只能说画图挺上头的 简述下这个过程:
master启动 main方法创建rpcEnv 注册 MasterEndPoint->执行onStart方法
worker启动 注册endPoint 通过rpcEnv和master完成通信 资源层建立
submit执行 注册ClientEndPoint onStart方法向master发送消息 申请Driver
master schedule方法处理submit请求 shuffle一台不忙的worker 给该worker发送消息 创建Driver进程
Driver进程执行我们的代码 创建sparkContext
sparkContext创建初始化过程中 由standaloneSchedulerBackend 注册clientEndPoint 和 DriverEndPoint
ClientEndPoint onStart方法向master发送消息 调用schedule 执行startExecutorsOnWorkers方法
startExecutorOnWorker方法对所有worker按照 用户配置校验资源 按照核心数要求 水平或垂直 分配executor
各个worker启动 executor进程 executor同时创建了sparkEnv环境 未来作为线程池计算环境
executor进程注册coarseGrainedExecutorBackend onstart方法向DriverEndPoint ask请求反向注册
driverEndpoint回复executor消息 RegisteredExecutor
executor收到消息后创建 Executor对象
Executor对象内部有launchTask队列 接受存储由coarseGrainedExecutorBackend接受的任务 有一个线程池threadPool execute执行队列中的任务
未来会由DriverEndpoint 向其发送任务
driver继续向下执行我们的代码 完成sparkContext的初始化 创建了sparkEnv 创建DAGScheduler和TaskScheduler
继续我们的代码sc.textFile创建RDD RDD不断转换 最终最后一个RDD(finalRDD)执行foreach 其内部调用了sc的runJob方法
runJob中DAGScheduler 依据finalRDD的依赖关系 对stage递归 对RDD栈遍历(shuffle进行切割创建stage) 进行DAG返回stage列表
sparkContext是计算发生的核心
sparkEnv内部有各类型的管理器 可以进行广播变量 内存管理 缓存了一些中间数据 而不是每次由shuffle读 拉取磁盘文件 提高了计算速度