基于用户的协同过滤
基于用户的协同过滤又称为内存型协同过滤,需要将所有数据都保存在内存中进行计算;我们将一个用户和其他所有用户进行对比找到相似的人。这种算法有两个弊端:
1、扩展性 随着用户年年的增加,其计算量也会增加,这种算法在只有几千个用户的情况下能够工作的很好,但达到一百万个用户时就会出现瓶颈。
2、稀疏性 大多数推荐系统中,物品的数量要远大于用户的数量,因此用户仅仅对一小部分物品进行评价,这就造成了数据的稀疏性。
基于物品的协同过滤
基于物品的协同过滤又称为基于模型的协同过滤,不需要保存所有的数据,而是通过构建一个物品相似度模型来获取结果。
修正的余弦相似度
我们使用余弦相似度来计算两个物品的距离。考虑到“分数膨胀”现象,我们会从用户的评价中减去他所有评价的均值,获得修正的余弦相似度。
s(i, j) 表示物品 i 和 j 的相似度,分子表示将同时平见过物品 i 和 j 的用户的修正评分相乘评求和,分母这是对所有的物品的修正评分做一些汇总处理。
def computeSimilarity(band1, band2, userRatings):
averages = {}
for (key, ratings) in userRatings.items():
averages[key] = (float(sum(ratings.values())) / len(ratings.values()))
num = 0 # 分子
dem1 = 0 # 分母的第一部分
dem2 = 0
for (user, ratings) in userRatings.items():
if band1 in ratings and band2 in ratings:
avg = averages[user]
num += (ratings[band1] - avg) * (ratings[band2] - avg)
dem1 += (ratings[band1] - avg) ** 2
dem2 += (ratings[band2] - avg) ** 2
return num / (sqrt(dem1) * sqrt(dem2))
修正的余弦相似度范围扩展
MaxR 表示评分系统中的最高分, MinR 为最低分, Ru,N 是用户 u 对物品 N 的评分, NRu,N 则是表示修正后的评分(即范围在 -1 和 1 之间)
小结:
1、修正的余弦相似度是一种基于模型的协同过滤算法,这种算法的优势之一是扩展性好,对于大数据量而言,运算速度快、占用内存少。
2、用户的评价标准是不同的,修正的余弦近似度计算时会将用户对物品的评分减去用户所有评分的均值,而解决这个问题
# -*- coding:utf-8 -*-
"""Slope One 算法"""
import codecs
from math import sqrt
users2 = {"Amy": {"Taylor Swift": 4, "PSY": 3, "Whitney Houston": 4},
"Ben": {"Taylor Swift": 5, "PSY": 2},
"Clara": {"PSY": 3.5, "Whitney Houston": 4},
"Daisy": {"Taylor Swift": 5, "Whitney Houston": 3}}
users = {"Angelica": {"Blues Traveler": 3.5, "Broken Bells": 2.0,
"Norah Jones": 4.5, "Phoenix": 5.0,
"Slightly Stoopid": 1.5, "The Strokes": 2.5,
"Vampire Weekend": 2.0},
"Bill":{"Blues Traveler": 2.0, "Broken Bells": 3.5,
"Deadmau5": 4.0, "Phoenix": 2.0,
"Slightly Stoopid": 3.5, "Vampire Weekend": 3.0},
"Chan": {"Blues Traveler": 5.0, "Broken Bells": 1.0,
"Deadmau5": 1.0, "Norah Jones": 3.0,
"Phoenix": 5, "Slightly Stoopid": 1.0},
"Dan": {"Blues Traveler": 3.0, "Broken Bells": 4.0,
"Deadmau5": 4.5, "Phoenix": 3.0,
"Slightly Stoopid": 4.5, "The Strokes": 4.0,
"Vampire Weekend": 2.0},
"Hailey": {"Broken Bells": 4.0, "Deadmau5": 1.0,
"Norah Jones": 4.0, "The Strokes": 4.0,
"Vampire Weekend": 1.0},
"Jordyn": {"Broken Bells": 4.5, "Deadmau5": 4.0,
"Norah Jones": 5.0, "Phoenix": 5.0,
"Slightly Stoopid": 4.5, "The Strokes": 4.0,
"Vampire Weekend": 4.0},
"Sam": {"Blues Traveler": 5.0, "Broken Bells": 2.0,
"Norah Jones": 3.0, "Phoenix": 5.0,
"Slightly Stoopid": 4.0, "The Strokes": 5.0},
"Veronica": {"Blues Traveler": 3.0, "Norah Jones": 5.0,
"Phoenix": 4.0, "Slightly Stoopid": 2.5,
"The Strokes": 3.0}
}
class recommender:
def __init__(self, data, k=1, metric='pearson', n=5):
""" 初始化推荐模块
data 训练数据
k K 邻近算法中的值
metric 使用何种距离计算方式
n 推荐结果的数量
"""
self.k = k
self.n = n
self.username2id = {}
self.userid2name = {}
self.productid2name = {}
# 将距离计算方式保存下来
self.frequencies = {}
self.deviations = {}
self.metric = metric
if self.metric == 'pearson':
self.fn = self.pearson
# 如果 data 是一个字典类型,则保存下来,否则忽略
if type(data).__name__ == 'dict':
self.data = data
def convertProductID2name(self, id):
# 通过产品 ID 获取名称
if id in self.productid2name:
return self.productid2name[id]
else:
return id
def userRatings(self, id, n):
# 返回该用户评分最高的物品
print ("Ratings for " + self.userid2name[id])
ratings = self.data[id]
print(len(ratings))
ratings = list(ratings.items())[:n]
ratings = [(self.convertProductID2name(k), v) for (k, v) in ratings]
# 排序并返回结果
ratings.sort(key=lambda artistTuple: artistTuple[1], reverse = True)
for rating in ratings: print("%s\t%i" % (rating[0], rating[1]))
def showUserTopItems(self, user, n):
""""""
items = list(self.data[user].items())
items.sort(key=lambda itemTuple: itemTuple[1], reverse=True)
for i in range(n):
print("%s\t%i" % (self.convertProductID2name(items[i][0]), items[i][1]))
def loadMovieLens(self, path = ''):
self.data = {}
# 加载 Movie 数据集,path 是数据文件位置
i = 0
# 将数据评分数据放入 self.data
f = codecs.open(path + "u.data", 'r', 'ascii')
for line in f:
i += 1
#separate line into fields
fields = line.split('\t')
user = fields[0]
movie = fields[1]
rating = int(fields[2].strip().strip('"'))
if user in self.data:
currentRatings = self.data[user]
else:
currentRatings = {}
currentRatings[movie] = rating
self.data[user] = currentRatings
f.close()
# 将数据信息存入 self.productid2name
f = codecs.open("F:\\DataminingGuide\\chapter3\\ml-100k\\u.item", 'r', 'iso8859-1', 'ignore')
for line in f:
i += 1
#separate line into fields
fields = line.split('|')
mid = fields[0].strip()
title = fields[1].strip()
self.productid2name[mid] = title
f.close()
# 将用户信息存入 self.userid2name 和 self.username2id
f = open("F:\\DataminingGuide\\chapter3\\ml-100k\\u.user")
for line in f:
i += 1
fields = line.split('|')
userid = fields[0].strip('"')
self.userid2name[userid] = line
self.username2id[line] = userid
f.close()
print(i)
def loadBookDB(self, path = ''):
# 加载 BX 数据集,path 是数据文件位置
self.data = {}
i = 0
# 将数据评分数据放入 self.data
f = codecs.open(path + "u.data", 'r', 'utf8')
for line in f:
i += 1
#separate line into fields
fields = line.split(';')
user = fields[0].strip('"')
book = fields[1].strip('"')
rating = int(fields[2].strip().strip('"'))
if rating > 5:
print("EXCEEDING ", rating)
if user in self.data:
currentRatings = self.data[user]
else:
currentRatings = {}
currentRatings[book] = rating
self.data[user] = currentRatings
f.close()
# 将数据信息存入 self.productid2name
# 包括 isbn 号、书名、作者等
f = codecs.open(path + "\BX-Books.csv", 'r', 'utf8')
for line in f:
i += 1
#separate line into fields
fields = line.split(';')
isbn = fields[0].strip('"')
title = fields[1].strip('"')
author = fields[2].strip().strip('"')
title = title + ' by ' + author
self.productid2name[isbn] = title
f.close()
# 将用户信息存入 self.userid2name 和 self.username2id
f = codecs.open(path + "\BX-Users.csv", 'r', 'utf8')
for line in f:
i += 1
#separate line into fields
fields = line.split(';')
userid = fields[0].strip('"')
location = fields[1].strip('"')
if len(fields) > 3:
age = fields[2].strip().strip('"')
else:
age = 'NULL'
if age != 'NULL':
value = location + ' (age: ' + age + ')'
else:
value = location
self.userid2name[userid] = value
self.username2id[location] = userid
f.close()
print(i)
def computeDeviations(self):
"""获取美味用户的评分数据"""
for ratings in self.data.values():
# 对于该用户的每个评分项(歌手、分数)
for (item, rating) in ratings.items():
self.frequencies.setdefault(item, {})
self.deviations.setdefault(item, {})
# 再次遍历该用户的每个评分项
for (item2, rating2) in ratings.items():
if item != item2:
# 将评分的差异保存到变量中
self.frequencies[item].setdefault(item2, 0)
self.deviations[item].setdefault(item2, 0.0)
self.frequencies[item][item2] += 1
self.deviations[item][item2] += rating - rating2
for (item, ratings) in self.deviations.items():
for item2 in ratings:
ratings[item2] /= self.frequencies[item][item2]
def slopeOneRecommendations(self, userRatings):
recommendations = {}
frequencies = {}
# 遍历目标用户的评分项(歌手、分数)
for (userItem, userRating) in userRatings.items():
# 对目标用户未评价的歌手进行计算
for (diffItem, diffRatings) in self.deviations.items():
if diffItem not in userRatings and userItem in self.deviations[diffItem]:
freq = self.frequencies[diffItem][userItem]
recommendations.setdefault(diffItem, 0.0)
frequencies.setdefault(diffItem, 0)
# 分子
recommendations[diffItem] += (diffRatings[userItem] + userRating) * freq
# 分母
frequencies[diffItem] += freq
recommendations = [(self.convertProductID2name(k), v / frequencies[k])
for (k, v) in recommendations.items()]
# 排序并返回
recommendations.sort(key=lambda artistTuple: artistTuple[1], reverse = True)
return recommendations
def pearson(self, rating1, rating2):
sum_xy = 0
sum_x = 0
sum_y = 0
sum_x2 = 0
sum_y2 = 0
n = 0
for key in rating1:
if key in rating2:
n += 1
x = rating1[key]
y = rating2[key]
sum_xy += x * y
sum_x += x
sum_y += y
sum_x2 += pow(x, 2)
sum_y2 += pow(y, 2)
if n == 0:
return 0
# now compute denominator
denominator = (sqrt(sum_x2 - pow(sum_x, 2) / n) * sqrt(sum_y2 - pow(sum_y, 2) / n))
if denominator == 0:
return 0
else:
return (sum_xy - (sum_x * sum_y) / n) / denominator
def computeNearestNeighbor(self, username):
"""获取邻近用户"""
distances = []
for instance in self.data:
if instance != username:
distance = self.fn(self.data[username], self.data[instance])
distances.append((instance, distance))
# 按距离排序,距离近的排在前面
distances.sort(key=lambda artistTuple: artistTuple[1], reverse=True)
return distances
def recommend(self, user):
"""返回推荐列表"""
recommendations = {}
# 首先获取邻近用户
nearest = self.computeNearestNeighbor(user)
# 获取用户评价过的商品
userRatings = self.data[user]
# 计算总距离
totalDistance = 0.0
for i in range(self.k):
totalDistance += nearest[i][1]
# 汇总 k 邻近用户的评分
for i in range(self.k):
# 计算饼图的每个分片
weight = nearest[i][1] / totalDistance
# 获取用户名称
name = nearest[i][0]
# 获取用户评分
neighborRatings = self.data[name]
# 获取没有评价过的商品
for artist in neighborRatings:
if not artist in userRatings:
if artist not in recommendations:
recommendations[artist] = (neighborRatings[artist] * weight)
else:
recommendations[artist] = (recommendations[artist] + neighborRatings[artist] * weight)
# 开始推荐
recommendations = list(recommendations.items())
recommendations = [(self.convertProductID2name(k), v)
for (k, v) in recommendations]
# 排序并返回
recommendations.sort(key=lambda artistTuple: artistTuple[1], reverse=True)
# 返回前 n 个结果
return recommendations[:self.n]
r = recommender(0)
r.loadMovieLens('...')
r.computeDeviations()
print(r.slopeOneRecommendations(r.data['25']))
参考原文作者:Ron Zacharski CC BY-NC 3.0] https://github.com/egrcc/guidetodatamining