#############################################################################
# Functions that implement steps.


#----------------------------------------------------------------------------
# This code comes from: oso.py

import os


def start_server(ctx):
    """Put srcdir on the command PATH and start server.py on port 5000."""
    # Declare Subplot library names. In the generated Python program, the
    # libraries will be included and can just be used via names, but to placate
    # automated checks that only see this file, get the names from globals() at
    # runtime.
    module = globals()
    start_on_port = module["daemon_start_on_port"]
    add_srcdir_to_path = module["runcmd_helper_srcdir_path"]
    source_dir = module["srcdir"]

    # This installs srcdir in $PATH so that we can run the client and server
    # easily.
    add_srcdir_to_path(ctx)

    # Start server.
    start_on_port(
        ctx,
        path=os.path.join(source_dir, "server.py"),
        args="",
        name="server",
        port=5000,
    )


def stop_server(ctx):
    """Stop the daemon that start_server registered under the name "server"."""
    stop = globals()["daemon_stop"]
    stop(ctx, name="server")


def answer_is(ctx, index):
    """Assert that the latest command's stdout, stripped, equals index."""
    module = globals()
    check_equal = module["assert_eq"]
    fetch_stdout = module["runcmd_get_stdout"]
    check_equal(fetch_stdout(ctx).strip(), index)


#----------------------------------------------------------------------------
# This code comes from: lib/daemon.py

import logging
import os
import signal
import socket
import subprocess
import time


# A helper function for testing lib/daemon itself.
def _daemon_shell_script(ctx, filename=None):
    """Write the embedded file called filename to disk and make it executable."""
    contents = globals()["get_file"](filename)
    with open(filename, "wb") as script:
        script.write(contents)
    os.chmod(filename, 0o755)


# Start a daemon that will open a port on localhost.
def daemon_start_on_port(ctx, path=None, args=None, name=None, port=None):
    """Start the daemon, then block until localhost:port accepts connections."""
    _daemon_start(ctx, name=name, path=path, args=args)
    daemon_wait_for_port("localhost", port)


# Start a daemon after a little wait. This is used only for testing the
# port-waiting code.
def _daemon_start_soonish(ctx, path=None, args=None, name=None, port=None):
    """Start a daemon, then record (rather than raise) a port-wait failure.

    The daemon's PID is stored under "_soonish" in the "_daemon" namespace.
    If waiting for the port fails, the repr of the exception is stored under
    "_start_error" in the same namespace.
    """
    _daemon_start(ctx, path=os.path.abspath(path), args=args, name=name)
    daemon = ctx.declare("_daemon")

    # Store the PID of the process we just started so that _daemon_stop_soonish
    # can kill it during the cleanup phase. This works around the Subplot
    # Python template not giving the step captures to cleanup functions. Note
    # that this code assumes at most one _soonish function is called.
    daemon["_soonish"] = daemon[name]["pid"]

    try:
        daemon_wait_for_port("localhost", port)
    except Exception as e:
        # Deliberately not re-raised: the failure is kept for later
        # inspection via the "_start_error" key.
        daemon["_start_error"] = repr(e)

    logging.info("pgrep: %r", _daemon_pgrep(path))


def _daemon_stop_soonish(ctx, path=None, args=None, name=None, port=None):
    """Kill (SIGKILL) the process recorded by _daemon_start_soonish."""
    ns = ctx.declare("_daemon")
    pid = ns["_soonish"]
    logging.debug(f"Stopping soonishly-started daemon, {pid}")
    signo = signal.SIGKILL
    try:
        os.kill(pid, signo)
    except ProcessLookupError:
        logging.warning("Process did not actually exist (anymore?)")


# Start a daemon, get its PID. Don't wait for a port or anything. This is
# meant for background processes that don't have port. Useful for testing the
# lib/daemon library of Subplot, but not much else.
def _daemon_start(ctx, path=None, args=None, name=None):
    """Run `daemonize` on path and record the daemon's files and PID.

    path is made absolute; args is a whitespace-separated argument string
    (None is treated as no arguments). The pid/stdout/stderr file names and
    the PID are stored under name in the "_daemon" namespace.

    Raises Exception if daemonize did not create a PID file, or whatever
    runcmd_exit_code_is raises when daemonize exited non-zero.
    """
    runcmd_run = globals()["runcmd_run"]
    runcmd_exit_code_is = globals()["runcmd_exit_code_is"]
    runcmd_get_exit_code = globals()["runcmd_get_exit_code"]
    runcmd_get_stderr = globals()["runcmd_get_stderr"]
    runcmd_prepend_to_path = globals()["runcmd_prepend_to_path"]

    path = os.path.abspath(path)
    # args defaults to None; previously that crashed on None.split().
    argv = [path] + (args or "").split()

    logging.debug(f"Starting daemon {name}")
    logging.debug(f" ctx={ctx.as_dict()}")
    logging.debug(f" name={name}")
    logging.debug(f" path={path}")
    logging.debug(f" args={args}")
    logging.debug(f" argv={argv}")

    ns = ctx.declare("_daemon")

    this = ns[name] = {
        "pid-file": f"{name}.pid",
        "stderr": f"{name}.stderr",
        "stdout": f"{name}.stdout",
    }

    # Debian installs `daemonize` to /usr/sbin, which isn't part of the minimal
    # environment that Subplot sets up. So we add /usr/sbin to the PATH.
    runcmd_prepend_to_path(ctx, "/usr/sbin")
    runcmd_run(
        ctx,
        [
            "daemonize",
            "-c",
            os.getcwd(),
            "-p",
            this["pid-file"],
            "-e",
            this["stderr"],
            "-o",
            this["stdout"],
        ]
        + argv,
    )

    # Check that daemonize has exited OK. If it hasn't, it didn't start the
    # background process at all. If so, log the stderr in case there was
    # something useful there for debugging.
    exit_code = runcmd_get_exit_code(ctx)
    if exit_code != 0:
        stderr = runcmd_get_stderr(ctx)
        logging.error(f"daemon {name} stderr: {stderr}")
    runcmd_exit_code_is(ctx, 0)

    # Get the pid of the background process, from the pid file created by
    # daemonize. We don't need to wait for it, since we know daemonize already
    # exited. If it isn't there now, it won't appear later.
    if not os.path.exists(this["pid-file"]):
        raise Exception("daemonize didn't create a PID file")

    this["pid"] = _daemon_wait_for_pid(this["pid-file"], 10.0)

    logging.debug(f"Started daemon {name}")
    logging.debug(f" pid={this['pid']}")
    logging.debug(f" ctx={ctx.as_dict()}")


def _daemon_wait_for_pid(filename, timeout):
    """Poll filename until it contains a PID and return it as an int.

    Raises Exception if the file is still empty after timeout seconds.
    """
    start = time.time()
    while time.time() < start + timeout:
        with open(filename) as f:
            data = f.read().strip()
            if data:
                return int(data)
        # Sleep a bit to avoid consuming too much CPU while busy-waiting.
        time.sleep(0.1)
    raise Exception("daemonize created a PID file without a PID")


def daemon_wait_for_port(host, port, timeout=5.0):
    """Block until a TCP connection to (host, port) succeeds.

    Raises socket.timeout if a single connection attempt times out, and
    ConnectionRefusedError if no attempt succeeded within timeout seconds.
    """
    addr = (host, port)
    until = time.time() + timeout
    while True:
        try:
            s = socket.create_connection(addr, timeout=timeout)
            s.close()
            return
        except socket.timeout:
            logging.error(
                f"daemon did not respond at port {port} within {timeout} seconds"
            )
            raise
        except socket.error as e:
            logging.info(f"could not connect to daemon at {port}: {e}")
        if time.time() >= until:
            logging.error(
                f"could not connect to daemon at {port} within {timeout} seconds"
            )
            raise ConnectionRefusedError()
        # Sleep a bit to avoid consuming too much CPU while busy-waiting.
        time.sleep(0.1)


# Stop a daemon.
def _daemon_read_file(filename):
    """Return the text content of filename, closing the file afterwards."""
    with open(filename) as f:
        return f.read()


def _daemon_log_output(name, this, when):
    """Log the daemon's captured stdout and stderr files at debug level."""
    data = _daemon_read_file(this["stdout"])
    logging.debug(f"{name} stdout, {when}: {data!r}")
    data = _daemon_read_file(this["stderr"])
    logging.debug(f"{name} stderr, {when}: {data!r}")


def daemon_stop(ctx, path=None, args=None, name=None):
    """Send SIGTERM to the named daemon and wait until its process is gone.

    The daemon's stdout and stderr files are logged before and after.
    """
    logging.debug(f"Stopping daemon {name}")

    ns = ctx.declare("_daemon")
    logging.debug(f" ns={ns}")
    pid = ns[name]["pid"]
    signo = signal.SIGTERM

    this = ns[name]
    _daemon_log_output(name, this, "before")

    logging.debug(f"Terminating process {pid} with signal {signo}")
    try:
        os.kill(pid, signo)
    except ProcessLookupError:
        logging.warning("Process did not actually exist (anymore?)")

    # Signal 0 only probes for existence; loop until the probe fails.
    # NOTE(review): there is no upper bound on this wait.
    while True:
        try:
            os.kill(pid, 0)
            logging.debug(f"Daemon {name}, pid {pid} still exists")
            time.sleep(1)
        except ProcessLookupError:
            break
    logging.debug(f"Daemon {name} is gone")

    _daemon_log_output(name, this, "after")


def daemon_no_such_process(ctx, args=None):
    """Assert that pgrep finds no process matching args."""
    assert not _daemon_pgrep(args)


def daemon_process_exists(ctx, args=None):
    """Assert that pgrep finds a process matching args."""
    assert _daemon_pgrep(args)


def _daemon_pgrep(pattern):
    """Return True if `pgrep -laf pattern` exits with status 0."""
    logging.info(f"checking if process exists: pattern={pattern}")
    exit_code = subprocess.call(
        ["pgrep", "-laf", pattern], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
    )
    logging.info(f"exit code: {exit_code}")
    return exit_code == 0


def daemon_start_fails_with(ctx, message=None):
    """Assert that the recorded start error contains message (case-insensitive)."""
    daemon = ctx.declare("_daemon")
    error = daemon["_start_error"]
    logging.debug(f"daemon_start_fails_with: error={error!r}")
    logging.debug(f"daemon_start_fails_with: message={message!r}")
    assert message.lower() in error.lower()


def daemon_get_stdout(ctx, name):
    """Return the current content of the named daemon's stdout file."""
    return _daemon_get_output(ctx, name, "stdout")


def daemon_get_stderr(ctx, name):
    """Return the current content of the named daemon's stderr file."""
    return _daemon_get_output(ctx, name, "stderr")


def _daemon_get_output(ctx, name, which):
    """Read the daemon's `which` ("stdout" or "stderr") file and return it."""
    ns = ctx.declare("_daemon")
    this = ns[name]
    filename = this[which]
    data = _daemon_read_file(filename)
    logging.debug(f"Read {which} of daemon {name} from {filename}: {data!r}")
    return data


def daemon_has_produced_output(ctx, name=None):
    """Wait up to 5 seconds for both stdout and stderr to be non-empty.

    Returns silently on timeout as well; it does not assert anything.
    """
    started = time.time()
    timeout = 5.0
    while time.time() < started + timeout:
        stdout = daemon_get_stdout(ctx, name)
        stderr = daemon_get_stderr(ctx, name)
        if stdout and stderr:
            break
        time.sleep(0.1)


def daemon_stdout_is(ctx, name=None, text=None):
    """Assert that the daemon's stdout equals text (escapes in text expanded)."""
    daemon_get_stdout = globals()["daemon_get_stdout"]
    _daemon_output_is(ctx, name, text, daemon_get_stdout)


def daemon_stderr_is(ctx, name=None, text=None):
    """Assert that the daemon's stderr equals text (escapes in text expanded)."""
    daemon_get_stderr = globals()["daemon_get_stderr"]
    _daemon_output_is(ctx, name, text, daemon_get_stderr)


def _daemon_output_is(ctx, name, text, getter):
    """Compare getter(ctx, name) with text after expanding backslash escapes."""
    assert_eq = globals()["assert_eq"]
    # unicode_escape decodes bytes as Latin-1, so encoding as UTF-8 first
    # mangled every non-ASCII character. Encode as Latin-1 and turn anything
    # outside it into \uXXXX escapes, which the decoder turns back.
    text = text.encode("latin-1", "backslashreplace").decode("unicode_escape")
    output = getter(ctx, name)
    assert_eq(text, output)


#----------------------------------------------------------------------------
# This code comes from: lib/runcmd.py

import logging
import os
import re
import shlex
import subprocess


#
# Helper functions.
#

# Get exit code or other stored data about the latest command run by
# runcmd_run.


def _runcmd_get(ctx, name):
    """Return the value stored under name in the "_runcmd" namespace."""
    ns = ctx.declare("_runcmd")
    return ns[name]


def runcmd_get_exit_code(ctx):
    return _runcmd_get(ctx, "exit")


def runcmd_get_stdout(ctx):
    return _runcmd_get(ctx, "stdout")


def runcmd_get_stdout_raw(ctx):
    return _runcmd_get(ctx, "stdout.raw")


def runcmd_get_stderr(ctx):
    return _runcmd_get(ctx, "stderr")


def runcmd_get_stderr_raw(ctx):
    return _runcmd_get(ctx, "stderr.raw")


def runcmd_get_argv(ctx):
    return _runcmd_get(ctx, "argv")


# Run a command, given an argv and other arguments for subprocess.Popen.
#
# This is meant to be a helper function, not bound directly to a step. The
# stdout, stderr, and exit code are stored in the "_runcmd" namespace in the
# ctx context.
def runcmd_run(ctx, argv, **kwargs):
    """Run argv, capturing output and exit code into the "_runcmd" namespace.

    An `env` keyword argument, if given, is merged on top of os.environ.
    The remaining kwargs are passed to subprocess.Popen; stdout and stderr
    are always overridden with pipes.
    """
    log_value = globals()["log_value"]

    ns = ctx.declare("_runcmd")

    # The Subplot Python template empties os.environ at startup, modulo a small
    # number of variables with carefully chosen values. Here, we don't need to
    # care about what those variables are, but we do need to not overwrite
    # them, so we just add anything in the env keyword argument, if any, to
    # os.environ.
    env = dict(os.environ)
    env.update(kwargs.pop("env", None) or {})

    pp = ns.get("path-prefix")
    if pp:
        # PATH may be absent from the environment; then the prefix is all
        # there is.
        if "PATH" in env:
            env["PATH"] = pp + ":" + env["PATH"]
        else:
            env["PATH"] = pp

    kwargs["stdout"] = subprocess.PIPE
    kwargs["stderr"] = subprocess.PIPE

    logging.debug("runcmd_run: running command")
    log_value("argv", 1, dict(enumerate(argv)))
    log_value("env", 1, env)
    log_value("kwargs:", 1, kwargs)

    p = subprocess.Popen(argv, env=env, **kwargs)
    stdout, stderr = p.communicate("")
    ns["argv"] = argv
    ns["stdout.raw"] = stdout
    ns["stderr.raw"] = stderr
    ns["stdout"] = stdout.decode("utf-8")
    ns["stderr"] = stderr.decode("utf-8")
    ns["exit"] = p.returncode
    log_value("ns", 1, ns.as_dict())


# Step: prepend srcdir to PATH whenever runcmd runs a command.
def runcmd_helper_srcdir_path(ctx):
    srcdir = globals()["srcdir"]
    runcmd_prepend_to_path(ctx, srcdir)


# Step: This creates a helper script.
def runcmd_helper_script(ctx, filename=None):
    get_file = globals()["get_file"]
    with open(filename, "wb") as f:
        f.write(get_file(filename))


#
# Step functions for running commands.
#


def runcmd_prepend_to_path(ctx, dirname=None):
    """Add dirname to the colon-separated prefix that runcmd_run puts on PATH.

    New directories go after the ones already in the prefix.
    """
    ns = ctx.declare("_runcmd")
    pp = ns.get("path-prefix", "")
    if pp:
        pp = f"{pp}:{dirname}"
    else:
        pp = dirname
    ns["path-prefix"] = pp


def runcmd_step(ctx, argv0=None, args=None):
    """Run a command and assert that it exited with status zero."""
    runcmd_try_to_run(ctx, argv0=argv0, args=args)
    runcmd_exit_code_is_zero(ctx)


def runcmd_step_in(ctx, dirname=None, argv0=None, args=None):
    """Run a command in dirname and assert that it exited with status zero."""
    runcmd_try_to_run_in(ctx, dirname=dirname, argv0=argv0, args=args)
    runcmd_exit_code_is_zero(ctx)


def runcmd_try_to_run(ctx, argv0=None, args=None):
    """Run a command in the current directory without checking its exit code."""
    runcmd_try_to_run_in(ctx, dirname=None, argv0=argv0, args=args)


def runcmd_try_to_run_in(ctx, dirname=None, argv0=None, args=None):
    """Run argv0 with shell-split args in dirname; do not check the exit code."""
    # NOTE(review): no shell is involved here, so shlex.quote() would add
    # literal quote characters to an argv0 that contains shell-special
    # characters. Left unchanged; confirm whether that is intended.
    argv = [shlex.quote(argv0)] + shlex.split(args)
    runcmd_run(ctx, argv, cwd=dirname)


#
# Step functions for examining exit codes.
#


def runcmd_exit_code_is_zero(ctx):
    """Assert that the latest command exited with status 0."""
    runcmd_exit_code_is(ctx, exit=0)


def runcmd_exit_code_is(ctx, exit=None):
    """Assert that the latest command's exit code equals int(exit)."""
    assert_eq = globals()["assert_eq"]
    assert_eq(runcmd_get_exit_code(ctx), int(exit))


def runcmd_exit_code_is_nonzero(ctx):
    """Assert that the latest command exited with a non-zero status."""
    runcmd_exit_code_is_not(ctx, exit=0)


def runcmd_exit_code_is_not(ctx, exit=None):
    """Assert that the latest command's exit code differs from int(exit)."""
    assert_ne = globals()["assert_ne"]
    assert_ne(runcmd_get_exit_code(ctx), int(exit))


#
# Step functions and helpers for examining output in various ways.
#


def runcmd_stdout_is(ctx, text=None):
    _runcmd_output_is(runcmd_get_stdout(ctx), text)


def runcmd_stdout_isnt(ctx, text=None):
    _runcmd_output_isnt(runcmd_get_stdout(ctx), text)


def runcmd_stderr_is(ctx, text=None):
    _runcmd_output_is(runcmd_get_stderr(ctx), text)


def runcmd_stderr_isnt(ctx, text=None):
    _runcmd_output_isnt(runcmd_get_stderr(ctx), text)


def _runcmd_unescape(text):
    """Expand backslash escapes (\\n, \\t, ...) in text, keeping non-ASCII intact.

    unicode_escape decodes its input bytes as Latin-1, so feeding it UTF-8
    bytes corrupted every non-ASCII character. Encoding as Latin-1 with
    backslashreplace keeps Latin-1 characters as single bytes and rewrites
    everything else as \\uXXXX escapes that the decoder turns back.
    """
    return text.encode("latin-1", "backslashreplace").decode("unicode_escape")


def _runcmd_log_comparison(label, actual, wanted):
    """Log the actual and wanted text of an output comparison."""
    log_lines = globals()["log_lines"]
    indent = " " * 4
    logging.debug(label)
    logging.debug(" actual:")
    log_lines(indent, actual)
    logging.debug(" wanted:")
    log_lines(indent, wanted)


def _runcmd_output_is(actual, wanted):
    """Assert that actual equals wanted after escape expansion."""
    assert_eq = globals()["assert_eq"]
    wanted = _runcmd_unescape(wanted)
    _runcmd_log_comparison("_runcmd_output_is:", actual, wanted)
    assert_eq(actual, wanted)


def _runcmd_output_isnt(actual, wanted):
    """Assert that actual differs from wanted after escape expansion."""
    assert_ne = globals()["assert_ne"]
    wanted = _runcmd_unescape(wanted)
    _runcmd_log_comparison("_runcmd_output_isnt:", actual, wanted)
    assert_ne(actual, wanted)


def runcmd_stdout_contains(ctx, text=None):
    _runcmd_output_contains(runcmd_get_stdout(ctx), text)


def runcmd_stdout_doesnt_contain(ctx, text=None):
    _runcmd_output_doesnt_contain(runcmd_get_stdout(ctx), text)


def runcmd_stderr_contains(ctx, text=None):
    _runcmd_output_contains(runcmd_get_stderr(ctx), text)


def runcmd_stderr_doesnt_contain(ctx, text=None):
    _runcmd_output_doesnt_contain(runcmd_get_stderr(ctx), text)


def _runcmd_output_contains(actual, wanted):
    """Assert that wanted (escapes expanded) is a substring of actual."""
    assert_eq = globals()["assert_eq"]
    wanted = _runcmd_unescape(wanted)
    _runcmd_log_comparison("_runcmd_output_contains:", actual, wanted)
    assert_eq(wanted in actual, True)


def _runcmd_output_doesnt_contain(actual, wanted):
    """Assert that wanted (escapes expanded) is not a substring of actual."""
    assert_ne = globals()["assert_ne"]
    wanted = _runcmd_unescape(wanted)
    _runcmd_log_comparison("_runcmd_output_doesnt_contain:", actual, wanted)
    assert_ne(wanted in actual, True)


def runcmd_stdout_matches_regex(ctx, regex=None):
    _runcmd_output_matches_regex(runcmd_get_stdout(ctx), regex)


def runcmd_stdout_doesnt_match_regex(ctx, regex=None):
    _runcmd_output_doesnt_match_regex(runcmd_get_stdout(ctx), regex)


def runcmd_stderr_matches_regex(ctx, regex=None):
    _runcmd_output_matches_regex(runcmd_get_stderr(ctx), regex)


def runcmd_stderr_doesnt_match_regex(ctx, regex=None):
    _runcmd_output_doesnt_match_regex(runcmd_get_stderr(ctx), regex)


def _runcmd_output_matches_regex(actual, regex):
    """Assert that regex matches somewhere in actual (re.search)."""
    assert_ne = globals()["assert_ne"]
    log_lines = globals()["log_lines"]
    indent = " " * 4

    r = re.compile(regex)
    m = r.search(actual)

    logging.debug("_runcmd_output_matches_regex:")
    logging.debug(f" actual: {actual!r}")
    log_lines(indent, actual)
    logging.debug(f" regex: {regex!r}")
    logging.debug(f" match: {m}")

    assert_ne(m, None)


def _runcmd_output_doesnt_match_regex(actual, regex):
    """Assert that regex matches nowhere in actual (re.search)."""
    assert_eq = globals()["assert_eq"]
    log_lines = globals()["log_lines"]
    indent = " " * 4

    r = re.compile(regex)
    m = r.search(actual)

    logging.debug("_runcmd_output_doesnt_match_regex:")
    logging.debug(f" actual: {actual!r}")
    log_lines(indent, actual)
    logging.debug(f" regex: {regex!r}")
    logging.debug(f" match: {m}")

    assert_eq(m, None)


#############################################################################
# Scaffolding for generated test program.
#

import logging
import re


# Store context between steps.
class Context:
    """Per-scenario state: plain variables plus named NameSpace objects."""

    def __init__(self):
        self._vars = {}
        self._ns = {}

    def as_dict(self):
        """Return a shallow copy of the variables (namespaces excluded)."""
        return dict(self._vars)

    def get(self, key, default=None):
        return self._vars.get(key, default)

    def __getitem__(self, key):
        return self._vars[key]

    def __setitem__(self, key, value):
        # logging.debug("Context: key {!r} set to {!r}".format(key, value))
        self._vars[key] = value

    def keys(self):
        return self._vars.keys()

    def __contains__(self, key):
        return key in self._vars

    def __delitem__(self, key):
        del self._vars[key]

    def __repr__(self):
        return repr({"vars": self._vars, "namespaces": self._ns})

    def declare(self, name):
        """Return the NameSpace called name, creating it on first use."""
        if name not in self._ns:
            self._ns[name] = NameSpace(name)
        return self._ns[name]

    def remember_value(self, name, value):
        """Store value under name; raise KeyError if name is already set."""
        ns = self.declare("_values")
        if name in ns:
            raise KeyError(name)
        ns[name] = value

    def recall_value(self, name):
        """Return the remembered value for name; raise KeyError if unknown."""
        ns = self.declare("_values")
        if name not in ns:
            raise KeyError(name)
        return ns[name]

    def expand_values(self, pattern):
        """Replace each ${name} in pattern with its remembered value.

        A placeholder preceded by another "$" is left untouched. Raises
        KeyError for an empty name ("${}") or a name that was never
        remembered.
        """
        # The expression in the file had been truncated to "(?\S*)\}", which
        # does not compile and has no "name" group. It is reconstructed here
        # from the group name and closing brace the code relies on.
        # NOTE(review): the "${...}" prefix and "$$" escape are assumed —
        # confirm against the template this file was generated from.
        parts = []
        while pattern:
            m = re.search(r"(?<!\$)\$\{(?P<name>\S*?)\}", pattern)
            if not m:
                parts.append(pattern)
                break
            name = m.group("name")
            if not name:
                raise KeyError("empty name in expansion")
            # Keep the literal text in front of the placeholder.
            parts.append(pattern[: m.start()])
            parts.append(self.recall_value(name))
            pattern = pattern[m.end() :]
        return "".join(parts)


class NameSpace:
    """A named dictionary-like store owned by a Context."""

    def __init__(self, name):
        self.name = name
        self._dict = {}

    def as_dict(self):
        """Return a shallow copy of the stored items."""
        return dict(self._dict)

    def get(self, key, default=None):
        """Return the value for key.

        If key is missing and default is None, return None without storing
        anything. If key is missing and default is not None, store default
        under key and return it.
        """
        if key not in self._dict:
            if default is None:
                return None
            self._dict[key] = default
        return self._dict[key]

    def __setitem__(self, key, value):
        self._dict[key] = value

    def __getitem__(self, key):
        return self._dict[key]

    def keys(self):
        return self._dict.keys()

    def __contains__(self, key):
        return key in self._dict

    def __delitem__(self, key):
        del self._dict[key]

    def __repr__(self):
        return repr(self._dict)


# Decode a base64 encoded string. Result is binary or unicode string.
import base64


def decode_bytes(s):
    """Decode base64 text s and return the raw bytes."""
    return base64.b64decode(s)


def decode_str(s):
    """Decode base64 text s and return it as a str (default UTF-8 decoding)."""
    return base64.b64decode(s).decode()


# Retrieve an embedded test data file using filename.


class Files:
    """In-memory store of embedded test data files, keyed by filename."""

    def __init__(self):
        self._files = {}

    def set(self, filename, content):
        self._files[filename] = content

    def get(self, filename):
        """Return the stored content; raises KeyError for unknown filenames."""
        return self._files[filename]


_files = Files()


def store_file(filename, content):
    _files.set(filename, content)


def get_file(filename):
    return _files.get(filename)


# Check two values for equality and give error if they are not equal
def assert_eq(a, b):
    assert a == b, "expected %r == %r" % (a, b)


# Check two values for inequality and give error if they are equal
def assert_ne(a, b):
    assert a != b, "expected %r != %r" % (a, b)


# Check that two dict values are equal.
def assert_dict_eq(a, b):
    """Assert that dicts a and b have the same keys and equal values.

    Values must have the same type. Lists are compared ignoring order,
    nested dicts are compared recursively, and everything else with ==.
    """
    assert isinstance(a, dict)
    assert isinstance(b, dict)
    for key in a:
        assert key in b, f"expected {key} in both dicts"
        av = a[key]
        bv = b[key]
        assert_eq(type(av), type(bv))
        if isinstance(av, list):
            assert_eq(sorted(av), sorted(bv))
        elif isinstance(av, dict):
            assert_dict_eq(av, bv)
        else:
            # Previously non-list values were never compared at all.
            assert_eq(av, bv)
    for key in b:
        assert key in a, f"expected {key} in both dicts"


import logging
import os
import tempfile


#############################################################################
# Code to implement the scenarios.
class Step:
    """One scenario step: a function, optional cleanup, and keyword arguments."""

    def __init__(self):
        self._kind = None
        self._text = None
        self._args = {}
        self._function = None
        self._cleanup = None

    def set_kind(self, kind):
        self._kind = kind

    def set_text(self, text):
        self._text = text

    def set_arg(self, name, value):
        self._args[name] = value

    def set_function(self, function):
        self._function = function

    def set_cleanup(self, cleanup):
        self._cleanup = cleanup

    def do(self, ctx):
        """Announce the step, then call its function with ctx and the args."""
        print(" step: {} {}".format(self._kind, self._text))
        logging.info("step: {} {}".format(self._kind, self._text))
        self._function(ctx, **self._args)

    def cleanup(self, ctx):
        """Call the cleanup function, if one was set, with ctx and the args."""
        if self._cleanup:
            print(" cleanup: {} {}".format(self._kind, self._text))
            logging.info("cleanup: {} {}".format(self._kind, self._text))
            self._cleanup(ctx, **self._args)


class Scenario:
    """An ordered list of steps sharing one context."""

    def __init__(self, ctx):
        self._title = None
        self._steps = []
        self._ctx = ctx
        self._logged_env = False

    def get_title(self):
        return self._title

    def set_title(self, title):
        self._title = title

    def append_step(self, step):
        self._steps.append(step)

    def run(self, datadir, extra_env):
        """Run all steps in a fresh directory under datadir.

        Steps that completed are cleaned up in reverse order, both after
        success and after a step raised an Exception (which is re-raised).
        """
        print("scenario: {}".format(self._title))
        logging.info("Scenario: {}".format(self._title))

        scendir = tempfile.mkdtemp(dir=datadir)
        os.chdir(scendir)
        self._set_environment_variables_to(scendir, extra_env)

        done = []
        ctx = self._ctx
        try:
            for step in self._steps:
                step.do(ctx)
                done.append(step)
        except Exception as e:
            logging.error(str(e), exc_info=True)
            for step in reversed(done):
                step.cleanup(ctx)
            raise
        for step in reversed(done):
            step.cleanup(ctx)

    def _set_environment_variables_to(self, scendir, extra_env):
        """Reset os.environ to a minimal set plus extra_env.

        HOME and TMPDIR point at scendir. The environment is logged only
        the first time this is called on the scenario.
        """
        log_value = globals()["log_value"]

        minimal = {
            "PATH": "/bin:/usr/bin",
            "SHELL": "/bin/sh",
            "HOME": scendir,
            "TMPDIR": scendir,
        }

        os.environ.clear()
        os.environ.update(minimal)
        os.environ.update(extra_env)
        if not self._logged_env:
            self._logged_env = True
            log_value("extra_env", 0, dict(extra_env))
            log_value("os.environ", 0, dict(os.environ))


import argparse
import logging
import os
import random
import shutil
import tarfile
import tempfile


class MultilineFormatter(logging.Formatter):
    """Formatter that indents the continuation lines of multi-line records."""

    def format(self, record):
        s = super().format(record)
        lines = s.splitlines()
        if not lines:
            # Empty message: nothing to indent (previously an IndexError).
            return s
        first, rest = lines[0], lines[1:]
        # Join the first line to the rest with a newline too; it used to be
        # glued directly onto the second line.
        return "\n".join([first] + [" %s" % line for line in rest])


def indent(n):
    """Return the indentation string for nesting level n."""
    return " " * n


def log_value(msg, level, v):
    """Log v at debug level under label msg, indented by level.

    Multi-line strings, non-empty dicts, and non-empty lists are expanded
    over several log lines; everything else is logged as its repr.
    """
    if is_multiline_string(v):
        logging.debug(f"{indent(level)}{msg}:")
        log_lines(indent(level + 1), v)
    elif isinstance(v, dict) and v:
        # Only non-empty dictionaries
        logging.debug(f"{indent(level)}{msg}:")
        for k in sorted(v.keys()):
            log_value(f"{k!r}", level + 1, v[k])
    elif isinstance(v, list) and v:
        # Only non-empty lists
        logging.debug(f"{indent(level)}{msg}:")
        for i, x in enumerate(v):
            log_value(f"{i}", level + 1, x)
    else:
        logging.debug(f"{indent(level)}{msg}: {v!r}")


def is_multiline_string(v):
    """Return True if v is a str or bytes value containing a newline."""
    if isinstance(v, str):
        return "\n" in v
    if isinstance(v, bytes):
        return b"\n" in v
    return False


def log_lines(prefix, v):
    """Log the str or bytes value v line by line, each line as its repr."""
    if isinstance(v, str):
        nl = "\n"
    else:
        nl = b"\n"
    if nl in v:
        for line in v.splitlines(keepends=True):
            logging.debug(f"{prefix}{line!r}")
    else:
        logging.debug(f"{prefix}{v!r}")


# Remember where we started from. The step functions may need to refer
# to files there.
srcdir = os.getcwd()
print("srcdir", srcdir)

# Create a new temporary directory and chdir there. This allows step
# functions to create new files in the current working directory
# without having to be so careful.
_datadir = tempfile.mkdtemp()
print("datadir", _datadir)
os.chdir(_datadir)


def parse_command_line():
    """Parse sys.argv: --log, repeatable --env, --save-on-failure, patterns."""
    p = argparse.ArgumentParser()
    p.add_argument("--log")
    p.add_argument("--env", action="append", default=[])
    p.add_argument("--save-on-failure")
    p.add_argument("patterns", nargs="*")
    return p.parse_args()


def setup_logging(args):
    """Send debug-level logging to args.log, or discard it if no file is given.

    A relative log file name is resolved against srcdir.
    """
    if args.log:
        fmt = "%(asctime)s %(levelname)s %(message)s"
        datefmt = "%Y-%m-%d %H:%M:%S"
        formatter = MultilineFormatter(fmt, datefmt)

        filename = os.path.abspath(os.path.join(srcdir, args.log))
        handler = logging.FileHandler(filename)
        handler.setFormatter(formatter)
    else:
        handler = logging.NullHandler()

    logger = logging.getLogger()
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)


def save_directory(dirname, tarname):
    """Archive dirname into the tar file tarname under the name "datadir"."""
    print("tarname", tarname)
    logging.info("Saving {} to {}".format(dirname, tarname))
    # The context manager closes the archive even if adding files fails.
    with tarfile.open(tarname, "w") as tar:
        tar.add(dirname, arcname="datadir")


def main(scenarios):
    """Run the selected scenarios and remove the data directory on success.

    With no patterns on the command line all scenarios run, in random
    order; otherwise only those whose title contains a pattern
    (case-insensitively). On failure the exception is re-raised after the
    data directory has optionally been saved with --save-on-failure.

    Raises ValueError if an --env argument is not of the form NAME=VALUE.
    """
    args = parse_command_line()
    setup_logging(args)
    logging.info("Test program starts")

    logging.info("patterns: {}".format(args.patterns))
    if len(args.patterns) == 0:
        logging.info("Executing all scenarios")
        todo = list(scenarios)
        random.shuffle(todo)
    else:
        logging.info("Executing requested scenarios only: {}".format(args.patterns))
        patterns = [arg.lower() for arg in args.patterns]
        todo = [
            scen
            for scen in scenarios
            if any(pattern in scen.get_title().lower() for pattern in patterns)
        ]

    extra_env = {}
    for env in args.env:
        name, sep, value = env.partition("=")
        if not sep:
            # Same exception type as the bare tuple unpacking raised, but
            # with a message that names the offending argument.
            raise ValueError(f"--env argument must be NAME=VALUE: {env!r}")
        extra_env[name] = value

    try:
        for scen in todo:
            scen.run(_datadir, extra_env)
    except Exception as e:
        logging.error(str(e), exc_info=True)
        if args.save_on_failure:
            print(args.save_on_failure)
            filename = os.path.abspath(os.path.join(srcdir, args.save_on_failure))
            print(filename)
            save_directory(_datadir, filename)
        raise

    shutil.rmtree(_datadir)
    print("OK, all scenarios finished successfully")
    logging.info("OK, all scenarios finished successfully")
#############################################################################
# Test data files that were embedded in the source document. Base64
# encoding is used to allow arbitrary data.


#############################################################################
# Classes for individual scenarios.


def _new_step(kind, text_b64, function, cleanup=None, args=()):
    """Build a Step from base64-encoded text and (name, value) argument pairs."""
    step = Step()
    step.set_kind(kind)
    step.set_text(decode_str(text_b64))
    step.set_function(function)
    if cleanup is not None:
        step.set_cleanup(cleanup)
    for name_b64, value_b64 in args:
        step.set_arg(decode_str(name_b64), decode_str(value_b64))
    return step


def _new_scenario(title_b64, steps):
    """Build a Scenario with a fresh Context, a decoded title and the steps."""
    scenario = Scenario(Context())
    scenario.set_title(decode_str(title_b64))
    for step in steps:
        scenario.append_step(step)
    return scenario


def _given_server():
    """Step "given server": start the server, stop it again on cleanup."""
    return _new_step('given', 'c2VydmVy', start_server, cleanup=stop_server)


def _when_client(text_b64, args_b64):
    """Step "when I run max-client.py ...": argv0 is fixed, args vary."""
    return _new_step(
        'when',
        text_b64,
        runcmd_step,
        args=(
            ('YXJndjA=', 'bWF4LWNsaWVudC5weQ=='),
            ('YXJncw==', args_b64),
        ),
    )


def _then_answer(text_b64, index_b64):
    """Step "then answer is ...": checks stdout against the given index."""
    return _new_step('then', text_b64, answer_is, args=(('aW5kZXg=', index_b64),))


#----------------------------------------------------------------------------
# Scenario: Find max of a list of one
class Scenario_1():
    def __init__(self):
        self._scenario = _new_scenario(
            'RmluZCBtYXggb2YgYSBsaXN0IG9mIG9uZQ==',
            [
                # Step: server
                _given_server(),
                # Step: I run max-client.py 1
                _when_client('SSBydW4gbWF4LWNsaWVudC5weSAx', 'IDE='),
                # Step: answer is 1
                _then_answer('YW5zd2VyIGlzIDE=', 'MQ=='),
            ],
        )

    def get_title(self):
        return self._scenario.get_title()

    def run(self, datadir, extra_env):
        self._scenario.run(datadir, extra_env)


#----------------------------------------------------------------------------
# Scenario: Find max of a list of two
class Scenario_2():
    def __init__(self):
        self._scenario = _new_scenario(
            'RmluZCBtYXggb2YgYSBsaXN0IG9mIHR3bw==',
            [
                # Step: server
                _given_server(),
                # Step: I run max-client.py 5 5
                _when_client('SSBydW4gbWF4LWNsaWVudC5weSA1IDU=', 'IDUgNQ=='),
                # Step: answer is 5
                _then_answer('YW5zd2VyIGlzIDU=', 'NQ=='),
                # Step: I run max-client.py 5 6
                _when_client('SSBydW4gbWF4LWNsaWVudC5weSA1IDY=', 'IDUgNg=='),
                # Step: answer is 6
                _then_answer('YW5zd2VyIGlzIDY=', 'Ng=='),
                # Step: I run max-client.py 6 5
                _when_client('SSBydW4gbWF4LWNsaWVudC5weSA2IDU=', 'IDYgNQ=='),
                # Step: answer is 6
                _then_answer('YW5zd2VyIGlzIDY=', 'Ng=='),
            ],
        )

    def get_title(self):
        return self._scenario.get_title()

    def run(self, datadir, extra_env):
        self._scenario.run(datadir, extra_env)


#----------------------------------------------------------------------------
# Scenario: Find max of a list of three
class Scenario_3():
    def __init__(self):
        self._scenario = _new_scenario(
            'RmluZCBtYXggb2YgYSBsaXN0IG9mIHRocmVl',
            [
                # Step: server
                _given_server(),
                # Step: I run max-client.py 5 5 5
                _when_client('SSBydW4gbWF4LWNsaWVudC5weSA1IDUgNQ==', 'IDUgNSA1'),
                # Step: answer is 5
                _then_answer('YW5zd2VyIGlzIDU=', 'NQ=='),
                # Step: I run max-client.py 5 5 6
                _when_client('SSBydW4gbWF4LWNsaWVudC5weSA1IDUgNg==', 'IDUgNSA2'),
                # Step: answer is 6
                _then_answer('YW5zd2VyIGlzIDY=', 'Ng=='),
                # Step: I run max-client.py 5 6 5
                _when_client('SSBydW4gbWF4LWNsaWVudC5weSA1IDYgNQ==', 'IDUgNiA1'),
                # Step: answer is 6
                _then_answer('YW5zd2VyIGlzIDY=', 'Ng=='),
                # Step: I run max-client.py 6 5 5
                _when_client('SSBydW4gbWF4LWNsaWVudC5weSA2IDUgNQ==', 'IDYgNSA1'),
                # Step: answer is 6
                _then_answer('YW5zd2VyIGlzIDY=', 'Ng=='),
                # Step: I run max-client.py 5 6 7
                _when_client('SSBydW4gbWF4LWNsaWVudC5weSA1IDYgNw==', 'IDUgNiA3'),
                # Step: answer is 7
                _then_answer('YW5zd2VyIGlzIDc=', 'Nw=='),
                # Step: I run max-client.py 5 7 6
                _when_client('SSBydW4gbWF4LWNsaWVudC5weSA1IDcgNg==', 'IDUgNyA2'),
                # Step: answer is 7
                _then_answer('YW5zd2VyIGlzIDc=', 'Nw=='),
                # Step: I run max-client.py 6 5 7
                _when_client('SSBydW4gbWF4LWNsaWVudC5weSA2IDUgNw==', 'IDYgNSA3'),
                # Step: answer is 7
                _then_answer('YW5zd2VyIGlzIDc=', 'Nw=='),
                # Step: I run max-client.py 6 7 5
                _when_client('SSBydW4gbWF4LWNsaWVudC5weSA2IDcgNQ==', 'IDYgNyA1'),
                # Step: answer is 7
                _then_answer('YW5zd2VyIGlzIDc=', 'Nw=='),
                # Step: I run max-client.py 7 5 6
                _when_client('SSBydW4gbWF4LWNsaWVudC5weSA3IDUgNg==', 'IDcgNSA2'),
                # Step: answer is 7
                _then_answer('YW5zd2VyIGlzIDc=', 'Nw=='),
                # Step: I run max-client.py 7 6 5
                _when_client('SSBydW4gbWF4LWNsaWVudC5weSA3IDYgNQ==', 'IDcgNiA1'),
                # Step: answer is 7
                _then_answer('YW5zd2VyIGlzIDc=', 'Nw=='),
            ],
        )

    def get_title(self):
        return self._scenario.get_title()

    def run(self, datadir, extra_env):
        self._scenario.run(datadir, extra_env)


_scenarios = {
    Scenario_1(),
    Scenario_2(),
    Scenario_3(),
}


#############################################################################
# Call main function and clean up.
main(_scenarios)