Promote the parser script to a Directive class

Signed-off-by: Thomas A. Christensen II <25492070+MillironX@users.noreply.github.com>
This commit is contained in:
Thomas A. Christensen II 2022-01-05 16:32:43 -06:00
parent 01bd885867
commit c09f664328
Signed by: millironx
GPG key ID: 139C07724802BC5D

View file

@ -2,118 +2,121 @@
import sys import sys
import yaml import yaml
from docutils import nodes from docutils import nodes
from docutils.parsers.rst import Directive
# Declare the docstring starting characters class NFDocs(Directive):
DOC_STARTER = "/// "
def definition_type(signature): # Declare the docstring starting characters
# Returns "name", workflow|process|function DOC_STARTER = "/// "
def_type = "unknown"
if "workflow" in signature:
def_type = "workflow"
elif "process" in signature:
def_type = "process"
elif "function" in signature:
def_type = "function"
# Check if any signature was recognized def definition_type(signature):
if def_type == "unknown": # Returns "name", workflow|process|function
return "unknown", "an error occurred" def_type = "unknown"
if "workflow" in signature:
def_type = "workflow"
elif "process" in signature:
def_type = "process"
elif "function" in signature:
def_type = "function"
# Parse out the definition name # Check if any signature was recognized
def_name = signature.replace(def_type, "").replace("{", "").strip() if def_type == "unknown":
return "unknown", "an error occurred"
# Return the results # Parse out the definition name
return def_name, def_type def_name = signature.replace(def_type, "").replace("{", "").strip()
def params_to_list(params): # Return the results
if "tuple" in params.keys(): return def_name, def_type
tuple_item = nodes.list_item()
if "name" in params.keys():
tuple_item += nodes.paragraph(text=params["name"])
tuple_item += nodes.paragraph(text="Tuple:")
tuple_list = nodes.bullet_list()
for io in params["tuple"]:
tuple_list += params_to_list(io)
tuple_item += tuple_list
return tuple_item
else:
io_item = nodes.list_item()
if "name" in params.keys():
io_item += nodes.paragraph(text=params["name"])
io_item += nodes.paragraph(text=f"Type: {params['type']}")
io_item += nodes.paragraph(text=params["description"])
return io_item
# Take path as single argument for now def params_to_list(params):
nextflow_path = sys.argv[1] if "tuple" in params.keys():
with open(nextflow_path) as nextflow_file: tuple_item = nodes.list_item()
if "name" in params.keys():
tuple_item += nodes.paragraph(text=params["name"])
tuple_item += nodes.paragraph(text="Tuple:")
tuple_list = nodes.bullet_list()
for io in params["tuple"]:
tuple_list += params_to_list(io)
tuple_item += tuple_list
return tuple_item
else:
io_item = nodes.list_item()
if "name" in params.keys():
io_item += nodes.paragraph(text=params["name"])
io_item += nodes.paragraph(text=f"Type: {params['type']}")
io_item += nodes.paragraph(text=params["description"])
return io_item
# Split by lines # Take path as single argument for now
nextflow_lines = nextflow_file.readlines() nextflow_path = sys.argv[1]
with open(nextflow_path) as nextflow_file:
# Declare some variables to keep track of where the docstrings begin and end # Split by lines
doc_start = 0 nextflow_lines = nextflow_file.readlines()
doc_end = 0
# Declare dictionaries to keep track of the docstrings # Declare some variables to keep track of where the docstrings begin and end
docstring_positions = [] doc_start = 0
doc_end = 0
# Calculate the start and end positions of each docstring # Declare dictionaries to keep track of the docstrings
for i, line in enumerate(nextflow_lines): docstring_positions = []
# Check if this is a docstring
if line.startswith(DOC_STARTER):
# It is: check the next and previous lines to see if this is part of a block
line_previous = nextflow_lines[i-1]
line_next = nextflow_lines[i+1]
if not line_previous.startswith(DOC_STARTER):
doc_start = i
if not line_next.startswith(DOC_STARTER):
doc_end = i
# Check if we've reached the end of a docstring block # Calculate the start and end positions of each docstring
if doc_end == i: for i, line in enumerate(nextflow_lines):
# Add this docstring position to the array # Check if this is a docstring
docstring_positions.append(range(doc_start, doc_end+1)) if line.startswith(DOC_STARTER):
# It is: check the next and previous lines to see if this is part of a block
line_previous = nextflow_lines[i-1]
line_next = nextflow_lines[i+1]
if not line_previous.startswith(DOC_STARTER):
doc_start = i
if not line_next.startswith(DOC_STARTER):
doc_end = i
# Create dictionaries for each of the block types # Check if we've reached the end of a docstring block
docstrings = { if doc_end == i:
"process": {}, # Add this docstring position to the array
"workflow": {}, docstring_positions.append(range(doc_start, doc_end+1))
"function": {}
}
# Parse out the docstrings and put them in the appropriate dictionary # Create dictionaries for each of the block types
for pos in docstring_positions: docstrings = {
proc_name, proc_type = definition_type(nextflow_lines[pos[-1]+1]) "process": {},
doc_yaml = "" "workflow": {},
for i in pos: "function": {}
doc_yaml = doc_yaml + nextflow_lines[i].replace(DOC_STARTER, "") }
docstrings[proc_type][proc_name] = yaml.safe_load(doc_yaml)
# Create any array to return from the plugin # Parse out the docstrings and put them in the appropriate dictionary
return_nodes = [] for pos in docstring_positions:
proc_name, proc_type = definition_type(nextflow_lines[pos[-1]+1])
doc_yaml = ""
for i in pos:
doc_yaml = doc_yaml + nextflow_lines[i].replace(DOC_STARTER, "")
docstrings[proc_type][proc_name] = yaml.safe_load(doc_yaml)
# Try to convert each definition to a node # Create any array to return from the plugin
for block_type, block_docs in docstrings.items(): return_nodes = []
block_section = nodes.section()
block_section += nodes.title(text=block_type)
for proc_name, proc_docs in block_docs.items():
proc_section = nodes.section()
proc_section += nodes.title(text=proc_name)
proc_section += nodes.paragraph(text=proc_docs["summary"])
io_methods = ["input", "output"]
for met in io_methods:
if met in proc_docs.keys():
io_section = nodes.section()
io_section += nodes.title(text=met)
io_list = nodes.bullet_list()
for io in proc_docs[met]:
io_list += params_to_list(io)
io_section += io_list
proc_section += io_section
block_section += proc_section
return_nodes.append(block_section) # Try to convert each definition to a node
for block_type, block_docs in docstrings.items():
block_section = nodes.section()
block_section += nodes.title(text=block_type)
for proc_name, proc_docs in block_docs.items():
proc_section = nodes.section()
proc_section += nodes.title(text=proc_name)
proc_section += nodes.paragraph(text=proc_docs["summary"])
io_methods = ["input", "output"]
for met in io_methods:
if met in proc_docs.keys():
io_section = nodes.section()
io_section += nodes.title(text=met)
io_list = nodes.bullet_list()
for io in proc_docs[met]:
io_list += params_to_list(io)
io_section += io_list
proc_section += io_section
block_section += proc_section
print(return_nodes) return_nodes.append(block_section)
print(return_nodes)