diff --git a/.gitignore b/.gitignore index 0acd465..f9c1b1b 100644 --- a/.gitignore +++ b/.gitignore @@ -29,7 +29,3 @@ docs/site/ # committed for packages, but should be committed for applications that require a static # environment. Manifest.toml - -### Input files ### -*.fastq.gz -metadata*.tsv diff --git a/Project.toml b/Project.toml index 3272bdb..ed6c11c 100644 --- a/Project.toml +++ b/Project.toml @@ -4,35 +4,27 @@ authors = ["Thomas A. Christensen II <25492070+MillironX@users.noreply.github.co version = "0.1.0" [deps] -BaseDirs = "18cc8868-cbac-4acf-b575-c8ff214dc66f" CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b" -CodecZlib = "944b1d66-785c-5afd-91f1-9de20f533193" Conda = "8f4d0f93-b110-5947-807f-2305c1781a2d" Cowsay = "b6370f49-8ad1-4651-ad9e-3639b35da0e9" +Dagger = "d58978e5-989f-55fb-8d15-ea34adc7bf54" DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0" Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" -FilePathsBase = "48062228-2e41-5def-b9a4-89aafe57970f" Glob = "c27321d9-0574-5035-807b-f59d2c89b15c" HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3" -SHA = "ea8e919c-243c-51af-8825-aaa63cd721ce" URIs = "5c2747f8-b7ea-4ff2-ba2e-563bfd36b1d4" -UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4" YAML = "ddb6d928-2868-570f-bddf-ab3f9cf99eb6" [compat] -BaseDirs = "1.2.4" CSV = "0.10.15" -CodecZlib = "0.7.8" Conda = "1.10.2" Cowsay = "1.0.0" +Dagger = "0.18.14" DataFrames = "1.7.0" Distributed = "1.11.0" -FilePathsBase = "0.9.24" Glob = "1.3.1" HTTP = "1.10.16" -SHA = "0.7.0" URIs = "1.5.2" -UUIDs = "1.11.0" YAML = "0.4.13" julia = "1.11" diff --git a/src/Cowcalf_rumen_metagenomic_pipeline.jl b/src/Cowcalf_rumen_metagenomic_pipeline.jl index 8de0d43..c08e683 100644 --- a/src/Cowcalf_rumen_metagenomic_pipeline.jl +++ b/src/Cowcalf_rumen_metagenomic_pipeline.jl @@ -3,31 +3,22 @@ module Cowcalf_rumen_metagenomic_pipeline using Conda: Conda, runconda using Cowsay: cowsay using CSV: CSV -using DataFrames: DataFrame, Not, combine, eachrow, nrow, rename!, select! +using DataFrames: DataFrame, rename! using Distributed: pmap, @everywhere -using FilePathsBase: AbstractPath, Path, absolute, cwd, @__PATH__, @p_str using Glob: GlobMatch, glob using HTTP: HTTP using URIs: URI -using UUIDs: uuid4 using YAML: YAML export main include("Metaxa.jl") -include("Qiime.jl") using .Metaxa: Metaxa -using .Qiime: Qiime -_fetch_yaml_contents(yaml::AbstractPath) = YAML.load_file(string(yaml)) +_fetch_yaml_contents(yaml::AbstractString) = YAML.load_file(yaml) _fetch_yaml_contents(yaml::URI) = YAML.load(String(HTTP.get(yaml).body)) -function setup_remote_conda_environment(yaml::Union{AbstractPath,URI}, env_name::Symbol) - # Check for the existence of the environment with the same name before creating one - # We assume that the existence of the environment means that it has been properly setup - # TODO: Provide a way to force re-initialization of a conda environment - isdir(Conda.prefix(env_name)) && return env_name - +function setup_remote_conda_environment(yaml::Union{AbstractString,URI}, env_name::Symbol) ENV["CONDA_JL_USE_MINIFORGE"] = "1" # Install x86 packages via Rosetta2 on MacOS @@ -44,7 +35,7 @@ function setup_remote_conda_environment(yaml::Union{AbstractPath,URI}, env_name: return env_name end #function -function import_metadata(metadata_tsv::AbstractPath) +function import_metadata(metadata_tsv::AbstractString) df = DataFrame(CSV.File(metadata_tsv)) rename!(df, 1 => :sample_name) return df @@ -56,8 +47,8 @@ function sample_files(samplenames::Vector{<:AbstractString}) return ( samplename, ( - absolute(Path(first(glob(GlobMatch("$(samplename)*1*.fastq.gz"))))), - absolute(Path(first(glob(GlobMatch("$(samplename)*2*.fastq.gz"))))), + abspath(first(glob(GlobMatch("$(samplename)*1*.fastq.gz")))), + abspath(first(glob(GlobMatch("$(samplename)*2*.fastq.gz")))), ), ) end #function @@ -65,29 +56,8 @@ function sample_files(samplenames::Vector{<:AbstractString}) return map(_s, samplenames) end #function -function qiime_feature_table(table_file::AbstractPath) - df = DataFrame(CSV.File(table_file)) - - # Biom format requires that taxonomy hierarchy be named exactly "taxonomy" - rename!(df, 1 => :taxonomy) - - # Every OTU in a feature table must have a universally unique id - df[!, Symbol("#SampleId")] = [uuid4() for i = 1:nrow(df)] - - # Biom tables are sensitive to the position of the columns: - # arrange them so that #SampleId is first, taxonomy is last, - # and all the observed counts are in the middle - select!( - feature_table, - Symbol("#SampleId"), - Not([Symbol("#SampleId"), :taxonomy]), - :taxonomy, - ) - return df -end #function - function (@main)(ARGS) - metadata_file = Path(pop!(ARGS)) + metadata_file = pop!(ARGS) setup_remote_conda_environment( URI( @@ -96,7 +66,7 @@ function (@main)(ARGS) :qiime2, ) setup_remote_conda_environment( - joinpath(@__PATH__, p"..", p"conda_envs", p"metaxa2.yml"), + joinpath(@__DIR__, "..", "conda_envs", "metaxa2.yml"), :metaxa2, ) @@ -106,35 +76,12 @@ function (@main)(ARGS) @eval begin @everywhere begin include(joinpath(@__DIR__, "Metaxa.jl")) - include(joinpath(@__DIR__, "Qiime.jl")) using .Metaxa: Metaxa - using .Qiime: Qiime end #@everywhere end #@eval - taxonomy_files = pmap(x -> Metaxa.taxonomy(first(x), last(x)...), fastq_files) - feature_table = qiime_feature_table(Metaxa.data_collector(taxonomy_files...)) - rarefaction_min = minimum( - eachrow( - combine( - feature_table, - names(feature_table, Not(:taxonomy, Symbol("#SampleId"))) .=> sum, - renamecols = false, - ), - )..., - ) - rarefaction_max = maximum( - eachrow( - combine( - feature_table, - names(feature_table, Not(:taxonomy, Symbol("#SampleId"))) .=> sum, - renamecols = false, - ), - )..., - ) - @show rarefaction_min - @show rarefaction_max - - cp(Qiime.biom_feature_table(feature_table), cwd()) + taxonomy_files = pmap(x -> Metaxa.taxonomy(first(x), last(x)), fastq_files) + feature_table = Metaxa.data_collector(taxonomy_files...) + cp(feature_table, joinpath(pwd(), "feature-table.tsv")) cowsay("Hello from Cowcalf_rumen_metagenomic_pipeline") return 0 diff --git a/src/Metaxa.jl b/src/Metaxa.jl index 57f5fbb..fd40bee 100644 --- a/src/Metaxa.jl +++ b/src/Metaxa.jl @@ -1,15 +1,18 @@ module Metaxa using Conda: runconda -using FilePathsBase: AbstractPath, Path, absolute, exists include("ProcessHelper.jl") -using .ProcessHelper: cache_function_value +using .ProcessHelper: exec_in_temp_dir export taxonomy -function _classifier(samplename::AbstractString, fastq1::AbstractPath, fastq2::AbstractPath) +function _classifier( + samplename::AbstractString, + fastq1::AbstractString, + fastq2::AbstractString, +) runconda( `run metaxa2 \ -1 $fastq1 \ @@ -24,12 +27,12 @@ function _classifier(samplename::AbstractString, fastq1::AbstractPath, fastq2::A `, :metaxa2, ) - exists(Path("$samplename.taxonomy.txt")) || + ispath("$samplename.taxonomy.txt") || error("metaxa2 ran, but $samplename.taxonomy.txt was not found!") - return absolute(Path("$samplename.taxonomy.txt")) + return abspath("$samplename.taxonomy.txt") end #function -function _taxonomy_traversal(samplename::AbstractString, taxonomy::AbstractPath) +function _taxonomy_traversal(samplename::AbstractString, taxonomy::AbstractString) runconda( `run metaxa2_ttt \ -i $taxonomy \ @@ -40,33 +43,35 @@ function _taxonomy_traversal(samplename::AbstractString, taxonomy::AbstractPath) `, :metaxa2, ) - exists(Path("$samplename.level_7.txt")) || + ispath("$samplename.level_7.txt") || error("metaxa2 ran, but $samplename.level_7.txt was not found!") - return absolute(Path("$samplename.level_7.txt")) + return abspath("$samplename.level_7.txt") end #function -function taxonomy(samplename::AbstractString, fastq::AbstractPath...) - taxonomy_file = cache_function_value(_classifier, samplename, fastq...) - level_7_taxonomy_file = - cache_function_value(_taxonomy_traversal, samplename, taxonomy_file) +function taxonomy( + samplename::AbstractString, + fastq::Tuple{<:AbstractString,<:AbstractString}, +) + taxonomy_file = exec_in_temp_dir(_classifier, samplename, fastq...) + level_7_taxonomy_file = exec_in_temp_dir(_taxonomy_traversal, samplename, taxonomy_file) return level_7_taxonomy_file end #function -function _dc(taxonomies::AbstractPath...) +function _dc(taxonomies::AbstractString...) runconda( `run metaxa2_dc \ -o feature-table.tsv \ - $(map(basename, taxonomies)) + $(join(taxonomies, ' ')) `, :metaxa2, ) - exists(Path("feature-table.tsv")) || + ispath("feature-table.tsv") || error("metaxa2 ran, but feature-table.tsv was not found!") - return absolute(Path("feature-table.tsv")) + return abspath("feature-table.tsv") end #function -function data_collector(taxonomies::AbstractPath...) - return cache_function_value(_dc, taxonomies...) +function data_collector(taxonomies::AbstractString...) + return exec_in_temp_dir(_dc, taxonomies...) end #function end #module diff --git a/src/ProcessHelper.jl b/src/ProcessHelper.jl index e87ee8e..ca01370 100644 --- a/src/ProcessHelper.jl +++ b/src/ProcessHelper.jl @@ -1,107 +1,36 @@ module ProcessHelper -using BaseDirs: BaseDirs -using CodecZlib: GzipCompressorStream, GzipDecompressorStream -using CSV: CSV -using DataFrames: DataFrame -using FilePathsBase: AbstractPath, Path, canonicalize, exists -using SHA: SHA1_CTX, update!, digest! +export sym_temp -export cache_function_value -export exec_in_temp_dir +""" + sym_temp(file::AbstractString) -> (tmp_dir, link) + sym_temp(files::Tuple{<:AbstractString,<:AbstractString}) -> (tmp_dir, links) -const CACHE_DIR = Path( - BaseDirs.User.cache( - BaseDirs.Project("Cowcalf_rumen_metagenomic_pipeline"); - create = true, - ), -) - -function exec_in_temp_dir(f::Function, args...) - tmp_dir = Path(mktempdir(; cleanup = false)) +Copies `file(s)` to a new temporary directory named `tmp_dir` and symbolically links them +inside of that directory. Returns a tuple of the directory path and the path to the links. +""" +function sym_temp(files::AbstractString...) + tmp_dir = mktempdir(; cleanup = false) @info "Creating temporary directory $tmp_dir" - if any(a -> a isa AbstractPath, args) - newargs = [] - for arg in args - if arg isa AbstractPath - symlink_path = joinpath(tmp_dir, basename(arg)) - symlink(canonicalize(arg), symlink_path) - @info "Symlinked $arg to $symlink_path" - push!(newargs, symlink_path) - else - push!(newargs, arg) - end #if - end #for - - return cd(() -> f(newargs...), tmp_dir) - else - return cd(() -> f(args...), tmp_dir) - end #if + function _symlink(file::AbstractString) + symlink_path = joinpath(tmp_dir, basename(file)) + symlink(realpath(file), symlink_path) + @info "Symlinked $file to $symlink_path" + return symlink_path + end #function + return (tmp_dir, map(_symlink, files)) end #function -function cache_function_value(f::Function, args...) - ctx = SHA1_CTX() - update!(ctx, Vector{UInt8}(String(Symbol(f)))) - for arg in args - if arg isa AbstractPath - open(string(arg), "r") do a - update!(ctx, read(a)) - end #do - elseif arg isa DataFrame - for row in CSV.RowWriter(arg) - update!(ctx, Vector{UInt8}(row)) - end #for - else - update!(ctx, Vector{UInt8}(String(arg))) - end #if - end #for - input_hash = bytes2hex(digest!(ctx)) - - cache_file = joinpath(CACHE_DIR, input_hash) - - if exists(cache_file) - @info "Found cached file for function `$(String(Symbol(f)))` with input hash $input_hash" - tmp_dir = Path(mktempdir(; cleanup = false)) - @info "Creating temporary directory $tmp_dir" - - filename = readchomp(string(cache_file, ".filename")) - uncached_file = joinpath(tmp_dir, filename) - - open(GzipDecompressorStream, string(cache_file), "r") do compressed - open(string(uncached_file), "w") do uncompressed - write(uncompressed, read(compressed)) - end #do - end #do - - @info "Copied cached file for $input_hash to $uncached_file" - return canonicalize(uncached_file) - - end #if - - @info "Input hash $input_hash not found for function `$(String(Symbol(f)))`" - - result_file = exec_in_temp_dir(f, args...) - - result_file isa AbstractPath || error( - "`cache_function_value` can only work on functions that return an AbstractPath", - ) - - filename = basename(result_file) - open(string(cache_file, ".filename"), "w") do fnf - write(fnf, filename) - end #do - - open(string(result_file), "r") do uncompressed - open(GzipCompressorStream, string(cache_file), "w") do compressed - write(compressed, uncompressed) - end #do - end #do - @info "Copied result file $result_file into cache under hash $input_hash" - - return result_file +function exec_in_temp_dir(f::Function, samplename::AbstractString, files::AbstractString...) + tmp_dir, tmp_files = sym_temp(files...) + return cd(() -> f(samplename, tmp_files...), tmp_dir) +end #function +function exec_in_temp_dir(f::Function, files::AbstractString...) + tmp_dir, tmp_files = sym_temp(files...) + return cd(() -> f(tmp_files...), tmp_dir) end #function end #module diff --git a/src/Qiime.jl b/src/Qiime.jl deleted file mode 100644 index 89b8378..0000000 --- a/src/Qiime.jl +++ /dev/null @@ -1,33 +0,0 @@ -module Qiime - -using Base: cache_file_entry -using Conda: runconda -using CSV: CSV -using DataFrames: DataFrame -using FilePathsBase: AbstractPath, Path, absolute, exists - -include("ProcessHelper.jl") - -using .ProcessHelper: cache_function_value - -function _write_biom_feature_table(df_feature_table::DataFrame) - CSV.write("feature-table.tsv", df_feature_table; delim = '\t') - runconda( - `run biom convert \ - -i feature-table.tsv \ - -o feature-table.hdf5.biom \ - --table-type="OTU table" \ - --to-hdf5 \ - --process-obs-metadata taxonomy`, - :qiime2, - ) - exists(Path("feature-table.hdf5.biom")) || - error("qiime2 ran, but feature-table.hdf5.biom was not found!") - return absolute(Path("feature-table.hdf5.biom")) -end #function - -function biom_feature_table(df_feature_table::DataFrame) - return cache_function_value(_write_biom_feature_table, df_feature_table) -end #function - -end #module