Summary: graph neural networks, GraphSAGE
GraphSAGE quick start
I found the top-ranked GraphSAGE project on GitHub and cloned it:
git clone https://github.com/williamleif/GraphSAGE.git
Then download the protein dataset it needs, ppi.zip, from http://snap.stanford.edu/graphsage/
Run example_supervised.sh from the project directory:
./example_supervised.sh
...
Iter: 0083 train_loss= 0.47060 train_f1_mic= 0.55874 train_f1_mac= 0.38880 val_loss= 0.45000 val_f1_mic= 0.56650 val_f1_mac= 0.38983 time= 0.06750
Optimization Finished!
Full validation stats: loss= 0.45913 f1_micro= 0.57371 f1_macro= 0.40486 time= 0.95981
Writing test set stats to file (don't peak!)
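Data overview
PPI (protein-protein interaction) refers to two or more proteins binding together. If two proteins take part in the same biological process, or cooperate to carry out a function, they are considered to interact. The complex web of interactions among many proteins can be described as a PPI network.
Let's look at the data, starting from the author's code. main reads it with load_data, where FLAGS.train_prefix is ./example_data/ppi: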
train_data = load_data(FLAGS.train_prefix)
def load_data(prefix, normalize=True, load_walks=False):
    G_data = json.load(open(prefix + "-G.json"))
    G = json_graph.node_link_graph(G_data)
G_data.keys()
Out[11]: dict_keys(['directed', 'graph', 'nodes', 'links', 'multigraph'])
Look at the nodes attribute of G_data:
G_data['nodes']
{'test': False, 'id': 998, 'val': False},
{'test': False, 'id': 999, 'val': False},
...]
G_data['nodes'][0]
Out[20]: {'test': False, 'id': 0, 'val': False}
G_data['nodes'][-1]
Out[19]: {'test': True, 'id': 56943, 'val': False}
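The graph contains 56944 nodes. Each node has the attributes id, test and val, where test and val mark whether the node belongs to the test set or the validation set. Now the links attribute of G_data: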
len(G_data['links'])
Out[22]: 818716
G_data['links'][-1]
Out[23]: {'source': 56939, 'target': 56939}
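The graph has 818716 edges, and each edge has a source (start node) and a target (end node). Reading the data with networkx's json_graph gives a graph whose node and edge counts match the source data: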
len(G.edges())
Out[28]: 818716
len(G.nodes())
Out[29]: 56944
Next, the node feature vectors are read:
if os.path.exists(prefix + "-feats.npy"):
    feats = np.load(prefix + "-feats.npy")
else:
    print("No features present.. Only identity features will be used.")
    feats = None
features has shape 56944 nodes × 50 features, and the features are sparse 0/1 values:
feats.shape
Out[33]: (56944, 50)
Next, the id mapping is read. It is actually a one-to-one mapping:
id_map = json.load(open(prefix + "-id_map.json"))
id_map = {conversion(k): int(v) for k, v in id_map.items()}
id_map[56943]
Out[45]: 56943
Next, the labels (y values) are read:
class_map = json.load(open(prefix + "-class_map.json"))
if isinstance(list(class_map.values())[0], list):
    lab_conversion = lambda n: n
else:
    lab_conversion = lambda n: int(n)
class_map = {conversion(k): lab_conversion(v) for k, v in class_map.items()}
The labels form a dict. The key is the node id and the value is a list of length 121 with several 1s and 0s everywhere else, i.e. a multi-hot encoding:
len(class_map)
Out[55]: 56944
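Next comes data cleaning: nodes that lack the test or val attribute are removed as dirty data:
broken_count = 0
# G.node[...] is the networkx 1.x attribute API; in 1.x G.nodes() returns a list,
# so removing nodes inside this loop is safe there
for node in G.nodes():
    if not 'val' in G.node[node] or not 'test' in G.node[node]:
        G.remove_node(node)
        broken_count += 1
print("Removed {:d} nodes that lacked proper annotations due to networkx versioning issues".format(broken_count))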
Next, every edge gets a train_removed attribute. If either endpoint has test or val set to True, train_removed is True; otherwise it is False:
for edge in G.edges():
    if (G.node[edge[0]]['val'] or G.node[edge[1]]['val'] or
            G.node[edge[0]]['test'] or G.node[edge[1]]['test']):
        G[edge[0]][edge[1]]['train_removed'] = True
    else:
        G[edge[0]][edge[1]]['train_removed'] = False
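Here is a minimal pure-Python sketch of the split and the train_removed rule. It uses a hand-made graph in the same node-link layout as ppi-G.json, and plain dicts stand in for the networkx graph. The helper names split_nodes and mark_train_removed are hypothetical and do not come from the repo.

```python
# Hypothetical helpers on a plain node-link dict (same keys as ppi-G.json).
def split_nodes(g_data):
    """Return (train, val, test) node ids based on the 'val' and 'test' flags."""
    train, val, test = [], [], []
    for node in g_data["nodes"]:
        if node["val"]:
            val.append(node["id"])
        elif node["test"]:
            test.append(node["id"])
        else:
            train.append(node["id"])
    return train, val, test


def mark_train_removed(g_data, held_out):
    """Map (source, target) -> True if either endpoint is a val/test node."""
    return {
        (link["source"], link["target"]): link["source"] in held_out or link["target"] in held_out
        for link in g_data["links"]
    }


# Tiny hand-made graph, not the real PPI data.
toy = {
    "directed": False, "graph": {}, "multigraph": False,
    "nodes": [
        {"id": 0, "val": False, "test": False},
        {"id": 1, "val": True, "test": False},
        {"id": 2, "val": False, "test": True},
        {"id": 3, "val": False, "test": False},
    ],
    "links": [
        {"source": 0, "target": 1},
        {"source": 0, "target": 3},
        {"source": 2, "target": 3},
    ],
}
train_ids, val_ids, test_ids = split_nodes(toy)
removed = mark_train_removed(toy, set(val_ids) | set(test_ids))
print(train_ids, val_ids, test_ids)  # [0, 3] [1] [2]
print(removed)  # {(0, 1): True, (0, 3): False, (2, 3): True}
```

Only the edge between two training nodes survives for training. That is exactly what construct_adj relies on later.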
Next, the features are standardized so that each column has mean 0 and standard deviation 1. The statistics are fitted on the training nodes only and then applied to all nodes:
if normalize and not feats is None:
    from sklearn.preprocessing import StandardScaler
    train_ids = np.array([id_map[n] for n in G.nodes() if not G.node[n]['val'] and not G.node[n]['test']])
    train_feats = feats[train_ids]
    scaler = StandardScaler()
    scaler.fit(train_feats)
    feats = scaler.transform(feats)
Finally, load_data returns G, feats, id_map, walks and class_map: the graph object, the node feature matrix, the id map, an empty walks list and the label dict.
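The standardization step can be sketched without sklearn as follows, assuming the features are plain Python lists of rows. The sketch fits column means and population standard deviations (ddof=0, as StandardScaler does) on the training rows only, then applies them to every row. A zero-variance column keeps a scale of 1, which mirrors StandardScaler's handling. fit_standardizer and transform are hypothetical names.

```python
import math


def fit_standardizer(rows):
    """Column means and population standard deviations (ddof=0) of `rows`."""
    n = len(rows)
    n_cols = len(rows[0])
    means = [sum(r[j] for r in rows) / n for j in range(n_cols)]
    stds = []
    for j in range(n_cols):
        var = sum((r[j] - means[j]) ** 2 for r in rows) / n
        std = math.sqrt(var)
        stds.append(std if std > 0 else 1.0)  # constant column: keep scale 1
    return means, stds


def transform(rows, means, stds):
    return [[(v - m) / s for v, m, s in zip(r, means, stds)] for r in rows]


feats = [[0.0, 1.0], [2.0, 1.0], [4.0, 1.0], [10.0, 5.0]]
train_idx = [0, 1, 2]                      # row 3 plays the role of a val/test node
means, stds = fit_standardizer([feats[i] for i in train_idx])
scaled = transform(feats, means, stds)     # statistics from train rows, applied to all rows
```

The held-out row is scaled with the training statistics, so it does not have to end up at mean 0 itself. This is the point of fitting on train_feats only.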
Model training
Data preparation
Go straight to the train function, which receives the data returned by load_data:
G = train_data[0]
features = train_data[1]
id_map = train_data[2]
class_map = train_data[4]
if isinstance(list(class_map.values())[0], list):
    num_classes = len(list(class_map.values())[0])
else:
    num_classes = len(set(class_map.values()))
It first unpacks each element of the tuple and gets num_classes=121. Next, features is padded: a row of 50 zeros is appended at the bottom. The reason is not clear at this point:
if not features is None:
    # pad with dummy zero vector
    features = np.vstack([features, np.zeros((features.shape[1],))])
Next, context_pairs is defined as the same list as walks. By default it is an empty list:
context_pairs = train_data[3] if FLAGS.random_context else None
Next, the placeholders are defined:
placeholders = construct_placeholders(num_classes)
Follow construct_placeholders:
def construct_placeholders(num_classes):
    # Define placeholders
    placeholders = {
        'labels': tf.placeholder(tf.float32, shape=(None, num_classes), name='labels'),
        'batch': tf.placeholder(tf.int32, shape=(None), name='batch1'),
        'dropout': tf.placeholder_with_default(0., shape=(), name='dropout'),
        'batch_size': tf.placeholder(tf.int32, name='batch_size'),
    }
    return placeholders
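The placeholders are labels, batch, dropout and batch_size, and each gets an explicit tensor name. Next comes this call. Its name, minibatch, suggests that it produces batches of training samples: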
minibatch = NodeMinibatchIterator(G,
id_map,
placeholders,
class_map,
num_classes,
batch_size=FLAGS.batch_size,
max_degree=FLAGS.max_degree,
context_pairs=context_pairs)
Follow this class:
def __init__(self, G, id2idx,
             placeholders, label_map, num_classes,
             batch_size=100, max_degree=25,
             **kwargs):
    self.G = G
    self.nodes = G.nodes()
    self.id2idx = id2idx  # id_map
    self.placeholders = placeholders
    self.batch_size = batch_size  # 512
    self.max_degree = max_degree  # 128
    self.batch_num = 0
    self.label_map = label_map  # class_map
    self.num_classes = num_classes  # 121
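These are a bunch of initial values. Note max_degree=128, the upper bound used when downsampling the adjacency list: put simply, each node keeps at most 128 neighbors. The next line builds the adjacency list and the degree list: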
self.adj, self.deg = self.construct_adj()
Follow construct_adj. In short, the author creates two arrays, adj and deg. adj has shape (56945, 128) and deg has shape (56944,):
def construct_adj(self):
    adj = len(self.id2idx) * np.ones((len(self.id2idx) + 1, self.max_degree))
    deg = np.zeros((len(self.id2idx),))
    for nodeid in self.G.nodes():
        if self.G.node[nodeid]['test'] or self.G.node[nodeid]['val']:
            continue
        # only training nodes get here
        # neighbor ids of nodeid, skipping edges marked train_removed
        neighbors = np.array([self.id2idx[neighbor]
                              for neighbor in self.G.neighbors(nodeid)
                              if (not self.G[nodeid][neighbor]['train_removed'])])
        # degree of nodeid
        deg[self.id2idx[nodeid]] = len(neighbors)
        if len(neighbors) == 0:
            continue
        if len(neighbors) > self.max_degree:
            # more than max_degree neighbors: draw max_degree of them without replacement
            neighbors = np.random.choice(neighbors, self.max_degree, replace=False)
        elif len(neighbors) < self.max_degree:
            # fewer than 128 neighbors: draw with replacement up to 128
            neighbors = np.random.choice(neighbors, self.max_degree, replace=True)
        adj[self.id2idx[nodeid], :] = neighbors
    return adj, deg
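Degrees and neighbors are computed only for training nodes. If a node has more than max_degree=128 neighbors, they are truncated by sampling 128 without replacement. If it has fewer, its neighbors are resampled with replacement until there are 128. In the end, each row of adj is one node's neighbor list and each element of deg is the degree of the node at that position. Here is the result: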
minibatch.adj[0]
Out[115]:
array([ 766., 1101., 766., 1101., 766., 1101., 372., 1101., 766.,
1101., 372., 766., 372., 1101., 766., 1101., 372., 372.,
372., 372., 1101., 766., 372., 766., 372., 372., 1101.,
372., 1101., 372., 372., 372., 766., 1101., 1101., 766.,
766., 766., 1101., 372., 372., 766., 1101., 372., 766.,
766., 766., 766., 1101., 766., 372., 1101., 372., 766.,
372., 766., 1101., 766., 372., 766., 766., 372., 766.,
1101., 766., 1101., 1101., 766., 372., 372., 1101., 372.,
766., 1101., 1101., 372., 1101., 1101., 372., 372., 1101.,
766., 1101., 1101., 1101., 766., 372., 766., 766., 1101.,
766., 766., 1101., 766., 372., 766., 766., 1101., 1101.,
766., 766., 372., 1101., 1101., 372., 1101., 372., 1101.,
766., 766., 372., 1101., 1101., 1101., 372., 1101., 1101.,
372., 766., 1101., 766., 372., 1101., 372., 766., 372.,
372., 766.])
minibatch.deg[0]
Out[116]: 3.0
set(minibatch.adj[0])
Out[117]: {372.0, 766.0, 1101.0}
Node 0 has only three neighbors, which adj resampled up to 128 entries. Now the version for the test data, construct_test_adj:
def construct_test_adj(self):
    adj = len(self.id2idx) * np.ones((len(self.id2idx) + 1, self.max_degree))
    for nodeid in self.G.nodes():
        neighbors = np.array([self.id2idx[neighbor]
                              for neighbor in self.G.neighbors(nodeid)])
        if len(neighbors) == 0:
            continue
        if len(neighbors) > self.max_degree:
            neighbors = np.random.choice(neighbors, self.max_degree, replace=False)
        elif len(neighbors) < self.max_degree:
            neighbors = np.random.choice(neighbors, self.max_degree, replace=True)
        adj[self.id2idx[nodeid], :] = neighbors
    return adj
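The difference is that this version no longer filters nodes by their train/val/test attribute and no longer skips train_removed edges. It still samples to 128, and it returns only adj. Now the end of the initializer: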
self.val_nodes = [n for n in self.G.nodes() if self.G.node[n]['val']]
self.test_nodes = [n for n in self.G.nodes() if self.G.node[n]['test']]
self.no_train_nodes_set = set(self.val_nodes + self.test_nodes)
self.train_nodes = set(G.nodes()).difference(self.no_train_nodes_set)
# don't train on nodes that only have edges to test set
self.train_nodes = [n for n in self.train_nodes if self.deg[id2idx[n]] > 0]
These lines collect val_nodes, test_nodes and train_nodes, and drop the training nodes that have no edges in the training graph.
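Below is a pure-Python sketch of the fixed-width neighbor table that construct_adj builds. It assumes the neighbor lists are already filtered, so train_removed edges are gone. build_neighbor_table is a hypothetical name. random.sample and random.choices play the roles of np.random.choice with replace=False and replace=True.

```python
import random


def build_neighbor_table(neighbors, num_nodes, max_degree, is_train=None, rng=random):
    """Fixed-width neighbor table in the spirit of construct_adj (hypothetical helper).

    neighbors: dict node_index -> list of neighbor indices, already filtered.
    Every row starts as the filler index num_nodes; the table has num_nodes + 1 rows.
    """
    pad = num_nodes
    adj = [[pad] * max_degree for _ in range(num_nodes + 1)]
    deg = [0] * num_nodes
    for v in range(num_nodes):
        if is_train is not None and not is_train[v]:
            continue                                   # held-out node: keep the filler row
        neigh = neighbors.get(v, [])
        deg[v] = len(neigh)
        if not neigh:
            continue
        if len(neigh) > max_degree:
            row = rng.sample(neigh, max_degree)        # without replacement
        elif len(neigh) < max_degree:
            row = rng.choices(neigh, k=max_degree)     # with replacement
        else:
            row = list(neigh)
        adj[v] = row
    return adj, deg


nbrs = {0: [1, 2], 1: [0, 2, 3, 4], 2: [0, 1], 3: [1], 4: [1]}   # node 5 has no neighbors
adj, deg = build_neighbor_table(nbrs, num_nodes=6, max_degree=3, rng=random.Random(0))
print(deg)      # [2, 4, 2, 1, 1, 0]
print(adj[3])   # [1, 1, 1]
print(adj[5])   # [6, 6, 6]
```

Every row has the same width, so the whole table fits in one rectangular tensor. Rows for nodes without usable neighbors keep the filler index.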
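Back in train, the author defines a placeholder with the same shape as minibatch.adj, (56945, 128). It is used to initialize a tf.Variable named adj_info with trainable=False, so the variable is not updated during training: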
adj_info_ph = tf.placeholder(tf.int32, shape=minibatch.adj.shape)
adj_info = tf.Variable(adj_info_ph, trainable=False, name="adj_info")
GraphSAGE sampling
Now the sampling part. The default model is graphsage_mean:
if FLAGS.model == 'graphsage_mean':
    # Create model
    sampler = UniformNeighborSampler(adj_info)
    if FLAGS.samples_3 != 0:
        layer_infos = [SAGEInfo("node", sampler, FLAGS.samples_1, FLAGS.dim_1),
                       SAGEInfo("node", sampler, FLAGS.samples_2, FLAGS.dim_2),
                       SAGEInfo("node", sampler, FLAGS.samples_3, FLAGS.dim_2)]
    elif FLAGS.samples_2 != 0:
        layer_infos = [SAGEInfo("node", sampler, FLAGS.samples_1, FLAGS.dim_1),
                       SAGEInfo("node", sampler, FLAGS.samples_2, FLAGS.dim_2)]
    else:
        layer_infos = [SAGEInfo("node", sampler, FLAGS.samples_1, FLAGS.dim_1)]
Follow UniformNeighborSampler:
class UniformNeighborSampler(Layer):
    """
    Uniformly samples neighbors.
    Assumes that adj lists are padded with random re-sampling
    """
    def __init__(self, adj_info, **kwargs):
        super(UniformNeighborSampler, self).__init__(**kwargs)
        self.adj_info = adj_info

    def _call(self, inputs):
        ids, num_samples = inputs
        adj_lists = tf.nn.embedding_lookup(self.adj_info, ids)
        adj_lists = tf.transpose(tf.random_shuffle(tf.transpose(adj_lists)))
        adj_lists = tf.slice(adj_lists, [0, 0], [-1, num_samples])
        return adj_lists
The constructor stores adj_info. _call overrides the method of the parent class Layer and has not been called yet. Keep reading.
In the layer_infos code, samples_3 defaults to 0, samples_2=10 and samples_1=25. Next, follow SAGEInfo:
SAGEInfo = namedtuple("SAGEInfo",
['layer_name', # name of the layer (to get feature embedding etc.)
'neigh_sampler', # callable neigh_sampler constructor
'num_samples',
'output_dim' # the output (i.e., hidden) dimension
])
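The author defines a namedtuple to store the per-layer information. Like a dict, a namedtuple groups a set of immutable values together for storage and access. For example: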
a = SAGEInfo(layer_name=1, neigh_sampler=2, num_samples=3, output_dim=4)
Values can be read with attribute access (.) or with ordinary tuple indexing:
a[0]
Out[156]: 1
a.neigh_sampler
Out[157]: 2
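So the following code defines 2 layers. The first has layer_name=node, sampler as its sampler, 25 neighbors and output dimension 128. The second has layer_name=node, sampler as its sampler, 10 neighbors and output dimension 128.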
layer_infos = [SAGEInfo("node", sampler, FLAGS.samples_1, FLAGS.dim_1), # 128
SAGEInfo("node", sampler, FLAGS.samples_2, FLAGS.dim_2)] # 128
Model construction
Now the model, SupervisedGraphsage:
model = SupervisedGraphsage(num_classes, placeholders,
features,
adj_info,
minibatch.deg,
layer_infos,
model_size=FLAGS.model_size,
sigmoid_loss=FLAGS.sigmoid,
identity_dim=FLAGS.identity_dim,
logging=True)
Stepping into it:
if aggregator_type == "mean":
    self.aggregator_cls = MeanAggregator
MeanAggregator is a class derived from Layer that does mean aggregation. Set it aside for now. Next comes a batch of initialization:
# get info from placeholders...
self.inputs1 = placeholders["batch"]  # tf.placeholder(tf.int32, shape=(None), name='batch1')
self.model_size = model_size  # small
self.adj_info = adj  # tf.Variable(adj_info_ph, trainable=False, name="adj_info")
if identity_dim > 0:  # 0
    self.embeds = tf.get_variable("node_embeddings", [adj.get_shape().as_list()[0], identity_dim])
else:
    self.embeds = None
if features is None:
    if identity_dim == 0:
        raise Exception("Must have a positive value for identity feature dimension if no input features given.")
    self.features = self.embeds
else:
    self.features = tf.Variable(tf.constant(features, dtype=tf.float32), trainable=False)  # np.vstack([features, np.zeros((features.shape[1],))])
    if not self.embeds is None:
        # is None
        self.features = tf.concat([self.embeds, self.features], axis=1)
self.degrees = degrees
self.concat = concat  # True
self.num_classes = num_classes  # 121
self.sigmoid_loss = sigmoid_loss  # False
self.dims = [(0 if features is None else features.shape[1]) + identity_dim]  # 50
self.dims.extend([layer_infos[i].output_dim for i in range(len(layer_infos))])  # range(2), [50, 128, 128]
self.batch_size = placeholders["batch_size"]  # tf.placeholder(tf.int32, name='batch_size')
self.placeholders = placeholders
self.layer_infos = layer_infos
self.optimizer = tf.train.AdamOptimizer(learning_rate=FLAGS.learning_rate)  # 0.01
self.build()
On to build. Its first line gave me a fright:
samples1, support_sizes1 = self.sample(self.inputs1, self.layer_infos)
Sampling module
The sample method is inherited from the parent class models.SampleAndAggregate. It takes the batch input and the layer information:
def sample(self, inputs, layer_infos, batch_size=None):
    """ Sample neighbors to be the supportive fields for multi-layer convolutions.
    Args:
        inputs: batch inputs
        batch_size: the number of inputs (different for batch inputs and negative samples).
    """
    if batch_size is None:
        batch_size = self.batch_size
    samples = [inputs]  # [tf.placeholder(tf.int32, shape=(None), name='batch1')]
    # size of convolution support at each layer per node
    support_size = 1
    support_sizes = [support_size]
    for k in range(len(layer_infos)):  # range(2)
        t = len(layer_infos) - k - 1  # 1, then 0
        support_size *= layer_infos[t].num_samples  # multiplied by 10, then by 25
        sampler = layer_infos[t].neigh_sampler  # the sampler
        # calling the sampler goes through Layer.__call__ into _call
        # first call: samples[0] = tf.placeholder(tf.int32, shape=(None), name='batch1'), num_samples = 10
        node = sampler((samples[k], layer_infos[t].num_samples))
        # add the sampled nodes to the inputs,
        # flattened to support_size * batch_size entries
        samples.append(tf.reshape(node, [support_size * batch_size, ]))
        support_sizes.append(support_size)
    return samples, support_sizes
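This function samples neighbors starting from the input nodes: first for the 2nd aggregation layer, then for the 1st. The sampled neighbor nodes are added to the list. The nodes sampled for the 2nd layer (10 per input node) then act as centers for the 1st layer, which samples another 25 neighbors for each of them. This lines up with the per-layer formula of the GraphSAGE paper.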
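For reference, the per-layer update of Algorithm 1 in the GraphSAGE paper can be written as follows. Here h^0_v = x_v (the input features), k = 1, ..., K, and N(v) is the sampled neighborhood (S_1 = 25 and S_2 = 10 in this run). This only restates the paper's notation.

```latex
\begin{aligned}
h^{k}_{\mathcal{N}(v)} &\leftarrow \mathrm{AGGREGATE}_k\left(\{\, h^{k-1}_u : u \in \mathcal{N}(v) \,\}\right) \\
h^{k}_v &\leftarrow \sigma\left(W^{k} \cdot \mathrm{CONCAT}\left(h^{k-1}_v,\; h^{k}_{\mathcal{N}(v)}\right)\right) \\
h^{k}_v &\leftarrow h^{k}_v \,/\, \lVert h^{k}_v \rVert_2
\end{aligned}
```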
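In the author's example the final model has two layers, and each center node's representation is its output after both layers. To get a node's representation after 2 hops, layer k (the second layer) first needs the representations of its neighbors at layer k-1 (after the first hop). The second layer samples 10, so we first sample 10 neighbors for each input node, which gives the layer k-1 nodes. To compute the representations of those layer k-1 nodes, we in turn need the layer k-2 nodes (before the first hop) for the first aggregation. The first layer samples 25, so with each layer k-1 node as a center we take 25 neighbors.
[Figure: diagram of the two-hop neighbor sampling]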
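Now the sampling implementation itself, in the sampler's _call function:
def _call(self, inputs):
    # input node ids and number of samples
    ids, num_samples = inputs
    # fetch the 128-entry neighbor list of each node
    adj_lists = tf.nn.embedding_lookup(self.adj_info, ids)
    # shuffle the columns: one permutation shared by all nodes' neighbor lists
    adj_lists = tf.transpose(tf.random_shuffle(tf.transpose(adj_lists)))
    # slice from the top-left corner: all rows, first num_samples columns
    adj_lists = tf.slice(adj_lists, [0, 0], [-1, num_samples])  # 10 or 25
    return adj_lists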
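inputs is a tuple. First, tf.nn.embedding_lookup fetches the neighbor lists for this batch's ids. The lookup table is adj_info, the variable initialized from minibatch.adj, i.e. the per-node neighbor lists built earlier. This explains why minibatch.adj was created, and why a node with fewer neighbors than the sample size S_k was resampled with replacement.

Next, tf.transpose plus tf.random_shuffle shuffles each node's neighbor list. tf.random_shuffle permutes the rows of the transposed matrix, so the same column permutation applies to every node. Finally, tf.slice keeps the first num_samples of each node's 128 neighbors and returns the neighbor lists adj_lists. num_samples is 10 in the first sampling step (for layer 2) and 25 in the second (for layer 1).

Back in sample: the first sampling step adds batch_size×10 new nodes. Before they are appended to samples, they are reshaped with [support_size * batch_size, ] back into a flat vector. The second step does another embedding_lookup on these flat ids, so tf.slice always works on a 2-D matrix: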
samples.append(tf.reshape(node, [support_size * batch_size, ]))
The nodes from the second step are also reshaped into a flat vector and appended to samples. Finally, samples and support_sizes are returned. They correspond to [inputs, node_layer_2, node_layer_1] and [1, 10, 250].
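To see the shapes without TensorFlow, here is a pure-Python sketch of the two functions above. It assumes adj is a list of equal-length neighbor lists. uniform_sample mimics UniformNeighborSampler._call: one column permutation is shared by the whole call, then the first num_samples columns are kept. sample mimics SampleAndAggregate.sample. Both are illustrative stand-ins, not the repo's code.

```python
import random


def uniform_sample(adj, ids, num_samples, rng=random):
    """Plain-list analogue of UniformNeighborSampler._call.

    transpose -> random_shuffle -> transpose permutes the columns of the looked-up
    rows, so a single column permutation is shared by every node in the call.
    """
    perm = list(range(len(adj[0])))
    rng.shuffle(perm)
    keep = perm[:num_samples]
    return [[adj[i][c] for c in keep] for i in ids]


def sample(adj, inputs, layer_num_samples, rng=random):
    """Plain-list analogue of SampleAndAggregate.sample (last layer sampled first)."""
    samples = [list(inputs)]
    support_size = 1
    support_sizes = [support_size]
    for k in range(len(layer_num_samples)):
        t = len(layer_num_samples) - k - 1
        support_size *= layer_num_samples[t]
        rows = uniform_sample(adj, samples[k], layer_num_samples[t], rng)
        samples.append([n for row in rows for n in row])   # flatten, like tf.reshape
        support_sizes.append(support_size)
    return samples, support_sizes


rng = random.Random(0)
adj = [[rng.randrange(100) for _ in range(128)] for _ in range(101)]   # random toy table
samples, support_sizes = sample(adj, [0, 1, 2, 3], [25, 10], random.Random(1))
print(support_sizes)                  # [1, 10, 250]
print([len(s) for s in samples])      # [4, 40, 1000]
```

With a batch of 512 inputs, the three levels would hold 512, 5120 and 128000 node ids, matching support_sizes [1, 10, 250].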
Aggregation module
Back to building the model in build:
# [25, 10]
num_samples = [layer_info.num_samples for layer_info in self.layer_infos]
self.outputs1, self.aggregators = self.aggregate(samples1, [self.features], self.dims, num_samples,
support_sizes1, concat=self.concat, model_size=self.model_size)
Judging from the variable names, this returns the aggregated node representations and the aggregator objects. Follow aggregate:
def aggregate(self, samples, input_features, dims, num_samples, support_sizes, batch_size=None,
              aggregators=None, name=None, concat=False, model_size="small"):
    """ At each layer, aggregate hidden representations of neighbors to compute the hidden representations
        at next layer.
    Args:
        samples: a list of samples of variable hops away for convolving at each layer of the
            network. Length is the number of layers + 1. Each is a vector of node indices.
        input_features: the input features for each sample of various hops away.
        dims: a list of dimensions of the hidden representations from the input layer to the
            final layer. Length is the number of layers + 1.
        num_samples: list of number of samples for each layer.
        support_sizes: the number of nodes to gather information from for each layer.
        batch_size: the number of inputs (different for batch inputs and negative samples).
    Returns:
        The hidden representation at the final layer for all nodes in batch
    """
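First, the docstring: the function returns the final-layer representations of all nodes in the batch. The other parameters are:
- samples: a list holding the nodes of each level. Its length is the number of layers + 1, and each element is a vector of node indices. Here it is [inputs, node_layer_2, node_layer_1].
- input_features: the feature vectors of the nodes at each hop distance.
- dims: a list with the representation dimension from the raw input through each layer. Its length is the number of layers + 1, and here it defaults to [50, 128, 128].
- num_samples: a list with the number of neighbors sampled for each layer. Here it is [25, 10].
- support_sizes: a list with the number of nodes each level gathers information from. Here it is [1, 10, 250].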
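Now the implementation. First, an embedding_lookup is done for each level of samples to get its features. These are the features of the batch's center nodes, of the neighbors used by the 2nd-layer aggregation, and of the neighbors used by the 1st-layer aggregation: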
# length: number of layers + 1
hidden = [tf.nn.embedding_lookup(input_features, node_samples) for node_samples in samples]
Next, it checks whether an aggregators object was passed in. If not, it creates a new aggregators list:
new_agg = aggregators is None
if new_agg:
    aggregators = []
Now the loop, which runs twice for the 2 layers:
for layer in range(len(num_samples)):  # num_samples = [25, 10], so 2 iterations
    if new_agg:
        # concat=True: dim_mult = 1 for layer 0, 2 for layer 1
        dim_mult = 2 if concat and (layer != 0) else 1
        # aggregator at current layer
        if layer == len(num_samples) - 1:  # last layer (layer 1): identity activation
            aggregator = self.aggregator_cls(dim_mult * dims[layer], dims[layer + 1], act=lambda x: x,
                                             dropout=self.placeholders['dropout'],
                                             name=name, concat=concat, model_size=model_size)
        else:
            # layer = 0, dim_mult = 1, dims[layer] = 50, dims[layer + 1] = 128
            aggregator = self.aggregator_cls(dim_mult * dims[layer], dims[layer + 1],
                                             dropout=self.placeholders['dropout'],  # tf.placeholder_with_default(0., shape=(), name='dropout')
                                             name=name, concat=concat, model_size=model_size)
        aggregators.append(aggregator)
    else:
        aggregator = aggregators[layer]
These checks instantiate the aggregator object. In this example, SupervisedGraphsage was instantiated with the default aggregator_type="mean", so the aggregator here is MeanAggregator:
if aggregator_type == "mean":
    self.aggregator_cls = MeanAggregator
Now the key part, the update computation:
next_hidden = []
# as layer increases, the number of support nodes needed decreases
for hop in range(len(num_samples) - layer):  # layer 0: range(2); layer 1: range(1)
    # dim_mult = 1 at layer 0
    dim_mult = 2 if concat and (layer != 0) else 1
    neigh_dims = [batch_size * support_sizes[hop],  # hop 0: [1, 10, 250][0]
                  num_samples[len(num_samples) - hop - 1],  # hop 0: num_samples[1] = 10
                  dim_mult * dims[layer]]  # 50 at layer 0
    # hop 0: hidden[0] = features of the inputs, hidden[1] = features of node_layer_2 reshaped to (batch_size, 10, 50)
    h = aggregator((hidden[hop],
                    tf.reshape(hidden[hop + 1], neigh_dims)))
    next_hidden.append(h)
hidden = next_hidden
This is where it started to get confusing, so follow MeanAggregator's _call:
def _call(self, inputs):
    # hidden[0] = features of the inputs, reshape(hidden[1] = features of node_layer_2, (batch_size, 10, 50))
    # own feature vectors, neighbor feature vectors
    self_vecs, neigh_vecs = inputs
    neigh_vecs = tf.nn.dropout(neigh_vecs, 1 - self.dropout)
    self_vecs = tf.nn.dropout(self_vecs, 1 - self.dropout)
    # average the neighbor feature vectors
    neigh_means = tf.reduce_mean(neigh_vecs, axis=1)
    # [nodes] x [out_dim]
    # neighbor mean times its own weight matrix
    from_neighs = tf.matmul(neigh_means, self.vars['neigh_weights'])
    # the node's own vector times its own weight matrix
    from_self = tf.matmul(self_vecs, self.vars["self_weights"])
    if not self.concat:  # concat=True
        output = tf.add_n([from_self, from_neighs])
    else:  # this branch is taken
        output = tf.concat([from_self, from_neighs], axis=1)
    # bias
    if self.bias:
        output += self.vars['bias']
    return self.act(output)
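After reading this _call, it is roughly clear what it does. It applies dropout to both inputs, then proceeds in five steps: average the neighbor features, multiply by the neighbor W, multiply the node's own vector by the self W, concatenate the two vectors, and apply the activation. A bias is added only when bias=True. This corresponds to the aggregation and update steps of the GraphSAGE formula.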
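Here is a minimal pure-Python sketch of this forward pass, leaving out dropout and bias. matmul and mean_aggregate are hypothetical helpers, and the weights in the example are made up. act is applied element-wise. The model's last layer passes the identity, as in the aggregate code above.

```python
def matmul(a, b):
    """Dense product of two lists-of-rows."""
    return [[sum(x * y for x, y in zip(row, col)) for col in zip(*b)] for row in a]


def mean_aggregate(self_vecs, neigh_vecs, w_self, w_neigh, concat=True, act=lambda x: x):
    """Forward pass in the spirit of MeanAggregator._call, without dropout or bias.

    self_vecs:  n rows of d_in floats
    neigh_vecs: n entries, each a list of num_neighbors rows of d_in floats
    """
    neigh_means = [[sum(col) / len(nbrs) for col in zip(*nbrs)] for nbrs in neigh_vecs]
    from_neighs = matmul(neigh_means, w_neigh)
    from_self = matmul(self_vecs, w_self)
    if concat:
        out = [s + n for s, n in zip(from_self, from_neighs)]   # like tf.concat(axis=1)
    else:
        out = [[a + b for a, b in zip(s, n)] for s, n in zip(from_self, from_neighs)]
    return [[act(x) for x in row] for row in out]


self_vecs = [[1.0, 0.0]]
neigh_vecs = [[[2.0, 0.0], [0.0, 2.0]]]          # neighbor mean = [1.0, 1.0]
w_self = [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0]]      # d_in = 2, d_out = 3 (made-up weights)
w_neigh = [[1.0, 1.0, 1.0], [0.0, 0.0, 1.0]]
print(mean_aggregate(self_vecs, neigh_vecs, w_self, w_neigh))                # [[1.0, 0.0, 0.0, 1.0, 1.0, 2.0]]
print(mean_aggregate(self_vecs, neigh_vecs, w_self, w_neigh, concat=False))  # [[2.0, 1.0, 2.0]]
```

With concat=True the output width doubles, 2 × d_out. This is why the next layer's input dimension is multiplied by dim_mult=2.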
Look at the two W weight matrices separately:
# self.name=layer_1
with tf.variable_scope(self.name + name + '_vars'):
    # neigh_input_dim=50, output_dim=128 (layer 0)
    self.vars['neigh_weights'] = glorot([neigh_input_dim, output_dim],
                                        name='neigh_weights')
    # input_dim=50, output_dim=128 (layer 0)
    self.vars['self_weights'] = glorot([input_dim, output_dim],
                                       name='self_weights')
    if self.bias:  # False
        self.vars['bias'] = zeros([self.output_dim], name='bias')
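The author's implementation differs somewhat from the paper. The code multiplies the neighbor vector and the node's own vector by separate W matrices and then concatenates them, while the paper concatenates first and then applies a single W. The paper also L2-normalizes the output after the activation at every layer. The aggregators here have no such step; the code only L2-normalizes the final output once, in build (tf.nn.l2_normalize below).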
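The two formulations are related by a block-diagonal weight matrix. The equation below uses column-vector notation, with \bar h_{\mathcal N(v)} as the neighbor mean. (The code multiplies row vectors from the right, which is the same map transposed.)

```latex
\mathrm{CONCAT}\left(W_{\text{self}}\, h_v,\; W_{\text{neigh}}\, \bar h_{\mathcal{N}(v)}\right)
= \begin{pmatrix} W_{\text{self}} & 0 \\ 0 & W_{\text{neigh}} \end{pmatrix}
\begin{pmatrix} h_v \\ \bar h_{\mathcal{N}(v)} \end{pmatrix}
```

So the code corresponds to the paper's W^k · CONCAT(...) restricted to a block-diagonal W^k. The difference is that its output has 2 × output_dim entries instead of output_dim.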
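Let's keep going with this code; it is the heart of the whole project. Fix layer=0. Then dim_mult * dims[layer] and dims[layer + 1] are 50 and 128, so the input is 50-dimensional and the output 128-dimensional. Now look at hop. When hop=0: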
h = aggregator((hidden[hop],
tf.reshape(hidden[hop + 1], neigh_dims)))
next_hidden.append(h)
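the layer aggregates the original (center) nodes with their neighbors (10 sampled) and stores the result h0 in next_hidden.

When hop=1, the layer aggregates the second-level nodes with their own neighbors (25 sampled) and stores h1 in next_hidden. It uses the same aggregator, which means the W weights are shared.

next_hidden then replaces hidden for the next layer. In the next layer, layer=1 and dim_mult=2, so the input dimension dim_mult * dims[layer] becomes 128 × 2 (because the previous step concatenated), while the output stays 128. The second aggregation layer has no activation function, so its result looks like it is output directly. With layer=1, hop can only be 0: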
h = aggregator((hidden[hop],
tf.reshape(hidden[hop + 1], neigh_dims)))
next_hidden.append(h)
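Here the code above takes the center-node vectors from the previous layer as "self" and the neighbor vectors from the previous layer as "neighbors", then aggregates once more. The output is the center nodes' final representations, which go into next_hidden and then hidden. The final hidden[0] holds the final feature vectors of all center nodes, each of length 128 × 2 = 256. aggregators returns the MeanAggregator objects of the first and second layers.
[Figure: diagram of the two-layer aggregation process]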
The rest of build is easy:
self.outputs1 = tf.nn.l2_normalize(self.outputs1, 1)
dim_mult = 2 if self.concat else 1 # 2
# 128 * 2 inputs, 121 outputs, no activation
self.node_pred = layers.Dense(dim_mult * self.dims[-1], self.num_classes,
dropout=self.placeholders['dropout'],
act=lambda x: x)
# TF graph management
self.node_preds = self.node_pred(self.outputs1)
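After the second layer, the author L2-normalizes the output and then adds a fully connected layer with input 128 × 2, output 121 and no activation. It outputs raw logits, like a regression head; sigmoid or softmax is applied later, in the loss and in predict. It calls the _call of Dense in layers, and that Dense also applies dropout.
Loss module
The model is done at this point, so take a breather: half the battle is over. Next is the loss module, which is also quite important.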
# self._loss()
def _loss(self):
    # Weight decay loss
    for aggregator in self.aggregators:
        for var in aggregator.vars.values():
            self.loss += FLAGS.weight_decay * tf.nn.l2_loss(var)
    for var in self.node_pred.vars.values():
        self.loss += FLAGS.weight_decay * tf.nn.l2_loss(var)
    # classification loss
    if self.sigmoid_loss:
        self.loss += tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(
            logits=self.node_preds,
            labels=self.placeholders['labels']))
    else:
        self.loss += tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(
            logits=self.node_preds,
            labels=self.placeholders['labels']))
    tf.summary.scalar('loss', self.loss)
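Briefly, the first part adds an L2 weight-decay loss on the weights of the aggregator layers and of the fully connected layer. The second part is the classification loss; with sigmoid_loss=False it is the softmax cross-entropy. The labels here contain several 1s, so let's check whether softmax_cross_entropy_with_logits works in that case:
import numpy as np
import tensorflow as tf  # TF 1.x API (tf.Session)

A = np.array([[1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
              [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]])
with tf.Session() as sess:
    print(sess.run(tf.nn.softmax(A)))
    print(sess.run(tf.nn.softmax_cross_entropy_with_logits(
        labels=[[0, 0, 1, 0, 0, 1],
                [0, 0, 0, 1, 0, 1]],
        logits=A)))
# [3.91238663 2.91238663]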
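It runs, and the numbers check out. With a multi-hot label, the loss is simply the sum of -log p over all positive classes. The softmax still treats the classes as mutually exclusive; the --sigmoid flag switches to sigmoid_cross_entropy_with_logits, which scores each class independently.

Next, the vars of the model layers. The aggregator weights are neigh_weights, self_weights and bias (absent by default). One aggregator, and therefore one set of weights, is shared by every hop within a layer. The fully connected layer's vars are weights and bias (present by default). The final loss is the L2 loss of all these variables plus the cross-entropy loss.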
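Here is a pure-Python sketch of both losses for one example, assuming the standard definitions. softmax_xent reproduces the [3.91238663 2.91238663] printed above. sigmoid_xent uses the numerically stable form documented for tf.nn.sigmoid_cross_entropy_with_logits, averaged over the classes. Both function names are hypothetical.

```python
import math


def softmax_xent(logits, labels):
    """-sum_i y_i * log softmax(x)_i for one example (labels need not sum to 1)."""
    m = max(logits)
    log_sum_exp = m + math.log(sum(math.exp(x - m) for x in logits))
    return -sum(y * (x - log_sum_exp) for x, y in zip(logits, labels))


def sigmoid_xent(logits, labels):
    """Per-class max(x, 0) - x*z + log(1 + exp(-|x|)), averaged over the classes."""
    terms = [max(x, 0.0) - x * z + math.log1p(math.exp(-abs(x))) for x, z in zip(logits, labels)]
    return sum(terms) / len(terms)


logits = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
print(softmax_xent(logits, [0, 0, 1, 0, 0, 1]))   # about 3.91238663, as in the TF run above
print(softmax_xent(logits, [0, 0, 0, 1, 0, 1]))   # about 2.91238663
```

The softmax loss of a row with two positives equals the sum of the two single-positive losses. That is why it runs fine with multi-hot labels, even though the softmax probabilities still compete with each other across classes.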
Optimizer module
The optimizer module uses:
grads_and_vars = self.optimizer.compute_gradients(self.loss)
clipped_grads_and_vars = [(tf.clip_by_value(grad, -5.0, 5.0) if grad is not None else None, var)
for grad, var in grads_and_vars]
self.grad, _ = clipped_grads_and_vars[0]
self.opt_op = self.optimizer.apply_gradients(clipped_grads_and_vars)
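Here the gradients are computed explicitly and clipped element-wise to [-5, 5] with tf.clip_by_value; None gradients are passed through. The training op is optimizer.apply_gradients(clipped_grads_and_vars).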
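Element-wise clipping as done here can be sketched in plain Python. clip_grads_and_vars is a hypothetical name, and gradients are flat lists of floats.

```python
def clip_grads_and_vars(grads_and_vars, lo=-5.0, hi=5.0):
    """Clip every gradient element into [lo, hi]; None gradients pass through unchanged."""
    clipped = []
    for grad, var in grads_and_vars:
        if grad is None:
            clipped.append((None, var))
        else:
            clipped.append(([min(max(g, lo), hi) for g in grad], var))
    return clipped


print(clip_grads_and_vars([([7.0, -9.0, 0.5], "w"), (None, "b")]))
# [([5.0, -5.0, 0.5], 'w'), (None, 'b')]
```

This clips each component by value rather than by the global norm (tf.clip_by_global_norm). The gradient direction can therefore change when only some components hit the limit.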
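Prediction module
Prediction is simple. It applies sigmoid or softmax, depending on sigmoid_loss, to the output of the fully connected layer:
def predict(self):
    if self.sigmoid_loss:
        return tf.nn.sigmoid(self.node_preds)
    else:
        return tf.nn.softmax(self.node_preds)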
OK, at this point the whole network has been built.
Training
It starts by setting some TensorFlow options:
# log device placement
config = tf.ConfigProto(log_device_placement=FLAGS.log_device_placement)  # False
# let GPU memory usage grow dynamically
config.gpu_options.allow_growth = True
# config.gpu_options.per_process_gpu_memory_fraction = GPU_MEM_FRACTION
# if the requested device does not exist, let TF place the op automatically
config.allow_soft_placement = True
Next, a Session is created and the summary writer is set up with the path it writes to:
# Initialize session
sess = tf.Session(config=config)
merged = tf.summary.merge_all()
summary_writer = tf.summary.FileWriter(log_dir(), sess.graph)
The next step is the clever part:
# Init variables
sess.run(tf.global_variables_initializer(), feed_dict={adj_info_ph: minibatch.adj})
I had been puzzled by this code from earlier: the author creates a placeholder and then uses it to initialize a tf.Variable:
adj_info_ph = tf.placeholder(tf.int32, shape=minibatch.adj.shape)
adj_info = tf.Variable(adj_info_ph, trainable=False, name="adj_info")
Running adj_info in a Session while feeding adj_info_ph raises an error. The author's code shows the way: feed the placeholder when running tf.global_variables_initializer(). A quick test:
import numpy as np
import tensorflow as tf  # TF 1.x API

tf.reset_default_graph()
adj_info_ph = tf.placeholder(tf.int32, shape=(2, 2))
adj_info = tf.Variable(adj_info_ph, trainable=False)
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer(), feed_dict={adj_info_ph: np.array([[2, 2], [1, 3]])})
    print(sess.run(adj_info))
[[2 2]
[1 3]]
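Brilliant, the author is a genius. After global_variables_initializer, the tf.Variable can be run directly without feeding anything again. The value passed in behaves like a global that training does not change from epoch to epoch. Writing it the following way raises an error instead:
import numpy as np
import tensorflow as tf  # TF 1.x API

tf.reset_default_graph()
adj_info_ph = tf.placeholder(tf.int32, shape=(2, 2))
adj_info = tf.Variable(adj_info_ph, trainable=False)
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())  # fails here: adj_info_ph is not fed
    print(sess.run(adj_info, feed_dict={adj_info_ph: np.array([[2, 2], [1, 3]])}))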
tensorflow.python.framework.errors_impl.InvalidArgumentError: You must feed a value for placeholder tensor 'Placeholder' with dtype int32 and shape [2,2]
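So adj_info must be initialized through global_variables_initializer, which is where the value is fed.
Next, two assign ops are defined, train_adj_info and val_adj_info. They write the training adjacency list (minibatch.adj) and the test adjacency list (minibatch.test_adj, built by construct_test_adj) into adj_info. Running one of these ops overwrites the value of adj_info in place; no new variable is created.
Now the actual batch training loop: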
for epoch in range(FLAGS.epochs):  # 10
    minibatch.shuffle()
    iter = 0
    print('Epoch: %04d' % (epoch + 1))
    epoch_val_costs.append(0)
    while not minibatch.end():
        # Construct feed dictionary
        feed_dict, labels = minibatch.next_minibatch_feed_dict()
Now it's minibatch's turn to shine. It first calls shuffle; follow it:
def shuffle(self):
    """ Re-shuffle the training set.
        Also reset the batch number.
    """
    self.train_nodes = np.random.permutation(self.train_nodes)
    self.batch_num = 0
train_nodes is a list of training node indices. np.random.permutation shuffles it here, and batch_num is reset to 0.
Next, minibatch.end() is checked. At this point it is obviously False. The code:
def end(self):
    return self.batch_num * self.batch_size >= len(self.train_nodes)
batch_num is 0 after initialization, batch_size was set to 512 when NodeMinibatchIterator was instantiated, and len(train_nodes)=44906. The check asks whether batch_num batches have reached the end of train_nodes. If not, there are still training samples left for another minibatch.
Now the key part: minibatch.next_minibatch_feed_dict() must be what produces the minibatches. Follow it:
def next_minibatch_feed_dict(self):
    start_idx = self.batch_num * self.batch_size  # 0 * 512 = 0
    self.batch_num += 1  # 1
    end_idx = min(start_idx + self.batch_size, len(self.train_nodes))  # min(512, 44906)
    batch_nodes = self.train_nodes[start_idx: end_idx]
    return self.batch_feed_dict(batch_nodes)
This is easy to follow. It computes a start index and an end index and slides sequentially over train_nodes to take one batch. If start_idx + batch_size goes past the end of train_nodes, end_idx is capped at len(train_nodes). Follow batch_feed_dict:
def batch_feed_dict(self, batch_nodes, val=False):
    batch1id = batch_nodes
    batch1 = [self.id2idx[n] for n in batch1id]
    # get the label of every node
    labels = np.vstack([self._make_label_vec(node) for node in batch1id])
    feed_dict = dict()
    feed_dict.update({self.placeholders['batch_size']: len(batch1)})
    feed_dict.update({self.placeholders['batch']: batch1})
    feed_dict.update({self.placeholders['labels']: labels})
    return feed_dict, labels
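Also easy. This function builds the labels: _make_label_vec looks up each node in label_map and returns its label as a multi-hot array. Finally, the actual batch_size, batch and labels are fed to their placeholders, and the feed_dict and the actual labels matrix are returned.
To sum up: every call to minibatch.next_minibatch_feed_dict() slides the next 512 nodes of train_nodes into a training batch, so the model's input is a list of node indices. minibatch.end() is checked before each call. Once train_nodes is used up, the while loop exits and the next epoch goes over the whole training set again.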
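A hypothetical, stripped-down version of the batching logic confirms the batch count for this run. It assumes train_nodes is a plain list and uses random.shuffle where the repo uses np.random.permutation.

```python
import random


class ToyNodeMinibatchIterator:
    """Hypothetical stripped-down version of the batching logic in NodeMinibatchIterator."""

    def __init__(self, train_nodes, batch_size):
        self.train_nodes = list(train_nodes)
        self.batch_size = batch_size
        self.batch_num = 0

    def shuffle(self, rng=random):
        rng.shuffle(self.train_nodes)   # the repo uses np.random.permutation
        self.batch_num = 0

    def end(self):
        return self.batch_num * self.batch_size >= len(self.train_nodes)

    def next_batch(self):
        start = self.batch_num * self.batch_size
        self.batch_num += 1
        end = min(start + self.batch_size, len(self.train_nodes))
        return self.train_nodes[start:end]


it = ToyNodeMinibatchIterator(range(44906), 512)
it.shuffle(random.Random(0))
sizes = []
while not it.end():
    sizes.append(len(it.next_batch()))
print(len(sizes), sizes[-1])   # 88 362
```

With 44906 training nodes and batch_size 512 there are 88 batches per epoch (iter 0 to 87). The last batch holds only 362 nodes.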
Back to training:
feed_dict.update({placeholders['dropout']: FLAGS.dropout}) # 0
dropout is 0 for all aggregator and fully connected layers. Now the training step:
# Training step
outs = sess.run([merged, model.opt_op, model.loss, model.preds], feed_dict=feed_dict)
train_cost = outs[2]
This runs model.opt_op on the batch node list and labels from feed_dict and gets model.loss. Next comes validation:
if iter % FLAGS.validate_iter == 0:  # 5000
    # Validation
    sess.run(val_adj_info.op)
    if FLAGS.validate_batch_size == -1:
        val_cost, val_f1_mic, val_f1_mac, duration = incremental_evaluate(sess, model, minibatch,
                                                                          FLAGS.batch_size)
    else:
        val_cost, val_f1_mic, val_f1_mac, duration = evaluate(sess, model, minibatch,
                                                              FLAGS.validate_batch_size)
    sess.run(train_adj_info.op)
    epoch_val_costs[-1] += val_cost
if total_steps % FLAGS.print_every == 0:
    summary_writer.add_summary(outs[0], total_steps)
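Validation runs every 5000 iters. What does sess.run(val_adj_info.op) do? It switches adj_info to the test adjacency list, so validation nodes are sampled with all their edges, including those removed for training. After validation, sess.run(train_adj_info.op) switches adj_info back to the training adjacency list. The default validate_batch_size is 256, so look at this branch: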
val_cost, val_f1_mic, val_f1_mac, duration = evaluate(sess, model, minibatch, FLAGS.validate_batch_size)
Follow evaluate:
def evaluate(sess, model, minibatch_iter, size=None):
    t_test = time.time()
    feed_dict_val, labels = minibatch_iter.node_val_feed_dict(size)
    node_outs_val = sess.run([model.preds, model.loss],
                             feed_dict=feed_dict_val)
    mic, mac = calc_f1(labels, node_outs_val[0])
    return node_outs_val[1], mic, mac, (time.time() - t_test)
minibatch_iter is simply minibatch. Look at minibatch_iter.node_val_feed_dict(256):
def node_val_feed_dict(self, size=None, test=False):
    if test:
        val_nodes = self.test_nodes
    else:
        val_nodes = self.val_nodes
    if not size is None:
        val_nodes = np.random.choice(val_nodes, size, replace=True)
    # add a dummy neighbor
    ret_val = self.batch_feed_dict(val_nodes)
    return ret_val[0], ret_val[1]
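This is easy too. It draws 256 node indices from val_nodes with replacement and calls batch_feed_dict, which returns the feed_dict and the labels. Validation then runs the prediction directly to get the predicted probabilities (model.preds) and the loss: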
node_outs_val = sess.run([model.preds, model.loss],
feed_dict=feed_dict_val)
Then micro_f1 and macro_f1 are computed:
mic, mac = calc_f1(labels, node_outs_val[0])
The function calc_f1 is interesting:
def calc_f1(y_true, y_pred):
    if not FLAGS.sigmoid:
        y_true = np.argmax(y_true, axis=1)
        y_pred = np.argmax(y_pred, axis=1)
    else:
        y_pred[y_pred > 0.5] = 1
        y_pred[y_pred <= 0.5] = 0
    return metrics.f1_score(y_true, y_pred, average="micro"), metrics.f1_score(y_true, y_pred, average="macro")
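When FLAGS.sigmoid is set, these two lines set each of the 121 predicted probabilities to 0 or 1 before metrics.f1_score is called. Without --sigmoid, both matrices are reduced to a single class per node with argmax instead:
y_pred[y_pred > 0.5] = 1
y_pred[y_pred <= 0.5] = 0
So the evaluation metric is the F1 score of predicting a 1 at each position of the label vector. Let's test sklearn.metrics.f1_score: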
metrics.f1_score([[1, 0, 0], [1, 0, 1]], [[0, 0, 0], [1, 1, 1]], average="micro")
Out[210]: 0.6666666666666666
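Start with precision. The second argument is the prediction, which contains three 1s; the first and third are correct, so precision is 2/3. For recall, there are three true 1s and the second and third are hit, so recall is 2/3 and F1 = 0.6667. micro pools TP, FP and FN over all labels before computing F1. macro instead computes F1 for each label (column) separately and takes the unweighted mean. Clearly, the closer the predicted multi-hot vectors are to the true ones, the higher the F1. Overall, evaluate returns the validation loss, the two F1 scores and the elapsed time.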
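The difference between micro and macro averaging is easiest to see from the raw counts. Below is a pure-Python sketch for multi-hot matrices. f1_scores is a hypothetical name, and giving 0 to a class with no positives in either matrix is an assumption; sklearn's handling of that case depends on its zero_division argument. On the example above, the micro value matches the 0.6667 computed by sklearn.

```python
def f1_scores(y_true, y_pred):
    """Micro and macro F1 for multi-hot label matrices given as lists of 0/1 rows."""
    n_classes = len(y_true[0])
    tp = [0] * n_classes
    fp = [0] * n_classes
    fn = [0] * n_classes
    for true_row, pred_row in zip(y_true, y_pred):
        for j, (t, p) in enumerate(zip(true_row, pred_row)):
            if t and p:
                tp[j] += 1
            elif p:
                fp[j] += 1
            elif t:
                fn[j] += 1

    def f1(tp_, fp_, fn_):
        # F1 = 2TP / (2TP + FP + FN); 0 for a class with no positives at all (assumption)
        denom = 2 * tp_ + fp_ + fn_
        return 2 * tp_ / denom if denom else 0.0

    micro = f1(sum(tp), sum(fp), sum(fn))   # pool the counts over all classes first
    macro = sum(f1(tp[j], fp[j], fn[j]) for j in range(n_classes)) / n_classes  # unweighted mean
    return micro, macro


print(f1_scores([[1, 0, 0], [1, 0, 1]], [[0, 0, 0], [1, 1, 1]]))  # about (0.667, 0.556)
```

Per column, the example gives F1 values of 2/3, 0 and 1, so macro is 5/9. The column that was predicted but never true pulls macro down, while micro only sees the pooled counts.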
After validation, the loss is added to the 0 appended at the start of this epoch:
sess.run(train_adj_info.op)
epoch_val_costs[-1] += val_cost
Then a summary is recorded every 5 total_steps:
if total_steps % FLAGS.print_every == 0:  # 5
    summary_writer.add_summary(outs[0], total_steps)
Then the formatted output:
    # still inside the while loop
    if total_steps % FLAGS.print_every == 0:
        train_f1_mic, train_f1_mac = calc_f1(labels, outs[-1])
        print("Iter:", '%04d' % iter,
              "train_loss=", "{:.5f}".format(train_cost),
              "train_f1_mic=", "{:.5f}".format(train_f1_mic),
              "train_f1_mac=", "{:.5f}".format(train_f1_mac),
              "val_loss=", "{:.5f}".format(val_cost),
              "val_f1_mic=", "{:.5f}".format(val_f1_mic),
              "val_f1_mac=", "{:.5f}".format(val_f1_mac),
              "time=", "{:.5f}".format(avg_time))
    iter += 1
    total_steps += 1
    if total_steps > FLAGS.max_total_steps:  # 10000000000
        break
# back in the epoch loop
if total_steps > FLAGS.max_total_steps:  # 10000000000
    break
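The author keeps two counters, iter and total_steps. iter counts within one epoch; since 44906 / 512 ≈ 87.7, it runs from 0 to 87. total_steps counts over the whole run. Both go up by 1 for every batch trained.

Because FLAGS.validate_iter is 5000, iter % FLAGS.validate_iter == 0 holds only at iter=0. Validation therefore runs once at the start of each epoch, and the next validation waits for the next epoch. That is why the training stats are printed every 5 iterations while the val stats stay the same throughout an epoch: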
Iter: 0066 train_loss= 0.46198 train_f1_mic= 0.56420 train_f1_mac= 0.38442 val_loss= 0.47892 val_f1_mic= 0.58327 val_f1_mac= 0.42079 time= 0.07254
Iter: 0071 train_loss= 0.46786 train_f1_mic= 0.56934 train_f1_mac= 0.41866 val_loss= 0.47892 val_f1_mic= 0.58327 val_f1_mac= 0.42079 time= 0.07247
Iter: 0076 train_loss= 0.45099 train_f1_mic= 0.58254 train_f1_mac= 0.41838 val_loss= 0.47892 val_f1_mic= 0.58327 val_f1_mac= 0.42079 time= 0.07238
Iter: 0081 train_loss= 0.46226 train_f1_mic= 0.57681 train_f1_mac= 0.41835 val_loss= 0.47892 val_f1_mic= 0.58327 val_f1_mac= 0.42079 time= 0.07240
Iter: 0086 train_loss= 0.45582 train_f1_mic= 0.60693 train_f1_mac= 0.45930 val_loss= 0.47892 val_f1_mic= 0.58327 val_f1_mac= 0.42079 time= 0.07235
Epoch: 0010
Iter: 0003 train_loss= 0.45136 train_f1_mic= 0.57961 train_f1_mac= 0.42155 val_loss= 0.45077 val_f1_mic= 0.56654 val_f1_mac= 0.38990 time= 0.07236
Iter: 0008 train_loss= 0.46137 train_f1_mic= 0.59752 train_f1_mac= 0.45520 val_loss= 0.45077 val_f1_mic= 0.56654 val_f1_mac= 0.38990 time= 0.07231
Iter: 0013 train_loss= 0.45797 train_f1_mic= 0.57825 train_f1_mac= 0.42601 val_loss= 0.45077 val_f1_mic= 0.56654 val_f1_mac= 0.38990 time= 0.07225
Iter: 0018 train_loss= 0.45655 train_f1_mic= 0.58694 train_f1_mac= 0.42978 val_loss= 0.45077 val_f1_mic= 0.56654 val_f1_mac= 0.38990 time= 0.07218
Iter: 0023 train_loss= 0.46944 train_f1_mic= 0.59518 train_f1_mac= 0.44362 val_loss= 0.45077 val_f1_mic= 0.56654 val_f1_mac= 0.38990 time= 0.07215
Now the end of training:
print("Optimization Finished!")
sess.run(val_adj_info.op)
val_cost, val_f1_mic, val_f1_mac, duration = incremental_evaluate(sess, model, minibatch, FLAGS.batch_size)
print("Full validation stats:",
"loss=", "{:.5f}".format(val_cost),
"f1_micro=", "{:.5f}".format(val_f1_mic),
"f1_macro=", "{:.5f}".format(val_f1_mac),
"time=", "{:.5f}".format(duration))
Training stops after the configured 10 epochs, and a final evaluation is run on the full validation set. Follow incremental_evaluate:
def incremental_evaluate(sess, model, minibatch_iter, size, test=False):
    t_test = time.time()
    finished = False
    val_losses = []
    val_preds = []
    labels = []
    iter_num = 0
    finished = False
    while not finished:
        feed_dict_val, batch_labels, finished, _ = minibatch_iter.incremental_node_val_feed_dict(size, iter_num,
                                                                                                  test=test)  # False
        node_outs_val = sess.run([model.preds, model.loss],
                                 feed_dict=feed_dict_val)
        val_preds.append(node_outs_val[0])
        labels.append(batch_labels)
        val_losses.append(node_outs_val[1])
        iter_num += 1
    val_preds = np.vstack(val_preds)
    labels = np.vstack(labels)
    f1_scores = calc_f1(labels, val_preds)
    return np.mean(val_losses), f1_scores[0], f1_scores[1], (time.time() - t_test)
Follow incremental_node_val_feed_dict:
def incremental_node_val_feed_dict(self, size, iter_num, test=False):
    if test:
        val_nodes = self.test_nodes
    else:
        val_nodes = self.val_nodes
    # [0: min(512,6514)]
    val_node_subset = val_nodes[iter_num * size:min((iter_num + 1) * size,
                                                    len(val_nodes))]
    # add a dummy neighbor
    ret_val = self.batch_feed_dict(val_node_subset)
    return ret_val[0], ret_val[1], (iter_num + 1) * size >= len(val_nodes), val_node_subset  # False, val_node_subset
This is simple too. It slides over val_nodes 512 at a time and predicts each batch. The predictions are then stacked to compute F1 over the whole set, and the per-batch losses are averaged. Finally, the F1 and loss of the validation and test sets are written to local files:
with open(log_dir() + "val_stats.txt", "w") as fp:
    fp.write("loss={:.5f} f1_micro={:.5f} f1_macro={:.5f} time={:.5f}".
             format(val_cost, val_f1_mic, val_f1_mac, duration))
print("Writing test set stats to file (don't peak!)")
val_cost, val_f1_mic, val_f1_mac, duration = incremental_evaluate(sess, model, minibatch, FLAGS.batch_size,
                                                                  test=True)
with open(log_dir() + "test_stats.txt", "w") as fp:
    fp.write("loss={:.5f} f1_micro={:.5f} f1_macro={:.5f}".
             format(val_cost, val_f1_mic, val_f1_mac))
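The chunking can be reproduced in plain Python with the 6514 validation nodes mentioned in the code comment above. incremental_chunks is a hypothetical generator that mirrors the slicing and the finished flag of incremental_node_val_feed_dict. The second half uses made-up per-batch losses to show that np.mean(val_losses) weights the short last batch the same as a full one.

```python
def incremental_chunks(nodes, size):
    """Yield (chunk, finished) the way incremental_node_val_feed_dict slices val_nodes."""
    iter_num = 0
    finished = False
    while not finished:
        chunk = nodes[iter_num * size:min((iter_num + 1) * size, len(nodes))]
        finished = (iter_num + 1) * size >= len(nodes)
        yield chunk, finished
        iter_num += 1


chunks = [chunk for chunk, _ in incremental_chunks(list(range(6514)), 512)]
print(len(chunks), len(chunks[-1]))   # 13 370

# np.mean(val_losses) weights every batch equally; a size-weighted mean differs slightly.
batch_losses = [1.0] * 12 + [4.0]     # made-up per-batch losses
sizes = [len(c) for c in chunks]
unweighted = sum(batch_losses) / len(batch_losses)
weighted = sum(loss * s for loss, s in zip(batch_losses, sizes)) / sum(sizes)
```

The F1 scores are unaffected, because the predictions are stacked before calc_f1. Only the reported loss is a plain average over batches.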
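That completes the whole pipeline.
Reflections on the code
(1) Data flow
Let's re-trace the input data and how it flows through the model. The flow has two parts, sampling and the neural-network computation. First, sampling: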
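Take a batch of 512 nodes from the example. The sampling part draws all 1-hop and 2-hop neighbors in one go. It does this by storing each node's adjacency list and drawing the requested number of neighbors from it. Next, it fetches the feature vectors of all the nodes involved: the sampled ids are flattened and looked up (embedding_lookup) in the node feature matrix. As a result, the feature matrix of the node set at each hop distance is two-dimensional.
Now the neural-network computation:
Laid out like this, it isn't complicated. The only learned parameters are the W matrices of the two aggregation layers (shared within each layer) and one fully connected layer. The other big piece is the 1-hop and 2-hop sampling at the start, which could be separated from the model entirely and moved into the data processing module.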
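(2) Predicting with this code
This code does not seem able to predict on new data, because the author puts the adjacency list into a tensor and initializes it once, globally: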
sess.run(tf.global_variables_initializer(), feed_dict={adj_info_ph: minibatch.adj})
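As a result, passing in a new adjacency list at prediction time raises an error; the variable's shape is also fixed to minibatch.adj.shape. There is an issue about this in the repository. Fixing it requires changing the source: move the sampling out of the model. The model layer then contains only the two aggregation layers and the fully connected layer, and the feature matrices for all hops are prepared before they enter the model.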
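(3) Why pad with a zero vector?
I'm not sure; the zero vector at the bottom of the feature matrix does not seem to do anything. The code does suggest an explanation. construct_adj and construct_test_adj fill every row with len(id2idx) (56944) by default, which is exactly the index of that appended zero row, and the adjacency table has one extra row (56945 rows) for the same index. A node with no usable neighbors therefore keeps a row of 56944s, and looking those up returns the zero feature vector. In the training adjacency, the sampled neighbors of training nodes are themselves training nodes with degree ≥ 1. So the padding row does not appear to be reached during training, which may be why it looks unused.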
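Here is a tiny pure-Python illustration of that reading, with made-up features. lookup is a hypothetical stand-in for tf.nn.embedding_lookup, and the last row of each table plays the role of the padding row.

```python
def lookup(table, ids):
    """Row gather: the plain-list analogue of tf.nn.embedding_lookup."""
    return [table[i] for i in ids]


features = [[0.3, -1.2], [1.0, 0.5], [-0.7, 2.0]]        # 3 real nodes, 2 features (made up)
num_nodes = len(features)
padded = features + [[0.0] * len(features[0])]           # like np.vstack([features, zeros])
pad = num_nodes                                           # filler index used by construct_adj
adj = [[1, 1], [0, 0], [pad, pad], [pad, pad]]           # node 2 has no usable neighbors
print(lookup(padded, adj[2]))                             # [[0.0, 0.0], [0.0, 0.0]]
print(lookup(adj, adj[2]))                                # [[3, 3], [3, 3]]: a second hop stays on the pad row
```

Because both tables have one extra row at the same index, a neighborless node aggregates zeros at every hop instead of raising an index error.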