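Preface
GitHub: https://github.com/guofei1219
Background
Count the number of loan applications received through each new channel.
Comparing the two ways SparkSQL turns an RDD into a DataFrame
1. Inferring the schema using reflection. The official documentation describes it as follows: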
The Scala interface for Spark SQL supports automatically converting an RDD containing case classes to a DataFrame. The case class defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns. Case classes can also be nested or contain complex types such as Sequences or Arrays. This RDD can be implicitly converted to a DataFrame and then be registered as a table. Tables can be used in subsequent SQL statements.
case class blb_intpc_info(chnl_code:String,id_num:String)
2. Programmatically specifying the schema. The official documentation describes it as follows:
When case classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a DataFrame can be created programmatically with three steps.
1. Create an RDD of Rows from the original RDD;
2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
3. Apply the schema to the RDD of Rows via createDataFrame method provided by SQLContext.
val structTypes = StructType(Array(
  StructField("chnl_code", StringType, true),
  StructField("id_num", StringType, true)
))
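Comparison summary
1. The case class approach is easier to read: the schema is visible at a glance in the class definition.
2. A case class is limited to 22 parameters (in Scala 2.10, as the Spark 1.6 guide notes), so it cannot be used for tables with more fields.
3. The programmatic approach is better suited to day-to-day development.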
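Implementation
Source data format
Each line has two tab-separated columns: the first is the channel code and the second is the application ID.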
306DC4246 411324199209142831
306DC423A 360124199011241838
306DC423D 440802198010290019
306DC4226 612328197403120016
306DC4201 452629199104050312
306DC4201 350212198505025514
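As a quick cross-check on a small sample, the aggregation that the Spark jobs in this post perform (count per channel, largest count first) can be sketched in plain Java without Spark. This is only an illustrative sketch: the class `ChannelCountSketch` and its methods are hypothetical names, not part of the original project, and ties are broken by channel code purely to make the ordering deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the original project.
public class ChannelCountSketch {

    /** Counts records per channel code; each line is "chnl_code<TAB>id_num". */
    public static Map<String, Integer> countByChannel(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            String[] fields = line.split("\t");
            if (fields.length < 2) {
                continue; // skip blank or malformed lines
            }
            counts.merge(fields[0], 1, Integer::sum);
        }
        return counts;
    }

    /** Returns the entries ordered by count, largest first; ties are ordered by channel code. */
    public static List<Map.Entry<String, Integer>> sortedDesc(Map<String, Integer> counts) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((a, b) -> {
            int byCount = Integer.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });
        return entries;
    }

    public static void main(String[] args) {
        // The six sample records shown above, with a tab between the two columns
        List<String> sample = Arrays.asList(
                "306DC4246\t411324199209142831",
                "306DC423A\t360124199011241838",
                "306DC423D\t440802198010290019",
                "306DC4226\t612328197403120016",
                "306DC4201\t452629199104050312",
                "306DC4201\t350212198505025514");
        for (Map.Entry<String, Integer> e : sortedDesc(countByChannel(sample))) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}
```

Running the same logic over a few hand-picked lines makes it easy to compare against the `intpc_sum` values that Spark reports for those channels.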
Reflection approach
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Counts the number of applications per channel.
  * Created by Michael on 2016/11/29.
  */
object Custmer_Statistics_CaseClass {

  /**
    * Case class describing the table metadata.
    * @param chnl_code channel code
    * @param id_num    application ID
    */
  case class blb_intpc_info(chnl_code: String, id_num: String)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Custmer_Statistics_CaseClass").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Enables the implicit RDD-to-DataFrame conversion (toDF) and the $"col" syntax
    import sqlContext.implicits._
    // Read the local file
    val blb_intpc_infoDF = sc.textFile("C:/work/ideabench/SparkSQL/data/channel/blb_intpc_info_10000_2.txt")
      .map(_.split("\\t"))
      .map(d => blb_intpc_info(d(0), d(1))).toDF()
    // Register the DataFrame as a temporary table (once is enough)
    blb_intpc_infoDF.registerTempTable("blb_intpc_info")
    // Count applications per channel, sorted by count in descending order
    sqlContext.sql(
      "select chnl_code,count(*) as intpc_sum " +
      "from blb_intpc_info " +
      "group by chnl_code").sort($"intpc_sum".desc).show()
  }
}
Result
+---------+---------+
|chnl_code|intpc_sum|
+---------+---------+
|306DC421E|      631|
|306DC4201|      603|
|306DC422B|      472|
|306DC4221|      326|
|306DC425E|      280|
|306DC4237|      277|
|306DC4210|      238|
|306DC4246|      236|
|306DC4229|      223|
|306DC4257|      202|
|306DC420E|      197|
|306DC4215|      183|
|306DC421F|      176|
|306DC425A|      156|
|306DC4251|      140|
|306DC4202|      131|
|306DC424D|      125|
|306DC4226|      122|
|306DC422A|      112|
|306DC422D|      108|
+---------+---------+
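Programmatic approach
Query the Hive metastore database to get the column information for a given Hive table.
Note: if you are not familiar with the structure of the Hive metastore tables, search for a few posts on the topic or see the references at the end of this article.
Without further explanation, here is the code.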
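// Belongs in utils.DataUtils; needs java.sql.Connection, PreparedStatement, ResultSet and SQLException.
// getConn() is defined elsewhere in that class and returns a JDBC connection to the Hive metastore database.
public static String getHiveMetaData(String hiveTableName) {
    Connection conn = getConn();
    // TBLS: table metadata; SDS: storage descriptors; COLUMNS_V2: column information.
    // Select COLUMNS_V2.`TYPE_NAME` as well if the column types are needed.
    String sql = "SELECT\n" +
            "  COLUMNS_V2.`COLUMN_NAME`\n" +
            "FROM\n" +
            "  TBLS\n" +
            "  LEFT JOIN SDS\n" +
            "    ON TBLS.SD_ID = SDS.SD_ID\n" +
            "  LEFT JOIN CDS\n" +
            "    ON SDS.CD_ID = CDS.CD_ID\n" +
            "  LEFT JOIN COLUMNS_V2\n" +
            "    ON CDS.CD_ID = COLUMNS_V2.CD_ID\n" +
            "WHERE TBLS.`TBL_NAME` = ?\n" +
            // Keep the columns in their declared order so they line up with the data
            "ORDER BY COLUMNS_V2.`INTEGER_IDX`";
    StringBuilder result = new StringBuilder();
    try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
        // Bind the requested table name; the original hard-coded "gd_py_corp_sharehd_info"
        pstmt.setString(1, hiveTableName);
        try (ResultSet rs = pstmt.executeQuery()) {
            int col = rs.getMetaData().getColumnCount();
            // Join every returned value with a trailing tab
            while (rs.next()) {
                for (int i = 1; i <= col; i++) {
                    result.append(rs.getString(i)).append("\t");
                }
            }
        }
    } catch (SQLException e) {
        e.printStackTrace();
    }
    return result.toString();
}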
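Because the column names come back as one tab-joined string, it can help to check that string before building a schema from it. The sketch below is a minimal illustration under assumptions: `SchemaFieldsSketch`, `parseFieldNames` and `matchesSchema` are hypothetical names, not part of the original project. It drops empty pieces (an empty result, for example when the table is not found, would otherwise become a single field with an empty name) and checks that a data line has one value per field, since a width mismatch typically only shows up as a runtime error once the Spark job executes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original project.
public class SchemaFieldsSketch {

    /** Splits the tab-joined column-name string into field names, dropping empty pieces. */
    public static List<String> parseFieldNames(String metadata) {
        List<String> names = new ArrayList<>();
        for (String piece : metadata.split("\t")) {
            String name = piece.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    /** True when a tab-separated record has exactly one value per schema field. */
    public static boolean matchesSchema(String line, List<String> fieldNames) {
        // A limit of -1 keeps trailing empty values, so "a\t" counts as two columns
        return line.split("\t", -1).length == fieldNames.size();
    }

    public static void main(String[] args) {
        // Same shape as the string returned by getHiveMetaData: names joined with a trailing tab
        List<String> fields = parseFieldNames("chnl_code\tid_num\t");
        System.out.println(fields);
        System.out.println(matchesSchema("306DC4246\t411324199209142831", fields));
        System.out.println(matchesSchema("306DC4246", fields));
    }
}
```

A check like this could run on the driver before the schema is created, so that a missing table or a malformed input line is reported with a clear message rather than deep inside a Spark task.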
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import utils.DataUtils

/**
  * Counts the number of applications per channel.
  * Created by Michael on 2016/11/29.
  */
object Custmer_Statistics_StructType {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Custmer_Statistics_StructType").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Needed here for the $"col" syntax used in the sort below
    import sqlContext.implicits._
    // Read the local file and turn each line into a Row
    val blb_intpc_infoRow = sc.textFile("C:/work/ideabench/SparkSQL/data/channel/blb_intpc_info_10000_2.txt")
      .map(_.split("\\t"))
      .map(d => Row(d(0), d(1)))
    // Column names of the Hive table, fetched from the metastore as a tab-joined string
    val schemaString = DataUtils.getHiveMetaData("blb_intpc_info")
    // Build the schema, treating every column as a nullable string
    val schema = StructType(schemaString.split("\\t")
      .map(fieldName => StructField(fieldName, StringType, true)))
    val blb_intpc_infoDF = sqlContext.createDataFrame(blb_intpc_infoRow, schema)
    // Register the DataFrame as a temporary table (once is enough)
    blb_intpc_infoDF.registerTempTable("blb_intpc_info")
    // Count applications per channel, sorted by count in descending order
    sqlContext.sql(
      "select chnl_code,count(*) as intpc_sum " +
      "from blb_intpc_info " +
      "group by chnl_code").sort($"intpc_sum".desc).show()
  }
}
References
1. Hive metadata: http://blog.csdn.net/wf1982/article/details/6644258
2. SparkSQL official guide: http://spark.apache.org/docs/1.6.0/sql-programming-guide.html