CDH Spark SQL

翻译: https://www.cloudera.com/documentation/enterprise/latest/topics/spark_sparksql.html
版本: 5.14.2

Spark SQL允许您使用SQL或使用DataFrame API查询Spark程序内的结构化数据。

有关Spark SQL的详细信息,请参阅Spark SQL和DataFrame指南

继续阅读:

SQLContext和HiveContext

所有Spark SQL功能的入口点是 SQLContext 类或其后代之一。你创建一个 SQLContext 从一个 SparkContext 。使用SQLContext ,您可以从RDD,Hive表或数据源创建DataFrame。

要在Spark应用程序中使用存储在Hive或Impala表中的数据,请构建一个 HiveContext ,它继承自SQLContext 。使用 HiveContext ,您可以访问Hive或Impala表等代表的Metastore数据库。

注意:
Hive和Impala表和相关的SQL语法在大多数方面是可以互换的。因为Spark使用底层Hive基础架构,所以使用Spark SQL您可以使用HiveQL语法编写DDL语句,DML语句和查询。对于交互式查询性能,可以通过Impala使用impala-shell或Impala JDBC和ODBC接口访问相同的表。

如果你使用 spark-shell , 一个 HiveContext 已经为你创建,并作为 sqlContext 变量。

如果你使用 spark-submit ,在程序开始时使用如下代码:

Python:

from pyspark import SparkContext, HiveContext
sc = SparkContext(appName = "test")
sqlContext = HiveContext(sc)

Spark应用程序提交的主机 或 spark-shell or pyspark 运行的主机,运行必须具有在Cloudera Manager 定义的Hive 网关角色以及部署的客户端配置

当Spark作业访问Hive视图时,Spark必须有权读取底层Hive表中的数据文件。目前,Spark不能使用基于列或者where子句。如果Spark没有底层数据文件所需的权限,则针对视图的SparkSQL查询将返回空结果集,而不是错误。

将文件查询到DataFrame中

如果数据文件位于Hive或Impala表之外,则可以使用SQL将JSON或Parquet文件直接读取到DataFrame中:

  • JSON:
df = sqlContext.sql("SELECT * FROM json.`input dir`")
  • Parquet:
df = sqlContext.sql("SELECT * FROM parquet.`input dir`")

请参阅在文件上运行SQL

Spark SQL示例

这个例子演示了如何使用sqlContext.sql 创建并加载两个表,并从表中选择两行到两个DataFrame。接下来的步骤使用DataFrame API从其中一个表中过滤大于150,000的工资,并显示结果DataFrame。然后将这两个DataFrame加入来创建第三个DataFrame。最后,新的DataFrame被保存到一个Hive表。

  1. 在命令行中,将Hue sample_07和sample_08 CSV文件复制到HDFS:
$ hdfs dfs -put HUE_HOME/apps/beeswax/data/sample_07.csv /user/hdfs
$ hdfs dfs -put HUE_HOME/apps/beeswax/data/sample_08.csv /user/hdfs

其中 HUE_HOME 默认/opt/cloudera/parcels/CDH/lib/hue (包裹安装)或 /usr/lib/hue(软件包安装)。

  1. 开始 spark-shell:: spark-shell:
  1. 创建Hive表sample_07和sample_08:
scala> sqlContext.sql("CREATE TABLE sample_07 (code string,description string,total_emp int,salary int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TextFile")
scala> sqlContext.sql("CREATE TABLE sample_08 (code string,description string,total_emp int,salary int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TextFile")
  1. 在Beeline中,显示Hive表:
[0: jdbc:hive2://hostname.com:> show tables;
+------------+--+
|  tab_name  |
+------------+--+
| sample_07  |
| sample_08  |
+------------+--+
  1. 将CSV文件中的数据加载到表中:
scala> sqlContext.sql("LOAD DATA INPATH '/user/hdfs/sample_07.csv' OVERWRITE INTO TABLE sample_07")
scala> sqlContext.sql("LOAD DATA INPATH '/user/hdfs/sample_08.csv' OVERWRITE INTO TABLE sample_08")
  1. 创建包含sample_07和sample_08表格内容的DataFrame:
scala> val df_07 = sqlContext.sql("SELECT * from sample_07")
scala> val df_08 = sqlContext.sql("SELECT * from sample_08")
  1. 显示薪水大于150,000的df_07中的所有行:
scala> df_07.filter(df_07("salary") > 150000).show()

输出应该是:

+-------+--------------------+---------+------+
|   code|         description|total_emp|salary|
+-------+--------------------+---------+------+
|11-1011|    Chief executives|   299160|151370|
|29-1022|Oral and maxillof...|     5040|178440|
|29-1023|       Orthodontists|     5350|185340|
|29-1024|     Prosthodontists|      380|169360|
|29-1061|   Anesthesiologists|    31030|192780|
|29-1062|Family and genera...|   113250|153640|
|29-1063| Internists, general|    46260|167270|
|29-1064|Obstetricians and...|    21340|183600|
|29-1067|            Surgeons|    50260|191410|
|29-1069|Physicians and su...|   237400|155150|
+-------+--------------------+---------+------+
  1. 通过加入df_07和df_08创建DataFrame df_09,仅保留 code and description 列。
scala> val df_09 = df_07.join(df_08, df_07("code") === df_08("code")).select(df_07.col("code"),df_07.col("description"))
scala> df_09.show()

新的DataFrame如下所示:

+-------+--------------------+
|   code|         description|
+-------+--------------------+
|00-0000|     All Occupations|
|11-0000|Management occupa...|
|11-1011|    Chief executives|
|11-1021|General and opera...|
|11-1031|         Legislators|
|11-2011|Advertising and p...|
|11-2021|  Marketing managers|
|11-2022|      Sales managers|
|11-2031|Public relations ...|
|11-3011|Administrative se...|
|11-3021|Computer and info...|
|11-3031|  Financial managers|
|11-3041|Compensation and ...|
|11-3042|Training and deve...|
|11-3049|Human resources m...|
|11-3051|Industrial produc...|
|11-3061| Purchasing managers|
|11-3071|Transportation, s...|
|11-9011|Farm, ranch, and ...|
|11-9012|Farmers and ranchers|
+-------+--------------------+
  1. 将DataFrame df_09另存为Hive表sample_09:
scala> df_09.write.saveAsTable("sample_09")
  1. 在Beeline中,显示Hive表:
[0: jdbc:hive2://hostname.com:> show tables;
+------------+--+
|  tab_name  |
+------------+--+
| sample_07  |
| sample_08  |
| sample_09  |
+------------+--+

Python中的等效程序,您可以使用spark-submit提交, 将会:

from pyspark import SparkContext, SparkConf, HiveContext

if __name__ == "__main__":

  # create Spark context with Spark configuration
  conf = SparkConf().setAppName("Data Frame Join")
  sc = SparkContext(conf=conf)
  sqlContext = HiveContext(sc)
  df_07 = sqlContext.sql("SELECT * from sample_07")
  df_07.filter(df_07.salary > 150000).show()
  df_08 = sqlContext.sql("SELECT * from sample_08")
  tbls = sqlContext.sql("show tables")
  tbls.show()
  df_09 = df_07.join(df_08, df_07.code == df_08.code).select(df_07.code,df_07.description)
  df_09.show()
  df_09.write.saveAsTable("sample_09")
  tbls = sqlContext.sql("show tables")
  tbls.show()

而不是使用直线来显示表格 , show tables 查询使用Spark SQL API运行。

确保HiveContext实现安全访问

为了保证 HiveContext 强制ACL,按照同步HDFS ACL和Sentry权限中所述启用HDFS-Sentry插件。HDFS-Sentry插件不支持从Spark SQL进行访问的列级访问控制。

与Hive Views交互

当Spark作业访问Hive视图时,Spark必须有权读取底层Hive表中的数据文件。目前,Spark不能使用基于列或者where子句。如果Spark没有底层数据文件所需的权限,则针对视图的SparkSQL查询将返回空结果集,而不是错误。

Spark SQL DROP TABLE PURGE的性能和存储注意事项

Hive中的 DROP TABLE 语句的 PURGE 子句 会立即删除底层数据文件,而不会将其传输到临时存储区域(HDFS垃圾箱)中。

虽然 PURGE 子句被Spark SQL DROP TABLE 语句识别,此子句当前不会传递给在后台执行“drop table”操作的Hive语句。所以,如果你知道PURGE 行为对于性能,存储或安全性原因 非常重要,请执行此操作DROP TABLE 直接在Hive中,例如通过beeline shell,而不是通过Spark SQL。

即时删除方面 ,PURGE 在以下情况下可能很重要:

  • 如果群集的存储空间不足,并且立即释放空间非常重要,而不是等待HDFS垃圾箱定期清空。

  • 如果底层数据文件驻留在Amazon S3文件系统上。将文件从S3移动到HDFS垃圾箱涉及物理复制文件,即默认设置DROP TABLE S3上的行为涉及显着的性能开销。

  • 如果底层数据文件包含敏感信息,并且将其完全删除非常重要,而不是通过定期清空垃圾箱来清除它们。

  • 如果对HDFS加密区域的限制阻止将文件移动到HDFS垃圾箱。此限制主要适用于CDH 5.7及更低版本。在CDH 5.8及更高版本中,每个HDFS加密区都有自己的HDFS垃圾桶,因此不带PURGE 子句的 DROP TABLE 行为正常。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,692评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,482评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,995评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,223评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,245评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,208评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,091评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,929评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,346评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,570评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,739评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,437评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,037评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,677评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,833评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,760评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,647评论 2 354

推荐阅读更多精彩内容