//
// Run profiling
//
include { MALT_RUN } from '../../modules/nf-core/modules/malt/run/main'
include { MEGAN_RMA2INFO } from '../../modules/nf-core/modules/megan/rma2info/main'
include { KRAKEN2_KRAKEN2 } from '../../modules/nf-core/modules/kraken2/kraken2/main'
include { KRAKENTOOLS_KREPORT2KRONA } from '../../modules/nf-core/modules/krakentools/kreport2krona/main'
include { KRONA_CLEANUP as KRONA_KRAKENCLEANUP } from '../../modules/local/krona_cleanup'
include { KRONA_KTIMPORTTEXT as KRONA_IMPORTKRAKEN } from '../../modules/nf-core/modules/krona/ktimporttext/main'
include { CENTRIFUGE_CENTRIFUGE } from '../../modules/nf-core/modules/centrifuge/centrifuge/main'
include { CENTRIFUGE_KREPORT } from '../../modules/nf-core/modules/centrifuge/kreport/main'
include { METAPHLAN3 } from '../../modules/nf-core/modules/metaphlan3/main'
include { KAIJU_KAIJU } from '../../modules/nf-core/modules/kaiju/kaiju/main'
include { KAIJU_KAIJU2TABLE } from '../../modules/nf-core/modules/kaiju/kaiju2table/main'
include { DIAMOND_BLASTX } from '../../modules/nf-core/modules/diamond/blastx/main'
workflow PROFILING {
take:
reads // [ [ meta ], [ reads ] ]
databases // [ [ meta ], path ]
main:
ch_versions = Channel.empty()
ch_multiqc_files = Channel.empty()
ch_raw_profiles = Channel.empty()
ch_visualizations = Channel.empty()
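// Accumulator channels: software versions, MultiQC inputs, raw per-tool
// profiles and Krona visualisations, filled in by each profiler block below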
/*
COMBINE READS WITH POSSIBLE DATABASES
*/
// e.g. output [DUMP: reads_plus_db] [['id':'2612', 'run_accession':'combined', 'instrument_platform':'ILLUMINA', 'single_end':1], <reads_path>/2612.merged.fastq.gz, ['tool':'malt', 'db_name':'mal95', 'db_params':'"-id 90"'], <db_path>/malt90]
ch_input_for_profiling = reads
.map {
meta, reads ->
def meta_new = meta.clone()
def pairtype = meta_new['single_end'] ? '_se' : '_pe'
meta_new['id'] = meta_new['id'] + pairtype
[meta_new, reads]
}
.combine(databases)
.branch {
malt: it[2]['tool'] == 'malt'
kraken2: it[2]['tool'] == 'kraken2'
metaphlan3: it[2]['tool'] == 'metaphlan3'
centrifuge: it[2]['tool'] == 'centrifuge'
kaiju: it[2]['tool'] == 'kaiju'
diamond: it[2]['tool'] == 'diamond'
unknown: true
}
/*
PREPARE PROFILER INPUT CHANNELS & RUN PROFILING
*/
// Each tool has a slightly different input structure and generally separate
// input channels for reads vs databases. We restructure the channel tuple
// for each tool and make liberal use of multiMap to keep reads/databases
// channel element order in sync with each other
if ( params.run_malt ) {
// MALT: We groupTuple to have all samples in one channel for MALT as database
// loading takes a long time, so we only want to run it once per database
ch_input_for_malt = ch_input_for_profiling.malt
.filter { it[0]['instrument_platform'] == 'ILLUMINA' }
.map {
meta, reads, db_meta, db ->
// Reset the entire input meta for MALT to just the database name,
// as we don't run on a per-sample basis due to the huge databases:
// all samples go into a single run, so sample-specific metadata is
// unnecessary. Set the id to the database name to prevent a `null` job ID and prefix.
def temp_meta = [ id: meta['db_name'] ]
// Extend database parameters to specify whether to save alignments or not
def new_db_meta = db_meta.clone()
def sam_format = params.malt_save_reads ? ' --alignments ./ -za false' : ""
new_db_meta['db_params'] = db_meta['db_params'] + sam_format
// Combine the reduced sample metadata with the updated database parameter metadata,
// making sure the id is the db_name for publishing purposes.
def new_meta = temp_meta + new_db_meta
new_meta['id'] = new_meta['db_name']
[ new_meta, reads, db ]
}
.groupTuple(by: [0,2])
.multiMap {
it ->
reads: [ it[0], it[1].flatten() ]
db: it[2]
}
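// Run MALT once per database; the reads and db channels emit in the same
// order thanks to the multiMap above, and params.malt_mode sets the alignment mode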
MALT_RUN ( ch_input_for_malt.reads, params.malt_mode, ch_input_for_malt.db )
ch_maltrun_for_megan = MALT_RUN.out.rma6
.transpose()
.map{
meta, rma ->
// Re-extract the meta from the file names, using the filename without
// the rma extension to ensure we keep paired-end information in
// downstream filenames when pairs were not merged
def meta_new = meta.clone()
meta_new['db_name'] = meta.id
meta_new['id'] = rma.baseName
[ meta_new, rma ]
}
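// Summarise each RMA6 file into a text table with MEGAN's rma2info;
// params.malt_generate_megansummary additionally requests a MEGAN summary file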
MEGAN_RMA2INFO (ch_maltrun_for_megan, params.malt_generate_megansummary )
ch_multiqc_files = ch_multiqc_files.mix( MALT_RUN.out.log.collect{it[1]}.ifEmpty([]) )
ch_versions = ch_versions.mix( MALT_RUN.out.versions.first(), MEGAN_RMA2INFO.out.versions.first() )
ch_raw_profiles = ch_raw_profiles.mix( MEGAN_RMA2INFO.out.txt )
}
if ( params.run_kraken2 ) {
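// Merge the sample and database meta maps (it[0] + it[2]) so each job carries
// both sample and database identifiers; the same pattern is used for the
// remaining profilers below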
ch_input_for_kraken2 = ch_input_for_profiling.kraken2
.multiMap {
it ->
reads: [ it[0] + it[2], it[1] ]
db: it[3]
}
KRAKEN2_KRAKEN2 ( ch_input_for_kraken2.reads, ch_input_for_kraken2.db, params.kraken2_save_reads, params.kraken2_save_readclassification )
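// Generate Krona visualisations from the Kraken2 reports: convert each report
// to Krona text, tidy it with the local cleanup module, then aggregate per
// database into a single HTML file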
KRAKENTOOLS_KREPORT2KRONA ( KRAKEN2_KRAKEN2.out.report )
KRONA_KRAKENCLEANUP ( KRAKENTOOLS_KREPORT2KRONA.out.txt )
KRONA_IMPORTKRAKEN( KRONA_KRAKENCLEANUP.out.txt.map{[[id: it[0].db_name], it[1]]}.groupTuple() )
ch_multiqc_files = ch_multiqc_files.mix( KRAKEN2_KRAKEN2.out.report.collect{it[1]}.ifEmpty([]) )
ch_visualizations = ch_visualizations.mix( KRONA_IMPORTKRAKEN.out.html )
ch_versions = ch_versions.mix( KRAKEN2_KRAKEN2.out.versions.first() )
ch_versions = ch_versions.mix( KRAKENTOOLS_KREPORT2KRONA.out.versions.first() )
ch_versions = ch_versions.mix( KRONA_KRAKENCLEANUP.out.versions.first() )
ch_versions = ch_versions.mix( KRONA_IMPORTKRAKEN.out.versions.first() )
ch_raw_profiles = ch_raw_profiles.mix( KRAKEN2_KRAKEN2.out.report )
}
if ( params.run_centrifuge ) {
ch_input_for_centrifuge = ch_input_for_profiling.centrifuge
.filter{
if (it[0].is_fasta) log.warn "[nf-core/taxprofiler] Centrifuge currently does not accept FASTA files as input. Skipping Centrifuge for sample ${it[0].id}."
!it[0].is_fasta
}
.multiMap {
it ->
reads: [ it[0] + it[2], it[1] ]
db: it[3]
}
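// The single params.centrifuge_save_reads flag is passed for the module's
// save_unaligned, save_aligned and sam_format inputs, so one pipeline
// parameter controls all three outputs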
CENTRIFUGE_CENTRIFUGE ( ch_input_for_centrifuge.reads, ch_input_for_centrifuge.db, params.centrifuge_save_reads, params.centrifuge_save_reads, params.centrifuge_save_reads )
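// Convert Centrifuge results into a Kraken-style report for downstream standardisation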
CENTRIFUGE_KREPORT (CENTRIFUGE_CENTRIFUGE.out.results, ch_input_for_centrifuge.db)
ch_versions = ch_versions.mix( CENTRIFUGE_CENTRIFUGE.out.versions.first() )
ch_raw_profiles = ch_raw_profiles.mix( CENTRIFUGE_KREPORT.out.kreport )
}
if ( params.run_metaphlan3 ) {
ch_input_for_metaphlan3 = ch_input_for_profiling.metaphlan3
.filter{
if (it[0].is_fasta) log.warn "[nf-core/taxprofiler] MetaPhlAn3 currently does not accept FASTA files as input. Skipping MetaPhlAn3 for sample ${it[0].id}."
!it[0].is_fasta
}
.multiMap {
it ->
reads: [it[0] + it[2], it[1]]
db: it[3]
}
METAPHLAN3 ( ch_input_for_metaphlan3.reads, ch_input_for_metaphlan3.db )
ch_versions = ch_versions.mix( METAPHLAN3.out.versions.first() )
ch_raw_profiles = ch_raw_profiles.mix( METAPHLAN3.out.biom )
}
if ( params.run_kaiju ) {
ch_input_for_kaiju = ch_input_for_profiling.kaiju
.multiMap {
it ->
reads: [it[0] + it[2], it[1]]
db: it[3]
}
KAIJU_KAIJU ( ch_input_for_kaiju.reads, ch_input_for_kaiju.db)
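// Summarise Kaiju read classifications into a table at the taxonomic rank
// given by params.kaiju_taxon_name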
KAIJU_KAIJU2TABLE (KAIJU_KAIJU.out.results, ch_input_for_kaiju.db, params.kaiju_taxon_name)
ch_multiqc_files = ch_multiqc_files.mix( KAIJU_KAIJU2TABLE.out.summary.collect{it[1]}.ifEmpty([]) )
ch_versions = ch_versions.mix( KAIJU_KAIJU.out.versions.first() )
ch_raw_profiles = ch_raw_profiles.mix( KAIJU_KAIJU2TABLE.out.summary )
}
if ( params.run_diamond ) {
ch_input_for_diamond = ch_input_for_profiling.diamond
.multiMap {
it ->
reads: [it[0] + it[2], it[1]]
db: it[3]
}
// DIAMOND only accepts a single output-format specification, therefore
// requesting saved reads (SAM) will replace the requested output format!
ch_diamond_reads_format = params.diamond_save_reads ? 'sam' : params.diamond_output_format
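// The trailing empty list corresponds to the module's optional custom BLAST
// column specification; none are requested here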
DIAMOND_BLASTX ( ch_input_for_diamond.reads, ch_input_for_diamond.db, ch_diamond_reads_format , [] )
ch_versions = ch_versions.mix( DIAMOND_BLASTX.out.versions.first() )
ch_raw_profiles = ch_raw_profiles.mix( DIAMOND_BLASTX.out.tsv )
}
emit:
profiles = ch_raw_profiles // channel: [ val(meta), [ profile ] ] - should be text files or biom
versions = ch_versions // channel: [ versions.yml ]
mqc = ch_multiqc_files // channel: [ log/report files for MultiQC ]
visualizations = ch_visualizations // channel: [ val(meta), html ]
}