diff --git a/subworkflows/local/db_check.nf b/subworkflows/local/db_check.nf
index 21ed21d..5cafae0 100644
--- a/subworkflows/local/db_check.nf
+++ b/subworkflows/local/db_check.nf
@@ -28,22 +28,20 @@ workflow DB_CHECK {
     // Normal checks for within-row validity, so can be moved to separate functions
     parsed_samplesheet = Channel.fromPath(dbsheet)
         .splitCsv ( header:true, sep:',' )
-        .map {
-            validate_db_rows(it)
-            create_db_channels(it)
+        .map { row ->
+            validate_db_rows(row)
+            return [ row.subMap(['tool', 'db_name', 'db_params']), file(row.db_path) ]
         }

     ch_dbs_for_untar = parsed_samplesheet
-        .branch {
-            untar: it[1].toString().endsWith(".tar.gz")
+        .branch { db_meta, db ->
+            untar: db.name.endsWith(".tar.gz")
             skip: true
         }

     // Filter the channel to untar only those databases for tools that are selected to be run by the user.
     ch_input_untar = ch_dbs_for_untar.untar
-        .filter {
-            params["run_${it[0]['tool']}"]
-        }
+        .filter { db_meta, db -> params["run_${db_meta.tool}"] }

     UNTAR (ch_input_untar)
     ch_versions = ch_versions.mix(UNTAR.out.versions.first())
@@ -54,41 +52,27 @@ workflow DB_CHECK {
     versions = ch_versions // channel: [ versions.yml ]
 }

-def validate_db_rows(LinkedHashMap row){
+def validate_db_rows(LinkedHashMap row) {
-        // check minimum number of columns
-        if (row.size() < 4) exit 1, "[nf-core/taxprofiler] ERROR: Invalid database input sheet - malformed row (e.g. missing column). See documentation for more information. Error in: ${row}"
+    // check minimum number of columns
+    if (row.size() < 4) exit 1, "[nf-core/taxprofiler] ERROR: Invalid database input sheet - malformed row (e.g. missing column). See documentation for more information. Error in: ${row}"

-        // all columns there
-        def expected_headers = ['tool', 'db_name', 'db_params', 'db_path']
-        if ( !row.keySet().containsAll(expected_headers) ) exit 1, "[nf-core/taxprofiler] ERROR: Invalid database input sheet - malformed column names. Please check input TSV. Column names should be: ${expected_keys.join(", ")}"
+    // all columns there
+    def expected_headers = ['tool', 'db_name', 'db_params', 'db_path']
+    if ( !row.keySet().containsAll(expected_headers) ) exit 1, "[nf-core/taxprofiler] ERROR: Invalid database input sheet - malformed column names. Please check input TSV. Column names should be: ${expected_headers.join(", ")}"

-        // valid tools specified
-        def expected_tools = [ "bracken", "centrifuge", "diamond", "kaiju", "kraken2", "krakenuniq", "malt", "metaphlan3", "motus" ]
-        if ( !expected_tools.contains(row.tool) ) exit 1, "[nf-core/taxprofiler] ERROR: Invalid tool name. Please see documentation for all supported profilers. Error in: ${row}"
+    // valid tools specified
+    def expected_tools = [ "bracken", "centrifuge", "diamond", "kaiju", "kraken2", "krakenuniq", "malt", "metaphlan3", "motus" ]
+    if ( !expected_tools.contains(row.tool) ) exit 1, "[nf-core/taxprofiler] ERROR: Invalid tool name. Please see documentation for all supported profilers. Error in: ${row}"

-        // detect quotes in params
-        if ( row.db_params.contains('"') ) exit 1, "[nf-core/taxprofiler] ERROR: Invalid database db_params entry. No quotes allowed. Error in: ${row}"
-        if ( row.db_params.contains("'") ) exit 1, "[nf-core/taxprofiler] ERROR: Invalid database db_params entry. No quotes allowed. Error in: ${row}"
+    // detect quotes in params
+    if ( row.db_params.contains('"') ) exit 1, "[nf-core/taxprofiler] ERROR: Invalid database db_params entry. No quotes allowed. Error in: ${row}"
+    if ( row.db_params.contains("'") ) exit 1, "[nf-core/taxprofiler] ERROR: Invalid database db_params entry. No quotes allowed. Error in: ${row}"

-        // check if any form of bracken params, that it must have `;`
-        if ( row.tool == 'bracken' && row.db_params && !row.db_params.contains(";") ) exit 1, "[nf-core/taxprofiler] ERROR: Invalid database db_params entry. Bracken requires a semi-colon if passing parameter. Error in: ${row}"
+    // check if any form of bracken params, that it must have `;`
+    if ( row.tool == 'bracken' && row.db_params && !row.db_params.contains(";") ) exit 1, "[nf-core/taxprofiler] ERROR: Invalid database db_params entry. Bracken requires a semi-colon if passing parameter. Error in: ${row}"
+
+    // ensure that the database directory exists
+    if (!file(row.db_path, type: 'dir').exists()) exit 1, "ERROR: Please check input samplesheet -> database path could not be found!\n${row.db_path}"
 }
-
-def create_db_channels(LinkedHashMap row) {
-    def meta = [:]
-    meta.tool = row.tool
-    meta.db_name = row.db_name
-    meta.db_params = row.db_params
-
-    def array = []
-    if (!file(row.db_path, type: 'dir').exists()) {
-        exit 1, "ERROR: Please check input samplesheet -> database path could not be found!\n${row.db_path}"
-    }
-    array = [ meta, file(row.db_path) ]
-
-    return array
-}
-
-
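
Note on the `.map` rewrite above: `row.subMap(...)` is plain Groovy and returns a new map containing only the listed keys, which is what makes the old `create_db_channels()` helper redundant. A minimal sketch of the behaviour it relies on, using an invented database row (values are hypothetical, not taken from the pipeline):

    // hypothetical row, as emitted by .splitCsv(header:true, sep:',')
    def row = [ tool: 'kraken2', db_name: 'k2_standard', db_params: '', db_path: '/refs/k2_standard.tar.gz' ]

    // keep only the metadata columns; db_path is carried separately via file(row.db_path)
    def db_meta = row.subMap(['tool', 'db_name', 'db_params'])
    assert db_meta == [ tool: 'kraken2', db_name: 'k2_standard', db_params: '' ]
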
diff --git a/subworkflows/local/input_check.nf b/subworkflows/local/input_check.nf
index 57061f1..5764b72 100644
--- a/subworkflows/local/input_check.nf
+++ b/subworkflows/local/input_check.nf
@@ -12,9 +12,9 @@ workflow INPUT_CHECK {
     parsed_samplesheet = SAMPLESHEET_CHECK ( samplesheet )
         .csv
         .splitCsv ( header:true, sep:',' )
-        .branch {
-            fasta: it['fasta'] != ''
-            nanopore: it['instrument_platform'] == 'OXFORD_NANOPORE'
+        .branch { row ->
+            fasta: row.fasta != ''
+            nanopore: row.instrument_platform == 'OXFORD_NANOPORE'
             fastq: true
         }
@@ -37,49 +37,42 @@ workflow INPUT_CHECK {
 // Function to get list of [ meta, [ fastq_1, fastq_2 ] ]
 def create_fastq_channel(LinkedHashMap row) {
     // create meta map
-    def meta = [:]
-    meta.id = row.sample
-    meta.run_accession = row.run_accession
-    meta.instrument_platform = row.instrument_platform
-    meta.single_end = row.single_end.toBoolean()
-    meta.is_fasta = false
+    def meta = row.subMap(['sample', 'run_accession', 'instrument_platform'])
+    meta.id = meta.sample
+    meta.single_end = row.single_end.toBoolean()
+    meta.is_fasta = false

     // add path(s) of the fastq file(s) to the meta map
-    def fastq_meta = []
     if (!file(row.fastq_1).exists()) {
         exit 1, "ERROR: Please check input samplesheet -> Read 1 FastQ file does not exist!\n${row.fastq_1}"
     }
+
     if (meta.single_end) {
-        fastq_meta = [ meta, [ file(row.fastq_1) ] ]
+        return [ meta, [ file(row.fastq_1) ] ]
     } else {
         if (meta.instrument_platform == 'OXFORD_NANOPORE') {
             if (row.fastq_2 != '') {
                 exit 1, "ERROR: Please check input samplesheet -> For Oxford Nanopore reads Read 2 FastQ should be empty!\n${row.fastq_2}"
             }
-            fastq_meta = [ meta, [ file(row.fastq_1) ] ]
+            return [ meta, [ file(row.fastq_1) ] ]
         } else {
             if (!file(row.fastq_2).exists()) {
                 exit 1, "ERROR: Please check input samplesheet -> Read 2 FastQ file does not exist!\n${row.fastq_2}"
             }
-            fastq_meta = [ meta, [ file(row.fastq_1), file(row.fastq_2) ] ]
+            return [ meta, [ file(row.fastq_1), file(row.fastq_2) ] ]
         }
-    }
-    return fastq_meta
-}// Function to get list of [ meta, fasta ]
-def create_fasta_channel(LinkedHashMap row) {
-    def meta = [:]
-    meta.id = row.sample
-    meta.run_accession = row.run_accession
-    meta.instrument_platform = row.instrument_platform
-    meta.single_end = true
-    meta.is_fasta = true
+    }
+}
+
+// Function to get list of [ meta, fasta ]
+def create_fasta_channel(LinkedHashMap row) {
+    def meta = row.subMap(['sample', 'run_accession', 'instrument_platform'])
+    meta.id = meta.sample
+    meta.single_end = true
+    meta.is_fasta = true

-    def array = []
     if (!file(row.fasta).exists()) {
         exit 1, "ERROR: Please check input samplesheet -> FastA file does not exist!\n${row.fasta}"
     }
-    array = [ meta, [ file(row.fasta) ] ]
-
-    return array
+    return [ meta, [ file(row.fasta) ] ]
 }
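
Note on the `.branch` rewrite above: rows are tested against the criteria top to bottom and land in the first branch whose condition is true, so `fastq: true` acts as the catch-all. A minimal, self-contained sketch (sample names and file paths are invented, not taken from the pipeline):

    // three hypothetical samplesheet rows
    Channel
        .of(
            [ sample: 'sampleA', instrument_platform: 'ILLUMINA',        fasta: '',           fastq_1: 'a_R1.fastq.gz' ],
            [ sample: 'sampleB', instrument_platform: 'OXFORD_NANOPORE', fasta: '',           fastq_1: 'b.fastq.gz'    ],
            [ sample: 'sampleC', instrument_platform: 'ILLUMINA',        fasta: 'c.fasta.gz', fastq_1: ''              ]
        )
        .branch { row ->
            fasta:    row.fasta != ''
            nanopore: row.instrument_platform == 'OXFORD_NANOPORE'
            fastq:    true
        }
        .set { ch_demo }

    ch_demo.fasta.view()    // sampleC row only
    ch_demo.nanopore.view() // sampleB row only
    ch_demo.fastq.view()    // sampleA row only
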
diff --git a/subworkflows/local/longread_hostremoval.nf b/subworkflows/local/longread_hostremoval.nf
index 63fce58..83660fe 100644
--- a/subworkflows/local/longread_hostremoval.nf
+++ b/subworkflows/local/longread_hostremoval.nf
@@ -46,7 +46,7 @@ workflow LONGREAD_HOSTREMOVAL {
     ch_versions = ch_versions.mix( SAMTOOLS_INDEX.out.versions.first() )

     bam_bai = MINIMAP2_ALIGN.out.bam
-        .join(SAMTOOLS_INDEX.out.bai, remainder: true)
+        .join(SAMTOOLS_INDEX.out.bai)

     SAMTOOLS_STATS ( bam_bai, reference )
     ch_versions = ch_versions.mix(SAMTOOLS_STATS.out.versions.first())
diff --git a/subworkflows/local/longread_preprocessing.nf b/subworkflows/local/longread_preprocessing.nf
index 961417d..30963ec 100644
--- a/subworkflows/local/longread_preprocessing.nf
+++ b/subworkflows/local/longread_preprocessing.nf
@@ -20,33 +20,23 @@ workflow LONGREAD_PREPROCESSING {
         PORECHOP_PORECHOP ( reads )

         ch_processed_reads = PORECHOP_PORECHOP.out.reads
-            .map {
-                meta, reads ->
-                    def meta_new = meta.clone()
-                    meta_new['single_end'] = 1
-                    [ meta_new, reads ]
-            }
+            .map { meta, reads -> [ meta + [single_end: 1], reads ] }

         ch_versions = ch_versions.mix(PORECHOP_PORECHOP.out.versions.first())
         ch_multiqc_files = ch_multiqc_files.mix( PORECHOP_PORECHOP.out.log )

     } else if ( params.longread_qc_skipadaptertrim && !params.longread_qc_skipqualityfilter) {

-        ch_processed_reads = FILTLONG ( reads.map{ meta, reads -> [meta, [], reads ]} )
+        ch_processed_reads = FILTLONG ( reads.map { meta, reads -> [meta, [], reads ] } )
         ch_versions = ch_versions.mix(FILTLONG.out.versions.first())
         ch_multiqc_files = ch_multiqc_files.mix( FILTLONG.out.log )

     } else {
         PORECHOP_PORECHOP ( reads )

         ch_clipped_reads = PORECHOP_PORECHOP.out.reads
-            .map {
-                meta, reads ->
-                    def meta_new = meta.clone()
-                    meta_new['single_end'] = 1
-                    [ meta_new, reads ]
-            }
+            .map { meta, reads -> [ meta + [single_end: 1], reads ] }

-        ch_processed_reads = FILTLONG ( ch_clipped_reads.map{ meta, reads -> [meta, [], reads ]} ).reads
+        ch_processed_reads = FILTLONG ( ch_clipped_reads.map { meta, reads -> [ meta, [], reads ] } ).reads

         ch_versions = ch_versions.mix(PORECHOP_PORECHOP.out.versions.first())
         ch_versions = ch_versions.mix(FILTLONG.out.versions.first())
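
Note on the `meta + [single_end: 1]` pattern above: Groovy's map `plus()` returns a fresh map with the right-hand entries overriding the left-hand ones, so the clone-then-mutate boilerplate goes away and the meta map shared with upstream channels is left untouched. A minimal sketch with an invented meta map:

    def meta     = [ id: 'sample1', single_end: false ]   // hypothetical upstream meta
    def meta_new = meta + [ single_end: 1 ]                // new map; key overridden

    assert meta_new == [ id: 'sample1', single_end: 1 ]
    assert meta     == [ id: 'sample1', single_end: false ]  // original unchanged
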
diff --git a/subworkflows/local/profiling.nf b/subworkflows/local/profiling.nf
index 0f9951e..e9440b3 100644
--- a/subworkflows/local/profiling.nf
+++ b/subworkflows/local/profiling.nf
@@ -35,10 +35,7 @@ workflow PROFILING {
     ch_input_for_profiling = reads
         .map { meta, reads ->
-            def meta_new = meta.clone()
-            pairtype = meta_new['single_end'] ? '_se' : '_pe'
-            meta_new['id'] = meta_new['id'] + pairtype
-            [meta_new, reads]
+            [meta + [id: "${meta.id}${meta.single_end ? '_se' : '_pe'}"], reads]
         }
         .combine(databases)
         .branch {
@@ -68,34 +65,34 @@ workflow PROFILING {
     // MALT: We groupTuple to have all samples in one channel for MALT as database
     // loading takes a long time, so we only want to run it once per database
     ch_input_for_malt = ch_input_for_profiling.malt
-            .map {
-                meta, reads, db_meta, db ->
+        .map {
+            meta, reads, db_meta, db ->

-                    // Reset entire input meta for MALT to just database name,
-                    // as we don't run run on a per-sample basis due to huge datbaases
-                    // so all samples are in one run and so sample-specific metadata
-                    // unnecessary. Set as database name to prevent `null` job ID and prefix.
-                    def temp_meta = [ id: meta['db_name'] ]
+                // Reset entire input meta for MALT to just database name,
+                // as we don't run on a per-sample basis due to huge databases
+                // so all samples are in one run and so sample-specific metadata
+                // unnecessary. Set as database name to prevent `null` job ID and prefix.
+                def temp_meta = [ id: meta['db_name'] ]

-                    // Extend database parameters to specify whether to save alignments or not
-                    def new_db_meta = db_meta.clone()
-                    def sam_format = params.malt_save_reads ? ' --alignments ./ -za false' : ""
-                    new_db_meta['db_params'] = db_meta['db_params'] + sam_format
+                // Extend database parameters to specify whether to save alignments or not
+                def new_db_meta = db_meta.clone()
+                def sam_format = params.malt_save_reads ? ' --alignments ./ -za false' : ""
+                new_db_meta['db_params'] = db_meta['db_params'] + sam_format

-                    // Combine reduced sample metadata with updated database parameters metadata,
-                    // make sure id is db_name for publishing purposes.
-                    def new_meta = temp_meta + new_db_meta
-                    new_meta['id'] = new_meta['db_name']
+                // Combine reduced sample metadata with updated database parameters metadata,
+                // make sure id is db_name for publishing purposes.
+                def new_meta = temp_meta + new_db_meta
+                new_meta['id'] = new_meta['db_name']

-                    [ new_meta, reads, db ]
+                [ new_meta, reads, db ]

-            }
-            .groupTuple(by: [0,2])
-            .multiMap {
-                it ->
-                    reads: [ it[0], it[1].flatten() ]
-                    db: it[2]
-            }
+        }
+        .groupTuple(by: [0,2])
+        .multiMap {
+            meta, reads, db ->
+                reads: [ meta, reads.flatten() ]
+                db: db
+        }

     MALT_RUN ( ch_input_for_malt.reads, ch_input_for_malt.db )
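
Note on the MALT `groupTuple`/`multiMap` rewrite above: grouping by elements 0 and 2 collects the reads at index 1 into a list of per-sample lists (hence the `flatten()`), and naming the closure parameters replaces the old `it[n]` indexing. A minimal sketch with invented meta maps and a placeholder string in place of the real database path:

    Channel
        .of(
            [ [id: 'malt_db'], [ 'sampleA_1.fq.gz', 'sampleA_2.fq.gz' ], 'malt_db_dir' ],
            [ [id: 'malt_db'], [ 'sampleB_1.fq.gz', 'sampleB_2.fq.gz' ], 'malt_db_dir' ]
        )
        .groupTuple(by: [0, 2])
        .multiMap { meta, reads, db ->
            reads: [ meta, reads.flatten() ]  // all reads for one database pooled into a single tuple
            db:    db
        }
        .set { ch_demo_malt }

    ch_demo_malt.reads.view() // [ [id:malt_db], [sampleA_1.fq.gz, sampleA_2.fq.gz, sampleB_1.fq.gz, sampleB_2.fq.gz] ]
    ch_demo_malt.db.view()    // malt_db_dir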