From cf55cc592cf41868f8cd480f53116525ab08f960 Mon Sep 17 00:00:00 2001 From: James Fellows Yates Date: Fri, 18 Feb 2022 16:51:01 +0100 Subject: [PATCH] Get skeleton read processing to input for profiling --- README.md | 6 +- conf/modules.config | 39 +++++++++++ conf/test.config | 4 +- lib/WorkflowTaxprofiler.groovy | 9 +-- modules.json | 6 ++ modules/nf-core/modules/cat/fastq/main.nf | 51 ++++++++++++++ modules/nf-core/modules/cat/fastq/meta.yml | 39 +++++++++++ modules/nf-core/modules/fastp/main.nf | 75 ++++++++++++++++++++ modules/nf-core/modules/fastp/meta.yml | 68 ++++++++++++++++++ nextflow.config | 5 +- nextflow_schema.json | 9 --- subworkflows/local/input_check.nf | 43 ++++++++++-- workflows/taxprofiler.nf | 80 +++++++++++++++++++++- 13 files changed, 407 insertions(+), 27 deletions(-) create mode 100644 modules/nf-core/modules/cat/fastq/main.nf create mode 100644 modules/nf-core/modules/cat/fastq/meta.yml create mode 100644 modules/nf-core/modules/fastp/main.nf create mode 100644 modules/nf-core/modules/fastp/meta.yml diff --git a/README.md b/README.md index f57666d..5f00709 100644 --- a/README.md +++ b/README.md @@ -42,9 +42,9 @@ On release, automated continuous integration tests run the pipeline on a full-si - Centrifuge - Kaiju - mOTUs -4. Perform optional post-processing with: - - bracken -5. Standardises output tables +4. Perform optional post-processing with: + - bracken +5. Standardises output tables 6. Present QC for raw reads ([`MultiQC`](http://multiqc.info/)) ## Quick Start diff --git a/conf/modules.config b/conf/modules.config index a0506a4..2d533e9 100644 --- a/conf/modules.config +++ b/conf/modules.config @@ -28,8 +28,47 @@ process { withName: FASTQC { ext.args = '--quiet' + ext.prefix = { "${meta.id}_${meta.run_accession}_raw" } + publishDir = [ + path: { "${params.outdir}/fastqc/raw" }, + mode: 'copy', + pattern: '*.html' + ] } + withName: FASTP { + ext.prefix = { "${meta.id}_${meta.run_accession}" } + // TODO also include option to NOT merge + ext.args = [ + { ${meta.single_end} } == 0 ? "-m" : '', + params.fastp_exclude_unmerged ? '' : "--include_unmerged" + ].join(' ').trim() + publishDir = [ + path: { "${params.outdir}/fastp" }, + mode: 'copy', + pattern: '*.fastq.gz' + ] + } + + withName: FASTQC_POST { + ext.args = '--quiet' + ext.prefix = { "${meta.id}_${meta.run_accession}_processed" } + publishDir = [ + path: { "${params.outdir}/fastqc/processed" }, + mode: 'copy', + pattern: '*.html' + ] + } + + withName: CAT_FASTQ { + publishDir = [ + path: { "${params.outdir}/prepared_sequences" }, + mode: 'copy', + pattern: '*.fastq.gz' + ] + } + + withName: CUSTOM_DUMPSOFTWAREVERSIONS { publishDir = [ path: { "${params.outdir}/pipeline_info" }, diff --git a/conf/test.config b/conf/test.config index 45f87af..5db566a 100644 --- a/conf/test.config +++ b/conf/test.config @@ -22,8 +22,6 @@ params { // Input data // TODO nf-core: Specify the paths to your test data on nf-core/test-datasets // TODO nf-core: Give any required params for the test so that command line flags are not needed - input = 'https://raw.githubusercontent.com/nf-core/test-datasets/viralrecon/samplesheet/samplesheet_test_illumina_amplicon.csv' + input = 'https://raw.githubusercontent.com/nf-core/test-datasets/taxprofiler/samplesheet.csv' - // Genome references - genome = 'R64-1-1' } diff --git a/lib/WorkflowTaxprofiler.groovy b/lib/WorkflowTaxprofiler.groovy index 53482ac..57a95e3 100755 --- a/lib/WorkflowTaxprofiler.groovy +++ b/lib/WorkflowTaxprofiler.groovy @@ -10,10 +10,11 @@ class WorkflowTaxprofiler { public static void initialise(params, log) { genomeExistsError(params, log) - if (!params.fasta) { - log.error "Genome fasta file not specified with e.g. '--fasta genome.fa' or via a detectable config file." - System.exit(1) - } + // TODO update as necessary + //if (!params.fasta) { + // log.error "Genome fasta file not specified with e.g. '--fasta genome.fa' or via a detectable config file." + // System.exit(1) + //} } // diff --git a/modules.json b/modules.json index 939b85f..6cf2b3e 100644 --- a/modules.json +++ b/modules.json @@ -3,9 +3,15 @@ "homePage": "https://github.com/nf-core/taxprofiler", "repos": { "nf-core/modules": { + "cat/fastq": { + "git_sha": "e745e167c1020928ef20ea1397b6b4d230681b4d" + }, "custom/dumpsoftwareversions": { "git_sha": "20d8250d9f39ddb05dfb437603aaf99b5c0b2b41" }, + "fastp": { + "git_sha": "d0a1cbb703a130c19f6796c3fce24fbe7dfce789" + }, "fastqc": { "git_sha": "9d0cad583b9a71a6509b754fdf589cbfbed08961" }, diff --git a/modules/nf-core/modules/cat/fastq/main.nf b/modules/nf-core/modules/cat/fastq/main.nf new file mode 100644 index 0000000..bf0877c --- /dev/null +++ b/modules/nf-core/modules/cat/fastq/main.nf @@ -0,0 +1,51 @@ +process CAT_FASTQ { + tag "$meta.id" + label 'process_low' + + conda (params.enable_conda ? "conda-forge::sed=4.7" : null) + container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ? + 'https://containers.biocontainers.pro/s3/SingImgsRepo/biocontainers/v1.2.0_cv1/biocontainers_v1.2.0_cv1.img' : + 'biocontainers/biocontainers:v1.2.0_cv1' }" + + input: + tuple val(meta), path(reads, stageAs: "input*/*") + + output: + tuple val(meta), path("*.merged.fastq.gz"), emit: reads + path "versions.yml" , emit: versions + + when: + task.ext.when == null || task.ext.when + + script: + def args = task.ext.args ?: '' + def prefix = task.ext.prefix ?: "${meta.id}" + def readList = reads.collect{ it.toString() } + if (meta.single_end) { + if (readList.size > 1) { + """ + cat ${readList.join(' ')} > ${prefix}.merged.fastq.gz + + cat <<-END_VERSIONS > versions.yml + "${task.process}": + cat: \$(echo \$(cat --version 2>&1) | sed 's/^.*coreutils) //; s/ .*\$//') + END_VERSIONS + """ + } + } else { + if (readList.size > 2) { + def read1 = [] + def read2 = [] + readList.eachWithIndex{ v, ix -> ( ix & 1 ? read2 : read1 ) << v } + """ + cat ${read1.join(' ')} > ${prefix}_1.merged.fastq.gz + cat ${read2.join(' ')} > ${prefix}_2.merged.fastq.gz + + cat <<-END_VERSIONS > versions.yml + "${task.process}": + cat: \$(echo \$(cat --version 2>&1) | sed 's/^.*coreutils) //; s/ .*\$//') + END_VERSIONS + """ + } + } +} diff --git a/modules/nf-core/modules/cat/fastq/meta.yml b/modules/nf-core/modules/cat/fastq/meta.yml new file mode 100644 index 0000000..c836598 --- /dev/null +++ b/modules/nf-core/modules/cat/fastq/meta.yml @@ -0,0 +1,39 @@ +name: cat_fastq +description: Concatenates fastq files +keywords: + - fastq + - concatenate +tools: + - cat: + description: | + The cat utility reads files sequentially, writing them to the standard output. + documentation: https://www.gnu.org/software/coreutils/manual/html_node/cat-invocation.html + licence: ["GPL-3.0-or-later"] +input: + - meta: + type: map + description: | + Groovy Map containing sample information + e.g. [ id:'test', single_end:false ] + - reads: + type: list + description: | + List of input FastQ files to be concatenated. +output: + - meta: + type: map + description: | + Groovy Map containing sample information + e.g. [ id:'test', single_end:false ] + - reads: + type: file + description: Merged fastq file + pattern: "*.{merged.fastq.gz}" + - versions: + type: file + description: File containing software versions + pattern: "versions.yml" + +authors: + - "@joseespinosa" + - "@drpatelh" diff --git a/modules/nf-core/modules/fastp/main.nf b/modules/nf-core/modules/fastp/main.nf new file mode 100644 index 0000000..5c9e3b8 --- /dev/null +++ b/modules/nf-core/modules/fastp/main.nf @@ -0,0 +1,75 @@ +process FASTP { + tag "$meta.id" + label 'process_medium' + + conda (params.enable_conda ? 'bioconda::fastp=0.23.2' : null) + container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ? + 'https://depot.galaxyproject.org/singularity/fastp:0.23.2--h79da9fb_0' : + 'quay.io/biocontainers/fastp:0.23.2--h79da9fb_0' }" + + input: + tuple val(meta), path(reads) + val save_trimmed_fail + val save_merged + + output: + tuple val(meta), path('*.trim.fastq.gz') , optional:true, emit: reads + tuple val(meta), path('*.json') , emit: json + tuple val(meta), path('*.html') , emit: html + tuple val(meta), path('*.log') , emit: log + path "versions.yml" , emit: versions + tuple val(meta), path('*.fail.fastq.gz') , optional:true, emit: reads_fail + tuple val(meta), path('*.merged.fastq.gz'), optional:true, emit: reads_merged + + when: + task.ext.when == null || task.ext.when + + script: + def args = task.ext.args ?: '' + // Added soft-links to original fastqs for consistent naming in MultiQC + def prefix = task.ext.prefix ?: "${meta.id}" + if (meta.single_end) { + def fail_fastq = save_trimmed_fail ? "--failed_out ${prefix}.fail.fastq.gz" : '' + """ + [ ! -f ${prefix}.fastq.gz ] && ln -s $reads ${prefix}.fastq.gz + fastp \\ + --in1 ${prefix}.fastq.gz \\ + --out1 ${prefix}.trim.fastq.gz \\ + --thread $task.cpus \\ + --json ${prefix}.fastp.json \\ + --html ${prefix}.fastp.html \\ + $fail_fastq \\ + $args \\ + 2> ${prefix}.fastp.log + cat <<-END_VERSIONS > versions.yml + "${task.process}": + fastp: \$(fastp --version 2>&1 | sed -e "s/fastp //g") + END_VERSIONS + """ + } else { + def fail_fastq = save_trimmed_fail ? "--unpaired1 ${prefix}_1.fail.fastq.gz --unpaired2 ${prefix}_2.fail.fastq.gz" : '' + def merge_fastq = save_merged ? "-m --merged_out ${prefix}.merged.fastq.gz" : '' + """ + [ ! -f ${prefix}_1.fastq.gz ] && ln -s ${reads[0]} ${prefix}_1.fastq.gz + [ ! -f ${prefix}_2.fastq.gz ] && ln -s ${reads[1]} ${prefix}_2.fastq.gz + fastp \\ + --in1 ${prefix}_1.fastq.gz \\ + --in2 ${prefix}_2.fastq.gz \\ + --out1 ${prefix}_1.trim.fastq.gz \\ + --out2 ${prefix}_2.trim.fastq.gz \\ + --json ${prefix}.fastp.json \\ + --html ${prefix}.fastp.html \\ + $fail_fastq \\ + $merge_fastq \\ + --thread $task.cpus \\ + --detect_adapter_for_pe \\ + $args \\ + 2> ${prefix}.fastp.log + + cat <<-END_VERSIONS > versions.yml + "${task.process}": + fastp: \$(fastp --version 2>&1 | sed -e "s/fastp //g") + END_VERSIONS + """ + } +} diff --git a/modules/nf-core/modules/fastp/meta.yml b/modules/nf-core/modules/fastp/meta.yml new file mode 100644 index 0000000..3274e41 --- /dev/null +++ b/modules/nf-core/modules/fastp/meta.yml @@ -0,0 +1,68 @@ +name: fastp +description: Perform adapter/quality trimming on sequencing reads +keywords: + - trimming + - quality control + - fastq +tools: + - fastp: + description: | + A tool designed to provide fast all-in-one preprocessing for FastQ files. This tool is developed in C++ with multithreading supported to afford high performance. + documentation: https://github.com/OpenGene/fastp + doi: https://doi.org/10.1093/bioinformatics/bty560 + licence: ["MIT"] +input: + - meta: + type: map + description: | + Groovy Map containing sample information + e.g. [ id:'test', single_end:false ] + - reads: + type: file + description: | + List of input FastQ files of size 1 and 2 for single-end and paired-end data, + respectively. + - save_trimmed_fail: + type: boolean + description: Specify true to save files that failed to pass trimming thresholds ending in `*.fail.fastq.gz` + - save_merged: + type: boolean + description: Specify true to save all merged reads to the a file ending in `*.merged.fastq.gz` + +output: + - meta: + type: map + description: | + Groovy Map containing sample information + e.g. [ id:'test', single_end:false ] + - reads: + type: file + description: The trimmed/modified/unmerged fastq reads + pattern: "*trim.fastq.gz" + - json: + type: file + description: Results in JSON format + pattern: "*.json" + - html: + type: file + description: Results in HTML format + pattern: "*.html" + - log: + type: file + description: fastq log file + pattern: "*.log" + - versions: + type: file + description: File containing software versions + pattern: "versions.yml" + - reads_fail: + type: file + description: Reads the failed the preprocessing + pattern: "*fail.fastq.gz" + - reads_merged: + type: file + description: Reads that were successfully merged + pattern: "*.{merged.fastq.gz}" +authors: + - "@drpatelh" + - "@kevinmenden" diff --git a/nextflow.config b/nextflow.config index 186979c..160dba7 100644 --- a/nextflow.config +++ b/nextflow.config @@ -33,7 +33,7 @@ params { help = false validate_params = true show_hidden_params = false - schema_ignore_params = 'genomes' + schema_ignore_params = 'genomes,fasta' enable_conda = false // Config options @@ -50,6 +50,9 @@ params { max_cpus = 16 max_time = '240.h' + // FASTQ preprocessing + fastp_clip_merge = false + fastp_exclude_unmerged = true } // Load base.config by default for all pipelines diff --git a/nextflow_schema.json b/nextflow_schema.json index 535e08d..cc43add 100644 --- a/nextflow_schema.json +++ b/nextflow_schema.json @@ -56,15 +56,6 @@ "fa_icon": "fas fa-book", "help_text": "If using a reference genome configured in the pipeline using iGenomes, use this parameter to give the ID for the reference. This is then used to build the full paths for all required reference genome files e.g. `--genome GRCh38`. \n\nSee the [nf-core website docs](https://nf-co.re/usage/reference_genomes) for more details." }, - "fasta": { - "type": "string", - "format": "file-path", - "mimetype": "text/plain", - "pattern": "^\\S+\\.fn?a(sta)?(\\.gz)?$", - "description": "Path to FASTA genome file.", - "help_text": "This parameter is *mandatory* if `--genome` is not specified. If you don't have a BWA index available this will be generated for you automatically. Combine with `--save_reference` to save BWA index for future runs.", - "fa_icon": "far fa-file-code" - }, "igenomes_base": { "type": "string", "format": "directory-path", diff --git a/subworkflows/local/input_check.nf b/subworkflows/local/input_check.nf index cddcbb3..241a8a7 100644 --- a/subworkflows/local/input_check.nf +++ b/subworkflows/local/input_check.nf @@ -9,22 +9,38 @@ workflow INPUT_CHECK { samplesheet // file: /path/to/samplesheet.csv main: - SAMPLESHEET_CHECK ( samplesheet ) + parsed_samplesheet = SAMPLESHEET_CHECK ( samplesheet ) .csv .splitCsv ( header:true, sep:',' ) + .dump(tag: "split_csv_out") + .branch { + fasta: it['fasta'] != '' + fastq: true + } + + parsed_samplesheet.fastq .map { create_fastq_channels(it) } - .set { reads } + .dump(tag: "fastq_channel_init") + .set { fastq } + + parsed_samplesheet.fasta + .map { create_fasta_channels(it) } + .dump(tag: "fasta_channel_init") + .set { fasta } emit: - reads // channel: [ val(meta), [ reads ] ] + fastq // channel: [ val(meta), [ reads ] ] + fasta // channel: [ val(meta), fasta ] versions = SAMPLESHEET_CHECK.out.versions // channel: [ versions.yml ] } // Function to get list of [ meta, [ fastq_1, fastq_2 ] ] def create_fastq_channels(LinkedHashMap row) { def meta = [:] - meta.id = row.sample - meta.single_end = row.single_end.toBoolean() + meta.id = row.sample + meta.run_accession = row.run_accession + meta.instrument_platform = row.instrument_platform + meta.single_end = row.single_end.toBoolean() def array = [] if (!file(row.fastq_1).exists()) { @@ -40,3 +56,20 @@ def create_fastq_channels(LinkedHashMap row) { } return array } + +// Function to get list of [ meta, fasta ] +def create_fasta_channels(LinkedHashMap row) { + def meta = [:] + meta.id = row.sample + meta.run_accession = row.run_accession + meta.instrument_platform = row.instrument_platform + meta.single_end = true + + def array = [] + if (!file(row.fasta).exists()) { + exit 1, "ERROR: Please check input samplesheet -> FastA file does not exist!\n${row.fasta}" + } + array = [ meta, [ file(row.fasta) ] ] + + return array +} diff --git a/workflows/taxprofiler.nf b/workflows/taxprofiler.nf index 56c532b..3f10a9d 100644 --- a/workflows/taxprofiler.nf +++ b/workflows/taxprofiler.nf @@ -11,7 +11,7 @@ WorkflowTaxprofiler.initialise(params, log) // TODO nf-core: Add all file path parameters for the pipeline to the list below // Check input path parameters to see if they exist -def checkPathParamList = [ params.input, params.multiqc_config, params.fasta ] +def checkPathParamList = [ params.input, params.multiqc_config ] for (param in checkPathParamList) { if (param) { file(param, checkIfExists: true) } } // Check mandatory parameters @@ -50,6 +50,11 @@ include { FASTQC } from '../modules/nf-core/modules/fastqc/ include { MULTIQC } from '../modules/nf-core/modules/multiqc/main' include { CUSTOM_DUMPSOFTWAREVERSIONS } from '../modules/nf-core/modules/custom/dumpsoftwareversions/main' +include { FASTP as FASTP_SINGLE } from '../modules/nf-core/modules/fastp/main' +include { FASTP as FASTP_PAIRED } from '../modules/nf-core/modules/fastp/main' +include { FASTQC as FASTQC_POST } from '../modules/nf-core/modules/fastqc/main' +include { CAT_FASTQ } from '../modules/nf-core/modules/cat/fastq/main' + /* ======================================================================================== RUN MAIN WORKFLOW @@ -75,7 +80,7 @@ workflow TAXPROFILER { // MODULE: Run FastQC // FASTQC ( - INPUT_CHECK.out.reads + INPUT_CHECK.out.fastq ) ch_versions = ch_versions.mix(FASTQC.out.versions.first()) @@ -83,6 +88,71 @@ workflow TAXPROFILER { ch_versions.unique().collectFile(name: 'collated_versions.yml') ) + // + // MODULE: Run Clip/Merge/Complexity + // + // TODO give option to clip only and retain pairs + // TODO give option to retain singletons (probably fastp option likely) + // TODO move to subworkflow + if ( params.fastp_clip_merge ) { + + ch_input_for_fastp = INPUT_CHECK.out.fastq + .dump(tag: "pre-fastp_branch") + .branch{ + single: it[0]['single_end'] == true + paired: it[0]['single_end'] == false + } + + ch_input_for_fastp.single.dump(tag: "input_fastp_single") + ch_input_for_fastp.paired.dump(tag: "input_fastp_paired") + + FASTP_SINGLE ( ch_input_for_fastp.single, false, false ) + FASTP_PAIRED ( ch_input_for_fastp.paired, false, true ) + + ch_fastp_reads_prepped = FASTP_PAIRED.out.reads_merged + .mix( FASTP_SINGLE.out.reads ) + .map { + meta, reads -> + def meta_new = meta.clone() + meta_new['single_end'] = 1 + [ meta_new, reads ] + } + + FASTQC_POST ( ch_fastp_reads_prepped ) + + ch_versions = ch_versions.mix(FASTP_SINGLE.out.versions.first()) + ch_versions = ch_versions.mix(FASTP_PAIRED.out.versions.first()) + + ch_processed_reads = ch_fastp_reads_prepped + + } else { + ch_processed_reads = INPUT_CHECK.out.fastq + } + + + // MODULE: Cat merge runs of same sample + ch_processed_for_combine = ch_processed_reads + .dump(tag: "prep_for_combine_grouping") + .map { + meta, reads -> + def meta_new = meta.clone() + meta_new['run_accession'] = 'combined' + [ meta_new, reads ] + } + .groupTuple ( by: 0 ) + .branch{ + combine: it[1].size() >= 2 + skip: it[1].size() < 2 + } + + CAT_FASTQ ( ch_processed_for_combine.combine ) + + // Ready for profiling! + ch_reads_for_profiling = ch_processed_for_combine.skip + .dump(tag: "skip_combine") + .mix( CAT_FASTQ.out.reads ) + .dump(tag: "files_for_profiling") + // // MODULE: MultiQC // @@ -95,6 +165,12 @@ workflow TAXPROFILER { ch_multiqc_files = ch_multiqc_files.mix(ch_workflow_summary.collectFile(name: 'workflow_summary_mqc.yaml')) ch_multiqc_files = ch_multiqc_files.mix(CUSTOM_DUMPSOFTWAREVERSIONS.out.mqc_yml.collect()) ch_multiqc_files = ch_multiqc_files.mix(FASTQC.out.zip.collect{it[1]}.ifEmpty([])) + if (params.fastp_clip_merge) { + ch_multiqc_files = ch_multiqc_files.mix(FASTP_SINGLE.out.json.collect{it[1]}.ifEmpty([])) + ch_multiqc_files = ch_multiqc_files.mix(FASTP_PAIRED.out.json.collect{it[1]}.ifEmpty([])) + ch_multiqc_files = ch_multiqc_files.mix(FASTQC_POST.out.zip.collect{it[1]}.ifEmpty([])) + } + MULTIQC ( ch_multiqc_files.collect()