[SPARK][CORE] 面试问题之什么是 external shuffle service?

欢迎关注公众号“Tim在路上”
在讨论external shuffle service的具体实现之前,我们先来回顾下spark shuffle的大概过程。

spark shuffle分为两部分shuffle write和shuffle read。

在map write端,对每个task的数据,不管是按key hash还是在数据结构里先聚合再排序,最终都会将数据写到一个partitionFile里面,在partitionFile里面的数据是partitionId有序的,外加会生成一个索引文件,索引包含每个partition对应偏移量和长度。

而reduce read 端就是从这些partitionFile里面拉取相应partitionId的数据, 然后再进行聚合排序。

现在我们在来看下****external shuffle service(ESS)****,其乍从其名字上看,ESS是spark分布式集群为存储shuffle data而设计的分布式组件。但其实它只是Spark通过Executor获取Shuffle data块的代理。

我们可以理解为ESS负责管理shuffle write端生成的shuffle数据,ESS是和yarn一起使用的, 在yarn集群上的每一个nodemanager上面都运行一个ESS,是一个常驻进程。一个ESS管理每个nodemanager上所有的executor生成的shuffle数据。总而言之,ESS并不是分布式的组件,它的生命周期也不依赖于Executor。

为什么需要ESS ?

在Spark中,Executor进程除了运行task,还要负责写shuffle 数据,以及给其他Executor提供shuffle数据。当Executor进程任务过重,导致GC而不能为其他Executor提供shuffle数据时,会影响任务运行。同时,ESS的存在也使得,即使executor挂掉或者回收,都不影响其shuffle数据,因此只有在ESS开启情况下才能开启动态调整executor数目。

因此,spark提供了external shuffle service这个接口,常见的就是spark on yarn中的,YarnShuffleService。这样,在yarn的nodemanager中会常驻一个externalShuffleService服务进程来为所有的executor服务,默认为7337端口。

其实在spark中shuffleClient有两种,一种是blockTransferService,另一种是externalShuffleClient。如果在ESS开启,那么externalShuffleClient用来fetch shuffle数据,而blockTransferService用于获取broadCast等其他BlockManager保存的数据。

如果ESS没有开启,那么spark就只能使用自己的blockTransferService来拉取所有数据,包括shuffle数据以及broadcast数据。

ESS的架构与优势

在启用ESS后,ESS服务会在node节点上创建,并且每次存在时,新创建的Executor都会向其注册。

/** Registers a new Executor with all the configuration we need to find its shuffle files. */
public void registerExecutor(
    String appId,
    String execId,
    ExecutorShuffleInfo executorInfo) {

在注册过程中,使用appId, execId和ExecutorShuffleInfo(localDirs, shuffleManager类型)作为参数,从参数信息可以看出Executor会通知ESS服务它创建在磁盘上文件的存储位置。由于这些信息,ESS服务守护进程能够在检索过程中将shuffle中间的临时文件返回给其他执行程序。

ESS服务的存在也会影响文件删除。在正常情况下(没有外部 shuffle 服务),当Executor停止时,它会自动删除生成的文件。但是启用ESS服务后,Executor关闭后文件不会被清理。以下架构图说明了启用外部 shuffle 服务时工作程序节点上发生的情况:

ed.png

ESS服务的一大优势是提高了可靠性。即使其中一个 executor 出现故障,它的 shuffle 文件也不会丢失。另一个优点是可扩展性,因为在 Spark 中运行动态资源分配需要ESS服务,这块我们后续在进行介绍。

总之使用Spark ESS 为 Spark Shuffle 操作带来了以下好处:

  1. 即使 Spark Executor 正在经历 GC 停顿,Spark ESS 也可以为 Shuffle 块提供服务。
  2. 即使产生它们的 Spark Executor 挂了,Shuffle 块也能提供服务。
  3. 可以释放闲置的 Spark Executor 来节省集群的计算资源。

ESS源码初探

Executors 通过 RPC 协议与ESS服务通信,发送两种类型的消息:RegisterExecutorOpenBlocks。当Executor想要在其local external shuffle service中注册时,使用RegisterExecutor, OpenBlocks在获取shuffle data过程中使用。

在Executor创建的时候,会调用env.blockManager.initialize(conf.getAppId),在blockManager存储当前node的externalBlockStoreClient ,在其initialize方法中执行blockStoreClient.init(appId),这里的blockStoreClient称为shuffleClient(这里是ExternalShuffleClient)在shuffleClient 只是将一些实用对象设置为工厂来创建远程连接。

稍后 BlockManager 调用registerWithShuffleServer方法,这时ESS shuffle 服务会知道executor 存储 shuffle 文件的位置。

// blockManager类
// [1] executor 向ESS注册
// Register Executors' configuration with the local shuffle service, if one should exist.
if (externalShuffleServiceEnabled&& !blockManagerId.isDriver) {
  registerWithExternalShuffleServer()
}

   // [2] 封装localDirs, shuffle data的位置信息
    val shuffleConfig = new ExecutorShuffleInfo(
      diskBlockManager.localDirsString,
      diskBlockManager.subDirsPerLocalDir,
      shuffleManagerMeta)

 // [3] 向ESS发送RegisterExecutor消息
    try (TransportClient client = clientFactory.createClient(host, port)) {
      ByteBuffer registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteBuffer();
      // 
      client.sendRpcSync(registerMessage, registrationTimeoutMs);
    }

最终会将其存放在ESS维护的executors列表中,它是以下数据结构ConcurrentMap<AppExecId, ExecutorShuffleInfo> 。

接下来我们来分析下,reducer如何通过ESS来获取shuffle数据块。

获取shuffle block的请求在ExternalShuffleClient的fetchBlocks方法中生成。获取的过程使用RetryingBlockFetcher实例,它可以在失败时重试获取块。实际上,获取过程最终是由OneForOneBlockFetcher类实现的,它负责发送请求以检索所需的块。

RetryingBlockTransferor.BlockTransferStarter blockFetchStarter =
    (inputBlockId, inputListener) -> {
      // Unless this client is closed.
      if (clientFactory != null) {
        assert inputListener instanceof BlockFetchingListener :
          "Expecting a BlockFetchingListener, but got " + inputListener.getClass();
        TransportClient client = clientFactory.createClient(host, port, maxRetries > 0);
        new OneForOneBlockFetcher(client, appId, execId, inputBlockId,
          (BlockFetchingListener) inputListener, transportConf, downloadFileManager).start();
      } else {
        logger.info("This clientFactory was closed. Skipping further block fetch retries.");
      }
    };

可以看到这里的代码和我们在shuffle reader中讲解的是一致的。

  1. 首先,请求获取(block id, chunks 数)组成的键值对。
  2. 其次,请求获取chunks 块的具体内容。

下面我们再来总结下chunks块获取的详细流程:


U2led.png

chunks块的获取有两种模式,分别是流模式或批处理模式。

流模式操作是通过 TransportClient 的stream方法实现的。它包括向TransportRequestHandler的实例发送 StreamRequest 消息。处理程序通知客户端打开用于发送所需数据的 TCP 连接,然后传输发生在整个连接中,在单个 TCP 连接中向客户端发送所需的数据。

批处理模式操作是使用 TransportClient 的fetchChunk方法实现的该请求方法包含要获取的block的索引。处理程序只向客户端返回这个特定的数据块,所以它是每个请求响应一个块。

ESS的配置与使用

ESS shuffle 服务的配置以spark.shuffle.service前缀开头:

  • spark.shuffle.service.enabled - 定义ESS服务是否启用。
  • spark.shuffle.service.port - 定义运行ESS shuffle 服务的端口。由于该服务应该与执行程序在同一节点上运行,因此配置中不存在主机。
  • spark.shuffle.service.index.cache.size - 确定缓存的大小。在开启ESS shuffle 服务情况下,用于缓存存储索引文件信息。它避免了每次获取块时打开/关闭这些文件。主要用于基于排序的 shuffle 数据。

学完External Shuffle Service,下面是一些思考题:

  1. External Shuffle Service的优势是什么?shuffle data是否被存储在ESS中?
  2. 为什么在Spark动态资源分配时需要ESS服务?
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 197,000评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,825评论 2 374
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 144,055评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,766评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,639评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,453评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,860评论 3 388
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,492评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,783评论 1 293
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,817评论 2 314
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,624评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,439评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,872评论 3 300
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,075评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,372评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,868评论 2 343
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,075评论 2 338

推荐阅读更多精彩内容