Spark使用总结-Scala

Scala Spark使用

特殊引用

使用比如hive sql或者rdd转换toDF是通过隐式转换,需要增加相关的包引用
1 hive sql: import spark.sql
2 隐式函数: import spark.implicits._

SparkSession

初始化

val sparkSession = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .enableHiveSupport() //支持hive
  .getOrCreate()

json

1 读取json

sparkSession
  .read
  .schema(...)
  .json(path)

2 读取有效hdfs路径文件:有的时候按照时间范围获取文件,但有可能其中有的小时没有生成文件,则可:
1)hdfs:

import org.apache.hadoop.fs.{FileSystem, Path}
val fs: FileSystem = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration) 
val validPaths = pathList.flatMap( p => fs.globStatus(new Path(p)).map(_.getPath.toString))
  1. OSS:可以用阿里云的OSSClient,如果没有,则循环try-cache来获取:
def readMultiOssPaths(spark: SparkSession, schema: StructType, paths: List[String], colNameList: Array[String]) = {
    var Df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
    for (path <- paths) {
      try {
        val singleDF = spark
          .read
          .schema(schema)
          .json(path)
          .select(colNameList.map(col): _*)
        Df = Df.union(singleDF)
      } catch {
        case e: Exception => println(s"${path} exception: ${e.getMessage}")
      }
    }
    Df
  }
}

3 写入hdfs或者oss文件系统

sparkSession
  .write
  .mode("override")//支持覆盖写入
  .format("json")
  .option("compression", "gzip")
  .save(path)

mysql

在spark-submit时,需要--jars yourpath/json-serde-xxx-jar-with-dependencies.jar
1 读取mysql表
基本模式:

sparkSession
  .read
  .format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("url", jdbcUrl)
  .option("dbtable",table)
  .load()

分区模式:

sparkSession
  ...
  .option("partitionColumn", partitionCol) //分区依赖的列,选择均匀避免倾斜
  .option("lowerBound", 1) //partitionColumn最小值
  .option("upperBound", Long.MaxValue)//partitionColumn最大值
  .option("numPartitions", numPartition)//分区数
  ...

注意:join hive是无法支持的,需要将join的结果作为dbtable传入

val table = s"""(
       |select
       |    a.id,
       |    b.id,
       |    a.name,
       |    unix_timestamp(a.create_time) * 1000 as create_timestamp,
       |from
       |    a
       |    inner join
       |        b
       |    on
       |        (
       |         a.id = b.a_id and
       |         a.create_time >= '2019-09-18 00:00:00' and
       |         a.create_time < '2019-09-19 00:00:00' and
       |         a.name is not null
       |        )
       |) as oc
       |""".stripMargin
val aJoinBDF = sparkSession
  .read
  .format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("url", jdbcUrl)
  .option("dbtable",table)
  .load()    

Spark Dataframe

转rdd

df.rdd

rdd 转DF

如果是定义好的POJO,当rdd map或者其他操作转换出来的是POJO的对象,则可以直接toDF为以对象属性为列

column操作

1 全部修改

val columnMap = Map(旧列名->新列名)
val newDF = oldDF.select(oldDF.columns.map(c => col(c).as(columnMap(c))): _*)

2 修改一个列名

DF.withColumnRenamed("old_col_name", "new_col_name")

3 合并一些列名为一个新的列名

val newDF = oldDF
  .withColumn("combined", struct("*"))//*可以替换为具体的一些列名,比如abc则为struct("a", "b", "c")

4 删掉列名
DF.drop("col_a", "cod_b")
5 选取列名
DF.select("a", "b", ...)
6 过滤
DF.filter(col("a") === "value" || col("b") =!= "value")

Dataframe join

join后,如果有同名的列名,join结果是同时有两个相同名称的列名。暂没有什么高级的处理方法,需要区分时,提前将不同DF中的相同列名重命名为不同的列名
1 使用col

val joined = aDF.as("aDF")
  .join(
    bDF.as("bDF"),
    col("aDF.id") === col("bDF.id"),
    "left"
  )

2 使用seq(适合不用区分两个表的相同col名称)

val joined = aDF
  .join(
    bDF,
    Seq("id"),
    "left"
  )

分组

有时需要将不同行按照相同的某列进行分组,然后进行计算,此时需要用groupBy分组,分组后是List(Record, Record)

DF
  .groupBy("col_name")
  .agg(collect_list("grouped_col_name").as("grouped_col_name"))

去重

有的时候需要取比如每个公司每个产品最新的一条结果,则需要对数据进行去重,并取时间最近的一条

val groupColNames = List("company", "product")
val tmColName = "create_timestamp"
val w =  Window.partitionBy(groupColNames.map(col): _*).orderBy(col(tmColName).desc)
var newDf = df
  .withColumn("rn", row_number.over(w))
  .where(col("rn") === 1)
  .toDF()

rdd

由于可能存在一些复杂的计算、合并,需要将DataFrame的转为rdd进行处理

解析

Dataframe转rdd后,每一行都是一个org.apache.spark.sql.Row
1 获取Column: row.getAs[...]("column_name")。这种支持基本的String,Long, 甚至Map
2 在DataFrame中,如果没有指定Map或者Array,则这些格式Spark是会指定为Struct类型,解析需:

  1. Map中值解析
val mapCol = row.getAs[Row]("map_col")
val a = mapCol.getAs[String]("a")

2)Array中值解析

val arrCols = row.getAs[Seq[Row]]("array_col")
for(col <- array_col){
    val colA = col.getAs[String]("a")
}

拍平

1)如果结果是List,使用flatMap:rdd.map(r => r)
2)如果结果是(key, List(...)), 使用flatmap:

rdd
  .flatMap{
    case (key, arr) => {
      arr.map(ele => (key, ele))
    }
  }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,393评论 5 467
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,790评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,391评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,703评论 1 270
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,613评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,003评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,507评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,158评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,300评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,256评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,274评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,984评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,569评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,662评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,899评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,268评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,840评论 2 339

推荐阅读更多精彩内容