diff options
Diffstat (limited to 'test.py')
-rw-r--r-- | test.py | 1513 |
1 files changed, 1513 insertions, 0 deletions
@@ -0,0 +1,1513 @@ +############################################################################# +# Functions that implement steps. + + +#---------------------------------------------------------------------------- +# This code comes from: oso.py + +import os + + +def start_server(ctx): + # Declare Subplot library names. In the generated Python program, the + # libraries will be included and can just be used via names, but to placate + # automated checks that only see this file, get the names from globals() at + # runtime. + daemon_start_on_port = globals()["daemon_start_on_port"] + runcmd_helper_srcdir_path = globals()["runcmd_helper_srcdir_path"] + srcdir = globals()["srcdir"] + + # This installs srcdir in $PATH so that we can run the client and server + # easily. + runcmd_helper_srcdir_path(ctx) + + # Start server. + server = os.path.join(srcdir, "server.py") + daemon_start_on_port(ctx, path=server, args="", name="server", port=5000) + + +def stop_server(ctx): + daemon_stop = globals()["daemon_stop"] + daemon_stop(ctx, name="server") + + +def answer_is(ctx, index): + assert_eq = globals()["assert_eq"] + runcmd_get_stdout = globals()["runcmd_get_stdout"] + stdout = runcmd_get_stdout(ctx) + assert_eq(stdout.strip(), index) + + +#---------------------------------------------------------------------------- +# This code comes from: lib/daemon.py + +import logging +import os +import signal +import socket +import subprocess +import time + + +# A helper function for testing lib/daemon itself. +def _daemon_shell_script(ctx, filename=None): + get_file = globals()["get_file"] + data = get_file(filename) + with open(filename, "wb") as f: + f.write(data) + os.chmod(filename, 0o755) + + +# Start a daemon that will open a port on localhost. +def daemon_start_on_port(ctx, path=None, args=None, name=None, port=None): + _daemon_start(ctx, path=path, args=args, name=name) + daemon_wait_for_port("localhost", port) + + +# Start a daemon after a little wait. This is used only for testing the +# port-waiting code. +def _daemon_start_soonish(ctx, path=None, args=None, name=None, port=None): + _daemon_start(ctx, path=os.path.abspath(path), args=args, name=name) + daemon = ctx.declare("_daemon") + + # Store the PID of the process we just started so that _daemon_stop_soonish + # can kill it during the cleanup phase. This works around the Subplot + # Python template not giving the step captures to cleanup functions. Note + # that this code assume at most one _soonish function is called. + daemon["_soonish"] = daemon[name]["pid"] + + try: + daemon_wait_for_port("localhost", port) + except Exception as e: + daemon["_start_error"] = repr(e) + + logging.info("pgrep: %r", _daemon_pgrep(path)) + + +def _daemon_stop_soonish(ctx, path=None, args=None, name=None, port=None): + ns = ctx.declare("_daemon") + pid = ns["_soonish"] + logging.debug(f"Stopping soonishly-started daemon, {pid}") + signo = signal.SIGKILL + try: + os.kill(pid, signo) + except ProcessLookupError: + logging.warning("Process did not actually exist (anymore?)") + + +# Start a daeamon, get its PID. Don't wait for a port or anything. This is +# meant for background processes that don't have port. Useful for testing the +# lib/daemon library of Subplot, but not much else. +def _daemon_start(ctx, path=None, args=None, name=None): + runcmd_run = globals()["runcmd_run"] + runcmd_exit_code_is = globals()["runcmd_exit_code_is"] + runcmd_get_exit_code = globals()["runcmd_get_exit_code"] + runcmd_get_stderr = globals()["runcmd_get_stderr"] + runcmd_prepend_to_path = globals()["runcmd_prepend_to_path"] + + path = os.path.abspath(path) + argv = [path] + args.split() + + logging.debug(f"Starting daemon {name}") + logging.debug(f" ctx={ctx.as_dict()}") + logging.debug(f" name={name}") + logging.debug(f" path={path}") + logging.debug(f" args={args}") + logging.debug(f" argv={argv}") + + ns = ctx.declare("_daemon") + + this = ns[name] = { + "pid-file": f"{name}.pid", + "stderr": f"{name}.stderr", + "stdout": f"{name}.stdout", + } + + # Debian installs `daemonize` to /usr/sbin, which isn't part of the minimal + # environment that Subplot sets up. So we add /usr/sbin to the PATH. + runcmd_prepend_to_path(ctx, "/usr/sbin") + runcmd_run( + ctx, + [ + "daemonize", + "-c", + os.getcwd(), + "-p", + this["pid-file"], + "-e", + this["stderr"], + "-o", + this["stdout"], + ] + + argv, + ) + + # Check that daemonize has exited OK. If it hasn't, it didn't start the + # background process at all. If so, log the stderr in case there was + # something useful there for debugging. + exit = runcmd_get_exit_code(ctx) + if exit != 0: + stderr = runcmd_get_stderr(ctx) + logging.error(f"daemon {name} stderr: {stderr}") + runcmd_exit_code_is(ctx, 0) + + # Get the pid of the background process, from the pid file created by + # daemonize. We don't need to wait for it, since we know daemonize already + # exited. If it isn't there now, it's won't appear later. + if not os.path.exists(this["pid-file"]): + raise Exception("daemonize didn't create a PID file") + + this["pid"] = _daemon_wait_for_pid(this["pid-file"], 10.0) + + logging.debug(f"Started daemon {name}") + logging.debug(f" pid={this['pid']}") + logging.debug(f" ctx={ctx.as_dict()}") + + +def _daemon_wait_for_pid(filename, timeout): + start = time.time() + while time.time() < start + timeout: + with open(filename) as f: + data = f.read().strip() + if data: + return int(data) + raise Exception("daemonize created a PID file without a PID") + + +def daemon_wait_for_port(host, port, timeout=5.0): + addr = (host, port) + until = time.time() + timeout + while True: + try: + s = socket.create_connection(addr, timeout=timeout) + s.close() + return + except socket.timeout: + logging.error( + f"daemon did not respond at port {port} within {timeout} seconds" + ) + raise + except socket.error as e: + logging.info(f"could not connect to daemon at {port}: {e}") + pass + if time.time() >= until: + logging.error( + f"could not connect to daemon at {port} within {timeout} seconds" + ) + raise ConnectionRefusedError() + # Sleep a bit to avoid consuming too much CPU while busy-waiting. + time.sleep(0.1) + + +# Stop a daemon. +def daemon_stop(ctx, path=None, args=None, name=None): + logging.debug(f"Stopping daemon {name}") + + ns = ctx.declare("_daemon") + logging.debug(f" ns={ns}") + pid = ns[name]["pid"] + signo = signal.SIGTERM + + this = ns[name] + data = open(this["stdout"]).read() + logging.debug(f"{name} stdout, before: {data!r}") + data = open(this["stderr"]).read() + logging.debug(f"{name} stderr, before: {data!r}") + + logging.debug(f"Terminating process {pid} with signal {signo}") + try: + os.kill(pid, signo) + except ProcessLookupError: + logging.warning("Process did not actually exist (anymore?)") + + while True: + try: + os.kill(pid, 0) + logging.debug(f"Daemon {name}, pid {pid} still exists") + time.sleep(1) + except ProcessLookupError: + break + logging.debug(f"Daemon {name} is gone") + + data = open(this["stdout"]).read() + logging.debug(f"{name} stdout, after: {data!r}") + data = open(this["stderr"]).read() + logging.debug(f"{name} stderr, after: {data!r}") + + +def daemon_no_such_process(ctx, args=None): + assert not _daemon_pgrep(args) + + +def daemon_process_exists(ctx, args=None): + assert _daemon_pgrep(args) + + +def _daemon_pgrep(pattern): + logging.info(f"checking if process exists: pattern={pattern}") + exit = subprocess.call( + ["pgrep", "-laf", pattern], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL + ) + logging.info(f"exit code: {exit}") + return exit == 0 + + +def daemon_start_fails_with(ctx, message=None): + daemon = ctx.declare("_daemon") + error = daemon["_start_error"] + logging.debug(f"daemon_start_fails_with: error={error!r}") + logging.debug(f"daemon_start_fails_with: message={message!r}") + assert message.lower() in error.lower() + + +def daemon_get_stdout(ctx, name): + return _daemon_get_output(ctx, name, "stdout") + + +def daemon_get_stderr(ctx, name): + return _daemon_get_output(ctx, name, "stderr") + + +def _daemon_get_output(ctx, name, which): + ns = ctx.declare("_daemon") + this = ns[name] + filename = this[which] + data = open(filename).read() + logging.debug(f"Read {which} of daemon {name} from {filename}: {data!r}") + return data + + +def daemon_has_produced_output(ctx, name=None): + started = time.time() + timeout = 5.0 + while time.time() < started + timeout: + stdout = daemon_get_stdout(ctx, name) + stderr = daemon_get_stderr(ctx, name) + if stdout and stderr: + break + time.sleep(0.1) + + +def daemon_stdout_is(ctx, name=None, text=None): + daemon_get_stdout = globals()["daemon_get_stdout"] + _daemon_output_is(ctx, name, text, daemon_get_stdout) + + +def daemon_stderr_is(ctx, name=None, text=None): + daemon_get_stderr = globals()["daemon_get_stderr"] + _daemon_output_is(ctx, name, text, daemon_get_stderr) + + +def _daemon_output_is(ctx, name, text, getter): + assert_eq = globals()["assert_eq"] + text = bytes(text, "UTF-8").decode("unicode_escape") + output = getter(ctx, name) + assert_eq(text, output) + + +#---------------------------------------------------------------------------- +# This code comes from: lib/runcmd.py + +import logging +import os +import re +import shlex +import subprocess + + +# +# Helper functions. +# + +# Get exit code or other stored data about the latest command run by +# runcmd_run. + + +def _runcmd_get(ctx, name): + ns = ctx.declare("_runcmd") + return ns[name] + + +def runcmd_get_exit_code(ctx): + return _runcmd_get(ctx, "exit") + + +def runcmd_get_stdout(ctx): + return _runcmd_get(ctx, "stdout") + + +def runcmd_get_stdout_raw(ctx): + return _runcmd_get(ctx, "stdout.raw") + + +def runcmd_get_stderr(ctx): + return _runcmd_get(ctx, "stderr") + + +def runcmd_get_stderr_raw(ctx): + return _runcmd_get(ctx, "stderr.raw") + + +def runcmd_get_argv(ctx): + return _runcmd_get(ctx, "argv") + + +# Run a command, given an argv and other arguments for subprocess.Popen. +# +# This is meant to be a helper function, not bound directly to a step. The +# stdout, stderr, and exit code are stored in the "_runcmd" namespace in the +# ctx context. +def runcmd_run(ctx, argv, **kwargs): + log_value = globals()["log_value"] + + ns = ctx.declare("_runcmd") + + # The Subplot Python template empties os.environ at startup, modulo a small + # number of variables with carefully chosen values. Here, we don't need to + # care about what those variables are, but we do need to not overwrite + # them, so we just add anything in the env keyword argument, if any, to + # os.environ. + env = dict(os.environ) + for key, arg in kwargs.pop("env", {}).items(): + env[key] = arg + + pp = ns.get("path-prefix") + if pp: + env["PATH"] = pp + ":" + env["PATH"] + + kwargs["stdout"] = subprocess.PIPE + kwargs["stderr"] = subprocess.PIPE + + logging.debug(f"runcmd_run: running command") + log_value("argv", 1, dict(enumerate(argv))) + log_value("env", 1, env) + log_value("kwargs:", 1, kwargs) + + p = subprocess.Popen(argv, env=env, **kwargs) + stdout, stderr = p.communicate("") + + ns["argv"] = argv + ns["stdout.raw"] = stdout + ns["stderr.raw"] = stderr + ns["stdout"] = stdout.decode("utf-8") + ns["stderr"] = stderr.decode("utf-8") + ns["exit"] = p.returncode + + log_value("ns", 1, ns.as_dict()) + + +# Step: prepend srcdir to PATH whenever runcmd runs a command. +def runcmd_helper_srcdir_path(ctx): + srcdir = globals()["srcdir"] + runcmd_prepend_to_path(ctx, srcdir) + + +# Step: This creates a helper script. +def runcmd_helper_script(ctx, filename=None): + get_file = globals()["get_file"] + with open(filename, "wb") as f: + f.write(get_file(filename)) + + +# +# Step functions for running commands. +# + + +def runcmd_prepend_to_path(ctx, dirname=None): + ns = ctx.declare("_runcmd") + pp = ns.get("path-prefix", "") + if pp: + pp = f"{pp}:{dirname}" + else: + pp = dirname + ns["path-prefix"] = pp + + +def runcmd_step(ctx, argv0=None, args=None): + runcmd_try_to_run(ctx, argv0=argv0, args=args) + runcmd_exit_code_is_zero(ctx) + + +def runcmd_step_in(ctx, dirname=None, argv0=None, args=None): + runcmd_try_to_run_in(ctx, dirname=dirname, argv0=argv0, args=args) + runcmd_exit_code_is_zero(ctx) + + +def runcmd_try_to_run(ctx, argv0=None, args=None): + runcmd_try_to_run_in(ctx, dirname=None, argv0=argv0, args=args) + + +def runcmd_try_to_run_in(ctx, dirname=None, argv0=None, args=None): + argv = [shlex.quote(argv0)] + shlex.split(args) + runcmd_run(ctx, argv, cwd=dirname) + + +# +# Step functions for examining exit codes. +# + + +def runcmd_exit_code_is_zero(ctx): + runcmd_exit_code_is(ctx, exit=0) + + +def runcmd_exit_code_is(ctx, exit=None): + assert_eq = globals()["assert_eq"] + assert_eq(runcmd_get_exit_code(ctx), int(exit)) + + +def runcmd_exit_code_is_nonzero(ctx): + runcmd_exit_code_is_not(ctx, exit=0) + + +def runcmd_exit_code_is_not(ctx, exit=None): + assert_ne = globals()["assert_ne"] + assert_ne(runcmd_get_exit_code(ctx), int(exit)) + + +# +# Step functions and helpers for examining output in various ways. +# + + +def runcmd_stdout_is(ctx, text=None): + _runcmd_output_is(runcmd_get_stdout(ctx), text) + + +def runcmd_stdout_isnt(ctx, text=None): + _runcmd_output_isnt(runcmd_get_stdout(ctx), text) + + +def runcmd_stderr_is(ctx, text=None): + _runcmd_output_is(runcmd_get_stderr(ctx), text) + + +def runcmd_stderr_isnt(ctx, text=None): + _runcmd_output_isnt(runcmd_get_stderr(ctx), text) + + +def _runcmd_output_is(actual, wanted): + assert_eq = globals()["assert_eq"] + log_lines = globals()["log_lines"] + indent = " " * 4 + + wanted = bytes(wanted, "utf8").decode("unicode_escape") + logging.debug("_runcmd_output_is:") + + logging.debug(f" actual:") + log_lines(indent, actual) + + logging.debug(f" wanted:") + log_lines(indent, wanted) + + assert_eq(actual, wanted) + + +def _runcmd_output_isnt(actual, wanted): + assert_ne = globals()["assert_ne"] + log_lines = globals()["log_lines"] + indent = " " * 4 + + wanted = bytes(wanted, "utf8").decode("unicode_escape") + logging.debug("_runcmd_output_isnt:") + + logging.debug(f" actual:") + log_lines(indent, actual) + + logging.debug(f" wanted:") + log_lines(indent, wanted) + + assert_ne(actual, wanted) + + +def runcmd_stdout_contains(ctx, text=None): + _runcmd_output_contains(runcmd_get_stdout(ctx), text) + + +def runcmd_stdout_doesnt_contain(ctx, text=None): + _runcmd_output_doesnt_contain(runcmd_get_stdout(ctx), text) + + +def runcmd_stderr_contains(ctx, text=None): + _runcmd_output_contains(runcmd_get_stderr(ctx), text) + + +def runcmd_stderr_doesnt_contain(ctx, text=None): + _runcmd_output_doesnt_contain(runcmd_get_stderr(ctx), text) + + +def _runcmd_output_contains(actual, wanted): + assert_eq = globals()["assert_eq"] + log_lines = globals()["log_lines"] + indent = " " * 4 + + wanted = bytes(wanted, "utf8").decode("unicode_escape") + logging.debug("_runcmd_output_contains:") + + logging.debug(f" actual:") + log_lines(indent, actual) + + logging.debug(f" wanted:") + log_lines(indent, wanted) + + assert_eq(wanted in actual, True) + + +def _runcmd_output_doesnt_contain(actual, wanted): + assert_ne = globals()["assert_ne"] + log_lines = globals()["log_lines"] + indent = " " * 4 + + wanted = bytes(wanted, "utf8").decode("unicode_escape") + logging.debug("_runcmd_output_doesnt_contain:") + + logging.debug(f" actual:") + log_lines(indent, actual) + + logging.debug(f" wanted:") + log_lines(indent, wanted) + + assert_ne(wanted in actual, True) + + +def runcmd_stdout_matches_regex(ctx, regex=None): + _runcmd_output_matches_regex(runcmd_get_stdout(ctx), regex) + + +def runcmd_stdout_doesnt_match_regex(ctx, regex=None): + _runcmd_output_doesnt_match_regex(runcmd_get_stdout(ctx), regex) + + +def runcmd_stderr_matches_regex(ctx, regex=None): + _runcmd_output_matches_regex(runcmd_get_stderr(ctx), regex) + + +def runcmd_stderr_doesnt_match_regex(ctx, regex=None): + _runcmd_output_doesnt_match_regex(runcmd_get_stderr(ctx), regex) + + +def _runcmd_output_matches_regex(actual, regex): + assert_ne = globals()["assert_ne"] + log_lines = globals()["log_lines"] + indent = " " * 4 + + r = re.compile(regex) + m = r.search(actual) + + logging.debug("_runcmd_output_matches_regex:") + logging.debug(f" actual: {actual!r}") + log_lines(indent, actual) + + logging.debug(f" regex: {regex!r}") + logging.debug(f" match: {m}") + + assert_ne(m, None) + + +def _runcmd_output_doesnt_match_regex(actual, regex): + assert_eq = globals()["assert_eq"] + log_lines = globals()["log_lines"] + indent = " " * 4 + + r = re.compile(regex) + m = r.search(actual) + + logging.debug("_runcmd_output_doesnt_match_regex:") + logging.debug(f" actual: {actual!r}") + log_lines(indent, actual) + + logging.debug(f" regex: {regex!r}") + logging.debug(f" match: {m}") + + assert_eq(m, None) + + + + +############################################################################# +# Scaffolding for generated test program. + +# import logging +import re + + +# Store context between steps. +class Context: + def __init__(self): + self._vars = {} + self._ns = {} + + def as_dict(self): + return dict(self._vars) + + def get(self, key, default=None): + return self._vars.get(key, default) + + def __getitem__(self, key): + return self._vars[key] + + def __setitem__(self, key, value): + # logging.debug("Context: key {!r} set to {!r}".format(key, value)) + self._vars[key] = value + + def keys(self): + return self._vars.keys() + + def __contains__(self, key): + return key in self._vars + + def __delitem__(self, key): + del self._vars[key] + + def __repr__(self): + return repr({"vars": self._vars, "namespaces": self._ns}) + + def declare(self, name): + if name not in self._ns: + self._ns[name] = NameSpace(name) + return self._ns[name] + + def remember_value(self, name, value): + ns = self.declare("_values") + if name in ns: + raise KeyError(name) + ns[name] = value + + def recall_value(self, name): + ns = self.declare("_values") + if name not in ns: + raise KeyError(name) + return ns[name] + + def expand_values(self, pattern): + parts = [] + while pattern: + m = re.search(r"(?<!\$)\$\{(?P<name>\S*)\}", pattern) + if not m: + parts.append(pattern) + break + name = m.group("name") + if not name: + raise KeyError("empty name in expansion") + value = self.recall_value(name) + parts.append(value) + pattern = pattern[m.end() :] + return "".join(parts) + + +class NameSpace: + def __init__(self, name): + self.name = name + self._dict = {} + + def as_dict(self): + return dict(self._dict) + + def get(self, key, default=None): + if key not in self._dict: + if default is None: + return None + self._dict[key] = default + return self._dict[key] + + def __setitem__(self, key, value): + self._dict[key] = value + + def __getitem__(self, key): + return self._dict[key] + + def keys(self): + return self._dict.keys() + + def __contains__(self, key): + return key in self._dict + + def __delitem__(self, key): + del self._dict[key] + + def __repr__(self): + return repr(self._dict) + +# Decode a base64 encoded string. Result is binary or unicode string. + + +import base64 + + +def decode_bytes(s): + return base64.b64decode(s) + + +def decode_str(s): + return base64.b64decode(s).decode() + +# Retrieve an embedded test data file using filename. + + +class Files: + def __init__(self): + self._files = {} + + def set(self, filename, content): + self._files[filename] = content + + def get(self, filename): + return self._files[filename] + + +_files = Files() + + +def store_file(filename, content): + _files.set(filename, content) + + +def get_file(filename): + return _files.get(filename) + +# Check two values for equality and give error if they are not equal +def assert_eq(a, b): + assert a == b, "expected %r == %r" % (a, b) + + +# Check two values for inequality and give error if they are equal +def assert_ne(a, b): + assert a != b, "expected %r != %r" % (a, b) + + +# Check that two dict values are equal. +def assert_dict_eq(a, b): + assert isinstance(a, dict) + assert isinstance(b, dict) + for key in a: + assert key in b, f"exected {key} in both dicts" + av = a[key] + bv = b[key] + assert_eq(type(av), type(bv)) + if isinstance(av, list): + assert_eq(list(sorted(av)), list(sorted(bv))) + for key in b: + assert key in a, f"exected {key} in both dicts" + +import logging +import os +import tempfile + + +############################################################################# +# Code to implement the scenarios. + + +class Step: + def __init__(self): + self._kind = None + self._text = None + self._args = {} + self._function = None + self._cleanup = None + + def set_kind(self, kind): + self._kind = kind + + def set_text(self, text): + self._text = text + + def set_arg(self, name, value): + self._args[name] = value + + def set_function(self, function): + self._function = function + + def set_cleanup(self, cleanup): + self._cleanup = cleanup + + def do(self, ctx): + print(" step: {} {}".format(self._kind, self._text)) + logging.info("step: {} {}".format(self._kind, self._text)) + self._function(ctx, **self._args) + + def cleanup(self, ctx): + if self._cleanup: + print(" cleanup: {} {}".format(self._kind, self._text)) + logging.info("cleanup: {} {}".format(self._kind, self._text)) + self._cleanup(ctx, **self._args) + + +class Scenario: + def __init__(self, ctx): + self._title = None + self._steps = [] + self._ctx = ctx + self._logged_env = False + + def get_title(self): + return self._title + + def set_title(self, title): + self._title = title + + def append_step(self, step): + self._steps.append(step) + + def run(self, datadir, extra_env): + print("scenario: {}".format(self._title)) + logging.info("Scenario: {}".format(self._title)) + + scendir = tempfile.mkdtemp(dir=datadir) + os.chdir(scendir) + self._set_environment_variables_to(scendir, extra_env) + + done = [] + ctx = self._ctx + try: + for step in self._steps: + step.do(ctx) + done.append(step) + except Exception as e: + logging.error(str(e), exc_info=True) + for step in reversed(done): + step.cleanup(ctx) + raise + for step in reversed(done): + step.cleanup(ctx) + + def _set_environment_variables_to(self, scendir, extra_env): + log_value = globals()["log_value"] + + minimal = { + "PATH": "/bin:/usr/bin", + "SHELL": "/bin/sh", + "HOME": scendir, + "TMPDIR": scendir, + } + + os.environ.clear() + os.environ.update(minimal) + os.environ.update(extra_env) + if not self._logged_env: + self._logged_env = True + log_value("extra_env", 0, dict(extra_env)) + log_value("os.environ", 0, dict(os.environ)) + +import argparse +import logging +import os +import random +import shutil +import tarfile +import tempfile + + +class MultilineFormatter(logging.Formatter): + def format(self, record): + s = super().format(record) + lines = list(s.splitlines()) + return lines.pop(0) + "\n".join(" %s" % line for line in lines) + + +def indent(n): + return " " * n + + +def log_value(msg, level, v): + if is_multiline_string(v): + logging.debug(f"{indent(level)}{msg}:") + log_lines(indent(level + 1), v) + elif isinstance(v, dict) and v: + # Only non-empty dictionaries + logging.debug(f"{indent(level)}{msg}:") + for k in sorted(v.keys()): + log_value(f"{k!r}", level + 1, v[k]) + elif isinstance(v, list) and v: + # Only non-empty lists + logging.debug(f"{indent(level)}{msg}:") + for i, x in enumerate(v): + log_value(f"{i}", level + 1, x) + else: + logging.debug(f"{indent(level)}{msg}: {v!r}") + + +def is_multiline_string(v): + if isinstance(v, str) and "\n" in v: + return True + elif isinstance(v, bytes) and b"\n" in v: + return True + else: + return False + + +def log_lines(prefix, v): + if isinstance(v, str): + nl = "\n" + else: + nl = b"\n" + if nl in v: + for line in v.splitlines(keepends=True): + logging.debug(f"{prefix}{line!r}") + else: + logging.debug(f"{prefix}{v!r}") + + +# Remember where we started from. The step functions may need to refer +# to files there. +srcdir = os.getcwd() +print("srcdir", srcdir) + +# Create a new temporary directory and chdir there. This allows step +# functions to create new files in the current working directory +# without having to be so careful. +_datadir = tempfile.mkdtemp() +print("datadir", _datadir) +os.chdir(_datadir) + + +def parse_command_line(): + p = argparse.ArgumentParser() + p.add_argument("--log") + p.add_argument("--env", action="append", default=[]) + p.add_argument("--save-on-failure") + p.add_argument("patterns", nargs="*") + return p.parse_args() + + +def setup_logging(args): + if args.log: + fmt = "%(asctime)s %(levelname)s %(message)s" + datefmt = "%Y-%m-%d %H:%M:%S" + formatter = MultilineFormatter(fmt, datefmt) + + filename = os.path.abspath(os.path.join(srcdir, args.log)) + handler = logging.FileHandler(filename) + handler.setFormatter(formatter) + else: + handler = logging.NullHandler() + + logger = logging.getLogger() + logger.addHandler(handler) + logger.setLevel(logging.DEBUG) + + +def save_directory(dirname, tarname): + print("tarname", tarname) + logging.info("Saving {} to {}".format(dirname, tarname)) + tar = tarfile.open(tarname, "w") + tar.add(dirname, arcname="datadir") + tar.close() + + +def main(scenarios): + args = parse_command_line() + setup_logging(args) + logging.info("Test program starts") + + logging.info("patterns: {}".format(args.patterns)) + if len(args.patterns) == 0: + logging.info("Executing all scenarios") + todo = list(scenarios) + random.shuffle(todo) + else: + logging.info("Executing requested scenarios only: {}".format(args.patterns)) + patterns = [arg.lower() for arg in args.patterns] + todo = [ + scen + for scen in scenarios + if any(pattern in scen.get_title().lower() for pattern in patterns) + ] + + extra_env = {} + for env in args.env: + (name, value) = env.split("=", 1) + extra_env[name] = value + + try: + for scen in todo: + scen.run(_datadir, extra_env) + except Exception as e: + logging.error(str(e), exc_info=True) + if args.save_on_failure: + print(args.save_on_failure) + filename = os.path.abspath(os.path.join(srcdir, args.save_on_failure)) + print(filename) + save_directory(_datadir, filename) + raise + + shutil.rmtree(_datadir) + print("OK, all scenarios finished successfully") + logging.info("OK, all scenarios finished successfully") + + + +############################################################################# +# Test data files that were embedded in the source document. Base64 +# encoding is used to allow arbitrary data. + + + + + +############################################################################# +# Classes for individual scenarios. + + +#---------------------------------------------------------------------------- +# Scenario: Find max of a list of one +class Scenario_1(): + def __init__(self): + ctx = Context() + self._scenario = Scenario(ctx) + self._scenario.set_title(decode_str('RmluZCBtYXggb2YgYSBsaXN0IG9mIG9uZQ==')) + + # Step: server + step = Step() + step.set_kind('given') + step.set_text(decode_str('c2VydmVy')) + step.set_function(start_server) + if 'stop_server': + step.set_cleanup(stop_server) + self._scenario.append_step(step) + + # Step: I run max-client.py 1 + step = Step() + step.set_kind('when') + step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSAx')) + step.set_function(runcmd_step) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('YXJndjA=') + text = decode_str('bWF4LWNsaWVudC5weQ==') + step.set_arg(name, text) + name = decode_str('YXJncw==') + text = decode_str('IDE=') + step.set_arg(name, text) + + # Step: answer is 1 + step = Step() + step.set_kind('then') + step.set_text(decode_str('YW5zd2VyIGlzIDE=')) + step.set_function(answer_is) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('aW5kZXg=') + text = decode_str('MQ==') + step.set_arg(name, text) + + + def get_title(self): + return self._scenario.get_title() + + def run(self, datadir, extra_env): + self._scenario.run(datadir, extra_env) + +#---------------------------------------------------------------------------- +# Scenario: Find max of a list of two +class Scenario_2(): + def __init__(self): + ctx = Context() + self._scenario = Scenario(ctx) + self._scenario.set_title(decode_str('RmluZCBtYXggb2YgYSBsaXN0IG9mIHR3bw==')) + + # Step: server + step = Step() + step.set_kind('given') + step.set_text(decode_str('c2VydmVy')) + step.set_function(start_server) + if 'stop_server': + step.set_cleanup(stop_server) + self._scenario.append_step(step) + + # Step: I run max-client.py 5 5 + step = Step() + step.set_kind('when') + step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSA1IDU=')) + step.set_function(runcmd_step) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('YXJndjA=') + text = decode_str('bWF4LWNsaWVudC5weQ==') + step.set_arg(name, text) + name = decode_str('YXJncw==') + text = decode_str('IDUgNQ==') + step.set_arg(name, text) + + # Step: answer is 5 + step = Step() + step.set_kind('then') + step.set_text(decode_str('YW5zd2VyIGlzIDU=')) + step.set_function(answer_is) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('aW5kZXg=') + text = decode_str('NQ==') + step.set_arg(name, text) + + # Step: I run max-client.py 5 6 + step = Step() + step.set_kind('when') + step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSA1IDY=')) + step.set_function(runcmd_step) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('YXJndjA=') + text = decode_str('bWF4LWNsaWVudC5weQ==') + step.set_arg(name, text) + name = decode_str('YXJncw==') + text = decode_str('IDUgNg==') + step.set_arg(name, text) + + # Step: answer is 6 + step = Step() + step.set_kind('then') + step.set_text(decode_str('YW5zd2VyIGlzIDY=')) + step.set_function(answer_is) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('aW5kZXg=') + text = decode_str('Ng==') + step.set_arg(name, text) + + # Step: I run max-client.py 6 5 + step = Step() + step.set_kind('when') + step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSA2IDU=')) + step.set_function(runcmd_step) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('YXJndjA=') + text = decode_str('bWF4LWNsaWVudC5weQ==') + step.set_arg(name, text) + name = decode_str('YXJncw==') + text = decode_str('IDYgNQ==') + step.set_arg(name, text) + + # Step: answer is 6 + step = Step() + step.set_kind('then') + step.set_text(decode_str('YW5zd2VyIGlzIDY=')) + step.set_function(answer_is) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('aW5kZXg=') + text = decode_str('Ng==') + step.set_arg(name, text) + + + def get_title(self): + return self._scenario.get_title() + + def run(self, datadir, extra_env): + self._scenario.run(datadir, extra_env) + +#---------------------------------------------------------------------------- +# Scenario: Find max of a list of three +class Scenario_3(): + def __init__(self): + ctx = Context() + self._scenario = Scenario(ctx) + self._scenario.set_title(decode_str('RmluZCBtYXggb2YgYSBsaXN0IG9mIHRocmVl')) + + # Step: server + step = Step() + step.set_kind('given') + step.set_text(decode_str('c2VydmVy')) + step.set_function(start_server) + if 'stop_server': + step.set_cleanup(stop_server) + self._scenario.append_step(step) + + # Step: I run max-client.py 5 5 5 + step = Step() + step.set_kind('when') + step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSA1IDUgNQ==')) + step.set_function(runcmd_step) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('YXJndjA=') + text = decode_str('bWF4LWNsaWVudC5weQ==') + step.set_arg(name, text) + name = decode_str('YXJncw==') + text = decode_str('IDUgNSA1') + step.set_arg(name, text) + + # Step: answer is 5 + step = Step() + step.set_kind('then') + step.set_text(decode_str('YW5zd2VyIGlzIDU=')) + step.set_function(answer_is) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('aW5kZXg=') + text = decode_str('NQ==') + step.set_arg(name, text) + + # Step: I run max-client.py 5 5 6 + step = Step() + step.set_kind('when') + step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSA1IDUgNg==')) + step.set_function(runcmd_step) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('YXJndjA=') + text = decode_str('bWF4LWNsaWVudC5weQ==') + step.set_arg(name, text) + name = decode_str('YXJncw==') + text = decode_str('IDUgNSA2') + step.set_arg(name, text) + + # Step: answer is 6 + step = Step() + step.set_kind('then') + step.set_text(decode_str('YW5zd2VyIGlzIDY=')) + step.set_function(answer_is) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('aW5kZXg=') + text = decode_str('Ng==') + step.set_arg(name, text) + + # Step: I run max-client.py 5 6 5 + step = Step() + step.set_kind('when') + step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSA1IDYgNQ==')) + step.set_function(runcmd_step) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('YXJndjA=') + text = decode_str('bWF4LWNsaWVudC5weQ==') + step.set_arg(name, text) + name = decode_str('YXJncw==') + text = decode_str('IDUgNiA1') + step.set_arg(name, text) + + # Step: answer is 6 + step = Step() + step.set_kind('then') + step.set_text(decode_str('YW5zd2VyIGlzIDY=')) + step.set_function(answer_is) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('aW5kZXg=') + text = decode_str('Ng==') + step.set_arg(name, text) + + # Step: I run max-client.py 6 5 5 + step = Step() + step.set_kind('when') + step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSA2IDUgNQ==')) + step.set_function(runcmd_step) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('YXJndjA=') + text = decode_str('bWF4LWNsaWVudC5weQ==') + step.set_arg(name, text) + name = decode_str('YXJncw==') + text = decode_str('IDYgNSA1') + step.set_arg(name, text) + + # Step: answer is 6 + step = Step() + step.set_kind('then') + step.set_text(decode_str('YW5zd2VyIGlzIDY=')) + step.set_function(answer_is) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('aW5kZXg=') + text = decode_str('Ng==') + step.set_arg(name, text) + + # Step: I run max-client.py 5 6 7 + step = Step() + step.set_kind('when') + step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSA1IDYgNw==')) + step.set_function(runcmd_step) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('YXJndjA=') + text = decode_str('bWF4LWNsaWVudC5weQ==') + step.set_arg(name, text) + name = decode_str('YXJncw==') + text = decode_str('IDUgNiA3') + step.set_arg(name, text) + + # Step: answer is 7 + step = Step() + step.set_kind('then') + step.set_text(decode_str('YW5zd2VyIGlzIDc=')) + step.set_function(answer_is) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('aW5kZXg=') + text = decode_str('Nw==') + step.set_arg(name, text) + + # Step: I run max-client.py 5 7 6 + step = Step() + step.set_kind('when') + step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSA1IDcgNg==')) + step.set_function(runcmd_step) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('YXJndjA=') + text = decode_str('bWF4LWNsaWVudC5weQ==') + step.set_arg(name, text) + name = decode_str('YXJncw==') + text = decode_str('IDUgNyA2') + step.set_arg(name, text) + + # Step: answer is 7 + step = Step() + step.set_kind('then') + step.set_text(decode_str('YW5zd2VyIGlzIDc=')) + step.set_function(answer_is) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('aW5kZXg=') + text = decode_str('Nw==') + step.set_arg(name, text) + + # Step: I run max-client.py 6 5 7 + step = Step() + step.set_kind('when') + step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSA2IDUgNw==')) + step.set_function(runcmd_step) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('YXJndjA=') + text = decode_str('bWF4LWNsaWVudC5weQ==') + step.set_arg(name, text) + name = decode_str('YXJncw==') + text = decode_str('IDYgNSA3') + step.set_arg(name, text) + + # Step: answer is 7 + step = Step() + step.set_kind('then') + step.set_text(decode_str('YW5zd2VyIGlzIDc=')) + step.set_function(answer_is) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('aW5kZXg=') + text = decode_str('Nw==') + step.set_arg(name, text) + + # Step: I run max-client.py 6 7 5 + step = Step() + step.set_kind('when') + step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSA2IDcgNQ==')) + step.set_function(runcmd_step) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('YXJndjA=') + text = decode_str('bWF4LWNsaWVudC5weQ==') + step.set_arg(name, text) + name = decode_str('YXJncw==') + text = decode_str('IDYgNyA1') + step.set_arg(name, text) + + # Step: answer is 7 + step = Step() + step.set_kind('then') + step.set_text(decode_str('YW5zd2VyIGlzIDc=')) + step.set_function(answer_is) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('aW5kZXg=') + text = decode_str('Nw==') + step.set_arg(name, text) + + # Step: I run max-client.py 7 5 6 + step = Step() + step.set_kind('when') + step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSA3IDUgNg==')) + step.set_function(runcmd_step) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('YXJndjA=') + text = decode_str('bWF4LWNsaWVudC5weQ==') + step.set_arg(name, text) + name = decode_str('YXJncw==') + text = decode_str('IDcgNSA2') + step.set_arg(name, text) + + # Step: answer is 7 + step = Step() + step.set_kind('then') + step.set_text(decode_str('YW5zd2VyIGlzIDc=')) + step.set_function(answer_is) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('aW5kZXg=') + text = decode_str('Nw==') + step.set_arg(name, text) + + # Step: I run max-client.py 7 6 5 + step = Step() + step.set_kind('when') + step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSA3IDYgNQ==')) + step.set_function(runcmd_step) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('YXJndjA=') + text = decode_str('bWF4LWNsaWVudC5weQ==') + step.set_arg(name, text) + name = decode_str('YXJncw==') + text = decode_str('IDcgNiA1') + step.set_arg(name, text) + + # Step: answer is 7 + step = Step() + step.set_kind('then') + step.set_text(decode_str('YW5zd2VyIGlzIDc=')) + step.set_function(answer_is) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('aW5kZXg=') + text = decode_str('Nw==') + step.set_arg(name, text) + + + def get_title(self): + return self._scenario.get_title() + + def run(self, datadir, extra_env): + self._scenario.run(datadir, extra_env) + + +_scenarios = { + Scenario_1(), + Scenario_2(), + Scenario_3(), +} + + +############################################################################# +# Call main function and clean up. +main(_scenarios) |