https://0x0fff.com/spark-memory-management/
起源
spark提供了一种根据负载动态调整资源的的方式,意味着你的程序可以将闲置的资源释放和添加需要的资源。对多个程序共享一块资源是非常有意义的。
配置
为了使动态资源分配生效,需要注意两点:
- 在不同的模式下,设置properties文件
spark.dynamicAllocation.enabled=true
- 在每个worker node上设置一个额外的shuffle service服务,也就是添加一个类,这个类的作用是保留executor被删除前所执行任务的结果,避免重算。>
example:yarn mode
在每个node上的yarn-site.xml文件中添加一下两个属性,然后重启NodeManager
1.spark_shuffle=yarn.nodemanager.aux-services
2.yarn.nodemanager.aux-services.spark_shuffle.class=org.apache.spark.network.yarn.YarnShuffleService
.
dynamicAllocation设置
如果不配置动态分配的最大可分配executor数量,executor将会消耗完所有的executors ,因此集群的运维人员需要在cluster level对spark_user分配固定的资源。
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.initialExecutors=minExecutors
spark.dynamicAllocation.minExecutors=0
spark.dynamicAllocation.maxExecutors=0
如果设置了--num-executors 或者spark.executor.instances,从中选取最大的作为初始executor数量。
### 资源分配策略
由于无法预测资源的使用情况,因此就需要启发式的探索
##### 请求资源
spark申请executor是轮询的方式,第一次添加一个,第二次添加2,4,8等,其实就和tcp的慢启动快增长一致。
##### 释放资源
除了executor死掉或者application执行结束,每个executor都会保存一些状态和write一些数据。如果此executor空闲的时间太长而被删除,就会导致一些运算被重新计算。因此spark提供了一种机制在删除executor之前保存当前的状态和相关数据。特别是在shuffle阶段,executor会将需要shuffle的数据映射到disk上,然后充当这些数据的server,当其他executor需要的时候就fetch过去。在一种极端的情况下,某一个task的数据特别的慢,而其他执行相同task的executor已经被删除了,这就会导致数据需要被重新计算,而这并不是我们想要的。
为了解决以上的问题,需要使用一个external shuffle service。这个服务启动了一个独立于application和executor,一旦启动了该服务,spark executor就会直接从此服务取数据。意味着此服务的生命周期比executor都长。
executor除了保存shuffle的数据,还可能cache data在内存和disk上,当executor被移除了,cache的数据就会失效,目前executor的cache的数据不会被移除。不过可以配置spark.dynamicAllocation.cachedExecutorIdleTimeout控制含有cache的executor是否被超时删除。在未来的版本中,cache data应该会被保留到off-heap中。