From 328b6d6e6a97407a5dba1cf0a38fcda565330cd4 Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Sun, 4 Oct 2020 14:52:18 +0300 Subject: chore: update lib/runcmd.py from Subplot, make required changes --- subplot/daemon.py | 13 +-- subplot/obnam.py | 10 +- subplot/runcmd.py | 294 ++++++++++++++++++++++++++++++++++++++++-------------- 3 files changed, 233 insertions(+), 84 deletions(-) (limited to 'subplot') diff --git a/subplot/daemon.py b/subplot/daemon.py index e223505..5fd959a 100644 --- a/subplot/daemon.py +++ b/subplot/daemon.py @@ -10,8 +10,8 @@ import time # Start a process in the background. def start_daemon(ctx, name, argv): - runcmd = globals()["runcmd"] - exit_code_is = globals()["exit_code_is"] + runcmd_run = globals()["runcmd_run"] + runcmd_exit_code_is = globals()["runcmd_exit_code_is"] logging.debug(f"Starting daemon {name}") logging.debug(f" ctx={ctx.as_dict()}") @@ -26,7 +26,7 @@ def start_daemon(ctx, name, argv): "stderr": f"{name}.stderr", "stdout": f"{name}.stdout", } - runcmd( + runcmd_run( ctx, [ "/usr/sbin/daemonize", @@ -44,10 +44,11 @@ def start_daemon(ctx, name, argv): # Wait for a bit for daemon to start and maybe find a problem and die. time.sleep(3) - if ctx["exit"] != 0: - logging.error(f"obnam-server stderr: {ctx['stderr']}") + ns = ctx.declare("_runcmd") + if ns["exit"] != 0: + logging.error(f"obnam-server stderr: {ns['stderr']}") - exit_code_is(ctx, 0) + runcmd_exit_code_is(ctx, 0) this["pid"] = int(open(this["pid-file"]).read().strip()) assert process_exists(this["pid"]) diff --git a/subplot/obnam.py b/subplot/obnam.py index 5947f1f..79a4649 100644 --- a/subplot/obnam.py +++ b/subplot/obnam.py @@ -130,9 +130,9 @@ def json_body_matches(ctx, wanted=None): def back_up_directory(ctx, dirname=None): - runcmd = globals()["runcmd"] + runcmd_run = globals()["runcmd_run"] - runcmd(ctx, ["pgrep", "-laf", "obnam"]) + runcmd_run(ctx, ["pgrep", "-laf", "obnam"]) config = {"server_name": "localhost", "server_port": ctx["config"]["port"]} config = yaml.safe_dump(config) @@ -147,12 +147,12 @@ def back_up_directory(ctx, dirname=None): t.close() with open(tarball, "rb") as f: - runcmd(ctx, [_binary("obnam-backup"), filename], stdin=f) + runcmd_run(ctx, [_binary("obnam-backup"), filename], stdin=f) def command_is_successful(ctx): - exit_code_zero = globals()["exit_code_zero"] - exit_code_zero(ctx) + runcmd_exit_code_is_zero = globals()["runcmd_exit_code_is_zero"] + runcmd_exit_code_is_zero(ctx) # Name of Rust binary, debug-build. diff --git a/subplot/runcmd.py b/subplot/runcmd.py index 7193c15..6ea5e47 100644 --- a/subplot/runcmd.py +++ b/subplot/runcmd.py @@ -1,77 +1,225 @@ -# Some step implementations for running commands and capturing the result. - +import logging +import os +import re +import shlex import subprocess -# Run a command, capture its stdout, stderr, and exit code in context. -def runcmd(ctx, argv, **kwargs): - p = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs) +# +# Helper functions. +# + + +# Run a command, given an argv and other arguments for subprocess.Popen. +# +# This is meant to be a helper function, not bound directly to a step. The +# stdout, stderr, and exit code are stored in the "_runcmd" namespace in the +# ctx context. +def runcmd_run(ctx, argv, **kwargs): + ns = ctx.declare("_runcmd") + env = dict(os.environ) + pp = ns.get("path-prefix") + if pp: + env["PATH"] = pp + ":" + env["PATH"] + + logging.debug(f"runcmd_run") + logging.debug(f" argv: {argv}") + logging.debug(f" env: {env}") + p = subprocess.Popen( + argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, **kwargs + ) stdout, stderr = p.communicate("") - ctx["argv"] = argv - ctx["stdout"] = stdout.decode("utf-8") - ctx["stderr"] = stderr.decode("utf-8") - ctx["exit"] = p.returncode - - -# Check that latest exit code captured by runcmd was a specific one. -def exit_code_is(ctx, wanted): - if ctx.get("exit") != wanted: - print("context:", ctx.as_dict()) - assert_eq(ctx.get("exit"), wanted) - - -# Check that latest exit code captured by runcmd was not a specific one. -def exit_code_is_not(ctx, unwanted): - if ctx.get("exit") == unwanted: - print("context:", ctx.as_dict()) - assert_ne(ctx.get("exit"), unwanted) - - -# Check that latest exit code captured by runcmd was zero. -def exit_code_zero(ctx): - exit_code_is(ctx, 0) - - -# Check that latest exit code captured by runcmd was not zero. -def exit_code_nonzero(ctx): - exit_code_is_not(ctx, 0) - - -# Check that stdout of latest runcmd contains a specific string. -def stdout_contains(ctx, pattern=None): - stdout = ctx.get("stdout", "") - if pattern not in stdout: - print("pattern:", repr(pattern)) - print("stdout:", repr(stdout)) - print("ctx:", ctx.as_dict()) - assert_eq(pattern in stdout, True) - - -# Check that stdout of latest runcmd does not contain a specific string. -def stdout_does_not_contain(ctx, pattern=None): - stdout = ctx.get("stdout", "") - if pattern in stdout: - print("pattern:", repr(pattern)) - print("stdout:", repr(stdout)) - print("ctx:", ctx.as_dict()) - assert_eq(pattern not in stdout, True) - - -# Check that stderr of latest runcmd does contains a specific string. -def stderr_contains(ctx, pattern=None): - stderr = ctx.get("stderr", "") - if pattern not in stderr: - print("pattern:", repr(pattern)) - print("stderr:", repr(stderr)) - print("ctx:", ctx.as_dict()) - assert_eq(pattern in stderr, True) - - -# Check that stderr of latest runcmd does not contain a specific string. -def stderr_does_not_contain(ctx, pattern=None): - stderr = ctx.get("stderr", "") - if pattern not in stderr: - print("pattern:", repr(pattern)) - print("stderr:", repr(stderr)) - print("ctx:", ctx.as_dict()) - assert_eq(pattern not in stderr, True) + ns["argv"] = argv + ns["stdout.raw"] = stdout + ns["stderr.raw"] = stderr + ns["stdout"] = stdout.decode("utf-8") + ns["stderr"] = stderr.decode("utf-8") + ns["exit"] = p.returncode + logging.debug(f" ctx: {ctx}") + logging.debug(f" ns: {ns}") + + +# Step: prepend srcdir to PATH whenever runcmd runs a command. +def runcmd_helper_srcdir_path(ctx): + srcdir = globals()["srcdir"] + runcmd_prepend_to_path(ctx, srcdir) + + +# Step: This creates a helper script. +def runcmd_helper_script(ctx, filename=None): + get_file = globals()["get_file"] + with open(filename, "wb") as f: + f.write(get_file(filename)) + + +# +# Step functions for running commands. +# + + +def runcmd_prepend_to_path(ctx, dirname=None): + ns = ctx.declare("_runcmd") + pp = ns.get("path-prefix", "") + if pp: + pp = f"{pp}:{dirname}" + else: + pp = dirname + ns["path-prefix"] = pp + + +def runcmd_step(ctx, argv0=None, args=None): + runcmd_try_to_run(ctx, argv0=argv0, args=args) + runcmd_exit_code_is_zero(ctx) + + +def runcmd_try_to_run(ctx, argv0=None, args=None): + argv = [shlex.quote(argv0)] + shlex.split(args) + runcmd_run(ctx, argv) + + +# +# Step functions for examining exit codes. +# + + +def runcmd_exit_code_is_zero(ctx): + runcmd_exit_code_is(ctx, exit=0) + + +def runcmd_exit_code_is(ctx, exit=None): + assert_eq = globals()["assert_eq"] + ns = ctx.declare("_runcmd") + assert_eq(ns["exit"], int(exit)) + + +def runcmd_exit_code_is_nonzero(ctx): + runcmd_exit_code_is_not(ctx, exit=0) + + +def runcmd_exit_code_is_not(ctx, exit=None): + assert_ne = globals()["assert_ne"] + ns = ctx.declare("_runcmd") + assert_ne(ns["exit"], int(exit)) + + +# +# Step functions and helpers for examining output in various ways. +# + + +def runcmd_stdout_is(ctx, text=None): + ns = ctx.declare("_runcmd") + _runcmd_output_is(ns["stdout"], text) + + +def runcmd_stdout_isnt(ctx, text=None): + ns = ctx.declare("_runcmd") + _runcmd_output_isnt(ns["stdout"], text) + + +def runcmd_stderr_is(ctx, text=None): + ns = ctx.declare("_runcmd") + _runcmd_output_is(ns["stderr"], text) + + +def runcmd_stderr_isnt(ctx, text=None): + ns = ctx.declare("_runcmd") + _runcmd_output_isnt(ns["stderr"], text) + + +def _runcmd_output_is(actual, wanted): + assert_eq = globals()["assert_eq"] + wanted = bytes(wanted, "utf8").decode("unicode_escape") + logging.debug("_runcmd_output_is:") + logging.debug(f" actual: {actual!r}") + logging.debug(f" wanted: {wanted!r}") + assert_eq(actual, wanted) + + +def _runcmd_output_isnt(actual, wanted): + assert_ne = globals()["assert_ne"] + wanted = bytes(wanted, "utf8").decode("unicode_escape") + logging.debug("_runcmd_output_isnt:") + logging.debug(f" actual: {actual!r}") + logging.debug(f" wanted: {wanted!r}") + assert_ne(actual, wanted) + + +def runcmd_stdout_contains(ctx, text=None): + ns = ctx.declare("_runcmd") + _runcmd_output_contains(ns["stdout"], text) + + +def runcmd_stdout_doesnt_contain(ctx, text=None): + ns = ctx.declare("_runcmd") + _runcmd_output_doesnt_contain(ns["stdout"], text) + + +def runcmd_stderr_contains(ctx, text=None): + ns = ctx.declare("_runcmd") + _runcmd_output_contains(ns["stderr"], text) + + +def runcmd_stderr_doesnt_contain(ctx, text=None): + ns = ctx.declare("_runcmd") + _runcmd_output_doesnt_contain(ns["stderr"], text) + + +def _runcmd_output_contains(actual, wanted): + assert_eq = globals()["assert_eq"] + wanted = bytes(wanted, "utf8").decode("unicode_escape") + logging.debug("_runcmd_output_contains:") + logging.debug(f" actual: {actual!r}") + logging.debug(f" wanted: {wanted!r}") + assert_eq(wanted in actual, True) + + +def _runcmd_output_doesnt_contain(actual, wanted): + assert_ne = globals()["assert_ne"] + wanted = bytes(wanted, "utf8").decode("unicode_escape") + logging.debug("_runcmd_output_doesnt_contain:") + logging.debug(f" actual: {actual!r}") + logging.debug(f" wanted: {wanted!r}") + assert_ne(wanted in actual, True) + + +def runcmd_stdout_matches_regex(ctx, regex=None): + ns = ctx.declare("_runcmd") + _runcmd_output_matches_regex(ns["stdout"], regex) + + +def runcmd_stdout_doesnt_match_regex(ctx, regex=None): + ns = ctx.declare("_runcmd") + _runcmd_output_doesnt_match_regex(ns["stdout"], regex) + + +def runcmd_stderr_matches_regex(ctx, regex=None): + ns = ctx.declare("_runcmd") + _runcmd_output_matches_regex(ns["stderr"], regex) + + +def runcmd_stderr_doesnt_match_regex(ctx, regex=None): + ns = ctx.declare("_runcmd") + _runcmd_output_doesnt_match_regex(ns["stderr"], regex) + + +def _runcmd_output_matches_regex(actual, regex): + assert_ne = globals()["assert_ne"] + r = re.compile(regex) + m = r.search(actual) + logging.debug("_runcmd_output_matches_regex:") + logging.debug(f" actual: {actual!r}") + logging.debug(f" regex: {regex!r}") + logging.debug(f" match: {m}") + assert_ne(m, None) + + +def _runcmd_output_doesnt_match_regex(actual, regex): + assert_eq = globals()["assert_eq"] + r = re.compile(regex) + m = r.search(actual) + logging.debug("_runcmd_output_doesnt_match_regex:") + logging.debug(f" actual: {actual!r}") + logging.debug(f" regex: {regex!r}") + logging.debug(f" match: {m}") + assert_eq(m, None) -- cgit v1.2.1