diff --git a/subworkflows/local/profiling.nf b/subworkflows/local/profiling.nf index 9389e19..7fb3ce9 100644 --- a/subworkflows/local/profiling.nf +++ b/subworkflows/local/profiling.nf @@ -48,7 +48,7 @@ workflow PROFILING { } /* - PREPARE PROFILER INPUT CHANNELS + PREPARE PROFILER INPUT CHANNELS & RUN PROFILING */ // Each tool as a slightly different input structure and generally separate @@ -56,74 +56,27 @@ workflow PROFILING { // for each tool and make liberal use of multiMap to keep reads/databases // channel element order in sync with each other - // MALT: We groupTuple to have all samples in one channel for MALT as database - // loading takes a long time, so we only want to run it once per database - // TODO document somewhere we only accept illumina short reads for MALT? - ch_input_for_malt = ch_input_for_profiling.malt - .filter { it[0]['instrument_platform'] == 'ILLUMINA' } - .map { - it -> - def temp_meta = [ id: it[2]['db_name']] + it[2] - def db = it[3] - [ temp_meta, it[1], db ] - } - .groupTuple(by: [0,2]) - .multiMap { - it -> - reads: [ it[0], it[1].flatten() ] - db: it[2] - } - - // All subsequent tools can easily run on a per-sample basis - - ch_input_for_kraken2 = ch_input_for_profiling.kraken2 - .multiMap { - it -> - reads: [ it[0] + it[2], it[1] ] - db: it[3] - } - - ch_input_for_centrifuge = ch_input_for_profiling.centrifuge - .filter{ - if (it[0].is_fasta) log.warn "[nf-core/taxprofiler] Centrifuge currently does not accept FASTA files as input. Skipping Centrifuge for sample ${it[0].id}." - !it[0].is_fasta - } - .multiMap { - it -> - reads: [ it[0] + it[2], it[1] ] - db: it[3] - } - - ch_input_for_metaphlan3 = ch_input_for_profiling.metaphlan3 - .filter{ - if (it[0].is_fasta) log.warn "[nf-core/taxprofiler] MetaPhlAn3 currently does not accept FASTA files as input. Skipping MetaPhlAn3 for sample ${it[0].id}." - !it[0].is_fasta - } - .multiMap { - it -> - reads: [it[0] + it[2], it[1]] - db: it[3] - } - - ch_input_for_kaiju = ch_input_for_profiling.kaiju - .multiMap { - it -> - reads: [it[0] + it[2], it[1]] - db: it[3] - } - - ch_input_for_diamond = ch_input_for_profiling.diamond - .multiMap { - it -> - reads: [it[0] + it[2], it[1]] - db: it[3] - } - - /* - RUN PROFILING - */ - if ( params.run_malt ) { + + + // MALT: We groupTuple to have all samples in one channel for MALT as database + // loading takes a long time, so we only want to run it once per database + // TODO document somewhere we only accept illumina short reads for MALT? + ch_input_for_malt = ch_input_for_profiling.malt + .filter { it[0]['instrument_platform'] == 'ILLUMINA' } + .map { + it -> + def temp_meta = [ id: it[2]['db_name']] + it[2] + def db = it[3] + [ temp_meta, it[1], db ] + } + .groupTuple(by: [0,2]) + .multiMap { + it -> + reads: [ it[0], it[1].flatten() ] + db: it[2] + } + MALT_RUN ( ch_input_for_malt.reads, params.malt_mode, ch_input_for_malt.db ) ch_maltrun_for_megan = MALT_RUN.out.rma6 @@ -143,40 +96,94 @@ workflow PROFILING { ch_multiqc_files = ch_multiqc_files.mix( MALT_RUN.out.log.collect{it[1]}.ifEmpty([]) ) ch_versions = ch_versions.mix( MALT_RUN.out.versions.first(), MEGAN_RMA2INFO.out.versions.first() ) ch_raw_profiles = ch_raw_profiles.mix( MEGAN_RMA2INFO.out.txt ) + } if ( params.run_kraken2 ) { + + ch_input_for_kraken2 = ch_input_for_profiling.kraken2 + .multiMap { + it -> + reads: [ it[0] + it[2], it[1] ] + db: it[3] + } + KRAKEN2_KRAKEN2 ( ch_input_for_kraken2.reads, ch_input_for_kraken2.db ) ch_multiqc_files = ch_multiqc_files.mix( KRAKEN2_KRAKEN2.out.txt.collect{it[1]}.ifEmpty([]) ) ch_versions = ch_versions.mix( KRAKEN2_KRAKEN2.out.versions.first() ) ch_raw_profiles = ch_raw_profiles.mix( KRAKEN2_KRAKEN2.out.txt ) + } if ( params.run_centrifuge ) { + + ch_input_for_centrifuge = ch_input_for_profiling.centrifuge + .filter{ + if (it[0].is_fasta) log.warn "[nf-core/taxprofiler] Centrifuge currently does not accept FASTA files as input. Skipping Centrifuge for sample ${it[0].id}." + !it[0].is_fasta + } + .multiMap { + it -> + reads: [ it[0] + it[2], it[1] ] + db: it[3] + } + CENTRIFUGE_CENTRIFUGE ( ch_input_for_centrifuge.reads, ch_input_for_centrifuge.db, params.centrifuge_save_unaligned, params.centrifuge_save_aligned, params.centrifuge_sam_format ) CENTRIFUGE_KREPORT (CENTRIFUGE_CENTRIFUGE.out.results, ch_input_for_centrifuge.db) ch_versions = ch_versions.mix( CENTRIFUGE_CENTRIFUGE.out.versions.first() ) ch_raw_profiles = ch_raw_profiles.mix( CENTRIFUGE_KREPORT.out.kreport ) + } if ( params.run_metaphlan3 ) { + + ch_input_for_metaphlan3 = ch_input_for_profiling.metaphlan3 + .filter{ + if (it[0].is_fasta) log.warn "[nf-core/taxprofiler] MetaPhlAn3 currently does not accept FASTA files as input. Skipping MetaPhlAn3 for sample ${it[0].id}." + !it[0].is_fasta + } + .multiMap { + it -> + reads: [it[0] + it[2], it[1]] + db: it[3] + } + METAPHLAN3 ( ch_input_for_metaphlan3.reads, ch_input_for_metaphlan3.db ) ch_versions = ch_versions.mix( METAPHLAN3.out.versions.first() ) ch_raw_profiles = ch_raw_profiles.mix( METAPHLAN3.out.biom ) + } if ( params.run_kaiju ) { + + ch_input_for_kaiju = ch_input_for_profiling.kaiju + .multiMap { + it -> + reads: [it[0] + it[2], it[1]] + db: it[3] + } + KAIJU_KAIJU ( ch_input_for_kaiju.reads, ch_input_for_kaiju.db) KAIJU_KAIJU2TABLE (KAIJU_KAIJU.out.results, ch_input_for_kaiju.db, params.kaiju_taxon_name) ch_multiqc_files = ch_multiqc_files.mix( KAIJU_KAIJU2TABLE.out.summary.collect{it[1]}.ifEmpty([]) ) ch_versions = ch_versions.mix( KAIJU_KAIJU.out.versions.first() ) ch_raw_profiles = ch_raw_profiles.mix( KAIJU_KAIJU2TABLE.out.summary ) + } if ( params.run_diamond ) { + + ch_input_for_diamond = ch_input_for_profiling.diamond + .multiMap { + it -> + reads: [it[0] + it[2], it[1]] + db: it[3] + } + DIAMOND_BLASTX ( ch_input_for_diamond.reads, ch_input_for_diamond.db, params.diamond_output_format ) ch_versions = ch_versions.mix( DIAMOND_BLASTX.out.versions.first() ) ch_raw_profiles = ch_raw_profiles.mix( DIAMOND_BLASTX.out.output ) + } emit: