diff --git a/.gitignore b/.gitignore index f9c1b1b..0acd465 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,7 @@ docs/site/ # committed for packages, but should be committed for applications that require a static # environment. Manifest.toml + +### Input files ### +*.fastq.gz +metadata*.tsv diff --git a/Project.toml b/Project.toml index ed6c11c..3272bdb 100644 --- a/Project.toml +++ b/Project.toml @@ -4,27 +4,35 @@ authors = ["Thomas A. Christensen II <25492070+MillironX@users.noreply.github.co version = "0.1.0" [deps] +BaseDirs = "18cc8868-cbac-4acf-b575-c8ff214dc66f" CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b" +CodecZlib = "944b1d66-785c-5afd-91f1-9de20f533193" Conda = "8f4d0f93-b110-5947-807f-2305c1781a2d" Cowsay = "b6370f49-8ad1-4651-ad9e-3639b35da0e9" -Dagger = "d58978e5-989f-55fb-8d15-ea34adc7bf54" DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0" Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" +FilePathsBase = "48062228-2e41-5def-b9a4-89aafe57970f" Glob = "c27321d9-0574-5035-807b-f59d2c89b15c" HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3" +SHA = "ea8e919c-243c-51af-8825-aaa63cd721ce" URIs = "5c2747f8-b7ea-4ff2-ba2e-563bfd36b1d4" +UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4" YAML = "ddb6d928-2868-570f-bddf-ab3f9cf99eb6" [compat] +BaseDirs = "1.2.4" CSV = "0.10.15" +CodecZlib = "0.7.8" Conda = "1.10.2" Cowsay = "1.0.0" -Dagger = "0.18.14" DataFrames = "1.7.0" Distributed = "1.11.0" +FilePathsBase = "0.9.24" Glob = "1.3.1" HTTP = "1.10.16" +SHA = "0.7.0" URIs = "1.5.2" +UUIDs = "1.11.0" YAML = "0.4.13" julia = "1.11" diff --git a/src/Cowcalf_rumen_metagenomic_pipeline.jl b/src/Cowcalf_rumen_metagenomic_pipeline.jl index c08e683..8de0d43 100644 --- a/src/Cowcalf_rumen_metagenomic_pipeline.jl +++ b/src/Cowcalf_rumen_metagenomic_pipeline.jl @@ -3,22 +3,31 @@ module Cowcalf_rumen_metagenomic_pipeline using Conda: Conda, runconda using Cowsay: cowsay using CSV: CSV -using DataFrames: DataFrame, rename! 
+using DataFrames: DataFrame, Not, combine, eachrow, nrow, rename!, select! using Distributed: pmap, @everywhere +using FilePathsBase: AbstractPath, Path, absolute, cwd, @__PATH__, @p_str using Glob: GlobMatch, glob using HTTP: HTTP using URIs: URI +using UUIDs: uuid4 using YAML: YAML export main include("Metaxa.jl") +include("Qiime.jl") using .Metaxa: Metaxa +using .Qiime: Qiime -_fetch_yaml_contents(yaml::AbstractString) = YAML.load_file(yaml) +_fetch_yaml_contents(yaml::AbstractPath) = YAML.load_file(string(yaml)) _fetch_yaml_contents(yaml::URI) = YAML.load(String(HTTP.get(yaml).body)) -function setup_remote_conda_environment(yaml::Union{AbstractString,URI}, env_name::Symbol) +function setup_remote_conda_environment(yaml::Union{AbstractPath,URI}, env_name::Symbol) + # Check for the existence of the environment with the same name before creating one + # We assume that the existence of the environment means that it has been properly setup + # TODO: Provide a way to force re-initialization of a conda environment + isdir(Conda.prefix(env_name)) && return env_name + ENV["CONDA_JL_USE_MINIFORGE"] = "1" # Install x86 packages via Rosetta2 on MacOS @@ -35,7 +44,7 @@ function setup_remote_conda_environment(yaml::Union{AbstractString,URI}, env_nam return env_name end #function -function import_metadata(metadata_tsv::AbstractString) +function import_metadata(metadata_tsv::AbstractPath) df = DataFrame(CSV.File(metadata_tsv)) rename!(df, 1 => :sample_name) return df @@ -47,8 +56,8 @@ function sample_files(samplenames::Vector{<:AbstractString}) return ( samplename, ( - abspath(first(glob(GlobMatch("$(samplename)*1*.fastq.gz")))), - abspath(first(glob(GlobMatch("$(samplename)*2*.fastq.gz")))), + absolute(Path(first(glob(GlobMatch("$(samplename)*1*.fastq.gz"))))), + absolute(Path(first(glob(GlobMatch("$(samplename)*2*.fastq.gz"))))), ), ) end #function @@ -56,8 +65,29 @@ function sample_files(samplenames::Vector{<:AbstractString}) return map(_s, samplenames) end #function 
+function qiime_feature_table(table_file::AbstractPath)
+    df = DataFrame(CSV.File(table_file))
+
+    # Biom format requires that taxonomy hierarchy be named exactly "taxonomy"
+    rename!(df, 1 => :taxonomy)
+
+    # Every OTU in a feature table must have a universally unique id
+    df[!, Symbol("#SampleId")] = [uuid4() for i = 1:nrow(df)]
+
+    # Biom tables are sensitive to the position of the columns:
+    # arrange them so that #SampleId is first, taxonomy is last,
+    # and all the observed counts are in the middle
+    select!(
+        df,
+        Symbol("#SampleId"),
+        Not([Symbol("#SampleId"), :taxonomy]),
+        :taxonomy,
+    )
+    return df
+end #function
+
 function (@main)(ARGS)
-    metadata_file = pop!(ARGS)
+    metadata_file = Path(pop!(ARGS))
 
     setup_remote_conda_environment(
         URI(
@@ -66,7 +96,7 @@ function (@main)(ARGS)
         :qiime2,
     )
     setup_remote_conda_environment(
-        joinpath(@__DIR__, "..", "conda_envs", "metaxa2.yml"),
+        joinpath(@__PATH__, p"..", p"conda_envs", p"metaxa2.yml"),
         :metaxa2,
     )
 
@@ -76,12 +106,35 @@ function (@main)(ARGS)
     @eval begin
         @everywhere begin
             include(joinpath(@__DIR__, "Metaxa.jl"))
+            include(joinpath(@__DIR__, "Qiime.jl"))
             using .Metaxa: Metaxa
+            using .Qiime: Qiime
         end #@everywhere
     end #@eval
 
-    taxonomy_files = pmap(x -> Metaxa.taxonomy(first(x), last(x)), fastq_files)
-    feature_table = Metaxa.data_collector(taxonomy_files...)
- cp(feature_table, joinpath(pwd(), "feature-table.tsv")) + taxonomy_files = pmap(x -> Metaxa.taxonomy(first(x), last(x)...), fastq_files) + feature_table = qiime_feature_table(Metaxa.data_collector(taxonomy_files...)) + rarefaction_min = minimum( + eachrow( + combine( + feature_table, + names(feature_table, Not(:taxonomy, Symbol("#SampleId"))) .=> sum, + renamecols = false, + ), + )..., + ) + rarefaction_max = maximum( + eachrow( + combine( + feature_table, + names(feature_table, Not(:taxonomy, Symbol("#SampleId"))) .=> sum, + renamecols = false, + ), + )..., + ) + @show rarefaction_min + @show rarefaction_max + + cp(Qiime.biom_feature_table(feature_table), cwd()) cowsay("Hello from Cowcalf_rumen_metagenomic_pipeline") return 0 diff --git a/src/Metaxa.jl b/src/Metaxa.jl index fd40bee..57f5fbb 100644 --- a/src/Metaxa.jl +++ b/src/Metaxa.jl @@ -1,18 +1,15 @@ module Metaxa using Conda: runconda +using FilePathsBase: AbstractPath, Path, absolute, exists include("ProcessHelper.jl") -using .ProcessHelper: exec_in_temp_dir +using .ProcessHelper: cache_function_value export taxonomy -function _classifier( - samplename::AbstractString, - fastq1::AbstractString, - fastq2::AbstractString, -) +function _classifier(samplename::AbstractString, fastq1::AbstractPath, fastq2::AbstractPath) runconda( `run metaxa2 \ -1 $fastq1 \ @@ -27,12 +24,12 @@ function _classifier( `, :metaxa2, ) - ispath("$samplename.taxonomy.txt") || + exists(Path("$samplename.taxonomy.txt")) || error("metaxa2 ran, but $samplename.taxonomy.txt was not found!") - return abspath("$samplename.taxonomy.txt") + return absolute(Path("$samplename.taxonomy.txt")) end #function -function _taxonomy_traversal(samplename::AbstractString, taxonomy::AbstractString) +function _taxonomy_traversal(samplename::AbstractString, taxonomy::AbstractPath) runconda( `run metaxa2_ttt \ -i $taxonomy \ @@ -43,35 +40,33 @@ function _taxonomy_traversal(samplename::AbstractString, taxonomy::AbstractStrin `, :metaxa2, ) - 
ispath("$samplename.level_7.txt") || + exists(Path("$samplename.level_7.txt")) || error("metaxa2 ran, but $samplename.level_7.txt was not found!") - return abspath("$samplename.level_7.txt") + return absolute(Path("$samplename.level_7.txt")) end #function -function taxonomy( - samplename::AbstractString, - fastq::Tuple{<:AbstractString,<:AbstractString}, -) - taxonomy_file = exec_in_temp_dir(_classifier, samplename, fastq...) - level_7_taxonomy_file = exec_in_temp_dir(_taxonomy_traversal, samplename, taxonomy_file) +function taxonomy(samplename::AbstractString, fastq::AbstractPath...) + taxonomy_file = cache_function_value(_classifier, samplename, fastq...) + level_7_taxonomy_file = + cache_function_value(_taxonomy_traversal, samplename, taxonomy_file) return level_7_taxonomy_file end #function -function _dc(taxonomies::AbstractString...) +function _dc(taxonomies::AbstractPath...) runconda( `run metaxa2_dc \ -o feature-table.tsv \ - $(join(taxonomies, ' ')) + $(map(basename, taxonomies)) `, :metaxa2, ) - ispath("feature-table.tsv") || + exists(Path("feature-table.tsv")) || error("metaxa2 ran, but feature-table.tsv was not found!") - return abspath("feature-table.tsv") + return absolute(Path("feature-table.tsv")) end #function -function data_collector(taxonomies::AbstractString...) - return exec_in_temp_dir(_dc, taxonomies...) +function data_collector(taxonomies::AbstractPath...) + return cache_function_value(_dc, taxonomies...) end #function end #module diff --git a/src/ProcessHelper.jl b/src/ProcessHelper.jl index ca01370..e87ee8e 100644 --- a/src/ProcessHelper.jl +++ b/src/ProcessHelper.jl @@ -1,36 +1,107 @@ module ProcessHelper -export sym_temp +using BaseDirs: BaseDirs +using CodecZlib: GzipCompressorStream, GzipDecompressorStream +using CSV: CSV +using DataFrames: DataFrame +using FilePathsBase: AbstractPath, Path, canonicalize, exists +using SHA: SHA1_CTX, update!, digest! 
-""" - sym_temp(file::AbstractString) -> (tmp_dir, link) - sym_temp(files::Tuple{<:AbstractString,<:AbstractString}) -> (tmp_dir, links) +export cache_function_value +export exec_in_temp_dir -Copies `file(s)` to a new temporary directory named `tmp_dir` and symbolically links them -inside of that directory. Returns a tuple of the directory path and the path to the links. -""" -function sym_temp(files::AbstractString...) - tmp_dir = mktempdir(; cleanup = false) +const CACHE_DIR = Path( + BaseDirs.User.cache( + BaseDirs.Project("Cowcalf_rumen_metagenomic_pipeline"); + create = true, + ), +) + +function exec_in_temp_dir(f::Function, args...) + tmp_dir = Path(mktempdir(; cleanup = false)) @info "Creating temporary directory $tmp_dir" - function _symlink(file::AbstractString) - symlink_path = joinpath(tmp_dir, basename(file)) - symlink(realpath(file), symlink_path) - @info "Symlinked $file to $symlink_path" - return symlink_path - end #function + if any(a -> a isa AbstractPath, args) + newargs = [] + for arg in args + if arg isa AbstractPath + symlink_path = joinpath(tmp_dir, basename(arg)) + symlink(canonicalize(arg), symlink_path) + @info "Symlinked $arg to $symlink_path" + push!(newargs, symlink_path) + else + push!(newargs, arg) + end #if + end #for + + return cd(() -> f(newargs...), tmp_dir) + else + return cd(() -> f(args...), tmp_dir) + end #if - return (tmp_dir, map(_symlink, files)) end #function -function exec_in_temp_dir(f::Function, samplename::AbstractString, files::AbstractString...) - tmp_dir, tmp_files = sym_temp(files...) - return cd(() -> f(samplename, tmp_files...), tmp_dir) -end #function +function cache_function_value(f::Function, args...) 
+ ctx = SHA1_CTX() + update!(ctx, Vector{UInt8}(String(Symbol(f)))) + for arg in args + if arg isa AbstractPath + open(string(arg), "r") do a + update!(ctx, read(a)) + end #do + elseif arg isa DataFrame + for row in CSV.RowWriter(arg) + update!(ctx, Vector{UInt8}(row)) + end #for + else + update!(ctx, Vector{UInt8}(String(arg))) + end #if + end #for + input_hash = bytes2hex(digest!(ctx)) + + cache_file = joinpath(CACHE_DIR, input_hash) + + if exists(cache_file) + @info "Found cached file for function `$(String(Symbol(f)))` with input hash $input_hash" + tmp_dir = Path(mktempdir(; cleanup = false)) + @info "Creating temporary directory $tmp_dir" + + filename = readchomp(string(cache_file, ".filename")) + uncached_file = joinpath(tmp_dir, filename) + + open(GzipDecompressorStream, string(cache_file), "r") do compressed + open(string(uncached_file), "w") do uncompressed + write(uncompressed, read(compressed)) + end #do + end #do + + @info "Copied cached file for $input_hash to $uncached_file" + return canonicalize(uncached_file) + + end #if + + @info "Input hash $input_hash not found for function `$(String(Symbol(f)))`" + + result_file = exec_in_temp_dir(f, args...) + + result_file isa AbstractPath || error( + "`cache_function_value` can only work on functions that return an AbstractPath", + ) + + filename = basename(result_file) + open(string(cache_file, ".filename"), "w") do fnf + write(fnf, filename) + end #do + + open(string(result_file), "r") do uncompressed + open(GzipCompressorStream, string(cache_file), "w") do compressed + write(compressed, uncompressed) + end #do + end #do + @info "Copied result file $result_file into cache under hash $input_hash" + + return result_file -function exec_in_temp_dir(f::Function, files::AbstractString...) - tmp_dir, tmp_files = sym_temp(files...) 
-    return cd(() -> f(tmp_files...), tmp_dir)
 end #function
 
 end #module
diff --git a/src/Qiime.jl b/src/Qiime.jl
new file mode 100644
index 0000000..89b8378
--- /dev/null
+++ b/src/Qiime.jl
@@ -0,0 +1,32 @@
+module Qiime
+
+using Conda: runconda
+using CSV: CSV
+using DataFrames: DataFrame
+using FilePathsBase: AbstractPath, Path, absolute, exists
+
+include("ProcessHelper.jl")
+
+using .ProcessHelper: cache_function_value
+
+function _write_biom_feature_table(df_feature_table::DataFrame)
+    CSV.write("feature-table.tsv", df_feature_table; delim = '\t')
+    runconda(
+        `run biom convert \
+        -i feature-table.tsv \
+        -o feature-table.hdf5.biom \
+        --table-type="OTU table" \
+        --to-hdf5 \
+        --process-obs-metadata taxonomy`,
+        :qiime2,
+    )
+    exists(Path("feature-table.hdf5.biom")) ||
+        error("qiime2 ran, but feature-table.hdf5.biom was not found!")
+    return absolute(Path("feature-table.hdf5.biom"))
+end #function
+
+function biom_feature_table(df_feature_table::DataFrame)
+    return cache_function_value(_write_biom_feature_table, df_feature_table)
+end #function
+
+end #module