Spark-DataFrame的基本操作

在SparkSQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:通过Spark的数据源进行创建,从一个存在的RDD进行转换和从Hive table中进行查询返回。

创建

  1. 从Spark数据源进行创建

    1. 查看Spark数据源进行创建的文件格式

      通过调用SparkSQL的DataFrameReader创建,支持的文件格式有:

      1. csv
      2. jdbc
      3. json
      4. orc
      5. parquet
      6. text
    2. 读取json文件创建DataFrame

      /*
       * 文件内容为
       * {"id": 1, "name": "adam"}
       * {"id": 2, "name": "brad"}
       * {"id": 3, "name": "carl"}
       */
      val df = spark.read.json("test.json")
      df.show
      
    3. 展示结果

      +---+----+

      | id|name|
      +---+----+
      | 1|adam|
      | 2|brad|
      | 3|carl|
      +---+----+

  2. 从RDD转换

    注:如果需要在RDD与DataFrame或DataSet间进行操作,必须进行隐式转换引入import spark.implicits._[spark为SparkSession对象]

    1. 新建RDD

      /* 文件内容
      1,adam,18
      2,brad,21
      3,carl,13
      */
      val rdd = sc.textFile("test.json")
      
    2. 通过手动确定转换

      val df = rdd.map(x => {
            (x.split(",")(0).trim.toInt, x.split(",")(1).trim.toInt, x.split(",")(2).trim.toInt)
          }).toDF("id", "name", "age")
      df.show
      
    3. 结果

      +---+----+---+
      | id|name|age|
      +---+----+---+
      | 1|adam| 18|
      | 2|brad| 21|
      | 3|carl| 13|
      +---+----+---+

  3. 从HiveTable查询返回

SQL风格语法

  1. 创建一个DataFrame

    /*
     * 文件内容为
     * {"id": 1, "name": "adam", "age": 18}
     * {"id": 2, "name": "brad", "age": 21}
     * {"id": 3, "name": "carl", "age": 13}
     */
    val df = spark.read.json("test.json")
    
  2. 对DataFrame创建一个临时表

    df.createOrReplaceTempView("student")
    
  3. 通过SQL查询全表

    val sqlDf = spark.sql("select * from student")
    sqlDf.show
    
  4. 结果

    +---+---+----+
    |age| id|name|
    +---+---+----+
    | 18| 1|adam|
    | 21| 2|brad|
    | 13| 3|carl|
    +---+---+----+

注:临时表是Session范围内的,Session退出后表就失效了。如果想要在应用范围内有效,可以使用全局临时表,访问时需要全表名访问,如:global_temp.student

  1. 对DataFrame创建一个临时表

    df.createOrReplaceGlobalTempView("student")
    
  2. 通过SQL查询全表

    val sqlDf = spark.sql("select * from global_temp.student")
    sqlDf.show
    
  3. 结果

    +---+---+----+
    |age| id|name|
    +---+---+----+
    | 18| 1|adam|
    | 21| 2|brad|
    | 13| 3|carl|
    +---+---+----+

DSL风格语法

  1. 创建一个DataFrame

    /*
     * 文件内容为
     * {"id": 1, "name": "adam", "age": 18}
     * {"id": 2, "name": "brad", "age": 21}
     * {"id": 3, "name": "carl", "age": 13}
     */
    val df = spark.read.json("test.json")
    
  2. 查看DataFrame的schema信息

    df.printSchema
    /* 结果
    root
     |-- age: long (nullable = true)
     |-- id: long (nullable = true)
     |-- name: string (nullable = true)
    */
    
  3. 只查看“name”列

    df.select("name").show
    /* 结果
    +----+
    |name|
    +----+
    |adam|
    |brad|
    |carl|
    +----+
    */
    
  4. 查看“name”列和“age+1”数据

    df.select($"name", $"age" + 1).show
    /* 结果
    +----+---------+
    |name|(age + 1)|
    +----+---------+
    |adam|       19|
    |brad|       22|
    |carl|       14|
    +----+---------+
    */
    
  5. 查看age大于15的数据

    df.filter($"age" > 15).show
    /* 结果
    +---+---+----+
    |age| id|name|
    +---+---+----+
    | 18|  1|adam|
    | 21|  2|brad|
    +---+---+----+
    */
    
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。