Spark MLlib ALS 推荐系统

Spark 机器学习库从 1.2 版本以后被分为两个包:

  • spark.mllib包含基于RDD的原始算法API。Spark MLlib 历史比较长,在1.0 以前的版本即已经包含了,提供的算法实现都是基于原始的 RDD。
  • spark.ml 则提供了基于DataFrames 高层次的API,可以用来构建机器学习工作流(PipeLine)。ML Pipeline 弥补了原始 MLlib 库的不足,向用户提供了一个基于 DataFrame 的机器学习工作流式 API 套件。

从Spark2.0开始,基于RDD的API进入维护模式(即不增加任何新的特性),并预期于3.0版本的时候被移除出MLLib。因此,我们将以ml包为主进行介绍。

一个典型的机器学习过程从数据收集开始,要经历多个步骤,才能得到需要的输出。这非常类似于流水线式工作,即通常会包含源数据ETL(抽取、转化、加载),数据预处理,指标提取,模型训练与交叉验证,新数据预测等步骤。

在介绍工作流之前,我们先来了解几个重要概念:

  • DataFrame:使用Spark SQL中的DataFrame作为数据集,它可以容纳各种数据类型。 较之 RDD,包含了 schema 信息,更类似传统数据库中的二维表格。它被 ML Pipeline 用来存储源数据。例如,DataFrame中的列可以是存储的文本,特征向量,真实标签和预测的标签等。

  • Transformer:翻译成转换器,是一种可以将一个DataFrame转换为另一个DataFrame的算法。比如一个模型就是一个 Transformer。它可以把 一个不包含预测标签的测试数据集 DataFrame 打上标签,转化成另一个包含预测标签的 DataFrame。技术上,Transformer实现了一个方法transform(),它通过附加一个或多个列将一个DataFrame转换为另一个DataFrame。

  • Estimator:翻译成估计器或评估器,它是学习算法或在训练数据上的训练方法的概念抽象。在 Pipeline 里通常是被用来操作 DataFrame 数据并生产一个 Transformer。从技术上讲,Estimator实现了一个方法fit(),它接受一个DataFrame并产生一个转换器。如一个随机森林算法就是一个 Estimator,它可以调用fit(),通过训练特征数据而得到一个随机森林模型。

  • Parameter:Parameter 被用来设置 Transformer 或者 Estimator 的参数。现在,所有转换器和估计器可共享用于指定参数的公共API。ParamMap是一组(参数,值)对。

  • PipeLine:翻译为工作流或者管道。工作流将多个工作流阶段(转换器和估计器)连接在一起,形成机器学习的工作流,并获得结果输出。

工作流如何工作

要构建一个 Pipeline工作流,首先需要定义 Pipeline 中的各个工作流阶段PipelineStage,(包括转换器和评估器),比如指标提取和转换模型训练等。有了这些处理特定问题的转换器和 评估器,就可以按照具体的处理逻辑有序的组织PipelineStages 并创建一个Pipeline。比如:

val pipeline = new Pipeline().setStages(Array(stage1,stage2,stage3,…))

然后就可以把训练数据集作为输入参数,调用 Pipeline 实例的 fit 方法来开始以流的方式来处理源训练数据。这个调用会返回一个 PipelineModel 类实例,进而被用来预测测试数据的标签。更具体的说,工作流的各个阶段按顺序运行,输入的DataFrame在它通过每个阶段时被转换。 对于Transformer阶段,在DataFrame上调用transform()方法。 对于估计器阶段,调用fit()方法来生成一个转换器(它成为PipelineModel的一部分或拟合的Pipeline),并且在DataFrame上调用该转换器的transform()方法。


ml-pipeline

上面,顶行表示具有三个阶段的流水线。 前两个(Tokenizer和HashingTF)是Transformers(蓝色),第三个(LogisticRegression)是Estimator(红色)。 底行表示流经管线的数据,其中圆柱表示DataFrames。 在原始DataFrame上调用Pipeline.fit()方法,它具有原始文本文档和标签。 Tokenizer.transform()方法将原始文本文档拆分为单词,向DataFrame添加一个带有单词的新列。 HashingTF.transform()方法将字列转换为特征向量,向这些向量添加一个新列到DataFrame。 现在,由于LogisticRegression是一个Estimator,Pipeline首先调用LogisticRegression.fit()产生一个LogisticRegressionModel。 如果流水线有更多的阶段,则在将DataFrame传递到下一个阶段之前,将在DataFrame上调用LogisticRegressionModel的transform()方法。

SparkSession是spark2.0的全新切入点,用以替代 sparkcontext ,StreamingContext,sqlContext,HiveContext。

直接隐式反馈。

al Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))

val alsImplicit = new ALS().setMaxIter(5).setRegParam(0.01).setImplicitPrefs(true). 
    setUserCol("userId").setItemCol("movieId").setRatingCol("rating")

​ 在 ML 中的实现有如下的参数:

  • numBlocks 是用于并行化计算的用户和商品的分块个数 (默认为10)。
  • rank 是模型中隐义因子的个数(默认为10)。
  • maxIter 是迭代的次数(默认为10)。
  • regParam 是ALS的正则化参数(默认为1.0)。
  • implicitPrefs 决定了是用显性反馈ALS的版本还是用适用隐性反馈数据集的版本(默认是false,即用显性反馈)。
  • alpha 是一个针对于隐性反馈 ALS 版本的参数,这个参数决定了偏好行为强度的基准(默认为1.0)。
  • nonnegative 决定是否对最小二乘法使用非负的限制(默认为false)。

可以调整这些参数,不断优化结果,使均方差变小。比如:maxIter越大,regParam越 小,均方差会越小,推荐结果较优。

  • userCol:用户列的名字,String类型。对应于后续调用fit()操作时输入的- Dataset<Row>入参时用户id所在schema中的name
  • itemCol:item列的名字,String类型。对应于后续调用fit()操作时输入的Dataset<Row>入参时item id所在schema中的name
  • ratingCol:rating列的名字,String类型。对应于后续调用fit()操作时输入的Dataset<Row>入参时rating值所在schema中的name
  • predictionCol:String类型。做transform()操作时输出的预测值在Dataset<Row>结果的schema中的name,默认是“prediction”
  • coldStartStrategy:String类型。有两个取值"nan" or "drop"。这个参数指示用在prediction阶段时遇到未知或者新加入的user或item时的处理策略。尤其是在交叉验证或者生产场景中,遇到没有在训练集中出现的user/item id时。"nan"表示对于未知id的prediction结果为NaN。"drop"表示对于transform()的入参DataFrame中出现未知ids的行,将会在包含prediction的返回DataFrame中被drop。默认值是"nan"

接下来,把推荐模型放在训练数据上训练:
val modelImplicit = alsImplicit.fit(training)

使用训练好的推荐模型对测试集中的用户商品进行预测评分,得到预测评分的数据集:
val predictionsImplicit = modelImplicit.transform(test)

结果:

predictionsImplicit.show()
 
+------+-------+------+----------+-----------+
|userId|movieId|rating| timestamp| prediction|
+------+-------+------+----------+-----------+
|    13|     31|   1.0|1424380312| 0.33150947|
|     5|     31|   1.0|1424380312|-0.24669354|
|    24|     31|   1.0|1424380312|-0.22434244|
|    29|     31|   1.0|1424380312| 0.15776125|
|     0|     31|   1.0|1424380312| 0.51940984|
|    28|     85|   1.0|1424380312| 0.88610375|
|    13|     85|   1.0|1424380312| 0.15872183|
|    20|     85|   2.0|1424380312| 0.64086926|
|     4|     85|   1.0|1424380312|-0.06314563|
|     8|     85|   5.0|1424380312|  0.2783457|
|     7|     85|   4.0|1424380312|  0.1618208|
|    29|     85|   1.0|1424380312|-0.19970453|
|    19|     65|   1.0|1424380312| 0.11606887|
|     4|     65|   1.0|1424380312|0.068018675|
|     2|     65|   1.0|1424380312| 0.28533924|
|    12|     53|   1.0|1424380312| 0.42327875|
|    20|     53|   3.0|1424380312| 0.17345423|
|    19|     53|   2.0|1424380312| 0.33321634|
|     8|     53|   5.0|1424380312| 0.10090684|
|    23|     53|   1.0|1424380312| 0.06724724|
+------+-------+------+----------+-----------+
only showing top 20 rows                   
模型评估

​ 通过计算模型的均方根误差来对模型进行评估,均方根误差越小,模型越准确:

val evaluator = new RegressionEvaluator().setMetricName("rmse").setLabelCol("rating"). 
    setPredictionCol("prediction")

val rmseImplicit = evaluator.evaluate(predictionsImplicit)
//  rmseImplicit: Double = 1.8011620822359165

可以看到打分的均方差值为1.69和1.80左右。由于本例的数据量很少,预测的结果和实际相比有一定的差距。

使用推荐系统

ALS fit 方法返回 ALSModel,其有 recommendForAllUsers(numItems: Int) 和 recommendForAllItems(numUsers: Int) 方法,用于推荐。

例子
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS

case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
def parseRating(str: String): Rating = {
  val fields = str.split("::")
  assert(fields.size == 4)
  Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
}

val ratings = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
  .map(parseRating)
  .toDF()
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))

// Build the recommendation model using ALS on the training data
val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
val model = als.fit(training)

// Evaluate the model by computing the RMSE on the test data
// Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
model.setColdStartStrategy("drop")
val predictions = model.transform(test)

val evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")

// Generate top 10 movie recommendations for each user
val userRecs = model.recommendForAllUsers(10)
// Generate top 10 user recommendations for each movie
val movieRecs = model.recommendForAllItems(10)

// Generate top 10 movie recommendations for a specified set of users
val users = ratings.select(als.getUserCol).distinct().limit(3)
val userSubsetRecs = model.recommendForUserSubset(users, 10)
// Generate top 10 user recommendations for a specified set of movies
val movies = ratings.select(als.getItemCol).distinct().limit(3)
val movieSubSetRecs = model.recommendForItemSubset(movies, 10)
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,194评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,058评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,780评论 0 346
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,388评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,430评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,764评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,907评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,679评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,122评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,459评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,605评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,270评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,867评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,734评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,961评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,297评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,472评论 2 348

推荐阅读更多精彩内容