KNN 算法
k 近邻算法( kNN ):考察新记录周围距离最近的 k 条记录,而不是只看一条。
每个近邻都有投票权,程序会将新记录判定为得票数最多的分类。比如说,我们使用三个近邻( k = 3 ),其中两条记录属于体操,一条记录属于马拉松,那我们会判定 x 为体操。
因此,我们在判定一条记录的具体分类时可以用这种投票的方法。如果两个分类的票数相等,就随机选取一个。
但对于需要预测具体数值的情形,我们可以计算** k 个近邻的距离加权平均值**。
例:
在计算平均值的时候我们希望距离越近的用户影响越大,因此可以对距离取倒数,从而得到下表:
把所有的距离倒数除以距离倒数的和( 0.2 + 0.1 + 0.067 = 0.367 ),从而得到评分的权重:
求得加权平均值,即对新记录的预测值:
kNN算法实现
def knn(self, itemVector):
    """Classify itemVector as the majority class among its k nearest
    training records (Manhattan distance); ties are broken randomly."""
    # heapq.nsmallest extracts the k closest (distance, record) pairs.
    neighbors = heapq.nsmallest(
        self.k,
        [(self.manhattan(itemVector, item[1]), item) for item in self.data])
    # Each neighbor casts one vote for its own class.
    results = {}
    for neighbor in neighbors:
        theClass = neighbor[1][0]
        results.setdefault(theClass, 0)
        results[theClass] += 1
    # Sort (votes, class) pairs so the highest vote count comes first.
    resultList = sorted([(i[1], i[0]) for i in results.items()], reverse=True)
    # Collect every class that reached the top vote count.
    maxVotes = resultList[0][0]
    possibleAnswers = [i[1] for i in resultList if i[0] == maxVotes]
    # A tie between classes is resolved by a random pick.
    answer = random.choice(possibleAnswers)
    return answer
# -*- coding:utf-8 -*-
'''
Created on 2018年11月27日
@author: KingSley
'''
import heapq
import random
class Classifier:
    """k-nearest-neighbor classifier over bucketed, tab-separated data.

    Numeric columns are normalized with the modified standard score
    (value - median) / absolute standard deviation before classification.
    """

    def __init__(self, bucketPrefix, testBucketNumber, dataFormat, k):
        """Read training data from the files "<bucketPrefix>-01" ..
        "<bucketPrefix>-10", holding out bucket testBucketNumber as the
        test set.

        dataFormat is a tab-separated description of the columns, e.g.
        "class num num num num num comment".
        """
        self.medianAndDeviation = []
        self.k = k
        self.format = dataFormat.strip().split('\t')
        self.data = []
        # Buckets are numbered 1-10; every bucket except the held-out
        # test bucket contributes to the training set.
        for bucketNumber in range(1, 11):
            if bucketNumber == testBucketNumber:
                continue
            filename = "%s-%02i" % (bucketPrefix, bucketNumber)
            # `with` guarantees the file is closed even if reading fails.
            with open(filename) as f:
                lines = f.readlines()
            # The first line of each bucket file is skipped (header).
            for line in lines[1:]:
                fields = line.strip().split('\t')
                ignore = []
                vector = []
                for i in range(len(fields)):
                    if self.format[i] == 'num':
                        vector.append(float(fields[i]))
                    elif self.format[i] == 'comment':
                        ignore.append(fields[i])
                    elif self.format[i] == 'class':
                        classification = fields[i]
                self.data.append((classification, vector, ignore))
        self.rawData = list(self.data)
        # Length of the numeric feature vector.
        self.vlen = len(self.data[0][1])
        # Normalize every numeric column in place.
        for i in range(self.vlen):
            self.normalizeColumn(i)

    def getMedian(self, alist):
        """Return the median of alist.

        NOTE(review): returns [] (not a number) for an empty list --
        preserved for compatibility with existing callers.
        """
        if alist == []:
            return []
        blist = sorted(alist)
        length = len(alist)
        if length % 2 == 1:
            # Odd number of elements: the middle one.
            return blist[int(((length + 1) / 2) - 1)]
        else:
            # Even number of elements: mean of the two middle ones.
            v1 = blist[int(length / 2)]
            v2 = blist[(int(length / 2) - 1)]
            return (v1 + v2) / 2.0

    def getAbsoluteStandardDeviation(self, alist, median):
        """Return the mean absolute deviation of alist from median."""
        # Generator form also avoids shadowing the builtin `sum`.
        return sum(abs(item - median) for item in alist) / len(alist)

    def normalizeColumn(self, columnNumber):
        """Normalize column columnNumber of self.data in place and record
        its (median, asd) pair for later use by normalizeVector."""
        # Extract the column into a flat list.
        col = [v[1][columnNumber] for v in self.data]
        median = self.getMedian(col)
        asd = self.getAbsoluteStandardDeviation(col, median)
        self.medianAndDeviation.append((median, asd))
        for v in self.data:
            v[1][columnNumber] = (v[1][columnNumber] - median) / asd

    def normalizeVector(self, v):
        """Return a normalized copy of v using the stored per-column
        (median, asd) pairs; v itself is not modified."""
        vector = list(v)
        for i in range(len(vector)):
            (median, asd) = self.medianAndDeviation[i]
            vector[i] = (vector[i] - median) / asd
        return vector

    def testBucket(self, bucketPrefix, bucketNumber):
        """Classify every record in the file <bucketPrefix>-<bucketNumber>
        and return a confusion dict {realClass: {predictedClass: count}}."""
        filename = "%s-%02i" % (bucketPrefix, bucketNumber)
        totals = {}
        with open(filename) as f:
            lines = f.readlines()
        for line in lines:
            data = line.strip().split('\t')
            vector = []
            classInColumn = -1
            for i in range(len(self.format)):
                if self.format[i] == 'num':
                    vector.append(float(data[i]))
                elif self.format[i] == 'class':
                    classInColumn = i
            theRealClass = data[classInColumn]
            classifiedAs = self.classify(vector)
            totals.setdefault(theRealClass, {})
            totals[theRealClass].setdefault(classifiedAs, 0)
            totals[theRealClass][classifiedAs] += 1
        return totals

    def manhattan(self, vector1, vector2):
        """Return the Manhattan (L1) distance between two vectors."""
        return sum(abs(v1 - v2) for v1, v2 in zip(vector1, vector2))

    def nearestNeighbor(self, itemVector):
        """Return the (distance, record) pair of the closest record."""
        return min([(self.manhattan(itemVector, item[1]), item)
                    for item in self.data])

    def knn(self, itemVector):
        """Classify itemVector by majority vote among its k nearest
        training records; ties are broken randomly."""
        # heapq.nsmallest extracts the k closest (distance, record) pairs.
        neighbors = heapq.nsmallest(
            self.k,
            [(self.manhattan(itemVector, item[1]), item)
             for item in self.data])
        # Each neighbor casts one vote for its own class.
        results = {}
        for neighbor in neighbors:
            theClass = neighbor[1][0]
            results.setdefault(theClass, 0)
            results[theClass] += 1
        # Sort (votes, class) pairs so the top vote count comes first.
        resultList = sorted([(i[1], i[0]) for i in results.items()],
                            reverse=True)
        maxVotes = resultList[0][0]
        possibleAnswers = [i[1] for i in resultList if i[0] == maxVotes]
        # A tie between classes is resolved by a random pick.
        answer = random.choice(possibleAnswers)
        return answer

    def classify(self, itemVector):
        """Normalize itemVector and return its predicted class."""
        return self.knn(self.normalizeVector(itemVector))
def tenfold(bucketPrefix, dataFormat, k):
    """Run 10-fold cross-validation over the numbered buckets and print
    a confusion matrix plus the overall accuracy."""
    # Merge the per-bucket confusion counts into a single matrix.
    confusion = {}
    for bucket in range(1, 11):
        classifier = Classifier(bucketPrefix, bucket, dataFormat, k)
        bucketResult = classifier.testBucket(bucketPrefix, bucket)
        for actual, predictions in bucketResult.items():
            row = confusion.setdefault(actual, {})
            for predicted, count in predictions.items():
                row[predicted] = row.get(predicted, 0) + count
    # Render the matrix with one row/column per class, sorted by name.
    categories = sorted(confusion.keys())
    print( "\n Classified as: ")
    header = " "
    subheader = " +"
    for category in categories:
        header += category + " "
        subheader += "----+"
    print (header)
    print (subheader)
    total = 0.0
    correct = 0.0
    for actual in categories:
        rowText = actual + " |"
        for predicted in categories:
            count = confusion[actual].get(predicted, 0)
            rowText += " %2i |" % count
            total += count
            # Diagonal entries are the correctly classified records.
            if predicted == actual:
                correct += count
        print(rowText)
    print(subheader)
    print("\n%5.3f percent correct" %((correct * 100) / total))
    print("total of %i instances" % total)
# Run 10-fold cross-validation with k = 3 on both Pima data sets.
# NOTE(review): the original literals used backslashes ("pimaSmall\pimaSmall"),
# which triggers an invalid "\p" escape warning and fails to locate the files
# on non-Windows systems; forward slashes work on every platform.
print("SMALL DATA SET")
tenfold("pimaSmall/pimaSmall",
        "num\tnum\tnum\tnum\tnum\tnum\tnum\tnum\tclass", 3)

print("\n\nLARGE DATA SET")
tenfold("pima/pima",
        "num\tnum\tnum\tnum\tnum\tnum\tnum\tnum\tclass", 3)
参考原文作者:Ron Zacharski, CC BY-NC 3.0, https://github.com/egrcc/guidetodatamining