1
0
Fork 0
mirror of https://github.com/MillironX/taxprofiler.git synced 2024-11-22 08:49:55 +00:00

Merge pull request #258 from nf-core/clone-maps

refactor: double check maps and validation
This commit is contained in:
Moritz E. Beber 2023-03-11 21:39:49 +01:00 committed by GitHub
commit efa398edab
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 72 additions and 108 deletions

View file

@ -28,22 +28,20 @@ workflow DB_CHECK {
// Normal checks for within-row validity, so can be moved to separate functions // Normal checks for within-row validity, so can be moved to separate functions
parsed_samplesheet = Channel.fromPath(dbsheet) parsed_samplesheet = Channel.fromPath(dbsheet)
.splitCsv ( header:true, sep:',' ) .splitCsv ( header:true, sep:',' )
.map { .map { row ->
validate_db_rows(it) validate_db_rows(row)
create_db_channels(it) return [ row.subMap(['tool', 'db_name', 'db_params']), file(row.db_path) ]
} }
ch_dbs_for_untar = parsed_samplesheet ch_dbs_for_untar = parsed_samplesheet
.branch { .branch { db_meta, db ->
untar: it[1].toString().endsWith(".tar.gz") untar: db.name.endsWith(".tar.gz")
skip: true skip: true
} }
// Filter the channel to untar only those databases for tools that are selected to be run by the user. // Filter the channel to untar only those databases for tools that are selected to be run by the user.
ch_input_untar = ch_dbs_for_untar.untar ch_input_untar = ch_dbs_for_untar.untar
.filter { .filter { db_meta, db -> params["run_${db_meta.tool}"] }
params["run_${it[0]['tool']}"]
}
UNTAR (ch_input_untar) UNTAR (ch_input_untar)
ch_versions = ch_versions.mix(UNTAR.out.versions.first()) ch_versions = ch_versions.mix(UNTAR.out.versions.first())
@ -54,41 +52,27 @@ workflow DB_CHECK {
versions = ch_versions // channel: [ versions.yml ] versions = ch_versions // channel: [ versions.yml ]
} }
def validate_db_rows(LinkedHashMap row){ def validate_db_rows(LinkedHashMap row) {
// check minimum number of columns // check minimum number of columns
if (row.size() < 4) exit 1, "[nf-core/taxprofiler] ERROR: Invalid database input sheet - malformed row (e.g. missing column). See documentation for more information. Error in: ${row}" if (row.size() < 4) exit 1, "[nf-core/taxprofiler] ERROR: Invalid database input sheet - malformed row (e.g. missing column). See documentation for more information. Error in: ${row}"
// all columns there // all columns there
def expected_headers = ['tool', 'db_name', 'db_params', 'db_path'] def expected_headers = ['tool', 'db_name', 'db_params', 'db_path']
if ( !row.keySet().containsAll(expected_headers) ) exit 1, "[nf-core/taxprofiler] ERROR: Invalid database input sheet - malformed column names. Please check input TSV. Column names should be: ${expected_keys.join(", ")}" if ( !row.keySet().containsAll(expected_headers) ) exit 1, "[nf-core/taxprofiler] ERROR: Invalid database input sheet - malformed column names. Please check input TSV. Column names should be: ${expected_headers.join(", ")}"
// valid tools specified // valid tools specified
def expected_tools = [ "bracken", "centrifuge", "diamond", "kaiju", "kraken2", "krakenuniq", "malt", "metaphlan3", "motus" ] def expected_tools = [ "bracken", "centrifuge", "diamond", "kaiju", "kraken2", "krakenuniq", "malt", "metaphlan3", "motus" ]
if ( !expected_tools.contains(row.tool) ) exit 1, "[nf-core/taxprofiler] ERROR: Invalid tool name. Please see documentation for all supported profilers. Error in: ${row}" if ( !expected_tools.contains(row.tool) ) exit 1, "[nf-core/taxprofiler] ERROR: Invalid tool name. Please see documentation for all supported profilers. Error in: ${row}"
// detect quotes in params // detect quotes in params
if ( row.db_params.contains('"') ) exit 1, "[nf-core/taxprofiler] ERROR: Invalid database db_params entry. No quotes allowed. Error in: ${row}" if ( row.db_params.contains('"') ) exit 1, "[nf-core/taxprofiler] ERROR: Invalid database db_params entry. No quotes allowed. Error in: ${row}"
if ( row.db_params.contains("'") ) exit 1, "[nf-core/taxprofiler] ERROR: Invalid database db_params entry. No quotes allowed. Error in: ${row}" if ( row.db_params.contains("'") ) exit 1, "[nf-core/taxprofiler] ERROR: Invalid database db_params entry. No quotes allowed. Error in: ${row}"
// check if any form of bracken params, that it must have `;` // check if any form of bracken params, that it must have `;`
if ( row.tool == 'bracken' && row.db_params && !row.db_params.contains(";") ) exit 1, "[nf-core/taxprofiler] ERROR: Invalid database db_params entry. Bracken requires a semi-colon if passing parameter. Error in: ${row}" if ( row.tool == 'bracken' && row.db_params && !row.db_params.contains(";") ) exit 1, "[nf-core/taxprofiler] ERROR: Invalid database db_params entry. Bracken requires a semi-colon if passing parameter. Error in: ${row}"
// ensure that the database directory exists
if (!file(row.db_path, type: 'dir').exists()) exit 1, "ERROR: Please check input samplesheet -> database path could not be found!\n${row.db_path}"
} }
def create_db_channels(LinkedHashMap row) {
def meta = [:]
meta.tool = row.tool
meta.db_name = row.db_name
meta.db_params = row.db_params
def array = []
if (!file(row.db_path, type: 'dir').exists()) {
exit 1, "ERROR: Please check input samplesheet -> database path could not be found!\n${row.db_path}"
}
array = [ meta, file(row.db_path) ]
return array
}

View file

@ -12,9 +12,9 @@ workflow INPUT_CHECK {
parsed_samplesheet = SAMPLESHEET_CHECK ( samplesheet ) parsed_samplesheet = SAMPLESHEET_CHECK ( samplesheet )
.csv .csv
.splitCsv ( header:true, sep:',' ) .splitCsv ( header:true, sep:',' )
.branch { .branch { row ->
fasta: it['fasta'] != '' fasta: row.fasta != ''
nanopore: it['instrument_platform'] == 'OXFORD_NANOPORE' nanopore: row.instrument_platform == 'OXFORD_NANOPORE'
fastq: true fastq: true
} }
@ -37,49 +37,42 @@ workflow INPUT_CHECK {
// Function to get list of [ meta, [ fastq_1, fastq_2 ] ] // Function to get list of [ meta, [ fastq_1, fastq_2 ] ]
def create_fastq_channel(LinkedHashMap row) { def create_fastq_channel(LinkedHashMap row) {
// create meta map // create meta map
def meta = [:] def meta = row.subMap(['sample', 'run_accession', 'instrument_platform'])
meta.id = row.sample meta.id = meta.sample
meta.run_accession = row.run_accession meta.single_end = row.single_end.toBoolean()
meta.instrument_platform = row.instrument_platform meta.is_fasta = false
meta.single_end = row.single_end.toBoolean()
meta.is_fasta = false
// add path(s) of the fastq file(s) to the meta map // add path(s) of the fastq file(s) to the meta map
def fastq_meta = []
if (!file(row.fastq_1).exists()) { if (!file(row.fastq_1).exists()) {
exit 1, "ERROR: Please check input samplesheet -> Read 1 FastQ file does not exist!\n${row.fastq_1}" exit 1, "ERROR: Please check input samplesheet -> Read 1 FastQ file does not exist!\n${row.fastq_1}"
} }
if (meta.single_end) { if (meta.single_end) {
fastq_meta = [ meta, [ file(row.fastq_1) ] ] return [ meta, [ file(row.fastq_1) ] ]
} else { } else {
if (meta.instrument_platform == 'OXFORD_NANOPORE') { if (meta.instrument_platform == 'OXFORD_NANOPORE') {
if (row.fastq_2 != '') { if (row.fastq_2 != '') {
exit 1, "ERROR: Please check input samplesheet -> For Oxford Nanopore reads Read 2 FastQ should be empty!\n${row.fastq_2}" exit 1, "ERROR: Please check input samplesheet -> For Oxford Nanopore reads Read 2 FastQ should be empty!\n${row.fastq_2}"
} }
fastq_meta = [ meta, [ file(row.fastq_1) ] ] return [ meta, [ file(row.fastq_1) ] ]
} else { } else {
if (!file(row.fastq_2).exists()) { if (!file(row.fastq_2).exists()) {
exit 1, "ERROR: Please check input samplesheet -> Read 2 FastQ file does not exist!\n${row.fastq_2}" exit 1, "ERROR: Please check input samplesheet -> Read 2 FastQ file does not exist!\n${row.fastq_2}"
} }
fastq_meta = [ meta, [ file(row.fastq_1), file(row.fastq_2) ] ] return [ meta, [ file(row.fastq_1), file(row.fastq_2) ] ]
} }
} }
return fastq_meta }
}// Function to get list of [ meta, fasta ]
def create_fasta_channel(LinkedHashMap row) { // Function to get list of [ meta, fasta ]
def meta = [:] def create_fasta_channel(LinkedHashMap row) {
meta.id = row.sample def meta = row.subMap(['sample', 'run_accession', 'instrument_platform'])
meta.run_accession = row.run_accession meta.id = meta.sample
meta.instrument_platform = row.instrument_platform meta.single_end = true
meta.single_end = true meta.is_fasta = true
meta.is_fasta = true
def array = []
if (!file(row.fasta).exists()) { if (!file(row.fasta).exists()) {
exit 1, "ERROR: Please check input samplesheet -> FastA file does not exist!\n${row.fasta}" exit 1, "ERROR: Please check input samplesheet -> FastA file does not exist!\n${row.fasta}"
} }
array = [ meta, [ file(row.fasta) ] ] return [ meta, [ file(row.fasta) ] ]
return array
} }

View file

@ -46,7 +46,7 @@ workflow LONGREAD_HOSTREMOVAL {
ch_versions = ch_versions.mix( SAMTOOLS_INDEX.out.versions.first() ) ch_versions = ch_versions.mix( SAMTOOLS_INDEX.out.versions.first() )
bam_bai = MINIMAP2_ALIGN.out.bam bam_bai = MINIMAP2_ALIGN.out.bam
.join(SAMTOOLS_INDEX.out.bai, remainder: true) .join(SAMTOOLS_INDEX.out.bai)
SAMTOOLS_STATS ( bam_bai, reference ) SAMTOOLS_STATS ( bam_bai, reference )
ch_versions = ch_versions.mix(SAMTOOLS_STATS.out.versions.first()) ch_versions = ch_versions.mix(SAMTOOLS_STATS.out.versions.first())

View file

@ -20,33 +20,23 @@ workflow LONGREAD_PREPROCESSING {
PORECHOP_PORECHOP ( reads ) PORECHOP_PORECHOP ( reads )
ch_processed_reads = PORECHOP_PORECHOP.out.reads ch_processed_reads = PORECHOP_PORECHOP.out.reads
.map { .map { meta, reads -> [ meta + [single_end: 1], reads ] }
meta, reads ->
def meta_new = meta.clone()
meta_new['single_end'] = 1
[ meta_new, reads ]
}
ch_versions = ch_versions.mix(PORECHOP_PORECHOP.out.versions.first()) ch_versions = ch_versions.mix(PORECHOP_PORECHOP.out.versions.first())
ch_multiqc_files = ch_multiqc_files.mix( PORECHOP_PORECHOP.out.log ) ch_multiqc_files = ch_multiqc_files.mix( PORECHOP_PORECHOP.out.log )
} else if ( params.longread_qc_skipadaptertrim && !params.longread_qc_skipqualityfilter) { } else if ( params.longread_qc_skipadaptertrim && !params.longread_qc_skipqualityfilter) {
ch_processed_reads = FILTLONG ( reads.map{ meta, reads -> [meta, [], reads ]} ) ch_processed_reads = FILTLONG ( reads.map { meta, reads -> [meta, [], reads ] } )
ch_versions = ch_versions.mix(FILTLONG.out.versions.first()) ch_versions = ch_versions.mix(FILTLONG.out.versions.first())
ch_multiqc_files = ch_multiqc_files.mix( FILTLONG.out.log ) ch_multiqc_files = ch_multiqc_files.mix( FILTLONG.out.log )
} else { } else {
PORECHOP_PORECHOP ( reads ) PORECHOP_PORECHOP ( reads )
ch_clipped_reads = PORECHOP_PORECHOP.out.reads ch_clipped_reads = PORECHOP_PORECHOP.out.reads
.map { .map { meta, reads -> [ meta + [single_end: 1], reads ] }
meta, reads ->
def meta_new = meta.clone()
meta_new['single_end'] = 1
[ meta_new, reads ]
}
ch_processed_reads = FILTLONG ( ch_clipped_reads.map{ meta, reads -> [meta, [], reads ]} ).reads ch_processed_reads = FILTLONG ( ch_clipped_reads.map { meta, reads -> [ meta, [], reads ] } ).reads
ch_versions = ch_versions.mix(PORECHOP_PORECHOP.out.versions.first()) ch_versions = ch_versions.mix(PORECHOP_PORECHOP.out.versions.first())
ch_versions = ch_versions.mix(FILTLONG.out.versions.first()) ch_versions = ch_versions.mix(FILTLONG.out.versions.first())

View file

@ -35,10 +35,7 @@ workflow PROFILING {
ch_input_for_profiling = reads ch_input_for_profiling = reads
.map { .map {
meta, reads -> meta, reads ->
def meta_new = meta.clone() [meta + [id: "${meta.id}${meta.single_end ? '_se' : '_pe'}"], reads]
pairtype = meta_new['single_end'] ? '_se' : '_pe'
meta_new['id'] = meta_new['id'] + pairtype
[meta_new, reads]
} }
.combine(databases) .combine(databases)
.branch { .branch {
@ -68,34 +65,34 @@ workflow PROFILING {
// MALT: We groupTuple to have all samples in one channel for MALT as database // MALT: We groupTuple to have all samples in one channel for MALT as database
// loading takes a long time, so we only want to run it once per database // loading takes a long time, so we only want to run it once per database
ch_input_for_malt = ch_input_for_profiling.malt ch_input_for_malt = ch_input_for_profiling.malt
.map { .map {
meta, reads, db_meta, db -> meta, reads, db_meta, db ->
// Reset entire input meta for MALT to just database name, // Reset entire input meta for MALT to just database name,
// as we don't run run on a per-sample basis due to huge datbaases // as we don't run run on a per-sample basis due to huge datbaases
// so all samples are in one run and so sample-specific metadata // so all samples are in one run and so sample-specific metadata
// unnecessary. Set as database name to prevent `null` job ID and prefix. // unnecessary. Set as database name to prevent `null` job ID and prefix.
def temp_meta = [ id: meta['db_name'] ] def temp_meta = [ id: meta['db_name'] ]
// Extend database parameters to specify whether to save alignments or not // Extend database parameters to specify whether to save alignments or not
def new_db_meta = db_meta.clone() def new_db_meta = db_meta.clone()
def sam_format = params.malt_save_reads ? ' --alignments ./ -za false' : "" def sam_format = params.malt_save_reads ? ' --alignments ./ -za false' : ""
new_db_meta['db_params'] = db_meta['db_params'] + sam_format new_db_meta['db_params'] = db_meta['db_params'] + sam_format
// Combine reduced sample metadata with updated database parameters metadata, // Combine reduced sample metadata with updated database parameters metadata,
// make sure id is db_name for publishing purposes. // make sure id is db_name for publishing purposes.
def new_meta = temp_meta + new_db_meta def new_meta = temp_meta + new_db_meta
new_meta['id'] = new_meta['db_name'] new_meta['id'] = new_meta['db_name']
[ new_meta, reads, db ] [ new_meta, reads, db ]
} }
.groupTuple(by: [0,2]) .groupTuple(by: [0,2])
.multiMap { .multiMap {
it -> meta, reads, db ->
reads: [ it[0], it[1].flatten() ] reads: [ meta, reads.flatten() ]
db: it[2] db: db
} }
MALT_RUN ( ch_input_for_malt.reads, ch_input_for_malt.db ) MALT_RUN ( ch_input_for_malt.reads, ch_input_for_malt.db )