diff --git a/Project.toml b/Project.toml index 850b570..29455b6 100644 --- a/Project.toml +++ b/Project.toml @@ -9,10 +9,10 @@ BGZFStreams = "28d598bf-9b8f-59f1-b38c-5a06b4a0f5e6" BioAlignments = "00701ae9-d1dc-5365-b64a-a3a3ebf5695e" BioGenerics = "47718e42-2ac5-11e9-14af-e5595289c2ea" BioSequences = "7e6ae17a-c86d-528c-b3b9-7f778a29fe59" -BufferedStreams = "e1450e63-4bb3-523b-b2a4-4ffa8c0fd77d" GenomicFeatures = "899a7d2d-5c61-547b-bef9-6698a8d05446" Indexes = "4ffb77ac-cb80-11e8-1b35-4b78cc642f6d" Printf = "de0858da-6303-5e67-8744-51eddeeeb8d7" +TranscodingStreams = "3bb67fe8-82b1-5028-8e26-92a6c54297fa" [compat] Automa = "0.7, 0.8" @@ -20,9 +20,9 @@ BGZFStreams = "0.3" BioAlignments = "2" BioGenerics = "0.1" BioSequences = "2" -BufferedStreams = "1" GenomicFeatures = "2" Indexes = "0.1" +TranscodingStreams = "0.6, 0.7, 0.8, 0.9" julia = "1.1" [extras] diff --git a/src/sam/reader.jl b/src/sam/reader.jl index 0406165..3270957 100644 --- a/src/sam/reader.jl +++ b/src/sam/reader.jl @@ -1,16 +1,22 @@ # SAM Reader # ========= -mutable struct Reader <: BioGenerics.IO.AbstractReader - state::State +mutable struct Reader{S <: TranscodingStream} <: BioGenerics.IO.AbstractReader + state::State{S} header::Header +end - function Reader(input::BufferedStreams.BufferedInputStream) - reader = new(State(sam_header_machine.start_state, input), Header()) - readheader!(reader) - reader.state.cs = sam_body_machine.start_state - return reader - end +function Reader(state::State{S}) where {S <: TranscodingStream} + + rdr = Reader(state, Header()) + + cs, ln, f = readheader!(rdr.state.stream, rdr.header, (sam_machine_header.start_state, rdr.state.linenum)) + + rdr.state.state = sam_machine_body.start_state + rdr.state.linenum = ln + rdr.state.filled = false + + return rdr end """ @@ -22,7 +28,19 @@ Create a data reader of the SAM file format. * `input`: data source """ function Reader(input::IO) - return Reader(BufferedStreams.BufferedInputStream(input)) + + if input isa TranscodingStream + return Reader(State(input, 1, 1, false)) + end + + stream = TranscodingStreams.NoopStream(input) + + return Reader(State(stream, 1, 1, false)) + +end + +function Base.eltype(::Type{<:Reader}) + return Record end function BioGenerics.IO.stream(reader::Reader) @@ -38,6 +56,42 @@ function header(reader::Reader)::Header return reader.header end -function Base.eltype(::Type{Reader}) - return Record +function Base.close(reader::Reader) + if reader.state.stream isa IO + close(reader.state.stream) + end + return nothing +end + +function index!(record::MetaInfo) + stream = TranscodingStreams.NoopStream(IOBuffer(record.data)) + found = index!(stream, record) + if !found + throw(ArgumentError("invalid SAM metadata")) + end + return record +end + +function index!(record::Record) + stream = TranscodingStreams.NoopStream(IOBuffer(record.data)) + found = index!(stream, record) + if !found + throw(ArgumentError("invalid SAM record")) + end + return record +end + +function Base.read!(rdr::Reader, rec::Record) + + cs, ln, f = readrecord!(rdr.state.stream, rec, (rdr.state.state, rdr.state.linenum)) + + rdr.state.state = cs + rdr.state.linenum = ln + rdr.state.filled = f + + if !f + cs == 0 && throw(EOFError()) + throw(ArgumentError("malformed SAM file")) + end + return rec end diff --git a/src/sam/readrecord.jl b/src/sam/readrecord.jl index 82ebb93..e7ea75f 100644 --- a/src/sam/readrecord.jl +++ b/src/sam/readrecord.jl @@ -1,179 +1,10 @@ -@inline function anchor!(stream::BufferedStreams.BufferedInputStream, p, immobilize = true) - stream.anchor = p - stream.immobilized = immobilize - return stream -end - -@inline function upanchor!(stream::BufferedStreams.BufferedInputStream) - @assert stream.anchor != 0 "upanchor! called with no anchor set" - anchor = stream.anchor - stream.anchor = 0 - stream.immobilized = false - return anchor -end - -function ensure_margin!(stream::BufferedStreams.BufferedInputStream) - if stream.position * 20 > length(stream.buffer) * 19 - BufferedStreams.shiftdata!(stream) - end - return nothing -end - -@inline function resize_and_copy!(dst::Vector{UInt8}, src::Vector{UInt8}, r::UnitRange{Int}) - return resize_and_copy!(dst, 1, src, r) -end - -@inline function resize_and_copy!(dst::Vector{UInt8}, dstart::Int, src::Vector{UInt8}, r::UnitRange{Int}) - rlen = length(r) - if length(dst) != dstart + rlen - 1 - resize!(dst, dstart + rlen - 1) - end - copyto!(dst, dstart, src, first(r), rlen) - return dst -end - -function generate_index_function(record_type, machine, init_code, actions; kwargs...) - kwargs = Dict(kwargs) - context = Automa.CodeGenContext( - generator = get(kwargs, :generator, :goto), - checkbounds = get(kwargs, :checkbounds, false), - loopunroll = get(kwargs, :loopunroll, 0) - ) - quote - function index!(record::$(record_type)) - data = record.data - p = 1 - p_end = p_eof = sizeof(data) - initialize!(record) - $(init_code) - cs = $(machine.start_state) - $(Automa.generate_exec_code(context, machine, actions)) - if cs != 0 - throw(ArgumentError(string("failed to index ", $(record_type), " ~>", repr(String(data[p:min(p+7,p_end)]))))) - end - @assert isfilled(record) - return record - end - end -end - -function generate_readheader_function(reader_type, metainfo_type, machine, init_code, actions, finish_code=:()) - quote - function readheader!(reader::$(reader_type)) - _readheader!(reader, reader.state) - end - - function _readheader!(reader::$(reader_type), state::State) - stream = state.stream - ensure_margin!(stream) - cs = state.cs - linenum = state.linenum - data = stream.buffer - p = stream.position - p_end = stream.available - p_eof = -1 - finish_header = false - record = $(metainfo_type)() - - $(init_code) - - while true - $(Automa.generate_exec_code(Automa.CodeGenContext(generator=:table), machine, actions)) - - state.cs = cs - state.finished = cs == 0 - state.linenum = linenum - stream.position = p - - if cs < 0 - error("$($(reader_type)) file format error on line ", linenum) - elseif finish_header - $(finish_code) - break - elseif p > p_eof ≥ 0 - error("incomplete $($(reader_type)) input on line ", linenum) - else - hits_eof = BufferedStreams.fillbuffer!(stream) == 0 - p = stream.position - p_end = stream.available - if hits_eof - p_eof = p_end - end - end - end - end - end -end - -function generate_read_function(reader_type, machine, init_code, actions; kwargs...) - kwargs = Dict(kwargs) - context = Automa.CodeGenContext( - generator=get(kwargs, :generator, :goto), - checkbounds=get(kwargs, :checkbounds, false), - loopunroll=get(kwargs, :loopunroll, 0) - ) - quote - function Base.read!(reader::$(reader_type), record::eltype($(reader_type)))::eltype($(reader_type)) - return _read!(reader, reader.state, record) - end - - function _read!(reader::$(reader_type), state::State, record::eltype($(reader_type))) - stream = state.stream - ensure_margin!(stream) - cs = state.cs - linenum = state.linenum - data = stream.buffer - p = stream.position - p_end = stream.available - p_eof = -1 - found_record = false - initialize!(record) - - $(init_code) - - if state.finished - throw(EOFError()) - end - - while true - $(Automa.generate_exec_code(context, machine, actions)) - - state.cs = cs - state.finished |= cs == 0 - state.linenum = linenum - stream.position = p - - if cs < 0 - error($(reader_type), " file format error on line ", linenum, " ~>", repr(String(data[p:min(p+7,p_end)]))) - elseif found_record - break - elseif cs == 0 - throw(EOFError()) - elseif p > p_eof ≥ 0 - error("incomplete $($(reader_type)) input on line ", linenum) - elseif BufferedStreams.available_bytes(stream) < 64 - hits_eof = BufferedStreams.fillbuffer!(stream) == 0 - p = stream.position - p_end = stream.available - if hits_eof - p_eof = p_end - end - end - end - - @assert isfilled(record) - return record - end - end -end - # Automa.jl generated readrecord! and readmetainfo! functions # ======================================== # file = header . body # header = metainfo* # body = record* -const sam_metainfo_machine, sam_record_machine, sam_header_machine, sam_body_machine = (function () +const sam_machine_metainfo, sam_machine_record, sam_machine_header, sam_machine_body, sam_machine = (function () isinteractive() && info("compiling SAM") @@ -304,95 +135,261 @@ const sam_metainfo_machine, sam_record_machine, sam_header_machine, sam_body_mac header = cat(header′, opt(any() \ cat('@'))) # look ahead body = rep(cat(record, newline)) + body.actions[:exit] = [:body] - return map(Automa.compile, (metainfo, record, header, body)) + sam = cat(header, body) + + return map(Automa.compile, (metainfo, record, header, body, sam)) end)() -const sam_metainfo_actions = Dict( - :metainfo_tag => :(record.tag = (pos1:p-1) .- offset), - :metainfo_val => :(record.val = (pos1:p-1) .- offset), - :metainfo_dict_key => :(push!(record.dictkey, (pos2:p-1) .- offset)), - :metainfo_dict_val => :(push!(record.dictval, (pos2:p-1) .- offset)), +# write("sam_machine_metainfo.dot", Automa.machine2dot(sam_machine_metainfo)) +# run(`dot -Tsvg -o sam_machine_metainfo.svg sam_machine_metainfo.dot`) +# +# write("sam_machine_record.dot", Automa.machine2dot(sam_machine_record)) +# run(`dot -Tsvg -o sam_machine_record.svg sam_machine_record.dot`) +# +# write("sam_machine_header.dot", Automa.machine2dot(sam_machine_header)) +# run(`dot -Tsvg -o sam_machine_header.svg sam_machine_header.dot`) +# +# write("sam_machine_body.dot", Automa.machine2dot(sam_machine_body)) +# run(`dot -Tsvg -o sam_machine_body.svg sam_machine_body.dot`) +# +# write("sam_machine.dot", Automa.machine2dot(sam_machine)) +# run(`dot -Tsvg -o sam_machine.svg sam_machine.dot`) + +function appendfrom!(dst, dpos, src, spos, n) + if length(dst) < dpos + n - 1 + resize!(dst, dpos + n - 1) + end + copyto!(dst, dpos, src, spos, n) + return dst +end + +const sam_actions_metainfo = Dict( + :mark => :(@mark), + :pos1 => :(pos1 = @relpos(p)), + :pos2 => :(pos2 = @relpos(p)), + :metainfo_tag => :(metainfo.tag = pos1:@relpos(p-1)), + :metainfo_val => :(metainfo.val = pos1:@relpos(p-1)), + :metainfo_dict_key => :(push!(metainfo.dictkey, pos2:@relpos(p-1))), + :metainfo_dict_val => :(push!(metainfo.dictval, pos2:@relpos(p-1))), :metainfo => quote - resize_and_copy!(record.data, data, offset+1:p-1) - record.filled = (offset+1:p-1) .- offset - end, - :mark => :(), - :pos1 => :(pos1 = p), - :pos2 => :(pos2 = p) -) + let markpos = @markpos() -generate_index_function( - MetaInfo, - sam_metainfo_machine, - :(pos1 = pos2 = offset = 0), - sam_metainfo_actions -) |> eval + appendfrom!(metainfo.data, 1, data, markpos, length(markpos:p-1)) -generate_readheader_function( - Reader, - MetaInfo, - sam_header_machine, - :(pos1 = pos2 = offset = 0), - merge(sam_metainfo_actions, Dict( - :metainfo => quote - resize_and_copy!(record.data, data, upanchor!(stream):p-1) - record.filled = (offset+1:p-1) .- offset - @assert isfilled(record) - push!(reader.header.metainfo, record) - ensure_margin!(stream) - record = MetaInfo() - end, - :header => :(finish_header = true; @escape), - :countline => :(linenum += 1), - :mark => :(anchor!(stream, p); offset = p - 1))), - quote - if !eof(stream) - stream.position -= 1 # cancel look-ahead + metainfo.filled = @relpos(markpos):@relpos(p-1) + + found_metainfo = true end end -) |> eval - -const sam_record_actions = Dict( - :record_qname => :(record.qname = (pos:p-1) .- offset), - :record_flag => :(record.flag = (pos:p-1) .- offset), - :record_rname => :(record.rname = (pos:p-1) .- offset), - :record_pos => :(record.pos = (pos:p-1) .- offset), - :record_mapq => :(record.mapq = (pos:p-1) .- offset), - :record_cigar => :(record.cigar = (pos:p-1) .- offset), - :record_rnext => :(record.rnext = (pos:p-1) .- offset), - :record_pnext => :(record.pnext = (pos:p-1) .- offset), - :record_tlen => :(record.tlen = (pos:p-1) .- offset), - :record_seq => :(record.seq = (pos:p-1) .- offset), - :record_qual => :(record.qual = (pos:p-1) .- offset), - :record_field => :(push!(record.fields, (pos:p-1) .- offset)), - :record => quote - resize_and_copy!(record.data, data, 1:p-1) - record.filled = (offset+1:p-1) .- offset - end, - :mark => :(), - :pos => :(pos = p) ) -generate_index_function( - Record, - sam_record_machine, - :(pos = offset = 0), - sam_record_actions -) |> eval +const sam_actions_header = merge( + sam_actions_metainfo, + Dict( + :countline => :(linenum += 1), + :header => quote + + finish_header = true + + if !eof(stream) + p -= 1 # cancel look-ahead + end + + @escape + end + ) +) + +const sam_actions_record = Dict( + :mark => :(@mark), + :pos => :(pos = @relpos(p)), + :record_qname => :(record.qname = pos:@relpos(p-1)), + :record_flag => :(record.flag = pos:@relpos(p-1)), + :record_rname => :(record.rname = pos:@relpos(p-1)), + :record_pos => :(record.pos = pos:@relpos(p-1)), + :record_mapq => :(record.mapq = pos:@relpos(p-1)), + :record_cigar => :(record.cigar = pos:@relpos(p-1)), + :record_rnext => :(record.rnext = pos:@relpos(p-1)), + :record_pnext => :(record.pnext = pos:@relpos(p-1)), + :record_tlen => :(record.tlen = pos:@relpos(p-1)), + :record_seq => :(record.seq = pos:@relpos(p-1)), + :record_qual => :(record.qual = pos:@relpos(p-1)), + :record_field => :(push!(record.fields, pos:@relpos(p-1))), + :record => quote + let markpos = @markpos() + + appendfrom!(record.data, 1, data, markpos, length(markpos:p-1)) + + record.filled = @relpos(markpos):@relpos(p-1) -generate_read_function( - Reader, - sam_body_machine, - :(pos = offset = 0), - merge(sam_record_actions, Dict( - :record => quote - resize_and_copy!(record.data, data, upanchor!(stream):p-1) - record.filled = (offset+1:p-1) .- offset found_record = true @escape - end, + end + end +) + +const sam_actions_body = merge( + sam_actions_record, + Dict( :countline => :(linenum += 1), - :mark => :(anchor!(stream, p); offset = p - 1)) + :body => quote + finish_body = true + @escape + end ) +) + +# const sam_actions = merge( +# sam_actions_header, +# sam_actions_body +# ) + +const sam_context = Automa.CodeGenContext( + generator = :goto, + checkbounds = false, + loopunroll = 0 +) + +const sam_initcode_metainfo = quote + pos1 = 0 + pos2 = 0 + found_metainfo = false +end + +const sam_initcode_record = quote + pos = 0 + found_record = false +end + +const sam_initcode_header = quote + $(sam_initcode_metainfo) + metainfo = MetaInfo() + finish_header = false + cs, linenum = state +end + +const sam_initcode_body = quote + $(sam_initcode_record) + finish_body = false + cs, linenum = state +end + +const sam_loopcode_metainfo = quote + + if cs < 0 + throw(ArgumentError("malformed metainfo at pos $(p)")) + end + + if found_metainfo + @goto __return__ + end +end + +const sam_returncode_metainfo = quote + return found_metainfo +end + + +Automa.Stream.generate_reader( + :index!, + sam_machine_metainfo, + arguments = (:(metainfo::MetaInfo),), + actions = sam_actions_metainfo, + context = sam_context, + initcode = sam_initcode_metainfo, + loopcode = sam_loopcode_metainfo, + returncode = sam_returncode_metainfo +) |> eval + +const sam_loopcode_header = quote + + if cs < 0 + throw(ArgumentError("malformed metainfo at line $(linenum)")) + end + + if found_metainfo + push!(header, metainfo) + found_metainfo = false + end + + metainfo = MetaInfo() + + if finish_header + @goto __return__ + end +end + +const sam_returncode_header = quote + return cs, linenum, finish_header +end + +Automa.Stream.generate_reader( + :readheader!, + sam_machine_header, + arguments = (:(header::SAM.Header), :(state::Tuple{Int,Int})), + actions = sam_actions_header, + context = sam_context, + initcode = sam_initcode_header, + loopcode = sam_loopcode_header, + returncode = sam_returncode_header +) |> eval + + +const sam_loopcode_record = quote + + if cs < 0 + throw(ArgumentError("malformed SAM record at position $(p), line $(linenum)")) + end + + # # if cs != 0 + # # throw(ArgumentError(string("failed to index ", $(record_type), " ~>", repr(String(data[p:min(p+7,p_end)]))))) + # # end + + if found_record + @goto __return__ + end + +end + +const sam_loopcode_body = quote + + $(sam_loopcode_record) + + if finish_body + @goto __return__ + end +end + +const sam_returncode_record = quote + return found_record +end + +Automa.Stream.generate_reader( + :index!, + sam_machine_record, + arguments = (:(record::Record),), + actions = sam_actions_record, + context = sam_context, + initcode = sam_initcode_record, + loopcode = sam_loopcode_record, + returncode = sam_returncode_record +) |> eval + + + +const sam_returncode_body = quote + return cs, linenum, found_record +end + +Automa.Stream.generate_reader( + :readrecord!, + sam_machine_body, + arguments = (:(record::Record), :(state::Tuple{Int,Int})), + actions = sam_actions_body, + context = sam_context, + initcode = sam_initcode_body, + loopcode = sam_loopcode_body, + returncode = sam_returncode_body ) |> eval diff --git a/src/sam/sam.jl b/src/sam/sam.jl index 5b6bab8..95e36f6 100644 --- a/src/sam/sam.jl +++ b/src/sam/sam.jl @@ -7,11 +7,14 @@ using BioGenerics import Automa import Automa.RegExp: @re_str +import Automa.Stream: @mark, @markpos, @relpos, @abspos import BioAlignments +import BioGenerics: BioGenerics, isfilled, header import BioGenerics.Exceptions: missingerror -import BioGenerics: isfilled, header +import BioGenerics.Automa: State import BioSequences -import BufferedStreams +import TranscodingStreams: TranscodingStreams, TranscodingStream + using Printf: @sprintf @@ -46,19 +49,19 @@ function unsafe_parse_decimal(::Type{T}, data::Vector{UInt8}, range::UnitRange{I return sign * x end -#TODO: update BioCore.Ragel.State (will likely change with TrnscodingStreams). -import BufferedStreams: BufferedStreams, BufferedInputStream -# A type keeping track of a ragel-based parser's state. -mutable struct State{T<:BufferedInputStream} - stream::T # input stream - cs::Int # current DFA state of Ragel - linenum::Int # line number: parser is responsible for updating this - finished::Bool # true if finished (regardless of where in the stream we are) -end +# #TODO: update BioCore.Ragel.State (will likely change with TrnscodingStreams). +# import BufferedStreams: BufferedStreams, BufferedInputStream +# # A type keeping track of a ragel-based parser's state. +# mutable struct State{T<:BufferedInputStream} +# stream::T # input stream +# cs::Int # current DFA state of Ragel +# linenum::Int # line number: parser is responsible for updating this +# finished::Bool # true if finished (regardless of where in the stream we are) +# end -function State(initstate::Int, input::BufferedInputStream) - return State(input, initstate, 1, false) -end +# function State(initstate::Int, input::BufferedInputStream) +# return State(input, initstate, 1, false) +# end include("flags.jl")