转自:http://hbasefly.com/2017/02/16/sparksql-dataframe/
写在文章之前
本着更好地理解大数据生态圈的本意以及工作的需要,前段时间熟悉了SQL查询引擎SparkSQL、Hadoop文件格式Parquet/CarbonData、大数据基准测试标准TPCDS/TPCH等相关知识,后续将会陆续整理出相关的内容;所有分享内容都是参考相关资料完成,文中很多细节都是在阅读相关资料时的所感所悟,只希望能够及时记录下来,以免遗忘!另外,不可避免会有一些纰漏,还忘客官能够批判性阅读,讨论交流!当然,HBase相关博客还会继续更新;
SparkSQL 历史回顾
对SparkSQL了解的童鞋或多或少听说过Shark,不错,Shark就是SparkSQL的前身。2011的时候,Hive可以说是SQL On Hadoop的唯一选择,负责将SQL解析成MR任务运行在大数据上,实现交互式查询、报表等功能。就在那个时候,Spark社区的小伙伴就意识到可以使用Spark作为执行引擎替换Hive中的MR,这样可以使Hive的执行效率得到极大提升。这个思想的产物就是Shark,所以从实现功能上来看,Shark更像一个Hive On Spark实现版本。
改造完成刚开始,Shark确实比Hive的执行效率有了极大提升。然而,随着改造的深入,发现因为Shark继承了大量Hive代码导致添加优化规则等变得异常困难,优化的前景不再那么乐观。在意识到这个问题之后,Spark社区经过一段时间激烈的思想斗争之后,还是毅然决然的在2014年彻底放弃了Shark,转向SparkSQL。
因此可以理解为SparkSQL是一个全新的项目,接下来将会带大家一起走近SparkSQL的世界,从SparkSQL体系的最顶端走向最底层,寻根问底,深入理解SparkSQL是如何工作的。
SparkSQL 体系结构
SparkSQL体系结构如下图所示,整体由上到下分为三层:编程模型层、执行任务优化层以及任务执行引擎层,其中SparkSQL编程模型可以分为SQL和DataFrame两种;执行计划优化又称为Catalyst,该模块负责将SQL语句解析成AST(逻辑执行计划),并对原始逻辑执行计划进行优化,优化规则分为基于规则的优化策略和基于代价的优化策略两种,最终输出优化后的物理执行计划;任务执行引擎就是Spark内核,负责根据物理执行计划生成DAG,在任务调度系统的管理下分解为任务集并分发到集群节点上加载数据运行,Tungsten基于对内存和CPU的性能优化,使得Spark能够更好地利用当前硬件条件提升性能,详情可以阅读 Project Tungsten: Bringing Apache Spark Closer to Bare Metal。
SparkSQL系列文章会按照体系结构由上至下详细地进行说明,本篇下面会重点讲解编程接口DataFrame,后面会利用M篇文章分析Catalyst的工作原理,再后面会利用N篇文章分析Spark内核工作原理。
SparkSQL 编程模型 - DataFrame
说到计算模型,批处理计算从最初提出一直到现在,一共经历了两次大的变革,第一次变革是从MR编程模式到RDD编程模型,第二次则是从RDD编程模式进化到DataFrame模式。
第一次变革:MR编程模型 -> DAG编程模型
和MR计算模型相比,DAG计算模型有很多改进:
1. 可以支持更多的算子,比如filter算子、sum算子等,不再像MR只支持map和reduce两种
2. 更加灵活的存储机制,RDD可以支持本地硬盘存储、缓存存储以及混合存储三种模式,用户可以进行选择。而MR目前只支持HDFS存储一种模式。很显然,HDFS存储需要将中间数据存储三份,而RDD则不需要,这是DAG编程模型效率高的一个重要原因之一。
3. DAG模型带来了更细粒度的任务并发,不再像MR那样每次起个任务就要起个JVM进程,重死了;另外,DAG模型带来了另一个利好是很好的容错性,一个任务即使中间断掉了,也不需要从头再来一次。
4. 延迟计算机制一方面可以使得同一个stage内的操作可以合并到一起落在一块数据上,而不再是所有数据先执行a操作、再扫描一遍执行b操作,太浪费时间。另一方面给执行路径优化留下了可能性,随便你怎么优化…
所有这些改进使得DAG编程模型相比MR编程模型,性能可以有10~100倍的提升!然而,DAG计算模型就很完美吗?要知道,用户手写的RDD程序基本或多或少都会有些问题,性能也肯定不会是最优的。如果没有一个高手指点或者优化,性能依然有很大的优化潜力。这就是促成了第二次变革,从DAG编程模型进化到DataFrame编程模型。
第二次变革:DAG编程模型 -> DataFrame编程模型
相比RDD,DataFrame增加了scheme概念,从这个角度看,DataFrame有点类似于关系型数据库中表的概念。可以根据下图对比RDD与DataFrame数据结构的差别:
直观上看,DataFrame相比RDD多了一个表头,这个小小的变化带来了很多优化的空间:
1. RDD中每一行纪录都是一个整体,因此你不知道内部数据组织形式,这就使得你对数据项的操作能力很弱。表现出来就是支持很少的而且是比较粗粒度的算子,比如map、filter算子等。而DataFrame将一行切分了多个列,每个列都有一定的数据格式,这与数据库表模式就很相似了,数据粒度相比更细,因此就能支持更多更细粒度的算子,比如select算子、groupby算子、where算子等。更重要的,后者的表达能力要远远强于前者,比如同一个功能用RDD和DataFrame实现:
2. DataFrame的Schema的存在,数据项的转换也都将是类型安全的,这对于较为复杂的数据计算程序的调试是十分有利的,很多数据类型不匹配的问题都可以在编译阶段就被检查出来,而对于不合法的数据文件,DataFrame也具备一定分辨能力。
3. DataFrame schema的存在,开辟了另一种数据存储形式:列式数据存储。列式存储是相对于传统的行式存储而言的,简单来讲,就是将同一列的所有数据物理上存储在一起。对于列式存储和行式存储可以参考下图:
列式存储有两个重要的作用,首先,同一种类型的数据存储在一起可以很好的提升数据压缩效率,因为越“相似”的数据,越容易压缩。数据压缩可以减少存储空间需求,还可以减少数据传输过程中的带宽需求,这对于类似于Spark之类的大内存计算引擎来讲,会带来极大的益处;另外,列式存储还可以有效减少查询过程中的实际IO,大数据领域很多OLAP查询业务通常只会检索部分列值,而不是粗暴的select * ,这样列式存储可以有效执行’列值裁剪’,将不需要查找的列直接跳过。
4. DAG编程模式都是用户自己写RDD scala程序,自己写嘛,必然或多或少会有性能提升的空间!而DataFrame编程模式集成了一个优化神奇-Catalyst,这玩意类似于MySQL的SQL优化器,负责将用户写的DataFrame程序进行优化得到最优的执行计划(下文会讲),比如最常见的谓词下推优化。很显然,优化后的执行计划相比于手写的执行计划性能当然会来的好一些。下图是官方给出来的测试对比数据(测试过程是在10billion数据规模下进行过滤聚合):
个人觉得,RDD和DataFrame的关系类似于汇编语言和Java语言的关系,同一个功能,如果你用汇编实现的话,一方面会写的很长,另一方面写的代码可能还不是最优的,可谓是又臭又长。而Java语言有很多高级语意,可以很方便的实现相关功能,另一方面经过JVM优化后会更加高效。
DataFrame 使用
Spark官网已经对DataFrame的使用方法进行了详细的说明,强烈推荐大家阅读(不要去在百度上搜什么DataFrame使用等)。在此不在赘述,具体地址为:http://spark.apache.org/docs/latest/sql-programming-guide.html
参考文献:
1. A Tale of Three Apache Spark APIs: RDDs, DataFrames, and Datasets
2. http://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf
3. http://spark.apache.org/docs/latest/sql-programming-guide.html