From 278f5605ca831338384b5045e34f500f0b5ca1a2 Mon Sep 17 00:00:00 2001 From: James Fellows Yates Date: Sat, 19 Feb 2022 12:36:08 +0100 Subject: [PATCH] Added database preparation and final channel for profiling --- modules/local/database_check.nf | 25 ++++++++++ nextflow.config | 3 ++ subworkflows/local/db_check.nf | 40 ++++++++++++++++ subworkflows/local/input_check.nf | 2 +- subworkflows/local/preprocessing.nf | 73 +++++++++++++++++++++++++++++ workflows/taxprofiler.nf | 65 ++++++++----------------- 6 files changed, 161 insertions(+), 47 deletions(-) create mode 100644 modules/local/database_check.nf create mode 100644 subworkflows/local/db_check.nf create mode 100644 subworkflows/local/preprocessing.nf diff --git a/modules/local/database_check.nf b/modules/local/database_check.nf new file mode 100644 index 0000000..4da4313 --- /dev/null +++ b/modules/local/database_check.nf @@ -0,0 +1,25 @@ +process DATABASE_CHECK { + tag "$databasesheet" + + conda (params.enable_conda ? "conda-forge::python=3.8.3" : null) + container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ? 
+ 'https://depot.galaxyproject.org/singularity/python:3.8.3' : + 'quay.io/biocontainers/python:3.8.3' }" + + input: + path databasesheet + + output: + path '*.csv' , emit: csv + path "versions.yml", emit: versions + + script: // Placeholder pass-through: copies the sheet unchanged until a real validation script is added + """ + cat $databasesheet >> database_sheet.valid.csv + + cat <<-END_VERSIONS > versions.yml + "${task.process}": + python: \$(python --version | sed 's/Python //g') + END_VERSIONS + """ +} diff --git a/nextflow.config b/nextflow.config index 160dba7..60b5a42 100644 --- a/nextflow.config +++ b/nextflow.config @@ -50,6 +50,9 @@ params { max_cpus = 16 max_time = '240.h' + // Databases + databases = null + // FASTQ preprocessing fastp_clip_merge = false fastp_exclude_unmerged = true diff --git a/subworkflows/local/db_check.nf b/subworkflows/local/db_check.nf new file mode 100644 index 0000000..909d98f --- /dev/null +++ b/subworkflows/local/db_check.nf @@ -0,0 +1,40 @@ +// +// Check database sheet and get database channels +// + +include { DATABASE_CHECK } from '../../modules/local/database_check' + +workflow DB_CHECK { + take: + dbsheet // file: /path/to/dbsheet.csv + + main: + + // TODO: make database sheet check + DATABASE_CHECK ( dbsheet ) + .csv + .splitCsv ( header:true, sep:',' ) + .dump(tag: "db_split_csv_out") + .map { create_db_channels(it) } + .dump(tag: "db_channel_prepped") + .set{ dbs } + + emit: + dbs // channel: [ val(meta), [ db ] ] + versions = DATABASE_CHECK.out.versions // channel: [ versions.yml ] +} + +def create_db_channels(LinkedHashMap row) { + def meta = [:] + meta.tool = row.tool + meta.db_name = row.db_name + meta.db_params = row.db_params + + def array = [] + if (!file(row.db_path, type: 'dir').exists()) { + exit 1, "ERROR: Please check input database sheet -> database could not be found!\n${row.db_path}" + } + array = [ meta, file(row.db_path) ] + + return array +} diff --git a/subworkflows/local/input_check.nf 
b/subworkflows/local/input_check.nf index 241a8a7..8497faa 100644 --- a/subworkflows/local/input_check.nf +++ b/subworkflows/local/input_check.nf @@ -12,7 +12,7 @@ workflow INPUT_CHECK { parsed_samplesheet = SAMPLESHEET_CHECK ( samplesheet ) .csv .splitCsv ( header:true, sep:',' ) - .dump(tag: "split_csv_out") + .dump(tag: "input_split_csv_out") .branch { fasta: it['fasta'] != '' fastq: true diff --git a/subworkflows/local/preprocessing.nf b/subworkflows/local/preprocessing.nf new file mode 100644 index 0000000..5832824 --- /dev/null +++ b/subworkflows/local/preprocessing.nf @@ -0,0 +1,73 @@ +// +// Perform read clipping/merging with fastp and re-run FastQC on the result +// + + +include { FASTP as FASTP_SINGLE } from '../../modules/nf-core/modules/fastp/main' +include { FASTP as FASTP_PAIRED } from '../../modules/nf-core/modules/fastp/main' +include { FASTQC as FASTQC_POST } from '../../modules/nf-core/modules/fastqc/main' + +workflow FASTQ_PREPROCESSING { + take: + reads // channel: [ val(meta), [ reads ] ] + + main: + ch_versions = Channel.empty() + ch_multiqc_files = Channel.empty() + + // + // STEP: Read clipping and merging + // + // TODO give option to clip only and retain pairs + // TODO give option to retain singletons (probably fastp option likely) + // TODO move to subworkflow + + + if ( params.fastp_clip_merge ) { + + ch_input_for_fastp = reads + .dump(tag: "pre-fastp_branch") + .branch{ + single: it[0]['single_end'] == true + paired: it[0]['single_end'] == false + } + + ch_input_for_fastp.single.dump(tag: "input_fastp_single") + ch_input_for_fastp.paired.dump(tag: "input_fastp_paired") + + FASTP_SINGLE ( ch_input_for_fastp.single, false, false ) + FASTP_PAIRED ( ch_input_for_fastp.paired, false, true ) + + ch_fastp_reads_prepped = FASTP_PAIRED.out.reads_merged + .mix( FASTP_SINGLE.out.reads ) + .map { + meta, reads -> + def meta_new = meta.clone() + meta_new['single_end'] = true // merged output is handled as single-end; keep the flag boolean like the branch test above + [ meta_new, reads ] + } + + FASTQC_POST ( ch_fastp_reads_prepped ) + + ch_versions = 
ch_versions.mix(FASTP_SINGLE.out.versions.first()) + ch_versions = ch_versions.mix(FASTP_PAIRED.out.versions.first()) + + ch_processed_reads = ch_fastp_reads_prepped + + ch_multiqc_files = ch_multiqc_files.mix( FASTQC_POST.out.zip.collect{it[1]} ) + ch_multiqc_files = ch_multiqc_files.mix( FASTP_SINGLE.out.json.collect{it[1]} ) + ch_multiqc_files = ch_multiqc_files.mix( FASTP_PAIRED.out.json.collect{it[1]} ) + + ch_multiqc_files.dump(tag: "preprocessing_mqc_final") + + } else { + ch_processed_reads = reads + } + + + emit: + reads = ch_processed_reads // channel: [ val(meta), [ reads ] ] + versions = ch_versions // channel: [ versions.yml ] + mqc = ch_multiqc_files // channel: FastQC zip and fastp json files for MultiQC +} + diff --git a/workflows/taxprofiler.nf b/workflows/taxprofiler.nf index 3f10a9d..4a356a5 100644 --- a/workflows/taxprofiler.nf +++ b/workflows/taxprofiler.nf @@ -11,11 +11,12 @@ WorkflowTaxprofiler.initialise(params, log) // TODO nf-core: Add all file path parameters for the pipeline to the list below // Check input path parameters to see if they exist -def checkPathParamList = [ params.input, params.multiqc_config ] +def checkPathParamList = [ params.input, params.databases, params.multiqc_config ] for (param in checkPathParamList) { if (param) { file(param, checkIfExists: true) } } // Check mandatory parameters -if (params.input) { ch_input = file(params.input) } else { exit 1, 'Input samplesheet not specified!' } +if (params.input ) { ch_input = file(params.input) } else { exit 1, 'Input samplesheet not specified!' } +if (params.databases) { ch_databases = file(params.databases) } else { exit 1, 'Input database sheet not specified!' } /* ======================================================================================== @@ -35,7 +36,11 @@ ch_multiqc_custom_config = params.multiqc_config ? 
Channel.fromPath(params.multi // // SUBWORKFLOW: Consisting of a mix of local and nf-core/modules // -include { INPUT_CHECK } from '../subworkflows/local/input_check' +include { INPUT_CHECK } from '../subworkflows/local/input_check' + +include { DB_CHECK } from '../subworkflows/local/db_check' +include { FASTQ_PREPROCESSING } from '../subworkflows/local/preprocessing' + /* ======================================================================================== @@ -50,9 +55,6 @@ include { FASTQC } from '../modules/nf-core/modules/fastqc/ include { MULTIQC } from '../modules/nf-core/modules/multiqc/main' include { CUSTOM_DUMPSOFTWAREVERSIONS } from '../modules/nf-core/modules/custom/dumpsoftwareversions/main' -include { FASTP as FASTP_SINGLE } from '../modules/nf-core/modules/fastp/main' -include { FASTP as FASTP_PAIRED } from '../modules/nf-core/modules/fastp/main' -include { FASTQC as FASTQC_POST } from '../modules/nf-core/modules/fastqc/main' include { CAT_FASTQ } from '../modules/nf-core/modules/cat/fastq/main' /* @@ -76,6 +78,10 @@ workflow TAXPROFILER { ) ch_versions = ch_versions.mix(INPUT_CHECK.out.versions) + DB_CHECK ( + ch_databases + ) + // // MODULE: Run FastQC // @@ -91,47 +97,12 @@ workflow TAXPROFILER { // // MODULE: Run Clip/Merge/Complexity // - // TODO give option to clip only and retain pairs - // TODO give option to retain singletons (probably fastp option likely) - // TODO move to subworkflow if ( params.fastp_clip_merge ) { - - ch_input_for_fastp = INPUT_CHECK.out.fastq - .dump(tag: "pre-fastp_branch") - .branch{ - single: it[0]['single_end'] == true - paired: it[0]['single_end'] == false - } - - ch_input_for_fastp.single.dump(tag: "input_fastp_single") - ch_input_for_fastp.paired.dump(tag: "input_fastp_paired") - - FASTP_SINGLE ( ch_input_for_fastp.single, false, false ) - FASTP_PAIRED ( ch_input_for_fastp.paired, false, true ) - - ch_fastp_reads_prepped = FASTP_PAIRED.out.reads_merged - .mix( FASTP_SINGLE.out.reads ) - .map { - meta, reads -> 
- def meta_new = meta.clone() - meta_new['single_end'] = 1 - [ meta_new, reads ] - } - - FASTQC_POST ( ch_fastp_reads_prepped ) - - ch_versions = ch_versions.mix(FASTP_SINGLE.out.versions.first()) - ch_versions = ch_versions.mix(FASTP_PAIRED.out.versions.first()) - - ch_processed_reads = ch_fastp_reads_prepped - - } else { - ch_processed_reads = INPUT_CHECK.out.fastq + FASTQ_PREPROCESSING ( INPUT_CHECK.out.fastq ) } - // MODULE: Cat merge runs of same sample - ch_processed_for_combine = ch_processed_reads + ch_processed_for_combine = ( params.fastp_clip_merge ? FASTQ_PREPROCESSING.out.reads : INPUT_CHECK.out.fastq ) .dump(tag: "prep_for_combine_grouping") .map { meta, reads -> @@ -153,6 +124,10 @@ workflow TAXPROFILER { .mix( CAT_FASTQ.out.reads ) .dump(tag: "files_for_profiling") + // Combine reads with possible databases + + ch_reads_for_profiling.combine(DB_CHECK.out.dbs).dump(tag: "reads_plus_db") + // // MODULE: MultiQC // @@ -166,9 +141,7 @@ workflow TAXPROFILER { ch_multiqc_files = ch_multiqc_files.mix(CUSTOM_DUMPSOFTWAREVERSIONS.out.mqc_yml.collect()) ch_multiqc_files = ch_multiqc_files.mix(FASTQC.out.zip.collect{it[1]}.ifEmpty([])) if (params.fastp_clip_merge) { - ch_multiqc_files = ch_multiqc_files.mix(FASTP_SINGLE.out.json.collect{it[1]}.ifEmpty([])) - ch_multiqc_files = ch_multiqc_files.mix(FASTP_PAIRED.out.json.collect{it[1]}.ifEmpty([])) - ch_multiqc_files = ch_multiqc_files.mix(FASTQC_POST.out.zip.collect{it[1]}.ifEmpty([])) + ch_multiqc_files = ch_multiqc_files.mix(FASTQ_PREPROCESSING.out.mqc) }