keywords: 时间序列处理; 深度学习; keras
针对现有时间序列分类方法的特征提取与分类过程分离,且无法提取存在于不同时间尺度序列的不同特征的问题,作者提出MCNN模型。
对于单一时间序列输入,进行降采样和滑动平均等变化,产生多组长度不同的时间序列,并在多组时间序列上进行卷积,提取不同时间尺度序列的特征。
- Original 表示原始时间序列
-
Multi-Frequency 表示对原始数据滑动平均后的序列
-
Multi-Scale 表示对原始数据降采样后的序列
通过降采样的变换,实现在不同时间尺度的序列上的特征提取。
通过滑动平均的变换,实现对噪音的抵抗性。
加载数据
论文中使用了UCR数据集中的44个任务,首先,根据数据集任务名获取数据集的存储路径。
def get_datasets_path(base_path):
datasets_used = ['Adiac', 'Beef', 'CBF', 'ChlorineConcentration', 'CinC_ECG_torso', 'Coffee', 'Cricket_X', 'Cricket_Y',
'Cricket_Z', 'DiatomSizeReduction', 'ECGFiveDays', 'FaceAll', 'FaceFour', 'FacesUCR', '50words',
'FISH', 'Gun_Point', 'Haptics', 'InlineSkate', 'ItalyPowerDemand', 'Lighting2', 'Lighting7', 'MALLAT',
'MedicalImages', 'MoteStrain', 'NonInvasiveFatalECG_Thorax1', 'NonInvasiveFatalECG_Thorax2', 'OliveOil',
'OSULeaf', 'SonyAIBORobotSurface', 'SonyAIBORobotSurfaceII', 'StarLightCurves', 'SwedishLeaf',
'Symbols',
'synthetic_control', 'Trace', 'TwoLeadECG', 'Two_Patterns', 'UWaveGestureLibrary_X',
'UWaveGestureLibrary_Y', 'UWaveGestureLibrary_Z', 'wafer', 'WordSynonyms', 'yoga']
data_list = os.listdir(base_path)
return ["%s/%s/%s_" % (base_path, data, data) for data in datasets_used if data in data_list]
数据集分为TRAIN和TEST两个文件,分别存储训练集和测试集。作者使用了一种增加数据规模的技巧,使用滑动窗口在时间序列上截取数据,并共享同一个类别标签,同时增加了测试集和训练集的规模。
def data_augmentation(feature, label, ws):
seq_len = feature.shape[1]
aug_feature, aug_label = feature[:, :ws], label
for i in range(1, seq_len-ws+1):
_feature = feature[:, i:i+ws]
aug_feature = np.concatenate((aug_feature, _feature), axis=0)
aug_label = np.concatenate((aug_label, label), axis=0)
return aug_feature, aug_label
def load_feature_label(path, aug_times=0):
data = np.loadtxt(path, dtype=np.float, delimiter=',')
# the first column is label, and the rest are features
feature, label = data[:, 1:], data[:, 0]
if aug_times>0:
feature, label = data_augmentation(feature, label, data.shape[1]-aug_times)
return feature, label
数据变换
降采样
使用一组降采样因子 k1, k2, k3,每隔 ki-1 个数据取一个。
def down_sampling(data, rates):
ds_seq_len = []
ds_data = []
# down sampling by rate k
for k in rates:
if k > data.shape[1] / 3:
break
_data = data[:, ::k] # temp after down sampling
ds_data.append(_data)
ds_seq_len.append(_data.shape[1]) # remark the length info
return ds_data, ds_seq_len
滑动平均
使用一组滑动窗口l1, l2, l3,每li个数据取平均。
def moving_average(data, moving_ws):
num, seq_len = data.shape[0], data.shape[1]
ma_data = []
ma_seq_len = []
for ws in moving_ws:
if ws>data.shape[1]/3:
break
_data = np.zeros((num, seq_len-ws+1))
for i in range(seq_len-ws+1):
_data[:, i] = np.mean(data[:, i: i+ws], axis=1)
ma_data.append(_data)
ma_seq_len.append(_data.shape[1])
return ma_data, ma_seq_len
获取MCNN输入
通过多组降频因子k,和滑动窗口l,对原序列进行处理,得到多个时间序列,并在不同时间序列上进行一维卷积操作,提取在不同时间规模下的抽象特征,是该论文的主要思想。
def get_mcnn_input(feature, label):
origin = feature
ms_branch, ms_lens = down_sampling(feature, rates=[2, 3, 4, 5])
mf_branch, mf_lens = moving_average(feature, moving_ws=[5, 8, 11])
label = np_utils.to_categorical(label) # one hot
features = [origin, *ms_branch, *mf_branch]
features = [data.reshape(data.shape+(1,)) for data in features]
data_lens = [origin.shape[1], *ms_lens, *mf_lens]
return features, label
模型定义
mcnn 模型的输入是多个长度不唯一的时间序列,为了减少代码长度,使用列表推导式建立模型。
def MCNN_model(feature_lens, class_num):
input_sigs = [Input(shape=(bra_len, 1)) for bra_len in feature_lens]
# local convolution
ms_sigs = []
for i in range(len(input_sigs)):
_ms = Conv1D(padding='same', kernel_size=conv_size, filters=256, activation='relu')(input_sigs[i])
pooling_size = (_ms.shape[1].value - conv_size + 1) // pooling_factor
_ms = MaxPooling1D(pool_size=pooling_size)(_ms)
ms_sigs.append(_ms)
merged = concatenate(ms_sigs, axis=1)
# fully convolution
conved = Conv1D(padding='valid', kernel_size=conv_size, filters=256, activation='relu')(merged)
pooled = MaxPooling1D(pool_size=5)(conved)
x = Flatten()(pooled)
x = Dense(256, activation='relu')(x)
x = Dense(256, activation='relu')(x)
x = Dense(class_num, activation='softmax')(x)
MCNN = Model(inputs=input_sigs, outputs=x)
# MCNN.summary()
MCNN.compile(loss='categorical_crossentropy', optimizer='Adam', metrics=['accuracy'])
return MCNN
主函数
def on_single_dataset(data_path):
data_name = data_path.split('/')[-2]
# get the origin time series
f_train, la_tr = load_feature_label(data_path+'TRAIN', aug_times=0)
f_test, la_te = load_feature_label(data_path+'TEST', aug_times=0)
f_train, f_test = normalization(f_train, f_test)
# get the transform sequences
f_trains, la_tr = get_mcnn_input(f_train, la_tr)
f_tests, la_te = get_mcnn_input(f_test, la_te)
feat_lens = [data.shape[1] for data in f_trains] # get the length info of each transform sequence
class_num = la_tr.shape[1]
mcnn = MCNN_model(feat_lens, class_num)
mcnn.fit(f_trains, la_tr, batch_size=128, epochs=64, verbose=0)
te_loss, te_acc = mcnn.evaluate(f_tests, la_te, verbose=0)
print("[%(data)s]: Test loss - %(loss).2f, Test accuracy - %(acc).2f%%" % {'data': data_name, 'loss': te_loss, 'acc': te_acc*100})
la_pred = mcnn.predict(f_tests)
for data in f_trains, f_tests, la_te, la_tr:
del data
gc.collect()
def main():
dataPaths = get_datasets_path(base_path)
for dp in dataPaths:
on_single_dataset(dp)