- ShuffleMapTask的runTask()方法
/**
 * Runs the map side of a shuffle for this task's partition.
 *
 * Deserializes the (RDD, ShuffleDependency) pair from the broadcast task binary, obtains a
 * shuffle writer from the configured shuffle manager, writes the partition's records through
 * it, and returns the status produced by stopping the writer successfully.
 *
 * @param context the task context handed to the RDD iterator and the shuffle writer
 * @return the MapStatus yielded by `writer.stop(success = true)`
 */
override def runTask(context: TaskContext): MapStatus = {
  // Time how long it takes to rebuild the RDD and its dependency from the broadcast bytes.
  val startedAt = System.currentTimeMillis()
  val closureSer = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = closureSer.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - startedAt

  metrics = Some(context.taskMetrics)
  // Kept as a nullable var so the failure path can tell whether a writer was ever obtained.
  var writer: ShuffleWriter[Any, Any] = null
  try {
    writer = SparkEnv.get.shuffleManager
      .getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    val records =
      rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]
    writer.write(records)
    writer.stop(success = true).get
  } catch {
    case failure: Exception =>
      // Best-effort stop; a secondary failure here is only logged so that the original
      // exception is the one that propagates.
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case stopFailure: Exception =>
          log.debug("Could not stop writer", stopFailure)
      }
      throw failure
  }
}
首先得到shuffleManager。shuffleManager分为三种:SortShuffleManager、HashShuffleManager、UnsafeShuffleManager,这里我们只关注UnsafeShuffleManager。得到shuffleManager后,再拿到UnsafeShuffleWriter,然后调用UnsafeShuffleWriter的write()方法将数据写入shuffle文件。
- UnsafeShuffleWriter的write()方法
/**
 * Inserts every record from the iterator into the sorter, then closes the sorter and writes
 * the final shuffle output.
 *
 * @param records the key/value records to write for this map task
 * @throws IOException if inserting a record or writing the output fails
 */
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
  boolean success = false;
  try {
    while (records.hasNext()) {
      insertRecordIntoSorter(records.next());
    }
    closeAndWriteOutput();
    success = true;
  } finally {
    // closeAndWriteOutput() sets `sorter` to null before it merges the spill files, so a
    // failure during the merge arrives here with no sorter. Without the null check the
    // cleanup call would throw a NullPointerException that hides the original exception.
    if (!success && sorter != null) {
      sorter.cleanupAfterError();
    }
  }
}
write()方法调用insertRecordIntoSorter()方法。
/**
 * Serializes a single record into the reusable buffer and hands the resulting bytes,
 * together with the record's partition id, to the sorter.
 *
 * @param record the key/value pair to insert
 * @throws IOException declared for the serialization and sorter calls made here
 */
void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
  final K recordKey = record._1();
  // The partition is derived from the key alone.
  final int targetPartition = partitioner.getPartition(recordKey);

  // Reuse the same buffer for every record: clear it, write key then value, and flush so
  // that the buffer's size reflects the whole serialized record.
  serBuffer.reset();
  serOutputStream.writeKey(recordKey, OBJECT_CLASS_TAG);
  serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
  serOutputStream.flush();

  final int recordLength = serBuffer.size();
  assert (recordLength > 0);

  sorter.insertRecord(
    serBuffer.getBuf(), PlatformDependent.BYTE_ARRAY_OFFSET, recordLength, targetPartition);
}
先将记录的key和value序列化到serBuffer中,再调用insertRecord()方法将序列化后的数据插入到UnsafeShuffleExternalSorter中。
- UnsafeShuffleExternalSorter的insertRecord()方法
/**
 * Copies one serialized record into the current data page and passes the record's encoded
 * address, together with its partition id, to {@code sorter.insertRecord}.
 *
 * The record is laid out in the page as a 4-byte length prefix followed by
 * {@code lengthInBytes} bytes of payload.
 *
 * @param recordBaseObject base object of the copy source, as passed to copyMemory
 * @param recordBaseOffset offset of the copy source, as passed to copyMemory
 * @param lengthInBytes number of payload bytes to copy
 * @param partitionId partition id forwarded to the sorter alongside the record address
 * @throws IOException declared by the signature; presumably raised while making room for
 *         the record (allocateSpaceForRecord) -- confirm against that method
 */
public void insertRecord(
Object recordBaseObject,
long recordBaseOffset,
int lengthInBytes,
int partitionId) throws IOException {
// Need 4 bytes to store the record length.
final int totalSpaceRequired = lengthInBytes + 4;
// Make room first if the prefix plus payload does not fit.
if (!haveSpaceForRecord(totalSpaceRequired)) {
allocateSpaceForRecord(totalSpaceRequired);
}
// Encode the address before the cursor moves, so it points at the length prefix rather
// than at the payload.
final long recordAddress =
memoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition);
final Object dataPageBaseObject = currentPage.getBaseObject();
// Write the length prefix, then advance the cursor and shrink the free-space counter.
PlatformDependent.UNSAFE.putInt(dataPageBaseObject, currentPagePosition, lengthInBytes);
currentPagePosition += 4;
freeSpaceInCurrentPage -= 4;
// Copy the payload bytes immediately after the prefix.
PlatformDependent.copyMemory(
recordBaseObject,
recordBaseOffset,
dataPageBaseObject,
currentPagePosition,
lengthInBytes);
currentPagePosition += lengthInBytes;
freeSpaceInCurrentPage -= lengthInBytes;
// Only the encoded address and partition id are handed to the sorter, not the bytes.
sorter.insertRecord(recordAddress, partitionId);
}
先将数据存储到page中,再通过sorter.insertRecord()把该数据的内存地址(连同partitionId)插入到UnsafeShuffleInMemorySorter中。在存储到page时,如果当前空间不足以容纳这条记录(haveSpaceForRecord()返回false),会调用allocateSpaceForRecord()分配更多内存;如果内存不够,则会spill()到磁盘。spill()函数会调用writeSortedFile()先把数据排序再落盘。
- UnsafeShuffleInMemorySorter的insertRecord()方法
/**
 * Appends a record pointer, packed together with its partition id, to the pointer array,
 * growing the array first when it has no room left.
 *
 * @param recordPointer encoded address of the record
 * @param partitionId partition the record belongs to
 * @throws IllegalStateException if the pointer array is full and already has
 *         Integer.MAX_VALUE elements
 */
public void insertRecord(long recordPointer, int partitionId) {
  if (!hasSpaceForAnotherRecord()) {
    // An array cannot grow past Integer.MAX_VALUE elements, so fail instead of expanding.
    if (pointerArray.length == Integer.MAX_VALUE) {
      throw new IllegalStateException("Sort pointer array has reached maximum size");
    }
    expandPointerArray();
  }
  final long packedPointer = PackedRecordPointer.packPointer(recordPointer, partitionId);
  pointerArray[pointerArrayInsertPosition] = packedPointer;
  pointerArrayInsertPosition++;
}
PackedRecordPointer对象用一个64bit的long型变量来记录数据信息:
[24 bit partition number][13 bit memory page number][27 bit offset in page]。
这些信息用于对数据进行排序。
- UnsafeShuffleWriter的closeAndWriteOutput()方法
/**
 * Inserts every record from the iterator into the sorter, then closes the sorter and writes
 * the final shuffle output.
 *
 * @param records the key/value records to write for this map task
 * @throws IOException if inserting a record or writing the output fails
 */
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
  boolean success = false;
  try {
    while (records.hasNext()) {
      insertRecordIntoSorter(records.next());
    }
    closeAndWriteOutput();
    success = true;
  } finally {
    // closeAndWriteOutput() sets `sorter` to null before it merges the spill files, so a
    // failure during the merge arrives here with no sorter. Without the null check the
    // cleanup call would throw a NullPointerException that hides the original exception.
    if (!success && sorter != null) {
      sorter.cleanupAfterError();
    }
  }
}
/**
 * Closes the sorter, merges its spill files into the final output, deletes the spill files,
 * writes the shuffle index file and records the resulting map status.
 *
 * @throws IOException declared for the sorter, merge and index-file calls made here
 */
void closeAndWriteOutput() throws IOException {
  // Drop the references to the serialization buffers and, once its spills have been
  // collected, to the sorter itself.
  serBuffer = null;
  serOutputStream = null;
  final SpillInfo[] spills = sorter.closeAndGetSpills();
  sorter = null;

  final long[] partitionLengths;
  try {
    partitionLengths = mergeSpills(spills);
  } finally {
    // The spill files are removed whether or not the merge succeeded.
    for (SpillInfo spill : spills) {
      if (!spill.file.exists()) {
        continue;
      }
      if (!spill.file.delete()) {
        logger.error("Error while deleting spill file {}", spill.file.getPath());
      }
    }
  }

  shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
  mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}
closeAndWriteOutput()方法调用mergeSpills()方法将spilled的文件合并成一个文件,再调用writeIndexFile()将数据索引文件落盘。SpillInfo保存spilled文件的信息,其中最主要的是每个分区数据在文件中的起始位置和终止位置,这些信息有助于merge。