一、 Hive on spark的基本架构/
1. Hive 的架构
Hive的整体架构可以分成以下几大部分:
- 用户接口 支持CLI, JDBC和Web UI
- Driver Driver负责将用户指令翻译转换成为相应的MapReduce Job
- MetaStore 元数据存储仓库,像数据库和表的定义这些内容就属于元数据这个范畴,默认使用的是Derby存储引擎
2. Hive on spark的架构
Hive on Spark总体的设计思路是,尽可能重用Hive逻辑层面的功能;从生成物理计划开始,提供一整套针对Spark的实现。
用什么算?算什么?怎么算?
计算引擎:spark
通过hive.execution.engine来设置计算引擎,该参数可选的值为mr、tez和spark。
Hadoop:mr
计算对象:以Hive的表作为RDD
将Hive的表转化为RDD以便Spark处理。本质上,Hive的表和Spark的HadoopRDD都是HDFS上的一组文件,通过InputFormat和RecordReader读取其中的数据,因此这个转化是自然而然的。
Hadoop : DSM
RDD & DSM 的对比
计算逻辑:使用Hive原语
这里主要是指使用Hive的操作符对数据进行处理。将Hive的操作符包装为Function,然后应用到RDD上。这样,我们只需要依赖较少的几种RDD的转换,而主要的计算逻辑仍由Hive提供。
Spark :Transformation
Spark为RDD提供了一系列的转换(Transformation),其中有些转换也是面向SQL的,如groupByKey、join等。但如果使用这些转换(就如Shark所做的那样),就意味着我们要重新实现一些Hive已有的功能;而且当Hive增加新的功能时,我们需要相应地修改Hive on Spark模式。
由于使用了Hive的原语,因此我们需要显式地调用一些Transformation来实现Shuffle的功能。下表中列举了Hive on Spark使用的所有转换。
repartitionAndSortWithinPartitions功能目的是提供一种MapReduce风格的Shuffle。虽然sortByKey也提供了排序的功能,但某些情况下我们并不需要全局有序,另外其使用的Range Partitioner对于某些Hive的查询并不适用。
元数据管理:HiveMetastoreCatalog
HiveMetastoreCatalog是Spark中对Hive Metastore访问的wrapper。HiveMetastoreCatalog通过调用相应的Hive Api可以获得数据库中的表及表的分区,也可以创建新的表和分区。
HiveMetastoreCatalog中会通过hive client来访问metastore中的元数据,使用了大量的Hive Api,对Hive Library依赖。
物理执行计划:Spark Task
通过SparkCompiler将Operator Tree转换为Task Tree,其中需要提交给Spark执行的任务即为SparkTask。
MapReduce:Map+Reduce的两阶段执行模式。
Spark:DAG执行模式。
DAG(Directed acyclic graph,有向无环图)
在Spark作业调度系统中,调度的前提是判断多个作业任务的依赖关系,这些作业任务之间可能存在因果的依赖关系,也就是说有些任务必须先获得执行,然后相关的依赖任务才能执行,但是任务之间显然不应出现任何直接或间接的循环依赖关系,所以本质上这种关系适合用DAG表示。
Spark之所以outperform Hadoop的关键有二:DAG scheduler和intermediate data in memory。Hadoop用的是AG而不是DAG。一个DAG可以包含多个AG。DAG除了可以提升scheduler效率之外,它同时是Spark Fault tolerance机制-Lineage 追溯的基础。
因此一个SparkTask包含了一个表示RDD转换的DAG,我们将这个DAG包装为SparkWork。执行SparkTask时,就根据SparkWork所表示的DAG计算出最终的RDD,然后通过RDD的foreachAsync来触发运算。
使用foreachAsync是因为我们使用了Hive原语,因此不需要RDD返回结果;此外foreachAsync异步提交任务便于我们对任务进行监控。
任务监控与统计信息收集:SparkListener & Spark API & Accumulator
Spark提供了SparkListener接口来监听任务执行期间的各种事件,因此我们可以实现一个Listener来监控任务执行进度以及收集任务级别的统计信息(目前任务级别的统计由SparkListener采集,任务进度则由Spark提供的专门的API来监控)。
另外Hive还提供了Operator级别的统计数据信息,比如读取的行数等。
MapReduce:Hadoop Counter
Spark:Accumulator
二、 Hive on spark的内部实现机制
参考链接:
Hive on MR:Hive SQL执行计划深度解析
Hive on spark:Hive on Spark解析
Spark SQL:sql的解析与执行
Hive on spark:hive on spark实现详解
1. Hive流程:
- 语法分析阶段,Hive利用Antlr将用户提交的SQL语句解析成一棵抽象语法树(Abstract Syntax Tree,AST)。
- 生成逻辑计划包括通过Metastore获取相关的元数据,以及对AST进行语义分析。得到的逻辑计划为一棵由Hive操作符组成的树,Hive操作符即Hive对表数据的处理逻辑,比如对表进行扫描的TableScanOperator,对表做Group的GroupByOperator等。
- 逻辑优化即对Operator Tree进行优化,与之后的物理优化的区别主要有两点:一是在操作符级别进行调整;二是这些优化不针对特定的计算引擎。比如谓词下推(Predicate Pushdown)就是一个逻辑优化:尽早的对底层数据进行过滤以减少后续需要处理的数据量,这对于不同的计算引擎都是有优化效果的。
- 生成物理计划即针对不同的引擎,将Operator Tree划分为若干个Task,并按照依赖关系生成一棵Task的树(在生成物理计划之前,各计算引擎还可以针对自身需求,对Operator Tree再进行一轮逻辑优化)。比如,对于MapReduce,一个GROUP BY+ORDER BY的查询会被转化成两个MapReduce的Task,第一个进行Group,第二个进行排序。
- 物理优化则是各计算引擎根据自身的特点,对Task Tree进行优化。比如对于MapReduce,Runtime Skew Join的优化就是在原始的Join Task之后加入一个Conditional Task来处理可能出现倾斜的数据。
- 最后按照依赖关系,依次执行Task Tree中的各个Task,并将结果返回给用户。每个Task按照不同的实现,会把任务提交到不同的计算引擎上执行。
2. Hive on spark解析SQL的过程
SQL
SQL语句在分析执行过程中会经历下图所示的几个步骤
- 语法解析
- 操作绑定
- 优化执行策略
- 交付执行
语法解析
语法解析之后,会形成一棵语法树,如下图所示。树中的每个节点是执行的rule,整棵树称之为执行策略。
策略优化
形成上述的执行策略树还只是第一步,因为这个执行策略可以进行优化,所谓的优化就是对树中节点进行合并或是进行顺序上的调整。
以大家熟悉的join操作为例,下图给出一个join优化的示例。A JOIN B等同于B JOIN A,但是顺序的调整可能给执行的性能带来极大的影响,下图就是调整前后的对比图。
在Hash Join中,首先被访问的表称之为“内部构建表”,第二个表为“探针输入”。创建内部表时,会将数据移动到数据仓库指向的路径;创建外部表,仅记录数据所在的路径。
再举一例,一般来说尽可能的先实施聚合操作(Aggregate)然后再join
这种优化自动完成,在调优时不需要考虑。
HQL
HiveContext是Spark提供的用户接口,HiveContext继承自SqlContext。
既然是继承自SqlContext,那么我们将普通sql与hiveql分析执行步骤做一个对比,可以得到下图。
Entrypoint
hiveql是整个的入口点
def hiveql(hqlQuery: String): SchemaRDD = {
val result = new SchemaRDD(this, HiveQl.parseSql(hqlQuery))
// We force query optimization to happen right away instead of letting it happen lazily like
// when using the query DSL. This is so DDL commands behave as expected. This is only
// generates the RDD lineage for DML queries, but does not perform any execution.
result.queryExecution.toRdd
result
}
上述hiveql的定义与sql的定义几乎一模一样,唯一的不同是sql中使用parseSql的结果作为SchemaRDD的入参而hiveql中使用HiveQl.parseSql作为SchemaRdd的入参。
对比:
sql函数的定义如下
def sql(sqlText: String): SchemaRDD = {
val result = new SchemaRDD(this, parseSql(sqlText))
result.queryExecution.toRdd
result
}
HiveQL, parser
parseSql的函数定义如代码所示,解析过程中将指令分成两大类:
nativecommand 非select语句,这类语句的特点是执行时间不会因为条件的不同而有很大的差异,基本上都能在较短的时间内完成
非nativecommand 主要是select语句
def parseSql(sql: String): LogicalPlan = {
try {
if (sql.toLowerCase.startsWith("set")) {
NativeCommand(sql)
} else if (sql.toLowerCase.startsWith("add jar")) {
AddJar(sql.drop(8))
} else if (sql.toLowerCase.startsWith("add file")) {
AddFile(sql.drop(9))
} else if (sql.startsWith("dfs")) {
DfsCommand(sql)
} else if (sql.startsWith("source")) {
SourceCommand(sql.split(" ").toSeq match { case Seq("source", filePath) => filePath })
} else if (sql.startsWith("!")) {
ShellCommand(sql.drop(1))
} else {
val tree = getAst(sql)
if (nativeCommands contains tree.getText) {
NativeCommand(sql)
} else {
nodeToPlan(tree) match {
case NativePlaceholder => NativeCommand(sql)
case other => other
}
}
}
} catch {
case e: Exception => throw new ParseException(sql, e)
case e: NotImplementedError => sys.error(
s"""
|Unsupported language features in query: $sql
|${dumpTree(getAst(sql))}
""".stripMargin)
}
}
哪些指令是nativecommand呢,答案在HiveQl.scala中的nativeCommands变量。
对于非nativeCommand,最重要的解析函数就是nodeToPlan
Spark对HiveQL所做的优化主要体现在Query相关的操作,其它的依然使用Hive的原生执行引擎。
3. SQL到Spark作业的转换过程
native command的执行流程
由于native command是一些非耗时的操作,直接使用Hive中原有的exeucte engine来执行即可。这些command的执行示意图如下
SparkTask的生成和执行
我们通过一个例子来看一下一个简单的两表JOIN查询如何被转换为SparkTask并被执行。下图左半部分展示了这个查询的Operator Tree,以及该Operator Tree如何被转化成SparkTask;右半部分展示了该SparkTask执行时如何得到最终的RDD并通过foreachAsync提交Spark任务。
SparkCompiler遍历Operator Tree,将其划分为不同的MapWork和ReduceWork。
MapWork为根节点,总是由TableScanOperator(Hive中对表进行扫描的操作符)开始;后续的Work均为ReduceWork。ReduceSinkOperator(Hive中进行Shuffle输出的操作符)用来标记两个Work之间的界线,出现ReduceSinkOperator表示当前Work到下一个Work之间的数据需要进行Shuffle。因此,当我们发现ReduceSinkOperator时,就会创建一个新的ReduceWork并作为当前Work的子节点。包含了FileSinkOperator(Hive中将结果输出到文件的操作符)的Work为叶子节点。
与MapReduce最大的不同在于,我们并不要求ReduceWork一定是叶子节点,即ReduceWork之后可以链接更多的ReduceWork,并在同一个SparkTask中执行。
从该图可以看出,这个查询的Operator Tree被转化成了两个MapWork和一个ReduceWork。
执行SparkTask步骤:
- 根据MapWork来生成最底层的HadoopRDD,
- 将各个MapWork和ReduceWork包装成Function应用到RDD上。
- 在有依赖的Work之间,需要显式地调用Shuffle转换,具体选用哪种Shuffle则要根据查询的类型来确定。另外,由于这个例子涉及多表查询,因此在Shuffle之前还要对RDD进行Union。
- 经过这一系列转换后,得到最终的RDD,并通过foreachAsync提交到Spark集群上进行计算。
toRdd
在logicalPlan到physicalPlan的转换过程中,toRdd最关键的元素
override lazy val toRdd: RDD[Row] =
analyzed match {
case NativeCommand(cmd) =>
val output = runSqlHive(cmd)
if (output.size == 0) {
emptyResult
} else {
val asRows = output.map(r => new GenericRow(r.split("\t").asInstanceOf[Array[Any]]))
sparkContext.parallelize(asRows, 1)
}
case _ =>
executedPlan.execute().map(_.copy())
}