From 820a97f0144458fb03fb169f904d385a38152022 Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Sun, 1 Nov 2020 10:34:30 +0200 Subject: chore: update vendored copy of runcmd.py from Subplot --- subplot/runcmd.py | 312 ++++++++++++++++++++++++++++++++++++++++------------- subplot/subplot.py | 16 +-- 2 files changed, 247 insertions(+), 81 deletions(-) (limited to 'subplot') diff --git a/subplot/runcmd.py b/subplot/runcmd.py index 7193c15..532b60b 100644 --- a/subplot/runcmd.py +++ b/subplot/runcmd.py @@ -1,77 +1,243 @@ -# Some step implementations for running commands and capturing the result. - +import logging +import os +import re +import shlex import subprocess -# Run a command, capture its stdout, stderr, and exit code in context. -def runcmd(ctx, argv, **kwargs): - p = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs) +# +# Helper functions. +# + +# Get exit code or other stored data about the latest command run by +# runcmd_run. + + +def _runcmd_get(ctx, name): + ns = ctx.declare("_runcmd") + return ns[name] + + +def runcmd_get_exit_code(ctx): + return _runcmd_get(ctx, "exit") + + +def runcmd_get_stdout(ctx): + return _runcmd_get(ctx, "stdout") + + +def runcmd_get_stdout_raw(ctx): + return _runcmd_get(ctx, "stdout.raw") + + +def runcmd_get_stderr(ctx): + return _runcmd_get(ctx, "stderr") + + +def runcmd_get_stderr_raw(ctx): + return _runcmd_get(ctx, "stderr.raw") + + +def runcmd_get_argv(ctx): + return _runcmd_get(ctx, "argv") + + +# Run a command, given an argv and other arguments for subprocess.Popen. +# +# This is meant to be a helper function, not bound directly to a step. The +# stdout, stderr, and exit code are stored in the "_runcmd" namespace in the +# ctx context. +def runcmd_run(ctx, argv, **kwargs): + ns = ctx.declare("_runcmd") + env = dict(os.environ) + pp = ns.get("path-prefix") + if pp: + env["PATH"] = pp + ":" + env["PATH"] + + logging.debug(f"runcmd_run") + logging.debug(f" argv: {argv}") + logging.debug(f" env: {env}") + p = subprocess.Popen( + argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, **kwargs + ) stdout, stderr = p.communicate("") - ctx["argv"] = argv - ctx["stdout"] = stdout.decode("utf-8") - ctx["stderr"] = stderr.decode("utf-8") - ctx["exit"] = p.returncode - - -# Check that latest exit code captured by runcmd was a specific one. -def exit_code_is(ctx, wanted): - if ctx.get("exit") != wanted: - print("context:", ctx.as_dict()) - assert_eq(ctx.get("exit"), wanted) - - -# Check that latest exit code captured by runcmd was not a specific one. -def exit_code_is_not(ctx, unwanted): - if ctx.get("exit") == unwanted: - print("context:", ctx.as_dict()) - assert_ne(ctx.get("exit"), unwanted) - - -# Check that latest exit code captured by runcmd was zero. -def exit_code_zero(ctx): - exit_code_is(ctx, 0) - - -# Check that latest exit code captured by runcmd was not zero. -def exit_code_nonzero(ctx): - exit_code_is_not(ctx, 0) - - -# Check that stdout of latest runcmd contains a specific string. -def stdout_contains(ctx, pattern=None): - stdout = ctx.get("stdout", "") - if pattern not in stdout: - print("pattern:", repr(pattern)) - print("stdout:", repr(stdout)) - print("ctx:", ctx.as_dict()) - assert_eq(pattern in stdout, True) - - -# Check that stdout of latest runcmd does not contain a specific string. -def stdout_does_not_contain(ctx, pattern=None): - stdout = ctx.get("stdout", "") - if pattern in stdout: - print("pattern:", repr(pattern)) - print("stdout:", repr(stdout)) - print("ctx:", ctx.as_dict()) - assert_eq(pattern not in stdout, True) - - -# Check that stderr of latest runcmd does contains a specific string. -def stderr_contains(ctx, pattern=None): - stderr = ctx.get("stderr", "") - if pattern not in stderr: - print("pattern:", repr(pattern)) - print("stderr:", repr(stderr)) - print("ctx:", ctx.as_dict()) - assert_eq(pattern in stderr, True) - - -# Check that stderr of latest runcmd does not contain a specific string. -def stderr_does_not_contain(ctx, pattern=None): - stderr = ctx.get("stderr", "") - if pattern not in stderr: - print("pattern:", repr(pattern)) - print("stderr:", repr(stderr)) - print("ctx:", ctx.as_dict()) - assert_eq(pattern not in stderr, True) + ns["argv"] = argv + ns["stdout.raw"] = stdout + ns["stderr.raw"] = stderr + ns["stdout"] = stdout.decode("utf-8") + ns["stderr"] = stderr.decode("utf-8") + ns["exit"] = p.returncode + logging.debug(f" ctx: {ctx}") + logging.debug(f" ns: {ns}") + + +# Step: prepend srcdir to PATH whenever runcmd runs a command. +def runcmd_helper_srcdir_path(ctx): + srcdir = globals()["srcdir"] + runcmd_prepend_to_path(ctx, srcdir) + + +# Step: This creates a helper script. +def runcmd_helper_script(ctx, filename=None): + get_file = globals()["get_file"] + with open(filename, "wb") as f: + f.write(get_file(filename)) + + +# +# Step functions for running commands. +# + + +def runcmd_prepend_to_path(ctx, dirname=None): + ns = ctx.declare("_runcmd") + pp = ns.get("path-prefix", "") + if pp: + pp = f"{pp}:{dirname}" + else: + pp = dirname + ns["path-prefix"] = pp + + +def runcmd_step(ctx, argv0=None, args=None): + runcmd_try_to_run(ctx, argv0=argv0, args=args) + runcmd_exit_code_is_zero(ctx) + + +def runcmd_try_to_run(ctx, argv0=None, args=None): + argv = [shlex.quote(argv0)] + shlex.split(args) + runcmd_run(ctx, argv) + + +# +# Step functions for examining exit codes. +# + + +def runcmd_exit_code_is_zero(ctx): + runcmd_exit_code_is(ctx, exit=0) + + +def runcmd_exit_code_is(ctx, exit=None): + assert_eq = globals()["assert_eq"] + assert_eq(runcmd_get_exit_code(ctx), int(exit)) + + +def runcmd_exit_code_is_nonzero(ctx): + runcmd_exit_code_is_not(ctx, exit=0) + + +def runcmd_exit_code_is_not(ctx, exit=None): + assert_ne = globals()["assert_ne"] + assert_ne(runcmd_get_exit_code(ctx), int(exit)) + + +# +# Step functions and helpers for examining output in various ways. +# + + +def runcmd_stdout_is(ctx, text=None): + _runcmd_output_is(runcmd_get_stdout(ctx), text) + + +def runcmd_stdout_isnt(ctx, text=None): + _runcmd_output_isnt(runcmd_get_stdout(ctx), text) + + +def runcmd_stderr_is(ctx, text=None): + _runcmd_output_is(runcmd_get_stderr(ctx), text) + + +def runcmd_stderr_isnt(ctx, text=None): + _runcmd_output_isnt(runcmd_get_stderr(ctx), text) + + +def _runcmd_output_is(actual, wanted): + assert_eq = globals()["assert_eq"] + wanted = bytes(wanted, "utf8").decode("unicode_escape") + logging.debug("_runcmd_output_is:") + logging.debug(f" actual: {actual!r}") + logging.debug(f" wanted: {wanted!r}") + assert_eq(actual, wanted) + + +def _runcmd_output_isnt(actual, wanted): + assert_ne = globals()["assert_ne"] + wanted = bytes(wanted, "utf8").decode("unicode_escape") + logging.debug("_runcmd_output_isnt:") + logging.debug(f" actual: {actual!r}") + logging.debug(f" wanted: {wanted!r}") + assert_ne(actual, wanted) + + +def runcmd_stdout_contains(ctx, text=None): + _runcmd_output_contains(runcmd_get_stdout(ctx), text) + + +def runcmd_stdout_doesnt_contain(ctx, text=None): + _runcmd_output_doesnt_contain(runcmd_get_stdout(ctx), text) + + +def runcmd_stderr_contains(ctx, text=None): + _runcmd_output_contains(runcmd_get_stderr(ctx), text) + + +def runcmd_stderr_doesnt_contain(ctx, text=None): + _runcmd_output_doesnt_contain(runcmd_get_stderr(ctx), text) + + +def _runcmd_output_contains(actual, wanted): + assert_eq = globals()["assert_eq"] + wanted = bytes(wanted, "utf8").decode("unicode_escape") + logging.debug("_runcmd_output_contains:") + logging.debug(f" actual: {actual!r}") + logging.debug(f" wanted: {wanted!r}") + assert_eq(wanted in actual, True) + + +def _runcmd_output_doesnt_contain(actual, wanted): + assert_ne = globals()["assert_ne"] + wanted = bytes(wanted, "utf8").decode("unicode_escape") + logging.debug("_runcmd_output_doesnt_contain:") + logging.debug(f" actual: {actual!r}") + logging.debug(f" wanted: {wanted!r}") + assert_ne(wanted in actual, True) + + +def runcmd_stdout_matches_regex(ctx, regex=None): + _runcmd_output_matches_regex(runcmd_get_stdout(ctx), regex) + + +def runcmd_stdout_doesnt_match_regex(ctx, regex=None): + _runcmd_output_doesnt_match_regex(runcmd_get_stdout(ctx), regex) + + +def runcmd_stderr_matches_regex(ctx, regex=None): + _runcmd_output_matches_regex(runcmd_get_stderr(ctx), regex) + + +def runcmd_stderr_doesnt_match_regex(ctx, regex=None): + _runcmd_output_doesnt_match_regex(runcmd_get_stderr(ctx), regex) + + +def _runcmd_output_matches_regex(actual, regex): + assert_ne = globals()["assert_ne"] + r = re.compile(regex) + m = r.search(actual) + logging.debug("_runcmd_output_matches_regex:") + logging.debug(f" actual: {actual!r}") + logging.debug(f" regex: {regex!r}") + logging.debug(f" match: {m}") + assert_ne(m, None) + + +def _runcmd_output_doesnt_match_regex(actual, regex): + assert_eq = globals()["assert_eq"] + r = re.compile(regex) + m = r.search(actual) + logging.debug("_runcmd_output_doesnt_match_regex:") + logging.debug(f" actual: {actual!r}") + logging.debug(f" regex: {regex!r}") + logging.debug(f" match: {m}") + assert_eq(m, None) diff --git a/subplot/subplot.py b/subplot/subplot.py index 537c353..08472bd 100644 --- a/subplot/subplot.py +++ b/subplot/subplot.py @@ -85,7 +85,7 @@ def set_vars_file(ctx, filename=None): def try_playbook(ctx): - runcmd = globals()["runcmd"] + runcmd_run = globals()["runcmd_run"] assert_ne = globals()["assert_ne"] srcdir = globals()["srcdir"] @@ -126,20 +126,20 @@ def try_playbook(ctx): "playbook.yml", ] - runcmd(ctx, argv, env=env) + runcmd_run(ctx, argv, env=env) def command_fails(ctx): - exit_code_nonzero = globals()["exit_code_nonzero"] - exit_code_nonzero(ctx) + runcmd_exit_code_is_nonzero = globals()["runcmd_exit_code_is_nonzero"] + runcmd_exit_code_is_nonzero(ctx) def run_playbook(ctx): - exit_code_zero = globals()["exit_code_zero"] + runcmd_exit_code_is_zero = globals()["runcmd_exit_code_is_zero"] try_playbook(ctx) - exit_code_zero(ctx) + runcmd_exit_code_is_zero(ctx) def xstdout_contains(ctx, text=None): - stdout_contains = globals()["stdout_contains"] - stdout_contains(ctx, pattern=text) + runcmd_stdout_contains = globals()["runcmd_stdout_contains"] + runcmd_stdout_contains(ctx, text=text) -- cgit v1.2.1