summaryrefslogtreecommitdiff
path: root/subplot
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2020-10-04 14:52:18 +0300
committerLars Wirzenius <liw@liw.fi>2020-10-04 14:52:18 +0300
commit328b6d6e6a97407a5dba1cf0a38fcda565330cd4 (patch)
treeac0645c791e5c20e66aaa05932ff06d700d50a05 /subplot
parent864e20652458c1a4ae09c882ad3e29d6b0988b06 (diff)
downloadobnam2-328b6d6e6a97407a5dba1cf0a38fcda565330cd4.tar.gz
chore: update lib/runcmd.py from Subplot, make required changes
Diffstat (limited to 'subplot')
-rw-r--r--subplot/daemon.py13
-rw-r--r--subplot/obnam.py10
-rw-r--r--subplot/runcmd.py294
3 files changed, 233 insertions, 84 deletions
diff --git a/subplot/daemon.py b/subplot/daemon.py
index e223505..5fd959a 100644
--- a/subplot/daemon.py
+++ b/subplot/daemon.py
@@ -10,8 +10,8 @@ import time
# Start a process in the background.
def start_daemon(ctx, name, argv):
- runcmd = globals()["runcmd"]
- exit_code_is = globals()["exit_code_is"]
+ runcmd_run = globals()["runcmd_run"]
+ runcmd_exit_code_is = globals()["runcmd_exit_code_is"]
logging.debug(f"Starting daemon {name}")
logging.debug(f" ctx={ctx.as_dict()}")
@@ -26,7 +26,7 @@ def start_daemon(ctx, name, argv):
"stderr": f"{name}.stderr",
"stdout": f"{name}.stdout",
}
- runcmd(
+ runcmd_run(
ctx,
[
"/usr/sbin/daemonize",
@@ -44,10 +44,11 @@ def start_daemon(ctx, name, argv):
# Wait for a bit for daemon to start and maybe find a problem and die.
time.sleep(3)
- if ctx["exit"] != 0:
- logging.error(f"obnam-server stderr: {ctx['stderr']}")
+ ns = ctx.declare("_runcmd")
+ if ns["exit"] != 0:
+ logging.error(f"obnam-server stderr: {ns['stderr']}")
- exit_code_is(ctx, 0)
+ runcmd_exit_code_is(ctx, 0)
this["pid"] = int(open(this["pid-file"]).read().strip())
assert process_exists(this["pid"])
diff --git a/subplot/obnam.py b/subplot/obnam.py
index 5947f1f..79a4649 100644
--- a/subplot/obnam.py
+++ b/subplot/obnam.py
@@ -130,9 +130,9 @@ def json_body_matches(ctx, wanted=None):
def back_up_directory(ctx, dirname=None):
- runcmd = globals()["runcmd"]
+ runcmd_run = globals()["runcmd_run"]
- runcmd(ctx, ["pgrep", "-laf", "obnam"])
+ runcmd_run(ctx, ["pgrep", "-laf", "obnam"])
config = {"server_name": "localhost", "server_port": ctx["config"]["port"]}
config = yaml.safe_dump(config)
@@ -147,12 +147,12 @@ def back_up_directory(ctx, dirname=None):
t.close()
with open(tarball, "rb") as f:
- runcmd(ctx, [_binary("obnam-backup"), filename], stdin=f)
+ runcmd_run(ctx, [_binary("obnam-backup"), filename], stdin=f)
def command_is_successful(ctx):
- exit_code_zero = globals()["exit_code_zero"]
- exit_code_zero(ctx)
+ runcmd_exit_code_is_zero = globals()["runcmd_exit_code_is_zero"]
+ runcmd_exit_code_is_zero(ctx)
# Name of Rust binary, debug-build.
diff --git a/subplot/runcmd.py b/subplot/runcmd.py
index 7193c15..6ea5e47 100644
--- a/subplot/runcmd.py
+++ b/subplot/runcmd.py
@@ -1,77 +1,225 @@
-# Some step implementations for running commands and capturing the result.
-
+import logging
+import os
+import re
+import shlex
import subprocess
-# Run a command, capture its stdout, stderr, and exit code in context.
-def runcmd(ctx, argv, **kwargs):
- p = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)
+#
+# Helper functions.
+#
+
+
+# Run a command, given an argv and other arguments for subprocess.Popen.
+#
+# This is meant to be a helper function, not bound directly to a step. The
+# stdout, stderr, and exit code are stored in the "_runcmd" namespace in the
+# ctx context.
+def runcmd_run(ctx, argv, **kwargs):
+ ns = ctx.declare("_runcmd")
+ env = dict(os.environ)
+ pp = ns.get("path-prefix")
+ if pp:
+ env["PATH"] = pp + ":" + env["PATH"]
+
+ logging.debug(f"runcmd_run")
+ logging.debug(f" argv: {argv}")
+ logging.debug(f" env: {env}")
+ p = subprocess.Popen(
+ argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, **kwargs
+ )
stdout, stderr = p.communicate("")
- ctx["argv"] = argv
- ctx["stdout"] = stdout.decode("utf-8")
- ctx["stderr"] = stderr.decode("utf-8")
- ctx["exit"] = p.returncode
-
-
-# Check that latest exit code captured by runcmd was a specific one.
-def exit_code_is(ctx, wanted):
- if ctx.get("exit") != wanted:
- print("context:", ctx.as_dict())
- assert_eq(ctx.get("exit"), wanted)
-
-
-# Check that latest exit code captured by runcmd was not a specific one.
-def exit_code_is_not(ctx, unwanted):
- if ctx.get("exit") == unwanted:
- print("context:", ctx.as_dict())
- assert_ne(ctx.get("exit"), unwanted)
-
-
-# Check that latest exit code captured by runcmd was zero.
-def exit_code_zero(ctx):
- exit_code_is(ctx, 0)
-
-
-# Check that latest exit code captured by runcmd was not zero.
-def exit_code_nonzero(ctx):
- exit_code_is_not(ctx, 0)
-
-
-# Check that stdout of latest runcmd contains a specific string.
-def stdout_contains(ctx, pattern=None):
- stdout = ctx.get("stdout", "")
- if pattern not in stdout:
- print("pattern:", repr(pattern))
- print("stdout:", repr(stdout))
- print("ctx:", ctx.as_dict())
- assert_eq(pattern in stdout, True)
-
-
-# Check that stdout of latest runcmd does not contain a specific string.
-def stdout_does_not_contain(ctx, pattern=None):
- stdout = ctx.get("stdout", "")
- if pattern in stdout:
- print("pattern:", repr(pattern))
- print("stdout:", repr(stdout))
- print("ctx:", ctx.as_dict())
- assert_eq(pattern not in stdout, True)
-
-
-# Check that stderr of latest runcmd does contains a specific string.
-def stderr_contains(ctx, pattern=None):
- stderr = ctx.get("stderr", "")
- if pattern not in stderr:
- print("pattern:", repr(pattern))
- print("stderr:", repr(stderr))
- print("ctx:", ctx.as_dict())
- assert_eq(pattern in stderr, True)
-
-
-# Check that stderr of latest runcmd does not contain a specific string.
-def stderr_does_not_contain(ctx, pattern=None):
- stderr = ctx.get("stderr", "")
- if pattern not in stderr:
- print("pattern:", repr(pattern))
- print("stderr:", repr(stderr))
- print("ctx:", ctx.as_dict())
- assert_eq(pattern not in stderr, True)
+ ns["argv"] = argv
+ ns["stdout.raw"] = stdout
+ ns["stderr.raw"] = stderr
+ ns["stdout"] = stdout.decode("utf-8")
+ ns["stderr"] = stderr.decode("utf-8")
+ ns["exit"] = p.returncode
+ logging.debug(f" ctx: {ctx}")
+ logging.debug(f" ns: {ns}")
+
+
+# Step: prepend srcdir to PATH whenever runcmd runs a command.
+def runcmd_helper_srcdir_path(ctx):
+ srcdir = globals()["srcdir"]
+ runcmd_prepend_to_path(ctx, srcdir)
+
+
+# Step: This creates a helper script.
+def runcmd_helper_script(ctx, filename=None):
+ get_file = globals()["get_file"]
+ with open(filename, "wb") as f:
+ f.write(get_file(filename))
+
+
+#
+# Step functions for running commands.
+#
+
+
+def runcmd_prepend_to_path(ctx, dirname=None):
+ ns = ctx.declare("_runcmd")
+ pp = ns.get("path-prefix", "")
+ if pp:
+ pp = f"{pp}:{dirname}"
+ else:
+ pp = dirname
+ ns["path-prefix"] = pp
+
+
+def runcmd_step(ctx, argv0=None, args=None):
+ runcmd_try_to_run(ctx, argv0=argv0, args=args)
+ runcmd_exit_code_is_zero(ctx)
+
+
+def runcmd_try_to_run(ctx, argv0=None, args=None):
+ argv = [shlex.quote(argv0)] + shlex.split(args)
+ runcmd_run(ctx, argv)
+
+
+#
+# Step functions for examining exit codes.
+#
+
+
+def runcmd_exit_code_is_zero(ctx):
+ runcmd_exit_code_is(ctx, exit=0)
+
+
+def runcmd_exit_code_is(ctx, exit=None):
+ assert_eq = globals()["assert_eq"]
+ ns = ctx.declare("_runcmd")
+ assert_eq(ns["exit"], int(exit))
+
+
+def runcmd_exit_code_is_nonzero(ctx):
+ runcmd_exit_code_is_not(ctx, exit=0)
+
+
+def runcmd_exit_code_is_not(ctx, exit=None):
+ assert_ne = globals()["assert_ne"]
+ ns = ctx.declare("_runcmd")
+ assert_ne(ns["exit"], int(exit))
+
+
+#
+# Step functions and helpers for examining output in various ways.
+#
+
+
+def runcmd_stdout_is(ctx, text=None):
+ ns = ctx.declare("_runcmd")
+ _runcmd_output_is(ns["stdout"], text)
+
+
+def runcmd_stdout_isnt(ctx, text=None):
+ ns = ctx.declare("_runcmd")
+ _runcmd_output_isnt(ns["stdout"], text)
+
+
+def runcmd_stderr_is(ctx, text=None):
+ ns = ctx.declare("_runcmd")
+ _runcmd_output_is(ns["stderr"], text)
+
+
+def runcmd_stderr_isnt(ctx, text=None):
+ ns = ctx.declare("_runcmd")
+ _runcmd_output_isnt(ns["stderr"], text)
+
+
+def _runcmd_output_is(actual, wanted):
+ assert_eq = globals()["assert_eq"]
+ wanted = bytes(wanted, "utf8").decode("unicode_escape")
+ logging.debug("_runcmd_output_is:")
+ logging.debug(f" actual: {actual!r}")
+ logging.debug(f" wanted: {wanted!r}")
+ assert_eq(actual, wanted)
+
+
+def _runcmd_output_isnt(actual, wanted):
+ assert_ne = globals()["assert_ne"]
+ wanted = bytes(wanted, "utf8").decode("unicode_escape")
+ logging.debug("_runcmd_output_isnt:")
+ logging.debug(f" actual: {actual!r}")
+ logging.debug(f" wanted: {wanted!r}")
+ assert_ne(actual, wanted)
+
+
+def runcmd_stdout_contains(ctx, text=None):
+ ns = ctx.declare("_runcmd")
+ _runcmd_output_contains(ns["stdout"], text)
+
+
+def runcmd_stdout_doesnt_contain(ctx, text=None):
+ ns = ctx.declare("_runcmd")
+ _runcmd_output_doesnt_contain(ns["stdout"], text)
+
+
+def runcmd_stderr_contains(ctx, text=None):
+ ns = ctx.declare("_runcmd")
+ _runcmd_output_contains(ns["stderr"], text)
+
+
+def runcmd_stderr_doesnt_contain(ctx, text=None):
+ ns = ctx.declare("_runcmd")
+ _runcmd_output_doesnt_contain(ns["stderr"], text)
+
+
+def _runcmd_output_contains(actual, wanted):
+ assert_eq = globals()["assert_eq"]
+ wanted = bytes(wanted, "utf8").decode("unicode_escape")
+ logging.debug("_runcmd_output_contains:")
+ logging.debug(f" actual: {actual!r}")
+ logging.debug(f" wanted: {wanted!r}")
+ assert_eq(wanted in actual, True)
+
+
+def _runcmd_output_doesnt_contain(actual, wanted):
+ assert_ne = globals()["assert_ne"]
+ wanted = bytes(wanted, "utf8").decode("unicode_escape")
+ logging.debug("_runcmd_output_doesnt_contain:")
+ logging.debug(f" actual: {actual!r}")
+ logging.debug(f" wanted: {wanted!r}")
+ assert_ne(wanted in actual, True)
+
+
+def runcmd_stdout_matches_regex(ctx, regex=None):
+ ns = ctx.declare("_runcmd")
+ _runcmd_output_matches_regex(ns["stdout"], regex)
+
+
+def runcmd_stdout_doesnt_match_regex(ctx, regex=None):
+ ns = ctx.declare("_runcmd")
+ _runcmd_output_doesnt_match_regex(ns["stdout"], regex)
+
+
+def runcmd_stderr_matches_regex(ctx, regex=None):
+ ns = ctx.declare("_runcmd")
+ _runcmd_output_matches_regex(ns["stderr"], regex)
+
+
+def runcmd_stderr_doesnt_match_regex(ctx, regex=None):
+ ns = ctx.declare("_runcmd")
+ _runcmd_output_doesnt_match_regex(ns["stderr"], regex)
+
+
+def _runcmd_output_matches_regex(actual, regex):
+ assert_ne = globals()["assert_ne"]
+ r = re.compile(regex)
+ m = r.search(actual)
+ logging.debug("_runcmd_output_matches_regex:")
+ logging.debug(f" actual: {actual!r}")
+ logging.debug(f" regex: {regex!r}")
+ logging.debug(f" match: {m}")
+ assert_ne(m, None)
+
+
+def _runcmd_output_doesnt_match_regex(actual, regex):
+ assert_eq = globals()["assert_eq"]
+ r = re.compile(regex)
+ m = r.search(actual)
+ logging.debug("_runcmd_output_doesnt_match_regex:")
+ logging.debug(f" actual: {actual!r}")
+ logging.debug(f" regex: {regex!r}")
+ logging.debug(f" match: {m}")
+ assert_eq(m, None)