翻译: https://www.cloudera.com/documentation/enterprise/latest/topics/spark_sparksql.html
版本: 5.14.2
Spark SQL允许您使用SQL或使用DataFrame API查询Spark程序内的结构化数据。
有关Spark SQL的详细信息,请参阅Spark SQL和DataFrame指南。
继续阅读:
- SQLContext和HiveContext
- 将文件查询到DataFrame中
- Spark SQL示例
- 确保HiveContext实现安全访问
- 与Hive Views交互
- Spark SQL DROP TABLE PURGE的性能和存储注意事项
SQLContext和HiveContext
所有Spark SQL功能的入口点是 SQLContext 类或其后代之一。你创建一个 SQLContext 从一个 SparkContext 。使用SQLContext ,您可以从RDD,Hive表或数据源创建DataFrame。
要在Spark应用程序中使用存储在Hive或Impala表中的数据,请构建一个 HiveContext ,它继承自SQLContext 。使用 HiveContext ,您可以访问Hive或Impala表等代表的Metastore数据库。
注意:
Hive和Impala表和相关的SQL语法在大多数方面是可以互换的。因为Spark使用底层Hive基础架构,所以使用Spark SQL您可以使用HiveQL语法编写DDL语句,DML语句和查询。对于交互式查询性能,可以通过Impala使用impala-shell或Impala JDBC和ODBC接口访问相同的表。
如果你使用 spark-shell , 一个 HiveContext 已经为你创建,并作为 sqlContext 变量。
如果你使用 spark-submit ,在程序开始时使用如下代码:
Python:
from pyspark import SparkContext, HiveContext
sc = SparkContext(appName = "test")
sqlContext = HiveContext(sc)
Spark应用程序提交的主机 或 spark-shell or pyspark 运行的主机,运行必须具有在Cloudera Manager 中定义的Hive 网关角色以及部署的客户端配置。
当Spark作业访问Hive视图时,Spark必须有权读取底层Hive表中的数据文件。目前,Spark不能使用基于列或者where子句。如果Spark没有底层数据文件所需的权限,则针对视图的SparkSQL查询将返回空结果集,而不是错误。
将文件查询到DataFrame中
如果数据文件位于Hive或Impala表之外,则可以使用SQL将JSON或Parquet文件直接读取到DataFrame中:
- JSON:
df = sqlContext.sql("SELECT * FROM json.`input dir`")
- Parquet:
df = sqlContext.sql("SELECT * FROM parquet.`input dir`")
请参阅在文件上运行SQL。
Spark SQL示例
这个例子演示了如何使用sqlContext.sql 创建并加载两个表,并从表中选择两行到两个DataFrame。接下来的步骤使用DataFrame API从其中一个表中过滤大于150,000的工资,并显示结果DataFrame。然后将这两个DataFrame加入来创建第三个DataFrame。最后,新的DataFrame被保存到一个Hive表。
- 在命令行中,将Hue sample_07和sample_08 CSV文件复制到HDFS:
$ hdfs dfs -put HUE_HOME/apps/beeswax/data/sample_07.csv /user/hdfs
$ hdfs dfs -put HUE_HOME/apps/beeswax/data/sample_08.csv /user/hdfs
其中 HUE_HOME 默认/opt/cloudera/parcels/CDH/lib/hue (包裹安装)或 /usr/lib/hue(软件包安装)。
- 开始 spark-shell:: spark-shell:
- 创建Hive表sample_07和sample_08:
scala> sqlContext.sql("CREATE TABLE sample_07 (code string,description string,total_emp int,salary int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TextFile")
scala> sqlContext.sql("CREATE TABLE sample_08 (code string,description string,total_emp int,salary int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TextFile")
- 在Beeline中,显示Hive表:
[0: jdbc:hive2://hostname.com:> show tables;
+------------+--+
| tab_name |
+------------+--+
| sample_07 |
| sample_08 |
+------------+--+
- 将CSV文件中的数据加载到表中:
scala> sqlContext.sql("LOAD DATA INPATH '/user/hdfs/sample_07.csv' OVERWRITE INTO TABLE sample_07")
scala> sqlContext.sql("LOAD DATA INPATH '/user/hdfs/sample_08.csv' OVERWRITE INTO TABLE sample_08")
- 创建包含sample_07和sample_08表格内容的DataFrame:
scala> val df_07 = sqlContext.sql("SELECT * from sample_07")
scala> val df_08 = sqlContext.sql("SELECT * from sample_08")
- 显示薪水大于150,000的df_07中的所有行:
scala> df_07.filter(df_07("salary") > 150000).show()
输出应该是:
+-------+--------------------+---------+------+
| code| description|total_emp|salary|
+-------+--------------------+---------+------+
|11-1011| Chief executives| 299160|151370|
|29-1022|Oral and maxillof...| 5040|178440|
|29-1023| Orthodontists| 5350|185340|
|29-1024| Prosthodontists| 380|169360|
|29-1061| Anesthesiologists| 31030|192780|
|29-1062|Family and genera...| 113250|153640|
|29-1063| Internists, general| 46260|167270|
|29-1064|Obstetricians and...| 21340|183600|
|29-1067| Surgeons| 50260|191410|
|29-1069|Physicians and su...| 237400|155150|
+-------+--------------------+---------+------+
- 通过加入df_07和df_08创建DataFrame df_09,仅保留 code and description 列。
scala> val df_09 = df_07.join(df_08, df_07("code") === df_08("code")).select(df_07.col("code"),df_07.col("description"))
scala> df_09.show()
新的DataFrame如下所示:
+-------+--------------------+
| code| description|
+-------+--------------------+
|00-0000| All Occupations|
|11-0000|Management occupa...|
|11-1011| Chief executives|
|11-1021|General and opera...|
|11-1031| Legislators|
|11-2011|Advertising and p...|
|11-2021| Marketing managers|
|11-2022| Sales managers|
|11-2031|Public relations ...|
|11-3011|Administrative se...|
|11-3021|Computer and info...|
|11-3031| Financial managers|
|11-3041|Compensation and ...|
|11-3042|Training and deve...|
|11-3049|Human resources m...|
|11-3051|Industrial produc...|
|11-3061| Purchasing managers|
|11-3071|Transportation, s...|
|11-9011|Farm, ranch, and ...|
|11-9012|Farmers and ranchers|
+-------+--------------------+
- 将DataFrame df_09另存为Hive表sample_09:
scala> df_09.write.saveAsTable("sample_09")
- 在Beeline中,显示Hive表:
[0: jdbc:hive2://hostname.com:> show tables;
+------------+--+
| tab_name |
+------------+--+
| sample_07 |
| sample_08 |
| sample_09 |
+------------+--+
Python中的等效程序,您可以使用spark-submit提交, 将会:
from pyspark import SparkContext, SparkConf, HiveContext
if __name__ == "__main__":
# create Spark context with Spark configuration
conf = SparkConf().setAppName("Data Frame Join")
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
df_07 = sqlContext.sql("SELECT * from sample_07")
df_07.filter(df_07.salary > 150000).show()
df_08 = sqlContext.sql("SELECT * from sample_08")
tbls = sqlContext.sql("show tables")
tbls.show()
df_09 = df_07.join(df_08, df_07.code == df_08.code).select(df_07.code,df_07.description)
df_09.show()
df_09.write.saveAsTable("sample_09")
tbls = sqlContext.sql("show tables")
tbls.show()
而不是使用直线来显示表格 , show tables 查询使用Spark SQL API运行。
确保HiveContext实现安全访问
为了保证 HiveContext 强制ACL,按照同步HDFS ACL和Sentry权限中所述启用HDFS-Sentry插件。HDFS-Sentry插件不支持从Spark SQL进行访问的列级访问控制。
与Hive Views交互
当Spark作业访问Hive视图时,Spark必须有权读取底层Hive表中的数据文件。目前,Spark不能使用基于列或者where子句。如果Spark没有底层数据文件所需的权限,则针对视图的SparkSQL查询将返回空结果集,而不是错误。
Spark SQL DROP TABLE PURGE的性能和存储注意事项
Hive中的 DROP TABLE 语句的 PURGE 子句 会立即删除底层数据文件,而不会将其传输到临时存储区域(HDFS垃圾箱)中。
虽然 PURGE 子句被Spark SQL DROP TABLE 语句识别,此子句当前不会传递给在后台执行“drop table”操作的Hive语句。所以,如果你知道PURGE 行为对于性能,存储或安全性原因 非常重要,请执行此操作DROP TABLE 直接在Hive中,例如通过beeline shell,而不是通过Spark SQL。
即时删除方面 ,PURGE 在以下情况下可能很重要:
如果群集的存储空间不足,并且立即释放空间非常重要,而不是等待HDFS垃圾箱定期清空。
如果底层数据文件驻留在Amazon S3文件系统上。将文件从S3移动到HDFS垃圾箱涉及物理复制文件,即默认设置DROP TABLE S3上的行为涉及显着的性能开销。
如果底层数据文件包含敏感信息,并且将其完全删除非常重要,而不是通过定期清空垃圾箱来清除它们。
如果对HDFS加密区域的限制阻止将文件移动到HDFS垃圾箱。此限制主要适用于CDH 5.7及更低版本。在CDH 5.8及更高版本中,每个HDFS加密区都有自己的HDFS垃圾桶,因此不带PURGE 子句的 DROP TABLE 行为正常。