Task的失败
关于Task的状态: 6大状态
Launching:Task已经从Driver侧发送给了Executor侧(被DAGScheduler调度了,Task是通过Endpoint发出,RDD和Stage由BroadCast广播)
Running:Executor正在执行Task,尝未执行完成
Finished:Task成功的被Executor执行完成
Failed:Executor 执行Failed失败,失败原因可能是FetchFailed、ExecptionFailure等
Killed:执行Task的Executor被Killed,KilledExecutorOnhost事件会导致Executo被Killed
Lost:仅用于Mesos fine-grained调度模式(Mesos fine-grained调度模式已经不推荐使用),这种状态可不考虑
Task异常
执行Task的异常类型分别有TaskResultLost、ExecutorLostFailure、ExecptionFailure、FetchFailed、TaskCommitDenied,Task的容错处理其中重点是处理好这些错误。Task失败的异常均继承于TaskFailedReason
关于Task的失败原因总结
shuffle.FetchFailedException
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
org.apache.spark.shuffle.FetchFailedException: Failed to connect to spark047215/192.168.47.215:50268
这种问题一般发生在有大量shuffle操作的时候,task不断的failed,然后又重执行,一直循环下去,非常的耗时。
解决办法:
- 遇到这种问题提高executor内存即可,同时增加每个executor的cpu,这样不会减少task并行度。
TaskSetManager: Lost task & TimeoutException
WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, aa.local): ExecutorLostFailure (executor lost)
WARN TaskSetManager: Lost task 69.2 in stage 7.0 (TID 1145, 192.168.47.217): java.io.IOException: Connection from /192.168.47.217:55483 closed
java.util.concurrent.TimeoutException: Futures timed out after [120 second
ERROR TransportChannelHandler: Connection to /192.168.47.212:35409 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong
原因:
- 因为网络或者gc的原因,worker或executor没有接收到executor或task的心跳反馈
解决方法:
- 提高 spark.network.timeout 的值,根据情况改成300(5min)或更高
- 配置所有网络传输的延时,如果没有主动设置以下参数,默认覆盖其属性
spark.core.connection.ack.wait.timeout
spark.akka.timeout
spark.storage.blockManagerSlaveTimeoutMs
spark.shuffle.io.connectionTimeout
spark.rpc.askTimeout or spark.rpc.lookupTimeout
OOM(内存溢出)
Driver端的OOM: 一般是使用了collect()操作将所有executor的数据聚合到driver导致。尽量不要使用collect操作即可
Executor端的OOM 更常见:
- ResultTask: 可能是数据量暴增,或缓存数据太多;
- ShuffleMapTask: shuffle数据量太大, gc频繁导致;
- 大RDD被cache()导致StorageMemory撑爆;
- 其他?
解决办法: ?
容错机制异常: OneForOneBlockFetcher
ERROR OneForOneBlockFetcher: Failed while starting block fetchesjava.io.IOException: Connection from some_port closed18/04/15 08:15:09 INFO RetryingBlockFetcher: Retrying fetch (1/30) for 20 outstanding blocks after 10000
TransportClientFactory: Found inactive connection to
原因分析:
spark.shuffle.io.maxRetries: 30
spark一直在尝试fetch那个挂了的executor上的数据,一直要尝试30次!然后还有一个对应的参数:spark.shuffle.io.retryWait=10s,这个表示两次retry之间的间隔。
发现那个executor没法连接上的时候,就想着重新建立一个连接。但毕竟那个节点已经挂了,必然一直没有回应,那就需要等待连接超时。连接超时时间很长,比如是5min
spark怎么这么傻?明明那个exector挂了,还是要做尝试。难道driver不能告知每个executor:那个挂了,不要去那里取数了,已经起了的任务就结束吧。通过查询多方资料才知道,原来早先设计的人好像没有考虑到这一点。下面是一些jira和github的issue,都是在吐槽这个问题:
https://issues.apache.org/jira/browse/SPARK-20178https://issues.apache.org/jira/browse/SPARK-20230https://github.com/apache/spark/pull/17088
连接: https://www.cnblogs.com/double-kill/p/9012383.html
解决办法:
当数据集超大时(或者是分配不均匀或者分区太少、并行度不够等导致的单个executor内存不够),会造成executor内存不够,频繁gc。
B、频繁的gc或者网络抖动,会造成数据传输超时、心跳超时等问题。
C、由于spark的重试机制,会先根据配置的时间间隔,再次去重试拉取数据。
D、超过重试次数之后,executor会被干掉,重新生成一个executor去重新执行。这样就导致了反复的remove掉executor,然后重新生成。但是任务还是不能完成
对spark的重试机制的参数进行设置(尝试次数、尝试间隔、还有各种通信超时时间)
每次尝试失败都是要等到通信超时,各种时间加起来,反复重试时间会很长
spark.shuffle.io.maxRetries: 30 #尝试次数
spark一直在尝试fetch那个挂了的executor上的数据,一直要尝试30次!
spark.shuffle.io.retryWait=10s #这个表示两次retry之间的间隔。
spark.network.timeout=300 #配置所有网络传输的延时