Spark机器学习实战(四)电影推荐算法 - 协同过滤
这篇文章将要介绍推荐算法中最核心的部分,协同过滤。基于大量用户对大量电影的评分,将要完成的任务有两个:第一,向用户推荐可能感兴趣的电影(即可能会评高分的电影);第二,找出和某部电影相似的电影出来,即找出的特征相似的电影来。
文章中列出了关键代码,完整代码见我的github repository,这篇文章的代码在
chapter04/src/main/scala/ScalaApp.scala
第1步:协同过滤简介
我们假设有100个用户,100个电影以及用户对电影的评分。然而,没有一个网站会拥有观看了所有电影并且评分了的用户的。在理想状态中,我们希望有10000个电影评分,但是这显然是不可能的。我们可能有1000条评分,可能500条。
协同过滤要做的就是,基于这仅有的1000条评分,估计出剩下的9000条未打出的评分。这有什么意义呢?我们这样就可以预估出用户可能喜欢的电影,并向它们推荐。同时,我们可以学习出用户的特征,电影的特征并为它们找出相似的同类来。算法很强大,目前各大购物,视频网站都在使用它。
我们用矩阵的角度来看一下这个问题,假设我们有U名用户,I个电影,部分用户对部分电影进行了评分,评分可以表示为一个U*I
的矩阵,这个矩阵大部分值是缺失的,因此为一个稀疏矩阵:
现在我们想把用户表示为一个K维的特征,电影也表示成一个K维的特征,特征的点积就是对应的评分。协同过滤就是利用现有的评分来学习出这些K维特征,从而填补未评分的项
从数学上说就是把U*I
的矩阵,分解为U*K
与K*I
的矩阵积,满足已有评分正确的同时,填补未评分项。为什么要分解呢?因为K远小于U和I,存储方便,表示简单。
具体求分解矩阵的方式是采用最小二乘法(Alternating Least Squares,ALS)。
另外补充一句,这种矩阵分解称为显示矩阵分解。此外还有隐式矩阵分解,区别在于评分矩阵的项为二进制值,另外多了一个U*I
的信心权重矩阵表示对于二进制值的信心度。像下面这样:
第2步:从数据集中提取训练特征
我们现在要从MovieLens 100k数据集中提取出可以用来训练协同过滤模型的特征。
u.data中的每一条数据长这样,分别代表用户,电影,分数,时间戳。时间戳在这里并不需要。
196 242 3 881250949
Rating是MLlib中定义的数据格式,其中包括了用户编号,Item即电影编号以及评分。
import org.apache.spark.mllib.recommendation.Rating
val sc = new SparkContext("local[2]", "First Spark App")
sc.setLogLevel("ERROR")
val rawData = sc.textFile("data/u.data")
val rawRatings = rawData.map(_.split("\t").take(3))
val ratings = rawRatings.map {case Array(user, movie, rating)
=> Rating(user.toInt, movie.toInt, rating.toDouble)}
第3步:训练模型
ALS是MLlib提供的最小二乘回归模型,原理是每次控制一个矩阵,优化另一个,反复迭代最终收敛。ALS接收的三个数值参数分别为,特征长度,即K的大小,Iteration次数,和lambda参数(这个参数应该要由交叉验证得出,这里取了0.01)。
import org.apache.spark.mllib.recommendation.ALS
val model = ALS.train(ratings, 50, 10, 0.01)
println(model.userFeatures.count)
println(model.userFeatures.take(1))
println(model.predict(196, 242))
训练完毕后就可以看一下我们训练的结果,以及试着预测一下196号用户对242号影片的评分了:评分结果为2.9820089343215352。
第4步:向用户推荐电影
假设我们要向789号用户推荐5部可能感兴趣的电影,推荐模型内置函数帮我们完成了这一点,可惜我们得到的是五个Rating实例,还不够具体。
val userId = 789
val K = 5
val topKRecs = model.recommendProducts(userId, K)
println(topKRecs.mkString("\n"))
结果为
Rating(789,182,5.525450045512849)
Rating(789,573,5.351473049607477)
Rating(789,504,5.1301817702377095)
Rating(789,97,5.125571781347671)
Rating(789,92,5.121346111181028)
我们进一步把这位用户喜欢的电影和推荐给他的电影名字给打印出来。中间用到了u.item数据库来读取电影名字
val movies = sc.textFile("data/u.item")
val titles = movies.map(line => line.split("\\|")).map(fields => (fields(0).toInt, fields(1))).collectAsMap()
val moviesForUser = ratings.keyBy(_.user).lookup(789)
println("User " + userId +"'s favorite movies:")
moviesForUser.sortBy(-_.rating).take(5).map(rating => (titles(rating.product), rating.rating)).foreach(println)
println("Movies recommended to user " + userId)
topKRecs.map(rating => (titles(rating.product), rating.rating)).foreach(println)
结果为:
User 789's favorite movies:
(Godfather, The (1972),5.0)
(Trainspotting (1996),5.0)
(Dead Man Walking (1995),5.0)
(Star Wars (1977),5.0)
(Swingers (1996),5.0)
Movies recommended to user 789
(GoodFellas (1990),5.525450045512849)
(Body Snatchers (1993),5.351473049607477)
(Bonnie and Clyde (1967),5.1301817702377095)
(Dances with Wolves (1990),5.125571781347671)
(True Romance (1993),5.121346111181028)
第5步:寻找相似电影
相似电影的寻找并没有内置的函数,我们的思路是找出与目标电影的K维特征“夹角”最小的电影,两个向量的夹角定义为向量的点积除以它们的二阶范数。
import org.jblas.DoubleMatrix
def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = {
vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())
}
val itemId = 567
val itemFactor = model.productFeatures.lookup(itemId).head
val itemVector = new DoubleMatrix(itemFactor)
val sims = model.productFeatures.map {case (id, factor) =>
val factorVector = new DoubleMatrix(factor)
val sim = cosineSimilarity(factorVector, itemVector)
(id, sim)
}
val sortedSims = sims.top(K)(Ordering.by[(Int, Double), Double] {case (id, similarity) => similarity})
println(sortedSims.mkString("\n"))
结果为:
(567,1.0)
(403,0.7299325745744089)
(853,0.7260626510960669)
(563,0.7231278972915091)
(413,0.7225324149301402)
第一个结果是它自身,道理很显然。
下面同样是把这些电影的名字输出来:
println("Item number " + itemId + "'s name:")
println(titles(itemId))
println("Names of similar movies:")
println(sortedSims.map {case (id, similarity) => (titles(id), similarity)}.mkString("\n"))
结果为:
Item number 567's name:
Wes Craven's New Nightmare (1994)
Names of similar movies:
(Wes Craven's New Nightmare (1994),1.0)
(Batman (1989),0.7299325745744089)
(Braindead (1992),0.7260626510960669)
(Stephen King's The Langoliers (1995),0.7231278972915091)
(Tales from the Crypt Presents: Bordello of Blood (1996),0.7225324149301402)
可以看到这些电影都是科幻悬疑类的,说明算法还是很有效的。
第6步:推荐模型效果评估
算法得出模型后,我们要评估算法就需要规定一些数值结果。最常用的检验标准为MSE,即均方误差。但是这个检查方式与推荐无关,它检验的是对已有评分能否完美再现。即比较真实得分与模型计算得分之间的差。计算方式也不难。其中RMSE为MSE开根号。
val usersProducts = ratings.map {case Rating(user, product, rating) => (user, product)}
val predictions = model.predict(usersProducts).map {case Rating(user, product, rating) => ((user, product), rating)}
val ratingsAndPredictions = ratings.map {case Rating(user, product, rating) =>
((user, product), rating)}.join(predictions)
val MSE = ratingsAndPredictions.map {
case ((user, product), (actual, predicted)) => math.pow((actual - predicted), 2)
}.reduce((x, y) => x + y) / ratingsAndPredictions.count
println("MSE = " + MSE)
println("RMSE = " + math.sqrt(MSE))
我们可以用MLlib的函数来计算得出一样的结果
import org.apache.spark.mllib.evaluation.RegressionMetrics
val predictedAndTrue = ratingsAndPredictions.map {
case ((user, product), (actual, predicted)) => (actual, predicted)}
val regressionMetrics = new RegressionMetrics(predictedAndTrue)
println("MLlib MSE = " + regressionMetrics.meanSquaredError)
println("MLlib RMSE = " + regressionMetrics.rootMeanSquaredError)
两者的计算结果完全一致:
MSE = 0.08519901884772077
RMSE = 0.29188870969552894
MLlib MSE = 0.08519901884772077
MLlib RMSE = 0.29188870969552894
除了MSE外还有一种评判方式称为Mean Average Precision即MAP,MAP是用测试数据来得出的,即我们知道最佳的推荐是什么,来和模型的预测结果相比较。然而我们暂时没有这样的测试集,所以下面的测试并不严谨,得分自然也很低,我们的做法是把用户的评分过的当作应该推荐的。
val userMovies = ratings.map{ case Rating(user, product, rating) => (user, product) }.groupBy(_._1)
val itemFactors = model.productFeatures.map { case (id, factor) => factor }.collect()
val itemMatrix = new DoubleMatrix(itemFactors)
val imBroadcast = sc.broadcast(itemMatrix)
val allRecs = model.userFeatures.map{ case (userId, array) =>
val userVector = new DoubleMatrix(array)
val scores = imBroadcast.value.mmul(userVector)
val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1)
val recommendedIds = sortedWithId.map(_._2 + 1).toSeq
(userId, recommendedIds)
}
val predictedAndTrueForRanking = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) =>
val actual = actualWithIds.map(_._2)
(predicted.toArray, actual.toArray)
}
val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking)
println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision)
结果没有太多参考价值,也不贴出来了。仅仅是为了介绍一下MAP的计算方法,利用了MLlib的库,很简单。