SparkEnv 和 RpcEnv 源码浅析

SparkEnv 与 RpcEnv

SparkEnv 保存着 Application 运行时的环境信息,包括 RpcEnv、Serializer、Block Manager 和 ShuffleManager 等,并为 Driver 端和 Executor 端分别提供了不同的创建方式。

RpcEnv 维持着 Spark 节点间的通信,并负责将传递过来的消息转发给监听者(通信端)。

流程概览

SparkEnv 在 Spark-Core 的 org.apache.spark 包下。

SparkEnv

在 SparkContext 对 SparkEnv 进行初始化的时候调用了 SparkContext.createSparkEnv() 方法,在 SparkContext 概览中提到过:

// 这两行代码在 SparkContext 中
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)

我们以 SparkContext.createSparkEnv() 作为入口,看看 SparkEnv 到底做了些什么:

private[spark] def createSparkEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus): SparkEnv = {
  // 只有这一行语句
  SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
}

SparkEnv.createDriverEnv() 是创建 Driver 端 SparkEnv 的起始点:

private[spark] def createDriverEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus,
    numCores: Int,
    mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
 
  // 从 SparkConf 中获取 Driver 的相关信息
  val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
  val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
  val port = conf.get("spark.driver.port").toInt
  val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
    Some(CryptoStreamUtils.createKey(conf))
  } else {
    None
  }
  
  // SparkEnv 为 Driver 端和 Executor 端提供了不同的创建方式
  // 但是最终都会调用这个方法
  create(
    conf,
    SparkContext.DRIVER_IDENTIFIER,
    bindAddress,
    advertiseAddress,
    port,
    isLocal,
    numCores,
    ioEncryptionKey,
    listenerBus = listenerBus,
    mockOutputCommitCoordinator = mockOutputCommitCoordinator
  )
}

我们顺便看一下 SparkEnv.createExecutorEnv() 方法:

private[spark] def createExecutorEnv(
    conf: SparkConf,
    executorId: String,
    hostname: String,
    port: Int,
    numCores: Int,
    ioEncryptionKey: Option[Array[Byte]],
    isLocal: Boolean): SparkEnv = {
  
  // 直接调用了 create() 方法
  val env = create(
    conf,
    executorId,
    hostname,
    hostname,
    port,
    isLocal,
    numCores,
    ioEncryptionKey
  )
  SparkEnv.set(env)
  env
}

这两种创建方式最终都调用了 SparkEnv.create() 方法,接下来我们看看它的实现细节:

private def create(
    conf: SparkConf,
    executorId: String,
    bindAddress: String,
    advertiseAddress: String,
    port: Int,
    isLocal: Boolean,
    numUsableCores: Int,
    ioEncryptionKey: Option[Array[Byte]],
    listenerBus: LiveListenerBus = null,
    mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {

  // 是否为 Driver 端
  // 可以看出 Driver 端也是在一个 Executor 中运行的
  val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER

  // RPC 环境,注意下这个
  val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
    securityManager, clientMode = !isDriver)

  // 对 conf 参数进行设置
  if (isDriver) {
    conf.set("spark.driver.port", rpcEnv.address.port.toString)
  } else if (rpcEnv.address != null) {
    conf.set("spark.executor.port", rpcEnv.address.port.toString)
  }

  // 默认使用的是 Java 序列化
  // 可以设置 Kryo
  val serializer = instantiateClassFromConf[Serializer](
    "spark.serializer", "org.apache.spark.serializer.JavaSerializer")

  // 序列化管理器
  val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

  // 注册或查找 Endpoint
  def registerOrLookupEndpoint(
      name: String, endpointCreator: => RpcEndpoint):
    RpcEndpointRef = {
    if (isDriver) {
      logInfo("Registering " + name)
      rpcEnv.setupEndpoint(name, endpointCreator)
    } else {
      RpcUtils.makeDriverRef(name, conf, rpcEnv)
    }
  }

  // 广播器 Manager
  val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
    
  // Map 输出跟踪器
  val mapOutputTracker = if (isDriver) {
    new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
  } else {
    new MapOutputTrackerWorker(conf)
  }

  // 注册 Map 输出跟踪器通信端
  // 通信端负责监听并处理节点间传递过来的消息
  mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
    new MapOutputTrackerMasterEndpoint(
      rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))


  val shortShuffleMgrNames = Map(
    "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
    "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
  // 默认使用 SortShuffleManager
  val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
  val shuffleMgrClass =
    shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
  val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

  val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
  // 默认使用通用内存 Manager
  val memoryManager: MemoryManager =
    if (useLegacyMemoryManager) {
      new StaticMemoryManager(conf, numUsableCores)
    } else {
      UnifiedMemoryManager(conf, numUsableCores)
    }

 
  val blockManagerPort = if (isDriver) {
    conf.get(DRIVER_BLOCK_MANAGER_PORT)
  } else {
    conf.get(BLOCK_MANAGER_PORT)
  }

  val blockTransferService =
    new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
      blockManagerPort, numUsableCores)
  
  // 创建 BlockManagerMaster,并且向 RpcEnv 注册通信端(反过来说更好一点)
  // 这个 BlockManagerMasterEndpoint 有点重要,Task 会将运行完结果存放的 Block 块信息发送给它 
  val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
    BlockManagerMaster.DRIVER_ENDPOINT_NAME,
    new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
    conf, isDriver)

  // 创建 BlockManager
  val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
    serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
    blockTransferService, securityManager, numUsableCores)

  // 创建 OutputCommitCoordinator
  val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
    new OutputCommitCoordinator(conf, isDriver)
  }
  // 注册 OutputCommitCoordinator 通信端
  val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
    new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
  outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)

  // 实例化 SparkEnv
  val envInstance = new SparkEnv(
    executorId,
    rpcEnv,
    serializer,
    closureSerializer,
    serializerManager,
    mapOutputTracker,
    shuffleManager,
    broadcastManager,
    blockManager,
    securityManager,
    metricsSystem,
    memoryManager,
    outputCommitCoordinator,
    conf)

  if (isDriver) {
    val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
    envInstance.driverTmpDir = Some(sparkFilesDir)
  }

  envInstance
}

由此可以看出,SparkEnv 维护着RpcEnv 和 一些 Manager,这些 Manager 会将通信端注册到 RpcEnv 中,以用于监听并处理节点间传递过来的消息。

RpcEnv

RpcEnv 是通信环境,负责 Spark 节点间的通信,并将节点间传递过来的消息转发给注册过来的监听器 (通信端)。

在实例化 SparkEnv 的时候会先创建 RpcEnv, SparkEnv.create() 方法中可以找到:

val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
  securityManager, clientMode = !isDriver)

RpcEnv.create() 的实现细节:

def create(
    name: String,
    bindAddress: String,
    advertiseAddress: String,
    port: Int,
    conf: SparkConf,
    securityManager: SecurityManager,
    clientMode: Boolean): RpcEnv = {  // clientMode 为是否是 Driver 端
 
  val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
    clientMode)
  // 这个版本的 Spark 使用 Netty 作为通信框架
  new NettyRpcEnvFactory().create(config)
}

在创建 SparkEnv 的时候,各个 Manager 会调用 registerOrLookupEndpoint() 方法进行通信端的注册,接下来我们看看 registerOrLookupEndpoin() 内部的实现细节:

def registerOrLookupEndpoint(
    name: String, endpointCreator: => RpcEndpoint):
  RpcEndpointRef = {

  if (isDriver) {
    // 在这里进行注册
    rpcEnv.setupEndpoint(name, endpointCreator)
  } else {
    RpcUtils.makeDriverRef(name, conf, rpcEnv)
  }
      
}

Spark 中使用 Netty 作为通信框架 (RpcEnv 唯一实现类), 我们看看 NettyRpcEnv.setupEndpoint 的内部实现细节:

override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
  // dispatcher 是 Dispatcher 的实例化对象
  dispatcher.registerRpcEndpoint(name, endpoint)
}

RpcEnv 中有一个调度者( Dispatcher ),负责将消息传递给各个端点,接下来我们需要看看 Dispatcher. registerRpcEndpoint() 的实现细节:

def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
  val addr = RpcEndpointAddress(nettyEnv.address, name)
  val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
  synchronized {
    // 略略略..
      
    val data = endpoints.get(name)
    // 使用 ConcurrentMap 存储注册过的通信端
    endpointRefs.put(data.endpoint, data.ref)
    receivers.offer(data)
  }
  endpointRef
}

我们看下 Dispatcher.postToAll() 方法,了解下 Dispatcher 是如何将 Rpc 消息转发给各个通信端的:

def postToAll(message: InboxMessage): Unit = {
  val iter = endpoints.keySet().iterator()
  // 将消息广播给所有监听者
  // 监听器模式
  while (iter.hasNext) {
    val name = iter.next
      // 大家可以看下内部实现细节
      postMessage(name, message, (e) => { //... }
    )}
}

各个通信端点在收到广播过来的消息后进行匹配,然后对这些消息进行相关的工作。

我这里以 BlockManagerMasterEndpoint 为例 (在 SparkEnv 的创建代码中可以找到),其它的通信端都是类似的。

消息类型分为需要应答和不需要应答两种(同步和异步),需要应答消息会被其 receiveAndReply() 方法接收,如果不应答就是 receive() 方法接收:

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    
  case RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) => // ...

  case _updateBlockInfo @
      UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
     // .

  case GetLocations(blockId) => // ...

  case GetLocationsMultipleBlockIds(blockIds) => // ...

  // ...
    
}

后面的文章中会用到 SparkEnv,到时就体现它的作用了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,039评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,426评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,417评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,868评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,892评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,692评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,416评论 3 419
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,326评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,782评论 1 316
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,957评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,102评论 1 350
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,790评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,442评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,996评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,113评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,332评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,044评论 2 355

推荐阅读更多精彩内容