Spark 机器学习库从 1.2 版本以后被分为两个包:
-
spark.mllib
包含基于RDD的原始算法API。Spark MLlib 历史比较长,在1.0 以前的版本即已经包含了,提供的算法实现都是基于原始的 RDD。 -
spark.ml
则提供了基于DataFrames 高层次的API,可以用来构建机器学习工作流(PipeLine)。ML Pipeline 弥补了原始 MLlib 库的不足,向用户提供了一个基于 DataFrame 的机器学习工作流式 API 套件。
从Spark2.0开始,基于RDD的API进入维护模式(即不增加任何新的特性),并预期于3.0版本的时候被移除出MLLib。因此,我们将以ml包为主进行介绍。
一个典型的机器学习过程从数据收集开始,要经历多个步骤,才能得到需要的输出。这非常类似于流水线式工作,即通常会包含源数据ETL(抽取、转化、加载),数据预处理,指标提取,模型训练与交叉验证,新数据预测等步骤。
在介绍工作流之前,我们先来了解几个重要概念:
DataFrame:使用Spark SQL中的DataFrame作为数据集,它可以容纳各种数据类型。 较之 RDD,包含了 schema 信息,更类似传统数据库中的二维表格。它被 ML Pipeline 用来存储源数据。例如,DataFrame中的列可以是存储的文本,特征向量,真实标签和预测的标签等。
Transformer:翻译成转换器,是一种可以将一个DataFrame转换为另一个DataFrame的算法。比如一个模型就是一个 Transformer。它可以把 一个不包含预测标签的测试数据集 DataFrame 打上标签,转化成另一个包含预测标签的 DataFrame。技术上,Transformer实现了一个方法transform(),它通过附加一个或多个列将一个DataFrame转换为另一个DataFrame。
Estimator:翻译成估计器或评估器,它是学习算法或在训练数据上的训练方法的概念抽象。在 Pipeline 里通常是被用来操作 DataFrame 数据并生产一个 Transformer。从技术上讲,Estimator实现了一个方法fit(),它接受一个DataFrame并产生一个转换器。如一个随机森林算法就是一个 Estimator,它可以调用fit(),通过训练特征数据而得到一个随机森林模型。
Parameter:Parameter 被用来设置 Transformer 或者 Estimator 的参数。现在,所有转换器和估计器可共享用于指定参数的公共API。ParamMap是一组(参数,值)对。
PipeLine:翻译为工作流或者管道。工作流将多个工作流阶段(转换器和估计器)连接在一起,形成机器学习的工作流,并获得结果输出。
工作流如何工作
要构建一个 Pipeline工作流,首先需要定义 Pipeline 中的各个工作流阶段PipelineStage,(包括转换器和评估器),比如指标提取和转换模型训练等。有了这些处理特定问题的转换器和 评估器,就可以按照具体的处理逻辑有序的组织PipelineStages 并创建一个Pipeline。比如:
val pipeline = new Pipeline().setStages(Array(stage1,stage2,stage3,…))
然后就可以把训练数据集作为输入参数,调用 Pipeline 实例的 fit 方法来开始以流的方式来处理源训练数据。这个调用会返回一个 PipelineModel 类实例,进而被用来预测测试数据的标签。更具体的说,工作流的各个阶段按顺序运行,输入的DataFrame在它通过每个阶段时被转换。 对于Transformer阶段,在DataFrame上调用transform()方法。 对于估计器阶段,调用fit()方法来生成一个转换器(它成为PipelineModel的一部分或拟合的Pipeline),并且在DataFrame上调用该转换器的transform()方法。
上面,顶行表示具有三个阶段的流水线。 前两个(Tokenizer和HashingTF)是Transformers(蓝色),第三个(LogisticRegression)是Estimator(红色)。 底行表示流经管线的数据,其中圆柱表示DataFrames。 在原始DataFrame上调用Pipeline.fit()方法,它具有原始文本文档和标签。 Tokenizer.transform()方法将原始文本文档拆分为单词,向DataFrame添加一个带有单词的新列。 HashingTF.transform()方法将字列转换为特征向量,向这些向量添加一个新列到DataFrame。 现在,由于LogisticRegression是一个Estimator,Pipeline首先调用LogisticRegression.fit()产生一个LogisticRegressionModel。 如果流水线有更多的阶段,则在将DataFrame传递到下一个阶段之前,将在DataFrame上调用LogisticRegressionModel的transform()方法。
SparkSession是spark2.0的全新切入点,用以替代 sparkcontext ,StreamingContext,sqlContext,HiveContext。
直接隐式反馈。
al Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))
val alsImplicit = new ALS().setMaxIter(5).setRegParam(0.01).setImplicitPrefs(true).
setUserCol("userId").setItemCol("movieId").setRatingCol("rating")
在 ML 中的实现有如下的参数:
- numBlocks 是用于并行化计算的用户和商品的分块个数 (默认为10)。
- rank 是模型中隐义因子的个数(默认为10)。
- maxIter 是迭代的次数(默认为10)。
- regParam 是ALS的正则化参数(默认为1.0)。
- implicitPrefs 决定了是用显性反馈ALS的版本还是用适用隐性反馈数据集的版本(默认是false,即用显性反馈)。
- alpha 是一个针对于隐性反馈 ALS 版本的参数,这个参数决定了偏好行为强度的基准(默认为1.0)。
- nonnegative 决定是否对最小二乘法使用非负的限制(默认为false)。
可以调整这些参数,不断优化结果,使均方差变小。比如:maxIter越大,regParam越 小,均方差会越小,推荐结果较优。
- userCol:用户列的名字,String类型。对应于后续调用fit()操作时输入的- Dataset<Row>入参时用户id所在schema中的name
- itemCol:item列的名字,String类型。对应于后续调用fit()操作时输入的Dataset<Row>入参时item id所在schema中的name
- ratingCol:rating列的名字,String类型。对应于后续调用fit()操作时输入的Dataset<Row>入参时rating值所在schema中的name
- predictionCol:String类型。做transform()操作时输出的预测值在Dataset<Row>结果的schema中的name,默认是“prediction”
- coldStartStrategy:String类型。有两个取值"nan" or "drop"。这个参数指示用在prediction阶段时遇到未知或者新加入的user或item时的处理策略。尤其是在交叉验证或者生产场景中,遇到没有在训练集中出现的user/item id时。"nan"表示对于未知id的prediction结果为NaN。"drop"表示对于transform()的入参DataFrame中出现未知ids的行,将会在包含prediction的返回DataFrame中被drop。默认值是"nan"
接下来,把推荐模型放在训练数据上训练:
val modelImplicit = alsImplicit.fit(training)
使用训练好的推荐模型对测试集中的用户商品进行预测评分,得到预测评分的数据集:
val predictionsImplicit = modelImplicit.transform(test)
结果:
predictionsImplicit.show()
+------+-------+------+----------+-----------+
|userId|movieId|rating| timestamp| prediction|
+------+-------+------+----------+-----------+
| 13| 31| 1.0|1424380312| 0.33150947|
| 5| 31| 1.0|1424380312|-0.24669354|
| 24| 31| 1.0|1424380312|-0.22434244|
| 29| 31| 1.0|1424380312| 0.15776125|
| 0| 31| 1.0|1424380312| 0.51940984|
| 28| 85| 1.0|1424380312| 0.88610375|
| 13| 85| 1.0|1424380312| 0.15872183|
| 20| 85| 2.0|1424380312| 0.64086926|
| 4| 85| 1.0|1424380312|-0.06314563|
| 8| 85| 5.0|1424380312| 0.2783457|
| 7| 85| 4.0|1424380312| 0.1618208|
| 29| 85| 1.0|1424380312|-0.19970453|
| 19| 65| 1.0|1424380312| 0.11606887|
| 4| 65| 1.0|1424380312|0.068018675|
| 2| 65| 1.0|1424380312| 0.28533924|
| 12| 53| 1.0|1424380312| 0.42327875|
| 20| 53| 3.0|1424380312| 0.17345423|
| 19| 53| 2.0|1424380312| 0.33321634|
| 8| 53| 5.0|1424380312| 0.10090684|
| 23| 53| 1.0|1424380312| 0.06724724|
+------+-------+------+----------+-----------+
only showing top 20 rows
模型评估
通过计算模型的均方根误差来对模型进行评估,均方根误差越小,模型越准确:
val evaluator = new RegressionEvaluator().setMetricName("rmse").setLabelCol("rating").
setPredictionCol("prediction")
val rmseImplicit = evaluator.evaluate(predictionsImplicit)
// rmseImplicit: Double = 1.8011620822359165
可以看到打分的均方差值为1.69和1.80左右。由于本例的数据量很少,预测的结果和实际相比有一定的差距。
使用推荐系统
ALS fit 方法返回 ALSModel,其有 recommendForAllUsers(numItems: Int) 和 recommendForAllItems(numUsers: Int) 方法,用于推荐。
例子
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
def parseRating(str: String): Rating = {
val fields = str.split("::")
assert(fields.size == 4)
Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
}
val ratings = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
.map(parseRating)
.toDF()
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))
// Build the recommendation model using ALS on the training data
val als = new ALS()
.setMaxIter(5)
.setRegParam(0.01)
.setUserCol("userId")
.setItemCol("movieId")
.setRatingCol("rating")
val model = als.fit(training)
// Evaluate the model by computing the RMSE on the test data
// Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
model.setColdStartStrategy("drop")
val predictions = model.transform(test)
val evaluator = new RegressionEvaluator()
.setMetricName("rmse")
.setLabelCol("rating")
.setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")
// Generate top 10 movie recommendations for each user
val userRecs = model.recommendForAllUsers(10)
// Generate top 10 user recommendations for each movie
val movieRecs = model.recommendForAllItems(10)
// Generate top 10 movie recommendations for a specified set of users
val users = ratings.select(als.getUserCol).distinct().limit(3)
val userSubsetRecs = model.recommendForUserSubset(users, 10)
// Generate top 10 user recommendations for a specified set of movies
val movies = ratings.select(als.getItemCol).distinct().limit(3)
val movieSubSetRecs = model.recommendForItemSubset(movies, 10)