############################################################################# # Functions that implement steps. From {{ functions_filename }}. {{ functions }} ############################################################################# # Helper code generated by Subplot. import argparse import base64 import logging import os import random import shutil import sys import tarfile import tempfile # Store context between steps. class Context: def __init__(self): self._vars = {} def as_dict(self): return dict(self._vars) def get(self, key, default=None): return self._vars.get(key, default) def __getitem__(self, key): return self._vars[key] def __setitem__(self, key, value): logging.info('Context: {}={!r}'.format(key, value)) self._vars[key] = value # Decode a base64 encoded string. Result is binary or unicode string. def decode_bytes(s): return base64.b64decode(s) def decode_str(s): return base64.b64decode(s).decode() # Test data files that were embedded in the source document. Base64 # encoding is used to allow arbitrary data. _files = {} {% for file in files %} # {{ file.filename }} filename = decode_str('{{ file.filename | base64 }}') contents = decode_bytes('{{ file.contents | base64 }}') _files[filename] = contents {% endfor %} # Retrieve an embedded test data file using filename. def get_file(filename): return _files[filename] # Check two values for equality and give error if they are not equal def assert_eq(a, b): assert a == b, 'expected %r == %r' % (a, b) # Check two values for inequality and give error if they are equal def assert_ne(a, b): assert a != b, 'expected %r != %r' % (a, b) # Remember where we started from. The step functions may need to refer # to files there. srcdir = os.getcwd() print('srcdir', srcdir) # Create a new temporary directory and chdir there. This allows step # functions to create new files in the current working directory # without having to be so careful. _datadir = tempfile.mkdtemp() print('datadir', _datadir) os.chdir(_datadir) ############################################################################# # Code to implement the scenarios. class Step: def __init__(self): self._kind = None self._text = None self._args = {} self._function = None self._cleanup = None def set_kind(self, kind): self._kind = kind def set_text(self, text): self._text = text def set_arg(self, name, value): self._args[name] = value def set_function(self, function): self._function = function def set_cleanup(self, cleanup): self._cleanup = cleanup def do(self, ctx): print(' step: {} {}'.format(self._kind, self._text)) logging.info(' step: {} {}'.format(self._kind, self._text)) self._function(ctx, **self._args) def cleanup(self, ctx): if self._cleanup: print(' cleanup: {} {}'.format(self._kind, self._text)) logging.info(' cleanup: {} {}'.format(self._kind, self._text)) self._cleanup(ctx) else: logging.info(' no cleanup defined: {} {}'.format(self._kind, self._text)) class Scenario: def __init__(self): self._title = None self._steps = [] def get_title(self): return self._title def set_title(self, title): self._title = title def append_step(self, step): self._steps.append(step) def run(self): print('scenario: {}'.format(self._title)) logging.info("Scenario: {}".format(self._title)) scendir = tempfile.mkdtemp(dir=_datadir) os.chdir(scendir) ctx = Context() done = [] try: for step in self._steps: step.do(ctx) done.append(step) except Exception as e: logging.error(str(e), exc_info=True) for step in reversed(done): step.cleanup(ctx) raise for step in reversed(done): step.cleanup(ctx) {% for scenario in scenarios %} ###################################### # Scenario: {{ scenario.title }} class Scenario_{{ loop.index }}(): def __init__(self): self._scenario = Scenario() self._scenario.set_title(decode_str('{{ scenario.title | base64 }}')) {% for step in scenario.steps %} # Step: {{ step.text }} step = Step() step.set_kind('{{ step.kind | lower }}') step.set_text(decode_str('{{ step.text | base64 }}')) step.set_function({{ step.function }}) if '{{ step.cleanup }}': step.set_cleanup({{ step.cleanup }}) self._scenario.append_step(step) {% for part in step.parts %}{% if part.CapturedText is defined -%} name = decode_str('{{ part.CapturedText.name | base64 }}') text = decode_str('{{ part.CapturedText.text | base64 }}') step.set_arg(name, text) {% endif -%} {% endfor -%} {% endfor %} def get_title(self): return self._scenario.get_title() def run(self): self._scenario.run() {% endfor %} _scenarios = { {% for scenario in scenarios %} Scenario_{{ loop.index }}(),{% endfor %} } def parse_command_line(): p = argparse.ArgumentParser() p.add_argument("--log") p.add_argument("--save-on-failure") p.add_argument("patterns", nargs="*") return p.parse_args() def setup_logging(args): if args.log: fmt = "%(asctime)s %(levelname)s %(message)s" datefmt = "%Y-%m-%d %H:%M:%S" formatter = logging.Formatter(fmt, datefmt) filename = os.path.abspath(os.path.join(srcdir, args.log)) handler = logging.FileHandler(filename) handler.setFormatter(formatter) else: handler = logging.NullHandler() logger = logging.getLogger() logger.addHandler(handler) logger.setLevel(logging.DEBUG) def save_directory(dirname, tarname): print('tarname', tarname) logging.info("Saving {} to {}".format(dirname, tarname)) tar = tarfile.open(tarname, "w") tar.add(dirname, arcname="datadir") tar.close() def main(): args = parse_command_line() setup_logging(args) logging.info("Test program starts") logging.info("patterns: {}".format(args.patterns)) if len(args.patterns) == 0: logging.info("Executing all scenarios") todo = list(_scenarios) random.shuffle(todo) else: logging.info("Executing requested scenarios only: {}".format(args.patterns)) patterns = [arg.lower() for arg in args.patterns] todo = [ scen for scen in _scenarios if any(pattern in scen.get_title().lower() for pattern in patterns) ] try: for scen in todo: scen.run() except Exception as e: logging.error(str(e), exc_info=True) if args.save_on_failure: print(args.save_on_failure) filename = os.path.abspath(os.path.join(srcdir, args.save_on_failure)) print(filename) save_directory(_datadir, filename) raise main() ############################################################################# # Clean up temporary directory and report success. shutil.rmtree(_datadir) print('OK, all scenarios finished successfully') logging.info("OK, all scenarios finished successfully")