1. 概述
MapOutputTracker
用来跟踪中间过程Stage的输出, 为后续的shuffle过程准备好上游的数据. 这些数据的句柄由BlockManager来管理, 大小由BlockManager来估计.
在Driver端跑着MapOutputTrackerMaster
在Executor端跑着MapOutputTrackerWorker
通信使用的是一个注册到akka的RPC调用MapOutputTrackerMasterEndpoint
整个MapOutputTracker被维护在sparkEnv结构, sparkEnv结构是sparkContexty的一部分, 只要持有sc, 就可以对应的使用这个结构了.
2. MapOutputTrackerMasterEndpoint的基本概念
作为一个RPC调用, 它处理两个时间
GetMapOutputStatuses(shuffleId: Int)
得到ShuffleId, 继而判断传递的shuffle status结构序列化后是否超过了akka的最大消息体积(默认128MB)StopMapOutputTracker
停止这个MapOutputTracker
3. MapOutputTracker的Abstract对象
3.1 初始化和维护的结构
MapOutputTracker是一个abstract class, 后边两个具体实现分别是Master端和Worker端. 它们分别维护了不同的映射表, Master端需要维护的是全局的映射表, 而Worker端只维护本地的即可
初始化方法和说明如下
/**
* Class that keeps track of the location of the map output of
* a stage. This is abstract because different versions of MapOutputTracker
* (driver and executor) use different HashMap to store its metadata.
*/
private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
- Driver端维护master的RPCendpoint
/** Set to the MapOutputTrackerMasterEndpoint living on the driver. */
var trackerEndpoint: RpcEndpointRef = _
- 维护shuffleMapTask的句柄, 从这个句柄可以拿到Task, 继而拿到Task涉及的状态和Block, 继而可以获取数据或者操作数据
/**
* This HashMap has different behavior for the driver and the executors.
*
* On the driver, it serves as the source of map outputs recorded from ShuffleMapTasks.
* On the executors, it simply serves as a cache, in which a miss triggers a fetch from the
* driver's corresponding HashMap.
*
* Note: because mapStatuses is accessed concurrently, subclasses should make sure it's a
* thread-safe map.
*/
protected val mapStatuses: Map[Int, Array[MapStatus]]
- 计数锁结构
/**
* Incremented every time a fetch fails so that client nodes know to clear
* their cache of map output locations if this happens.
*/
protected var epoch: Long = 0
protected val epochLock = new AnyRef
- 记录哪些数据正处于被拉取状态
/** Remembers which map output locations are currently being fetched on an executor. */
private val fetching = new HashSet[Int]
3.2 重要的方法
- getMapSizesByExecutorId
获取shuffle block的状态和所在的位置以及大小等基本信息
/**
* Called from executors to get the server URIs and output sizes for each shuffle block that
* needs to be read from a given reduce task.
*
* @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
* and the second item is a sequence of (shuffle block id, shuffle block size) tuples
* describing the shuffle blocks that are stored at that block manager.
*/
def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1)
}
/**
* Called from executors to get the server URIs and output sizes for each shuffle block that
* needs to be read from a given range of map output partitions (startPartition is included but
* endPartition is excluded from the range).
*
* @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
* and the second item is a sequence of (shuffle block id, shuffle block size) tuples
* describing the shuffle blocks that are stored at that block manager.
*/
def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
val statuses = getStatuses(shuffleId)
// Synchronize on the returned array because, on the driver, it gets mutated in place
statuses.synchronized {
return MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses)
}
}
- getStatuses(shuffleId: Int)
获得shuffle output的基本的状态