# -*- coding: utf-8 -*-
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.optim import lr_scheduler #设置动态调整率
from torch.autograd import Variable
#以上代码导入相应的函数模块和库文件
#准备训练文件
TRAIN_DATA_FILE = 'data/MFCC_train_new'
TEST_DATA_FILE = 'data/MFCC_train_new'
#准备相关参数
MINI_BATCH_SIZE = 50
NUM_ITERATIONS = 10000
PRINT_INTERAL = 10
AE_ITERATIONS = 20000
AE_LOSS = nn.MSELoss()
#构建一个神经网络模型
model = nn.Sequential(
nn.Linear(2340, 500),
nn.ReLU(inplace=True),
nn.Linear(500, 200),
nn.ReLU(inplace=True),
nn.Linear(200,10),
nn.ReLU(inplace=True)
)
#对数据文件进行预处理
def parse_file(file_name, shuffle=True):
data_raw = []
labels_raw = []
with open(file_name) as file:
lines = file.readlines() #读取数据,并且自动读取成行的形式
#以上代码是为了读取文件里的数据
#下面将对数据进行特征输入和标志输出分离,把他们分别放进不同的元组里
count = 0
for line in lines: #遍历数组中的每一行
line = line.strip().split(' ') #去除数据的每一行的左右两边的空格,并且用‘ ’隔开
if count == 0:
num_samples, dimensions, num_classes = [int(l) for l in line]
print('{:s} has {:d} samples, each of {:d}-dimensions, in {:d} classes'.format(file_name, num_samples, dimensions, num_classes))
elif count % 2 == 0:
row = [int(l) for l in line]
labels_raw.append(row.index(1)) #返回这行数据中1的序列数,即0-9
else:
row = [int(l) for l in line]
data_raw.append(row)
count += 1
data_array, labels_array = np.array(data_raw, dtype=np.float32),np.array(labels_raw,dtype=np.int64)
#下面为了避免数据本身序列对结果的影响,将数据的顺序打乱,使结果更有说服力
if shuffle:
indices = np.arange(data_array.shape[0]) #0一般表示取出数据的行数,indices表示从0-19999的数
np.random.shuffle(indices) #随机打乱数据的序数
data_array, labels_array = data_array[indices], labels_array[indices]
#把打乱后的数据重新送到数组里
data, labels = torch.from_numpy(data_array), torch.from_numpy(labels_array)
return data, labels
#下面定义一个数据下载函数
def load_data():
x_train, y_train = parse_file(TRAIN_DATA_FILE)
x_test, y_test = parse_file(TEST_DATA_FILE)
return x_train, y_train, x_test, y_test
#定义批量抽取函数
def sample_mini_batch(data, labels): #批量梯度下降法MBGD
indices = np.random.choice(data.size(0), MINI_BATCH_SIZE)
x_batch = data[indices, :]
y_batch = labels[indices]
return x_batch, y_batch
#定义精度计算函数
def evaluate_acc(model, x, y):
scores = model(Variable(x))
_, preds = scores.data.max(1) #表示scores.data.max(1)返回两个最大数的序列号,10个神经元对应0-9个数字号
#_,表示返回的第一个值不要
num_corrects = (preds == y).sum()
num_samples = scores.size(0) #取样本的行数,相当于取样本个数
return float(num_corrects) / num_samples
#下面训练一个自编码器
#首先定义一个自编码器的类
class AutoEncoder(nn.Module):
def __init__(self, input_dim, hidden_dim):
super(AutoEncoder, self).__init__()
#首先调用AutoEncoder的父类,即nn.Module,把AutoEncoder的对象self转换为nn.Module的对象
#然后nn.Module的对象调用自己的__init__函数
def forward(self, x):
h_relu = F.relu(self.encoder(x))
out = self.decoder(h_relu)
return out, h_relu
def get_encoder(self):
return self.encoder.state_dict()['weight'], self.encoder.state_dict()['bias']
#下面定义一个预训练网络模型
def pretrain_model(x_train):
auto_encoders = []
for layer in model:
if type(layer) is nn.Linear:
input_dim = layer.in _features
hidden_dim = layer.out_features
auto_encoders.append(AutoEncoder(input_dim, hidden_dim))
print(auto_encoders)
#以上代码把参数传递给AutoEncoder
for ae_idx in range(len(auto_encoders)):
print('Training AutoEncoder %d' % ae_idx)
x_var = x_train.clone()
for i in range(ae_idx):
_, x_var = auto_encoders[i](x_var)
ae = auto_encoders[ae_idx]
print(ae)
optimizer = optim.Adam(ae.parameters(), lr=1e-4)
for i in range(AE_ITERATIONS):
x_var = Variable(x_var)
x_ae, _ = ae(x_var)
optimizer.zero_grad()
loss = AE_LOSS(x_ae, x_var)
loss.backward()
optimizer.step()
print('Iteration %d, loss = %f' % (i+1, loss))
ae_idx = 0
for layer in model:
if type(layer) is nn.Linear:
W, b = auto_encoders[ae_idx].get_encoder()
for name, param in layer.named_parameters():
print(name)
print(param.size())
if 'weight' in name:
param.data.copy_(W)
if 'bias' in name:
param.data.copy_(b)
ae_idx += 1
return model
#定义训练函数
def train():
x_train, y_train, x_test, y_test = load_data()
model = pretrain_model(x_train)
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)
for t in range(NUM_ITERATIONS):
x_batch, y_batch = sample_mini_batch(x_train, y_train)
x_var, y_var = Variable(x_batch), Variable(y_batch)
scores = model(x_var)
loss = loss_fn(scores, y_var)
optimizer.zero_grad()
loss.backward()
optimizer.step()
train_acc = evaluate_acc(model, x_batch, y_batch)
test_acc = evaluate_acc(model, x_test, y_test)
if t % PRINT_INTERAL == 0:
print('Iteration %d: training loss = %f, training accurary = %f, test accurary = %f' % (t+1,loss, train_acc, test_acc))
final_train_acc = evaluate_acc(model, x_train, y_train)
final_test_acc = evaluate_acc(model, x_test, y_test)
print('Final training accurary:{:.4f}, final test accurary:{:.4f}'.format(final_train_acc, final_test_acc))
if __name__ == '__main__':
train()