-
学习资料来源:
- anndata主页:https://anndata-tutorials.readthedocs.io/en/latest/index.html
- 官网:https://anndata-tutorials.readthedocs.io/en/latest/annloader.html【注意教程有两个版本,这里是latest版本的学习笔记】
本教程展示了如何使用AnnLoader来利用anndata构建pytorch模型,进行单细胞数据中批次的矫正。
- 检查用pytorch构建接口的很好的替代方法,例如scvi-tools
- 在这里,我们使用Pyro框架来简化变异自动解码器(a Variational Autoencoder)的代码
那此教程可能需要简单理解一下什么是pytorch:https://blog.csdn.net/ljx1400052550/article/details/110005062
简而言之:pytorch是一个深度学习框架,可以使用pytorch搭建自己的模型
首先,加载各种包
其中pyro这个包的安装可参考:http://pyro.ai/
import gdown
import torch
import torch.nn as nn
import pyro
import pyro.distributions as dist
import numpy as np
import scanpy as sc
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
from anndata.experimental.pytorch import AnnLoader
import matplotlib.pyplot as plt
VAE 模型定义
这里我们定义了一个辅助多层感知器类,将它与下面的一个VAE一起使用
Note:这段代码在复制的时候最好放在vscode中,python有严格的代码缩进要求,不然可能会报错。
class MLP(nn.Module):
def __init__(self, input_dim, hidden_dims, out_dim):
super().__init__()
modules = []
for in_size, out_size in zip([input_dim]+hidden_dims, hidden_dims):
modules.append(nn.Linear(in_size, out_size))
modules.append(nn.LayerNorm(out_size))
modules.append(nn.ReLU())
modules.append(nn.Dropout(p=0.05))
modules.append(nn.Linear(hidden_dims[-1], out_dim))
self.fc = nn.Sequential(*modules)
def forward(self, *inputs):
input_cat = torch.cat(inputs, dim=-1)
return self.fc(input_cat)
下面显示了VAE模型的图形表示。它使用未观察到的潜变量Z和观察到的批次标签batch重构输入数据X和分类标签class。注意,该模型将Class视为来自给定Z的X的自变量。
在这里我们定义了我们的模型,请参阅Pyro VAE教程了解更多细节。
class CVAE(nn.Module):
# The code is based on the scarches trVAE model
# https://github.com/theislab/scarches/blob/v0.3.5/scarches/models/trvae/trvae.py
# and on the pyro.ai Variational Autoencoders tutorial
# http://pyro.ai/examples/vae.html
def __init__(self, input_dim, n_conds, n_classes, hidden_dims, latent_dim):
super().__init__()
self.encoder = MLP(input_dim+n_conds, hidden_dims, 2*latent_dim) # output - mean and logvar of z
self.decoder = MLP(latent_dim+n_conds, hidden_dims[::-1], input_dim)
self.theta = nn.Linear(n_conds, input_dim, bias=False)
self.classifier = nn.Linear(latent_dim, n_classes)
self.latent_dim = latent_dim
def model(self, x, batches, classes, size_factors):
pyro.module("cvae", self)
batch_size = x.shape[0]
with pyro.plate("data", batch_size):
z_loc = x.new_zeros((batch_size, self.latent_dim))
z_scale = x.new_ones((batch_size, self.latent_dim))
z = pyro.sample("latent", dist.Normal(z_loc, z_scale).to_event(1))
classes_probs = self.classifier(z).softmax(dim=-1)
pyro.sample("class", dist.Categorical(probs=classes_probs), obs=classes)
dec_mu = self.decoder(z, batches).softmax(dim=-1) * size_factors[:, None]
dec_theta = torch.exp(self.theta(batches))
logits = (dec_mu + 1e-6).log() - (dec_theta + 1e-6).log()
pyro.sample("obs", dist.NegativeBinomial(total_count=dec_theta, logits=logits).to_event(1), obs=x.int())
def guide(self, x, batches, classes, size_factors):
batch_size = x.shape[0]
with pyro.plate("data", batch_size):
z_loc_scale = self.encoder(x, batches)
z_mu = z_loc_scale[:, :self.latent_dim]
z_var = torch.sqrt(torch.exp(z_loc_scale[:, self.latent_dim:]) + 1e-4)
pyro.sample("latent", dist.Normal(z_mu, z_var).to_event(1))
AnnLoader初始化
首先,下载数据
链接需要访问谷歌,我自己单独使用魔法进行了下载,到本地。
数据包括15681个细胞,1000个基因
url = 'https://drive.google.com/uc?id=1ehxgfHTsMZXy6YzlFKGJOsBKQ5rrvMnd'
dir = '/Pub/Users/zhangjuan/project/scanpy/pytorch/'
output = dir + 'pancreas.h5ad'
gdown.download(url, output, quiet=False)
# 读取数据
adata = sc.read(dir + 'pancreas_normalized.h5ad')
adata
# AnnData object with n_obs × n_vars = 15681 × 1000
# obs: 'batch', 'study', 'cell_type', 'size_factors'
通过Scanpy流程使用UMAP可视化数据。
sc.pp.neighbors(adata)
sc.tl.umap(adata)
sc.pl.umap(adata, color=['study', 'cell_type'], wspace=0.35)
plt.savefig(dir + "01-UMAP.png")
结果图如下:我们可以看到数据具有很强的批处理效应,来自不同的数据集。我们想用我们的VAE模型来整合这些批次
对于我们的模型,我们需要每个细胞的大小因子(库大小)作为负二项式重构loss的均值。
# put raw counts to .X
adata.X = adata.raw.X
adata.obs['size_factors'] = adata.X.sum(1)
在这里,我们为AnnData对象中的标签设置编码器。当在训练阶段从数据加载器访问标签时,AnnLoader将使用这些编码器动态地转换标签。
adata.obs['study'].cat.categories
encoder_study = OneHotEncoder(sparse=False, dtype=np.float32)
encoder_study.fit(adata.obs['study'].to_numpy()[:, None])
adata.obs['cell_type'].cat.categories
encoder_celltype = LabelEncoder()
encoder_celltype.fit(adata.obs['cell_type'])
use_cuda = torch.cuda.is_available()
可以使用函数或函数的映射创建转换器,这些函数将应用于属性的值(.obs, .obsm, .layers, . x)或子集对象中这些属性的特定键。指定一个属性和一个键(如果需要)作为传递的Mapping的键,并指定一个函数作为值应用。
这里我们定义了一个转换器,它将使用上面创建的编码器转换.obs的'study'和'cell_type'键的值.
encoders = {
'obs': {
'study': lambda s: encoder_study.transform(s.to_numpy()[:, None]),
'cell_type': encoder_celltype.transform
}
}
这里我们创建了一个AnnLoader对象,它将返回一个为我们的AnnData对象正确设置的PyTorch数据加载器.
use_cuda参数表示我们想要惰性转换AnnData对象中的所有数值。所谓惰性转换,是指在访问数据之前,数据不会被转换。AnnLoader对象从提供的AnnData对象创建了一个包装器对象,它负责子集设置和转换。这里不会复制完整的AnnData对象。
传递到转换的编码器在发送任何内容到cuda之前被应用。
这就是上面提到的包装器对象。数据加载器遍历该对象:
注意:如果use_cuda=True,那么所有数值都将转换为tensors并发送给cuda,因此在训练阶段不需要做任何转换。
dataloader = AnnLoader(adata, batch_size=128, shuffle=True, convert=encoders, use_cuda=use_cuda)
dataloader.dataset
# AnnCollection object with n_obs × n_vars = 15681 × 1000
# constructed from 1 AnnData objects
# view of obsm: 'X_pca', 'X_umap'
# obs: 'batch', 'study', 'cell_type', 'size_factors'
dataloader.dataset[:10]
# AnnCollectionView object with n_obs × n_vars = 10 × 1000
# obsm: 'X_pca', 'X_umap'
# obs: 'batch', 'study', 'cell_type', 'size_factors'
还可以使用自定义sampler,而不是AnnLoader中的。只需传递sampler和batch_size即可。
from torch.utils.data import WeightedRandomSampler
weights = np.ones(adata.n_obs)
weights[adata.obs['cell_type'] == 'Pancreas Stellate'] = 2.
sampler = WeightedRandomSampler(weights, adata.n_obs)
dataloader = AnnLoader(adata, batch_size=128, sampler=sampler, convert=encoders, use_cuda=use_cuda)
我们不使用自定义采样器来训练模型,所以这里返回到默认采样器:
dataloader = AnnLoader(adata, batch_size=128, shuffle=True, convert=encoders, use_cuda=use_cuda)
初始化和训练模型
在这里,我们初始化模型和Pyro例程以进行训练。
n_conds = len(adata.obs['study'].cat.categories)
n_classes = len(adata.obs['cell_type'].cat.categories)
cvae = CVAE(adata.n_vars, n_conds=n_conds, n_classes=n_classes, hidden_dims=[128, 128], latent_dim=10)
if use_cuda:
cvae.cuda()
optimizer = pyro.optim.Adam({"lr": 1e-3})
svi = pyro.infer.SVI(cvae.model, cvae.guide, optimizer, loss=pyro.infer.TraceMeanField_ELBO())
注意,现在可以简单地从数据加载器中获取batch,选择所需的属性,在需要时对其进行处理并传递给loss函数。所有内容都已由预定义的转换器转换。你不需要复制你的AnnData对象,你不需要一个自定义的数据加载器字典所需的键,所有的观察键已经在bacthes。
def train(svi, train_loader):
epoch_loss = 0.
for batch in train_loader:
epoch_loss += svi.step(batch.X, batch.obs['study'], batch.obs['cell_type'], batch.obs['size_factors'])
normalizer_train = len(train_loader.dataset)
total_epoch_loss_train = epoch_loss / normalizer_train
return total_epoch_loss_train
NUM_EPOCHS = 210
for epoch in range(NUM_EPOCHS):
total_epoch_loss_train = train(svi, dataloader)
if epoch % 40 == 0 or epoch == NUM_EPOCHS-1:
print("[epoch %03d] average training loss: %.4f" % (epoch, total_epoch_loss_train))
这里运行时间比较久,运行日志:
[epoch 000] average training loss: 1236.4254
[epoch 040] average training loss: 697.7242
[epoch 080] average training loss: 687.5900
[epoch 120] average training loss: 684.3700
[epoch 160] average training loss: 682.2220
[epoch 200] average training loss: 681.1151
[epoch 209] average training loss: 681.0516
检查结果
# No copies yet, nothing is copied until you access specific attributes (.X, .obsm etc.).
full_data = dataloader.dataset[:]
# get mean values of the latent variables
means = cvae.encoder(full_data.X, full_data.obs['study'])[:, :10]
adata.obsm['X_cvae'] = means.data.cpu().numpy()
sc.pp.neighbors(adata, use_rep='X_cvae')
sc.tl.umap(adata)
sc.pl.umap(adata, color=['study', 'cell_type'], wspace=0.35)
plt.savefig(dir + "02-UMAP_integration.png")
使用VAE 模型整合后的结果:整合后,可以看到下面右图中相同细胞类型都聚在了一起。
准确性:
accuracy = (cvae.classifier(means).argmax(dim=-1)==full_data.obs['cell_type']).sum().item()/adata.n_obs
accuracy
# 0.9195204387475289
下次见~