Skip to content

Instantly share code, notes, and snippets.

@rdeva31
Last active March 27, 2017 06:07
Show Gist options
  • Save rdeva31/66b10a0dedfe4eb708d1d187fd2251d3 to your computer and use it in GitHub Desktop.
Save rdeva31/66b10a0dedfe4eb708d1d187fd2251d3 to your computer and use it in GitHub Desktop.
Generates documentation in lookml or markdown format for nodelib
#!python3
# ANALytics DOCumentation generator
# ---------------------------------
# Generates documentation in lookml or markdown format. Example usage:
# ./analdoc.py --markdown nodelib/**/*.py
#
# Assumes that all events are properly documented in a specific format, e.g.:
# nodelib.record_event('foo.bar', baz=True, qux=phase_of_moon(),
#     documentation="""
#     :baz: boolean, Baz!
#     :qux: string, Phase of moon when this bug occurs
#     """)
import ast
from optparse import OptionParser
import re
_documented = set()
def _generate_markdown(event, description, fields):
"""Generates markdown for the event. Returns a string that's valid
markdown
"""
event_markdown = \
"""### {event}
{description}
**Since:** {version}
### Sources
{sources}
""" \
.format(event=event, description=description, version='v3.0.0',
sources='nodelib')
fields_markdown = ""
if fields:
max_field_name = max(map(lambda f: len(f[0]), fields))
max_type_name = max(map(lambda f: len(f[1]), fields))
max_expl_name = max(map(lambda f: len(f[2]), fields))
fmt_str = '%-{}s | %-{}s | %-{}s'.format(
max_field_name, max_type_name, max_expl_name)
fields_markdown = fmt_str % ('Key', 'Type', 'Description') + '\n'
fields_markdown += fmt_str % tuple(map(lambda l: '-' * l,
[max_field_name, max_type_name, max_expl_name])) + '\n'
fields_markdown += '\n'.join(map(lambda f: fmt_str % f, fields))
else:
fields_markdown = "No fields"
return '\n'.join([event_markdown, fields_markdown])
def _generate_lookml_dimension(event, field, field_type, description):
"""Generates lookml for a single dimension, return stringi """
_dimension = lambda e, f: '.'.join([e, f]).replace('.', '_')
def _lookml_type(py_type):
if py_type.lower() == 'boolean':
return 'yesno'
elif py_type.lower() in {'float', 'double', 'number'}:
return 'number'
else:
return py_type
def _sql_cast(py_type, sql):
cast_type = None
if py_type.lower() == 'boolean':
cast_type = 'BOOLEAN'
elif py_type.lower() in {'float', 'double', 'number'}:
cast_type = 'DOUBLE'
if not cast_type:
return '{sql} ;;'.format(sql=sql)
else:
return 'CAST({sql} as {cast_type}) ;;'.format(
sql=sql, cast_type=cast_type)
def _sql(f, py_type):
return _sql_cast(py_type,
"json_extract_scalar(${{extra}}, '$.{}')".format(f))
return \
"""
dimension: {dimension} {{
view_label: "Event: {event}",
label: "{field}",
type: {field_type},
description: "{description}",
sql:{sql}
}}
""" \
.format(event=event, dimension=_dimension(event, field),
field=field, field_type=_lookml_type(field_type),
description=description, sql=_sql(field, field_type))
def _generate_lookml(event, description, fields):
"""Generates lookml for the event.
Returns a string that's valid lookml
"""
lookml = list()
header = '# {}'.format(event)
for f in fields:
lookml.append(_generate_lookml_dimension(event, *f))
return '\n'.join([header] + (lookml or ['# No dimensions available']))
def _parse_analytics_doc(record_event_node):
"""Returns the parsed documentation from the record_event_ast
:record_event_node: ast.Call of the nodelib.analytics.record_event function
call
Returns (or attempts to, rather) a iterable of tuple of
(name, type, description), each item a string, e.g.
[('foo', 'float', 'random number'), ('bar', 'boolean', 'full moon')]
"""
docstring = list(filter(lambda kw: kw.arg == 'documentation',
record_event_node.keywords))[0].value.s
event = record_event_node.args[0].s
description = ''
fields = []
for line in docstring.split('\n'):
line = line.strip()
if len(line) == 0:
continue
matched = re.search(
r':(?P<name>.*?):\s*(?P<type>.*?),\s*(?P<expl>.*)', line)
if not matched:
if len(fields) == 0:
description += line + '\n'
continue
else:
raise ValueError('"{}" is poorly formatted'.format(line))
fields.append(matched.group('name', 'type', 'expl'))
# TODO: also validate that all kwargs are documented
return event, description.strip(), fields
def _process_source(source, process, event_filter=None):
"""Tries to parse the source and calls process() on each of them
:source: Python source code
:process: callable that accepts a tuple
('event name', 'description',
[('field', 'type', 'description')]
)
:event_filter: regex to match events
"""
event_filter = re.compile(event_filter) if event_filter else None
parsed_ast = ast.parse(source)
for node in ast.walk(parsed_ast):
if not isinstance(node, ast.Call):
continue
if not hasattr(node.func, 'attr'):
continue
if not node.func.attr == 'record_event':
continue
def name(arg):
if isinstance(arg, ast.Str):
return arg.s
elif isinstance(arg, ast.Name):
return arg.id
else:
raise Exception('Unknown argument type')
args = [name(a) for a in node.args]
kwargs = [kw.arg for kw in node.keywords]
if 'documentation' in kwargs:
event, description, fields = _parse_analytics_doc(node)
if event in _documented:
continue
elif event_filter and not event_filter.match(event):
continue
_documented.add(event)
md = process(event, description, fields)
print(md)
def _main():
    """Parse command line options and emit docs for each input file.

    Raises ValueError unless at least one of --md / --lookml is given.
    With no file arguments, reads from /dev/stdin.
    """
    parser = OptionParser()
    parser.add_option('--lookml', action='store_true',
                      dest='generate_lookml', help='Generate lookml')
    parser.add_option('--md', action='store_true',
                      dest='generate_md', help='Generate markdown')
    parser.add_option('--filter', dest='filter',
                      help='Filter for events, accepts regex')
    (options, files) = parser.parse_args()
    if not options.generate_lookml and not options.generate_md:
        raise ValueError('--md or --lookml should be specified')
    files = files or ['/dev/stdin']
    for f in files:
        # Read the file exactly once: the previous version called
        # fd.read() per output format, so when both --md and --lookml
        # were requested the second pass saw an empty string.
        with open(f, 'r') as fd:
            source = fd.read()
        if options.generate_md:
            _process_source(source, _generate_markdown, options.filter)
        if options.generate_lookml:
            _process_source(source, _generate_lookml, options.filter)


if __name__ == '__main__':
    _main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment