Spark InsertIntoHiveTable如何commit结果数据

在maintain我们的daily spark jobs时,发现有的时候一些spark jobs在insert数据到hive table时会在所有tasks完成后hang住很长一段时间后整个job才结束。经过一些调查分析后,我们发现这段时间里,spark是在把.hive-staging_hive*/-ext-10000目录里的文件一个一个地move到hive table的location目录下,由于我们一些spark job生成的hive表的文件数据比较多(数万个)。正常情况下,这也不是什么大问题,这个moving过程总共也就消耗几分至十几分钟。但是,当namenode的负载特别高时,这个moving过程可能持续一个或几个小时,这就有点接受不了了。

问题

要解决问题,先要搞清楚问题的来龙去脉, 本文的目的就是为了搞清楚以下问题:

1. spark job是怎么commit每个write task的?

2. spark job是怎么commit整个write job的?

3. .hive-staging_hive*目录是在什么时候被move到hive table的location目录下的?

为了搞清楚以上问题,我们查阅了spark源码(版本为2.3.0)。

注意,本文的目的是讲清楚在我们遇到的scenario下,Spark InsertIntoHiveTable是如何commit结果数据的,对于其他不同配置导致的不同scenario,请读者自行阅读源码(文中会对我们的配置情况稍加说明)。

对于如何解决上面提到的hang住一个或几个小时的问题,最好的解决方案还是保证namenode的正常响应速度,在正常情况情况下,以上问题影响不大。

至于如何通过修改commit过程使得最终数据文件的moving耗时更短,读者可以在了解了commit具体过程后加以思考。

committer的生成

InsertIntoHive的run方法会调用其processInsert方法进行处理,processInsert会做一些validation和准备工作,然后会调用SaveAsHiveFile.saveAsHiveFile方法,saveAsHiveFile也会做一些准备工作,然后会生成一个FileCommitProtocol类型的committer对象:

SaveAsHiveFile.saveAsHiveFile :生成committer对象

这里使用了反射生成commiter对象:

instantiate a FileCommitProtocol instance

这里的className就是前面SaveAsHiveFile.saveAsHiveFile中的sparkSession.sessionState.conf.fileCommitProtoclClass, 这个值由参数spark.sql.sources.commitProtocolClass控制, 默认为SQLHadoopMapReduceCommitProtocol : 

spark.sql.sources.commitProtocolClass

SQLHadoopMapReduceCommitProtocol继承自HadoopMapReduceCommitProtocol, 只是重写了其setupCommitter方法,setupCommitter方法的作用是生成HadoopMapReduceCommitProtocol内部的committer对象,而这个对象是OutputCommitter类型,其commitTask和commitJob方法会被用于commit每个task的结果和整个job的结果。

注意,这里其实是有两层committer,一层是HadoopMapReduceCommitProtocol内部的committer,另一层就是HadoopMapReduceCommitProtocol本身。在HadoopMapReduceCommitProtocol的commitTask和commitJob方法中都会直接或间接地调用其内部commiter对象的对应方法:

Call directly inner committer.commitJob in HadoopMapReduceCommitProtocol.commitJob
Call indirectly inner committer.commitTask in HadoopMapReduceCommitProtocol.commitTask

因为在我们的case中主要是依靠内部committer来进行结果数据文件的moving,所以本文主要关注内部committer(也就是OutputCommitter)的commit行为,对于HadoopMapReduceCommitProtocol层的commit行为,读者可阅读HadoopMapReduceCommitProtocol的commitTask, commitJob, newTaskTempFile, newTaskTempFileAbsPath等方法,可以结合FileFormatWriter中的DynamicPartitionWriteTask和SingleDirectoryWriteTask的newOutputWriter方法一起阅读。

task的commit

前面讲到,我们主要看OutputCommitter的commit行为,OutputCommitter是一个抽象类,我们主要来看一下其子类org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter的实现。

FileOutputCommitter的commitTask方法将单个task attempt生成的结果数据文件move到指定的committedTaskPath  或者 outputPath

FileOutputCommitter.commitTask

注意,commitTask这里的行为受变量algorithmVersion控制,而这个变量的值由参数mapreduce.fileoutputcommitter.algorithm.version控制,可选值为1和2. 在我们的scenario下,该值为2,所以最终调用了mergePaths方法把task attempt的输出目录中的数据文件都move到outputPath下面。

这里的outputPath就是对应的.hive-staging_hive*下的目录, 比如:/path/to/table/location/.hive-staging_hive_2020-05-03_16-14-42_568_1802626970789985228-1/-ext-10000. 而task attempt的输出目录是.hive-staging_hive*下的针对每个task attempt创建的临时目录,比如:/path/to/table/location/.hive-staging_hive_2020-05-03_16-14-42_568_1802626970789985228-1/-ext-10000/_temporary/0/_temporary/attempt_20200503162201_0005_m_000065_0.

FileOutputCommitter.mergePaths的功能就是把源路径(可以是目录,也可以是文件)的所有文件都move到目标路径下,如果源和目标有冲突,则以源覆盖目标,可以看看这个函数的code,其实就是个递归实现。

至此,我们知道了,对于InsertIntoHiveTable的每个task,在它执行完后都会把自己的结果文件从task attempt的临时folder移到.hive-staging_hive*/-ext-10000中来。每个task都只负责move自己生成的数据文件,这个过程也是各个task并行进行的。

job的commit

上面讲述了单个task如何commit自己的数据文件,那么当一个job的所有task都完成commit后,这个job的commit又做了些什么呢?

FileOutputCommitter.commitJob

主要逻辑在commitJobInternal中实现:

FileOutputCommitter.commitJobInternal

对于algorithmVersion为2的情况,因为在FileOutputCommitter.commitTask方法中已经调用mergePaths将task生成的数据文件merge到了.hive-staging_hive_*/-ext-10000下面,所以在commitJob中,对于algorithmVersion为2的情况,只需要清理_temporary目录并创建_SUCCESS的marker文件。

.hive-staging_hive目录中文件的moving

如上文所述,InsertIntoHive的processInsert会调用SaveAsHiveFile.saveAsHiveFile进行hive 文件的写入,写入的文件最终都会commit到.hive-staging_hive*/-ext-10000目录中,那么.hive-staging_hive*目录又是怎么被move到hive table的location目录下的呢?这个工作是在processInsert方法调用完SaveAsHiveFile.saveAsHiveFile之后,再通过调用org.apache.hadoop.hive.ql.metadata.Hive的loadDynamicPartitions方法完成的。

总结

通过上述分析,我们知道了:

1. 对于spark的InsertIntoHiveTable,结果rdd的每个partition的数据都有相应的task负责数据写入,而每个task都会在目标hive表的location目录下的.hive-staging_hive*/-ext-10000目录中创建相应的临时的staging目录,当前task的所有数据都会先写入到这个staging目录中;

2. 当单个task写入完成后,会调用FileOutputCommitter.commitTask把task的staging目录下的数据文件都move到.hive-staging_hive*/-ext-10000下面,这个过程就是单个task的commit

3. 当一个spark job的所有task都执行完成并commit成功后,spark会调用FileOutputCommitter.commitJob把临时的staging目录都删除掉,并创建_SUCCESS标记文件

4. 当spark成功将数据都写入到staging_hive*/-ext-10000中 (也就是commitJob成功后),spark会调用hive的相应API把数据文件都move到目标hive表的location目录下,并更新hive meta data以enable新的hive partition

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,277评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,689评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,624评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,356评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,402评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,292评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,135评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,992评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,429评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,636评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,785评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,492评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,092评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,723评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,858评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,891评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,713评论 2 354