使用numpy实现神经网络
使用numpy实现MLP(多层感知机)
手写神经网络
手写反向传播
手写梯度下降、随机梯度下降
代码都在https://github.com/wushangbin/tripping/blob/master/Python/MLP_with_numpy.py
看了GitHub代码,别忘了star哦!
1 前向传播
1.1 前向传播理论
先看前向传播过程,还是很简单的。
这里:
设这一层输出的维度为, 这也是本层的结点个数,而上一层输出维度为
一次性输入网络的样本个数为 (即batch_size的大小)
:上一层的输出,如果上一层是输入层,那就是x. 它的shape:(, )
: 权重矩阵,在训练中调整,它的shape:(, )
: 偏置,shape:(, 1)
: 这一层的输出(未经激活函数),shape: (, )
最后这个输出要经过激活函数变为,即:
其中, 为激活函数,可以是sigmoid、tanh、relu等等。
1.2 前向传播代码
在前向传播前,首先要把参数进行随机初始化:
import numpy as np
class MLPnet:
def __init__(self, x, y, lr=0.005):
"""
:param x: data
:param y: labels
:param lr: learning rate
yh: predicted labels
"""
self.X = x
self.Y = y
self.yh = np.zeros((1, self.Y.shape[1]))
self.lr = lr
self.dims = [12, 20, 1] # 不同层的结点个数
self.param = {} # 需要训练的参数
self.ch = {} # 将一些结果存在这里,以便在反向传播时使用
self.loss = [] # 存放每个epoch的loss
self.batch_size = 64
def nInit(self):
"""对神经网络中的参数进行随机初始化"""
np.random.seed(1)
self.param['theta1'] = np.random.randn(self.dims[1], self.dims[0]) / np.sqrt(self.dims[0])
self.param['b1'] = np.zeros((self.dims[1], 1))
self.param['theta2'] = np.random.randn(self.dims[2], self.dims[1]) / np.sqrt(self.dims[1])
self.param['b2'] = np.zeros((self.dims[2], 1))
然后需要注意的是,我们在前向传播中,需要实现激活函数,这里,我们实现Relu和Tanh:
Relu:
Tanh:
这个实现起来还是很简单的:
def Relu(self, u):
return np.maximum(0, u)
def Tanh(self, u):
return (np.exp(u) - np.exp(-u)) / (np.exp(u) + np.exp(-u))
这里两个函数的输入u实际上可以是任意维度的
然后就是前向传播的整体过程了,实际上就是把之前的乘法和加法用代码实现就好:
def forward(self, x):
if self.param == {}:
self.nInit()
u1 = np.matmul(self.param['theta1'], x) + self.param['b1']
o1 = self.Tanh(u1)
u2 = np.matmul(self.param['theta2'], o1) + self.param['b2']
o2 = self.Relu(u2)
self.ch['X'] = x
self.ch['u1'], self.ch['o1'] = u1, o1
self.ch['u2'], self.ch['o2'] = u2, o2
return o2
要注意,这里我们要把进行保存,是为了在反向传播时使用
2 反向传播
2.1 损失函数
在前向传播之后,模型实际上就已经输出了一个预测结果,当然了,这个预测结果可能并不好,我们需要根据预测结果调整模型中参数的权重。
所以这里需要一个损失函数,来评价我们的预测结果,损失函数的值越小,说明我们的预测结果与真实值差距越小,在这里,我们使用的损失函数是MSE(Mean Square Error)
其中
: 真实值
: 预测值
这里应注意,我们y和yh的形状,都是2维的,所以在计算的时候,应该y[0][i] - yh[0][i]
而且,因为我们y的形状是(1, n) 所以我们不能用len(y)来表示样本数量,要把样本数量显示地提取出来
代码:
def nloss(self,y, yh):
"""
:param y: 1*n, 列表
:param yh: 1*N, 列表
:return: 1*1 标量
"""
n = y.shape[1]
error = []
squaredError = []
for i in range(n):
error.append(y[0][i] - yh[0][i])
for val in error:
squaredError.append(val * val)
result = sum(squaredError) / (2 * n)
return result
2.2 反向传播理论
我们在计算出loss之后,下一步就是根据loss调整模型中的参数
具体怎么调整呢?
这个原理其实很简单:我们的输入是固定的,像, 这样的参数是可训练的(可变的),因此我们现在就可以得到一个这样的函数,函数的输出值,取决于参数:
我们把整个网络看作一个函数, 其中的所有的参数表示为,那么接下来,我们就是要找到这个函数的最小值(的最小值),以及Loss取最小值时的的值。
在线性回归中,我们可以直接通过数学方法直接求出最优解,但是现在模型复杂,我们要采用梯度下降法。
在梯度下降法中,我们需要知道在取一个确定值时,对的导数值,这个导数值即为下降方向,也就是说,我们把w沿着这个方向调整,w就会离最小值的点更近。而学习率(learning rate)即为我们调整的幅度,如果调整幅度过大,可能会错过最小值的点。具体到我们模型中的4个参数上,参数更新公式分别是:
这其中,参数的值是随机初始化好的,是提前设定好的,需要计算4个偏导数的值。
那么我们如何求这4个偏导数呢?直接求比较困难,因为我们并不知道与其中每个参数的函数关系式,但是我们知道每一层的函数(),所以要根据链式法则:
不必被这一长串公式吓到,我们接下来求出其中等号右边的每一项,然后就可以算出等号左边的偏导数了
那么对于等号右边的每一项,其实都是很好求的,我们举个例子:
对于, 因为有
我们这里, = ,第二层的输出过了激活函数即为模型的输出结果,然后是真实值,注意这里没有求和,因为我这里的和都是矢量。那么就很好求了,它就是:
类似地,和的关系,就是一层激活函数的关系,所以它们之间的偏导数,就是对激活函数求偏导即可.
Tanh的偏导:
Relu的偏导:
因此,类似我们可得:
把这些式子带入链式法则,可求出对每个参数的偏导,然后再把偏导带入参数更新公式,就可以求出新的参数了。
2.3 反向传播代码
我们先把激活函数的导数写好
def dRelu(self, u):
"""
:param u: u of any dimension
:return: dRelu(u) """
u[u<=0] = 0
u[u>0] = 1
return u
def dTanh(self, u):
"""
:param u: u of any dimension
:return: dTanh(u)
"""
o = np.tanh(u)
return 1-o**2
在反向传播代码中,有一个细节,是需要注意的:
- 在中,其实除以n或者不除以n都没有很大区别,因为这里说到底是影响了学习率,不影响梯度下降的方向,如果你没有除以n,那么就需要在学习率上做出调整;
- 后面每一个操作其实有类似情况,比如你矩阵相乘后的结果,融合了所有样本的,那么是否需要除以n,再比如有对所有样本loss求和的操作,求和后是否需要除以n等等。我自己亲身实验了下,大部分影响不大,有公式的改动影响很大。这里我统一采用:一旦有合并所有样本的梯度的情况,就除以n
- 这里的yh-y不能搞反,不然梯度下降的方向就错了,会导致loss一致不下降
- 有的时候是点乘,有的时候是矩阵乘,这一点要注意,时时小心,关注每一步的维度变化
def backward(self, y, yh):
n = y.shape[1]
dLoss_o2 = (yh - y) / n
dLoss_u2 = dLoss_o2 * self.dRelu(self.ch['o2']) # (1,379)
dLoss_theta2 = np.matmul(dLoss_u2, self.ch['o1'].T) / n
dLoss_b2 = np.sum(dLoss_u2) / n
dLoss_o1 = np.matmul(self.param["theta2"].T, dLoss_u2) # (20*1) mul (1*379)
dLoss_u1 = dLoss_o1 * self.dTanh(self.ch['u1']) # (20*379)
dLoss_theta1 = np.matmul(dLoss_u1, self.X.T) # (20*379) mul (379*13)
dLoss_b1 = np.sum(dLoss_u1, axis=1, keepdims=True) / n
# parameters update:
self.param["theta2"] = self.param["theta2"] - self.lr * dLoss_theta2
self.param["b2"] = self.param["b2"] - self.lr * dLoss_b2
self.param["theta1"] = self.param["theta1"] - self.lr * dLoss_theta1
self.param["b1"] = self.param["b1"] - self.lr * dLoss_b1
return dLoss_theta2, dLoss_b2, dLoss_theta1, dLoss_b1
3 梯度下降
3.1 梯度下降算法
梯度下降这一块儿,比较简单,现在先说最基础的,我们把所有数据放入模型,跑出结果,然后根据结果调用backward函数,更新参数即可,这里我们每一次都跑全部的数据,然后设定一个参数iter即为迭代的次数。
先写一个函数,输入数据,输出预测结果(其实就是调用forward):
def predict(self, x):
yh = self.forward(x)
return yh
然后就是梯度下降:
def gradient_descent(self, x, y, iter=60000):
"""
每次跑全部数据,跑iter次,每2000次存储一次loss
:param x: data
:param y: labels
:param iter: 迭代次数
"""
for i in range(iter):
pre_y = self.predict(x)
this_loss = self.nloss(y, pre_y)
self.loss.append(this_loss)
if i % 2000 == 0:
print("Loss after iteration", i, ":", this_loss)
self.backward(y, pre_y)
3.2 批量梯度下降
刚刚的方法,每次都要放全部数据,计算量太大了,现在,我们每次放batch_size个数据。它的优点在于,下降速度更快,每次看少量数据就开始修改参数,缺点是loss波动性大,因为它每次没有看全部的数据,很可能这次看了一部分数据,改了参数,第二次看了另一部分数据,又把参数改回去了。因此有一定的波动性。
我们每次选取batch_size个数据,进行训练,然后再往后选batch_size个数据,这样就好。
def batch_gradient_descent(self, x, y, iter=60000):
"""
这里的迭代次数,依然是backward的次数,并非epoch(epoch是看全部数据的次数)
"""
n = y.shape[1]
begin = 0
for k in range(iter):
index_list = [i % n for i in range(begin, begin + self.batch_size)]
x_batch = x[:, index_list]
y_batch = y[:, index_list]
pre_y = self.predict(x_batch)
self.X = x_batch
this_loss = self.nloss(y_batch, pre_y)
if k % 1000 == 0:
self.loss.append(this_loss)
print("Loss after iteration", k, ":", this_loss)
self.backward(y_batch, pre_y)
begin = begin + self.batch_size
4 实验结果
4.1 梯度下降
使用boston数据集
from sklearn.datasets import load_boston
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
if __name__ == '__main__':
dataset = load_boston() # load the dataset
x, y = dataset.data, dataset.target
y = y.reshape(-1, 1)
x = MinMaxScaler().fit_transform(x) # normalize data
y = MinMaxScaler().fit_transform(y)
x_train, x_test, y_train, y_test = train_test_split(x, y, random_state=1) # split data
x_train, x_test, y_train, y_test = x_train.T, x_test.T, y_train.reshape(1, -1), y_test
nn = MLPnet(x_train, y_train, lr=0.001)
nn.gradient_descent(x_train, y_train, iter=60000) # train
# create figure
fig = plt.plot(np.array(nn.loss).squeeze())
plt.title(f'Training: MLPnet')
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.show()
Loss after iteration 0 : 0.08998720790406557
Loss after iteration 2000 : 0.07518079881255157
Loss after iteration 4000 : 0.04587554948861338
Loss after iteration 6000 : 0.03592926084683294
Loss after iteration 8000 : 0.03037128839345559
Loss after iteration 10000 : 0.02678080951732856
...
Loss after iteration 48000 : 0.010800052957060184
Loss after iteration 50000 : 0.010534368656369569
Loss after iteration 52000 : 0.010286303319926585
Loss after iteration 54000 : 0.010054283077500703
Loss after iteration 56000 : 0.009836923785315307
Loss after iteration 58000 : 0.009633001471168611
Mean Squared Error (MSE) 0.02512249554701871
4.2 批量梯度下降:
把刚刚代码中的gradient_descent改一下即可:
nn.batch_gradient_descent(x_train, y_train, iter = 60000) #train
实验结果:
Loss after iteration 0 : 0.08928835763543144
Loss after iteration 1000 : 0.06964017531640365
Loss after iteration 2000 : 0.08718731719795095
Loss after iteration 3000 : 0.07178114114739374
Loss after iteration 4000 : 0.03668939190963524
...
Loss after iteration 56000 : 0.008814925199710841
Loss after iteration 57000 : 0.006920864366920335
Loss after iteration 58000 : 0.005792226865048128
Loss after iteration 59000 : 0.012028778071099835
Mean Squared Error (MSE) 0.025001722467090818
4.3 实验结果分析
在训练中,可以明显感觉到
- 随机梯度下降更快,耗时更少(我没有计时,但是感觉很明显)
- 随机梯度下降地波动性更大
另外要注意,随机梯度下降,我们这里的每个iter的数据量远远小于梯度下降的全量数据
代码在https://github.com/wushangbin/tripping/blob/master/Python/MLP_with_numpy.py
看完代码有帮助的话记得star哦!