欢迎关注公众号“Tim在路上”
在讨论external shuffle service的具体实现之前,我们先来回顾下spark shuffle的大概过程。
spark shuffle分为两部分shuffle write和shuffle read。
在map write端,对每个task的数据,不管是按key hash还是在数据结构里先聚合再排序,最终都会将数据写到一个partitionFile里面,在partitionFile里面的数据是partitionId有序的,外加会生成一个索引文件,索引包含每个partition对应偏移量和长度。
而reduce read 端就是从这些partitionFile里面拉取相应partitionId的数据, 然后再进行聚合排序。
现在我们在来看下****external shuffle service(ESS)****,其乍从其名字上看,ESS是spark分布式集群为存储shuffle data而设计的分布式组件。但其实它只是Spark通过Executor获取Shuffle data块的代理。
我们可以理解为ESS负责管理shuffle write端生成的shuffle数据,ESS是和yarn一起使用的, 在yarn集群上的每一个nodemanager上面都运行一个ESS,是一个常驻进程。一个ESS管理每个nodemanager上所有的executor生成的shuffle数据。总而言之,ESS并不是分布式的组件,它的生命周期也不依赖于Executor。
为什么需要ESS ?
在Spark中,Executor进程除了运行task,还要负责写shuffle 数据,以及给其他Executor提供shuffle数据。当Executor进程任务过重,导致GC而不能为其他Executor提供shuffle数据时,会影响任务运行。同时,ESS的存在也使得,即使executor挂掉或者回收,都不影响其shuffle数据,因此只有在ESS开启情况下才能开启动态调整executor数目。
因此,spark提供了external shuffle service这个接口,常见的就是spark on yarn中的,YarnShuffleService。这样,在yarn的nodemanager中会常驻一个externalShuffleService服务进程来为所有的executor服务,默认为7337端口。
其实在spark中shuffleClient有两种,一种是blockTransferService,另一种是externalShuffleClient。如果在ESS开启,那么externalShuffleClient用来fetch shuffle数据,而blockTransferService用于获取broadCast等其他BlockManager保存的数据。
如果ESS没有开启,那么spark就只能使用自己的blockTransferService来拉取所有数据,包括shuffle数据以及broadcast数据。
ESS的架构与优势
在启用ESS后,ESS服务会在node节点上创建,并且每次存在时,新创建的Executor都会向其注册。
/** Registers a new Executor with all the configuration we need to find its shuffle files. */
public void registerExecutor(
String appId,
String execId,
ExecutorShuffleInfo executorInfo) {
在注册过程中,使用appId, execId和ExecutorShuffleInfo(localDirs, shuffleManager类型)作为参数,从参数信息可以看出Executor会通知ESS服务它创建在磁盘上文件的存储位置。由于这些信息,ESS服务守护进程能够在检索过程中将shuffle中间的临时文件返回给其他执行程序。
ESS服务的存在也会影响文件删除。在正常情况下(没有外部 shuffle 服务),当Executor停止时,它会自动删除生成的文件。但是启用ESS服务后,Executor关闭后文件不会被清理。以下架构图说明了启用外部 shuffle 服务时工作程序节点上发生的情况:
ESS服务的一大优势是提高了可靠性。即使其中一个 executor 出现故障,它的 shuffle 文件也不会丢失。另一个优点是可扩展性,因为在 Spark 中运行动态资源分配需要ESS服务,这块我们后续在进行介绍。
总之使用Spark ESS 为 Spark Shuffle 操作带来了以下好处:
- 即使 Spark Executor 正在经历 GC 停顿,Spark ESS 也可以为 Shuffle 块提供服务。
- 即使产生它们的 Spark Executor 挂了,Shuffle 块也能提供服务。
- 可以释放闲置的 Spark Executor 来节省集群的计算资源。
ESS源码初探
Executors 通过 RPC 协议与ESS服务通信,发送两种类型的消息:RegisterExecutor和OpenBlocks。当Executor想要在其local external shuffle service中注册时,使用RegisterExecutor, OpenBlocks在获取shuffle data过程中使用。
在Executor创建的时候,会调用env.blockManager.initialize(conf.getAppId),在blockManager存储当前node的externalBlockStoreClient ,在其initialize方法中执行blockStoreClient.init(appId),这里的blockStoreClient称为shuffleClient(这里是ExternalShuffleClient)。在shuffleClient 只是将一些实用对象设置为工厂来创建远程连接。
稍后 BlockManager 调用registerWithShuffleServer方法,这时ESS shuffle 服务会知道executor 存储 shuffle 文件的位置。
// blockManager类
// [1] executor 向ESS注册
// Register Executors' configuration with the local shuffle service, if one should exist.
if (externalShuffleServiceEnabled&& !blockManagerId.isDriver) {
registerWithExternalShuffleServer()
}
// [2] 封装localDirs, shuffle data的位置信息
val shuffleConfig = new ExecutorShuffleInfo(
diskBlockManager.localDirsString,
diskBlockManager.subDirsPerLocalDir,
shuffleManagerMeta)
// [3] 向ESS发送RegisterExecutor消息
try (TransportClient client = clientFactory.createClient(host, port)) {
ByteBuffer registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteBuffer();
//
client.sendRpcSync(registerMessage, registrationTimeoutMs);
}
最终会将其存放在ESS维护的executors列表中,它是以下数据结构ConcurrentMap<AppExecId, ExecutorShuffleInfo> 。
接下来我们来分析下,reducer如何通过ESS来获取shuffle数据块。
获取shuffle block的请求在ExternalShuffleClient的fetchBlocks方法中生成。获取的过程使用RetryingBlockFetcher实例,它可以在失败时重试获取块。实际上,获取过程最终是由OneForOneBlockFetcher类实现的,它负责发送请求以检索所需的块。
RetryingBlockTransferor.BlockTransferStarter blockFetchStarter =
(inputBlockId, inputListener) -> {
// Unless this client is closed.
if (clientFactory != null) {
assert inputListener instanceof BlockFetchingListener :
"Expecting a BlockFetchingListener, but got " + inputListener.getClass();
TransportClient client = clientFactory.createClient(host, port, maxRetries > 0);
new OneForOneBlockFetcher(client, appId, execId, inputBlockId,
(BlockFetchingListener) inputListener, transportConf, downloadFileManager).start();
} else {
logger.info("This clientFactory was closed. Skipping further block fetch retries.");
}
};
可以看到这里的代码和我们在shuffle reader中讲解的是一致的。
- 首先,请求获取(block id, chunks 数)组成的键值对。
- 其次,请求获取chunks 块的具体内容。
下面我们再来总结下chunks块获取的详细流程:
chunks块的获取有两种模式,分别是流模式或批处理模式。
流模式操作是通过 TransportClient 的stream方法实现的。它包括向TransportRequestHandler的实例发送 StreamRequest 消息。处理程序通知客户端打开用于发送所需数据的 TCP 连接,然后传输发生在整个连接中,在单个 TCP 连接中向客户端发送所需的数据。
批处理模式操作是使用 TransportClient 的fetchChunk方法实现的。该请求方法包含要获取的block的索引。处理程序只向客户端返回这个特定的数据块,所以它是每个请求响应一个块。
ESS的配置与使用
ESS shuffle 服务的配置以spark.shuffle.service前缀开头:
- spark.shuffle.service.enabled - 定义ESS服务是否启用。
- spark.shuffle.service.port - 定义运行ESS shuffle 服务的端口。由于该服务应该与执行程序在同一节点上运行,因此配置中不存在主机。
- spark.shuffle.service.index.cache.size - 确定缓存的大小。在开启ESS shuffle 服务情况下,用于缓存存储索引文件信息。它避免了每次获取块时打开/关闭这些文件。主要用于基于排序的 shuffle 数据。
学完External Shuffle Service,下面是一些思考题:
- External Shuffle Service的优势是什么?shuffle data是否被存储在ESS中?
- 为什么在Spark动态资源分配时需要ESS服务?