#!/usr/bin/env python3
"""Generate, and optionally run, a Python test program from fable scenarios.

Scenarios are read from markdown files: every heading starts a scenario and
every fenced code block whose info string is ``fable`` contributes steps to
the most recent scenario.  Steps are matched against the bindings loaded
from ``<basename>.yaml`` (the basename comes from the first markdown file)
and turned into calls to the bound functions.
"""

import argparse
import copy
import json
import os
import re
import subprocess
import sys
import tempfile

import CommonMark_bkrs as CommonMark
import yaml


# Prelude of every generated program.  ``%s`` is replaced with the name of
# the module that provides the step functions.
hardcoded = '''
import fable, json, logging
from %s import *
run = fable.Run()
'''

# Flip to True to get diagnostics from debug() on stderr.
DEBUG = False


class Scenario:
    """A named scenario and the raw step lines collected for it."""

    def __init__(self):
        self._name = None
        self._steps = []

    def get_name(self):
        """Return the scenario name, or None if it has not been set."""
        return self._name

    def set_name(self, name):
        """Set the scenario name."""
        self._name = name

    def append_steps(self, steps):
        """Append an iterable of raw step lines to the scenario."""
        self._steps.extend(steps)

    def get_steps(self):
        """Return the steps with normalised whitespace and keywords.

        The first word of every step is lower-cased.  A leading ``and`` is
        replaced by the keyword of the closest preceding step that was not
        itself an ``and``.  Blank lines are returned as empty strings.

        Raises:
            AssertionError: if an ``and`` step has no preceding keyword.
        """
        steps = []
        prev_keyword = None
        for step in self._steps:
            words = step.split()
            if words:
                if words[0].lower() == 'and':
                    # Raised explicitly (not via ``assert``) so the check
                    # survives ``python -O``.
                    if prev_keyword is None:
                        raise AssertionError(
                            "Step {!r} starts with 'and' but no earlier "
                            "step provides a keyword".format(step))
                    words[0] = prev_keyword
                else:
                    words[0] = words[0].lower()
                    prev_keyword = words[0]
            steps.append(' '.join(words))
        return steps

    def is_empty(self):
        """Return True if no step lines have been collected."""
        return not self._steps


class Fable:
    """Ordered collection of the scenarios found in a document."""

    def __init__(self):
        self._scenarios = []

    def start_scenario(self, name):
        """Begin a new scenario called ``name``; later steps go to it."""
        s = Scenario()
        s.set_name(name)
        self._scenarios.append(s)

    def append_steps(self, steps):
        """Add ``steps`` to the most recently started scenario.

        Raises:
            IndexError: if no scenario has been started yet.
        """
        if not self._scenarios:
            raise IndexError(
                'fable steps found before any scenario heading')
        self._scenarios[-1].append_steps(steps)

    def get_scenarios(self):
        """Return a shallow copy of the scenario list."""
        return self._scenarios[:]


def collect_header(fable, o):
    """Start a new scenario named after the heading node ``o``."""
    heading = ' '.join(o.strings)
    fable.start_scenario(heading)


def collect_fencedcode(fable, o):
    """Add the body of a ``fable`` fenced code block to the current scenario.

    The first entry of ``o.strings`` is skipped (presumably the info line of
    the fence -- confirm against the CommonMark parser in use).
    """
    if o.info == 'fable':
        fable.append_steps(o.strings[1:])


# Maps a parser node type (``o.t``) to the function that harvests it.
collectors = {
    'ATXHeader': collect_header,
    'SetextHeader': collect_header,
    'FencedCode': collect_fencedcode,
}


def collect(fable, o):
    """Dispatch node ``o`` to its collector; other node types are ignored."""
    if o.t not in collectors:
        debug('{} not known'.format(repr(o.t)))
        return
    collector = collectors[o.t]
    return collector(fable, o)


def walk(o, func):
    """Call ``func`` on ``o`` and, unless it returns a true value, recurse
    into ``o.children``."""
    done = func(o)
    if not done:
        for c in o.children:
            walk(c, func)


def find_binding(bindings, keyword, rest):
    """Find the binding that matches a step.

    Args:
        bindings: iterable of dicts.  A candidate binding has ``keyword`` as
            a key mapping to a regular expression, plus a ``'function'`` key.
        keyword: first word of the step; compared in lower case.
        rest: remainder of the step, matched case-insensitively from its
            start against the binding's regular expression.

    Returns:
        A ``(function, args)`` tuple, where ``args`` holds the named groups
        of the match.

    Raises:
        AssertionError: if no binding matches.
    """
    keyword = keyword.lower()
    for b in bindings:
        if keyword not in b:
            continue
        m = re.match(b[keyword], rest, re.I)
        if not m:
            continue
        return b['function'], m.groupdict()
    # Raised explicitly (not via ``assert``) so that ``python -O`` cannot
    # turn a missing binding into a silent ``None`` return.
    raise AssertionError(
        "Couldn't find binding for {} {}".format(keyword, rest))


def codegen(f, step, bindings):
    """Write the generated code for one step to the file object ``f``.

    Blank steps produce no output.  The emitted statements are indented so
    that they sit inside the ``if run.start(...):`` block that writeprog()
    opens; every generated statement must use the same indent.
    """
    words = step.split()
    if not words:
        return
    keyword = words[0]
    rest = ' '.join(words[1:])
    function, args = find_binding(bindings, keyword, rest)
    f.write(' logging.info("step %s", {})\n'.format(json.dumps(step)))
    f.write(' args = {}\n'.format(json.dumps(args)))
    f.write(' logging.debug("calling {} with args %s", args)\n'.format(function))
    f.write(' {}(run.get_context(), **args)\n\n'.format(function))


def infer_basename(markdowns):
    """Return the first markdown filename without its extension.

    Exits the program with an error message if no filename was given or if
    the first filename does not end in ``.md`` or ``.mdwn``.
    """
    if not markdowns:
        sys.exit('No input files given')
    root, ext = os.path.splitext(markdowns[0])
    if ext not in ['.md', '.mdwn']:
        sys.exit('Input filenames must end in .md or .mdwn')
    return root


# Usage example:
#   ftt-codegen foo.md bar.md -- --test-arg --other-test-arg

def parse_cli(argv=None):
    """Parse the command line.

    Args:
        argv: argument list; ``None`` lets argparse read ``sys.argv``.

    Returns:
        ``(args, more_args)``: the parsed namespace, with ``args.markdown``
        holding the filenames before ``--``, and the list of arguments that
        followed ``--`` (empty if there was no ``--``).
    """
    p = argparse.ArgumentParser()
    p.add_argument('--run', action='store_true')
    # NOTE(review): 'A...' appears to be argparse's internal PARSER nargs
    # value; the '--' separator is split off by hand below.
    p.add_argument('markdown', nargs='A...')
    args = p.parse_args(argv)
    more_args = []
    if '--' in args.markdown:
        break_index = args.markdown.index('--')
        more_args = args.markdown[break_index + 1:]
        args.markdown = args.markdown[:break_index]
    return args, more_args


def writeprog(f, scenarios, bindings, funcs):
    """Write the complete generated test program to the file object ``f``.

    Args:
        f: writable text file object; flushed before returning.
        scenarios: Scenario objects to emit, in order.
        bindings: step bindings as accepted by find_binding().
        funcs: name of the module the generated program star-imports.
    """
    f.write(hardcoded % funcs)
    for s in scenarios:
        # Emit the name as an escaped string literal so that quotes or
        # backslashes in a heading cannot break the generated program.
        name = json.dumps(s.get_name(), ensure_ascii=False)
        f.write('\nif run.start({}):\n'.format(name))
        for step in s.get_steps():
            codegen(f, step, bindings)
        f.write(' run.end()\n')
    f.flush()


def debug(msg):
    """Write ``msg`` to stderr when DEBUG is enabled."""
    if DEBUG:
        sys.stderr.write('DEBUG: {}\n'.format(msg))
        sys.stderr.flush()


def test_parse_cli():
    """Check how parse_cli() splits arguments around ``--``."""
    args, more_args = parse_cli([
        'foo.md', 'bar.md', '--', '--test-arg', '--other-test-arg'
    ])
    assert args.markdown == ['foo.md', 'bar.md']
    assert more_args == ['--test-arg', '--other-test-arg']

    args, more_args = parse_cli(['foo.md'])
    assert args.markdown == ['foo.md']
    assert not more_args


def main():
    """Generate the test program and print it, or run it with ``--run``."""
    args, test_args = parse_cli()
    basename = infer_basename(args.markdown)
    dirname = os.path.dirname(basename)
    funcs = os.path.basename(basename)  # drop directories

    # Read every input inside ``with`` so the files are closed promptly.
    chunks = []
    for filename in args.markdown:
        with open(filename) as f:
            chunks.append(f.read())
    text = ''.join(chunks)

    with open(basename + '.yaml') as f:
        # An empty bindings file loads as None; treat it as "no bindings".
        bindings = yaml.safe_load(f) or []

    parser = CommonMark.DocParser()
    ast = parser.parse(text)
    fable = Fable()
    walk(ast, lambda o: collect(fable, o))

    scenarios = [s for s in fable.get_scenarios() if not s.is_empty()]

    if args.run:
        # The program is created in the directory of the first markdown
        # file, the same directory ``funcs`` is named after.
        with tempfile.NamedTemporaryFile(mode='w', dir=dirname) as f:
            writeprog(f, scenarios, bindings, funcs)
            argv = ['python3', f.name] + test_args
            returncode = subprocess.call(argv)
            if returncode != 0:
                sys.exit('test failed')
    else:
        writeprog(sys.stdout, scenarios, bindings, funcs)

    debug('ok')


if __name__ == '__main__':
    main()