Spark -- DataFrame Operations on Hive

http://www.zhangrenhua.com/2015/11/28/hadoop-spark-dataFrame%E6%93%8D%E4%BD%9Chive/

Background

Starting with Spark 1.3, SchemaRDD in Spark SQL was replaced by DataFrame. DataFrame is a substantial change from SchemaRDD and offers many more convenient and useful APIs.

This article demonstrates how to use DataFrame in Spark 1.5.2 to write data into Hive, along with some other DataFrame APIs. It is provided for reference only.

DataFrame SaveAsTable

Example

Create a data file test.txt and upload it to the /test/zhangrenhua directory on HDFS:

zrh,19
z,20
r,21
h,90

Create the Hive table:

CREATE DATABASE test;

DROP TABLE test.test;

CREATE TABLE test.test
(
    t_string  string,
    t_integer INT,
    t_boolean BOOLEAN,
    t_double  DOUBLE,
    t_decimal DECIMAL(20,8)
)
STORED AS orc;

Demo.java:

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Spark program entry point.
 */
public class Demo {

    public static void main(String[] args) {

        String input = "/test/zhangrenhua";
        SparkConf sparkConf = new SparkConf().setAppName("Demo");
        SparkContext sc = new SparkContext(sparkConf);
        HiveContext hiveContext = new HiveContext(sc);
        hiveContext.setConf("spark.sql.hive.metastore.version", "0.13.0.2.1");

        // Create the Java Spark context
        try (JavaSparkContext ctx = new JavaSparkContext(sc)) {
            // Read the test file
            JavaRDD<String> textFile = ctx.textFile(input);
            JavaRDD<Row> map = textFile.map(new Function<String, Row>() {

                private static final long serialVersionUID = 8745604304589989962L;

                @Override
                public Row call(String v1) throws Exception {

                    // Parse the test data and set default values
                    String[] array = v1.split(",");
                    TestBean result = new TestBean();
                    result.setT_string(array[0]);
                    result.setT_integer(Integer.parseInt(array[1]));
                    result.setT_boolean(true);
                    result.setT_double(12.12);
                    Decimal t_decimal = new Decimal();
                    t_decimal.set(new scala.math.BigDecimal(new BigDecimal("11111111.11111111")));
                    result.setT_decimal(t_decimal);

                    // hiveContext.createDataFrame(map, TestBean.class) cannot be used here:
                    // the resulting column order does not match the table (bug or not, unclear),
                    // so a Row is returned instead.
                    return result.toRow();
                }
            });

            // Generate the schema based on the string of schema
            List<StructField> fields = new ArrayList<StructField>();
            fields.add(DataTypes.createStructField("t_string", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("t_integer", DataTypes.IntegerType, true));
            fields.add(DataTypes.createStructField("t_boolean", DataTypes.BooleanType, true));
            fields.add(DataTypes.createStructField("t_double", DataTypes.DoubleType, true));
            fields.add(DataTypes.createStructField("t_decimal", DataTypes.createDecimalType(20, 8), true));

            StructType schema = DataTypes.createStructType(fields);

            DataFrame df = hiveContext.createDataFrame(map, schema);
            df.write().format("orc").mode(SaveMode.Append).saveAsTable("test.test");
        }
    }

}

TestBean.java:

import java.io.Serializable;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Decimal;

public class TestBean implements Serializable {

    private static final long serialVersionUID = -5868257956951746438L;

    private String t_string;
    private Integer t_integer;
    private Boolean t_boolean;
    private Double t_double;
    private Decimal t_decimal;

    public String getT_string() {
        return t_string;
    }

    public void setT_string(String t_string) {
        this.t_string = t_string;
    }

    public Integer getT_integer() {
        return t_integer;
    }

    public void setT_integer(Integer t_integer) {
        this.t_integer = t_integer;
    }

    public Boolean getT_boolean() {
        return t_boolean;
    }

    public void setT_boolean(Boolean t_boolean) {
        this.t_boolean = t_boolean;
    }

    public Double getT_double() {
        return t_double;
    }

    public void setT_double(Double t_double) {
        this.t_double = t_double;
    }

    public Decimal getT_decimal() {
        return t_decimal;
    }

    public void setT_decimal(Decimal t_decimal) {
        this.t_decimal = t_decimal;
    }

    public Row toRow() {
        return RowFactory.create(t_string, t_integer, t_boolean, t_double, t_decimal);
    }
}

Compile and package the code above, then run it with the spark-submit command:

spark-submit --master yarn-client --class Demo xx.jar

Key Points

1. According to the official examples, hiveContext.createDataFrame(map, TestBean.class) can convert objects into a DataFrame. In my tests, however, a DataFrame created this way ends up writing values into the wrong columns of the Hive table, so it is better to define the schema yourself and return Row objects.

RowFactory.create(t_string, t_integer, t_boolean, t_double, t_decimal);

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("t_string", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("t_integer", DataTypes.IntegerType, true));
fields.add(DataTypes.createStructField("t_boolean", DataTypes.BooleanType, true));
fields.add(DataTypes.createStructField("t_double", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("t_decimal", DataTypes.createDecimalType(20, 8), true));

When building the List<StructField> fields and the Row objects, the field order passed in must match the column order of the table.

2. Saving the DataFrame into a table of a specific database

df.saveAsTable("test.test");

Starting with Spark 1.5.1, the database name can be specified explicitly. Versions before 1.5.1 need the following approach instead:

hiveContext.sql("use test");
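
For example, on Spark versions before 1.5.1 the same write could be done roughly like this (a sketch reusing df from Demo.java above):

// Sketch for Spark < 1.5.1: switch the current database first,
// then save with the unqualified table name.
hiveContext.sql("use test");
df.write().format("orc").mode(SaveMode.Append).saveAsTable("test");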

DataFrame Save As Partitioned Table

The above shows how to save a DataFrame with common data types (string, integer, boolean, double, decimal) into a Hive table, but not how to write data into a partitioned table. The concrete steps for writing into a partitioned table are given below.

1. Dynamic partitioning

// Enable dynamic partitioning
hiveContext.sql("set hive.exec.dynamic.partition=true");
hiveContext.sql("set hive.exec.dynamic.partition.mode=nonstrict");

When creating the Row object, put the partition column value last (written here as partitionValue):

RowFactory.create(t_string, t_integer, t_boolean, t_double, t_decimal, partitionValue);

df.saveAsTable("test.test");
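
Putting these steps together, a minimal sketch might look as follows. It assumes the target table was created with an additional PARTITIONED BY (date string) clause (not shown in the DDL above), and it reuses the fields list and the map RDD from Demo.java, with the partition value appended as the last element of each Row:

// Sketch only: test.test is assumed to be PARTITIONED BY (date string).
hiveContext.sql("set hive.exec.dynamic.partition=true");
hiveContext.sql("set hive.exec.dynamic.partition.mode=nonstrict");

// The partition column is appended as the last schema field,
// matching the last value passed to RowFactory.create(...).
fields.add(DataTypes.createStructField("date", DataTypes.StringType, true));
StructType schemaWithPartition = DataTypes.createStructType(fields);

DataFrame df = hiveContext.createDataFrame(map, schemaWithPartition);
df.write().format("orc").mode(SaveMode.Append).saveAsTable("test.test");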

Dynamic partitioning hurts write performance. If the partition value can be fixed, the static partition method below is recommended.

2. Static partitioning

df.registerTempTable("demo");
hiveContext.sql("insert into test.test partition(date='20151205') select * from demo");

Note: date is the partition column and 20151205 is the partition value, which can be passed in as a parameter.
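
For instance, the partition value could be taken from the program arguments and spliced into the statement, roughly like this (partitionDate is an assumed variable):

// Sketch: the partition value comes from the program arguments.
String partitionDate = args[1]; // e.g. "20151205"
df.registerTempTable("demo");
hiveContext.sql("insert into test.test partition(date='" + partitionDate + "') select * from demo");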

Metadata Refreshing

Spark SQL caches Parquet metadata for better performance. When Hive metastore Parquet table conversion is enabled, metadata of those converted tables are also cached. If these tables are updated by Hive or other external tools, you need to refresh them manually to ensure consistent metadata.

// sqlContext is an existing HiveContext
sqlContext.refreshTable("my_table")

DataFrame Operations

A DataFrame provides structured data. Below are some basic examples of working with a DataFrame from Java:

JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

// Create the DataFrame
DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");

// Show the content of the DataFrame
df.show();
// age  name
// null Michael
// 30   Andy
// 19   Justin

// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// name
// Michael
// Andy
// Justin

// Select everybody, but increment the age by 1
df.select(df.col("name"), df.col("age").plus(1)).show();
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// Select people older than 21
df.filter(df.col("age").gt(21)).show();
// age name
// 30  Andy

// Count people by age
df.groupBy("age").count().show();
// age  count
// null 1
// 19   1
// 30   1

Parquet Configuration

Configuration of Parquet can be done using the setConf method on SQLContext or by running
SET key=value commands using SQL.
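
For example, either style could be used to set the properties listed in the table below (a sketch; the chosen values are only illustrative):

// Programmatic configuration on an existing SQLContext/HiveContext:
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy");

// Or the equivalent SQL command:
sqlContext.sql("SET spark.sql.parquet.filterPushdown=true");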

| Property Name | Default | Meaning |
| --- | --- | --- |
| spark.sql.parquet.binaryAsString | false | Some other Parquet-producing systems, in particular Impala, Hive, and older versions of Spark SQL, do not differentiate between binary data and strings when writing out the Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems. |
| spark.sql.parquet.int96AsTimestamp | true | Some Parquet-producing systems, in particular Impala and Hive, store Timestamp into INT96. This flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems. |
| spark.sql.parquet.cacheMetadata | true | Turns on caching of Parquet schema metadata. Can speed up querying of static data. |
| spark.sql.parquet.compression.codec | gzip | Sets the compression codec used when writing Parquet files. Acceptable values include: uncompressed, snappy, gzip, lzo. |
| spark.sql.parquet.filterPushdown | true | Enables Parquet filter push-down optimization when set to true. |
| spark.sql.hive.convertMetastoreParquet | true | When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of the built in support. |
| spark.sql.parquet.output.committer.class | org.apache.parquet.hadoop.ParquetOutputCommitter | The output committer class used by Parquet. The specified class needs to be a subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it is also a subclass of org.apache.parquet.hadoop.ParquetOutputCommitter. Note: this option is automatically ignored if spark.speculation is turned on; it must be set via Hadoop Configuration rather than Spark SQLConf; and it overrides spark.sql.sources.outputCommitterClass. Spark SQL comes with a builtin org.apache.spark.sql.parquet.DirectParquetOutputCommitter, which can be more efficient than the default Parquet output committer when writing data to S3. |
| spark.sql.parquet.mergeSchema | false | When true, the Parquet data source merges schemas collected from all data files, otherwise the schema is picked from the summary file or a random data file if no summary file is available. |

Interacting with Different Versions of Hive Metastore

One of the most important pieces of Spark SQL’s Hive support is interaction with Hive metastore, which enables Spark SQL to access metadata of Hive tables. Starting from Spark 1.4.0, a single binary build of Spark SQL can be used to query different versions of Hive metastores, using the configuration described below. Note that independent of the version of Hive that is being used to talk to the metastore, internally Spark SQL will compile against Hive 1.2.1 and use those classes for internal execution (serdes, UDFs, UDAFs, etc).

The following options can be used to configure the version of Hive that is used to retrieve metadata:

| Property Name | Default | Meaning |
| --- | --- | --- |
| spark.sql.hive.metastore.version | 1.2.1 | Version of the Hive metastore. Available options are 0.12.0 through 1.2.1. |
| spark.sql.hive.metastore.jars | builtin | Location of the jars that should be used to instantiate the HiveMetastoreClient. This property can be one of three options: 1. builtin, which uses Hive 1.2.1 bundled with the Spark assembly jar when -Phive is enabled (with this option, spark.sql.hive.metastore.version must be either 1.2.1 or not defined); 2. maven, which uses Hive jars of the specified version downloaded from Maven repositories; 3. A classpath in the standard format for the JVM. This classpath must include all of Hive and its dependencies, including the correct version of Hadoop. These jars only need to be present on the driver, but if you are running in yarn cluster mode then you must ensure they are packaged with your application. |
| spark.sql.hive.metastore.sharedPrefixes | com.mysql.jdbc, org.postgresql, com.microsoft.sqlserver, oracle.jdbc | A comma separated list of class prefixes that should be loaded using the classloader that is shared between Spark SQL and a specific version of Hive. An example of classes that should be shared is JDBC drivers that are needed to talk to the metastore. Other classes that need to be shared are those that interact with classes that are already shared. For example, custom appenders that are used by log4j. |
| spark.sql.hive.metastore.barrierPrefixes | (empty) | A comma separated list of class prefixes that should explicitly be reloaded for each version of Hive that Spark SQL is communicating with. For example, Hive UDFs that are declared in a prefix that typically would be shared (i.e. org.apache.spark.*). |
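
As a rough sketch, these properties could be set on the SparkConf (or passed with --conf to spark-submit) before the HiveContext is created; the version and jars values below are purely illustrative:

SparkConf conf = new SparkConf().setAppName("Demo")
        .set("spark.sql.hive.metastore.version", "0.13.1") // illustrative value
        .set("spark.sql.hive.metastore.jars", "maven");     // illustrative value
SparkContext sc = new SparkContext(conf);
HiveContext hiveContext = new HiveContext(sc);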

JDBC To Other Databases

Spark SQL also includes a data source that can read data from other databases using JDBC. This functionality should be preferred over using JdbcRDD. This is because the results are returned as a DataFrame and they can easily be processed in Spark SQL or joined with other data sources. The JDBC data source is also easier to use from Java or Python as it does not require the user to provide a ClassTag. (Note that this is different than the Spark SQL JDBC server, which allows other applications to run queries using Spark SQL).

To get started you will need to include the JDBC driver for your particular database on the spark classpath. For example, to connect to postgres from the Spark Shell you would run the following command:

SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell

Tables from the remote database can be loaded as a DataFrame or Spark SQL Temporary table using the Data Sources API. The following options are supported:

| Property Name | Meaning |
| --- | --- |
| url | The JDBC URL to connect to. |
| dbtable | The JDBC table that should be read. Note that anything that is valid in a FROM clause of a SQL query can be used. For example, instead of a full table you could also use a subquery in parentheses. |
| driver | The class name of the JDBC driver needed to connect to this URL. This class will be loaded on the master and workers before running any JDBC commands, to allow the driver to register itself with the JDBC subsystem. |
| partitionColumn, lowerBound, upperBound, numPartitions | These options must all be specified if any of them is specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric column from the table in question. Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in the table, so all rows in the table will be partitioned and returned. |

Example:

Map<String, String> options = new HashMap<String, String>();
options.put("url", "jdbc:postgresql:dbserver");
options.put("dbtable", "schema.tablename");

DataFrame jdbcDF = sqlContext.read().format("jdbc").options(options).load();
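
To read in parallel, the partitioning options from the table above can be added; the column name and bounds below are assumptions for illustration:

Map<String, String> options = new HashMap<String, String>();
options.put("url", "jdbc:postgresql:dbserver");
options.put("dbtable", "schema.tablename");
options.put("partitionColumn", "id"); // assumed numeric column
options.put("lowerBound", "1");
options.put("upperBound", "100000");
options.put("numPartitions", "10");

DataFrame jdbcDF = sqlContext.read().format("jdbc").options(options).load();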
