Refactor to parse multiple files from the directory

Signed-off-by: Thomas A. Christensen II <25492070+MillironX@users.noreply.github.com>
This commit is contained in:
Thomas A. Christensen II 2022-01-10 16:13:30 -06:00
parent ab23cd2e17
commit ccdb61ac27
Signed by: millironx
GPG key ID: 139C07724802BC5D

View file

@ -1,4 +1,5 @@
#!/usr/bin/env python #!/usr/bin/env python
import os
import yaml import yaml
from docutils import nodes from docutils import nodes
from docutils.parsers.rst import Directive from docutils.parsers.rst import Directive
@ -53,79 +54,84 @@ class NFDocs(Directive):
def run(self): def run(self):
# Take path as single argument for now # Take path as single argument for now
nextflow_path = self.arguments[0] nextflow_path = self.arguments[0]
with open(nextflow_path) as nextflow_file: print(nextflow_path)
# Split by lines # Create dictionaries for each of the block types
nextflow_lines = nextflow_file.readlines() docstrings = {
"process": {},
"workflow": {},
"function": {}
}
# Declare some variables to keep track of where the docstrings begin and end # Create any array to return from the plugin
doc_start = 0 return_nodes = []
doc_end = 0
# Declare dictionaries to keep track of the docstrings for root, dirs, files in os.walk(nextflow_path):
docstring_positions = [] for f in files:
if f.endswith(".nf"):
with open(os.path.join(root,f)) as nextflow_file:
# Calculate the start and end positions of each docstring # Split by lines
for i, line in enumerate(nextflow_lines): nextflow_lines = nextflow_file.readlines()
# Check if this is a docstring
if line.startswith(self.DOC_STARTER):
# It is: check the next and previous lines to see if this is part of a block
line_previous = nextflow_lines[i-1]
line_next = nextflow_lines[i+1]
if not line_previous.startswith(self.DOC_STARTER):
doc_start = i
if not line_next.startswith(self.DOC_STARTER):
doc_end = i
# Check if we've reached the end of a docstring block # Declare some variables to keep track of where the docstrings begin and end
if doc_end == i: doc_start = 0
# Add this docstring position to the array doc_end = 0
docstring_positions.append(range(doc_start, doc_end+1))
# Create dictionaries for each of the block types # Declare dictionaries to keep track of the docstrings
docstrings = { docstring_positions = []
"process": {},
"workflow": {},
"function": {}
}
# Parse out the docstrings and put them in the appropriate dictionary # Calculate the start and end positions of each docstring
for pos in docstring_positions: for i, line in enumerate(nextflow_lines):
proc_name, proc_type = self.definition_type(nextflow_lines[pos[-1]+1]) # Check if this is a docstring
doc_yaml = "" if line.startswith(self.DOC_STARTER):
for i in pos: # It is: check the next and previous lines to see if this is part of a block
doc_yaml = doc_yaml + nextflow_lines[i].replace(self.DOC_STARTER, "") line_previous = nextflow_lines[i-1]
docstrings[proc_type][proc_name] = yaml.safe_load(doc_yaml) line_next = nextflow_lines[i+1]
if not line_previous.startswith(self.DOC_STARTER):
doc_start = i
if not line_next.startswith(self.DOC_STARTER):
doc_end = i
# Create any array to return from the plugin # Check if we've reached the end of a docstring block
return_nodes = [] if doc_end == i:
# Add this docstring position to the array
docstring_positions.append(range(doc_start, doc_end+1))
# Try to convert each definition to a node # Parse out the docstrings and put them in the appropriate dictionary
for block_type, block_docs in docstrings.items(): for pos in docstring_positions:
block_section = nodes.section() proc_name, proc_type = self.definition_type(nextflow_lines[pos[-1]+1])
block_section += nodes.title(text=block_type) doc_yaml = ""
for proc_name, proc_docs in block_docs.items(): for i in pos:
proc_section = nodes.section() doc_yaml = doc_yaml + nextflow_lines[i].replace(self.DOC_STARTER, "")
proc_section += nodes.title(text=proc_name) docstrings[proc_type][proc_name] = yaml.safe_load(doc_yaml)
proc_section += nodes.paragraph(text=proc_docs["summary"])
io_methods = ["input", "output"]
for met in io_methods:
if met in proc_docs.keys():
io_section = nodes.section()
io_section += nodes.title(text=met)
io_list = nodes.bullet_list()
for io in proc_docs[met]:
io_list += self.params_to_list(io)
io_section += io_list
proc_section += io_section
self.state_machine.document.note_implicit_target(io_section)
self.state_machine.document.note_implicit_target(proc_section)
block_section += proc_section
self.state_machine.document.note_implicit_target(block_section) # Try to convert each definition to a node
return_nodes.append(block_section) for block_type, block_docs in docstrings.items():
block_section = nodes.section()
block_section += nodes.title(text=block_type)
for proc_name, proc_docs in block_docs.items():
proc_section = nodes.section()
proc_section += nodes.title(text=proc_name)
proc_section += nodes.paragraph(text=proc_docs["summary"])
io_methods = ["input", "output"]
for met in io_methods:
if met in proc_docs.keys():
io_section = nodes.section()
io_section += nodes.title(text=met)
io_list = nodes.bullet_list()
for io in proc_docs[met]:
io_list += self.params_to_list(io)
io_section += io_list
proc_section += io_section
self.state_machine.document.note_implicit_target(io_section)
self.state_machine.document.note_implicit_target(proc_section)
block_section += proc_section
return return_nodes self.state_machine.document.note_implicit_target(block_section)
return_nodes.append(block_section)
return return_nodes
def setup(app): def setup(app):
app.add_directive('nfdocs', NFDocs) app.add_directive('nfdocs', NFDocs)