1
0
Fork 0
mirror of https://github.com/MillironX/taxprofiler.git synced 2024-09-21 05:12:05 +00:00

Merge branch 'dev' into nf-core-template-merge-2.3

This commit is contained in:
James A. Fellows Yates 2022-03-17 13:11:34 +01:00 committed by GitHub
commit ac77676d2b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 1053 additions and 255 deletions

View file

@ -17,7 +17,7 @@
## Introduction ## Introduction
<!-- TODO nf-core: Write a 1-2 sentence summary of what data the pipeline is for and what it does --> <!-- TODO nf-core: Write a 1-2 sentence summary of what data the pipeline is for and what it does -->
**nf-core/taxprofiler** is a bioinformatics best-practice analysis pipeline for Taxonomic profiling of shotgun metagenomic data. **nf-core/taxprofiler** is a bioinformatics best-practice analysis pipeline for taxonomic profiling of shotgun metagenomic data. It allows for in-parallel profiling against multiple profiling tools and databases and produces standardised output tables.
The pipeline is built using [Nextflow](https://www.nextflow.io), a workflow tool to run tasks across multiple compute infrastructures in a very portable manner. It uses Docker/Singularity containers making installation trivial and results highly reproducible. The [Nextflow DSL2](https://www.nextflow.io/docs/latest/dsl2.html) implementation of this pipeline uses one container per process which makes it much easier to maintain and update software dependencies. Where possible, these processes have been submitted to and installed from [nf-core/modules](https://github.com/nf-core/modules) in order to make them available to all nf-core pipelines, and to everyone within the Nextflow community! The pipeline is built using [Nextflow](https://www.nextflow.io), a workflow tool to run tasks across multiple compute infrastructures in a very portable manner. It uses Docker/Singularity containers making installation trivial and results highly reproducible. The [Nextflow DSL2](https://www.nextflow.io/docs/latest/dsl2.html) implementation of this pipeline uses one container per process which makes it much easier to maintain and update software dependencies. Where possible, these processes have been submitted to and installed from [nf-core/modules](https://github.com/nf-core/modules) in order to make them available to all nf-core pipelines, and to everyone within the Nextflow community!
@ -29,7 +29,23 @@ On release, automated continuous integration tests run the pipeline on a full-si
<!-- TODO nf-core: Fill in short bullet-pointed list of the default steps in the pipeline --> <!-- TODO nf-core: Fill in short bullet-pointed list of the default steps in the pipeline -->
1. Read QC ([`FastQC`](https://www.bioinformatics.babraham.ac.uk/projects/fastqc/)) 1. Read QC ([`FastQC`](https://www.bioinformatics.babraham.ac.uk/projects/fastqc/))
2. Present QC for raw reads ([`MultiQC`](http://multiqc.info/)) 2. Performs optional read pre-processing
- Adapter clipping and merging
- Low complexity filtering
- Host read removal
- Run merging
3. Performs taxonomic profiling a choice of:
- Kraken2
- MetaPhlAn3
- MALT
- DIAMOND
- Centrifuge
- Kaiju
- mOTUs
4. Perform optional post-processing with:
- bracken
5. Standardises output tables
6. Present QC for raw reads ([`MultiQC`](http://multiqc.info/))
## Quick Start ## Quick Start

View file

@ -1,249 +1,227 @@
#!/usr/bin/env python #!/usr/bin/env python
from distutils import extension
"""Provide a command line tool to validate and transform tabular samplesheets.""" import os
import argparse
import csv
import logging
import sys import sys
from collections import Counter import errno
from pathlib import Path import argparse
logger = logging.getLogger() def parse_args(args=None):
Description = (
"Reformat nf-core/taxprofiler samplesheet file and check its contents."
class RowChecker:
"""
Define a service that can validate and transform each given row.
Attributes:
modified (list): A list of dicts, where each dict corresponds to a previously
validated and transformed row. The order of rows is maintained.
"""
VALID_FORMATS = (
".fq.gz",
".fastq.gz",
) )
Epilog = "Example usage: python check_samplesheet.py <FILE_IN> <FILE_OUT>"
def __init__( parser = argparse.ArgumentParser(description=Description, epilog=Epilog)
self, parser.add_argument("FILE_IN", help="Input samplesheet file.")
sample_col="sample", parser.add_argument("FILE_OUT", help="Output file.")
first_col="fastq_1", return parser.parse_args(args)
second_col="fastq_2",
single_col="single_end",
**kwargs,
):
"""
Initialize the row checker with the expected column names.
Args:
sample_col (str): The name of the column that contains the sample name
(default "sample").
first_col (str): The name of the column that contains the first (or only)
FASTQ file path (default "fastq_1").
second_col (str): The name of the column that contains the second (if any)
FASTQ file path (default "fastq_2").
single_col (str): The name of the new column that will be inserted and
records whether the sample contains single- or paired-end sequencing
reads (default "single_end").
""" def make_dir(path):
super().__init__(**kwargs) if len(path) > 0:
self._sample_col = sample_col try:
self._first_col = first_col os.makedirs(path)
self._second_col = second_col except OSError as exception:
self._single_col = single_col if exception.errno != errno.EEXIST:
self._seen = set() raise exception
self.modified = []
def validate_and_transform(self, row):
"""
Perform all validations on the given row and insert the read pairing status.
Args: def print_error(error, context="Line", context_str=""):
row (dict): A mapping from column headers (keys) to elements of that row error_str = "ERROR: Please check samplesheet -> {}".format(error)
(values). if context != "" and context_str != "":
error_str = "ERROR: Please check samplesheet -> {}\n{}: '{}'".format(
""" error, context.strip(), context_str.strip()
self._validate_sample(row)
self._validate_first(row)
self._validate_second(row)
self._validate_pair(row)
self._seen.add((row[self._sample_col], row[self._first_col]))
self.modified.append(row)
def _validate_sample(self, row):
"""Assert that the sample name exists and convert spaces to underscores."""
assert len(row[self._sample_col]) > 0, "Sample input is required."
# Sanitize samples slightly.
row[self._sample_col] = row[self._sample_col].replace(" ", "_")
def _validate_first(self, row):
"""Assert that the first FASTQ entry is non-empty and has the right format."""
assert len(row[self._first_col]) > 0, "At least the first FASTQ file is required."
self._validate_fastq_format(row[self._first_col])
def _validate_second(self, row):
"""Assert that the second FASTQ entry has the right format if it exists."""
if len(row[self._second_col]) > 0:
self._validate_fastq_format(row[self._second_col])
def _validate_pair(self, row):
"""Assert that read pairs have the same file extension. Report pair status."""
if row[self._first_col] and row[self._second_col]:
row[self._single_col] = False
assert (
Path(row[self._first_col]).suffixes == Path(row[self._second_col]).suffixes
), "FASTQ pairs must have the same file extensions."
else:
row[self._single_col] = True
def _validate_fastq_format(self, filename):
"""Assert that a given filename has one of the expected FASTQ extensions."""
assert any(filename.endswith(extension) for extension in self.VALID_FORMATS), (
f"The FASTQ file has an unrecognized extension: {filename}\n"
f"It should be one of: {', '.join(self.VALID_FORMATS)}"
) )
print(error_str)
def validate_unique_samples(self): sys.exit(1)
"""
Assert that the combination of sample name and FASTQ filename is unique.
In addition to the validation, also rename the sample if more than one sample,
FASTQ file combination exists.
"""
assert len(self._seen) == len(self.modified), "The pair of sample name and FASTQ must be unique."
if len({pair[0] for pair in self._seen}) < len(self._seen):
counts = Counter(pair[0] for pair in self._seen)
seen = Counter()
for row in self.modified:
sample = row[self._sample_col]
seen[sample] += 1
if counts[sample] > 1:
row[self._sample_col] = f"{sample}_T{seen[sample]}"
def sniff_format(handle):
"""
Detect the tabular format.
Args:
handle (text file): A handle to a `text file`_ object. The read position is
expected to be at the beginning (index 0).
Returns:
csv.Dialect: The detected tabular format.
.. _text file:
https://docs.python.org/3/glossary.html#term-text-file
"""
peek = handle.read(2048)
sniffer = csv.Sniffer()
if not sniffer.has_header(peek):
logger.critical(f"The given sample sheet does not appear to contain a header.")
sys.exit(1)
dialect = sniffer.sniff(peek)
handle.seek(0)
return dialect
def check_samplesheet(file_in, file_out): def check_samplesheet(file_in, file_out):
""" """
Check that the tabular samplesheet has the structure expected by nf-core pipelines. This function checks that the samplesheet follows the following structure:
Validate the general shape of the table, expected columns, and each row. Also add
an additional column which records whether one or two FASTQ reads were found.
Args:
file_in (pathlib.Path): The given tabular samplesheet. The format can be either
CSV, TSV, or any other format automatically recognized by ``csv.Sniffer``.
file_out (pathlib.Path): Where the validated and transformed samplesheet should
be created; always in CSV format.
Example:
This function checks that the samplesheet follows the following structure,
see also the `viral recon samplesheet`_::
sample,fastq_1,fastq_2
SAMPLE_PE,SAMPLE_PE_RUN1_1.fastq.gz,SAMPLE_PE_RUN1_2.fastq.gz
SAMPLE_PE,SAMPLE_PE_RUN2_1.fastq.gz,SAMPLE_PE_RUN2_2.fastq.gz
SAMPLE_SE,SAMPLE_SE_RUN1_1.fastq.gz,
.. _viral recon samplesheet:
https://raw.githubusercontent.com/nf-core/test-datasets/viralrecon/samplesheet/samplesheet_test_illumina_amplicon.csv
sample,run_accession,instrument_platform,fastq_1,fastq_2,fasta
2611,ERR5766174,ILLUMINA,,,ERX5474930_ERR5766174_1.fa.gz
2612,ERR5766176,ILLUMINA,ERX5474932_ERR5766176_1.fastq.gz,ERX5474932_ERR5766176_2.fastq.gz,
2612,ERR5766174,ILLUMINA,ERX5474936_ERR5766180_1.fastq.gz,,
2613,ERR5766181,ILLUMINA,ERX5474937_ERR5766181_1.fastq.gz,ERX5474937_ERR5766181_2.fastq.gz,
""" """
required_columns = {"sample", "fastq_1", "fastq_2"}
# See https://docs.python.org/3.9/library/csv.html#id3 to read up on `newline=""`. FQ_EXTENSIONS = (".fq", ".fq.gz", ".fastq", ".fastq.gz")
with file_in.open(newline="") as in_handle: FA_EXTENSIONS = (
reader = csv.DictReader(in_handle, dialect=sniff_format(in_handle)) ".fa",
# Validate the existence of the expected header columns. ".fa.gz",
if not required_columns.issubset(reader.fieldnames): ".fasta",
logger.critical(f"The sample sheet **must** contain the column headers: {', '.join(required_columns)}.") ".fasta.gz",
".fna",
".fna.gz",
".fas",
".fas.gz",
)
INSTRUMENT_PLATFORMS = [
"ABI_SOLID",
"BGISEQ",
"CAPILLARY",
"COMPLETE_GENOMICS",
"DNBSEQ",
"HELICOS",
"ILLUMINA",
"ION_TORRENT",
"LS454",
"OXFORD_NANOPORE",
"PACBIO_SMRT",
]
sample_mapping_dict = {}
with open(file_in, "r") as fin:
## Check header
MIN_COLS = 4
HEADER = [
"sample",
"run_accession",
"instrument_platform",
"fastq_1",
"fastq_2",
"fasta",
]
header = [x.strip('"') for x in fin.readline().strip().split(",")]
if header[: len(HEADER)] != HEADER:
print(
"ERROR: Please check samplesheet header -> {} != {}".format(
",".join(header), ",".join(HEADER)
)
)
sys.exit(1) sys.exit(1)
# Validate each row.
checker = RowChecker() ## Check sample entries
for i, row in enumerate(reader): for line in fin:
try: lspl = [x.strip().strip('"') for x in line.strip().split(",")]
checker.validate_and_transform(row)
except AssertionError as error: # Check valid number of columns per row
logger.critical(f"{str(error)} On line {i + 2}.") if len(lspl) < len(HEADER):
sys.exit(1) print_error(
checker.validate_unique_samples() "Invalid number of columns (minimum = {})!".format(len(HEADER)),
header = list(reader.fieldnames) "Line",
header.insert(1, "single_end") line,
# See https://docs.python.org/3.9/library/csv.html#id3 to read up on `newline=""`. )
with file_out.open(mode="w", newline="") as out_handle: num_cols = len([x for x in lspl if x])
writer = csv.DictWriter(out_handle, header, delimiter=",") if num_cols < MIN_COLS:
writer.writeheader() print_error(
for row in checker.modified: "Invalid number of populated columns (minimum = {})!".format(
writer.writerow(row) MIN_COLS
),
"Line",
line,
)
## Check sample name entries
(
sample,
run_accession,
instrument_platform,
fastq_1,
fastq_2,
fasta,
) = lspl[: len(HEADER)]
sample = sample.replace(" ", "_")
if not sample:
print_error("Sample entry has not been specified!", "Line", line)
## Check FastQ file extension
for fastq in [fastq_1, fastq_2]:
if fastq:
if fastq.find(" ") != -1:
print_error("FastQ file contains spaces!", "Line", line)
if not fastq.endswith(FQ_EXTENSIONS):
print_error(
f"FastQ file does not have extension {' or '.join(list(FQ_EXTENSIONS))} !",
"Line",
line,
)
if fasta:
if fasta.find(" ") != -1:
print_error("FastA file contains spaces!", "Line", line)
if not fasta.endswith(FA_EXTENSIONS):
print_error(
f"FastA file does not have extension {' or '.join(list(FA_EXTENSIONS))}!",
"Line",
line,
)
sample_info = []
# Check run_accession
if not run_accession:
print_error("Run accession has not been specified!", "Line", line)
else:
sample_info.append(run_accession)
# Check instrument_platform
if not instrument_platform:
print_error("Instrument platform has not been specified!", "Line", line)
else:
if instrument_platform not in INSTRUMENT_PLATFORMS:
print_error(
f"Instrument platform {instrument_platform} is not supported!",
f"List of supported platforms {', '.join(INSTRUMENT_PLATFORMS)}",
"Line",
line,
)
sample_info.append(instrument_platform)
## Auto-detect paired-end/single-end
if sample and fastq_1 and fastq_2: ## Paired-end short reads
sample_info.extend(["0", fastq_1, fastq_2, fasta])
elif sample and fastq_1 and not fastq_2: ## Single-end short reads
sample_info.extend(["1", fastq_1, fastq_2, fasta])
elif (
sample and fasta and not fastq_1 and not fastq_2
): ## Single-end long reads
sample_info.extend(["1", fastq_1, fastq_2, fasta])
elif fasta and (fastq_1 or fastq_2):
print_error(
"FastQ and FastA files cannot be specified together in the same library!",
"Line",
line,
)
else:
print_error("Invalid combination of columns provided!", "Line", line)
## Create sample mapping dictionary = { sample: [ run_accession, instrument_platform, single_end, fastq_1, fastq_2 , fasta ] }
if sample not in sample_mapping_dict:
sample_mapping_dict[sample] = [sample_info]
else:
if sample_info in sample_mapping_dict[sample]:
print_error("Samplesheet contains duplicate rows!", "Line", line)
else:
sample_mapping_dict[sample].append(sample_info)
## Write validated samplesheet with appropriate columns
HEADER_OUT = [
"sample",
"run_accession",
"instrument_platform",
"single_end",
"fastq_1",
"fastq_2",
"fasta",
]
if len(sample_mapping_dict) > 0:
out_dir = os.path.dirname(file_out)
make_dir(out_dir)
with open(file_out, "w") as fout:
fout.write(",".join(HEADER_OUT) + "\n")
for sample in sorted(sample_mapping_dict.keys()):
for idx, val in enumerate(sample_mapping_dict[sample]):
fout.write(f"{sample},{','.join(val)}\n")
else:
print_error("No entries to process!", "Samplesheet: {}".format(file_in))
def parse_args(argv=None): def main(args=None):
"""Define and immediately parse command line arguments.""" args = parse_args(args)
parser = argparse.ArgumentParser( check_samplesheet(args.FILE_IN, args.FILE_OUT)
description="Validate and transform a tabular samplesheet.",
epilog="Example: python check_samplesheet.py samplesheet.csv samplesheet.valid.csv",
)
parser.add_argument(
"file_in",
metavar="FILE_IN",
type=Path,
help="Tabular input samplesheet in CSV or TSV format.",
)
parser.add_argument(
"file_out",
metavar="FILE_OUT",
type=Path,
help="Transformed output samplesheet in CSV format.",
)
parser.add_argument(
"-l",
"--log-level",
help="The desired log level (default WARNING).",
choices=("CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"),
default="WARNING",
)
return parser.parse_args(argv)
def main(argv=None):
"""Coordinate argument parsing and program execution."""
args = parse_args(argv)
logging.basicConfig(level=args.log_level, format="[%(levelname)s] %(message)s")
if not args.file_in.is_file():
logger.error(f"The given input file {args.file_in} was not found!")
sys.exit(2)
args.file_out.parent.mkdir(parents=True, exist_ok=True)
check_samplesheet(args.file_in, args.file_out)
if __name__ == "__main__": if __name__ == "__main__":

View file

@ -28,6 +28,65 @@ process {
withName: FASTQC { withName: FASTQC {
ext.args = '--quiet' ext.args = '--quiet'
ext.prefix = { "${meta.id}_${meta.run_accession}_raw" }
publishDir = [
path: { "${params.outdir}/fastqc/raw" },
mode: 'copy',
pattern: '*.html'
]
}
withName: FASTP {
ext.prefix = { "${meta.id}_${meta.run_accession}" }
// TODO also include option to NOT merge
ext.args = [
{ ${meta.single_end} } == 0 ? "-m" : '',
params.fastp_exclude_unmerged ? '' : "--include_unmerged"
].join(' ').trim()
publishDir = [
path: { "${params.outdir}/fastp" },
mode: 'copy',
pattern: '*.fastq.gz'
]
}
withName: FASTQC_POST {
ext.args = '--quiet'
ext.prefix = { "${meta.id}_${meta.run_accession}_processed" }
publishDir = [
path: { "${params.outdir}/fastqc/processed" },
mode: 'copy',
pattern: '*.html'
]
}
withName: CAT_FASTQ {
publishDir = [
path: { "${params.outdir}/prepared_sequences" },
mode: 'copy',
pattern: '*.fastq.gz'
]
}
withName: MALT_RUN {
publishDir = [
path: { "${params.outdir}/malt/${meta.db_name}" },
mode: 'copy',
pattern: '*.{rma6,tab,text,sam,log}'
]
ext.args = { "${meta.db_params}" }
ext.when = params.run_malt
}
withName: KRAKEN2_KRAKEN2 {
publishDir = [
path: { "${params.outdir}/kraken2/${meta.db_name}" },
mode: 'copy',
pattern: '.{fastq.gz,txt}'
]
ext.args = { "${meta.db_params}" }
ext.when = params.run_kraken2
ext.prefix = { "${meta.id}-${meta.db_name}" }
} }
withName: CUSTOM_DUMPSOFTWAREVERSIONS { withName: CUSTOM_DUMPSOFTWAREVERSIONS {

View file

@ -22,8 +22,6 @@ params {
// Input data // Input data
// TODO nf-core: Specify the paths to your test data on nf-core/test-datasets // TODO nf-core: Specify the paths to your test data on nf-core/test-datasets
// TODO nf-core: Give any required params for the test so that command line flags are not needed // TODO nf-core: Give any required params for the test so that command line flags are not needed
input = 'https://raw.githubusercontent.com/nf-core/test-datasets/viralrecon/samplesheet/samplesheet_test_illumina_amplicon.csv' input = 'https://raw.githubusercontent.com/nf-core/test-datasets/taxprofiler/samplesheet.csv'
// Genome references
genome = 'R64-1-1'
} }

View file

@ -10,10 +10,11 @@ class WorkflowTaxprofiler {
public static void initialise(params, log) { public static void initialise(params, log) {
genomeExistsError(params, log) genomeExistsError(params, log)
if (!params.fasta) { // TODO update as necessary
log.error "Genome fasta file not specified with e.g. '--fasta genome.fa' or via a detectable config file." //if (!params.fasta) {
System.exit(1) // log.error "Genome fasta file not specified with e.g. '--fasta genome.fa' or via a detectable config file."
} // System.exit(1)
//}
} }
// //

View file

@ -3,12 +3,24 @@
"homePage": "https://github.com/nf-core/taxprofiler", "homePage": "https://github.com/nf-core/taxprofiler",
"repos": { "repos": {
"nf-core/modules": { "nf-core/modules": {
"cat/fastq": {
"git_sha": "e745e167c1020928ef20ea1397b6b4d230681b4d"
},
"custom/dumpsoftwareversions": { "custom/dumpsoftwareversions": {
"git_sha": "20d8250d9f39ddb05dfb437603aaf99b5c0b2b41" "git_sha": "20d8250d9f39ddb05dfb437603aaf99b5c0b2b41"
}, },
"fastp": {
"git_sha": "d0a1cbb703a130c19f6796c3fce24fbe7dfce789"
},
"fastqc": { "fastqc": {
"git_sha": "9d0cad583b9a71a6509b754fdf589cbfbed08961" "git_sha": "9d0cad583b9a71a6509b754fdf589cbfbed08961"
}, },
"kraken2/kraken2": {
"git_sha": "e745e167c1020928ef20ea1397b6b4d230681b4d"
},
"malt/run": {
"git_sha": "72b96f4e504eef673f2b5c13560a9d90b669129b"
},
"multiqc": { "multiqc": {
"git_sha": "20d8250d9f39ddb05dfb437603aaf99b5c0b2b41" "git_sha": "20d8250d9f39ddb05dfb437603aaf99b5c0b2b41"
} }

View file

@ -0,0 +1,25 @@
process DATABASE_CHECK {
tag "$databasesheet"
conda (params.enable_conda ? "conda-forge::python=3.8.3" : null)
container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ?
'https://depot.galaxyproject.org/singularity/python:3.8.3' :
'quay.io/biocontainers/python:3.8.3' }"
input:
path databasesheet
output:
path '*.csv' , emit: csv
path "versions.yml", emit: versions
script: // This script is bundled with the pipeline, in nf-core/taxprofiler/bin/
"""
cat $databasesheet >> database_sheet.valid.csv
cat <<-END_VERSIONS > versions.yml
"${task.process}":
python: \$(python --version | sed 's/Python //g')
END_VERSIONS
"""
}

View file

@ -0,0 +1,51 @@
process CAT_FASTQ {
tag "$meta.id"
label 'process_low'
conda (params.enable_conda ? "conda-forge::sed=4.7" : null)
container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ?
'https://containers.biocontainers.pro/s3/SingImgsRepo/biocontainers/v1.2.0_cv1/biocontainers_v1.2.0_cv1.img' :
'biocontainers/biocontainers:v1.2.0_cv1' }"
input:
tuple val(meta), path(reads, stageAs: "input*/*")
output:
tuple val(meta), path("*.merged.fastq.gz"), emit: reads
path "versions.yml" , emit: versions
when:
task.ext.when == null || task.ext.when
script:
def args = task.ext.args ?: ''
def prefix = task.ext.prefix ?: "${meta.id}"
def readList = reads.collect{ it.toString() }
if (meta.single_end) {
if (readList.size > 1) {
"""
cat ${readList.join(' ')} > ${prefix}.merged.fastq.gz
cat <<-END_VERSIONS > versions.yml
"${task.process}":
cat: \$(echo \$(cat --version 2>&1) | sed 's/^.*coreutils) //; s/ .*\$//')
END_VERSIONS
"""
}
} else {
if (readList.size > 2) {
def read1 = []
def read2 = []
readList.eachWithIndex{ v, ix -> ( ix & 1 ? read2 : read1 ) << v }
"""
cat ${read1.join(' ')} > ${prefix}_1.merged.fastq.gz
cat ${read2.join(' ')} > ${prefix}_2.merged.fastq.gz
cat <<-END_VERSIONS > versions.yml
"${task.process}":
cat: \$(echo \$(cat --version 2>&1) | sed 's/^.*coreutils) //; s/ .*\$//')
END_VERSIONS
"""
}
}
}

View file

@ -0,0 +1,39 @@
name: cat_fastq
description: Concatenates fastq files
keywords:
- fastq
- concatenate
tools:
- cat:
description: |
The cat utility reads files sequentially, writing them to the standard output.
documentation: https://www.gnu.org/software/coreutils/manual/html_node/cat-invocation.html
licence: ["GPL-3.0-or-later"]
input:
- meta:
type: map
description: |
Groovy Map containing sample information
e.g. [ id:'test', single_end:false ]
- reads:
type: list
description: |
List of input FastQ files to be concatenated.
output:
- meta:
type: map
description: |
Groovy Map containing sample information
e.g. [ id:'test', single_end:false ]
- reads:
type: file
description: Merged fastq file
pattern: "*.{merged.fastq.gz}"
- versions:
type: file
description: File containing software versions
pattern: "versions.yml"
authors:
- "@joseespinosa"
- "@drpatelh"

75
modules/nf-core/modules/fastp/main.nf generated Normal file
View file

@ -0,0 +1,75 @@
process FASTP {
tag "$meta.id"
label 'process_medium'
conda (params.enable_conda ? 'bioconda::fastp=0.23.2' : null)
container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ?
'https://depot.galaxyproject.org/singularity/fastp:0.23.2--h79da9fb_0' :
'quay.io/biocontainers/fastp:0.23.2--h79da9fb_0' }"
input:
tuple val(meta), path(reads)
val save_trimmed_fail
val save_merged
output:
tuple val(meta), path('*.trim.fastq.gz') , optional:true, emit: reads
tuple val(meta), path('*.json') , emit: json
tuple val(meta), path('*.html') , emit: html
tuple val(meta), path('*.log') , emit: log
path "versions.yml" , emit: versions
tuple val(meta), path('*.fail.fastq.gz') , optional:true, emit: reads_fail
tuple val(meta), path('*.merged.fastq.gz'), optional:true, emit: reads_merged
when:
task.ext.when == null || task.ext.when
script:
def args = task.ext.args ?: ''
// Added soft-links to original fastqs for consistent naming in MultiQC
def prefix = task.ext.prefix ?: "${meta.id}"
if (meta.single_end) {
def fail_fastq = save_trimmed_fail ? "--failed_out ${prefix}.fail.fastq.gz" : ''
"""
[ ! -f ${prefix}.fastq.gz ] && ln -s $reads ${prefix}.fastq.gz
fastp \\
--in1 ${prefix}.fastq.gz \\
--out1 ${prefix}.trim.fastq.gz \\
--thread $task.cpus \\
--json ${prefix}.fastp.json \\
--html ${prefix}.fastp.html \\
$fail_fastq \\
$args \\
2> ${prefix}.fastp.log
cat <<-END_VERSIONS > versions.yml
"${task.process}":
fastp: \$(fastp --version 2>&1 | sed -e "s/fastp //g")
END_VERSIONS
"""
} else {
def fail_fastq = save_trimmed_fail ? "--unpaired1 ${prefix}_1.fail.fastq.gz --unpaired2 ${prefix}_2.fail.fastq.gz" : ''
def merge_fastq = save_merged ? "-m --merged_out ${prefix}.merged.fastq.gz" : ''
"""
[ ! -f ${prefix}_1.fastq.gz ] && ln -s ${reads[0]} ${prefix}_1.fastq.gz
[ ! -f ${prefix}_2.fastq.gz ] && ln -s ${reads[1]} ${prefix}_2.fastq.gz
fastp \\
--in1 ${prefix}_1.fastq.gz \\
--in2 ${prefix}_2.fastq.gz \\
--out1 ${prefix}_1.trim.fastq.gz \\
--out2 ${prefix}_2.trim.fastq.gz \\
--json ${prefix}.fastp.json \\
--html ${prefix}.fastp.html \\
$fail_fastq \\
$merge_fastq \\
--thread $task.cpus \\
--detect_adapter_for_pe \\
$args \\
2> ${prefix}.fastp.log
cat <<-END_VERSIONS > versions.yml
"${task.process}":
fastp: \$(fastp --version 2>&1 | sed -e "s/fastp //g")
END_VERSIONS
"""
}
}

68
modules/nf-core/modules/fastp/meta.yml generated Normal file
View file

@ -0,0 +1,68 @@
name: fastp
description: Perform adapter/quality trimming on sequencing reads
keywords:
- trimming
- quality control
- fastq
tools:
- fastp:
description: |
A tool designed to provide fast all-in-one preprocessing for FastQ files. This tool is developed in C++ with multithreading supported to afford high performance.
documentation: https://github.com/OpenGene/fastp
doi: https://doi.org/10.1093/bioinformatics/bty560
licence: ["MIT"]
input:
- meta:
type: map
description: |
Groovy Map containing sample information
e.g. [ id:'test', single_end:false ]
- reads:
type: file
description: |
List of input FastQ files of size 1 and 2 for single-end and paired-end data,
respectively.
- save_trimmed_fail:
type: boolean
description: Specify true to save files that failed to pass trimming thresholds ending in `*.fail.fastq.gz`
- save_merged:
type: boolean
description: Specify true to save all merged reads to the a file ending in `*.merged.fastq.gz`
output:
- meta:
type: map
description: |
Groovy Map containing sample information
e.g. [ id:'test', single_end:false ]
- reads:
type: file
description: The trimmed/modified/unmerged fastq reads
pattern: "*trim.fastq.gz"
- json:
type: file
description: Results in JSON format
pattern: "*.json"
- html:
type: file
description: Results in HTML format
pattern: "*.html"
- log:
type: file
description: fastq log file
pattern: "*.log"
- versions:
type: file
description: File containing software versions
pattern: "versions.yml"
- reads_fail:
type: file
description: Reads the failed the preprocessing
pattern: "*fail.fastq.gz"
- reads_merged:
type: file
description: Reads that were successfully merged
pattern: "*.{merged.fastq.gz}"
authors:
- "@drpatelh"
- "@kevinmenden"

View file

@ -0,0 +1,49 @@
process KRAKEN2_KRAKEN2 {
tag "$meta.id"
label 'process_high'
conda (params.enable_conda ? 'bioconda::kraken2=2.1.2 conda-forge::pigz=2.6' : null)
container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ?
'https://depot.galaxyproject.org/singularity/mulled-v2-5799ab18b5fc681e75923b2450abaa969907ec98:87fc08d11968d081f3e8a37131c1f1f6715b6542-0' :
'quay.io/biocontainers/mulled-v2-5799ab18b5fc681e75923b2450abaa969907ec98:87fc08d11968d081f3e8a37131c1f1f6715b6542-0' }"
input:
tuple val(meta), path(reads)
path db
output:
tuple val(meta), path('*classified*') , emit: classified
tuple val(meta), path('*unclassified*'), emit: unclassified
tuple val(meta), path('*report.txt') , emit: txt
path "versions.yml" , emit: versions
when:
task.ext.when == null || task.ext.when
script:
def args = task.ext.args ?: ''
def prefix = task.ext.prefix ?: "${meta.id}"
def paired = meta.single_end ? "" : "--paired"
def classified = meta.single_end ? "${prefix}.classified.fastq" : "${prefix}.classified#.fastq"
def unclassified = meta.single_end ? "${prefix}.unclassified.fastq" : "${prefix}.unclassified#.fastq"
"""
kraken2 \\
--db $db \\
--threads $task.cpus \\
--unclassified-out $unclassified \\
--classified-out $classified \\
--report ${prefix}.kraken2.report.txt \\
--gzip-compressed \\
$paired \\
$args \\
$reads
pigz -p $task.cpus *.fastq
cat <<-END_VERSIONS > versions.yml
"${task.process}":
kraken2: \$(echo \$(kraken2 --version 2>&1) | sed 's/^.*Kraken version //; s/ .*\$//')
pigz: \$( pigz --version 2>&1 | sed 's/pigz //g' )
END_VERSIONS
"""
}

View file

@ -0,0 +1,60 @@
name: kraken2_kraken2
description: Classifies metagenomic sequence data
keywords:
- classify
- metagenomics
- fastq
- db
tools:
- kraken2:
description: |
Kraken2 is a taxonomic sequence classifier that assigns taxonomic labels to sequence reads
homepage: https://ccb.jhu.edu/software/kraken2/
documentation: https://github.com/DerrickWood/kraken2/wiki/Manual
doi: 10.1186/s13059-019-1891-0
licence: ["MIT"]
input:
- meta:
type: map
description: |
Groovy Map containing sample information
e.g. [ id:'test', single_end:false ]
- reads:
type: file
description: |
List of input FastQ files of size 1 and 2 for single-end and paired-end data,
respectively.
- db:
type: directory
description: Kraken2 database
output:
- meta:
type: map
description: |
Groovy Map containing sample information
e.g. [ id:'test', single_end:false ]
- classified:
type: file
description: |
Reads classified to belong to any of the taxa
on the Kraken2 database.
pattern: "*{fastq.gz}"
- unclassified:
type: file
description: |
Reads not classified to belong to any of the taxa
on the Kraken2 database.
pattern: "*{fastq.gz}"
- txt:
type: file
description: |
Kraken2 report containing stats about classified
and not classifed reads.
pattern: "*.{report.txt}"
- versions:
type: file
description: File containing software versions
pattern: "versions.yml"
authors:
- "@joseespinosa"
- "@drpatelh"

50
modules/nf-core/modules/malt/run/main.nf generated Normal file
View file

@ -0,0 +1,50 @@
process MALT_RUN {
tag "$meta.id"
label 'process_high'
conda (params.enable_conda ? "bioconda::malt=0.53" : null)
container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ?
'https://depot.galaxyproject.org/singularity/malt:0.53--hdfd78af_0' :
'quay.io/biocontainers/malt:0.53--hdfd78af_0' }"
input:
tuple val(meta), path(fastqs)
val mode
path index
output:
tuple val(meta), path("*.rma6") , emit: rma6
tuple val(meta), path("*.{tab,text,sam}"), optional:true, emit: alignments
tuple val(meta), path("*.log") , emit: log
path "versions.yml" , emit: versions
when:
task.ext.when == null || task.ext.when
script:
def args = task.ext.args ?: ''
def prefix = task.ext.prefix ?: "${meta.id}"
def avail_mem = 6
if (!task.memory) {
log.info '[MALT_RUN] Available memory not known - defaulting to 6GB. Specify process memory requirements to change this.'
} else {
avail_mem = task.memory.giga
}
"""
malt-run \\
-J-Xmx${avail_mem}g \\
-t $task.cpus \\
-v \\
-o . \\
$args \\
--inFile ${fastqs.join(' ')} \\
-m $mode \\
--index $index/ |&tee ${prefix}-malt-run.log
cat <<-END_VERSIONS > versions.yml
"${task.process}":
malt: \$(malt-run --help 2>&1 | grep -o 'version.* ' | cut -f 1 -d ',' | cut -f2 -d ' ')
END_VERSIONS
"""
}

View file

@ -0,0 +1,58 @@
name: malt_run
description: MALT, an acronym for MEGAN alignment tool, is a sequence alignment and analysis tool designed for processing high-throughput sequencing data, especially in the context of metagenomics.
keywords:
- malt
- alignment
- metagenomics
- ancient DNA
- aDNA
- palaeogenomics
- archaeogenomics
- microbiome
tools:
- malt:
description: A tool for mapping metagenomic data
homepage: https://www.wsi.uni-tuebingen.de/lehrstuehle/algorithms-in-bioinformatics/software/malt/
documentation: https://software-ab.informatik.uni-tuebingen.de/download/malt/manual.pdf
tool_dev_url: None
doi: "10.1038/s41559-017-0446-6"
licence: ["GPL v3"]
input:
- meta:
type: map
description: |
Groovy Map containing sample information
e.g. [ id:'test', single_end:false ]
- fastqs:
type: file
description: Input FASTQ files
pattern: "*.{fastq.gz,fq.gz}"
- mode:
type: string
description: Program mode
pattern: "Unknown|BlastN|BlastP|BlastX|Classifier"
- index:
type: directory
description: Index/database directory from malt-build
pattern: "*/"
output:
- versions:
type: file
description: File containing software versions
pattern: "versions.yml"
- rma6:
type: file
description: MEGAN6 RMA6 file
pattern: "*.rma6"
- sam:
type: file
description: Alignment files in Tab, Text or MEGAN-compatible SAM format
pattern: "*.{tab,txt,sam}"
- log:
type: file
description: Log of verbose MALT stdout
pattern: "*-malt-run.log"
authors:
- "@jfy133"

View file

@ -34,7 +34,7 @@ params {
help = false help = false
validate_params = true validate_params = true
show_hidden_params = false show_hidden_params = false
schema_ignore_params = 'genomes' schema_ignore_params = 'genomes,fasta'
enable_conda = false enable_conda = false
// Config options // Config options
@ -51,6 +51,19 @@ params {
max_cpus = 16 max_cpus = 16
max_time = '240.h' max_time = '240.h'
// Databaess
databases = null
// FASTQ preprocessing
fastp_clip_merge = false
fastp_exclude_unmerged = true
// MALT
run_malt = false
malt_mode = 'BlastN'
// kraken2
run_kraken2 = false
} }
// Load base.config by default for all pipelines // Load base.config by default for all pipelines

View file

@ -57,15 +57,6 @@
"fa_icon": "fas fa-book", "fa_icon": "fas fa-book",
"help_text": "If using a reference genome configured in the pipeline using iGenomes, use this parameter to give the ID for the reference. This is then used to build the full paths for all required reference genome files e.g. `--genome GRCh38`. \n\nSee the [nf-core website docs](https://nf-co.re/usage/reference_genomes) for more details." "help_text": "If using a reference genome configured in the pipeline using iGenomes, use this parameter to give the ID for the reference. This is then used to build the full paths for all required reference genome files e.g. `--genome GRCh38`. \n\nSee the [nf-core website docs](https://nf-co.re/usage/reference_genomes) for more details."
}, },
"fasta": {
"type": "string",
"format": "file-path",
"mimetype": "text/plain",
"pattern": "^\\S+\\.fn?a(sta)?(\\.gz)?$",
"description": "Path to FASTA genome file.",
"help_text": "This parameter is *mandatory* if `--genome` is not specified. If you don't have a BWA index available this will be generated for you automatically. Combine with `--save_reference` to save BWA index for future runs.",
"fa_icon": "far fa-file-code"
},
"igenomes_base": { "igenomes_base": {
"type": "string", "type": "string",
"format": "directory-path", "format": "directory-path",

View file

@ -0,0 +1,40 @@
//
// Check input samplesheet and get read channels
//
include { DATABASE_CHECK } from '../../modules/local/database_check'
workflow DB_CHECK {
take:
dbsheet // file: /path/to/dbsheet.csv
main:
// TODO: make database sheet check
parsed_samplesheet = DATABASE_CHECK ( dbsheet )
.csv
.splitCsv ( header:true, sep:',' )
.dump(tag: "db_split_csv_out")
.map { create_db_channels(it) }
.dump(tag: "db_channel_prepped")
.set{ dbs }
emit:
dbs // channel: [ val(meta), [ db ] ]
versions = DATABASE_CHECK.out.versions // channel: [ versions.yml ]
}
def create_db_channels(LinkedHashMap row) {
def meta = [:]
meta.tool = row.tool
meta.db_name = row.db_name
meta.db_params = row.db_params
def array = []
if (!file(row.db_path, type: 'dir').exists()) {
exit 1, "ERROR: Please check input samplesheet -> database could not be found!\n${row.db_path}"
}
array = [ meta, file(row.db_path) ]
return array
}

View file

@ -9,14 +9,28 @@ workflow INPUT_CHECK {
samplesheet // file: /path/to/samplesheet.csv samplesheet // file: /path/to/samplesheet.csv
main: main:
SAMPLESHEET_CHECK ( samplesheet ) parsed_samplesheet = SAMPLESHEET_CHECK ( samplesheet )
.csv .csv
.splitCsv ( header:true, sep:',' ) .splitCsv ( header:true, sep:',' )
.map { create_fastq_channel(it) } .dump(tag: "input_split_csv_out")
.set { reads } .branch {
fasta: it['fasta'] != ''
fastq: true
}
parsed_samplesheet.fastq
.map { create_fastq_channels(it) }
.dump(tag: "fastq_channel_init")
.set { fastq }
parsed_samplesheet.fasta
.map { create_fasta_channels(it) }
.dump(tag: "fasta_channel_init")
.set { fasta }
emit: emit:
reads // channel: [ val(meta), [ reads ] ] fastq // channel: [ val(meta), [ reads ] ]
fasta // channel: [ val(meta), fasta ]
versions = SAMPLESHEET_CHECK.out.versions // channel: [ versions.yml ] versions = SAMPLESHEET_CHECK.out.versions // channel: [ versions.yml ]
} }
@ -24,8 +38,10 @@ workflow INPUT_CHECK {
def create_fastq_channel(LinkedHashMap row) { def create_fastq_channel(LinkedHashMap row) {
// create meta map // create meta map
def meta = [:] def meta = [:]
meta.id = row.sample meta.id = row.sample
meta.single_end = row.single_end.toBoolean() meta.run_accession = row.run_accession
meta.instrument_platform = row.instrument_platform
meta.single_end = row.single_end.toBoolean()
// add path(s) of the fastq file(s) to the meta map // add path(s) of the fastq file(s) to the meta map
def fastq_meta = [] def fastq_meta = []
@ -42,3 +58,20 @@ def create_fastq_channel(LinkedHashMap row) {
} }
return fastq_meta return fastq_meta
} }
// Function to get list of [ meta, fasta ]
def create_fasta_channels(LinkedHashMap row) {
def meta = [:]
meta.id = row.sample
meta.run_accession = row.run_accession
meta.instrument_platform = row.instrument_platform
meta.single_end = true
def array = []
if (!file(row.fasta).exists()) {
exit 1, "ERROR: Please check input samplesheet -> FastA file does not exist!\n${row.fasta}"
}
array = [ meta, [ file(row.fasta) ] ]
return array
}

View file

@ -0,0 +1,73 @@
//
// Check input samplesheet and get read channels
//
include { FASTP as FASTP_SINGLE } from '../../modules/nf-core/modules/fastp/main'
include { FASTP as FASTP_PAIRED } from '../../modules/nf-core/modules/fastp/main'
include { FASTQC as FASTQC_POST } from '../../modules/nf-core/modules/fastqc/main'
workflow FASTQ_PREPROCESSING {
take:
reads // file: /path/to/samplesheet.csv
main:
ch_versions = Channel.empty()
ch_multiqc_files = Channel.empty()
//
// STEP: Read clipping and merging
//
// TODO give option to clip only and retain pairs
// TODO give option to retain singletons (probably fastp option likely)
// TODO move to subworkflow
if ( params.fastp_clip_merge ) {
ch_input_for_fastp = reads
.dump(tag: "pre-fastp_branch")
.branch{
single: it[0]['single_end'] == true
paired: it[0]['single_end'] == false
}
ch_input_for_fastp.single.dump(tag: "input_fastp_single")
ch_input_for_fastp.paired.dump(tag: "input_fastp_paired")
FASTP_SINGLE ( ch_input_for_fastp.single, false, false )
FASTP_PAIRED ( ch_input_for_fastp.paired, false, true )
ch_fastp_reads_prepped = FASTP_PAIRED.out.reads_merged
.mix( FASTP_SINGLE.out.reads )
.map {
meta, reads ->
def meta_new = meta.clone()
meta_new['single_end'] = 1
[ meta_new, reads ]
}
FASTQC_POST ( ch_fastp_reads_prepped )
ch_versions = ch_versions.mix(FASTP_SINGLE.out.versions.first())
ch_versions = ch_versions.mix(FASTP_PAIRED.out.versions.first())
ch_processed_reads = ch_fastp_reads_prepped
ch_multiqc_files = ch_multiqc_files.mix( FASTQC_POST.out.zip.collect{it[1]} )
ch_multiqc_files = ch_multiqc_files.mix( FASTP_SINGLE.out.json.collect{it[1]} )
ch_multiqc_files = ch_multiqc_files.mix( FASTP_PAIRED.out.json.collect{it[1]} )
ch_multiqc_files.dump(tag: "preprocessing_mqc_final")
} else {
ch_processed_reads = reads
}
emit:
reads = ch_processed_reads // channel: [ val(meta), [ reads ] ]
versions = ch_versions // channel: [ versions.yml ]
mqc = ch_multiqc_files
}

View file

@ -11,11 +11,12 @@ WorkflowTaxprofiler.initialise(params, log)
// TODO nf-core: Add all file path parameters for the pipeline to the list below // TODO nf-core: Add all file path parameters for the pipeline to the list below
// Check input path parameters to see if they exist // Check input path parameters to see if they exist
def checkPathParamList = [ params.input, params.multiqc_config, params.fasta ] def checkPathParamList = [ params.input, params.databases, params.multiqc_config ]
for (param in checkPathParamList) { if (param) { file(param, checkIfExists: true) } } for (param in checkPathParamList) { if (param) { file(param, checkIfExists: true) } }
// Check mandatory parameters // Check mandatory parameters
if (params.input) { ch_input = file(params.input) } else { exit 1, 'Input samplesheet not specified!' } if (params.input ) { ch_input = file(params.input) } else { exit 1, 'Input samplesheet not specified!' }
if (params.databases) { ch_databases = file(params.databases) } else { exit 1, 'Input database sheet not specified!' }
/* /*
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@ -35,7 +36,11 @@ ch_multiqc_custom_config = params.multiqc_config ? Channel.fromPath(params.multi
// //
// SUBWORKFLOW: Consisting of a mix of local and nf-core/modules // SUBWORKFLOW: Consisting of a mix of local and nf-core/modules
// //
include { INPUT_CHECK } from '../subworkflows/local/input_check' include { INPUT_CHECK } from '../subworkflows/local/input_check'
include { DB_CHECK } from '../subworkflows/local/db_check'
include { FASTQ_PREPROCESSING } from '../subworkflows/local/preprocessing'
/* /*
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@ -50,6 +55,11 @@ include { FASTQC } from '../modules/nf-core/modules/fastqc/
include { MULTIQC } from '../modules/nf-core/modules/multiqc/main' include { MULTIQC } from '../modules/nf-core/modules/multiqc/main'
include { CUSTOM_DUMPSOFTWAREVERSIONS } from '../modules/nf-core/modules/custom/dumpsoftwareversions/main' include { CUSTOM_DUMPSOFTWAREVERSIONS } from '../modules/nf-core/modules/custom/dumpsoftwareversions/main'
include { CAT_FASTQ } from '../modules/nf-core/modules/cat/fastq/main'
include { MALT_RUN } from '../modules/nf-core/modules/malt/run/main'
include { KRAKEN2_KRAKEN2 } from '../modules/nf-core/modules/kraken2/kraken2/main'
/* /*
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
RUN MAIN WORKFLOW RUN MAIN WORKFLOW
@ -71,11 +81,15 @@ workflow TAXPROFILER {
) )
ch_versions = ch_versions.mix(INPUT_CHECK.out.versions) ch_versions = ch_versions.mix(INPUT_CHECK.out.versions)
DB_CHECK (
ch_databases
)
// //
// MODULE: Run FastQC // MODULE: Run FastQC
// //
FASTQC ( FASTQC (
INPUT_CHECK.out.reads INPUT_CHECK.out.fastq
) )
ch_versions = ch_versions.mix(FASTQC.out.versions.first()) ch_versions = ch_versions.mix(FASTQC.out.versions.first())
@ -83,6 +97,88 @@ workflow TAXPROFILER {
ch_versions.unique().collectFile(name: 'collated_versions.yml') ch_versions.unique().collectFile(name: 'collated_versions.yml')
) )
//
// PERFORM PREPROCESSING
//
if ( params.fastp_clip_merge ) {
FASTQ_PREPROCESSING ( INPUT_CHECK.out.fastq )
}
//
// PERFORM RUN MERGING
//
ch_processed_for_combine = FASTQ_PREPROCESSING.out.reads
.dump(tag: "prep_for_combine_grouping")
.map {
meta, reads ->
def meta_new = meta.clone()
meta_new['run_accession'] = 'combined'
[ meta_new, reads ]
}
.groupTuple ( by: 0 )
.branch{
combine: it[1].size() >= 2
skip: it[1].size() < 2
}
CAT_FASTQ ( ch_processed_for_combine.combine )
ch_reads_for_profiling = ch_processed_for_combine.skip
.dump(tag: "skip_combine")
.mix( CAT_FASTQ.out.reads )
.dump(tag: "files_for_profiling")
//
// COMBINE READS WITH POSSIBLE DATABASES
//
// output [DUMP: reads_plus_db] [['id':'2612', 'run_accession':'combined', 'instrument_platform':'ILLUMINA', 'single_end':1], <reads_path>/2612.merged.fastq.gz, ['tool':'malt', 'db_name':'mal95', 'db_params':'"-id 90"'], <db_path>/malt90]
ch_input_for_profiling = ch_reads_for_profiling
.combine(DB_CHECK.out.dbs)
.dump(tag: "reads_plus_db")
.branch {
malt: it[2]['tool'] == 'malt'
kraken2: it[2]['tool'] == 'kraken2'
unknown: true
}
//
// PREP PROFILER INPUT CHANNELS ON PER TOOL BASIS
//
// We groupTuple to have all samples in one channel for MALT as database
// loading takes a long time, so we only want to run it once per database
ch_input_for_malt = ch_input_for_profiling.malt
.map {
it ->
def temp_meta = [ id: it[2]['db_name']] + it[2]
def db = it[3]
[ temp_meta, it[1], db ]
}
.groupTuple(by: [0,2])
.dump(tag: "input for malt")
.multiMap {
it ->
reads: [ it[0], it[1].flatten() ]
db: it[2]
}
// We can run Kraken2 one-by-one sample-wise
ch_input_for_kraken2 = ch_input_for_profiling.kraken2
.dump(tag: "input for kraken")
.multiMap {
it ->
reads: [ it[0] + it[2], it[1] ]
db: it[3]
}
//
// RUN PROFILING
//
MALT_RUN ( ch_input_for_malt.reads, params.malt_mode, ch_input_for_malt.db )
KRAKEN2_KRAKEN2 ( ch_input_for_kraken2.reads, ch_input_for_kraken2.db )
// //
// MODULE: MultiQC // MODULE: MultiQC
// //
@ -95,7 +191,20 @@ workflow TAXPROFILER {
ch_multiqc_files = ch_multiqc_files.mix(ch_workflow_summary.collectFile(name: 'workflow_summary_mqc.yaml')) ch_multiqc_files = ch_multiqc_files.mix(ch_workflow_summary.collectFile(name: 'workflow_summary_mqc.yaml'))
ch_multiqc_files = ch_multiqc_files.mix(CUSTOM_DUMPSOFTWAREVERSIONS.out.mqc_yml.collect()) ch_multiqc_files = ch_multiqc_files.mix(CUSTOM_DUMPSOFTWAREVERSIONS.out.mqc_yml.collect())
ch_multiqc_files = ch_multiqc_files.mix(FASTQC.out.zip.collect{it[1]}.ifEmpty([])) ch_multiqc_files = ch_multiqc_files.mix(FASTQC.out.zip.collect{it[1]}.ifEmpty([]))
if (params.fastp_clip_merge) {
ch_multiqc_files = ch_multiqc_files.mix(FASTQ_PREPROCESSING.out.mqc)
}
if (params.run_kraken2) {
ch_multiqc_files = ch_multiqc_files.mix(KRAKEN2_KRAKEN2.out.txt.collect{it[1]}.ifEmpty([]))
ch_versions = ch_versions.mix(KRAKEN2_KRAKEN2.out.versions.first())
}
if (params.run_malt) {
ch_multiqc_files = ch_multiqc_files.mix(MALT_RUN.out.log.collect{it[1]}.ifEmpty([]))
ch_versions = ch_versions.mix(MALT_RUN.out.versions.first())
}
// TODO MALT results overwriting per database?
// TODO Versions for Karken/MALT not report?
MULTIQC ( MULTIQC (
ch_multiqc_files.collect() ch_multiqc_files.collect()
) )