SparkEnv 与 RpcEnv
SparkEnv 保存着 Application 运行时的环境信息,包括 RpcEnv、Serializer、Block Manager 和 ShuffleManager 等,并为 Driver 端和 Executor 端分别提供了不同的创建方式。
RpcEnv 维持着 Spark 节点间的通信,并负责将传递过来的消息转发给监听者(通信端)。
SparkEnv 在 Spark-Core 的 org.apache.spark 包下。
SparkEnv
在 SparkContext 对 SparkEnv 进行初始化的时候调用了 SparkContext.createSparkEnv() 方法,在 SparkContext 概览中提到过:
// 这两行代码在 SparkContext 中
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)
我们以 SparkContext.createSparkEnv() 作为入口,看看 SparkEnv 到底做了些什么:
private[spark] def createSparkEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus): SparkEnv = {
// 只有这一行语句
SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
}
SparkEnv.createDriverEnv() 是创建 Driver 端 SparkEnv 的起始点:
private[spark] def createDriverEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus,
numCores: Int,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
// 从 SparkConf 中获取 Driver 的相关信息
val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
val port = conf.get("spark.driver.port").toInt
val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
Some(CryptoStreamUtils.createKey(conf))
} else {
None
}
// SparkEnv 为 Driver 端和 Executor 端提供了不同的创建方式
// 但是最终都会调用这个方法
create(
conf,
SparkContext.DRIVER_IDENTIFIER,
bindAddress,
advertiseAddress,
port,
isLocal,
numCores,
ioEncryptionKey,
listenerBus = listenerBus,
mockOutputCommitCoordinator = mockOutputCommitCoordinator
)
}
我们顺便看一下 SparkEnv.createExecutorEnv() 方法:
private[spark] def createExecutorEnv(
conf: SparkConf,
executorId: String,
hostname: String,
port: Int,
numCores: Int,
ioEncryptionKey: Option[Array[Byte]],
isLocal: Boolean): SparkEnv = {
// 直接调用了 create() 方法
val env = create(
conf,
executorId,
hostname,
hostname,
port,
isLocal,
numCores,
ioEncryptionKey
)
SparkEnv.set(env)
env
}
这两种创建方式最终都调用了 SparkEnv.create() 方法,接下来我们看看它的实现细节:
private def create(
conf: SparkConf,
executorId: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
isLocal: Boolean,
numUsableCores: Int,
ioEncryptionKey: Option[Array[Byte]],
listenerBus: LiveListenerBus = null,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
// 是否为 Driver 端
// 可以看出 Driver 端也是在一个 Executor 中运行的
val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
// RPC 环境,注意下这个
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
securityManager, clientMode = !isDriver)
// 对 conf 参数进行设置
if (isDriver) {
conf.set("spark.driver.port", rpcEnv.address.port.toString)
} else if (rpcEnv.address != null) {
conf.set("spark.executor.port", rpcEnv.address.port.toString)
}
// 默认使用的是 Java 序列化
// 可以设置 Kryo
val serializer = instantiateClassFromConf[Serializer](
"spark.serializer", "org.apache.spark.serializer.JavaSerializer")
// 序列化管理器
val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
// 注册或查找 Endpoint
def registerOrLookupEndpoint(
name: String, endpointCreator: => RpcEndpoint):
RpcEndpointRef = {
if (isDriver) {
logInfo("Registering " + name)
rpcEnv.setupEndpoint(name, endpointCreator)
} else {
RpcUtils.makeDriverRef(name, conf, rpcEnv)
}
}
// 广播器 Manager
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
// Map 输出跟踪器
val mapOutputTracker = if (isDriver) {
new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
} else {
new MapOutputTrackerWorker(conf)
}
// 注册 Map 输出跟踪器通信端
// 通信端负责监听并处理节点间传递过来的消息
mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
new MapOutputTrackerMasterEndpoint(
rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
val shortShuffleMgrNames = Map(
"sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
"tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
// 默认使用 SortShuffleManager
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass =
shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
// 默认使用通用内存 Manager
val memoryManager: MemoryManager =
if (useLegacyMemoryManager) {
new StaticMemoryManager(conf, numUsableCores)
} else {
UnifiedMemoryManager(conf, numUsableCores)
}
val blockManagerPort = if (isDriver) {
conf.get(DRIVER_BLOCK_MANAGER_PORT)
} else {
conf.get(BLOCK_MANAGER_PORT)
}
val blockTransferService =
new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
blockManagerPort, numUsableCores)
// 创建 BlockManagerMaster,并且向 RpcEnv 注册通信端(反过来说更好一点)
// 这个 BlockManagerMasterEndpoint 有点重要,Task 会将运行完结果存放的 Block 块信息发送给它
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_ENDPOINT_NAME,
new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
conf, isDriver)
// 创建 BlockManager
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
blockTransferService, securityManager, numUsableCores)
// 创建 OutputCommitCoordinator
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
new OutputCommitCoordinator(conf, isDriver)
}
// 注册 OutputCommitCoordinator 通信端
val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
// 实例化 SparkEnv
val envInstance = new SparkEnv(
executorId,
rpcEnv,
serializer,
closureSerializer,
serializerManager,
mapOutputTracker,
shuffleManager,
broadcastManager,
blockManager,
securityManager,
metricsSystem,
memoryManager,
outputCommitCoordinator,
conf)
if (isDriver) {
val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
envInstance.driverTmpDir = Some(sparkFilesDir)
}
envInstance
}
由此可以看出,SparkEnv 维护着RpcEnv 和 一些 Manager,这些 Manager 会将通信端注册到 RpcEnv 中,以用于监听并处理节点间传递过来的消息。
RpcEnv
RpcEnv 是通信环境,负责 Spark 节点间的通信,并将节点间传递过来的消息转发给注册过来的监听器 (通信端)。
在实例化 SparkEnv 的时候会先创建 RpcEnv, SparkEnv.create() 方法中可以找到:
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
securityManager, clientMode = !isDriver)
RpcEnv.create() 的实现细节:
def create(
name: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager,
clientMode: Boolean): RpcEnv = { // clientMode 为是否是 Driver 端
val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
clientMode)
// 这个版本的 Spark 使用 Netty 作为通信框架
new NettyRpcEnvFactory().create(config)
}
在创建 SparkEnv 的时候,各个 Manager 会调用 registerOrLookupEndpoint() 方法进行通信端的注册,接下来我们看看 registerOrLookupEndpoin() 内部的实现细节:
def registerOrLookupEndpoint(
name: String, endpointCreator: => RpcEndpoint):
RpcEndpointRef = {
if (isDriver) {
// 在这里进行注册
rpcEnv.setupEndpoint(name, endpointCreator)
} else {
RpcUtils.makeDriverRef(name, conf, rpcEnv)
}
}
Spark 中使用 Netty 作为通信框架 (RpcEnv 唯一实现类), 我们看看 NettyRpcEnv.setupEndpoint 的内部实现细节:
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
// dispatcher 是 Dispatcher 的实例化对象
dispatcher.registerRpcEndpoint(name, endpoint)
}
RpcEnv 中有一个调度者( Dispatcher ),负责将消息传递给各个端点,接下来我们需要看看 Dispatcher. registerRpcEndpoint() 的实现细节:
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
val addr = RpcEndpointAddress(nettyEnv.address, name)
val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
synchronized {
// 略略略..
val data = endpoints.get(name)
// 使用 ConcurrentMap 存储注册过的通信端
endpointRefs.put(data.endpoint, data.ref)
receivers.offer(data)
}
endpointRef
}
我们看下 Dispatcher.postToAll() 方法,了解下 Dispatcher 是如何将 Rpc 消息转发给各个通信端的:
def postToAll(message: InboxMessage): Unit = {
val iter = endpoints.keySet().iterator()
// 将消息广播给所有监听者
// 监听器模式
while (iter.hasNext) {
val name = iter.next
// 大家可以看下内部实现细节
postMessage(name, message, (e) => { //... }
)}
}
各个通信端点在收到广播过来的消息后进行匹配,然后对这些消息进行相关的工作。
我这里以 BlockManagerMasterEndpoint 为例 (在 SparkEnv 的创建代码中可以找到),其它的通信端都是类似的。
消息类型分为需要应答和不需要应答两种(同步和异步),需要应答消息会被其 receiveAndReply() 方法接收,如果不应答就是 receive() 方法接收:
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) => // ...
case _updateBlockInfo @
UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
// .
case GetLocations(blockId) => // ...
case GetLocationsMultipleBlockIds(blockIds) => // ...
// ...
}
后面的文章中会用到 SparkEnv,到时就体现它的作用了。