文本分类是指将一篇文章归到事先定义好的某一类或者某几类,互联网时代到来,数据以指数级增长,自媒体的兴起,让文本的增长更是突飞猛进,文档作为一种非结构化的数据(MySQL 中存放的是结构化数据),对于它的分析本来就存在一定的难度,再加上数据量的猛增,让原本 Python 的单机机器学习也压力倍增,显得力不从心。。
本文介绍使用Spark MLlib提供的朴素贝叶斯(Naive Bayes)及随机森林算法,完成对中文文本的分类过程。主要包括中文分词、文本向量化表示(TF-IDF、word2vec)、模型训练、分类预测等。
中文分词
对于中文文本分类而言,需要先对文章进行分词,我使用的是Hanlp中文分析工具
<dependency>
<groupId>com.hankcs</groupId>
<artifactId>hanlp</artifactId>
<version>portable-1.8.1</version>
</dependency>
中文特征向量化处理
对文本特征处理,即文本向量化的过程。常用的特征处理方法有:
TF-IDF 从字面意思来看分为 TF 和 IDF,TF 的意思是 Term Frequency,也就是词在文章中出现的频率,可以简单的认为是:一个词在文章中出现的频率越高,代表这个词越重要。比如:“坦克”这个词在军事类文章中出现了很多次,那么这个词对这类文章就会很重要,可能经济类的文章也会偶尔出现“坦克”,但肯定不会出现很多,那么这个词对经济类文章相对而言就不是那么重要。
IDF 的意思是 Inverse Document Frequency,也就是逆文本频率,可以认为是:一些词在一类文章中出现很多,如“坦克”,但在其他经济、政治类文章中很少出现,那么这个词就具有很好的分类能力,但相反,一些词在很多文章中都出现,如“有的”、“我们”等,它们虽然在很多文章中都出现了,但并没有很好的分类的能力,这个时候逆词频就发挥作用了,你出现的越多,你的比重反而下降了。
- TF-IDF 的基本思想是:
一个词的重要性随着它在文件中出现的次数成正比增加,但同时会随着它包含的文档数成反比下降。
分好词后,每一个词都作为一个特征,但需要将中文词语转换成Double型来表示,通常使用该词语的TF-IDF值作为特征值,Spark提供了全面的特征抽取及转换的API,非常方便,详见:TF-IDF的API
- 用以下数据举例
0,苹果 官网 苹果 宣布
1,苹果 梨 香蕉
举个例子,“苹果”在 1 篇文章共 1000 个词中总共出现了 10 次,那么“苹果”的 TF 就是 10/1000 = 0.01,“苹果”在 10000 篇文章中只在 10 篇里面出现过,那么“苹果”的 IDF 就是lg(10000/10) = 3,那么“苹果”的 TF-IDF 值就是 0.01*3 = 0.03。
TFIDF特征处理如下:
case class RawDataRecord(category: String, text: String)
//将原始数据映射到DataFrame中,字段category为分类编号,字段text为分好的词,以空格分隔
srcDF.select("category", "text").take(2).foreach(println)
[0,苹果 官网 苹果 宣布]
[1,苹果 梨 香蕉]
//将分好的词转换为数组
var tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
var wordsData = tokenizer.transform(srcDF)
wordsData.select($"category",$"text",$"words").take(2).foreach(println)
[0,苹果 官网 苹果 宣布,WrappedArray(苹果, 官网, 苹果, 宣布)]
[1,苹果 梨 香蕉,WrappedArray(苹果, 梨, 香蕉)]
//将每个词转换成Int型,并计算其在文档中的词频(TF)
var hashingTF =
new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(100)
var featurizedData = hashingTF.transform(wordsData)
这里将中文词语转换成INT型的Hashing算法,类似于Bloomfilter,上面的setNumFeatures(100)表示将Hash分桶的数量设置为100个,这个值默认为2的20次方,即1048576,可以根据你的词语数量来调整,一般来说,这个值越大,不同的词被计算为一个Hash值的概率就越小,数据也更准确,但需要消耗更大的内存,和Bloomfilter是一个道理。
featurizedData.select($"category", $"words", $"rawFeatures").take(2).foreach(println)
[0,WrappedArray(苹果, 官网, 苹果, 宣布),(100,[23,81,96],[2.0,1.0,1.0])]
[1,WrappedArray(苹果, 梨, 香蕉),(100,[23,72,92],[1.0,1.0,1.0])]
结果中,“苹果”用23来表示,第一个文档中,词频为2,第二个文档中词频为1.
//计算TF-IDF值
var idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
var idfModel = idf.fit(featurizedData)
var rescaledData = idfModel.transform(featurizedData)
rescaledData.select($"category", $"words", $"features").take(2).foreach(println)
[0,WrappedArray(苹果, 官网, 苹果, 宣布),(100,[23,81,96],[0.0,0.4054651081081644,0.4054651081081644])]
[1,WrappedArray(苹果, 梨, 香蕉),(100,[23,72,92],[0.0,0.4054651081081644,0.4054651081081644])]
//因为一共只有两个文档,且都出现了“苹果”,因此该词的TF-IDF值为0.
特征转换
最后将上述数据转换为Bayes输入格式
var trainDataRdd = rescaledData.select($"category",$"features").map {
case Row(label: String, features: Vector) =>
LabeledPoint(label.toDouble, Vectors.dense(features.toArray))
}
每一个LabeledPoint中,特征数组的长度为100(setNumFeatures(100)),”官网”和”宣布”对应的特征索引号分别为81和96,因此,在特征数组中,第81位和第96位分别为它们的TF-IDF值。
模型训练
数据准备好了,接下来进行模型训练及分类预测,代码:
%spark
import org.apache.spark.ml.attribute.Attribute
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}
import com.hankcs.hanlp.HanLP;
import scala.collection.JavaConversions._
import com.hankcs.hanlp.seg.common.Term;
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.tuning.{TrainValidationSplit, CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.classification.{RandomForestClassifier, GBTClassifier}
// import spark.implicits._
val df = spark.read.option("header",true).csv("/data/stat/recommend/ireg2/2021-05-12.csv")
case class wordFearture(category:String, wordsFearture:String)
val wordsSet = df.where("category_id is not null and name<>'会员免费'").rdd.map(row=>{
// try {
// print(row)
var words = ""
val ls = HanLP.segment(row.getAs("title").toString())
for(item <- ls){
if(item.word.length>=1 && !item.word.startsWith("%")){
words = words + item.word + " "
}
}
wordFearture(row.getAs[String]("name") , words)
// }catch {
// //如果解析报错赋予空值
// case e:Exception=> print(e)
// }
})
val wordsDF = spark.createDataFrame(wordsSet)
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("label")
.fit(wordsDF)
//val indexed = indexer.transform(wordsDF)
val tokenizer = new Tokenizer().setInputCol("wordsFearture").setOutputCol("words")
//TF IDF
val hashingTF = new HashingTF()
.setInputCol("words").setOutputCol("rawFeatures")
//将上一步的 TF 计算 IDF
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val VECTOR_SIZE = 512
//word2vec
val word2Vec = new Word2Vec()
.setInputCol("words")
.setOutputCol("features")
.setVectorSize(VECTOR_SIZE)
.setMinCount(1)
val nb = new NaiveBayes()
val layers = Array[Int](VECTOR_SIZE,6,5,indexer.labels.size)
// val md = new MultilayerPerceptronClassifier().setLayers(layers).setBlockSize(512).setSeed(1234L).setMaxIter(128).setFeaturesCol("features").setPredictionCol("prediction")
val md = new RandomForestClassifier()
.setLabelCol("label")
.setFeaturesCol("features")
.setNumTrees(20)
.setMaxDepth(5)
val converter = new IndexToString()
.setInputCol("prediction")
.setOutputCol("predictionName")
.setLabels(indexer.labels)
//贝叶斯分类
//val pipeline = new Pipeline().setStages(Array(indexer, tokenizer, hashingTF, idf, nb, converter))
//随机森林分类树
val pipeline = new Pipeline().setStages(Array(indexer, tokenizer, word2Vec, md, converter))
//网格参数使得超参数调优更加的方便,只需要在网格中加入可能的参数
val paramGrid = new ParamGridBuilder()
.addGrid(nb.smoothing, Array(0.5, 1,1.5))
.build()
//将所有的步骤加入到 TrainValidationSplit 中,包括 训练器、评估方法、模型的网格参数、并行度等
// val cv = new TrainValidationSplit()
// .setEstimator(pipeline)
// .setEvaluator(new MulticlassClassificationEvaluator)
// .setEstimatorParamMaps(paramGrid)
// .setTrainRatio(0.7)
// .setParallelism(2)
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(new MulticlassClassificationEvaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(5) // Use 3+ in practice
.setParallelism(2) // Evaluate up to 2 parameter settings in parallel
val Array(training, test) = wordsDF.randomSplit(Array(0.8, 0.2), seed = 12345)
// val model = cv.fit(training)
val model = pipeline.fit(training)
val predictions = model.transform(test)
//评估模型
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test set accuracy = $accuracy")
import org.apache.spark.ml.functions.vector_to_array
import org.apache.spark.sql.functions._
predictions.select($"category", $"predictionName",round(element_at(vector_to_array($"probability"),1),4)).show
-
效果展示