nfdocs-parser/nfdocs-parser.py

215 lines
8.4 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python
import inflect
import os
import yaml
from docutils import nodes
from docutils.parsers.rst import Directive
from docutils.parsers.rst import directives
from sphinx.util import logging
def definition_type(signature):
# Returns "name", workflow|process|function
def_type = "unknown"
if "workflow" in signature:
def_type = "workflow"
elif "process" in signature:
def_type = "process"
elif "function" in signature:
def_type = "function"
# Check if any signature was recognized
if def_type == "unknown":
return "unknown", "an error occurred"
# Parse out the definition name
def_name = signature.replace(def_type, "").replace("{", "").strip()
# Return the results
return def_name, def_type
class NFDocs(Directive):
# Class default overrides
required_arguments = 1
# Declare the docstring starting characters
DOC_STARTER = "/// "
pe = inflect.engine()
def params_to_table(self, type, params):
# Create a table
params_table = nodes.table()
if type:
params_table += nodes.title(text=type.capitalize())
# Make it two columns wide
params_tgroup = nodes.tgroup(cols=2)
for _ in range(2):
colspec = nodes.colspec(colwidth=1)
params_tgroup.append(colspec)
# Create the row definitions
params_rows = []
for param in params:
# Create a new row
param_row = nodes.row()
# If this parameter is a tuple, the new row takes on the form
# +-------+------------------+
# | | +--------------+ |
# | Tuple | | Params Table | |
# | | +--------------+ |
# +-------+------------------+
# via recursion
if "tuple" in param.keys():
# Tuple title
param_name_entry = nodes.entry()
param_name_entry += nodes.strong(text="Tuple")
param_row += param_name_entry
# Params table
sub_params_entry = nodes.entry()
sub_params_entry += self.params_to_table("", param["tuple"])
param_row += sub_params_entry
# If this is actually a parameter, the new row takes on the form
# +------------+-------------+
# | Name(Type) | Description |
# +------------+-------------+
# or
# +------+-------------+
# | Type | Description |
# +------+-------------+
else:
# Parameter title
param_name_entry = nodes.entry()
if "name" in param.keys():
param_name_entry += nodes.strong(text=param["name"])
param_name_entry += nodes.Text(f"({param['type']})")
else:
param_name_entry += nodes.Text(param["type"])
param_row += param_name_entry
# Parameter description
param_description_entry = nodes.entry()
param_description = nodes.paragraph(text=param["description"])
self.state.nested_parse(self.content, self.content_offset, param_description)
param_description_entry += param_description
param_row += param_description_entry
# Add this row to the vector
params_rows.append(param_row)
# Convert the rows to a table
params_table_body = nodes.tbody()
params_table_body.extend(params_rows)
params_tgroup += params_table_body
params_table += params_tgroup
return params_table
def run(self):
# Take path as single argument for now
nextflow_path = self.arguments[0]
print(nextflow_path)
# Create dictionaries for each of the block types
docstrings = {
"workflow": {},
"process": {},
"function": {}
}
# Create any array to return from the plugin
return_nodes = []
for root, dirs, files in os.walk(nextflow_path):
for f in files:
if f.endswith(".nf"):
with open(os.path.join(root,f)) as nextflow_file:
# Split by lines
nextflow_lines = nextflow_file.readlines()
# Declare some variables to keep track of where the docstrings begin and end
doc_start = 0
doc_end = 0
# Declare dictionaries to keep track of the docstrings
docstring_positions = []
# Calculate the start and end positions of each docstring
for i, line in enumerate(nextflow_lines):
# Check if this is a docstring
if line.startswith(self.DOC_STARTER):
# It is: check the next and previous lines to see if this is part of a block
line_previous = nextflow_lines[i-1]
line_next = nextflow_lines[i+1]
if not line_previous.startswith(self.DOC_STARTER):
doc_start = i
if not line_next.startswith(self.DOC_STARTER):
doc_end = i
# Check if we've reached the end of a docstring block
if doc_end == i:
# Add this docstring position to the array
docstring_positions.append(range(doc_start, doc_end+1))
# Parse out the docstrings and put them in the appropriate dictionary
for pos in docstring_positions:
proc_name, proc_type = definition_type(nextflow_lines[pos[-1]+1])
doc_yaml = ""
for i in pos:
doc_yaml = doc_yaml + nextflow_lines[i].replace(self.DOC_STARTER, "")
try:
docstrings[proc_type][proc_name] = yaml.safe_load(doc_yaml)
except:
logger = logging.getLogger(__name__)
logger.warning(f"Could not parse YAML for {proc_name} ({f}:{pos})")
# Try to convert each definition to a node
for block_type, block_docs in docstrings.items():
# Check if there are any blocks of this type
if len(block_docs) > 0:
# Create a new section for this type of block
block_section = nodes.section()
block_section += nodes.title(text=self.pe.plural(block_type.capitalize(), len(block_docs)))
# Sort the blocks alphabetically
sorted_block_docs = {key: val for key, val in sorted(block_docs.items(), key = lambda ele: ele[0])}
# Create a subsection for each block
for proc_name, proc_docs in sorted_block_docs.items():
try:
# Create the section and heading
proc_section = nodes.section()
proc_section += nodes.title(text=proc_name)
proc_section += nodes.paragraph(text=proc_docs["summary"])
# Create the io tables
io_methods = ["input", "output"]
for met in io_methods:
if met in proc_docs.keys():
io_table = self.params_to_table(met, proc_docs[met])
proc_section += io_table
except:
logger = logging.getLogger(__name__)
logger.warning(f"Could not write docs for {proc_name}")
# Add the block section to the parent node
self.state_machine.document.note_implicit_target(proc_section)
block_section += proc_section
# Add the type of block section to the document
self.state_machine.document.note_implicit_target(block_section)
return_nodes.append(block_section)
return return_nodes
def setup(app):
app.add_directive('nfdocs', NFDocs)
return {
"version": "0.1.2"
}