当SparkSQL里内置的函数无法满足我们业务需求时,我们可以通过自定义UDF来实现。
1、自定义ConcatLongStringUDF
/**
* 将两个字段拼接起来(使用指定的分隔符)
*/
class ConcatLongStringUDF extends UDF3[Long, String, String, String] {
override def call(v1: Long, v2: String, spilt: String): String = {
v1.toString + spilt + v2
}
}
这里自定义UDF来使用指定的分隔符来拼接Long和String
2、使用
首先注册自定义UDF并指定返回类型
spark.udf.register("concat_long_str", new ConcatLongStringUDF(), DataTypes.StringType)
使用自定义UDF
spark.sql("select concat_long_str(session_id,page_id,':') from temp_session_page").show()
完成代码UDFTest
object UDFTest {
def main(args: Array[String]): Unit = {
//权限问题
System.setProperty("HADOOP_USER_NAME", "hadoop")
val spark =
SparkSession.builder()
.appName("UDFTest")
.master("local[1]")
.getOrCreate()
spark.udf.register("concat_long_str", new ConcatLongStringUDF(), DataTypes.StringType)
val tempSchema: StructType = StructType(Seq(
StructField("session_id", LongType, false),
StructField("page_id", StringType, false)
))
val rowRDD =
spark.sparkContext.parallelize(Array("1,page1", "2,page2"))
.map(line => {
val fields = line.split(",")
Row(fields(0).toLong, fields(1))
})
val tempDF = spark.createDataFrame(rowRDD, tempSchema)
tempDF.createOrReplaceTempView("temp_session_page")
spark.sql("select * from temp_session_page").show()
spark.sql("select concat_long_str(session_id,page_id,':') from temp_session_page").show()
}
}
打印结果
+----------+-------+
|session_id|page_id|
+----------+-------+
| 1| page1|
| 2| page2|
+----------+-------+
+-------------------------------------------+
|UDF:concat_long_str(session_id, page_id, :)|
+-------------------------------------------+
| 1:page1|
| 2:page2|
+-------------------------------------------+
可以根据自己业务的需求定制更多UDF