mirror of
https://github.com/MillironX/taxprofiler.git
synced 2024-11-11 00:03:09 +00:00
Merge pull request #47 from nf-core/profiling-subworkflow
Move profiling to subworkflow
This commit is contained in:
commit
dd22e042be
4 changed files with 142 additions and 102 deletions
|
@ -167,7 +167,7 @@ process {
|
|||
publishDir = [
|
||||
path: { "${params.outdir}/malt/${meta.db_name}" },
|
||||
mode: params.publish_dir_mode,
|
||||
pattern: '*.{rma6,tab,text,sam,log}'
|
||||
pattern: '*.{log}'
|
||||
]
|
||||
}
|
||||
|
||||
|
@ -177,7 +177,7 @@ process {
|
|||
publishDir = [
|
||||
path: { "${params.outdir}/kraken2/${meta.db_name}" },
|
||||
mode: params.publish_dir_mode,
|
||||
pattern: '*.{fastq.gz,txt}'
|
||||
pattern: '*.{txt}'
|
||||
]
|
||||
}
|
||||
|
||||
|
@ -190,6 +190,16 @@ process {
|
|||
ext.prefix = { "${meta.id}-${meta.run_accession}-${meta.db_name}" }
|
||||
}
|
||||
|
||||
withName: CENTRIFUGE_CENTRIFUGE {
|
||||
publishDir = [
|
||||
path: { "${params.outdir}/centrifuge/${meta.db_name}" },
|
||||
mode: params.publish_dir_mode,
|
||||
pattern: '*.txt'
|
||||
]
|
||||
ext.args = { "${meta.db_params}" }
|
||||
ext.prefix = { "${meta.id}-${meta.run_accession}-${meta.db_name}" }
|
||||
}
|
||||
|
||||
withName: CUSTOM_DUMPSOFTWAREVERSIONS {
|
||||
publishDir = [
|
||||
path: { "${params.outdir}/pipeline_info" },
|
||||
|
@ -198,14 +208,12 @@ process {
|
|||
]
|
||||
}
|
||||
|
||||
withName: CENTRIFUGE_CENTRIFUGE {
|
||||
withName: MULTIQC {
|
||||
publishDir = [
|
||||
path: { "${params.outdir}/centrifuge/${meta.db_name}" },
|
||||
path: { "${params.outdir}/${task.process.tokenize(':')[-1].tokenize('_')[0].toLowerCase()}" },
|
||||
mode: params.publish_dir_mode,
|
||||
pattern: '*.{fastq.gz,txt}'
|
||||
saveAs: { filename -> filename.equals('versions.yml') ? null : filename }
|
||||
]
|
||||
ext.args = { "${meta.db_params}" }
|
||||
ext.prefix = { "${meta.id}-${meta.run_accession}-${meta.db_name}" }
|
||||
}
|
||||
|
||||
}
|
||||
|
|
119
subworkflows/local/profiling.nf
Normal file
119
subworkflows/local/profiling.nf
Normal file
|
@ -0,0 +1,119 @@
|
|||
//
|
||||
// Run profiling
|
||||
//
|
||||
|
||||
include { MALT_RUN } from '../../modules/nf-core/modules/malt/run/main'
|
||||
include { KRAKEN2_KRAKEN2 } from '../../modules/nf-core/modules/kraken2/kraken2/main'
|
||||
include { CENTRIFUGE_CENTRIFUGE } from '../../modules/nf-core/modules/centrifuge/centrifuge/main'
|
||||
include { METAPHLAN3 } from '../../modules/nf-core/modules/metaphlan3/main'
|
||||
|
||||
workflow PROFILING {
|
||||
take:
|
||||
shortreads // [ [ meta ], [ reads ] ]
|
||||
longreads // [ [ meta ], [ reads ] ]
|
||||
databases // [ [ meta ], path ]
|
||||
|
||||
main:
|
||||
ch_versions = Channel.empty()
|
||||
ch_multiqc_files = Channel.empty()
|
||||
|
||||
/*
|
||||
COMBINE READS WITH POSSIBLE DATABASES
|
||||
*/
|
||||
|
||||
// e.g. output [DUMP: reads_plus_db] [['id':'2612', 'run_accession':'combined', 'instrument_platform':'ILLUMINA', 'single_end':1], <reads_path>/2612.merged.fastq.gz, ['tool':'malt', 'db_name':'mal95', 'db_params':'"-id 90"'], <db_path>/malt90]
|
||||
ch_input_for_profiling = shortreads
|
||||
.mix( longreads )
|
||||
.combine(databases)
|
||||
.branch {
|
||||
malt: it[2]['tool'] == 'malt'
|
||||
kraken2: it[2]['tool'] == 'kraken2'
|
||||
metaphlan3: it[2]['tool'] == 'metaphlan3'
|
||||
centrifuge: it[2]['tool'] == 'centrifuge'
|
||||
unknown: true
|
||||
}
|
||||
|
||||
/*
|
||||
PREPARE PROFILER INPUT CHANNELS
|
||||
*/
|
||||
|
||||
// Each tool as a slightly different input structure and generally separate
|
||||
// input channels for reads vs databases. We restructure the channel tuple
|
||||
// for each tool and make liberal use of multiMap to keep reads/databases
|
||||
// channel element order in sync with each other
|
||||
|
||||
// MALT: We groupTuple to have all samples in one channel for MALT as database
|
||||
// loading takes a long time, so we only want to run it once per database
|
||||
// TODO document somewhere we only accept illumina short reads for MALT?
|
||||
ch_input_for_malt = ch_input_for_profiling.malt
|
||||
.filter { it[0]['instrument_platform'] == 'ILLUMINA' }
|
||||
.map {
|
||||
it ->
|
||||
def temp_meta = [ id: it[2]['db_name']] + it[2]
|
||||
def db = it[3]
|
||||
[ temp_meta, it[1], db ]
|
||||
}
|
||||
.groupTuple(by: [0,2])
|
||||
.multiMap {
|
||||
it ->
|
||||
reads: [ it[0], it[1].flatten() ]
|
||||
db: it[2]
|
||||
}
|
||||
|
||||
// All subsequent tools can easily run on a per-sample basis
|
||||
|
||||
ch_input_for_kraken2 = ch_input_for_profiling.kraken2
|
||||
.multiMap {
|
||||
it ->
|
||||
reads: [ it[0] + it[2], it[1] ]
|
||||
db: it[3]
|
||||
}
|
||||
|
||||
ch_input_for_centrifuge = ch_input_for_profiling.centrifuge
|
||||
.multiMap {
|
||||
it ->
|
||||
reads: [ it[0] + it[2], it[1] ]
|
||||
db: it[3]
|
||||
}
|
||||
|
||||
ch_input_for_metaphlan3 = ch_input_for_profiling.metaphlan3
|
||||
.multiMap {
|
||||
it ->
|
||||
reads: [it[0] + it[2], it[1]]
|
||||
db: it[3]
|
||||
}
|
||||
|
||||
/*
|
||||
RUN PROFILING
|
||||
*/
|
||||
|
||||
if ( params.run_malt ) {
|
||||
MALT_RUN ( ch_input_for_malt.reads, params.malt_mode, ch_input_for_malt.db )
|
||||
ch_multiqc_files = ch_multiqc_files.mix( MALT_RUN.out.log.collect{it[1]}.ifEmpty([]) )
|
||||
ch_versions = ch_versions.mix( MALT_RUN.out.versions.first() )
|
||||
}
|
||||
|
||||
if ( params.run_kraken2 ) {
|
||||
KRAKEN2_KRAKEN2 ( ch_input_for_kraken2.reads, ch_input_for_kraken2.db )
|
||||
ch_multiqc_files = ch_multiqc_files.mix( KRAKEN2_KRAKEN2.out.txt.collect{it[1]}.ifEmpty([]) )
|
||||
ch_versions = ch_versions.mix( KRAKEN2_KRAKEN2.out.versions.first() )
|
||||
}
|
||||
|
||||
if ( params.run_centrifuge ) {
|
||||
CENTRIFUGE_CENTRIFUGE ( ch_input_for_centrifuge.reads, ch_input_for_centrifuge.db, params.centrifuge_save_unaligned, params.centrifuge_save_aligned, params.centrifuge_sam_format )
|
||||
ch_versions = ch_versions.mix( CENTRIFUGE_CENTRIFUGE.out.versions.first() )
|
||||
}
|
||||
|
||||
if ( params.run_metaphlan3 ) {
|
||||
METAPHLAN3 ( ch_input_for_metaphlan3.reads, ch_input_for_metaphlan3.db )
|
||||
ch_versions = ch_versions.mix( METAPHLAN3.out.versions.first() )
|
||||
}
|
||||
|
||||
|
||||
emit:
|
||||
// TODO work out if there is enough standardisation of output to export as one?
|
||||
//output = ch_filtered_reads // channel: [ val(meta), [ reads ] ]
|
||||
versions = ch_versions // channel: [ versions.yml ]
|
||||
mqc = ch_multiqc_files
|
||||
}
|
||||
|
|
@ -44,6 +44,7 @@ include { DB_CHECK } from '../subworkflows/local/db_check'
|
|||
include { SHORTREAD_PREPROCESSING } from '../subworkflows/local/shortread_preprocessing'
|
||||
include { LONGREAD_PREPROCESSING } from '../subworkflows/local/longread_preprocessing'
|
||||
include { SHORTREAD_COMPLEXITYFILTERING } from '../subworkflows/local/shortread_complexityfiltering'
|
||||
include { PROFILING } from '../subworkflows/local/profiling'
|
||||
|
||||
/*
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
@ -59,10 +60,6 @@ include { MULTIQC } from '../modules/nf-core/modules/multiqc
|
|||
include { CUSTOM_DUMPSOFTWAREVERSIONS } from '../modules/nf-core/modules/custom/dumpsoftwareversions/main'
|
||||
|
||||
include { CAT_FASTQ } from '../modules/nf-core/modules/cat/fastq/main'
|
||||
include { MALT_RUN } from '../modules/nf-core/modules/malt/run/main'
|
||||
include { KRAKEN2_KRAKEN2 } from '../modules/nf-core/modules/kraken2/kraken2/main'
|
||||
include { CENTRIFUGE_CENTRIFUGE } from '../modules/nf-core/modules/centrifuge/centrifuge/main'
|
||||
include { METAPHLAN3 } from '../modules/nf-core/modules/metaphlan3/main'
|
||||
|
||||
/*
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
@ -88,6 +85,7 @@ workflow TAXPROFILER {
|
|||
DB_CHECK (
|
||||
ch_databases
|
||||
)
|
||||
ch_versions = ch_versions.mix(DB_CHECK.out.versions)
|
||||
|
||||
/*
|
||||
MODULE: Run FastQC
|
||||
|
@ -104,6 +102,7 @@ workflow TAXPROFILER {
|
|||
SUBWORKFLOW: PERFORM PREPROCESSING
|
||||
*/
|
||||
if ( params.shortread_clipmerge ) {
|
||||
|
||||
ch_shortreads_preprocessed = SHORTREAD_PREPROCESSING ( INPUT_CHECK.out.fastq ).reads
|
||||
} else {
|
||||
ch_shortreads_preprocessed = INPUT_CHECK.out.fastq
|
||||
|
@ -127,88 +126,11 @@ workflow TAXPROFILER {
|
|||
}
|
||||
|
||||
/*
|
||||
COMBINE READS WITH POSSIBLE DATABASES
|
||||
SUBWORKFLOW: PROFILING
|
||||
*/
|
||||
|
||||
// e.g. output [DUMP: reads_plus_db] [['id':'2612', 'run_accession':'combined', 'instrument_platform':'ILLUMINA', 'single_end':1], <reads_path>/2612.merged.fastq.gz, ['tool':'malt', 'db_name':'mal95', 'db_params':'"-id 90"'], <db_path>/malt90]
|
||||
ch_input_for_profiling = ch_shortreads_filtered
|
||||
.mix( ch_longreads_preprocessed )
|
||||
.combine(DB_CHECK.out.dbs)
|
||||
.branch {
|
||||
malt: it[2]['tool'] == 'malt'
|
||||
kraken2: it[2]['tool'] == 'kraken2'
|
||||
metaphlan3: it[2]['tool'] == 'metaphlan3'
|
||||
centrifuge: it[2]['tool'] == 'centrifuge'
|
||||
unknown: true
|
||||
}
|
||||
|
||||
/*
|
||||
PREPARE PROFILER INPUT CHANNELS
|
||||
*/
|
||||
|
||||
// We groupTuple to have all samples in one channel for MALT as database
|
||||
// loading takes a long time, so we only want to run it once per database
|
||||
// TODO document somewhere we only accept illumina short reads for MALT?
|
||||
ch_input_for_malt = ch_input_for_profiling.malt
|
||||
.filter { it[0]['instrument_platform'] == 'ILLUMINA' }
|
||||
.map {
|
||||
it ->
|
||||
def temp_meta = [ id: it[2]['db_name']] + it[2]
|
||||
def db = it[3]
|
||||
[ temp_meta, it[1], db ]
|
||||
}
|
||||
.groupTuple(by: [0,2])
|
||||
.multiMap {
|
||||
it ->
|
||||
reads: [ it[0], it[1].flatten() ]
|
||||
db: it[2]
|
||||
}
|
||||
|
||||
// We can run Kraken2 one-by-one sample-wise
|
||||
ch_input_for_kraken2 = ch_input_for_profiling.kraken2
|
||||
.multiMap {
|
||||
it ->
|
||||
reads: [ it[0] + it[2], it[1] ]
|
||||
db: it[3]
|
||||
}
|
||||
|
||||
// We can run centrifuge one-by-one sample-wise
|
||||
ch_input_for_centrifuge = ch_input_for_profiling.centrifuge
|
||||
.dump(tag: "input for centrifuge")
|
||||
.multiMap {
|
||||
it ->
|
||||
reads: [ it[0] + it[2], it[1] ]
|
||||
db: it[3]
|
||||
}
|
||||
|
||||
//
|
||||
// RUN PROFILING
|
||||
//
|
||||
ch_input_for_metaphlan3 = ch_input_for_profiling.metaphlan3
|
||||
.multiMap {
|
||||
it ->
|
||||
reads: [it[0] + it[2], it[1]]
|
||||
db: it[3]
|
||||
}
|
||||
|
||||
/*
|
||||
MODULE: RUN PROFILING
|
||||
*/
|
||||
if ( params.run_malt ) {
|
||||
MALT_RUN ( ch_input_for_malt.reads, params.malt_mode, ch_input_for_malt.db )
|
||||
}
|
||||
|
||||
if ( params.run_kraken2 ) {
|
||||
KRAKEN2_KRAKEN2 ( ch_input_for_kraken2.reads, ch_input_for_kraken2.db )
|
||||
}
|
||||
|
||||
if ( params.run_centrifuge ) {
|
||||
CENTRIFUGE_CENTRIFUGE ( ch_input_for_centrifuge.reads, ch_input_for_centrifuge.db, params.centrifuge_save_unaligned, params.centrifuge_save_aligned, params.centrifuge_sam_format )
|
||||
}
|
||||
|
||||
if ( params.run_metaphlan3 ) {
|
||||
METAPHLAN3 ( ch_input_for_metaphlan3.reads, ch_input_for_metaphlan3.db )
|
||||
}
|
||||
PROFILING ( ch_shortreads_filtered, ch_longreads_preprocessed, DB_CHECK.out.dbs )
|
||||
ch_versions = ch_versions.mix( PROFILING.out.versions )
|
||||
|
||||
/*
|
||||
MODULE: MultiQC
|
||||
|
@ -244,17 +166,8 @@ workflow TAXPROFILER {
|
|||
ch_versions = ch_versions.mix( SHORTREAD_COMPLEXITYFILTERING.out.versions )
|
||||
}
|
||||
|
||||
if (params.run_kraken2) {
|
||||
ch_multiqc_files = ch_multiqc_files.mix( KRAKEN2_KRAKEN2.out.txt.collect{it[1]}.ifEmpty([]) )
|
||||
ch_versions = ch_versions.mix( KRAKEN2_KRAKEN2.out.versions.first() )
|
||||
}
|
||||
ch_multiqc_files = ch_multiqc_files.mix( PROFILING.out.mqc )
|
||||
|
||||
if (params.run_malt) {
|
||||
ch_multiqc_files = ch_multiqc_files.mix( MALT_RUN.out.log.collect{it[1]}.ifEmpty([]) )
|
||||
ch_versions = ch_versions.mix( MALT_RUN.out.versions.first() )
|
||||
}
|
||||
|
||||
// TODO Versions for Karken/MALT not report?
|
||||
// TODO create multiQC module for metaphlan
|
||||
MULTIQC (
|
||||
ch_multiqc_files.collect()
|
||||
|
|
Loading…
Reference in a new issue