SparkSQL操作RDD两种方式对比案例

前言

GitHub地址:https://github.com/guofei1219

背景

统计新渠道进件数量

SparkSQL操作RDD两种方式对比

1.使用反射推断Schema类型,具体解析参考下面的官网描述

The Scala interface for Spark SQL supports automatically converting an RDD containing case classes to a DataFrame. The case class defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns. Case classes can also be nested or contain complex types such as Sequences or Arrays. This RDD can be implicitly converted to a DataFrame and then be registered as a table. Tables can be used in subsequent SQL statements.

case class blb_intpc_info(chnl_code:String,id_num:String)

2.使用编程方式制定Schema类型,具体解析参考下面的官网描述

When case classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a DataFrame can be created programmatically with three steps.

>1.Create an RDD of Rows from the original RDD;
2.Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
2.Apply the schema to the RDD of Rows via createDataFrame method provided by SQLContext.
val structTypes = StructType(Array(
  StructField("chnl_code", StringType, true),
  StructField("id_num", StringType, true)
))
对比总结

1.case class模板类模式可视化比较好
2.case class模板类参数上限为22个,对于字段多的不能使用
3.编程方式更适合日常开发

代码实现

源数据格式

,第一列为渠道代码、第二列为进件ID

306DC4246 411324199209142831
306DC423A 360124199011241838
306DC423D 440802198010290019
306DC4226 612328197403120016
306DC4201 452629199104050312
306DC4201 350212198505025514

反射方式
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 统计不同渠道进件数量
  * Created by Michael on 2016/11/29.
  */
object Custmer_Statistics_CaseClass {

  /**
    * 使用模板类描述表元数据信息
    * @param chnl_code
    * @param id_num
    */
  case class blb_intpc_info(chnl_code:String,id_num:String)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Custmer_Statistics_CaseClass").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    //RDD隐式转换成DataFrame
    import sqlContext.implicits._
    //读取本地文件
    val blb_intpc_infoDF = sc.textFile("C:/work/ideabench/SparkSQL/data/channel/blb_intpc_info_10000_2.txt")
      .map(_.split("\\t"))
      .map(d => blb_intpc_info(d(0), d(1))).toDF()

    //注册表
    blb_intpc_infoDF.registerTempTable("blb_intpc_info")

    /**
      * 分渠道进件数量统计并按进件数量降序排列
      */
    blb_intpc_infoDF.registerTempTable("blb_intpc_info")
    sqlContext.sql("" +
      "select chnl_code,count(*) as intpc_sum " +
      "from blb_intpc_info " +
      "group by chnl_code").toDF().sort($"intpc_sum".desc).show()
  }

}
运行结果
+---------+---------+
|chnl_code|intpc_sum|
+---------+---------+
|306DC421E|      631|
|306DC4201|      603|
|306DC422B|      472|
|306DC4221|      326|
|306DC425E|      280|
|306DC4237|      277|
|306DC4210|      238|
|306DC4246|      236|
|306DC4229|      223|
|306DC4257|      202|
|306DC420E|      197|
|306DC4215|      183|
|306DC421F|      176|
|306DC425A|      156|
|306DC4251|      140|
|306DC4202|      131|
|306DC424D|      125|
|306DC4226|      122|
|306DC422A|      112|
|306DC422D|      108|
编程方式

查询Hive元数据库获取Hive 指定表字段信息。
注:对Hive元数据表结构不了解的同学用google搜几篇帖子看看或者参看本文末尾参考文章
不解释了,直接上代码

public static String getHiveMetaData(String hiveTableName) {
    Connection conn = getConn();
    String sql = "SELECT\n" +
            "  #TBLS.`TBL_NAME`,\n" +
            "  #表名\n" +
            "  COLUMNS_V2.`COLUMN_NAME`\n" +
            "  #列名\n" +
            "  #COLUMNS_V2.`TYPE_NAME` #列类型\n" +
            "FROM\n" +
            "  TBLS #元数据信息表\n" +
            "  LEFT JOIN SDS #数据存储表\n" +
            "    ON TBLS.SD_ID = SDS.SD_ID\n" +
            "  LEFT JOIN CDS\n" +
            "    ON SDS.CD_ID = CDS.CD_ID\n" +
            "  LEFT JOIN COLUMNS_V2 #字段信息表\n" +
            "    ON CDS.CD_ID = COLUMNS_V2.CD_ID\n" +
            "WHERE TBLS.`TBL_NAME` = \"gd_py_corp_sharehd_info\"";
    PreparedStatement pstmt;
    String result="";
    try {
        pstmt = (PreparedStatement)conn.prepareStatement(sql);
        ResultSet rs = pstmt.executeQuery();
        int col = rs.getMetaData().getColumnCount();

        while (rs.next()) {
            for (int i = 1; i <= col; i++) {
                result = result + rs.getString(i) + "\t";
            }
        }

    } catch (SQLException e) {
        e.printStackTrace();
    }
    return result;
}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import utils.DataUtils

/**
  * 统计不同渠道进件数量
  * Created by Michael on 2016/11/29.
  */
object Custmer_Statistics_StructType {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Custmer_Statistics_StructType").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    //RDD隐式转换成DataFrame
    import sqlContext.implicits._
    //读取本地文件
    val blb_intpc_infoRow = sc.textFile("C:/work/ideabench/SparkSQL/data/channel/blb_intpc_info_10000_2.txt")
      .map(_.split("\\t"))
      .map(d => {
        Row(d(0),d(1))
      })

    //Hive表字段元数据信息
    val schemaString = DataUtils.getHiveMetaData("blb_intpc_info")
    val schema =StructType(schemaString.split("\\t")
      .map(fieldName => StructField(fieldName, StringType, true)))

    val blb_intpc_infoDF = sqlContext.createDataFrame(blb_intpc_infoRow,schema)
    //注册表
    blb_intpc_infoDF.registerTempTable("blb_intpc_info")

    /**
      * 分渠道进件数量统计并按进件数量降序排列
      */
    blb_intpc_infoDF.registerTempTable("blb_intpc_info")
    sqlContext.sql("" +
      "select chnl_code,count(*) as intpc_sum " +
      "from blb_intpc_info " +
      "group by chnl_code").toDF().sort($"intpc_sum".desc).show()
  }

}
参考文章

1.hive元数据:http://blog.csdn.net/wf1982/article/details/6644258
2.SparkSQL官网:http://spark.apache.org/docs/1.6.0/sql-programming-guide.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,583评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,669评论 2 374
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,684评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,682评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,533评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,396评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,814评论 3 387
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,458评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,745评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,789评论 2 314
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,565评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,410评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,828评论 3 300
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,050评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,342评论 1 253
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,793评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,010评论 2 337

推荐阅读更多精彩内容