推荐系统之离线模块

import java.sql.Date

import java.text.SimpleDateFormat

import org.apache.spark.SparkConf

import org.apache.spark.sql.{Dataset, SparkSession}

object StatoisticsRecommender {

val MONGO_URI:String ="mongodb://hadoop100:27017/recom3"

    val MONGODB_DATABASE:String ="recom3"

    val MONGODB_RATING_COLLECTION ="Rating"

    val MONGODB_MOVIE_COLLECTION ="Movie"

    //驱动

    val MONGO_DRVIVE_CLASS:String ="com.mongodb.spark.sql"

    //优质电影

    val MONGODB_RATE_MORE_MOVIES_COLLECTION ="RateMoreMovies"

    //热门电影

    val MONGODB_RATE_MORE_MOVIES_RECENTLY_COLLECTION ="RateMoreMoviesRecently"

    //平均评分

    val MONGODB_AVERAGE_MOVIES_SCORE_COLLECTION ="AverageMoviesScore"

    val MONGODB_GENRES_TOP_MOVIES_COLLECTION ="GenresTopMovies"

    def main(args: Array[String]): Unit = {

//使用map封装参数

      val conf =Map("spark.cores" ->"local[2]",

"mongo.uri" ->MONGO_URI,

"mongo.db" ->MONGODB_DATABASE)

//sparkconf

      val sparkConf =new SparkConf().setAppName("statisticsRecommender").setMaster(conf("spark.cores"))

//sparkSeesion

      val spark = SparkSession.builder().config(sparkConf).getOrCreate()

implicit val mongoConf =new MongoConfig(conf("mongo.uri"), conf("mongo.db"))

//从mongo 中读取数据

      //导入sparkSession的隐式转换

      import spark.implicits._

val ratings = spark.read

.option("uri", mongoConf.uri)

.option("collection",MONGODB_RATING_COLLECTION)

.format(MONGO_DRVIVE_CLASS)

.load()

.as[MoviesRating]

.cache

val movies = spark.read

.option("uri", mongoConf.uri)

.option("collection",MONGODB_MOVIE_COLLECTION)

.format(MONGO_DRVIVE_CLASS)

.load()

.as[Movie]

.cache

//把数据注册成view[下面用到]

      ratings.createOrReplaceTempView("ratings")

//分析

      //1.优质电影=>总的评分个数最多的电影==>RateMoreMoveies

      rateMore(spark)

//2.热门电影=>一个月内评分最多的电影==>RateMoreRecentlyMovies

      rateMoreRecently(spark)

//3.电影的平均评分==>AverageMovies

      averageMovieScore(spark, movies)

//4.每类电影topN  ==>GenresTopMovies

      //关闭资源

    }

/**

    * 优质电影的计算

    *

    * @param spark

*/

    def rateMore(spark: SparkSession)(implicit mongoConf: MongoConfig): Unit = {

//select mid, count(mid) as count from ratings group by mid order by count desc

      //根据业务执行sqlma

      val rateMoreDF = spark.sql("select mid, count(mid) as count from ratings group by mid order by count desc")

//把结果数据写入到mongodb对应的表中

      rateMoreDF

.write

.option("uri", mongoConf.uri)

.option("collection",MONGODB_RATE_MORE_MOVIES_COLLECTION)

.mode("overwrite")

.format(MONGO_DRVIVE_CLASS)

.save()

}

/**

    * 热门电影:月评分最多的电影

    *

    * @param spark

*/

    def rateMoreRecently(spark: SparkSession)(implicit mongoConf: MongoConfig): Unit = {

val simpleDateFormat =new SimpleDateFormat("yyyyMM")

//sparkSql自定义函数,用于将时间戳转化成年月的形式(乘上1000 是将秒为单位的转化成毫秒)

      spark.udf.register("changDate", (x: Long) => simpleDateFormat.format(new Date(x *1000L)).toLong)

//根据业务执行sql

      val yeahMonthOfRatings = spark.sql("select mid, uid, score, changDate(timestamp) as yeahmonth from ratings")

//将上一步得到的df注册成表ymRatings

      yeahMonthOfRatings.createOrReplaceTempView("ymRatings")

//根据业务执行sql

      val rateMoreRecentlyDF = spark.sql("select mid, count(mid) as count,yeahmonth from ymRatings group by yeahmonth,mid order by yeahmonth desc,count desc")

//将我们的结果数据写入到mongo的RateMoreMoviesRecently表中

      rateMoreRecentlyDF

.write

.option("uri", mongoConf.uri)

.option("collection",MONGODB_RATE_MORE_MOVIES_RECENTLY_COLLECTION)

.mode("overwrite")

.format(MONGO_DRVIVE_CLASS)

.save()

}

/**

    * 计算电影的平均评分

    *

    * @param spark

    * @param movies

    * @param mongoConf

*/

    def averageMovieScore(spark: SparkSession, movies: Dataset[Movie])(implicit mongoConf: MongoConfig): Unit = {

//求出每个电影的平均评分

      val averageMovieScoreDF = spark.sql("select mid, avg(score) as avg from ratings group by mid").cache()

//把结果数据写入到mongo的AverageMoviesScore表中

      averageMovieScoreDF

.write

.option("uri", mongoConf.uri)

.option("collection",MONGODB_AVERAGE_MOVIES_SCORE_COLLECTION)

.mode("overwrite")

.format(MONGO_DRVIVE_CLASS)

.save()

import spark.implicits._

//电影里面所有的类别,使用list进行封装

      val genres =List("Action","Adventure","Animation","Comedy","Ccrime","Documentary","Drama","Family","Fantasy","Foreign","History","Horror","Music","Mystery"

        ,"Romance","Science","Tv","Thriller","War","Western")

//把电影里面的类别由list的类型转化成rdd的类型

      val genresRdd = spark.sparkContext.makeRDD(genres)

// 统计每种类别最热电影【每种类别中平均评分最高的10部电影】

      val moviesWithSocreDF = movies.join(averageMovieScoreDF,Seq("mid","mid")).select("mid","avg","genres").cache()

//类别.cartesian(电影数据集(含平均评分))

      val genresTopMovies = genresRdd.cartesian(moviesWithSocreDF.rdd).filter(x => {

xmatch {

//包含的就留下,不包含的就去掉

          case (genres, row) => {

row.getAs[String]("genres").toLowerCase().contains(genres.toLowerCase)

}

}

})

// mid avg genres

        .map {

//对数据的格式进行一个调整

          case (genres, row) => {

(genres, (row.getAs[Int]("mid"), row.getAs[Double]("avg")))

}

}//按照电影的类别惊醒分组

        //    (key,((),()))

        .groupByKey()

.map {

case (genres, items) => {

GenresRecommendation(genres, items.toList.sortWith(_._2 > _._2).slice(0,10).map(x =>Recommendation(x._1, x._2)))

}

}.toDF

//把结果数据写入到mongo的GenresTopMovies表中

      genresTopMovies

.write

.option("uri", mongoConf.uri)

.option("collection",MONGODB_GENRES_TOP_MOVIES_COLLECTION)

.mode("overwrite")

.format(MONGO_DRVIVE_CLASS)

.save()

}

/**

  * 推荐项目

  *

  * @param rid 项目ID

  * @param r  推荐分数

  */

  case class Recommendation(rid: Int, r: Double)

/**

  * 电影种类推荐样例类

  *

  * @param genres

  * @param recs

*/

  case class GenresRecommendation(genres:String, recs:Seq[Recommendation])

/**

  * MongoDB 配置对象

  *

  * @param uri MongoDB连接地址

  * @param db  操作的MongoDB数据库

  */

  case class MongoConfig(val uri:String,val db:String)

/**

  * Rating Class 电影的评分类

  *

  * @param uid      用户的ID

  * @param mid      电影的ID

  * @param score    用户为该电影的评分

  * @param timestamp 用户为该电影评分的时间

  */

  case class MoviesRating(val uid: Int,val mid: Int,val score: Double,val timestamp: Int)

/**

  * Movie Class 电影类

  *

  * @param mid      电影的ID

  * @param name      电影的名称

  * @param descri    电影的描述

  * @param timelong  电影的时长

  * @param issue    电影的发行时间

  * @param shoot    电影的拍摄时间

  * @param language  电影的语言

  * @param genres    电影的类别

  * @param actors    电影的演员

  * @param directors 电影的导演

  */

  case class Movie(val mid: Int,val name:String,val descri:String,val timelong:String,val issue:String,val shoot:String,val language:String,val genres:String,val actors:String,val directors:String)

}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,539评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,594评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,871评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,963评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,984评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,763评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,468评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,357评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,850评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,002评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,144评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,823评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,483评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,026评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,150评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,415评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,092评论 2 355