Compare commits

..

No commits in common. "f1072a4bb752abb4df2f244a390b2449cdfeee90" and "418c7f10884401258485133e98e06f8e5e81285b" have entirely different histories.

6 changed files with 59 additions and 223 deletions

4
.gitignore vendored
View file

@@ -29,7 +29,3 @@ docs/site/
# committed for packages, but should be committed for applications that require a static
# environment.
Manifest.toml
### Input files ###
*.fastq.gz
metadata*.tsv

View file

@@ -4,35 +4,27 @@ authors = ["Thomas A. Christensen II <25492070+MillironX@users.noreply.github.co
version = "0.1.0"
[deps]
BaseDirs = "18cc8868-cbac-4acf-b575-c8ff214dc66f"
CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b"
CodecZlib = "944b1d66-785c-5afd-91f1-9de20f533193"
Conda = "8f4d0f93-b110-5947-807f-2305c1781a2d"
Cowsay = "b6370f49-8ad1-4651-ad9e-3639b35da0e9"
Dagger = "d58978e5-989f-55fb-8d15-ea34adc7bf54"
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
FilePathsBase = "48062228-2e41-5def-b9a4-89aafe57970f"
Glob = "c27321d9-0574-5035-807b-f59d2c89b15c"
HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3"
SHA = "ea8e919c-243c-51af-8825-aaa63cd721ce"
URIs = "5c2747f8-b7ea-4ff2-ba2e-563bfd36b1d4"
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
YAML = "ddb6d928-2868-570f-bddf-ab3f9cf99eb6"
[compat]
BaseDirs = "1.2.4"
CSV = "0.10.15"
CodecZlib = "0.7.8"
Conda = "1.10.2"
Cowsay = "1.0.0"
Dagger = "0.18.14"
DataFrames = "1.7.0"
Distributed = "1.11.0"
FilePathsBase = "0.9.24"
Glob = "1.3.1"
HTTP = "1.10.16"
SHA = "0.7.0"
URIs = "1.5.2"
UUIDs = "1.11.0"
YAML = "0.4.13"
julia = "1.11"

View file

@@ -3,31 +3,22 @@ module Cowcalf_rumen_metagenomic_pipeline
using Conda: Conda, runconda
using Cowsay: cowsay
using CSV: CSV
using DataFrames: DataFrame, Not, combine, eachrow, nrow, rename!, select!
using DataFrames: DataFrame, rename!
using Distributed: pmap, @everywhere
using FilePathsBase: AbstractPath, Path, absolute, cwd, @__PATH__, @p_str
using Glob: GlobMatch, glob
using HTTP: HTTP
using URIs: URI
using UUIDs: uuid4
using YAML: YAML
export main
include("Metaxa.jl")
include("Qiime.jl")
using .Metaxa: Metaxa
using .Qiime: Qiime
_fetch_yaml_contents(yaml::AbstractPath) = YAML.load_file(string(yaml))
_fetch_yaml_contents(yaml::AbstractString) = YAML.load_file(yaml)
_fetch_yaml_contents(yaml::URI) = YAML.load(String(HTTP.get(yaml).body))
function setup_remote_conda_environment(yaml::Union{AbstractPath,URI}, env_name::Symbol)
# Check for the existence of the environment with the same name before creating one
# We assume that the existence of the environment means that it has been properly setup
# TODO: Provide a way to force re-initialization of a conda environment
isdir(Conda.prefix(env_name)) && return env_name
function setup_remote_conda_environment(yaml::Union{AbstractString,URI}, env_name::Symbol)
ENV["CONDA_JL_USE_MINIFORGE"] = "1"
# Install x86 packages via Rosetta2 on MacOS
@@ -44,7 +35,7 @@ function setup_remote_conda_environment(yaml::Union{AbstractPath,URI}, env_name:
return env_name
end #function
function import_metadata(metadata_tsv::AbstractPath)
function import_metadata(metadata_tsv::AbstractString)
df = DataFrame(CSV.File(metadata_tsv))
rename!(df, 1 => :sample_name)
return df
@@ -56,8 +47,8 @@ function sample_files(samplenames::Vector{<:AbstractString})
return (
samplename,
(
absolute(Path(first(glob(GlobMatch("$(samplename)*1*.fastq.gz"))))),
absolute(Path(first(glob(GlobMatch("$(samplename)*2*.fastq.gz"))))),
abspath(first(glob(GlobMatch("$(samplename)*1*.fastq.gz")))),
abspath(first(glob(GlobMatch("$(samplename)*2*.fastq.gz")))),
),
)
end #function
@@ -65,29 +56,8 @@ function sample_files(samplenames::Vector{<:AbstractString})
return map(_s, samplenames)
end #function
function qiime_feature_table(table_file::AbstractPath)
df = DataFrame(CSV.File(table_file))
# Biom format requires that taxonomy hierarchy be named exactly "taxonomy"
rename!(df, 1 => :taxonomy)
# Every OTU in a feature table must have a universally unique id
df[!, Symbol("#SampleId")] = [uuid4() for i = 1:nrow(df)]
# Biom tables are sensitive to the position of the columns:
# arrange them so that #SampleId is first, taxonomy is last,
# and all the observed counts are in the middle
select!(
feature_table,
Symbol("#SampleId"),
Not([Symbol("#SampleId"), :taxonomy]),
:taxonomy,
)
return df
end #function
function (@main)(ARGS)
metadata_file = Path(pop!(ARGS))
metadata_file = pop!(ARGS)
setup_remote_conda_environment(
URI(
@@ -96,7 +66,7 @@ function (@main)(ARGS)
:qiime2,
)
setup_remote_conda_environment(
joinpath(@__PATH__, p"..", p"conda_envs", p"metaxa2.yml"),
joinpath(@__DIR__, "..", "conda_envs", "metaxa2.yml"),
:metaxa2,
)
@@ -106,35 +76,12 @@ function (@main)(ARGS)
@eval begin
@everywhere begin
include(joinpath(@__DIR__, "Metaxa.jl"))
include(joinpath(@__DIR__, "Qiime.jl"))
using .Metaxa: Metaxa
using .Qiime: Qiime
end #@everywhere
end #@eval
taxonomy_files = pmap(x -> Metaxa.taxonomy(first(x), last(x)...), fastq_files)
feature_table = qiime_feature_table(Metaxa.data_collector(taxonomy_files...))
rarefaction_min = minimum(
eachrow(
combine(
feature_table,
names(feature_table, Not(:taxonomy, Symbol("#SampleId"))) .=> sum,
renamecols = false,
),
)...,
)
rarefaction_max = maximum(
eachrow(
combine(
feature_table,
names(feature_table, Not(:taxonomy, Symbol("#SampleId"))) .=> sum,
renamecols = false,
),
)...,
)
@show rarefaction_min
@show rarefaction_max
cp(Qiime.biom_feature_table(feature_table), cwd())
taxonomy_files = pmap(x -> Metaxa.taxonomy(first(x), last(x)), fastq_files)
feature_table = Metaxa.data_collector(taxonomy_files...)
cp(feature_table, joinpath(pwd(), "feature-table.tsv"))
cowsay("Hello from Cowcalf_rumen_metagenomic_pipeline")
return 0

View file

@@ -1,15 +1,18 @@
module Metaxa
using Conda: runconda
using FilePathsBase: AbstractPath, Path, absolute, exists
include("ProcessHelper.jl")
using .ProcessHelper: cache_function_value
using .ProcessHelper: exec_in_temp_dir
export taxonomy
function _classifier(samplename::AbstractString, fastq1::AbstractPath, fastq2::AbstractPath)
function _classifier(
samplename::AbstractString,
fastq1::AbstractString,
fastq2::AbstractString,
)
runconda(
`run metaxa2 \
-1 $fastq1 \
@@ -24,12 +27,12 @@ function _classifier(samplename::AbstractString, fastq1::AbstractPath, fastq2::A
`,
:metaxa2,
)
exists(Path("$samplename.taxonomy.txt")) ||
ispath("$samplename.taxonomy.txt") ||
error("metaxa2 ran, but $samplename.taxonomy.txt was not found!")
return absolute(Path("$samplename.taxonomy.txt"))
return abspath("$samplename.taxonomy.txt")
end #function
function _taxonomy_traversal(samplename::AbstractString, taxonomy::AbstractPath)
function _taxonomy_traversal(samplename::AbstractString, taxonomy::AbstractString)
runconda(
`run metaxa2_ttt \
-i $taxonomy \
@@ -40,33 +43,35 @@ function _taxonomy_traversal(samplename::AbstractString, taxonomy::AbstractPath)
`,
:metaxa2,
)
exists(Path("$samplename.level_7.txt")) ||
ispath("$samplename.level_7.txt") ||
error("metaxa2 ran, but $samplename.level_7.txt was not found!")
return absolute(Path("$samplename.level_7.txt"))
return abspath("$samplename.level_7.txt")
end #function
function taxonomy(samplename::AbstractString, fastq::AbstractPath...)
taxonomy_file = cache_function_value(_classifier, samplename, fastq...)
level_7_taxonomy_file =
cache_function_value(_taxonomy_traversal, samplename, taxonomy_file)
function taxonomy(
samplename::AbstractString,
fastq::Tuple{<:AbstractString,<:AbstractString},
)
taxonomy_file = exec_in_temp_dir(_classifier, samplename, fastq...)
level_7_taxonomy_file = exec_in_temp_dir(_taxonomy_traversal, samplename, taxonomy_file)
return level_7_taxonomy_file
end #function
function _dc(taxonomies::AbstractPath...)
function _dc(taxonomies::AbstractString...)
runconda(
`run metaxa2_dc \
-o feature-table.tsv \
$(map(basename, taxonomies))
$(join(taxonomies, ' '))
`,
:metaxa2,
)
exists(Path("feature-table.tsv")) ||
ispath("feature-table.tsv") ||
error("metaxa2 ran, but feature-table.tsv was not found!")
return absolute(Path("feature-table.tsv"))
return abspath("feature-table.tsv")
end #function
function data_collector(taxonomies::AbstractPath...)
return cache_function_value(_dc, taxonomies...)
function data_collector(taxonomies::AbstractString...)
return exec_in_temp_dir(_dc, taxonomies...)
end #function
end #module

View file

@@ -1,107 +1,36 @@
module ProcessHelper
using BaseDirs: BaseDirs
using CodecZlib: GzipCompressorStream, GzipDecompressorStream
using CSV: CSV
using DataFrames: DataFrame
using FilePathsBase: AbstractPath, Path, canonicalize, exists
using SHA: SHA1_CTX, update!, digest!
export sym_temp
export cache_function_value
export exec_in_temp_dir
"""
sym_temp(file::AbstractString) -> (tmp_dir, link)
sym_temp(files::Tuple{<:AbstractString,<:AbstractString}) -> (tmp_dir, links)
const CACHE_DIR = Path(
BaseDirs.User.cache(
BaseDirs.Project("Cowcalf_rumen_metagenomic_pipeline");
create = true,
),
)
function exec_in_temp_dir(f::Function, args...)
tmp_dir = Path(mktempdir(; cleanup = false))
Copies `file(s)` to a new temporary directory named `tmp_dir` and symbolically links them
inside of that directory. Returns a tuple of the directory path and the path to the links.
"""
function sym_temp(files::AbstractString...)
tmp_dir = mktempdir(; cleanup = false)
@info "Creating temporary directory $tmp_dir"
if any(a -> a isa AbstractPath, args)
newargs = []
for arg in args
if arg isa AbstractPath
symlink_path = joinpath(tmp_dir, basename(arg))
symlink(canonicalize(arg), symlink_path)
@info "Symlinked $arg to $symlink_path"
push!(newargs, symlink_path)
else
push!(newargs, arg)
end #if
end #for
return cd(() -> f(newargs...), tmp_dir)
else
return cd(() -> f(args...), tmp_dir)
end #if
function _symlink(file::AbstractString)
symlink_path = joinpath(tmp_dir, basename(file))
symlink(realpath(file), symlink_path)
@info "Symlinked $file to $symlink_path"
return symlink_path
end #function
return (tmp_dir, map(_symlink, files))
end #function
function cache_function_value(f::Function, args...)
ctx = SHA1_CTX()
update!(ctx, Vector{UInt8}(String(Symbol(f))))
for arg in args
if arg isa AbstractPath
open(string(arg), "r") do a
update!(ctx, read(a))
end #do
elseif arg isa DataFrame
for row in CSV.RowWriter(arg)
update!(ctx, Vector{UInt8}(row))
end #for
else
update!(ctx, Vector{UInt8}(String(arg)))
end #if
end #for
input_hash = bytes2hex(digest!(ctx))
cache_file = joinpath(CACHE_DIR, input_hash)
if exists(cache_file)
@info "Found cached file for function `$(String(Symbol(f)))` with input hash $input_hash"
tmp_dir = Path(mktempdir(; cleanup = false))
@info "Creating temporary directory $tmp_dir"
filename = readchomp(string(cache_file, ".filename"))
uncached_file = joinpath(tmp_dir, filename)
open(GzipDecompressorStream, string(cache_file), "r") do compressed
open(string(uncached_file), "w") do uncompressed
write(uncompressed, read(compressed))
end #do
end #do
@info "Copied cached file for $input_hash to $uncached_file"
return canonicalize(uncached_file)
end #if
@info "Input hash $input_hash not found for function `$(String(Symbol(f)))`"
result_file = exec_in_temp_dir(f, args...)
result_file isa AbstractPath || error(
"`cache_function_value` can only work on functions that return an AbstractPath",
)
filename = basename(result_file)
open(string(cache_file, ".filename"), "w") do fnf
write(fnf, filename)
end #do
open(string(result_file), "r") do uncompressed
open(GzipCompressorStream, string(cache_file), "w") do compressed
write(compressed, uncompressed)
end #do
end #do
@info "Copied result file $result_file into cache under hash $input_hash"
return result_file
function exec_in_temp_dir(f::Function, samplename::AbstractString, files::AbstractString...)
tmp_dir, tmp_files = sym_temp(files...)
return cd(() -> f(samplename, tmp_files...), tmp_dir)
end #function
function exec_in_temp_dir(f::Function, files::AbstractString...)
tmp_dir, tmp_files = sym_temp(files...)
return cd(() -> f(tmp_files...), tmp_dir)
end #function
end #module

View file

@@ -1,33 +0,0 @@
module Qiime
using Base: cache_file_entry
using Conda: runconda
using CSV: CSV
using DataFrames: DataFrame
using FilePathsBase: AbstractPath, Path, absolute, exists
include("ProcessHelper.jl")
using .ProcessHelper: cache_function_value
function _write_biom_feature_table(df_feature_table::DataFrame)
CSV.write("feature-table.tsv", df_feature_table; delim = '\t')
runconda(
`run biom convert \
-i feature-table.tsv \
-o feature-table.hdf5.biom \
--table-type="OTU table" \
--to-hdf5 \
--process-obs-metadata taxonomy`,
:qiime2,
)
exists(Path("feature-table.hdf5.biom")) ||
error("qiime2 ran, but feature-table.hdf5.biom was not found!")
return absolute(Path("feature-table.hdf5.biom"))
end #function
function biom_feature_table(df_feature_table::DataFrame)
return cache_function_value(_write_biom_feature_table, df_feature_table)
end #function
end #module