diff options
author | Lars Wirzenius <liw@liw.fi> | 2020-10-14 09:35:39 +0300 |
---|---|---|
committer | Lars Wirzenius <liw@liw.fi> | 2020-10-14 09:35:39 +0300 |
commit | 739d624d6f60776a9baba905e23c818d23dc52e0 (patch) | |
tree | 55594594602a8a2647dc79b2d0eb0f528142a4f6 /subplot/runcmd.py | |
parent | 8cc1a68d733762ad6baaec439360a84ab8450d3a (diff) | |
download | ewww-739d624d6f60776a9baba905e23c818d23dc52e0.tar.gz |
chore: update subplot/runcmd.* from Subplot
Also fix everything that needs fixing. Tests pass.
Diffstat (limited to 'subplot/runcmd.py')
-rw-r--r-- | subplot/runcmd.py | 272 |
1 files changed, 221 insertions, 51 deletions
diff --git a/subplot/runcmd.py b/subplot/runcmd.py index 631d8c0..532b60b 100644 --- a/subplot/runcmd.py +++ b/subplot/runcmd.py @@ -1,73 +1,243 @@ -# Some step implementations for running commands and capturing the result. - import logging import os +import re +import shlex import subprocess -# Run a command, capture its stdout, stderr, and exit code in context. -def runcmd(ctx, argv, **kwargs): - logging.debug(f"Running program") - logging.debug(f" cwd={os.getcwd()}") - logging.debug(f" argv={argv}") - logging.debug(f" kwargs={argv}") - p = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs) +# +# Helper functions. +# + +# Get exit code or other stored data about the latest command run by +# runcmd_run. + + +def _runcmd_get(ctx, name): + ns = ctx.declare("_runcmd") + return ns[name] + + +def runcmd_get_exit_code(ctx): + return _runcmd_get(ctx, "exit") + + +def runcmd_get_stdout(ctx): + return _runcmd_get(ctx, "stdout") + + +def runcmd_get_stdout_raw(ctx): + return _runcmd_get(ctx, "stdout.raw") + + +def runcmd_get_stderr(ctx): + return _runcmd_get(ctx, "stderr") + + +def runcmd_get_stderr_raw(ctx): + return _runcmd_get(ctx, "stderr.raw") + + +def runcmd_get_argv(ctx): + return _runcmd_get(ctx, "argv") + + +# Run a command, given an argv and other arguments for subprocess.Popen. +# +# This is meant to be a helper function, not bound directly to a step. The +# stdout, stderr, and exit code are stored in the "_runcmd" namespace in the +# ctx context. +def runcmd_run(ctx, argv, **kwargs): + ns = ctx.declare("_runcmd") + env = dict(os.environ) + pp = ns.get("path-prefix") + if pp: + env["PATH"] = pp + ":" + env["PATH"] + + logging.debug(f"runcmd_run") + logging.debug(f" argv: {argv}") + logging.debug(f" env: {env}") + p = subprocess.Popen( + argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, **kwargs + ) stdout, stderr = p.communicate("") - ctx["argv"] = argv - ctx["stdout"] = stdout.decode("utf-8") - ctx["stderr"] = stderr.decode("utf-8") - ctx["exit"] = p.returncode + ns["argv"] = argv + ns["stdout.raw"] = stdout + ns["stderr.raw"] = stderr + ns["stdout"] = stdout.decode("utf-8") + ns["stderr"] = stderr.decode("utf-8") + ns["exit"] = p.returncode + logging.debug(f" ctx: {ctx}") + logging.debug(f" ns: {ns}") + + +# Step: prepend srcdir to PATH whenever runcmd runs a command. +def runcmd_helper_srcdir_path(ctx): + srcdir = globals()["srcdir"] + runcmd_prepend_to_path(ctx, srcdir) + + +# Step: This creates a helper script. +def runcmd_helper_script(ctx, filename=None): + get_file = globals()["get_file"] + with open(filename, "wb") as f: + f.write(get_file(filename)) + + +# +# Step functions for running commands. +# + + +def runcmd_prepend_to_path(ctx, dirname=None): + ns = ctx.declare("_runcmd") + pp = ns.get("path-prefix", "") + if pp: + pp = f"{pp}:{dirname}" + else: + pp = dirname + ns["path-prefix"] = pp + + +def runcmd_step(ctx, argv0=None, args=None): + runcmd_try_to_run(ctx, argv0=argv0, args=args) + runcmd_exit_code_is_zero(ctx) + + +def runcmd_try_to_run(ctx, argv0=None, args=None): + argv = [shlex.quote(argv0)] + shlex.split(args) + runcmd_run(ctx, argv) + + +# +# Step functions for examining exit codes. +# + + +def runcmd_exit_code_is_zero(ctx): + runcmd_exit_code_is(ctx, exit=0) + + +def runcmd_exit_code_is(ctx, exit=None): + assert_eq = globals()["assert_eq"] + assert_eq(runcmd_get_exit_code(ctx), int(exit)) + + +def runcmd_exit_code_is_nonzero(ctx): + runcmd_exit_code_is_not(ctx, exit=0) + + +def runcmd_exit_code_is_not(ctx, exit=None): + assert_ne = globals()["assert_ne"] + assert_ne(runcmd_get_exit_code(ctx), int(exit)) + + +# +# Step functions and helpers for examining output in various ways. +# + + +def runcmd_stdout_is(ctx, text=None): + _runcmd_output_is(runcmd_get_stdout(ctx), text) + + +def runcmd_stdout_isnt(ctx, text=None): + _runcmd_output_isnt(runcmd_get_stdout(ctx), text) + + +def runcmd_stderr_is(ctx, text=None): + _runcmd_output_is(runcmd_get_stderr(ctx), text) + + +def runcmd_stderr_isnt(ctx, text=None): + _runcmd_output_isnt(runcmd_get_stderr(ctx), text) + + +def _runcmd_output_is(actual, wanted): + assert_eq = globals()["assert_eq"] + wanted = bytes(wanted, "utf8").decode("unicode_escape") + logging.debug("_runcmd_output_is:") + logging.debug(f" actual: {actual!r}") + logging.debug(f" wanted: {wanted!r}") + assert_eq(actual, wanted) + + +def _runcmd_output_isnt(actual, wanted): + assert_ne = globals()["assert_ne"] + wanted = bytes(wanted, "utf8").decode("unicode_escape") + logging.debug("_runcmd_output_isnt:") + logging.debug(f" actual: {actual!r}") + logging.debug(f" wanted: {wanted!r}") + assert_ne(actual, wanted) + + +def runcmd_stdout_contains(ctx, text=None): + _runcmd_output_contains(runcmd_get_stdout(ctx), text) + + +def runcmd_stdout_doesnt_contain(ctx, text=None): + _runcmd_output_doesnt_contain(runcmd_get_stdout(ctx), text) + + +def runcmd_stderr_contains(ctx, text=None): + _runcmd_output_contains(runcmd_get_stderr(ctx), text) + + +def runcmd_stderr_doesnt_contain(ctx, text=None): + _runcmd_output_doesnt_contain(runcmd_get_stderr(ctx), text) -# Check that latest exit code captured by runcmd was a specific one. -def exit_code_is(ctx, wanted): - logging.debug(f"Verifying exit code is {wanted} (it is {ctx.get('exit')})") - assert_eq(ctx.get("exit"), wanted) +def _runcmd_output_contains(actual, wanted): + assert_eq = globals()["assert_eq"] + wanted = bytes(wanted, "utf8").decode("unicode_escape") + logging.debug("_runcmd_output_contains:") + logging.debug(f" actual: {actual!r}") + logging.debug(f" wanted: {wanted!r}") + assert_eq(wanted in actual, True) -# Check that latest exit code captured by runcmd was not a specific one. -def exit_code_is_not(ctx, unwanted): - logging.debug(f"Verifying exit code is NOT {unwanted} (it is {ctx.get('exit')})") - assert_ne(ctx.get("exit"), wanted) +def _runcmd_output_doesnt_contain(actual, wanted): + assert_ne = globals()["assert_ne"] + wanted = bytes(wanted, "utf8").decode("unicode_escape") + logging.debug("_runcmd_output_doesnt_contain:") + logging.debug(f" actual: {actual!r}") + logging.debug(f" wanted: {wanted!r}") + assert_ne(wanted in actual, True) -# Check that latest exit code captured by runcmd was zero. -def exit_code_zero(ctx): - exit_code_is(ctx, 0) +def runcmd_stdout_matches_regex(ctx, regex=None): + _runcmd_output_matches_regex(runcmd_get_stdout(ctx), regex) -# Check that latest exit code captured by runcmd was not zero. -def exit_code_nonzero(ctx): - exit_code_is_not(ctx, 0) +def runcmd_stdout_doesnt_match_regex(ctx, regex=None): + _runcmd_output_doesnt_match_regex(runcmd_get_stdout(ctx), regex) -# Check that stdout of latest runcmd contains a specific string. -def stdout_contains(ctx, pattern=None): - logging.debug(f"Verifying stdout contains {pattern}") - logging.debug(f" stdout is {ctx.get('stdout', '')})") - stdout = ctx.get("stdout", "") - assert_eq(pattern in stdout, True) +def runcmd_stderr_matches_regex(ctx, regex=None): + _runcmd_output_matches_regex(runcmd_get_stderr(ctx), regex) -# Check that stdout of latest runcmd does not contain a specific string. -def stdout_does_not_contain(ctx, pattern=None): - logging.debug(f"Verifying stdout does NOT contain {pattern}") - logging.debug(f" stdout is {ctx.get('stdout', '')})") - stdout = ctx.get("stdout", "") - assert_eq(pattern not in stdout, True) +def runcmd_stderr_doesnt_match_regex(ctx, regex=None): + _runcmd_output_doesnt_match_regex(runcmd_get_stderr(ctx), regex) -# Check that stderr of latest runcmd does contains a specific string. -def stderr_contains(ctx, pattern=None): - logging.debug(f"Verifying stderr contains {pattern}") - logging.debug(f" stderr is {ctx.get('stderr', '')})") - stderr = ctx.get("stderr", "") - assert_eq(pattern in stderr, True) +def _runcmd_output_matches_regex(actual, regex): + assert_ne = globals()["assert_ne"] + r = re.compile(regex) + m = r.search(actual) + logging.debug("_runcmd_output_matches_regex:") + logging.debug(f" actual: {actual!r}") + logging.debug(f" regex: {regex!r}") + logging.debug(f" match: {m}") + assert_ne(m, None) -# Check that stderr of latest runcmd does not contain a specific string. -def stderr_does_not_contain(ctx, pattern=None): - logging.debug(f"Verifying stderr does NOT contain {pattern}") - logging.debug(f" stderr is {ctx.get('stderr', '')})") - stderr = ctx.get("stderr", "") - assert_eq(pattern not in stderr, True) +def _runcmd_output_doesnt_match_regex(actual, regex): + assert_eq = globals()["assert_eq"] + r = re.compile(regex) + m = r.search(actual) + logging.debug("_runcmd_output_doesnt_match_regex:") + logging.debug(f" actual: {actual!r}") + logging.debug(f" regex: {regex!r}") + logging.debug(f" match: {m}") + assert_eq(m, None) |