http://spark.apache.org/docs/1.6.1/sql-programming-guide.html
DataFrames
在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。
DataFrame与RDD的主要区别在于,前者带有schema元信息
即DataFrame所表示的二维表数据集的每一列都带有名称和类型,这使得Spark SQL得以洞察更多的结构信息。
从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,
最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,
Spark Core只能在stage层面进行简单、通用的流水线优化
Spark SQL Core
Spark SQL的核心是把已有的RDD,带上Schema信息,然后注册成类似sql里的“Table”,对其进行sql查询
这里面主要分两部分:一是生成SchemaRDD,二十执行查询
正如RDD的各种变换实际上只是在构造RDD DAG,DataFrame的各种变换同样也是lazy的。
他们并不直接求出计算结果,而是将各种变换组装成与RDD DAG类似的逻辑查询计划
如前所述,由于DataFrame带有schema元信息,Spark SQL的查询优化器得以洞察数据和计算的精细结构
从而施行具有很强针对性的优化。随后,经过优化的逻辑执行计划被翻译为物理执行计划,并最终落实为RDD DAG。
#vi person.txt
1,tingting,23,80
2,ningning,25,90
3,ruhua,27,60
4,mimi,33,85
#hdfs dfs -put person.txt /
#hdfs dfs -cat /person.txt
scala> val line = sc.textFile("hdfs://hadoop21:9000/person.txt").map(_.split(","))
line: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[38] at map at <console>:24
scala> case class Person(id: Int, name: String, age: Int, faceValue: Int)
defined class Person
scala> val personRDD = line.map(x => Person(x(0).toInt,x(1), x(2).toInt,x(3).toInt))
personRDD: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[39] at map at <console>:28
scala> val personDF = personRDD.toDF
personDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]
scala> personDF.show
+---+--------+---+---------+
| id| name|age|faceValue|
+---+--------+---+---------+
| 1|tingting| 23| 80|
| 2|ningning| 25| 90|
| 3| ruhua| 27| 60|
| 4| mimi| 33| 85|
+---+--------+---+---------+