流程化!个人感觉是生信最头疼的问题之一,当你东平西凑整出一台符合自己分析目的的流程时,会发现在处理过程中,仍需要人工对接每一步的输入输出,特别是对于流程复杂,而又经常使用的情况,这是一笔不小的人工消耗;既然如此,那就将这些流程统统写进代码里,下次使用时,直接调用!流程化的软件很多,我调研过的就三种:
- Shell脚本:最简单的流程化方式,在脚本内你可以指定待输入的参数,但最大的问题就是无法缓存中间文件,这意味着如果那一天你的脚本报错,你只能自己去排错,然后又重新跑。
- snakemake:这应该是网上教程最多的流程化软件,基于Python框架,对于初学者和python爱好者会比较易懂,相比于单纯shell脚本,它能做到的是缓存中间的文件,在修改代码之后,就不必从头开始执行。但在使用的过程中,个人感觉使用有点“生硬”,即从后向前结构很别扭,
- nextflow:基于Groovy语言编写(类似于python),与snakemake不同的是,其向前向后的结构是通过channel来相互连接,此外拥有很好的社区支持(google group,gitter),原生对云环境(AWS,google cloud)支持,最重要的是,其被越来多的人采用,从而衍生大量优秀成熟流程nf-core。作为后起之秀,nextflow可能会是生信流程软件的大头。
关于nextflow的教程也是比较多的,不过大部分都是英文:
官网Doc:AGet started — Nextflow 21.10.6 documentation
nf-core:nextflow-io/awesome-nextflow: A curated list of nextflow based pipelines (github.com)
教程1:Creating a NextFlow workflow - Bioinformatics Workbook
教程2:基于Nextflow的宏基因组有参分析-I 安装Nextflow | Chenhao's Personal Page
教程3:Nextflow training (seqera.io) Youtobe:Nextflow Training Workshop - July 2020 - Day 3 - YouTube
综合性:Nextflow tutorials » nf-core
通过几天的摸索,对nextflow的总结如下:你需要设置参数,并将文件以参数或Channel的形式输入到Process中,在不同的Process中以Channel相互连接,进行文件的输入输出。对于我来说,刚开始学习的难点就是,文件的输入以及结果的输出。
基本架构:
以下仅以main.nf作为例子说明:完整代码见RNA-Seq pipeline | Nextflow
定义参数:
- 指定使用的nextflow的版本,这里是用户bin/env下
- .nf脚本的注释是以 /* 开头和以 */ 结尾
- 参数的定义:params.XXX= "默认值”,实际使用过程中可以使用--XXX 进行重新定义
#!/usr/bin/env nextflow
/*
* Defines some parameters in order to specify the refence genomes
* and read pairs by using the command line options
*/
params.reads = "$baseDir/data/ggal/*_{1,2}.fq" #这里是指输入该目录下所有*_1.fq和*_2.fq
params.annot = "$baseDir/data/ggal/ggal_1_48850000_49020000.bed.gff"
params.genome= "$baseDir/data/ggal/ggal_1_48850000_49020000.Ggal71.500bpflank.fa"
params.outdir = 'results'
定义通道:
定义通道的命令(factory)有很多,常见的有from、fromPath、fromFilePairs;后两个常被用于文件的输出通道使用(默认识别的Type:file,可修改至dir),返回的是List,list内含不同的文件,不同的是fromFilePairs返回的还有配对文件的common identified ID,因此常与 “_{1,2}.fq”的params搭配。
Channel.fromFilePairs( params.reads, checkIfExists:true ) ##checkIfExists检查文件是否配对
.ifEmpty { error "Cannot find any reads matching: {params.reads}" }
.set { read_pairs_ch }
##以上命令等价于:
Channel.fromFilePairs( params.reads ).set { read_pairs_ch } #略去ifEmpty
#若使用read_pairs_ch.view()则 :
[liver, [/user/nf-training/data/ggal/liver_1.fq, /user/nf-training/data/ggal/liver_2.fq]]
[gut, [/user/nf-training/data/ggal/gut_1.fq, /user/nf-training/data/ggal/gut_2.fq]]
[lung, [/user/nf-training/data/ggal/lung_1.fq, /user/nf-training/data/ggal/lung_2.fq]]
#作为对比这里使用fromPair;
Channel. fromFile(params.reads) .set(read_ch) reac_ch.view()
[/user/nf-training/data/ggal/liver_1.fq, /user/nf-training/data/ggal/liver_2.fq]
[/user/nf-training/data/ggal/gut_1.fq, /user/nf-training/data/ggal/gut_2.fq]
[/user/nf-training/data/ggal/lung_1.fq, /user/nf-training/data/ggal/lung_2.fq]
##与fromFilePairs相比缺少了Sample_ID
##由于同一个queue channel下的内容并不是可以无限使用,实际上其内容是在process即用即少;
#因此,可以一次性定义多个相同的内容的channel,例如:
Channel.fromFilePairs( params.reads ).into { read1_pairs_ch,read2_pairs_ch }
需要说明的是,nextflow的通道只有两种类型:queue channels and value channels.
- A value channel (a.k.a. singleton channel) by definition is bound to a single value and it can be read unlimited times without consuming its contents.
- A queue channel is a non-blocking unidirectional FIFO queue which connects two processes or operators. The definition implies that the same queue channel cannot be used more than one time as process output and more than one time as process input.
除此之外:fromFilePairs似乎无法在Process中重新Create Channel。
Process执行:
/*
* Step 2\. Maps each read-pair by using Tophat2 mapper tool
*/
process mapping {
tag "$pair_id" #这里实时反馈进程,即处理到那个文件
input:
path genome from params.genome
path annot from params.annot
path index from index_ch
tuple val(pair_id), path(reads) from read_pairs_ch
output:
set pair_id, "accepted_hits.bam" into bam_ch
"""
tophat2 -p ${task.cpus} --GTF $annot genome.index $reads
mv tophat_out/accepted_hits.bam .
"""
}
/*
* Step 3\. Assembles the transcript by using the "cufflinks" tool
*/
process makeTranscript {
tag "$pair_id"
publishDir params.outdir, mode: 'copy'
input:
path annot from params.annot
tuple val(pair_id), path(bam_file) from bam_ch
output:
tuple val(pair_id), path('transcript_*.gtf')
"""
cufflinks --no-update-check -q -p $task.cpus -G $annot $bam_file
mv transcripts.gtf transcript_${pair_id}.gtf
"""
}
几点说明:
- 为了防止书写的混乱,输出的格式统一为类似:file("$XXX") into XX_ch,并使用双引号(单引号无法识别通配符)。
- Process内的结果无法用fromFilePairs导入到通道内(至少现在我还未发现)
- publishDir params.outdir, mode: 'copy' 这里意思是将该process的output内的所有文件都输出到指定目录下(params.outdir),模式是copy,默认是link。
#补充1: tuple和set的语义相同,例如以上的几个output:
set pair_id, "accepted_hits.bam" into bam_ch
tuple val(pair_id), path('transcript_*.gtf') into final_ch
其结果都是产生List表,但写法上有所不同,tuple需要指明类型:val、path等
#补充2: collectFile(),是处理Channel的operators,旨在将同一通道内的item整合为一个
#新的item (需要命名),并产生新的Channel。例如在b porcess中整合并引用a_ch内的所有item
porcess a{
...
output:
set "file1","file2","file3" into a_ch
....
}
process b{
input:
file FNA from a_ch.collectFile(name:'a_fna')
... } #仅作说明,大部分省略
#补充3: process内的script需要注意上下的channel的对接,
执行nf并输出结果:
nextflow -bg run main.nf -with-report -with-trace -with-timeline -with-dag dag.png
#这里默认输入文件在当前目录下,如重新指定,则--reads xxx 等
#运行之后在当前目录下出现result文件夹(params.outdir)和work文件夹,
其中work文件起到的作用:
1:相当于运行缓存,在你再次执行时(-resume)时,就不需要从头执行
2:报错debug的作用,根据报错信息,查看隐藏的运行脚本(.command.sh)