在SparkSQL中SparkSession
是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:通过Spark的数据源进行创建,从一个存在的RDD进行转换和从Hive table中进行查询返回。
创建
-
从Spark数据源进行创建
-
查看Spark数据源进行创建的文件格式
通过调用SparkSQL的DataFrameReader创建,支持的文件格式有:
- csv
- jdbc
- json
- orc
- parquet
- text
-
读取json文件创建DataFrame
/* * 文件内容为 * {"id": 1, "name": "adam"} * {"id": 2, "name": "brad"} * {"id": 3, "name": "carl"} */ val df = spark.read.json("test.json") df.show
-
展示结果
+---+----+
| id|name|
+---+----+
| 1|adam|
| 2|brad|
| 3|carl|
+---+----+
-
-
从RDD转换
注:如果需要在RDD与DataFrame或DataSet间进行操作,必须进行隐式转换引入
import spark.implicits._
[spark为SparkSession对象]-
新建RDD
/* 文件内容 1,adam,18 2,brad,21 3,carl,13 */ val rdd = sc.textFile("test.json")
-
通过手动确定转换
val df = rdd.map(x => { (x.split(",")(0).trim.toInt, x.split(",")(1).trim.toInt, x.split(",")(2).trim.toInt) }).toDF("id", "name", "age") df.show
-
结果
+---+----+---+
| id|name|age|
+---+----+---+
| 1|adam| 18|
| 2|brad| 21|
| 3|carl| 13|
+---+----+---+
-
从HiveTable查询返回
SQL风格语法
-
创建一个DataFrame
/* * 文件内容为 * {"id": 1, "name": "adam", "age": 18} * {"id": 2, "name": "brad", "age": 21} * {"id": 3, "name": "carl", "age": 13} */ val df = spark.read.json("test.json")
-
对DataFrame创建一个临时表
df.createOrReplaceTempView("student")
-
通过SQL查询全表
val sqlDf = spark.sql("select * from student") sqlDf.show
-
结果
+---+---+----+
|age| id|name|
+---+---+----+
| 18| 1|adam|
| 21| 2|brad|
| 13| 3|carl|
+---+---+----+
注:临时表是Session范围内的,Session退出后表就失效了。如果想要在应用范围内有效,可以使用全局临时表,访问时需要全表名访问,如:global_temp.student
-
对DataFrame创建一个临时表
df.createOrReplaceGlobalTempView("student")
-
通过SQL查询全表
val sqlDf = spark.sql("select * from global_temp.student") sqlDf.show
-
结果
+---+---+----+
|age| id|name|
+---+---+----+
| 18| 1|adam|
| 21| 2|brad|
| 13| 3|carl|
+---+---+----+
DSL风格语法
-
创建一个DataFrame
/* * 文件内容为 * {"id": 1, "name": "adam", "age": 18} * {"id": 2, "name": "brad", "age": 21} * {"id": 3, "name": "carl", "age": 13} */ val df = spark.read.json("test.json")
-
查看DataFrame的schema信息
df.printSchema /* 结果 root |-- age: long (nullable = true) |-- id: long (nullable = true) |-- name: string (nullable = true) */
-
只查看“name”列
df.select("name").show /* 结果 +----+ |name| +----+ |adam| |brad| |carl| +----+ */
-
查看“name”列和“age+1”数据
df.select($"name", $"age" + 1).show /* 结果 +----+---------+ |name|(age + 1)| +----+---------+ |adam| 19| |brad| 22| |carl| 14| +----+---------+ */
-
查看age大于15的数据
df.filter($"age" > 15).show /* 结果 +---+---+----+ |age| id|name| +---+---+----+ | 18| 1|adam| | 21| 2|brad| +---+---+----+ */