Added database preparation and final channel for profiling

parent cf55cc592c
commit 278f5605ca

6 changed files with 161 additions and 47 deletions
modules/local/database_check.nf (new file, 25 lines)
@@ -0,0 +1,25 @@
process DATABASE_CHECK {
    tag "$databasesheet"

    conda (params.enable_conda ? "conda-forge::python=3.8.3" : null)
    container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ?
        'https://depot.galaxyproject.org/singularity/python:3.8.3' :
        'quay.io/biocontainers/python:3.8.3' }"

    input:
    path databasesheet

    output:
    path '*.csv'       , emit: csv
    path "versions.yml", emit: versions

    script: // This script is bundled with the pipeline, in nf-core/taxprofiler/bin/
    """
    cat $databasesheet >> database_sheet.valid.csv

    cat <<-END_VERSIONS > versions.yml
    "${task.process}":
        python: \$(python --version | sed 's/Python //g')
    END_VERSIONS
    """
}
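For reference, the versions.yml emitted by this process would contain roughly the following; the fully qualified process name shown here is an assumption based on the process being invoked through the DB_CHECK subworkflow added in this commit:

    "NFCORE_TAXPROFILER:TAXPROFILER:DB_CHECK:DATABASE_CHECK":
        python: 3.8.3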
nextflow.config (modified)
@@ -50,6 +50,9 @@ params {
     max_cpus                   = 16
     max_time                   = '240.h'
 
+    // Databases
+    databases                  = null
+
     // FASTQ preprocessing
     fastp_clip_merge           = false
     fastp_exclude_unmerged     = true
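With the new databases parameter in place, a launch would pass both sheets on the command line; an illustrative invocation (the profile and file names are placeholders):

    nextflow run . -profile docker --input samplesheet.csv --databases databases.csv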
subworkflows/local/db_check.nf (new file, 40 lines)
@@ -0,0 +1,40 @@
//
// Check input database sheet and create channels for profiling databases
//

include { DATABASE_CHECK } from '../../modules/local/database_check'

workflow DB_CHECK {
    take:
    dbsheet // file: /path/to/dbsheet.csv

    main:

    // TODO: make database sheet check
    parsed_samplesheet = DATABASE_CHECK ( dbsheet )
        .csv
        .splitCsv ( header:true, sep:',' )
        .dump(tag: "db_split_csv_out")
        .map { create_db_channels(it) }
        .dump(tag: "db_channel_prepped")
        .set{ dbs }

    emit:
    dbs      // channel: [ val(meta), [ db ] ]
    versions = DATABASE_CHECK.out.versions // channel: [ versions.yml ]
}

def create_db_channels(LinkedHashMap row) {
    def meta = [:]
    meta.tool      = row.tool
    meta.db_name   = row.db_name
    meta.db_params = row.db_params

    def array = []
    if (!file(row.db_path, type: 'dir').exists()) {
        exit 1, "ERROR: Please check input database sheet -> database could not be found!\n${row.db_path}"
    }
    array = [ meta, file(row.db_path) ]

    return array
}
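To make the expected sheet format concrete: create_db_channels reads the columns tool, db_name, db_params, and db_path, so a minimal databases.csv could look like this (the tool names, parameters, and paths are purely illustrative):

    tool,db_name,db_params,db_path
    kraken2,k2_standard,,/path/to/kraken2/db/
    malt,malt95,-id 90,/path/to/malt/db/

Each row then surfaces on the dbs channel as a tuple of the form [ [tool:'kraken2', db_name:'k2_standard', db_params:''], /path/to/kraken2/db/ ].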
subworkflows/local/input_check.nf (modified)
@@ -12,7 +12,7 @@ workflow INPUT_CHECK {
     parsed_samplesheet = SAMPLESHEET_CHECK ( samplesheet )
         .csv
         .splitCsv ( header:true, sep:',' )
-        .dump(tag: "split_csv_out")
+        .dump(tag: "input_split_csv_out")
         .branch {
             fasta: it['fasta'] != ''
             fastq: true
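The tag rename avoids a collision with the new db_split_csv_out tag in DB_CHECK. Note that dump output only appears when the pipeline is run with Nextflow's -dump-channels option, e.g. (the profile is a placeholder):

    nextflow run . -profile test,docker -dump-channels input_split_csv_out,db_split_csv_out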
subworkflows/local/preprocessing.nf (new file, 73 lines)
@@ -0,0 +1,73 @@
//
// Perform read preprocessing: adapter clipping and read-pair merging with fastp
//


include { FASTP as FASTP_SINGLE } from '../../modules/nf-core/modules/fastp/main'
include { FASTP as FASTP_PAIRED } from '../../modules/nf-core/modules/fastp/main'
include { FASTQC as FASTQC_POST } from '../../modules/nf-core/modules/fastqc/main'

workflow FASTQ_PREPROCESSING {
    take:
    reads // channel: [ val(meta), [ reads ] ]

    main:
    ch_versions      = Channel.empty()
    ch_multiqc_files = Channel.empty()

    //
    // STEP: Read clipping and merging
    //
    // TODO give option to clip only and retain pairs
    // TODO give option to retain singletons (probably a fastp option)
    // TODO move to subworkflow


    if ( params.fastp_clip_merge ) {

        ch_input_for_fastp = reads
            .dump(tag: "pre-fastp_branch")
            .branch{
                single: it[0]['single_end'] == true
                paired: it[0]['single_end'] == false
            }

        ch_input_for_fastp.single.dump(tag: "input_fastp_single")
        ch_input_for_fastp.paired.dump(tag: "input_fastp_paired")

        FASTP_SINGLE ( ch_input_for_fastp.single, false, false )
        FASTP_PAIRED ( ch_input_for_fastp.paired, false, true )

        ch_fastp_reads_prepped = FASTP_PAIRED.out.reads_merged
            .mix( FASTP_SINGLE.out.reads )
            .map {
                meta, reads ->
                    def meta_new = meta.clone()
                    meta_new['single_end'] = 1
                    [ meta_new, reads ]
            }

        FASTQC_POST ( ch_fastp_reads_prepped )

        ch_versions = ch_versions.mix(FASTP_SINGLE.out.versions.first())
        ch_versions = ch_versions.mix(FASTP_PAIRED.out.versions.first())

        ch_processed_reads = ch_fastp_reads_prepped

        ch_multiqc_files = ch_multiqc_files.mix( FASTQC_POST.out.zip.collect{it[1]} )
        ch_multiqc_files = ch_multiqc_files.mix( FASTP_SINGLE.out.json.collect{it[1]} )
        ch_multiqc_files = ch_multiqc_files.mix( FASTP_PAIRED.out.json.collect{it[1]} )

        ch_multiqc_files.dump(tag: "preprocessing_mqc_final")

    } else {
        ch_processed_reads = reads
    }


    emit:
    reads    = ch_processed_reads // channel: [ val(meta), [ reads ] ]
    versions = ch_versions        // channel: [ versions.yml ]
    mqc      = ch_multiqc_files
}
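For orientation, elements on the incoming reads channel follow the nf-core meta-map convention, which is what the branch above keys on; a paired-end element might look like this (sample id and file names invented for illustration):

    [ [ id:'sample1', single_end:false ], [ file('sample1_1.fastq.gz'), file('sample1_2.fastq.gz') ] ]

Note that the map after merging sets single_end to 1 rather than true; that numeric flag is truthy in Groovy, but a later comparison of single_end == true would not match it.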
workflows/taxprofiler.nf (modified)
@@ -11,11 +11,12 @@ WorkflowTaxprofiler.initialise(params, log)
 
 // TODO nf-core: Add all file path parameters for the pipeline to the list below
 // Check input path parameters to see if they exist
-def checkPathParamList = [ params.input, params.multiqc_config ]
+def checkPathParamList = [ params.input, params.databases, params.multiqc_config ]
 for (param in checkPathParamList) { if (param) { file(param, checkIfExists: true) } }
 
 // Check mandatory parameters
-if (params.input) { ch_input = file(params.input) } else { exit 1, 'Input samplesheet not specified!' }
+if (params.input    ) { ch_input     = file(params.input)     } else { exit 1, 'Input samplesheet not specified!' }
+if (params.databases) { ch_databases = file(params.databases) } else { exit 1, 'Input database sheet not specified!' }
 
 /*
 ========================================================================================
@@ -35,7 +36,11 @@ ch_multiqc_custom_config = params.multiqc_config ? Channel.fromPath(params.multi
 //
 // SUBWORKFLOW: Consisting of a mix of local and nf-core/modules
 //
 include { INPUT_CHECK } from '../subworkflows/local/input_check'
+
+include { DB_CHECK            } from '../subworkflows/local/db_check'
+include { FASTQ_PREPROCESSING } from '../subworkflows/local/preprocessing'
+
 
 /*
 ========================================================================================
@@ -50,9 +55,6 @@ include { FASTQC                      } from '../modules/nf-core/modules/fastqc/
 include { MULTIQC                     } from '../modules/nf-core/modules/multiqc/main'
 include { CUSTOM_DUMPSOFTWAREVERSIONS } from '../modules/nf-core/modules/custom/dumpsoftwareversions/main'
 
-include { FASTP as FASTP_SINGLE       } from '../modules/nf-core/modules/fastp/main'
-include { FASTP as FASTP_PAIRED       } from '../modules/nf-core/modules/fastp/main'
-include { FASTQC as FASTQC_POST       } from '../modules/nf-core/modules/fastqc/main'
 include { CAT_FASTQ                   } from '../modules/nf-core/modules/cat/fastq/main'
 
 /*
@@ -76,6 +78,10 @@ workflow TAXPROFILER {
     )
     ch_versions = ch_versions.mix(INPUT_CHECK.out.versions)
 
+    DB_CHECK (
+        ch_databases
+    )
+
     //
     // MODULE: Run FastQC
     //
@@ -91,47 +97,12 @@ workflow TAXPROFILER {
     //
     // MODULE: Run Clip/Merge/Complexity
     //
-    // TODO give option to clip only and retain pairs
-    // TODO give option to retain singletons (probably fastp option likely)
-    // TODO move to subworkflow
     if ( params.fastp_clip_merge ) {
+        FASTQ_PREPROCESSING ( INPUT_CHECK.out.fastq )
-
-        ch_input_for_fastp = INPUT_CHECK.out.fastq
-            .dump(tag: "pre-fastp_branch")
-            .branch{
-                single: it[0]['single_end'] == true
-                paired: it[0]['single_end'] == false
-            }
-
-        ch_input_for_fastp.single.dump(tag: "input_fastp_single")
-        ch_input_for_fastp.paired.dump(tag: "input_fastp_paired")
-
-        FASTP_SINGLE ( ch_input_for_fastp.single, false, false )
-        FASTP_PAIRED ( ch_input_for_fastp.paired, false, true )
-
-        ch_fastp_reads_prepped = FASTP_PAIRED.out.reads_merged
-            .mix( FASTP_SINGLE.out.reads )
-            .map {
-                meta, reads ->
-                    def meta_new = meta.clone()
-                    meta_new['single_end'] = 1
-                    [ meta_new, reads ]
-            }
-
-        FASTQC_POST ( ch_fastp_reads_prepped )
-
-        ch_versions = ch_versions.mix(FASTP_SINGLE.out.versions.first())
-        ch_versions = ch_versions.mix(FASTP_PAIRED.out.versions.first())
-
-        ch_processed_reads = ch_fastp_reads_prepped
-
-    } else {
-        ch_processed_reads = INPUT_CHECK.out.fastq
     }
 
 
     // MODULE: Cat merge runs of same sample
-    ch_processed_for_combine = ch_processed_reads
+    ch_processed_for_combine = FASTQ_PREPROCESSING.out.reads
         .dump(tag: "prep_for_combine_grouping")
         .map {
             meta, reads ->
@@ -153,6 +124,10 @@ workflow TAXPROFILER {
         .mix( CAT_FASTQ.out.reads )
         .dump(tag: "files_for_profiling")
 
+    // Combine reads with possible databases
+
+    ch_reads_for_profiling.combine(DB_CHECK.out.dbs).dump(tag: "reads_plus_db")
+
     //
     // MODULE: MultiQC
     //
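Since combine emits the cross product of the two channels, each element dumped under reads_plus_db pairs one sample with one database as a flat tuple of read meta, reads, database meta, and database path; schematically (values illustrative):

    [ [ id:'sample1', single_end:true ], sample1.merged.fastq.gz, [ tool:'kraken2', db_name:'k2_standard', db_params:'' ], /path/to/kraken2/db/ ]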
@@ -166,9 +141,7 @@ workflow TAXPROFILER {
     ch_multiqc_files = ch_multiqc_files.mix(CUSTOM_DUMPSOFTWAREVERSIONS.out.mqc_yml.collect())
     ch_multiqc_files = ch_multiqc_files.mix(FASTQC.out.zip.collect{it[1]}.ifEmpty([]))
     if (params.fastp_clip_merge) {
-        ch_multiqc_files = ch_multiqc_files.mix(FASTP_SINGLE.out.json.collect{it[1]}.ifEmpty([]))
-        ch_multiqc_files = ch_multiqc_files.mix(FASTP_PAIRED.out.json.collect{it[1]}.ifEmpty([]))
-        ch_multiqc_files = ch_multiqc_files.mix(FASTQC_POST.out.zip.collect{it[1]}.ifEmpty([]))
+        ch_multiqc_files = ch_multiqc_files.mix(FASTQ_PREPROCESSING.out.mqc)
     }