Compare commits

..

21 commits

Author SHA1 Message Date
f1072a4bb7
feat: Add biom conversion step 2025-04-18 18:27:14 -05:00
e5db185133
feat: Add DataFrame compat to cache 2025-04-18 18:26:43 -05:00
630385829f fixup! feat: Add feature table manipulation 2025-04-18 18:25:48 -05:00
9d124cb796 fixup! feat: Add feature table manipulation 2025-04-18 18:25:21 -05:00
4462ddab66 fixup! feat: Add feature table manipulation 2025-04-18 18:25:21 -05:00
0c09cd9e33 fixup! feat: Add feature table manipulation 2025-04-18 18:25:21 -05:00
30600e5e18
feat: Add feature table manipulation 2025-04-18 17:07:53 -05:00
5c74bf3d69 fixup! feat: Add caching function 2025-04-18 16:08:48 -05:00
e0b6a056ff
refactor: Switch to using cached file processes in Metaxa 2025-04-18 16:07:59 -05:00
d97c0898bc
feat: Add caching function 2025-04-18 16:06:56 -05:00
516cc4985c
refactor: Make exec_in_temp_dir more generic
Instead of having the function have
multiple methods with different signatures,
we can slurp args and then use reflection
to symlink any Path objects while passing
the values unmodified. So do that.
2025-04-18 08:35:53 -05:00
7b3cb740d9
meta: Add input files to gitignore 2025-04-15 16:20:38 -05:00
adfef5b9b3
refactor: Use FilePathsBase for all file paths 2025-04-15 16:19:09 -05:00
0f7102b9de
fix: Make metaxa2_dc use relative paths instead of absolute 2025-04-15 15:20:47 -05:00
10fdcd7b3f
perf: Make conda not re-init an existing env 2025-04-14 16:50:32 -05:00
418c7f1088
fix: Don't copy over a directory with a file 2025-04-14 16:00:52 -05:00
8e68ea3028
fix: add run to metaxa_dc command 2025-04-14 15:49:03 -05:00
3130e92395
feat: Add metaxa steps 2025-04-14 11:12:41 -05:00
9df8163123
style: Add formatter requirements 2025-04-11 21:05:13 -05:00
81f49a8ad8
feat: Add Conda environment installer 2025-04-11 21:01:45 -05:00
9a91fed82f
feat: Bootstrap basic package 2025-04-11 09:27:43 -05:00
8 changed files with 438 additions and 1 deletion

1
.JuliaFormatter.toml Normal file
View file

@@ -0,0 +1 @@
style="blue"

32
.gitignore vendored
View file

@@ -2,4 +2,34 @@
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
!.vscode/extensions.json
### Julia gitignore ###
## Files generated by invoking Julia with --code-coverage
*.jl.cov
*.jl.*.cov
# Files generated by invoking Julia with --track-allocation
*.jl.mem
# System-specific files and directories generated by the BinaryProvider and BinDeps packages
# They contain absolute paths specific to the host computer, and so should not be committed
deps/deps.jl
deps/build.log
deps/downloads/
deps/usr/
deps/src/
# Build artifacts for creating documentation generated by the Documenter package
docs/build/
docs/site/
# File generated by Pkg, the package manager, based on a corresponding Project.toml
# It records a fixed state of all packages used by the project. As such, it should not be
# committed for packages, but should be committed for applications that require a static
# environment.
Manifest.toml
### Input files ###
*.fastq.gz
metadata*.tsv

43
Project.toml Normal file
View file

@@ -0,0 +1,43 @@
name = "Cowcalf_rumen_metagenomic_pipeline"
uuid = "7d0d08ee-6932-474e-8e49-cd3f4679ce2d"
authors = ["Thomas A. Christensen II <25492070+MillironX@users.noreply.github.com> and contributors"]
version = "0.1.0"
[deps]
BaseDirs = "18cc8868-cbac-4acf-b575-c8ff214dc66f"
CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b"
CodecZlib = "944b1d66-785c-5afd-91f1-9de20f533193"
Conda = "8f4d0f93-b110-5947-807f-2305c1781a2d"
Cowsay = "b6370f49-8ad1-4651-ad9e-3639b35da0e9"
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
FilePathsBase = "48062228-2e41-5def-b9a4-89aafe57970f"
Glob = "c27321d9-0574-5035-807b-f59d2c89b15c"
HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3"
SHA = "ea8e919c-243c-51af-8825-aaa63cd721ce"
URIs = "5c2747f8-b7ea-4ff2-ba2e-563bfd36b1d4"
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
YAML = "ddb6d928-2868-570f-bddf-ab3f9cf99eb6"
[compat]
BaseDirs = "1.2.4"
CSV = "0.10.15"
CodecZlib = "0.7.8"
Conda = "1.10.2"
Cowsay = "1.0.0"
DataFrames = "1.7.0"
Distributed = "1.11.0"
FilePathsBase = "0.9.24"
Glob = "1.3.1"
HTTP = "1.10.16"
SHA = "0.7.0"
URIs = "1.5.2"
UUIDs = "1.11.0"
YAML = "0.4.13"
julia = "1.11"
[extras]
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
[targets]
test = ["Test"]

8
conda_envs/metaxa2.yml Normal file
View file

@@ -0,0 +1,8 @@
channels:
- bioconda
- conda-forge
dependencies:
- metaxa=2.2
- blast-legacy=2.2.26
- hmmer=3.1
- mafft=7.525

View file

@@ -0,0 +1,143 @@
module Cowcalf_rumen_metagenomic_pipeline
using Conda: Conda, runconda
using Cowsay: cowsay
using CSV: CSV
using DataFrames: DataFrame, Not, combine, eachrow, nrow, rename!, select!
using Distributed: pmap, @everywhere
using FilePathsBase: AbstractPath, Path, absolute, cwd, @__PATH__, @p_str
using Glob: GlobMatch, glob
using HTTP: HTTP
using URIs: URI
using UUIDs: uuid4
using YAML: YAML
export main
include("Metaxa.jl")
include("Qiime.jl")
using .Metaxa: Metaxa
using .Qiime: Qiime
# Retrieve and parse a YAML document, dispatching on how it is addressed:
# a local filesystem path is loaded directly, while a URL is downloaded first.
function _fetch_yaml_contents(yaml::AbstractPath)
    return YAML.load_file(string(yaml))
end #function
function _fetch_yaml_contents(yaml::URI)
    response_body = String(HTTP.get(yaml).body)
    return YAML.load(response_body)
end #function
"""
    setup_remote_conda_environment(yaml::Union{AbstractPath,URI}, env_name::Symbol)

Create the Conda.jl-managed environment `env_name` from an environment
definition read from `yaml` (a local path or a URL), returning `env_name`.

The definition's channels are registered first, then its dependencies are
installed. If an environment directory with this name already exists, it is
assumed to be fully set up and the function returns immediately.
"""
function setup_remote_conda_environment(yaml::Union{AbstractPath,URI}, env_name::Symbol)
    # Check for the existence of the environment with the same name before creating one
    # We assume that the existence of the environment means that it has been properly setup
    # TODO: Provide a way to force re-initialization of a conda environment
    isdir(Conda.prefix(env_name)) && return env_name
    # Tell Conda.jl to use a Miniforge-based installation
    ENV["CONDA_JL_USE_MINIFORGE"] = "1"
    # Install x86 packages via Rosetta2 on MacOS
    if Sys.isapple()
        ENV["CONDA_SUBDIR"] = "osx-64"
        runconda(`config --env --set subdir osx-64`, env_name)
    end #if
    conda_definition = _fetch_yaml_contents(yaml)
    # Register every channel from the YAML before installing dependencies,
    # so the packages resolve against the channels the definition expects
    map(c -> Conda.add_channel(c, env_name), conda_definition["channels"])
    Conda.add(conda_definition["dependencies"], env_name)
    return env_name
end #function
"""
    import_metadata(metadata_tsv::AbstractPath)

Read a delimited metadata file into a `DataFrame`, renaming its first column
to `:sample_name` so downstream steps can rely on that column name.
"""
function import_metadata(metadata_tsv::AbstractPath)
    metadata_table = DataFrame(CSV.File(metadata_tsv))
    rename!(metadata_table, 1 => :sample_name)
    return metadata_table
end #function
"""
    sample_files(samplenames::Vector{<:AbstractString})

Locate each sample's paired gzipped FASTQ files in the working directory by
globbing `<samplename>*1*.fastq.gz` and `<samplename>*2*.fastq.gz`. Returns a
vector of `(samplename, (fastq1, fastq2))` tuples holding absolute paths.
"""
function sample_files(samplenames::Vector{<:AbstractString})
    # Use explicit GlobMatch constructor b/c we need to interpolate values
    find_fastq(pattern) = absolute(Path(first(glob(GlobMatch(pattern)))))
    return map(samplenames) do name
        return (
            name,
            (find_fastq("$(name)*1*.fastq.gz"), find_fastq("$(name)*2*.fastq.gz")),
        )
    end #do
end #function
"""
    qiime_feature_table(table_file::AbstractPath)

Read a feature table from `table_file` and massage it into the layout that
biom conversion expects: the taxonomy column named exactly `taxonomy`, a
universally unique id per OTU row, and columns ordered with `#SampleId`
first, the observed counts in the middle, and `taxonomy` last.
Returns the rearranged `DataFrame`.
"""
function qiime_feature_table(table_file::AbstractPath)
    df = DataFrame(CSV.File(table_file))
    # Biom format requires that taxonomy hierarchy be named exactly "taxonomy"
    rename!(df, 1 => :taxonomy)
    # Every OTU in a feature table must have a universally unique id
    df[!, Symbol("#SampleId")] = [uuid4() for _ = 1:nrow(df)]
    # Biom tables are sensitive to the position of the columns:
    # arrange them so that #SampleId is first, taxonomy is last,
    # and all the observed counts are in the middle
    # BUG FIX: this previously called select! on `feature_table` — an
    # undefined name in this scope — instead of the local `df`, which would
    # throw an UndefVarError at runtime and leave the columns unordered
    select!(
        df,
        Symbol("#SampleId"),
        Not([Symbol("#SampleId"), :taxonomy]),
        :taxonomy,
    )
    return df
end #function
# Pipeline entry point (Julia 1.11 `@main`). Expects the last command-line
# argument to be the path of a metadata file whose first column holds sample
# names. Sets up the qiime2 and metaxa2 conda environments, runs the Metaxa2
# taxonomy workflow for every sample's paired FASTQ files in parallel,
# collects the results into a feature table, prints candidate rarefaction
# bounds, and copies the biom-converted table into the current directory.
function (@main)(ARGS)
    # The metadata file is the last positional argument
    metadata_file = Path(pop!(ARGS))
    setup_remote_conda_environment(
        URI(
            "https://data.qiime2.org/distro/metagenome/qiime2-metagenome-2024.10-py310-osx-conda.yml",
        ),
        :qiime2,
    )
    # metaxa2.yml ships with this package, one directory above src/
    setup_remote_conda_environment(
        joinpath(@__PATH__, p"..", p"conda_envs", p"metaxa2.yml"),
        :metaxa2,
    )
    metadata = import_metadata(metadata_file)
    # Pair each sample name with its forward/reverse FASTQ paths
    fastq_files = sample_files(metadata[!, :sample_name])
    # Load the per-sample worker code on every Distributed worker before pmap;
    # @eval defers the @everywhere expansion until runtime
    @eval begin
        @everywhere begin
            include(joinpath(@__DIR__, "Metaxa.jl"))
            include(joinpath(@__DIR__, "Qiime.jl"))
            using .Metaxa: Metaxa
            using .Qiime: Qiime
        end #@everywhere
    end #@eval
    # Classify each sample in parallel; each x is (samplename, (fastq1, fastq2))
    taxonomy_files = pmap(x -> Metaxa.taxonomy(first(x), last(x)...), fastq_files)
    feature_table = qiime_feature_table(Metaxa.data_collector(taxonomy_files...))
    # Rarefaction bounds: smallest and largest column totals across the count
    # columns (everything except the id and taxonomy columns). The one-row
    # `combine` result is splatted out of eachrow so minimum/maximum iterate
    # the row's values.
    rarefaction_min = minimum(
        eachrow(
            combine(
                feature_table,
                names(feature_table, Not(:taxonomy, Symbol("#SampleId"))) .=> sum,
                renamecols = false,
            ),
        )...,
    )
    rarefaction_max = maximum(
        eachrow(
            combine(
                feature_table,
                names(feature_table, Not(:taxonomy, Symbol("#SampleId"))) .=> sum,
                renamecols = false,
            ),
        )...,
    )
    @show rarefaction_min
    @show rarefaction_max
    # Drop the biom-format feature table next to wherever we were launched from
    cp(Qiime.biom_feature_table(feature_table), cwd())
    cowsay("Hello from Cowcalf_rumen_metagenomic_pipeline")
    return 0
end #function
end #module

72
src/Metaxa.jl Normal file
View file

@@ -0,0 +1,72 @@
module Metaxa
using Conda: runconda
using FilePathsBase: AbstractPath, Path, absolute, exists
include("ProcessHelper.jl")
using .ProcessHelper: cache_function_value
export taxonomy
# Run `metaxa2` on a pair of FASTQ files inside the metaxa2 conda environment,
# producing `<samplename>.taxonomy.txt` in the current working directory.
# Returns the absolute path of that taxonomy file, or errors if metaxa2
# exited without producing it.
# NOTE(review): --cpu 4 is hard-coded — confirm it matches the worker layout.
function _classifier(samplename::AbstractString, fastq1::AbstractPath, fastq2::AbstractPath)
    runconda(
        `run metaxa2 \
        -1 $fastq1 \
        -2 $fastq2 \
        -o $samplename \
        --format fastq \
        --cpu 4 \
        --summary F \
        --graphical F \
        --fasta F \
        --taxonomy T
        `,
        :metaxa2,
    )
    # metaxa2 can exit successfully without writing output, so verify the file
    exists(Path("$samplename.taxonomy.txt")) ||
        error("metaxa2 ran, but $samplename.taxonomy.txt was not found!")
    return absolute(Path("$samplename.taxonomy.txt"))
end #function
# Run `metaxa2_ttt` on a taxonomy file to traverse the classifications down to
# taxonomy level 7, producing `<samplename>.level_7.txt` in the current
# working directory. Returns the absolute path of that file, or errors if the
# tool exited without producing it.
function _taxonomy_traversal(samplename::AbstractString, taxonomy::AbstractPath)
    runconda(
        `run metaxa2_ttt \
        -i $taxonomy \
        -o $samplename \
        -m 7 \
        -n 7 \
        --summary F
        `,
        :metaxa2,
    )
    # Verify the expected output actually materialized before returning it
    exists(Path("$samplename.level_7.txt")) ||
        error("metaxa2 ran, but $samplename.level_7.txt was not found!")
    return absolute(Path("$samplename.level_7.txt"))
end #function
"""
    taxonomy(samplename::AbstractString, fastq::AbstractPath...)

Run the full Metaxa2 taxonomy workflow for one sample: classify the reads,
then traverse the classification down to level 7. Both steps go through
`cache_function_value`, so previously computed results are reused. Returns
the path to the level-7 taxonomy file.
"""
function taxonomy(samplename::AbstractString, fastq::AbstractPath...)
    classified = cache_function_value(_classifier, samplename, fastq...)
    return cache_function_value(_taxonomy_traversal, samplename, classified)
end #function
# Run `metaxa2_dc` over the given taxonomy files to collect them into a single
# `feature-table.tsv` in the current working directory. Only the basenames are
# passed on the command line — the caller (via `exec_in_temp_dir`) symlinks
# the input paths into the working directory first, so the relative names
# resolve there. Returns the absolute path of the feature table, or errors if
# the tool exited without producing it.
function _dc(taxonomies::AbstractPath...)
    runconda(
        `run metaxa2_dc \
        -o feature-table.tsv \
        $(map(basename, taxonomies))
        `,
        :metaxa2,
    )
    # Verify the expected output actually materialized before returning it
    exists(Path("feature-table.tsv")) ||
        error("metaxa2 ran, but feature-table.tsv was not found!")
    return absolute(Path("feature-table.tsv"))
end #function
"""
    data_collector(taxonomies::AbstractPath...)

Combine per-sample taxonomy files into a single feature table via
`metaxa2_dc`, reusing a cached result when the inputs are unchanged.
"""
data_collector(taxonomies::AbstractPath...) = cache_function_value(_dc, taxonomies...)
end #module

107
src/ProcessHelper.jl Normal file
View file

@@ -0,0 +1,107 @@
module ProcessHelper
using BaseDirs: BaseDirs
using CodecZlib: GzipCompressorStream, GzipDecompressorStream
using CSV: CSV
using DataFrames: DataFrame
using FilePathsBase: AbstractPath, Path, canonicalize, exists
using SHA: SHA1_CTX, update!, digest!
export cache_function_value
export exec_in_temp_dir
# Per-user cache directory for this pipeline, created on first load.
# `cache_function_value` stores compressed result files here, keyed by a
# hex digest of the function name and its inputs.
const CACHE_DIR = Path(
    BaseDirs.User.cache(
        BaseDirs.Project("Cowcalf_rumen_metagenomic_pipeline");
        create = true,
    ),
)
"""
    exec_in_temp_dir(f::Function, args...)

Run `f(args...)` with a fresh temporary directory as the working directory
and return the result. Every `AbstractPath` argument is symlinked into the
temporary directory first and replaced by its symlink path, so `f` can refer
to its inputs with bare relative filenames; non-path arguments are passed
through unmodified.

The temporary directory is deliberately not cleaned up (`cleanup = false`)
so files created inside it remain usable after this function returns.
"""
function exec_in_temp_dir(f::Function, args...)
    tmp_dir = Path(mktempdir(; cleanup = false))
    @info "Creating temporary directory $tmp_dir"
    # Symlink path arguments into the temp dir; pass everything else through.
    # (The previous implementation special-cased an all-non-path argument list
    # with a second, duplicated `cd` branch; this single mapping covers both
    # cases identically.)
    newargs = map(args) do arg
        arg isa AbstractPath || return arg
        symlink_path = joinpath(tmp_dir, basename(arg))
        symlink(canonicalize(arg), symlink_path)
        @info "Symlinked $arg to $symlink_path"
        return symlink_path
    end #do
    return cd(() -> f(newargs...), tmp_dir)
end #function
"""
    cache_function_value(f::Function, args...)

Memoize a file-producing function on disk. A digest is computed from `f`'s
name and its arguments (file contents for path arguments, CSV rows for
`DataFrame`s, the string form of anything else) and looked up in `CACHE_DIR`.
On a hit, the cached file is decompressed into a fresh temporary directory
and its path returned; on a miss, `f` is run via `exec_in_temp_dir` and its
result file gzip-compressed into the cache (with a `.filename` sidecar
recording the original basename) before the result path is returned.

`f` must return an `AbstractPath`; anything else is an error.

NOTE(review): the fallback `String(arg)` branch only works for arguments that
convert to `String` (e.g. strings and `Symbol`s) — a numeric argument would
throw. Confirm callers only pass paths, DataFrames, and string-like values.
"""
function cache_function_value(f::Function, args...)
    ctx = SHA1_CTX()
    # Fold the function's own name into the digest so different functions
    # called with identical arguments do not collide in the cache
    update!(ctx, Vector{UInt8}(String(Symbol(f))))
    for arg in args
        if arg isa AbstractPath
            # Hash the file's contents (not its path), so identical inputs at
            # different locations still hit the cache
            open(string(arg), "r") do a
                update!(ctx, read(a))
            end #do
        elseif arg isa DataFrame
            # Hash the table's CSV serialization row by row
            for row in CSV.RowWriter(arg)
                update!(ctx, Vector{UInt8}(row))
            end #for
        else
            update!(ctx, Vector{UInt8}(String(arg)))
        end #if
    end #for
    input_hash = bytes2hex(digest!(ctx))
    cache_file = joinpath(CACHE_DIR, input_hash)
    if exists(cache_file)
        @info "Found cached file for function `$(String(Symbol(f)))` with input hash $input_hash"
        # Restore into a throwaway directory that is not cleaned up, so the
        # caller can keep using the returned path
        tmp_dir = Path(mktempdir(; cleanup = false))
        @info "Creating temporary directory $tmp_dir"
        # The sidecar records the original basename of the cached result
        filename = readchomp(string(cache_file, ".filename"))
        uncached_file = joinpath(tmp_dir, filename)
        open(GzipDecompressorStream, string(cache_file), "r") do compressed
            open(string(uncached_file), "w") do uncompressed
                write(uncompressed, read(compressed))
            end #do
        end #do
        @info "Copied cached file for $input_hash to $uncached_file"
        return canonicalize(uncached_file)
    end #if
    @info "Input hash $input_hash not found for function `$(String(Symbol(f)))`"
    result_file = exec_in_temp_dir(f, args...)
    result_file isa AbstractPath || error(
        "`cache_function_value` can only work on functions that return an AbstractPath",
    )
    filename = basename(result_file)
    # Write the basename sidecar, then the gzip-compressed payload
    open(string(cache_file, ".filename"), "w") do fnf
        write(fnf, filename)
    end #do
    open(string(result_file), "r") do uncompressed
        open(GzipCompressorStream, string(cache_file), "w") do compressed
            write(compressed, uncompressed)
        end #do
    end #do
    @info "Copied result file $result_file into cache under hash $input_hash"
    return result_file
end #function
end #module

33
src/Qiime.jl Normal file
View file

@@ -0,0 +1,33 @@
module Qiime

# FIX: removed `using Base: cache_file_entry` — an accidental import (likely
# an editor auto-completion artifact) of an undocumented Base internal that is
# never used in this module.
using Conda: runconda
using CSV: CSV
using DataFrames: DataFrame
using FilePathsBase: AbstractPath, Path, absolute, exists

include("ProcessHelper.jl")
using .ProcessHelper: cache_function_value

# Write the feature table out as TSV, then run `biom convert` inside the
# qiime2 conda environment to produce an HDF5 biom table in the current
# working directory. Returns the absolute path of the converted table, or
# errors if the conversion exited without producing it.
function _write_biom_feature_table(df_feature_table::DataFrame)
    CSV.write("feature-table.tsv", df_feature_table; delim = '\t')
    runconda(
        `run biom convert \
        -i feature-table.tsv \
        -o feature-table.hdf5.biom \
        --table-type="OTU table" \
        --to-hdf5 \
        --process-obs-metadata taxonomy`,
        :qiime2,
    )
    exists(Path("feature-table.hdf5.biom")) ||
        error("qiime2 ran, but feature-table.hdf5.biom was not found!")
    return absolute(Path("feature-table.hdf5.biom"))
end #function

"""
    biom_feature_table(df_feature_table::DataFrame)

Convert a feature-table `DataFrame` into a biom-format HDF5 file, reusing a
cached conversion when the table's contents are unchanged. Returns the path
to the `.biom` file.
"""
function biom_feature_table(df_feature_table::DataFrame)
    return cache_function_value(_write_biom_feature_table, df_feature_table)
end #function

end #module