summaryrefslogtreecommitdiff
path: root/test.py
diff options
context:
space:
mode:
Diffstat (limited to 'test.py')
-rw-r--r--test.py1513
1 files changed, 1513 insertions, 0 deletions
diff --git a/test.py b/test.py
new file mode 100644
index 0000000..698cefd
--- /dev/null
+++ b/test.py
@@ -0,0 +1,1513 @@
+#############################################################################
+# Functions that implement steps.
+
+
+#----------------------------------------------------------------------------
+# This code comes from: oso.py
+
+import os
+
+
+def start_server(ctx):
+ # Declare Subplot library names. In the generated Python program, the
+ # libraries will be included and can just be used via names, but to placate
+ # automated checks that only see this file, get the names from globals() at
+ # runtime.
+ daemon_start_on_port = globals()["daemon_start_on_port"]
+ runcmd_helper_srcdir_path = globals()["runcmd_helper_srcdir_path"]
+ srcdir = globals()["srcdir"]
+
+ # This installs srcdir in $PATH so that we can run the client and server
+ # easily.
+ runcmd_helper_srcdir_path(ctx)
+
+ # Start server.
+ server = os.path.join(srcdir, "server.py")
+ daemon_start_on_port(ctx, path=server, args="", name="server", port=5000)
+
+
+def stop_server(ctx):
+ daemon_stop = globals()["daemon_stop"]
+ daemon_stop(ctx, name="server")
+
+
+def answer_is(ctx, index):
+ assert_eq = globals()["assert_eq"]
+ runcmd_get_stdout = globals()["runcmd_get_stdout"]
+ stdout = runcmd_get_stdout(ctx)
+ assert_eq(stdout.strip(), index)
+
+
+#----------------------------------------------------------------------------
+# This code comes from: lib/daemon.py
+
+import logging
+import os
+import signal
+import socket
+import subprocess
+import time
+
+
+# A helper function for testing lib/daemon itself.
+def _daemon_shell_script(ctx, filename=None):
+ get_file = globals()["get_file"]
+ data = get_file(filename)
+ with open(filename, "wb") as f:
+ f.write(data)
+ os.chmod(filename, 0o755)
+
+
+# Start a daemon that will open a port on localhost.
+def daemon_start_on_port(ctx, path=None, args=None, name=None, port=None):
+ _daemon_start(ctx, path=path, args=args, name=name)
+ daemon_wait_for_port("localhost", port)
+
+
+# Start a daemon after a little wait. This is used only for testing the
+# port-waiting code.
+def _daemon_start_soonish(ctx, path=None, args=None, name=None, port=None):
+ _daemon_start(ctx, path=os.path.abspath(path), args=args, name=name)
+ daemon = ctx.declare("_daemon")
+
+ # Store the PID of the process we just started so that _daemon_stop_soonish
+ # can kill it during the cleanup phase. This works around the Subplot
+ # Python template not giving the step captures to cleanup functions. Note
+ # that this code assume at most one _soonish function is called.
+ daemon["_soonish"] = daemon[name]["pid"]
+
+ try:
+ daemon_wait_for_port("localhost", port)
+ except Exception as e:
+ daemon["_start_error"] = repr(e)
+
+ logging.info("pgrep: %r", _daemon_pgrep(path))
+
+
+def _daemon_stop_soonish(ctx, path=None, args=None, name=None, port=None):
+ ns = ctx.declare("_daemon")
+ pid = ns["_soonish"]
+ logging.debug(f"Stopping soonishly-started daemon, {pid}")
+ signo = signal.SIGKILL
+ try:
+ os.kill(pid, signo)
+ except ProcessLookupError:
+ logging.warning("Process did not actually exist (anymore?)")
+
+
+# Start a daeamon, get its PID. Don't wait for a port or anything. This is
+# meant for background processes that don't have port. Useful for testing the
+# lib/daemon library of Subplot, but not much else.
+def _daemon_start(ctx, path=None, args=None, name=None):
+ runcmd_run = globals()["runcmd_run"]
+ runcmd_exit_code_is = globals()["runcmd_exit_code_is"]
+ runcmd_get_exit_code = globals()["runcmd_get_exit_code"]
+ runcmd_get_stderr = globals()["runcmd_get_stderr"]
+ runcmd_prepend_to_path = globals()["runcmd_prepend_to_path"]
+
+ path = os.path.abspath(path)
+ argv = [path] + args.split()
+
+ logging.debug(f"Starting daemon {name}")
+ logging.debug(f" ctx={ctx.as_dict()}")
+ logging.debug(f" name={name}")
+ logging.debug(f" path={path}")
+ logging.debug(f" args={args}")
+ logging.debug(f" argv={argv}")
+
+ ns = ctx.declare("_daemon")
+
+ this = ns[name] = {
+ "pid-file": f"{name}.pid",
+ "stderr": f"{name}.stderr",
+ "stdout": f"{name}.stdout",
+ }
+
+ # Debian installs `daemonize` to /usr/sbin, which isn't part of the minimal
+ # environment that Subplot sets up. So we add /usr/sbin to the PATH.
+ runcmd_prepend_to_path(ctx, "/usr/sbin")
+ runcmd_run(
+ ctx,
+ [
+ "daemonize",
+ "-c",
+ os.getcwd(),
+ "-p",
+ this["pid-file"],
+ "-e",
+ this["stderr"],
+ "-o",
+ this["stdout"],
+ ]
+ + argv,
+ )
+
+ # Check that daemonize has exited OK. If it hasn't, it didn't start the
+ # background process at all. If so, log the stderr in case there was
+ # something useful there for debugging.
+ exit = runcmd_get_exit_code(ctx)
+ if exit != 0:
+ stderr = runcmd_get_stderr(ctx)
+ logging.error(f"daemon {name} stderr: {stderr}")
+ runcmd_exit_code_is(ctx, 0)
+
+ # Get the pid of the background process, from the pid file created by
+ # daemonize. We don't need to wait for it, since we know daemonize already
+ # exited. If it isn't there now, it's won't appear later.
+ if not os.path.exists(this["pid-file"]):
+ raise Exception("daemonize didn't create a PID file")
+
+ this["pid"] = _daemon_wait_for_pid(this["pid-file"], 10.0)
+
+ logging.debug(f"Started daemon {name}")
+ logging.debug(f" pid={this['pid']}")
+ logging.debug(f" ctx={ctx.as_dict()}")
+
+
+def _daemon_wait_for_pid(filename, timeout):
+ start = time.time()
+ while time.time() < start + timeout:
+ with open(filename) as f:
+ data = f.read().strip()
+ if data:
+ return int(data)
+ raise Exception("daemonize created a PID file without a PID")
+
+
+def daemon_wait_for_port(host, port, timeout=5.0):
+ addr = (host, port)
+ until = time.time() + timeout
+ while True:
+ try:
+ s = socket.create_connection(addr, timeout=timeout)
+ s.close()
+ return
+ except socket.timeout:
+ logging.error(
+ f"daemon did not respond at port {port} within {timeout} seconds"
+ )
+ raise
+ except socket.error as e:
+ logging.info(f"could not connect to daemon at {port}: {e}")
+ pass
+ if time.time() >= until:
+ logging.error(
+ f"could not connect to daemon at {port} within {timeout} seconds"
+ )
+ raise ConnectionRefusedError()
+ # Sleep a bit to avoid consuming too much CPU while busy-waiting.
+ time.sleep(0.1)
+
+
+# Stop a daemon.
+def daemon_stop(ctx, path=None, args=None, name=None):
+ logging.debug(f"Stopping daemon {name}")
+
+ ns = ctx.declare("_daemon")
+ logging.debug(f" ns={ns}")
+ pid = ns[name]["pid"]
+ signo = signal.SIGTERM
+
+ this = ns[name]
+ data = open(this["stdout"]).read()
+ logging.debug(f"{name} stdout, before: {data!r}")
+ data = open(this["stderr"]).read()
+ logging.debug(f"{name} stderr, before: {data!r}")
+
+ logging.debug(f"Terminating process {pid} with signal {signo}")
+ try:
+ os.kill(pid, signo)
+ except ProcessLookupError:
+ logging.warning("Process did not actually exist (anymore?)")
+
+ while True:
+ try:
+ os.kill(pid, 0)
+ logging.debug(f"Daemon {name}, pid {pid} still exists")
+ time.sleep(1)
+ except ProcessLookupError:
+ break
+ logging.debug(f"Daemon {name} is gone")
+
+ data = open(this["stdout"]).read()
+ logging.debug(f"{name} stdout, after: {data!r}")
+ data = open(this["stderr"]).read()
+ logging.debug(f"{name} stderr, after: {data!r}")
+
+
+def daemon_no_such_process(ctx, args=None):
+ assert not _daemon_pgrep(args)
+
+
+def daemon_process_exists(ctx, args=None):
+ assert _daemon_pgrep(args)
+
+
+def _daemon_pgrep(pattern):
+ logging.info(f"checking if process exists: pattern={pattern}")
+ exit = subprocess.call(
+ ["pgrep", "-laf", pattern], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
+ )
+ logging.info(f"exit code: {exit}")
+ return exit == 0
+
+
+def daemon_start_fails_with(ctx, message=None):
+ daemon = ctx.declare("_daemon")
+ error = daemon["_start_error"]
+ logging.debug(f"daemon_start_fails_with: error={error!r}")
+ logging.debug(f"daemon_start_fails_with: message={message!r}")
+ assert message.lower() in error.lower()
+
+
+def daemon_get_stdout(ctx, name):
+ return _daemon_get_output(ctx, name, "stdout")
+
+
+def daemon_get_stderr(ctx, name):
+ return _daemon_get_output(ctx, name, "stderr")
+
+
+def _daemon_get_output(ctx, name, which):
+ ns = ctx.declare("_daemon")
+ this = ns[name]
+ filename = this[which]
+ data = open(filename).read()
+ logging.debug(f"Read {which} of daemon {name} from {filename}: {data!r}")
+ return data
+
+
+def daemon_has_produced_output(ctx, name=None):
+ started = time.time()
+ timeout = 5.0
+ while time.time() < started + timeout:
+ stdout = daemon_get_stdout(ctx, name)
+ stderr = daemon_get_stderr(ctx, name)
+ if stdout and stderr:
+ break
+ time.sleep(0.1)
+
+
+def daemon_stdout_is(ctx, name=None, text=None):
+ daemon_get_stdout = globals()["daemon_get_stdout"]
+ _daemon_output_is(ctx, name, text, daemon_get_stdout)
+
+
+def daemon_stderr_is(ctx, name=None, text=None):
+ daemon_get_stderr = globals()["daemon_get_stderr"]
+ _daemon_output_is(ctx, name, text, daemon_get_stderr)
+
+
+def _daemon_output_is(ctx, name, text, getter):
+ assert_eq = globals()["assert_eq"]
+ text = bytes(text, "UTF-8").decode("unicode_escape")
+ output = getter(ctx, name)
+ assert_eq(text, output)
+
+
+#----------------------------------------------------------------------------
+# This code comes from: lib/runcmd.py
+
+import logging
+import os
+import re
+import shlex
+import subprocess
+
+
+#
+# Helper functions.
+#
+
+# Get exit code or other stored data about the latest command run by
+# runcmd_run.
+
+
+def _runcmd_get(ctx, name):
+ ns = ctx.declare("_runcmd")
+ return ns[name]
+
+
+def runcmd_get_exit_code(ctx):
+ return _runcmd_get(ctx, "exit")
+
+
+def runcmd_get_stdout(ctx):
+ return _runcmd_get(ctx, "stdout")
+
+
+def runcmd_get_stdout_raw(ctx):
+ return _runcmd_get(ctx, "stdout.raw")
+
+
+def runcmd_get_stderr(ctx):
+ return _runcmd_get(ctx, "stderr")
+
+
+def runcmd_get_stderr_raw(ctx):
+ return _runcmd_get(ctx, "stderr.raw")
+
+
+def runcmd_get_argv(ctx):
+ return _runcmd_get(ctx, "argv")
+
+
+# Run a command, given an argv and other arguments for subprocess.Popen.
+#
+# This is meant to be a helper function, not bound directly to a step. The
+# stdout, stderr, and exit code are stored in the "_runcmd" namespace in the
+# ctx context.
+def runcmd_run(ctx, argv, **kwargs):
+ log_value = globals()["log_value"]
+
+ ns = ctx.declare("_runcmd")
+
+ # The Subplot Python template empties os.environ at startup, modulo a small
+ # number of variables with carefully chosen values. Here, we don't need to
+ # care about what those variables are, but we do need to not overwrite
+ # them, so we just add anything in the env keyword argument, if any, to
+ # os.environ.
+ env = dict(os.environ)
+ for key, arg in kwargs.pop("env", {}).items():
+ env[key] = arg
+
+ pp = ns.get("path-prefix")
+ if pp:
+ env["PATH"] = pp + ":" + env["PATH"]
+
+ kwargs["stdout"] = subprocess.PIPE
+ kwargs["stderr"] = subprocess.PIPE
+
+ logging.debug(f"runcmd_run: running command")
+ log_value("argv", 1, dict(enumerate(argv)))
+ log_value("env", 1, env)
+ log_value("kwargs:", 1, kwargs)
+
+ p = subprocess.Popen(argv, env=env, **kwargs)
+ stdout, stderr = p.communicate("")
+
+ ns["argv"] = argv
+ ns["stdout.raw"] = stdout
+ ns["stderr.raw"] = stderr
+ ns["stdout"] = stdout.decode("utf-8")
+ ns["stderr"] = stderr.decode("utf-8")
+ ns["exit"] = p.returncode
+
+ log_value("ns", 1, ns.as_dict())
+
+
+# Step: prepend srcdir to PATH whenever runcmd runs a command.
+def runcmd_helper_srcdir_path(ctx):
+ srcdir = globals()["srcdir"]
+ runcmd_prepend_to_path(ctx, srcdir)
+
+
+# Step: This creates a helper script.
+def runcmd_helper_script(ctx, filename=None):
+ get_file = globals()["get_file"]
+ with open(filename, "wb") as f:
+ f.write(get_file(filename))
+
+
+#
+# Step functions for running commands.
+#
+
+
+def runcmd_prepend_to_path(ctx, dirname=None):
+ ns = ctx.declare("_runcmd")
+ pp = ns.get("path-prefix", "")
+ if pp:
+ pp = f"{pp}:{dirname}"
+ else:
+ pp = dirname
+ ns["path-prefix"] = pp
+
+
+def runcmd_step(ctx, argv0=None, args=None):
+ runcmd_try_to_run(ctx, argv0=argv0, args=args)
+ runcmd_exit_code_is_zero(ctx)
+
+
+def runcmd_step_in(ctx, dirname=None, argv0=None, args=None):
+ runcmd_try_to_run_in(ctx, dirname=dirname, argv0=argv0, args=args)
+ runcmd_exit_code_is_zero(ctx)
+
+
+def runcmd_try_to_run(ctx, argv0=None, args=None):
+ runcmd_try_to_run_in(ctx, dirname=None, argv0=argv0, args=args)
+
+
+def runcmd_try_to_run_in(ctx, dirname=None, argv0=None, args=None):
+ argv = [shlex.quote(argv0)] + shlex.split(args)
+ runcmd_run(ctx, argv, cwd=dirname)
+
+
+#
+# Step functions for examining exit codes.
+#
+
+
+def runcmd_exit_code_is_zero(ctx):
+ runcmd_exit_code_is(ctx, exit=0)
+
+
+def runcmd_exit_code_is(ctx, exit=None):
+ assert_eq = globals()["assert_eq"]
+ assert_eq(runcmd_get_exit_code(ctx), int(exit))
+
+
+def runcmd_exit_code_is_nonzero(ctx):
+ runcmd_exit_code_is_not(ctx, exit=0)
+
+
+def runcmd_exit_code_is_not(ctx, exit=None):
+ assert_ne = globals()["assert_ne"]
+ assert_ne(runcmd_get_exit_code(ctx), int(exit))
+
+
+#
+# Step functions and helpers for examining output in various ways.
+#
+
+
+def runcmd_stdout_is(ctx, text=None):
+ _runcmd_output_is(runcmd_get_stdout(ctx), text)
+
+
+def runcmd_stdout_isnt(ctx, text=None):
+ _runcmd_output_isnt(runcmd_get_stdout(ctx), text)
+
+
+def runcmd_stderr_is(ctx, text=None):
+ _runcmd_output_is(runcmd_get_stderr(ctx), text)
+
+
+def runcmd_stderr_isnt(ctx, text=None):
+ _runcmd_output_isnt(runcmd_get_stderr(ctx), text)
+
+
+def _runcmd_output_is(actual, wanted):
+ assert_eq = globals()["assert_eq"]
+ log_lines = globals()["log_lines"]
+ indent = " " * 4
+
+ wanted = bytes(wanted, "utf8").decode("unicode_escape")
+ logging.debug("_runcmd_output_is:")
+
+ logging.debug(f" actual:")
+ log_lines(indent, actual)
+
+ logging.debug(f" wanted:")
+ log_lines(indent, wanted)
+
+ assert_eq(actual, wanted)
+
+
+def _runcmd_output_isnt(actual, wanted):
+ assert_ne = globals()["assert_ne"]
+ log_lines = globals()["log_lines"]
+ indent = " " * 4
+
+ wanted = bytes(wanted, "utf8").decode("unicode_escape")
+ logging.debug("_runcmd_output_isnt:")
+
+ logging.debug(f" actual:")
+ log_lines(indent, actual)
+
+ logging.debug(f" wanted:")
+ log_lines(indent, wanted)
+
+ assert_ne(actual, wanted)
+
+
+def runcmd_stdout_contains(ctx, text=None):
+ _runcmd_output_contains(runcmd_get_stdout(ctx), text)
+
+
+def runcmd_stdout_doesnt_contain(ctx, text=None):
+ _runcmd_output_doesnt_contain(runcmd_get_stdout(ctx), text)
+
+
+def runcmd_stderr_contains(ctx, text=None):
+ _runcmd_output_contains(runcmd_get_stderr(ctx), text)
+
+
+def runcmd_stderr_doesnt_contain(ctx, text=None):
+ _runcmd_output_doesnt_contain(runcmd_get_stderr(ctx), text)
+
+
+def _runcmd_output_contains(actual, wanted):
+ assert_eq = globals()["assert_eq"]
+ log_lines = globals()["log_lines"]
+ indent = " " * 4
+
+ wanted = bytes(wanted, "utf8").decode("unicode_escape")
+ logging.debug("_runcmd_output_contains:")
+
+ logging.debug(f" actual:")
+ log_lines(indent, actual)
+
+ logging.debug(f" wanted:")
+ log_lines(indent, wanted)
+
+ assert_eq(wanted in actual, True)
+
+
+def _runcmd_output_doesnt_contain(actual, wanted):
+ assert_ne = globals()["assert_ne"]
+ log_lines = globals()["log_lines"]
+ indent = " " * 4
+
+ wanted = bytes(wanted, "utf8").decode("unicode_escape")
+ logging.debug("_runcmd_output_doesnt_contain:")
+
+ logging.debug(f" actual:")
+ log_lines(indent, actual)
+
+ logging.debug(f" wanted:")
+ log_lines(indent, wanted)
+
+ assert_ne(wanted in actual, True)
+
+
+def runcmd_stdout_matches_regex(ctx, regex=None):
+ _runcmd_output_matches_regex(runcmd_get_stdout(ctx), regex)
+
+
+def runcmd_stdout_doesnt_match_regex(ctx, regex=None):
+ _runcmd_output_doesnt_match_regex(runcmd_get_stdout(ctx), regex)
+
+
+def runcmd_stderr_matches_regex(ctx, regex=None):
+ _runcmd_output_matches_regex(runcmd_get_stderr(ctx), regex)
+
+
+def runcmd_stderr_doesnt_match_regex(ctx, regex=None):
+ _runcmd_output_doesnt_match_regex(runcmd_get_stderr(ctx), regex)
+
+
+def _runcmd_output_matches_regex(actual, regex):
+ assert_ne = globals()["assert_ne"]
+ log_lines = globals()["log_lines"]
+ indent = " " * 4
+
+ r = re.compile(regex)
+ m = r.search(actual)
+
+ logging.debug("_runcmd_output_matches_regex:")
+ logging.debug(f" actual: {actual!r}")
+ log_lines(indent, actual)
+
+ logging.debug(f" regex: {regex!r}")
+ logging.debug(f" match: {m}")
+
+ assert_ne(m, None)
+
+
+def _runcmd_output_doesnt_match_regex(actual, regex):
+ assert_eq = globals()["assert_eq"]
+ log_lines = globals()["log_lines"]
+ indent = " " * 4
+
+ r = re.compile(regex)
+ m = r.search(actual)
+
+ logging.debug("_runcmd_output_doesnt_match_regex:")
+ logging.debug(f" actual: {actual!r}")
+ log_lines(indent, actual)
+
+ logging.debug(f" regex: {regex!r}")
+ logging.debug(f" match: {m}")
+
+ assert_eq(m, None)
+
+
+
+
+#############################################################################
+# Scaffolding for generated test program.
+
+# import logging
+import re
+
+
+# Store context between steps.
+class Context:
+ def __init__(self):
+ self._vars = {}
+ self._ns = {}
+
+ def as_dict(self):
+ return dict(self._vars)
+
+ def get(self, key, default=None):
+ return self._vars.get(key, default)
+
+ def __getitem__(self, key):
+ return self._vars[key]
+
+ def __setitem__(self, key, value):
+ # logging.debug("Context: key {!r} set to {!r}".format(key, value))
+ self._vars[key] = value
+
+ def keys(self):
+ return self._vars.keys()
+
+ def __contains__(self, key):
+ return key in self._vars
+
+ def __delitem__(self, key):
+ del self._vars[key]
+
+ def __repr__(self):
+ return repr({"vars": self._vars, "namespaces": self._ns})
+
+ def declare(self, name):
+ if name not in self._ns:
+ self._ns[name] = NameSpace(name)
+ return self._ns[name]
+
+ def remember_value(self, name, value):
+ ns = self.declare("_values")
+ if name in ns:
+ raise KeyError(name)
+ ns[name] = value
+
+ def recall_value(self, name):
+ ns = self.declare("_values")
+ if name not in ns:
+ raise KeyError(name)
+ return ns[name]
+
+ def expand_values(self, pattern):
+ parts = []
+ while pattern:
+ m = re.search(r"(?<!\$)\$\{(?P<name>\S*)\}", pattern)
+ if not m:
+ parts.append(pattern)
+ break
+ name = m.group("name")
+ if not name:
+ raise KeyError("empty name in expansion")
+ value = self.recall_value(name)
+ parts.append(value)
+ pattern = pattern[m.end() :]
+ return "".join(parts)
+
+
+class NameSpace:
+ def __init__(self, name):
+ self.name = name
+ self._dict = {}
+
+ def as_dict(self):
+ return dict(self._dict)
+
+ def get(self, key, default=None):
+ if key not in self._dict:
+ if default is None:
+ return None
+ self._dict[key] = default
+ return self._dict[key]
+
+ def __setitem__(self, key, value):
+ self._dict[key] = value
+
+ def __getitem__(self, key):
+ return self._dict[key]
+
+ def keys(self):
+ return self._dict.keys()
+
+ def __contains__(self, key):
+ return key in self._dict
+
+ def __delitem__(self, key):
+ del self._dict[key]
+
+ def __repr__(self):
+ return repr(self._dict)
+
+# Decode a base64 encoded string. Result is binary or unicode string.
+
+
+import base64
+
+
+def decode_bytes(s):
+ return base64.b64decode(s)
+
+
+def decode_str(s):
+ return base64.b64decode(s).decode()
+
+# Retrieve an embedded test data file using filename.
+
+
+class Files:
+ def __init__(self):
+ self._files = {}
+
+ def set(self, filename, content):
+ self._files[filename] = content
+
+ def get(self, filename):
+ return self._files[filename]
+
+
+_files = Files()
+
+
+def store_file(filename, content):
+ _files.set(filename, content)
+
+
+def get_file(filename):
+ return _files.get(filename)
+
+# Check two values for equality and give error if they are not equal
+def assert_eq(a, b):
+ assert a == b, "expected %r == %r" % (a, b)
+
+
+# Check two values for inequality and give error if they are equal
+def assert_ne(a, b):
+ assert a != b, "expected %r != %r" % (a, b)
+
+
+# Check that two dict values are equal.
+def assert_dict_eq(a, b):
+ assert isinstance(a, dict)
+ assert isinstance(b, dict)
+ for key in a:
+ assert key in b, f"exected {key} in both dicts"
+ av = a[key]
+ bv = b[key]
+ assert_eq(type(av), type(bv))
+ if isinstance(av, list):
+ assert_eq(list(sorted(av)), list(sorted(bv)))
+ for key in b:
+ assert key in a, f"exected {key} in both dicts"
+
+import logging
+import os
+import tempfile
+
+
+#############################################################################
+# Code to implement the scenarios.
+
+
+class Step:
+ def __init__(self):
+ self._kind = None
+ self._text = None
+ self._args = {}
+ self._function = None
+ self._cleanup = None
+
+ def set_kind(self, kind):
+ self._kind = kind
+
+ def set_text(self, text):
+ self._text = text
+
+ def set_arg(self, name, value):
+ self._args[name] = value
+
+ def set_function(self, function):
+ self._function = function
+
+ def set_cleanup(self, cleanup):
+ self._cleanup = cleanup
+
+ def do(self, ctx):
+ print(" step: {} {}".format(self._kind, self._text))
+ logging.info("step: {} {}".format(self._kind, self._text))
+ self._function(ctx, **self._args)
+
+ def cleanup(self, ctx):
+ if self._cleanup:
+ print(" cleanup: {} {}".format(self._kind, self._text))
+ logging.info("cleanup: {} {}".format(self._kind, self._text))
+ self._cleanup(ctx, **self._args)
+
+
+class Scenario:
+ def __init__(self, ctx):
+ self._title = None
+ self._steps = []
+ self._ctx = ctx
+ self._logged_env = False
+
+ def get_title(self):
+ return self._title
+
+ def set_title(self, title):
+ self._title = title
+
+ def append_step(self, step):
+ self._steps.append(step)
+
+ def run(self, datadir, extra_env):
+ print("scenario: {}".format(self._title))
+ logging.info("Scenario: {}".format(self._title))
+
+ scendir = tempfile.mkdtemp(dir=datadir)
+ os.chdir(scendir)
+ self._set_environment_variables_to(scendir, extra_env)
+
+ done = []
+ ctx = self._ctx
+ try:
+ for step in self._steps:
+ step.do(ctx)
+ done.append(step)
+ except Exception as e:
+ logging.error(str(e), exc_info=True)
+ for step in reversed(done):
+ step.cleanup(ctx)
+ raise
+ for step in reversed(done):
+ step.cleanup(ctx)
+
+ def _set_environment_variables_to(self, scendir, extra_env):
+ log_value = globals()["log_value"]
+
+ minimal = {
+ "PATH": "/bin:/usr/bin",
+ "SHELL": "/bin/sh",
+ "HOME": scendir,
+ "TMPDIR": scendir,
+ }
+
+ os.environ.clear()
+ os.environ.update(minimal)
+ os.environ.update(extra_env)
+ if not self._logged_env:
+ self._logged_env = True
+ log_value("extra_env", 0, dict(extra_env))
+ log_value("os.environ", 0, dict(os.environ))
+
+import argparse
+import logging
+import os
+import random
+import shutil
+import tarfile
+import tempfile
+
+
+class MultilineFormatter(logging.Formatter):
+ def format(self, record):
+ s = super().format(record)
+ lines = list(s.splitlines())
+ return lines.pop(0) + "\n".join(" %s" % line for line in lines)
+
+
+def indent(n):
+ return " " * n
+
+
+def log_value(msg, level, v):
+ if is_multiline_string(v):
+ logging.debug(f"{indent(level)}{msg}:")
+ log_lines(indent(level + 1), v)
+ elif isinstance(v, dict) and v:
+ # Only non-empty dictionaries
+ logging.debug(f"{indent(level)}{msg}:")
+ for k in sorted(v.keys()):
+ log_value(f"{k!r}", level + 1, v[k])
+ elif isinstance(v, list) and v:
+ # Only non-empty lists
+ logging.debug(f"{indent(level)}{msg}:")
+ for i, x in enumerate(v):
+ log_value(f"{i}", level + 1, x)
+ else:
+ logging.debug(f"{indent(level)}{msg}: {v!r}")
+
+
+def is_multiline_string(v):
+ if isinstance(v, str) and "\n" in v:
+ return True
+ elif isinstance(v, bytes) and b"\n" in v:
+ return True
+ else:
+ return False
+
+
+def log_lines(prefix, v):
+ if isinstance(v, str):
+ nl = "\n"
+ else:
+ nl = b"\n"
+ if nl in v:
+ for line in v.splitlines(keepends=True):
+ logging.debug(f"{prefix}{line!r}")
+ else:
+ logging.debug(f"{prefix}{v!r}")
+
+
+# Remember where we started from. The step functions may need to refer
+# to files there.
+srcdir = os.getcwd()
+print("srcdir", srcdir)
+
+# Create a new temporary directory and chdir there. This allows step
+# functions to create new files in the current working directory
+# without having to be so careful.
+_datadir = tempfile.mkdtemp()
+print("datadir", _datadir)
+os.chdir(_datadir)
+
+
+def parse_command_line():
+ p = argparse.ArgumentParser()
+ p.add_argument("--log")
+ p.add_argument("--env", action="append", default=[])
+ p.add_argument("--save-on-failure")
+ p.add_argument("patterns", nargs="*")
+ return p.parse_args()
+
+
+def setup_logging(args):
+ if args.log:
+ fmt = "%(asctime)s %(levelname)s %(message)s"
+ datefmt = "%Y-%m-%d %H:%M:%S"
+ formatter = MultilineFormatter(fmt, datefmt)
+
+ filename = os.path.abspath(os.path.join(srcdir, args.log))
+ handler = logging.FileHandler(filename)
+ handler.setFormatter(formatter)
+ else:
+ handler = logging.NullHandler()
+
+ logger = logging.getLogger()
+ logger.addHandler(handler)
+ logger.setLevel(logging.DEBUG)
+
+
+def save_directory(dirname, tarname):
+ print("tarname", tarname)
+ logging.info("Saving {} to {}".format(dirname, tarname))
+ tar = tarfile.open(tarname, "w")
+ tar.add(dirname, arcname="datadir")
+ tar.close()
+
+
+def main(scenarios):
+ args = parse_command_line()
+ setup_logging(args)
+ logging.info("Test program starts")
+
+ logging.info("patterns: {}".format(args.patterns))
+ if len(args.patterns) == 0:
+ logging.info("Executing all scenarios")
+ todo = list(scenarios)
+ random.shuffle(todo)
+ else:
+ logging.info("Executing requested scenarios only: {}".format(args.patterns))
+ patterns = [arg.lower() for arg in args.patterns]
+ todo = [
+ scen
+ for scen in scenarios
+ if any(pattern in scen.get_title().lower() for pattern in patterns)
+ ]
+
+ extra_env = {}
+ for env in args.env:
+ (name, value) = env.split("=", 1)
+ extra_env[name] = value
+
+ try:
+ for scen in todo:
+ scen.run(_datadir, extra_env)
+ except Exception as e:
+ logging.error(str(e), exc_info=True)
+ if args.save_on_failure:
+ print(args.save_on_failure)
+ filename = os.path.abspath(os.path.join(srcdir, args.save_on_failure))
+ print(filename)
+ save_directory(_datadir, filename)
+ raise
+
+ shutil.rmtree(_datadir)
+ print("OK, all scenarios finished successfully")
+ logging.info("OK, all scenarios finished successfully")
+
+
+
+#############################################################################
+# Test data files that were embedded in the source document. Base64
+# encoding is used to allow arbitrary data.
+
+
+
+
+
+#############################################################################
+# Classes for individual scenarios.
+
+
+#----------------------------------------------------------------------------
+# Scenario: Find max of a list of one
+class Scenario_1():
+ def __init__(self):
+ ctx = Context()
+ self._scenario = Scenario(ctx)
+ self._scenario.set_title(decode_str('RmluZCBtYXggb2YgYSBsaXN0IG9mIG9uZQ=='))
+
+ # Step: server
+ step = Step()
+ step.set_kind('given')
+ step.set_text(decode_str('c2VydmVy'))
+ step.set_function(start_server)
+ if 'stop_server':
+ step.set_cleanup(stop_server)
+ self._scenario.append_step(step)
+
+ # Step: I run max-client.py 1
+ step = Step()
+ step.set_kind('when')
+ step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSAx'))
+ step.set_function(runcmd_step)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('YXJndjA=')
+ text = decode_str('bWF4LWNsaWVudC5weQ==')
+ step.set_arg(name, text)
+ name = decode_str('YXJncw==')
+ text = decode_str('IDE=')
+ step.set_arg(name, text)
+
+ # Step: answer is 1
+ step = Step()
+ step.set_kind('then')
+ step.set_text(decode_str('YW5zd2VyIGlzIDE='))
+ step.set_function(answer_is)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('aW5kZXg=')
+ text = decode_str('MQ==')
+ step.set_arg(name, text)
+
+
+ def get_title(self):
+ return self._scenario.get_title()
+
+ def run(self, datadir, extra_env):
+ self._scenario.run(datadir, extra_env)
+
+#----------------------------------------------------------------------------
+# Scenario: Find max of a list of two
+class Scenario_2():
+ def __init__(self):
+ ctx = Context()
+ self._scenario = Scenario(ctx)
+ self._scenario.set_title(decode_str('RmluZCBtYXggb2YgYSBsaXN0IG9mIHR3bw=='))
+
+ # Step: server
+ step = Step()
+ step.set_kind('given')
+ step.set_text(decode_str('c2VydmVy'))
+ step.set_function(start_server)
+ if 'stop_server':
+ step.set_cleanup(stop_server)
+ self._scenario.append_step(step)
+
+ # Step: I run max-client.py 5 5
+ step = Step()
+ step.set_kind('when')
+ step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSA1IDU='))
+ step.set_function(runcmd_step)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('YXJndjA=')
+ text = decode_str('bWF4LWNsaWVudC5weQ==')
+ step.set_arg(name, text)
+ name = decode_str('YXJncw==')
+ text = decode_str('IDUgNQ==')
+ step.set_arg(name, text)
+
+ # Step: answer is 5
+ step = Step()
+ step.set_kind('then')
+ step.set_text(decode_str('YW5zd2VyIGlzIDU='))
+ step.set_function(answer_is)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('aW5kZXg=')
+ text = decode_str('NQ==')
+ step.set_arg(name, text)
+
+ # Step: I run max-client.py 5 6
+ step = Step()
+ step.set_kind('when')
+ step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSA1IDY='))
+ step.set_function(runcmd_step)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('YXJndjA=')
+ text = decode_str('bWF4LWNsaWVudC5weQ==')
+ step.set_arg(name, text)
+ name = decode_str('YXJncw==')
+ text = decode_str('IDUgNg==')
+ step.set_arg(name, text)
+
+ # Step: answer is 6
+ step = Step()
+ step.set_kind('then')
+ step.set_text(decode_str('YW5zd2VyIGlzIDY='))
+ step.set_function(answer_is)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('aW5kZXg=')
+ text = decode_str('Ng==')
+ step.set_arg(name, text)
+
+ # Step: I run max-client.py 6 5
+ step = Step()
+ step.set_kind('when')
+ step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSA2IDU='))
+ step.set_function(runcmd_step)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('YXJndjA=')
+ text = decode_str('bWF4LWNsaWVudC5weQ==')
+ step.set_arg(name, text)
+ name = decode_str('YXJncw==')
+ text = decode_str('IDYgNQ==')
+ step.set_arg(name, text)
+
+ # Step: answer is 6
+ step = Step()
+ step.set_kind('then')
+ step.set_text(decode_str('YW5zd2VyIGlzIDY='))
+ step.set_function(answer_is)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('aW5kZXg=')
+ text = decode_str('Ng==')
+ step.set_arg(name, text)
+
+
+ def get_title(self):
+ return self._scenario.get_title()
+
+ def run(self, datadir, extra_env):
+ self._scenario.run(datadir, extra_env)
+
+#----------------------------------------------------------------------------
+# Scenario: Find max of a list of three
+class Scenario_3():
+ def __init__(self):
+ ctx = Context()
+ self._scenario = Scenario(ctx)
+ self._scenario.set_title(decode_str('RmluZCBtYXggb2YgYSBsaXN0IG9mIHRocmVl'))
+
+ # Step: server
+ step = Step()
+ step.set_kind('given')
+ step.set_text(decode_str('c2VydmVy'))
+ step.set_function(start_server)
+ if 'stop_server':
+ step.set_cleanup(stop_server)
+ self._scenario.append_step(step)
+
+ # Step: I run max-client.py 5 5 5
+ step = Step()
+ step.set_kind('when')
+ step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSA1IDUgNQ=='))
+ step.set_function(runcmd_step)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('YXJndjA=')
+ text = decode_str('bWF4LWNsaWVudC5weQ==')
+ step.set_arg(name, text)
+ name = decode_str('YXJncw==')
+ text = decode_str('IDUgNSA1')
+ step.set_arg(name, text)
+
+ # Step: answer is 5
+ step = Step()
+ step.set_kind('then')
+ step.set_text(decode_str('YW5zd2VyIGlzIDU='))
+ step.set_function(answer_is)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('aW5kZXg=')
+ text = decode_str('NQ==')
+ step.set_arg(name, text)
+
+ # Step: I run max-client.py 5 5 6
+ step = Step()
+ step.set_kind('when')
+ step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSA1IDUgNg=='))
+ step.set_function(runcmd_step)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('YXJndjA=')
+ text = decode_str('bWF4LWNsaWVudC5weQ==')
+ step.set_arg(name, text)
+ name = decode_str('YXJncw==')
+ text = decode_str('IDUgNSA2')
+ step.set_arg(name, text)
+
+ # Step: answer is 6
+ step = Step()
+ step.set_kind('then')
+ step.set_text(decode_str('YW5zd2VyIGlzIDY='))
+ step.set_function(answer_is)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('aW5kZXg=')
+ text = decode_str('Ng==')
+ step.set_arg(name, text)
+
+ # Step: I run max-client.py 5 6 5
+ step = Step()
+ step.set_kind('when')
+ step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSA1IDYgNQ=='))
+ step.set_function(runcmd_step)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('YXJndjA=')
+ text = decode_str('bWF4LWNsaWVudC5weQ==')
+ step.set_arg(name, text)
+ name = decode_str('YXJncw==')
+ text = decode_str('IDUgNiA1')
+ step.set_arg(name, text)
+
+ # Step: answer is 6
+ step = Step()
+ step.set_kind('then')
+ step.set_text(decode_str('YW5zd2VyIGlzIDY='))
+ step.set_function(answer_is)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('aW5kZXg=')
+ text = decode_str('Ng==')
+ step.set_arg(name, text)
+
+ # Step: I run max-client.py 6 5 5
+ step = Step()
+ step.set_kind('when')
+ step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSA2IDUgNQ=='))
+ step.set_function(runcmd_step)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('YXJndjA=')
+ text = decode_str('bWF4LWNsaWVudC5weQ==')
+ step.set_arg(name, text)
+ name = decode_str('YXJncw==')
+ text = decode_str('IDYgNSA1')
+ step.set_arg(name, text)
+
+ # Step: answer is 6
+ step = Step()
+ step.set_kind('then')
+ step.set_text(decode_str('YW5zd2VyIGlzIDY='))
+ step.set_function(answer_is)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('aW5kZXg=')
+ text = decode_str('Ng==')
+ step.set_arg(name, text)
+
+ # Step: I run max-client.py 5 6 7
+ step = Step()
+ step.set_kind('when')
+ step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSA1IDYgNw=='))
+ step.set_function(runcmd_step)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('YXJndjA=')
+ text = decode_str('bWF4LWNsaWVudC5weQ==')
+ step.set_arg(name, text)
+ name = decode_str('YXJncw==')
+ text = decode_str('IDUgNiA3')
+ step.set_arg(name, text)
+
+ # Step: answer is 7
+ step = Step()
+ step.set_kind('then')
+ step.set_text(decode_str('YW5zd2VyIGlzIDc='))
+ step.set_function(answer_is)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('aW5kZXg=')
+ text = decode_str('Nw==')
+ step.set_arg(name, text)
+
+ # Step: I run max-client.py 5 7 6
+ step = Step()
+ step.set_kind('when')
+ step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSA1IDcgNg=='))
+ step.set_function(runcmd_step)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('YXJndjA=')
+ text = decode_str('bWF4LWNsaWVudC5weQ==')
+ step.set_arg(name, text)
+ name = decode_str('YXJncw==')
+ text = decode_str('IDUgNyA2')
+ step.set_arg(name, text)
+
+ # Step: answer is 7
+ step = Step()
+ step.set_kind('then')
+ step.set_text(decode_str('YW5zd2VyIGlzIDc='))
+ step.set_function(answer_is)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('aW5kZXg=')
+ text = decode_str('Nw==')
+ step.set_arg(name, text)
+
+ # Step: I run max-client.py 6 5 7
+ step = Step()
+ step.set_kind('when')
+ step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSA2IDUgNw=='))
+ step.set_function(runcmd_step)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('YXJndjA=')
+ text = decode_str('bWF4LWNsaWVudC5weQ==')
+ step.set_arg(name, text)
+ name = decode_str('YXJncw==')
+ text = decode_str('IDYgNSA3')
+ step.set_arg(name, text)
+
+ # Step: answer is 7
+ step = Step()
+ step.set_kind('then')
+ step.set_text(decode_str('YW5zd2VyIGlzIDc='))
+ step.set_function(answer_is)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('aW5kZXg=')
+ text = decode_str('Nw==')
+ step.set_arg(name, text)
+
+ # Step: I run max-client.py 6 7 5
+ step = Step()
+ step.set_kind('when')
+ step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSA2IDcgNQ=='))
+ step.set_function(runcmd_step)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('YXJndjA=')
+ text = decode_str('bWF4LWNsaWVudC5weQ==')
+ step.set_arg(name, text)
+ name = decode_str('YXJncw==')
+ text = decode_str('IDYgNyA1')
+ step.set_arg(name, text)
+
+ # Step: answer is 7
+ step = Step()
+ step.set_kind('then')
+ step.set_text(decode_str('YW5zd2VyIGlzIDc='))
+ step.set_function(answer_is)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('aW5kZXg=')
+ text = decode_str('Nw==')
+ step.set_arg(name, text)
+
+ # Step: I run max-client.py 7 5 6
+ step = Step()
+ step.set_kind('when')
+ step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSA3IDUgNg=='))
+ step.set_function(runcmd_step)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('YXJndjA=')
+ text = decode_str('bWF4LWNsaWVudC5weQ==')
+ step.set_arg(name, text)
+ name = decode_str('YXJncw==')
+ text = decode_str('IDcgNSA2')
+ step.set_arg(name, text)
+
+ # Step: answer is 7
+ step = Step()
+ step.set_kind('then')
+ step.set_text(decode_str('YW5zd2VyIGlzIDc='))
+ step.set_function(answer_is)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('aW5kZXg=')
+ text = decode_str('Nw==')
+ step.set_arg(name, text)
+
+ # Step: I run max-client.py 7 6 5
+ step = Step()
+ step.set_kind('when')
+ step.set_text(decode_str('SSBydW4gbWF4LWNsaWVudC5weSA3IDYgNQ=='))
+ step.set_function(runcmd_step)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('YXJndjA=')
+ text = decode_str('bWF4LWNsaWVudC5weQ==')
+ step.set_arg(name, text)
+ name = decode_str('YXJncw==')
+ text = decode_str('IDcgNiA1')
+ step.set_arg(name, text)
+
+ # Step: answer is 7
+ step = Step()
+ step.set_kind('then')
+ step.set_text(decode_str('YW5zd2VyIGlzIDc='))
+ step.set_function(answer_is)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('aW5kZXg=')
+ text = decode_str('Nw==')
+ step.set_arg(name, text)
+
+
+ def get_title(self):
+ return self._scenario.get_title()
+
+ def run(self, datadir, extra_env):
+ self._scenario.run(datadir, extra_env)
+
+
+_scenarios = {
+ Scenario_1(),
+ Scenario_2(),
+ Scenario_3(),
+}
+
+
+#############################################################################
+# Call main function and clean up.
+main(_scenarios)